日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

聊聊Flink:Flink的分區機制

來源: 責編: 時間:2024-02-29 14:43:53 189觀看
導讀一、前言flink任務在執行過程中,一個流(stream)包含一個或多個分區(Stream partition)。TaskManager中的一個slot的subtask就是一個stream partition(流分區),一個Job的流(stream)分布在多個不同的Slot上執行。每一個算子可以包

一、前言

flink任務在執行過程中,一個流(stream)包含一個或多個分區(Stream partition)。TaskManager中的一個slot的subtask就是一個stream partition(流分區),一個Job的流(stream)分布在多個不同的Slot上執行。每一個算子可以包含一個或多個子任務(subtask),這些subtask執行在不同的分區中,本質是在不同的線程、不同的物理機或不同的容器中彼此互不依賴地執行。ExL28資訊網——每日最新資訊28at.com

1.1 Flink數據傳輸

  • 組件之間的通信消息傳輸,即Client、JobManager、TaskManager之間的信息傳遞,采用Akka框架(主要用作組件間的協同,如心跳檢測、狀態上報、指標統計、作業提交和部署等)。
  • 算子之間的流數據傳輸

本地線程內的流數據傳輸(同一個SubTask中):同一個SubTask內的兩個Operator(屬于同一個OperatorChain)之間的數據傳輸是方法調用,即上游算子處理完數據后,直接調用下游算子的processElement方法。ExL28資訊網——每日最新資訊28at.com

本地線程間的流數據傳輸(同一個TaskManager的不同SubTask中):即同一個TaskManager(JVM進程)中的不同Task(線程,本質上是SubTask)的算子之間的數據傳輸,通過本地內存進行數據傳遞,存在數據序列化和反序列過程。ExL28資訊網——每日最新資訊28at.com

跨網絡的流數據傳輸(不同TaskManager的SubTask中):采用Netty框架,通過Socket傳遞,也存在數據序列化和反序列過程。ExL28資訊網——每日最新資訊28at.com

flink中的重分區算子定義上下游subtask之間數據傳遞的方式,SubTask之間進行數據傳遞模式有兩種,一種是one-to-one(forwarding)模式,另一種是redistributing的模式。ExL28資訊網——每日最新資訊28at.com

1.2 重分區算子數據傳遞的兩種方式

  • One-to-one:數據不需要重新分布,上游SubTask生產的數據與下游SubTask受到的數據完全一致,數據不需要重分區,也就是數據不需要經過IO,比如下圖中source->map的數據傳遞形式就是One-to-One方式。常見的map、fliter、flatMap等算子的SubTask的數據傳遞都是one-to-one的對應關系。類似于spark中的窄依賴。
  • Redistributing:數據需要通過shuffle過程重新分區,需要經過IO,比如上圖中的map->keyBy。創建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的數據傳遞都是Redistributing方式,但它們具體數據傳遞方式是不同的。類似于spark中的寬依賴。

圖片圖片ExL28資訊網——每日最新資訊28at.com

flink中的重分區算子除了keyBy以外,還有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多種算子,它們的分區方式各不相同。需要注意的是,這些算子中除了keyBy能將DataStream轉化為KeyedStream外,其它重分區算子均不會改變Stream的類型。ExL28資訊網——每日最新資訊28at.com

二、分區策略

數據在算子之間流動需要依靠分區策略(分區器),Flink目前內置了以下幾種分區策略和自定義分區策略。已實現的分區策略對應的API為:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

自定義分區策略的API為CustomPartitionerWrapper。ExL28資訊網——每日最新資訊28at.com

各個API的繼承關系如下圖所示:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

ChannelSelector是分區策略的頂層接口,其決定了記錄應該寫入哪個邏輯通道,通道可理解為下游算子的某個實例,或下游并行算子的某個子任務。該接口的定義源碼如下:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

