日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當(dāng)前位置:首頁 > 科技  > 軟件

SpringBoot整合Flink CDC,實時追蹤數(shù)據(jù)變動,無縫同步至Redis

來源: 責(zé)編: 時間:2024-04-09 17:23:02 218觀看
導(dǎo)讀環(huán)境:SpringBoot2.7.16 + Flink 1.19.0 + JDK211. 簡介Flink CDC(Flink Change Data Capture)是基于數(shù)據(jù)庫的日志CDC技術(shù),實現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。它搭配Flink計算框架,能夠高效實現(xiàn)海量數(shù)據(jù)的實時集成。Fl

環(huán)境:SpringBoot2.7.16 + Flink 1.19.0 + JDK215vk28資訊網(wǎng)——每日最新資訊28at.com

1. 簡介

Flink CDC(Flink Change Data Capture)是基于數(shù)據(jù)庫的日志CDC技術(shù),實現(xiàn)了全增量一體化讀取的數(shù)據(jù)集成框架。它搭配Flink計算框架,能夠高效實現(xiàn)海量數(shù)據(jù)的實時集成。Flink CDC的核心功能在于實時地監(jiān)視數(shù)據(jù)庫或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動,并將這些變動抽取出來,以便進一步的處理和分析。通過使用Flink CDC,用戶可以輕松地構(gòu)建實時數(shù)據(jù)管道,對數(shù)據(jù)變動進行實時響應(yīng)和處理,為實時分析、實時報表和實時決策等場景提供強大的支持。5vk28資訊網(wǎng)——每日最新資訊28at.com

具體來說,F(xiàn)link CDC的應(yīng)用場景包括但不限于實時數(shù)據(jù)倉庫更新、實時數(shù)據(jù)同步和遷移、實時數(shù)據(jù)處理等。它還可以確保數(shù)據(jù)一致性,并在數(shù)據(jù)發(fā)生變更時能夠準確地捕獲和處理。此外,F(xiàn)link CDC支持與多種數(shù)據(jù)源進行集成,如MySQL、PostgreSQL、Oracle等,并提供了相應(yīng)的連接器,方便數(shù)據(jù)的捕獲和處理。5vk28資訊網(wǎng)——每日最新資訊28at.com

接下來將詳細的介紹關(guān)于MySQL CDC的使用。MySQL CDC 連接器允許從 MySQL 數(shù)據(jù)庫讀取快照數(shù)據(jù)和增量數(shù)據(jù)。5vk28資訊網(wǎng)——每日最新資訊28at.com

支持的數(shù)據(jù)庫5vk28資訊網(wǎng)——每日最新資訊28at.com

Connector5vk28資訊網(wǎng)——每日最新資訊28at.com

Database5vk28資訊網(wǎng)——每日最新資訊28at.com

Driver5vk28資訊網(wǎng)——每日最新資訊28at.com

mysql-cdc5vk28資訊網(wǎng)——每日最新資訊28at.com

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.275vk28資訊網(wǎng)——每日最新資訊28at.com

2. 實戰(zhàn)案例

2.1 MySQL開啟Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的/my.ini),需要在[mysqld]部分設(shè)置相關(guān)參數(shù)以開啟binlog功能,如下:5vk28資訊網(wǎng)——每日最新資訊28at.com

[mysqld]server-id=1# 格式,行級格式binlog-format=Row# binlog 日志文件的前綴log-bin=mysql-bin# 指定哪些數(shù)據(jù)庫需要記錄二進制日志binlog_do_db=testjpa

除了開啟binlog功能外,F(xiàn)link CDC還需要其他配置和權(quán)限來確保能夠正常連接到MySQL并讀取數(shù)據(jù)。例如,需要授予Flink CDC連接MySQL的用戶必要的權(quán)限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。這些權(quán)限是Flink CDC讀取數(shù)據(jù)和元數(shù)據(jù)所必需的。5vk28資訊網(wǎng)——每日最新資訊28at.com

查看是否開啟了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin       | ON    |+---------------+-------+

以上就對mysql相關(guān)的配置完成了。5vk28資訊網(wǎng)——每日最新資訊28at.com

2.2 依賴管理

<properties>  <flink.version>1.19.0</flink.version></properties><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-base</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>com.ververica</groupId>  <artifactId>flink-sql-connector-mysql-cdc</artifactId>  <version>3.0.1</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-clients</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-table-runtime</artifactId>  <version>${flink.version}</version></dependency>

2.3 代碼實現(xiàn)

