

Consuming multiple topics with Kafka


In real projects it is common for one service to consume messages from several Kafka topics. This article shows how to consume multiple topics, on the same cluster or on different clusters, from a single consumer class.

1. Different topics on different Kafka clusters

Configuration class 1

@EnableKafka
@Configuration
public class KafkaOneConfig {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    public boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.consumer.batch}")
    public boolean batch;

    @Value("${spring.kafka.consumer.max-poll-records}")
    public String maxPollRecordsConfig;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    public String autoOffsetReset;

    @Value("${spring.kafka.properties.request.timeout.ms}")
    public Integer requestTimeout;

    @Bean // Note: this bean (method) name must be different for each Kafka cluster.
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneListenerContainer() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        extracted(factory);
        return factory;
    }

    public void extracted(ConcurrentKafkaListenerContainerFactory<Integer, String> factory) {
        factory.setBatchListener(batch);   // pull records in batches
        factory.setConcurrency(3);         // number of consumer threads
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // ack mode
        factory.getContainerProperties().setPollTimeout(requestTimeout);                // poll timeout
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(bootstrapServers, groupId));
    }

    public Map<String, Object> consumerConfigs(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);    // connection string of this cluster
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig); // max records fetched per poll
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);     // latest = only new messages
        return props;
    }
}

Configuration class 2

@EnableKafka
@Configuration
public class KafkaTwoConfig extends KafkaOneConfig { // inherit from KafkaOneConfig to avoid duplicating code

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;

    @Bean // Note: this bean (method) name must be different for each Kafka cluster.
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoListenerContainer() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        super.extracted(factory);
        return factory;
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(super.consumerConfigs(bootstrapServers, groupId));
    }
}

Configuration file (application.yml). allow-bean-definition-overriding is set to true because KafkaTwoConfig inherits the @Bean method of KafkaOneConfig, so Spring sees the same bean name defined by both configuration classes.

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    one:
      bootstrap-servers: 123123
      consumer:
        topic: 123
        group-id: 123
        enable-auto-commit: true
    two:
      bootstrap-servers: 456456
      consumer:
        topic: 456
        group-id: 123
        enable-auto-commit: true

Configuration file (.properties):

spring.kafka.consumer.auto-offset-reset=latest
#spring.kafka.consumer.auto-commit-interval=3000
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.listener.concurrency=10
spring.kafka.consumer.consumer.batch=true
spring.kafka.listener.type=batch
spring.kafka.listener.ack-mode=batch
spring.kafka.properties.request.timeout.ms=60000

Consumer class (Java)

@KafkaListener(topics = {"${spring.kafka.one.consumer.topic}"}, containerFactory = "kafkaOneListenerContainer") // uses the factory from configuration class 1
public void consume1(List<ConsumerRecord<String, String>> records) {
}

@KafkaListener(topics = {"${spring.kafka.two.consumer.topic}"}, containerFactory = "kafkaTwoListenerContainer") // uses the factory from configuration class 2
public void consume2(List<ConsumerRecord<String, String>> records) {
}
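For completeness, the two listener methods above can live together in one Spring component. The following is a minimal sketch; the class name KafkaMultiSourceConsumer and the log-style processing are illustrative assumptions, not part of the original post.

@Component
public class KafkaMultiSourceConsumer {

    // Listener bound to the first cluster via the kafkaOneListenerContainer factory.
    @KafkaListener(topics = {"${spring.kafka.one.consumer.topic}"}, containerFactory = "kafkaOneListenerContainer")
    public void consume1(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            // Illustrative processing only; replace with real business logic.
            System.out.println("cluster one, topic=" + record.topic() + ", offset=" + record.offset() + ", value=" + record.value());
        }
    }

    // Listener bound to the second cluster via the kafkaTwoListenerContainer factory.
    @KafkaListener(topics = {"${spring.kafka.two.consumer.topic}"}, containerFactory = "kafkaTwoListenerContainer")
    public void consume2(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("cluster two, topic=" + record.topic() + ", offset=" + record.offset() + ", value=" + record.value());
        }
    }
}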

2. Different topics on the same Kafka cluster

Simply use @KafkaListener directly:

@KafkaListener(id = "123",
        topics = {"a", "b", "c"}) // Note: a single batch poll can return records from several of these topics, not just one.
public void consume(List<ConsumerRecord<String, String>> records) {
}
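Because one polled batch can mix records from topics "a", "b" and "c", it is often convenient to group the batch by topic before processing. Below is a minimal sketch of that idea; the listener id, the method name and the per-topic handling are illustrative assumptions, not from the original post.

@KafkaListener(id = "multiTopicGrouping", topics = {"a", "b", "c"})
public void consumeGrouped(List<ConsumerRecord<String, String>> records) {
    // One batch may contain records from several topics, so group them by topic first.
    Map<String, List<ConsumerRecord<String, String>>> byTopic = records.stream()
            .collect(Collectors.groupingBy(ConsumerRecord::topic));

    byTopic.forEach((topic, topicRecords) -> {
        // Illustrative handling per topic; replace with real business logic.
        System.out.println("topic=" + topic + ", batchSize=" + topicRecords.size());
    });
}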

This article is reposted from: https://blog.csdn.net/m0_68178610/article/details/130564794
The copyright belongs to the original author 黑马Y. If there is any infringement, please contact us for removal.
