0


【SpringBoot系列】SpringBoot整合Kafka(含源码)

文章目录

579a429daf314744b995f37351b46548

前言

在现代的微服务架构中,消息队列已经成为了一个不可或缺的组件。

它能够帮助我们在不同的服务之间传递消息,并且能够确保这些消息不会丢失。

在众多的消息队列中,Kafka 是一个非常出色的选择。

它能够处理大量的实时数据,并且提供了强大的持久化能力。

在本文中,我们将会探讨如何在 SpringBoot 中整合 Kafka。


什么是Kafka?

Apache Kafka 是一个开源的流处理平台,由 LinkedIn 团队开发并于 2011 年贡献给 Apache 基金会。Kafka 以其高吞吐量、可扩展性和容错性而闻名。它是一个基于发布/订阅模式的消息系统,通常用于大型实时数据流处理应用。

Kafka 的主要组件包括:

  • Producer:负责发布消息到 Kafka 服务器。
  • Broker:是 Kafka 服务器实例,负责消息的存储、接收和发送。
  • Consumer:从 Kafka 服务器读取消息。
  • Topic:消息的类别或者说是消息的标签,Producer 将消息发布到特定的 Topic,Consumer 从特定的 Topic 读取消息。

Kafka 可以在分布式系统中用于构建实时流数据管道,它可以在系统或应用之间可靠地获取数据。此外,Kafka 可以和 Apache Storm、Apache Hadoop、Apache Spark 等进行集成,用于大数据处理和分析。


Kafka的应用场景?

日志收集:

一个公司可能有很多服务器,每个服务器上运行着很多服务,Kafka 可以用来实现这些服务的日志收集功能。各服务的日志分别发送到 Kafka 的不同 Topic 中。

消息系统:

Kafka 能够作为一个大规模的消息处理系统,各生产者将消息发送到 Kafka,消费者从 Kafka 中读取消息进行处理。

用户活动跟踪:

Kafka 也常用于用户活动跟踪和实时分析。例如,用户的点击、搜索等行为可以实时写入到 Kafka,然后进行实时或者离线分析。

在 Kafka 上可以进行实时的流处理。例如,使用 Apache Storm 集成 Kafka 来进行实时的数据处理。

指标和日志聚合:

统计数据和监控数据也是 Kafka 的一个重要应用场景。例如,通过 Kafka 可以收集各种分布式应用的数据,然后进行统一的处理和分析。

事件源:

Kafka 可以作为大规模事件处理的源头,例如,用户的行为、系统的状态等都可以作为事件,通过 Kafka 进行分发处理。


示例

版本依赖

模块版本SpringBoot3.1.0JDK17

代码
KafkaConfig
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public KafkaReceiver listener() {
        return new KafkaReceiver();
    }

}
KafkaSender
@Component
@Slf4j
public class KafkaSender {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, String key, String data) {
        //发送消息
        CompletableFuture<SendResult<String, Object>> completable = kafkaTemplate.send(topic, key, data);
        completable.whenCompleteAsync((result, ex) -> {
            if (null == ex) {
                log.info(topic + "生产者发送消息成功:" + result.toString());
            } else {
                log.info(topic + "生产者发送消息失败:" + ex.getMessage());
            }
        });
    }
}
KafkaReceiver
@Component
@Slf4j
public class KafkaReceiver {
    /**
     * 下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用","隔开
     */
    @KafkaListener(topics = {"testTopic"})
    public void receive(ConsumerRecord<?, ?> record){
        log.info("消费者收到的消息key: " + record.key());
        log.info("消费者收到的消息value: " + record.value().toString());
    }
}
KafkaController
/**
 * kafka 测试接口
 */
@RestController
public class KafkaController {
    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/sendMessageToKafka")
    public String sendMessageToKafka() {
        Map<String, String> messageMap = new HashMap();
        messageMap.put("message", "hello world!");
        ObjectMapper objectMapper = new ObjectMapper();
        String data = null;
        try {
            data = objectMapper.writeValueAsString(messageMap);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        String key = String.valueOf(UUID.randomUUID());
        //kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)
        kafkaSender.send("testTopic", key, data);
        return "ok";
    }
}
测试

http://127.0.0.1:8080/sendMessageToKafka

image-20231203195342080


image-20231203195407585


遇见问题

Error connecting to node xxxxxx:9092 (id: 0 rack: null)

Error connecting to node iZbp127a9vpra4v3kmkkmzZ:9092 (id: 0 rack: null)

解决方案

修改本地物理机hosts文件。文件目录:C:\Windows\System32\drivers\etc

image-20231203183127501

新增 xx.xx.xx.xx iZbp127a9vpra4v3kmkkmzZ

如果没生效,则需要重启系统


总结

通过上述的步骤,我们已经成功地在 SpringBoot 中整合了 Kafka。

这使得我们的应用程序能够在不同的服务之间传递消息,而不需要担心消息的丢失。

我们也看到,通过使用 SpringBoot,我们可以非常轻松地完成这个过程。

希望这篇文章能够帮助你在自己的项目中更好地使用 Kafka。


源码获取

如果需要完整源码请关注公众号"架构殿堂" ,回复 "SpringBoot+Kafka"即可获得


写在最后

感谢您的支持和鼓励! 😊🙏

如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, netty, spring boot, spring cloud等系列文章,一系列干货随时送达!

csdn-end


本文转载自: https://blog.csdn.net/jinxinxin1314/article/details/134770887
版权归原作者 fking86 所有, 如有侵权,请联系我们删除。

“【SpringBoot系列】SpringBoot整合Kafka(含源码)”的评论:

还没有评论