0


大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

可以通过Flink操作(读取、插入、修改、删除)支持存储在Doris中的数据。本文介绍了如何通过Datastream和Flink操作Doris。

注意:

  • 修改和删除仅支持唯一键模型。
  • 当前的删除是为了支持Flink CDC访问数据以实现自动删除。如果要删除其他数据访问方法,您需要自行实现。

一、版本兼容性

在这里插入图片描述

二、使用

Maven

添加 flink-doris-connector

<!-- flink-doris-connector --><dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>1.6.0</version></dependency>
  • 请根据不同的Flink版本替换相应的Connector和Flink依赖版本。
  • 也可以从这里下载相关版本的jar包。

flink-doris-connector下载地址:

编译

  • 编译时直接运行sh build.sh即可。
  • 编译成功后,会在dist目录下生成目标jar包,如:flink-doris-connector-1.5.0-SNAPSHOT.jar。将此文件复制到 Flink 的类路径中以使用 Flink-Doris-Connector。例如,Flink 运行在 Local 模式,则将此文件放在 lib/ 文件夹中。 Flink运行在Yarn集群模式下,将此文件放入预部署包中。

三、Flink SQL

read

-- doris sourceCREATETABLE flink_doris_source (
     name STRING,
     age INT,
     price DECIMAL(5,2),
     sale DOUBLE)WITH('connector'='doris','fenodes'='FE_IP:HTTP_PORT','table.identifier'='database.table','username'='root','password'='password');

write

--enable checkpoint
SET 'execution.checkpointing.interval'='10s';

-- doris sink
CREATE TABLE flink_doris_sink (
     name STRING,
     age INT,
     price DECIMAL(5,2),
     sale DOUBLE
     )
     WITH ('connector'='doris',
       'fenodes'='FE_IP:HTTP_PORT',
       'table.identifier'='db.table',
       'username'='root',
       'password'='password',
       'sink.label-prefix'='doris_label');

-- submit insert job
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

四、DataStream

read

DorisOptions.Builder builder =DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource =DorisSource.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(newSimpleListDeserializationSchema()).build();

env.fromSource(dorisSource,WatermarkStrategy.noWatermarks(),"doris source").print();

write

DorisSink通过StreamLoad向Doris写入数据,DataStream写入时支持不同的序列化方式

字符串数据流(SimpleStringSerializer)

// enable checkpoint
env.enableCheckpointing(10000);// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);DorisSink.Builder<String> builder =DorisSink.builder();DorisOptions.Builder dorisBuilder =DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");Properties properties =newProperties();// When the upstream is writing json, the configuration needs to be enabled.//properties.setProperty("format", "json");//properties.setProperty("read_json_by_line", "true");DorisExecutionOptions.Builder executionBuilder =DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris")//streamload label prefix.setDeletable(false).setStreamLoadProp(properties);;

builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(newSimpleStringSerializer())//serialize according to string.setDorisOptions(dorisBuilder.build());//mock string sourceList<Tuple2<String,Integer>> data =newArrayList<>();
data.add(newTuple2<>("doris",1));DataStreamSource<Tuple2<String,Integer>> source = env.fromCollection(data);

source.map((MapFunction<Tuple2<String,Integer>,String>) t -> t.f0 +"\t"+ t.f1).sinkTo(builder.build());//mock json string source//env.fromElements("{\"name\":\"zhangsan\",\"age\":1}").sinkTo(builder.build());

RowData数据流(RowDataSerializer)

// enable checkpoint
env.enableCheckpointing(10000);// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);//doris sink optionDorisSink.Builder<RowData> builder =DorisSink.builder();DorisOptions.Builder dorisBuilder =DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");// json format to streamloadProperties properties =newProperties();
properties.setProperty("format","json");
properties.setProperty("read_json_by_line","true");DorisExecutionOptions.Builder executionBuilder =DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris")//streamload label prefix.setDeletable(false).setStreamLoadProp(properties);//streamload params//flink rowdata's schemaString[] fields ={"city","longitude","latitude","destroy_date"};DataType[] types ={DataTypes.VARCHAR(256),DataTypes.DOUBLE(),DataTypes.DOUBLE(),DataTypes.DATE()};

builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(RowDataSerializer.builder()//serialize according to rowdata.setFieldNames(fields).setType("json")//json format.setFieldType(types).build()).setDorisOptions(dorisBuilder.build());//mock rowdata sourceDataStream<RowData> source = env.fromElements("").map(newMapFunction<String,RowData>(){@OverridepublicRowDatamap(String value)throwsException{GenericRowData genericRowData =newGenericRowData(4);
             genericRowData.setField(0,StringData.fromString("beijing"));
             genericRowData.setField(1,116.405419);
             genericRowData.setField(2,39.916927);
             genericRowData.setField(3,LocalDate.now().toEpochDay());return genericRowData;}});

source.sinkTo(builder.build());

SchemaChange数据流(JsonDebeziumSchemaSerializer)

// enable checkpoint
env.enableCheckpointing(10000);Properties props =newProperties();
props.setProperty("format","json");
props.setProperty("read_json_by_line","true");DorisOptions dorisOptions =DorisOptions.builder().setFenodes("127.0.0.1:8030").setTableIdentifier("test.t1").setUsername("root").setPassword("").build();DorisExecutionOptions.Builder executionBuilder =DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix").setStreamLoadProp(props).setDeletable(true);DorisSink.Builder<String> builder =DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setDorisOptions(dorisOptions).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),"MySQL Source").sinkTo(builder.build());

五、Lookup Join

CREATETABLE fact_table (`id`BIGINT,`name` STRING,`city` STRING,`process_time`as proctime())WITH('connector'='kafka',...);createtable dim_city(`city` STRING,`level`INT,`province` STRING,`country` STRING
)WITH('connector'='doris','fenodes'='127.0.0.1:8030','jdbc-url'='jdbc:mysql://127.0.0.1:9030','table.identifier'='dim.dim_city','username'='root','password'='');SELECT a.id, a.name, a.city, c.province, c.country,c.levelFROM fact_table a
LEFTJOIN dim_city FOR SYSTEM_TIME ASOF a.process_time AS c
ON a.city = c.city
  • 这个命令是用于创建两个表和一个查询语句。第一个表是名为"fact_table"的表,它有四个列,分别为"id"、“name”、“city"和"process_time”。其中,"id"列是BIGINT类型,"name"和"city"列是STRING类型,"process_time"列是一个基于当前系统时间的计算列,它使用"proctime()"函数实现。
  • 第二个表是名为"dim_city"的表,它有四个列,分别为"city"、“level”、“province"和"country”。其中,“city”、“province"和"country"列是STRING类型,“level"列是INT类型。该表使用"Doris"作为存储引擎,连接器为"connector”,并且需要指定连接器的其他参数,如"fenodes”、“jdbc-url”、“table.identifier”、"username"和"password"等。
  • 最后一个命令是一个查询语句,它使用"LEFT JOIN"将"fact_table"和"dim_city"两个表进行连接,并使用"FOR SYSTEM_TIME AS OF"来指定连接时的时间戳,这里使用了"process_time"列的值。查询结果包括"id"、“name”、“city”、“province”、"country"和"level"这些列。

六、配置

通用配置项

fenodes:

  • Doris FE http地址,支持多个地址,用逗号分隔

benodes:

  • Doris BE http地址,支持多个地址,以逗号分隔。

jdbc-url:

  • jdbc连接信息,如:jdbc:mysql://127.0.0.1:9030

table.identifier:

  • Doris表名,如:db.tbl

auto-redirect:

  • 默认值:true
  • 是否重定向 StreamLoad 请求。开启后StreamLoad会通过FE写入,不再显示BE信息。

doris.request.retries:

  • 默认值:3
  • 向 Doris 发送请求的重试次数

doris.request.connect.timeout.ms:

  • 默认值:30000
  • 向 Doris 发送请求的连接超时

doris.request.read.timeout.ms:

  • 默认值:30000
  • 读取向 Doris 发送请求的超时

源配置项

doris.request.query.timeout.s:

  • 默认值:3600
  • 查询Doris的超时时间,默认1小时,-1表示无超时限制

doris.request.tablet.size:

  • 默认值:Integer. MAX_VALUE
  • 一个Partition对应的Doris Tablet数量。该值设置得越小,生成的Partition就越多。这提高了 Flink 端的并行度,但同时也给 Doris 带来了更大的压力。

