日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

什么是數據同步利器DataX,如何使用?

來源: 責編: 時間:2024-02-04 09:01:10 225觀看
導讀今天給大家分享一個阿里開源的數據同步工具DataX,在Github擁有14.8k的star,非常受歡迎,地址:https://github.com/alibaba/DataX什么是 Datax?DataX 是阿里云 DataWorks數據集成 的開源版本,使用Java 語言編寫,在阿里巴巴集

今天給大家分享一個阿里開源的數據同步工具DataX,在Github擁有14.8k的star,非常受歡迎,地址:https://github.com/alibaba/DataXGIK28資訊網——每日最新資訊28at.com

什么是 Datax?

DataX 是阿里云 DataWorks數據集成 的開源版本,使用Java 語言編寫,在阿里巴巴集團內被廣泛使用的離線數據同步工具/平臺。DataX 實現了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各種異構數據源之間高效的數據同步功能。GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

應用場景有那些?

  1. 數據倉庫同步:DataX 可以幫助將數據從一個數據倉庫(如關系型數據庫、大數據存儲系統等)同步到另一個數據倉庫,實現數據的遷移、備份或復制。
  2. 數據庫遷移:當我們需要將數據從一個數據庫平臺遷移到另一個數據庫平臺時,DataX 可以幫助完成數據的轉移和轉換工作
  3. 數據集成與同步:DataX 可以用作數據集成工具,用于將多個數據源的數據進行整合和同步。它支持多種數據源,包括關系型數據庫、NoSQL 數據庫、文件系統等,可以將這些數據源的數據整合到一個目標數據源中。
  4. 數據清洗與轉換:DataX 提供了豐富的數據轉換能力,可以對數據進行清洗、過濾、映射、格式轉換等操作。這對于數據倉庫、數據湖和數據集市等數據存儲和分析平臺非常有用,可以幫助提高數據質量和一致性。
  5. 數據備份與恢復:DataX 可以用于定期備份和恢復數據。通過配置定時任務,可以將數據從源端備份到目標端,并在需要時進行數據恢復。

DataX支持那些數據源?

圖片圖片GIK28資訊網——每日最新資訊28at.com

架構設計

圖片圖片GIK28資訊網——每日最新資訊28at.com

DataX作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。GIK28資訊網——每日最新資訊28at.com

  • Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
  • Writer:Writer為數據寫入模塊,負責不斷向Framework取數據,并將數據寫入到目的端。
  • Framework:Framework用于連接reader和writer,作為兩者的數據傳輸通道,并處理緩沖,流控,并發,數據轉換等核心技術問題。

DataX 開源版本支持單機多線程模式完成同步作業運行,如下圖GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

  1. DataX完成單個數據同步的作業,稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
  2. DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
  3. 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的并發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的并發運行完畢分配好的所有Task,默認單個任務組的并發數量為5。
  4. 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
  5. DataX作業運行起來之后, Job監控并等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0

DataX調度流程

舉例來說,用戶提交了一個DataX作業,并且配置了20個并發,目的是將一個100張表的mysql數據同步到odps里面。DataX的調度決策是:GIK28資訊網——每日最新資訊28at.com

  1. Job根據分表切分成了100個Task。
  2. 根據20個并發,DataX計算需要分配4個TaskGroup。
  3. 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責5個并發共計運行25個Task。

如何使用 Datax?

點擊datax 下載,下載后解壓至本地某個目錄,如下圖GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

用例說明

這里為了方便演示,我們同步MySQL的user_info表至MySQL的ods_test_mysql_user_info_m,同步條件為更新時間字段,如下GIK28資訊網——每日最新資訊28at.com

在實際工作中你可以選擇不同類型的數據源測試GIK28資訊網——每日最新資訊28at.com

