0


springboot 实现kafka多源配置

文章目录

背景

实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。

核心配置

自动化配置类

  1. importcom.example.kafka.autoconfig.CustomKafkaDataSourceRegister;importcom.example.kafka.autoconfig.kafkaConsumerConfig;importorg.springframework.beans.BeansException;importorg.springframework.beans.factory.BeanFactory;importorg.springframework.beans.factory.BeanFactoryAware;importorg.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;importorg.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;importorg.springframework.boot.context.properties.EnableConfigurationProperties;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Import;importorg.springframework.kafka.annotation.EnableKafka;@EnableKafka@Configuration(
  2. proxyBeanMethods =false)@ConditionalOnWebApplication@EnableConfigurationProperties({kafkaConsumerConfig.class})@Import({CustomKafkaDataSourceRegister.class})publicclassMyKafkaAutoConfigurationimplementsBeanFactoryAware,SmartInstantiationAwareBeanPostProcessor{publicMyKafkaAutoConfiguration(){}publicvoidsetBeanFactory(BeanFactory beanFactory)throwsBeansException{
  3. beanFactory.getBean(CustomKafkaDataSourceRegister.class);}}

注册生产者、消费者核心bean到spring

  1. public void afterPropertiesSet() {
  2. Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();
  3. if (factories != null && !factories.isEmpty()) {
  4. factories.forEach((factoryName, consumerConfig) -> {
  5. KafkaProperties.Listener listener = consumerConfig.getListener();
  6. Integer concurrency = consumerConfig.getConcurrency();
  7. // 创建监听容器工厂
  8. ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);
  9. // 注册到容器
  10. if (!beanFactory.containsBean(factoryName)) {
  11. beanFactory.registerSingleton(factoryName, containerFactory);
  12. }
  13. });
  14. }
  15. Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();
  16. if (!ObjectUtils.isEmpty(templates)) {
  17. templates.forEach((templateName, producerConfig) -> {
  18. //registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);
  19. //注册spring bean的两种方式
  20. registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));
  21. });
  22. }
  23. }

配置spring.factories

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2. com.example.kafka.MyKafkaAutoConfiguration

yml配置

  1. spring:
  2. kafka:
  3. multiple:
  4. consumer:
  5. factories:
  6. test-factory:
  7. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  8. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9. bootstrap-servers: 192.168.56.112:9092
  10. group-id: group_a
  11. concurrency: 25
  12. fetch-min-size: 1048576
  13. fetch-max-wait: 3000
  14. listener:
  15. type: batch
  16. properties:
  17. spring-json-trusted-packages: '*'
  18. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  19. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. auto-offset-reset: latest
  21. producer:
  22. templates:
  23. test-template:
  24. bootstrap-servers: 192.168.56.112:9092
  25. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  26. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  27. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  28. value-serializer: org.apache.kafka.common.serialization.StringSerializer

使用

在这里插入图片描述

在这里插入图片描述

源码仓库

https://github.com/fafeidou/shield

标签: spring boot kafka linq

本文转载自: https://blog.csdn.net/qq_37362891/article/details/139391610
版权归原作者 it噩梦 所有, 如有侵权,请联系我们删除。

“springboot 实现kafka多源配置”的评论:

还没有评论