0


Windows系统下python版本pyflink从pulsar中读取数据并写入到mysql当中以及异常问题解决。

文章目录

需求说明

  1. 现在有采集的数据上传到了pulsar中,利用flink将数据从pulsar中读取。
  2. 将从pulsar中读取的数据利用flink写到mysql数据库中。
  3. flink设置为流式模式。因为采集的数据上传到pulsar需要被立刻消费写入到mysql当中。

前言

   flink提供了python api的接口,本篇文章针对如何将上传到的pulsar上的**json数据**(根据项目的要求实际上是json数据需要序列化之后变成字符串类型)使用**DataStream connectors**而不是TabApi.
    在此过程中遇到一些问题,因为官网给的示例维护貌似不是最新的,存在了一些问题,这个后面会说,为了节省大家阅读的时间,将实现步骤和遇到的问题在本章统一说一下也当做自己的笔记。

   flink官网网址:https://nightlies.apache.org/flink/flink-docs-release-1.18/

在这里插入图片描述
也可以去看官网1.19版本的,但是目前看的1.19的是没有提供jar包下载,所以采用1.18版本的flink
在这里插入图片描述
pulsar python查看链接https://pulsar.apache.org/docs/3.3.x/client-libraries-python/
在这个链接上可以去看看用python怎样去上传以及获取pulsar中指定主题下的数据,在调试中可以参考用一下(可用可不用,不需要了解这方面的可以跳过下面的代码区域。)

import pulsar

defup_data():# 创建 Pulsar 客户端
    client = pulsar.Client("pulsar://ip:port")# 创建生产者
    producer = client.create_producer('persistent://主题')# 要发送的 JSON 数据
    json_data =[{
   },{
   },{
   }.....]# 发送 JSON 数据for record in json_data:
        producer.send(str(record).encode('utf-8'))# 关闭客户端
    client.close()defpulsar_read_data():# 创建 Pulsar 客户端
    client = pulsar.Client("pulsar://ip:port")# 创建 reader
    reader = client.create_reader('persistent://主题',
        pulsar.MessageId.latest,
        schema=pulsar.schema.BytesSchema())try:whileTrue:try:
                msg = reader.read_next(timeout_millis=2000)# print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
                msg_data = msg.data().decode('utf-8')# 解码字节数据# msg_datas = msg_data.replace("'", '"')# msg_json = json.loads(msg_datas)  # 转换为 JSON 格式print("Received message:", msg_data)# print("Message ID:", msg.message_id())except pulsar.Timeout:breakexcept Exception as e:print(f"An error occurred: {
     e}")finally:
        reader.close()
        client.close()

版本冲突问题

   为了解决版本冲突问题,我这里选择主要用到的库包和版本对应关系如下:

pip install apache-flink==1.18.1 -i https://pypi.tuna.tsinghua.edu.cn/simple

pip install pulsar-client==3.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple # 选择合适自己的pulsar-client版本

问题一:下载apache-flink包出现错误

   在下载的时候pip会提示错误,大家可以先看一下是不是**pycodestyle**和**isort**这两个包没有下载导致,目前作者遇到过这样的错误。直接安装一下这两个包就行

解决措施

pip install pycodestyle -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install isort -i https://pypi.tuna.tsinghua.edu.cn/simple

   接着继续说库包版本选择上。这里apache-flink是flink python api的库包。选择1.18.1这个版本,pulsar按照官网建议在3.0版本以上

在这里插入图片描述

   下面这个google-cloud-bigquery-storage在从pulsar的主题获取数据步骤可以先不安装,后面会介绍是咋回事。

pip install google-cloud-bigquery-storage==1.1.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

从pulsar指定主题中获取数据

   从pulsar的指定主题中获取数据这个在官网上有案例演示:https://nightlies.apache.org/flink/flink-docs-release-1.18/api/python/examples/datastream/connectors.html#pulsar

在这里插入图片描述
可以看到这个示例中是需要jar包的,这个在1.18版本下可以直接下载
在这里插入图片描述
随后我按照官网的示例写好了自己的代码,将示例代码运行的时候往pulsar的指定主题下上传数据,示例代码改动的地方为添加库包的方式(项目下新建了一个libs包)和从pulsar中的消费模式。

import logging
import sys
import os
from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink, StartCursor, \
    StopCursor, DeliveryGuarantee, TopicRoutingMode

if __name__ =='__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO,format="%(message)s")

    SERVICE_URL ='pulsar://localhost:port'#  替换自己的
    ADMIN_URL ='http://localhost:port'# 替换自己的

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    current_dir = os.path.abspath(os.path.dirname(__file__))# 获取工作目录绝对路径
    jar_path = os.path.join(current_dir,"libs")
    jars =[]forfilein os.listdir(jar_path):iffile.endswith('.jar'):
            file_path = current_dir +'/'+'libs'+'/'+file
            jars.append(file_path.replace(f'\\',f'/'))

    str_jars =';'.join(['file:///'+ jar for jar in jars])
    env.add_jars(str_jars)

    pulsar_source = PulsarSource.builder() \
        .set_service_url(SERVICE_URL) \
        .set_admin_url(ADMIN_URL) \
        .set_topics('pulsar主题名称') \
        .set_start_cursor(StartCursor.earliest()) \
        .set_unbounded_stop_cursor(StopCursor.never()) \
        .set_subscription_name('pyflink_subscription') \
        .set_deserialization_schema(SimpleStringSchema()) \
        .set_config('pulsar.source.enableAutoAcknowledgeMessage',True) \
        .set_properties({
   'pulsar.source.autoCommitCursorInterval':'1000'}) \
        .build()

    ds = env.from_source(source=pulsar_source,with ds.execute_and_collect()as result:for i in result:print(f"数据:{
     i}")
    env.execute()
   配置项解释如下:

    StartCursor.latest():从最新的消息开始读取
    StartCursor.earliest(): 从最早的消息开始读取
    StartCursor.timestamp(timestamp): 从指定时间点开始读取。这个时间点是以毫秒为单位的
    StartCursor.offset(offset): 从指定的偏移量开始读取。这个偏移量是相对于主题分区的一个位置。
    StartCursor.position(position): 从指定的位置开始读取。这个位置可以是EARLIEST、LATEST或者是一个具体的消息ID。
    
    set_service_url(service_url):指定 Pulsar 服务的 URL。
    set_admin_url(admin_url):指定 Pulsar 管理服务的 URL。
    set_topics(topic):设置要订阅的主题。
    set_subscription_name('my_test'):指定订阅的名称,
标签: windows python mysql

本文转载自: https://blog.csdn.net/zzp28218/article/details/140770338
版权归原作者 zzp28218 所有, 如有侵权,请联系我们删除。

“Windows系统下python版本pyflink从pulsar中读取数据并写入到mysql当中以及异常问题解决。”的评论:

还没有评论