Spring Boot 整合 Kafka 详解
本文将详细介绍如何在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的配置、消息的同步和异步发送。
1. 环境准备
在开始之前,请确保你已经安装并配置好 Kafka 集群。如果还没有,请参考 Kafka 官方文档进行安装和配置。
2. 创建 Spring Boot 项目
2.1 使用 Spring Initializr 创建项目
访问 Spring Initializr,选择以下配置:
- Project: Maven Project
- Language: Java
- Spring Boot: 2.2.2.RELEASE
- Dependencies: Spring for Apache Kafka
点击 “Generate” 按钮,下载生成的项目,并解压到本地。
3. 添加依赖
在
pom.xml
文件中添加 Kafka 依赖:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>
4. 配置 Kafka
在
src/main/resources
目录下创建
application.yml
文件,并添加以下配置:
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
配置说明:
bootstrap-servers
: Kafka broker 的地址列表。consumer
: 消费者配置,包括消费者组 ID、偏移量重置策略、键和值的反序列化器。producer
: 生产者配置,包括键和值的序列化器。
5. 创建 Kafka 生产者
在
src/main/java/com/example/demo
目录下创建
KafkaProducerConfig.java
文件,并添加以下代码:
packagecom.example.demo;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.core.ProducerFactory;importorg.springframework.kafka.core.ProducerFactory;importorg.springframework.kafka.core.ProducerFactory;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaProducerConfig{@BeanpublicProducerFactory<String,String>producerFactory(){Map<String,Object> configProps =newHashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);returnnewDefaultKafkaProducerFactory<>(configProps);}@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}}
6. 发送消息
在
src/main/java/com/example/demo
目录下创建
KafkaProducerService.java
文件,并添加以下代码:
packagecom.example.demo;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.SendResult;importorg.springframework.stereotype.Service;importorg.springframework.util.concurrent.ListenableFuture;importorg.springframework.util.concurrent.ListenableFutureCallback;@ServicepublicclassKafkaProducerService{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;privatestaticfinalStringTOPIC="my-topic";// 同步发送消息publicvoidsendMessageSync(String message){try{
kafkaTemplate.send(TOPIC, message).get();System.out.println("同步消息发送成功: "+ message);}catch(Exception e){
e.printStackTrace();System.out.println("同步消息发送失败: "+ message);}}// 异步发送消息publicvoidsendMessageAsync(String message){ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(TOPIC, message);
future.addCallback(newListenableFutureCallback<SendResult<String,String>>(){@OverridepublicvoidonSuccess(SendResult<String,String> result){System.out.println("异步消息发送成功: "+ message);}@OverridepublicvoidonFailure(Throwable ex){
ex.printStackTrace();System.out.println("异步消息发送失败: "+ message);}});}}
7. 测试 Kafka 生产者
在
src/main/java/com/example/demo
目录下创建
DemoApplication.java
文件,并添加以下代码:
packagecom.example.demo;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.CommandLineRunner;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublicclassDemoApplicationimplementsCommandLineRunner{@AutowiredprivateKafkaProducerService kafkaProducerService;publicstaticvoidmain(String[] args){SpringApplication.run(DemoApplication.class, args);}@Overridepublicvoidrun(String... args)throwsException{
kafkaProducerService.sendMessageSync("Hello, Kafka (Sync)!");
kafkaProducerService.sendMessageAsync("Hello, Kafka (Async)!");}}
8. 运行效果
运行
DemoApplication
类,将看到控制台输出如下消息:
同步消息发送成功: Hello, Kafka (Sync)!
异步消息发送成功: Hello, Kafka (Async)!
如果 Kafka 生产者发送消息失败,将看到错误信息。
9. 总结
本文详细介绍了如何在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的配置、消息的同步和异步发送。通过理解和实践这些内容,可以帮助你更好地掌握 Spring Boot 与 Kafka 的整合与应用。希望本文对你有所帮助,如有任何疑问或建议,欢迎留言讨论。
版权归原作者 九转成圣 所有, 如有侵权,请联系我们删除。