0


Kafka多生产者消费者自动配置

背景

项目中不同的业务可能会使用多个

kafka

,按默认的Kafka配置,最多是支持消费者和生产者使用不同的Kafka,如果两个生产者使用不同的Kafka则需要自定义配置,生成对应的bean。

解决方案

多生产者,多消费者,使用不同的前缀来区分,根据前缀来区分配置,加载配置,实例化对应前缀的

KafkaProperties
kafkaListenerContainerFactory
KafkaTemplate

,每个bean的名称都是带前缀的,使用的时候,按照需要注入对应的bean。

YML配置

spring:kafka:product:bootstrap-servers: 55.1.40.231:9091,55.6.70.231:9091,55.5.70.231:9091properties:sasl:mechanism: PLAIN
          jaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="xxxx";
        security:protocol: SASL_PLAINTEXT
      producer:retries:0acks:-1batch-size:16384linger-ms:0buffer-memory:33554432consumer:group-id: consumer-group-id
        enable-auto-commit:trueauto-commit-interval-ms:1000auto-offset-reset: latest
        session-timeout-ms:120000request-timeout-ms:180000order:bootstrap-servers: 55.10.33.132:9091,55.10.33.132:9092,55.10.33.132:9093,55.10.33.132:9094,55.10.33.132:9095,55.10.33.132:9096,55.10.33.132:9097,55.10.33.132:9098,55.10.33.132:9099,55.10.33.132:9100properties:sasl:mechanism: PLAIN
          jaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user_order" password="xxxxxxx";
        security:protocol: SASL_PLAINTEXT
      producer:retries:3acks:-1batch-size:16384linger-ms:0buffer-memory:33554432consumer:group-id: order-migration
        enable-auto-commit:trueauto-commit-interval-ms:1000auto-offset-reset: latest
        session-timeout-ms:120000request-timeout-ms:180000

自定义KafkaProperties

使用

KafkaProperties

接收配置,但是需要修改下前缀,但是

KafkaProperties

源码改不了,新写一个类继承

KafkaProperties
@Component@Primary@ConfigurationProperties(prefix ="spring.kafka.order")publicclassOrderKafkaPropertiesextendsKafkaProperties{}

如果没有Kafka默认配置,Kafka会自动实例化默认的

KafkaProperties

,如果有多个

KafkaProperties

实例,需要指定一个首选的bean,否则

KafkaAnnotationDrivenConfiguration

类中构造函数会报错。

所以在其中一个加上

@Primary

注解

KafkaTemplate和KafkaListenerContainerFactory配置

有了

KafkaProperties

就可以生成

KafkaTemplate

KafkaListenerContainerFactory

实例

@ConfigurationpublicclassKafkaConfig{@AutowiredprivateOrderKafkaProperties orderKafkaProperties;@Bean("orderKafkaTemplate")publicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}privateProducerFactory<String,String>producerFactory(){returnnewDefaultKafkaProducerFactory<>(producerConfigs());}privateMap<String,Object>producerConfigs(){return contractKafkaProperties.buildProducerProperties();}@Bean("orderKafkaListenerContainerFactory")publicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);return factory;}privateConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}privateMap<String,Object>consumerConfigs(){return contractKafkaProperties.buildConsumerProperties();}}

这样就可以在其他地方直接使用了,生产者就直接

@Autowired
orderKafkaTemplate

,如果是消费者,直接在

@KafkaListener

containerFactory

属性指定

orderKafkaListenerContainerFactory

如果有多个生产者消费者,就增加对应的配置即可。这样简化了配置的读取,除了加了前缀,其他的配置都是和Kafka默认配置一样的,复用Springboot的属性绑定,后续如果有其他配置,加上后能直接生效,无需修改代码。如果修改配置的结构需要代码中读取,然后手动设置,后期修改YML配置和代码都需要修改,比较麻烦。

方案演进

上述方案,如果需要新增一个Kafka的配置,需要新增一个前缀,然后新增对应配置代码,来生成

KafkaProperties

KafkaTemplate

KafkaListenerContainerFactory

实例,但是不同的前缀生成不同的实例代码都是重复的,而且所有的前缀、属性值都由YML配置可以得到,所以代码中生成带前缀的bean可以由代码自动生成,并注册到spring容器中。根据这个思路,写一个

BeanFactoryAware

的实现类。(Aware接口是框架提供给用户用户获取框架中一些对象的接口,比如

BeanFactoryAware

就是获取

BeanFactory

,框架会调用重写的

setBeanFactory

方法,将

BeanFactory

传给我们的实现类)

