

Consuming a Topic with a Custom Kafka Client


Conclusion

Register a custom KafkaConsumer as a Spring-managed bean; then, in the setter that injects the topic property, start a single dedicated thread that subscribes to that topic and actively reads its messages.

1 Background

The backend service should not start listening and consuming as soon as it starts up; instead, it should start or stop listening to topics depending on which modules are enabled, or on user-defined configuration.

2 spring-kafka 2.1.8.RELEASE does not support the autoStartup attribute

This project uses spring-kafka 2.1.8.RELEASE, whose @KafkaListener annotation offers no autoStartup = "false" attribute for starting a listener manually; this is probably because the version is old. If you know a version that supports it, leave a comment and I will verify it.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.8.RELEASE</version>
</dependency>

@KafkaListener(topics = "<Kafka topic>", autoStartup = "false")
public void receive(String message) {
    // process the received message
}
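For reference, newer spring-kafka versions (2.2 and later, as far as I know) do support autoStartup on @KafkaListener, and such a listener can then be started and stopped on demand through KafkaListenerEndpointRegistry. Below is a minimal sketch under that assumption; the listener id "manualListener" and topic "someTopic" are hypothetical names for illustration:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;

@Component
public class ManualStartListener {

    // registry that holds every @KafkaListener container
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // autoStartup = "false" keeps this listener stopped at application startup
    @KafkaListener(id = "manualListener", topics = "someTopic", autoStartup = "false")
    public void receive(String message) {
        // process the received message
    }

    public void start() {
        // look the container up by its listener id and start it on demand
        registry.getListenerContainer("manualListener").start();
    }

    public void stop() {
        registry.getListenerContainer("manualListener").stop();
    }
}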

3 Consuming a topic with a custom Kafka client

3.1 yml configuration

spring:
  kafka:
    bootstrap-servers: 19.125.105.6:9092,19.125.105.7:9092,19.125.105.8:9092
    consumer:
      group-id: data-dev
      enable-auto-commit: true
      auto-offset-reset: latest
      auto-commit-interval: 1000
    topic:
      costomTopic: costomData

3.2 KafkaConfig client configuration

All other Kafka settings are the same as the existing Kafka client configuration; the only addition is a cutomConsumer bean managed by Spring, so that the client can be started manually later.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

//    @Value("${spring.kafka.listener.concurrency}")
//    private Integer concurrency;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // concurrency
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Bean
    public KafkaConsumer<String, String> cutomConsumer() {
        // create a separate consumer that will be started manually
        return new KafkaConsumer<>(consumerConfigs());
    }
}
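For a quick check that this configuration works, the kafkaTemplate bean above can be used to send a test message. A minimal sketch; the KafkaSendDemo class name is made up for illustration, and "costomData" is the topic value from the yml above:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaSendDemo {

    // the KafkaTemplate bean defined in KafkaConfig
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendTest() {
        // send(topic, key, value); "costomData" matches spring.kafka.topic.costomTopic
        kafkaTemplate.send("costomData", "key-1", "hello kafka");
    }
}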

3.3 Manually starting the consumer client

Here the consumer client is started manually only when costomTopic is configured. Note that because this consumer commits offsets itself, enable-auto-commit should arguably be set to false for it, otherwise the manual commits are redundant. If you need to start and stop topics dynamically instead, trigger the same subscribe/poll logic from your own entry point rather than from this setter (see the stoppable-loop sketch after the code below).

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Component
public class CutomKafkaConsumer {

    // consume with the cutomConsumer bean
    @Autowired
    private KafkaConsumer<String, String> cutomConsumer;

    @Value("${spring.kafka.topic.costomTopic:}")
    public void setCostomTopic(String costomTopic) {
        // start the consumer manually; skip when a downstream module does not
        // configure costomTopic, so that startup does not fail
        if (StringUtils.isEmpty(costomTopic)) {
            return;
        }
        // subscribe this consumer to the topic
        cutomConsumer.subscribe(Collections.singleton(costomTopic));
        // pull messages on a single thread
        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
        consumerExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    ConsumerRecords<String, String> records = cutomConsumer.poll(3000);
                    if (!records.iterator().hasNext()) {
                        continue;
                    }
                    try {
                        // catch exceptions so the top-level poll loop is never broken
                        records.forEach(record -> operate(record));
                    } catch (Exception e) {
                        log.error("Failed to consume records, cause: {}", e.getMessage(), e);
                    }
                    // commit offsets asynchronously
                    cutomConsumer.commitAsync((offsets, exception) -> {
                        if (exception == null) {
                            offsets.forEach((topicPartition, metadata) ->
                                    System.out.println(topicPartition + " -> offset=" + metadata.offset()));
                        } else {
                            exception.printStackTrace();
                            // fall back to a synchronous commit on failure
                            cutomConsumer.commitSync(offsets);
                        }
                    });
                }
            }
        });
    }

    public void operate(ConsumerRecord<String, String> record) {
        log.info("kafkaTwoContainerFactory.operate start. key: {}, value : {}", record.key(), record.value());
    }
}



Reposted from: https://blog.csdn.net/Master_Shifu_/article/details/134942167
Copyright belongs to the original author, Master_Shifu_.