抽象類StreamPartitioner實現了ChannelSelector接口,是一個用于流程序的特殊的ChannelSelector,其中定義了一些通用的分區策略方法。Flink中的所有分區策略(分區器)都繼承了StreamPartitioner類,并且實現了各自獨有的分區規則。ExL28資訊網——每日最新資訊28at.com

三、內置分區策略

3.1 BinaryHashPartitioner

該分區策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一種針對BinaryRowData的哈希分區器。BinaryRowData是RowData的實現,可以顯著減少Java對象的序列化/反序列化。RowData用于表示結構化數據類型,運行時通過Table API或SQL管道傳遞的所有頂級記錄都是RowData的實例。關于BinaryHashPartitioner,我們這里不做過多講解。ExL28資訊網——每日最新資訊28at.com

3.2 BroadcastPartitioner

廣播分區策略將上游數據記錄輸出到下游算子的每個并行實例中,即下游每個分區都會有上游的所有數據。使用DataStream的broadcast()方法即可設置該DataStream向下游發送數據時使用廣播分區策略。ExL28資訊網——每日最新資訊28at.com

來一段代碼演示下:ExL28資訊網——每日最新資訊28at.com

/** * 微信公眾號:老周聊架構 */public class PartitionerTest {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(3);        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);        //1.分區策略前的操作        //輸出dataStream每個元素及所屬的子任務編號        dataStream.map(new RichMapFunction<Integer, Object>() {            @Override            public Object map(Integer value) throws Exception {                System.out.println(String.format("元素值: %s, 分區策略前,子任務編號: %s", value,                        getRuntimeContext().getIndexOfThisSubtask()));                return value;            }        });        //2.設置分區策略        //設置DataStream向下游發送數據時使用的策略        DataStream<Integer> dataStreamAfter = dataStream.broadcast();        //3.分區策略后的操作        dataStreamAfter.map(new RichMapFunction<Integer, Object>() {            @Override            public Object map(Integer value) throws Exception {                System.out.println(String.format("元素值: %s, 分區策略后,子任務編號: %s", value,                        getRuntimeContext().getIndexOfThisSubtask()));                return value;            }        }).print();        env.execute("PartitionerTest Job");    }}

直接IDEA控制臺輸出:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

從輸出結果可以看出,數據共分為3個分區(編號為0、1、2)。執行分區策略前,每個元素所屬的分區:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

執行分區策略后,每個元素所屬的分區如下:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

對比表發現,廣播分區策略將上游每個元素分別發送到了下游算子的所有分區,這種策略會把數據復制多份,向下游算子的每個分區發送一份。ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

我們把上面的任務提交到Flink,同樣也可以看出前面分區前每個子任務兩條數據,分區后每個子任務六條數據。ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

3.3 ForwardPartitioner

轉發分區策略只將元素轉發給本地運行的下游算子的實例,即將元素發送到與當前算子實例在同一個TaskManager的下游算子實例,而不需要進行網絡傳輸。要求上下游算子并行度一樣,這樣上下游算子可以同屬一個子任務。ExL28資訊網——每日最新資訊28at.com

這里把上面的代碼調整下:ExL28資訊網——每日最新資訊28at.com

dataStream.forward()ExL28資訊網——每日最新資訊28at.com

IDEA控制臺輸出:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

從輸出結果可以看出,數據共分為3個分區(編號為0、1、2)。執行分區策略前,每個元素所屬的分區:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

執行分區策略后,每個元素所屬的分區如下:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

對比發現,轉發分區策略將上游同一個分區的元素發送到了下游同一個分區中。使用數據流圖表示如下圖:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

在上下游的算子沒有指定分區策略的情況下,如果上下游的算子并行度一致,則默認使用ForwardPartitioner,否則使用RebalancePartitioner。在StreamGraph類的源碼中可以看到該規則:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

ExL28資訊網——每日最新資訊28at.com

對于ForwardPartitioner,必須保證上下游算子并行度一致,否則會拋出異常。ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

3.4 GlobalPartitioner

全局分區策略將上游所有元素發送到下游子任務編號等于0的分區算子實例上(下游第一個實例)。ExL28資訊網——每日最新資訊28at.com

