0


Kafka:消费者手动提交

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。

两种手动提交方式:

  • commitSync(同步提交):

必须等待offset提交完毕,再去消费下一批数据。

同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败)

  • commitAsync(异步提交) :

发送完提交offset请求后,就开始消费下一批数据了。

异步提交则没有失败重试机制,有可能提交失败。

注意:

关闭自动提交

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import org.apache.kafka.common.serialization.StringDeserializer;
  6. import java.time.Duration;
  7. import java.util.ArrayList;
  8. import java.util.Properties;
  9. public class CustomConsumerByHandSync {
  10. public static void main(String[] args) {
  11. // 0 配置
  12. Properties properties = new Properties();
  13. // 连接 bootstrap.servers
  14. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.129:9092");
  15. // 反序列化
  16. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  17. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  18. // 配置消费者组id
  19. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
  20. // 手动提交 记得关闭false
  21. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
  22. // 1 创建一个消费者
  23. // KafkaConsumer<K, V>
  24. // 由于消息形式是 key value 为 "", "hello"
  25. // 所以泛型K为key为String类型 泛型V为传递消息的类型,此处发送字符串用String类型
  26. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  27. // 2 订阅主题 test
  28. ArrayList<String> topics = new ArrayList<>();
  29. topics.add("test");
  30. kafkaConsumer.subscribe(topics);
  31. // 3 消费数据
  32. while (true){
  33. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  34. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  35. System.out.println(consumerRecord);
  36. }
  37. // 手动提交offset
  38. // 同步提交
  39. // kafkaConsumer.commitSync();
  40. // 异步提交
  41. kafkaConsumer.commitAsync();
  42. }
  43. }
  44. }

异常处理

同步提交处理(有自动重试)

  1. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  2. process(records); // 处理消息
  3. try {
  4. consumer.commitSync();
  5. } catch (CommitFailedException e) {
  6. handle(e); // 处理提交失败异常
  7. }

异步提交处理(没有自动重试)

  1. try {
  2. while(true) {
  3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  4. process(records); // 处理消息
  5. commitAysnc(); // 使用异步提交规避阻塞
  6. }
  7. } catch(Exception e) {
  8. handle(e); // 处理异常
  9. } finally {
  10. try {
  11. consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
  12. } finally {
  13. consumer.close();
  14. }
  15. }

本文转载自: https://blog.csdn.net/weixin_45427648/article/details/129698546
版权归原作者 程序员无羡 所有, 如有侵权,请联系我们删除。

“Kafka:消费者手动提交”的评论:

还没有评论