在 Spring Kafka 中,消费 Kafka 消息有多种方式,包括单条消费、批量消费、监听特定分区、以及手动控制消息确认(Ack机制)。我们可以通过
@KafkaListener
注解和配置
KafkaListenerContainerFactory
来实现这些功能。下面我会详细解释每种方式的实现。
1. **单条消费 (
ConsumerRecord
)**
默认情况下,
@KafkaListener
注解用于单条消费,每次只处理一条消息。这是最常用的消费方式。
实现示例:
importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassSingleMessageConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group")publicvoidconsume(ConsumerRecord<String,String>record){System.out.println("Received message: "+record.value());// 处理单条消息}}
使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。
2. 批量消费
通过配置批量消费,可以一次性从 Kafka 拉取多条消息并进行处理。需要修改
KafkaListenerContainerFactory
的配置以支持批量消费。
配置
KafkaListenerContainerFactory
:
首先,在配置类中启用批量消费模式:
importorg.springframework.context.annotation.Bean;importorg.springframework.kafka.annotation.EnableKafka;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.context.annotation.Configuration;@EnableKafka@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);// 设置为批量消费模式return factory;}}
实现批量消费:
importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.List;@ComponentpublicclassBatchMessageConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group", containerFactory ="kafkaListenerContainerFactory")publicvoidconsume(List<String> messages){System.out.println("Received batch of messages: "+ messages);// 处理多条消息}}
3. 监听指定分区
如果你只想监听 Kafka 主题中的某些特定分区,可以在
@KafkaListener
中使用
partitions
属性来指定要监听的分区。
实现示例:
importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassPartitionedMessageConsumer{@KafkaListener(topicPartitions =@TopicPartition(topic ="my-topic", partitions ={"0","1"}),
groupId ="my-group")publicvoidconsume(String message){System.out.println("Received message from partition 0 or 1: "+ message);// 处理指定分区的消息}}
在这个例子中,消费者仅监听
my-topic
的第 0 和第 1 分区。
@TopicPartition:
topic-- 需要监听的 Topic 的名称,
partitions – 需要监听 Topic 的分区 id。
partitionOffsets – 可以设置从某个偏移量开始监听,
@PartitionOffset:
partition – 分区 Id,非数组,
initialOffset – 初始偏移量。
@BeanpublicNewTopicbatchWithPartitionTopic(){returnnewNewTopic("topic.batch.partition",8,(short)1);}@KafkaListener(id ="batchWithPartition",clientIdPrefix ="bwp",containerFactory ="batchContainerFactory",
topicPartitions ={@TopicPartition(topic ="topic.batch.partition",partitions ={"1","3"}),@TopicPartition(topic ="topic.batch.partition",partitions ={"0","4"},
partitionOffsets =@PartitionOffset(partition ="2",initialOffset ="100"))})publicvoidbatchListenerWithPartition(List data){
log.info("topic.batch.partition receive : ");for(String s : data){
log.info(s);}}
4. 通过注解获取消息头和消息体
在 Kafka 消息传递过程中,消息包含了消息头(headers)和消息体(payload)。可以通过
@Header
注解来获取消息头,通过
@Payload
获取消息体。
实现示例:
importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.KafkaHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Component;@ComponentpublicclassHeaderMessageConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group")publicvoidconsume(@PayloadString message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID)int partition,@Header(KafkaHeaders.OFFSET)long offset,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic){System.out.println("Received message: "+ message);System.out.println("From partition: "+ partition +", topic: "+ topic +", offset: "+ offset);// 处理消息体和消息头}}
在这个示例中,
@Payload
获取消息体,
@Header
获取 Kafka 消息的不同头信息,比如消息来自哪个分区、哪个主题以及偏移量。
5. ACK 机制(手动提交偏移量)
Kafka 消费消息有自动提交和手动提交两种模式。自动提交是默认模式,但你可以通过手动控制
acknowledgement
来实现手动确认机制。
配置手动 Ack:
首先,需要在配置类中启用手动提交:
importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);// 设置为手动提交ACK模式return factory;}}
实现手动提交 Ack:
在消费代码中,通过
Acknowledgment
参数手动确认消息:
importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.Acknowledgment;importorg.springframework.stereotype.Component;@ComponentpublicclassAckMessageConsumer{@KafkaListener(topics ="my-topic", groupId ="my-group", containerFactory ="kafkaListenerContainerFactory")publicvoidconsume(ConsumerRecord<String,String>record,Acknowledgment ack){System.out.println("Received message: "+record.value());// 手动提交偏移量
ack.acknowledge();}}
在这个示例中,
ack.acknowledge()
用于手动提交偏移量,告诉 Kafka 该消息已经成功处理。这样可以防止消息在未成功处理时被自动确认。
消费总结:
- **单条消费 (
ConsumerRecord
)**:每次只消费一条消息,最常见的方式。 - 批量消费:可以一次性消费多条消息,适合批处理场景。
- 监听指定分区:通过
@TopicPartition
监听特定分区的消息。 - 注解方式获取消息头和消息体:通过
@Payload
获取消息体,@Header
获取消息头中的元数据。 - ACK 机制:通过手动提交偏移量实现对消息的精细控制,避免未处理的消息被自动确认。
常用属性及解释:
topics
- 类型:String[]
- 作用: 指定要监听的 Kafka 主题(topic)。可以指定一个或多个主题。- 示例:@KafkaListener(topics ={"topic1","topic2"})
topicPartitions
- 类型:TopicPartition[]
- 作用: 指定监听某个主题的特定分区(Partition)。可以用于精确监听某些分区的消息。- 示例:@KafkaListener(topicPartitions =@TopicPartition(topic ="topic1", partitions ={"0","1"}))
- 详细配置: 还可以使用partitionOffsets
属性设置起始偏移量:@KafkaListener(topicPartitions =@TopicPartition( topic ="topic1", partitionOffsets =@PartitionOffset(partition ="0", initialOffset ="100")))
groupId
- 类型:String
- 作用: 指定消费者所属的消费组(Consumer Group)。Kafka 会根据消费组进行消息分发和负载均衡。多个消费者可以属于同一消费组,每条消息只会被组中的一个消费者处理。- 示例:@KafkaListener(topics ="topic1", groupId ="my-group")
containerFactory
- 类型:String
- 作用: 指定要使用的KafkaListenerContainerFactory
,用于配置监听器容器。默认使用kafkaListenerContainerFactory
,可以自定义不同的工厂以支持不同的消费模式(如批量消费)。- 示例:@KafkaListener(topics ="topic1", containerFactory ="batchFactory")
errorHandler
- 类型:String
- 作用: 指定一个自定义的KafkaListenerErrorHandler
,用于处理消息消费过程中出现的异常。- 示例:@KafkaListener(topics ="topic1", errorHandler ="myErrorHandler")
concurrency
- 类型:int
- 作用: 指定并发消费者数量,即多少个线程同时消费消息。适用于高吞吐量场景,通过多个消费者实例并发处理。- 示例:@KafkaListener(topics ="topic1", concurrency ="3")
autoStartup
- 类型:boolean
- 作用: 是否在容器启动时自动启动消费者。默认为true
,如果设置为false
,需要手动启动。- 示例:@KafkaListener(topics ="topic1", autoStartup ="false")
id
- 类型:String
- 作用: 为KafkaListener
指定一个唯一的 ID。如果需要手动启动或停止监听器,可以通过这个 ID 进行操作。- 示例:@KafkaListener(id ="myListener", topics ="topic1")
idIsGroup
- 类型:boolean
- 作用: 当设置为true
时,id
会被作为消费组groupId
使用。默认为false
。- 示例:@KafkaListener(id ="myGroup", topics ="topic1", idIsGroup =true)
clientIdPrefix
- 类型:String
- 作用: 为消费者客户端 ID 添加前缀。Kafka 可以为每个消费者分配一个客户端 ID。- 示例:@KafkaListener(topics ="topic1", clientIdPrefix ="client1-")
beanRef
- 类型:String
- 作用: 用于指定 Bean 的引用名。默认使用当前@KafkaListener
注解所在的 Bean 实例。- 示例:@KafkaListener(topics ="topic1", beanRef ="someBean")
properties
- 类型:String[]
- 作用: 允许在@KafkaListener
上指定 Kafka 消费者配置属性,优先级高于全局配置。- 示例:@KafkaListener(topics ="topic1", properties ={"auto.offset.reset=earliest","enable.auto.commit=false"})
groupIdExpression
- 类型:String
- 作用: 用于动态解析groupId
的值,可以通过 Spring 表达式语言(SpEL)动态获取值。- 示例:@KafkaListener(topics ="topic1", groupIdExpression ="#{environment['kafka.groupId']}")
topicPartitionsExpression
- 类型:String
- 作用: 动态指定topicPartitions
的值,支持 SpEL 表达式。- 示例:@KafkaListener(topicPartitionsExpression ="#{T(java.util.Collections).singletonMap('topic1', '0')}")
properties
- 类型:String[]
- 作用: 指定 Kafka 消费者的配置属性,可以覆盖全局的 Kafka 消费者配置。- 示例:@KafkaListener(topics ="topic1", properties ={"fetch.min.bytes=5000","max.poll.records=10"})
pollTimeout
- 类型:long
- 作用: 指定 Kafka 消费者轮询的超时时间。默认是 5000 毫秒。- 示例:@KafkaListener(topics ="topic1", pollTimeout ="3000")
综合示例:
@KafkaListener(
id ="myListener",
topics ={"topic1","topic2"},
groupId ="my-group",
concurrency ="3",
containerFactory ="batchFactory",
autoStartup ="true",
errorHandler ="myErrorHandler",
properties ={"auto.offset.reset=earliest","enable.auto.commit=false"})publicvoidlisten(List<ConsumerRecord<String,String>> messages){
messages.forEach(record->{System.out.println("Received message: "+record.value());});}
总结:
@KafkaListener
提供了丰富的属性用于配置监听 Kafka 消息的各种细节。- 它不仅支持简单的监听配置,还可以通过高级配置实现批量消费、特定分区消费、手动控制 Ack 机制等功能。
- 通过这些属性,可以为不同的业务场景量身定制 Kafka 消费者逻辑。
版权归原作者 weixin_44594317 所有, 如有侵权,请联系我们删除。