0


Kafka 下载安装及使用总结

1. 下载安装

官网下载地址:Apache Kafka

下载对应的文件

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

上传到服务器上,解压

tar-xzf kafka_2.13-3.7.0.tgz

目录结果如下

├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
└── site-docs

官方文档:Apache Kafka

kafka 有两种启动方式,ZooKeeper 和 KRaft,这里采用 KRaft 的方式,使用 kraft 目录下的配置文件

config
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-mirror-maker.properties
├── connect-standalone.properties
├── consumer.properties
├── kraft
│   ├── broker.properties
│   ├── controller.properties
│   └── server.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
├── trogdor.conf
└── zookeeper.properties

修改

server.properties

文件

  • process.roles:KRaft 模式角色 - broker- controller- broker,controller
  • node.id:节点 ID,需要为每个节点分配一个唯一的 ID
  • controller.quorum.voters:Controller 的投票者配置
  • log.dirs:日志目录

初始化集群,先生成一个 UUID

./bin/kafka-storage.sh random-uuid

再执行命令,使用生成的 UUID 完成集群初始化

./bin/kafka-storage.sh format-t thCDFveGRleJro7zTaOOGA -c ./config/kraft/server.properties

然后启动 Kafka

./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

2. Spring Boot 集成

2.1 引入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.1.4</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.0</version></dependency></dependencies>

2.2 配置文件

spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: fable-group

2.3 生产者

使用

KafkaTemplate

实现消息的生产

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassProducerController{privatefinalKafkaTemplate<String,String> kafkaTemplate;@AutowiredpublicProducerController(KafkaTemplate<String,String> kafkaTemplate){this.kafkaTemplate = kafkaTemplate;}@GetMapping("/send/message")publicStringsendMessage(){
        kafkaTemplate.send("test","Hello, Kafka!");return"Message sent successfully";}}

2.4 消费者

添加

@KafkaListener

注解,监听消息

importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Service;@ServicepublicclassConsumerService{@KafkaListener(topics ="test", groupId ="fable-group")publicvoidlisten(ConsumerRecord<String,String> consumerRecord){System.out.println("Received message: "+ consumerRecord.value());}}

2.5 启动类

添加

@EnableKafka

注解

importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.kafka.annotation.EnableKafka;@SpringBootApplication@EnableKafkapublicclassFableApplication{publicstaticvoidmain(String[] args){SpringApplication.run(FableApplication.class, args);}}

2.6 测试

访问

/send/message

接口,可以看到控制台打印出接收到的消息

Received message: Hello, Kafka!
标签: kafka 分布式

本文转载自: https://blog.csdn.net/ACE_U_005A/article/details/142326780
版权归原作者 GreyFable 所有, 如有侵权,请联系我们删除。

“Kafka 下载安装及使用总结”的评论:

还没有评论