0


聊聊在springboot项目中如何配置多个kafka消费者

前言

不知道大家有没有遇到这样的场景,就是一个项目中要消费多个kafka消息,不同的消费者消费指定kafka消息。遇到这种场景,我们可以通过kafka的提供的api进行配置即可。但很多时候我们会使用spring-kafka来简化开发,可是spring-kafka原生的配置项并没提供多个kafka配置,因此本文就来聊聊如何将spring-kafka进行改造,使之能支持多个kafka配置

正文

1、通过 @ConfigurationProperties指定KafkaProperties前缀

@Primary@ConfigurationProperties(prefix ="lybgeek.kafka.one")@BeanpublicKafkaPropertiesoneKafkaProperties(){returnnewKafkaProperties();}

如果有多个就配置多个,形如

@ConfigurationProperties(prefix ="lybgeek.kafka.two")@BeanpublicKafkaPropertiestwoKafkaProperties(){returnnewKafkaProperties();}@ConfigurationProperties(prefix ="lybgeek.kafka.three")@BeanpublicKafkaPropertiesthreeKafkaProperties(){returnnewKafkaProperties();}

2、配置消费者工厂,消费者工厂绑定对应的KafkaProperties

@BeanpublicConsumerFactorytwoConsumerFactory(@Autowired@Qualifier("twoKafkaProperties")KafkaProperties twoKafkaProperties){returnnewDefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}

3、配置消费者监听器工厂,并绑定指定消费者工厂以及消费者配置

@Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_TWO)publicKafkaListenerContainerFactorytwoKafkaListenerContainerFactory(@Autowired@Qualifier("twoKafkaProperties")KafkaProperties twoKafkaProperties,@Autowired@Qualifier("twoConsumerFactory")ConsumerFactory twoConsumerFactory){ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(twoConsumerFactory);
        factory.setConcurrency(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getConcurrency())?Runtime.getRuntime().availableProcessors(): twoKafkaProperties.getListener().getConcurrency());
        factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getAckMode())?ContainerProperties.AckMode.MANUAL:twoKafkaProperties.getListener().getAckMode());return factory;}

完整的配置示例如下

@Configuration@EnableConfigurationProperties(MultiKafkaComsumeProperties.class)publicclassOneKafkaComsumeAutoConfiguration{@Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE)publicKafkaListenerContainerFactoryoneKafkaListenerContainerFactory(@Autowired@Qualifier("oneKafkaProperties")KafkaProperties oneKafkaProperties,@Autowired@Qualifier("oneConsumerFactory")ConsumerFactory oneConsumerFactory){ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(oneConsumerFactory);
        factory.setConcurrency(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getConcurrency())?Runtime.getRuntime().availableProcessors(): oneKafkaProperties.getListener().getConcurrency());
        factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getAckMode())?ContainerProperties.AckMode.MANUAL:oneKafkaProperties.getListener().getAckMode());return factory;}@Primary@BeanpublicConsumerFactoryoneConsumerFactory(@Autowired@Qualifier("oneKafkaProperties")KafkaProperties oneKafkaProperties){returnnewDefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@Primary@ConfigurationProperties(prefix ="lybgeek.kafka.one")@BeanpublicKafkaPropertiesoneKafkaProperties(){returnnewKafkaProperties();}}

那个 @Primary要指定一下,不然启动会因为存在多个KafkaProperties,而导致kafka的自动装配不懂要选哪个而报错

@Configuration@ConditionalOnClass(KafkaTemplate.class)@EnableConfigurationProperties(KafkaProperties.class)@Import({KafkaAnnotationDrivenConfiguration.class,KafkaStreamsAnnotationDrivenConfiguration.class})publicclassKafkaAutoConfiguration{privatefinalKafkaProperties properties;privatefinalRecordMessageConverter messageConverter;publicKafkaAutoConfiguration(KafkaProperties properties,ObjectProvider<RecordMessageConverter> messageConverter){this.properties = properties;this.messageConverter = messageConverter.getIfUnique();}@Bean@ConditionalOnMissingBean(KafkaTemplate.class)publicKafkaTemplate<?,?>kafkaTemplate(ProducerFactory<Object,Object> kafkaProducerFactory,ProducerListener<Object,Object> kafkaProducerListener){KafkaTemplate<Object,Object> kafkaTemplate =newKafkaTemplate<>(kafkaProducerFactory);if(this.messageConverter !=null){
            kafkaTemplate.setMessageConverter(this.messageConverter);}
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean@ConditionalOnMissingBean(ProducerListener.class)publicProducerListener<Object,Object>kafkaProducerListener(){returnnewLoggingProducerListener<>();}@Bean@ConditionalOnMissingBean(ConsumerFactory.class)publicConsumerFactory<?,?>kafkaConsumerFactory(){returnnewDefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());}@Bean@ConditionalOnMissingBean(ProducerFactory.class)publicProducerFactory<?,?>kafkaProducerFactory(){DefaultKafkaProducerFactory<?,?> factory =newDefaultKafkaProducerFactory<>(this.properties.buildProducerProperties());String transactionIdPrefix =this.properties.getProducer().getTransactionIdPrefix();if(transactionIdPrefix !=null){
            factory.setTransactionIdPrefix(transactionIdPrefix);}return factory;}@Bean@ConditionalOnProperty(name ="spring.kafka.producer.transaction-id-prefix")@ConditionalOnMissingBeanpublicKafkaTransactionManager<?,?>kafkaTransactionManager(ProducerFactory<?,?> producerFactory){returnnewKafkaTransactionManager<>(producerFactory);}@Bean@ConditionalOnProperty(name ="spring.kafka.jaas.enabled")@ConditionalOnMissingBeanpublicKafkaJaasLoginModuleInitializerkafkaJaasInitializer()throwsIOException{KafkaJaasLoginModuleInitializer jaas =newKafkaJaasLoginModuleInitializer();Jaas jaasProperties =this.properties.getJaas();if(jaasProperties.getControlFlag()!=null){
            jaas.setControlFlag(jaasProperties.getControlFlag());}if(jaasProperties.getLoginModule()!=null){
            jaas.setLoginModule(jaasProperties.getLoginModule());}
        jaas.setOptions(jaasProperties.getOptions());return jaas;}@Bean@ConditionalOnMissingBeanpublicKafkaAdminkafkaAdmin(){KafkaAdmin kafkaAdmin =newKafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}}

