第一步:引入maven依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
第二步:新增配置文件
以下为大致结构,供参考
spring:kafka:# 第一个kafka的配置first:bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
producer:retries: x
acks:-1consumer:enable-auto-commit:falsegroup-id: first-consumer
listener:ack-mode: xx
# 第二个kafka的配置second:bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
producer:batch-size: xxxx
buffer-memory: xxxxxx
consumer:auto-offset-reset: earliest
group-id: second-consumer
listener:concurrency: xx
第三步:新增配置类
第一个kafka的配置类
/**
* 第一个kafka配置
*
* @author 小流慢影
* @date 2023年5月15日
*/@ConfigurationpublicclassFirstKafkaConfig{/**
* 读取第一个kafka配置
* Primary注解表示默认以这个为准
*
* @return 第一个kafka配置
*/@Primary@ConfigurationProperties(prefix ="spring.kafka.first")@BeanpublicKafkaPropertiesfirstKafkaProperties(){returnnewKafkaProperties();}/**
* 构建第一个kafka的生产者发送template
*
* @param firstKafkaProperties 第一个kafka配置
* @return 第一个kafka的生产者发送template
*/@Primary@BeanpublicKafkaTemplate<String,String>firstKafkaTemplate(@Autowired@Qualifier("firstKafkaProperties")KafkaProperties firstKafkaProperties){returnnewKafkaTemplate<>(firstProducerFactory(firstKafkaProperties));}/**
* 构建第一个kafka的消费者监听容器工厂
*
* @param firstKafkaProperties 第一个kafka配置
* @return 第一个kafka的消费者监听容器工厂
*/@BeanpublicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>firstKafkaListenerContainerFactory(@Autowired@Qualifier("firstKafkaProperties")KafkaProperties firstKafkaProperties){ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties));return factory;}/**
* 新建第一个kafka的消费者工厂
*
* @param firstKafkaProperties 第一个kafka配置
* @return 第一个kafka的消费者工厂
*/privateConsumerFactory<?superInteger,?superString>firstConsumerFactory(KafkaProperties firstKafkaProperties){returnnewDefaultKafkaConsumerFactory<>(firstKafkaProperties.buildConsumerProperties());}/**
* 新建第一个kafka的生产者工厂
*
* @param firstKafkaProperties 第一个kafka配置
* @return 第一个kafka的生产者工厂
*/privateDefaultKafkaProducerFactory<String,String>firstProducerFactory(KafkaProperties firstKafkaProperties){returnnewDefaultKafkaProducerFactory<>(firstKafkaProperties.buildProducerProperties());}}
第二个kafka的配置类
/**
* 第二个kafka配置
*
* @author 小流慢影
* @date 2023年5月15日
*/@ConfigurationpublicclassSecondKafkaConfig{/**
* 读取第二个kafka配置
*
* @return 第二个kafka配置
*/@ConfigurationProperties(prefix ="spring.kafka.second")@BeanpublicKafkaPropertiessecondKafkaProperties(){returnnewKafkaProperties();}/**
* 构建第二个kafka的生产者发送template
*
* @param secondKafkaProperties 第二个kafka配置
* @return 第二个kafka的生产者发送template
*/@BeanpublicKafkaTemplate<String,String>secondKafkaTemplate(@Autowired@Qualifier("secondKafkaProperties")KafkaProperties secondKafkaProperties){returnnewKafkaTemplate<>(secondProducerFactory(secondKafkaProperties));}/**
* 构建第二个kafka的消费者监听容器工厂
*
* @param secondKafkaProperties 第二个kafka配置
* @return 第二个kafka的消费者监听容器工厂
*/@BeanpublicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>secondKafkaListenerContainerFactory(@Autowired@Qualifier("secondKafkaProperties")KafkaProperties secondKafkaProperties){ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties));return factory;}/**
* 新建第二个kafka的消费者工厂
*
* @param secondKafkaProperties 第二个kafka配置
* @return 第二个kafka的消费者工厂
*/privateConsumerFactory<?superInteger,?superString>secondConsumerFactory(KafkaProperties secondKafkaProperties){returnnewDefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties());}/**
* 新建第二个kafka的生产者工厂
*
* @param secondKafkaProperties 第二个kafka配置
* @return 第二个kafka的生产者工厂
*/privateDefaultKafkaProducerFactory<String,String>secondProducerFactory(KafkaProperties secondKafkaProperties){returnnewDefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties());}}
第四步:用
生产者用法
/**
* 第一个kafka配置默认就可以用,因为配置了@Primary
*/@ResourceprivateKafkaTemplate<String,String> firstKafkaTemplate;/**
* 第二个kafka配置需要指定下名字
*/@Resource(name ="secondKafkaTemplate")privateKafkaTemplate<String,String> secondKafkaTemplate;
消费者用法
/**
* 测试消费者
* 这里要消费哪一个kafka消息,containerFactory就需要配成上面相对应的消费者监听容器工厂
* @param record 消息
* @param ack ack
*/@KafkaListener(
containerFactory ="secondKafkaListenerContainerFactory",
topics ={"xxxx"},
groupId ="second-consumer")publicvoidtestConsumer(ConsumerRecord<?,?>record,Acknowledgment ack){//do something}
本文转载自: https://blog.csdn.net/qq_35893873/article/details/130620679
版权归原作者 小流慢影 所有, 如有侵权,请联系我们删除。
版权归原作者 小流慢影 所有, 如有侵权,请联系我们删除。