推荐链接:
总结——》【Java】
总结——》【Mysql】
总结——》【Redis】
总结——》【Spring】
总结——》【SpringBoot】
总结——》【MyBatis、MyBatis-Plus】
SpringBoot——》@KafkaListener
方法功能String id() default “”;监听器idString containerFactory() default “”;监听器工厂String[] topics() default {};监听器topicsString topicPattern() default “”;监听器topics匹配正则表达式TopicPartition[] topicPartitions() default {};监听器分区String errorHandler() default “”;异常处理器String groupId() default “”;分组idboolean idIsGroup() default true;是否使用id作为groupId
一、监听器id
@KafkaListener(id ="listenerForSyncEsfCommunity", topics ="test_topic1")
1、在相同容器中,监听器id不能重复
如果ID重复,会报错
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
2、使用默认配置的消费组
kafka.consumer.group-id = xxxxx
// 消费组为xxxxx@KafkaListener(id ="listenerForSyncEsfCommunity",idIsGroup =false)
3、使用自定义的消费组
// 方式一:消费组为listenerForSyncEsfCommunity@KafkaListener(id ="listenerForSyncEsfCommunity")// 方式二:消费组为groupId-test@KafkaListener(id ="listenerForSyncEsfCommunity",idIsGroup =false,groupId ="groupId-test")
二、监听器工厂
1、定义kafkaListenerContainerFactory
@Bean("kafkaListenerContainerFactory")KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();// consumerGroupId为空时,会用默认的groupId
factory.setConsumerFactory(consumerFactory("g1"));
factory.setConcurrency(4);// 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);return factory;}
2、配置containerFactory参数
@KafkaListener(id ="listenerForSyncEsfCommunity", topics ="test_topic1", containerFactory ="kafkaListenerContainerFactory")
三、监听器topics
1、固定监听topics
// 指定多个topic@KafkaListener(id ="listenerForSyncEsfCommunity", topics ={"test_topic1","test_topic2"})
2、动态监听topics
自定义配置:
kafka.consumer.topics=topic1,topic2
// Spring的SpEl表达式@KafkaListener(topics ="#{'${kafka.consumer.topics}'.split(',')}")
四、监听器topics匹配正则表达式
@KafkaListener(id ="listenerForSyncEsfCommunity", topicPattern ="test_.*topic.*")
五、监听器分区
@KafkaListener(id ="listenerForSyncEsfCommunity", topicPartitions ={@TopicPartition(topic ="topic1", partitions ={"0","1"}),@TopicPartition(topic ="topic2", partitions ="0")})
六、异常处理器
异常处理有2种方式:
- 方式一:consumer中手动try/catch
- 方式二:实现KafkaListenerErrorHandler,重写异常处理逻辑
1、实现KafkaListenerErrorHandler
@Component("kafkaErrorHandler")publicclassKafkaDefaultListenerErrorHandlerimplementsKafkaListenerErrorHandler{@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException exception){returnnull;}@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException exception,Consumer<?,?> consumer){//TODOreturnnull;}}
2、配置errorHandler参数
// 调用的时候errorHandler的值填写beanName@KafkaListener(id ="listenerForSyncEsfCommunity", topics ="topic1",errorHandler ="kafkaErrorHandler")
七、分组id
参考监听器id
八、是否使用id作为groupId
参考监听器id
版权归原作者 小仙。 所有, 如有侵权,请联系我们删除。