目录
环境搭建
前置条件
- JDK 安装教程自行百度-这个比较简单。
- zookeeper1. zookeeper安装参考地址((2条消息) 快速搭建-分布式远程调用框架搭建-dubbo+zookper+springboot demo 演示_康世行的博客-CSDN博客)
1 ,复制 conf 文件夹下面的 zoo_sample.cfg 改名为 zoo.cfg 即可。因为没有配置文件,zookeeper 无法启动2 创建 dataDir 的临时目录 mkdir -p /temp/zookeeper //配置文件参考下面截图 修改完配置文件之后进行启动3 ,启动 sh zkServer.sh start
2. 修改zookeeper配合文件3. 启动成功ps aux|grep zookeeper
4. 开放端口号1 开放2181 端口 1.1 查看已经开发的端口 ,避免端口冲突 firewall-cmd --list-ports 1.2 开放2181 端口 firewall-cmd --zone=public --add-port=2181/tcp --permanent2 重启防火墙 使用规则生效 firewall-cmd --reload 2(因为我使用的是腾讯云,所以还得把腾讯云的控制台防火墙端口放开 2181
## 安装kafka 2.12.x 版本1. 下载kafka安装包cd /optwget http://archive.apache.org/dist/kafka/2.8.2/kafka_2.12-2.8.2.tgz
1. 安装遇到的问题(由于网站证书不安全导致)2. 解决方案sudo yum install -y ca-certificates //继续使用weget进行下载
3. 下载成功2. 安装1. 解压tar zxvf kafka_2.12-2.8.2.tgz
2. 进入kafka目录cd kafka_2.12-2.8.2/创建 logs 目录mkdir logs
3. 修改配置文件# 修改以下配置# 1.broker.id : 配置的是集群环境,要求每台kafka都有唯一的brokerid# 2.log.dir : 数据存放的目录# 3.zookeeper.connect : zookeeper连接池地址信息(zookeeper集群)# 4.delete.topic.enable : 是否直接删除topic# 5.host.name : 主机名称# 6.listeners=PLAINTEXT://server1:9092vim /opt/kafka_2.12-2.8.1/config/server.propertiesadvertised.listeners=PLAINTEXT://:9092 //在配置文件把这行注释解开log.dirs=/opt/kafka_2.12-2.8.1/logszookeeper.connect=server1:2181,server2:2181,server3:2181# 文件尾部添加以下内容delete.topic.enable=true# 退出并保存
4. 启动./kafka-server-start.sh -daemon ../config/server.properties //后台启动ps aux|grep kafka //查询kafka 运行状态
使用示例(发送消息)
- 服务器端测试kafka发送消息和消费消息1. 创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
2. 查看已经创建的topic./kafka-topics.sh --list --zookeeper localhost:2181
3. 发送消息./kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
4. 消费消息./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
- 代码测试pom
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>springBoot-kafka-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springBoot-kafka-demo</name> <description>springBoot-kafka-demo</description> <properties> <java.version>1.8</java.version> <fastjson.version>1.2.58</fastjson.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
ymlserver: port: 8081spring: application: name: kafka-demo kafka: bootstrap-servers: 124.222.227.132:9092 consumer: group-id: kafka-demo-kafka-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
controllerpackage com.example.springbootkafkademo.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @Description 测试发送消息 * @ClassName TestController * @Author 康世行 * @Date 22:09 2023/2/5 * @Version 1.0 **/@RestController@RequestMapping("/test")public class TestController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("/send/{msg}/{topic}") public String sendMessage(@PathVariable("msg") String msg,@PathVariable("topic") String topic){ //发送消息到kafka kafkaTemplate.send(topic,msg); return "发送成功!"; }}
servicepackage com.example.springbootkafkademo.service;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;/** * @Description 监听发送的消息 * @ClassName HelloListener * @Author 康世行 * @Date 22:14 2023/2/5 * @Version 1.0 **/@Servicepublic class HelloListener { /** * 消费者端:指定监听话题 * * @param consumerRecord 监听到数据 */ @KafkaListener(topics = {"testTopic"}) public void handlerMsg(ConsumerRecord<String, String> consumerRecord) { System.out.println("接收到消息:消息值:" + consumerRecord.value() + ", 消息偏移量:" + consumerRecord.offset()); }}
测试127.0.0.1:8081/test/send/测试kafka发送消息32/testTopic测试结果``感谢阅读~~,希望对您有帮助。
本文转载自: https://blog.csdn.net/kangshihang1998/article/details/128894457
版权归原作者 康世行 所有, 如有侵权,请联系我们删除。
版权归原作者 康世行 所有, 如有侵权,请联系我们删除。