doris.batch.size:

  • 默认值:1024
  • 一次从BE读取数据的最大行数。增加该值会减少 Flink 和 Doris 之间建立的连接数量。从而减少网络延迟带来的额外时间开销。

doris.exec.mem.limit:

  • 默认值:2147483648
  • 单个查询的内存限制。默认为2GB,以字节为单位

doris.deserialize.arrow.async:

  • 默认值:FALSE
  • 是否支持flink-doris-connector迭代所需的Arrow格式异步转换为RowBatch

doris.deserialize.queue.size:

  • 默认值:64
  • Arrow格式的内部处理队列的异步转换,当doris.deserialize.arrow.async为true时有效

doris.read.field:

  • 读取Doris表的列名列表,以逗号分隔

doris.filter.query:

  • 过滤读取数据的表达式,这个表达式透明传递给Doris。 Doris使用这个表达式来完成源端的数据过滤。例如年龄=18。

接收器配置项

sink.label-prefix:

  • Stream加载导入使用的标签前缀。在2pc场景下,需要全局唯一性来保证Flink的EOS语义。

sink.properties.*:

  • 导入流负载参数。 例如: ‘sink.properties.column_separator’ = ', ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符, ‘\x01’ 将转换为二进制 0x01
  • JSON格式导入 ‘sink.properties.format’ = ‘json’ ‘sink.properties.按行读取 json’ = ‘true’ 详细参数请参考这里。

sink.enable-delete:

  • 默认值:TRUE
  • 是否启用删除。该选项需要Doris表开启批量删除功能(Doris 0.15+版本默认开启),且仅支持Unique模型。

sink.enable-2pc:

  • 默认值:TRUE
  • 是否启用两阶段提交(2pc),默认为true,以保证Exactly-Once语义。

sink.buffer-size:

  • 默认值:1MB
  • 写入数据缓存缓冲区的大小,以字节为单位。不建议修改,默认配置即可

sink.buffer-count:

  • 默认值:3
  • 写入数据缓冲区的数量。不建议修改,默认配置即可

sink.max-retries:

  • 默认值:3
  • Commit失败后最大重试次数,默认3

sink.use-cache:

  • 默认值:false
  • 发生异常时,是否使用内存缓存进行恢复。启用后,Checkpoint 期间的数据将保留在缓存中。

sink.enable.batch-mode:

  • 默认值:false
  • 是否使用批处理模式写入Doris。使能后,写入时序不依赖于Checkpoint。写入是通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数控制的。输入机会。 同时开启后,Exactly-once语义将无法保证。 Uniq模型可以用来实现幂等性。

sink.flush.queue-size:

  • 默认值:2
  • 在批处理模式下,缓存的列大小。

sink.buffer-flush.max-rows:

  • 默认值:50000
  • 批处理模式下,单批写入的最大数据行数。

sink.buffer-flush.max-bytes:

  • 默认值:10MB
  • 在批处理模式下,单批写入的最大字节数。

sink.buffer-flush.interval:

  • 默认值:10s
  • 批处理模式下,异步刷新缓存的时间间隔

sink.ignore.update-before:

  • 默认值:true
  • 是否忽略update-before事件,默认忽略。

查找Join配置项

lookup.cache.max-rows

  • 默认值:-1
  • 查找缓存的最大行数,默认值为-1,不启用缓存

lookup.cache.ttl:

  • 默认值:10s
  • 查找缓存的最大时间,默认10s

lookup.max-retries:

  • 默认值:1
  • 查找查询失败后重试的次数

lookup.jdbc.async:

  • 默认值:false
  • 是否启用异步查找,默认为false

lookup.jdbc.read.batch.size:

  • 默认值:128
  • 异步查找下,每个查询的最大批量大小

lookup.jdbc.read.batch.queue-size:

  • 默认值:256
  • 异步查找时中间缓冲队列的大小

lookup.jdbc.read.thread-size:

  • 默认值:3
  • 每个任务中用于查找的jdbc线程数

七、Doris 和 Flink 列类型映射

