0


springboot使用@KafkaListener监听多个kafka配置

    背景: 使用springboot整合kafka时, springboot默认读取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener时指定topic即可, 当服务中需要监听多个kafka时, 需要配置多个kafka, 这种方式不适用

    方案: 可以手动读取不同kafka配置信息, 创建不同的Kafka 监听容器工厂, 使用@KafkaListener时指定相应的容器工厂, 代码如下:
  1. 导入依赖
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
  1. yml配置
kafka:
  # 默认消费者配置
  default-consumer:
    # 自动提交已消费offset
    enable-auto-commit: true
    # 自动提交间隔时间
    auto-commit-interval: 1000
    # 消费的超时时间
    poll-timeout: 1500
    # 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
    auto.offset.reset: latest
    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
    session.timeout.ms: 120000
    # 消费请求超时时间
    request.timeout.ms: 180000
  # 1号kafka配置
  test1:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx
  # 2号kafka配置
  test2:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx
  1. 容器工厂配置
package com.zhdx.modules.backstage.config;

import com.google.common.collect.Maps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.Map;

/**
 * kafka监听容器工厂配置
 * <p>
 * 拓展其他消费者配置只需配置指定的属性和bean即可
 */
@EnableKafka
@Configuration
@RefreshScope
public class KafkaListenerContainerFactoryConfig {

    /**
     *  test1 kafka配置
     */
    @Value("${kafka.test1.bootstrap-servers}")
    private String test1KafkaServerUrls;

    @Value("${kafka.test1.consumer.group-id}")
    private String test1GroupId;

    @Value("${kafka.test1.consumer.sasl.mechanism}")
    private String test1SaslMechanism;

    @Value("${kafka.test1.consumer.security.protocol}")
    private String test1SecurityProtocol;

    @Value("${kafka.test1.consumer.sasl.jaas.config}")
    private String test1SaslJaasConfig;
    /**
     *  test2 kafka配置
     */
    @Value("${kafka.test2.bootstrap-servers}")
    private String test2KafkaServerUrls;

    @Value("${kafka.test2.consumer.group-id}")
    private String test2GroupId;

    @Value("${kafka.test2.consumer.sasl.mechanism}")
    private String test2SaslMechanism;

    @Value("${kafka.test2.consumer.security.protocol}")
    private String test2SecurityProtocol;

    @Value("${kafka.test2.consumer.sasl.jaas.config}")
    private String test2SaslJaasConfig;

    /**
     * 默认消费者配置
     */
    @Value("${kafka.default-consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.default-consumer.poll-timeout}")
    private int pollTimeout;

    @Value("${kafka.default-consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.default-consumer.session.timeout.ms}")
    private int sessionTimeoutMs;

    @Value("${kafka.default-consumer.request.timeout.ms}")
    private int requestTimeoutMs;

    /**
     * test1消费者配置
     */
    public Map<String, Object> test1ConsumerConfigs() {
        Map<String, Object> props = getDefaultConsumerConfigs();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId);
        // 加密
        props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism);
        props.put("security.protocol", test1SecurityProtocol);
        // 账号密码
        props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig);
        return props;
    }
    
    /**
     * test2消费者配置
     */
    public Map<String, Object> test2ConsumerConfigs() {
        Map<String, Object> props = getDefaultConsumerConfigs();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId);
        // 加密
        props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism);
        props.put("security.protocol", test2SecurityProtocol);
        // 账号密码
        props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig);
        return props;
    }

    /**
     * 默认消费者配置
     */
    private Map<String, Object> getDefaultConsumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        // 自动提交(按周期)已消费offset 批量消费下设置false
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        // 消费请求超时时间
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
        // 序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    /**
     * 消费者工厂类
     */
    public ConsumerFactory<String, String> initConsumerFactory(Map<String, Object> consumerConfigs) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory(
        Map<String, Object> consumerConfigs) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(initConsumerFactory(consumerConfigs));
        // 是否开启批量消费
        factory.setBatchListener(false);
        // 消费的超时时间
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        return factory;
    }

    /**
     * 创建test1 Kafka 监听容器工厂。
     *
     * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象
     */
    @Bean(name = "test1KafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test1KafkaListenerContainerFactory() {
        Map<String, Object> consumerConfigs = this.test1ConsumerConfigs();
        return initKafkaListenerContainerFactory(consumerConfigs);
    }
    

    /**
     * 创建test2 Kafka 监听容器工厂。
     *
     * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象
     */
    @Bean(name = "test2KafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test2KafkaListenerContainerFactory() {
        Map<String, Object> consumerConfigs = this.test2ConsumerConfigs();
        return initKafkaListenerContainerFactory(consumerConfigs);
    }
}
4. @KafkaListener使用
package com.zhdx.modules.backstage.kafka;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka监听器
 */
@Slf4j
@Component
public class test1KafkaListener {
    @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx")
    public void handleHyPm(ConsumerRecord<String, String> record) {
        log.info("消费到topic xxx消息:{}", JSON.toJSONString(record.value()));
    }
}

本文转载自: https://blog.csdn.net/qq_40313687/article/details/135340134
版权归原作者 道不平 所有, 如有侵权,请联系我们删除。

“springboot使用@KafkaListener监听多个kafka配置”的评论:

还没有评论