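Overview
DataX is an offline synchronization tool for heterogeneous data sources. It aims to provide stable, efficient data synchronization between relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, FTP, and other heterogeneous data sources.
Links: official site, documentation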
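Installation
Prerequisites
A Python 2 environment must be configured, because DataX is launched through Python 2.
Windows
Reference: installation guide
Download URL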
http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

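Extract
Type "Windows PowerShell" in the search bar and open it.

Change to the directory where the archive was saved:
    cd D:\soft\dev_soft\datax

Run tar -zxvf followed by the name of the archive to extract (datax.tar.gz for the download above).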
Verify the installation
    cd D:\soft\dev_soft\datax\bin
    python datax.py -r streamreader -w streamwriter
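The same check can be scripted from Java, which is convenient when the installation is verified as part of a build. The following is only a sketch: the class name DataxSelfCheck is hypothetical, and it assumes a Python interpreter is reachable under the name you pass in and that DataX is installed under the directory you pass in.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of DataX.
final class DataxSelfCheck {

    /** Builds the command shown above: python <home>/bin/datax.py -r streamreader -w streamwriter */
    static List<String> templateCommand(String pythonExe, Path dataxHome) {
        Path launcher = dataxHome.resolve("bin").resolve("datax.py");
        return Arrays.asList(pythonExe, launcher.toString(),
                "-r", "streamreader", "-w", "streamwriter");
    }

    /** Runs the command, forwards its output to the console, and returns the process exit code. */
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }
}
```

For the layout used in this post, the call would be DataxSelfCheck.run(DataxSelfCheck.templateCommand("python", Paths.get("D:/soft/dev_soft/datax"))).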

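Examples
MySQL data synchronization on Windows
Environment
Dependencies
The DataX dependencies can fail to download because of network or similar problems, so they are installed manually instead:
Extract the downloaded archive and find these two JARs in its lib directory:
datax-core-0.0.1-RELEASE.jar: link
datax-common-0.0.1-RELEASE.jar: link
Change to the folder that contains the two JARs and install them into the local Maven repository: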
    cd D:\soft\dev_soft\repository
    mvn install:install-file -DgroupId=com.alibaba.datax -DartifactId=datax-core -Dversion=0.0.1-RELEASE -Dpackaging=jar -Dfile=datax-core-0.0.1-RELEASE.jar
    mvn install:install-file -DgroupId=com.alibaba.datax -DartifactId=datax-common -Dversion=0.0.1-RELEASE -Dpackaging=jar -Dfile=datax-common-0.0.1-RELEASE.jar


Go to C:\Users\27477\.m2\repository\com\alibaba and copy the datax folder, which now contains the installed dependencies, into the local repository used by the project: D:_soft
Add the dependencies to the project
    <dependency>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-core</artifactId>
        <version>0.0.1-RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-common</artifactId>
        <version>0.0.1-RELEASE</version>
    </dependency>
Other required dependencies
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-io</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.60</version>
    </dependency>
Under the project's resources directory, create a datax directory, and inside it create a file named test.json.

test.json
    {
      "job": {
        "setting": {
          "speed": {
            "channel": 4
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "root",
                "password": "123456",
                "connection": [
                  {
                    "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"],
                    "querySql": ["select * from test_1"]
                  }
                ]
              }
            },
            "writer": {
              "name": "mysqlwriter",
              "parameter": {
                "username": "root",
                "password": "123456",
                "writeMode": "insert",
                "column": ["id", "name", "age"],
                "connection": [
                  {
                    "table": ["test_0"],
                    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"
                  }
                ]
              }
            }
          }
        ]
      }
    }
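The long jdbcUrl value appears twice in the job file and is easy to mistype. If the job file is generated by code, one possible approach is to assemble the URL from its parts. The sketch below is illustrative only: MysqlJdbcUrl and its methods are hypothetical names, and it assumes the parameter values need no URL escaping (they are appended literally, as in the job file above).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of DataX or the MySQL driver.
final class MysqlJdbcUrl {

    /** Joins host, port, database, and query parameters into a MySQL JDBC URL. */
    static String build(String host, int port, String database, Map<String, String> params) {
        String base = "jdbc:mysql://" + host + ":" + port + "/" + database;
        if (params.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            // Values are appended as-is; this sketch does no escaping.
            query.add(entry.getKey() + "=" + entry.getValue());
        }
        return base + "?" + query;
    }

    /** The connection parameters used in the job file above, in the same order. */
    static Map<String, String> tutorialParams() {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("allowPublicKeyRetrieval", "true");
        params.put("characterEncoding", "utf8");
        params.put("useSSL", "false");
        params.put("serverTimezone", "Asia/Shanghai");
        return params;
    }
}
```

MysqlJdbcUrl.build("127.0.0.1", 3306, "test", MysqlJdbcUrl.tutorialParams()) yields the same URL string that the reader and writer use in test.json.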
DataX utility class
A Java program that launches the JSON job by passing command-line style arguments to the DataX engine.
    import com.alibaba.datax.core.Engine;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class DataxUtil {

        // Returns the classpath root as a filesystem path.
        private static String getCurrentClasspath() {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            String currentClasspath = classLoader.getResource("").getPath();
            String osName = System.getProperty("os.name");
            if (osName.startsWith("Win")) {
                // On Windows the path starts and ends with "/": strip both.
                currentClasspath = currentClasspath.substring(1, currentClasspath.length() - 1);
            }
            return currentClasspath;
        }

        public static void main(String[] args) {
            // Point DataX at the local installation directory.
            System.setProperty("datax.home", "D:/soft/dev_soft/datax");
            System.out.println(getCurrentClasspath());
            // Engine arguments: job file, run mode, job id.
            String[] dataxArgs = {"-job", getCurrentClasspath() + "/datax/test.json", "-mode", "standalone", "-jobid", "-1"};
            try {
                Engine.entry(dataxArgs);
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }
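The utility class trims the classpath string by hand on Windows. An alternative worth considering is to let the JDK convert the resource URL into a path, which also decodes characters such as %20 in directory names. This is a sketch under assumptions: JobFileLocator is a hypothetical name, and the approach only works when resources sit in a plain directory (for example when running from an IDE), not inside a packaged JAR.

```java
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of DataX.
final class JobFileLocator {

    /** Converts a file: URL, as returned by ClassLoader.getResource, into a filesystem path. */
    static Path toPath(URL url) {
        try {
            return Paths.get(url.toURI());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Not a valid URI: " + url, e);
        }
    }

    /** Looks up a classpath resource such as "datax/test.json" and returns its location on disk. */
    static Path locate(String resourceName) {
        URL url = Thread.currentThread().getContextClassLoader().getResource(resourceName);
        if (url == null) {
            throw new IllegalArgumentException("Not on the classpath: " + resourceName);
        }
        return toPath(url);
    }
}
```

With this helper, the -job argument could be written as JobFileLocator.locate("datax/test.json").toString(), with no operating-system check.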
Tables
test_1
    CREATE TABLE `test_1` (
      `id` int NOT NULL,
      `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      `age` int NULL DEFAULT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

    INSERT INTO `test_1` VALUES (1, 'hh', 12);
    INSERT INTO `test_1` VALUES (2, 'cc', 123);
test_0
    DROP TABLE IF EXISTS `test_0`;
    CREATE TABLE `test_0` (
      `id` int NOT NULL,
      `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      `age` int NULL DEFAULT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
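Log
DataX prints its job summary in Chinese. The fields below are, in order: job start time, job end time, total elapsed time, average throughput, record write speed, total records read, and total read/write failures.
    ......
    任务启动时刻 : 2023-06-09 02:12:30
    任务结束时刻 : 2023-06-09 02:12:40
    任务总计耗时 : 10s
    任务平均流量 : 1B/s
    记录写入速度 : 0rec/s
    读出记录总数 : 2
    读写失败总数 : 0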
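When the job is started from Java, it can be useful to check the summary programmatically instead of reading it by eye. The sketch below parses "label : value" lines like the ones in the log into a map. It is an illustration only: DataxSummary is a hypothetical name, and it assumes you have captured the log text as a string and that each summary field sits on its own line.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of DataX.
final class DataxSummary {

    // A label without spaces, then " : ", then the value up to the end of the line.
    private static final Pattern FIELD = Pattern.compile("^\\s*(\\S+)\\s+:\\s+(.+?)\\s*$");

    /** Returns the summary fields in the order they appear; lines that do not match are skipped. */
    static Map<String, String> parse(String logText) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String line : logText.split("\\R")) {
            Matcher matcher = FIELD.matcher(line);
            if (matcher.matches()) {
                fields.put(matcher.group(1), matcher.group(2));
            }
        }
        return fields;
    }

    /** Reads a numeric field such as a record count; throws if the field is missing or not a number. */
    static long count(Map<String, String> fields, String label) {
        String value = fields.get(label);
        if (value == null) {
            throw new IllegalArgumentException("Missing field: " + label);
        }
        return Long.parseLong(value);
    }
}
```

For the run shown in this post, count(fields, "读出记录总数") gives 2 and count(fields, "读写失败总数") gives 0, which matches the two rows inserted into test_1.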