文章目录
需求说明
- 现在有采集的数据上传到了pulsar中,利用flink将数据从pulsar中读取。
- 将从pulsar中读取的数据利用flink写到mysql数据库中。
- flink设置为流式模式。因为采集的数据上传到pulsar需要被立刻消费写入到mysql当中。
前言
flink提供了python api的接口,本篇文章针对如何将上传到的pulsar上的**json数据**(根据项目的要求实际上是json数据需要序列化之后变成字符串类型)使用**DataStream connectors**而不是TabApi.
在此过程中遇到一些问题,因为官网给的示例维护貌似不是最新的,存在了一些问题,这个后面会说,为了节省大家阅读的时间,将实现步骤和遇到的问题在本章统一说一下也当做自己的笔记。
flink官网网址:https://nightlies.apache.org/flink/flink-docs-release-1.18/
也可以去看官网1.19版本的,但是目前看的1.19的是没有提供jar包下载,所以采用1.18版本的flink
pulsar python查看链接https://pulsar.apache.org/docs/3.3.x/client-libraries-python/
在这个链接上可以去看看用python怎样去上传以及获取pulsar中指定主题下的数据,在调试中可以参考用一下(可用可不用,不需要了解这方面的可以跳过下面的代码区域。)
import pulsar
defup_data():# 创建 Pulsar 客户端
client = pulsar.Client("pulsar://ip:port")# 创建生产者
producer = client.create_producer('persistent://主题')# 要发送的 JSON 数据
json_data =[{
},{
},{
}.....]# 发送 JSON 数据for record in json_data:
producer.send(str(record).encode('utf-8'))# 关闭客户端
client.close()defpulsar_read_data():# 创建 Pulsar 客户端
client = pulsar.Client("pulsar://ip:port")# 创建 reader
reader = client.create_reader('persistent://主题',
pulsar.MessageId.latest,
schema=pulsar.schema.BytesSchema())try:whileTrue:try:
msg = reader.read_next(timeout_millis=2000)# print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
msg_data = msg.data().decode('utf-8')# 解码字节数据# msg_datas = msg_data.replace("'", '"')# msg_json = json.loads(msg_datas) # 转换为 JSON 格式print("Received message:", msg_data)# print("Message ID:", msg.message_id())except pulsar.Timeout:breakexcept Exception as e:print(f"An error occurred: {
e}")finally:
reader.close()
client.close()
版本冲突问题
为了解决版本冲突问题,我这里选择主要用到的库包和版本对应关系如下:
pip install apache-flink==1.18.1 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pulsar-client==3.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple # 选择合适自己的pulsar-client版本
问题一:下载apache-flink包出现错误
在下载的时候pip会提示错误,大家可以先看一下是不是**pycodestyle**和**isort**这两个包没有下载导致,目前作者遇到过这样的错误。直接安装一下这两个包就行
解决措施
pip install pycodestyle -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install isort -i https://pypi.tuna.tsinghua.edu.cn/simple
接着继续说库包版本选择上。这里apache-flink是flink python api的库包。选择1.18.1这个版本,pulsar按照官网建议在3.0版本以上
下面这个google-cloud-bigquery-storage在从pulsar的主题获取数据步骤可以先不安装,后面会介绍是咋回事。
pip install google-cloud-bigquery-storage==1.1.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
从pulsar指定主题中获取数据
从pulsar的指定主题中获取数据这个在官网上有案例演示:https://nightlies.apache.org/flink/flink-docs-release-1.18/api/python/examples/datastream/connectors.html#pulsar
可以看到这个示例中是需要jar包的,这个在1.18版本下可以直接下载
随后我按照官网的示例写好了自己的代码,将示例代码运行的时候往pulsar的指定主题下上传数据,示例代码改动的地方为添加库包的方式(项目下新建了一个libs包)和从pulsar中的消费模式。
import logging
import sys
import os
from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink, StartCursor, \
StopCursor, DeliveryGuarantee, TopicRoutingMode
if __name__ =='__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO,format="%(message)s")
SERVICE_URL ='pulsar://localhost:port'# 替换自己的
ADMIN_URL ='http://localhost:port'# 替换自己的
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
current_dir = os.path.abspath(os.path.dirname(__file__))# 获取工作目录绝对路径
jar_path = os.path.join(current_dir,"libs")
jars =[]forfilein os.listdir(jar_path):iffile.endswith('.jar'):
file_path = current_dir +'/'+'libs'+'/'+file
jars.append(file_path.replace(f'\\',f'/'))
str_jars =';'.join(['file:///'+ jar for jar in jars])
env.add_jars(str_jars)
pulsar_source = PulsarSource.builder() \
.set_service_url(SERVICE_URL) \
.set_admin_url(ADMIN_URL) \
.set_topics('pulsar主题名称') \
.set_start_cursor(StartCursor.earliest()) \
.set_unbounded_stop_cursor(StopCursor.never()) \
.set_subscription_name('pyflink_subscription') \
.set_deserialization_schema(SimpleStringSchema()) \
.set_config('pulsar.source.enableAutoAcknowledgeMessage',True) \
.set_properties({
'pulsar.source.autoCommitCursorInterval':'1000'}) \
.build()
ds = env.from_source(source=pulsar_source,with ds.execute_and_collect()as result:for i in result:print(f"数据:{
i}")
env.execute()
配置项解释如下:
StartCursor.latest():从最新的消息开始读取
StartCursor.earliest(): 从最早的消息开始读取
StartCursor.timestamp(timestamp): 从指定时间点开始读取。这个时间点是以毫秒为单位的
StartCursor.offset(offset): 从指定的偏移量开始读取。这个偏移量是相对于主题分区的一个位置。
StartCursor.position(position): 从指定的位置开始读取。这个位置可以是EARLIEST、LATEST或者是一个具体的消息ID。
set_service_url(service_url):指定 Pulsar 服务的 URL。
set_admin_url(admin_url):指定 Pulsar 管理服务的 URL。
set_topics(topic):设置要订阅的主题。
set_subscription_name('my_test'):指定订阅的名称,
版权归原作者 zzp28218 所有, 如有侵权,请联系我们删除。