Flink写入Kafka两阶段提交
端到端的 exactly-once(精准一次)
kafka -> Flink -> kafka
1)输入端
输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)
2)Flink内部
Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义
3)输出端
两阶段提交(2PC)
。
写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的
“预提交”
;等到检查点保存完毕,才会提交事务进行
“正式提交”
。
如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。
必须的配置
1)
必须启用检查点
2)指定 KafkaSink 的
发送级别为 DeliveryGuarantee.EXACTLY_ONCE
3)配置 Kafka 读取数据的
消费者的隔离级别
【默认kafka消费者隔离级别是
读未提交
,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为
读已提交
】
4)事务超时配置
【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此
checkpoint 间隔 < 事务超时时间 < max的15分钟
】
代码实战
kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】
publicclassKafkaEOSDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、启用检查点,设置为精准一次
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.读取 kafkaKafkaSource<String> kafkaSource =KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(newSimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env
.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),"kafkasource");/*
3.写出到 Kafka
精准一次 写入 Kafka,需要满足以下条件,【缺一不可】
1、开启 checkpoint
2、sink 设置保证级别为 精准一次
3、sink 设置事务前缀
4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟
*/KafkaSink<String> kafkaSink =KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(newSimpleStringSchema()).build())// 【3.1】 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 【3.3】 设置事务超时时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"").build();
kafkasource.sinkTo(kafkaSink);
env.execute();}}
后续读取“ws”这个 topic 的消费者,要
设置事务的隔离级别为“读已提交”
publicclassKafkaEOSConsumer{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用【两阶段提交】写入的 TopicKafkaSource<String> kafkaSource =KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(newSimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed").build();
env
.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),"kafkasource").print();
env.execute();}}
处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。
版权归原作者 不进大厂不改名二号 所有, 如有侵权,请联系我们删除。