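Overview

DataX is an offline synchronization tool for heterogeneous data sources. It aims to provide stable, efficient data synchronization between a wide range of sources, including relational databases (MySQL, Oracle, and others), HDFS, Hive, ODPS, HBase, and FTP.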

Official site | Documentation

Installation

Prerequisites

A Python 2 environment must be configured, because DataX is run through Python 2.

Windows

Reference: Installation

Download URL

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

Extract

Type Windows PowerShell into the search bar and open it.

cd to the directory where the downloaded file is stored:

cd D:\soft\dev_soft\datax

Run tar -zxvf followed by the name of the archive to extract:

tar -zxvf datax.tar.gz

Verify that the installation succeeded

cd D:\soft\dev_soft\datax\bin

python datax.py -r streamreader -w streamwriter
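
The same check can be scripted from Java. The following is only a minimal sketch, assuming the install path used above and a python executable on the PATH that resolves to Python 2; the class and method names (DataxTemplateCommand, build) are hypothetical, and only the -r and -w flags from the command above are used.

```java
import java.util.ArrayList;
import java.util.List;

final class DataxTemplateCommand {

    // Builds: python <dataxHome>/bin/datax.py -r <reader> -w <writer>
    static List<String> build(String dataxHome, String reader, String writer) {
        List<String> cmd = new ArrayList<>();
        cmd.add("python"); // assumption: "python" on the PATH is Python 2
        cmd.add(dataxHome + "/bin/datax.py");
        cmd.add("-r");
        cmd.add(reader);
        cmd.add("-w");
        cmd.add(writer);
        return cmd;
    }

    public static void main(String[] args) throws Exception {
        List<String> cmd = build("D:/soft/dev_soft/datax", "streamreader", "streamwriter");
        System.out.println(String.join(" ", cmd));
        // Uncomment to actually launch the process and wait for it to finish:
        // int exitCode = new ProcessBuilder(cmd).inheritIO().start().waitFor();
        // System.out.println("exit code: " + exitCode);
    }
}
```

Keeping the command as a list of separate arguments, rather than one string, avoids quoting problems if the install path ever contains spaces.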

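Example

Synchronizing MySQL data on Windows

Environment

  • JDK 1.8
  • MySQL 8.0

Dependencies

Network problems and similar issues can cause the dependency download to fail, so the approach below is used instead.

Extract the downloaded archive; its lib directory contains the two dependencies that need to be installed locally:

datax-core-0.0.1-RELEASE.jar: link

datax-common-0.0.1-RELEASE.jar: link

Change to the folder that contains the two JARs and install both into the local Maven repository: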

cd D:\soft\dev_soft\repository

mvn install:install-file -DgroupId=com.alibaba.datax -DartifactId=datax-core -Dversion=0.0.1-RELEASE -Dpackaging=jar -Dfile=datax-core-0.0.1-RELEASE.jar

mvn install:install-file -DgroupId=com.alibaba.datax -DartifactId=datax-common -Dversion=0.0.1-RELEASE -Dpackaging=jar -Dfile=datax-common-0.0.1-RELEASE.jar

Then copy the dependencies from the C:\Users\27477\.m2\repository\com\alibaba\datax folder into our local repository, D:_soft
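
Maven derives the on-disk location of an installed artifact from its coordinates, which is why the two JARs end up below com\alibaba\datax. The sketch below illustrates that layout; it assumes the default repository under the user's home directory (a settings.xml can point elsewhere), and the class and method names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class MavenLocalPath {

    // Standard Maven repository layout: each dot-separated part of the groupId
    // becomes a directory, followed by artifactId/version/artifactId-version.jar.
    static Path jarPath(Path repoRoot, String groupId, String artifactId, String version) {
        Path dir = repoRoot;
        for (String part : groupId.split("\\.")) {
            dir = dir.resolve(part);
        }
        return dir.resolve(artifactId)
                  .resolve(version)
                  .resolve(artifactId + "-" + version + ".jar");
    }

    public static void main(String[] args) {
        // Assumption: the default local repository, ~/.m2/repository
        Path repo = Paths.get(System.getProperty("user.home"), ".m2", "repository");
        System.out.println(jarPath(repo, "com.alibaba.datax", "datax-core", "0.0.1-RELEASE"));
        System.out.println(jarPath(repo, "com.alibaba.datax", "datax-common", "0.0.1-RELEASE"));
    }
}
```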

Add the dependencies to the project

<!--datax-->
<dependency>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>datax-core</artifactId>
    <version>0.0.1-RELEASE</version>
</dependency>
<dependency>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>datax-common</artifactId>
    <version>0.0.1-RELEASE</version>
</dependency>

Other required dependencies

<dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-io</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.12.0</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.60</version>
</dependency>

In the project's resource directory, create a datax directory, and inside it create a test.json file.

test.json

{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"],
                                "querySql": ["select * from test_1"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "writeMode": "insert",
                        "column": ["id", "name", "age"],
                        "connection": [
                            {
                                "table": [
                                    "test_0"
                                ],
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
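
The reader and the writer in this job share one long jdbcUrl. When the host, database, or connection options change between environments, it can help to assemble that URL from its parts. The helper below is a hypothetical sketch, not part of DataX, and assumes the parameter values need no URL encoding, which holds for the values used in test.json.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class JdbcUrlSketch {

    // Joins host, port, database and query parameters into a MySQL JDBC URL.
    // Parameters are emitted in the insertion order of the map.
    static String mysqlUrl(String host, int port, String database, Map<String, String> params) {
        String base = "jdbc:mysql://" + host + ":" + port + "/" + database;
        if (params.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(e.getKey() + "=" + e.getValue());
        }
        return base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("allowPublicKeyRetrieval", "true");
        params.put("characterEncoding", "utf8");
        params.put("useSSL", "false");
        params.put("serverTimezone", "Asia/Shanghai");
        System.out.println(mysqlUrl("127.0.0.1", 3306, "test", params));
    }
}
```

A LinkedHashMap is used so that the generated URL keeps the same parameter order as the one written by hand in the job file.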

DataX utility class

The Java program starts the JSON job by passing command-line style arguments to the DataX engine.

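import com.alibaba.datax.core.Engine;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataxUtil {

    // Returns the root of the project's classpath, where the JSON job files live.
    private static String getCurrentClasspath() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        String currentClasspath = classLoader.getResource("").getPath();
        // Current operating system
        String osName = System.getProperty("os.name");
        if (osName.startsWith("Win")) {
            // On Windows the path looks like /D:/.../classes/, so drop the leading "/".
            // The trailing "/" is dropped as well, because "/datax/..." is appended later.
            currentClasspath = currentClasspath.substring(1, currentClasspath.length() - 1);
        }
        return currentClasspath;
    }

    public static void main(String[] args) {
        // DataX installation path; on Linux this would be, for example, /opt/datax
        System.setProperty("datax.home", "D:/soft/dev_soft/datax");
        // The job file created above under the datax directory
        String jobPath = getCurrentClasspath() + "/datax/test.json";
        log.info("DataX job file: {}", jobPath);
        String[] dataxArgs = {"-job", jobPath, "-mode", "standalone", "-jobid", "-1"};
        try {
            Engine.entry(dataxArgs);
        } catch (Throwable e) {
            log.error("DataX job failed", e);
        }
    }
}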
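
The string trimming in getCurrentClasspath depends on the operating system and does not decode characters such as %20 in the path. One possible alternative, sketched here under the assumption that the classes are on disk rather than packaged in a JAR, is to convert the classpath URL to a Path through its URI. The class and method names are hypothetical.

```java
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

final class JobPathSketch {

    // Converts a file: URI for the classpath root into a filesystem path
    // and resolves the job file below it. Paths.get(URI) handles the Windows
    // drive prefix and percent-encoded characters.
    static Path resolveJob(URI classpathRoot, String relativeJob) {
        return Paths.get(classpathRoot).resolve(relativeJob);
    }

    public static void main(String[] args) throws Exception {
        URL root = Thread.currentThread().getContextClassLoader().getResource("");
        if (root == null) {
            System.out.println("No classpath root directory found");
            return;
        }
        Path job = resolveJob(root.toURI(), "datax/test.json");
        // Same argument layout as the utility class above
        String[] dataxArgs = {"-job", job.toString(), "-mode", "standalone", "-jobid", "-1"};
        System.out.println(String.join(" ", dataxArgs));
    }
}
```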

test_1

CREATE TABLE `test_1`  (
`id` int NOT NULL,
`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`age` int NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of test_1
-- ----------------------------
INSERT INTO `test_1` VALUES (1, 'hh', 12);
INSERT INTO `test_1` VALUES (2, 'cc', 123);

test_0

DROP TABLE IF EXISTS `test_0`;
CREATE TABLE `test_0` (
`id` int NOT NULL,
`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`age` int NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

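Log

DataX prints its job summary in Chinese. The fields below are, in order: task start time, task end time, total elapsed time, average throughput, record write speed, total records read, and total read/write failures.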

......
任务启动时刻 : 2023-06-09 02:12:30
任务结束时刻 : 2023-06-09 02:12:40
任务总计耗时 : 10s
任务平均流量 : 1B/s
记录写入速度 : 0rec/s
读出记录总数 : 2
读写失败总数 : 0
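
To check a run automatically, for example to confirm that 读写失败总数 is 0, the summary can be parsed as "label : value" lines. The parser below is a naive sketch with hypothetical names; it assumes the summary has been saved to a UTF-8 text file and that each field sits on its own line, as in the excerpt above. Other log lines that happen to contain a colon followed by a space would also match.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DataxSummaryParser {

    // "<label> : <value>", splitting at the first colon that is followed by whitespace,
    // so the colons inside a timestamp such as 02:12:30 stay in the value.
    private static final Pattern FIELD = Pattern.compile("\\s*(.+?)\\s*:\\s+(.+?)\\s*");

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = FIELD.matcher(line);
            if (m.matches()) {
                fields.put(m.group(1), m.group(2));
            }
        }
        return fields;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: DataxSummaryParser <log-file>");
            return;
        }
        List<String> lines = new ArrayList<>(
                Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8));
        for (Map.Entry<String, String> e : parse(lines).entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```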