一 DataX简介
1.1 DataX
1.1.1 Data X概览
DataX 是阿里云DataWorks数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
1.1.2 DataX3.0框架设计
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
1.1.3 DataX3.0插件体系
类型
数据源
Reader(读)
Writer(写)
文档
RDBMS 关系型数据库
MySQL
√
√
读、写
Oracle
√
√
读、写
OceanBase
√
√
读、写
SQLServer
√
√
读、写
PostgreSQL
√
√
读、写
DRDS
√
√
读、写
达梦
√
√
读、写
通用RDBMS(支持所有关系型数据库)
√
√
读、写
阿里云数仓数据存储
ODPS
√
√
读、写
ADS
√
写
OSS
√
√
读、写
OCS
√
√
读、写
NoSQL数据存储
OTS
√
√
读、写
Hbase0.94
√
√
读、写
Hbase1.1
√
√
读、写
MongoDB
√
√
读、写
Hive
√
√
读、写
无结构化数据存储
TxtFile
√
√
读、写
FTP
√
√
读、写
HDFS
√
√
读、写
Elasticsearch
√
写
1.1.4 DataX3.0六大核心优势
1、可靠的数据质量监控
1)完美解决数据传输个别类型失真问题
2)提供作业全链路的流量、数据量运行时监控
3)提供脏数据探测
2、丰富的数据转换功能
3、精准的速度控制
4、强劲的同步性能
5、健壮的容错机制
6、极简的使用体验
1.2 DataX-Web
DataX Web是在DataX之上开发的分布式数据同步工具,提供简单易用的 操作界面,降低用户使用DataX的学习成本,缩短任务配置时间,避免配置过程中出错。用户可通过页面选择数据源即可创建数据同步任务,支持RDBMS、Hive、HBase、ClickHouse、MongoDB等数据源,RDBMS数据源可批量创建数据同步任务,支持实时查看数据同步进度及日志并提供终止同步功能,集成并二次开发,可根据时间、自增主键增量同步数据。
任务"执行器"支持集群部署,支持执行器多节点路由策略选择,支持超时控制、失败重试、失败告警、任务依赖,执行器CPU、内存、负载的监控等等。数据转换UDF、表结构同步、数据同步血缘等更为复杂的业务场景。
简单来说用户可以通过图形化web,构建DataX Json,可以轻松调度各Job启停,DataX-Web也提供了诸如阻塞处理、超时警告等等功能辅助生产,对于少量数据同步任务,DataX-Web完全可以胜任,并且大大减少了工作量。
二 DataX及DataX-Web部署
2.1 DataX安装
2.1.1 解压安装
从github(https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz)
下载安装包或者下载源码打包(在这里使用下载安装包的方式),上传到服务器运行tar –zxvf安装
2.1.2 验证安装
datax/job文件夹下自带一个简单job任务,可以以此测试是否安装成功,
python /usr/datax/bin/datax.py ../job/job.json。具体使用在后文中介绍。
可能遇到的问题:
(1)Description:[DataX引擎配置错误,该问题通常是由于DataX安装错误引起,请联系您的运维解决.].- 获取作业配置信息失败:/usr/datax/job/job,json - java.io.FileNotFoundException: File '/usr/datax/job/job,json' does not exist
解决办法
rm -rf /usr/datax/plugin//._ 删除插件文件夹中的隐藏文件
(2)Description:[DataX引擎配置错误,该问题通常是由于DataX安装错误引起,请联系您的运维解决.]. - 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数
解决办法
修改datax/conf/core.json, core -> transport -> channel -> speed -> "byte": 2000000,将单个channel的大小改为2MB即可。
2.2 DataX-Web安装
2.2.1 基础软件
(1)MySQL (5.5+) 必选,对应客户端可以选装, Linux服务上若安装mysql的客户端可以通过部署脚本快速初始化数据库
(2)JDK (1.8.0_xxx) 必选
(3)Maven (3.6.1+) 必选
(4)DataX 必选
(5)Python (2.x) (支持Python3需要修改替换datax/bin下面的三个python文件,替换文件在doc/datax-web/datax-python3下) 必选,主要用于调度执行底层DataX的启动脚本,默认的方式是以Java子进程方式执行DataX,用户可以选择以Python方式来做自定义的改造
2.2.2 打包安装
这次选择源码打包安装(GitHub - WeiYe-Jing/datax-web: DataX集成可视化页面,选择数据源即可一键生成数据同步任务,支持RDBMS、Hive、HBase、ClickHouse、MongoDB等数据源,批量创建RDBMS数据同步任务,集成开源调度系统,支持分布式、增量同步数据、实时查看运行日志、监控执行器资源、KILL运行进程、数据源信息加密等。)
打包 mvn clean install
将jar包上传到服务器并tar –zxvf安装,
进入datax-web-2.1.2/bin ,bash运行install.sh进行交互式安装,交互式安装会依次解压各模块的package以及调用configure配置脚本,可以逐步判断安装包是否有误。
如果服务上有MySQL,则执行安装脚本过程中会出现:
Scan out mysql command, so begin to initalize the database
Do you want to initalize database with sql: [{INSTALL_PATH}/bin/db/datax-web.sql]? (Y/N)y
Please input the db host(default: 127.0.0.1):
Please input the db port(default: 3306):
Please input the db username(default: root):
Please input the db password(default: ):
Please input the db name(default: exchangis)
可以输入数据库地址,端口号,用户名,密码以及数据库名称,如果没有安装MySQL,则可以取用目录下/bin/db/datax-web.sql脚本手动执行,完成后修改相关配置文件
vi ./modules/datax-admin/conf/bootstrap.properties
#Database
#DB_HOST=
#DB_PORT=
#DB_USERNAME=
#DB_PASSWORD=
#DB_DATABASE=
注意MySQL-version务必高于5.5,实际测试中版本过低会导致配置表创建出错
2.2.3 配置
在项目目录: /modules/datax-admin/bin/env.properties 配置邮件服务(可跳过)
MAIL_USERNAME=""
MAIL_PASSWORD=""
此文件中包括一些默认配置参数,例如:server.port,具体请查看文件。
在项目目录下/modules/datax-execute/bin/env.properties 指定PYTHON_PATH的路径
vi ./modules/{module_name}/bin/env.properties
执行datax的python脚本地址:PYTHON_PATH=
保持和datax-admin服务的端口一致;默认是9527,如果没改datax-admin的端口,可以忽略:DATAX_ADMIN_PORT=
此文件中包括一些默认配置参数,例如:executor.port,json.path,data.path等,具体请查看文件。
2.2.4 验证安装
启动服务bash bin/start-all.sh
可以通过start.sh m {moule_name}单一启动某一模块服务。
停止同理
jps查看是否出现DataXAdminApplication和DataXExecutorApplication进程
部署完成后,在浏览器中输入http://{ip}:{port}/index.html就可以访问对应的主界面(ip为datax-admin部署所在服务器ip,port为datax-admin 指定的运行端口)
输入用户名 admin 密码 123456 就可以直接访问系统
还可在多设备上部署以维持负载均衡,防止单节点挂掉导致任务停摆。
通过Spring框架在windows运行
2.2.5 运行简单任务
实现MySQL间数据同步。
注意需要在数据库连接后面添加如下字符编码规则参数,否则表中汉字不能识别
?useUnicode=true&characterEncoding=utf8
由于构建过程全程可以由web进行可视化操作,细节不再赘述,详见文末实例。
2.3 增量同步
相比于全量同步每次都把整张表同步一次,增量同步只更新新增的部分,全量同步在某些情况下效率较低,例如某张表数据量较大,但是每天数据的变化比例很低,若对其采用每日全量同步,则会重复同步和存储大量相同的数据。
DataX在数据增量同步方面比较欠缺,DataX-Web在这方面做了补足。
2.3.1 主键自增进行增量同步(MySQL->MySQL)
① 任务类型选DataX任务
② 辅助参数选择主键自增
③ 增量主键开始ID选择,即sql中查询ID的开始ID,用户使用此选项方便第一次的全量同步。第一次同步完成后,该ID被更新为上一次的任务触发时最大的ID,任务失败不更新。
④ 增量时间字段,-DstartId='%s' -DendId='%s' 先来解析下这段字符串
1、-D是DataX参数的标识符,必配
2、-D后面的startId和endId是DataX json中where条件的id字段标识符,必须和json中的变量名称保持一致,endId是任务在每次执行时获取当前表maxId,也是下一次任务的startId
3、='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致
4、注意-DstartId='%s'和-DendId='%s' 中间有一个空格,空格必须保留并且是一个空格
5、reader数据源,选择任务同步的读数据源
6、配置reader数据源中需要同步数据的表名及该表的主键
将json中原本的colum去掉,改为querySql(关于querySql、querySql、preSql、postSql、splitPk等配置参数,详见datax(27):不太常见配置项querySql、preSql、postSql、splitPk[通俗易懂] - 全栈程序员必看)
此处的关键点在${startId},${endId},${}是DataX动态参数的固定格式,startId,endId就是我们页面配置中 -DstartId='%s' -DendId='%s'中的startId,endId,注意字段一定要一致。
可能遇到的问题:
问题原因
由4.3.1中④可知,endId是任务在每次执行时获取当前表startId,也是下一次任务startId,每次任务,endId和startId的变化过程都是:
1、自定义初始startId
2、endId获取reader表中主键最大值
3、目标表更新主键介于startId,和endId之间的行
4、成功运行?startId=endId :startId=startId
5、循环进行2-4
如果运行中出现一次上述错误,例如导入重复数据而引发的主键唯一错误,那么从这个出错开始,后续的所有运行都会出错,虽然最终数据依然会完成同步,但每一次运行都相当于进行了一次全量同步,增量同步的设置将失去意义。
解决办法
任务构建阶段填写报警邮件,在出现报错时及时发现并处理。
2.3.2 时间自增进行增量同步(MySQL->MySQL)
前提是表中有类似updatetime字段,可以为时间标准格式或者时间戳。
注意:
第一次时间增量时间要早于表的全部update_time,就是把时间戳传入json的lastTime和currentTime;
FROM_UNIXTIME是为了将表里面的标准格式转换为时间戳进行比较,如果表里默认为时间戳就不需要这个函数;
querySql中使用了select后需要把reader里面column,splitP需要删除掉 ;
“writeMode”: "update"相对insert会好一些;
2.4 分区表写入
2.4.1 修改源码加入自动创建分区功能
对于写入时如果不提前创建分区就会报:
有如下解决方案:
1、修改DataX源码中hdfswriter下的hdfshelper.java,添加createPath方法:
public boolean createPath(String filePath) {
Path path = new Path(filePath);
boolean exist = false;
try {
if (fileSystem.exists(path)) {
String message = String.format("文件路径[%s]已存在,无需创建!",
"message:filePath =" + filePath);
LOG.info(message);
exist = true;
} else {
exist = fileSystem.mkdirs(path);
}
} catch (IOException e) {
String message = String.format("创建文件路径[%s]时发生网络IO异常,请检查您的网络是否正常!",
"message:filePath =" + filePath);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
return exist;
}
2、修改hdfswriter目录下的HdfsWriter.java,在最后的else调用createPath并修改返回日志
3、将源码打包并替换datax/plugin/writer/hdfswriter/hdfswriter-0.0.1-SNAPSHOT.jar
4、测试
来源:
(二次开发DataX以支持HIVE分区表_datax二次开发_MaxineSgr的博客-CSDN博客)
2.4.2 运行分区实例
官方文档中有解释,DataX只支持一次写入单个分区,可以说DataX对hive分区表非常不友好。
DataX-web将写入过程做了简化,但仍然无法做到灵活的创建分区和一次多个写入
任务的设置如下
如果按上图设置,最后的分区生成是这样的:
其中分区字段仅支持类似时间的字段,分区时间最多可以选到-20,即当前时间的前二十天。
json设置如下,${partition}是固定格式,不能更改
由最后创建的目录可以推断partition是由源码中输入的day与系统当前日期更改格式并计算天数后拼凑成的参数。目前网上没有教程但理论上可以自行修改这部分源码以让分区字段与日期选择更加灵活
注意由此动态分区自动生成路径的方法,写入后可能在Hive端无法查询数据,用MSCK REPAIR TABLE将没有写入metastore的分区信息写入metastore。
MSCK REPAIR TABLE hive_partition_test
运行结果:
在生产中,假设今日是一月二日,数据接收后存入一张日
三 Kettle
3.1 Kettle简介
1、Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、 Linux、 Unix上运行,绿色无需安装,数据抽取高效稳定。
2、Kettle 中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。
3、Kettle这个ETL工具集,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么,而不是你想怎么做。
4、Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。
5、Kettle(现在已经更名为PDI,Pentaho Data Integration-Pentaho)。
3.2 下载及配置
(1)Kettle下载安装非常简单,官网下载安装包,找下载数最多的下载解压即可
(Pentaho from Hitachi Vantara - Browse Files at SourceForge.net)
(2)将Mysql连接驱动置于pdi-ce\data-integration\lib下
(3)如果出现hive连接错误的情况,需要将hive lib下的jar包复制到pdi-ce\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30\lib
并将hive和hadoop的配置文件都复制到pdi-ce-8.3.0.0-371\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30
具体教程可见(kettle连接Hive配置(一)-爱码网)
需要注意的是,官网下载的pdi-9.3版本不知为何缺少hive连接的相关配置文件,且目前缺少该版本的博客,可以下载8.3版本
但是kettle8.3版本不兼容mysql8以上版本的jdbc-connecter,若设备使用的是mysql8以上版本,有如下解决办法
1、数据库类型选择Generic Database,相当于不用kettle预设的类型,自定义连接
2、在kettle的安装目录下data-integration\simple-jndi\jdbc.properties加入jdbc的连接信息,相当于预设好一个连接,直接使用
然后在连接类型MySQL中选择预设好的JNDI
教程参考(kettle连接mysql8.0以上版本_北顾南望的博客-CSDN博客_kettle连接mysql8.0)
3.3 简单运行
设置一个流程将两个MySQL表的内容输入,排序后根据,id将
可以看到并更改字段关系映射
一定要仔细定义数据同步中的高级标签,否则会报错“It was not possible to find operation field [null] in the input stream!”
mysql->mysql运行时长
hive->mysql运行时长
在“作业”中可以对编辑好的“转换”设计运行间隔以达到定时同步的功能。
总结
1、DataX
作为阿里的开源项目,DataX的稳定性与可维护性非常有保障,网上有大量的测试与报错博客,更有补足功能、提供UI操作的DataX-web,使得DataX可以投入到项目中来。另:官方提供了对TDengine的reader和writer。
但DataX有缺陷,作为开源项目,某些功能官方维护不及时,针对某些急需实现的功能(如实例1、(3)中提到的writer过程中的null问题),网上有修改源码的教程。由于本次测试涉及面较窄,生产生活中还可能存在类似情况,到时候需要自主寻错、修改源码。
且DataX对Hive数据的读取可以看作对hdfs上存储的数据进行读取,只能是顺序的,且不能对列有细致操作,实际工作中有类似需要,只能先在数据库中操作,再进行同步作业。
DataX-web提供了一个方便快捷的向分区表写入的方法,但不能自由地多次地写入分区,只能一次写入一个分区,分区字段不能选择除时间以外的字段,时间的选择并不自由,只能选当前时间前20天内的一天作为分区。如果有相关需求,可以进行源码的二次开发,建议二次开发时一定做好记录,提高部门同事间沟通效率,以提高工作效率。
虽有不足但DataX总体上能承担日常生活中的数据同步工作。DataX官方插件有严格的格式设置,编写json时请先查看相关插件的参数介绍。
2、Kettle
Kettle提供了强大的图形化Spoon,可以做到中小型需求不写一行代码,大大减少了工作量,且对于Hive数据的读取可以做列的操作,数据的读取是依靠字段进行的而不是顺序进行的,比DataX、Sqoop更加精细。kettle重点是中间变换工作,集成功能速度较慢
时间对比,上图为datax,下图为kettle,可以看到差距还是比较大的
MySQL->MySQL:
Hive->MySQL:
可以看到在同一台机器上运行同一份数据时,在MySQL->MySQL和Hive->MySQL的情况下,仅29万条数据,Kettle和DataX在效率上就存在明显差距。随着数据量的提升,差距会愈发明显。因此若选用Kettle则需要考虑在海量数据的前提下的调优问题,业务越复杂,需要优化的组件就越多。
且Kettle spoon对于定时运行缺乏集成化的设置,一旦业务数量变多,执行定时调度时,就只能通过系统自带的定时任务调度去进行管理。无法统一,假如要做统一的管理,需要安装一套jenkins,但配置和后续的运维成本可能较高。以及kettle的内存占用较高,无法最大效率地利用服务器资源
实例 datax在常见数据库之间的测试
1、mysql <-> mysql <-> hive <-> hive
注意:
(1)hive作为数据源的时候,注意在reader设置中不要跳过表头,否则会缺失第一列数据
(2)单机版的DataX,在脏数据统计上有点小问题。举个例子:先运行了一个任务A,假设这个任务A有5条脏数据,errorlimit设置了record:0,在运行时,这个任务是一定会因为脏数据而终止执行。任务A终止执行后,你紧接着运行任务B,假设任务B没有脏数据,errorlimit也设置了record:0,但有可能任务B在运行时,它也会报“在运行的过程中捕获了5条脏数据,任务结束”。也就是任务A的脏数据会影响任务B。问题出在LocalTGCommunicationManager这个类中,它使用了一个静态变量taskGroupCommunicationMap来存储脏数据。
可能遇到的问题:
(1)(问题可能出现在hdfsreader等)
2022-12-15 13:59:56.077 [0-0-0-reader] ERROR StdoutPluginCollector - 脏数据:
{"message":"No enum constant com.alibaba.datax.plugin.unstructuredstorage.reader. UnstructuredStorageReaderUtil.Type.BIGINT","record":[],"type":"reader"}
问题原因:datax支持的数据类型与hive有不同
DataX 数据类型
Hive表 数据类型
Long
TINYINT,SMALLINT,INT,BIGINT
Double
FLOAT,DOUBLE
String
String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY
Boolean
BOOLEAN
Date
Date,TIMESTAMP
其余多种数据库间数据类型映射,可参考(datax与多种数据库间数据类型映射_datax 数据类型_chimchim66的博客-CSDN博客)
解决办法:
将reader中所有其余数据库的数据类型依次按照对照表改为datax支持的数据类型。
(2)(问题可能出现在hdfsreader等)
问题原因:
在read过程中,\N不能转换为DATE属性的值,报类型转换错误
解决办法:
1、在reader中设置,使空值不再参与转换,直接跳过
2、设中转表将DATE类型改为STING类型,再由数据库或olap组件内部转换格式。
(3)(问题可能出现在将数据同步后的Hive又作为数据源输出时)
类型转换错误,无法将[]转换为[date]等等一系列类似错误
问题原因:
hdfswriter并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,hdfswriter会将null值存储为空字符串(’’),但Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。
解决办法:
1、修改datahdfswriter的源码,增加自定义null值存储格式的逻辑,参考(记Datax3.0解决MySQL抽数到HDFSNULL变为空字符的问题_谭正强的博客-CSDN博客_datax nullformat),然后mvn打包。在hdfswriter阶段就把null值划分清楚,避免后续问题积少成多。
在hive表同步到hive表的过程中reader和writer都要设置nullFormat参数。
2、或是在Hive中建表时指定null值存储格式为空字符串(’’)(不推荐)
例如:
DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '省份名称',
`region_id` STRING COMMENT '地区ID',
`area_code` STRING COMMENT '地区编码',
`iso_code` STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',
`iso_3166_2` STRING COMMENT '新版IOS-3166-2编码,供可视化使用'
) COMMENT '省份表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/base_province/';
(4)(问题可能出现在hdfsreader、hdfswriter等)
在同步过程中,出现列数据混乱,或日志出现类型转换错误
问题原因:
由于datax对于hdfs读写非常格式化,如果建表的列顺序不对或列分隔符出现在表中的话,都会引起数据读写的混乱的情况。
解决办法:
实际建表中建议hive列分隔符不要出现任何表中可能有的元素,例如“\t”,“,”“;”等。创建列的先后顺序严格一致,以减少同步过程中数据混乱。
版权归原作者 省略号的搬运工 所有, 如有侵权,请联系我们删除。