我們知道RocketMQ主要分為消息 生產(chǎn)、存儲(消息堆積)、消費 三大塊領(lǐng)域。
那接下來,我們白話一下,RocketMQ是如何存儲消息的,揭秘消息存儲全過程。
注意,如果白話中不小心提到相關(guān)代碼配置與類名,請參考RocketMQ 4.9.4版本
RocketMQ使用了一種基于日志的存儲方式,將消息以順序?qū)懭氲姆绞阶芳拥轿募校瑥亩鴮崿F(xiàn)高性能的消息存儲和讀取。
RocketMQ的消息存儲方式可以分為兩個類型:CommitLog 和ConsumeQueue 。
圖片
還有一個文件類型是indexfile,主要用于控制臺消息檢索,不影響消息的寫入與消費,我們就不展開了。
CommitLog文件存儲了Producer端寫入的消息主體內(nèi)容,它以追加寫入的方式將消息存儲到磁盤上的文件中。
單個文件大小默認1G ,文件名長度為20位(左邊補零,剩余為起始偏移量),當(dāng)文件滿了,寫入下一個文件。
比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。
它的主要特點是:順序?qū)懀请S機讀(被ConsumeQueue讀取)。
雖然是隨機讀,但是利用package機制,可以批量地從磁盤讀取,作為cache存到內(nèi)存中,加速后續(xù)的讀取速度。
Broker單個實例下所有的隊列共用一個日志數(shù)據(jù)文件CommitLog來存儲。而Kafka采用的是獨立型的存儲結(jié)構(gòu),每個隊列一個文件。
ConsumeQueue文件是用于支持消息消費的存儲結(jié)構(gòu)。保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
消費者 通過 順序讀取 ConsumeQueue文件,可以快速定位到消息在CommitLog中的物理存儲位置,從而實現(xiàn)快速消息的拉取和消費。
從實際物理存儲的角度來看,每個主題Topic下的每個隊列Queue對應(yīng)一個ConsumeQueue文件。
生產(chǎn)者端的消息是順序?qū)懭隒ommitLog,消費者端是順序讀取ConsumeQueue。但是根據(jù)ConsumeQueue的起始物理位置偏移量offset讀取消息真實內(nèi)容,實際是隨機讀取CommitLog。實現(xiàn)了 消息生產(chǎn)與消息消費、數(shù)據(jù)存儲和數(shù)據(jù)索引 相互分離。
Broker在把消息寫入日志文件的過程中,如果在剛收到消息時,Broker異常宕機了,那么內(nèi)存中尚未寫入磁盤的消息就會丟失了。
因此,RocketMQ持久化消息分為兩種:同步刷盤和異步刷盤(默認配置)。
異步刷盤是指Broker收到消息后先存儲到PageCache,然后立即通知Producer消息已存儲成功,可以繼續(xù)處理業(yè)務(wù)邏輯。此后,Broker會啟動一個異步線程將消息持久化到磁盤。然而,如果Broker在持久化到磁盤之前發(fā)生故障,消息將會丟失。
## 刷盤策略配置flushDiskType = ASYNC_FLUSH
注意,寫入PageCache后,應(yīng)用服務(wù)宕機消息不丟失,只有機器斷電或宕機會有少量消息丟失。
相比之下,同步刷盤的方式是在消息存儲到緩存后不立即通知Producer,而是等待消息被持久化到磁盤后再通知Producer。這種方式確保了消息不會丟失,但性能不如異步刷盤高。一般用于金融業(yè)務(wù)。
## 刷盤策略配置flushDiskType = SYNC_FLUSH
在選擇刷盤方式時,需要根據(jù)業(yè)務(wù)場景進行權(quán)衡。
即使Broker采用同步刷盤策略,但如果刷盤完成后磁盤損壞,會導(dǎo)致所有存儲在磁盤上的消息丟失。
即使采用了主從復(fù)制,如果主節(jié)點在刷盤完成后還沒有來得及將數(shù)據(jù)同步給從節(jié)點就發(fā)生了磁盤故障,同樣會導(dǎo)致數(shù)據(jù)丟失。
所以我們可以配置同步機制,等待從節(jié)點復(fù)制完成主節(jié)點的消息后,才去通知Producer完成了消息存儲。
## 主從同步策略配置brokerRole=SYNC_MASTER
RocketMQ通過使用內(nèi)存映射文件(包括CommitLog、 ConsumeQueue等文件)來提高IO訪問性能,也就是我們常說的零拷貝技術(shù)。
Java在NIO包里,引入了sendFile(FileChannel類)和MMAP(MappedByteBuffer類)兩種實現(xiàn)方式的零拷貝技術(shù)。
主流的MQ都會使用零拷貝技術(shù),來提升IO:
在不開啟RocketMQ的內(nèi)存映射增強方案時,RocketMQ的讀和寫都只會簡單直接使用MMAP。
但是,MappedByteBuffer也存在一些缺陷:
為此,RocketMQ通過transientStorePoolEnable參數(shù)控制,對寫入進行了優(yōu)化。
如果開啟了這個參數(shù),會將寫入拆分為兩步, 寫入緩沖區(qū) + 異步刷盤 的增強策略。
## 刷盤策略配置flushDiskType = ASYNC_FLUSH transientStorePoolEnable = true
MappedFile會提前申請一塊直接內(nèi)存用作緩沖區(qū),放棄使用mmap直接寫文件。
數(shù)據(jù)先寫入緩沖區(qū),然后異步線程每200ms(且臟數(shù)據(jù)達到16K,commitCommitLogLeastPages = 4)將緩沖區(qū)的數(shù)據(jù)commit寫入FileChannel。
再喚醒定時服務(wù)(FlushRealTimeService類)將FileChannel里的數(shù)據(jù)持久化到磁盤。flush函數(shù)和commit一樣也可以傳入一個刷盤頁數(shù),當(dāng)臟頁數(shù)量達到16K時(flushLeastPages = 4),會進行刷盤操作,調(diào)用FileChannel的force將內(nèi)存中的數(shù)據(jù)持久化到磁盤。
開啟transientStorePoolEnable參數(shù)后,性能最好,但是相對來說持久化最不可靠
RocketMQ 使用存儲時長作為消息存儲的依據(jù),即每個節(jié)點對外承諾消息的存儲時長。在存儲時長范圍內(nèi)的消息都會被保留,無論消息是否被消費;超過時長限制的消息則會被清理掉。
需要注意的是,在RocketMQ中,消息存儲時長并不能完整控制消息的實際保存時間。
因為消息存儲仍然使用本地磁盤,本地磁盤空間不足時,為保證服務(wù)穩(wěn)定性消息仍然會被強制清理,導(dǎo)致消息的實際保存時長小于設(shè)置的保存時長。
建議在存儲成本可控的前提下,盡可能延長消息存儲時長。延長消息存儲時長,可以為緊急故障恢復(fù)、應(yīng)急問題排查和消息回溯帶來更多的可操作空間。
本文鏈接:http://www.www897cc.com/showinfo-26-5709-0.html三分鐘白話RocketMQ系列—— 如何存儲消息
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com