Python小案例(九)PySpark读写数据
有些业务场景需要Python直接读写Hive集群,也需要Python对MySQL进行操作。pyspark就是为了方便python读取Hive集群数据,当然环境搭建也免不了数仓的帮忙,常见的如开发企业内部的
Jupyter Lab
。
⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接公司hive集群的
利用PySpark读写Hive数据
# 设置PySpark参数from pyspark.sql import*
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.executor.instances","20") \
.config("spark.executor.cores","2") \
.config("spark.executor.memory","8g") \
.config("spark.driver.memory","8g") \
.enableHiveSupport() \
.getOrCreate()# 导入其他相关库import pandas as pd
from datetime import datetime
import pymysql # mysql连接库
创建hive表
sql_hive_create ='''
CREATE TABLE IF NOT EXISTS temp.hive_mysql
(
id int comment "id"
,dtype string comment "类型"
,cnt int comment "数量"
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
spark.sql(sql_hive_create)
DataFrame[]
写入hive表
sql_hive_insert ='''
insert overwrite table temp.hive_mysql
select 1 as id, 'A' as dtype, 10 as cnt
union all
select 2 as id, 'B' as dtype, 23 as cnt
'''
spark.sql(sql_hive_insert)
DataFrame[]
读取hive表
sql_hive_query ='''
select
id
,dtype
,cnt
from
temp.hive_mysql
'''
df = spark.sql(sql_hive_query).toPandas()
df.head()
iddtypecnt01A1012B23
利用Python读写MySQL数据
连接mysql
# 数据库信息
config ={'host':'***',# 默认127.0.0.1'user':'*',# 用户名'password':'*',# 密码'port':3306# 端口,默认为3306'database':'dbname'# 数据库名称}
# 校验关联是否成功
con = pymysql.connect(**config)# 建立mysql连接
cursor = con.cursor()# 获得游标
cursor.execute("show tables")# 查询表
1335
创建mysql表
sql_mysql_create ='''
CREATE TABLE IF NOT EXISTS `hive_mysql`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键'
,`hmid` int(30) NOT NULL DEFAULT '0' COMMENT 'hmid'
,`dtype` varchar(30) NOT NULL DEFAULT 'total_count' COMMENT '类型'
,`cnt` int(30) NOT NULL DEFAULT '0' COMMENT '数量'
,`dbctime` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间'
,`dbutime` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间'
,PRIMARY KEY (`id`)
,UNIQUE KEY `u_key` (`dtype`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT '题目数量'
'''# cursor.execute(sql_mysql_create) # 无建表权限,可申请权限或者内部管理工具手动建表
写入mysql表
insert_mysql_sql ='''
insert into hive_mysql (hmid, dtype, cnt) values (%s, %s, %s)
'''
try:
con = pymysql.connect(**config)# 建立mysql连接
cursor = con.cursor()# 获得游标# 清空数据
cursor.execute('truncate table hive_mysql')for i inrange(df.__len__()):# 插入的数据类型需要与数据库中字段类型保持一致
cursor.execute(insert_mysql_sql,(int(df.iloc[i,0]), df.iloc[i,1],int(df.iloc[i,2])))# 提交所有执行命令
con.commit()print('数据写入成功!')
cursor.close()# 关闭游标except Exception as e:raise e
finally:
con.close()# 关闭连接
数据写入成功!
读取mysql表
sql_mysql_query ='''
select
hmid
,dtype
,cnt
from
hive_mysql
'''
try:
con = pymysql.connect(**config)# 建立mysql连接
cursor = con.cursor()# 获得游标
cursor.execute(sql_mysql_query)# 执行sql语句
df_mysql = pd.DataFrame(cursor.fetchall())# 获取结果转为dataframe# 提交所有执行命令
con.commit()
cursor.close()# 关闭游标except Exception as e:raise e
finally:
con.close()# 关闭连接
df_mysql.head()
01201A1012B23
利用PySpark写入MySQL数据
日常最常见的是利用PySpark将数据批量写入MySQL,减少删表建表的操作。但由于笔者当前公司线上环境没有配置mysql的驱动,下述方法没法使用。
MySQL的安全性要求很高,正常情况下,分析师关于MySQL的权限是比较低的。所以很多关于MySQL的操作方法也是无奈之举~
# ## 线上环境需配置mysql的驱动# sp = spark.sql(sql_hive_query)# sp.write.jdbc(url="jdbc:mysql://***:3306/dbname", # dbname为库名,必须已存在(该语句不会创建库)# mode="overwrite", # 模式分为overwrite 重写表 append表内内容追加# table="hive_mysql", # 表名,表不需要去创建,可以自己生成# properties={'driver':'com.mysql.jdbc.Driver', 'user':'*', 'password':'*'})
总结
Python读取Hive数据,以及利用Python关联Hive和MySQL是后续自动化操作的基础,因此简单的理解PySpark如何进行Hive操作即可。
共勉~
版权归原作者 HsuHeinrich 所有, 如有侵权,请联系我们删除。