@Component@Slf4jpublicclassEmallBeanFactoryAwareimplementsBeanFactoryAware{@AutowiredprivateEnvironment environment;privatestaticfinalStringSPRING_KAFKA_PREFIX="spring.kafka";@OverridepublicvoidsetBeanFactory(BeanFactory beanFactory)throwsBeansException{if(beanFactory instanceofDefaultListableBeanFactory){DefaultListableBeanFactory defaultListableBeanFactory =(DefaultListableBeanFactory) beanFactory;Binder binder =Binder.get(environment);//将YML中属性值映射到MAP中,后面根据配置前缀生成bean并注册到容器中,TODO 绑定可能有异常,加try catch稳一点BindResult<Map> bindResultWithPrefix = binder.bind(SPRING_KAFKA_PREFIX,Bindable.of(Map.class));if(!bindResultWithPrefix.isBound()){return;}Map map = bindResultWithPrefix.get();Set set = map.keySet();Set<String> kafkaPropertyFiledNames =getKafkaPropertyFiledNames();//如果配置多个primary, 只设置第一个,TODO项目启动过程中,这个变量是否有并发问题boolean hasSetPrimary =false;//实例化每个带前缀的KafkaProperties、KafkaTemplate、for(Object object : set){String prefix = object.toString();if(kafkaPropertyFiledNames.contains(prefix)){//不带前缀的正常配置忽略continue;}String configPrefix =SPRING_KAFKA_PREFIX+"."+ prefix;BindResult<KafkaProperties> kafkaPropertiesBindResult;try{
                    kafkaPropertiesBindResult = binder.bind(configPrefix,Bindable.of(KafkaProperties.class));if(!kafkaPropertiesBindResult.isBound()){continue;}}catch(Exception e){//一些配置不是在KafkaProperties属性,但是也不是前缀配置,在这一步会绑定失败,比如spring.kafka.topics配置,//一些配置的名称是带-,KafkaProperties属性是驼峰,绑定是会出异常的,异常忽略
                    log.error("auto register kafka properties error, prefix is: {}", configPrefix);continue;}//注册生产者(TODO 没配置生产者是否会报错)KafkaProperties kafkaProperties = kafkaPropertiesBindResult.get();String propertiesBeanName = prefix +"KafkaProperties";boolean isBeanExist = defaultListableBeanFactory.containsBean(propertiesBeanName);if(!isBeanExist){String primaryConfig = configPrefix +".primary";//没有默认的kafka配置,需要设置下primaryBindResult<Boolean> primaryBindResult = binder.bind(primaryConfig,Bindable.of(Boolean.class));if(primaryBindResult.isBound()&& primaryBindResult.get()&&!hasSetPrimary){BeanDefinitionBuilder beanDefinitionBuilder =BeanDefinitionBuilder.genericBeanDefinition(KafkaProperties.class);
                        defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());
                        defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                        defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);
                        hasSetPrimary =true;}else{
                        defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);}}//注册生产者KafkaTemplateString templateBeanName = prefix +"KafkaTemplate";if(!defaultListableBeanFactory.containsBean(templateBeanName)){KafkaTemplate kafkaTemplate =newKafkaTemplate<String,String>(newDefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));
                    defaultListableBeanFactory.registerSingleton(templateBeanName, kafkaTemplate);}String beanName = prefix +"KafkaListenerContainerFactory";if(!defaultListableBeanFactory.containsBean(beanName)){//注册消费者listener(TODO 没配置消费者是否会报错)ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
                    factory.setConsumerFactory(newDefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
                    factory.setConcurrency(10);
                    factory.getContainerProperties().setPollTimeout(3000);
                    defaultListableBeanFactory.registerSingleton(beanName, factory);}}}}privatestaticSet<String> getKafkaPropertyFiledNames (){Set<String> names =newHashSet<>();Field[] declaredFields =KafkaProperties.class.getDeclaredFields();if(declaredFields.length ==0){return names;}for(Field declaredField : declaredFields){
            names.add(declaredField.getName());}return names;}}

遇到的问题

手动注册的bean代码中@Autowire无法注入

手动注册的无法@Autowire,直接加@Lazy注解,先忽略bean注册的先后顺序

多个KafkaProperties实例,无法确定使用哪一个

因为使用前缀的配置方式,bean名称也是带前缀的,没有默认的Kafka配置,框架会自动生成对应的bean,

KafkaAnnotationDrivenConfiguration

中的

KafkaProperties

属性是根据类型注入的,如果配置有多个前缀,注入的时候无法确定使用哪一个,所以增加一个primary配置,自动生成的时候设置下。

既有带前缀,又有不带前缀使用默认配置的

自动配置代码中有一段是根据yml中配置的key,判断是否是

KafkaProperties

类中的字段,如果是就忽略,让框架自动按默认配置,有些字段yml中是带-,如

bootstrap-servers

KafkaProperties

中是驼峰命名bootstrapServers,绑定的时候会抛异常,影响应用启动,这种异常可以忽略,直接用try catch捕获。

设置Bean为Primary

第二个问题中,多个相同类型的Bean如何设置其中一个bean为Primary,手动注册bean,如果有实例对象,可以直接使用

BeanFactory

registerSingleton(beanName, object)

,如果没有实例对象,可以直接使用类名,通过

BeanFactory

registerBeanDefinition(beanName, beanDefinition)

来注册,如果要设置bean为Primary,必须通过

BeanDefinition

来设置,但是通过框架的绑定是直接生成实例对象的,如果通过

registerSingleton

来注册,通过beanName获取

BeanDefinition

是会抛异常的,因为没有

BeanDefinition

,所以需要将对象实例和

BeanDefinition

关联起来,就是上面这段代码

//注册BeanDefinition
defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());//注册对象实例,使用相同的bean名称
defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);//再获取BeanDefinition就能获取到,而且这个bean就是上面注册的实例对象
defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);

本文转载自: https://blog.csdn.net/AE86JayChou/article/details/128043113
版权归原作者 AE86Jag 所有, 如有侵权,请联系我们删除。

“Kafka多生产者消费者自动配置”的评论:

还没有评论