文章目录
前言
Kafka 是由 Apache 软件基金会开发的一个开源消息队列平台,它是一种高性能、可扩展、分布式的发布-订阅消息系统。Kafka 的架构被设计为高效、低延迟,并具有高吞吐量、持久性和可靠性。
在 Kafka 中,生产者将消息发布到主题(topic)中,消费者则从主题中消费消息,使用者可以将其看作一个 highly scalable 分布式 commit log 或者消息系统 (Messaging system),每个消息包含一个 key,一个 value 和一个额外的 timestamp。消息保留时间通过配置进行控制,当时间或空间满了的时候就根据策略来清除老数据,默认情况下老数据只保存 7 天。
特点:
1.高吞吐量:Kafka 在发布-订阅消息方面具有非常高的性能。它可以几乎实时地处理高速流入的大量数据。
实时处理:Kafka 能够处理高达数以百万计的消息,并准确地将消息排序和在群组内进行调度。
2.持久性和可靠性:与传统的消息系统不同,Kafka 具有持久性和可靠性。客户端自己提交当前偏移量,避免了可能出现的重复读取问题。
3.可扩展性:Kafka 可以在不繁琐的配置或修改信息格式等环节就能进行扩展。
4.多样化数据类型和来源:通过使用支持多种编程语言和操作系统的 API,Kafka 可以连接到许多各种来源的应用程序。
总之,Kafka 具有高性能、低时延,适合处理大规模物联网设备、日志、报警信息、传感器数据、消息等。
所以今天就来写一份关于熟悉Kafka的基本使用方法的实验,希望可以与小伙伴们一起探讨~~😉😉
一、实验平台
(1)操作系统:Windows7及以上(我用的是Windows 11)
(2)Kafka版本:kafka_2.12-2.4.0
(3)MySQL版本:8.0
二、实验内容
一、Kafka与MySQL的组合使用
1.实验要求
假设有一个学生表student,如下表所示,编写Python程序完成如下操作。
(1)读取student表的数据内容,将其转换为JSON格式,发送给Kafka
(2)从Kafka中获取JSON格式数据,打印出来
snosnamessexsage95001JohnM2395002TomM23
2.在MySQL中操作
(1)打开MySQL
方式一:
方式二:
可以通过 DOS 命令启动 MySQL 服务,windows+R,在搜索框中输入cmd,进去之后再输入services.msc,就进去服务系统里了,再启动就行
进去以后输入密码就可以开始执行mysql语句了
(2)创建数据库
create database school001;
(3)查看数据库
show databases;
发现数据库已经被创建完成
(4)使用该数据库
use school001;
(5)在该数据库中创建student表
create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));
(6)查询该数据库中的student表
show tables;
(7)向student表中插入值
insert into student values("95001","John","M",23);
insert into student values("95002","Tom","M",23);
(8)查询student表中的数据
select * from student;
查询结果:
(到这里我们的student表就创建成功了!)😊😊
3.安装Kafka
简单介绍:
Kafka 的运行需要 Java 环境的支持,因此,安装 Kafka 前需要在 Windows 操作系统中安装 JDK
访问 Kafka 官网,下载 Kafka 2.4.0的安装文件 kafka 2.12-2.4.0.1gz,解压缩到" C : \ "目录下(也可以放到D盘,不过最好放在D盘根目录下,不然后续代码容易报错,我试过)
因为 Katka 的运行依赖于 Zookeeper ,因此,还需要下载并安装 Zookeeper 。当然, Kafka 也内置了 Zookeeper 服务,因此,也可以不额外安装 Zookeeper ,直接使用内置的Zookeeper 服务。为简单起见,这里直接使用内置的Zookeeper 服务。
win+r—>输入cmd然后回车
输入命令pip install kafka-python安装python-kafka模块
查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)
具体怎么安装可参考:kafka安装部署
4.使用Kafka
在实验中要用到Kafka就要先启动它的Zookeeper服务和Kafka,
且在实验过程中,千万不可以将其关闭,一旦关闭,服务就会停止
😡😡
在 Windows 操作系统中
打开第1个 cmd 命令行窗口,启动 Zookeeper 服务
:
cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties
注意,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Zookeeper 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Zookeeper 服务就会停止
如图:
打开第2个 cmd 命令行窗口,然后输入如下命令启动 Kafka 服务
:
cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-server-start.bat .\config\server.Properties
同样地,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Kafka 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Kafka 服务就会停止
若执行上面的命令以后,如果启动失败,并且出现提示信息"此时不应有\QuickTime\QTSstem\QTJava.zip ",则需要把环境变量 CLASSPATH 的相关信息删除。具体方法是,
右键单击"计算机",再单击"属性"一"高级系统设置"一"环境变量",然后,找到变量 CLASSPATH ,把类似下面的信息删除:
C : Program Files (x86) QuickTime\QTSystem QTJava . zip
然后重新启动计算机,让配置修改生效。重新启动计算机以后,再次按照上面的方法启动Zookeeper和Kafka
为了测试 Kafka ,这里创建一个主题,名称为" topic_test ",其包含一个分区,只有一个副本。
在第3个 cmd 命令行窗口中执行如下命令
:
cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-topics.bat -- create -- zookeeper localhost:2181-- replication -
factor 1-- partitions 1-- topic topic_test
可以继续执行如下命令,查看 topic _ test 是否创建成功:
.\bin\windows\kafka-topics.bat -- list -- zookeeper localhost:2181
如果创建成功,就可以在执行结果中看到 topic _ test
继续在第3个 cmd 命令行窗口中执行如下命令,创建一个生产者来产生消息
:
.\bin\windows\kafka-console-producer.bat -- broker-list localhost :9092 -topic topic_test
该命令执行以后,屏幕上的光标会持续闪烁,这时,可以用键盘输入一些内容,例如:
I love Kafka
Kafka is good
新建第4个 cmd 命令行窗口,执行如下命令来消费消息
:
cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic_test --from-beginning
该命令执行以后,屏幕上显示刚才输入的语句" I love Kafka “和” Kafka is good "
5.在PyCharm中操作
创建一个.py文件,写入以下代码,用于实现读取student表的数据内容,将其转换为JSON格式,发送给Kafka的功能
# 运行前先在win上启动zookeap和kafka# 导入相关模块from kafka import KafkaProducer
import json
# 连接kafka json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))# 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开
data ={'sno':'95001','sname':'John','ssex':'M','sage':23}# 发送数据
producer.send('test001', data)# 关闭资源
producer.close()
运行结果如下图所示:
创建一个.py文件,写入以下代码,用于实现从Kafka中获取JSON格式数据,打印出来的功能
# 运行前先在win上启动mysql# 导入消费模块import json
# 导入kafka的消费模块from kafka import KafkaConsumer
import json
import pymysql.cursors
# 连接kafka
consumer = KafkaConsumer('test001', bootstrap_servers='localhost:9092', group_id=None, auto_offset_reset='earliest')# 对获取的数据进行解析for msg in consumer:# 转换为字符串类型
msg1 =str(msg.value, encoding=('utf-8'))# 将字符串的数据加载为字典dict= json.loads(msg1)# 连接数据库
connect = pymysql.Connect(
host='localhost',
port=3306,
user='root',
passwd='xxxxxxxx',#这是你MySQL数据库的密码
db='school001',
charset='utf8')# 获取操作数抠库的对象<游标>
cursor = connect.cursor()# 将数抠织存到mysqL(插入数掷)# 定义sql语句
sql ="select * from student;"# 将数据作为参数传速给sqL,保存到hrgsql
cursor.execute(sql)# 提交
connect.commit()for row in cursor.fetchall():print("sno:%s\tsname:%s\tssex:%s\tsage:%d"% row)print("共查询出", cursor.rowcount,'条数据')
connect.close()
运行结果如下图所示:
二、消费者手动提交
1.实验要求
生成一个data.json文件,内容如下:
data = [
{“name”: “Tony”, “age”: 21, “hobbies”: [“basketball”, “tennis”]},
{“name”: “Lisa”, “age”: 20, “hobbies”: [“sing”, “dance”]},
]
根据上面给出的data.json文件,执行如下操作。
(1)编写生产者程序,将JSON文件数据发送给Kafka。
(2)编写消费者程序,读取Kafka的JSON格式数据,并手动提交偏移量。
2.在PyCharm中操作
创建一个Test写入以下代码,来实现生成data.json文件的功能
:
import json
data =[{"name":"Tony","age":21,"hobbies":["basketball","tennis"]},{"name":"Lisa","age":20,"hobbies":["sing","dance"]},]withopen('../../data.json','w')as f:
json.dump(data, f)
创建一个.py文件,编写生产者程序,来实现将JSON文件数据发送给Kafka的功能
# 可以使用 Python 的 json 模块读取 data.json 文件,并将数据转换为字符串后发送给 Kafkafrom kafka import KafkaProducer
import json
data =[{"name":"Tony","age":21,"hobbies":["basketball","tennis"]},{"name":"Lisa","age":20,"hobbies":["sing","dance"]}]
producer = KafkaProducer(bootstrap_servers='localhost:9092')for item in data:# 将数据转换为字符串格式并发送给 Kafka 主题 test
message = json.dumps(item).encode('utf-8')
producer.send('test', value=message)
producer.close()
运行结果如下图所示:
创建一个.py文件,编写消费者程序,来实现读取Kafka的JSON格式数据,并手动提交偏移量的功能
# 我们可以使用 Kafka 消费者 API 进行数据消费,并在处理完每个消息后手动提交偏移量。from kafka import KafkaConsumer
import json
# 配置 Kafka 消费者,指定主题和分组等信息
consumer = KafkaConsumer('test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False,# 禁止自动提交偏移量
group_id='my-group')# 循环消费 Kafka 消息for message in consumer:# 将传入的二进制消息内容解码为 JSON 格式的字符串
item = json.loads(message.value.decode('utf-8'))print(item)# 手动提交偏移量,确保下次消费时从正确的位置开始
consumer.commit()
运行结果如下图所示:
三、Kafka消费者订阅分区
1.实验要求
在命令行窗口中启动Kafka后,手动创建主题 “assign_topic” ,分区数量为2。具体命令如下:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic
根据上面给出的主题,完成如下操作。
(1)编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic” 。
(2)编写消费者程序1,订阅主题的分区0,只消费分区0数据。
(3)编写消费者程序2,订阅主题的分区1,只消费分区1数据。
2.在终端操作
首先要完成主题以及分区的创建才能编写程序,不然程序会报错
步骤:
- 使用windows+r,在弹窗中输入cmd打开终端
- 在终端中输入命令,创建主题和分区:
cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic
结果如下图(这是我之前已经创建好的结果图):
3.在PyCharm中操作
创建一个.py文件,写入以下代码,用于实现编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic的功能
:
from kafka import KafkaProducer
import uuid
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])for i inrange(5):
message =str(uuid.uuid4()).encode('utf-8')
producer.send('assign_topic', value=message)
producer.close()
运行结果如下图所示:
创建一个.py文件,写入以下代码,用于实现订阅主题的分区0,只消费分区0数据的功能
:
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False,
consumer_timeout_ms=1000)
consumer.assign([TopicPartition('assign_topic',0)])for message in consumer:print("Partition 0 - Message value: {}".format(message.value))
consumer.close()
运行结果如下图所示:
创建一个.py文件,写入以下代码,用于实现订阅主题的分区1,只消费分区1数据的功能
:
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False,
consumer_timeout_ms=1000)
consumer.assign([TopicPartition('assign_topic',1)])for message in consumer:print("Partition 1 - Message value: {}".format(message.value))
consumer.close()
运行结果如下图所示:
三、实验小bug
1. Kafka连接报错:kafka.errors.NoBrokersAvailable: NoBrokersAvailable 是什么原因:?
答:是因为程序运行了多次的原因
把tmp文件和logs文件里面的东西都删掉,就可以解决了
2. 为什么消费者程序1中有东西输出而消费者程序2中什么却什么也没输出?
消费者程序1和消费者程序2是对同一个主题的两个消费者应用程序。可以针对以下情况进行分析。
在主题 assign_topic 中,Kafka有多个分区,可用于并行处理消息。在这里,被消费的消息都来自此主题的第一个分区(即分区 0)。
消费者程序1使用了 .subscribe() 方法来订阅主题,这将导致消费者加入到消费组中,然后通过负载均衡策略从所有分区接收消息。因此,消费者程序1输出打印了分区 0 中的消息。
消费者程序2使用了 .assign() 方法手动分配消费者处理的分区,而且只分配了主题 assign_topic 的第一个分区(即分区 0)。但是,由于该程序没有运行足够长的时间,并且没有消费到任何未提交的偏移量,所以当应用程序终止时不会向Kafka服务器发送任何提交请求,这就可能导致在下一次启动时重复消费确认过的消息。因此,在生产环境中,请务必根据具体情况定期地提交所消费的分区的偏移量。
总结
以上就是对Kafka的基本使用方法的实验啦,有不明白的地方可以留言哦,希望能共同进步~~😀😀😀😀😀😀
版权归原作者 @¥文竹¥ 所有, 如有侵权,请联系我们删除。