关闭kafka自动消费
配置自定义容器工厂
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.stereotype.Component;@Component@Configurationpublicclass kafkaConfig {@AutowiredprivateConsumerFactory<String,String> consumerFactory;@Bean("pingKafkaFactory")publicConcurrentKafkaListenerContainerFactory<String,String>delayContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> container =newConcurrentKafkaListenerContainerFactory<String,String>();
container.setConsumerFactory(consumerFactory);//禁止自动启动
container.setAutoStartup(false);return container;}}
在消费监听器上使用工厂,并设置id
@KafkaListener(topics ="#{pingProperties.getTopic().split(',')}",id ="pingConsumer",containerFactory ="pingKafkaFactory")
这样,启动项目后,就不会自动消费了。
手动开启和关闭消费
importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.config.KafkaListenerEndpointRegistry;importorg.springframework.stereotype.Service;/**
* Kafka消费监听服务实现类.
*/@Service@Slf4jpublicclassKafkaConsumerListenerServiceImplimplementsKafkaConsumerListenerService{/**
* registry.
*/@AutowiredprivateKafkaListenerEndpointRegistry registry;/**
* 开启监听.
*
* @param listenerId 监听ID
*/@OverridepublicvoidstartListener(String listenerId){//判断监听容器是否启动,未启动则将其启动if(!registry.getListenerContainer(listenerId).isRunning()){
registry.getListenerContainer(listenerId).start();}//项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思//registry.getListenerContainer(listenerId).stop();
log.info(listenerId +"开启监听成功。");}/**
* 停止监听.
*
* @param listenerId 监听ID
*/@OverridepublicvoidstopListener(String listenerId){
registry.getListenerContainer(listenerId).stop();
log.info(listenerId +"停止监听成功。");}}
本文转载自: https://blog.csdn.net/A434534658/article/details/134606582
版权归原作者 阿拉的梦想 所有, 如有侵权,请联系我们删除。
版权归原作者 阿拉的梦想 所有, 如有侵权,请联系我们删除。