0


springboot 开启和关闭kafka消费

关闭kafka自动消费

配置自定义容器工厂

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.stereotype.Component;@Component@Configurationpublicclass kafkaConfig {@AutowiredprivateConsumerFactory<String,String> consumerFactory;@Bean("pingKafkaFactory")publicConcurrentKafkaListenerContainerFactory<String,String>delayContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> container =newConcurrentKafkaListenerContainerFactory<String,String>();
        container.setConsumerFactory(consumerFactory);//禁止自动启动
        container.setAutoStartup(false);return container;}}

在消费监听器上使用工厂,并设置id

@KafkaListener(topics ="#{pingProperties.getTopic().split(',')}",id ="pingConsumer",containerFactory ="pingKafkaFactory")

这样,启动项目后,就不会自动消费了。

手动开启和关闭消费

importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.config.KafkaListenerEndpointRegistry;importorg.springframework.stereotype.Service;/**
 * Kafka消费监听服务实现类.
 */@Service@Slf4jpublicclassKafkaConsumerListenerServiceImplimplementsKafkaConsumerListenerService{/**
     * registry.
     */@AutowiredprivateKafkaListenerEndpointRegistry registry;/**
     * 开启监听.
     *
     * @param listenerId 监听ID
     */@OverridepublicvoidstartListener(String listenerId){//判断监听容器是否启动,未启动则将其启动if(!registry.getListenerContainer(listenerId).isRunning()){
            registry.getListenerContainer(listenerId).start();}//项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思//registry.getListenerContainer(listenerId).stop();
        log.info(listenerId +"开启监听成功。");}/**
     * 停止监听.
     *
     * @param listenerId 监听ID
     */@OverridepublicvoidstopListener(String listenerId){
        registry.getListenerContainer(listenerId).stop();
        log.info(listenerId +"停止监听成功。");}}

本文转载自: https://blog.csdn.net/A434534658/article/details/134606582
版权归原作者 阿拉的梦想 所有, 如有侵权,请联系我们删除。

“springboot 开启和关闭kafka消费”的评论:

还没有评论