0


【SpringBoot系列】SpringBoot整合Kafka(含源码)

文章目录

579a429daf314744b995f37351b46548

前言

在现代的微服务架构中,消息队列已经成为了一个不可或缺的组件。

它能够帮助我们在不同的服务之间传递消息,并且能够确保这些消息不会丢失。

在众多的消息队列中,Kafka 是一个非常出色的选择。

它能够处理大量的实时数据,并且提供了强大的持久化能力。

在本文中,我们将会探讨如何在 SpringBoot 中整合 Kafka。


什么是Kafka?

Apache Kafka 是一个开源的流处理平台,由 LinkedIn 团队开发并于 2011 年贡献给 Apache 基金会。Kafka 以其高吞吐量、可扩展性和容错性而闻名。它是一个基于发布/订阅模式的消息系统,通常用于大型实时数据流处理应用。

Kafka 的主要组件包括:

  • Producer:负责发布消息到 Kafka 服务器。
  • Broker:是 Kafka 服务器实例,负责消息的存储、接收和发送。
  • Consumer:从 Kafka 服务器读取消息。
  • Topic:消息的类别或者说是消息的标签,Producer 将消息发布到特定的 Topic,Consumer 从特定的 Topic 读取消息。

Kafka 可以在分布式系统中用于构建实时流数据管道,它可以在系统或应用之间可靠地获取数据。此外,Kafka 可以和 Apache Storm、Apache Hadoop、Apache Spark 等进行集成,用于大数据处理和分析。


Kafka的应用场景?

日志收集:

一个公司可能有很多服务器,每个服务器上运行着很多服务,Kafka 可以用来实现这些服务的日志收集功能。各服务的日志分别发送到 Kafka 的不同 Topic 中。

消息系统:

Kafka 能够作为一个大规模的消息处理系统,各生产者将消息发送到 Kafka,消费者从 Kafka 中读取消息进行处理。

用户活动跟踪:

Kafka 也常用于用户活动跟踪和实时分析。例如,用户的点击、搜索等行为可以实时写入到 Kafka,然后进行实时或者离线分析。

在 Kafka 上可以进行实时的流处理。例如,使用 Apache Storm 集成 Kafka 来进行实时的数据处理。

指标和日志聚合:

统计数据和监控数据也是 Kafka 的一个重要应用场景。例如,通过 Kafka 可以收集各种分布式应用的数据,然后进行统一的处理和分析。

事件源:

Kafka 可以作为大规模事件处理的源头,例如,用户的行为、系统的状态等都可以作为事件,通过 Kafka 进行分发处理。


示例

版本依赖

模块版本SpringBoot3.1.0JDK17

代码
KafkaConfig
  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConfig {
  4. @Bean
  5. public KafkaReceiver listener() {
  6. return new KafkaReceiver();
  7. }
  8. }
KafkaSender
  1. @Component
  2. @Slf4j
  3. public class KafkaSender {
  4. @Resource
  5. private KafkaTemplate<String, Object> kafkaTemplate;
  6. public void send(String topic, String key, String data) {
  7. //发送消息
  8. CompletableFuture<SendResult<String, Object>> completable = kafkaTemplate.send(topic, key, data);
  9. completable.whenCompleteAsync((result, ex) -> {
  10. if (null == ex) {
  11. log.info(topic + "生产者发送消息成功:" + result.toString());
  12. } else {
  13. log.info(topic + "生产者发送消息失败:" + ex.getMessage());
  14. }
  15. });
  16. }
  17. }
KafkaReceiver
  1. @Component
  2. @Slf4j
  3. public class KafkaReceiver {
  4. /**
  5. * 下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用","隔开
  6. */
  7. @KafkaListener(topics = {"testTopic"})
  8. public void receive(ConsumerRecord<?, ?> record){
  9. log.info("消费者收到的消息key: " + record.key());
  10. log.info("消费者收到的消息value: " + record.value().toString());
  11. }
  12. }
KafkaController
  1. /**
  2. * kafka 测试接口
  3. */
  4. @RestController
  5. public class KafkaController {
  6. @Autowired
  7. private KafkaSender kafkaSender;
  8. @GetMapping("/sendMessageToKafka")
  9. public String sendMessageToKafka() {
  10. Map<String, String> messageMap = new HashMap();
  11. messageMap.put("message", "hello world!");
  12. ObjectMapper objectMapper = new ObjectMapper();
  13. String data = null;
  14. try {
  15. data = objectMapper.writeValueAsString(messageMap);
  16. } catch (JsonProcessingException e) {
  17. throw new RuntimeException(e);
  18. }
  19. String key = String.valueOf(UUID.randomUUID());
  20. //kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)
  21. kafkaSender.send("testTopic", key, data);
  22. return "ok";
  23. }
  24. }
测试

http://127.0.0.1:8080/sendMessageToKafka

image-20231203195342080


image-20231203195407585


遇见问题

Error connecting to node xxxxxx:9092 (id: 0 rack: null)

Error connecting to node iZbp127a9vpra4v3kmkkmzZ:9092 (id: 0 rack: null)

解决方案

修改本地物理机hosts文件。文件目录:C:\Windows\System32\drivers\etc

image-20231203183127501

新增 xx.xx.xx.xx iZbp127a9vpra4v3kmkkmzZ

如果没生效,则需要重启系统


总结

通过上述的步骤,我们已经成功地在 SpringBoot 中整合了 Kafka。

这使得我们的应用程序能够在不同的服务之间传递消息,而不需要担心消息的丢失。

我们也看到,通过使用 SpringBoot,我们可以非常轻松地完成这个过程。

希望这篇文章能够帮助你在自己的项目中更好地使用 Kafka。


源码获取

如果需要完整源码请关注公众号"架构殿堂" ,回复 "SpringBoot+Kafka"即可获得


写在最后

感谢您的支持和鼓励! 😊🙏

如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, netty, spring boot, spring cloud等系列文章,一系列干货随时送达!

csdn-end


本文转载自: https://blog.csdn.net/jinxinxin1314/article/details/134770887
版权归原作者 fking86 所有, 如有侵权,请联系我们删除。

“【SpringBoot系列】SpringBoot整合Kafka(含源码)”的评论:

还没有评论