0


【Kafka】消息重复场景及解决

目录

生产者阶段重复

根本原因

生产发送的消息没有收到正确的broke响应,导致生产者重试。

生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。

过程

在这里插入图片描述

过程如下:

  1. new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息;
  2. 调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
  3. 后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
  4. 如果发送成功,那么返回成功;
  5. 如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;

记录顺序问题

如果设置

max.in.flight.requests.per.connection

大于1(默认5,单个连接上发送的未确认请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。大于1可能会改变记录的顺序,因为如果将两个batch发送到单个分区,第一个batch处理失败并重试,但是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费。

设置

 max.in.flight.requests.per.connection

为1,可能会影响吞吐量,可以解决单个生产者发送顺序问题。如果多个生产者,还是不能保证消息的顺序消费,如生产者1先发送一个请求,生产者2后发送请求,此时生产者1返回可恢复异常,重试一定次数成功了。虽然生产者1先发送消息,但生产者2发送的消息会被先消费。

解决方案

  1. 启动kafka的幂等性

要启动kafka的幂等性,设置:

enable.idempotence=true

,以及

ack=all

以及

retries

1 。

  1. ack=0,不重试

这个方案可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。

生产者和broker阶段消息丢失

原因

  1. ack=0,不重试

生产者发送消息完,不管结果了,如果发送失败也就丢失了。

  1. ack=1,leader crash

生产者发送消息完,只等待Leader写入成功就返回了,Leader分区丢失了,此时Follower没来及同步,消息丢失。

  1. unclean.leader.election.enable 配置true

允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。生产者发送异步消息,只等待Lead写入成功就返回,Leader分区丢失,此时ISR中没有Follower,Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失。

解决

  1. 禁用unclean选举,ack=all
ack=all / -1,tries > 1,unclean.leader.election.enable : false

生产者发完消息,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般三个。

不允许unclean Leader选举。

  1. 配置:min.insync.replicas > 1

当生产者将 acks 设置为 all (或 -1 )时,

min.insync.replicas>1

。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

当一起使用时, min.insync.replicas 和 ack 允许执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用 all 配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。

  1. 失败的offset单独记录

生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理

消费者阶段重复

根本原因

数据消费完没有及时提交offset到broker。

消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。

解决

  1. 取消自动提交

每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。

  1. 下游做幂等

一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新。


本文转载自: https://blog.csdn.net/qq_43745578/article/details/136189022
版权归原作者 Ethan-running 所有, 如有侵权,请联系我们删除。

“【Kafka】消息重复场景及解决”的评论:

还没有评论