Doris类型Flink类型NULL_TYPENULLBOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDOUBLEDOUBLEDATEDATEDATETIMETIMESTAMPDECIMALDECIMALCHARSTRINGLARGEINTSTRINGVARCHARSTRINGSTRINGSTRINGDECIMALV2DECIMALARRAYARRAYMAPMAPJSONSTRINGVARIANTSTRINGIPV4STRINGIPV6STRING
从connector-1.6.1版本开始,增加了对Variant、IPV6、IPV4三种数据类型读取的支持。读取 IPV6 和 Variant 需要 Doris 2.1.1 或更高版本。

八、使用Flink CDC访问Doris的示例

SET 'execution.checkpointing.interval'='10s';
CREATE TABLE cdc_mysql_source (id int
  ,name VARCHAR
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector'='mysql-cdc',
 'hostname'='127.0.0.1',
 'port'='3306',
 'username'='root',
 'password'='password',
 'database-name'='database',
 'table-name'='table');

-- Support synchronous insert/update/delete events
CREATE TABLE doris_sink (id INT,
name STRING
) 
WITH ('connector'='doris',
  'fenodes'='127.0.0.1:8030',
  'table.identifier'='database.table',
  'username'='root',
  'password'='',
  'sink.properties.format'='json',
  'sink.properties.read_json_by_line'='true',
  'sink.enable-delete'='true', -- Synchronize delete events
  'sink.label-prefix'='doris_label');

insert into doris_sink select id,name from cdc_mysql_source;

九、使用FlinkSQL通过CDC访问并实现部分列更新的示例

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';CREATETABLE cdc_mysql_source (
   id int,name STRING,bank STRING,age int,PRIMARYKEY(id)NOTENFORCED)WITH(
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1','port'='3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);CREATETABLE doris_sink (
    id INT,
    name STRING,
    bank STRING,
    age int)WITH(
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'database.table',
  'username' = 'root',
  'password' = '',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial_columns' = 'true' --Enable partial column updates
);

insert into doris_sink select id,name,bank,age from cdc_mysql_source;

十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)

MySQL同步示例

<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s\-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools\
     lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
     mysql-sync-database\--database test_db \
     --mysql-conf hostname=127.0.0.1 \
     --mysql-conf port=3306\
     --mysql-conf username=root \
     --mysql-conf password=123456\
     --mysql-conf database-name=mysql_db \
     --including-tables "tbl1|test.*"\
     --sink-conf fenodes=127.0.0.1:8030 \
     --sink-conf username=root \
     --sink-conf password=123456\
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1

Oracle同步示例

<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools \
      ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\
      oracle-sync-database \--database test_db \
      --oracle-conf hostname=127.0.0.1 \
      --oracle-conf port=1521\
      --oracle-conf username=admin \
      --oracle-conf password="password"\
      --oracle-conf database-name=XE \
      --oracle-conf schema-name=ADMIN \
      --including-tables "tbl1|tbl2"\
      --sink-conf fenodes=127.0.0.1:8030 \
      --sink-conf username=root \
      --sink-conf password=\
      --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
      --sink-conf sink.label-prefix=label \
      --table-conf replication_num=1

PostgreSQL 同步示例

<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools \
     ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
     postgres-sync-database \--database db1\
     --postgres-conf hostname=127.0.0.1 \
     --postgres-conf port=5432\
     --postgres-conf username=postgres \
     --postgres-conf password="123456"\
     --postgres-conf database-name=postgres \
     --postgres-conf schema-name=public \
     --postgres-conf slot.name=test \
     --postgres-conf decoding.plugin.name=pgoutput \
     --including-tables "tbl1|tbl2"\
     --sink-conf fenodes=127.0.0.1:8030 \
     --sink-conf username=root \
     --sink-conf password=\
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1

SQLServer同步示例

<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools \
     ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
     sqlserver-sync-database \--database db1\
     --sqlserver-conf hostname=127.0.0.1 \
     --sqlserver-conf port=1433\
     --sqlserver-conf username=sa \
     --sqlserver-conf password="123456"\
     --sqlserver-conf database-name=CDC_DB \
     --sqlserver-conf schema-name=dbo \
     --including-tables "tbl1|tbl2"\
     --sink-conf fenodes=127.0.0.1:8030 \
     --sink-conf username=root \
     --sink-conf password=\
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1

