Preface
Have you ever run into the scenario where a single project has to consume messages from multiple Kafka clusters, with different consumers bound to specific clusters? You can handle this by configuring Kafka's native API directly, but much of the time we use spring-kafka to simplify development, and spring-kafka's out-of-the-box configuration only supports a single Kafka setup. This article therefore looks at how to adapt spring-kafka so that it supports multiple Kafka configurations.
Main Content
1. Use @ConfigurationProperties to bind each KafkaProperties bean to its own prefix
```java
@Primary
@ConfigurationProperties(prefix = "lybgeek.kafka.one")
@Bean
public KafkaProperties oneKafkaProperties() {
    return new KafkaProperties();
}
```
If there are more clusters, declare one such bean per cluster, for example:
@ConfigurationProperties(prefix ="lybgeek.kafka.two")@BeanpublicKafkaPropertiestwoKafkaProperties(){returnnewKafkaProperties();}@ConfigurationProperties(prefix ="lybgeek.kafka.three")@BeanpublicKafkaPropertiesthreeKafkaProperties(){returnnewKafkaProperties();}
2. Configure a consumer factory bound to the matching KafkaProperties
```java
@Bean
public ConsumerFactory twoConsumerFactory(
        @Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties) {
    return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());
}
```
3. Configure a listener container factory bound to the matching consumer factory and consumer settings
```java
@Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_TWO)
public KafkaListenerContainerFactory twoKafkaListenerContainerFactory(
        @Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties,
        @Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(twoConsumerFactory);
    factory.setConcurrency(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getConcurrency())
            ? Runtime.getRuntime().availableProcessors()
            : twoKafkaProperties.getListener().getConcurrency());
    factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getAckMode())
            ? ContainerProperties.AckMode.MANUAL
            : twoKafkaProperties.getListener().getAckMode());
    return factory;
}
```
A complete configuration class looks like this:
```java
@Configuration
@EnableConfigurationProperties(MultiKafkaComsumeProperties.class)
public class OneKafkaComsumeAutoConfiguration {

    @Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE)
    public KafkaListenerContainerFactory oneKafkaListenerContainerFactory(
            @Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties,
            @Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(oneConsumerFactory);
        factory.setConcurrency(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getConcurrency())
                ? Runtime.getRuntime().availableProcessors()
                : oneKafkaProperties.getListener().getConcurrency());
        factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getAckMode())
                ? ContainerProperties.AckMode.MANUAL
                : oneKafkaProperties.getListener().getAckMode());
        return factory;
    }

    @Primary
    @Bean
    public ConsumerFactory oneConsumerFactory(
            @Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties) {
        return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());
    }

    @Primary
    @ConfigurationProperties(prefix = "lybgeek.kafka.one")
    @Bean
    public KafkaProperties oneKafkaProperties() {
        return new KafkaProperties();
    }
}
```
Note that @Primary must be specified on one of the beans; otherwise startup fails, because with several KafkaProperties beans present, Spring Boot's Kafka auto-configuration cannot tell which one to inject. You can see why from the KafkaAutoConfiguration source:
```java
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    private final RecordMessageConverter messageConverter;

    public KafkaAutoConfiguration(KafkaProperties properties,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        this.properties = properties;
        this.messageConverter = messageConverter.getIfUnique();
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        if (this.messageConverter != null) {
            kafkaTemplate.setMessageConverter(this.messageConverter);
        }
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        return factory;
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    @ConditionalOnMissingBean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    @ConditionalOnMissingBean
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        Jaas jaasProperties = this.properties.getJaas();
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }
        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }
        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    @Bean
    @ConditionalOnMissingBean
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }
}
```
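As a side note, if you plan to define every Kafka bean yourself anyway, an alternative to @Primary is to switch off Kafka auto-configuration entirely, so that no default beans are created and there is nothing to disambiguate. This is not what the demo does; a minimal sketch:

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

// Alternative approach (not used in this demo): exclude Kafka auto-configuration,
// so no default KafkaTemplate/ConsumerFactory beans are created and @Primary
// is no longer needed to disambiguate the KafkaProperties beans.
@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```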
Example: using multiple Kafka consumers in the same project
1. Add the spring-kafka GAV to the project's pom
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
2. Configure the following in the project's yml
```yaml
lybgeek:
  kafka:
    multi:
      comsume-enabled: false
    one:
      producer:
        # Kafka bootstrap servers for the producer
        bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:10.1.4.71:32643}
        # number of retries for the producer
        retries: ${KAFKA_PRODUCER_RETRIES:0}
        # size of each batch of records sent
        batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384}
        # size of the buffer used for batched sends
        buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMORY:335554432}
        # serializers for the message key and value
        key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
        value-serializer: ${KAFKA_PRODUCER_VALUE_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
        # acks=1: the producer receives a success response as soon as the partition leader has received the message
        acks: ${KAFKA_PRODUCER_ACK:1}
      consumer:
        bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:10.1.4.71:32643}
        # when there is no valid offset, start reading the partition from the earliest record
        auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest}
        # whether to auto-commit offsets (defaults to true); set to false and commit manually to avoid duplicate and lost messages
        enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false}
        # deserializers for the message key and value
        key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
        value-deserializer: ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
      listener:
        # number of threads running in the listener container
        concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1}
        missing-topics-fatal: false
        ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}
    two:
      producer:
        # Kafka bootstrap servers for the producer
        bootstrap-servers: ${KAFKA_TWO_PRODUCER_BOOTSTRAP_SERVER:192.168.1.3:9202}
        # number of retries for the producer
        retries: ${KAFKA_TWO_PRODUCER_RETRIES:0}
        # size of each batch of records sent
        batch-size: ${KAFKA_TWO_PRODUCER_BATCH_SIZE:16384}
        # size of the buffer used for batched sends
        buffer-memory: ${KAFKA_TWO_PRODUCER_BUFFER_MEMORY:335554432}
        # serializers for the message key and value
        key-serializer: ${KAFKA_TWO_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
        value-serializer: ${KAFKA_TWO_PRODUCER_VALUE_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
        # acks=1: the producer receives a success response as soon as the partition leader has received the message
        acks: ${KAFKA_TWO_PRODUCER_ACK:1}
      consumer:
        bootstrap-servers: ${KAFKA_TWO_CONSUMER_BOOTSTRAP_SERVER:192.168.1.3:9202}
        # when there is no valid offset, start reading the partition from the earliest record
        auto-offset-reset: ${KAFKA_TWO_CONSUMER_AUTO_OFFSET_RESET:earliest}
        # whether to auto-commit offsets (defaults to true); set to false and commit manually to avoid duplicate and lost messages
        enable-auto-commit: ${KAFKA_TWO_CONSUMER_ENABLE_AUTO_COMMIT:false}
        # deserializers for the message key and value
        key-deserializer: ${KAFKA_TWO_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
        value-deserializer: ${KAFKA_TWO_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
      listener:
        # number of threads running in the listener container
        concurrency: ${KAFKA_TWO_CONSUMER_CONCURRENCY:1}
        missing-topics-fatal: false
        ack-mode: ${KAFKA_TWO_CONSUMER_ACK_MODE:manual}
```
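The MultiKafkaComsumeProperties class enabled back in the configuration class is never shown in this article. Based on the lybgeek.kafka.multi block above, it presumably looks roughly like the following sketch; the field name is my assumption, derived from the comsume-enabled key:

```java
import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical reconstruction: binds lybgeek.kafka.multi.comsume-enabled
// from the yml above via relaxed binding (comsume-enabled -> comsumeEnabled).
@ConfigurationProperties(prefix = "lybgeek.kafka.multi")
public class MultiKafkaComsumeProperties {

    // Switch that presumably toggles the multi-cluster consumer setup on or off.
    private boolean comsumeEnabled;

    public boolean isComsumeEnabled() {
        return comsumeEnabled;
    }

    public void setComsumeEnabled(boolean comsumeEnabled) {
        this.comsumeEnabled = comsumeEnabled;
    }
}
```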
3. Configure the producer
```java
private KafkaTemplate kafkaTemplate;

@Override
public MqResp sendSync(MqReq mqReq) {
    ListenableFuture<SendResult<String, String>> result = this.send(mqReq);
    MqResp mqResp = this.buildMqResp(result);
    return mqResp;
}
```
This KafkaTemplate is bound to the KafkaProperties bean marked @Primary.
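The demo therefore only produces to cluster one. If you also needed to produce to cluster two, you could wire a producer factory and template against twoKafkaProperties in the same style; a sketch under that assumption (the bean names twoProducerFactory and twoKafkaTemplate are mine, not from the demo):

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class TwoKafkaProducerConfiguration {

    @Bean
    public ProducerFactory<String, String> twoProducerFactory(
            @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties) {
        // Build the producer from the "two" cluster's properties rather than the @Primary ones
        return new DefaultKafkaProducerFactory<>(twoKafkaProperties.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, String> twoKafkaTemplate(
            @Qualifier("twoProducerFactory") ProducerFactory<String, String> twoProducerFactory) {
        return new KafkaTemplate<>(twoProducerFactory);
    }
}
```

You would then inject it with @Qualifier("twoKafkaTemplate") where needed. Bear in mind that declaring your own KafkaTemplate bean makes the auto-configured one back off (it is guarded by @ConditionalOnMissingBean(KafkaTemplate.class) in the source above), so you would also declare a template for cluster one and mark it @Primary.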
4. Configure the consumer listener and bind it to a containerFactory
```java
@LybGeekKafkaListener(id = "createUser",
        containerFactory = MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE,
        topics = Constant.USER_TOPIC)
public class UserComsumer extends BaseComusmeListener {

    @Autowired
    private UserService userService;

    @Override
    public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
        User user = JSON.parseObject(kafkaComsumePayLoad.getData(), User.class);
        System.out.println("-----------------------");
        return userService.isExistUserByUsername(user.getUsername());
    }

    @Override
    public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
        User user = JSON.parseObject(kafkaComsumerPayLoad.getData(), User.class);
        System.out.println(user);
        return userService.save(user);
    }
}
```
Specifying the containerFactory determines which Kafka cluster's messages the listener consumes.
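@LybGeekKafkaListener is a custom annotation from the author's earlier article (see the summary below). With stock spring-kafka, the same cluster binding can be expressed with the standard @KafkaListener; a minimal sketch, assuming the topic name "user-sync" from the console output below and the demo's MultiKafkaConstant class:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// MultiKafkaConstant is the demo project's constants class (import omitted here).
@Component
public class UserSyncListener {

    // containerFactory selects which listener container factory is used,
    // and therefore which Kafka cluster this listener consumes from.
    @KafkaListener(id = "createUser", topics = "user-sync",
            containerFactory = MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE)
    public void onMessage(String payload, Acknowledgment ack) {
        System.out.println(payload);
        // The factories above default to AckMode.MANUAL, so acknowledge explicitly.
        ack.acknowledge();
    }
}
```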
5. Test
```java
User user = User.builder()
        .username("test")
        .email("[email protected]")
        .fullname("test")
        .mobile("1350000001")
        .password("1234561")
        .build();

userService.saveAndPush(user);
```
Send the message and watch the console output:
messageKey:【null】,topic:【user-sync】 duplicate message detected -->【{"email":"[email protected]","fullname":"test","mobile":"1350000000","password":"123456","username":"test"}】
This output appears because the record already exists in the database, which conveniently demonstrates the duplicate-consumption check.
Summary
The core of this implementation is simply injecting multiple KafkaProperties beans to support multiple configurations. You may have noticed that even after this adaptation, configuring only the consumer side is not enough: the producer side must be configured as well, because otherwise it falls back to KafkaProperties' defaults, i.e. localhost. Careful readers may also have noticed that the consumer listener in my example uses the @LybGeekKafkaListener annotation, which provides essentially the same functionality as @KafkaListener. This example shares its code with the earlier article "How to implement a Kafka consumer listener with an idempotency template" (聊聊如何实现一个带幂等模板的kafka消费者监听), so the annotation is simply reused from there.
Demo link
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template