drop table ods_test_mysql_user_info_mCREATE TABLE `user_info` (  `id` int NOT NULL COMMENT 'ID',  `name` varchar(50) NOT NULL COMMENT '名稱',  `sex` tinyint NOT NULL COMMENT '性別 1男 2女',  `phone` varchar(11) COMMENT '手機', `address` varchar(1000)  COMMENT '地址', `age` int  COMMENT '年齡', `create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '創建時間',  `update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改時間',  PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用戶信息表';CREATE TABLE `ods_test_mysql_user_info_m` (  `id` int NOT NULL COMMENT 'ID',  `name` varchar(50) NOT NULL COMMENT '名稱',  `sex` tinyint NOT NULL COMMENT '性別 1男 2女',  `phone` varchar(11) COMMENT '手機', `address` varchar(1000)  COMMENT '地址', `age` int  COMMENT '年齡', `create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '創建時間',  `update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改時間',  PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用戶信息數倉表';

在user_info表中插入數據如下GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

創建作業的配置文件(json格式)

在 datax 的 script 目錄,創建ods_test_mysql_user_info_m.json文件,配置如下,mysqlreader表示讀取端,mysqlwriter表示寫入端GIK28資訊網——每日最新資訊28at.com

{    "job": {        "content": [            {                "reader": {                    "name": "mysqlreader",                    "parameter": {                        "column": ["id","name","sex","phone","address","age","create_time","update_time"],                       "splitPk": "id",                        "connection": [                            {                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"],                                "table": ["user_info"]                            }                        ],                        "password": "root",                        "username": "root",                        "where": "update_time > '${updateTime}' "                    }                },                "writer": {                    "name": "mysqlwriter",                    "parameter": {                       "writeMode": "replace",                        "column": ["id","name","sex","phone","address","age","create_time","update_time"],                        "connection": [                            {                                "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false",                                "table": ["ods_test_mysql_user_info_m"]                            }                        ],                        "username": "root",                        "password": "root",                        "preSql": [],                        "session": [                          "set session sql_mode='ANSI'"                        ]                    }                }            }        ],        "setting": {            "speed": {                "channel": "5"            }        }    }}

創建執行腳本

為了更貼合實際,寫一個調度腳本sync.sh支持動態參數來執行任務GIK28資訊網——每日最新資訊28at.com

#!/bin/bash## 執行示例 sh /Users/weizhao.dong/Documents/soft/datax/datax-script/call.sh /Users/weizhao.dong/Documents/soft/datax/datax-script/dwd_g2park_inout_report_s.json 1jsnotallow=$1echo '執行腳本:'$jsonScriptinterval=$2echo "時間間隔(分鐘):"$intervalnow_time=$(date '+%Y-%m-%d %H:%M:%S')echo "當前時間:"$now_timeupdate_time=$(date -v -${interval}M  '+%Y-%m-%d %H:%M:%S')#linux 更新時間獲取#update_time=$(date -d "${now_time} $interval minute ago" +"%Y-%m-%d %H:%M:%S")echo "更新時間:"$update_time#執行python3 /Users/weizhao.dong/Documents/soft/datax/bin/datax.py $jsonScript -p "-DupdateTime='${update_time}'"

假設我們要執以上ods_test_mysql_user_info_m.json腳本,并且同步十分鐘之前的數據,如下GIK28資訊網——每日最新資訊28at.com

./sync.sh ods_test_mysql_user_info_m.json 10

測試

圖片圖片GIK28資訊網——每日最新資訊28at.com

執行./sync.sh ods_test_mysql_user_info_m.json 10進行同步GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

圖片GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

以上結果可能有些人有疑問,就三條數據執行時間為 10s,其實這個 10s主要是初始化時間,耗時過長,同步的數據量多了優勢就體現出來了,以下為實際生產同步數據結果,可以看到同步63102條耗時22sGIK28資訊網——每日最新資訊28at.com

推薦用法

以上我們只是通過一個簡單的示例來演示了dataX如何使用,如果只是一次性同步,沒問題,但是如果是周期性進行同步,有以下幾種方式推薦GIK28資訊網——每日最新資訊28at.com

crontab調度

這種方式是最簡單的,可以使用操作系統中的crontab定時調度,通過crontab -e編輯corn 任務,添加對應腳本即可GIK28資訊網——每日最新資訊28at.com

海豚調度器

在種方式在大數據領域用的比較多,典型場景就是 mysql 同步到數倉,海豚調度器內置了 datax 并且提供了圖形化配置界面,配置起來非常方便GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

同時每次執行都有記錄,并且都有對應的日志GIK28資訊網——每日最新資訊28at.com

圖片圖片GIK28資訊網——每日最新資訊28at.com

定時任務框架(elasticjob/xxl-job)

定時調度框架都支持調度 shell 腳本,通過傳入對應參數也可執行GIK28資訊網——每日最新資訊28at.com

圖片 圖片 GIK28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-72430-0.html什么是數據同步利器DataX,如何使用?

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 【踩坑指南】線程池使用不當的五個坑

下一篇: 如何在PHP中使用 Caddy2 協同服務

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 酒泉市| 德阳市| 民乐县| 铅山县| 泰州市| 太谷县| 青阳县| 三门县| 新竹县| 木兰县| 阳谷县| 和硕县| 楚雄市| 榆林市| 西林县| 百色市| 周口市| 调兵山市| 灵宝市| 子长县| 九龙县| 天峻县| 巴林左旗| 合山市| 定安县| 乡城县| 宜春市| 阿尔山市| 武邑县| 柳江县| 玉田县| 孝感市| 万全县| 安龙县| 宜春市| 黄浦区| 邵东县| 婺源县| 清河县| 青州市| 富宁县|