0


Kafka整合springcloud

1、系统架构
spring-cloud版本Hoxton.SR4spring-boot版本

  1. 2.2.6.RELEASE

java版本1.8Kafka版本2.4.0.RELEASE
2、pom引入Kafka依赖

  1. <!--Kafka消息队列-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. <version>2.4.0.RELEASE</version>
  6. </dependency>

!!!!!注意!!!!!

spring-kafka与spring-boot的版本对应,详情参考网址:Spring for Apache Kafka

3、编写yml配置

  1. #kafka配置
  2. kafka:
  3. producer:
  4. bootstrap-servers: {你的Kafka服务器IP}
  5. retries: 0
  6. batch-size: 4096 #单位是字节
  7. buffer-memory: 33554432 #单位是字节,默认是33554432字节即32MB
  8. #序列化类,可以自己写然后配置在这里
  9. value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  10. consumer:
  11. enable-auto-commit: false #禁止自动提交offest
  12. auto-offset-reset: latest
  13. bootstrap-servers: {你的Kafka服务器IP}
  14. group-id: {你的Kafka群组id}

4、编写Kafka配置类,注册消费者在这里

  1. /**
  2. * Kafka配置
  3. * @date 2024年01月18日 11:30
  4. */
  5. @Configuration
  6. @EnableKafka
  7. public class KafkaConfig {
  8. Logger logger= LoggerFactory.getLogger(KafkaConfig.class);
  9. /**
  10. * 注册消费者
  11. * @date 2024/1/18 11:31
  12. * @return KafkaReceiver
  13. */
  14. @Bean
  15. public ZfxtMsgPendingTaskReceiver listener() {
  16. return new ZfxtMsgPendingTaskReceiver();
  17. }
  18. }

5、编写生产者

  1. /**
  2. * 发送
  3. * @date 2024/1/18 16:32
  4. * @param topic Kafka标签名
  5. * @param key 消息id
  6. * @param data 数据
  7. */
  8. public void send(String topic, String key, MsgPendingTaskKafkaDTO data) {
  9. //发送消息
  10. ListenableFuture<SendResult<String, MsgPendingTaskKafkaDTO>> send = kafkaTemplate.send(topic, key, data);
  11. //异步发送,编写监听器监听结果
  12. send.completable().whenCompleteAsync((result, ex) -> {
  13. String s = result.toString();
  14. if (null == ex) {
  15. //成功则打点日志
  16. logger.info("{}生产者发送消息成功:{}", topic, s);
  17. } else {
  18. //这里发生错误则存日志进数据库
  19. }
  20. });
  21. }

6、编写消费者

  1. /**
  2. * 消费者
  3. * @date 2024/1/18 16:36
  4. * @param msg 消息
  5. */
  6. @KafkaListener(topics = {你的消息标题,对应发送者的topic字段})
  7. public void receive(ConsumerRecord<String, String> msg, Acknowledgment ack){
  8. logger.info("我收到了消息");
  9. //定义Kafka唯一消息id,避免消息重复消费
  10. //成功的有没有
  11. boolean success=zfxtMsgPendingTaskService.hasKey(msg.key());
  12. //失败的有没有
  13. boolean exception = kafkaExceptionService.hasKey(msg.key());
  14. if(success|| exception){
  15. //两个之中有一个就不处理了
  16. logger.info("消息重复消费");
  17. }else{
  18. //没有自定义序列化要我们自己手动转json
  19. String value = msg.value();
  20. MsgPendingTaskKafkaDTO dto = JSON.parseObject(value, MsgPendingTaskKafkaDTO.class);
  21. //业务处理
  22. }
  23. //手动提交偏移量
  24. ack.acknowledge();
  25. }

7、要是服务异常导致不能消费或者网络波动导致消费消息失败咋办呢?

我们可以编写消息重试机制,具体如下:

  1. /**
  2. * 配置消息重试机制
  3. * @date 2024/1/19 9:44
  4. * @param consumerFactory
  5. * @param exceptionService kafka异常日志记录服务
  6. * @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer<java.lang.String,java.lang.String>>
  7. */
  8. @Bean
  9. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,KafkaExceptionService exceptionService) {
  10. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  11. factory.setConsumerFactory(consumerFactory);
  12. //设置超时时间1.5s
  13. factory.getContainerProperties().setPollTimeout(1500);
  14. logger.info("执行kafka容器工厂配置");
  15. // 消息是否或异常重试次数3次 (5000=5秒 3=重试3次)
  16. // 注意: 没有配置配置手动提交offset,不生效, 因为没有配置手动提交时消息接受到就会自动提交,不会管程序是否异常
  17. // DefaultErrorHandler构造参数包含两个参数:
  18. // 一个是ConsumerRecordRecoverer消息回收处理器,用于超过重试次数执行对应消息回收
  19. // (里面包含:{
  20. // 1、consumerRecord记录kafaka消息的属性,topic,分区,offest偏移量;e:对应异常
  21. // 2、BiConsumer二元消费者(一元消费者可以参考Collection的forEach函数),accept方法用于编写消费者具体操作,andThen方法用于控制消费者
  22. // }
  23. // 得益于这玩意我们可以不需要写handler直接通过lambda函数的方式来编写详细代码
  24. // )
  25. // 一个是BackOff延时执行器(interval:时间间隔;maxAttempts:retry次数)
  26. //自定义错误消息处理器
  27. SeekToCurrentErrorHandler defaultErrorHandler = new SeekToCurrentErrorHandler(((consumerRecord, e) -> {
  28. logger.info("kafka消息消费出现异常,e:{}",e.getMessage());
  29. //超过重试次数记录日志
  30. Object key = consumerRecord.key();
  31. if(exceptionService.hasKey(key.toString())){
  32. logger.warn("id重复");
  33. }else{
  34. // TODO: 2024/1/30 记录日志
  35. }
  36. }), new FixedBackOff(5000, 3));
  37. //多个可使用batchErrorHandler
  38. //设置默认错误处理器
  39. factory.setErrorHandler(defaultErrorHandler);
  40. // 最后配置手动提交offset
  41. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  42. return factory;
  43. }

就这样吧

标签: kafka spring cloud

本文转载自: https://blog.csdn.net/vazrgqcy2/article/details/135669157
版权归原作者 蛮荒兽人持键小子 所有, 如有侵权,请联系我们删除。

“Kafka整合springcloud”的评论:

还没有评论