1.业务需求
在springboot项目中,使用spring-kafka消费kafka数据。希望能够控制消费者(KafkaConsumer)启动或停止消费,并且在启动消费时只消费当前时刻以后生产的数据(最新生产的数据),也就是说,启动消费之前未消费的数据不再消费。
2.实现
2.1.创建消费监听
按照官方文档创建一个监听。
官方文档地址
KafkaConsumer.java
@Slf4j@ComponentpublicclassKafkaConsumer{@KafkaListener(
id ="consumer-id",
topics ={"topic1","topic1","topic3"},
groupId ="group-id")publicvoidlisten(ConsumerRecord<?,?> record){Optional<?> kafkaMessage =Optional.ofNullable(record.value());if(kafkaMessage.isPresent()){Object message = kafkaMessage.get();String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);}}}
2.2.控制启动/停止消费
通过KafkaListenerEndpointRegistry拿到listenerContainer,操作它即可达到控制目的。
创建一个Kafak控制类,实现控制代码。
KafkaCtrlHandler.java
@Slf4j@ComponentpublicclassKafkaCtrlHandler{@AutowiredprivateKafkaListenerEndpointRegistry registry;/**
* 开始消费
*/publicvoidstart(){MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");assert listenerContainer !=null;if(!listenerContainer.isRunning()){
listenerContainer.start();}
listenerContainer.resume();
log.info("kafka consumer开始消费");}/**
* 停止消费
*/publicvoidstop(){MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");assert listenerContainer !=null;
listenerContainer.pause();
log.info("kafka consumer停止消费");}}
这样即可通过KafkaCtrlHandler 实例来控制消费者开始或者暂停监听。
2.3.控制启动消费时只消费最新数据
让KafkaConsumer类实现org.springframework.kafka.listener包下的ConsumerSeekAware接口,并实现onPartitionsAssigned方法。
监听创建时,设置各个分区的偏移量。
具体原理待研究,有懂的大佬请留言指教。
新的KafkaConsumer.java
@Slf4j@ComponentpublicclassKafkaConsumerimplementsConsumerSeekAware{@OverridepublicvoidonPartitionsAssigned(Map<TopicPartition,Long> assignments,@NonNullConsumerSeekAware.ConsumerSeekCallback callback){
assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
topicPartition.partition()));}@KafkaListener(
id ="consumer-id",
topics ={"topic1","topic1","topic3"},
groupId ="group-id")publicvoidlisten(ConsumerRecord<?,?> record){Optional<?> kafkaMessage =Optional.ofNullable(record.value());if(kafkaMessage.isPresent()){Object message = kafkaMessage.get();String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);}}}
注意,修改上面的kafka控制类KafkaCtrlHandler.java,停止消费时让监听器停止(stop)而非暂停(pause)。这样监听才会重新创建并设置各分区的偏移量。
新KafkaCtrlHandler.java
@Slf4j@ComponentpublicclassKafkaCtrlHandler{@AutowiredprivateKafkaListenerEndpointRegistry registry;/**
* 开始消费
*/publicvoidstart(){MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");assert listenerContainer !=null;if(!listenerContainer.isRunning()){
listenerContainer.start();}
listenerContainer.resume();
log.info("kafka consumer开始消费");}/**
* 停止消费
*/publicvoidstop(){MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");assert listenerContainer !=null;// !!!!这里变了!!!!
listenerContainer.stop();
log.info("kafka consumer停止消费");}}
2.4.设置springboot 启动时消费者监听不自动启动
创建配置类
KafkaInitialConfiguration.java
@Slf4j@ConfigurationpublicclassKafkaInitialConfiguration{// 监听器工厂@AutowiredprivateConsumerFactory<String,String> consumerFactory;@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>customContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);//设置是否自动启动
factory.setAutoStartup(false);return factory;}}
配置监听工厂
新的KafkaConsumer.java
@Slf4j@ComponentpublicclassKafkaConsumerimplementsConsumerSeekAware{@OverridepublicvoidonPartitionsAssigned(Map<TopicPartition,Long> assignments,@NonNullConsumerSeekAware.ConsumerSeekCallback callback){
assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
topicPartition.partition()));}@KafkaListener(
id ="consumer-id",
topics ={"topic1","topic1","topic3"},
groupId ="group-id",// !!!这里变了!!!!!
containerFactory ="customContainerFactory")publicvoidlisten(ConsumerRecord<?,?> record){Optional<?> kafkaMessage =Optional.ofNullable(record.value());if(kafkaMessage.isPresent()){Object message = kafkaMessage.get();String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);}}}
版权归原作者 程小白丶 所有, 如有侵权,请联系我们删除。