1. View the data synchronization template
I added some explanatory comments of my own to the template file below.
[root@bigdata001 datax]# bin/datax.py -r mysqlreader -w hdfswriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the mysqlreader document:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Please refer to the hdfswriter document:
https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [], # 可以填写["*"]表示同步所有列。还支持["1", "2.3", "true", "'bazhen.csy'", "null", "upper('a')"], 分别表示整形、浮点数、布尔值、字符串、空指针,表达式
"connection": [
{
"jdbcUrl": [], # 支持多个连接地址。会依次进行连接测试,选择一个可用的进行查询数据
"table": [] # 支持同步多个表。多个表必须schema相同
}
],
"password": "",
"username": "",
"where": "",
"splitPk": "", # 一般是主键,只支持整形字段。先计算min(splitPk)、max(splitPk),再进行范围分区划分,将job划分成多个task。不指定则只有一个task
"querySql": ["select id, name from person where id < 10;"] # 这个是我自己添加的。有了querySql会自动忽略column、table、where
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [], # 必须和reader的列数量对应
"compress": "", # 默认不填写,表示不压缩。text文件支持gzip、bzip2; orc文件支持NONE、SNAPPY
"defaultFS": "",
"fieldDelimiter": "",
"fileName": "",
"fileType": "", # 目前仅支持text和orc。其中orc需指定compress为SNAPPY
"path": "", # 该路径必须存在
"writeMode": "" # append:表示新建一个文件插入数据;nonConflict:有fileName为前缀的文件直接报错
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
[root@bigdata001 datax]#
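To illustrate the querySql note above: the official mysqlreader document places querySql inside the connection element, next to jdbcUrl. Below is a minimal reader sketch in that form, annotated with # comments in the same style as the template (they would have to be removed from a real job file); the connection details are reused from this article and the query itself is only an example.

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "root",
        "password": "******",
        "connection": [
            {
                "querySql": ["select id, name from person where id < 10"],  # column, table and where are omitted: querySql replaces them entirely
                "jdbcUrl": ["jdbc:mysql://192.168.8.115:3306/test"]
            }
        ]
    }
}

Because the statement is executed as written, splitPk-based splitting does not apply here and the query runs as a single task.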
2. HDFS configuration for high availability (HA)
A reference configuration is as follows:
"defaultFS": "hdfs://192.168.8.111:9000",
"hadoopConfig": {
"dfs.nameservices": "nnha",
"dfs.ha.namenodes.nnha": "nn1,nn2,nn3",
"dfs.namenode.rpc-address.nnha.nn1": "192.168.8.111:9870",
"dfs.namenode.rpc-address.nnha.nn2": "192.168.8.112:9870",
"dfs.namenode.rpc-address.nnha.nn3": "192.168.8.113:9870",
"dfs.client.failover.proxy.provider.nnha": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"fieldDelimiter": "|"
3. MysqlReader type conversion for MySQL
DataX internal type | MySQL data type
Long    | int, tinyint, smallint, mediumint, int, bigint
Double  | float, double, decimal
String  | varchar, char, tinytext, text, mediumtext, longtext, year
Date    | date, datetime, timestamp, time
Boolean | bit, bool
Bytes   | tinyblob, mediumblob, blob, longblob, varbinary
4. HdfsWriter supports most Hive types
DataX internal type | Hive data type
Long    | TINYINT, SMALLINT, INT, BIGINT
Double  | FLOAT, DOUBLE
String  | STRING, VARCHAR, CHAR
Boolean | BOOLEAN
Date    | DATE, TIMESTAMP
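Reading the two tables together for the person table created below (a bigint id and a varchar name): bigint maps to the DataX Long type, which hdfswriter can write out as a Hive BIGINT, and varchar maps to String/STRING. A column declaration for hdfswriter along those lines would look like the sketch below; the actual job in section 6 declares id as int instead and runs fine on this small data set.

"column": [
    {"name": "id",   "type": "bigint"},   # MySQL bigint -> DataX Long -> Hive BIGINT
    {"name": "name", "type": "string"}    # MySQL varchar -> DataX String -> Hive STRING
]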
5. Prepare the data in MySQL as follows
mysql> create table person(
-> id bigint,
-> name varchar(64)
-> );
Query OK, 0 rows affected (0.06 sec)
mysql>
mysql> insert into person(id, name) values(1, 'yi'), (2, 'er');
Query OK, 2 rows affected (0.02 sec)
Records: 2 Duplicates: 0 Warnings: 0
mysql>
mysql> select * from person;
+------+------+
| id | name |
+------+------+
| 1 | yi |
| 2 | er |
+------+------+
2 rows in set (0.00 sec)
mysql>
6. Create job/mysql2hdfs.json
The contents are as follows:
[root@bigdata001 datax]# cat job/mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id", "name"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://192.168.8.115:3306/test"],
"table": ["person"]
}
],
"password": "Root_123",
"username": "root",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}],
"compress": "",
"defaultFS": "hdfs://192.168.8.111:9000",
"fieldDelimiter": "|",
"fileName": "person.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
[root@bigdata001 datax]#
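As an aside before running the job: for a larger table, the reader above could be combined with the splitPk option from the template in section 1 and a higher channel count, so that the ranges computed from min(id) and max(id) are read in parallel. A hedged sketch of just the parts that would change (the channel count of 3 is arbitrary):

In the reader parameter block:

        "column": ["id", "name"],
        "splitPk": "id",    # integer primary key; the id range is split into multiple tasks that read in parallel

and in the job setting block:

        "setting": {
            "speed": {
                "channel": "3"    # illustrative: allow up to 3 concurrent channels
            }
        }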
7. Run the job
The data is first written to a temporary file. If the job succeeds, the temporary file is renamed to the target file and the temporary directory is removed; if it fails, the temporary file is deleted directly.
[root@bigdata001 datax]# bin/datax.py job/mysql2hdfs.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
...... part of the output omitted ......
2022-06-14 10:16:33.551 [0-0-0-writer] INFO HdfsWriter$Task - begin do write...
2022-06-14 10:16:33.551 [0-0-0-writer] INFO HdfsWriter$Task - write to file : [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d]
...... part of the output omitted ......
2022-06-14 10:16:43.512 [job-0] INFO HdfsWriter$Job - start rename file [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d] to file [hdfs://192.168.8.111:9000/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d].
...... part of the output omitted ......
2022-06-14 10:16:43.915 [job-0] INFO HdfsWriter$Job - start delete tmp dir [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f] .
...... part of the output omitted ......
2022-06-14 10:16:44.034 [job-0] INFO JobContainer -
任务启动时刻 : 2022-06-14 10:16:31
任务结束时刻 : 2022-06-14 10:16:44
任务总计耗时 : 12s
任务平均流量 : 0B/s
记录写入速度 : 0rec/s
读出记录总数 : 2
读写失败总数 : 0
[root@bigdata001 datax]#
8. View the file in HDFS
A random suffix is appended to the configured fileName; this is the actual file name written by each writer thread.
[root@bigdata001 ~]# hadoop fs -cat /person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d
1|yi
2|er
[root@bigdata001 ~]#