日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁(yè) > 科技  > 軟件

聊聊Flink:Flink的分區(qū)機(jī)制

來(lái)源: 責(zé)編: 時(shí)間:2024-02-29 14:43:53 188觀看
導(dǎo)讀一、前言flink任務(wù)在執(zhí)行過(guò)程中,一個(gè)流(stream)包含一個(gè)或多個(gè)分區(qū)(Stream partition)。TaskManager中的一個(gè)slot的subtask就是一個(gè)stream partition(流分區(qū)),一個(gè)Job的流(stream)分布在多個(gè)不同的Slot上執(zhí)行。每一個(gè)算子可以包

一、前言

flink任務(wù)在執(zhí)行過(guò)程中,一個(gè)流(stream)包含一個(gè)或多個(gè)分區(qū)(Stream partition)。TaskManager中的一個(gè)slot的subtask就是一個(gè)stream partition(流分區(qū)),一個(gè)Job的流(stream)分布在多個(gè)不同的Slot上執(zhí)行。每一個(gè)算子可以包含一個(gè)或多個(gè)子任務(wù)(subtask),這些subtask執(zhí)行在不同的分區(qū)中,本質(zhì)是在不同的線程、不同的物理機(jī)或不同的容器中彼此互不依賴地執(zhí)行。Vc428資訊網(wǎng)——每日最新資訊28at.com

1.1 Flink數(shù)據(jù)傳輸

  • 組件之間的通信消息傳輸,即Client、JobManager、TaskManager之間的信息傳遞,采用Akka框架(主要用作組件間的協(xié)同,如心跳檢測(cè)、狀態(tài)上報(bào)、指標(biāo)統(tǒng)計(jì)、作業(yè)提交和部署等)。
  • 算子之間的流數(shù)據(jù)傳輸

本地線程內(nèi)的流數(shù)據(jù)傳輸(同一個(gè)SubTask中):同一個(gè)SubTask內(nèi)的兩個(gè)Operator(屬于同一個(gè)OperatorChain)之間的數(shù)據(jù)傳輸是方法調(diào)用,即上游算子處理完數(shù)據(jù)后,直接調(diào)用下游算子的processElement方法。Vc428資訊網(wǎng)——每日最新資訊28at.com

本地線程間的流數(shù)據(jù)傳輸(同一個(gè)TaskManager的不同SubTask中):即同一個(gè)TaskManager(JVM進(jìn)程)中的不同Task(線程,本質(zhì)上是SubTask)的算子之間的數(shù)據(jù)傳輸,通過(guò)本地內(nèi)存進(jìn)行數(shù)據(jù)傳遞,存在數(shù)據(jù)序列化和反序列過(guò)程。Vc428資訊網(wǎng)——每日最新資訊28at.com

跨網(wǎng)絡(luò)的流數(shù)據(jù)傳輸(不同TaskManager的SubTask中):采用Netty框架,通過(guò)Socket傳遞,也存在數(shù)據(jù)序列化和反序列過(guò)程。Vc428資訊網(wǎng)——每日最新資訊28at.com

flink中的重分區(qū)算子定義上下游subtask之間數(shù)據(jù)傳遞的方式,SubTask之間進(jìn)行數(shù)據(jù)傳遞模式有兩種,一種是one-to-one(forwarding)模式,另一種是redistributing的模式。Vc428資訊網(wǎng)——每日最新資訊28at.com

1.2 重分區(qū)算子數(shù)據(jù)傳遞的兩種方式

  • One-to-one:數(shù)據(jù)不需要重新分布,上游SubTask生產(chǎn)的數(shù)據(jù)與下游SubTask受到的數(shù)據(jù)完全一致,數(shù)據(jù)不需要重分區(qū),也就是數(shù)據(jù)不需要經(jīng)過(guò)IO,比如下圖中source->map的數(shù)據(jù)傳遞形式就是One-to-One方式。常見(jiàn)的map、fliter、flatMap等算子的SubTask的數(shù)據(jù)傳遞都是one-to-one的對(duì)應(yīng)關(guān)系。類似于spark中的窄依賴。
  • Redistributing:數(shù)據(jù)需要通過(guò)shuffle過(guò)程重新分區(qū),需要經(jīng)過(guò)IO,比如上圖中的map->keyBy。創(chuàng)建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的數(shù)據(jù)傳遞都是Redistributing方式,但它們具體數(shù)據(jù)傳遞方式是不同的。類似于spark中的寬依賴。

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

