0


Kafka整合springcloud

1、系统架构
spring-cloud版本Hoxton.SR4spring-boot版本

2.2.6.RELEASE

java版本1.8Kafka版本2.4.0.RELEASE
2、pom引入Kafka依赖

<!--Kafka消息队列-->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.4.0.RELEASE</version>
</dependency>

!!!!!注意!!!!!

spring-kafka与spring-boot的版本对应,详情参考网址:Spring for Apache Kafka

3、编写yml配置

  #kafka配置
  kafka:
    producer:
      bootstrap-servers: {你的Kafka服务器IP}
      retries: 0
      batch-size: 4096         #单位是字节
      buffer-memory: 33554432   #单位是字节,默认是33554432字节即32MB
      #序列化类,可以自己写然后配置在这里
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      enable-auto-commit: false #禁止自动提交offest
      auto-offset-reset: latest 
      bootstrap-servers: {你的Kafka服务器IP}
      group-id: {你的Kafka群组id}

4、编写Kafka配置类,注册消费者在这里

/**
 * Kafka配置
 * @date 2024年01月18日 11:30
 */

@Configuration
@EnableKafka
public class KafkaConfig {
    Logger logger= LoggerFactory.getLogger(KafkaConfig.class);

    /**
     * 注册消费者
     * @date 2024/1/18 11:31
     * @return KafkaReceiver
     */
    @Bean
    public ZfxtMsgPendingTaskReceiver listener() {
        return new ZfxtMsgPendingTaskReceiver();
    }
}

5、编写生产者

/**
     * 发送
     * @date 2024/1/18 16:32
     * @param topic Kafka标签名
     * @param key 消息id
     * @param data 数据
     */
    public void send(String topic, String key, MsgPendingTaskKafkaDTO data) {
        //发送消息
        ListenableFuture<SendResult<String, MsgPendingTaskKafkaDTO>> send = kafkaTemplate.send(topic, key, data);
        //异步发送,编写监听器监听结果
        send.completable().whenCompleteAsync((result, ex) -> {
            String s = result.toString();
            if (null == ex) {
                //成功则打点日志
                logger.info("{}生产者发送消息成功:{}", topic, s);
            } else {
                //这里发生错误则存日志进数据库
            }
        });
    }

6、编写消费者

    /**
     * 消费者
     * @date 2024/1/18 16:36
     * @param msg 消息
     */
    @KafkaListener(topics = {你的消息标题,对应发送者的topic字段})
    public void receive(ConsumerRecord<String, String> msg, Acknowledgment ack){
        logger.info("我收到了消息");
        //定义Kafka唯一消息id,避免消息重复消费
        //成功的有没有
        boolean success=zfxtMsgPendingTaskService.hasKey(msg.key());
        //失败的有没有
        boolean exception = kafkaExceptionService.hasKey(msg.key());
        if(success|| exception){
             //两个之中有一个就不处理了
            logger.info("消息重复消费");
        }else{
            //没有自定义序列化要我们自己手动转json
            String value = msg.value();
            MsgPendingTaskKafkaDTO dto = JSON.parseObject(value, MsgPendingTaskKafkaDTO.class);
            //业务处理
        }
        //手动提交偏移量
        ack.acknowledge();
    }

7、要是服务异常导致不能消费或者网络波动导致消费消息失败咋办呢?

我们可以编写消息重试机制,具体如下:

    /**
     * 配置消息重试机制
     * @date 2024/1/19 9:44
     * @param consumerFactory
     * @param exceptionService kafka异常日志记录服务
     * @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer<java.lang.String,java.lang.String>>
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,KafkaExceptionService exceptionService) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //设置超时时间1.5s
        factory.getContainerProperties().setPollTimeout(1500);

        logger.info("执行kafka容器工厂配置");

        // 消息是否或异常重试次数3次 (5000=5秒  3=重试3次)
        // 注意: 没有配置配置手动提交offset,不生效, 因为没有配置手动提交时消息接受到就会自动提交,不会管程序是否异常
        // DefaultErrorHandler构造参数包含两个参数:
        // 一个是ConsumerRecordRecoverer消息回收处理器,用于超过重试次数执行对应消息回收
        // (里面包含:{
        // 1、consumerRecord记录kafaka消息的属性,topic,分区,offest偏移量;e:对应异常
        // 2、BiConsumer二元消费者(一元消费者可以参考Collection的forEach函数),accept方法用于编写消费者具体操作,andThen方法用于控制消费者
        // }
        // 得益于这玩意我们可以不需要写handler直接通过lambda函数的方式来编写详细代码
        // )
        // 一个是BackOff延时执行器(interval:时间间隔;maxAttempts:retry次数)

        //自定义错误消息处理器
        SeekToCurrentErrorHandler defaultErrorHandler = new SeekToCurrentErrorHandler(((consumerRecord, e) -> {
            logger.info("kafka消息消费出现异常,e:{}",e.getMessage());
            //超过重试次数记录日志
            Object key = consumerRecord.key();
            if(exceptionService.hasKey(key.toString())){
                logger.warn("id重复");
            }else{
                // TODO: 2024/1/30 记录日志
            }
        }), new FixedBackOff(5000, 3));

        //多个可使用batchErrorHandler
        //设置默认错误处理器
        factory.setErrorHandler(defaultErrorHandler);

        // 最后配置手动提交offset
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

就这样吧

标签: kafka spring cloud

本文转载自: https://blog.csdn.net/vazrgqcy2/article/details/135669157
版权归原作者 蛮荒兽人持键小子 所有, 如有侵权,请联系我们删除。

“Kafka整合springcloud”的评论:

还没有评论