0


微服务集成Windows版kafka

微服务集成Windows版kafka

文章目录

1-兼容

Kafka 和 Spring Boot

兼容版本https://spring.io/projects/spring-kafka/
在这里插入图片描述

2-雷点

依赖版本需要匹配Spring Boot版本,这里使用的 <spring-boot.version>3.1.5</spring-boot.version> 版本

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.6.0</version></dependency>

mvnrepositoryhttps://mvnrepository.com/

一个 Maven 仓库的在线查找工具,用于查找和浏览 Java 开发中使用的依赖库(dependencies)的信息

3-安装

  1. ZookeeperApache ZooKeeper 项目的存档目录https://archive.apache.org/dist/zookeeper/> 在这个目录下,可以找到 Apache ZooKeeper 发布的历史版本以及与这些版本相关的二进制文件、源代码和其他相关文档。> Kafka 依赖于 Zookeeper,所以首先需要启动 Zookeeper 服务器,这里使用的 apache-zookeeper-3.5.5-bin 版本
  2. kafkaApache Kafka 官方网站下载https://kafka.apache.org/downloads> 这里使用的 kafka_2.12-3.5.1 版本

4-配置

  1. 环境配置(可选操作) 可以将 Kafka 的 bin 目录添加到系统的 PATH 环境变量中,方便可以在任何地方运行 Kafka 相关的命令。
  2. apache-zookeeper-3.5.5-bin\conf\zoo.cfg# ZooKeeper 基本时间单元,用于计算时间的基本单位(毫秒)tickTime=2000# 存储 ZooKeeper 数据的目录dataDir=D:/myApp/zookeeper/apache-zookeeper-3.5.5-bin/data# 用于接受客户端连接的端口号clientPort=2181# ZooKeeperAdminServer 的端口号(默认端口8080)admin.serverPort=8081

5-启动

  1. Zookeeper:bin目录zkServer.cmd
  2. Kafka:kafka_2.12-3.5.1目录.\bin\windows\kafka-server-start.bat .\config\server.properties

6-实现

  1. 生产者
packagecom.xueyi.sample.kafka.producer;importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassKafkaProducerExample{publicstaticvoidmain(String[] args){// 配置Kafka生产者Properties properties =newProperties();
        properties.put("bootstrap.servers","localhost:9092");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者Producer<String,String> producer =newKafkaProducer<>(properties);// 发送消息ProducerRecord<String,String> record =newProducerRecord<>("your_topic","key","Hello , Kafka!");
        producer.send(record,(metadata, exception)->{if(exception ==null){System.out.println("Message sent successfully! Topic: "+ metadata.topic()+", Partition: "+ metadata.partition()+", Offset: "+ metadata.offset());}else{
                exception.printStackTrace();}});// 关闭生产者
        producer.close();}}
  1. 消费者packagecom.xueyi.sample.kafka.consumer;importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;publicclassKafkaConsumerExample{publicstaticvoidmain(String[] args){// 配置Kafka消费者Properties properties =newProperties(); properties.put("bootstrap.servers","localhost:9092"); properties.put("group.id","your_group_id"); properties.put("key.deserializer",StringDeserializer.class.getName()); properties.put("value.deserializer",StringDeserializer.class.getName());// 创建Kafka消费者Consumer<String,String> consumer =newKafkaConsumer<>(properties);// 订阅主题 consumer.subscribe(Collections.singletonList("your_topic"));// 拉取消息while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record ->{System.out.println("Received message: Key = "+ record.key()+", Value = "+ record.value()+", Topic = "+ record.topic()+", Partition = "+ record.partition()+", Offset = "+ record.offset());});}}}
  2. 发送消息> Message sent successfully! Topic: your_topic, Partition: 0, Offset: 7> 在这里插入图片描述
  3. 接收消息> Received message: Key = key, Value = Hello, Kafka!, Topic = your_topic, Partition = 0, Offset = 7在这里插入图片描述

本文转载自: https://blog.csdn.net/qq_40629687/article/details/135388910
版权归原作者 xinyi_java 所有, 如有侵权,请联系我们删除。

“微服务集成Windows版kafka”的评论:

还没有评论