

DataX in Practice: Syncing Data from MySQL to HDFS


1. Viewing the data synchronization job template

I have added some explanatory comments to the template output below.

[root@bigdata001 datax]# bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [],                                             # 可以填写["*"]表示同步所有列。还支持["1", "2.3", "true", "'bazhen.csy'", "null", "upper('a')"], 分别表示整形、浮点数、布尔值、字符串、空指针,表达式
                        "connection": [
                            {
                                "jdbcUrl": [],                                     # 支持多个连接地址。会依次进行连接测试,选择一个可用的进行查询数据
                                "table": []                                         # 支持同步多个表。多个表必须schema相同
                            }
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": "",
                        "splitPk": "",                                            # 一般是主键,只支持整形字段。先计算min(splitPk)、max(splitPk),再进行范围分区划分,将job划分成多个task。不指定则只有一个task
                        "querySql": ["select id, name from person where id < 10;"]    # 这个是我自己添加的。有了querySql会自动忽略column、table、where
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [],                                            # 必须和reader的列数量对应
                        "compress": "",                                       # 默认不填写,表示不压缩。text文件支持gzip、bzip2; orc文件支持NONE、SNAPPY
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "",                                          # 目前仅支持text和orc。其中orc需指定compress为SNAPPY
                        "path": "",                                                # 该路径必须存在
                        "writeMode": ""                                       # append:表示新建一个文件插入数据;nonConflict:有fileName为前缀的文件直接报错
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
[root@bigdata001 datax]#
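
As a concrete illustration of the querySql annotation above, a minimal reader block might look like the sketch below (hypothetical values, reusing the person table and connection from later sections). Note that the official mysqlreader documentation places querySql inside the connection element, and that splitPk only applies when syncing by table and column:

                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "******",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.8.115:3306/test"],
                                "querySql": ["select id, name from person where id < 10"]
                            }
                        ]
                    }
                }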

2. HDFS high-availability (HA) configuration

A reference configuration is shown below; with HA, defaultFS points to the nameservice name (nnha here) rather than a single NameNode address:

                        "defaultFS": "hdfs://192.168.8.111:9000", 
                        "hadoopConfig": {
                            "dfs.nameservices": "nnha",
                            "dfs.ha.namenodes.nnha": "nn1,nn2,nn3",
                            "dfs.namenode.rpc-address.nnha.nn1": "192.168.8.111:9870",
                            "dfs.namenode.rpc-address.nnha.nn2": "192.168.8.112:9870",
                            "dfs.namenode.rpc-address.nnha.nn3": "192.168.8.113:9870",
                            "dfs.client.failover.proxy.provider.nnha": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "fieldDelimiter": "|"

3. MysqlReader type conversion for MySQL types

DataX internal type    MySQL data type
Long                   int, tinyint, smallint, mediumint, int, bigint
Double                 float, double, decimal
String                 varchar, char, tinytext, text, mediumtext, longtext, year
Date                   date, datetime, timestamp, time
Boolean                bit, bool
Bytes                  tinyblob, mediumblob, blob, longblob, varbinary

4. HdfsWriter supports most Hive types

DataX internal type    Hive data type
Long                   TINYINT, SMALLINT, INT, BIGINT
Double                 FLOAT, DOUBLE
String                 STRING, VARCHAR, CHAR
Boolean                BOOLEAN
Date                   DATE, TIMESTAMP
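
For example, for the person table created in section 5 (a bigint id and a varchar(64) name), a writer column definition following this mapping could be the sketch below; the job in section 6 uses int for id instead, which also works since both map to DataX's internal Long type:

                        "column": [
                            {"name": "id", "type": "bigint"},
                            {"name": "name", "type": "string"}
                        ]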

5. Preparing the data in MySQL

mysql> create table person(
    -> id bigint,
    -> name varchar(64)
    -> );
Query OK, 0 rows affected (0.06 sec)

mysql> 
mysql> insert into person(id, name) values(1, 'yi'), (2, 'er');
Query OK, 2 rows affected (0.02 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> 
mysql> select * from person;
+------+------+
| id   | name |
+------+------+
|    1 | yi   |
|    2 | er   |
+------+------+
2 rows in set (0.00 sec)

mysql> 

6. Creating job/mysql2hdfs.json

The contents are as follows:

[root@bigdata001 datax]# cat job/mysql2hdfs.json 
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id", "name"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.8.115:3306/test"], 
                                "table": ["person"]
                            }
                        ], 
                        "password": "Root_123", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}], 
                        "compress": "", 
                        "defaultFS": "hdfs://192.168.8.111:9000", 
                        "fieldDelimiter": "|", 
                        "fileName": "person.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
[root@bigdata001 datax]#

7. Running the job

DataX first writes the data to a temporary file under a temporary directory. If the job succeeds, the temporary file is renamed to the target path and the temporary directory is then deleted; if it fails, the temporary file is deleted directly.

[root@bigdata001 datax]# bin/datax.py job/mysql2hdfs.json 

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
......(part of the output omitted)......
2022-06-14 10:16:33.551 [0-0-0-writer] INFO  HdfsWriter$Task - begin do write...
2022-06-14 10:16:33.551 [0-0-0-writer] INFO  HdfsWriter$Task - write to file : [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d]
......(part of the output omitted)......
2022-06-14 10:16:43.512 [job-0] INFO  HdfsWriter$Job - start rename file [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d] to file [hdfs://192.168.8.111:9000/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d].
......(part of the output omitted)......
2022-06-14 10:16:43.915 [job-0] INFO  HdfsWriter$Job - start delete tmp dir [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f] .
......(part of the output omitted)......
2022-06-14 10:16:44.034 [job-0] INFO  JobContainer - 
Job start time                  : 2022-06-14 10:16:31
Job end time                    : 2022-06-14 10:16:44
Total elapsed time              :                 12s
Average throughput              :                0B/s
Record write speed              :              0rec/s
Total records read              :                   2
Total read/write failures       :                   0

[root@bigdata001 datax]#

8. Viewing the file on HDFS

A random suffix is appended to the configured fileName; the result is the actual file name written by each writer thread.

[root@bigdata001 ~]# hadoop fs -cat /person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d
1|yi
2|er
[root@bigdata001 ~]#
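
Because the suffix is random, if you do not have it from the job log you can list the target path first and then cat the matching file, for example:

[root@bigdata001 ~]# hadoop fs -ls /
[root@bigdata001 ~]# hadoop fs -cat /person.txt__*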

This article is reposted from: https://blog.csdn.net/yy8623977/article/details/125272844
Copyright belongs to the original author, Bulut0907. In case of infringement, please contact us for removal.
