

V. Real-Time Analytics on Massive Data: Full and Incremental Synchronization with Flink CDC + Doris Connector

Preface

The previous four articles focused on Doris theory, which can be rather dry. There is of course much more theory to cover, and we will get to it gradually. In this article we try integrating Doris with SpringBoot to perform basic CRUD operations.

Because Doris is highly compatible with the MySQL protocol, the two are largely consistent in SQL syntax, and the MySQL client is also the client officially chosen by Doris. As a result, if you need to run analytics on MySQL data, migrating to Doris is relatively cheap. However, the plain MySQL-protocol approach is not recommended when data volumes are very large, so this article also introduces another approach: Flink CDC + Doris Connector.

I. Integrating MyBatis to Operate Doris

1. Prepare the Database

CREATE TABLE `doris_test` (
    `id` int NULL COMMENT "id",
    `price` decimal(10,2) NULL COMMENT "price",
    `title` varchar(20) NULL COMMENT "title"
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_num" = "1");

Query OK, 0 rows affected (0.06 sec)

2. Set Up the SpringBoot Project

Step 1: Create a SpringBoot project and import the dependencies, mainly the MyBatis-related ones.

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.1</version>
    <relativePath/>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Needed if you want traditional xml/properties configuration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
    </dependency>
    <!-- MyBatis-Plus persistence layer -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.70</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.23</version>
    </dependency>
</dependencies>

3. Integrate MyBatis

Create the startup class, entity class, service layer, persistence layer, SQL mapping files, and so on. This is no different from working against MySQL.

The startup class is shown below; @MapperScan scans the Mapper interfaces.

@SpringBootApplication
@MapperScan(basePackages = "cn.whale.mapper")
public class DorisApplication {
    public static void main(String[] args) {
        SpringApplication.run(DorisApplication.class, args);
    }
}

The service layer and persistence layer are shown below (some code is omitted).

@Service
public class DorisServiceImpl implements DorisService {

    @Autowired
    DorisMapper dorisMapper;

    @Override
    public List<Order> listDoris() {
        return dorisMapper.listDoris();
    }

    @Override
    public int add(Order order) {
        return dorisMapper.add(order);
    }
}

public interface DorisMapper {
    List<Order> listDoris();
    int add(Order order);
}
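The Order entity referenced above is not shown in the original post. Below is a minimal sketch, assuming a Lombok-annotated POJO whose fields mirror the id/price/title columns used by the mapper; the package name and annotations are assumptions.

package cn.whale.domain;

import lombok.Data;
import java.math.BigDecimal;

// Hypothetical entity matching the id/price/title columns used in the mapper XML.
@Data
public class Order {
    private Long id;          // `id` column
    private BigDecimal price; // `price` decimal(10,2)
    private String title;     // `title` varchar(20)
}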

Below is the SQL mapping file; again, no different from working against an ordinary database.

<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPEmapperPUBLIC"-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mappernamespace="cn.whale.mapper.DorisMapper"><selectid="listDoris"resultType="cn.whale.domain.Order">
        select id,price,title from orders
    </select><insertid="add"parameterType="cn.whale.domain.Order">
        INSERT INTO orders(id,price,title) VALUES(#{id},#{price},#{title})
    </insert></mapper>

4. Write the Configuration File

server:
  port: 8080
spring:
  # Doris database connection settings
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.220.253:9030/app_db?characterEncoding=utf-8&useSSL=false
    username: root
    password: 123456
    type: com.alibaba.druid.pool.DruidDataSource
    initial-size: 500
    min-idle: 500
    max-active: 500
# MyBatis settings
mybatis:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.wudl.doris.domain.DorisTest

5. Write the Test Class

Inject the service and test both query and insert.

package cn.whale.service;

import cn.whale.DorisApplication;
import cn.whale.domain.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.math.BigDecimal;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = DorisApplication.class)
public class DorisServiceTest {

    @Autowired
    private DorisService dorisService;

    @Test
    public void listDoris() {
        List<Order> orders = dorisService.listDoris();
        orders.stream().forEach(System.out::println);
    }

    @Test
    public void addDoris() throws InterruptedException {
        for (int i = 100; i > 0; i--) {
            Order order = new Order();
            order.setId(Long.valueOf(i));
            order.setPrice(new BigDecimal(i));
            order.setTitle(i + "元");
            dorisService.add(order);
        }
        Thread.sleep(100000);
    }
}

II. SpringBoot + Flink CDC + doris-connector Real-Time Synchronization

The MySQL-client style synchronization above is simple to use, but it is not suited to real-time synchronization of large volumes of data and its performance is limited. Doris officially provides doris-connector for reading from and writing to Doris, so we can use Flink CDC to listen for MySQL data changes, bring them into the SpringBoot application, process the data, and then write it to Doris through doris-connector. We previously covered synchronizing MySQL to Doris directly with Flink CDC, but that approach gives us no chance to process the data along the way.

1. Set Up the Project and Import Dependencies

Version compatibility (the compatibility matrix image from the original post is not reproduced here)

  • flink-connector-mysql-cdc: Flink CDC, listens to MySQL in real time for full or incremental synchronization
  • flink-doris-connector: the connector used to read from and write to Doris
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.13.6</flink.version>
</properties>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.13</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- mysql-cdc -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.18</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <!-- flink doris connector -->
    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>flink-doris-connector-1.13_2.12</artifactId>
        <version>1.0.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

2. Define the Message Deserializer

Define a deserializer; once we receive change data from MySQL, the deserializer formats it into the shape we want.

/**
 * @desc Custom deserialization of MySQL change messages
 **/
public class MysqlDeserialization implements DebeziumDeserializationSchema<String> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize the record into a JSON string describing the change
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) {
        // Get the value struct of the change record
        Struct struct = (Struct) sourceRecord.value();
        // Emit the "after" image as a JSON string
        collector.collect(getJsonObject(struct, AFTER).toJSONString());
    }

    /**
     * Extract the before- or after-image from the raw record
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

3. MySQL Listener

The listener's job is to watch MySQL for data changes, run them through the deserializer, and then write them to Doris.

/**
 * @desc MySQL change listener
 **/
@Component
public class MysqlEventListener implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Configure the MySQL CDC source
        DebeziumSourceFunction<String> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStream<String> streamSource = env
                .addSource(dataChangeInfoMySqlSource, "mysql-source")
                .setParallelism(1);
        //streamSource.addSink(dataChangeSink);

        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");

        // Configure the Doris sink
        streamSource.addSink(
                DorisSink.sink(
                        DorisExecutionOptions.builder()
                                .setBatchSize(3)
                                .setBatchIntervalMs(10L)
                                .setMaxRetries(3)
                                .setStreamLoadProp(pro)
                                .setEnableDelete(true)
                                .build(),
                        DorisOptions.builder()
                                .setFenodes("192.168.220.253:8030")
                                .setTableIdentifier("app_db.orders")
                                .setUsername("root")
                                .setPassword("123456")
                                .build()));
        env.execute("mysql-stream-cdc");
    }

    /**
     * Build the change-data source
     */
    private DebeziumSourceFunction<String> buildDataChangeSource() {
        return MySqlSource.<String>builder()
                .hostname("192.168.220.253")
                .port(3307)
                .databaseList("app_db")
                .tableList("app_db.orders")
                .username("root")
                .password("123456")
                /* initial: take a snapshot first (full load), then read incrementally (pick up new changes)
                 * latest: incremental only (do not read historical changes)
                 * timestamp: read data from the given timestamp onwards
                 */
                .startupOptions(StartupOptions.latest())
                // Deserializer
                .deserializer(new MysqlDeserialization())
                .serverTimeZone("GMT+8")
                .build();
    }
}
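The stream above is written to Doris exactly as the deserializer emits it. Since the point of routing the data through the SpringBoot application is to be able to process it first, a transformation can be inserted between the source and the sink. Below is a minimal sketch, assuming a hypothetical enrich() helper that you would implement yourself; only the map step is new, the source and sink stay as shown above.

// Hedged sketch: process each change record before it reaches the Doris sink.
// Requires: import org.apache.flink.api.common.functions.MapFunction;
DataStream<String> processed = streamSource
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String json) throws Exception {
                // enrich() is a hypothetical helper: clean fields, convert types, add derived columns, etc.
                return enrich(json);
            }
        });

// Then register the same Doris sink on `processed` instead of `streamSource`:
// processed.addSink(DorisSink.sink(executionOptions, dorisOptions));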

4. Finally, Write a Startup Class

@SpringBootApplication
public class FlinkCdcApplication {
    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcApplication.class, args);
    }
}

5. Prepare the Databases

My MySQL source table is app_db.orders, which maps to app_db.orders in Doris; the column names and types must match. Start the application, modify data in MySQL, and the change is synchronized to Doris automatically.
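To verify the pipeline end to end, you can write a row into the MySQL source table and then read it back from Doris over the MySQL protocol. Below is a minimal JDBC sketch; the hosts, ports, credentials, and sample values reuse the configuration shown earlier and are assumptions about your environment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SyncSmokeTest {
    public static void main(String[] args) throws Exception {
        // 1. Insert into the MySQL source table (port 3307, as configured in the CDC source).
        try (Connection mysql = DriverManager.getConnection(
                "jdbc:mysql://192.168.220.253:3307/app_db", "root", "123456");
             Statement st = mysql.createStatement()) {
            st.executeUpdate("INSERT INTO orders(id, price, title) VALUES (1001, 9.99, 'test')");
        }

        Thread.sleep(5000); // give Flink CDC and the stream load batch a moment to flush

        // 2. Read it back from Doris through the FE query port (9030, MySQL protocol).
        try (Connection doris = DriverManager.getConnection(
                "jdbc:mysql://192.168.220.253:9030/app_db", "root", "123456");
             Statement st = doris.createStatement();
             ResultSet rs = st.executeQuery("SELECT id, price, title FROM orders WHERE id = 1001")) {
            while (rs.next()) {
                System.out.println(rs.getLong("id") + " " + rs.getBigDecimal("price") + " " + rs.getString("title"));
            }
        }
    }
}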

III. Other Synchronization Methods

Doris also supports other ways of reading and writing data; you can explore them from the official examples: https://doris.apache.org/zh-CN/docs/1.2/ecosystem/spark-doris-connector

Tags: Doris, Big Data

This article is reposted from: https://blog.csdn.net/u014494148/article/details/142320091
Copyright belongs to the original author 墨家巨子@俏如来. If there is any infringement, please contact us for removal.
