微服务集成Windows版kafka
文章目录
1-兼容
Kafka 和 Spring Boot
兼容版本:https://spring.io/projects/spring-kafka/
2-雷点
依赖版本需要匹配Spring Boot版本,这里使用的 <spring-boot.version>3.1.5</spring-boot.version> 版本
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.6.0</version></dependency>
mvnrepository:https://mvnrepository.com/
一个 Maven 仓库的在线查找工具,用于查找和浏览 Java 开发中使用的依赖库(dependencies)的信息
3-安装
- ZookeeperApache ZooKeeper 项目的存档目录:https://archive.apache.org/dist/zookeeper/> 在这个目录下,可以找到 Apache ZooKeeper 发布的历史版本以及与这些版本相关的二进制文件、源代码和其他相关文档。> Kafka 依赖于 Zookeeper,所以首先需要启动 Zookeeper 服务器,这里使用的 apache-zookeeper-3.5.5-bin 版本
- kafkaApache Kafka 官方网站下载:https://kafka.apache.org/downloads> 这里使用的 kafka_2.12-3.5.1 版本
4-配置
- 环境配置(可选操作) 可以将 Kafka 的
bin
目录添加到系统的PATH
环境变量中,方便可以在任何地方运行 Kafka 相关的命令。 - apache-zookeeper-3.5.5-bin\conf\zoo.cfg
# ZooKeeper 基本时间单元,用于计算时间的基本单位(毫秒)tickTime=2000# 存储 ZooKeeper 数据的目录dataDir=D:/myApp/zookeeper/apache-zookeeper-3.5.5-bin/data# 用于接受客户端连接的端口号clientPort=2181# ZooKeeperAdminServer 的端口号(默认端口8080)admin.serverPort=8081
5-启动
- Zookeeper:bin目录
zkServer.cmd
- Kafka:kafka_2.12-3.5.1目录
.\bin\windows\kafka-server-start.bat .\config\server.properties
6-实现
- 生产者
packagecom.xueyi.sample.kafka.producer;importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassKafkaProducerExample{publicstaticvoidmain(String[] args){// 配置Kafka生产者Properties properties =newProperties();
properties.put("bootstrap.servers","localhost:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者Producer<String,String> producer =newKafkaProducer<>(properties);// 发送消息ProducerRecord<String,String> record =newProducerRecord<>("your_topic","key","Hello , Kafka!");
producer.send(record,(metadata, exception)->{if(exception ==null){System.out.println("Message sent successfully! Topic: "+ metadata.topic()+", Partition: "+ metadata.partition()+", Offset: "+ metadata.offset());}else{
exception.printStackTrace();}});// 关闭生产者
producer.close();}}
- 消费者
packagecom.xueyi.sample.kafka.consumer;importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;publicclassKafkaConsumerExample{publicstaticvoidmain(String[] args){// 配置Kafka消费者Properties properties =newProperties(); properties.put("bootstrap.servers","localhost:9092"); properties.put("group.id","your_group_id"); properties.put("key.deserializer",StringDeserializer.class.getName()); properties.put("value.deserializer",StringDeserializer.class.getName());// 创建Kafka消费者Consumer<String,String> consumer =newKafkaConsumer<>(properties);// 订阅主题 consumer.subscribe(Collections.singletonList("your_topic"));// 拉取消息while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record ->{System.out.println("Received message: Key = "+ record.key()+", Value = "+ record.value()+", Topic = "+ record.topic()+", Partition = "+ record.partition()+", Offset = "+ record.offset());});}}}
- 发送消息> Message sent successfully! Topic: your_topic, Partition: 0, Offset: 7>
- 接收消息> Received message: Key = key, Value = Hello, Kafka!, Topic = your_topic, Partition = 0, Offset = 7
版权归原作者 xinyi_java 所有, 如有侵权,请联系我们删除。