Flink: consuming multiple Kafka topics at the same time and writing out to multiple topics
1. Notes
1) The code uses Flink 1.16.1; dependencies and APIs may differ in older versions. It also uses hutool's JSON utility classes. Both can be swapped out for alternatives.
2) Both solutions presented here only apply when the source topics come from the same cluster and are consumed with the same Kafka consumer group; using Flink's connect operator to join multiple streams has not been explored yet.
2. Dependencies
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.1</flink.version>
    <hutool.version>5.8.15</hutool.version>
</properties>

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>${hutool.version}</version>
</dependency>
<!-- flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-lang3</artifactId>
            <groupId>org.apache.commons</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-lang3</artifactId>
            <groupId>org.apache.commons</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>
3. Solution 1: for cases where the sink topics need different Kafka producer configurations (e.g. they are spread across clusters)
The code relies on a Hadoop environment; if you do not have one, the HDFS paths can be replaced with local paths.
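For reference, a minimal sketch of a purely local setup, assuming any writable local directory in place of the D:/test/flink-tuning paths ("jobId" and "chk-xx" are placeholders, not real values); no HADOOP_USER_NAME is needed in this case:

// Sketch: local-only checkpoint/savepoint configuration, no Hadoop required
Configuration configuration = new Configuration();
configuration.setString("rest.port", "9091");

// Restore from a locally stored checkpoint (illustrative path)
String savePointPath = "file:///D:/test/flink-tuning/checkpoint/jobId/chk-xx";
SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savePointPath, true);
SavepointRestoreSettings.toConfiguration(restoreSettings, configuration);

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Store new checkpoints in a local directory instead of HDFS
environment.getCheckpointConfig().setCheckpointStorage("file:///D:/test/flink-tuning/checkpoint");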
3.1 Configuration file
# Input topic list
newInputTopic=hive_data_input_topic
# Output topic list
newOutputTopic=topic-test
3.2 Java code
public static void main(String[] args) throws Exception {
    // Set the user for HDFS operations
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    // Read command-line arguments; args[0] is the config file path, e.g. input/customer.properties
    ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
    String inputTopic = parameterTool.get("newInputTopic");
    String outputTopic = parameterTool.get("newOutputTopic");

    // Build the input topic list
    ArrayList<String> inputTopicList = new ArrayList<>();
    inputTopicList.add("canal_mysql_input_topic");
    if (!StringUtils.isNullOrWhitespaceOnly(inputTopic)) {
        inputTopicList.add(inputTopic);
    }

    // Build the table name -> output topic mapping
    Map<String, String> hashMap = new HashMap<>();
    hashMap.put("ap_article", "canal_input_topic");
    hashMap.put("ap_user", "cast_topic_input");
    if (!StringUtils.isNullOrWhitespaceOnly(outputTopic)) {
        hashMap.put("hive_table_orders", "topic-test");
    }

    // Build the configuration
    Configuration configuration = new Configuration();
    // Port of the local Flink dashboard web UI, i.e. http://localhost:9091
    configuration.setString("rest.port", "9091");

    // Restore from a specific checkpoint; this is an HDFS path and can be replaced with a local path
    // such as "file:///D:\\test\\flink-tuning\\checkpoint\\jobId\\chk-xx"
    String savePointPath = "hdfs://masterNode:8020/flink-tuning/checkpoint/b66ee8431170f07764db0e777c58848a/chk-36";
    // Set the savepoint path and whether the submitted job may contain newly added stateful operators.
    // The existing operators must be assigned uids as unique identifiers, otherwise restoring will cause problems.
    SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savePointPath, true);
    SavepointRestoreSettings.toConfiguration(restoreSettings, configuration);

    // Get the execution environment
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    // Enable checkpointing and set the checkpoint interval
    environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    // Set the state backend type
    environment.setStateBackend(new HashMapStateBackend());

    CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
    // Checkpoint storage path; for a local path use e.g. file:///D:\\test\\flink-tuning\\checkpoint
    checkpointConfig.setCheckpointStorage("hdfs://masterNode:8020/flink-tuning/checkpoint");
    // Maximum number of checkpoints that may run concurrently
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    // Tolerable number of checkpoint failures; beyond this the job fails
    // (a failed checkpoint is not retried; the job waits for the next one)
    checkpointConfig.setTolerableCheckpointFailureNumber(5);
    // If a checkpoint does not complete within this timeout, it is considered failed
    checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
    // When the job is cancelled manually, checkpoints kept in the external system are not deleted
    checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Read data from Kafka
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("192.168.200.130:9092")
            .setTopics(inputTopicList)
            .setGroupId("group-test-savepoint")
            // Start from the consumer group's committed offsets; if none are found, start from the latest offset
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            // Check for new partitions every 10 seconds, so newly added partitions do not go unconsumed
            .setProperty("partition.discovery.interval.ms", "10000")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    SingleOutputStreamOperator<String> streamSource = environment
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
            .uid("kafka_source")   // it is best to set an explicit operator uid
            .setParallelism(5);    // parallelism = number of topic partitions

    // The loop creates as many filter --> sink operator chains as there are entries in the map (see the figure below).
    // Inside the loop you can also configure the cluster each topic belongs to, authentication info, etc.; omitted here.
    for (String key : hashMap.keySet()) {
        // The filter operator selects records based on the mapping between the table name in the data and the topic
        SingleOutputStreamOperator<String> outputStreamOperator = streamSource
                .filter(vo -> {
                    JSONObject jsonObject = JSONUtil.parseObj(vo);
                    String tableName = (String) jsonObject.get("table");
                    return tableName.equals(key);
                })
                .uid("filter-" + key)
                .setParallelism(5);

        // Build the Kafka sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // Kafka cluster; the address can be changed per topic when topics live in different clusters
                .setBootstrapServers("192.168.200.130:9092")
                // Kafka record serializer
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        // Resolve the output topic from the mapping
                        .setTopic(hashMap.get(key))
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Delivery guarantee: at least once
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Sink operator
        outputStreamOperator.sinkTo(kafkaSink)
                .uid("sink-" + key)
                .setParallelism(1);
    }

    // Execute
    environment.execute();
}
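For reference, the routing in both solutions relies only on a top-level "table" field in each record. A simplified, purely illustrative message (real canal records carry more fields) and the lookup it drives:

// Illustrative only: the filter above and the serializer in Solution 2 both read the "table" field
String sample = "{\"table\":\"ap_article\",\"type\":\"INSERT\",\"data\":[{\"id\":1}]}";
JSONObject jsonObject = JSONUtil.parseObj(sample);
String tableName = (String) jsonObject.get("table");   // "ap_article"
String targetTopic = hashMap.get(tableName);           // "canal_input_topic"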
3.3 Execution graph (note: to better show the operators created by the loop, the sink parallelism was set to 1, which causes a rebalance)
4. Solution 2: for scenarios where the input and output topics all belong to the same cluster
4.1 The configuration file is the same as above
4.2 Java code
public static void main(String[] args) throws Exception {
    // The environment setup is the same as above and is omitted here...

    // Read data from Kafka
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("192.168.200.130:9092")
            .setTopics(inputTopicList)
            .setGroupId("group-test-savepoint")
            // Start from the consumer group's committed offsets; if none are found, start from the latest offset
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            // Check for new partitions every 10 seconds, so newly added partitions do not go unconsumed
            .setProperty("partition.discovery.interval.ms", "10000")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    SingleOutputStreamOperator<String> streamSource = environment
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
            .uid("kafka_source")   // it is best to set an explicit operator uid
            .setParallelism(5);    // parallelism = number of topic partitions

    // Write to Kafka; there is no loop here, so only one operator chain is created
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            // The output topics all live in one fixed Kafka cluster
            .setBootstrapServers("192.168.200.130:9092")
            .setRecordSerializer((KafkaRecordSerializationSchema<String>) (data, context, timestamp) -> {
                JSONObject jsonObject = JSONUtil.parseObj(data);
                // Get the table name
                String table = (String) jsonObject.get("table");
                // Resolve the target topic from the mapping
                String topic = hashMap.get(table);
                return new ProducerRecord<>(topic, data.getBytes(StandardCharsets.UTF_8));
            })
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

    // Sink operator
    streamSource.sinkTo(kafkaSink)
            .uid("kafka_sink")
            .setParallelism(1);

    // Execute
    environment.execute();
}
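Note that in Solution 2 there is no filter in front of the sink, so a record whose table name is not present in hashMap resolves to a null topic and is rejected by the Kafka producer. A minimal guard reusing the same mapping could look like the sketch below (the uid names are only examples):

// Sketch: drop records whose table has no mapped output topic before they reach the Kafka sink
SingleOutputStreamOperator<String> routable = streamSource
        .filter(vo -> hashMap.containsKey(JSONUtil.parseObj(vo).get("table")))
        .uid("filter_known_tables")
        .setParallelism(5);

routable.sinkTo(kafkaSink)
        .uid("kafka_sink")
        .setParallelism(1);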
5. Business use cases: