0


Flink 写入 Doris

环境

Doris :doris-2.0.0-alpha1-Unknown
Flink : 1.14.3
Flink-doris-connector :org.apache.doris : flink-doris-connector-1.14_2.12 : 1.1.1

写入方式

Flink DataStraem Api 方式写入 Uniq 模型表

实现

目前 Doris 官网提供两种写入方式:Flink DataStream Api 和 SQL 模式,这里主要使用 Flink DataStream Api 写入。

Flink Connector 写入原理

Flink Connector 写入 Doris 底层仍然采用 Doris Stream Load 方式实现,在原生的基础上结合 Flink 的 Checkpoint 机制和Doris Stream Load 的 two_phase_commit 机制来保证写入的原子性。

  • two_phase_commit在 Doris Stream Load 开启 two_phase_commit 时,Stream Load 导入分为两部分, 数据写入完成即会返回信息给用户,主要信息为此次写入的状态信息,此时数据不可见,事务状态为PRECOMMITTED 状态,输出如下;
[14:59:25:847][INFO] - org.apache.doris.flink.sink.writer.DorisStreamLoad.handlePreCommitResponse(DorisStreamLoad.java:206) - load Result {"TxnId":4121190,
    "Label":"label-doris-1688540300385_2_1",
    "Comment":"",
    "TwoPhaseCommit":"true",
    "Status":"Success",
    "Message":"OK",
    "NumberTotalRows":350,
    "NumberLoadedRows":350,
    "NumberFilteredRows":0,
    "NumberUnselectedRows":0,
    "LoadBytes":726192,
    "LoadTimeMs":59146,
    "BeginTxnTimeMs":0,
    "StreamLoadPutTimeMs":5,
    "ReadDataTimeMs":4,
    "WriteDataTimeMs":59133,
    "CommitAndPublishTimeMs":0}

如果写入失败此处信息中会有错误连接地址,进入连接即可查看错误原因,地址连接类似如下:
http://xxxx/api/_load_error_log?file=__shard_4/error_log_insert_stmt_954ed81f15551cbc-892fd19aacef768b_954ed81f15551cbc_892fd19aacef768b
用户手动触发commit操作之后,数据才可见,输出如下。

[14:59:25:888][INFO] - org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:119) - load result {"status":"Success",
    "msg":"transaction [4121188] commit successfully."}

写入格式

RowData 数据流

主要代码片段:构建Rowdata 数据流

dataStream.process(newProcessFunction<Row,RowData>(){@OverridepublicvoidprocessElement(Row value,Context ctx,Collector<RowData> out)throwsException{// 根据 Row 生成 GenericRowData 数据finalGenericRowData data =newGenericRowData(value.getKind(), value.getArity());// columnAndType 为自己定义的一个类,主要包含 Doris 表对应的字段和字段类型finalString[] columnsName = columnAndType.getColumnsName();finalDataType[] dataTypes = columnAndType.getDataTypes();for(int i =0; i < columnsName.length; i++){finalObject field = value.getFieldAs(columnsName[i]);// 这里需要根据 字段类型转成指定的数据格式 比如 String -> StringData// 否则在写入时会报错 类转换异常 String can not cast StringData// 其他类型同理 这里应为笔者表中只存在两种类型需要转换所以没有补全类型if(dataTypes[i].getLogicalType().getTypeRoot().equals(LogicalTypeRoot.DECIMAL)){
                        data.setField(i,DecimalData.fromBigDecimal(newBigDecimal(value.getFieldAs(columnsName[i])==null?"0": value.getFieldAs(columnsName[i]).toString()),22,4));}elseif(dataTypes[i].getLogicalType().getTypeRoot().equals(LogicalTypeRoot.INTEGER)){
                        data.setField(i,value.getFieldAs(columnsName[i]));}elseif(dataTypes[i].getLogicalType().getTypeRoot().equals(LogicalTypeRoot.VARCHAR)){
                        data.setField(i,StringData.fromString(value.getFieldAs(columnsName[i])==null?"": value.getFieldAs(columnsName[i]).toString()));}else{
                        data.setField(i,StringData.fromString(value.getFieldAs(columnsName[i]).toString()));}}

                out.collect(data);}}).name("TRANS").uid("TRANS")

