0


同步存量数据

同步存量数据

把数据源端的数据通过Kettle读取,然后通过消息队列中间件(Kafka)导出

软件准备

Kettle:pdi-ce-9.3.0.0-428

kettle简介

Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。中文名称叫水壶,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么,而不是你想怎么做。Kettle中有两种脚本文件,transformation(ktr)和job(kjb),transformation完成针对数据的基础转换,job则完成整个工作流的控制。

Kettle目前包含五个产品:Spoon、Pan、Chef、Kithcen、Encr。

SPOON: 是一个图形用户界面,允许你通过图形界面来设计ETL转换过程(Transformation)和任务。

PAN: 转换(trasform)执行器;允许你批量运行由Spoon设计的ETL转换 (如使用一个时间调度器)。Pan是一个后台执行的程序,没有图形界面。

CHEF: 允许你创建任务(Job)。 任务通过允许每个转换,任务,脚本等等,更有利于自动化更新数据仓库的复杂工作。任务通过允许每个转换,任务,脚本等等。任务将会被检查,看看是否正确地运行了。

KITHCEN: 作业(job)执行器;允许你批量使用由Chef设计的任务 (如使用一个时间调度器)。KITCHEN也是一个后台运行的程序。

ENCR: 用来加密连接数据库密码与集群时使用的密码。

kettle使用

学习参考:https://blog.csdn.net/yuan2019035055/article/details/120409547?spm=1001.2014.3001.5506

D:\Kettle\pdi-ce-9.3.0.0-428\data-integration\lib需在此目录下导入达梦数据库的连接驱动jar包。

在这里插入图片描述

启动D:\Kettle\pdi-ce-9.3.0.0-428\data-integration\Spoon.bat

在这里插入图片描述

连接达梦数据库

在这里插入图片描述

达梦数据库连接配置,我腾讯云上的数据库。

在这里插入图片描述

右键新建的达梦数据库连接,“共享”可以让说有转换和工作可以选择此连接。

在这里插入图片描述

右上角新建一个资源库,我在mysql里新建了一个库用来当作kettle的资源库。

在这里插入图片描述

在这里插入图片描述

Dbeaver数据库可视化工具,可参考:(用于可视化达梦数据库)https://blog.csdn.net/qq_32144799/article/details/122620632?ops_request_misc=&request_id=&biz_id=102&utm_term=Dbeaver%E8%BF%9E%E6%8E%A5%E8%BE%BE%E6%A2%A6%E6%95%B0%E6%8D%AE%E5%BA%93&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduweb~default-0-122620632.142v10control,157v12control&spm=1018.2226.3001.4187

Offset Explorer可视化操作kafka工具,可参考:https://blog.csdn.net/weixin_56193843/article/details/120238989?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165349076216781667826290%2522%252C%2522scm%2522%253A%252220140713.130102334…%2522%257D&request_id=165349076216781667826290&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduend~default-1-120238989-null-null.142v10control,157v12control&utm_term=offset+explorer%E8%BF%9E%E6%8E%A5kafka&spm=1018.2226.3001.4187

单表同步操作

流程简介:将本地mysql的r_kettle数据库中的im_message表,通过kafka分流同步到腾讯云服务器中达梦数据库的SYSDBA.im_message中。

1、将源数据以json格式输出给Kafka producer

在这里插入图片描述

以下为各个steps的配置:

表输入

在这里插入图片描述

json output

在这里插入图片描述

在这里插入图片描述

kafka producer

在这里插入图片描述

2、从kafka消费数据,通过“字段选择”选择出生产者配置的Message 字段,如上图所示。将data进行Json解析,在输出到目标表中。

在这里插入图片描述

以下为各个steps的配置:

kafka consumer

Consumer group为必填项

在这里插入图片描述

在这里插入图片描述

结果集需要配置一个转换,配置到上图所示。

在这里插入图片描述

在这里插入图片描述

字段选择

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0eYrUviX-1653493912533)(F:\Desktop\Snipaste_2022-05-25_23-28-01.png)]在这里插入图片描述

json input

在这里插入图片描述

在这里插入图片描述

表输出

在这里插入图片描述

在这里插入图片描述

构建完成后,先执行第二步,启动后它会持续运行,需手动停止,再执行第一步,当有新的数据被生产者提供到kafka队列中,第二步会自动开始消费数据。

演示

r_kettle中新增一张表后,通过脚本插入数据

publicclassMysqlBatchUtil{privateString sql="INSERT INTO im_message (sender,send_time,receiver,content,is_read,read_time) VALUES (?,now(),?,?,?,now())";publicstaticConnectiongetConnection(){Connection con =null;try{Class.forName("com.p6spy.engine.spy.P6SpyDriver");String url ="jdbc:p6spy:mysql://127.0.0.1:3306/r_kettle?useServerPrepStmts=false&rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";String user ="root";String password ="Weih@nfdw2022";
            con =DriverManager.getConnection(url, user, password);}catch(ClassNotFoundException|SQLException e){
            e.printStackTrace();}return con;}publicvoiddoStore()throwsSQLException,InterruptedException{Connection conn =getConnection();// 设置手动提交
        conn.setAutoCommit(false);int count =0;PreparedStatement ps = conn.prepareStatement(sql);Date begin=newDate();for(int i=0;i<=100000;i++){

            ps.setString(1,"sen"+i+"der");//          ps.setTimestamp(2, new Timestamp(1));

            ps.setString(2,"rece"+i+"iver");

            ps.setString(3,"cont"+i+"ent");

            ps.setDouble(4,Math.random());//ps.setDate(5, new java.sql.Date(i));// 加入批量处理
            ps.addBatch();

            count++;// 执行批量处理
            ps.executeBatch();// 提交
            conn.commit();//            Thread.sleep(1);}Date end=newDate();System.out.println("数量="+count);System.out.println("运行时间="+(end.getTime()-begin.getTime()));

        conn.close();}publicstaticvoidmain(String[] args){try{newMysqlBatchUtil().doStore();}catch(Exception e){
            e.printStackTrace();}}}

共6620条数据

在这里插入图片描述

建立目标库的表,此时为0条数据

在这里插入图片描述

执行单表同步操作的第二步,使之处于运行状态

在这里插入图片描述

执行第一步骤

在这里插入图片描述

first这个topic中分了三个分区,会以3:3:2的形式分配数据,再按一个一个的分区进行消费。

在这里插入图片描述

6620条数据输出完成

在这里插入图片描述

刷新查看数据库,插入成功!

在这里插入图片描述


本文转载自: https://blog.csdn.net/m0_46323730/article/details/124976440
版权归原作者 Jessluo 所有, 如有侵权,请联系我们删除。

“同步存量数据”的评论:

还没有评论