Python小案例(十)利用PySpark循环写入数据
在做数据分析的时候,往往需要回溯历史数据。但有时候构建历史数据时需要变更参数重复跑数,公司的数仓调度系统往往只支持日期这一个参数,而且为临时数据生产调度脚本显得有点浪费。这个时候就可以结合python的字符串格式化和PySpark的Hive写入,就可以完成循环写入临时数据。
⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接企业hive集群的
案例一:多参数循环写入临时表
案例背景:写入每天的热搜数据,热搜类型分为当日、近1日、近2日、近3日。这里为了方便,简化了循环的力度。
from pyspark.sql import*# spark配置
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.executor.instances","20") \
.config("spark.executor.cores","2") \
.config("spark.executor.memory","8g") \
.config("spark.driver.memory","8g") \
.enableHiveSupport() \
.getOrCreate()# 导入其他相关库import pandas as pd
from datetime import datetime
# sql创建临时表
sql_create ='''
CREATE TABLE temp.loop_write_example
(
cnt string comment "近n日cnt"
)
PARTITIONED BY (`point_date` string, `dtype` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
spark.sql(sql_create)
DataFrame[]
构造日期
'{dt}'
和热搜类型
{num}
两个参数
# sql写入临时表
sql_insert ='''
insert overwrite table temp.loop_write_example partition (point_date = '{dt}',dtype={num})
select
sum(if(dt between date_add('{dt}',-{num}) and '{dt}',cnt,null)) as cnt
from
temp.loop_write_example_fake_data
where
dt between date_add('{dt}',-4) and '{dt}'
'''
dates = pd.date_range('2021-01-01','2021-01-10').strftime("%Y-%m-%d").to_list()# 日期范围
# 循环写入临时表for point_date in dates:if point_date>='2021-01-01'and point_date<'2021-01-03':for dtype inrange(0,4):
start_time = datetime.now()
spark.sql(sql_insert.format(dt=point_date, num=dtype))
end_time=datetime.now()print(point_date, dtype,"succeed",'耗时'+str((end_time-start_time).seconds)+'秒')
2021-01-01 0 succeed 耗时8秒
2021-01-01 1 succeed 耗时7秒
2021-01-01 2 succeed 耗时8秒
2021-01-01 3 succeed 耗时8秒
2021-01-02 0 succeed 耗时8秒
2021-01-02 1 succeed 耗时8秒
2021-01-02 2 succeed 耗时8秒
2021-01-02 3 succeed 耗时8秒
案例二:并发批量写入hdfs
案例背景:将2亿+题目按规则分批写入hdfs,供研发通过接口查询,每个hdfs要求最大1000w。
from pyspark.sql import*
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.executor.instances","20") \
.config("spark.executor.cores","2") \
.config("spark.executor.memory","8g") \
.config("spark.driver.memory","8g") \
.enableHiveSupport() \
.getOrCreate()import math
import pandas as pd
from datetime import datetime
import time
import os
# 为了方便,通过规则生成的数据存入临时表temp.hh_qids中,规则细节无需了解# 查看数据量级
df_cnt = spark.sql('select count(1) as cnt from temp.hh_qids').toPandas()
N = df_cnt['cnt'].loc[0]# 获取数据量级print(N)
273230858
# 创建表,通过参数i生成表后缀
creat_sql ='''
CREATE TABLE IF NOT EXISTS temp.hh_mult_write_{i}
(
questionid string comment "题目ID"
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
# 写入表,写入上述创建的临时表
insert_sql ='''
insert overwrite table temp.hh_mult_write_{i}
select
questionid
from
temp.hh_qids
where
ceil(rn/10000000)={i}
order by
questionid
limit 100000000
'''
- 循环写入
%%time
# 通过循环创建多个临时表并写入for i inrange(1,math.ceil(N/10000000)+1):
start_time = datetime.now()
spark.sql(creat_sql.format(i=i))# 创建表
spark.sql(insert_sql.format(i=i))# 写入表
end_time=datetime.now()print(f"成功写入hh_mult_write_{i},"+'耗时'+str((end_time-start_time).seconds)+'秒')
成功写入hh_mult_write_1,耗时38秒
成功写入hh_mult_write_2,耗时59秒
成功写入hh_mult_write_3,耗时36秒
成功写入hh_mult_write_4,耗时34秒
成功写入hh_mult_write_5,耗时29秒
成功写入hh_mult_write_6,耗时26秒
成功写入hh_mult_write_7,耗时44秒
成功写入hh_mult_write_8,耗时43秒
成功写入hh_mult_write_9,耗时32秒
成功写入hh_mult_write_10,耗时49秒
成功写入hh_mult_write_11,耗时33秒
成功写入hh_mult_write_12,耗时34秒
成功写入hh_mult_write_13,耗时38秒
成功写入hh_mult_write_14,耗时24秒
成功写入hh_mult_write_15,耗时40秒
成功写入hh_mult_write_16,耗时34秒
成功写入hh_mult_write_17,耗时39秒
成功写入hh_mult_write_18,耗时45秒
成功写入hh_mult_write_19,耗时50秒
成功写入hh_mult_write_20,耗时35秒
成功写入hh_mult_write_21,耗时46秒
成功写入hh_mult_write_22,耗时38秒
成功写入hh_mult_write_23,耗时29秒
成功写入hh_mult_write_24,耗时31秒
成功写入hh_mult_write_25,耗时28秒
成功写入hh_mult_write_26,耗时36秒
成功写入hh_mult_write_27,耗时32秒
成功写入hh_mult_write_28,耗时17秒
CPU times: user 124 ms, sys: 31.8 ms, total: 156 ms
Wall time: 17min 15s
这次通过大量级数据实战演示,可以发现效率还可以,写入28个文件仅需17min 15s。但日常业务中可能存在更复杂的写入或者更大的量级,那有没有办法提高效率呢?
大家都知道python的循环是单线程的,在一次循环结束前是不会调起下次循环的。而调度系统一般也可以支持并发,那python是不是也能通过并发实现多线程呢?当然可以了,方法有不少,但我实验后发现还是
joblib
好用。
这里通过一个简单的小case演示
joblib
的效果
# 查看集群服务器cpu数量print(os.cpu_count())
48
%%time # 查看简单循环的执行时间:15sfor i inrange(5):for j inrange(3): time.sleep(1)print(i*j)
0
0
0
0
1
2
0
2
4
0
3
6
0
4
8
CPU times: user 12.2 ms, sys: 6.18 ms, total: 18.3 ms
Wall time: 15 s%%time # 查看多线程下的执行时间:1.35s(好家伙,快了10倍多!)from joblib import Parallel, delayed defproduct2(x,y): time.sleep(1)return x*y # n_jobs=-1表示使用全部cpu Parallel(n_jobs=-1)(delayed(product2)(i,j)for i inrange(5)for j inrange(3))
CPU times: user 111 ms, sys: 233 ms, total: 344 ms
Wall time: 1.35 s[0, 0, 0, 0, 1, 2, 0, 2, 4, 0, 3, 6, 0, 4, 8]
大家可以看到,提速效果还是杠杠滴,那实际应用会不会也如此优秀呢?
- 并发写入
# 构造函数-将单次循环的主要过程包装成函数以便Parallel调用defcreat_insert(i):
start_time = datetime.now()
spark.sql(creat_sql.format(i=i))# 创建表
spark.sql(insert_sql.format(i=i))# 写入表
end_time=datetime.now()
print_str =f"成功写入hh_mult_test_{i},"+'耗时'+str((end_time-start_time).seconds)+'秒'return print_str
%%time
# 并发写入from joblib import Parallel, delayed
# 集群服务器大家都在用,在做大任务处理时,不建议使用全部cpu,这里使用一半足矣
Parallel(n_jobs=24, prefer="threads")(delayed(creat_insert)(i)for i inrange(1,math.ceil(N/10000000)+1))
CPU times: user 87.6 ms, sys: 18.8 ms, total: 106 ms
Wall time: 1min 49s
['成功写入hh_mult_test_1,耗时44秒',
'成功写入hh_mult_test_2,耗时41秒',
'成功写入hh_mult_test_3,耗时83秒',
'成功写入hh_mult_test_4,耗时49秒',
'成功写入hh_mult_test_5,耗时89秒',
'成功写入hh_mult_test_6,耗时71秒',
'成功写入hh_mult_test_7,耗时89秒',
'成功写入hh_mult_test_8,耗时72秒',
'成功写入hh_mult_test_9,耗时83秒',
'成功写入hh_mult_test_10,耗时77秒',
'成功写入hh_mult_test_11,耗时80秒',
'成功写入hh_mult_test_12,耗时65秒',
'成功写入hh_mult_test_13,耗时53秒',
'成功写入hh_mult_test_14,耗时109秒',
'成功写入hh_mult_test_15,耗时81秒',
'成功写入hh_mult_test_16,耗时73秒',
'成功写入hh_mult_test_17,耗时41秒',
'成功写入hh_mult_test_18,耗时78秒',
'成功写入hh_mult_test_19,耗时84秒',
'成功写入hh_mult_test_20,耗时93秒',
'成功写入hh_mult_test_21,耗时68秒',
'成功写入hh_mult_test_22,耗时78秒',
'成功写入hh_mult_test_23,耗时48秒',
'成功写入hh_mult_test_24,耗时88秒',
'成功写入hh_mult_test_25,耗时54秒',
'成功写入hh_mult_test_26,耗时59秒',
'成功写入hh_mult_test_27,耗时62秒',
'成功写入hh_mult_test_28,耗时37秒']
可以看到,每个文件的写入时间与循环差不多,都是在60秒左右。但整体只花了1min 49s,快了10倍以上。
- 删除测试数据
%%time
# 测试数据量较大,无端占用公司资源是不对的,所以需要删除下。# 但要我手动一个个删除那也是不可能的,做个简单的for循环即可for i inrange(1,29):
drop_sql='''
DROP TABLE IF EXISTS temp.hh_mult_test_1{i};
'''
spark.sql(drop_sql.format(i=i))# 删除表
CPU times: user 3.94 ms, sys: 1.96 ms, total: 5.91 ms
Wall time: 148 ms
总结
至此,python小案例系列也结束了,案例基本来源于我的日常业务。在处理复杂需求,提升工作效率方面,Python还是有一席之地的。不知道大家有没有什么实用的python处理日常需求的小案例呢?
共勉~
版权归原作者 HsuHeinrich 所有, 如有侵权,请联系我们删除。