State是指流計算過程中計算節點的中間計算結果或元數據屬性,比如 在aggregation過程中要在state中記錄中間聚合結果,比如 Apache Kafka 作為數據源時候,我們也要記錄已經讀取記錄的offset,這些State數據在計算過程中會進行持久化(插入或更新)。本文將詳細介紹一下Flink State,通過本文,你可以了解到:
感謝關注,希望本文對你有所幫助。
Flink 中的狀態分為兩種主要類型:Keyed State 和 Operator State。
值得注意的是:
在 Flink 的 Table API 或 SQL API 中,對于內部的 GroupBy/PartitionBy 操作,Flink 會自動管理 Keyed State。而對于 Source Connector 記錄 offset 這樣的操作,通常是在底層的 DataStream API 中實現的,可能直接使用 Operator State 來管理。例如,Flink Kafka Consumer 會使用 Operator State 來存儲 Kafka 主題的分區 offset,以便在發生故障時能夠從上次成功的檢查點恢復。
State的具體存儲、訪問和維護是由**狀態后端(state backend)**決定的。狀態后端主要負責兩件事情:
Flink提供了三種狀態后端:
RocksDB是一個嵌入式鍵值存儲(key-value store),它可以將數據保存到本地磁盤上,為了從RocksDB中讀寫數據,系統需要對數據進行序列化和反序列化。
在選擇狀態后端時,需要考慮應用的狀態大小、恢復速度、持久性和部署環境。對于生產環境,通常推薦使用 RocksDBStateBackend,因為它能夠提供良好的擴展性和容錯性。
在 Apache Flink 中,對于有狀態的流處理作業,當作業進行擴容(scaling out)或縮容(scaling in)時,即增加或減少并行子任務的數量時,Flink 需要重新分配 OperatorState。這個過程稱為狀態重分配(state redistribution)。
對于 Operator State 的擴容處理,Flink 提供了不同的重分配模式來處理狀態:
對于 ListState 類型的 Operator State,如果流任務的并行度從 N 增加到 M,Flink 會將每個并行實例的狀態分成 M 份,然后將這些分片分配給新的并行實例。如果并行度減少,則相反,狀態將會聚合起來。
圖片
擴容時:
縮容時:
BroadcastState 的數據在擴容或縮容時會被復制到所有的并行實例中。由于 BroadcastState 是以廣播的方式存儲數據,所有并行實例的狀態都是相同的。
圖片
對于 UnionListState 類型的 Operator State,在擴容或縮容時,狀態的每個元素將保持不變,原始狀態的所有元素將被統一地分發到新的并行實例中。這意味著每個元素僅分配給一個并行實例,但所有并行實例的狀態的并集會包括所有原始狀態的元素。隨后由任務自己決定哪些條目該保留,哪些該丟棄。
圖片
思考:Source的擴容(并發數)是否可以超過Source物理存儲的partition數量呢?
在使用像 Apache Kafka 這樣的消息隊列作為數據源(Source)時,消息隊列中的數據被劃分為多個分區(partitions)。這種設計主要是為了支持數據的并行處理以及提高吞吐量。在使用 Flink 或類似的流處理框架時,一個常見的做法是將每個分區分配給一個并行的 Source 實例(也稱為 Source Task 或 Source Operator)進行處理。
如果嘗試將 Source 的并行度(并發數)設置得比物理存儲(比如 Kafka 主題)的分區數量還要高,那么將會有一些并行實例分配不到任何分區,因為分區的數量是固定的,且每個分區只能被一個并行實例消費(至少在 Flink 的默認設置下是這樣)。這會導致資源浪費,因為超出分區數量的那部分并行實例不會做任何實際的數據處理工作,但仍然占用系統資源。
因此,在設置 Source 的并行度時,通常的最佳實踐是:
如果需要增加并行度以提高處理能力,相應地也需要增加物理存儲的分區數量。對于 Kafka 來說,可以通過修改主題的分區配置來實現。
對于 Apache Flink,如果使用的是 Flink Kafka Connector,并且嘗試將并行度設置得比 Kafka 主題的分區數量還要高,Flink 會在作業啟動時進行檢查。如果發現這種配置不匹配的情況,Flink 會拋出異常并終止作業啟動,以避免資源浪費和潛在的配置錯誤。這種設計選擇確保了資源的有效利用和處理能力的合理分配,同時也避免了由于配置錯誤而導致的潛在問題。
KeyedState的算子在擴容時會根據新的任務數量對key進行重分區,為了降低狀態在不同任務之間遷移的成本,Flink不會單獨對key進行在分配,而是會把所有的鍵值分別存到不同的key-group中,每個key-group都包含了部分鍵值對。一個key-group是State分配的原子單位。
key-group的數量在job啟動前必須是確定的且運行中不能改變。由于key-group是state分配的原子單位,而每個operator并行實例至少包含一個key-group,因此operator的最大并行度不能超過設定的key-group的個數,那么在Flink的內部實現上key-group的數量就是最大并行度的值。
為了決定一個key屬于哪個Key-Group,通常會采用一種叫做一致性哈希(Consistent Hashing)的算法。一致性哈希算法的基本思想是將所有的Key和所有的Key-Group都映射到同一個哈希環上。對每個Key進行哈希運算得到一個哈希值,然后在哈希環上找到一個順時針方向最近的Key-Group,這個Key就屬于這個Key-Group。即:Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism取余操作的來分配的。
如下圖當parallelism=2,maxParallelism=10的情況下流上key與key-group的對應關系如下圖所示:
圖片
如上圖key(a)的hashCode是97,與最大并發10取余后是7,被分配到了KG-7中,流上每個event都會分配到KG-0至KG-9其中一個Key-Group中。
上面的Stateful Operation節點的最大并行度maxParallelism的值是10,也就是我們一共有10個Key-Group,當我們并發是2的時候和并發是3的時候分配的情況如下圖:
圖片
先計算每個Operator實例至少分配的Key-Group個數,將不能整除的部分N個,平均分給前N個實例。最終每個Operator實例管理的Key-Groups會在GroupRange中表示,本質是一個區間值。比如上圖是2->3擴容,那每個task的key-group的數量是:10/3≈3,也即是每個task先分3個key-group,然后把剩余的1個key-group分配給第一task。
值得注意的是:
Key-Group機制的特點就是每個具體的key(event)不關心落到具體的哪個task來處理,只關心會落到哪個Key-Group中:
State是Flink流計算的關鍵部分。Flink 中的狀態分為兩種主要類型:Keyed State 和 Operator State。Flink提供了三種狀態后端:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。對于Keyed State 和 Operator State應對擴縮容時有不同的分配方式。
本文鏈接:http://www.www897cc.com/showinfo-26-79982-0.html我們一起深入理解Flink State
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com