flink中的重分區(qū)算子除了keyBy以外,還有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多種算子,它們的分區(qū)方式各不相同。需要注意的是,這些算子中除了keyBy能將DataStream轉(zhuǎn)化為KeyedStream外,其它重分區(qū)算子均不會(huì)改變Stream的類型。Vc428資訊網(wǎng)——每日最新資訊28at.com

二、分區(qū)策略

數(shù)據(jù)在算子之間流動(dòng)需要依靠分區(qū)策略(分區(qū)器),F(xiàn)link目前內(nèi)置了以下幾種分區(qū)策略和自定義分區(qū)策略。已實(shí)現(xiàn)的分區(qū)策略對(duì)應(yīng)的API為:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

自定義分區(qū)策略的API為CustomPartitionerWrapper。Vc428資訊網(wǎng)——每日最新資訊28at.com

各個(gè)API的繼承關(guān)系如下圖所示:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

ChannelSelector是分區(qū)策略的頂層接口,其決定了記錄應(yīng)該寫(xiě)入哪個(gè)邏輯通道,通道可理解為下游算子的某個(gè)實(shí)例,或下游并行算子的某個(gè)子任務(wù)。該接口的定義源碼如下:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

抽象類StreamPartitioner實(shí)現(xiàn)了ChannelSelector接口,是一個(gè)用于流程序的特殊的ChannelSelector,其中定義了一些通用的分區(qū)策略方法。Flink中的所有分區(qū)策略(分區(qū)器)都繼承了StreamPartitioner類,并且實(shí)現(xiàn)了各自獨(dú)有的分區(qū)規(guī)則。Vc428資訊網(wǎng)——每日最新資訊28at.com

三、內(nèi)置分區(qū)策略

3.1 BinaryHashPartitioner

該分區(qū)策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一種針對(duì)BinaryRowData的哈希分區(qū)器。BinaryRowData是RowData的實(shí)現(xiàn),可以顯著減少Java對(duì)象的序列化/反序列化。RowData用于表示結(jié)構(gòu)化數(shù)據(jù)類型,運(yùn)行時(shí)通過(guò)Table API或SQL管道傳遞的所有頂級(jí)記錄都是RowData的實(shí)例。關(guān)于BinaryHashPartitioner,我們這里不做過(guò)多講解。Vc428資訊網(wǎng)——每日最新資訊28at.com

3.2 BroadcastPartitioner

廣播分區(qū)策略將上游數(shù)據(jù)記錄輸出到下游算子的每個(gè)并行實(shí)例中,即下游每個(gè)分區(qū)都會(huì)有上游的所有數(shù)據(jù)。使用DataStream的broadcast()方法即可設(shè)置該DataStream向下游發(fā)送數(shù)據(jù)時(shí)使用廣播分區(qū)策略。Vc428資訊網(wǎng)——每日最新資訊28at.com

來(lái)一段代碼演示下:Vc428資訊網(wǎng)——每日最新資訊28at.com

/** * 微信公眾號(hào):老周聊架構(gòu) */public class PartitionerTest {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(3);        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);        //1.分區(qū)策略前的操作        //輸出dataStream每個(gè)元素及所屬的子任務(wù)編號(hào)        dataStream.map(new RichMapFunction<Integer, Object>() {            @Override            public Object map(Integer value) throws Exception {                System.out.println(String.format("元素值: %s, 分區(qū)策略前,子任務(wù)編號(hào): %s", value,                        getRuntimeContext().getIndexOfThisSubtask()));                return value;            }        });        //2.設(shè)置分區(qū)策略        //設(shè)置DataStream向下游發(fā)送數(shù)據(jù)時(shí)使用的策略        DataStream<Integer> dataStreamAfter = dataStream.broadcast();        //3.分區(qū)策略后的操作        dataStreamAfter.map(new RichMapFunction<Integer, Object>() {            @Override            public Object map(Integer value) throws Exception {                System.out.println(String.format("元素值: %s, 分區(qū)策略后,子任務(wù)編號(hào): %s", value,                        getRuntimeContext().getIndexOfThisSubtask()));                return value;            }        }).print();        env.execute("PartitionerTest Job");    }}

