文章目录
一、
@KafkaListener
属性概述
KafkaListener有若干的配置属性,这些配置属性使用或者是结合使用,可以方便快捷的帮助我们实现kafka消费者数据监听的需求。这里的属性比较多,先大概了解一下,后续我会介绍。
public@interfaceKafkaListener{/**
* 消费者的id,如果没有配置或默认生成一个。如果配置了会覆盖groupId,笔者的经验这个配置不需要配
*/Stringid()default"";/**
* 配置一个bean,类型为:org.springframework.kafka.config.KafkaListenerContainerFactory
*/StringcontainerFactory()default"";/**
* 三选一:该消费者组监听的Topic名称
*/String[]topics()default{};/**
* 三选一:通过为消费者组指定表达式匹配监听多个Topic(笔者从来没用过,也不建议使用)
*/StringtopicPattern()default"";/**
* 三选一:消费组指定监听Topic的若干分区。
*/TopicPartition[]topicPartitions()default{};/**
* 没用过,不知道作用
*/StringcontainerGroup()default"";/**
* Listener的异常处理器,后续会介绍
* @since 1.3
*/StringerrorHandler()default"";/**
* 消费者组的分组id
* @since 1.3
*/StringgroupId()default"";/**
* 设否设置id属性为消费组组id
* @since 1.3
*/booleanidIsGroup()defaulttrue;/**
* 消费者组所在客户端的客户端id的前缀,用于kafka客户端分类
* @since 2.1.1
*/StringclientIdPrefix()default"";/**
* 用于SpEL表达式,获取当前Listener的配置信息
* 如获取监听Topic列表的SpEL表达式为 : "#{__listener.topicList}"
* @return the pseudo bean name.
* @since 2.1.2
*/StringbeanRef()default"__listener";/**
* 当前消费者组启动多少了消费者线程,并行执行消费动作
* @since 2.2
*/Stringconcurrency()default"";/**
* 是否自动启动,true or false
* @since 2.2
*/StringautoStartup()default"";/**
* Kafka consumer 属性配置,支持所有的apache kafka 消费者属性配置
* 但不包括group.id 和 client.id 配置属性
* @since 2.2.4
*/String[]properties()default{};/**
* 笔者从来没用过,自己理解下面的这段英文吧
* When false and the return type is an {@link Iterable} return the result as the
* value of a single reply record instead of individual records for each element.
* Default true. Ignored if the reply is of type {@code Iterable<Message<?>>}.
* @return false to create a single reply record.
* @since 2.3.5
*/booleansplitIterables()defaulttrue;/**
* 笔者从来没用过,自己理解下面的这段英文吧
* Set the bean name of a
* {@link org.springframework.messaging.converter.SmartMessageConverter} (such as the
* {@link org.springframework.messaging.converter.CompositeMessageConverter}) to use
* in conjunction with the
* {@link org.springframework.messaging.MessageHeaders#CONTENT_TYPE} header to perform
* the conversion to the required type. If a SpEL expression is provided
* ({@code #{...}}), the expression can either evaluate to a
* {@link org.springframework.messaging.converter.SmartMessageConverter} instance or a
* bean name.
* @return the bean name.
* @since 2.7.1
*/StringcontentTypeConverter()default"";}
二、结合自定义配置是实现监听(常规用法)
通常我们会把消费者监听的主题,消费者组名称,消费者组中消费者数量等常用信息做成自定义配置(而不是在代码中写死),如下所示:
myconsumer:topic: topic-a,topic-b
group-id: group-demo
concurrency:5
下面的消费者监听器监听了两个topic:topic-a,topic-b(使用SpEL表达式逗号分割为字符串数组),该消费者组命名为group-demo,包含5个消费者线程并行消费。
@KafkaListener(topics ="#{'${myconsumer.topic}'.split(',')}",
groupId ="${myconsumer.group-id}",
concurrency="${myconsumer.concurrency}")publicvoidreadMsg(ConsumerRecord consumerRecord){//监听到数据之后,进行处理操作}
三、指定Topic分区及偏移量进行消费
在某些特殊场景下,希望消费Topic主题中的某几个分区(不是全部分区消费)。或者是针对某个分区从指定的偏移量开始消费。
@KafkaListener(topicPartitions ={@TopicPartition(topic ="topic-a", partitions ={"0","1"}),@TopicPartition(topic ="topic-b", partitions ={"0","4"},partitionOffsets =@PartitionOffset(partition ="1", initialOffset ="300"))})publicvoidreadMsg(ConsumerRecord<?,?>record){}
上面例子监听
topic-a
的0,1分区(可能包含不只2个分区);监听
topic-b
的第0和4分区 ,并且第1分区从offset为300的开始消费;
五、指定监听器工厂(实现消息过滤)
@ConfigurationpublicclassKafkaInitialConfiguration{// 监听器工厂@AutowiredprivateConsumerFactory consumerFactory;// 配置一个消息过滤策略@BeanpublicConcurrentKafkaListenerContainerFactorymyFilterContainerFactory(){ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);//设置消息过滤策略
factory.setRecordFilterStrategy(newRecordFilterStrategy(){@Overridepublicbooleanfilter(ConsumerRecord consumerRecord){//这里做逻辑判断//返回true的消息将会被丢弃returntrue;}});return factory;}}
使用方法,myFilterContainerFactory是上文中bean方法的名称。
@KafkaListener(containerFactory ="myFilterContainerFactory")
六、properties配置
除了上面提到的一些配置属性,实际上apache kafka consumer支持的原生配置属性,要比Spring 提供的配置属性多得多。所有的apache kafka原生配置属性都可以通过properties配置来传递。
@KafkaListener(properties ={"enable.auto.commit:false","max.poll.interval.ms:6000"})
七、注解方式获取消息头及消息体
- @Payload:获取的是消息的消息体,也就是发送的数据内容
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的key
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的
- @Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的TopicName
- @Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取到数据时间的时间戳
@KafkaListener(topics ="topic-a")publicvoidreadMsg(@PayloadString data,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)Integer key,@Header(KafkaHeaders.RECEIVED_PARTITION_ID)int partition,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP)long ts){}
八、消息转发
Spring-Kafka只需要通过一个@SendTo注解即可以实现消息的转发,被注解方法的return值即转发的消息内容:
@ComponentpublicclassKafkaConsumer{// 消费监听@KafkaListener(topics ={"topic1"})@SendTo("topic2")publicStringlisten1(String data){System.out.println("业务A收到消息:"+ data);return data +"(已处理)";}// 消费监听@KafkaListener(topics ={"topic2"})publicvoidlisten2(String data){System.out.println("业务B收到消息:"+ data);}}
- 消息发往topic1,topic1处理完成的数据通过SendTo发往topic2
- topic2如果收到数据,打印
业务B收到消息:
+ 你发送的消息数据
版权归原作者 字母哥哥 所有, 如有侵权,请联系我们删除。