基于消息的系統(tǒng)模型,不一定需要broker(消息隊列服務(wù)端)。市面上的的Akka(actor模型)、ZeroMQ等,其實都是基于消息的系統(tǒng)設(shè)計范式,但是沒有broker。
我們之所以要設(shè)計一個消息隊列,并且配備broker,無外乎要做兩件事情:
1、消息的轉(zhuǎn)儲,在更合適的時間點投遞,或者通過一系列手段輔助消息最終能送達消費機。
2、規(guī)范一種范式和通用的模式,以滿足解耦、最終一致性、錯峰等需求。掰開了揉碎了看,最簡單的消息隊列可以做成一個消息轉(zhuǎn)發(fā)器,把一次RPC做成兩次RPC。發(fā)送者把消息投遞到服務(wù)端(以下簡稱broker),服務(wù)端再將消息轉(zhuǎn)發(fā)一手到接收端,就是這么簡單。
一般來講,設(shè)計消息隊列的整體思路是先build一個整體的數(shù)據(jù)流,例如producer發(fā)送給broker,broker發(fā)送給consumer,consumer回復(fù)消費確認,broker刪除/備份消息等。
利用RPC將數(shù)據(jù)流串起來。然后考慮RPC的高可用性,盡量做到無狀態(tài),方便水平擴展。之后考慮如何承載消息堆積,然后在合適的時機投遞消息,而處理堆積的最佳方式,就是存儲,存儲的選型需要綜合考慮性能/可靠性和開發(fā)維護成本等諸多因素。
為了實現(xiàn)廣播功能,我們必須要維護消費關(guān)系,可以利用zk/config server等保存消費關(guān)系。
在完成了上述幾個功能后,消息隊列基本就實現(xiàn)了。然后我們可以考慮一些高級特性,如可靠投遞,事務(wù)特性,性能優(yōu)化等。
下面我們會以設(shè)計消息隊列時重點考慮的模塊為主線,穿插灌輸一些消息隊列的特性實現(xiàn)方法,來具體分析設(shè)計實現(xiàn)一個消息隊列時的方方面面。
實現(xiàn)隊列基本功能
RPC通信協(xié)議
剛才講到,所謂消息隊列,無外乎兩次RPC加一次轉(zhuǎn)儲,當(dāng)然需要消費端最終做消費確認的情況是三次RPC。既然是RPC,就必然牽扯出一系列話題,什么負載均衡啊、服務(wù)發(fā)現(xiàn)啊、通信協(xié)議啊、序列化協(xié)議啊,等等。在這一塊,我的強烈建議是不要重復(fù)造輪子。利用公司現(xiàn)有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定義的框架也好。因為消息隊列的RPC,和普通的RPC沒有本質(zhì)區(qū)別。當(dāng)然了,自主利用Memchached或者Redis協(xié)議重新寫一套RPC框架并非不可(如MetaQ使用了自己封裝的Gecko NIO框架,卡夫卡也用了類似的協(xié)議)。但實現(xiàn)成本和難度無疑倍增。排除對效率的極端要求,都可以使用現(xiàn)成的RPC框架。
簡單來講,服務(wù)端提供兩個RPC服務(wù),一個用來接收消息,一個用來確認消息收到。并且做到不管哪個server收到消息和確認消息,結(jié)果一致即可。當(dāng)然這中間可能還涉及跨IDC的服務(wù)的問題。這里和RPC的原則是一致的,盡量優(yōu)先選擇本機房投遞。你可能會問,如果producer和consumer本身就在兩個機房了,怎么辦?首先,broker必須保證感知的到所有consumer的存在。其次,producer盡量選擇就近的機房就好了。
高可用
其實所有的高可用,是依賴于RPC和存儲的高可用來做的。先來看RPC的高可用,美團的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服務(wù)自動發(fā)現(xiàn),負載均衡等功能。而消息隊列的高可用,只要保證broker接受消息和確認消息的接口是冪等的,并且consumer的幾臺機器處理消息是冪等的,這樣就把消息隊列的可用性,轉(zhuǎn)交給RPC框架來處理了。
那么怎么保證冪等呢?最簡單的方式莫過于共享存儲。broker多機器共享一個DB或者一個分布式文件/kv系統(tǒng),則處理消息自然是冪等的。就算有單點故障,其他節(jié)點可以立刻頂上。另外failover可以依賴定時任務(wù)的補償,這是消息隊列本身天然就可以支持的功能。存儲系統(tǒng)本身的可用性我們不需要操太多心,放心大膽的交給DBA們吧!
對于不共享存儲的隊列,如Kafka使用分區(qū)加主備模式,就略微麻煩一些。需要保證每一個分區(qū)內(nèi)的高可用性,也就是每一個分區(qū)至少要有一個主備且需要做數(shù)據(jù)的同步,關(guān)于這塊HA的細節(jié),可以參考下篇pull模型消息系統(tǒng)設(shè)計。
服務(wù)端承載消息堆積的能力
消息到達服務(wù)端如果不經(jīng)過任何處理就到接收者了,broker就失去了它的意義。為了滿足我們錯峰/流控/最終可達等一系列需求,把消息存儲下來,然后選擇時機投遞就顯得是順理成章的了。
只是這個存儲可以做成很多方式。比如存儲在內(nèi)存里,存儲在分布式KV里,存儲在磁盤里,存儲在數(shù)據(jù)庫里等等。但歸結(jié)起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),并且理論上能承載更大限度的消息堆積(外存的空間遠大于內(nèi)存)。
但并不是每種消息都需要持久化存儲。很多消息對于投遞性能的要求大于可靠性的要求,且數(shù)量極大(如日志)。這時候,消息不落地直接暫存內(nèi)存,嘗試幾次failover,最終投遞出去也未嘗不可。
市面上的消息隊列普遍兩種形式都支持。當(dāng)然具體的場景還要具體結(jié)合公司的業(yè)務(wù)來看。
存儲子系統(tǒng)的選擇
我們來看看如果需要數(shù)據(jù)落地的情況下各種存儲子系統(tǒng)的選擇。理論上,從速度來看,文件系統(tǒng)>分布式KV(持久化)>分布式文件系統(tǒng)>數(shù)據(jù)庫,而可靠性卻截然相反。還是要從支持的業(yè)務(wù)場景出發(fā)作出最合理的選擇,如果你們的消息隊列是用來支持支付/交易等對可靠性要求非常高,但對性能和量的要求沒有這么高,而且沒有時間精力專門做文件存儲系統(tǒng)的研究,DB是最好的選擇。
但是DB受制于IOPS,如果要求單broker 5位數(shù)以上的QPS性能,基于文件的存儲是比較好的解決方案。整體上可以采用數(shù)據(jù)文件+索引文件的方式處理,具體這塊的設(shè)計比較復(fù)雜,可以參考下篇的存儲子系統(tǒng)設(shè)計。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其編程接口較友好,性能也比較可觀,如果在可靠性要求不是那么高的場景,也不失為一個不錯的選擇。
消費關(guān)系解析
現(xiàn)在我們的消息隊列初步具備了轉(zhuǎn)儲消息的能力。下面一個重要的事情就是解析發(fā)送接收關(guān)系,進行正確的消息投遞了。
市面上的消息隊列定義了一堆讓人暈頭轉(zhuǎn)向的名詞,如JMS 規(guī)范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。拋開現(xiàn)象看本質(zhì),無外乎是單播與廣播的區(qū)別。所謂單播,就是點到點;而廣播,是一點對多點。當(dāng)然,對于互聯(lián)網(wǎng)的大部分應(yīng)用來說,組間廣播、組內(nèi)單播是最常見的情形。
消息需要通知到多個業(yè)務(wù)集群,而一個業(yè)務(wù)集群內(nèi)有很多臺機器,只要一臺機器消費這個消息就可以了。
當(dāng)然這不是絕對的,很多時候組內(nèi)的廣播也是有適用場景的,如本地緩存的更新等等。另外,消費關(guān)系除了組內(nèi)組間,可能會有多級樹狀關(guān)系。這種情況太過于復(fù)雜,一般不列入考慮范圍。所以,一般比較通用的設(shè)計是支持組間廣播,不同的組注冊不同的訂閱。組內(nèi)的不同機器,如果注冊一個相同的ID,則單播;如果注冊不同的ID(如IP地址+端口),則廣播。
至于廣播關(guān)系的維護,一般由于消息隊列本身都是集群,所以都維護在公共存儲上,如config server、zookeeper等。維護廣播關(guān)系所要做的事情基本是一致的:
1、發(fā)送關(guān)系的維護。2、發(fā)送關(guān)系變更時的通知。
隊列高級特性設(shè)計
上面都是些消息隊列基本功能的實現(xiàn),下面來看一些關(guān)于消息隊列特性相關(guān)的內(nèi)容,不管可靠投遞/消息丟失與重復(fù)以及事務(wù)乃至于性能,不是每個消息隊列都會照顧到,所以要依照業(yè)務(wù)的需求,來仔細衡量各種特性實現(xiàn)的成本,利弊,最終做出最為合理的設(shè)計。
可靠投遞(最終一致性)
這是個激動人心的話題,完全不丟消息,究竟可不可能?答案是,完全可能,前提是消息可能會重復(fù),并且,在異常情況下,要接受消息的延遲。
方案說簡單也簡單,就是每當(dāng)要發(fā)生不可靠的事情(RPC等)之前,先將消息落地,然后發(fā)送。當(dāng)失敗或者不知道成功失?。ū热绯瑫r)時,消息狀態(tài)是待發(fā)送,定時任務(wù)不停輪詢所有待發(fā)送消息,最終一定可以送達。
具體來說:
1、producer往broker發(fā)送消息之前,需要做一次落地。2、請求到server后,server確保數(shù)據(jù)落地后再告訴客戶端發(fā)送成功。3、支持廣播的消息隊列需要對每個待發(fā)送的endpoint,持久化一個發(fā)送狀態(tài),直到所有endpoint狀態(tài)都OK才可刪除消息。
對于各種不確定(超時、down機、消息沒有送達、送達后數(shù)據(jù)沒落地、數(shù)據(jù)落地了回復(fù)沒收到),其實對于發(fā)送方來說,都是一件事情,就是消息沒有送達。
重推消息所面臨的問題就是消息重復(fù)。重復(fù)和丟失就像兩個噩夢,你必須要面對一個。好在消息重復(fù)還有處理的機會,消息丟失再想找回就難了。
Anyway,作為一個成熟的消息隊列,應(yīng)該盡量在各個環(huán)節(jié)減少重復(fù)投遞的可能性,不能因為重復(fù)有解決方案就放縱的亂投遞。
最后說一句,不是所有的系統(tǒng)都要求最終一致性或者可靠投遞,比如一個論壇系統(tǒng)、一個招聘系統(tǒng)。一個重復(fù)的簡歷或話題被發(fā)布,可能比丟失了一個發(fā)布顯得更讓用戶無法接受。不斷重復(fù)一句話,任何基礎(chǔ)組件要服務(wù)于業(yè)務(wù)場景。
消費確認
當(dāng)broker把消息投遞給消費者后,消費者可以立即響應(yīng)我收到了這個消息。但收到了這個消息只是第一步,我能不能處理這個消息卻不一定?;蛟S因為消費能力的問題,系統(tǒng)的負荷已經(jīng)不能處理這個消息;或者是剛才狀態(tài)機里面提到的消息不是我想要接收的消息,主動要求重發(fā)。
把消息的送達和消息的處理分開,這樣才真正的實現(xiàn)了消息隊列的本質(zhì)-解耦。所以,允許消費者主動進行消費確認是必要的。當(dāng)然,對于沒有特殊邏輯的消息,默認Auto Ack也是可以的,但一定要允許消費方主動ack。
對于正確消費ack的,沒什么特殊的。但是對于reject和error,需要特別說明。
reject這件事情,往往業(yè)務(wù)方是無法感知到的,系統(tǒng)的流量和健康狀況的評估,以及處理能力的評估是一件非常復(fù)雜的事情。舉個極端的例子,收到一個消息開始build索引,可能這個消息要處理半個小時,但消息量卻是非常的小。所以reject這塊建議做成滑動窗口/線程池類似的模型來控制,消費能力不匹配的時候,直接拒絕,過一段時間重發(fā),減少業(yè)務(wù)的負擔(dān)。
但業(yè)務(wù)出錯這件事情是只有業(yè)務(wù)方自己知道的,就像上文提到的狀態(tài)機等等。這時應(yīng)該允許業(yè)務(wù)方主動ack error,并可以與broker約定下次投遞的時間。
重復(fù)消息和順序消息
上文談到重復(fù)消息是不可能100%避免的,除非可以允許丟失,那么,順序消息能否100%滿足呢? 答案是可以,但條件更為苛刻:1、允許消息丟失。2、從發(fā)送方到服務(wù)方到接受者都是單點單線程。
所以絕對的順序消息基本上是不能實現(xiàn)的,當(dāng)然在METAQ/Kafka等pull模型的消息隊列中,單線程生產(chǎn)/消費,排除消息丟失,也是一種順序消息的解決方案。
一般來講,一個主流消息隊列的設(shè)計范式里,應(yīng)該是不丟消息的前提下,盡量減少重復(fù)消息,不保證消息的投遞順序。
談到重復(fù)消息,主要是兩個話題:1、如何鑒別消息重復(fù),并冪等的處理重復(fù)消息。2、一個消息隊列如何盡量減少重復(fù)消息的投遞。
先來看看第一個話題,每一個消息應(yīng)該有它的唯一身份。不管是業(yè)務(wù)方自定義的,還是根據(jù)IP/PID/時間戳生成的MessageId,如果有地方記錄這個MessageId,消息到來是能夠進行比對就能完成重復(fù)的鑒定。數(shù)據(jù)庫的唯一鍵/bloom filter/分布式KV中的key,都是不錯的選擇。由于消息不能被永久存儲,所以理論上都存在消息從持久化存儲移除的瞬間上游還在投遞的可能(上游因種種原因投遞失敗,不停重試,都到了下游清理消息的時間)。這種事情都是異常情況下才會發(fā)生的,畢竟是小眾情況。兩分鐘消息都還沒送達,多送一次又能怎樣呢?冪等的處理消息是一門藝術(shù),因為種種原因重復(fù)消息或者錯亂的消息還是來到了,說兩種通用的解決方案:
版本號
舉個簡單的例子,一個產(chǎn)品的狀態(tài)有上線/下線狀態(tài)。如果消息1是下線,消息2是上線。不巧消息1判重失敗,被投遞了兩次,且第二次發(fā)生在2之后,如果不做重復(fù)性判斷,顯然最終狀態(tài)是錯誤的。
但是,如果每個消息自帶一個版本號。上游發(fā)送的時候,標(biāo)記消息1版本號是1,消息2版本號是2。如果再發(fā)送下線消息,則版本號標(biāo)記為3。下游對于每次消息的處理,同時維護一個版本號。
每次只接受比當(dāng)前版本號大的消息。初始版本為0,當(dāng)消息1到達時,將版本號更新為1。消息2到來時,因為版本號>1.可以接收,同時更新版本號為2.當(dāng)另一條下線消息到來時,如果版本號是3.則是真實的下線消息。如果是1,則是重復(fù)投遞的消息。
如果業(yè)務(wù)方只關(guān)心消息重復(fù)不重復(fù),那么問題就已經(jīng)解決了。但很多時候另一個頭疼的問題來了,就是消息順序如果和想象的順序不一致。比如應(yīng)該的順序是12,到來的順序是21。則最后會發(fā)生狀態(tài)錯誤。
參考TCP/IP協(xié)議,如果想讓亂序的消息最后能夠正確的被組織,那么就應(yīng)該只接收比當(dāng)前版本號大一的消息。并且在一個session周期內(nèi)要一直保存各個消息的版本號。
如果到來的順序是21,則先把2存起來,待1到來后,先處理1,再處理2,這樣重復(fù)性和順序性要求就都達到了。
狀態(tài)機
基于版本號來處理重復(fù)和順序消息聽起來是個不錯的主意,但凡事總有瑕疵。使用版本號的最大問題是:對發(fā)送方必須要求消息帶業(yè)務(wù)版本號;下游必須存儲消息的版本號,對于要嚴(yán)格保證順序的。
還不能只存儲最新的版本號的消息,要把亂序到來的消息都存儲起來。而且必須要對此做出處理。試想一個永不過期的"session",比如一個物品的狀態(tài),會不停流轉(zhuǎn)于上下線。那么中間環(huán)節(jié)的所有存儲就必須保留,直到在某個版本號之前的版本一個不丟的到來,成本太高。
就剛才的場景看,如果消息沒有版本號,該怎么解決呢?業(yè)務(wù)方只需要自己維護一個狀態(tài)機,定義各種狀態(tài)的流轉(zhuǎn)關(guān)系。例如,"下線"狀態(tài)只允許接收"上線"消息,“上線”狀態(tài)只能接收“下線消息”,如果上線收到上線消息,或者下線收到下線消息,在消息不丟失和上游業(yè)務(wù)正確的前提下。要么是消息發(fā)重了,要么是順序到達反了。這時消費者只需要把“我不能處理這個消息”告訴投遞者,要求投遞者過一段時間重發(fā)即可。而且重發(fā)一定要有次數(shù)限制,比如5次,避免死循環(huán),就解決了。
舉例子說明,假設(shè)產(chǎn)品本身狀態(tài)是下線,1是上線消息,2是下線消息,3是上線消息,正常情況下,消息應(yīng)該的到來順序是123,但實際情況下收到的消息狀態(tài)變成了3123。
那么下游收到3消息的時候,判斷狀態(tài)機流轉(zhuǎn)是下線->上線,可以接收消息。然后收到消息1,發(fā)現(xiàn)是上線->上線,拒絕接收,要求重發(fā)。然后收到消息2,狀態(tài)是上線->下線,于是接收這個消息。
此時無論重發(fā)的消息1或者3到來,還是可以接收。另外的重發(fā),在一定次數(shù)拒絕后停止重發(fā),業(yè)務(wù)正確。
中間件對于重復(fù)消息的處理
回歸到消息隊列的話題來講。上述通用的版本號/狀態(tài)機/ID判重解決方案里,哪些是消息隊列該做的、哪些是消息隊列不該做業(yè)務(wù)方處理的呢?其實這里沒有一個完全嚴(yán)格的定義,但回到我們的出發(fā)點,我們保證不丟失消息的情況下盡量少重復(fù)消息,消費順序不保證。那么重復(fù)消息下和亂序消息下業(yè)務(wù)的正確,應(yīng)該是由消費方保證的,我們要做的是減少消息發(fā)送的重復(fù)。
我們無法定義業(yè)務(wù)方的業(yè)務(wù)版本號/狀態(tài)機,如果API里強制需要指定版本號,則顯得過于綁架客戶了。況且,在消費方維護這么多狀態(tài),就涉及到一個消費方的消息落地/多機間的同步消費狀態(tài)問題,復(fù)雜度指數(shù)級上升,而且只能解決部分問題。
減少重復(fù)消息的關(guān)鍵步驟:
1、broker記錄MessageId,直到投遞成功后清除,重復(fù)的ID到來不做處理,這樣只要發(fā)送者在清除周期內(nèi)能夠感知到消息投遞成功,就基本不會在server端產(chǎn)生重復(fù)消息。
2、對于server投遞到consumer的消息,由于不確定對端是在處理過程中還是消息發(fā)送丟失的情況下,有必要記錄下投遞的IP地址。決定重發(fā)之前詢問這個IP,消息處理成功了嗎?如果詢問無果,再重發(fā)。
事務(wù)
持久性是事務(wù)的一個特性,然而只滿足持久性卻不一定能滿足事務(wù)的特性。還是拿扣錢/加錢的例子講。滿足事務(wù)的一致性特征,則必須要么都不進行,要么都能成功。
解決方案從大方向上有兩種:1、兩階段提交,分布式事務(wù)。2、本地事務(wù),本地落地,補償發(fā)送。
分布式事務(wù)存在的最大問題是成本太高,兩階段提交協(xié)議,對于仲裁down機或者單點故障,幾乎是一個無解的黑洞。對于交易密集型或者I/O密集型的應(yīng)用,沒有辦法承受這么高的網(wǎng)絡(luò)延遲,系統(tǒng)復(fù)雜性。
并且成熟的分布式事務(wù)一定構(gòu)建與比較靠譜的商用DB和商用中間件上,成本也太高。
那如何使用本地事務(wù)解決分布式事務(wù)的問題呢?以本地和業(yè)務(wù)在一個數(shù)據(jù)庫實例中建表為例子,與扣錢的業(yè)務(wù)操作同一個事務(wù)里,將消息插入本地數(shù)據(jù)庫。如果消息入庫失敗,則業(yè)務(wù)回滾;如果消息入庫成功,事務(wù)提交。
然后發(fā)送消息(注意這里可以實時發(fā)送,不需要等定時任務(wù)檢出,以提高消息實時性)。以后的問題就是前文的最終一致性問題所提到的了,只要消息沒有發(fā)送成功,就一直靠定時任務(wù)重試。
這里有一個關(guān)鍵的點,本地事務(wù)做的,是業(yè)務(wù)落地和消息落地的事務(wù),而不是業(yè)務(wù)落地和RPC成功的事務(wù)。這里很多人容易混淆,如果是后者,無疑是事務(wù)嵌套RPC,是大忌,會有長事務(wù)死鎖等各種風(fēng)險。
而消息只要成功落地,很大程度上就沒有丟失的風(fēng)險(磁盤物理損壞除外)。而消息只要投遞到服務(wù)端確認后本地才做刪除,就完成了producer->broker的可靠投遞,并且當(dāng)消息存儲異常時,業(yè)務(wù)也是可以回滾的。
本地事務(wù)存在兩個最大的使用障礙:1、配置較為復(fù)雜,“綁架”業(yè)務(wù)方,必須本地數(shù)據(jù)庫實例提供一個庫表。2、對于消息延遲高敏感的業(yè)務(wù)不適用。
話說回來,不是每個業(yè)務(wù)都需要強事務(wù)的。扣錢和加錢需要事務(wù)保證,但下單和生成短信卻不需要事務(wù),不能因為要求發(fā)短信的消息存儲投遞失敗而要求下單業(yè)務(wù)回滾。所以,一個完整的消息隊列應(yīng)該定義清楚自己可以投遞的消息類型,如事務(wù)型消息,本地非持久型消息,以及服務(wù)端不落地的非可靠消息等。對不同的業(yè)務(wù)場景做不同的選擇。另外事務(wù)的使用應(yīng)該盡量低成本、透明化,可以依托于現(xiàn)有的成熟框架,如Spring的聲明式事務(wù)做擴展。業(yè)務(wù)方只需要使用@Transactional標(biāo)簽即可。
性能相關(guān)
異步/同步
首先澄清一個概念,異步,同步和oneway是三件事。異步,歸根結(jié)底你還是需要關(guān)心結(jié)果的,但可能不是當(dāng)時的時間點關(guān)心,可以用輪詢或者回調(diào)等方式處理結(jié)果;同步是需要當(dāng)時關(guān)心的結(jié)果的;而oneway是發(fā)出去就不管死活的方式,這種對于某些完全對可靠性沒有要求的場景還是適用的,但不是我們重點討論的范疇。
回歸來看,任何的RPC都是存在客戶端異步與服務(wù)端異步的,而且是可以任意組合的:客戶端同步對服務(wù)端異步,客戶端異步對服務(wù)端異步,客戶端同步對服務(wù)端同步,客戶端異步對服務(wù)端同步。
對于客戶端來說,同步與異步主要是拿到一個Result,還是Future(Listenable)的區(qū)別。實現(xiàn)方式可以是線程池,NIO或者其他事件機制,這里先不展開講。
服務(wù)端異步可能稍微難理解一點,這個是需要RPC協(xié)議支持的。參考servlet 3.0規(guī)范,服務(wù)端可以吐一個future給客戶端,并且在future done的時候通知客戶端。
整個過程可以參考下面的代碼:
客戶端同步服務(wù)端異步。
客戶端同步服務(wù)端同步。
客戶端異步服務(wù)端同步(這里用線程池的方式)。
客戶端異步服務(wù)端異步。
上面說了這么多,其實是想讓大家脫離兩個誤區(qū):1、RPC只有客戶端能做異步,服務(wù)端不能。2、異步只能通過線程池。
那么,服務(wù)端使用異步最大的好處是什么呢?說到底,是解放了線程和I/O。試想服務(wù)端有一堆I/O等待處理,如果每個請求都需要同步響應(yīng),每條消息都需要結(jié)果立刻返回,那么就幾乎沒法做I/O合并(當(dāng)然接口可以設(shè)計成batch的,但可能batch發(fā)過來的仍然數(shù)量較少)。而如果用異步的方式返回給客戶端future,就可以有機會進行I/O的合并,把幾個批次發(fā)過來的消息一起落地(這種合并對于MySQL等允許batch insert的數(shù)據(jù)庫效果尤其明顯),并且徹底釋放了線程。不至于說來多少請求開多少線程,能夠支持的并發(fā)量直線提高。
來看第二個誤區(qū),返回future的方式不一定只有線程池。換句話說,可以在線程池里面進行同步操作,也可以進行異步操作,也可以不使用線程池使用異步操作(NIO、事件)。
回到消息隊列的議題上,我們當(dāng)然不希望消息的發(fā)送阻塞主流程(前面提到了,server端如果使用異步模型,則可能因消息合并帶來一定程度上的消息延遲),所以可以先使用線程池提交一個發(fā)送請求,主流程繼續(xù)往下走。
但是線程池中的請求關(guān)心結(jié)果嗎?Of course,必須等待服務(wù)端消息成功落地,才算是消息發(fā)送成功。所以這里的模型,準(zhǔn)確地說事客戶端半同步半異步(使用線程池不阻塞主流程,但線程池中的任務(wù)需要等待server端的返回),server端是純異步??蛻舳说木€程池wait在server端吐回的future上,直到server端處理完畢,才解除阻塞繼續(xù)進行。
總結(jié)一句,同步能夠保證結(jié)果,異步能夠保證效率,要合理的結(jié)合才能做到最好的效率。
批量
談到批量就不得不提生產(chǎn)者消費者模型。但生產(chǎn)者消費者模型中最大的痛點是:消費者到底應(yīng)該何時進行消費。大處著眼來看,消費動作都是事件驅(qū)動的。主要事件包括:
1、攢夠了一定數(shù)量。
2、到達了一定時間。
3、隊列里有新的數(shù)據(jù)到來。
對于及時性要求高的數(shù)據(jù),可用采用方式3來完成,比如客戶端向服務(wù)端投遞數(shù)據(jù)。只要隊列有數(shù)據(jù),就把隊列中的所有數(shù)據(jù)刷出,否則將自己掛起,等待新數(shù)據(jù)的到來。
在第一次把隊列數(shù)據(jù)往外刷的過程中,又積攢了一部分數(shù)據(jù),第二次又可以形成一個批量。偽代碼如下:
這種方式是消息延遲和批量的一個比較好的平衡,但優(yōu)先響應(yīng)低延遲。延遲的最高程度由上一次發(fā)送的等待時間決定。但可能造成的問題是發(fā)送過快的話批量的大小不夠滿足性能的極致。
相反對于可以用適量的延遲來換取高性能的場景來說,用定時/定量二選一的方式可能會更為理想,既到達一定數(shù)量才發(fā)送,但如果數(shù)量一直達不到,也不能干等,有一個時間上限。
具體說來,在上文的submit之前,多判斷一個時間和數(shù)量,并且Runnable內(nèi)部維護一個定時器,避免沒有新任務(wù)到來時舊的任務(wù)永遠沒有機會觸發(fā)發(fā)送條件。對于server端的數(shù)據(jù)落地,使用這種方式就非常方便。
為什么網(wǎng)絡(luò)請求小包合并成大包會提高性能?主要原因有兩個:
1、減少無謂的請求頭,如果你每個請求只有幾字節(jié),而頭卻有幾十字節(jié),無疑效率非常低下。
2、減少回復(fù)的ack包個數(shù)。把請求合并后,ack包數(shù)量必然減少,確認和重發(fā)的成本就會降低。
上文提到的消息隊列,大多是針對push模型的設(shè)計?,F(xiàn)在市面上有很多經(jīng)典的也比較成熟的pull模型的消息隊列,如Kafka、MetaQ等。這跟JMS中傳統(tǒng)的push方式有很大的區(qū)別,可謂另辟蹊徑。