十一、使用FlinkCDC更新Key列

一般来说,在业务数据库中,数字被用作表的主键,例如学生表中使用数字(id)作为主键,但随着业务的发展,与数据对应的数字可能会发生变化。在这种情况下,使用FlinkCDC + Doris Connector进行数据同步可以自动更新Doris主键列中的数据。

原理

Flink CDC的底层采集工具是Debezium。Debezium内部使用op字段来识别相应的操作:op字段的值为c、u、d和r,分别对应创建、更新、删除和读取。对于主键列的更新,FlinkCDC将向下游发送DELETE和INSERT事件,在数据同步到Doris后,会自动更新主键列的数据。

示例

Flink程序可以参考上述CDC同步示例。任务成功提交后,在MySQL端执行更新主键列语句(update student set id = ‘1002’ where id = ‘1001’),以修改Doris中的数据。

十二、使用Flink根据指定的列删除数据

通常,Kafka中的消息使用特定的字段来标记操作类型,例如{“op_type”:“delete”,data:{…}}。对于这种类型的数据,希望删除op_type=delete的数据。

默认情况下,DorisSink将根据RowKind来区分事件类型。通常,在cdc的情况下,可以直接获取事件类型,并将隐藏列__DORIS_DELETE_SIGN__赋值以实现删除的目的,而Kafka需要基于业务逻辑进行判断,显示传递给隐藏列的值。

-- Such as upstream data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH ('connector'='kafka',
  ...
);