這里把上面的代碼調整下:ExL28資訊網——每日最新資訊28at.com

dataStream.global()

IDEA控制臺輸出:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

分區前:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

分區后:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

全局分區策略將上游所有分區中的所有元素發送到了下游編號為0的分區中:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

3.5 .KeyGroupStreamPartitioner

Key分區策略根據元素Key的Hash值輸出到下游算子指定的實例。keyBy()算子底層正是使用的該分區策略,底層最終會調用KeyGroupStreamPartitioner的selectChannel()方法,計算每個Key對應的通道索引(通道編號,可理解為分區編號),根據通道索引將Key發送到下游相應的分區中。selectChannel()方法源碼如下:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

總的來說,Flink底層計算通道索引(分區編號)的流程如下:ExL28資訊網——每日最新資訊28at.com

  • 計算Key的HashCode值。
  • 將Key的HashCode值進行特殊的Hash處理,即MathUtils.murmurHash(keyHash),返回一個非負哈希碼。
  • 將非負哈希碼除以最大并行度取余數,得到keyGroupId,即Key組索引。
  • 使用公式keyGroupId×parallelism/maxParallelism得到分區編號。parallelism為當前算子的并行度,即通道數量;maxParallelism為系統默認支持的最大并行度,即128。

3.6 RebalancePartitioner

平衡分區策略使用循環遍歷下游分區的方式,將上游元素均勻分配給下游算子的每個實例。每個下游算子的實例都具有相等的負載。當數據流中的元素存在數據傾斜時,使用該策略對性能有很大的提升。ExL28資訊網——每日最新資訊28at.com

這里把上面的代碼調整下:ExL28資訊網——每日最新資訊28at.com

dataStream.setParallelism(2);ExL28資訊網——每日最新資訊28at.com

dataStreamAfter.setParallelism(3);ExL28資訊網——每日最新資訊28at.com

dataStream.rebalance()ExL28資訊網——每日最新資訊28at.com

IDEA控制臺輸出:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

分區前:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

分區后:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

平衡分區策略將上游所有元素均勻發送到了下游算子的所有分區:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

3.7 RescalePartitioner

重新調節分區策略基于上下游算子的并行度,將元素以循環的方式輸出到下游算子的每個實例。類似于平衡分區策略,但又與平衡分區策略不同。ExL28資訊網——每日最新資訊28at.com

上游算子將元素發送到下游哪一個算子實例,取決于上游和下游算子的并行度。例如,如果上游算子的并行度為2,而下游算子的并行度為4,那么一個上游算子實例將把元素均勻分配給兩個下游算子實例,而另一個上游算子實例將把元素均勻分配給另外兩個下游算子實例。相反,如果下游算子的并行度為2,而上游算子的并行度為4,那么兩個上游算子實例將分配給一個下游算子實例,而另外兩個上游算子實例將分配給另一個下游算子實例。ExL28資訊網——每日最新資訊28at.com

假設上游算子并行度為2,分區編號為A和B,下游算子并行度為4,分區編號為1、2、3、4,那么A將把數據循環發送給1和2,B則把數據循環發送給3和4。假設上游算子并行度為4,編號為A、B、C、D,下游算子并行度為2,編號為1、2,那么A和B把數據發送給1,C和D則把數據發送給2。ExL28資訊網——每日最新資訊28at.com

這里把上面的代碼調整下:ExL28資訊網——每日最新資訊28at.com

dataStream.rescale()

同時將第一個map算子的并行度設置為2,第二個map算子的并行度設置為4。ExL28資訊網——每日最新資訊28at.com

IDEA控制臺輸出:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

分區前:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

分區后:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

接下來改變map算子的并行度,將第一個map算子的并行度設置為4,第二個map算子的并行度設置為2。ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

如果想將元素均勻地輸出到下游算子的每個實例,以實現負載均衡,同時又不希望使用平衡分區策略的全局負載均衡,則可以使用重新調節分區策略。該策略會盡可能避免數據在網絡間傳輸,而能否避免還取決于TaskManager的Task Slot數量、上下游算子的并行度等。ExL28資訊網——每日最新資訊28at.com

