文章目录
概述
在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。
在Spring Boot中,要实现动态的控制或关闭消费以及动态开启或关闭监听,可以使用Spring Kafka提供的一些功能。
思路
首先,需要配置Kafka消费者的相关属性。在Spring Boot中,可以通过在application.properties或application.yml文件中添加相应的配置来实现。
以下是一个示例配置:
spring.kafka.consumer.bootstrap-servers=<Kafka服务器地址>
spring.kafka.consumer.group-id=<消费者组ID>
接下来,可以创建一个Kafka消费者,使用
@KafkaListener
注解来指定要监听的Kafka主题,并编写相应的消息处理方法。例如:
importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassKafkaConsumer{@KafkaListener(topics ="<Kafka主题>")publicvoidreceive(String message){// 处理接收到的消息}}
现在,你可以使用以下两种方法来控制或关闭消费以及动态开启或关闭监听:
**方法1:使用
@KafkaListener
注解的
autoStartup
属性**
@KafkaListener
注解具有一个名为
autoStartup
的属性,可以用于控制是否自动启动消费者。默认情况下,它的值为
true
,表示自动启动。如果将其设置为
false
,则消费者将不会自动启动。
@KafkaListener(topics ="<Kafka主题>", autoStartup ="false")publicvoidreceive(String message){// 处理接收到的消息}
要在运行时动态启动消费者,你可以通过
KafkaListenerEndpointRegistry
bean来手动启动:
@AutowiredprivateKafkaListenerEndpointRegistry endpointRegistry;// 启动消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").start();
同样,你也可以使用
stop()
方法来停止消费者:
// 停止消费者
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").stop();
**方法2:使用
KafkaListenerEndpointRegistry
bean的
pause()
和
resume()
方法**
KafkaListenerEndpointRegistry
bean提供了
pause()
和
resume()
方法,用于暂停和恢复消费者的监听。
@AutowiredprivateKafkaListenerEndpointRegistry endpointRegistry;// 暂停消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").pause();// 恢复消费者监听
endpointRegistry.getListenerContainer("<KafkaListener的bean名称>").resume();
使用这些方法,可以在运行时动态地控制或关闭消费,以及动态地开启或关闭监听。
Code
importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.config.KafkaListenerContainerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer;importorg.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;importorg.springframework.kafka.listener.ContainerProperties;importjava.util.HashMap;importjava.util.Map;/**
* @author artisan
*/@Slf4j@ConfigurationpublicclassKafkaConfig{@Value("${spring.kafka.bootstrap-servers}")privateString bootstrapServer;@Value("${spring.kafka.consumer.auto-offset-reset}")privateString autoOffsetReset;@Value("${spring.kafka.consumer.enable-auto-commit}")privateString enableAutoCommit;@Value("${spring.kafka.consumer.key-deserializer}")privateString keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")privateString valueDeserializer;@Value("${spring.kafka.consumer.group-id}")privateString group_id;@Value("${spring.kafka.consumer.max-poll-records}")privateString maxPollRecords;@Value("${spring.kafka.consumer.max-poll-interval-ms}")privateString maxPollIntervalMs;@Value("${spring.kafka.listener.concurrency}")privateInteger concurrency;privatefinalString consumerInterceptor ="net.zf.module.system.kafka.interceptor.FailureRateInterceptor";/**
* 消费者配置信息
*/@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object> props =newHashMap<>(32);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollIntervalMs);
props.put(ConsumerConfig.GROUP_ID_CONFIG,group_id);
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,consumerInterceptor );return props;}/**
* 消费者批量工厂
*/@BeanpublicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>>batchFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(newDefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setBatchListener(true);
factory.setConcurrency(concurrency);return factory;}/**
* 异常处理器
*
* @return
*/@BeanpublicConsumerAwareListenerErrorHandlerconsumerAwareListenerErrorHandler(){return(message, exception, consumer)->{// log.error("消息{} , 异常原因{}", message, exception.getMessage());
log.error("consumerAwareListenerErrorHandler called");returnnull;};}}
使用
@KafkaListener(topicPattern =KafkaTopicConstant.ATTACK_MESSAGE +".*",
containerFactory ="batchFactory",
errorHandler ="consumerAwareListenerErrorHandler",
id ="attackConsumer")publicvoidprocessMessage(List<String> records,Acknowledgment ack){
log.info("AttackKafkaConsumer 当前线程 {} , 本次拉取的数据总量:{} ",Thread.currentThread().getId(), records.size());try{List<AttackMessage> attackMessages =newArrayList();
records.stream().forEach(record->{
messageExecutorFactory.process(KafkaTopicConstant.ATTACK_MESSAGE).execute(record, attackMessages);});if(!attackMessages.isEmpty()){
attackMessageESService.addDocuments(attackMessages,false);}}finally{
ack.acknowledge();}}
在这段代码中,@KafkaListener注解表示这是一个Kafka消费者,
- topicPattern参数指定了该消费者要监听的主题的模式,即以
KafkaTopicConstant.ATTACK_MESSAGE
开头的所有主题。 - containerFactory参数指定了用于创建Kafka监听器容器的工厂类别名。
- errorHandler参数指定了用于处理监听器抛出异常的错误处理器。id参数指定了该消费者的ID。
在该消费者的方法中,当有消息到达时,records参数将包含一组消息记录,ack参数用于手动确认已经消费了这些消息。
在方法中,首先记录了当前线程ID和拉取的数据总量。将消息记录逐一处理,并将处理结果存储在一个名为attackMessages的列表中。如果列表不为空,则将其添加到ES搜索引擎中。
最后,手动确认已经消费了这些消息。
【控制】
importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.config.KafkaListenerEndpointRegistry;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@Slf4j@RestControllerpublicclassKafkaConsumerController{@AutowiredprivateKafkaListenerEndpointRegistry registry;/**
* 开启监听
*/@GetMapping("/start")publicvoidstart(){// 判断监听容器是否启动,未启动则将其启动if(!registry.getListenerContainer("attackConsumer").isRunning()){
log.info("start ");
registry.getListenerContainer("attackConsumer").start();}// 将其恢复
registry.getListenerContainer("attackConsumer").resume();
log.info("resume over ");}/**
* 关闭监听
*/@GetMapping("/pause")publicvoidpause(){// 暂停监听
registry.getListenerContainer("attackConsumer").pause();
log.info("pause");}}
扩展
KafkaListenerEndpointRegistry
KafkaListenerEndpointRegistry
是 Spring Kafka 提供的一个组件,用于管理 Kafka 消费者监听器的注册和启动。它是一个接口,提供了管理 Kafka 监听器容器的方法,如注册和启动监听器容器,以及暂停和恢复监听器容器等。
在 Spring Boot 应用程序中使用
@KafkaListener
注解时,Spring Kafka 会自动创建一个
KafkaListenerEndpointRegistry
实例,并使用它来管理所有的 Kafka 监听器容器。 它是 Spring Kafka 中的一个核心组件,用于实现 Kafka 消费者的监听和控制。
版权归原作者 小小工匠 所有, 如有侵权,请联系我们删除。