0


@KafkaListener 消费注解解读

在 Spring Kafka 中,消费 Kafka 消息有多种方式,包括单条消费、批量消费、监听特定分区、以及手动控制消息确认(Ack机制)。我们可以通过

@KafkaListener

注解和配置

KafkaListenerContainerFactory

来实现这些功能。下面我会详细解释每种方式的实现。

1. **单条消费 (

ConsumerRecord

)**

默认情况下,

@KafkaListener

注解用于单条消费,每次只处理一条消息。这是最常用的消费方式。

实现示例:
importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassSingleMessageConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group")publicvoidconsume(ConsumerRecord<String,String>record){System.out.println("Received message: "+record.value());// 处理单条消息}}

使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。

2. 批量消费

通过配置批量消费,可以一次性从 Kafka 拉取多条消息并进行处理。需要修改

KafkaListenerContainerFactory

的配置以支持批量消费。

配置
KafkaListenerContainerFactory

首先,在配置类中启用批量消费模式:

importorg.springframework.context.annotation.Bean;importorg.springframework.kafka.annotation.EnableKafka;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.context.annotation.Configuration;@EnableKafka@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);// 设置为批量消费模式return factory;}}
实现批量消费:
importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.List;@ComponentpublicclassBatchMessageConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group", containerFactory ="kafkaListenerContainerFactory")publicvoidconsume(List<String> messages){System.out.println("Received batch of messages: "+ messages);// 处理多条消息}}

3. 监听指定分区

如果你只想监听 Kafka 主题中的某些特定分区,可以在

@KafkaListener

中使用

partitions

属性来指定要监听的分区。

实现示例:
importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassPartitionedMessageConsumer{@KafkaListener(topicPartitions =@TopicPartition(topic ="my-topic", partitions ={"0","1"}), 
                   groupId ="my-group")publicvoidconsume(String message){System.out.println("Received message from partition 0 or 1: "+ message);// 处理指定分区的消息}}

在这个例子中,消费者仅监听

my-topic

的第 0 和第 1 分区。

@TopicPartition:
topic-- 需要监听的 Topic 的名称,
partitions – 需要监听 Topic 的分区 id。
partitionOffsets – 可以设置从某个偏移量开始监听,
@PartitionOffset:
partition – 分区 Id,非数组,
initialOffset – 初始偏移量。

@BeanpublicNewTopicbatchWithPartitionTopic(){returnnewNewTopic("topic.batch.partition",8,(short)1);}@KafkaListener(id ="batchWithPartition",clientIdPrefix ="bwp",containerFactory ="batchContainerFactory",
            topicPartitions ={@TopicPartition(topic ="topic.batch.partition",partitions ={"1","3"}),@TopicPartition(topic ="topic.batch.partition",partitions ={"0","4"},
                            partitionOffsets =@PartitionOffset(partition ="2",initialOffset ="100"))})publicvoidbatchListenerWithPartition(List data){
        log.info("topic.batch.partition  receive : ");for(String s : data){
            log.info(s);}}

4. 通过注解获取消息头和消息体

在 Kafka 消息传递过程中,消息包含了消息头(headers)消息体(payload)。可以通过

@Header

注解来获取消息头,通过

@Payload

获取消息体。

实现示例:
importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.KafkaHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Component;@ComponentpublicclassHeaderMessageConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group")publicvoidconsume(@PayloadString message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID)int partition,@Header(KafkaHeaders.OFFSET)long offset,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic){System.out.println("Received message: "+ message);System.out.println("From partition: "+ partition +", topic: "+ topic +", offset: "+ offset);// 处理消息体和消息头}}

在这个示例中,

@Payload

获取消息体,

@Header

获取 Kafka 消息的不同头信息,比如消息来自哪个分区、哪个主题以及偏移量。

5. ACK 机制(手动提交偏移量)

Kafka 消费消息有自动提交和手动提交两种模式。自动提交是默认模式,但你可以通过手动控制

acknowledgement

来实现手动确认机制。

配置手动 Ack:

首先,需要在配置类中启用手动提交:

importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);// 设置为手动提交ACK模式return factory;}}
实现手动提交 Ack:

在消费代码中,通过

Acknowledgment

参数手动确认消息:

importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.Acknowledgment;importorg.springframework.stereotype.Component;@ComponentpublicclassAckMessageConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group", containerFactory ="kafkaListenerContainerFactory")publicvoidconsume(ConsumerRecord<String,String>record,Acknowledgment ack){System.out.println("Received message: "+record.value());// 手动提交偏移量
        ack.acknowledge();}}

在这个示例中,

ack.acknowledge()

用于手动提交偏移量,告诉 Kafka 该消息已经成功处理。这样可以防止消息在未成功处理时被自动确认。

消费总结:

  • **单条消费 (ConsumerRecord)**:每次只消费一条消息,最常见的方式。
  • 批量消费:可以一次性消费多条消息,适合批处理场景。
  • 监听指定分区:通过 @TopicPartition 监听特定分区的消息。
  • 注解方式获取消息头和消息体:通过 @Payload 获取消息体,@Header 获取消息头中的元数据。
  • ACK 机制:通过手动提交偏移量实现对消息的精细控制,避免未处理的消息被自动确认。

