在第十章的時(shí)候,我們討論了批處理——它總是讀取一些文件作為輸入,產(chǎn)生一些新文件作為輸出。這里的輸出就是一種“衍生數(shù)據(jù)”:即,如果有需要,我們可以通過再跑一遍批處理任務(wù)獲取相同的結(jié)果集。從之前章節(jié)的討論我們可以看出,這種思想簡單卻強(qiáng)大:像搜索引擎、推薦系統(tǒng)、分析系統(tǒng)等很多現(xiàn)代常見的數(shù)據(jù)系統(tǒng)都是基于這種思想構(gòu)建的。
然而,在第十章進(jìn)行討論時(shí)我們有一個(gè)很強(qiáng)的假設(shè):輸入數(shù)據(jù)集是有界的——即事先知道輸入尺寸——因此批處理的程序知道輸入何時(shí)結(jié)束。舉個(gè)例子,MapReduce 中非常重要的排序操作,就必須讀入所有待排序的輸入數(shù)據(jù)后才能開始排序并輸出。這是因?yàn)椋詈笠粭l數(shù)據(jù),沒準(zhǔn)可能是被需要排在最前面(具有最小的 key),因此不可能過早對數(shù)據(jù)排序。
但在現(xiàn)實(shí)中,很多數(shù)據(jù)都是無界的且隨著時(shí)間持續(xù)到來的:我們的(各種服務(wù)的)用戶昨天會產(chǎn)生數(shù)據(jù)、今天會產(chǎn)生數(shù)據(jù),明天也將以同樣的方式繼續(xù)產(chǎn)生數(shù)據(jù)。除非你關(guān)門大吉,否則這些程序?qū)罒o休止地工作,因此我們的數(shù)據(jù)庫永遠(yuǎn)也不會到達(dá)一個(gè)“終態(tài)”(complete state)。因此,如果使用批處理的思想來處理這種持續(xù)來到的數(shù)據(jù)流,就會引出一個(gè)數(shù)據(jù)集切分的問題:例如,在一天結(jié)束時(shí)處理這一整天的數(shù)據(jù)、在每小時(shí)結(jié)束時(shí)處理這一小時(shí)的數(shù)據(jù)等等。
但上述切分+批處理的方式有個(gè)問題:太慢了,用戶可能等不及。比如按天處理時(shí),則其處理結(jié)果只有當(dāng)這一天結(jié)束后,再花些時(shí)間去批處理,才能最終看到結(jié)果。為了降低這個(gè)延遲,我們確實(shí)可以用更小的粒度進(jìn)行處理——比如,每秒進(jìn)行一次處理。甚而,干脆拋棄時(shí)間分片的概念,任意數(shù)據(jù)到來的時(shí)候就觸發(fā)數(shù)據(jù)處理邏輯。這就是流式處理(steam processing)背后的基本思想。
通常來說,一個(gè)“流”(steam)指的是隨時(shí)間推移而增量產(chǎn)生的數(shù)據(jù)。這個(gè)概念其實(shí)很多地方都有:Unix 中標(biāo)準(zhǔn)輸入輸出中(stdin、stdout),編程語言中(迭代器),文件系統(tǒng)相關(guān)的 API 中(如 Java 的 FileInputStream
),TCP 連接中,網(wǎng)絡(luò)中傳輸?shù)囊粢曨l等等。
在本章中,我們會將事件流(event stream)當(dāng)做一種數(shù)據(jù)管理機(jī)制:即將我們上一章討論的批量數(shù)據(jù)無界化、增量化。我們首先會討論如何表示、存儲和傳輸數(shù)據(jù)流。在“數(shù)據(jù)庫和數(shù)據(jù)流”一節(jié)中,我們會探索數(shù)據(jù)流和數(shù)據(jù)庫的管理。最后,在“處理數(shù)據(jù)流”一節(jié)中,我們將會討論對這些不間斷的數(shù)據(jù)流進(jìn)行處理的方法和工具,以及基于其構(gòu)建應(yīng)用的一些方法。
在批處理系統(tǒng)中,任務(wù)的輸入和輸出都是文件(可能是單機(jī)文件系統(tǒng)中的、也可能是分布式文件系統(tǒng)中的),那么在流式系統(tǒng)中,承載輸入和輸出的是什么呢?
在批處理系統(tǒng)中,雖然輸入是文件,但第一步也通常是解析成一系列的數(shù)據(jù)記錄(records)。在流式處理的上下中,對應(yīng)數(shù)據(jù)記錄的實(shí)體通常被稱為事件(event)。但他們本質(zhì)上都是一個(gè)東西:一段小的、自包含的(self-contained、不引用其他數(shù)據(jù))、不可變的某個(gè)時(shí)間點(diǎn)發(fā)生的信息數(shù)據(jù)。流式系統(tǒng)中的一個(gè)事件通常會包含一個(gè)時(shí)間戳,來標(biāo)志該事件在某個(gè)時(shí)鐘系統(tǒng)(time-of-day clock)中發(fā)生的時(shí)間點(diǎn)。
下面舉幾個(gè)事件的例子。事件可以是由用戶活動(dòng)產(chǎn)生的,如瀏覽網(wǎng)頁、網(wǎng)上購物;也可以由機(jī)器產(chǎn)生,如周期性的溫度傳感器、CPU 利用率指標(biāo);在使用Unix工具進(jìn)行批處理一節(jié)的例子中,我們提到的 web 服務(wù)器中的每一行日志,也是一個(gè)事件。
我們在第四章中討論過數(shù)據(jù)編碼的事情。事件本質(zhì)上也是數(shù)據(jù),因此可以被編碼為字符串、JSON 或者二進(jìn)制形式。只有編碼之后,事件才能被存儲,如:
也只有在編碼之后,事件才能夠在網(wǎng)絡(luò)中進(jìn)行傳輸,以發(fā)送到其他工作節(jié)點(diǎn)進(jìn)行處理。
在批處理系統(tǒng)中,一個(gè)文件通常是一次寫多次讀的。類似的,在流式處理系統(tǒng)中,一個(gè)事件在被生產(chǎn)者(producer,在不同系統(tǒng)中,也可以稱為 publisher 或者 sender)生成之后,可能會被多個(gè)感興趣的消費(fèi)者(consumer,對應(yīng)的,也可以稱為 subscribers 和 recipients)處理。在文件系統(tǒng)中,文件名可以標(biāo)識一組數(shù)據(jù)記錄;在流式系統(tǒng)中,相關(guān)的事件通常會聚攏到主題(topic)下或者流(stream)中。換句話說,命名后的流類似于文件,但不同的是,流中的是無界數(shù)據(jù)。
原則上,使用文件或者數(shù)據(jù)庫也足夠用以溝通生產(chǎn)者和消費(fèi)者:
批處理系統(tǒng)在以天為粒度處理數(shù)據(jù)時(shí),正是用的這種辦法。
但是,在放到低延遲的持續(xù)數(shù)據(jù)流的上下文中時(shí),如果存儲系統(tǒng)不是專門為此定制的,定時(shí)去拉取(polling)數(shù)據(jù)的代價(jià)會變得很高。且,在數(shù)據(jù)量一定的情況下,你拉取的頻次越高,單次拉到新數(shù)據(jù)的概率就越低,則無效負(fù)載也會隨之升高。因此,在流式系統(tǒng)中,當(dāng)有新事件產(chǎn)生時(shí),按需通知消費(fèi)者會比頻發(fā)拉取更高效(即推比拉高效)。
傳統(tǒng)上,數(shù)據(jù)庫對于這種通知機(jī)制支持的并不是很好:雖然關(guān)系型數(shù)據(jù)中的確有觸發(fā)器(triggers),且可以對數(shù)據(jù)表中的一些事件(如,新插入一行)做出響應(yīng),但響應(yīng)邏輯中能做的很有限(比如做一致性檢查),且通常局限在數(shù)據(jù)庫內(nèi)部(而不能通知到客戶端)。為此,一些專用的工具被開發(fā)出來以進(jìn)行專門的事件通知。
通知消費(fèi)者有新事件產(chǎn)生的一個(gè)常見方法是消息系統(tǒng)(messaging system):生產(chǎn)者將事件以消息的形式發(fā)送到消息系統(tǒng),消息系統(tǒng)將其推送給消費(fèi)者。我們在經(jīng)由消息傳遞的數(shù)據(jù)流一節(jié)簡單提過消息系統(tǒng),本節(jié)我們將會討論更多細(xì)節(jié)。
實(shí)現(xiàn)消息系統(tǒng)最簡單的方式,就是使用 Unix 管道或者 TCP連接來溝通生產(chǎn)者和消費(fèi)者。但大部分消息系統(tǒng)不會如此簡單。比如,Unix 管道和 TCP 連接都是一對一的發(fā)送者和接受者,但成熟的消息系統(tǒng)通常要支持多對多的生產(chǎn)消費(fèi)——即多個(gè)生產(chǎn)者可以將數(shù)據(jù)發(fā)送到一個(gè)主題( topic )下,多個(gè)消費(fèi)者可以共通消費(fèi)這個(gè) topic。
但在這種發(fā)布/訂閱(publish/subscribe)模式之下,不同具體的系統(tǒng)實(shí)現(xiàn)方式千差萬別。沒有一種方案能滿足所有需求。為了理解不同系統(tǒng)的實(shí)現(xiàn),我們可以帶著兩個(gè)問題去考察各個(gè)系統(tǒng):
是否能夠接受消息丟失取決于應(yīng)用層。例如,對于一些周期性上報(bào)的傳感器讀數(shù)來說,偶爾的一兩個(gè)采點(diǎn)的丟失影響不大, 因?yàn)楹竺娴臄?shù)據(jù)會很快的報(bào)上來。然而需要注意,如果消息大面積的丟失,可能也很難立即看出來。另外,如果你的目標(biāo)是對所有到來的事件進(jìn)行計(jì)數(shù),則每條信息都要可靠的傳輸,因?yàn)槿魏我粭l信息的丟失都會導(dǎo)致計(jì)數(shù)錯(cuò)誤。
我們在上一章中討論過批處理的一個(gè)非常友好的性質(zhì)——提供很好的容錯(cuò)保證。即,所有失敗的子任務(wù)會自動(dòng)的進(jìn)行重試、所有失敗任務(wù)的部分輸出會被丟棄。這種做法會讓系統(tǒng)看起來像沒有發(fā)生過任何故障一樣,從而可以讓應(yīng)用層大大簡化編程模型(這些分布式故障如果系統(tǒng)不處理,就要應(yīng)用層自己來處理)。在本章稍后的部分,我們會探討如何在流式處理的上下文中提供類似的保證。
很多消息系統(tǒng)并不借助中間系統(tǒng)節(jié)點(diǎn),而直接使用網(wǎng)絡(luò)來溝通生產(chǎn)者和消費(fèi)者雙方:
這種直接消息系統(tǒng)在其目標(biāo)場景中通常能夠工作的很好,但需要應(yīng)用層代碼自己承擔(dān)、處理消息丟失的可能性。此外,這些系統(tǒng)能夠進(jìn)行的容錯(cuò)很有限:雖然這些系統(tǒng)在檢測到丟包后會進(jìn)行重傳,但它們通常會假設(shè)生產(chǎn)者和消費(fèi)者都一直在線(這是一個(gè)很強(qiáng)的假設(shè))。
如果消費(fèi)者由于某種原因下線了,它可能會錯(cuò)過一些消息。有些協(xié)議會允許生產(chǎn)者重發(fā)失敗的消息,但如果生產(chǎn)者也掛了,這種方法也無濟(jì)于事——生產(chǎn)者會丟掉保存有需要進(jìn)行重試的消息緩存。
這本質(zhì)上是因?yàn)椋@些沒有 broker 的消息系統(tǒng)多表現(xiàn)為庫的形式,本身是沒有狀態(tài)的。如果沒有狀態(tài),就沒有辦法應(yīng)對消息傳輸過程中生產(chǎn)者、消費(fèi)者宕機(jī)重啟的故障。這也是引入 broker 的初衷,但因此消息系統(tǒng)也會變的更加重。
一種廣泛使用的替代方案就是使用消息代理(message broker,也稱為消息隊(duì)列)來發(fā)送消息。消息代理本質(zhì)上是一種專門為消息數(shù)據(jù)優(yōu)化過的數(shù)據(jù)庫。它通常以進(jìn)程的形式跑在服務(wù)器上,生產(chǎn)者和消費(fèi)者作為客戶端與之通信。生產(chǎn)者將消息寫入消息代理,消費(fèi)者從其中讀取以進(jìn)行消費(fèi)。
通過引入一個(gè)消息數(shù)據(jù)存儲代理,消息系統(tǒng)可以更加容易的對客戶端(包括生產(chǎn)者和消費(fèi)者)的來來去去(連接、失聯(lián)和宕機(jī))進(jìn)行容錯(cuò)。這樣,數(shù)據(jù)的持久化職責(zé)被轉(zhuǎn)移到了消息代理上。有些系統(tǒng)中的消息代理將數(shù)據(jù)保存在內(nèi)存中,那么宕機(jī)重啟就仍然有問題;但另一些系統(tǒng)中的消息代理就會把消息持久化到硬盤(通常可配置)中,則就可以容忍宕機(jī)問題。如果遇到慢的消費(fèi)者,就可以使用無限隊(duì)列的方式(而不是丟消息或者背壓)對沒來得及消費(fèi)的數(shù)據(jù)進(jìn)行緩存,當(dāng)然通常來說,能夠存多少數(shù)據(jù)通常也會以配置的方式交給用戶去選擇。
使用消息代理的另外一個(gè)原因是消費(fèi)者通常是異步消費(fèi)的:即當(dāng)發(fā)送一條消息后,生產(chǎn)者等待消息代理確認(rèn)收到(緩存或者持久化)就會結(jié)束,而不會去等待這條消息最終被消費(fèi)者所消費(fèi)。而消息最終被消費(fèi)者所消費(fèi),會發(fā)生在將來的某個(gè)時(shí)間點(diǎn)——大多數(shù)很快,比如幾秒內(nèi),但如果出現(xiàn)大量消息積壓時(shí),這個(gè)時(shí)間也可能會很久。
有一些消息代理甚至能夠參與兩階段提交(使用 XA 或者 JTA,參見 實(shí)踐中的分布式事務(wù) )。這種功能讓消息代理看起來非常像數(shù)據(jù)庫,盡管在實(shí)踐中他們有一些非常重要的區(qū)別:
以上都是傳統(tǒng)視角下的消息代理,這些語義被抽象成了像 JMS 和 AMQP 之類的協(xié)議,并且為 RabbitMQ、ActiveMQ、HornetQ、Qpid、TIBCO 企業(yè)消息服務(wù)、IBM MQ、Azure Service Bus 和 Google Cloud Pub/Sub 等系統(tǒng)實(shí)現(xiàn)。
當(dāng)多個(gè)消費(fèi)者同時(shí)消費(fèi)一個(gè) topic 下的數(shù)據(jù)時(shí),有兩種主要的消費(fèi)方式,
負(fù)載均衡和扇出模式對比
兩種消費(fèi)模式也可以組合起來:如有兩組用戶都訂閱了某個(gè) topic,組間進(jìn)行獨(dú)立消費(fèi)(fan-out)、組內(nèi)進(jìn)行互斥消費(fèi)(load balancing)。
消費(fèi)者可能會在任意時(shí)刻宕機(jī),因此可能會出現(xiàn):消息代理將消息發(fā)送給了消費(fèi)者,但是消費(fèi)者卻沒有對其進(jìn)行消費(fèi)或者僅進(jìn)行了部分消費(fèi),就宕機(jī)了。為了保證該消息不丟,消息代理使用了一種確認(rèn)機(jī)制(類似 TCP 中的 ack):每個(gè)消費(fèi)者必須顯式地告訴消息代理它消費(fèi)完了消息,這樣消息代理才能安全的將消息從隊(duì)列中刪除。
如果消息代理和消費(fèi)者之間的鏈接關(guān)閉或者超時(shí)了,消息代理仍然沒有收到確認(rèn),則會假設(shè)消息沒有被處理,并且重新給另一個(gè)消費(fèi)者發(fā)送消息。但此時(shí)有可能出現(xiàn),在重發(fā)之前消息實(shí)際已經(jīng)被處理過了,只是確認(rèn)消息由于網(wǎng)絡(luò)的原因丟失了。在這種情況下,需要消費(fèi)者進(jìn)行冪等消費(fèi)。
在負(fù)載均衡模式下,重傳可能會造成消費(fèi)者處理消息的亂序。在下圖中,在沒有任何故障時(shí),消費(fèi)者大體是按照消息的生產(chǎn)順序來消費(fèi)的。然而,某一時(shí)刻,消費(fèi)者 2 號在處理消息 m3 時(shí)宕機(jī)了,此時(shí)消費(fèi)者 1 號正在處理消息 m4。由于遲遲沒有等到 m3 的消費(fèi)確認(rèn),消息代理將其重新發(fā)送給了消費(fèi)者 1 號,從而導(dǎo)致消費(fèi)者 1 號以 m4,m3,m5 的順序來處理的消息。即,發(fā)生了亂序處理。
負(fù)載均衡導(dǎo)致的消息亂序
即使消息代理試圖以順序的方式給消費(fèi)者發(fā)送消息(JMS 和 AMQP 都有此類規(guī)定),但由于負(fù)載均衡和重傳機(jī)制的組合,亂序消費(fèi)難以避免。為了避免這個(gè)問題,你可以讓每個(gè)消費(fèi)者使用單獨(dú)的隊(duì)列(即,不用負(fù)載均衡功能,也可以理解,畢竟并行總是有代價(jià)的)。在每條消息都是互相獨(dú)立時(shí),亂序消費(fèi)不是問題;但如果消息間有前后因果依賴,則消息的保序消費(fèi)非常重要。
[1]DDIA 讀書分享會: https://ddia.qtmuniao.com/
本文鏈接:http://www.www897cc.com/showinfo-26-76527-0.htmlDDIA:消息系統(tǒng)—生產(chǎn)者和消費(fèi)者的游戲?
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com