直接IDEA控制臺(tái)輸出:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

從輸出結(jié)果可以看出,數(shù)據(jù)共分為3個(gè)分區(qū)(編號(hào)為0、1、2)。執(zhí)行分區(qū)策略前,每個(gè)元素所屬的分區(qū):Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

執(zhí)行分區(qū)策略后,每個(gè)元素所屬的分區(qū)如下:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

對(duì)比表發(fā)現(xiàn),廣播分區(qū)策略將上游每個(gè)元素分別發(fā)送到了下游算子的所有分區(qū),這種策略會(huì)把數(shù)據(jù)復(fù)制多份,向下游算子的每個(gè)分區(qū)發(fā)送一份。Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

我們把上面的任務(wù)提交到Flink,同樣也可以看出前面分區(qū)前每個(gè)子任務(wù)兩條數(shù)據(jù),分區(qū)后每個(gè)子任務(wù)六條數(shù)據(jù)。Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

3.3 ForwardPartitioner

轉(zhuǎn)發(fā)分區(qū)策略只將元素轉(zhuǎn)發(fā)給本地運(yùn)行的下游算子的實(shí)例,即將元素發(fā)送到與當(dāng)前算子實(shí)例在同一個(gè)TaskManager的下游算子實(shí)例,而不需要進(jìn)行網(wǎng)絡(luò)傳輸。要求上下游算子并行度一樣,這樣上下游算子可以同屬一個(gè)子任務(wù)。Vc428資訊網(wǎng)——每日最新資訊28at.com

這里把上面的代碼調(diào)整下:Vc428資訊網(wǎng)——每日最新資訊28at.com

dataStream.forward()Vc428資訊網(wǎng)——每日最新資訊28at.com

IDEA控制臺(tái)輸出:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

從輸出結(jié)果可以看出,數(shù)據(jù)共分為3個(gè)分區(qū)(編號(hào)為0、1、2)。執(zhí)行分區(qū)策略前,每個(gè)元素所屬的分區(qū):Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

執(zhí)行分區(qū)策略后,每個(gè)元素所屬的分區(qū)如下:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

對(duì)比發(fā)現(xiàn),轉(zhuǎn)發(fā)分區(qū)策略將上游同一個(gè)分區(qū)的元素發(fā)送到了下游同一個(gè)分區(qū)中。使用數(shù)據(jù)流圖表示如下圖:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

在上下游的算子沒(méi)有指定分區(qū)策略的情況下,如果上下游的算子并行度一致,則默認(rèn)使用ForwardPartitioner,否則使用RebalancePartitioner。在StreamGraph類的源碼中可以看到該規(guī)則:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

Vc428資訊網(wǎng)——每日最新資訊28at.com

對(duì)于ForwardPartitioner,必須保證上下游算子并行度一致,否則會(huì)拋出異常。Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

3.4 GlobalPartitioner

全局分區(qū)策略將上游所有元素發(fā)送到下游子任務(wù)編號(hào)等于0的分區(qū)算子實(shí)例上(下游第一個(gè)實(shí)例)。Vc428資訊網(wǎng)——每日最新資訊28at.com

這里把上面的代碼調(diào)整下:Vc428資訊網(wǎng)——每日最新資訊28at.com

dataStream.global()

IDEA控制臺(tái)輸出:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

分區(qū)前:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

分區(qū)后:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

全局分區(qū)策略將上游所有分區(qū)中的所有元素發(fā)送到了下游編號(hào)為0的分區(qū)中:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

