0


一碰就头疼的 Kafka 消息重复问题,立马解决!

一碰就头疼的 Kafka 消息重复问题,立马解决!

在这里插入图片描述

一、前言

数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。
在这里插入图片描述
通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。

整理下消息重复的几个场景:
  1. 生产端: 遇到异常,基本解决措施都是 重试 。
  • 场景一:leader分区不可用了,抛 LeaderNotAvailableException 异常,等待选出新 leader 分区。
  • 场景二:Controller 所在 Broker 挂了,抛 NotControllerException 异常,等待 Controller 重新选举。
  • 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException 异常,等待网络恢复。
  1. 消费端:poll一批数据,处理完毕还没提交 offset ,机子宕机重启了,又会poll上批数据,再度消费就造成了消息重复。

怎么解决?
先来了解下消息的三种投递语义:

  • 最多一次( at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如:mqttQoS = 0
  • 至少一次( at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如:mqtt QoS = 1
  • 精确一次( exactly once): 消息精确发一次,消息不会丢失,也不会被重复发送。例如:mqtt QoS = 2

了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:

  1. Kafka 幂等性 Producer 保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)
  2. Kafka 事务: 保证生产端发送消息幂等。解决幂等 Producer 的局限性。
  3. 消费端幂等:保证消费端接收消息幂等。蔸底方案。
Kafka 幂等性 Producer

幂等性指 :无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。

幂等性使用示例:在生产端添加对应配置即可

Properties props = new Properties();
props.put("enable.idempotence", ture);//1. 设置幂等
props.put("acks","all");//2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection",5);//3. 注意
  1. 设置幂等,启动幂等。
  2. 配置 acks,注意:一定要设置 acks=all,否则会抛异常。
  3. 配置 max.in.flight.requests.per.connection 需要 <= 5 ,否则会抛异常 OutOfOrderSequenceException
  • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
  • Kafka >= 1.1, max.in.flight.request.per.connection <= 5

为了更好理解,需要了解下Kafka 幂等机制:
在这里插入图片描述

  1. Producer 每次启动后,会向 Broker 申请一个全局唯一的 pid。(重启后 pid 会变化,这也是弊端之一)
  2. Sequence Numbe:针对每个 <Topic, Partition> 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个 seq num
  3. 判断是否重复: 拿 <pid, seq num>Broker 里对应的队列 ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在
  • 如果 nextSeq == lastSeq + 1,即 服务端seq + 1 == 生产传入seq,则接收。
  • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。
  • 反之,要么重复,要么丢消息,均拒绝。

在这里插入图片描述
这种设计针对解决了两个问题:

  1. 消息重复: 场景 Broker 保存消息后还没发送 ack 就宕机了,这时候Producer就会重试,这就造成消息重复。
  2. 消息乱序: 避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。

那什么时候该使用幂等:
3. 如果已经使用

acks=all

,使用幂等也可以。
4. 如果已经使用

acks=0 

或者

acks=1

,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。

Kafka 事务

使用 Kafka 事务解决幂等的弊端:单会话且单分区幂等。
Tips: 这块篇幅较长,这先稍微提及下使用,之后另起一篇。

事务使用示例:分为生产端 和 消费端

Properties props =newProperties();
props.put("enable.idempotence", ture);// 1. 设置幂等
props.put("acks","all");// 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection",5);// 3. 最大等待数
props.put("transactional.id","my-transactional-id");// 4. 设定事务 idProducer<String,String> producer =newKafkaProducer<String,String>(props);// 初始化事务
producer.initTransactions();try{// 开始事务
    producer.beginTransaction();// 发送数据
    producer.send(newProducerRecord<String,String>("Topic","Key","Value"));// 数据发送及 Offset 发送均成功的情况下,提交事务
    producer.commitTransaction();}catch(ProducerFencedException|OutOfOrderSequenceException|AuthorizationException e){// 数据发送或者 Offset 发送出现异常时,终止事务
    producer.abortTransaction();}finally{// 关闭 Producer 和 Consumer
    producer.close();
    consumer.close();}

**这里消费端

Consumer 

需要设置下配置:

isolation.level 

参数**

  • read_uncommitted: 这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed: 表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
消费端幂等

“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。
只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。

典型的方案是使用:消息表,来去重:
在这里插入图片描述

  • 上述例子中,消费端拉取到一条消息后,开启事务,将消息Id 新增到本地消息表中,同时更新订单信息。
  • 如果消息重复,则新增操作insert会异常,同时触发事务回滚。

二、案例:Kafka 幂等性 Producer 使用

环境搭建可参考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic

准备工作如下:

  1. Zookeeper:本地使用 Docker 启动
$ docker run -d --name zookeeper -p 2181:2181 zookeeper
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
  1. Kafka:版本 2.7.1,源码编译启动(看上文源码搭建启动)
  2. 启动生产者:Kafka 源码中 exmaple
  3. 启动消息者:可以用 Kafka 提供的脚本
# 举个栗子:topic 需要自己去修改
$ cd ./kafka-2.7.1-src/bin
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

**创建

topic

:** 1副本,2 分区

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2

# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

生产者代码:
在这里插入图片描述

publicclassKafkaProducerApplication{privatefinalProducer<String,String> producer;finalString outTopic;publicKafkaProducerApplication(finalProducer<String,String> producer,finalString topic){this.producer = producer;
        outTopic = topic;}publicvoidproduce(finalString message){finalString[] parts = message.split("-");finalString key, value;if(parts.length >1){
            key = parts[0];
            value = parts[1];}else{
            key =null;
            value = parts[0];}finalProducerRecord<String,String> producerRecord
            =newProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,(recordMetadata, e)->{if(e !=null){
                        e.printStackTrace();}else{System.out.println("key/value "+ key +"/"+ value +"\twritten to topic[partition] "+ recordMetadata.topic()+"["+ recordMetadata.partition()+"] at offset "+ recordMetadata.offset());}});}publicvoidshutdown(){
        producer.close();}publicstaticvoidmain(String[] args){finalProperties props =newProperties();

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
        props.put(ProducerConfig.ACKS_CONFIG,"all");

        props.put(ProducerConfig.CLIENT_ID_CONFIG,"myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);finalString topic ="myTopic";finalProducer<String,String> producer =newKafkaProducer<>(props);finalKafkaProducerApplication producerApp =newKafkaProducerApplication(producer, topic);String filePath ="/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";try{List<String> linesToProduce =Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream().filter(l ->!l.trim().isEmpty()).forEach(producerApp::produce);System.out.println("Offsets and timestamps committed in batch from "+ filePath);}catch(IOException e){System.err.printf("Error reading file %s due to %s %n", filePath, e);}finally{
            producerApp.shutdown();}}}

启动生产者后,控制台输出如下:
在这里插入图片描述
启动消费者:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic

在这里插入图片描述

修改配置 acks
**启用幂等的情况下,调整

acks

配置,生产者启动后结果是怎样的:**

  • 修改配置 acks = 1
  • 修改配置 acks = 0

会直接报错:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.
Otherwise we cannot guarantee idempotence.

在这里插入图片描述
修改配置 max.in.flight.requests.per.connection

启用幂等的情况下,调整此配置,结果是怎样的:

max.in.flight.requests.per.connection > 5

会怎样?
在这里插入图片描述
当然会报错:

Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.

在这里插入图片描述

标签: kafka linq

本文转载自: https://blog.csdn.net/weixin_42081445/article/details/135624131
版权归原作者 一名技术极客 所有, 如有侵权,请联系我们删除。

“一碰就头疼的 Kafka 消息重复问题,立马解决!”的评论:

还没有评论