常用属性及解释:

  1. topics- 类型: String[]- 作用: 指定要监听的 Kafka 主题(topic)。可以指定一个或多个主题。- 示例:@KafkaListener(topics ={"topic1","topic2"})
  2. topicPartitions- 类型: TopicPartition[]- 作用: 指定监听某个主题的特定分区(Partition)。可以用于精确监听某些分区的消息。- 示例:@KafkaListener(topicPartitions =@TopicPartition(topic ="topic1", partitions ={"0","1"}))- 详细配置: 还可以使用 partitionOffsets 属性设置起始偏移量:@KafkaListener(topicPartitions =@TopicPartition( topic ="topic1", partitionOffsets =@PartitionOffset(partition ="0", initialOffset ="100")))
  3. groupId- 类型: String- 作用: 指定消费者所属的消费组(Consumer Group)。Kafka 会根据消费组进行消息分发和负载均衡。多个消费者可以属于同一消费组,每条消息只会被组中的一个消费者处理。- 示例:@KafkaListener(topics ="topic1", groupId ="my-group")
  4. containerFactory- 类型: String- 作用: 指定要使用的 KafkaListenerContainerFactory,用于配置监听器容器。默认使用 kafkaListenerContainerFactory,可以自定义不同的工厂以支持不同的消费模式(如批量消费)。- 示例:@KafkaListener(topics ="topic1", containerFactory ="batchFactory")
  5. errorHandler- 类型: String- 作用: 指定一个自定义的 KafkaListenerErrorHandler,用于处理消息消费过程中出现的异常。- 示例:@KafkaListener(topics ="topic1", errorHandler ="myErrorHandler")
  6. concurrency- 类型: int- 作用: 指定并发消费者数量,即多少个线程同时消费消息。适用于高吞吐量场景,通过多个消费者实例并发处理。- 示例:@KafkaListener(topics ="topic1", concurrency ="3")
  7. autoStartup- 类型: boolean- 作用: 是否在容器启动时自动启动消费者。默认为 true,如果设置为 false,需要手动启动。- 示例:@KafkaListener(topics ="topic1", autoStartup ="false")
  8. id- 类型: String- 作用: 为 KafkaListener 指定一个唯一的 ID。如果需要手动启动或停止监听器,可以通过这个 ID 进行操作。- 示例:@KafkaListener(id ="myListener", topics ="topic1")
  9. idIsGroup- 类型: boolean- 作用: 当设置为 true 时,id 会被作为消费组 groupId 使用。默认为 false。- 示例:@KafkaListener(id ="myGroup", topics ="topic1", idIsGroup =true)
  10. clientIdPrefix- 类型: String- 作用: 为消费者客户端 ID 添加前缀。Kafka 可以为每个消费者分配一个客户端 ID。- 示例:@KafkaListener(topics ="topic1", clientIdPrefix ="client1-")
  11. beanRef- 类型: String- 作用: 用于指定 Bean 的引用名。默认使用当前 @KafkaListener 注解所在的 Bean 实例。- 示例:@KafkaListener(topics ="topic1", beanRef ="someBean")
  12. properties- 类型: String[]- 作用: 允许在 @KafkaListener 上指定 Kafka 消费者配置属性,优先级高于全局配置。- 示例:@KafkaListener(topics ="topic1", properties ={"auto.offset.reset=earliest","enable.auto.commit=false"})
  13. groupIdExpression- 类型: String- 作用: 用于动态解析 groupId 的值,可以通过 Spring 表达式语言(SpEL)动态获取值。- 示例:@KafkaListener(topics ="topic1", groupIdExpression ="#{environment['kafka.groupId']}")
  14. topicPartitionsExpression- 类型: String- 作用: 动态指定 topicPartitions 的值,支持 SpEL 表达式。- 示例:@KafkaListener(topicPartitionsExpression ="#{T(java.util.Collections).singletonMap('topic1', '0')}")
  15. properties- 类型: String[]- 作用: 指定 Kafka 消费者的配置属性,可以覆盖全局的 Kafka 消费者配置。- 示例:@KafkaListener(topics ="topic1", properties ={"fetch.min.bytes=5000","max.poll.records=10"})
  16. pollTimeout- 类型: long- 作用: 指定 Kafka 消费者轮询的超时时间。默认是 5000 毫秒。- 示例:@KafkaListener(topics ="topic1", pollTimeout ="3000")

综合示例:

@KafkaListener(
    id ="myListener",
    topics ={"topic1","topic2"},
    groupId ="my-group",
    concurrency ="3",
    containerFactory ="batchFactory",
    autoStartup ="true",
    errorHandler ="myErrorHandler",
    properties ={"auto.offset.reset=earliest","enable.auto.commit=false"})publicvoidlisten(List<ConsumerRecord<String,String>> messages){
    messages.forEach(record->{System.out.println("Received message: "+record.value());});}

总结:

  • @KafkaListener 提供了丰富的属性用于配置监听 Kafka 消息的各种细节。
  • 它不仅支持简单的监听配置,还可以通过高级配置实现批量消费、特定分区消费、手动控制 Ack 机制等功能。
  • 通过这些属性,可以为不同的业务场景量身定制 Kafka 消费者逻辑。
标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/weixin_44594317/article/details/142787007
版权归原作者 weixin_44594317 所有, 如有侵权,请联系我们删除。

“@KafkaListener 消费注解解读”的评论:

还没有评论