kafka消费多个topic的使用
我们在业务中难免会遇到需要消费多个Kafka topic消息的情况,本文介绍如何在业务中用一个类消费多个topic的消息。
一、不同kafka的不同topic
配置类1
@EnableKafka
@Configuration
publicclassKafkaOneConfig{
@Value("${spring.kafka.one.bootstrap-servers}")private String bootstrapServers;
@Value("${spring.kafka.one.consumer.group-id}")private String groupId;
@Value("${spring.kafka.one.consumer.enable-auto-commit}")public boolean enableAutoCommit;
@Value("${spring.kafka.consumer.consumer.batch}")public boolean batch;
@Value("${spring.kafka.consumer.max-poll-records}")public String maxPollRecordsConfig;
@Value("${spring.kafka.consumer.auto-offset-reset}")public String autoOffsetReset;
@Value("${spring.kafka.properties.request.timeout.ms}")public Integer requestTimeout;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>kafkaOneListenerContainer(){//注意不同kafka的这个方法名不同
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());extracted(factory);return factory;}publicvoidextracted(ConcurrentKafkaListenerContainerFactory<Integer, String> factory){
factory.setBatchListener(batch);// 批量拉取数据
factory.setConcurrency(3);// 线程数
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);// ack模式
factory.getContainerProperties().setPollTimeout(requestTimeout);//请求超时时间}private ConsumerFactory<Integer, String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs(bootstrapServers, groupId));}public Map<String, Object>consumerConfigs(String bootstrapServers, String groupId){
Map<String, Object> props =newHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 不同kafka的连接串
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);// 每次批量获取的数据量
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);// latest 接收最新的消息return props;}}
配置类2
@EnableKafka
@Configuration
publicclassKafkaTwoConfigextendsKafkaOneConfig{//继承关系,避免重复代码
@Value("${spring.kafka.two.bootstrap-servers}")private String bootstrapServers;
@Value("${spring.kafka.two.consumer.group-id}")private String groupId;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>kafkaTwoListenerContainer(){注意不同kafka的这个方法名不同
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());super.extracted(factory);return factory;}private ConsumerFactory<Integer, String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(super.consumerConfigs(bootstrapServers, groupId));}}
配置文件yml
# Per-cluster Kafka settings read by the two configuration classes and the listeners.
# The values below are placeholders.
spring:
  main:
    # NOTE(review): presumably required because KafkaTwoConfig inherits KafkaOneConfig's @Bean
    # method, so the same bean name gets defined twice -- confirm.
    allow-bean-definition-overriding: true
  kafka:
    one:
      bootstrap-servers: 123123
      consumer:
        topic: 123
        group-id: 123
        enable-auto-commit: true
    two:
      bootstrap-servers: 456456
      consumer:
        topic: 456
        group-id: 123
        enable-auto-commit: true
配置文件.properties:
# Settings shared by every Kafka cluster.
# Read by KafkaOneConfig through @Value: auto-offset-reset, max-poll-records,
# consumer.consumer.batch and properties.request.timeout.ms.
spring.kafka.consumer.auto-offset-reset=latest
#spring.kafka.consumer.auto-commit-interval=3000
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.listener.concurrency=10
# Key must match the placeholder ${spring.kafka.consumer.consumer.batch} used in KafkaOneConfig.
spring.kafka.consumer.consumer.batch=true
spring.kafka.listener.type=batch
spring.kafka.listener.ack-mode=batch
spring.kafka.properties.request.timeout.ms=60000
消费者类java
@KafkaListener(topics ={"${spring.kafka.one.consumer.topic}"}, containerFactory ="kafkaOneListenerContainer")// 读取配置的类1方法publicvoidconsume1(List<ConsumerRecord<String, String>> records){}
@KafkaListener(topics ={"${spring.kafka.two.consumer.topic}"}, containerFactory ="kafkaTwoListenerContainer")// 读取配置的类2方法publicvoidconsume2(List<ConsumerRecord<String, String>> records){}
二、同一个kafka的不同topic
直接使用@KafkaListener
@KafkaListener(id ="123",
topics ={// 注意kafka每次批量读取数据的时候不是每次只读取一个topic的消息,而是读取多个topic的消息"a","b","c",})publicvoidconsume(List<ConsumerRecord<String, String>> records){}
本文转载自: https://blog.csdn.net/m0_68178610/article/details/130564794
版权归原作者 黑马Y 所有, 如有侵权,请联系我们删除。
版权归原作者 黑马Y 所有, 如有侵权,请联系我们删除。