3.8 ShufflePartitioner

隨機分區策略將上游算子元素輸出到下游算子的隨機實例中。元素會被均勻分配到下游算子的每個實例。這種策略可以實現計算任務的負載均衡。ExL28資訊網——每日最新資訊28at.com

這里把上面的代碼調整下:ExL28資訊網——每日最新資訊28at.com

dataStream.shuffle()ExL28資訊網——每日最新資訊28at.com

這里就不做過多演示了。我們下面來看下自定義分區策略。ExL28資訊網——每日最新資訊28at.com

四、自定義分區策略

自定義分區策略的API為CustomPartitionerWrapper。該策略允許開發者自定義規則將上游算子元素發送到下游指定的算子實例中。ExL28資訊網——每日最新資訊28at.com

4.1 新建自定義分區器

新建分區器類MyCustomPartitioner并實現接口Partitioner(Object表示分區Key的數據類型),實現其中未實現的方法partition(),在該方法中添加相應的分區邏輯。ExL28資訊網——每日最新資訊28at.com

/** * 自定義分區策略 * 微信公眾號:老周聊架構 */public class MyCustomPartitioner implements Partitioner {    @Override    public int partition(Object key, int numPartitions) {        if (key.equals("chinese")) {            return 0;        } else if (key.equals("math")) {            return 1;        } else {            return 2;        }    }}

上述代碼通過partition()方法取得分區編號,將Key值等于chinese的元素分配到編號為0的分區,將Key值等于math的元素分配到編號為1的分區,其余元素分配到編號為2的分區。ExL28資訊網——每日最新資訊28at.com

4.2 使用自定義分區器

調用DataStream的partitionCustom()方法傳入自定義分區器類MyCustomPartitioner的實例,可以對DataStream按照自定義規則進行重新分區,代碼如下:ExL28資訊網——每日最新資訊28at.com

