今天給大家分享一個阿里開源的數據同步工具DataX,在Github擁有14.8k的star,非常受歡迎,地址:https://github.com/alibaba/DataX
DataX 是阿里云 DataWorks數據集成 的開源版本,使用Java 語言編寫,在阿里巴巴集團內被廣泛使用的離線數據同步工具/平臺。DataX 實現了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各種異構數據源之間高效的數據同步功能。
圖片
圖片
圖片
DataX作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
DataX 開源版本支持單機多線程模式完成同步作業運行,如下圖
圖片
舉例來說,用戶提交了一個DataX作業,并且配置了20個并發,目的是將一個100張表的mysql數據同步到odps里面。DataX的調度決策是:
點擊datax 下載,下載后解壓至本地某個目錄,如下圖
圖片
這里為了方便演示,我們同步MySQL的user_info表至MySQL的ods_test_mysql_user_info_m,同步條件為更新時間字段,如下
在實際工作中你可以選擇不同類型的數據源測試
drop table ods_test_mysql_user_info_mCREATE TABLE `user_info` ( `id` int NOT NULL COMMENT 'ID', `name` varchar(50) NOT NULL COMMENT '名稱', `sex` tinyint NOT NULL COMMENT '性別 1男 2女', `phone` varchar(11) COMMENT '手機', `address` varchar(1000) COMMENT '地址', `age` int COMMENT '年齡', `create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '創建時間', `update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改時間', PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用戶信息表';CREATE TABLE `ods_test_mysql_user_info_m` ( `id` int NOT NULL COMMENT 'ID', `name` varchar(50) NOT NULL COMMENT '名稱', `sex` tinyint NOT NULL COMMENT '性別 1男 2女', `phone` varchar(11) COMMENT '手機', `address` varchar(1000) COMMENT '地址', `age` int COMMENT '年齡', `create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '創建時間', `update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改時間', PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用戶信息數倉表';
在user_info表中插入數據如下
圖片
在 datax 的 script 目錄,創建ods_test_mysql_user_info_m.json文件,配置如下,mysqlreader表示讀取端,mysqlwriter表示寫入端
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": ["id","name","sex","phone","address","age","create_time","update_time"], "splitPk": "id", "connection": [ { "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"], "table": ["user_info"] } ], "password": "root", "username": "root", "where": "update_time > '${updateTime}' " } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "replace", "column": ["id","name","sex","phone","address","age","create_time","update_time"], "connection": [ { "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false", "table": ["ods_test_mysql_user_info_m"] } ], "username": "root", "password": "root", "preSql": [], "session": [ "set session sql_mode='ANSI'" ] } } } ], "setting": { "speed": { "channel": "5" } } }}
為了更貼合實際,寫一個調度腳本sync.sh支持動態參數來執行任務
#!/bin/bash## 執行示例 sh /Users/weizhao.dong/Documents/soft/datax/datax-script/call.sh /Users/weizhao.dong/Documents/soft/datax/datax-script/dwd_g2park_inout_report_s.json 1jsnotallow=$1echo '執行腳本:'$jsonScriptinterval=$2echo "時間間隔(分鐘):"$intervalnow_time=$(date '+%Y-%m-%d %H:%M:%S')echo "當前時間:"$now_timeupdate_time=$(date -v -${interval}M '+%Y-%m-%d %H:%M:%S')#linux 更新時間獲取#update_time=$(date -d "${now_time} $interval minute ago" +"%Y-%m-%d %H:%M:%S")echo "更新時間:"$update_time#執行python3 /Users/weizhao.dong/Documents/soft/datax/bin/datax.py $jsonScript -p "-DupdateTime='${update_time}'"
假設我們要執以上ods_test_mysql_user_info_m.json腳本,并且同步十分鐘之前的數據,如下
./sync.sh ods_test_mysql_user_info_m.json 10
圖片
執行./sync.sh ods_test_mysql_user_info_m.json 10進行同步
圖片
圖片
以上結果可能有些人有疑問,就三條數據執行時間為 10s,其實這個 10s主要是初始化時間,耗時過長,同步的數據量多了優勢就體現出來了,以下為實際生產同步數據結果,可以看到同步63102條耗時22s
以上我們只是通過一個簡單的示例來演示了dataX如何使用,如果只是一次性同步,沒問題,但是如果是周期性進行同步,有以下幾種方式推薦
這種方式是最簡單的,可以使用操作系統中的crontab定時調度,通過crontab -e編輯corn 任務,添加對應腳本即可
在種方式在大數據領域用的比較多,典型場景就是 mysql 同步到數倉,海豚調度器內置了 datax 并且提供了圖形化配置界面,配置起來非常方便
圖片
圖片
同時每次執行都有記錄,并且都有對應的日志
圖片
定時調度框架都支持調度 shell 腳本,通過傳入對應參數也可執行
本文鏈接:http://www.www897cc.com/showinfo-26-72430-0.html什么是數據同步利器DataX,如何使用?
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: 【踩坑指南】線程池使用不當的五個坑