0


Apache Doris 系列: 基础篇-Flink SQL写入Doris

简介

本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分:

  • Flink Doris connector
  • Doris FE 节点配置
  • Flink SQL 写 Doris

Flink Doris connector

Flink Doris connector 本质是通过Stream Load来时实现数据的查询和写入功能。
支持二阶段提交,可实现Exatly Once的写入。

Doris FE 节点配置

1)需在 apache-doris/fe/fe.conf 配置文件添加如下配置:

enable_http_server_v2 = true
  1. 重启 FE 节点
apache-doris/fe/bin/stop_fe.sh
apache-doris/fe/bin/start_fe.sh --daemon

Doris BE 节点配置

1)需在 apache-doris/be/be.conf 配置文件添加如下配置:

enable_stream_load_record = true
  1. 重启 BE 节点
apache-doris/be/bin/stop_be.sh
apache-doris/be/bin/start_be.sh --daemon

Doris 建表语句

CREATETABLE order_info (
  order_date dateNOTNULLCOMMENT'下单日期',
  order_id int(11)NOTNULLCOMMENT'订单id',
  buy_num tinyint(4)NULLCOMMENT'购买件数',
  user_id int(11)NULLCOMMENT'[-9223372036854775808, 9223372036854775807]',
  create_time datetimeNULLCOMMENT'创建时间',
  update_time datetimeNULLCOMMENT'更新时间')ENGINE=OLAP
DUPLICATEKEY(order_date, order_id)COMMENT'OLAP'DISTRIBUTEDBYHASH(order_id) BUCKETS 2
PROPERTIES ("replication_allocation"="tag.location.default: 1");

本例使用的明细模型,仅insert操作,如需update/delete,则需选择Unique模型

生成测试数据

通过Flink SQL自带的datagen生成测试数据:

CREATETABLE order_info_source (
    order_date DATE,
    order_id     INT,
    buy_num      INT,
    user_id      INT,
    create_time  TIMESTAMP(3),
    update_time   TIMESTAMP(3))WITH('connector'='datagen','rows-per-second'='10','fields.order_id.min'='30001','fields.order_id.max'='30500','fields.user_id.min'='10001','fields.user_id.max'='20001','fields.buy_num.min'='10','fields.buy_num.max'='20','number-of-rows'='100')

datagen参数:

'rows-per-second' = '10'

: 每秒发送10条数据

'fields.order_id.min' = '30001'

: order_id最小值为30001

'fields.order_id.max' = '30500'

: order_id最大值为30500

'fields.user_id.min' = '10001'

: user_id最小值为10001

'fields.user_id.max' = '20001'

: user_id最大值为20001

'fields.buy_num.min' = '10'

: buy_num最小值为10

'fields.buy_num.max' = '20'

: buy_num最大值为20

'number-of-rows' = '100'

: 共发送100条数据, 不设置的话会无限量发送数据

更多细节,请前往官网DataGen SQL Connector

注册Doris Sink表

CREATETABLE order_info_sink (  
order_date DATE,  
order_id INT,  
buy_num INT,
user_id INT,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3))WITH('connector'='doris','fenodes'='192.168.56.104:8030','table.identifier'='test.order_info_example','username'='test','password'='password123','sink.label-prefix'='sink_doris_label_8')

写入Doris Sink表

insertinto order_info_sink select*from order_info_source

通过Mysql客户端查看Doris表的数据

mysql>select*from  test.order_info_example limit10;+------------+----------+---------+---------+---------------------+---------------------+| order_date | order_id | buy_num | user_id | create_time         | update_time         |+------------+----------+---------+---------+---------------------+---------------------+|2022-09-22|30007|10|10560|2022-09-2207:42:21|2022-09-2207:42:21||2022-09-22|30125|16|17591|2022-09-2207:42:26|2022-09-2207:42:26||2022-09-22|30176|17|10871|2022-09-2207:42:24|2022-09-2207:42:24||2022-09-22|30479|16|19847|2022-09-2207:42:25|2022-09-2207:42:25||2022-09-22|30128|16|19807|2022-09-2207:42:24|2022-09-2207:42:24||2022-09-22|30039|13|18237|2022-09-2207:42:28|2022-09-2207:42:28||2022-09-22|30060|10|18309|2022-09-2207:42:24|2022-09-2207:42:24||2022-09-22|30246|18|10855|2022-09-2207:42:24|2022-09-2207:42:24||2022-09-22|30288|19|12347|2022-09-2207:42:26|2022-09-2207:42:26||2022-09-22|30449|17|11488|2022-09-2207:42:23|2022-09-2207:42:23|+------------+----------+---------+---------+---------------------+---------------------+10rowsinset(0.05 sec)

完整代码

src/main/java/FlinkSQLSinkExample.java


本文转载自: https://blog.csdn.net/weixin_47298890/article/details/127001562
版权归原作者 修破立生 所有, 如有侵权,请联系我们删除。

“Apache Doris 系列: 基础篇-Flink SQL写入Doris”的评论:

还没有评论