通过下面这个实例来演示Kafka与Mysql的组合使用
假设有一个学生表student,编写python程序完成如下操作
1读取student表的数据内容,将其转为JSON格式,发送给Kafka
2从Kafka中获取JSON格式数据,打印出来
---------------------------------------------------->
在使用Python操作Mysql之前,需要安装第三方模块python-kafka(在windows命令窗口)
win+r--->输入cmd然后回车
会出现一个小黑窗
输入命令pip install kafka-python安装python-kafka模块
查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)
一、先在Windows命令窗口连接上mysql
win+r---->输入cmd---->回车
会出现一个小黑窗,在小黑窗中输入mysql -u root -p然后回车输入密码
二,在school001数据库下创建student表
1.创建school001数据库: create database school001;
2.查看现有的数据库:show databases;
如果有school001表示我们创建库成功
3.使用school001数据库:use school001;
4.创建student表:create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));
- 查看数据库中的表:show tables;
6.向表中插入两条数据
第一条: insert into student values("95001","John","M",23);
第二条: insert into student values("95002","Tom","M",23);
7.查看student表中的数据:
(到这里我们的student表就创建成功了!)
三、在python中创建producer.py文件读取student表的数据内容,将其转为JSON格式,发送给Kafka
# 运行前先在win上启动zookeap和kafka
# 导入相关模块
from kafka import KafkaProducer
import json
# 连接kafka json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码
producer = KafkaProducer(bootstrap_servers = 'localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))
# 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开
data = {
'sno':'95001',
'sname':'John',
'ssex':'M',
'sage':19
}
# 发送数据
producer.send('test001',data)
# 关闭资源
producer.close()
四、在python中创建consumer.py文件从Kafka中获取JSON格式数据
# 运行前先在win上启动mysql
# 导入消费模块
import json
# 导入kafka的消费模块
from kafka import KafkaConsumer
import json
import pymysql.cursors
# 连接kafka
consumer = KafkaConsumer('test001',bootstrap_servers = 'localhost:9092',group_id=None,auto_offset_reset='earliest')
# 对获取的数据进行解析
for msg in consumer:
# 转换为字符串类型
msg1 = str(msg.value,encoding=('utf-8'))
# 将字符串的数据加载为字典
dict = json.loads(msg1)
# 连接数据库
connect = pymysql.Connect(
host='localhost',
port=3306,
user='root',
passwd='123456',
db='school001',
charset='utf8'
)
# 获取操作数抠库的对象<游标>
cursor = connect.cursor()
#将数抠织存到mysqL(插入数掷)
# 定义sql语句
sql = "select * from student;"
# 将数掐作为参敏传速给sqL,保存到hrgsql
cursor.execute(sql)
# 提交
connect.commit()
for row in cursor.fetchall():
print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)
print("共查询出", cursor.rowcount, '条数据')
connect.close()
五、运行
1.先在windows命令窗口开启 Zookeeper和Kafka
开启 Zookeeper和Kafka可以参考:(14条消息) Kafka的安装和使用(Windows中)_瑾寰的博客-CSDN博客https://blog.csdn.net/qq_68383591/article/details/130314335?spm=1001.2014.3001.55012.先运行producer.py再运行consumer.py
(在consumer.py中可以看到student表中的两条数据表示我们成功了!)
版权归原作者 瑾寰 所有, 如有侵权,请联系我们删除。