0


Kafka批量消费

在Spring Kafka中,使用

  1. @KafkaListener

注解处理批量信息时,首先需要开启批量监听模式,并配置相应的consumer参数来控制批量消费行为。以下是配置和处理批量消息的基本步骤:

  1. 配置Kafka消费者工厂: 设置batchListener属性为true,使@KafkaListener支持批量消费。@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory());// 开启批量监听模式 factory.setBatchListener(true);// 其他相关配置,比如并发度、错误处理等return factory;}
  2. 配置消费者参数: 设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG,指定每次poll请求从Kafka服务器获取的最大记录数。并且关闭offset自动提交enable-auto-commit: false# application.properties 或 application.ymlspring: kafka: consumer: bootstrap-servers: localhost:9092 group-id: my-group max-poll-records: 100 # 其他配置项,如enable-auto-commit, auto-offset-reset等
  3. 编写批量处理方法: 定义一个方法,其参数是一个包含多条消息的列表,@KafkaListener注解下的方法将会接收到批量的消息。@KafkaListener(topics ="my-topic")publicvoidprocessMessages(List<ConsumerRecord<String,String>> records,Acknowledgment acknowledgment){try{// 处理批量消息for(ConsumerRecord<String,String> record : records){// 对每条消息进行处理}// 成功处理后手动提交偏移量 acknowledgment.acknowledge();}catch(Exception e){// 错误处理,记录错误,考虑是否重试或者有其他补偿措施 log.error("Error processing message batch", e);}}
  4. 处理异常和偏移量提交: 当批量处理消息时,需要注意的是,一旦消息处理完成且没有错误,应当手动提交偏移量,以确认这些消息已经被成功消费。如果有消息处理失败,则可能需要根据业务需求选择不同的策略,比如重新尝试处理整个批次、跳过错误消息或者记录错误信息稍后处理。

通过以上步骤,

  1. @KafkaListener

就能按照批处理的方式接收并处理Kafka主题中的消息了。

批量消费Kafka中的消息,然后将这些消息放入队列中,最后利用线程池异步处理这些队列中的消息。这种方式有助于优化资源利用率,尤其是当消息处理逻辑较为耗时或者IO密集型时,可以有效提升系统的并行处理能力和吞吐量。

  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.kafka.support.Acknowledgment;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.util.concurrent.ListenableFuture;
  5. import org.springframework.util.concurrent.ListenableFutureCallback;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  7. import java.util.List;
  8. import java.util.concurrent.BlockingQueue;
  9. import java.util.concurrent.LinkedBlockingQueue;
  10. @Component
  11. public class BatchMessageProcessor {
  12. private final ThreadPoolTaskExecutor taskExecutor;
  13. private final BlockingQueue<ConsumerRecord<String, String>> messageQueue = new LinkedBlockingQueue<>();
  14. public BatchMessageProcessor(ThreadPoolTaskExecutor taskExecutor) {
  15. this.taskExecutor = taskExecutor;
  16. }
  17. @KafkaListener(topics = "my-topic", batch = true)
  18. public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
  19. for (ConsumerRecord<String, String> record : records) {
  20. // 将消费到的消息放入队列
  21. messageQueue.offer(record);
  22. }
  23. // 异步处理消息队列
  24. processMessageQueue(acknowledgment);
  25. }
  26. private void processMessageQueue(Acknowledgment acknowledgment) {
  27. List<ConsumerRecord<String, String>> messagesToProcess;
  28. synchronized (messageQueue) {
  29. // 从队列中批量取出消息
  30. messagesToProcess = new ArrayList<>(messageQueue.size());
  31. messageQueue.drainTo(messagesToProcess, 100); // 假设批量处理100条
  32. }
  33. if (!messagesToProcess.isEmpty()) {
  34. ListenableFuture<?> future = taskExecutor.submit(() -> {
  35. for (ConsumerRecord<String, String> record : messagesToProcess) {
  36. // 实际处理消息的逻辑
  37. processSingleMessage(record);
  38. }
  39. // 所有消息处理完毕后提交偏移量
  40. acknowledgment.acknowledge();
  41. });
  42. // 可以添加回调函数,用于处理线程池任务执行后的结果
  43. future.addCallback(new ListenableFutureCallback<Object>() {
  44. @Override
  45. public void onSuccess(Object result) {
  46. // 处理成功逻辑
  47. }
  48. @Override
  49. public void onFailure(Throwable ex) {
  50. // 处理失败逻辑,如日志记录、重试等
  51. }
  52. });
  53. }
  54. }
  55. private void processSingleMessage(ConsumerRecord<String, String> record) {
  56. // 这里实现单个消息的具体处理逻辑
  57. }
  58. }
标签: kafka 分布式

本文转载自: https://blog.csdn.net/QGhurt/article/details/136954135
版权归原作者 一蓑烟雨任平生2024 所有, 如有侵权,请联系我们删除。

“Kafka批量消费”的评论:

还没有评论