前言:
上一篇分享了 Kafka 的一些基本概念及应用场景,本篇我们来分享一下在 Spring Boot 项目中如何使用 Kafka。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 集成 Kafka
引入 Kafka 依赖
在项目的 pom.xml 文件中引入 Kafka 依赖,如下:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>
添加 Kafka 配置
在 application.properties 文件中配置 Kafka 相关配置,如下:
#kafka server 地址
spring.kafka.bootstrap-servers=10.xxx.4.xxx:9092,10.xxx.4.xxx::9092,10.xxx.4.xxx::9092
spring.kafka.producer.acks = 1
spring.kafka.producer.retries = 0
spring.kafka.producer.batch-size = 30720000
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#消费者配置
spring.kafka.consumer.group-id = test-kafka
#是否开启手动提交 默认自动提交
spring.kafka.consumer.enable-auto-commit = true
#如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000 自动提交已消费offset时间间隔
spring.kafka.consumer.auto-commit-interval = 5000
#earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
#kafka 没有创建指定的 topic 下 项目启动是否报错 true false
spring.kafka.listener.missing-topics-fatal = false
#Kafka 的消费模式 single 每次单条消费消息 batch 每次批量消费消息
spring.kafka.listener.type = single
#一次调用 poll() 操作时返回的最大记录数 默认为 500 条
spring.kafka.consumer.max-poll-records = 2
#消息 ACK 模式 有7种
spring.kafka.listener.ack-mode = manual_immediate
#kafka session timeout
spring.kafka.consumer.session.timeout.ms = 300000
配置 Kafka Producer
我们创建一个配置类,并配置生产者工厂,配置 KafkaTemplate。
packagecom.order.service.config;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.annotation.EnableKafka;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.core.ProducerFactory;importjava.util.HashMap;importjava.util.Map;/**
* @author :author
* @description:
* @modified By:
* @version: V1.0
*/@Configuration@EnableKafkapublicclassKafkaProducerConfig{@Value("${spring.kafka.bootstrap-servers}")privateString servers;@Bean("myProducerKafkaProps")publicMap<String,Object>getMyKafkaProps(){Map<String,Object> props =newHashMap<>(4);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);return props;}@BeanpublicProducerFactory<String,String>newProducerFactory(){returnnewDefaultKafkaProducerFactory<>(getMyKafkaProps());}@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<>(newProducerFactory());}}
配置 Kafka Cousumer
我们创建一个配置类,配置消费者工厂和监听容器。
packagecom.order.service.config;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.annotation.EnableKafka;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.config.KafkaListenerContainerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer;importorg.springframework.kafka.listener.ContainerProperties;importjava.util.HashMap;importjava.util.Map;/**
* @author :author
* @description:
* @modified By:
* @version: V1.0
*/@Configuration@EnableKafkapublicclassKafkaConsumerConfig{@Value("${spring.kafka.bootstrap-servers}")privateString servers;@Value("${spring.kafka.consumer.group-id}")privateString groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")privateString offsetReset;@Value("${spring.kafka.consumer.max-poll-records}")privateString maxPollRecords;@Value("${spring.kafka.consumer.auto-commit-interval}")privateString autoCommitIntervalMs;@Value("${spring.kafka.consumer.enable-auto-commit}")privateboolean enableAutoCommit;@Bean("myConsumerKafkaProps")publicMap<String,Object>getMyKafkaProps(){Map<String,Object> props =newHashMap<>(12);//是否自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//kafak 服务器
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);//不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//消费组id
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//一次调用poll()操作时返回的最大记录数,默认值为500
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//自动提交时间间隔 默认 5秒
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);return props;}/**
* @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer < java.lang.String, java.lang.String>>
* @date 2024/10/22 19:41
* @description kafka 消费者工厂
*/@Bean("myContainerFactory")publicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(newDefaultKafkaConsumerFactory<>(getMyKafkaProps()));// 并发创建的消费者数量
factory.setConcurrency(3);// 开启批处理
factory.setBatchListener(true);//拉取超时时间
factory.getContainerProperties().setPollTimeout(1500);//是否自动提交 ACK kafka 默认是自动提交if(!enableAutoCommit){//共有其中方式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);}return factory;}}
Kafka 消息发送
创建一个 Kafka 的 Producer,注入 KafkaTemplate,完成消息发送。
packagecom.order.service.kafka.producer;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Component;/**
* @ClassName: KafkaProducer
* @Author: Author
* @Date: 2024/10/22 19:22
* @Description:
*/@Slf4j@ComponentpublicclassMyKafkaProducer{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;publicvoidsendMessage(String message){this.kafkaTemplate.send("my-topic", message);}}
Kafka 消息消费
创建一个 Kafka 的 Consumer,使用 @KafkaListener 注解完成消息消费。
packagecom.order.service.kafka.consumer;importlombok.extern.slf4j.Slf4j;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;/**
* @ClassName: KafkaConsumer
* @Author: Author
* @Date: 2024/10/22 19:22
* @Description:
*/@Slf4j@ComponentpublicclassMyKafkaConsumer{@KafkaListener(id ="my-kafka-consumer",
idIsGroup =false, topics ="my-topic",
containerFactory ="myContainerFactory")publicvoidlisten(String message){
log.info("消息消费成功,消息内容:{}", message);}}
Kafka 消息发送消费测试
触发消息发送后,得到如下结果:
2024-10-22 20:22:43.041 INFO 36496 ---[-consumer-0-C-1] c.o.s.kafka.consumer.MyKafkaConsumer : 消息消费成功,消息内容:第一条 kafka 消息
结果符合预期。
以上我们简单的分享了使用 Spring Boot 集成 Kafka 的过程,希望帮助到有需要的朋友。
如有不正确的地方欢迎各位指出纠正。
版权归原作者 码农爱java 所有, 如有侵权,请联系我们删除。