主要代码片段: 构建 RowData Doris Sink

DorisSink.Builder<RowData> builder =DorisSink.builder();DorisOptions.Builder dorisBuilder =DorisOptions.builder();// Fe 地址
        dorisBuilder.setFenodes("xxxx:8030")// 表.setTableIdentifier("test_1229.test1")// 用户名.setUsername("root")// 密码.setPassword("");Properties properties =newProperties();
        properties.setProperty("format","json");
        properties.setProperty("read_json_by_line","true");DorisExecutionOptions.Builder  executionBuilder =DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix("label-doris-"+tableName+System.currentTimeMillis())//streamload label prefix.setStreamLoadProp(properties)// 表是从 RowData 中识别 RowKind 已支持删除操作 .setDeletable(true);//streamload params

        builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build())//serialize according to rowdata.setSerializer(RowDataSerializer.builder()// Doris 列名.setFieldNames(columnAndType.getColumnsName())//json format.setType("json").enableDelete(true)// Doris 列类型.setFieldType(columnAndType.getDataTypes()).build()).setDorisOptions(dorisBuilder.build());return builder
                .build();

DorisColumnAndType 类

publicclassDorisColumnAndTypeimplementsSerializable{privateString[] columnsName;privateDataType[]DataTypes;publicString[]getColumnsName(){return columnsName;}publicvoidsetColumnsName(String[] columnsName){this.columnsName = columnsName;}publicDataType[]getDataTypes(){returnDataTypes;}publicvoidsetDataTypes(DataType[] dataTypes){DataTypes= dataTypes;}@OverridepublicStringtoString(){return"DorisColumnAndType{"+"columnsName="+Arrays.toString(columnsName)+", DataTypes="+Arrays.toString(DataTypes)+'}';}}

SchemaChange 数据流

如果上游 Kafka 数据为 debezium-json 格式建议使用该方式写入较为方便。

主要代码片段:构建 Doris Sink

Properties props =newProperties();
props.setProperty("format","json");
props.setProperty("read_json_by_line","true");DorisOptions dorisOptions =DorisOptions.builder()// FE 地址.setFenodes("xxxxx:8030")// Doris 库名+表明.setTableIdentifier("test_1229.test1")// Doris 用户名.setUsername("root")// Doris 密码.setPassword("").build();DorisExecutionOptions.Builder  executionBuilder =DorisExecutionOptions.builder();// 设置 LabelPrefix 即 Stream Load LabelPrefix
executionBuilder.setLabelPrefix("label-doris"+System.currentTimeMillis()).setStreamLoadProp(props)// 是否识别数据中的删除标识,即 识别 —D 的数据进行删除.setDeletable(true);DorisSink.Builder<String> builder =DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setDorisOptions(dorisOptions).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());finalDorisSink<String> dorisSink = builder.build();

Flink 任务 UI 大致如下,不同的处理 UI 图可能不一样:
image.png

写入异常分析

  1. Reason: column(DORIS_DELETE_SIGN) values is null while columns is not nullable. src line [2023070520230705H701010018100190512303096D NULL NULL];

DORIS_DELETE_SIGN 该字段为 Doris 隐藏列, DorisSink默认会根据 RowKind 来区分事件的类型,通常这种在 cdc 情况下可以直接获取到事件类型,对隐藏列__DORIS_DELETE_SIGN__进行赋值达到删除的目的,而Kafka则需要根据业务逻辑判断,显示的传入隐藏列的值。在上述的

executionBuilder.setLabelPrefix("label-doris"+System.currentTimeMillis()).setStreamLoadProp(props)// 是否识别数据中的删除标识,即 识别 —D 的数据进行删除.setDeletable(true);

如果没有设置 setDeletable 为True,将导致该字段为空从而导致写入失败。

标签: flink 大数据

本文转载自: https://blog.csdn.net/weixin_39750695/article/details/131558387
版权归原作者 本旺 所有, 如有侵权,请联系我们删除。

“Flink 写入 Doris”的评论:

还没有评论