0


【kafka专栏】SpringBoot下`@KafkaListener`消费监听属性详解

文章目录

一、

@KafkaListener

属性概述

KafkaListener有若干的配置属性,这些配置属性使用或者是结合使用,可以方便快捷的帮助我们实现kafka消费者数据监听的需求。这里的属性比较多,先大概了解一下,后续我会介绍。

public@interfaceKafkaListener{/**
    * 消费者的id,如果没有配置或默认生成一个。如果配置了会覆盖groupId,笔者的经验这个配置不需要配
    */Stringid()default"";/**
    * 配置一个bean,类型为:org.springframework.kafka.config.KafkaListenerContainerFactory
    */StringcontainerFactory()default"";/**
    * 三选一:该消费者组监听的Topic名称
    */String[]topics()default{};/**
    * 三选一:通过为消费者组指定表达式匹配监听多个Topic(笔者从来没用过,也不建议使用)
    */StringtopicPattern()default"";/**
    * 三选一:消费组指定监听Topic的若干分区。
    */TopicPartition[]topicPartitions()default{};/**
    * 没用过,不知道作用
    */StringcontainerGroup()default"";/**
    * Listener的异常处理器,后续会介绍
    * @since 1.3
    */StringerrorHandler()default"";/**
    * 消费者组的分组id
    * @since 1.3
    */StringgroupId()default"";/**
    * 设否设置id属性为消费组组id
    * @since 1.3
    */booleanidIsGroup()defaulttrue;/**
    * 消费者组所在客户端的客户端id的前缀,用于kafka客户端分类
    * @since 2.1.1
    */StringclientIdPrefix()default"";/**
    * 用于SpEL表达式,获取当前Listener的配置信息
    * 如获取监听Topic列表的SpEL表达式为 : "#{__listener.topicList}"
    * @return the pseudo bean name.
    * @since 2.1.2
    */StringbeanRef()default"__listener";/**
    * 当前消费者组启动多少了消费者线程,并行执行消费动作
    * @since 2.2
    */Stringconcurrency()default"";/**
    * 是否自动启动,true or false
    * @since 2.2
    */StringautoStartup()default"";/**
    * Kafka consumer 属性配置,支持所有的apache kafka 消费者属性配置
    * 但不包括group.id 和 client.id 配置属性
    * @since 2.2.4
    */String[]properties()default{};/**
    * 笔者从来没用过,自己理解下面的这段英文吧
    * When false and the return type is an {@link Iterable} return the result as the
    * value of a single reply record instead of individual records for each element.
    * Default true. Ignored if the reply is of type {@code Iterable<Message<?>>}.
    * @return false to create a single reply record.
    * @since 2.3.5
    */booleansplitIterables()defaulttrue;/**
    *  笔者从来没用过,自己理解下面的这段英文吧
    * Set the bean name of a
    * {@link org.springframework.messaging.converter.SmartMessageConverter} (such as the
    * {@link org.springframework.messaging.converter.CompositeMessageConverter}) to use
    * in conjunction with the
    * {@link org.springframework.messaging.MessageHeaders#CONTENT_TYPE} header to perform
    * the conversion to the required type. If a SpEL expression is provided
    * ({@code #{...}}), the expression can either evaluate to a
    * {@link org.springframework.messaging.converter.SmartMessageConverter} instance or a
    * bean name.
    * @return the bean name.
    * @since 2.7.1
    */StringcontentTypeConverter()default"";}

在这里插入图片描述

二、结合自定义配置是实现监听(常规用法)

通常我们会把消费者监听的主题,消费者组名称,消费者组中消费者数量等常用信息做成自定义配置(而不是在代码中写死),如下所示:

myconsumer:topic: topic-a,topic-b
    group-id: group-demo
    concurrency:5

下面的消费者监听器监听了两个topic:topic-a,topic-b(使用SpEL表达式逗号分割为字符串数组),该消费者组命名为group-demo,包含5个消费者线程并行消费。

@KafkaListener(topics ="#{'${myconsumer.topic}'.split(',')}",
        groupId ="${myconsumer.group-id}",
        concurrency="${myconsumer.concurrency}")publicvoidreadMsg(ConsumerRecord consumerRecord){//监听到数据之后,进行处理操作}

三、指定Topic分区及偏移量进行消费

在某些特殊场景下,希望消费Topic主题中的某几个分区(不是全部分区消费)。或者是针对某个分区从指定的偏移量开始消费。

@KafkaListener(topicPartitions ={@TopicPartition(topic ="topic-a", partitions ={"0","1"}),@TopicPartition(topic ="topic-b", partitions ={"0","4"},partitionOffsets =@PartitionOffset(partition ="1", initialOffset ="300"))})publicvoidreadMsg(ConsumerRecord<?,?>record){}

上面例子监听

topic-a

的0,1分区(可能包含不只2个分区);监听

topic-b

的第0和4分区 ,并且第1分区从offset为300的开始消费;

五、指定监听器工厂(实现消息过滤)

@ConfigurationpublicclassKafkaInitialConfiguration{// 监听器工厂@AutowiredprivateConsumerFactory consumerFactory;// 配置一个消息过滤策略@BeanpublicConcurrentKafkaListenerContainerFactorymyFilterContainerFactory(){ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);//设置消息过滤策略
        factory.setRecordFilterStrategy(newRecordFilterStrategy(){@Overridepublicbooleanfilter(ConsumerRecord consumerRecord){//这里做逻辑判断//返回true的消息将会被丢弃returntrue;}});return factory;}}

使用方法,myFilterContainerFactory是上文中bean方法的名称。

@KafkaListener(containerFactory ="myFilterContainerFactory")

六、properties配置

除了上面提到的一些配置属性,实际上apache kafka consumer支持的原生配置属性,要比Spring 提供的配置属性多得多。所有的apache kafka原生配置属性都可以通过properties配置来传递。

@KafkaListener(properties ={"enable.auto.commit:false","max.poll.interval.ms:6000"})

七、注解方式获取消息头及消息体

  • @Payload:获取的是消息的消息体,也就是发送的数据内容
  • @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的key
  • @Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的
  • @Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的TopicName
  • @Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取到数据时间的时间戳
@KafkaListener(topics ="topic-a")publicvoidreadMsg(@PayloadString data,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)Integer key,@Header(KafkaHeaders.RECEIVED_PARTITION_ID)int partition,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP)long ts){}

八、消息转发

Spring-Kafka只需要通过一个@SendTo注解即可以实现消息的转发,被注解方法的return值即转发的消息内容:

@ComponentpublicclassKafkaConsumer{// 消费监听@KafkaListener(topics ={"topic1"})@SendTo("topic2")publicStringlisten1(String data){System.out.println("业务A收到消息:"+ data);return data +"(已处理)";}// 消费监听@KafkaListener(topics ={"topic2"})publicvoidlisten2(String data){System.out.println("业务B收到消息:"+ data);}}
  • 消息发往topic1,topic1处理完成的数据通过SendTo发往topic2
  • topic2如果收到数据,打印业务B收到消息: + 你发送的消息数据
标签: kafka spring boot java

本文转载自: https://blog.csdn.net/hanxiaotongtong/article/details/125698275
版权归原作者 字母哥哥 所有, 如有侵权,请联系我们删除。

“【kafka专栏】SpringBoot下`@KafkaListener`消费监听属性详解”的评论:

还没有评论