同步存量数据
把数据源端的数据通过Kettle读取,然后通过消息队列中间件(Kafka)导出
软件准备
Kettle:pdi-ce-9.3.0.0-428
kettle简介
Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。中文名称叫水壶,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么,而不是你想怎么做。Kettle中有两种脚本文件,transformation(ktr)和job(kjb),transformation完成针对数据的基础转换,job则完成整个工作流的控制。
Kettle目前包含五个产品:Spoon、Pan、Chef、Kithcen、Encr。
SPOON: 是一个图形用户界面,允许你通过图形界面来设计ETL转换过程(Transformation)和任务。
PAN: 转换(trasform)执行器;允许你批量运行由Spoon设计的ETL转换 (如使用一个时间调度器)。Pan是一个后台执行的程序,没有图形界面。
CHEF: 允许你创建任务(Job)。 任务通过允许每个转换,任务,脚本等等,更有利于自动化更新数据仓库的复杂工作。任务通过允许每个转换,任务,脚本等等。任务将会被检查,看看是否正确地运行了。
KITHCEN: 作业(job)执行器;允许你批量使用由Chef设计的任务 (如使用一个时间调度器)。KITCHEN也是一个后台运行的程序。
ENCR: 用来加密连接数据库密码与集群时使用的密码。
kettle使用
学习参考:https://blog.csdn.net/yuan2019035055/article/details/120409547?spm=1001.2014.3001.5506
D:\Kettle\pdi-ce-9.3.0.0-428\data-integration\lib需在此目录下导入达梦数据库的连接驱动jar包。
启动D:\Kettle\pdi-ce-9.3.0.0-428\data-integration\Spoon.bat
连接达梦数据库
达梦数据库连接配置,我腾讯云上的数据库。
右键新建的达梦数据库连接,“共享”可以让说有转换和工作可以选择此连接。
右上角新建一个资源库,我在mysql里新建了一个库用来当作kettle的资源库。
单表同步操作
流程简介:将本地mysql的r_kettle数据库中的im_message表,通过kafka分流同步到腾讯云服务器中达梦数据库的SYSDBA.im_message中。
1、将源数据以json格式输出给Kafka producer
以下为各个steps的配置:
表输入
json output
kafka producer
2、从kafka消费数据,通过“字段选择”选择出生产者配置的Message 字段,如上图所示。将data进行Json解析,在输出到目标表中。
以下为各个steps的配置:
kafka consumer
Consumer group为必填项
结果集需要配置一个转换,配置到上图所示。
字段选择
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0eYrUviX-1653493912533)(F:\Desktop\Snipaste_2022-05-25_23-28-01.png)]
json input
表输出
构建完成后,先执行第二步,启动后它会持续运行,需手动停止,再执行第一步,当有新的数据被生产者提供到kafka队列中,第二步会自动开始消费数据。
演示
r_kettle中新增一张表后,通过脚本插入数据
publicclassMysqlBatchUtil{privateString sql="INSERT INTO im_message (sender,send_time,receiver,content,is_read,read_time) VALUES (?,now(),?,?,?,now())";publicstaticConnectiongetConnection(){Connection con =null;try{Class.forName("com.p6spy.engine.spy.P6SpyDriver");String url ="jdbc:p6spy:mysql://127.0.0.1:3306/r_kettle?useServerPrepStmts=false&rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";String user ="root";String password ="Weih@nfdw2022";
con =DriverManager.getConnection(url, user, password);}catch(ClassNotFoundException|SQLException e){
e.printStackTrace();}return con;}publicvoiddoStore()throwsSQLException,InterruptedException{Connection conn =getConnection();// 设置手动提交
conn.setAutoCommit(false);int count =0;PreparedStatement ps = conn.prepareStatement(sql);Date begin=newDate();for(int i=0;i<=100000;i++){
ps.setString(1,"sen"+i+"der");// ps.setTimestamp(2, new Timestamp(1));
ps.setString(2,"rece"+i+"iver");
ps.setString(3,"cont"+i+"ent");
ps.setDouble(4,Math.random());//ps.setDate(5, new java.sql.Date(i));// 加入批量处理
ps.addBatch();
count++;// 执行批量处理
ps.executeBatch();// 提交
conn.commit();// Thread.sleep(1);}Date end=newDate();System.out.println("数量="+count);System.out.println("运行时间="+(end.getTime()-begin.getTime()));
conn.close();}publicstaticvoidmain(String[] args){try{newMysqlBatchUtil().doStore();}catch(Exception e){
e.printStackTrace();}}}
共6620条数据
建立目标库的表,此时为0条数据
执行单表同步操作的第二步,使之处于运行状态
执行第一步骤
first这个topic中分了三个分区,会以3:3:2的形式分配数据,再按一个一个的分区进行消费。
6620条数据输出完成
刷新查看数据库,插入成功!
版权归原作者 Jessluo 所有, 如有侵权,请联系我们删除。