同项目使用多个kafka消费者示例

1、在项目的pom引入spring-kafka GAV

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2、在项目的yml中配置如下内容

lybgeek:
    kafka:
        multi:
            comsume-enabled:false
        one:
            producer:
                # kafka生产者服务端地址
                bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:10.1.4.71:32643}
                # 生产者重试的次数
                retries: ${KAFKA_PRODUCER_RETRIES:0}
                # 每次批量发送的数据量
                batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384}
                # 每次批量发送消息的缓冲区大小
                buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMOEY:335554432}
                # 指定消息key和消息体的编码方式
                key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
                value-serializer:  ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
                # acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
                acks: ${KAFKA_PRODUCER_ACK:1}

            consumer:
                bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:10.1.4.71:32643}
                # 在偏移量无效的情况下,消费者将从起始位置读取分区的记录
                auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest}
                #  是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
                enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false}
                # 指定消息key和消息体的解码方式
                key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
                value-deserializer:  ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
            listener:
                # 在侦听器容器中运行的线程数。
                concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1}
                missing-topics-fatal:false
                ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}
                
    two:
        producer:
            # kafka生产者服务端地址
            bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:192.168.1.3:9202}
            # 生产者重试的次数
            retries: ${KAFKA_PRODUCER_RETRIES:0}
            # 每次批量发送的数据量
            batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384}
            # 每次批量发送消息的缓冲区大小
            buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMOEY:335554432}
            # 指定消息key和消息体的编码方式
            key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
            value-serializer:  ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
            # acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
            acks: ${KAFKA_PRODUCER_ACK:1}

            consumer:
                bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:192.168.1.3:9202}
                # 在偏移量无效的情况下,消费者将从起始位置读取分区的记录
                auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest}
                #  是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
                enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false}
                # 指定消息key和消息体的解码方式
                key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
                value-deserializer:  ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
            listener:
                # 在侦听器容器中运行的线程数。
                concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1}
                missing-topics-fatal:false
                ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}

3、配置生产者

privateKafkaTemplate kafkaTemplate;@OverridepublicMqRespsendSync(MqReq mqReq){ListenableFuture<SendResult<String,String>> result =this.send(mqReq);MqResp mqResp =this.buildMqResp(result);return mqResp;}

这个KafkaTemplate绑定的就是@Primary配置的kafkaProperties

4、配置消费者监听,并绑定containerFactory

@LybGeekKafkaListener(id ="createUser",containerFactory =MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE,topics =Constant.USER_TOPIC)publicclassUserComsumerextendsBaseComusmeListener{@AutowiredprivateUserService userService;@OverridepublicbooleanisRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad){User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class);System.out.println("-----------------------");return userService.isExistUserByUsername(user.getUsername());}@OverridepublicbooleandoBiz(KafkaComsumePayLoad kafkaComsumerPayLoad){User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class);System.out.println(user);return userService.save(user);}}

通过指定containerFactory ,来消费指定的kafka消息

5、测试

User user =User.builder().username("test").email("[email protected]").fullname("test").mobile("1350000001").password("1234561").build();
      userService.saveAndPush(user);

发送消息,观察控制台输出

: messageKey:【null】,topic:【user-sync】存在重复消息数据-->【{"email":"[email protected]","fullname":"test","mobile":"1350000000","password":"123456","username":"test"}】

会出现这样,是因为数据库已经有这条记录了,刚好验证一下重复消费

总结

本文实现的核心其实就是通过注入多个kafkaProperties来实现多配置 ,不知道大家有没有发现,就是改造后的配置,配置消费者后,生产者仍然也要配置。因为如果不配置,走的就是kafkaProperties默认的配置信息,即为localhost。还有细心的朋友也许会发现我示例中的消费者监听使用的注解是@LybGeekKafkaListener,这个和 @KafkaListener实现的功能基本一致。因为本示例和之前的文章聊聊如何实现一个带幂等模板的kafka消费者监听是同份代码,就直接复用了

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/kingwinstar/article/details/125274544
版权归原作者 linyb极客之路 所有, 如有侵权,请联系我们删除。

“聊聊在springboot项目中如何配置多个kafka消费者”的评论:

还没有评论