【原標題:微服務(wù)進階避坑策略 微服務(wù)異步架構(gòu)MQ之RocketMQ方案】“我們大家都知道把一個微服務(wù)架構(gòu)變成一個異步架構(gòu)只需要加一個MQ,現(xiàn)在市面上有很多MQ的開源框架。到底選擇哪一個MQ的開源框架才合適呢?”
一、什么是MQ?MQ的原理是什么?
MQ就是消息隊列,是Message Queue的縮寫。消息隊列是一種通信方式。消息的本質(zhì)就是一種數(shù)據(jù)結(jié)構(gòu)。因為MQ把項目中的消息集中式的處理和存儲,所以MQ主要有解耦,并發(fā),和削峰的功能。
1,解耦:
MQ的消息生產(chǎn)者和消費者互相不關(guān)心對方是否存在,通過MQ這個中間件的存在,使整個系統(tǒng)達到解耦的作用。
如果服務(wù)之間用RPC通信,當(dāng)一個服務(wù)跟幾百個服務(wù)通信時,如果那個服務(wù)的通信接口改變,那么幾百個服務(wù)的通信接口都的跟著變動,這是非常頭疼的一件事。
但是采用MQ之后,不管是生產(chǎn)者或者消費者都可以單獨改變自己。他們的改變不會影響到別的服務(wù)。從而達到解耦的目的。為什么要解耦呢?說白了就是方便,減少不必要的工作量。
2,并發(fā)
MQ有生產(chǎn)者集群和消費者集群,所以客戶端是億級用戶時,他們都是并行的。從而大大提升響應(yīng)速度。
3,削峰
因為MQ能存儲的消息量很大,所以他可以把大量的消息請求先存下了,然后再并發(fā)的方式慢慢處理。
如果采用RPC通信,每一次請求用調(diào)用RPC接口,當(dāng)請求量巨大的時候,因為RPC的請求是很耗資源的,所以巨大的請求一定會壓垮服務(wù)器。
削峰的目的是用戶體驗變好,并且使整個系統(tǒng)穩(wěn)定。能承受大量請求消息。
二、現(xiàn)在市面上有什么MQ,
重點介紹RocketMQ
現(xiàn)在市面上的MQ有很多,主要有RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ,Kafka等等,這些都是開源的MQ產(chǎn)品。以前很多人推薦使用RabbitMQ,他也是非常好用的MQ產(chǎn)品,這里不做過多的介紹。Kafka也是高吞吐量的老大,我們這里也不介紹。
我們重點介紹一下RocketMQ,RocketMQ是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給Apache軟件基金會,并于并于2017年9月25日成為 Apache 的頂級項目。
作為經(jīng)歷過多次阿里巴巴雙十一這種“超級工程”的洗禮并有穩(wěn)定出色表現(xiàn)的國產(chǎn)中間件,以其高性能、低延時和高可靠等特性近年來已經(jīng)也被越來越多的國內(nèi)企業(yè)使用。
功能概覽圖
可以看見RocketMQ支持定時和延時消息,這是RabbitMQ所沒有的能力。
RocketMQ的物理結(jié)構(gòu)
從這里可以看出,RocketMQ涉及到四大集群,producer,Name Server,Consumer,Broker。
Producer集群:
是生產(chǎn)者集群,負責(zé)產(chǎn)生消息,向消費者發(fā)送由業(yè)務(wù)應(yīng)用程序系統(tǒng)生成的消息,RocketMQ提供三種方式發(fā)送消息:同步,異步,單向。
一,普通消息
1,同步原理圖
同步消息關(guān)鍵代碼
try { SendResult sendResult = producer.send(msg); // 同步發(fā)送消息,只要不拋異常就是成功 if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } catch (Exception e) { System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); }}
2,異步原理圖
異步消息關(guān)鍵代碼
producer.sendAsync(msg, new SendCallback() {@Overridepublic void onSuccess(final SendResult sendResult) { // 消費發(fā)送成功 System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); }@Overridepublic void onException(OnExceptionContext context) { System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());}});
3,單向(Oneway)發(fā)送原理圖
單向只發(fā)送,不等待返回,所以速度最快,一般在微秒級,但可能丟失
單向(Oneway)發(fā)送消息關(guān)鍵代碼
producer.sendOneway(msg);
三種發(fā)送消息具體代碼請參考文檔:https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.6.566.7e49793fuueSlB[1]
二,定時消息和延時消息
發(fā)送定時消息關(guān)鍵代碼
try { // 定時消息,單位毫秒(ms),在指定時間戳(當(dāng)前時間之后)進行投遞,例如 2016-03-07 16:21:00 投遞。如果被設(shè)置成當(dāng)前時間戳之前的某個時刻,消息將立刻投遞給消費者。 long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime(); msg.setStartDeliverTime(timeStamp); // 發(fā)送消息,只要不拋異常就是成功 SendResult sendResult = producer.send(msg); System.out.println("MessageId:"+sendResult.getMessageId());}catch (Exception e) { // 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); }
發(fā)送延時消息關(guān)鍵代碼
try { // 延時消息,單位毫秒(ms),在指定延遲時間(當(dāng)前時間之后)進行投遞,例如消息在 3 秒后投遞 long delayTime = System.currentTimeMillis() + 3000; // 設(shè)置消息需要被投遞的時間 msg.setStartDeliverTime(delayTime); SendResult sendResult = producer.send(msg); // 同步發(fā)送消息,只要不拋異常就是成功 if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); }} catch (Exception e) { // 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); }
注意事項
1,定時和延時消息的 msg.setStartDeliverTime 參數(shù)需要設(shè)置成當(dāng)前時間戳之后的某個時刻(單位毫秒)。如果被設(shè)置成當(dāng)前時間戳之前的某個時刻,消息將立刻投遞給消費者。
2,定時和延時消息的 msg.setStartDeliverTime 參數(shù)可設(shè)置40天內(nèi)的任何時刻(單位毫秒),超過40天消息發(fā)送將失敗。
3,StartDeliverTime 是服務(wù)端開始向消費端投遞的時間。 如果消費者當(dāng)前有消息堆積,那么定時和延時消息會排在堆積消息后面,將不能嚴格按照配置的時間進行投遞。
4,由于客戶端和服務(wù)端可能存在時間差,消息的實際投遞時間與客戶端設(shè)置的投遞時間之間可能存在偏差。
5,設(shè)置定時和延時消息的投遞時間后,依然受 3 天的消息保存時長限制。例如,設(shè)置定時消息 5 天后才能被消費,如果第 5 天后一直沒被消費,那么這條消息將在第8天被刪除。
6,除 Java 語言支持延時消息外,其他語言都不支持延時消息。
發(fā)布消息原理圖
三,事務(wù)消息
RocketMQ提供類似X/Open XA的分布式事務(wù)功能來確保業(yè)務(wù)發(fā)送方和MQ消息的最終一致性,其本質(zhì)是通過半消息的方式把分布式事務(wù)放在MQ端來處理。
原理圖
其中:
1,發(fā)送方向消息隊列 RocketMQ 服務(wù)端發(fā)送消息。
2,服務(wù)端將消息持久化成功之后,向發(fā)送方 ACK 確認消息已經(jīng)發(fā)送成功,此時消息為半消息。
3,發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
4,發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(Commit 或是 Rollback),服務(wù)端收到 Commit 狀態(tài)則將半消息標記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會接受該消息。
5,在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務(wù)端,經(jīng)過固定時間后服務(wù)端將對該消息發(fā)起消息回查。
6,發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
7,發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認,服務(wù)端仍按照步驟 4 對半消息進行操作。
RocketMQ的半消息機制的注意事項是
1,根據(jù)第六步可以看出他要求發(fā)送方提供業(yè)務(wù)回查接口。
2,不能保證發(fā)送方的消息冪等,在ack沒有返回的情況下,可能存在重復(fù)消息
3,消費方要做冪等處理。
核心代碼
final BusinessService businessService = new BusinessService(); // 本地業(yè)務(wù)
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl());producer.start();Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());try { SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, Object arg) { // 消息 ID(有可能消息體一樣,但消息 ID 不一樣,當(dāng)前消息 ID 在控制臺無法查詢) String msgId = msg.getMsgID(); // 消息體內(nèi)容進行 crc32,也可以使用其它的如 MD5 long crc32Id = HashUtil.crc32Code(msg.getBody()); // 消息 ID 和 crc32id 主要是用來防止消息重復(fù) // 如果業(yè)務(wù)本身是冪等的,可以忽略,否則需要利用 msgId 或 crc32Id 來做冪等 // 如果要求消息絕對不重復(fù),推薦做法是對消息體 body 使用 crc32 或 MD5 來防止重復(fù)消息 Object businessServiceArgs = new Object(); TransactionStatus transactionStatus =TransactionStatus.Unknow; try { boolean isCommit = businessService.execbusinessService(businessServiceArgs); if (isCommit) { // 本地事務(wù)成功則提交消息 transactionStatus = TransactionStatus.CommitTransaction; } else { // 本地事務(wù)失敗則回滾消息 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) {log.error("Message Id:{}", msgId, e); } System.out.println(msg.getMsgID());log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name()); return transactionStatus; } }, null); }catch (Exception e) { // 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace();}
具體代碼參考文檔:https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.6.570.5d5738a49FJl1t[2]
所有消息發(fā)布原理圖
producer完全無狀態(tài),可以集群部署。
Name Server集群:
NameServer是一個幾乎無狀態(tài)的節(jié)點,可集群部署,節(jié)點之間無任何信息同步,NameServer很像注冊中心的功能。
聽說阿里之前的NameServer 是用ZooKeeper做的,可能因為Zookeeper不能滿足大規(guī)模并發(fā)的要求,所以之后NameServer 是阿里自研的。
NameServer其實就是一個路由表,他管理Producer和Comsumer之間的發(fā)現(xiàn)和注冊。
Broker集群:
Broker部署相對復(fù)雜,Broker分為Master與Slave,一個Master可以對應(yīng)多個Slaver,但是一個Slaver只能對應(yīng)一個Master,Master與Slaver的對應(yīng)關(guān)系通過指定相同的BrokerName。
不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slaver。Master可以部署多個。每個Broker與NameServer集群中的所有節(jié)點建立長連接,定時注冊Topic信息到所有的NameServer。
Consumer集群:
訂閱方式
消息隊列 RocketMQ 支持以下兩種訂閱方式:
集群訂閱:同一個 Group ID 所標識的所有 Consumer 平均分攤消費消息。 例如某個 Topic 有 9 條消息,一個 Group ID 有 3 個 Consumer 實例,那么在集群消費模式下每個實例平均分攤,只消費其中的 3 條消息。
// 集群訂閱方式設(shè)置(不設(shè)置的情況下,默認為集群訂閱方式)properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
廣播訂閱:同一個 Group ID 所標識的所有 Consumer 都會各自消費某條消息一次。 例如某個 Topic 有 9 條消息,一個 Group ID 有 3 個 Consumer 實例,那么在廣播消費模式下每個實例都會各自消費 9 條消息。
// 廣播訂閱方式設(shè)置properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BR
OADCASTING);
訂閱消息關(guān)鍵代碼:
Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe("TopicTestMQ", "TagA||TagB", **new** MessageListener() { //訂閱多個 Tagpublic Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage;}});//訂閱另外一個 Topicconsumer.subscribe("TopicTestMQ-Other", "*", **new** MessageListener() { //訂閱全部 Tagpublic Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); return Action.CommitMessage;}});consumer.start();