0


Spring Boot 自定义kafka 消费者配置 ContainerFactory最佳实践

引入 MAVEN 依赖

版本需要你自己指定

  1. <dependency>
  2. <groupId>com.alibaba</groupId>
  3. <artifactId>fastjson</artifactId>
  4. <version>xxx</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka</artifactId>
  9. <version>xxx</version>
  10. </dependency>
  11. <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  12. <dependency>
  13. <groupId>org.apache.kafka</groupId>
  14. <artifactId>kafka-clients</artifactId>
  15. <version>xxx</version>
  16. </dependency>

引入Java配置类

  1. /**
  2. * 手动自定义 kafka 消费者 ContainerFactory 配置demo
  3. */
  4. @Configuration
  5. @EnableConfigurationProperties(KafkaProperties.class)
  6. public class KafkaConsumerConfig {
  7. @Autowired
  8. private KafkaProperties properties;
  9. @Value("${监听服务地址}")
  10. private List<String> myServers;
  11. @Bean("myKafkaContainerFactory")
  12. @ConditionalOnBean(ConcurrentKafkaListenerContainerFactoryConfigurer.class)
  13. public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
  14. ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
  15. ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  16. configurer.configure(factory, consumerFactory());
  17. return factory;
  18. }
  19. //获得创建消费者工厂
  20. public ConsumerFactory<Object, Object> consumerFactory() {
  21. KafkaProperties myKafkaProperties = JSON.parseObject(JSON.toJSONString(this.properties), KafkaProperties.class);
  22. //对模板 properties 进行定制化
  23. //....
  24. //例如:定制servers
  25. myKafkaProperties.setBootstrapServers(myServers);
  26. return new DefaultKafkaConsumerFactory<>(myKafkaProperties.buildConsumerProperties());
  27. }
  28. }