@Componentpublic class MonitorMySQLCDC implements InitializingBean {  // 該隊列專門用來臨時保存變化的數(shù)據(jù)(實際生產(chǎn)環(huán)境,你應(yīng)該使用MQ相關(guān)的產(chǎn)品)  public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;    private final StringRedisTemplate stringRedisTemplate ;  // 保存到redis中key的前綴  private final String PREFIX = "users:" ;  // 數(shù)據(jù)發(fā)生變化后的sink處理  private final CustomSink customSink ;  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {    this.customSink = customSink ;    this.stringRedisTemplate = stringRedisTemplate ;  }    @Override  public void afterPropertiesSet() throws Exception {    // 啟動異步線程,實時處理隊列中的數(shù)據(jù)    new Thread(() -> {      while(true) {        try {          Map<String, Object> result = queue.take();          this.doAction(result) ;        } catch (Exception e) {          e.printStackTrace();        }      }    }).start() ;    Properties jdbcProperties = new Properties() ;    jdbcProperties.setProperty("useSSL", "false") ;    MySqlSource<String> source = MySqlSource.<String>builder()        .hostname("127.0.0.1")        .port(3306)        // 可配置多個數(shù)據(jù)庫        .databaseList("testjpa")        // 可配置多個表        .tableList("testjpa.users")        .username("root")        .password("123123")        .jdbcProperties(jdbcProperties)        // 包括schema的改變        .includeSchemaChanges(true)        // 反序列化設(shè)置        // .deserializer(new StringDebeziumDeserializationSchema())        .deserializer(new JsonDebeziumDeserializationSchema(true))        // 啟動模式;關(guān)于啟動模式下面詳細介紹        .startupOptions(StartupOptions.initial())        .build() ;    // 環(huán)境配置    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;    // 設(shè)置 6s 的 checkpoint 間隔    env.enableCheckpointing(6000) ;    // 設(shè)置 source 節(jié)點的并行度為 4    env.setParallelism(4) ;    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")        // 添加Sink        .addSink(this.customSink) ;    env.execute() ;  }    @SuppressWarnings("unchecked")  private void doAction(Map<String, Object> result) throws Exception {    Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;    String op = (String) payload.get("op") ;    switch (op) {      // 更新和插入操作      case "u", "c" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("after") ;        String id = after.get("id").toString();        System.out.printf("操作:%s, ID: %s%n", op, id) ;        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;      }      // 刪除操作      case "d" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("before") ;        String id = after.get("id").toString();        stringRedisTemplate.delete(PREFIX + id) ;      }     }  }  }

啟動模式:5vk28資訊網(wǎng)——每日最新資訊28at.com

  • initial (默認):在第一次啟動時對受監(jiān)視的數(shù)據(jù)庫表執(zhí)行初始快照,并繼續(xù)讀取最新的 binlog。
  • earliest-offset:跳過快照階段,從可讀取的最早 binlog 位點開始讀取
  • latest-offset:首次啟動時,從不對受監(jiān)視的數(shù)據(jù)庫表執(zhí)行快照, 連接器僅從 binlog 的結(jié)尾處開始讀取,這意味著連接器只能讀取在連接器啟動之后的數(shù)據(jù)更改。
  • specific-offset:跳過快照階段,從指定的 binlog 位點開始讀取。位點可通過 binlog 文件名和位置指定,或者在 GTID 在集群上啟用時通過 GTID 集合指定。
  • timestamp:跳過快照階段,從指定的時間戳開始讀取 binlog 事件。

數(shù)據(jù)處理Sink

@Componentpublic class CustomSink extends RichSinkFunction<String> {  private ObjectMapper mapper = new ObjectMapper();  @Override  public void invoke(String value, Context context) throws Exception {    System.out.printf("數(shù)據(jù)發(fā)生變化: %s%n", value);    TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {    };    Map<String, Object> result = mapper.readValue(value, valueType);    Map<String, Object> payload = (Map<String, Object>) result.get("payload");    String op = (String) payload.get("op") ;    // 不對讀操作處理    if (!"r".equals(op)) {      MonitorMySQLCDC.queue.put(result);    }  }}

以上就是實現(xiàn)通過FlinkCDC實時通過數(shù)據(jù)到Redis的所有代碼。5vk28資訊網(wǎng)——每日最新資訊28at.com

2.4 Web監(jiān)控頁面

引入flink web依賴5vk28資訊網(wǎng)——每日最新資訊28at.com

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-runtime-web</artifactId>  <version>${flink.version}</version></dependency>

環(huán)境配置5vk28資訊網(wǎng)——每日最新資訊28at.com

Configuration config = new Configuration() ;config.set(RestOptions.PORT, 9090) ;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web監(jiān)聽9090端口。5vk28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片5vk28資訊網(wǎng)——每日最新資訊28at.com

通過web控制臺你可以管理查看到更多的信息。5vk28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-82367-0.htmlSpringBoot整合Flink CDC,實時追蹤數(shù)據(jù)變動,無縫同步至Redis

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 圖解 CSS Grid 布局,一起來看看 CSS Grid 布局是如何使用的

下一篇: 架構(gòu)見解:使用Instagram示例設(shè)計高效的多層緩存

標簽:
  • 熱門焦點
  • 消息稱迪士尼要拍真人版《魔發(fā)奇緣》:女主可能也找黑人演員

