0


@KafkaListener原理和动态监听kafka topic

@KafkaListener原理和动态监听topic

1、背景

当使用Kafka时可以使用@KafkaListener很方便的对topic进行监听。但是对于在项目启动时,动态增加topic的监听,这种方式就无法实现,因此需要一种动态监听kafka topic的方式。
这种方式需要读取新增的kafka topic,这个不是难点,使用@Schedule注解轮询就可实现,难点在于如何通过代码监听,实现和@KafkaListener同样的效果。

2、@KafkaListener的原理

在这里插入图片描述
从图中不难理解@KafkaListener从启动到拉取消息的过程,可以看到最终是调用KafkaMessageListenerContainer的start()方法,启动线程调用kafkaConsumer的poll()方法和被注解的方法。

3、解决方案

从上面已经可以看出最终是调用KafkaMessageListenerContainer的start()方法进行监听kafka topic的消息,那么我们将动态变化的kafka配置生成一个KafkaMessageListenerContainer,并启动即可。
以下源码是KafkaMessageListenerContainer的构造函数

publicKafkaMessageListenerContainer(ConsumerFactory<?superK,?superV> consumerFactory,ContainerProperties containerProperties){this(null, consumerFactory, containerProperties,(TopicPartitionInitialOffset[])null);}

因此我们需要构建ConsumerFactory和ContainerProperties,对于ConsumerFactory,其实现类为DefaultKafkaConsumerFactory,构造函数为:

publicDefaultKafkaConsumerFactory(Map<String,Object> configs,@NullableDeserializer<K> keyDeserializer,@NullableDeserializer<V> valueDeserializer){this.configs =newHashMap<>(configs);this.keyDeserializer = keyDeserializer;this.valueDeserializer = valueDeserializer;}

通过kafka的属性和序列化方式即可初始化DefaultKafkaConsumerFactory。
ContainerProperties存放了kafka监听器运行时的相关属性,因此在初始化后,还需要将kafka的相关属性赋值进去。
最后示例代码:

// consumer配置Map<String,Object> configMap =Maps.newHashMap();// 采用手动提交的方式
configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,5000);
configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxxxxx");
configMap.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group");// kafka监听器Deserializer<String> stringDeserializer =newStringDeserializer();DefaultKafkaConsumerFactory<String,String> factory =newDefaultKafkaConsumerFactory<>(configMap, stringDeserializer, stringDeserializer);ContainerProperties props =newContainerProperties("test-topic");
props.setMessageListener(newCustomerMsgHandler("test-topic"));
props.setGroupId(configMap.get(ConsumerConfig.GROUP_ID_CONFIG).toString());
props.setAckMode(ContainerProperties.AckMode.MANUAL);KafkaMessageListenerContainer<String,String> container =newKafkaMessageListenerContainer<>(factory, props);// 启动
container.start();

对于自己处理消息的类,需要实现AcknowledgingMessageListener的onMessage方法:

@Slf4jpublicclassCustomerMsgHandlerimplementsAcknowledgingMessageListener<String,String>{privateString topic;publicCustomerMsgHandler(String topic){this.topic = topic;}@OverridepublicvoidonMessage(ConsumerRecord<String,String> data,Acknowledgment acknowledgment){// doSomething// 因为前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
        acknowledgment.acknowledge();}}

以上,可以通过读取配置,实例化KafkaMessageListenerContainer并调用其start()方法,实现动态kafka topic的监听。

标签: kafka

本文转载自: https://blog.csdn.net/qq_34826261/article/details/124281878
版权归原作者 毅一s 所有, 如有侵权,请联系我们删除。

“@KafkaListener原理和动态监听kafka topic”的评论:

还没有评论