yml模板

  1. #kafka配置,更多配置请参考:KafkaProperties
  2. spring.kafka:
  3. #公共参数,其他的timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms保持默认值
  4. properties:
  5. #这个参数指定producer在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小(batch-size),到达时间后生产者也会发送批量消息到broker。默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息。设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟。
  6. linger.ms: 50 #默认值:0毫秒,当消息发送比较频繁时,增加一些延迟可增加吞吐量和性能。
  7. #这个参数指定producer在一个TCP connection可同时发送多少条消息到broker并且等待broker响应,设置此参数较高的值可以提高吞吐量,但同时也会增加内存消耗。另外,如果设置过高反而会降低吞吐量,因为批量消息效率降低。设置为1,可以保证发送到broker的顺序和调用send方法顺序一致,即便出现失败重试的情况也是如此。
  8. #注意:当前消息符合at-least-once,自kafka1.0.0以后,为保证消息有序以及exactly once,这个配置可适当调大为5。
  9. max.in.flight.requests.per.connection: 1 #默认值:5,设置为1即表示producer在connection上发送一条消息,至少要等到这条消息被broker确认收到才继续发送下一条,因此是有序的。
  10. #生产者的配置,可参考org.apache.kafka.clients.producer.ProducerConfig
  11. producer:
  12. #这个参数可以是任意字符串,它是broker用来识别消息是来自哪个客户端的。在broker进行打印日志、衡量指标或者配额限制时会用到。
  13. clientId: ${spring.application.name} #方便kafkaserver打印日志定位请求来源
  14. bootstrap-servers: 127.0.0.1:8080 #kafka服务器地址,多个以逗号隔开
  15. #acks=0:生产者把消息发送到broker即认为成功,不等待broker的处理结果。这种方式的吞吐最高,但也是最容易丢失消息的。
  16. #acks=1:生产者会在该分区的leader写入消息并返回成功后,认为消息发送成功。如果群首写入消息失败,生产者会收到错误响应并进行重试。这种方式能够一定程度避免消息丢失,但如果leader宕机时该消息没有复制到其他副本,那么该消息还是会丢失。另外,如果我们使用同步方式来发送,延迟会比前一种方式大大增加(至少增加一个网络往返时间);如果使用异步方式,应用感知不到延迟,吞吐量则会受异步正在发送中的数量限制。
  17. #acks=all:生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的。
  18. #如果是发送日志之类的,允许部分丢失,可指定acks=0,如果想不丢失消息,可配置为all,但需密切关注性能和吞吐量。
  19. acks: all #默认值:1
  20. #当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。在实际情况中,这个参数需要结合retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群重新选举leader的时间长,这样可以避免生产者过早结束重试导致失败。
  21. #另外需注意,当开启重试时,若未设置max.in.flight.requests.per.connection=1,则可能出现发往同一个分区的两批消息的顺序出错,比如,第一批发送失败了,第二批成功了,然后第一批重试成功了,此时两者的顺序就颠倒了。
  22. retries: 2 #发送失败时重试多少次,0=禁用重试(默认值)
  23. #默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
  24. compressionType: "none" #如果不开启压缩,可设置为none(默认值),比较大的消息可开启。
  25. #当多条消息发送到一个分区时,Producer会进行批量发送,这个参数指定了批量消息大小的上限(以字节为单位)。当批量消息达到这个大小时,Producer会一起发送到broker;但即使没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。
  26. batch-size: 16384 #默认16K,值越小延迟越低,但是吞吐量和性能会降低。0=禁用批量发送
  27. #这个参数设置Producer暂存待发送消息的缓冲区内存的大小,如果应用调用send方法的速度大于Producer发送的速度,那么调用会阻塞一定(max.block.ms)时间后抛出异常。
  28. buffer-memory: 33554432 #缓冲区默认大小32M
  29. #消费者的配置,可参考:org.apache.kafka.clients.consumer.ConsumerConfig
  30. consumer:
  31. #这个参数可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用。
  32. #暂不用提供clientId,2.x版本可放出来,1.x有多个topic且concurrency>1会出现JMX注册时异常
  33. #clientId: ${spring.application.name} #方便kafkaserver打印日志定位请求来源
  34. # 签中kafka集群
  35. bootstrap-servers: 127.0.0.1:8080 #kafka服务器地址,多个以逗号隔开
  36. #这个参数指定了当消费者第一次读取分区或者无offset时拉取那个位置的消息,可以取值为latest(从最新的消息开始消费),earliest(从最老的消息开始消费),none(如果无offset就抛出异常)
  37. autoOffsetReset: latest #默认值:latest
  38. #这个参数指定了消费者是否自动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false,然后手动提交。如果为true,你可能需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
  39. enable-auto-commit: false
  40. #周期性自动提交的间隔,单位毫秒
  41. auto-commit-interval: 2000 #默认值:5000
  42. #这个参数允许消费者指定从broker读取消息时最小的Payload的字节数。当消费者从broker读取消息时,如果数据字节数小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。
  43. fetchMinSize: 1 #默认值: 1
  44. #上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。
  45. fetchMaxWait: 500 #默认值:500毫秒
  46. #这个参数控制一个poll()调用返回的记录数,即consumer每次批量拉多少条数据。
  47. maxPollRecords: 500 #默认值:500
  48. listener:
  49. #创建多少个consumer,值必须小于等于Kafk Topic的分区数。
  50. ack-mode: MANUAL_IMMEDIATE
  51. concurrency: 1 #推荐设置为topic的分区数

配置释义

点开 KafkaProperties 这个类,可以看到这个是SpringBoot 自动配置kafka的配置类,引入这个实例,就相当于你拿到了SpringBoot kafka配置模板的参数,就是上述贴的配置,然后再此基础上重新定义你需要改变的配置,这里主要讲消费者配置。

代码中举了个重写监听servers的例子:

  1. //例如:定制servers
  2. myKafkaProperties.setBootstrapServers(myServers);

@KafkaListener 使用 containerFactory

  1. @Slf4j
  2. @Component
  3. public class ConsumerDemo {
  4. //声明consumerID为demo,监听topicName为topic.quick.demo的Topic
  5. //这个消费者的 containerFactory 是SpringBoot 提供的 kafkaListenerContainerFactory 这个bean
  6. @KafkaListener(id = "demo", topics = "topic.quick.demo")
  7. public void listen(String msgData) {
  8. log.info("demo receive : " + msgData);
  9. }
  10. @KafkaListener(topics = "k010", containerFactory = "myKafkaContainerFactory")
  11. public void listen(String msgData, Acknowledgment ack) {
  12. log.info("demo receive : " + msgData);
  13. //手动提交
  14. //enable.auto.commit参数设置成false。那么就是Spring来替为我们做人工提交,从而简化了人工提交的方式。
  15. //所以kafka和springboot结合中的enable.auto.commit为false为spring的人工提交模式。
  16. //enable.auto.commit为true是采用kafka的默认提交模式。
  17. ack.acknowledge();
  18. }
  19. }