3.5 .KeyGroupStreamPartitioner

Key分區(qū)策略根據(jù)元素Key的Hash值輸出到下游算子指定的實(shí)例。keyBy()算子底層正是使用的該分區(qū)策略,底層最終會(huì)調(diào)用KeyGroupStreamPartitioner的selectChannel()方法,計(jì)算每個(gè)Key對(duì)應(yīng)的通道索引(通道編號(hào),可理解為分區(qū)編號(hào)),根據(jù)通道索引將Key發(fā)送到下游相應(yīng)的分區(qū)中。selectChannel()方法源碼如下:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

總的來(lái)說(shuō),F(xiàn)link底層計(jì)算通道索引(分區(qū)編號(hào))的流程如下:Vc428資訊網(wǎng)——每日最新資訊28at.com

  • 計(jì)算Key的HashCode值。
  • 將Key的HashCode值進(jìn)行特殊的Hash處理,即MathUtils.murmurHash(keyHash),返回一個(gè)非負(fù)哈希碼。
  • 將非負(fù)哈希碼除以最大并行度取余數(shù),得到keyGroupId,即Key組索引。
  • 使用公式keyGroupId×parallelism/maxParallelism得到分區(qū)編號(hào)。parallelism為當(dāng)前算子的并行度,即通道數(shù)量;maxParallelism為系統(tǒng)默認(rèn)支持的最大并行度,即128。

3.6 RebalancePartitioner

平衡分區(qū)策略使用循環(huán)遍歷下游分區(qū)的方式,將上游元素均勻分配給下游算子的每個(gè)實(shí)例。每個(gè)下游算子的實(shí)例都具有相等的負(fù)載。當(dāng)數(shù)據(jù)流中的元素存在數(shù)據(jù)傾斜時(shí),使用該策略對(duì)性能有很大的提升。Vc428資訊網(wǎng)——每日最新資訊28at.com

這里把上面的代碼調(diào)整下:Vc428資訊網(wǎng)——每日最新資訊28at.com

dataStream.setParallelism(2);Vc428資訊網(wǎng)——每日最新資訊28at.com

dataStreamAfter.setParallelism(3);Vc428資訊網(wǎng)——每日最新資訊28at.com

dataStream.rebalance()Vc428資訊網(wǎng)——每日最新資訊28at.com

IDEA控制臺(tái)輸出:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

分區(qū)前:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

分區(qū)后:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

平衡分區(qū)策略將上游所有元素均勻發(fā)送到了下游算子的所有分區(qū):Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

3.7 RescalePartitioner

重新調(diào)節(jié)分區(qū)策略基于上下游算子的并行度,將元素以循環(huán)的方式輸出到下游算子的每個(gè)實(shí)例。類似于平衡分區(qū)策略,但又與平衡分區(qū)策略不同。Vc428資訊網(wǎng)——每日最新資訊28at.com

上游算子將元素發(fā)送到下游哪一個(gè)算子實(shí)例,取決于上游和下游算子的并行度。例如,如果上游算子的并行度為2,而下游算子的并行度為4,那么一個(gè)上游算子實(shí)例將把元素均勻分配給兩個(gè)下游算子實(shí)例,而另一個(gè)上游算子實(shí)例將把元素均勻分配給另外兩個(gè)下游算子實(shí)例。相反,如果下游算子的并行度為2,而上游算子的并行度為4,那么兩個(gè)上游算子實(shí)例將分配給一個(gè)下游算子實(shí)例,而另外兩個(gè)上游算子實(shí)例將分配給另一個(gè)下游算子實(shí)例。Vc428資訊網(wǎng)——每日最新資訊28at.com

假設(shè)上游算子并行度為2,分區(qū)編號(hào)為A和B,下游算子并行度為4,分區(qū)編號(hào)為1、2、3、4,那么A將把數(shù)據(jù)循環(huán)發(fā)送給1和2,B則把數(shù)據(jù)循環(huán)發(fā)送給3和4。假設(shè)上游算子并行度為4,編號(hào)為A、B、C、D,下游算子并行度為2,編號(hào)為1、2,那么A和B把數(shù)據(jù)發(fā)送給1,C和D則把數(shù)據(jù)發(fā)送給2。Vc428資訊網(wǎng)——每日最新資訊28at.com

