概述
Flink端到端的精确一次语义是流处理中的关键概念,涉及状态一致性和结果正确性。这一特性涵盖了整个数据处理流程,包括数据源、流处理器和外部存储系统三个主要组件。
Flink内部通过检查点机制实现精确一次语义,但要达到端到端的一致性,还需要考虑输入和输出端的保证:
- 输入端:- 要求数据源具有重放能力,如Kafka- 在Source任务中将数据读取偏移量保存为状态
- 输出端: 需要实现以下两种写入方式之一: a) 幂等写入:操作可重复执行,但只导致一次结果更改 b) 事务写入:分为两种实现方式:- 预写日志(WAL):适用于不支持事务的系统,Flink中DataStream API提供了一个模板类GenericWriteAheadSink,用来实现这种事务型的写入方式。- 两阶段提交(2PC):需要外部系统支持事务,但能提供更好的性能和真正的端到端精确一次保证,可以通过实现TwoPhaseCommittingSink接口实现。
实现端到端精确一次语义虽然复杂,但对于要求高数据一致性的应用来说至关重要。它能够保证在发生故障时,系统可以恢复到正确的状态,既不丢失数据,也不重复处理。然而,不同的实现方式都有其优缺点和适用场景。这对于需要精确数据处理的场景,如金融交易或关键业务分析,非常重要。但在实际应用中,需要在一致性级别和处理性能之间进行权衡,选择最适合特定需求的解决方案。
端到端的状态一致性前提条件
- 输入端支持支持源端重放
- Flink内部启动检查点exactly-once语义
- 输出端幂等写入或事务写入
kafka-flink-kafka场景在具体应用中,实现真正的端到端exactly-once,还需要有一些额外的配置:
(1)必须启用检查点,enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
(2)指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE,启用两阶段提交
(3)配置Kafka下游消费者读取数据的隔离级别
预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而Kafka中默认的隔离级别isolation.level是read_uncommitted,也就是可以读取未提交的数据。所以应该将隔离级别配置为read_committed,表示只有被提交的消息才会被消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。
(4)事务超时配置
Flink的Kafka连接器中配置的事务超时时间【transaction.timeout.ms】默认是1小时,而Kafka集群配置的事务最大超时时间【transaction.max.timeout.ms】默认是15分钟。所以在检查点保存时间很长时,有可能出现Kafka已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。
他们的关系应该为: CheckpointTimeout < 【事务超时时间transaction.timeout.ms默认是60分钟】 <= 【kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟】。
Flink checkpoint 与 2pc 的关系
Flink 中端到端exactly-once的最佳实现,当然就是两阶段提交(2PC)。Flink 的 KafkaSink 中的两阶段提交机制与 Checkpoint 密切相关,它们共同工作以实现 exactly-once 语义。让我详细解释它们之间的关系:
- Checkpoint 机制概述:Checkpoint 是 Flink 的容错机制,它定期为整个作业创建一致性快照。这个快照包含了所有算子的状态,以及输入流的位置(例如,Kafka 的偏移量)。
- KafkaSink 的两阶段提交:两阶段提交是一种保证事务一致性的协议。在 KafkaSink 中,它被用来确保数据写入 Kafka 的过程与 Flink 的 Checkpoint 同步。
- 两阶段提交与 Checkpoint 的协作过程:a. 预提交阶段(Pre-commit):- 当 Checkpoint 开始时,KafkaSink 会预提交当前的事务。- 这个阶段中,KafkaSink 将数据写入 Kafka,但保持在未提交状态,隔离级别为read_uncommitted的consumer可读。- Flink 调用 KafkaWriter 的 prepareCommit 方法,生成KafkaCommittable 相关代码: org.apache.flink.connector.kafka.sink.KafkaWriter#flush - 数据写入 org.apache.flink.connector.kafka.sink.KafkaWriter#prepareCommit -预提交 org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator#prepareSnapshotPreBarrierb. Checkpoint 完成:- Flink 作业的所有算子完成状态快照。checkpoint快照元数据包括:KafkaSource (上游消费的位移等)和 KafkaSink(下游预提交的相关事务状态等)c. 提交阶段(Commit):- 只有在 Checkpoint 成功完成后,KafkaSink 才会提交事务。Flink 调用 KafkaCommitter 的 commit 方法,利用预提交阶段生成的KafkaCommittable提交事务,数据对下游隔离级别为read_committed的 consumer才可见。- 在 Checkpoint 完成后,如果在 KafkaSink 提交事务之前,服务挂掉,从 Checkpoint 恢复,从 Checkpoint 和下游 Kafka 集群中获取当前producer事务的提交状态为未提交,处理源端重放数据,重新提交当前事务的数据(此时如果没有预提交,可能会导致数据丢失)。- 如果在 KafkaSink 提交事务之后,上游 consumer 提交偏移量之前,服务挂掉,当前事务已提交,则不处理源端重放数据。- 到此为止还未提交上游consumer Kafka offset。如果任务失败从 Checkpoint 恢复,不会重复消费数据;否则会重复消费。 相关代码: org.apache.flink.connector.kafka.sink.KafkaCommitter#commit org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer#commitTransactiond. 提交上游consumer偏移量,为了获得上游源端重放的能力,偏移量的提交需要在事务提交之后。 相关代码: org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager#commitOffsetse. 回滚(如果需要):- 如果 Checkpoint 失败,事务会被回滚,确保不会有部分数据被提交。
注意:这里的上游可以为任意支持数据回放的source(不必支持事务),下游可以为任意支持事务的sink。上下游可以为同一个或者不同的Kafka集群
- 实现 Exactly-Once 语义:- 通过将 Kafka 事务与 Flink Checkpoint 同步,确保每条数据都被精确地处理一次。- 即使在作业失败和恢复的情况下,也能保证数据不会重复写入或丢失。
- 工作流程:a. 数据处理:- Flink 作业处理数据流。b. Checkpoint 触发:- Flink 定期触发 Checkpoint(例如,每5秒)。c. KafkaSink 预提交:- KafkaSink 将数据写入 Kafka,但保持在未提交状态。d. 状态快照:- 所有算子(包括 KafkaSink)保存其状态。e. Checkpoint 完成:- 当所有算子都完成状态保存后,Checkpoint 被视为完成。f. 事务提交:- KafkaSink 提交 Kafka 事务,使数据对消费者可见。
- 故障恢复:- 如果作业在 Checkpoint 之间失败,它会从最近的成功 Checkpoint 恢复。- 未提交的事务会被回滚,确保没有重复数据写入 Kafka。
- 注意事项:- 使用两阶段提交会增加一些延迟,因为数据需要等到 Checkpoint 完成后才能被提交。- 需要确保 Kafka 的事务超时时间大于 Checkpoint 间隔。- Kafka 会自动清理超时的事务。你可以调整以下参数来控制超时时间: transaction.max.timeout.ms: 事务的最大超时时间(默认为 15 分钟) transactional.id.expiration.ms: 事务 ID 的过期时间(默认为 7 天)
- 性能和一致性的平衡:- 增加 Checkpoint 频率可以减少故障时的数据丢失风险,但会增加系统开销。- 减少 Checkpoint 频率可以提高吞吐量,但可能增加恢复时间和潜在的数据重复处理。
总结:
实现关键在于 KafkaWriter 和 KafkaCommitter 类,它们分别处理预提交和提交阶段。prepareCommit 方法在 Checkpoint 开始时被调用,预提交事务。commit 方法在 Checkpoint 成功后被调用,真正提交事务。整个过程与 Flink 的 Checkpoint 机制紧密集成,实现了 exactly-once 语义。通过这种机制,Flink 的 KafkaSink 能够在保证高吞吐量的同时,实现强一致性的 exactly-once 处理语义。
代码示例
FlinkKafkaExactlyOnceExample.java
packagecom.pony.endtoend;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.connector.base.DeliveryGuarantee;importorg.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;importorg.apache.flink.connector.kafka.sink.KafkaSink;importorg.apache.flink.connector.kafka.source.KafkaSource;importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;importorg.apache.flink.runtime.state.hashmap.HashMapStateBackend;importorg.apache.flink.streaming.api.CheckpointingMode;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.CheckpointConfig;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.ProcessFunction;importorg.apache.flink.util.Collector;importorg.apache.kafka.clients.producer.ProducerConfig;publicclassFlinkKafkaExactlyOnceExample{privatestaticfinalStringKAFKA_SERVER1="x.x.x.x:9092";privatestaticfinalStringKAFKA_SERVER2="x.x.x.x:9091";publicstaticvoidmain(String[] args)throwsException{// 创建 Flink 运行环境,支持exactly-once语义finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000*60,CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60*1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(newHashMapStateBackend());
env.setMaxParallelism(100);
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointStorage("file://"+System.getProperty("user.dir")+"/ck");// 配置 Kafka 源,支持源端重放KafkaSource<String> source =KafkaSource.<String>builder().setBootstrapServers(KAFKA_SERVER1).setTopics("t1").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.committedOffsets()).setValueOnlyDeserializer(newSimpleStringSchema()).build();// 从 Kafka 读取数据DataStream<String> stream = env.fromSource(source,WatermarkStrategy.noWatermarks(),"Kafka Source");// 进行简单处理:将输入字符串转换为大写DataStream<String> processedStream = stream.process(newProcessFunction<String,String>(){@OverridepublicvoidprocessElement(String value,Context ctx,Collector<String> out){
out.collect(value.toUpperCase());}});// 配置 Kafka 接收器,支持事务写入KafkaSink<String> sink =KafkaSink.<String>builder().setBootstrapServers(KAFKA_SERVER2).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("t2").setValueSerializationSchema(newSimpleStringSchema()).build())// 启用两阶段提交.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 设置事务的前缀.setTransactionalIdPrefix("my-transaction-id-prefix")// 设置事务超时时间:CheckpointTimeout < 【事务超时时间transaction.timeout.ms默认是60分钟】 <= 【kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟】.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"900000").build();// 将处理后的数据写入 Kafka
processedStream.sinkTo(sink);// 执行作业
env.execute("Kafka Exactly-Once Example");}}
KafkaTransactionConsumer.java
packagecom.pony.endtoend;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.time.Duration;importjava.util.Arrays;importjava.util.Properties;/**
* Created by pony
*/publicclassKafkaTransactionConsumer{privatestaticfinalStringKAFKA_SERVER="x.x.x.x:9091";publicstaticvoidmain(String[] args){Properties props =newProperties();
props.put("bootstrap.servers",KAFKA_SERVER);
props.put("group.id","g2");
props.put("client.id","client1");// 设置隔离级别
props.put("isolation.level","read_committed");// 关闭自动提交
props.put("enable.auto.commit","false");
props.put("auto.commit.interval.ms","1000");
props.put("max.poll.interval.ms","300000");// 在没有offset的情况下采取的拉取策略,[latest, earliest, none]
props.put("auto.offset.reset","latest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("t2"));while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> record : records){System.out.printf(Thread.currentThread().getName()+": partition = %d, offset = %d, key = %s, value = %s, timestamp = %s,timestampType = %s %n", record.partition(), record.offset(), record.key(), record.value(), record.timestamp(), record.timestampType());}
consumer.commitSync();}}}
maven依赖:
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.16.3</flink.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>provided</scope></dependency><!--为了能在本地IDE启动看到flinkwebUI--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency></dependencies>
日志配置文件:
log4j.rootLogger=debug,stdout,file
log.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c:%M:%L] [%p] - %m%n
# Define the console appender
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=${log.pattern}
log4j.appender.stdout.Threshold=INFO
# Define an appender for File
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=./logs/flink-file.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.append=false
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=${log.pattern}
#log4j.appender.file.Threshold=DEBUG
# Define an appender for Daily Rolling File
log4j.appender.dailyFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.dailyFile.File=./logs/flink-daily.log
log4j.appender.dailyFile.DatePattern='.'yyyy-MM-dd
log4j.appender.dailyFile.layout=org.apache.log4j.PatternLayout
log4j.appender.dailyFile.layout.ConversionPattern=${log.pattern}
log4j.appender.dailyFile.Threshold=DEBUG
配置运行环境
启动kafka集群
- step-1
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper:latest
- step-2
# kafka1docker run -d\--name kafka1 \--restart=always \-p9091:9092 \-eKAFKA_BROKER_ID=1\-eKAFKA_ZOOKEEPER_CONNECT=x.x.x.x:2181/kafka1 \-eKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://x.x.x.x:9091 \-eKAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-t wurstmeister/kafka
# kafka2docker run -d\--name kafka2 \--restart=always \-p9092:9092 \-eKAFKA_BROKER_ID=1\-eKAFKA_ZOOKEEPER_CONNECT=x.x.x.x:2181/kafka2 \-eKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://x.x.x.x:9092 \-eKAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-t wurstmeister/kafka
启动standalone Flink级群:
# 创建网络docker network create flink-network-standalone
# 启动jobmanagerdocker run -d\--name flink-jm \--hostname flink-jm \-v /Users/lijianjun46/Documents/old_data/other/idea_workspace/flink-demo01/ck:/Users/lijianjun46/Documents/old_data/other/idea_workspace/flink-demo01/ck \-p8082:8081 \--envFLINK_PROPERTIES="jobmanager.rpc.address: flink-jm"\--network flink-network-standalone \
flink:1.16.3-java8 \
jobmanager
# 启动taskmanagerdocker run -d\--name flink-tm \--hostname flink-tm \--envFLINK_PROPERTIES="jobmanager.rpc.address: flink-jm"\--network flink-network-standalone \
flink:1.16.3-java8 \
taskmanager \-Dtaskmanager.memory.process.size=1024m \-Dtaskmanager.numberOfTaskSlots=5\-Drest.flamegraph.enabled=true
版权归原作者 PONY LEE 所有, 如有侵权,请联系我们删除。