0


SpringBoot——》@KafkaListener

推荐链接:
总结——》【Java】
总结——》【Mysql】
总结——》【Redis】
总结——》【Spring】
总结——》【SpringBoot】
总结——》【MyBatis、MyBatis-Plus】

SpringBoot——》@KafkaListener

方法功能String id() default “”;监听器idString containerFactory() default “”;监听器工厂String[] topics() default {};监听器topicsString topicPattern() default “”;监听器topics匹配正则表达式TopicPartition[] topicPartitions() default {};监听器分区String errorHandler() default “”;异常处理器String groupId() default “”;分组idboolean idIsGroup() default true;是否使用id作为groupId

一、监听器id

@KafkaListener(id ="listenerForSyncEsfCommunity", topics ="test_topic1")

1、在相同容器中,监听器id不能重复

如果ID重复,会报错

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

2、使用默认配置的消费组

kafka.consumer.group-id = xxxxx
// 消费组为xxxxx@KafkaListener(id ="listenerForSyncEsfCommunity",idIsGroup =false)

3、使用自定义的消费组

// 方式一:消费组为listenerForSyncEsfCommunity@KafkaListener(id ="listenerForSyncEsfCommunity")// 方式二:消费组为groupId-test@KafkaListener(id ="listenerForSyncEsfCommunity",idIsGroup =false,groupId ="groupId-test")

二、监听器工厂

1、定义kafkaListenerContainerFactory

@Bean("kafkaListenerContainerFactory")KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();// consumerGroupId为空时,会用默认的groupId
    factory.setConsumerFactory(consumerFactory("g1"));
    factory.setConcurrency(4);// 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(3000);return factory;}

2、配置containerFactory参数

@KafkaListener(id ="listenerForSyncEsfCommunity", topics ="test_topic1", containerFactory ="kafkaListenerContainerFactory")

三、监听器topics

1、固定监听topics

// 指定多个topic@KafkaListener(id ="listenerForSyncEsfCommunity", topics ={"test_topic1","test_topic2"})

2、动态监听topics

自定义配置:

kafka.consumer.topics=topic1,topic2
// Spring的SpEl表达式@KafkaListener(topics ="#{'${kafka.consumer.topics}'.split(',')}")

四、监听器topics匹配正则表达式

@KafkaListener(id ="listenerForSyncEsfCommunity", topicPattern ="test_.*topic.*")

五、监听器分区

@KafkaListener(id ="listenerForSyncEsfCommunity",  topicPartitions ={@TopicPartition(topic ="topic1", partitions ={"0","1"}),@TopicPartition(topic ="topic2", partitions ="0")})

六、异常处理器

异常处理有2种方式:

  • 方式一:consumer中手动try/catch
  • 方式二:实现KafkaListenerErrorHandler,重写异常处理逻辑

1、实现KafkaListenerErrorHandler

@Component("kafkaErrorHandler")publicclassKafkaDefaultListenerErrorHandlerimplementsKafkaListenerErrorHandler{@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException exception){returnnull;}@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException exception,Consumer<?,?> consumer){//TODOreturnnull;}}

2、配置errorHandler参数

// 调用的时候errorHandler的值填写beanName@KafkaListener(id ="listenerForSyncEsfCommunity", topics ="topic1",errorHandler ="kafkaErrorHandler")

七、分组id

参考监听器id

八、是否使用id作为groupId

参考监听器id

标签: spring boot java kafka

本文转载自: https://blog.csdn.net/weixin_43453386/article/details/128189386
版权归原作者 小仙。 所有, 如有侵权,请联系我们删除。

“SpringBoot——》@KafkaListener”的评论:

还没有评论