目录
问题
线上ELK日志发现kafka消费者消费到重复消息
排查原因
生产者原因
由于生产方本身就发送了重复的消息,导致消费到重复消息
消费者可能原因
消费方采用的是循环poll的模式,具体是在多线程分租户去批量处理的消息
while(true){ConsumerRecords consumerRecords;do{
consumerRecords =this.consumer.poll(60000L);}while(consumerRecords ==null);if(!consumerRecords.isEmpty()){long beginTime =System.currentTimeMillis();Map<String,List> recordMap =this.consumerHandler.slice(consumerRecords);List<ConsumerFutureDto> sliceFutureList =this.submitToExecutorService(recordMap);this.checkReinstanceConsumerExecutorService(unDoneSliceFutures);
sliceFutureList.forEach((item)->{
unDoneSliceFutures.add(item.getFuture());});if(this.checkSliceFuture(beginTime, sliceFutureList, unDoneSliceFutures)){this.commitOffset(this.consumer, consumerRecords);
LOGGER.info("消费消息(kafka-core)- 异常时提交偏移量");}else{this.commitOffset(this.consumer, consumerRecords);}}}
已有的幂等处理:在对poll出来的同一批次消息做分组时(按照同一租户),根据消息唯一业务字段标志sessionId去数据库中查询是否已有改消息,如果有就不处理。
消费业务逻辑处理时间:假设为5s,最后一步才对消息落库。
发生重复消费的原因可能有以下这几种情况
1. 重复消息被分到同一消费者的同一批次处理
重复的消息在某一个消费者同一poll的消息批次。假如该消息之前都没有入过库,那么这个时候根据sessionId去数据库查询,是查询不到的,所以重复消息的幂等校验就失效了,会造成重复消费。
2. 重复消息被分到同一消费者的不同批次处理
重复的消息在某一个消费者不同poll的消息批次,即两条消息是第一条执行完后第二条才执行的。由于第一条已经入库,所以上述的幂等校验就起作用了,第二条消息消费时,不会重复消费。
3. 重复消息被几乎同时分到不同消费者处理
重复的消息被分到不同的消费者,并且几乎同时处理消息。这样会造成,上述的幂等校验失效,因为查询数据库时,此时消息都未落库(该消息之前尚未被消费过),根据sessionId去过滤,查不到,所以两个消费者都会对该消息做处理。
解决方案
1.对批量消息进行去重
针对同一消费者同一批次的重复消息,在分组时先进行根据唯一标志sessionId进行去重。
publicMap<String,List<AbsMealDeductDto>>transferMealResourceDeduct(List<MealResourceTextRobotEventDto> eventList){if(CollectionUtils.isEmpty(eventList)){returnCollections.emptyMap();}//做幂等去重
eventList = eventList.stream().filter(e ->StringUtils.isNotBlank(e.getSessionId())).collect(Collectors.collectingAndThen(Collectors.toCollection(()->newTreeSet<>(Comparator.comparing(MealResourceTextRobotEventDto::getSessionId))),ArrayList::new));}
2. 对分配到不同消费者的消息进行redis去重
消费者执行消费前,先把唯一标识sessionId,放入到redis,如果已存在,那么就不进行处理
3. 生产者在发送消息前先进行路由分区
生产者发送消息按照唯一标识相关字段进行分区,如租户id,这样可以保证重复的消息都在同一分区。由于patition只能被同一消费者组的某一个消费者消费,所以可以保证重复消息不会被多个消费者消费。这种情况如果是多个消费者组就不适用了。
消费超时发送重平衡导致重复消费
以上都是项目实际情况,产生原因是因为生产者发送了重复消息,而消费方未对各种情况做幂等校验。如果生产者本身没有发送重复消息,消费者会不会发送重复消费呢?答案是也有可能。
kafka消费原理简介
先介绍几个关键的配置参数
enable.auto.commit
是否自动提交,默认为true
auto.commit.interval.ms
如果设置了 enable.auto.commit的值为true,则该值定义了消费者偏移量向Kafka提交的频率。默认为5000ms,即5s。kafka consumer 是在每次 poll () 之前去判断:是否已经到了要 commint 的时间( commitTime <= now ()),如果已经达到提交时间,就会提交 offset, 同时计算好下一次要 commit 的时间戳,所以他实际不是严格的每 5 秒提交一次,他的提交时间间隔是大于等于 5 秒,因为当你一次 poll () 请求处理超过 5 秒时,他也是在下一次 poll () 时才去提交 offset(不是定时任务提交 offset)。这个参数,根据数据的处理的频率进行设置,在没 commit offset 之前如果 consumer 挂掉,发生 rebalance,他所处理的数据就会重新被其它 consumer 重复处理。所以也不宜设置过大,导致有大量数据实际已经被处理,但是未提交 offset,导致产生 “数据积压” 假象。
session.timeout.ms
当使用Kafka的消费组的时候,消费者周期性地向broker发送心跳数表明自己的存在。如果经过该超时时间还没有收到消费者的心跳,则broker将消费者从消费组移除,并启动再平衡。该值必须在broker配置 group.min.session.timeout.ms和group.max.session.timeout.ms之间。
max.poll.interval.ms
使用消费组的时候调用poll()方法的时间间隔。该条目指定了消费者调用poll()方法的最大时间间隔。如果在此时间内消费者没有调用poll()方法,则broker认为消费者失败,触发再平衡,将分区分配给消费组中其他消费者。
max.poll.records
一次调用poll()方法返回的记录最大数量。
重复消费原因
消费时间大于两次poll最大间隔,导致重平衡,以致消费者消费完成后不能提交offset(下次poll时,会先把上次拉的数据提交offset,提交是在poll方法做的)导致重复消费。
解决办法
减小每次poll的数量或增大最大poll间隔超时检测。
其他
poll(5000)中5000的含义
1.poll()方法里传的是时间(ms),而不是Kafka返回的记录条数。
2.Kafka轮询一次就相当于拉取(poll)一定时间段broker中可消费的数据, 在这个指定时间段里拉取,时间到了就立刻返回数据。
3.例如poll(5000): 如果拉到数据的话 会立即放回;如果拉不到数据的话,这个是最长的等待时间;
比如5s,如果一直没有数据的话,每5s拉一次返回一次,有数据就立即返回再拉
如果poll处理时发生异常怎么办?
因为是在while循环里不停的拉,所以出现异常后就不继续执行了,如果不做额外处理异常,线程就会停止,相当于消费者挂了。为了不出现消费者挂了这种情况,一般都要做异常处理。
while(true){ConsumerRecords consumerRecords;do{
consumerRecords =this.consumer.poll(60000L);}while(consumerRecords ==null);if(!consumerRecords.isEmpty()){long beginTime =System.currentTimeMillis();Map<String,List> recordMap =this.consumerHandler.slice(consumerRecords);//提交线程池,并返回执行结果FutureList<ConsumerFutureDto> sliceFutureList =this.submitToExecutorService(recordMap);this.checkReinstanceConsumerExecutorService(unDoneSliceFutures);
sliceFutureList.forEach((item)->{
unDoneSliceFutures.add(item.getFuture());});if(this.checkSliceFuture(beginTime, sliceFutureList, unDoneSliceFutures)){this.commitOffset(this.consumer, consumerRecords);
LOGGER.info("消费消息(kafka-core)- 异常时提交偏移量");}else{this.commitOffset(this.consumer, consumerRecords);}}}
上述的例子中就是线程池多线程处理,如果消费线程发生异常,也会执行commitOffset。
版权归原作者 Maybe_9527 所有, 如有侵权,请联系我们删除。