這里把上面的代碼調(diào)整下:Vc428資訊網(wǎng)——每日最新資訊28at.com

dataStream.rescale()

同時(shí)將第一個(gè)map算子的并行度設(shè)置為2,第二個(gè)map算子的并行度設(shè)置為4。Vc428資訊網(wǎng)——每日最新資訊28at.com

IDEA控制臺(tái)輸出:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

分區(qū)前:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

分區(qū)后:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

接下來(lái)改變map算子的并行度,將第一個(gè)map算子的并行度設(shè)置為4,第二個(gè)map算子的并行度設(shè)置為2。Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

如果想將元素均勻地輸出到下游算子的每個(gè)實(shí)例,以實(shí)現(xiàn)負(fù)載均衡,同時(shí)又不希望使用平衡分區(qū)策略的全局負(fù)載均衡,則可以使用重新調(diào)節(jié)分區(qū)策略。該策略會(huì)盡可能避免數(shù)據(jù)在網(wǎng)絡(luò)間傳輸,而能否避免還取決于TaskManager的Task Slot數(shù)量、上下游算子的并行度等。Vc428資訊網(wǎng)——每日最新資訊28at.com

3.8 ShufflePartitioner

隨機(jī)分區(qū)策略將上游算子元素輸出到下游算子的隨機(jī)實(shí)例中。元素會(huì)被均勻分配到下游算子的每個(gè)實(shí)例。這種策略可以實(shí)現(xiàn)計(jì)算任務(wù)的負(fù)載均衡。Vc428資訊網(wǎng)——每日最新資訊28at.com

這里把上面的代碼調(diào)整下:Vc428資訊網(wǎng)——每日最新資訊28at.com

dataStream.shuffle()Vc428資訊網(wǎng)——每日最新資訊28at.com

這里就不做過(guò)多演示了。我們下面來(lái)看下自定義分區(qū)策略。Vc428資訊網(wǎng)——每日最新資訊28at.com

四、自定義分區(qū)策略

自定義分區(qū)策略的API為CustomPartitionerWrapper。該策略允許開(kāi)發(fā)者自定義規(guī)則將上游算子元素發(fā)送到下游指定的算子實(shí)例中。Vc428資訊網(wǎng)——每日最新資訊28at.com

4.1 新建自定義分區(qū)器

新建分區(qū)器類MyCustomPartitioner并實(shí)現(xiàn)接口Partitioner(Object表示分區(qū)Key的數(shù)據(jù)類型),實(shí)現(xiàn)其中未實(shí)現(xiàn)的方法partition(),在該方法中添加相應(yīng)的分區(qū)邏輯。Vc428資訊網(wǎng)——每日最新資訊28at.com

/** * 自定義分區(qū)策略 * 微信公眾號(hào):老周聊架構(gòu) */public class MyCustomPartitioner implements Partitioner {    @Override    public int partition(Object key, int numPartitions) {        if (key.equals("chinese")) {            return 0;        } else if (key.equals("math")) {            return 1;        } else {            return 2;        }    }}

上述代碼通過(guò)partition()方法取得分區(qū)編號(hào),將Key值等于chinese的元素分配到編號(hào)為0的分區(qū),將Key值等于math的元素分配到編號(hào)為1的分區(qū),其余元素分配到編號(hào)為2的分區(qū)。Vc428資訊網(wǎng)——每日最新資訊28at.com

4.2 使用自定義分區(qū)器

調(diào)用DataStream的partitionCustom()方法傳入自定義分區(qū)器類MyCustomPartitioner的實(shí)例,可以對(duì)DataStream按照自定義規(guī)則進(jìn)行重新分區(qū),代碼如下:Vc428資訊網(wǎng)——每日最新資訊28at.com

