0


@KafkaListener的配置使用

@KafkaListener注解来自spring-kafka包。使用@KafkaListener消费消息,需要按照spring-kafka指定的格式填写kafka配置信息,即可自动装配生成相关的KafkaConsumer实例,然后使用@KafkaListener消费消息。这里需要注意,使用自动装载方式生成KafkaConsumer实例时,spring-kafka的配置参数与原生kafka的配置参数在格式上略有不同,因此,本文主要介绍了spring-kafka自动装载方式下生产者、消费者常用的配置参数,供参考使用:

1、依赖项

<!-- spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version></dependency><!-- kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version></dependency><!-- 配置信息补全提示 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>

2、配置文件

spring:kafka:producer:bootstrap-servers: 172.*.*.1:8423,172.*.*.2:8423,172.*.*.3:8423,172.*.*.4:8423,172.*.*.5:8423key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      ### 这里无效,因为这是Kafka服务器的配置# auto.create.topics.enable: false# 生产者信息properties:sasl.mechanism: SCRAM-SHA-512security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='***' password='md5(***)';consumer:bootstrap-servers: 172.*.*.1:8423,172.*.*.2:8423,172.*.*.3:8423,172.*.*.4:8423,172.*.*.5:8423key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id:***# 拉取数据数量上限(不满足时等待poll-timeout毫秒)max-poll-records:200# 拉取数据字节下限(不满足时等待fetch-max-wait毫秒)fetch-min-size:1# 拉取数据等待上限(不满足fetch-min-size的等待时间)fetch-max-wait:5000# 手动提交偏移量enable-auto-commit:false# 偏移量复位方式 earliest latest noneauto-offset-reset: earliest
      # 消费者信息properties:sasl.mechanism: SCRAM-SHA-512security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='***' password='md5(***)';listener:# 拉取数据方式: 批量type: batch
      # 请求数据小于max-poll-records,poll方法会持续请求,直到超时poll-timeout:500# 指定listener容器中的线程数,用于提高并发量(可在代码中配置)# concurrency: 6ack-mode: manual_immediate
    properties:# 拉取数据间隔(须大于消息处理耗时)max:poll:interval:ms:600000# group coordinator判定消费实例僵死并踢除的时间阈值session:timeout:ms:120000#默认10000

3、代码块

@Slf4j@ComponentpublicclassXxxKafkaListener{@AutowiredXxxKafkaConsumer xxxKafkaConsumer;// @KafkaListener(topics = "#{'${topics.xxx}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}")@KafkaListener(topics ="#{'${topics.xxx}'.split(',')}",concurrency ="#{'${concur.xxx}'}")publicvoidlistenXxx(ConsumerRecords<?,?> records,Acknowledgment ack){try{/// 消息处理/// Iterator<ConsumerRecord<?,?>> iterator = (Iterator)records.iterator();/// while(iterator.hasNext()){///     JSONObject json = JSON.parseObject((String)iterator.next().value());///     ....../// }/// 消息处理
            xxxKafkaConsumer.processRecords(records);}catch(Exception e){/// 上述语句抛出异常后,直接运行至切面,不会执行下述语句
            log.error("处理xxx信息异常:{}", e);}
        ack.acknowledge();}}

4、关于KafkaListener接口

在Spring Boot中,@KafkaListener 注解主要是依赖于 KafkaMessageListenerContainer 类。该类是Spring Kafka提供的一种消息监听器容器,它可以根据配置信息监听并消费Kafka消息。当我们在方法上添加@KafkaListener注解时,Spring Boot会自动创建 KafkaMessageListenerContainer 实例,并将消息路由到相应的处理方法。

public@interfaceKafkaListener{/// 监听器id(可用来命名消费者线程)Stringid()default"";/// 监听器工厂StringcontainerFactory()default"";/// 监听器主题String[]topics()default{};/// 监听器主题,匹配正则表达式StringtopicPattern()default"";/// 监听器主题&分区TopicPartition[]topicPartitions()default{};/// 异常处理器StringerrorHandler()default"";/// 消费组idStringgroupId()default"";/// 是否使用id作为groupIdbooleanidIsGroup()defaulttrue;}

4.1 containerFactory 监听器工厂

/// myKafkaListenerContainerFactory 代表了一个kafka集群@KafkaListener(
        containerFactory ="myKafkaListenerContainerFactory",
        topics ="#{'${spring.kafka.topics}'.split(',')}",
        groupId ="${spring.kafka.consumer.group}")@Bean(name ="myKafkaListenerContainerFactory")publicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>>myKafkaListenerContainerFactory(){returninitKafkaListenerContainerFactory(ConfigManager.get("spring.kafka.consumer.brokers","127.0.0.1:9092"));}

4.2 监听器的topic

topic的配置方式有3种,分别是topics、topicPattern、topicPartitions;

(1)topics,可以指定多个topic

@KafkaListener( topics ={"topic1","topic2"},/// 或 topics = "#{'${spring.kafka.topics}'.split(',')}",
                groupId ="${spring.kafka.consumer.group_id}")

(2)topicPattern,支持正则表达式

@KafkaListener(topicPattern ="topic_*", concurrency ="6")publicvoidonMessage(@PayloadString data,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)ByteBuffer key,Acknowledgment ack,//手动提交offset@Header(KafkaHeaders.RECEIVED_PARTITION_ID)int partition,@Header(KafkaHeaders.OFFSET)long offSet,Consumer<?,?> consumer //消费者 )

(3)topicPartitions,可以为监听器配置主题和分区(及可选的初始偏移量)

// 监听topic1的0,1分区;监听topic2的0分区,1分区从offset为100的开始消费;@KafkaListener(id ="thing2", topicPartitions ={@TopicPartition(topic ="topic1", partitions ={"0","1"}),@TopicPartition(topic ="topic2", partitions ="0", partitionOffsets =@PartitionOffset(partition ="1", initialOffset ="100"))})publicvoidonMessage(ConsumerRecord<?,?> record){...}

4.3 errorHandler 异常处理器

errorHandler指定了错误处理器的beanName:

@KafkaListener(
        topics ="#{'${spring.kafka.topics}'.split(',')}",
        groupId ="${spring.kafka.consumer.group_id}",
        errorHandler ="errorHandler")

可以在consumer中手动try/catch,也可实现 KafkaListenerErrorHandler 复用异常处理逻辑;

@Component("errorHandler")publicclassMyKafkaListenerErrorHandlerimplementsKafkaListenerErrorHandler{@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException exception){/// handle error ......returnnull;}@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException exception,Consumer<?,?> consumer){/// handle error ......returnnull;}}

4.4 groupId 监听器的消费组

如果配置了属性groupId,则groupId优先级最高

@KafkaListener(id ="consumer-id1", idIsGroup =false, topics ="topic1", groupId ="consumer_group")
标签: kafka spring java

本文转载自: https://blog.csdn.net/Pluto_CSND/article/details/135404985
版权归原作者 Pluto_CSND 所有, 如有侵权,请联系我们删除。

“@KafkaListener的配置使用”的评论:

还没有评论