简介
本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分:
- Flink Doris connector
- Doris FE 节点配置
- Flink SQL 写 Doris
Flink Doris connector
Flink Doris connector 本质是通过Stream Load来时实现数据的查询和写入功能。
支持二阶段提交,可实现Exatly Once的写入。
Doris FE 节点配置
1)需在 apache-doris/fe/fe.conf 配置文件添加如下配置:
enable_http_server_v2 = true
- 重启 FE 节点
apache-doris/fe/bin/stop_fe.sh
apache-doris/fe/bin/start_fe.sh --daemon
Doris BE 节点配置
1)需在 apache-doris/be/be.conf 配置文件添加如下配置:
enable_stream_load_record = true
- 重启 BE 节点
apache-doris/be/bin/stop_be.sh
apache-doris/be/bin/start_be.sh --daemon
Doris 建表语句
CREATETABLE order_info (
order_date dateNOTNULLCOMMENT'下单日期',
order_id int(11)NOTNULLCOMMENT'订单id',
buy_num tinyint(4)NULLCOMMENT'购买件数',
user_id int(11)NULLCOMMENT'[-9223372036854775808, 9223372036854775807]',
create_time datetimeNULLCOMMENT'创建时间',
update_time datetimeNULLCOMMENT'更新时间')ENGINE=OLAP
DUPLICATEKEY(order_date, order_id)COMMENT'OLAP'DISTRIBUTEDBYHASH(order_id) BUCKETS 2
PROPERTIES ("replication_allocation"="tag.location.default: 1");
本例使用的明细模型,仅insert操作,如需update/delete,则需选择Unique模型
生成测试数据
通过Flink SQL自带的datagen生成测试数据:
CREATETABLE order_info_source (
order_date DATE,
order_id INT,
buy_num INT,
user_id INT,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3))WITH('connector'='datagen','rows-per-second'='10','fields.order_id.min'='30001','fields.order_id.max'='30500','fields.user_id.min'='10001','fields.user_id.max'='20001','fields.buy_num.min'='10','fields.buy_num.max'='20','number-of-rows'='100')
datagen参数:
'rows-per-second' = '10'
: 每秒发送10条数据
'fields.order_id.min' = '30001'
: order_id最小值为30001
'fields.order_id.max' = '30500'
: order_id最大值为30500
'fields.user_id.min' = '10001'
: user_id最小值为10001
'fields.user_id.max' = '20001'
: user_id最大值为20001
'fields.buy_num.min' = '10'
: buy_num最小值为10
'fields.buy_num.max' = '20'
: buy_num最大值为20
'number-of-rows' = '100'
: 共发送100条数据, 不设置的话会无限量发送数据
更多细节,请前往官网DataGen SQL Connector
注册Doris Sink表
CREATETABLE order_info_sink (
order_date DATE,
order_id INT,
buy_num INT,
user_id INT,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3))WITH('connector'='doris','fenodes'='192.168.56.104:8030','table.identifier'='test.order_info_example','username'='test','password'='password123','sink.label-prefix'='sink_doris_label_8')
写入Doris Sink表
insertinto order_info_sink select*from order_info_source
通过Mysql客户端查看Doris表的数据
mysql>select*from test.order_info_example limit10;+------------+----------+---------+---------+---------------------+---------------------+| order_date | order_id | buy_num | user_id | create_time | update_time |+------------+----------+---------+---------+---------------------+---------------------+|2022-09-22|30007|10|10560|2022-09-2207:42:21|2022-09-2207:42:21||2022-09-22|30125|16|17591|2022-09-2207:42:26|2022-09-2207:42:26||2022-09-22|30176|17|10871|2022-09-2207:42:24|2022-09-2207:42:24||2022-09-22|30479|16|19847|2022-09-2207:42:25|2022-09-2207:42:25||2022-09-22|30128|16|19807|2022-09-2207:42:24|2022-09-2207:42:24||2022-09-22|30039|13|18237|2022-09-2207:42:28|2022-09-2207:42:28||2022-09-22|30060|10|18309|2022-09-2207:42:24|2022-09-2207:42:24||2022-09-22|30246|18|10855|2022-09-2207:42:24|2022-09-2207:42:24||2022-09-22|30288|19|12347|2022-09-2207:42:26|2022-09-2207:42:26||2022-09-22|30449|17|11488|2022-09-2207:42:23|2022-09-2207:42:23|+------------+----------+---------+---------+---------------------+---------------------+10rowsinset(0.05 sec)
完整代码
src/main/java/FlinkSQLSinkExample.java
版权归原作者 修破立生 所有, 如有侵权,请联系我们删除。