0


spring-kafka消费消息后的提交方式

鸟瞰图

自动提交

设置自动提交:spring.kafka.consumer.enable-auto-commit=true

相当于设置了kafka-clients包本身的Consumer的配置enable.auto.commit=true

此时到消息的提交是底层kafka-client负责的,不需要我们关心。这种配置有消息丢失的风险。

一旦通过poll方法获取的消息没有真正处理,就不会再消费了。相当于丢消息了。

非自动提交

enable.auto.commit=false

这里的非自动是针对下层的kafka-clients来说的。即poll方法调用后,需要使用方来调用commitSync/commicAsync方法来最终完成提交。这种做法可以避免消息丢失,但是有可能导致消息重复消费。

而这里的非自动提交,spring定义了很多种情况AckMode有很多种,spring又把它分为了2种情况。

  1. spring为我们完成提交。只是从使用感觉上来说,和自动提交类似。

  2. 我们调用Spring的回调API acknowledgment.acknowledge(); 来自己把控。

RECORD模式

Commit the offset after each record is processed by the listener.
在listener处理完消息返回回来后,立即由KafkaMessageListenerContainer帮我们Ack服务端。
如果我们是单条消费的listener:那么此时实际的效果就是spring一次性帮我们poll n条消息回来,我们每处理完一条,spring就commit一次。这种做法非常浪费性能。好处就是不容易出差错。

   如果是批量消费的listener:那么此时实际的效果就是spring一次性帮我们poll n条消息回来,我们处理完,listener方法返回时,spring触发commit。此时和批量一次性提交没什么区别。

BATCH模式

Commit the offsets of all records returned by the previous poll after they all have been processed by the listener.

只有上一次poll方法得到的所有消息都被listener处理过了,container才一次性帮我们ack服务端。这是默认的AckMode。
ack的时机就在调用doProcessCommits方法时,其实不是poll后,立即ack。而是消息给listener了,等下一次poll时,才会真正ack。所以如果你将spring.kafka.listener.idle-between-polls(两次poll的间隔) 设置得极大,就会发现spring一直没帮你提交,因为下次poll还没触发。

单条消费的listener&批量消费的listener此时的逻辑都是一样的,spring帮我们缓存offset的信息,下次poll之前一起提交。

TIME模式

spring.kafka.listener.ack-time可以配合设置提交时间的阈值,到时间了spring帮我们ack。

值得注意的是,spring其实没有单独开一个定时器线程什么的定期检查缓存起来的待提交信息是否已经到规定的时间阈值了。其实这个检查提交的动作任然是用调用poll的线程去做的。和batch模式不同的只在于如果时间没到,就触发不提交。

单条消费的listener:因为单条消费的逻辑在每次处理完每条消息后都有对应的尝试提交的动作,此时如果达到了时间阈值就会提交。这是个很适合的场景。

批量消费的listener:只有下次poll之前去检查时间阈值提交。ack的时间设置得过大。会导致poll了很多次任然没有真正提交。并不合适。

COUNT模式

spring.kafka.listener.ack-count可以配合设置数量的阈值,到数量了spring帮我们ack。

时机和time模式一样,不赘述。

单条消费的listener:同time模式

批量消费的listener:只有下次poll之前去检查数量阈值提交。但是因为他是批量消费的,假设poll一次是10条消息。spring.kafka.listener.ack-count设置为5次。那么spring会在5次poll之后才会ack。即真实消费了50条消息后才会ack。感觉怪怪的。实际也其实没有这种使用场景。别这样用。

MANUAL模式

手动模式,即开发者自己调用acknowledgment.acknowledge()来ack。

    如果设置了手动ack,但是listener又没有注入Acknowledgement,或者没有调用。container真的不会帮忙提交,一旦重启又从来开始消费。

    如果调用了,其实也不是马上就ack给server端,其实是存起来,等这批次poll到的消息全部处理完了。再提交。触发的时机还是在下次poll之前。这种模式其实和AckMode.Batch的效果差别很小。相当于容器来帮你ack还是自己调用的区别。最多自己调用可以灵活控制ack的位置。算是一种假的手动提交。

不管是哪一种,真正ack到服务端的时机都是在下次一poll之前。此时批量&单条的listener逻辑一致。

MANUAL_IMMEDIATE模式

调用acknowledgment.acknowledge()的时候就立即ack到server端了。

单条消息listener:调用一次ack一次。

批量消息listener:其实和BATCH没什么区别。因为本来就是一次性拿到了所有的poll到的消息。

springboot默认设置:非自动提交,提交模式batch。

如何测试判断提交的时机?

将org.springframework.kafka.listener.KafkaMessageListenerContainer的日志等级设置为DEBUG。

logging.level.org.springframework.kafka.listener.KafkaMessageListenerContainer=debug
标签: spring kafka java

本文转载自: https://blog.csdn.net/lazyguy21/article/details/142137983
版权归原作者 大飞Tobi 所有, 如有侵权,请联系我们删除。

“spring-kafka消费消息后的提交方式”的评论:

还没有评论