文章目录
一、安装与简介
- pyspark的spark dataframe背后的运算是java实现的,为了他们解决在jvm上运行效率较低的问题,python社区引入Pyarrow,使得避开spark原生使用python
pickle
的序列化和反序列化python对象(如pandas df转为spark df速度很慢,就是因为时间耗在数据序列化上了) - 下载PyArrow如下,并且在代码中设置
spark.conf.set(“spark.sql.execution.arrow.enabled”,“true”
。
conda install -c conda-forge pyarrow或pip install pyarrow
- Arrow是一个Python库,为创建,操作,格式化和转换日期,时间和时间戳提供了一种明智的,人性化的方法。 它实现和更新日期时间类型,填补功能上的空白,并提供支持许多常见创建场景的智能模块API。
二、使用小栗子
- 离线测试时,有时为了更快使用dataloader对测试集预测,而且df较大,可以使用上面介绍的
PyArrow
进行对df分块读入然后使用dataloader pa.Table.from_pandas(all_predcit)
可以将pd.df格式的all_predict
转为pyarrow.lib.Table
的dfpyarrow.parquet.write_table
函数可以保存pyarrow.lib.Table
格式的df
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.parquet import write_table
df_test_all6 = pq.ParquetFile(df_test_all6_parquet)# 逐块读取数据
i =0
out_path ="data/pq_predict_ans.parquet"for batch in df_test_all6.iter_batches():
batch_df = batch.to_pandas()print("batch_df test:\n", batch_df)
beat_dense_features, beat_sparse_features, beat_train_dataloader, \
beat_val_dataloader, beat_test_dataloader = pq_dataloader_ans(batch_df)# model已经设置为model.eval()
batch_y_pred = predict_model(model, beat_test_dataloader)
batch_df['predict_prob']= batch_y_pred
if i ==0:
all_predcit = batch_df
all_predcit = pa.Table.from_pandas(all_predcit)else:
batch_df = pa.Table.from_pandas(batch_df)# all_predcit = pq.write_table(all_predcit + batch_df, out_path)
all_predcit = pa.concat_tables([all_predcit, batch_df])
i = i +1# save prediction result
pq.write_table(all_predcit, out_path)
Reference
[1] Python parquet.write_table方法代码示例
[2] python中对arrow库的总结
[3] 官方文档:pyarrow.parquet.write_table
[4] Is it possible to read parquet files in chunks?-stackoverflow
[5] 官方文档:pyarrow.concat_tables
[6] https://github.com/apache/arrow/issues/2192
[7] Using Apache PyArrow to optimize Spark & Pandas DataFrames conversions
[8] Python之pyarrow:pyarrow的简介、安装、使用方法之详细攻略
[9] pandas读取大量数据的分块处理
[10] https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.columns
版权归原作者 山顶夕景 所有, 如有侵权,请联系我们删除。