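I have recently been working on a microservice migration in which one service needs to subscribe to several Kafka instances. Spring Kafka's auto-configuration can only set up a single Kafka, which does not meet that requirement. This article summarizes how to configure multiple Kafka instances; I hope you find it useful.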
Preparation
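- Set up your own Kafka. Download Kafka from the official site and pick the version that matches your Spring Boot version. Fortunately Kafka supports a fairly wide range of versions: at the time of writing the latest version is 3.2.1, and versions in the range 2.12-3.2.1 are supported, which covers Spring Boot 2.0.x through Spring Boot 3.0.x. https://kafka.apache.org/downloads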
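- Unpack and install. Go to the bin directory and run the following commands in the order shown.
Linux
# Use the config file path that matches your own directory layout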
zookeeper-server-start.sh ../config/zookeeper.properties
Windows
windows/zookeeper-server-start.bat ../config/zookeeper.properties
Open another terminal and start the Kafka server
Linux
kafka-server-start.sh ../config/server.properties
Windows
windows/kafka-server-start.bat ../config/server.properties
Minimal Kafka configuration
The following is a minimal configuration for a single Kafka.
pom.xml: add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
application.properties
server.port=8090
spring.application.name=single-kafka-server
# Kafka server address
spring.kafka.bootstrap-servers=localhost:9092
# Consumer group; it is created automatically once configured
spring.kafka.consumer.group-id=default_group
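One detail of the properties file is worth a quick check. In the .properties format a # only starts a comment when it is the first non-blank character of a line, so a note appended after a value becomes part of that value. The sketch below illustrates this with plain java.util.Properties; the class name PropertiesCommentDemo is made up for this illustration, and as far as I know Spring Boot reads application.properties by the same rule.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class, not part of Spring or Kafka.
class PropertiesCommentDemo {

    /** Parses text written in .properties syntax. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // '#' in the middle of a line is not a comment: it stays in the value.
        Properties fused = parse("spring.kafka.bootstrap-servers=localhost:9092#consumer group\n");
        System.out.println(fused.getProperty("spring.kafka.bootstrap-servers"));

        // '#' at the start of a line is a comment and is skipped.
        Properties split = parse("# consumer group\nspring.kafka.bootstrap-servers=localhost:9092\n");
        System.out.println(split.getProperty("spring.kafka.bootstrap-servers"));
    }
}
```

The first call yields a broker address with the note still attached, which is not a usable host:port value, so comments in application.properties are best kept on their own lines.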
KafkaProducer (producer)
@Slf4j
@Component
@EnableScheduling
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private void sendTest() {
        // The topic is created automatically
        kafkaTemplate.send("topic1", "hello kafka");
    }

    // Send a test message every 10 seconds
    @Scheduled(fixedRate = 1000 * 10)
    public void testKafka() {
        log.info("send message...");
        sendTest();
    }
}
KafkaConsumer (consumer)
@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"topic1"})
    public void processMessage(String spuId) {
        log.warn("process spuId ={}", spuId);
    }
}
Result:
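Multiple Kafka configuration
This setup is slightly more involved. The key idea is to create the beans manually. The dependency is the same as before.
pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>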
application.properties
server.port=8090
spring.application.name=kafka-server
# kafka1
# Server address
spring.kafka.one.bootstrap-servers=localhost:9092
spring.kafka.one.consumer.group-id=default_group
#kafka2
spring.kafka.two.bootstrap-servers=localhost:9092
spring.kafka.two.consumer.group-id=default_group2
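The one and two segments in these keys are names chosen for this article. Spring Boot's Kafka auto-configuration does not bind them, which is why each group has to be read by hand in the configuration classes that follow. As a rough illustration of how the keys fall into per-Kafka groups, here is a small plain-Java sketch; ClusterPropsDemo and byCluster are hypothetical names, and the code only parses text, it does not touch Spring or Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical demo class, not part of Spring or Kafka.
class ClusterPropsDemo {

    private static final String PREFIX = "spring.kafka.";

    /** Groups keys of the form spring.kafka.<name>.<setting> by <name>. */
    static Map<String, Map<String, String>> byCluster(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, Map<String, String>> clusters = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) {
                continue; // unrelated key such as server.port
            }
            String tail = key.substring(PREFIX.length());
            int dot = tail.indexOf('.');
            if (dot < 0) {
                continue; // no <name> segment in front of the setting
            }
            clusters.computeIfAbsent(tail.substring(0, dot), name -> new TreeMap<>())
                    .put(tail.substring(dot + 1), props.getProperty(key));
        }
        return clusters;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "server.port=8090",
                "spring.kafka.one.bootstrap-servers=localhost:9092",
                "spring.kafka.one.consumer.group-id=default_group",
                "spring.kafka.two.bootstrap-servers=localhost:9092",
                "spring.kafka.two.consumer.group-id=default_group2");
        // Prints one entry per Kafka name with its own settings.
        System.out.println(byCluster(text));
    }
}
```

In this article both groups point at localhost:9092 for convenience; in a real deployment each group would normally carry the address of a different Kafka.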
Configuration for the first Kafka. Each bean must be given a distinct name.
KafkaOneConfig
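import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaOneConfig {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Keys are read with StringDeserializer, so the key type is String
    @Bean(name = "kafkaOneContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaOneContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}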
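kafkaOneTemplate: the high-level template for the first Kafka, used to send messages
kafkaOneContainerFactory: the listener container factory for consuming, referenced from @KafkaListener
producerFactory: the producer factory
consumerFactory: the consumer factory
producerConfigs: the producer configuration
consumerConfigs: the consumer configuration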
Create the second Kafka in the same way; each setting has the same meaning as for the first Kafka.
KafkaTwoConfig
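import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaTwoConfig {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Keys are read with StringDeserializer, so the key type is String
    @Bean(name = "kafkaTwoContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}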
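KafkaOneConfig and KafkaTwoConfig build identical producer and consumer maps and differ only in the values they read. If more Kafka instances are added later, one possible way to avoid copying the map-building code is a small shared helper. The sketch below is only an illustration: KafkaClientProps is a hypothetical class, and it uses the literal Kafka property names (the values behind the ProducerConfig and ConsumerConfig constants) and class names as strings so that it compiles without any Kafka dependency. In a real project you would probably keep the constants and class literals used above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring Kafka or kafka-clients.
class KafkaClientProps {

    // Fully qualified names of the String (de)serializers shipped with kafka-clients.
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /** Producer settings for one Kafka, keyed by the plain Kafka property names. */
    static Map<String, Object> producer(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        return props;
    }

    /** Consumer settings for one Kafka and one consumer group. */
    static Map<String, Object> consumer(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the application.properties shown in this article.
        System.out.println(consumer("localhost:9092", "default_group"));
        System.out.println(consumer("localhost:9092", "default_group2"));
    }
}
```

With such a helper, each configuration class would pass its own bootstrapServers and groupId and hand the resulting map to DefaultKafkaProducerFactory or DefaultKafkaConsumerFactory, exactly as producerConfigs() and consumerConfigs() do now.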
Create a test consumer. Note that each listener selects a different listener container through containerFactory.
KafkaConsumer
@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")
    public void oneProcessItemcenterSpuMessage(String spuId) {
        log.warn("one process spuId ={}", spuId);
    }

    @KafkaListener(topics = {"topic2"}, containerFactory = "kafkaTwoContainerFactory")
    public void twoProcessItemcenterSpuMessage(String spuId) {
        log.warn("two process spuId ={}", spuId);
    }
}
Create a test producer that periodically sends messages to the two topics.
KafkaProducer
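@Slf4j
@Component
@EnableScheduling // needed for @Scheduled unless scheduling is already enabled elsewhere
public class KafkaProducer {

    // @Resource injects by name, so the field names must match the bean names
    @Resource
    private KafkaTemplate<String, String> kafkaOneTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTwoTemplate;

    private void sendTest() {
        kafkaOneTemplate.send("topic1", "hello kafka one");
        kafkaTwoTemplate.send("topic2", "hello kafka two");
    }

    // Send a test message to each topic every 10 seconds
    @Scheduled(fixedRate = 1000 * 10)
    public void testKafka() {
        log.info("send message...");
        sendTest();
    }
}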
Final result:
Copyright belongs to the original author, 阿提说说. If there is any infringement, please contact us for removal.