日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

SpringBoot整合Flink CDC,實時追蹤數據變動,無縫同步至Redis

來源: 責編: 時間:2024-04-09 17:23:02 193觀看
導讀環境:SpringBoot2.7.16 + Flink 1.19.0 + JDK211. 簡介Flink CDC(Flink Change Data Capture)是基于數據庫的日志CDC技術,實現了全增量一體化讀取的數據集成框架。它搭配Flink計算框架,能夠高效實現海量數據的實時集成。Fl

環境:SpringBoot2.7.16 + Flink 1.19.0 + JDK21rCs28資訊網——每日最新資訊28at.com

1. 簡介

Flink CDC(Flink Change Data Capture)是基于數據庫的日志CDC技術,實現了全增量一體化讀取的數據集成框架。它搭配Flink計算框架,能夠高效實現海量數據的實時集成。Flink CDC的核心功能在于實時地監視數據庫或數據流中發生的數據變動,并將這些變動抽取出來,以便進一步的處理和分析。通過使用Flink CDC,用戶可以輕松地構建實時數據管道,對數據變動進行實時響應和處理,為實時分析、實時報表和實時決策等場景提供強大的支持。rCs28資訊網——每日最新資訊28at.com

具體來說,Flink CDC的應用場景包括但不限于實時數據倉庫更新、實時數據同步和遷移、實時數據處理等。它還可以確保數據一致性,并在數據發生變更時能夠準確地捕獲和處理。此外,Flink CDC支持與多種數據源進行集成,如MySQL、PostgreSQL、Oracle等,并提供了相應的連接器,方便數據的捕獲和處理。rCs28資訊網——每日最新資訊28at.com

接下來將詳細的介紹關于MySQL CDC的使用。MySQL CDC 連接器允許從 MySQL 數據庫讀取快照數據和增量數據。rCs28資訊網——每日最新資訊28at.com

支持的數據庫rCs28資訊網——每日最新資訊28at.com

ConnectorrCs28資訊網——每日最新資訊28at.com

DatabaserCs28資訊網——每日最新資訊28at.com

DriverrCs28資訊網——每日最新資訊28at.com

mysql-cdcrCs28資訊網——每日最新資訊28at.com

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.27rCs28資訊網——每日最新資訊28at.com

2. 實戰案例

2.1 MySQL開啟Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的/my.ini),需要在[mysqld]部分設置相關參數以開啟binlog功能,如下:rCs28資訊網——每日最新資訊28at.com

[mysqld]server-id=1# 格式,行級格式binlog-format=Row# binlog 日志文件的前綴log-bin=mysql-bin# 指定哪些數據庫需要記錄二進制日志binlog_do_db=testjpa

除了開啟binlog功能外,Flink CDC還需要其他配置和權限來確保能夠正常連接到MySQL并讀取數據。例如,需要授予Flink CDC連接MySQL的用戶必要的權限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。這些權限是Flink CDC讀取數據和元數據所必需的。rCs28資訊網——每日最新資訊28at.com

查看是否開啟了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin       | ON    |+---------------+-------+

以上就對mysql相關的配置完成了。rCs28資訊網——每日最新資訊28at.com

2.2 依賴管理

<properties>  <flink.version>1.19.0</flink.version></properties><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-base</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>com.ververica</groupId>  <artifactId>flink-sql-connector-mysql-cdc</artifactId>  <version>3.0.1</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-clients</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-table-runtime</artifactId>  <version>${flink.version}</version></dependency>

2.3 代碼實現

@Componentpublic class MonitorMySQLCDC implements InitializingBean {  // 該隊列專門用來臨時保存變化的數據(實際生產環境,你應該使用MQ相關的產品)  public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;    private final StringRedisTemplate stringRedisTemplate ;  // 保存到redis中key的前綴  private final String PREFIX = "users:" ;  // 數據發生變化后的sink處理  private final CustomSink customSink ;  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {    this.customSink = customSink ;    this.stringRedisTemplate = stringRedisTemplate ;  }    @Override  public void afterPropertiesSet() throws Exception {    // 啟動異步線程,實時處理隊列中的數據    new Thread(() -> {      while(true) {        try {          Map<String, Object> result = queue.take();          this.doAction(result) ;        } catch (Exception e) {          e.printStackTrace();        }      }    }).start() ;    Properties jdbcProperties = new Properties() ;    jdbcProperties.setProperty("useSSL", "false") ;    MySqlSource<String> source = MySqlSource.<String>builder()        .hostname("127.0.0.1")        .port(3306)        // 可配置多個數據庫        .databaseList("testjpa")        // 可配置多個表        .tableList("testjpa.users")        .username("root")        .password("123123")        .jdbcProperties(jdbcProperties)        // 包括schema的改變        .includeSchemaChanges(true)        // 反序列化設置        // .deserializer(new StringDebeziumDeserializationSchema())        .deserializer(new JsonDebeziumDeserializationSchema(true))        // 啟動模式;關于啟動模式下面詳細介紹        .startupOptions(StartupOptions.initial())        .build() ;    // 環境配置    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;    // 設置 6s 的 checkpoint 間隔    env.enableCheckpointing(6000) ;    // 設置 source 節點的并行度為 4    env.setParallelism(4) ;    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")        // 添加Sink        .addSink(this.customSink) ;    env.execute() ;  }    @SuppressWarnings("unchecked")  private void doAction(Map<String, Object> result) throws Exception {    Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;    String op = (String) payload.get("op") ;    switch (op) {      // 更新和插入操作      case "u", "c" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("after") ;        String id = after.get("id").toString();        System.out.printf("操作:%s, ID: %s%n", op, id) ;        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;      }      // 刪除操作      case "d" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("before") ;        String id = after.get("id").toString();        stringRedisTemplate.delete(PREFIX + id) ;      }     }  }  }

啟動模式:rCs28資訊網——每日最新資訊28at.com

