色婷婷狠狠18禁久久YY,CHINESE性内射高清国产,国产女人18毛片水真多1,国产AV在线观看

rocketmq延時(shí)隊(duì)列實(shí)現(xiàn)原理

rocketmq延時(shí)隊(duì)列實(shí)現(xiàn)原理?

RocketMQ是一款開源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時(shí)的、高可靠、萬億級(jí)容量、靈活可伸縮的消息發(fā)布與訂閱服務(wù)。

它前身是MetaQ,是阿里基于Kafka的設(shè)計(jì)使用Java進(jìn)行自主研發(fā)的。在2012年,阿里將其開源, 在2016年,阿里將其捐獻(xiàn)給Apache軟件基金會(huì)(Apache Software Foundation,簡(jiǎn)稱為ASF),正式成為孵化項(xiàng)目。2017 年,Apache軟件基金會(huì)宣布RocketMQ已孵化成為 Apache頂級(jí)項(xiàng)目(Top Level Project,簡(jiǎn)稱為TLP ),是國(guó)內(nèi)首個(gè)互聯(lián)網(wǎng)中間件在 Apache上的頂級(jí)項(xiàng)目。

延遲消息

生產(chǎn)者把消息發(fā)送到消息隊(duì)列中以后,并不期望被立即消費(fèi),而是等待指定時(shí)間后才可以被消費(fèi)者消費(fèi),這類消息通常被稱為延遲消息。

在RocketMQ中,支持延遲消息,但是不支持任意時(shí)間精度的延遲消息,只支持特定級(jí)別的延遲消息。如果要支持任意時(shí)間精度,不能避免在Broker層面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免產(chǎn)生巨大的性能開銷。

消息延遲級(jí)別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個(gè)級(jí)別。在發(fā)送消息時(shí),設(shè)置消息延遲級(jí)別即可,設(shè)置消息延遲級(jí)別時(shí)有以下3種情況:

設(shè)置消息延遲級(jí)別等于0時(shí),則該消息為非延遲消息。設(shè)置消息延遲級(jí)別大于等于1并且小于等于18時(shí),消息延遲特定時(shí)間,如:設(shè)置消息延遲級(jí)別等于1,則延遲1s;設(shè)置消息延遲級(jí)別等于2,則延遲5s,以此類推。設(shè)置消息延遲級(jí)別大于18時(shí),則該消息延遲級(jí)別為18,如:設(shè)置消息延遲級(jí)別等于20,則延遲2h。

延遲消息示例

首先,寫一個(gè)消費(fèi)者,用于消費(fèi)延遲消息:

再寫一個(gè)延遲消息的生產(chǎn)者,用于發(fā)送延遲消息:

運(yùn)行生產(chǎn)者以后,就會(huì)發(fā)送一條延遲消息:

10秒鐘后,消費(fèi)者收到的這條延遲消息:

延遲消息的原理分析

以下分析的RocketMQ源碼的版本號(hào)是4.7.1,版本不同源碼略有差別。

CommitLog

在org.apache.rocketmq.store.CommitLog中,針對(duì)延遲消息做了一些處理:

可以看到,每一個(gè)延遲消息的主題都被暫時(shí)更改為SCHEDULE_TOPIC_XXXX,并且根據(jù)延遲級(jí)別延遲消息變更了新的隊(duì)列Id。接下來,處理延遲消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。

ScheduleMessageService

ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore進(jìn)行初始化的,初始化包括構(gòu)造對(duì)象和調(diào)用load方法。最后,再執(zhí)行ScheduleMessageService的start方法:

遍歷所有延遲級(jí)別,根據(jù)延遲級(jí)別獲得對(duì)應(yīng)隊(duì)列的偏移量,如果偏移量不存在,則設(shè)置為0。然后為每個(gè)延遲級(jí)別創(chuàng)建定時(shí)任務(wù),第一次啟動(dòng)任務(wù)延遲為1秒,第二次及以后的啟動(dòng)任務(wù)延遲才是延遲級(jí)別相應(yīng)的延遲時(shí)間。

然后,又創(chuàng)建了一個(gè)定時(shí)任務(wù),用于持久化每個(gè)隊(duì)列消費(fèi)的偏移量。持久化的頻率由flushDelayOffsetInterval屬性進(jìn)行配置,默認(rèn)為10秒。

定時(shí)任務(wù)

ScheduleMessageService的start方法執(zhí)行之后,每個(gè)延遲級(jí)別都創(chuàng)建自己的定時(shí)任務(wù),這里的定時(shí)任務(wù)的具體實(shí)現(xiàn)就在DeliverDelayedMessageTimerTask類之中,它核心代碼是executeOnTimeup方法之中,我們來看一下主要部分:

如果沒有獲取到對(duì)應(yīng)的消息隊(duì)列,則在DELAY_FOR_A_WHILE(默認(rèn)為100)毫秒后再執(zhí)行任務(wù)。如果獲取到了,就繼續(xù)執(zhí)行下面操作:

如果沒有獲取到有效消息,則在DELAY_FOR_A_WHILE(默認(rèn)為100)毫秒后再執(zhí)行任務(wù)。如果獲取到了,就繼續(xù)執(zhí)行下面操作:

如果當(dāng)前消息不到消費(fèi)的時(shí)間,則在countdown毫秒后再執(zhí)行任務(wù)。如果到消費(fèi)的時(shí)間,就繼續(xù)執(zhí)行下面操作:

如果獲取到消息,則繼續(xù)執(zhí)行下面操作:

清除了消息的延遲級(jí)別,并且恢復(fù)了真正的消息主題和隊(duì)列Id,重新把消息發(fā)送到真正的消息隊(duì)列上以后,消費(fèi)者就可以立即消費(fèi)了。

總結(jié)

經(jīng)過以上對(duì)源碼的分析,可以總結(jié)出延遲消息的實(shí)現(xiàn)步驟:

如果消息的延遲級(jí)別大于0,則表示該消息為延遲消息,修改該消息的主題為SCHEDULE_TOPIC_XXXX,隊(duì)列Id為延遲級(jí)別減1。消息進(jìn)入SCHEDULE_TOPIC_XXXX的隊(duì)列中。定時(shí)任務(wù)根據(jù)上次拉取的偏移量不斷從隊(duì)列中取出所有消息。根據(jù)消息的物理偏移量和大小再次獲取消息。根據(jù)消息屬性重新創(chuàng)建消息,清除延遲級(jí)別,恢復(fù)原主題和隊(duì)列Id。重新發(fā)送消息到原主題的隊(duì)列中,供消費(fèi)者進(jìn)行消費(fèi)。

java毫秒,rocketmq延時(shí)隊(duì)列實(shí)現(xiàn)原理