/** * 自定義分區策略 * 微信公眾號:老周聊架構 */public class CustomPartitionerTest {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(3);        DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");        //1.分區策略前的操作        //輸出dataStream每個元素及所屬的子任務編號        SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =                dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {                    @Override                    public Map<String, Integer> map(String value) throws Exception {                        System.out.println(String.format("元素值: %s, 分區策略前,子任務編號: %s", value,                                getRuntimeContext().getIndexOfThisSubtask()));                        Map<String, Integer> map = new HashMap<>();                        map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));                        return map;                    }                }).setParallelism(2);        //2.設置分區策略        //設置DataStream向下游發送數據時使用的策略        DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);        //3.分區策略后的操作        dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {            @Override            public Map<String, Integer> map(Map<String, Integer> value) throws Exception {                System.out.println(String.format("元素值: %s, 分區策略后,子任務編號: %s", value,                        getRuntimeContext().getIndexOfThisSubtask()));                return value;            }        }).setParallelism(3).print();        env.execute("CustomPartitionerTest Job");    }}

分區前:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

分區后:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

自定義分區策略將上游所有元素按照自定義的規則發送到了下游的3個分區中。ExL28資訊網——每日最新資訊28at.com

把任務給到Flink上去跑,發現:ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

這是因為泛型擦除,下面的DataStream泛型需要指定類型,不能ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

小知識:ExL28資訊網——每日最新資訊28at.com

在編譯之后程序會采取去泛型化的措施。也就是說Java中的泛型,只在編譯階段有效。在編譯過程中,正確檢驗泛型結果后,在運行時會將泛型的相關信息擦除,編譯器只會在對象進入JVM和離開JVM的邊界處添加類型檢查和轉換的方法,泛型的信息不會進入到運行時階段,這就是所謂的Java類型擦除。ExL28資訊網——每日最新資訊28at.com

類型加好以后,再跑一下任務,會出現任務成功。ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com

ExL28資訊網——每日最新資訊28at.com

圖片圖片ExL28資訊網——每日最新資訊28at.com


ExL28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-75362-0.html聊聊Flink:Flink的分區機制

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 掌握 Python 棧,輕松實現進制轉換

下一篇: .NET中Enum的應用你知道多少,它的作用和優點是什么?

標簽:
  • 熱門焦點
  • MIX Fold3包裝盒泄露 新機本月登場

    小米的全新折疊屏旗艦MIX Fold3將于本月發布,近日該機的真機包裝盒在網上泄露。從圖上來看,新的MIX Fold3包裝盒在外觀設計方面延續了之前的方案,變化不大,這也是目前小米旗艦
  • K60至尊版剛預熱 一加Ace2 Pro正面硬剛

    Redmi這邊剛如火如荼的宣傳了K60 Ultra的各種技術和硬件配置,作為競品的一加也坐不住了。一加中國區總裁李杰發布了兩條微博,表示在自家的一加Ace2上早就已經采用了和PixelWo
  • CSS單標簽實現轉轉logo

    轉轉品牌升級后更新了全新的Logo,今天我們用純CSS來實現轉轉的新Logo,為了有一定的挑戰性,這里我們只使用一個標簽實現,將最大化的使用CSS能力完成Logo的繪制與動畫效果。新logo
  • 從零到英雄:高并發與性能優化的神奇之旅

    作者 | 波哥審校 | 重樓作為公司的架構師或者程序員,你是否曾經為公司的系統在面對高并發和性能瓶頸時感到手足無措或者焦頭爛額呢?筆者在出道那會為此是吃盡了苦頭的,不過也得
  • 2天漲粉255萬,又一賽道在抖音爆火

    來源:運營研究社作者 | 張知白編輯 | 楊佩汶設計 | 晏談夢潔這個暑期,旅游賽道徹底火了:有的「地方」火了&mdash;&mdash;貴州村超旅游收入 1 個月超過 12 億;有的「博主」火了&m
  • 微博大門常打開,迎接海外畫師漂洋東渡

    作者:互聯網那些事&ldquo;起猛了,我能看得懂日語了&rdquo;。&ldquo;為什么日本人說話我能聽懂?&rdquo;&ldquo;中文不像中文,日語不像日語,但是我竟然看懂了&rdquo;&hellip;&hell
  • 疑似小米14外觀設計圖曝光:后置相機模組變化不大

    下半年的大幕已經開啟,而誰將成為下半年手機圈的主角就成為了大家關注的焦點,其中被傳有望拿下新一代驍龍8 Gen3旗艦芯片的小米14系列更是備受大家矚
  • 質感不錯!OPPO K11渲染圖曝光:旗艦IMX890傳感器首次下放

    一直以來,OPPO K系列機型都保持著較為均衡的產品體驗,歷來都是2K價位的明星機型,去年推出的OPPO K10和OPPO K10 Pro兩款機型憑借各自的出色配置,堪稱有
  • 榮耀Magicbook V 14 2021曙光藍版本正式開售,擁有觸摸屏

    榮耀 Magicbook V 14 2021 曙光藍版本正式開售,搭載 i7-11390H 處理器與 MX450 顯卡,配備 16GB 內存與 512GB SSD,重 1.48kg,厚 14.5mm,具有 1.5mm 鍵盤鍵程、
Top 主站蜘蛛池模板: 满洲里市| 界首市| 罗平县| 韶关市| 万盛区| 达日县| 察雅县| 河津市| 毕节市| 乳山市| 西吉县| 霍邱县| 荆门市| 泊头市| 南召县| 临海市| 绥宁县| 朝阳市| 郧西县| 汉阴县| 平谷区| 兴和县| 浮山县| 曲阜市| 德庆县| 昂仁县| 开江县| 万源市| 康平县| 辽阳市| 扶绥县| 新建县| 桦南县| 富裕县| 长白| 蒙山县| 西丰县| 宜都市| 禄劝| 美姑县| 沧源|