0


Python小案例(九)PySpark读写数据

Python小案例(九)PySpark读写数据

有些业务场景需要Python直接读写Hive集群,也需要Python对MySQL进行操作。pyspark就是为了方便python读取Hive集群数据,当然环境搭建也免不了数仓的帮忙,常见的如开发企业内部的

Jupyter Lab

⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接公司hive集群的

利用PySpark读写Hive数据

# 设置PySpark参数from pyspark.sql import*
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances","20") \
    .config("spark.executor.cores","2") \
    .config("spark.executor.memory","8g") \
    .config("spark.driver.memory","8g") \
    .enableHiveSupport() \
    .getOrCreate()# 导入其他相关库import pandas as pd
from datetime import datetime
import pymysql  # mysql连接库

创建hive表

sql_hive_create ='''
CREATE TABLE IF NOT EXISTS temp.hive_mysql
    (
        id int comment "id"
        ,dtype string comment "类型"
        ,cnt int comment "数量"
    )
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''

spark.sql(sql_hive_create)
DataFrame[]

写入hive表

sql_hive_insert ='''
insert overwrite table temp.hive_mysql

select 1 as id, 'A' as dtype, 10 as cnt

union all

select 2 as id, 'B' as dtype, 23 as cnt
'''

spark.sql(sql_hive_insert)
DataFrame[]

读取hive表

sql_hive_query ='''
select 
    id
    ,dtype
    ,cnt
from
    temp.hive_mysql
'''

df = spark.sql(sql_hive_query).toPandas()
df.head()

iddtypecnt01A1012B23

利用Python读写MySQL数据

连接mysql

# 数据库信息
config ={'host':'***',# 默认127.0.0.1'user':'*',# 用户名'password':'*',# 密码'port':3306# 端口,默认为3306'database':'dbname'# 数据库名称}
# 校验关联是否成功
con = pymysql.connect(**config)# 建立mysql连接
cursor = con.cursor()# 获得游标
cursor.execute("show tables")# 查询表
1335

创建mysql表

sql_mysql_create ='''
CREATE TABLE IF NOT EXISTS `hive_mysql`
    (
        `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键'
        ,`hmid` int(30) NOT NULL DEFAULT '0' COMMENT 'hmid'
        ,`dtype` varchar(30) NOT NULL DEFAULT 'total_count' COMMENT '类型'
        ,`cnt` int(30) NOT NULL DEFAULT '0' COMMENT '数量'

        ,`dbctime` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间'
        ,`dbutime` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间'

        ,PRIMARY KEY (`id`)
        ,UNIQUE KEY `u_key` (`dtype`)
    ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT '题目数量'
'''# cursor.execute(sql_mysql_create) # 无建表权限,可申请权限或者内部管理工具手动建表

写入mysql表

insert_mysql_sql ='''
insert into hive_mysql (hmid, dtype, cnt) values (%s, %s, %s)
'''
try:
    con = pymysql.connect(**config)# 建立mysql连接
    cursor = con.cursor()# 获得游标# 清空数据
    cursor.execute('truncate table hive_mysql')for i inrange(df.__len__()):# 插入的数据类型需要与数据库中字段类型保持一致
        cursor.execute(insert_mysql_sql,(int(df.iloc[i,0]), df.iloc[i,1],int(df.iloc[i,2])))# 提交所有执行命令
    con.commit()print('数据写入成功!')
    cursor.close()# 关闭游标except Exception as e:raise e
finally:
    con.close()# 关闭连接
数据写入成功!

读取mysql表

sql_mysql_query ='''
select 
    hmid
    ,dtype
    ,cnt
from
    hive_mysql
'''
try:
    con = pymysql.connect(**config)# 建立mysql连接
    cursor = con.cursor()# 获得游标
    
    cursor.execute(sql_mysql_query)# 执行sql语句
    df_mysql = pd.DataFrame(cursor.fetchall())# 获取结果转为dataframe# 提交所有执行命令
    con.commit()
    
    cursor.close()# 关闭游标except Exception as e:raise e
finally:
    con.close()# 关闭连接
df_mysql.head()

01201A1012B23

利用PySpark写入MySQL数据

日常最常见的是利用PySpark将数据批量写入MySQL,减少删表建表的操作。但由于笔者当前公司线上环境没有配置mysql的驱动,下述方法没法使用。

MySQL的安全性要求很高,正常情况下,分析师关于MySQL的权限是比较低的。所以很多关于MySQL的操作方法也是无奈之举~

# ## 线上环境需配置mysql的驱动# sp = spark.sql(sql_hive_query)# sp.write.jdbc(url="jdbc:mysql://***:3306/dbname",   # dbname为库名,必须已存在(该语句不会创建库)#               mode="overwrite",     # 模式分为overwrite 重写表    append表内内容追加#               table="hive_mysql",    # 表名,表不需要去创建,可以自己生成#               properties={'driver':'com.mysql.jdbc.Driver', 'user':'*', 'password':'*'})

总结

Python读取Hive数据,以及利用Python关联Hive和MySQL是后续自动化操作的基础,因此简单的理解PySpark如何进行Hive操作即可。

共勉~

标签: python hive

本文转载自: https://blog.csdn.net/weixin_39293132/article/details/128903668
版权归原作者 HsuHeinrich 所有, 如有侵权,请联系我们删除。

“Python小案例(九)PySpark读写数据”的评论:

还没有评论