如果在@KafkaListener属性中没有指定 containerFactory 那么Spring Boot 会默认注入 name 为“kafkaListenerContainerFactory” 的 containerFactory。具体源码可跟踪:KafkaListenerAnnotationBeanPostProcessor中的常量:

  1. public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

消费者重试机制配置

  1. public class KafkaConsumerConfig {
  2. @Bean
  3. @Primary
  4. public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
  5. log.warn("kafkaErrorHandler begin to Handle");
  6. // <1> 创建 DeadLetterPublishingRecoverer 对象
  7. ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
  8. // <2> 创建 FixedBackOff 对象 设置重试间隔 10秒 次数为 3次
  9. BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
  10. // <3> 创建 SeekToCurrentErrorHandler 对象
  11. return new SeekToCurrentErrorHandler(recoverer, backOff);
  12. }
  13. }

消息重试和死信队列的应用

除了上面谈到的通过手动Ack模式来控制消息偏移量外,其实Spring-kafka内部还封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。而且可以设置重试达到多少次后,让消息进入预定好的Topic。也就是死信队列里。下面代码演示了这种效果:

  1. @Autowired
  2. private KafkaTemplate template;
  3. @Bean
  4. public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
  5. ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
  6. ConsumerFactory<Object, Object> kafkaConsumerFactory,
  7. KafkaTemplate<Object, Object> template) {
  8. ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  9. configurer.configure(factory, kafkaConsumerFactory);
  10. //最大重试三次
  11. factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
  12. return factory;
  13. }
  14. @GetMapping("/send/{input}")
  15. public void sendFoo(@PathVariable String input) {
  16. template.send("topic-kl", input);
  17. }
  18. @KafkaListener(id = "webGroup", topics = "topic-kl")
  19. public String listen(String input) {
  20. logger.info("input value: {}", input);
  21. throw new RuntimeException("dlt");
  22. }
  23. @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
  24. public void dltListen(String input) {
  25. logger.info("Received from DLT: " + input);
  26. }

上面应用,在topic-kl监听到消息会,会触发运行时异常,然后监听器会尝试三次调用,当到达最大的重试次数后。消息就会被丢掉重试死信队列里面去。死信队列的Topic的规则是,业务Topic名字+“.DLT”。如上面业务Topic的name为“topic-kl”,那么对应的死信队列的Topic就是“topic-kl.DLT”

Spring-kafka消息消费用法探秘

@KafkaListener的使用

前面在简单集成中已经演示过了@KafkaListener接收消息的能力,但是@KafkaListener的功能不止如此,其他的比较常见的,使用场景比较多的功能点如下:

  • 显示的指定消费哪些Topic和分区的消息,
  • 设置每个Topic以及分区初始化的偏移量,
  • 设置消费线程并发度
  • 设置消息异常处理
  1. @KafkaListener(id = "webGroup", topicPartitions = {
  2. @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
  3. @TopicPartition(topic = "topic2", partitions = "0",
  4. partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
  5. },concurrency = "6",errorHandler = "myErrorHandler")
  6. public String listen(String input) {
  7. logger.info("input value: {}", input);
  8. return "successful";
  9. }

手动Ack模式

手动ACK模式,由业务逻辑控制提交偏移量。比如程序在消费时,有这种语义,特别异常情况下不确认ack,也就是不提交偏移量,那么你只能使用手动Ack模式来做了。开启手动首先需要关闭自动提交,然后设置下consumer的消费模式

  1. spring.kafka.consumer.enable-auto-commit=false
  2. spring.kafka.listener.ack-mode=manual

上面的设置好后,在消费时,只需要在@KafkaListener监听方法的入参加入Acknowledgment 即可,执行到ack.acknowledge()代表提交了偏移量

  1. @KafkaListener(id = "webGroup", topics = "topic-kl")
  2. public String listen(String input, Acknowledgment ack) {
  3. logger.info("input value: {}", input);
  4. if ("kl".equals(input)) {
  5. ack.acknowledge();
  6. }
  7. return "successful";
  8. }
标签: kafka spring boot java

本文转载自: https://blog.csdn.net/jinzhiyong01/article/details/125424115
版权归原作者 靳智涌(jack) 所有, 如有侵权,请联系我们删除。

“Spring Boot 自定义kafka 消费者配置 ContainerFactory最佳实践”的评论:

还没有评论