系列文章目录
一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel
八、DataX源码分析-插件机制
文章目录
DataX是什么?
DataX是阿里开源的异构数据源离线同步工具。它致力于实现包括关系型数据库(如MySQL、Oracle等)、HDFS、Hive、MaxCompute(原ODPS)、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
DataX的设计理念是将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源时,只需要将此数据源对接到DataX,便能与已有的数据源实现无缝数据同步。
DataX的架构主要基于Framework + Plugin的设计模式。它将数据读取和写入抽象成为Reader和Writer插件,这些插件可以接入不同的数据源,实现数据的读取和写入操作。同时,DataX提供了丰富的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。
DataX的核心优势包括稳定性、高效性、易用性和扩展性。它经过长时间大规模生产环境的验证,能够保证数据同步的稳定性和可靠性;通过多线程、多进程、流式处理等技术手段,实现高效的数据同步;提供简单易用的配置方式,用户可以通过配置文件来定义数据源、目标端、同步策略等;支持丰富的插件体系,可以方便地扩展新的数据源和目标端。
此外,DataX还提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在库可以承受的范围内达到最佳的同步速度。同时,它还具有强劲的同步性能、健壮的容错机制以及极简的使用体验等特点。
总之,DataX是一个强大而灵活的数据同步工具,能够有效地解决异构数据源之间的数据同步问题。通过合理的配置和优化,它可以帮助用户实现高效、稳定、可靠的数据同步操作。
DataX支持的数据源
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入 。
类型数据源Reader(读)Writer(写)文档RDBMS 关系型数据库MySQL√√读 、写Oracle√√读 、写OceanBase√√读 、写SQLServer√√读 、写PostgreSQL√√读 、写DRDS√√读 、写Kingbase√√读 、写通用RDBMS(支持所有关系型数据库)√√读 、写阿里云数仓数据存储ODPS√√读 、写ADB√写ADS√写OSS√√读 、写OCS√写Hologres√写AnalyticDB For PostgreSQL√写阿里云中间件datahub√√读 、写SLS√√读 、写图数据库阿里云 GDB√√读 、写Neo4j√写NoSQL数据存储OTS√√读 、写Hbase0.94√√读 、写Hbase1.1√√读 、写Phoenix4.x√√读 、写Phoenix5.x√√读 、写MongoDB√√读 、写Cassandra√√读 、写数仓数据存储StarRocks√√读 、写ApacheDoris√写ClickHouse√√读 、写Databend√写Hive√√读 、写kudu√写selectdb√写无结构化数据存储TxtFile√√读 、写FTP√√读 、写HDFS√√读 、写Elasticsearch√写时间序列数据库OpenTSDB√读TSDB√√读 、写TDengine√√读 、写
DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。
DataX的框架设计
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
- Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
DataX核心架构
DataX 3.0采用微内核架构模式, 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
核心模块介绍:
- DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
- DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
- 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
- DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。
DataX调度流程:
DataX的调度流程可以分为以下几个步骤:
- Job切分:首先,DataX的Job模块会根据分库分表策略将Job切分成若干个小的Task。这是为了确保每个Task可以独立执行,并且可以并发执行以提高效率。
- 并发数与TaskGroup计算:然后,根据用户配置的并发数,DataX会计算需要分配多少个TaskGroup。计算的方式是将总的Task数量除以每个TaskGroup中的Task数量(通常为5),从而得到TaskGroup的数量。
- TaskGroup分配与启动:接下来,DataX会根据计算出的TaskGroup数量,将Task分配到各个TaskGroup中。每个TaskGroup会启动多个TaskExecutor来执行具体的Task。
- TaskExecutor启动:当TaskGroup启动后,其中的TaskExecutor会启动ReaderThread和WriterThread。ReaderThread负责从数据源读取数据,WriterThread负责将数据写入目标端。这两个线程协同工作,实现了数据的读取、转换和写入过程。
- 数据同步:在每个TaskExecutor中,ReaderThread和WriterThread会不断地从数据源读取数据,并将数据写入目标端,直到所有的数据都同步完成。 整个调度流程依赖于Java底层线程池进行并发控制,DataX通过合理的调度策略和线程管理机制,实现了高效、稳定、可靠的数据同步。
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
- DataXJob根据分库分表切分成了100个Task。
- 根据20个并发,DataX计算共需要分配4个TaskGroup。
- 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
DataX部署和配置
- 工具部署- 方法一、直接下载DataX工具包:DataX下载地址下载后解压至本地某个目录,进入bin目录,即可运行同步作业:
$ cd{YOUR_DATAX_HOME}/bin$ python datax.py {YOUR_JOB.json}
自检脚本: python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json- 方法二、下载DataX源码,自己编译:DataX源码(1)、下载DataX源码:$ git clone [email protected]:alibaba/DataX.git
(2)、通过maven打包:$ cd{DataX_source_code_home}$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
打包成功,日志显示如下:[INFO] BUILD SUCCESS[INFO] -----------------------------------------------------------------[INFO] Total time: 08:12 min[INFO] Finished at: 2015-12-13T16:26:48+08:00[INFO] Final Memory: 133M/960M[INFO] -----------------------------------------------------------------
打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下:$ cd{DataX_source_code_home}$ ls ./target/datax/datax/bin conf job lib log log_perf plugin script tmp
- 配置示例:从stream读取数据并打印到控制台
- 第一步、创建作业的配置文件(json格式)可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
$ cd{YOUR_DATAX_HOME}/bin$ python datax.py -r streamreader -w streamwriterDataX (UNKNOWN_DATAX_VERSION), From Alibaba !Copyright (C)2010-2015, Alibaba Group. All Rights Reserved.Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md Please refer to the streamwriter document: https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md Please save the following configuration as a json file and use python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json to run the job.{"job":{"content":[{"reader":{"name":"streamreader", "parameter":{"column":[], "sliceRecordCount":""}}, "writer":{"name":"streamwriter", "parameter":{"encoding":"", "print":true}}}], "setting":{"speed":{"channel":""}}}}
根据模板配置json如下:#stream2stream.json{"job":{"content":[{"reader":{"name":"streamreader","parameter":{"sliceRecordCount":10,"column":[{"type":"long","value":"10"},{"type":"string","value":"hello,你好,世界-DataX"}]}},"writer":{"name":"streamwriter","parameter":{"encoding":"UTF-8","print":true}}}],"setting":{"speed":{"channel":5}}}}
- 第二步:启动DataX$ cd{YOUR_DATAX_DIR_BIN}$ python datax.py ./stream2stream.json
同步结束,显示日志如下:...2023-12-17 11:20:25.263 [job-0] INFO JobContainer - 任务启动时刻 :2023-12-17 11:20:15任务结束时刻 :2023-12-17 11:20:25任务总计耗时 : 10s任务平均流量 : 205B/s记录写入速度 : 5rec/s读出记录总数 :50读写失败总数 :0
- 第一步、创建作业的配置文件(json格式)可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
版权归原作者 shandongwill 所有, 如有侵权,请联系我们删除。