Flink 1.13 通过 Stream Load 写入 Doris
前言
官方教程详细介绍了基于 Flink 1.16 的各种写入方式；本文主要介绍基于 Flink 1.13（flink-doris-connector-1.13_2.12 1.0.3），以 RowData 数据流（RowDataSerializer）方式写入 Doris 的方法。
<!-- Build properties: Java 8 bytecode, Flink 1.13.1 -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.1</flink.version>
</properties>

<!-- Doris connector built for Flink 1.13 / Scala 2.12 -->
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
<!-- Blink table planner (provides RowData and related types); supplied by the Flink runtime -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
一、目标表
-- doris sink table
-- Column order and names must match the field list used by the Flink Doris sink:
-- user_id, receive_date, event_type, province, city, receive_time.
CREATE TABLE IF NOT EXISTS events (
    `user_id`      INT          NOT NULL COMMENT '用户id',
    -- Named receive_date (not `date`) because the key, the partition column and the
    -- sink's field list all reference receive_date.
    `receive_date` DATE                  COMMENT '接收日期',
    `event_type`   VARCHAR(256)          COMMENT '事件类型',
    `province`     VARCHAR(128)          COMMENT '省份',
    `city`         VARCHAR(128)          COMMENT '城市',
    `receive_time` DATETIME
)
DUPLICATE KEY(`user_id`, `receive_date`)
COMMENT '事件表'
PARTITION BY RANGE(`receive_date`) (
    -- Batch-create one partition per day for the historical range; later days
    -- are created by the dynamic_partition settings below.
    FROM ("2023-01-01") TO ("2023-09-14") INTERVAL 1 DAY
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "compression" = "LZ4",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "1"
);
二、map
publicclassEventMapFunctionextendsRichMapFunction<Event,RowData>{privatestaticfinalLogger log =LoggerFactory.getLogger(EventMapFunction.class);@OverridepublicRowDatamap(Event event){GenericRowData genericRowData =newGenericRowData(6);try{//map 字段映射
genericRowData.setField(0, event.getUserId());// 字符型需要转化,否则会报错
genericRowData.setField(1,StringData.fromString(event.getReceiveDate()));
genericRowData.setField(2,StringData.fromString(event.getEventType()));
genericRowData.setField(3,StringData.fromString(event.getProvince()));
genericRowData.setField(4,StringData.fromString(event.getCity()));
genericRowData.setField(5,StringData.fromString(event.getReceiveTime()));}catch(Exception e){
log.error("Event data map error : "+ e);}return genericRowData;}}
三、DorisSink
publicclassDorisSinkUtil{publicstaticSinkFunction<RowData>getDorisSink(String table,String labelPrefix){//写入格式 Properties properties =newProperties();
properties.setProperty("read_json_by_line","true");
properties.setProperty("format","json");
properties.setProperty("strip_outer_array","true");SinkFunction<RowData> dorisSink =DorisSink.sink(getEventFields(),getEventDataType(),DorisReadOptions.builder().build(),DorisExecutionOptions.builder().setBatchSize(3).setBatchIntervalMs(0L).setMaxRetries(3).setStreamLoadProp(properties).build(),DorisOptions.builder().setFenodes(readValue("doris.fenodes")).setTableIdentifier(table).setUsername(readValue("doris.username")).setPassword(readValue("doris.password")).build());return dorisSink;}//字段及类型一一对应publicstaticString[]getEventFields(){returnnewString[]{"user_id","receive_date","event_type","province","city","receive_time"};}publicstaticLogicalType[]getEventDataType(){returnnewLogicalType[]{newIntType(),newVarCharType(256),newVarCharType(256),newVarCharType(128),newVarCharType(128),newVarCharType(256)};}}
四、job主类
publicstaticvoidmain(String[] args){try{/** 一、 创建flink流式执行环境 */finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(0);/** 二、 获取Source */KafkaSource<String> eventSource =getKafkaSource(readValue("kafka.broker.id"),readValue("kafka.event.topic"),readValue("kafka.event.group.id"));/** 三、消费 Source */SingleOutputStreamOperator<String> eventSourceStream = env.fromSource(eventSource,WatermarkStrategy.noWatermarks(),"kafkaSource_event").setParallelism(12);/** 四、解析数据 */SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = eventSourceStream.flatMap(newEventParseFlatMap()).setParallelism(12);/** 五、将java bean类型转化为rowdata类型 */SingleOutputStreamOperator<RowData> eventStream = eventSingleOutputStreamOperator .map(newEventMapFunction()).setParallelism(12);/** 六、构建doris sink */SinkFunction<RowData> eventSink =DorisSinkUtil.getDorisSink(readValue("doris.table.event"),Constants.EVENT_LABEL_PREFIX);/** 七、输出至doris */
eventStream.addSink(eventSink).setParallelism(12);
env.execute("EventJob");}catch(Exception e){LOG.error("EventJob failed to activate", e);}}
本文转载自: https://blog.csdn.net/weixin_44378305/article/details/132868339
版权归原作者 派大星的海洋ku 所有，如有侵权，请联系我们删除。