    8月5日消息,迪士尼確實有點忙,忙著將不少動畫改成真人版,繼《美人魚》后,真人版《白雪公主》、《魔發(fā)奇緣》也在路上了。據(jù)外媒消息稱,迪士尼將打造真人版
  • JavaScript 混淆及反混淆代碼工具

    介紹在我們開始學(xué)習(xí)反混淆之前,我們首先要了解一下代碼混淆。如果不了解代碼是如何混淆的,我們可能無法成功對代碼進行反混淆,尤其是使用自定義混淆器對其進行混淆時。什么是混
  • 服務(wù)存儲設(shè)計模式:Cache-Aside模式

    Cache-Aside模式一種常用的緩存方式,通常是把數(shù)據(jù)從主存儲加載到KV緩存中,加速后續(xù)的訪問。在存在重復(fù)度的場景,Cache-Aside可以提升服務(wù)性能,降低底層存儲的壓力,缺點是緩存和底
  • 一篇文章帶你了解 CSS 屬性選擇器

    屬性選擇器對帶有指定屬性的 HTML 元素設(shè)置樣式。可以為擁有指定屬性的 HTML 元素設(shè)置樣式,而不僅限于 class 和 id 屬性。一、了解屬性選擇器CSS屬性選擇器提供了一種簡單而
  • 一文掌握 Golang 模糊測試(Fuzz Testing)

    模糊測試(Fuzz Testing)模糊測試(Fuzz Testing)是通過向目標系統(tǒng)提供非預(yù)期的輸入并監(jiān)視異常結(jié)果來發(fā)現(xiàn)軟件漏洞的方法。可以用來發(fā)現(xiàn)應(yīng)用程序、操作系統(tǒng)和網(wǎng)絡(luò)協(xié)議等中的漏洞或
  • 一個注解實現(xiàn)接口冪等,這樣才優(yōu)雅!

    場景碼猿慢病云管理系統(tǒng)中其實高并發(fā)的場景不是很多,沒有必要每個接口都去考慮并發(fā)高的場景,比如添加住院患者的這個接口,具體的業(yè)務(wù)代碼就不貼了,業(yè)務(wù)偽代碼如下:圖片上述代碼有
  • 中國家電海外掘金正當(dāng)時|出海專題

    作者|吳南南編輯|胡展嘉運營|陳佳慧出品|零態(tài)LT(ID:LingTai_LT)2023年,出海市場戰(zhàn)況空前,中國創(chuàng)業(yè)者在海外紛紛摩拳擦掌,以期能夠把中國的商業(yè)模式、創(chuàng)業(yè)理念、戰(zhàn)略打法輸出海外,他們依
  • 認真聊聊東方甄選:如何告別低垂的果實

    來源:山核桃作者:財經(jīng)無忌爆火一年后,俞敏洪和他的東方甄選依舊是頗受外界關(guān)心的&ldquo;網(wǎng)紅&rdquo;。7月5日至9日,為期5天的東方甄選&ldquo;甘肅行&rdquo;首次在自有App內(nèi)直播,
  • iQOO Neo8 Pro即將開售:到手價3099元起 安卓性能最強旗艦

    5月23日,iQOO如期舉行了新品發(fā)布會,全新的iQOO Neo8系列也正式與大家見面,包含iQOO Neo8和iQOO Neo8 Pro兩個版本,其中標準版搭載高通驍龍8+,而Pro版更
Top 主站蜘蛛池模板: 长沙市| 濮阳县| 读书| 逊克县| 科技| 铜山县| 大洼县| 长春市| 宁城县| 成都市| 靖江市| 囊谦县| 长宁区| 隆安县| 绥德县| 镶黄旗| 延津县| 南城县| 钟山县| 六枝特区| 塔城市| 调兵山市| 大港区| 杭锦后旗| 娱乐| 保靖县| 盐亭县| 竹溪县| 泰安市| 三河市| 蓝田县| 衡阳县| 读书| 铜梁县| 海门市| 青河县| 乌拉特后旗| 嘉黎县| 苏尼特右旗| 潮安县| 湖州市|