0


使用Dask DataFrames 解决Pandas中并行计算的问题

如何将20GB的CSV文件放入16GB的RAM中。

如果你对Pandas有一些经验,并且你知道它最大的问题——它不容易扩展。有解决办法吗?

是的-Dask DataFrames。

大多数Dask API与Pandas相同,但是Dask可以在所有CPU内核上并行运行。它甚至可以在集群上运行,但这是另一个话题。

今天你将看到Dask在处理20GB CSV文件时比Pandas快多少。运行时值将因PC而异,所以我们将比较相对值。郑重声明,我使用的是MBP 16”8核i9, 16GB内存。

本文的结构如下:

  1. 数据集生成
  2. 处理单个CSV文件
  3. 处理多个CSV文件
  4. 结论

数据集生成

我们可以在线下载数据集,但这不是本文的重点。我们只对数据集大小感兴趣,而不是里面的东西。

因此,我们将创建一个有6列的虚拟数据集。第一列是一个时间戳——以一秒的间隔采样的整个年份,其他5列是随机整数值。

为了让事情更复杂,我们将创建20个文件,从2000年到2020年,每年一个。

在开始之前,请确保在笔记本所在的位置创建一个数据文件夹。下面是创建CSV文件的代码片段:

 import numpy as np
 import pandas as pd
 import dask.dataframe as dd
 from datetime import datetime
 for year in np.arange(2000, 2021):
     dates = pd.date_range(
         start=datetime(year=year, month=1, day=1),
         end=datetime(year=year, month=12, day=31),
         freq=’S’
     )
     df = pd.DataFrame()
     df[‘Date’] = dates
     for i in range(5):
         df[f’X{i}’] = np.random.randint(low=0, high=100, size=len(df))    df.to_csv(f’data/{year}.csv’, index=False)

你现在可以使用一个基本的Linux命令来列出数据目录:

 !ls -lh data/

以下是结果:

正如您所看到的,所有20个文件的大小都在1GB左右(更准确地说是1.09)。上面的代码片段需要一些时间来执行,但仍然比下载一个20GB文件要少得多。

接下来,让我们看看如何处理和聚合单个CSV文件。

处理单个CSV文件

目标:读取一个单独的CSV文件,分组的值按月,并计算每个列的总和。

用Pandas加载单个CSV文件再简单不过了。read_csv()函数接受parse_dates参数,该参数自动将一个或多个列转换为日期类型。

这个很有用,因为我们可以直接用dt。以访问月的值。下面是完整的代码片段:

 %%time
 df = pd.read_csv(‘data/2000.csv’, parse_dates=[‘Date’])
 monthly_total = df.groupby(df[‘Date’].dt.month).sum()

这是总运行时间:

对于1GB的文件来说还不错,但是运行时取决于您的硬件。让我们对Dask做同样的事情。这是代码:

 %%time
 df = dd.read_csv(‘data/2000.csv’, parse_dates=[‘Date’])
 monthly_total = df.groupby(df[‘Date’].dt.month).sum().compute()

与往常一样,在调用compute()函数之前,Dask不会完成任何处理。你可以看到下面的总运行时间:

让我们来比较一下不同点:

这并不是一个显著的区别,但Dask总体上是一个更好的选择,即使是对于单个数据文件。这是一个很好的开始,但是我们真正感兴趣的是同时处理多个文件。

接下来让我们探讨如何做到这一点。

处理多个CSV文件

目标:读取所有CSV文件,按年值分组,并计算每列的总和。

使用Pandas处理多个数据文件是一项乏味的任务。简而言之,你必须一个一个地阅读文件,然后把它们垂直地叠起来。

如果您考虑一下,单个CPU内核每次加载一个数据集,而其他内核则处于空闲状态。这不是最有效的方法。

glob包将帮助您一次处理多个CSV文件。您可以使用data/*. CSV模式来获取data文件夹中的所有CSV文件。然后,你必须一个一个地循环读它们。最后,可以将它们连接起来并进行聚合。

下面是完整的代码片段:

 %%time
 import glob
 all_files = glob.glob('data/*.csv')
 dfs = []
 for fname in all_files:
     dfs.append(pd.read_csv(fname, parse_dates=['Date']))
 df = pd.concat(dfs, axis=0)
 yearly_total = df.groupby(df['Date'].dt.year).sum()

下面是运行时的结果:

15分半钟似乎太多了,但您必须考虑到在此过程中使用了大量交换内存,因为没有办法将20+GB的数据放入16GB的RAM中。如果notebook 完全崩溃,使用少量的CSV文件。

让我们看看Dask提供了哪些改进。它接受read_csv()函数的glob模式,这意味着您不必使用循环。在调用compute()函数之前,不会执行任何操作,但这就是库的工作方式。

下面是加载和聚合的完整代码片段:

 %%time
 
 df = dd.read_csv(‘data/*.csv’, parse_dates=[‘Date’])
 yearly_total = df.groupby(df[‘Date’].dt.year).sum().compute()

下面是运行时的结果:

让我们来比较一下不同点:

正如您所看到的,当处理多个文件时,差异更显著——在Dask中大约快2.5倍。一个明显的赢家,毋庸置疑。

让我们在下一节结束这些内容。

结论

今天,您学习了如何从Pandas切换到Dask,以及当数据集变大时为什么应该这样做。Dask的API与Pandas是99%相同的,所以你应该不会有任何切换困难。

请记住—有些数据格式在Dask中是不支持的—例如XLS、Zip和GZ。此外,排序操作也不受支持,因为它不方便并行执行。

作者:Dario Radečić

原文地址:https://towardsdatascience.com/dask-dataframes-how-to-run-pandas-in-parallel-with-ease-b8b1f6b2646b

deephub翻译组

标签:

“使用Dask DataFrames 解决Pandas中并行计算的问题”的评论:

还没有评论