CREATE TABLE DORIS_SINK(id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH ('connector'='doris',
  'fenodes'='127.0.0.1:8030',
  'table.identifier'='db.table',
  'username'='root',
  'password'='',
  'sink.enable-delete'='false',        -- false means not to get the event type from RowKind
  'sink.properties.columns'='id, name, __DORIS_DELETE_SIGN__'  -- Display the importcolumn of the specified streamload
);

INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
from KAFKA_SOURCE;
  • 这段代码是一个示例,演示了如何使用Flink从Kafka源表读取数据,并将其写入Doris目标表。具体来说,如果源表中的数据op_type字段的值为"delete",则希望在Doris目标表中删除相应的数据。
  • 首先,在Kafka源表的定义中,我们有一个data字段用于存储源数据的JSON字符串,以及一个op_type字段用于标识操作类型。
  • 然后,在Doris目标表的定义中,我们有一个id字段和一个name字段来存储数据的具体内容,还有一个名为__DORIS_DELETE_SIGN__的隐藏列,用于标识是否要进行删除操作。
  • 在INSERT INTO语句中,我们将从Kafka源表中选择data字段的id和name,并使用json_value函数提取相应的值。同时,我们使用if函数将op_type字段的值与"delete"进行比较,如果相等则将__DORIS_DELETE_SIGN__赋值为1,否则赋值为0。
  • 最后,将处理后的数据插入到Doris目标表中。
  • 总之,这段代码的作用是根据源表中的op_type字段值,将对应的数据删除或写入到Doris目标表中。

十三、最佳实践应用场景

  • 使用Flink Doris Connector最适合的场景是实时/批量将源数据同步到Doris(Mysql、Oracle、PostgreSQL)中,然后使用Flink对Doris和其他数据源中的数据进行联合分析。您还可以使用Flink Doris Connector。

其他注意事项:

  • Flink Doris Connector主要依赖于Checkpoint进行流式写入,因此Checkpoint之间的时间间隔就是数据的可见延迟时间。
  • 为了确保Flink的Exactly Once语义,Flink Doris Connector默认启用两阶段提交,Doris在1.1版本之后默认启用两阶段提交。1.0可以通过修改BE参数来启用。

十四、常见问题解答

Doris Source读取数据后,为什么流会结束?

  • 目前Doris Source是有界流,不支持CDC读取。

Flink能否读取Doris并执行条件下推?

  • 通过配置doris.filter.query参数可以实现。

如何写入位图类型?

CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH ('connector'='doris',
   'fenodes'='127.0.0.1:8030',
   'table.identifier'='test.bitmap_test',
   'username'='root',
   'password'='',
   'sink.label-prefix'='doris_label',
   'sink.properties.columns'='dt,page,user_id,user_id=to_bitmap(user_id)')

errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

  • 在Exactly-Once场景中,Flink Job必须从最新的Checkpoint/Savepoint重新启动,否则会报告上述错误。当不需要Exactly-Once时,可以通过关闭2PC提交(sink.enable-2pc=false)或更改不同的sink.label-prefix来解决。

errCode = 2, detailMessage = transaction [19650] not found

  • 发生在Commit阶段,Checkpoint中记录的事务ID在FE端已过期,在此时再次提交时会出现上述错误。此时无法从Checkpoint启动,可以通过修改fe.conf中的streaming_label_keep_max_second配置来延长过期时间,默认为12小时。

errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

  • 这是因为同一库的并发导入超过了100,可以通过调整fe.conf的max_running_txn_num_per_db参数来解决。详细信息,请参考max_running_txn_num_per_db。
  • 同时,如果一个任务频繁修改标签并重新启动,也可能导致此错误发生。在2pc场景(重复/聚合模型)中,每个任务的标签需要唯一,在从Checkpoint重新启动时,Flink任务将主动中止之前已经成功预提交但未提交的事务。频繁修改标签并重新启动将导致大量已成功预提交的事务无法中止,占用事务。在Unique模型下,也可以关闭2pc,实现幂等写入。

当Flink向Uniq模型写入一批数据时,如何确保数据的顺序?

  • 您可以添加序列列的配置来确保顺序。

Flink任务没有报错,但数据无法同步?

  • 在Connector1.1.0之前,数据是批量写入的,并且写入是由数据驱动的。需要确定上游是否有数据写入。在1.1.0之后,它依赖于Checkpoint,并且必须启用Checkpoint才能进行写入。

tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

  • 通常发生在Connector1.1.0之前,这是因为写入频率过快,导致版本过多。可以通过设置sink.batch.size和sink.batch.interval参数来减少Streamload的频率。

源表和Doris表应如何对应?

  • 在使用Flink Connector导入数据时,需要注意两个方面。第一,源表的列和类型应与Flink SQL中的列和类型对应;第二,Flink SQL中的列和类型必须与Doris表的列和类型匹配。

TApplicationException: get_next failed: out of sequence response: expected 4 but got 3

  • 这是由于 Thrift 中的并发错误造成的。建议您尽可能使用最新的连接器和兼容的 Flink 版本。

DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc

  • 您可以在TaskManager中搜索日志中止事务响应,并根据HTTP返回码判断是客户端问题还是服务器问题。

org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. “xx” encountered at row x, column xx

  • 这个问题主要是由于条件中的varchar/string类型,需要进行引号转义。正确的写法是xxx = ‘‘xxx’’。这样,Flink SQL解析器会将连续的两个单引号解释为一个单引号字符,而不是字符串的结束,并将拼接的字符串作为属性的值。例如:t1 ‍>= ‘2024-01-01’ 可以写为 ‘doris.filter.query’ = ‘t1 ‍>=’‘2024-01-01’'。

Failed to connect to backend: http://host:webserver_port, and BE is still alive

  • 这个问题可能是由于配置了be的IP地址,而该地址无法被外部的Flink集群访问。这主要是因为在连接fe时,be的地址是通过fe进行解析的。例如,如果将be地址添加为’127.0.0.1’,那么Flink集群通过fe获取到的be地址将是’127.0.0.1:webserver_port’,并且Flink将连接到该地址。当出现这种问题时,可以通过将be的实际对应的外部IP地址添加到"with"属性中来解决:‘benodes’=“be_ip:webserver_port,be_ip:webserver_port…”。对于整个数据库的同步,可以使用以下属性:–sink-conf benodes=be_ip:webserver,be_ip:webserver…。

当使用Flink-connector将MySQL数据同步到Doris时,时间戳之间存在几小时的时间差。

  • Flink Connector默认使用UTC+8时区从MySQL同步整个数据库。如果您的数据位于不同的时区,您可以使用以下配置进行调整,例如:–mysql-conf debezium.date.format.timestamp.zone=“UTC+3”。

本文转载自: https://blog.csdn.net/zhengzaifeidelushang/article/details/141161699
版权归原作者 最笨的羊羊 所有, 如有侵权,请联系我们删除。

“大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库”的评论:

还没有评论