0


Spark进行独热编码

本文总计 500 字,预计阅读需要 2-3 分钟

Pandas 进行独热编码是非常简单的,那么如果使用 Spark该怎么做

独热编码(One-hot-encoding) 是为机器学习建模准备数据时最常见的步骤之一。独热编码将分类数据转换为二进制向量表示。此方法为原始类别列中的每个唯一值创建一个新列。

Spark 非常擅长处理 PB 级的数据。但在复杂的建模技术方面,Spark 的 对于ML 支持还不够强大。所以是很多时候数据预处理是在 Spark 中完成,而下游建模是用纯 Python 或其他高级语言(如 Julia)实现的。

首先,我们需要设置 PySpark 会话并读取原始数据集。

from pyspark.sql import SparkSession 
spark = SparkSession.builder \    
          .appName("One_Hot_Encoding_Blog") \    
          .getOrCreate() 
df = spark.read \    
      .option("header", True) \    
      .csv("ohe_explanation.csv") 
df.show()

上面的代码产生以下结果:

+------+
| Color|
+------+
|   Red|
|   Red|
|Yellow|
| Green|
|Yellow|
+------+

现在,我们已经成功地将数据读入 PySpark dataframe,让我们看看在 PySpark 中实现 one-hot-encoding 的最简单(有问题的)方法。

将字符串值转换为数字标签/索引,One-Hot-Encode 数字标签到 VectorUDT (pyspark.ml.linalg.VectorUDT)

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

indexer = StringIndexer(inputCol="Color", outputCol="ColorNumericIndex")
df = indexer.fit(df).transform(df)
ohe = OneHotEncoder(inputCol="ColorNumericIndex", outputCol="ColorOHEVector")
df = ohe.fit(df).transform(df)
df.show()

返回的结果如下:

+------+-----------------+--------------+
| Color|ColorNumericIndex|ColorOHEVector|
+------+-----------------+--------------+
|   Red|              0.0| (2,[0],[1.0])|
|   Red|              0.0| (2,[0],[1.0])|
|Yellow|              1.0| (2,[1],[1.0])|
| Green|              2.0|     (2,[],[])|
|Yellow|              1.0| (2,[1],[1.0])|
+------+-----------------+--------------+

这对于在 Spark 中完成下游建模的情况非常有用。PySpark 在将 one-hot-encoded 向量包装成密集向量 (VectorUDT) 方面做得非常出色,这在 Spark 世界中表现出色,但是它不能以 csv 格式写出向量。并且也不是人类可以理解的(尤其是对于数量很大的列)。

df.write.option("header", True).csv("ohe_result.csv")

如果我们要将上面的结果写入到CSV中就会得到提示:

pyspark.sql.utils.AnalysisException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.

下面是我们需要的解决方案。要创建可解释的 One Hot Encoder,需要为每个不同的值创建一个单独的列。这很容易使用 pyspark的 inbuiltwithColumn 函数通过传递一个 UDF(用户定义函数)作为参数来完成。

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

我们只需要做2步:

  1. 收集列中需要进行编码的所有不同值
  2. 为每个收集的值创建一个新列,其列名格式为 <<original column name>>_ <<distinct value>>,表示记录中存在 (1) 或不存在 (0)
distinct_values = list(df.select("Color")
                       .distinct()
                       .toPandas()["Color"])

上面这段代码需要在运行代码的环境中安装pandas 0.23.2或以上版本。否则,代码将抛出以下错误:

ImportError: Pandas >= 0.23.2 must be installed; however, it was not found.

Pandas能够帮助我们处理很多事情,但实际上可能存在各种限制,例如无法在执行环境中安装新库。在这样的情况下,我们可以使用一个更 Spark 原生的方式,它不需要安装任何额外的库。

distinct_values = df.select("Color")\
                    .distinct()\
                    .rdd\
                    .flatMap(lambda x: x).collect()

要创建 One Hot Encoded 列,我们执行以下操作:

for distinct_value in distinct_values:
    function = udf(lambda item: 
                   1 if item == distinct_value else 0, 
                   IntegerType())
    new_column_name = "Color"+'_'+distinct_value
    df = df.withColumn(new_column_name, function(col("Color")))
df.show()

结果显示如下:

+------+-----------+------------+---------+
| Color|Color_Green|Color_Yellow|Color_Red|
+------+-----------+------------+---------+
|   Red|          0|           0|        1|
|   Red|          0|           0|        1|
|Yellow|          0|           1|        0|
| Green|          1|           0|        0|
|Yellow|          0|           1|        0|
+------+-----------+------------+---------+

这个dataframe可以轻松的保存成csv文件供下游的任务使用, 并且这个输出与 pandas 使用 get_dummies 函数输出其 One Hot Encoded 值的方式非常相似。

“Spark进行独热编码”的评论:

还没有评论