Kafka's enable.auto.commit explained

As the name suggests, this setting controls whether the Kafka consumer commits offsets automatically.

Configuring the consumer (set ENABLE_AUTO_COMMIT_CONFIG to true for automatic commits)

**enable.auto.commit** defaults to true, so automatic offset commits are used out of the box.

**auto.commit.interval.ms** defaults to 5000, in milliseconds.
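For reference, both settings can be spelled out explicitly when building the consumer config. This is a minimal sketch (not from the original post) using the standard `ConsumerConfig` constants; the values shown are the defaults:

```java
// Sketch: both auto-commit settings with their default values made explicit.
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);      // default: true
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // default: 5000 ms
```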

Let's configure the consumer to commit offset positions automatically after messages are consumed:

```java
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    return new KafkaConsumer<>(config);
}
```

Configure the message listener:

```java
@Slf4j
@Component
public class PackageMessageConsumer {

    @Autowired
    KafkaConsumer<String, String> kafkaConsumer;
    @Autowired
    EventProcessMaster eventProcessMaster;

    @PostConstruct
    public void listenRealTimeGroup() {
        new Thread(() -> consumer()).start();
    }

    private void consumer() {
        List<String> topics = new ArrayList<>();
        topics.add("test-kafka-auto.commit");
        kafkaConsumer.subscribe(topics);
        while (true) {
            try {
                // poll timeout: the maximum time to block waiting for records
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    Object content = record.value();
                    eventProcessMaster.processRequest(new Event(record.topic(), key, content));
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}
```

With auto-commit configured, we no longer have to track the consumed offset ourselves: offsets are committed in the background every auto.commit.interval.ms, so when the program restarts, the consumer resumes from the last committed position instead of reprocessing old messages. (Strictly speaking, records processed after the most recent auto-commit can still be redelivered after a crash, so this gives at-least-once rather than exactly-once semantics.)
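If you want to verify where consumption will resume from, you can read the committed offset back from the broker. This is a minimal sketch, not part of the original post; it assumes kafka-clients 2.4+ (for `committed(Set)`), that the demo topic has a partition 0, and the imports `TopicPartition`, `OffsetAndMetadata`, and `java.util.Collections`:

```java
// Sketch: inspect the last offset that auto-commit recorded for partition 0.
TopicPartition tp = new TopicPartition("test-kafka-auto.commit", 0);
OffsetAndMetadata committed = kafkaConsumer.committed(Collections.singleton(tp)).get(tp);
// committed is null until at least one commit has happened for this group
log.info("committed offset: {}", committed == null ? "none yet" : committed.offset());
```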

Configuring the consumer (set ENABLE_AUTO_COMMIT_CONFIG to false for manual commits)

Manual commit, as the name implies, means that after we consume a message, Kafka no longer updates the offset position automatically, and auto.commit.interval.ms is no longer taken into account.

```java
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
    // the difference from the auto-commit setup: disable auto-commit
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new KafkaConsumer<>(config);
}
```

With manual commit enabled but no other code changed, you will find that every time the program restarts, Kafka re-delivers all the data whose offsets were never committed, which conflicts with the idempotency our application needs. Let's complete the code (consumer config first, then the listener):

```java
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
    // auto-commit stays disabled; note that auto.commit.interval.ms has no
    // effect when enable.auto.commit is false, so there is no point setting it
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new KafkaConsumer<>(config);
}
```
```java
@Slf4j
@Component
public class DependPackageMessageConsumer {

    @Autowired
    KafkaConsumer<String, String> kafkaConsumer;
    @Autowired
    EventProcessMaster eventProcessMaster;

    @PostConstruct
    public void listenRealTimeGroup() {
        new Thread(() -> consumer()).start();
    }

    private void consumer() {
        List<String> topics = new ArrayList<>();
        topics.add("test-kafka-auto.commit");
        kafkaConsumer.subscribe(topics);
        while (true) {
            try {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    Object content = record.value();
                    eventProcessMaster.processRequest(new Event(record.topic(), key, content));
                }
                // manually commit the offset position once the batch is processed
                kafkaConsumer.commitSync();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}
```

With manual acknowledgement in place, each time the service restarts, consumption resumes from the last committed offset position.
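Note that the no-argument `commitSync()` commits the highest offsets returned by the last poll in one shot, so a crash mid-batch still re-delivers the whole batch. As a finer-grained variation on the post's approach (a sketch, not from the original article; it reuses the `kafkaConsumer`/`eventProcessMaster` fields and the post's `Event` class, and additionally assumes imports for `TopicPartition`, `OffsetAndMetadata`, and `Collections`), you can commit per partition as each partition's records finish:

```java
// Sketch: process and commit partition by partition inside the poll loop.
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        eventProcessMaster.processRequest(new Event(record.topic(), record.key(), record.value()));
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // Kafka expects the offset of the *next* record to consume, hence +1
    kafkaConsumer.commitSync(
            Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
```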

Tags: kafka, java, distributed systems

Reposted from: https://blog.csdn.net/Romantic_lei/article/details/126590216
Copyright belongs to the original author, Romantic_lei. In case of infringement, please contact us for removal.