  • initial (默認):在第一次啟動時對受監視的數據庫表執行初始快照,并繼續讀取最新的 binlog。
  • earliest-offset:跳過快照階段,從可讀取的最早 binlog 位點開始讀取
  • latest-offset:首次啟動時,從不對受監視的數據庫表執行快照, 連接器僅從 binlog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之后的數據更改。
  • specific-offset:跳過快照階段,從指定的 binlog 位點開始讀取。位點可通過 binlog 文件名和位置指定,或者在 GTID 在集群上啟用時通過 GTID 集合指定。
  • timestamp:跳過快照階段,從指定的時間戳開始讀取 binlog 事件。

數據處理Sink

@Componentpublic class CustomSink extends RichSinkFunction<String> {  private ObjectMapper mapper = new ObjectMapper();  @Override  public void invoke(String value, Context context) throws Exception {    System.out.printf("數據發生變化: %s%n", value);    TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {    };    Map<String, Object> result = mapper.readValue(value, valueType);    Map<String, Object> payload = (Map<String, Object>) result.get("payload");    String op = (String) payload.get("op") ;    // 不對讀操作處理    if (!"r".equals(op)) {      MonitorMySQLCDC.queue.put(result);    }  }}

以上就是實現通過FlinkCDC實時通過數據到Redis的所有代碼。rCs28資訊網——每日最新資訊28at.com

2.4 Web監控頁面

引入flink web依賴rCs28資訊網——每日最新資訊28at.com

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-runtime-web</artifactId>  <version>${flink.version}</version></dependency>

環境配置rCs28資訊網——每日最新資訊28at.com

Configuration config = new Configuration() ;config.set(RestOptions.PORT, 9090) ;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web監聽9090端口。rCs28資訊網——每日最新資訊28at.com

圖片圖片rCs28資訊網——每日最新資訊28at.com

通過web控制臺你可以管理查看到更多的信息。rCs28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-82367-0.htmlSpringBoot整合Flink CDC,實時追蹤數據變動,無縫同步至Redis

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 圖解 CSS Grid 布局,一起來看看 CSS Grid 布局是如何使用的

下一篇: 架構見解:使用Instagram示例設計高效的多層緩存

標簽:
  • 熱門焦點
  • 紅魔電競平板評測:大屏幕硬實力

    前言:三年的疫情因為要上網課的原因激活了平板市場,如今網課的時代已經過去,大家的生活都恢復到了正軌,這也就意味著,真正考驗平板電腦生存的環境來了。也就是面對著這種殘酷的
  • 5月iOS設備性能榜:M1 M2依舊是榜單前五

    和上個月一樣,沒有新品發布的iOS設備性能榜的上榜設備并沒有什么更替,僅僅只有跑分變化而產生的排名變動,剛剛開始的蘋果WWDC2023,推出的產品也依舊是新款Mac Pro、新款Mac Stu
  • 5月安卓手機好評榜:魅族20 Pro奪冠

    性能榜和性價比榜之后,我們來看最后的安卓手機好評榜,數據來源安兔兔評測,收集時間2023年5月1日至5月31日,僅限國內市場。第一名:魅族20 Pro好評率:97.50%不得不感慨魅族老品牌還
  • 轎車從天而降電動車主被撞身亡 超速搶道所致:現場視頻讓網友吵翻

    近日,上海青浦區法院判決轎車從天而降電動車主被撞身亡案,轎車車主被判有期徒刑一年。案件顯示當時男子駕駛轎車在上海某路段行駛,前車忽然轉彎提速超車,
  • 線程通訊的三種方法!通俗易懂

    線程通信是指多個線程之間通過某種機制進行協調和交互,例如,線程等待和通知機制就是線程通訊的主要手段之一。 在 Java 中,線程等待和通知的實現手段有以下幾種方式:Object 類下
  • 一文看懂為蘋果Vision Pro開發應用程序

    譯者 | 布加迪審校 | 重樓蘋果的Vision Pro是一款混合現實(MR)頭戴設備。Vision Pro結合了虛擬現實(VR)和增強現實(AR)的沉浸感。其高分辨率顯示屏、先進的傳感器和強大的處理能力
  • 共享單車的故事講到哪了?

    來源丨海克財經與共享充電寶相差不多,共享單車已很久沒有被國內熱點新聞關照到了。除了一再漲價和用戶直呼用不起了。近日多家媒體再發報道稱,成都、天津、鄭州等地多個共享單
  • 郭明錤稱華為和江淮汽車合作開發問界MPV,定價100萬左右、計劃明年量產

    8 月 1 日消息,郭明錤今天在 Medium 平臺發布博文,稱華為正在和江淮汽車合作,開發售價在 100 萬元的問界 MPV,預計在 2024 年第 2 季度量產,銷量目標為
  • iQOO 11S新品發布會

    iQOO將在7月4日19:00舉行新品發布會,推出杭州亞運會電競賽事官方用機iQOO 11S。
Top 主站蜘蛛池模板: 永泰县| 保康县| 保山市| 大余县| 南宁市| 阳朔县| 孝昌县| 铁力市| 马山县| 易门县| 西藏| 茌平县| 苗栗县| 友谊县| 马山县| 兰考县| 德令哈市| 靖安县| 绵竹市| 且末县| 康马县| 奇台县| 丹寨县| 永昌县| 伊宁县| 县级市| 漳州市| 石城县| 鹤岗市| 富平县| 桂林市| 东辽县| 山阴县| 区。| 中阳县| 长武县| 沂源县| 云浮市| 德钦县| 朔州市| 朝阳县|