/** * 自定義分區(qū)策略 * 微信公眾號(hào):老周聊架構(gòu) */public class CustomPartitionerTest {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(3);        DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");        //1.分區(qū)策略前的操作        //輸出dataStream每個(gè)元素及所屬的子任務(wù)編號(hào)        SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =                dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {                    @Override                    public Map<String, Integer> map(String value) throws Exception {                        System.out.println(String.format("元素值: %s, 分區(qū)策略前,子任務(wù)編號(hào): %s", value,                                getRuntimeContext().getIndexOfThisSubtask()));                        Map<String, Integer> map = new HashMap<>();                        map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));                        return map;                    }                }).setParallelism(2);        //2.設(shè)置分區(qū)策略        //設(shè)置DataStream向下游發(fā)送數(shù)據(jù)時(shí)使用的策略        DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);        //3.分區(qū)策略后的操作        dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {            @Override            public Map<String, Integer> map(Map<String, Integer> value) throws Exception {                System.out.println(String.format("元素值: %s, 分區(qū)策略后,子任務(wù)編號(hào): %s", value,                        getRuntimeContext().getIndexOfThisSubtask()));                return value;            }        }).setParallelism(3).print();        env.execute("CustomPartitionerTest Job");    }}

分區(qū)前:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

分區(qū)后:Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

自定義分區(qū)策略將上游所有元素按照自定義的規(guī)則發(fā)送到了下游的3個(gè)分區(qū)中。Vc428資訊網(wǎng)——每日最新資訊28at.com

把任務(wù)給到Flink上去跑,發(fā)現(xiàn):Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

這是因?yàn)榉盒筒脸旅娴腄ataStream泛型需要指定類型,不能Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

小知識(shí):Vc428資訊網(wǎng)——每日最新資訊28at.com

在編譯之后程序會(huì)采取去泛型化的措施。也就是說(shuō)Java中的泛型,只在編譯階段有效。在編譯過(guò)程中,正確檢驗(yàn)泛型結(jié)果后,在運(yùn)行時(shí)會(huì)將泛型的相關(guān)信息擦除,編譯器只會(huì)在對(duì)象進(jìn)入JVM和離開(kāi)JVM的邊界處添加類型檢查和轉(zhuǎn)換的方法,泛型的信息不會(huì)進(jìn)入到運(yùn)行時(shí)階段,這就是所謂的Java類型擦除。Vc428資訊網(wǎng)——每日最新資訊28at.com

類型加好以后,再跑一下任務(wù),會(huì)出現(xiàn)任務(wù)成功。Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com

Vc428資訊網(wǎng)——每日最新資訊28at.com

圖片圖片Vc428資訊網(wǎng)——每日最新資訊28at.com


Vc428資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-75362-0.html聊聊Flink:Flink的分區(qū)機(jī)制

聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com

上一篇: 掌握 Python 棧,輕松實(shí)現(xiàn)進(jìn)制轉(zhuǎn)換

下一篇: .NET中Enum的應(yīng)用你知道多少,它的作用和優(yōu)點(diǎn)是什么?

