0


springboot kafka消费者启动/停止监听控制,启动时只消费此时之后的数据

1.业务需求

在springboot项目中,使用spring-kafka消费kafka数据。希望能够控制消费者(KafkaConsumer)启动或停止消费,并且在启动消费时只消费当前时刻以后生产的数据(最新生产的数据),也就是说,启动消费之前未消费的数据不再消费。

2.实现

2.1.创建消费监听

按照官方文档创建一个监听。
官方文档地址

KafkaConsumer.java

@Slf4j@ComponentpublicclassKafkaConsumer{@KafkaListener(
            id ="consumer-id",
            topics ={"topic1","topic1","topic3"},
            groupId ="group-id")publicvoidlisten(ConsumerRecord<?,?> record){Optional<?> kafkaMessage =Optional.ofNullable(record.value());if(kafkaMessage.isPresent()){Object message = kafkaMessage.get();String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);}}}

2.2.控制启动/停止消费

通过KafkaListenerEndpointRegistry拿到listenerContainer,操作它即可达到控制目的。
创建一个Kafak控制类,实现控制代码。

KafkaCtrlHandler.java

@Slf4j@ComponentpublicclassKafkaCtrlHandler{@AutowiredprivateKafkaListenerEndpointRegistry registry;/**
     * 开始消费
     */publicvoidstart(){MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");assert listenerContainer !=null;if(!listenerContainer.isRunning()){
            listenerContainer.start();}
        listenerContainer.resume();
        log.info("kafka consumer开始消费");}/**
     * 停止消费
     */publicvoidstop(){MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");assert listenerContainer !=null;
        listenerContainer.pause();
        log.info("kafka consumer停止消费");}}

这样即可通过KafkaCtrlHandler 实例来控制消费者开始或者暂停监听。

2.3.控制启动消费时只消费最新数据

让KafkaConsumer类实现org.springframework.kafka.listener包下的ConsumerSeekAware接口,并实现onPartitionsAssigned方法。
监听创建时,设置各个分区的偏移量。
具体原理待研究,有懂的大佬请留言指教。

新的KafkaConsumer.java

@Slf4j@ComponentpublicclassKafkaConsumerimplementsConsumerSeekAware{@OverridepublicvoidonPartitionsAssigned(Map<TopicPartition,Long> assignments,@NonNullConsumerSeekAware.ConsumerSeekCallback callback){
        assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
                     topicPartition.partition()));}@KafkaListener(
            id ="consumer-id",
            topics ={"topic1","topic1","topic3"},
            groupId ="group-id")publicvoidlisten(ConsumerRecord<?,?> record){Optional<?> kafkaMessage =Optional.ofNullable(record.value());if(kafkaMessage.isPresent()){Object message = kafkaMessage.get();String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);}}}

注意,修改上面的kafka控制类KafkaCtrlHandler.java,停止消费时让监听器停止(stop)而非暂停(pause)。这样监听才会重新创建并设置各分区的偏移量。

新KafkaCtrlHandler.java

@Slf4j@ComponentpublicclassKafkaCtrlHandler{@AutowiredprivateKafkaListenerEndpointRegistry registry;/**
     * 开始消费
     */publicvoidstart(){MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");assert listenerContainer !=null;if(!listenerContainer.isRunning()){
            listenerContainer.start();}
        listenerContainer.resume();
        log.info("kafka consumer开始消费");}/**
     * 停止消费
     */publicvoidstop(){MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");assert listenerContainer !=null;// !!!!这里变了!!!!
        listenerContainer.stop();
        log.info("kafka consumer停止消费");}}

2.4.设置springboot 启动时消费者监听不自动启动

创建配置类

KafkaInitialConfiguration.java

@Slf4j@ConfigurationpublicclassKafkaInitialConfiguration{// 监听器工厂@AutowiredprivateConsumerFactory<String,String> consumerFactory;@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>customContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);//设置是否自动启动
        factory.setAutoStartup(false);return factory;}}

配置监听工厂

新的KafkaConsumer.java

@Slf4j@ComponentpublicclassKafkaConsumerimplementsConsumerSeekAware{@OverridepublicvoidonPartitionsAssigned(Map<TopicPartition,Long> assignments,@NonNullConsumerSeekAware.ConsumerSeekCallback callback){
        assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
                     topicPartition.partition()));}@KafkaListener(
            id ="consumer-id",
            topics ={"topic1","topic1","topic3"},
            groupId ="group-id",// !!!这里变了!!!!!
            containerFactory ="customContainerFactory")publicvoidlisten(ConsumerRecord<?,?> record){Optional<?> kafkaMessage =Optional.ofNullable(record.value());if(kafkaMessage.isPresent()){Object message = kafkaMessage.get();String topic = record.topic();
            log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);}}}
标签: kafka spring boot java

本文转载自: https://blog.csdn.net/qq_44651842/article/details/128921709
版权归原作者 程小白丶 所有, 如有侵权,请联系我们删除。

“springboot kafka消费者启动/停止监听控制,启动时只消费此时之后的数据”的评论:

还没有评论