標(biāo)簽:
  • 熱門焦點(diǎn)
  • 一篇聊聊Go錯(cuò)誤封裝機(jī)制

    %w 是用于錯(cuò)誤包裝(Error Wrapping)的格式化動(dòng)詞。它是用于 fmt.Errorf 和 fmt.Sprintf 函數(shù)中的一個(gè)特殊格式化動(dòng)詞,用于將一個(gè)錯(cuò)誤(或其他可打印的值)包裝在一個(gè)新的錯(cuò)誤中。使
  • 得物效率前端微應(yīng)用推進(jìn)過(guò)程與思考

    一、背景效率工程隨著業(yè)務(wù)的發(fā)展,組織規(guī)模的擴(kuò)大,越來(lái)越多的企業(yè)開(kāi)始意識(shí)到協(xié)作效率對(duì)于企業(yè)團(tuán)隊(duì)的重要性,甚至是決定其在某個(gè)行業(yè)競(jìng)爭(zhēng)中突圍的關(guān)鍵,是企業(yè)長(zhǎng)久生存的根本。得物
  • 量化指標(biāo)是與非:挽救被量化指標(biāo)扼殺的技術(shù)團(tuán)隊(duì)

    作者 | 劉新翠整理 | 徐杰承本文整理自快狗打車技術(shù)總監(jiān)劉新翠在WOT2023大會(huì)上的主題分享,更多精彩內(nèi)容及現(xiàn)場(chǎng)PPT,請(qǐng)關(guān)注51CTO技術(shù)棧公眾號(hào),發(fā)消息【W(wǎng)OT2023PPT】即可直接領(lǐng)取
  • 一文掌握 Golang 模糊測(cè)試(Fuzz Testing)

    模糊測(cè)試(Fuzz Testing)模糊測(cè)試(Fuzz Testing)是通過(guò)向目標(biāo)系統(tǒng)提供非預(yù)期的輸入并監(jiān)視異常結(jié)果來(lái)發(fā)現(xiàn)軟件漏洞的方法。可以用來(lái)發(fā)現(xiàn)應(yīng)用程序、操作系統(tǒng)和網(wǎng)絡(luò)協(xié)議等中的漏洞或
  • 使用AIGC工具提升安全工作效率

    在日常工作中,安全人員可能會(huì)涉及各種各樣的安全任務(wù),包括但不限于:開(kāi)發(fā)某些安全工具的插件,滿足自己特定的安全需求;自定義github搜索工具,快速查找所需的安全資料、漏洞poc、exp
  • 造車兩年股價(jià)跌六成,小米的估值邏輯變了嗎?

    如果從小米官宣造車后的首個(gè)交易日起持有小米集團(tuán)的股票,那么截至2023年上半年最后一個(gè)交易日,投資者將浮虧59.16%,同區(qū)間的恒生科技指數(shù)跌幅為52.78%
  • 網(wǎng)傳小米汽車開(kāi)始篩選交付中心 建筑面積不低于3000平方米

    7月7日消息,近日有微博網(wǎng)友@長(zhǎng)三角行健者爆料稱,據(jù)經(jīng)銷商集團(tuán)反饋,小米汽車目前已經(jīng)開(kāi)始了交付中心的篩選工作,要求候選場(chǎng)地至少有120個(gè)車位,建筑不能低
  • 三星折疊屏手機(jī)去年銷售近1000萬(wàn)臺(tái) 今年目標(biāo)定為1500萬(wàn)

    7月29日消息,三星率先發(fā)力可折疊手機(jī)市場(chǎng),在全球市場(chǎng)已經(jīng)取得了非常亮眼的成績(jī),接下來(lái)會(huì)進(jìn)一步鞏固和擴(kuò)大這一優(yōu)勢(shì)。三星在推出Galaxy Z Flip5和Galax
  • 滴滴違法違規(guī)被罰80.26億 共存在16項(xiàng)違法事實(shí)

    滴滴違法違規(guī)被罰80.26億 存在16項(xiàng)違法事實(shí)開(kāi)始于2121年7月,歷經(jīng)一年時(shí)間,網(wǎng)絡(luò)安全審查辦公室對(duì)“滴滴出行”網(wǎng)絡(luò)安全審查終于有了一個(gè)暫時(shí)的結(jié)束。據(jù)“網(wǎng)信
Top 主站蜘蛛池模板: 宁乡县| 当阳市| 汝城县| 龙口市| 易门县| 新疆| 杨浦区| 湖南省| 扎兰屯市| 常熟市| 论坛| 乃东县| 上犹县| 太原市| 淮阳县| 盘锦市| 罗田县| 灵石县| 彭山县| 密云县| 淮阳县| 德兴市| 高州市| 秭归县| 双牌县| 临汾市| 临猗县| 西安市| 苏尼特右旗| 休宁县| 阜新市| 汉阴县| 恩平市| 通榆县| 五家渠市| 本溪| 桑植县| 新建县| 石楼县| 临颍县| 天镇县|