0


Spark 下一代机器学习教程(一)

原文:Next-Generation Machine Learning with Spark

协议:CC BY-NC-SA 4.0

一、机器学习导论

我可以给你通常的论点。但事实是,发现的前景太甜了。

—杰弗里·辛顿 我

机器学习(ML)是人工智能的一个子领域,即制造智能机器的科学和工程。 ii 人工智能的先驱之一亚瑟·塞缪尔(Arthur Samuel)将机器学习定义为“无需明确编程就能赋予计算机学习能力的研究领域。” iii 图 1-1 展示了人工智能、机器学习、深度学习之间的关系。人工智能(AI)包含其他领域,这意味着尽管所有的机器学习都是 AI,但并非所有的 AI 都是机器学习。人工智能的另一个分支,符号人工智能,是二十世纪大部分时间里占主导地位的人工智能研究范式。 iv 符号人工智能实现被称为专家系统或知识图,它们本质上是使用 if-then 语句通过演绎推理得出逻辑结论的规则引擎。你可以想象,符号人工智能受到几个关键的限制;其中最主要的是在规则引擎中定义规则后修改规则的复杂性。添加更多的规则增加了规则引擎中的知识,但是它不能改变现有的知识。 v 机器学习模型另一方面更加灵活。他们可以根据新数据接受再培训,以学习新知识或修改现有知识。象征性人工智能也涉及重大的人类干预。它依赖于人类的知识,需要人类在规则引擎中硬编码规则。另一方面,机器学习更加动态,从输入数据中学习和识别模式,以产生所需的输出。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 1-1

AI、机器学习、深度学习的关系 vi

深度学习在 2000 年代中期的复兴将人们的注意力重新集中到人工智能和机器学习的连接主义方法上。深度学习的复兴、高速图形处理单元(GPU)的可用性、大数据的出现以及谷歌、脸书、亚马逊、微软和 IBM 等公司的投资创造了推动人工智能复兴的完美风暴。

人工智能和机器学习用例

在过去的十年里,机器学习取得了一系列惊人的进步。这些突破正在扰乱我们的日常生活,并对你能想到的几乎每一个垂直领域产生影响。这绝不是机器学习用例的详尽列表,但它表明了每个行业正在发生的许多创新。

零售

零售业是最早受益于机器学习的行业之一。多年来,在线购物网站一直依赖协作和基于内容的过滤算法来个性化购物体验。在线推荐和高度针对性的营销活动为零售商创造了数百万甚至数十亿的收入。或许是 ML 驱动的在线推荐和个性化的典型代表,亚马逊是最受欢迎(也是最成功)的实现机器学习好处的在线零售商。根据麦肯锡的一项研究,亚马逊网站 35%的收入来自其推荐引擎。 vii 零售的附加机器学习应用包括货架空间规划、货架图优化、目标营销、客户细分和需求预测。

运输

几乎每个主要的汽车制造商都在研究由深度神经网络驱动的人工智能自动驾驶汽车。这些车辆配备了支持 GPU 的计算机,每秒可以处理超过 100 万亿次运算,用于实时人工智能感知、导航和路径规划。运输和物流公司,如 UPS 和 FedEx,使用机器学习进行路线和燃料优化、车队监控、预防性维护、旅行时间估计和智能地理围栏。

金融服务

预测客户终身价值(CLV)、信用风险预测和欺诈检测是金融服务的一些关键机器学习用例。对冲基金和投资银行使用机器学习来分析 Twitter Firehose 的数据,以检测可能推动市场的推文。金融服务的其他常见机器学习用例包括预测下一个最佳行动、流失预测、情感分析和多渠道营销归因。

医疗保健和生物技术

医疗保健是人工智能和机器学习研究和应用的一个重要领域。医院和医疗保健初创企业正在使用人工智能和机器学习来帮助准确诊断威胁生命的疾病,如心脏病、癌症和结核病。人工智能驱动的药物发现以及成像和诊断是人工智能最具代表性的领域 viii 。人工智能还彻底改变了生物技术和基因组学研究的方式,导致了路径分析、微阵列分析、基因预测和功能注释方面的新创新。IX

制造业

具有前瞻性思维的制造商正在使用深度学习进行质量检查,以检测硬件产品上的裂纹、边缘不平和划痕等缺陷。多年来,制造业和工业工程师一直使用生存分析来预测重型设备的故障时间。人工智能机器人正在实现制造过程的自动化,与人类机器人相比,操作速度更快,精度更高,从而提高了生产率,降低了产品缺陷。物联网(IoT)的到来和传感器数据的丰富正在扩大该行业机器学习应用的数量。

政府

机器学习在政府中有着广泛的应用。例如,公共事业公司一直在使用机器学习来监控公用事业管道。异常检测技术有助于检测泄漏和管道破裂,这些泄漏和管道破裂会导致全市范围的服务中断和数百万美元的财产损失。机器学习还被用于实时水质监测,防止污染疾病,并有可能挽救生命。为了节约能源,公有能源公司通过确定用电量的高峰和低谷,使用机器学习来相应地调整能源输出。人工智能支持的网络安全是另一个快速发展的领域,也是一个重要的政府用例,尤其是在当今时代。

机器学习和数据

机器学习模型是使用算法和数据的组合来构建的。使用强大的算法至关重要,但同样重要(有些人可能会说更重要)的是用大量高质量的数据进行训练。一般来说,数据越多,机器学习的表现越好。2001 年,微软的研究人员 Michele Banko 和 Eric Brill 在他们颇具影响力的论文“扩展到非常非常大的自然语言消歧语料库”中首次提出了这个概念。谷歌的研究主管彼得·诺维格(Peter Norvig)在他的论文《数据的不合理有效性》中进一步推广了这一概念。 x 然而,比数量更重要的是质量。每个高质量的模型都是从高质量的特性开始的。这就是特征工程进入画面的地方。特征工程是将原始数据转化为高质量特征的过程。这通常是整个机器学习过程中最艰巨的部分,但也是最重要的部分。我将在本章后面更详细地讨论特征工程。但与此同时,让我们来看看一个典型的机器学习数据集。图 1-2 显示了虹膜数据集的子集。我将在整本书的一些例子中使用这个数据集。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 1-2

机器学习数据集

观察

单行数据被称为观察值或实例。

特征

特征是观察的属性。特征是用作模型输入的独立变量。在图 1-2 中,特征是花瓣长度、花瓣宽度、萼片长度和萼片宽度。

类别标签

类别标签是数据集中的因变量。这是我们试图预测的事情。它是输出。在我们的例子中,我们试图预测鸢尾花的类型:Virginica、Setosa 和 Versicolor。

模型

模型是一种具有预测能力的数学构造。它估计数据集中自变量和因变量之间的关系。Xi

机器学习方法

有不同类型的机器学习方法。决定使用哪一个很大程度上取决于你想要完成什么以及你所拥有的原始数据的种类。

监督学习

监督学习是一种使用训练数据集进行预测的机器学习任务。监督学习可以分为分类或回归。回归用于预测连续值,如“价格”、“温度”或“距离”,而分类用于预测类别,如“是”或“否”、“垃圾邮件”或“非垃圾邮件”,或者“恶性”或“良性”。

分类

分类可能是最常见的监督机器学习任务。您很可能已经遇到过在没有意识到的情况下利用分类的应用程序。常见的使用案例包括医疗诊断、定向营销、垃圾邮件检测、信用风险预测和情感分析等。有三种分类任务:二进制、多类和多标签。

二元分类

如果只有两个类别,则任务是二元或二项式分类。例如,当使用二进制分类算法进行垃圾邮件检测时,输出变量可以有两个类别,“垃圾邮件”或“非垃圾邮件”。对于检测癌症,类别可以是“恶性”或“良性”。对于有针对性的营销,预测某人购买诸如牛奶等物品的可能性,分类可以简单地是“是”或“否”。

多类分类

多类或多项分类任务有三个或更多类别。例如,要预测天气状况,您可能有五个类别:“多雨”、“多云”、“晴天”、“下雪”和“刮风”。为了扩展我们的目标营销示例,多类别分类可用于预测客户是否更有可能购买全脂牛奶、低脂牛奶、低脂牛奶或脱脂牛奶。

多标签分类

在多标签分类中,可以为每个观察值分配多个类别。相比之下,在多类别分类中,只能将一个类别分配给一个观察。使用我们的目标营销示例,多标签分类不仅用于预测客户是否更有可能购买牛奶,还用于预测其他商品,如饼干、黄油、热狗或面包。

分类和回归算法

多年来已经开发了各种分类和回归算法。它们在方法、特征、复杂性、易用性、训练性能和预测准确性方面各不相同。我在下面的文字中描述了最受欢迎的。我在第三章中详细介绍了它们。

支持向量机

支持向量机(SVM)是一种流行的算法,它通过寻找使两个类之间的间隔最大化的最佳超平面来工作,通过尽可能宽的间隙将数据点分成单独的类。最接近分类边界的数据点称为支持向量。

逻辑回归

逻辑回归是预测概率的线性分类器。它使用逻辑(sigmoid)函数将其输出转换为可以映射到两个(二元)类的概率值。通过多项式逻辑回归(softmax)支持多类分类。 xii 当您的数据有明确的决策边界时,线性分类器(如逻辑回归)是合适的。

奈伊夫拜厄斯

朴素贝叶斯是一种基于贝叶斯定理的简单多类线性分类算法。朴素贝叶斯之所以得名,是因为它天真地假设数据集中的要素是独立的,忽略了要素之间任何可能的相关性。现实世界的情况并非如此,朴素贝叶斯仍然表现良好,尤其是在小数据集或高维数据集上。像线性分类器一样,它在非线性分类问题上表现不佳。朴素贝叶斯是一种计算效率高且高度可伸缩的算法,只需要对数据集进行一次传递。对于使用大型数据集的分类任务,这是一个很好的基线模型。它的工作原理是在给定一组特征的情况下,找出一个点属于某个类的概率。

多层感知器

多层感知器是一个前馈人工网络,由几个完全连接的节点层组成。输入图层中的节点对应于输入数据集。中间层中的节点使用逻辑(sigmoid)函数,而最终输出层中的节点使用 softmax 函数来支持多类分类。输出层中的节点数量必须与类的数量相匹配。XIII

决策树

决策树通过学习从输入变量推断出的决策规则来预测输出变量的值。从视觉上看,决策树看起来就像一棵倒置的树,根节点在顶部。每个内部节点都代表对一个属性的测试。叶节点代表一个类标签,而单个分支代表一个测试的结果。决策树很容易解释。与像逻辑回归这样的线性模型相比,决策树不需要特征缩放。它能够处理缺失的特征,并处理连续和分类特征。 xiv 一次性编码分类特征 xv 在使用决策树和基于树的集成时不是必需的,事实上是不鼓励的。独热编码创建了不平衡的树,并且要求树生长得非常深以实现良好的预测性能。对于高基数分类特征来说尤其如此。不利的一面是,决策树对数据中的噪声很敏感,有过度拟合的倾向。由于这种限制,决策树本身很少在现实生产环境中使用。如今,决策树是更强大的集成算法的基础模型,如随机森林和梯度提升树。

随机森林

随机森林是一种集成算法,它使用一组决策树进行分类和回归。它使用一种叫做bagging(bootstrap aggregation)的方法来减少方差,同时保持低偏差。Bagging 从训练数据的子集训练单独的树。除了装袋,兰登森林还采用了另一种叫做的方法装袋。与 bagging(使用观测值的子集)相反,特征 bagging 使用特征(列)的子集。特征装袋旨在减少决策树之间的相关性。如果没有特征打包,单个树将会非常相似,尤其是在只有几个主要特征的情况下。对于分类,单个树的输出或模式的多数投票成为模型的最终预测。对于回归,单个树的输出的平均值成为最终输出(图 3-3 )。Spark 并行训练几棵树,因为每棵树都是在随机森林中独立训练的。

梯度提升树

梯度推进树(GBT)是另一种类似于随机森林的基于树的集成算法。gbt 使用一种称为 boosting to 的技术从弱学习者(浅树)中创建强学习者。GBTs 按顺序训练一组决策树 xvi ,每一棵后继的树减少前一棵的误差。这是通过使用前一个模型的残差来拟合下一个模型 xvii 来完成的。该残差校正过程 xviii 被执行设定的迭代次数,迭代次数由交叉验证确定,直到残差被完全最小化。

XGBoost

XGBoost(极限梯度提升)是目前可用的最好的梯度提升树实现之一。XGBoost 于 2014 年 3 月 27 日由陈天琦发布,作为一个研究项目,它已经成为分类和回归的主流机器学习算法。XGBoost 是使用梯度推进的一般原则设计的,将弱学习者组合成强学习者。但是,虽然梯度提升树是按顺序构建的——慢慢地从数据中学习,以在后续迭代中改进其预测,但 XGBoost 是并行构建树的。

XGBoost 通过其内置的正则化来控制模型复杂性和减少过拟合,从而产生更好的预测性能。为连续特征寻找最佳分割点时,XGBoost 使用近似算法来寻找分割点。 xix 近似分裂法使用离散箱来桶化连续特征,显著加快模型训练。XGBoost 包括另一种使用基于直方图的算法的树生长方法,该方法提供了一种将连续要素分入离散箱的更有效的方法。但是,虽然近似方法每次迭代都创建一组新的面元,但是基于直方图的方法在多次迭代中重复使用面元。这种方法允许使用近似方法无法实现的额外优化,例如缓存二进制文件以及父直方图和兄弟直方图相减 xx 的能力。为了优化排序操作,XGBoost 将排序后的数据存储在内存中的块单元中。排序块可以由并行 CPU 核心高效地分配和执行。XGBoost 可以通过其加权分位数草图算法有效地处理加权数据,可以有效地处理稀疏数据,支持缓存,并通过为大型数据集利用磁盘空间来支持核外计算,因此数据不必放在内存中。XGBoost 不是核心 Spark MLlib 库的一部分,但它作为一个外部包提供。

莱特格姆

多年来,XGBoost 一直是每个人最喜欢的分类和回归算法。最近,LightGBM 成为了王位的新挑战者。它是一个相对较新的基于树的梯度提升变体,类似于 XGBoost。LightGBM 于 2016 年 10 月 17 日发布,是微软分布式机器学习工具包(DMTK)项目的一部分。它被设计成快速和分布式的,导致更快的训练速度和更低的内存使用。它支持 GPU 和并行学习以及处理大型数据集的能力。

在几个基准测试和公共数据集上的实验中,LightGBM 显示出比 XGBoost 更快的速度和更好的准确性。它比 XGBoost 有几个优点。LightGBM 利用直方图将连续特征分成离散的箱。这为 LightGBM 提供了优于 XGBoost(默认情况下,XGBoost 使用基于预排序的算法进行树学习)的几个性能优势,例如减少了内存使用、减少了计算每次分割的增益的成本,以及减少了并行学习的通信成本。LightGBM 通过对其兄弟节点和父节点执行直方图减法来计算节点的直方图,从而实现了额外的性能提升。在线基准测试显示,在某些任务中,LightGBM 比 XGBoost(不含宁滨)快 11 到 15 倍。 xxi LightGBM 通过逐叶生长(最佳优先)的方式,一般在精度上优于 XGBoost。有两种主要的策略来训练决策树,级别方式和叶方式。对于大多数基于树的集成(包括 XGBoost),逐层树生长是生长决策树的传统方式。LightGBM 引入了逐叶增长策略。与水平方向生长相比,叶方向生长通常收敛更快 xxii 并实现更低的损失。 xxiii 和 XGBoost 一样,LightGBM 不是核心 Spark MLlib 库的一部分,但它作为一个外部包提供。LightGBM 的大部分特性最近都移植到了 XGBoost 上。我将在第三章详细讨论这两种算法。

回归

回归是一种用于预测连续数值的监督机器学习任务。举几个例子来说,流行的用例包括销售和需求预测、预测股票、房屋或商品价格以及天气预报。决策树、随机森林、梯度提升树、XGBoost 和 LightGBM 也可以用于回归。我将在第三章更详细地讨论回归。

线性回归

线性回归用于检查一个或多个自变量和因变量之间的线性关系。对单个自变量和单个连续因变量之间关系的分析称为简单线性回归。多元回归是简单线性回归的扩展,用于根据多个自变量预测因变量的值。

生存回归

生存回归,也称为死亡时间分析或失败时间分析,用于预测特定事件将要发生的时间。生存回归与线性回归的主要区别在于其处理删失的能力,删失是一种缺失数据问题,事件发生的时间未知。

无监督学习

无监督学习是一种机器学习任务,它在没有标记响应的帮助下发现数据集中隐藏的模式和结构。当您只能访问输入数据,而训练数据不可用或难以获得时,无监督学习是理想的选择。常见的方法包括聚类、主题建模、异常检测、推荐和主成分分析。

使聚集

聚类是一种无监督的机器学习任务,用于对具有某些相似性的未标记观察值进行分组。流行的聚类用例包括客户细分、欺诈分析和异常检测。在训练数据缺乏或不可用的情况下,聚类也经常用于为分类器生成训练数据。

k 均值

K-Means 是最流行的无监督聚类学习算法之一。K-Means 的工作原理是随机分配质心作为每个聚类的起点。该算法基于欧几里德距离迭代地将每个数据点分配到最近的质心。然后,它通过计算属于该聚类的所有点的平均值来计算每个聚类的新质心。当达到预定义的迭代次数,或者每个数据点都被分配到其最近的质心,并且不再有可以执行的重新分配时,该算法停止迭代。

主题建模

主题模型自动导出一组文档中的主题。这些主题可用于基于内容的推荐、文档分类、维度缩减和特征化。

潜在狄利克雷分配

潜在狄利克雷分配(LDA)是由戴维·布雷、吴恩达和迈克尔·乔丹在 2003 年提出的,尽管乔纳森·k·普里查德、马修·斯蒂芬斯和彼得·唐纳利在 2000 年也提出了一种用于群体遗传学的类似算法。应用于机器学习的 LDA 是基于图形模型的,并且是第一个包含基于 GraphX 构建的 Spark MLlib 的算法。潜在狄利克雷分配广泛用于主题建模。

异常检测

异常或异常值检测可识别出明显偏离大多数数据集的罕见观察值。它经常用于发现欺诈性金融交易、识别网络安全威胁或执行预测性维护,仅举几个使用案例。

隔离森林

隔离森林是一种基于树的集成算法,用于异常检测,该算法是由刘飞东尼、婷和开发的。 xxiv 与大多数异常检测技术不同,隔离森林试图显式检测实际异常值,而不是识别正常数据点。隔离林的运行基于这样一个事实,即数据集中通常存在少量异常值,因此易于进行隔离。 xxv 从正常数据点中分离异常值是有效的,因为它需要的条件较少。相比之下,隔离正常数据点通常会涉及更多条件。与其他基于树的集合类似,隔离林建立在一组称为隔离树的决策树上,每棵树都有整个数据集的一个子集。异常分数被计算为森林中树木的平均异常分数。异常分值来自分割数据点所需的条件数量。接近 1 的异常分数表示异常,而小于 0.5 的分数表示非异常观察。

单类支持向量机

支持向量机(SVM)通过使用最佳超平面,以尽可能宽的间隔将数据点划分为单独的类别,从而对数据进行分类。在一类 SVM 中,模型根据只有一个“正常”类的数据进行训练。与正常示例不同的数据点被视为异常。

降维

当数据集中有大量要素时,降维至关重要。例如,基因组学和工业分析领域的机器学习用例通常涉及数千甚至数百万个特征。高维数使得模型更加复杂,增加了过度拟合的机会。在某一点上添加更多的特征实际上会降低模型的性能。此外,对高维数据的训练需要大量的计算资源。这些被统称为维度的诅咒。降维技术旨在克服维数灾难。

主成分分析

主成分分析(PCA)是一种无监督的机器学习技术,用于降低特征空间的维度。它检测要素之间的相关性,并生成数量减少的线性不相关要素,同时保留原始数据集中的大部分方差。这些更紧凑、线性不相关的特征被称为主成分。主成分按其解释方差的降序排列。其他降维技术包括奇异值分解(SVD)和线性判别分析(LDA)。

推荐

提供个性化推荐是机器学习最流行的应用之一。几乎每个主要零售商,如亚马逊、阿里巴巴、沃尔玛和塔吉特,都根据顾客行为提供某种个性化推荐。网飞、Hulu 和 Spotify 等流媒体服务根据用户的口味和偏好提供电影或音乐推荐。协作过滤、基于内容的过滤和关联规则学习(用于购物篮分析)是构建推荐系统最流行的方法。Spark MLlib 支持用于协作过滤的交替最小二乘法(ALS ),以及用于购物篮分析的 FP-Growth 和 PrefixSpan。

半监督学习

在某些情况下,访问带标签的数据既费钱又费时。在标记响应稀少的情况下,半监督学习结合监督和非监督学习技术来进行预测。在半监督学习中,利用未标记数据来扩充标记数据以提高模型精度。

强化学习

强化学习试图通过试错来学习,以确定哪种行为提供了最大的回报。强化学习有三个组成部分:代理(决策者或学习者)、环境(代理与之交互的内容)和动作(代理可以执行的内容)。 xxvi 这种类型的学习经常用于游戏、导航和机器人。

深度学习

深度学习是机器学习和人工智能的一个子领域,使用深度、多层人工神经网络。它促成了人工智能领域最近的许多突破。虽然深度学习可以用于更平凡的分类任务,但当应用于更复杂的问题时,如医疗诊断、面部识别、自动驾驶汽车、欺诈分析和智能语音控制助理,它的真正力量会大放异彩。 xxvii 在某些领域,深度学习已经让计算机能够匹配甚至超越人类的能力。

神经网络

神经网络是一类算法,其操作类似于人脑的互连神经元。神经网络包含由互连节点组成的多层。通常有一个输入层、一个或多个隐藏层和一个输出层。数据经由输入层通过神经网络。隐藏层通过加权连接网络处理数据。隐藏层中的节点为输入分配权重,并将其与一组系数组合。数据通过一个节点的激活函数,该函数决定了层的输出。最后,数据到达输出层,输出层产生神经网络的最终输出。 xxviii 具有几个隐含层的神经网络称为“深度”神经网络。层次越多,网络就越深,通常网络越深,学习就变得越复杂,它能解决的问题也越复杂。

卷积神经网络

卷积神经网络(简称为 convnet 或 CNN)是一种特别擅长分析图像的神经网络(尽管它们也可以应用于音频和文本数据)。卷积神经网络层中的神经元以三维方式排列:高度、宽度和深度。CNN 使用卷积层来学习其输入特征空间(图像)中的局部模式,例如纹理和边缘。相反,全连接(密集)层学习全局模式。 xxix 卷积层中的神经元仅连接到其之前层的一小部分区域,而不是像密集层那样连接到所有神经元。密集层的完全连接结构会导致大量的参数,这是低效的,并可能很快导致过度拟合。 xxx 我在第七章更详细地介绍了深度学习和深度卷积神经网络。

特征工程

特征工程是转换数据以创建可用于训练机器学习模型的特征的过程。通常,原始数据需要通过多种数据准备和提取技术进行转换。例如,对于非常大的数据集,可能需要降维。可能需要从其他要素创建新要素。基于距离的算法需要特征缩放。当分类特征被一键编码时,一些算法执行得更好。文本数据通常需要标记化和特征矢量化。将原始数据转换为特征后,对其进行评估,并选择最具预测能力的特征。

特征工程是机器学习的一个重要方面。几乎在每一个机器学习的努力中,如果要成功,生成高度相关的特征是不言自明的。不幸的是,特征工程是一项复杂而耗时的任务,通常需要领域专业知识。这是一个反复的过程,包括集思广益的特征,创建它们并研究它们对模型准确性的影响。事实上,根据福布斯的一项调查,典型的数据科学家将大部分时间用于准备数据(图 1-3 )。XXXI

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 1-3

数据准备约占数据科学家工作的 80%

特征工程任务可以分为几类:特征选择、特征重要性、特征提取和特征构造。XXXII

特征选择

特征选择是识别重要特征和消除不相关或冗余特征的重要预处理步骤。它提高了预测性能和模型训练效率,并降低了维数。必须移除不相关的特征,因为它们会对模型的准确性产生负面影响,并降低模型训练的速度。某些特征可能没有任何预测能力,或者它们可能与其他特征是冗余的。但是我们如何确定这些特征是否相关呢?领域知识至关重要。例如,如果您正在建立一个模型来预测贷款违约的概率,它有助于了解在量化信用风险时需要考虑哪些因素。你可以从借款人的债务收入比开始。还有其他借款特定的因素要考虑,如借款人的信用评分,就业时间,就业职称和婚姻状况。经济增长等整个市场的考虑也可能很重要。人口统计学和心理学信息也应该被考虑。一旦你有了一个特性列表,有几种方法可以客观地确定它们的重要性。有多种特征选择方法可帮助您为模型选择正确的特征。XXXIII

过滤方法

过滤方法使用统计技术(如卡方检验、相关系数和信息增益)为每个特征分配等级。

包装方法

包装方法使用特征的子集来训练模型。然后,您可以根据模型的性能添加或删除特征。包装器方法的常见示例有递归特征消除、向后消除和向前选择。

嵌入式方法

嵌入式方法结合了过滤器和包装器方法使用的技术。流行的例子包括套索和岭回归、正则化树和随机多项式 logit。XXXIV

特征重要性

基于树的集成(如 Random Forest、XGBoost 和 LightGBM)提供了一种基于为每个特征计算的特征重要性分数的特征选择方法。分数越高,该特性对提高模型准确性越重要。随机森林中的特征重要性也称为基于基尼系数的重要性或杂质平均减少(MDI)。随机森林的一些实现利用不同的方法来计算特征重要性,这种方法被称为基于精度的重要性或平均精度下降(MDA)。 xxxv 基于准确度的重要性是基于特征被随机置换时预测准确度的降低来计算的。我将在第三章中详细讨论随机森林、XGBoost 和 LightGBM 中的特性重要性。

相关系数是特征选择方法的基本形式。相关系数代表两个变量之间线性关系的强度。对于线性问题,您可以使用相关性来选择相关要素(要素类相关性)和识别冗余要素(要素内相关性)。

特征抽出

当数据集中有大量要素时,要素提取至关重要。例如,基因组学和工业分析领域的机器学习用例通常涉及数千甚至数百万个特征。高维数使得模型更加复杂,增加了过度拟合的机会。此外,对高维数据的训练需要大量的计算资源。特征提取通常涉及使用降维技术。主成分分析(PCA)、线性判别分析(LDA)和奇异值分解(SVD)是一些最流行的降维算法,也用于特征提取。

特征构造

为了提高模型的准确性,有时需要根据现有要素构建新要素。有几种方法可以做到这一点。您可以组合或聚合要素。在某些情况下,您可能想要拆分它们。例如,您的模型可能受益于将大多数事务数据中常见的时间戳属性拆分成几个更细粒度的属性:秒、分钟、小时、日、月和年。然后,您可能希望使用这些属性来构造更多的特性,比如 dayofweek、weekofmonth、monthofyear 等等。特征构造既是艺术,又是科学,是特征工程中最困难和最耗时的方面之一。精通要素构造通常是经验丰富的数据科学家与新手之间的区别。

模型评估

在分类中,每个数据点都有一个已知的标签和一个模型生成的预测类。通过比较每个数据点的已知标注和预测类别,结果可分为四类:预测类别为正且标注为正的真阳性(TP );预测类别为负且标注为负的真阴性(TN );预测类别为正但标注为负的假阳性(FP );预测类别为负且标注为正的假阴性(FN)。这四个值构成了大多数分类任务评估指标的基础。它们经常出现在一个叫做混淆矩阵的表格中(表格 1-1 )。

表 1-1

混淆矩阵

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

准确

准确度是分类模型的评估标准。它被定义为正确预测数除以预测总数。

)

在数据集不平衡的情况下,精确度不是理想的衡量标准。为了举例说明,考虑具有 90 个阴性和 10 个阳性样本的假设分类任务;将所有分类为负面给出 0.90 的准确度分数。精度和召回率是评估用类不平衡数据训练的模型的更好的度量。

精确

精确度被定义为真阳性的数量除以真阳性的数量加上假阳性的数量。精度显示了当模型的预测为正时,模型正确的频率。例如,如果您的模型预测了 100 个癌症发生,但其中 10 个是不正确的预测,则您的模型的精确度为 90%。在误报成本很高的情况下,精确度是一个很好的度量标准。

)

回忆

召回是一个很好的衡量标准,可以用在假阴性成本很高的情况下。召回被定义为真阳性的数量除以真阳性的数量加上假阴性的数量。

)

F1 度量

F1 测量值或 F1 分数是精确度和召回率的调和平均值或加权平均值。这是评估多类分类器的一个常见性能指标。当存在不均匀的阶级分布时,这也是一个很好的衡量标准。F1 成绩最好的是 1,最差的是 0。一个好的 F1 测量意味着你有很低的假阴性和假阳性。F1 测度定义如下:

)

受试者工作特性下的面积(AUROC)

接受者操作特征下的面积(AUROC)是用于评估二元分类器的常见性能度量。受试者工作特性(ROC)是绘制真阳性率与假阳性率的图表。曲线下面积(AUC)是 ROC 曲线下的面积。AUC 可以解释为模型对随机正例的排序高于随机负例的概率。 xxxvi 曲线下面积越大(AUROC 越接近 1.0),模型表现越好。AUROC 为 0.5 的模型是无用的,因为它的预测准确性与随机猜测一样好。

过度拟合和欠拟合

模型的糟糕表现是由过度拟合或拟合不足造成的。过度拟合是指模型过于拟合训练数据。过度拟合的模型对训练数据表现良好,但对新的、看不见的数据表现不佳。过度拟合的反义词是拟合不足。对于欠拟合,模型过于简单,并且没有学习训练数据集中的相关模式,这是因为模型过于规则或者需要训练更长时间。模型很好地适应新的、看不见的数据的能力被称为泛化。这是每个模型调整练习的目标。防止过度拟合的几种既定方法包括使用更多数据或特征子集、交叉验证、放弃、修剪、提前停止和正则化。 xxxvii 对于深度学习来说,数据增强是一种常见的正则化形式。为了减少欠拟合,建议添加更多相关特征。对于深度学习,可以考虑在一层中增加更多的节点,或者在神经网络中增加更多的层,以增加模型的容量。xxxviii

型号选择

模型选择包括评估拟合的机器学习模型,并通过尝试用用户指定的超参数组合拟合底层估计器来输出最佳模型。使用 Spark MLlib,通过 CrossValidator 和 TrainValidationSplit 估计器执行模型选择。CrossValidator 为超参数调整和模型选择执行 k 重交叉验证和网格搜索。它将数据集分成一组随机的、不重叠的分区褶皱,用作训练和测试数据集。例如,如果 k=3 折,k 折交叉验证将生成 3 个训练和测试数据集对(每个折仅用作测试数据集一次),每个数据集对使用 2/3 用于训练数据,1/3 用于测试。XXXIXTrainValidationSplit 是超参数调优的另一个估计器。与 k-fold 交叉验证(这是一种昂贵的操作)相比,TrainValidationSplit 只对每个参数组合评估一次,而不是 k 次。

摘要

这一章是机器学习的快速介绍。为了更彻底的治疗,我建议统计学习的元素,第二版。、特雷弗·哈斯蒂等人的(施普林格,2016 年)和加雷斯·詹姆斯等人的统计学习导论(施普林格,2013 年)。关于深度学习的介绍,我推荐伊恩·古德菲勒等人的深度学习(麻省理工出版社,2016)。虽然机器学习已经存在很长时间了,但使用大数据来训练机器学习模型是最近才发展起来的。作为最受欢迎的大数据框架,Spark 的独特定位是成为构建大规模企业级机器学习应用的卓越平台。让我们在下一章深入探讨 Spark 和 Spark MLlib。

参考

  1. raffi khatchadourian《DOOMSDAY 发明》,纽约人。com ,2015 年, www.newyorker.com/magazine/2015/11/23/doomsday-invention-artificial-intelligence-nick-bostrom
  2. 约翰·麦卡锡;“什么是人工智能?”,stanford.edu,2007, www-formal.stanford.edu/jmc/whatisai/node1.html
  3. 克里斯·尼科尔森;《人工智能(AI) vs 机器学习 vs 深度学习》,skimind.ai,2019, https://skymind.ai/wiki/ai-vs-machine-learning-vs-deep-learning
  4. 玛尔塔·加内洛和默里·沙纳汉;《用符号化人工智能调和深度学习:表示对象和关系》,sciencedirect.com,2019, www.sciencedirect.com/science/article/pii/S2352154618301943
  5. 克里斯·尼科尔森;“符号推理(Symbolic AI)和机器学习”;skymind.ai,2019, https://skymind.ai/wiki/symbolic-reasoning
  6. 迈克尔·科普兰;“人工智能、机器学习、深度学习有什么区别?”,nvidia.com,2016, https://blogs.nvidia.com/blog/2016/07/29/whats-difference-artificial-intelligence-machine-learning-deep-learning-ai/
  7. 伊恩·麦肯齐等人;“零售商如何才能跟上消费者”,mckinsey.com,2013, www.mckinsey.com/industries/retail/our-insights/how-retailers-can-keep-up-with-consumers
  8. CB 洞察;“从药物 R&D 到诊断学:医疗保健领域的 90+人工智能创业公司”,cbinsights.com,2019, www.cbinsights.com/research/artificial-intelligence-startups-healthcare/
  9. Ragothaman Yennamali《机器学习在生物学中的应用》,kolabtree.com,2019, www.kolabtree.com/blog/applications-of-machine-learning-in-biology/
  10. 泽维尔·阿马特里安;《在机器学习中,什么更好:更多的数据还是更好的算法》,kdnuggets.com,2015, www.kdnuggets.com/2015/06/machine-learning-more-data-better-algorithms.html
  11. 穆罕默德·古勒;“借助 Spark 进行大数据分析”,2015 年出版
  12. 阿帕奇 Spark《多项逻辑回归》,spark.apache.org,2019, https://spark.apache.org/docs/latest/ml-classification-regression.html#multinomial-logistic-regression
  13. 阿帕奇 Spark《多层感知器分类器》,spark.apache.org,2019, https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
  14. 分析 Vidhya 内容团队;《从零开始的基于树的建模完整教程(用 R & Python 编写)》,AnalyticsVidhya.com,2016, www.analyticsvidhya.com/blog/2016/04/complete-tutorial-tree-based-modeling-scratch-in-python/#one
  15. LightGBM《分类特征的最优分割》,lightgbm.readthedocs.io,2019, https://lightgbm.readthedocs.io/en/latest/Features.html
  16. 约瑟夫·布拉德利和马尼什·阿姆德;《MLlib 中的随机森林与助推》,Databricks,2015, https://databricks.com/blog/2015/01/21/random-forests-and-boosting-in-mllib.html
  17. 分析 Vidhya 内容团队;《理解 XGBoost 背后数学的端到端指南》,analyticsvidhya.com,2018, www.analyticsvidhya.com/blog/2018/09/an-end-to-end-guide-to-understand-the-math-behind-xgboost/
  18. 本·戈尔曼;“一位 Kaggle 大师解释渐变增强,”Kaggle.com,2017, http://blog.kaggle.com/2017/01/23/a-kaggle-master-explains-gradient-boosting/
  19. 莉娜·肖;《XGBoost:简明技术概述》,KDNuggets,2017, www.kdnuggets.com/2017/10/xgboost-concise-technical-overview.html
  20. Philip Hyunsu Cho“快速直方图优化生长器,8 到 10 倍加速”,DMLC,2017, https://github.com/dmlc/xgboost/issues/1950
  21. Laurae"基准测试 light GBM:light GBM 与 xgboost 相比有多快?",medium.com,2017 年
[`https://medium.com/implodinggradients/benchmarking-lightgbm-how-fast-is-lightgbm-vs-xgboost-15d224568031`](https://medium.com/implodinggradients/benchmarking-lightgbm-how-fast-is-lightgbm-vs-xgboost-15d224568031)
  1. LightGBM《在速度和内存使用上的优化》,lightgbm.readthedocs.io,2019, https://lightgbm.readthedocs.io/en/latest/Features.html
  2. 大卫·马克思;“决策树:逐叶(最佳优先)和逐级树遍历”,stackexchange.com,2018, https://datascience.stackexchange.com/questions/26699/decision-trees-leaf-wise-best-first-and-level-wise-tree-traverse
  3. 费托尼刘,婷,-周华;《隔离森林》,acm.org,2008, https://dl.acm.org/citation.cfm?id=1511387
  4. 亚历杭德罗·科雷亚·巴恩森;“利用隔离森林进行异常检测的好处”,easysol.net,2016, https://blog.easysol.net/using-isolation-forests-anamoly-detection/
  5. SAS《机器学习》,sas.com,2019, www.sas.com/en_us/insights/analytics/machine-learning.html
  6. 英伟达;《深度学习》,developer.nvidia.com,2019, https://developer.nvidia.com/deep-learning
  7. SAS《神经网络如何工作》,sas.com,2019, www.sas.com/en_us/insights/analytics/neural-networks.html
  8. 弗朗索瓦·乔莱;“计算机视觉的深度学习”,2018,用 Python 进行深度学习
  9. 安德烈·卡帕西;《卷积神经网络(CNN/conv nets)》,github.io,2019, http://cs231n.github.io/convolutional-networks/
  10. 吉尔出版社;“清洗大数据:最耗时、最不愉快的数据科学任务”,调查称,“forbes.com,2016, www.forbes.com/sites/gilpress/2016/03/23/data-preparation-most-time-consuming-least-enjoyable-data-science-task-survey-says/#680347536f63
  11. 杰森·布朗利;“发现特征工程,如何工程化特征,如何擅长”,machinelearningmastery.com,2014, https://machinelearningmastery.com/discover-feature-engineering-how-to-engineer-features-and-how-to-get-good-at-it/
  12. 杰森·布朗利;《特征选择导论》,MachineLearningMastery.com,2014, https://machinelearningmastery.com/an-introduction-to-feature-selection/
  13. 绍拉夫·考什克;《特征选择方法介绍及实例》,Analyticsvidhya.com,2016, www.analyticsvidhya.com/blog/2016/12/introduction-to-feature-selection-methods-with-an-example-or-how-to-select-the-right-variables/
  14. 杰克·霍尔;《随机森林的可变重要性是如何计算的》,DisplayR,2018, www.displayr.com/how-is-variable-importance-calculated-for-a-random-forest/
  15. 谷歌;“分类:ROC 曲线和 AUC”,developers.google.com,2019, https://developers.google.com/machine-learning/crash-course/classification/roc-and-auc
  16. 韦恩·汤普森;《机器学习最佳实践:理解泛化》,blogs.sas.com,2017, https://blogs.sas.com/content/subconsciousmusings/2017/09/05/machine-learning-best-practices-understanding-generalization/
  17. 杰森·布朗利;《深度学习神经网络如何避免过拟合》,machinelearningmaster.com,2018, https://machinelearningmastery.com/introduction-to-regularization-to-reduce-overfitting-and-improve-generalization-error/
  18. 火花;《CrossValidator》,spark.apache.org,2019, https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.tuning.CrossValidator

二、Spark 和 Spark MLlib 简介

简单的模型和大量的数据胜过基于较少数据的更复杂的模型。

—彼得·诺维格 我

Spark 是一个统一的大数据处理框架,用于处理和分析大型数据集。Spark 提供 Scala、Python、Java 和 R 的高级 API,具有强大的库,包括用于机器学习的 MLlib、用于 SQL 支持的 Spark SQL、用于实时流的 Spark Streaming 和用于图形处理的 GraphX。 ii Spark 由马泰·扎哈里亚(Matei Zaharia)在加州大学伯克利分校的 AMPLab 创立,后捐赠给阿帕奇软件基金会,于 2014 年 2 月 24 日成为顶级项目。 iii 第一版于 2017 年 5 月 30 日发布。 iv

概观

Spark 的开发是为了解决 Hadoop 最初的数据处理框架 MapReduce 的局限性。Matei Zaharia 在加州大学伯克利分校和脸书分校(他在那里实习)看到了 MapReduce 的局限性,并试图创建一个更快、更通用、多用途的数据处理框架,以处理迭代和交互式应用程序。 v 它提供了一个统一的平台(图 2-1 ),支持流式、交互式、图形处理、机器学习、批处理等多种类型的工作负载。 vi Spark 作业的运行速度比同等的 MapReduce 作业快好几倍,这是因为它具有快速的内存功能和高级 DAG(定向非循环图)执行引擎。Spark 是用 Scala 编写的,因此它是 Spark 事实上的编程接口。我们将在整本书中使用 Scala。我们将在第七章中使用 PySpark,这是用于 Spark 的 Python API,用于分布式深度学习。本章是我上一本书下一代大数据(2018 年出版)中第五章的更新版本。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-1

Apache Spark 生态系统

集群管理器

集群管理器管理和分配集群资源。Spark 支持 Spark(独立调度程序)、YARN、Mesos 和 Kubernetes 附带的独立集群管理器。

体系结构

在高层次上,Spark 将 Spark 应用程序任务的执行分布在集群节点上(图 2-2 )。每个 Spark 应用程序在其驱动程序中都有一个 SparkContext 对象。SparkContext 表示到集群管理器的连接,集群管理器为 Spark 应用程序提供计算资源。在连接到集群之后,Spark 在您的 worker 节点上获取执行器。然后 Spark 将您的应用程序代码发送给执行器。一个应用程序通常会运行一个或多个作业来响应一个 Spark 动作。然后,Spark 将每个作业划分为更小的阶段或任务的有向无环图(DAG)。然后,每个任务被分发并发送给工作节点上的执行器来执行。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-2

Apache Spark 架构

每个 Spark 应用程序都有自己的一组执行器。因为来自不同应用程序的任务在不同的 JVM 中运行,所以一个 Spark 应用程序不会干扰另一个 Spark 应用程序。这也意味着,如果不使用外部数据源,比如 HDFS 或 S3,Spark 应用程序很难共享数据。使用 Tachyon(又名 Alluxio)等堆外内存存储可以使数据共享更快更容易。我将在这一章的后面更详细地讨论 Alluxio。

执行 Spark 应用程序

您可以使用交互式 shell (spark-shell 或 pyspark)或提交应用程序(spark-submit)来执行 spark 应用程序。一些人更喜欢使用基于网络的交互式笔记本,如 Apache Zeppelin 和 Jupyter,来与 Spark 进行交互。Databricks 和 Cloudera 等商业供应商也提供了他们自己的交互式笔记本环境。我将在整章中使用火花壳。在带有集群管理器(如 YARN)的环境中,启动 Spark 应用程序有两种部署模式。

集群模式

在集群模式下,驱动程序运行在由 YARN 管理的主应用程序中。客户端可以退出而不影响应用程序的执行。以集群模式启动应用程序或 spark-shell:

spark-shell --master yarn --deploy-mode cluster

spark-submit --classmypath.myClass --master yarn --deploy-mode cluster

客户端模式

在客户端模式下,驱动程序在客户端运行。应用程序主机仅用于向 YARN 请求资源。要在客户端模式下启动应用程序或 spark-shell:

spark-shell --master yarn --deploy-mode client

spark-submit --classmypath.myClass --master yarn --deploy-mode client

火花壳简介

您通常使用交互式 shell 进行特定的数据分析或探索。也是学习 Spark API 的好工具。Spark 的交互 shell 有 Spark 或者 Python 两种版本。在下面的示例中,我们将创建一个城市 RDD,并将它们全部转换为大写字母。当您启动 spark-shell 时,会自动创建一个名为“spark”的 SparkSession,如清单 2-1 所示。

spark-shell

Spark context Web UI available at http://10.0.2.15:4041
Spark context available as'sc'(master = local[*], app id= local-1574144576837).
Spark session available as'spark'.
Welcome to
      ____              __
     / __/__  ___ _____//__
    _\ \/ _ \/ _ `/ __/  '_//___/.__/\_,_/_//_/\_\   version 2.4.4/_/

Using Scala version 2.11.12(OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :helpfor more information.

scala>val myCities = sc.parallelize(List("tokyo","new york","sydney","san francisco"))

scala>val uCities = myCities.map{x =>x.toUpperCase}

scala>uCities.collect.foreach(println)
TOKYO
NEW YORK
SYDNEY
SAN FRANCISCO

Listing 2-1Introduction to spark-shell

火花会议

如图 2-2 所示,SparkContext 支持访问所有 Spark 特性和功能。驱动程序使用 SparkContext 来访问其他上下文,如 StreamingContext、SQLContext 和 HiveContext。从 Spark 2.0 开始,SparkSession 提供了与 Spark 交互的单一入口点。Spark 1.x 中通过 SparkContext、SQLContext、HiveContext 和 StreamingContext 提供的所有功能现在都可以通过 SparkSession 访问。 vii 你可能仍然会遇到用 Spark 1.x 编写的代码。

val sparkConf = new SparkConf().setAppName("MyApp").setMaster("local")

val sc = new SparkContext(sparkConf).set("spark.executor.cores","4")

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

在 Spark 2.x 中,您不必显式创建 SparkConf、SparkContext 或 SQLContext,因为它们的所有功能都已经包含在 SparkSession 中。

val spark = SparkSession.
builder().
appName("MyApp").
config("spark.executor.cores","4").
getOrCreate()

弹性分布式数据集(RDD)

RDD 是一个有弹性的不可变分布式对象集合,跨集群中的一个或多个节点进行分区。rdd 可以通过两种类型的操作并行处理和操作:转换和操作。

Note

RDD 是 Spark 1.x 中 Spark 的主要编程接口。从 Spark 2.0 开始,数据集已经取代 RDD 成为主要的 API。由于更丰富的编程界面和更好的性能,建议用户从 RDD 切换到数据集/数据框架。我将在本章后面讨论数据集和数据帧。

创建 RDD

创建 RDD 非常简单。你可以从现有的 Scala 集合中创建一个 RDD,或者从存储在 HDFS 或 S3 的外部文件中读取。

平行放置

并行化从 Scala 集合创建一个 RDD。

val data =(1 to 5).toList
val rdd = sc.parallelize(data)
val cities = sc.parallelize(List("tokyo","new york","sydney","san francisco"))
文本文件

文本文件从储存在 HDFS 或 S3 的文本文件创建 RDD。

val rdd = sc.textFile("hdfs://master01:9000/files/mydirectory")

val rdd = sc.textFile("s3a://mybucket/files/mydata.csv")

请注意,RDD 是不可变的。数据转换会产生另一个 RDD,而不是修改当前的 RDD。RDD 操作可以分为两类:转换和行动。

转换

转换是创建新 RDD 的操作。我描述了一些最常见的转换。有关完整的列表,请参考在线 Spark 文档。

地图

Map 对 RDD 中的每个元素执行一个函数。它创建并返回结果的新 RDD。地图的返回类型不必与原始 RDD 的类型相同。

val cities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
val upperCaseCities = myCities.map{x =>x.toUpperCase}
upperCaseCities.collect.foreach(println)
TOKYO
NEW YORK
PARIS
SAN FRANCISCO

让我们展示另一个地图的例子。

val lines = sc.parallelize(List("Michael Jordan","iPhone"))

val words = lines.map(line =>line.split(" "))

words.collect

res2: Array[Array[String]]= Array(Array(Michael, Jordan), Array(iPhone))
平面地图

FlatMap 对 RDD 中的每个元素执行一个函数,然后对结果进行拼合。

val lines = sc.parallelize(List("Michael Jordan","iPhone"))

val words = lines.flatMap(line =>line.split(" "))

words.collect

res3: Array[String]= Array(Michael, Jordan, iPhone)
过滤器

Filter 返回一个仅包含与指定条件匹配的元素的 RDD。

val lines = sc.parallelize(List("Michael Jordan","iPhone","Michael Corleone"))

val words = lines.map(line =>line.split(" "))

val results = words.filter(w =>w.contains("Michael"))

results.collect

res9: Array[Array[String]]= Array(Array(Michael, Jordan), Array(Michael, Corleone))
明显的

Distinct 只返回不同的值。

val cities1 = sc.parallelize(List("tokyo","tokyo","paris","sydney"))

val cities2 = sc.parallelize(List("perth","tokyo","canberra","sydney"))

val cities3 = cities1.union(cities2)

cities3.distinct.collect.foreach(println)

sydney
perth
canberra
tokyo
paris
ReduceByKey

ReduceByKey 使用指定的 reduce 函数将值与同一个键组合在一起。

val pairRDD = sc.parallelize(List(("a",1),("b",2),("c",3),("a",30),("b",25),("a",20)))
val sumRDD = pairRDD.reduceByKey((x,y)=>x+y)
sumRDD.collect
res15: Array[(String, Int)]= Array((b,27),(a,51),(c,3))

Keys 返回只包含键的 RDD。

val rdd = sc.parallelize(List(("a","Larry"),("b","Curly"),("c","Moe")))

val keys = rdd.keys

keys.collect.foreach(println)

a
b
c
价值观念

值返回仅包含值的 RDD。

val rdd = sc.parallelize(List(("a","Larry"),("b","Curly"),("c","Moe")))

val value = rdd.values

value.collect.foreach(println)

Larry
Curly
Moe
内部连接

内部连接基于连接谓词返回两个 RDD 中所有元素的 RDD。

val data = Array((100,"Jim Hernandez"),(101,"Shane King"))
val employees = sc.parallelize(data)

val data2 = Array((100,"Glendale"),(101,"Burbank"))
val cities = sc.parallelize(data2)

val data3 = Array((100,"CA"),(101,"CA"),(102,"NY"))
val states = sc.parallelize(data3)

val record = employees.join(cities).join(states)

record.collect.foreach(println)(100,((Jim Hernandez,Glendale),CA))(101,((Shane King,Burbank),CA))
RightOuterJoin 和 LeftOuterJoin

RightOuterJoin 返回右 RDD 中元素的 RDD,即使左 RDD 中没有匹配的行。LeftOuterJoin 等效于列顺序不同的 RightOuterJoin。

val record = employees.join(cities).rightOuterJoin(states)

record.collect.foreach(println)(100,(Some((Jim Hernandez,Glendale)),CA))(102,(None,NY))(101,(Some((Shane King,Burbank)),CA))
联盟

Union 返回包含两个或更多 RDD 组合的 RDD。

val  data = Array((103,"Mark Choi","Torrance","CA"),(104,"Janet Reyes","RollingHills","CA"))
val employees = sc.parallelize(data)
val  data = Array((105,"Lester Cruz","VanNuys","CA"),(106,"John White","Inglewood","CA"))
val employees2 = sc.parallelize(data)
val rdd = sc.union([employees, employees2])
rdd.collect.foreach(println)(103,MarkChoi,Torrance,CA)(104,JanetReyes,RollingHills,CA)(105,LesterCruz,VanNuys,CA)(106,JohnWhite,Inglewood,CA)
减去

Subtract 返回仅包含第一个 RDD 中的元素的 RDD。

val data = Array((103,"Mark Choi","Torrance","CA"),(104,"Janet Reyes","Rolling Hills","CA"),(105,"Lester Cruz","Van Nuys","CA"))

val rdd = sc.parallelize(data)

val data2 = Array((103,"Mark Choi","Torrance","CA"))
val rdd2 = sc.parallelize(data2)

val employees = rdd.subtract(rdd2)

employees.collect.foreach(println)(105,LesterCruz,Van Nuys,CA)(104,JanetReyes,Rolling Hills,CA)
联合

联合减少了 RDD 中的分区数量。在大型 RDD 上执行过滤后,您可能需要使用合并。虽然过滤减少了新 RDD 消耗的数据量,但它继承了原始 RDD 的分区数量。如果新的 RDD 比原来的 RDD 小得多,它可能会有成百上千个小分区,这可能会导致性能问题。

当您想减少 Spark 在写入 HDFS 时生成的文件数量,防止可怕的“小文件”问题时,Coalesce 也很有用。每个分区作为单独的文件写入 HDFS。请注意,使用 coalesce 时,您可能会遇到性能问题,因为在写入 HDFS 时,您会有效地降低并行度。如果发生这种情况,请尝试增加分区的数量。在下面的例子中,我们只将一个拼花文件写入 HDFS。

df.coalesce(1).write.mode("append").parquet("/user/hive/warehouse/Mytable")
再分

重新分区可以减少或增加 RDD 中的分区数量。减少分区时通常会使用联合,因为它比重新分区更有效。增加分区数量有助于提高写入 HDFS 时的并行度。在下面的例子中,我们向 HDFS 写了六个拼花文件。

df.repartition(6).write.mode("append").parquet("/user/hive/warehouse/Mytable")

Note

合并通常比重新分区快。重新分区将执行完全洗牌,创建新分区并在工作节点之间平均分配数据。通过使用现有分区,联合最大限度地减少了数据移动并避免了完全洗牌。

行动

动作是向驱动程序返回值的 RDD 操作。我列出了一些最常见的动作。请参考在线 Spark 文档,了解完整的操作列表。

收集

Collect 将整个数据集作为数组返回给驱动程序。

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
myCities.collect
res2: Array[String]= Array(tokyo, new york, paris, san francisco)
数数

Count 返回数据集中元素的数量。

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
myCities.count
res3: Long =4

Take 将数据集的前 n 个元素作为数组返回。

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))
myCities.take(2)
res4: Array[String]= Array(tokyo, new york)
为每一个

Foreach 对数据集的每个元素执行一个函数。

val myCities = sc.parallelize(List("tokyo","new york","paris","san francisco"))

myCities.collect.foreach(println)

tokyo
newyork
paris
sanFrancisco
懒惰评估

Spark 支持惰性求值,这对于大数据处理至关重要。Spark 中的所有转换都是延迟计算的。Spark 不会立即执行转换。您可以继续定义更多的转换。当您最终想要最终结果时,您执行一个动作,这将导致转换被执行。

贮藏

默认情况下,每次运行操作时,都会重新执行每个转换。您可以使用 cache 或 persist 方法在内存中缓存 RDD,以避免多次重复执行转换。

蓄电池

累加器是只被“添加”的变量。它们通常用于实现计数器。在示例中,我使用累加器将数组的元素相加:

val accum = sc.longAccumulator("Accumulator 01")

sc.parallelize(Array(10,20,30,40)).foreach(x =>accum.add(x))

accum.value
res2: Long =100
广播变量

广播变量是存储在每个节点内存中的只读变量。Spark 使用高速广播算法来减少复制广播变量的网络延迟。使用广播变量在每个节点上存储数据集的副本是一种更快的方法,而不是将数据存储在 HDFS 或 S3 这样的慢速存储引擎中。

val broadcastVar = sc.broadcast(Array(10,20,30))

broadcastVar.value
res0: Array[Int]= Array(10,20,30)

Spark SQL、数据集和数据框架 API

开发 Spark SQL 是为了使处理和分析结构化数据变得更加容易。数据集类似于 RDD,因为它支持强类型,但在后台数据集有一个更有效的引擎。从 Spark 2.0 开始,数据集 API 现在是主要的编程接口。DataFrame 只是一个带有命名列的数据集,类似于关系表。Spark SQL 和 DataFrames 一起为处理和分析结构化数据提供了强大的编程接口。这里有一个关于如何使用 DataFrames API 的简单例子。

val jsonDF = spark.read.json("/jsondata/customers.json")

jsonDF.show
+---+------+--------------+-----+------+-----+|age|  city|          name|state|userid|zip|+---+------+--------------+-----+------+-----+|35|Frisco| Jonathan West|   TX|200|75034||28|Dallas|Andrea Foreman|   TX|201|75001||69| Plano|  Kirsten Jung|   TX|202|75025||52| Allen|Jessica Nguyen|   TX|203|75002|+---+------+--------------+-----+------+-----+

jsonDF.select ("age","city").show

+---+------+|age|  city|+---+------+|35|Frisco||28|Dallas||69| Plano||52| Allen|+---+------+

jsonDF.filter($"userid"<202).show()+---+------+--------------+-----+------+-----+|age|  city|          name|state|userid|zip|+---+------+--------------+-----+------+-----+|35|Frisco| Jonathan West|   TX|200|75034||28|Dallas|Andrea Foreman|   TX|201|75001|+---+------+--------------+-----+------+-----+

jsonDF.createOrReplaceTempView("jsonDF")

val df = spark.sql("SELECT userid, zip FROM jsonDF")

df.show
+------+-----+|userid|zip|+------+-----+|200|75034||201|75001||202|75025||203|75002|+------+-----+

Note

Spark 2.0 中统一了数据帧和数据集 API。DataFrame 现在只是行数据集的类型别名,其中行是通用的非类型化对象。相比之下,Dataset 是强类型对象 Dataset[T]的集合。Scala 支持强类型和非类型 API,而在 Java 中,Dataset[T]是主要的抽象。DataFrames 是 R 和 Python 的主要编程接口,因为它缺乏对编译时类型安全的支持。

Spark 数据源

读写不同的文件格式和数据源是最常见的数据处理任务之一。在我们的示例中,我们将同时使用 RDD 和 DataFrames API。

战斗支援车

Spark 为您提供了从 CSV 文件中读取数据的不同方法。您可以先将数据读入 RDD,然后将其转换为 DataFrame。

val dataRDD = sc.textFile("/sparkdata/customerdata.csv")
val parsedRDD = dataRDD.map{_.split(",")}caseclassCustomerData(customerid: Int, name: String, city: String, state: String,zip: String)
val dataDF = parsedRDD.map{ a =>CustomerData (a(0).toInt, a(1).toString, a(2).toString,a(3).toString,a(4).toString)}.toDF

从 Spark 2.0 开始,CSV 连接器已经内置。

val dataDF = spark.read.format("csv").option("header","true").load("/sparkdata/customerdata.csv")

可扩展置标语言

Databricks 有一个 Spark XML 包,可以轻松读取 XML 数据。

cat users.xml

<userid>100</userid><name>Wendell Ryan</name><city>San Diego</city><state>CA</state><zip>92102</zip><userid>101</userid><name>Alicia Thompson</name><city>Berkeley</city><state>CA</state><zip>94705</zip><userid>102</userid><name>Felipe Drummond</name><city>Palo Alto</city><state>CA</state><zip>94301</zip><userid>103</userid><name>Teresa Levine</name><city>Walnut Creek</city><state>CA</state><zip>94507</zip>

hadoop fs -mkdir /xmldata
hadoop fs -put users.xml /xmldata

spark-shell --packages  com.databricks:spark-xml_2.10:0.4.1

使用 Spark XML 创建一个数据框架。在本例中,我们指定了行标记和 XML 文件所在的 HDFS 路径。

import com.databricks.spark.xml._

val xmlDF = spark.read
            .option("rowTag","user").xml("/xmldata/users.xml");

xmlDF: org.apache.spark.sql.DataFrame =[city: string, name: string, state: string, userid: bigint,zip: bigint]

我们也来看看数据。

xmlDF.show

+------------+---------------+-----+------+-----+|        city|           name|state|userid|zip|+------------+---------------+-----+------+-----+|   San Diego|   Wendell Ryan|   CA|100|92102||    Berkeley|Alicia Thompson|   CA|101|94705||   Palo Alto|Felipe Drummond|   CA|102|94301||Walnut Creek|  Teresa Levine|   CA|103|94507|+------------+---------------+-----+------+-----+

数据

我们将创建一个 JSON 文件作为这个例子的样本数据。确保该文件位于 HDFS 名为/jsondata 的文件夹中。

cat users.json

{"userid":200,"name":"Jonathan West","city":"Frisco","state":"TX","zip":"75034","age":35}{"userid":201,"name":"Andrea Foreman","city":"Dallas","state":"TX","zip":"75001","age":28}{"userid":202,"name":"Kirsten Jung","city":"Plano","state":"TX","zip":"75025","age":69}{"userid":203,"name":"Jessica Nguyen","city":"Allen","state":"TX","zip":"75002","age":52}

从 JSON 文件创建一个数据帧。

val jsonDF = spark.read.json("/jsondata/users.json")

jsonDF: org.apache.spark.sql.DataFrame =[age: bigint, city: string, name: string, state: string, userid: bigint,zip: string]

检查日期

jsonDF.show

+---+------+--------------+-----+------+-----+|age|  city|          name|state|userid|zip|+---+------+--------------+-----+------+-----+|35|Frisco| Jonathan West|   TX|200|75034||28|Dallas|Andrea Foreman|   TX|201|75001||69| Plano|  Kirsten Jung|   TX|202|75025||52| Allen|Jessica Nguyen|   TX|203|75002|+---+------+--------------+-----+------+-----+

关系数据库和 MPP 数据库

我们在这个例子中使用 MySQL,但也支持其他关系数据库和 MPP 引擎,如 Oracle、Snowflake、Redshift、Impala、Presto 和 Azure DW。通常,只要关系数据库有 JDBC 驱动程序,就应该可以从 Spark 访问它。性能取决于您的 JDBC 驱动程序对批处理操作的支持。请查看您的 JDBC 驱动程序文档以了解更多详细信息。

mysql -u root -pmypassword

create databases salesdb;

use salesdb;

create table customers (
customerid INT,
name VARCHAR(100),
city VARCHAR(100),
state CHAR(3),zip  CHAR(5));

spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar

启动火花壳。

将 CSV 文件读入 RDD,并将其转换为数据帧。

val dataRDD = sc.textFile("/home/hadoop/test.csv")
val parsedRDD = dataRDD.map{_.split(",")}caseclassCustomerData(customerid: Int, name: String, city: String, state: String,zip: String)

val dataDF = parsedRDD.map{ a =>CustomerData (a(0).toInt, a(1).toString, a(2).toString,a(3).toString,a(4).toString)}.toDF

将数据框注册为临时表,以便我们可以对其运行 SQL 查询。

dataDF.createOrReplaceTempView("dataDF")

让我们设置连接属性。

val jdbcUsername ="myuser"
val jdbcPassword ="mypass"
val jdbcHostname ="10.0.1.112"
val jdbcPort =3306
val jdbcDatabase ="salesdb"
val jdbcrewriteBatchedStatements ="true"
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}&rewriteBatchedStatements=${jdbcrewriteBatchedStatements}"

val connectionProperties = new java.util.Properties()

这将允许我们指定正确的保存模式

追加、覆盖等等。

import org.apache.spark.sql.SaveMode

将 SELECT 语句返回的数据插入到 MySQL salesdb 数据库中存储的 customer 表中。

spark.sql("select * from dataDF").write
          .mode(SaveMode.Append).jdbc(jdbcUrl,"customers", connectionProperties)

让我们用 JDBC 读一个表格。让我们用一些测试数据填充 MySQL 中的 users 表。确保 salesdb 数据库中存在 users 表。

mysql -u root -pmypassword

use salesdb;

describe users;+--------+--------------+------+-----+---------+-------+| Field  | Type         | Null | Key | Default | Extra |+--------+--------------+------+-----+---------+-------+| userid | bigint(20)| YES  || NULL    ||| name   | varchar(100)| YES  || NULL    ||| city   | varchar(100)| YES  || NULL    ||| state  | char(3)| YES  || NULL    |||zip| char(5)| YES  || NULL    ||| age    | tinyint(4)| YES  || NULL    ||+--------+--------------+------+-----+---------+-------+

select *from users;
Empty set(0.00 sec)

insert into users values (300,'Fred Stevens','Torrance','CA',90503,23);

insert into users values (301,'Nancy Gibbs','Valencia','CA',91354,49);

insert into users values (302,'Randy Park','Manhattan Beach','CA',90267,21);

insert into users values (303,'Victoria Loma','Rolling Hills','CA',90274,75);

select *from users;+--------+---------------+-----------------+-------+-------+------+| userid | name          | city            | state |zip| age  |+--------+---------------+-----------------+-------+-------+------+|300| Fred Stevens  | Torrance        | CA    |90503|23||301| Nancy Gibbs   | Valencia        | CA    |91354|49||302| Randy Park    | Manhattan Beach | CA    |90267|21||303| Victoria Loma | Rolling Hills   | CA    |90274|75|+--------+---------------+-----------------+-------+-------+------+

spark-shell --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar

让我们设置 jdbcurl 和连接属性。

val jdbcURL = s"jdbc:mysql://10.0.1.101:3306/salesdb?user=myuser&password=mypass"

val connectionProperties = new java.util.Properties()

我们可以从整个表中创建一个数据帧。

val df = spark.read.jdbc(jdbcURL,"users", connectionProperties)

df.show

+------+-------------+---------------+-----+-----+---+|userid|         name|           city|state|zip|age|+------+-------------+---------------+-----+-----+---+|300| Fred Stevens|       Torrance|   CA|90503|23||301|  Nancy Gibbs|       Valencia|   CA|91354|49||302|   Randy Park|Manhattan Beach|   CA|90267|21||303|Victoria Loma|  Rolling Hills|   CA|90274|75|+------+-------------+---------------+-----+-----+---+

镶木地板

在拼花地板上读写很简单。

val df = spark.read.load("/sparkdata/employees.parquet")

df.select("id","firstname","lastname","salary").write
          .format("parquet").save("/sparkdata/myData.parquet")

You can run SELECT statements on Parquet files directly.

val df = spark.sql("SELECT * FROM parquet.`/sparkdata/myData.parquet`")

巴什

从 Spark 访问 HBase 有多种方式。例如,可以使用 SaveAsHadoopDataset 将数据写入 HBase。启动 HBase shell。

创建一个 HBase 表并用测试数据填充它。

hbase shell

create 'users','cf1'

启动火花壳。

spark-shell

val hconf = HBaseConfiguration.create()
val jobConf = new JobConf(hconf, this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"users")

val num = sc.parallelize(List(1,2,3,4,5,6))

val theRDD = num.filter.map(x=>{

      val rowkey ="row"+ x

val put = new Put(Bytes.toBytes(rowkey))

      put.add(Bytes.toBytes("cf1"), Bytes.toBytes("fname"), Bytes.toBytes("my fname"+ x))(newImmutableBytesWritable, put)})
theRDD.saveAsHadoopDataset(jobConf)

您还可以使用 Spark 中的 HBase 客户端 API 来读写 HBase 中的数据。如前所述,Scala 可以访问所有 Java 库。

启动 HBase shell。创建另一个 HBase 表,并用测试数据填充它。

hbase shell

create 'employees','cf1'

put 'employees','400','cf1:name','Patrick Montalban'
put 'employees','400','cf1:city','Los Angeles'
put 'employees','400','cf1:state','CA'
put 'employees','400','cf1:zip','90010'
put 'employees','400','cf1:age','71'
put 'employees','401','cf1:name','Jillian Collins'
put 'employees','401','cf1:city','Santa Monica'
put 'employees','401','cf1:state','CA'
put 'employees','401','cf1:zip','90402'
put 'employees','401','cf1:age','45'

put 'employees','402','cf1:name','Robert Sarkisian'
put 'employees','402','cf1:city','Glendale'
put 'employees','402','cf1:state','CA'
put 'employees','402','cf1:zip','91204'
put 'employees','402','cf1:age','29'

put 'employees','403','cf1:name','Warren Porcaro'
put 'employees','403','cf1:city','Burbank'
put 'employees','403','cf1:state','CA'
put 'employees','403','cf1:zip','91523'
put 'employees','403','cf1:age','62'

让我们验证数据是否成功地插入到我们的 HBase 表中。

scan 'employees'

ROW       COLUMN+CELL
 400      column=cf1:age, timestamp=1493105325812, value=71400      column=cf1:city, timestamp=1493105325691, value=Los Angeles
 400      column=cf1:name, timestamp=1493105325644, value=Patrick Montalban
 400      column=cf1:state, timestamp=1493105325738, value=CA
 400      column=cf1:zip, timestamp=1493105325789, value=90010401      column=cf1:age, timestamp=1493105334417, value=45401      column=cf1:city, timestamp=1493105333126, value=Santa Monica
 401      column=cf1:name, timestamp=1493105333050, value=Jillian Collins
 401      column=cf1:state, timestamp=1493105333145, value=CA
 401      column=cf1:zip, timestamp=1493105333165, value=90402402      column=cf1:age, timestamp=1493105346254, value=29402      column=cf1:city, timestamp=1493105345053, value=Glendale
 402      column=cf1:name, timestamp=1493105344979, value=Robert Sarkisian
 402      column=cf1:state, timestamp=1493105345074, value=CA
 402      column=cf1:zip, timestamp=1493105345093, value=91204403      column=cf1:age, timestamp=1493105353650, value=62403      column=cf1:city, timestamp=1493105352467, value=Burbank
 403      column=cf1:name, timestamp=1493105352445, value=Warren Porcaro
 403      column=cf1:state, timestamp=1493105352513, value=CA
 403      column=cf1:zip, timestamp=1493105352549, value=91523

启动火花壳。

spark-shell

import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Get;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;

val configuration = HBaseConfiguration.create()

指定 HBase 表和行键。

val table = new HTable(configuration,"employees");
val g = new Get(Bytes.toBytes("401"))
val result = table.get(g);

从表中提取值。

val val2 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("name"));
val val3 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("city"));
val val4 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("state"));
val val5 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("zip"));
val val6 = result.getValue(Bytes.toBytes("cf1"),Bytes.toBytes("age"));

将这些值转换为适当的数据类型。

val id= Bytes.toString(result.getRow())
val name = Bytes.toString(val2);
val city = Bytes.toString(val3);
val state = Bytes.toString(val4);
val zip= Bytes.toString(val5);
val age = Bytes.toShort(val6);

打印数值。

println(" employee id: "+id+" name: "+ name +" city: "+ city +" state: "+ state +" zip: "+zip+" age: "+ age);

employee id:401 name: Jillian Collins city: Santa Monica state: CA zip:90402 age:13365

让我们使用 HBase API 写入 HBase。

val configuration = HBaseConfiguration.create()
val table = new HTable(configuration,"employees");

指定新的行键。

val p = new Put(new String("404").getBytes());

用新值填充单元格。

p.add("cf1".getBytes(),"name".getBytes(), new String("Denise Shulman").getBytes());
p.add("cf1".getBytes(),"city".getBytes(), new String("La Jolla").getBytes());
p.add("cf1".getBytes(),"state".getBytes(), new String("CA").getBytes());
p.add("cf1".getBytes(),"zip".getBytes(), new String("92093").getBytes());
p.add("cf1".getBytes(),"age".getBytes(), new String("56").getBytes());

写入 HBase 表。

table.put(p);
table.close();

确认值已成功插入 HBase 表中。

启动 HBase shell。

hbase shell

scan 'employees'

ROW       COLUMN+CELL
 400      column=cf1:age, timestamp=1493105325812, value=71400      column=cf1:city, timestamp=1493105325691, value=Los Angeles
 400      column=cf1:name, timestamp=1493105325644, value=Patrick Montalban
 400      column=cf1:state, timestamp=1493105325738, value=CA
 400      column=cf1:zip, timestamp=1493105325789, value=90010401      column=cf1:age, timestamp=1493105334417, value=45401      column=cf1:city, timestamp=1493105333126, value=Santa Monica
 401      column=cf1:name, timestamp=1493105333050, value=Jillian Collins
 401      column=cf1:state, timestamp=1493105333145, value=CA
 401      column=cf1:zip, timestamp=1493105333165, value=90402402      column=cf1:age, timestamp=1493105346254, value=29402      column=cf1:city, timestamp=1493105345053, value=Glendale
 402      column=cf1:name, timestamp=1493105344979, value=Robert Sarkisian

 402      column=cf1:state, timestamp=1493105345074, value=CA
 402      column=cf1:zip, timestamp=1493105345093, value=91204403      column=cf1:age, timestamp=1493105353650, value=62403      column=cf1:city, timestamp=1493105352467, value=Burbank
 403      column=cf1:name, timestamp=1493105352445, value=Warren Porcaro
 403      column=cf1:state, timestamp=1493105352513, value=CA
 403      column=cf1:zip, timestamp=1493105352549, value=91523404      column=cf1:age, timestamp=1493123890714, value=56404      column=cf1:city, timestamp=1493123890714, value=La Jolla
 404      column=cf1:name, timestamp=1493123890714, value=Denise Shulman
 404      column=cf1:state, timestamp=1493123890714, value=CA
 404      column=cf1:zip, timestamp=1493123890714, value=92093

虽然通常速度较慢,但也可以通过 SQL 查询引擎(如 Impala 或 Presto)访问 HBase。

亚马逊 S3

亚马逊 S3 是一个流行的对象存储,经常被用作临时集群的数据存储。它还是备份和冷数据的经济高效的存储方式。从 S3 读取数据就像从 HDFS 或任何其他文件系统读取数据一样。

阅读来自亚马逊 S3 的 CSV 文件。请确保您已经配置了 S3 凭据。

val myCSV = sc.textFile("s3a://mydata/customers.csv")

将 CSV 数据映射到 RDD。

import org.apache.spark.sql.Row

val myRDD = myCSV.map(_.split(',')).map(e ⇒ Row(r(0).trim.toInt, r(1), r(2).trim.toInt, r(3)))

创建一个模式。

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};

val mySchema = StructType(Array(
StructField("customerid",IntegerType,false),
StructField("customername",StringType,false),
StructField("age",IntegerType,false),
StructField("city",StringType,false)))

val myDF = spark.createDataFrame(myRDD, mySchema)

使用

您可以使用 SolrJ 从 Spark 与 Solr 进行交互。 viii

import java.net.MalformedURLException;import org.apache.solr.client.solrj.SolrServerException;import org.apache.solr.client.solrj.impl.HttpSolrServer;import org.apache.solr.client.solrj.SolrQuery;import org.apache.solr.client.solrj.response.QueryResponse;import org.apache.solr.common.SolrDocumentList;

val solr = new HttpSolrServer("http://master02:8983/solr/mycollection");

val query = new SolrQuery();

query.setQuery("*:*");
query.addFilterQuery("userid:3");
query.setFields("userid","name","age","city");
query.setStart(0);
query.set("defType","edismax");

val response = solr.query(query);
val results = response.getResults();

println(results);

从 Spark 访问 Solr 集合的一个更简单的方法是通过 spark-solr 包。Lucidworks 启动了 spark-solr 项目来提供 Spark-Solr 集成。与 solrJ 相比,使用 spark-solr 要简单和强大得多,它允许你从 Solr 集合中创建数据帧。

首先从 spark-shell 导入 JAR 文件。

spark-shell --jars spark-solr-3.0.1-shaded.jar

指定集合和连接信息。

val options = Map("collection"->"mycollection","zkhost"->"{ master02:8983/solr}")

创建一个数据框架。

val solrDF = spark.read.format("solr").options(options).load

微软优越试算表

虽然我通常不推荐从 Spark 访问 Excel 电子表格,但是某些用例需要这种能力。一家名为 Crealytics 的公司开发了一个用于与 Excel 交互的 Spark 插件。该库需要 Spark 2.x。可以使用- packages 命令行选项添加该包。

spark-shell --packages com.crealytics:spark-excel_2.11:0.9.12

从 Excel 工作表创建数据框架。

val ExcelDF = spark.read
    .format("com.crealytics.spark.excel").option("sheetName","sheet1").option("useHeader","true").option("inferSchema","true").option("treatEmptyValuesAsNulls","true").load("budget.xlsx")

将数据帧写入 Excel 工作表。

ExcelDF2.write
  .format("com.crealytics.spark.excel").option("sheetName","sheet1").option("useHeader","true").mode("overwrite").save("budget2.xlsx")

你可以在他们的 GitHub 页面上找到更多的细节:github.com/crealytics.

安全 FTP

从 SFTP 服务器下载文件和向其写入数据帧也是一个流行的请求。SpringML 提供了一个 Spark SFTP 连接器库。该库需要 Spark 2.x 并利用 jsch,这是 SSH2 的一个 Java 实现。对 SFTP 服务器的读写将作为单个进程执行。

spark-shell --packages com.springml:spark-sftp_2.11:1.1.

从 SFTP 服务器中的文件创建一个数据帧。

val sftpDF = spark.read.format("com.springml.spark.sftp").
            option("host","sftpserver.com").
            option("username","myusername").
            option("password","mypassword").
            option("inferSchema","true").
            option("fileType","csv").
            option("delimiter",",").
            load("/myftp/myfile.csv")

将数据帧作为 CSV 文件写入 FTP 服务器。

sftpDF2.write.format("com.springml.spark.sftp").
      option("host","sftpserver.com").
      option("username","myusername").
      option("password","mypassword").
      option("fileType","csv").
      option("delimiter",",").
      save("/myftp/myfile.csv")

你可以在他们的 GitHub 页面上找到更多的细节:github.com/springml/spark-sftp.

Spark MLlib 简介

机器学习是 Spark 的主要应用之一。Spark MLlib 包括用于回归、分类、聚类、协作过滤和频繁模式挖掘的流行机器学习算法。它还为构建管线、模型选择和调整以及特征选择、提取和转换提供了广泛的功能。

Spark MLlib 算法

Spark MLlib 包括大量用于各种任务的机器学习算法。我们将在接下来的章节中介绍其中的大部分。

分类

  • 逻辑回归(二项式和多项式)
  • 决策图表
  • 随机森林
  • 梯度提升树
  • 多层感知器
  • 线性支持向量机
  • 奈伊夫拜厄斯
  • 一对一休息

回归

  • 线性回归
  • 决策图表
  • 随机森林
  • 梯度提升树
  • 生存回归
  • 保序回归

聚类

  • k 均值
  • 平分 K-均值
  • 高斯混合模型
  • 潜在狄利克雷分配

协同过滤

  • 交替最小二乘法

频繁模式挖掘

  • FP-增长
  • 前缀 Span

ML 管道

Spark MLlib 的早期版本只包含一个基于 RDD 的 API。基于数据框架的 API 现在是 Spark 的主要 API。一旦基于数据帧的 API 达到特性对等,基于 RDD 的 API 在 Spark 2.3 中将被弃用。 x 基于 RDD 的 API 将在 Spark 3.0 中被移除。基于 DataFrames 的 API 通过提供更高级别的抽象来表示类似于关系数据库表的表格数据,使转换功能变得容易,这使它成为实现管道的自然选择。

Spark MLlib API 引入了几个创建机器学习管道的概念。图 2-3 显示了一个用于处理文本数据的简单 Spark MLlib 管道。记号赋予器将文本分解成一个单词包,将单词附加到输出数据帧上。词频

逆文档频率(TF

IDF)将数据帧作为输入,将单词包转换为特征向量,并将它们添加到第三个数据帧中。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-3

一个简单的 Spark MLlib 流水线

管道

流水线是创建机器学习工作流的一系列相连的阶段。一个阶段可以是一个转换器或估计器。

变压器

转换器将一个数据帧作为输入,并输出一个新的数据帧,其中附加了附加列。新数据帧包括来自输入数据帧的列和附加列。

估计量

估计器是一种机器学习算法,可以根据训练数据拟合模型。估计器接受训练数据并产生机器学习模型。

ParamGridBuilder

ParamGridBuilder 用于构建参数网格。CrossValidator 执行网格搜索,并用参数网格中用户指定的超参数组合来训练模型。

交叉验证器

CrossValidator 交叉评估拟合的机器学习模型,并通过尝试用用户指定的超参数组合拟合底层估计器来输出最佳模型。使用 CrossValidator 或 TrainValidationSplit 估计器进行模型选择。

求值程序

评估者计算你的机器学习模型的性能。它输出精度和召回率等指标来衡量拟合模型的表现。赋值器的示例包括分别用于二进制和多类分类任务的 BinaryClassificationEvaluator 和 multiclasclassificationevaluator,以及用于回归任务的 RegressionEvaluator。

特征提取、变换和选择

大多数情况下,在使用原始数据拟合模型之前,需要进行额外的预处理。例如,基于距离的算法要求特征标准化。当分类数据被一键编码时,一些算法执行得更好。文本数据通常需要标记化和特征矢量化。对于非常大的数据集,可能需要降维。Spark MLlib 包含了一个针对这些任务类型的转换器和估算器的大集合。我将讨论 Spark MLlib 中一些最常用的变压器和估算器。

StringIndexer

大多数机器学习算法不能直接处理字符串,需要数据为数字格式。StringIndexer 是一个将标签的字符串列转换为索引的估计器。它支持四种不同的方法来生成索引:alphabetDesc、alphabetAsc、frequencyDesc 和 frequencyAsc。默认值设置为 frequencyDesc,最频繁的标签设置为 0,结果按标签频率降序排序。

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0,"car"),(1,"car"),(2,"truck"),(3,"van"),(4,"van"),(5,"van"))).toDF("id","class")

df.show
+---+-----+|id|class|+---+-----+|0|  car||1|  car||2|truck||3|  van||4|  van||5|  van|+---+-----+
val model = new StringIndexer().setInputCol("class").setOutputCol("classIndex")

val indexer = model.fit(df)

val indexed = indexer.transform(df)

indexed.show()+---+-----+----------+|id|class|classIndex|+---+-----+----------+|0|  car|1.0||1|  car|1.0||2|truck|2.0||3|  van|0.0||4|  van|0.0||5|  van|0.0|+---+-----+----------+

Tokenizer

当分析文本数据时,通常有必要将句子分成单独的术语或单词。记号赋予器正是这样做的。您可以使用 RegexTokenizer 的正则表达式执行更高级的标记化。标记化通常是机器学习 NLP 流水线的第一步。我将在第四章更详细地讨论自然语言处理(NLP)。

import org.apache.spark.ml.feature.Tokenizer

val df = spark.createDataFrame(Seq((0,"Mark gave a speech last night in Laguna Beach"),(1,"Oranges are full of nutrients and low in calories"),(2,"Eddie Van Halen is amazing"))).toDF("id","sentence")

 df.show(false)+---+-------------------------------------------------+|id|sentence                                         |+---+-------------------------------------------------+|0|Mark gave a speech last night in Laguna Beach    ||1|Oranges are full of nutrients and low in calories||2|Eddie Van Halen is amazing                       |+---+-------------------------------------------------+

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")

val tokenized = tokenizer.transform(df)

tokenized.show(false)+---+-------------------------------------------------+|id|sentence                                         |+---+-------------------------------------------------+|0|Mark gave a speech last night in Laguna Beach    ||1|Oranges are full of nutrients and low in calories||2|Eddie Van Halen is amazing                       |+---+-------------------------------------------------++-----------------------------------------------------------+|words                                                      |+-----------------------------------------------------------+|[mark, gave, a, speech, last, night,in, laguna, beach]||[oranges, are, full, of, nutrients,and, low,in, calories]||[eddie, van, halen,is, amazing]|+-----------------------------------------------------------+

向量汇编器

Spark MLlib 算法要求将要素存储在单个向量列中。通常,训练数据会以表格格式出现,数据存储在单独的列中。VectorAssembler 是一个转换器,它将一组列合并成一个向量列。

import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(
  Seq((0,50000,7,1))).toDF("id","income","employment_length","marital_status")

val assembler = new VectorAssembler().setInputCols(Array("income","employment_length","marital_status")).setOutputCol("features")

val df2 = assembler.transform(df)

df2.show(false)+---+------+-----------------+--------------+-----------------+|id|income|employment_length|marital_status|features         |+---+------+-----------------+--------------+-----------------+|0|50000|7|1|[50000.0,7.0,1.0]|+---+------+-----------------+--------------+-----------------+

标准鞋匠

正如在第一章中所讨论的,一些机器学习算法需要将特征规范化才能正常工作。StandardScaler 是一种将要素归一化为单位标准差和/或零均值的估计器。它接受两个参数:withst 和 withMean。用将特征缩放到单位标准偏差。默认情况下,该参数设置为 true。将【带平均值的 设置为“真”,则在缩放之前,数据以平均值为中心。默认情况下,该参数设置为 false。

import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(
  Seq((0,186,200,56),(1,170,198,42))).toDF("id","height","weight","age")

val assembler = new VectorAssembler().setInputCols(Array("height","weight","age")).setOutputCol("features")

val df2 = assembler.transform(df)

df2.show(false)+---+------+------+---+------------------+|id|height|weight|age|features          |+---+------+------+---+------------------+|0|186|200|56|[186.0,200.0,56.0]||1|170|198|42|[170.0,198.0,42.0]|+---+------+------+---+------------------+

val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)

val model = scaler.fit(df2)

val scaledData = model.transform(df2)

scaledData.select("features","scaledFeatures").show(false)+------------------+------------------------------------------------------+|features          |scaledFeatures                                        |+------------------+------------------------------------------------------+|[186.0,200.0,56.0]|[16.440232662587228,141.42135623730948,5.656854249492]||[170.0,198.0,42.0]|[15.026019100214134,140.0071426749364,4.2426406871192]|+------------------+------------------------------------------------------+

用于重定数据比例的其他转换器包括 Normalizer、MinMaxScaler 和 MaxAbsScaler。请查看 Apache Spark 在线文档以了解更多详细信息。

停用词去除器

常用于文本分析,从字符串序列中删除停用词。停用词,如 I、the 和 a,对文档的意义没有太大贡献。

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover().setInputCol("data").setOutputCol("output")

val dataSet = spark.createDataFrame(Seq((0, Seq("She","is","a","cute","baby")),(1, Seq("Bob","never","went","to","Seattle")))).toDF("id","data")

val df = remover.transform(dataSet)

df.show(false)+---+-------------------------------+---------------------------+|id|data                           |output                     |+---+-------------------------------+---------------------------+|0|[She,is, a, cute, baby]|[cute, baby]||1|[Bob, never, went, to, Seattle]|[Bob, never, went, Seattle]|+---+-------------------------------+---------------------------+

n-克

当执行文本分析时,将术语组合成 n 元语法(文档中术语的组合)有时是有利的。创建 n 元语法有助于从文档中提取更有意义的信息。例如,“San”和“Diego”这两个词本身没有什么意义,但是将它们组合成一个词“San Diego”可以提供更多的上下文信息。我们将在第四章的后面使用 n-gram。

import org.apache.spark.ml.feature.NGram

val df = spark.createDataFrame(Seq((0, Array("Los","Angeles","Lobos","San","Francisco")),(1, Array("Stand","Book","Case","Phone","Mobile","Magazine")),(2, Array("Deep","Learning","Machine","Algorithm","Pizza")))).toDF("id","words")

val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

val df2 = ngram.transform(df)

df2.select("ngrams").show(false)+---------------------------------------------------------------------+|ngrams                                                               |+---------------------------------------------------------------------+|[Los Angeles, Angeles Lobos, Lobos San, San Francisco]||[Stand Book, Book Case, Case Phone, Phone Mobile, Mobile Magazine]||[Deep Learning, Learning Machine, Machine Algorithm, Algorithm Pizza]|+---------------------------------------------------------------------+

onehotencoderestomator 口腔癌

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0,"Male"),(1,"Male"),(2,"Female"),(3,"Female"),(4,"Female"),(5,"Male"))).toDF("id","gender")

df.show()+---+------+|id|gender|+---+------+|0|  Male||1|  Male||2|Female||3|Female||4|Female||5|  Male|+---+------+

val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex")

val indexed = indexer.fit(df).transform(df)

indexed.show()+---+------+-----------+|id|gender|genderIndex|+---+------+-----------+|0|  Male|1.0||1|  Male|1.0||2|Female|0.0||3|Female|0.0||4|Female|0.0||5|  Male|1.0|+---+------+-----------+import org.apache.spark.ml.feature.OneHotEncoderEstimator

val encoder = new OneHotEncoderEstimator().setInputCols(Array("genderIndex")).setOutputCols(Array("genderEnc"))

val encoded = encoder.fit(indexed).transform(indexed)

encoded.show()+---+------+-----------+-------------+|id|gender|genderIndex|    genderEnc|+---+------+-----------+-------------+|0|  Male|1.0|(1,[],[])||1|  Male|1.0|(1,[],[])||2|Female|0.0|(1,[0],[1.0])||3|Female|0.0|(1,[0],[1.0])||4|Female|0.0|(1,[0],[1.0])||5|  Male|1.0|(1,[],[])|+---+------+-----------+-------------+

SQL 转换器

SQLTransformer 允许您使用 SQL 执行数据转换。虚拟表“THIS”对应于输入数据集。

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0,5.2,6.7),(2,25.5,8.9))).toDF("id","col1","col2")

val transformer = new SQLTransformer().setStatement("SELECT ABS(col1 - col2) as c1, MOD(col1, col2) as c2 FROM __THIS__")

val df2 = transformer.transform(df)

df2.show()+----+-----------------+|  c1|               c2|+----+-----------------+|1.5|5.2||16.6|7.699999999999999|+----+-----------------+

术语频率–逆文档频率(TF–IDF)

TF

IDF 或词频

逆文档频率是文本分析中常用的一种特征矢量化方法。它经常被用来表示一个术语或单词对语料库中的文档的重要性。转换器 HashingTF 使用特征散列将术语转换成特征向量。估计器 IDF 对 HashingTF(或 CountVectorizer)生成的向量进行缩放。我将在第四章更详细地讨论 TF

IDF。

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val df = spark.createDataFrame(Seq((0,"Kawhi Leonard is the league MVP"),(1,"Caravaggio pioneered the Baroque technique"),(2,"Using Apache Spark is cool"))).toDF("label","sentence")

df.show(false)+-----+------------------------------------------+|label|sentence                                  |+-----+------------------------------------------+|0|Kawhi Leonard is the league MVP           ||1|Caravaggio pioneered the Baroque technique||2|Using Apache Spark is cool                |+-----+------------------------------------------+

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")

val df2 = tokenizer.transform(df)

df2.select("label","words").show(false)+-----+------------------------------------------------+|label|words                                           |+-----+------------------------------------------------+|0|[kawhi, leonard,is, the, league, mvp]||1|[caravaggio, pioneered, the, baroque, technique]||2|[using, apache, spark,is, cool]|+-----+------------------------------------------------+

val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features").setNumFeatures(20)

val df3 = hashingTF.transform(df2)

df3.select("label","features").show(false)+-----+-----------------------------------------------+|label|features                                       |+-----+-----------------------------------------------+|0|(20,[1,4,6,10,11,18],[1.0,1.0,1.0,1.0,1.0,1.0])||1|(20,[1,5,10,12],[1.0,1.0,2.0,1.0])||2|(20,[1,4,5,15],[1.0,1.0,1.0,2.0])|+-----+-----------------------------------------------+
val idf = new IDF().setInputCol("features").setOutputCol("scaledFeatures")

val idfModel = idf.fit(df3)

val df4 = idfModel.transform(df3)

df4.select("label","scaledFeatures").show(3,50)+-----+--------------------------------------------------+|label|                                    scaledFeatures|+-----+--------------------------------------------------+|0|(20,[1,4,6,10,11,18],[0.0,0.28768207245178085,0...||1|(20,[1,5,10,12],[0.0,0.28768207245178085,0.5753...||2|(20,[1,4,5,15],0.0,0.28768207245178085,0.28768...|+-----+--------------------------------------------------+

主成分分析

主成分分析(PCA)是一种降维技术,它将相关特征组合成一组较小的线性不相关特征,称为主成分。PCA 在图像识别和异常检测等多个领域都有应用。我将在第 [4 章更详细地讨论 PCA。

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(4.2,5.4,8.9,6.7,9.1),
  Vectors.dense(3.3,8.2,7.0,9.0,7.2),
  Vectors.dense(6.1,1.4,2.2,4.3,2.9))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(2).fit(df)

val result = pca.transform(df).select("pcaFeatures")

result.show(false)+---------------------------------------+|pcaFeatures                            |+---------------------------------------+|[13.62324332562565,3.1399510055159445]||[14.130156836243236,-1.432033103462711]||[3.4900743524527704,0.6866090886347056]|+---------------------------------------+

卡方选择器

ChiSqSelector 使用卡方独立性检验进行特征选择。卡方检验是一种检验两个分类变量之间关系的方法。 numTopFeatures 是默认的选择方法。它返回一组基于卡方检验的特征,或最具预测影响的特征。其他选择方法包括百分位数、fpr、fdr 和 fwe

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq((0, Vectors.dense(5.1,2.9,5.6,4.8),0.0),(1, Vectors.dense(7.3,8.1,45.2,7.6),1.0),(2, Vectors.dense(8.2,12.6,19.5,9.21),1.0))

val df = spark.createDataset(data).toDF("id","features","class")

val selector = new ChiSqSelector().setNumTopFeatures(1).setFeaturesCol("features").setLabelCol("class").setOutputCol("selectedFeatures")

val df2 = selector.fit(df).transform(df)

df2.show()+---+--------------------+-----+----------------+|id|            features|class|selectedFeatures|+---+--------------------+-----+----------------+|0|[5.1,2.9,5.6,4.8]|0.0|[5.1]||1|[7.3,8.1,45.2,7.6]|1.0|[7.3]||2|[8.2,12.6,19.5,9.21]|1.0|[8.2]|+---+--------------------+-----+----------------+

相互关系

相关性评估两个变量之间线性关系的强度。对于线性问题,您可以使用相关性来选择相关要素(要素类相关性)和识别冗余要素(要素内相关性)。Spark MLlib 支持皮尔逊和斯皮尔曼的相关性。在下面的示例中,correlation 计算输入向量的相关矩阵。

import org.apache.spark.ml.linalg.{Matrix, Vectors}import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val data = Seq(
  Vectors.dense(5.1,7.0,9.0,6.0),
  Vectors.dense(3.2,1.1,6.0,9.0),
  Vectors.dense(3.5,4.2,9.1,3.0),
  Vectors.dense(9.1,2.6,7.2,1.8))

val df = data.map(Tuple1.apply).toDF("features")+-----------------+|         features|+-----------------+|[5.1,7.0,9.0,6.0]||[3.2,1.1,6.0,9.0]||[3.5,4.2,9.1,3.0]||[9.1,2.6,7.2,1.8]|+-----------------+

val Row(c1: Matrix)= Correlation.corr(df,"features").head

c1: org.apache.spark.ml.linalg.Matrix =1.0-0.01325851107237613-0.08794286922175912-0.6536434849076798-0.013258511072376131.00.8773748081826724-0.1872850762579899-0.087942869221759120.87737480818267241.0-0.46050932066780714-0.6536434849076798-0.1872850762579899-0.460509320667807141.0

val Row(c2: Matrix)= Correlation.corr(df,"features","spearman").head

c2: org.apache.spark.ml.linalg.Matrix =1.00.3999999999999990.19999999999999898-0.80000000000000140.3999999999999991.00.8000000000000035-0.199999999999997430.199999999999998980.80000000000000351.0-0.39999999999999486-0.8000000000000014-0.19999999999999743-0.399999999999994861.0

还可以计算 DataFrame 列中存储的值的相关性,如下所示。

dataDF.show

+------------+-----------+------------+-----------+-----------+-----+|sepal_length|sepal_width|petal_length|petal_width|class|label|+------------+-----------+------------+-----------+-----------+-----+|5.1|3.5|1.4|0.2|Iris-setosa|0.0||4.9|3.0|1.4|0.2|Iris-setosa|0.0||4.7|3.2|1.3|0.2|Iris-setosa|0.0||4.6|3.1|1.5|0.2|Iris-setosa|0.0||5.0|3.6|1.4|0.2|Iris-setosa|0.0||5.4|3.9|1.7|0.4|Iris-setosa|0.0||4.6|3.4|1.4|0.3|Iris-setosa|0.0||5.0|3.4|1.5|0.2|Iris-setosa|0.0||4.4|2.9|1.4|0.2|Iris-setosa|0.0||4.9|3.1|1.5|0.1|Iris-setosa|0.0||5.4|3.7|1.5|0.2|Iris-setosa|0.0||4.8|3.4|1.6|0.2|Iris-setosa|0.0||4.8|3.0|1.4|0.1|Iris-setosa|0.0||4.3|3.0|1.1|0.1|Iris-setosa|0.0||5.8|4.0|1.2|0.2|Iris-setosa|0.0||5.7|4.4|1.5|0.4|Iris-setosa|0.0||5.4|3.9|1.3|0.4|Iris-setosa|0.0||5.1|3.5|1.4|0.3|Iris-setosa|0.0||5.7|3.8|1.7|0.3|Iris-setosa|0.0||5.1|3.8|1.5|0.3|Iris-setosa|0.0|+------------+-----------+------------+-----------+-----------+-----+

dataDF.stat.corr("petal_length","label")
res48: Double =0.9490425448523336

dataDF.stat.corr("petal_width","label")
res49: Double =0.9564638238016178

dataDF.stat.corr("sepal_length","label")
res50: Double =0.7825612318100821

dataDF.stat.corr("sepal_width","label")
res51: Double =-0.41944620026002677

评估指标

如第一章所述,精确度、召回率和准确度是评估模型性能的重要评估指标。然而,它们可能并不总是某些问题的最佳度量。

受试者工作特性下的面积(AUROC)

接受者操作特征下的面积(AUROC)是用于评估二元分类器的常见性能度量。受试者工作特性(ROC)是绘制真阳性率与假阳性率的图表。曲线下面积(AUC)是 ROC 曲线下的面积。AUC 可以解释为模型对随机正例的排序高于随机负例的概率。 xii 曲线下面积越大(AUROC 越接近 1.0),模型表现越好。AUROC 为 0.5 的模型是无用的,因为它的预测准确性与随机猜测一样好。

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC").setRawPredictionCol("rawPrediction").setLabelCol("label")

F1 度量

F1 测量值或 F1 分数是精确度和召回率的调和平均值或加权平均值。这是评估多类分类器的一个常见性能指标。当存在不均匀的阶级分布时,这也是一个很好的衡量标准。F1 成绩最好的是 1,最差的是 0。一个好的 F1 测量意味着你有很低的假阴性和假阳性。F1 度量的公式为:*F1-Measure = 2∫(精度∫召回)/(精度+召回)*。

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val evaluator = new MulticlassClassificationEvaluator().setMetricName("f1").setLabelCol("label").setPredictionCol("prediction")

均方根误差(RMSE)

均方根误差(RMSE)是回归任务中最常见的指标。RMSE 就是均方误差(MSE)的平方根。MSE 表示回归线与一组数据点的接近程度,方法是将这些点到回归线的距离或“误差”取平方。XIIIMSE 越小,拟合越好。但是,MSE 与原始数据的单位不匹配,因为该值是平方的。RMSE 与输出的单位相同。

import org.apache.spark.ml.evaluation.RegressionEvaluator

val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")

我将在后续章节中介绍其他评估指标,如误差平方和(WSSSE)和轮廓系数。有关 Spark MLlib 支持的所有评估指标的完整列表,请参考 Spark 的在线文档。

模型持久性

Spark MLlib 允许您保存模型并在以后加载它们。如果您想要将您的模型与第三方应用程序集成,或者与团队的其他成员共享它们,这将特别有用。

保存单个随机森林模型

rf = RandomForestClassifier(numBin=10,numTrees=30)
model = rf.fit(training)
model.save("modelpath")

加载单个随机森林模型

val model2 = RandomForestClassificationModel.load("modelpath")

保存完整的管道

val pipeline = new Pipeline().setStages(Array(labelIndexer,vectorAssembler, rf))
val cv = new CrossValidator().setEstimator(pipeline)
val model = cv.fit(training)
model.save("modelpath")

加载完整的管道

val model2 = CrossValidatorModel.load("modelpath")

Spark MLlib 示例

让我们来看一个例子。我们将使用来自 UCI 机器学习知识库的心脏病数据集 xiv 来预测心脏病的存在。这些数据是由罗伯特·德特拉诺医学博士和他的团队在弗吉尼亚医学中心、长滩和克利夫兰诊所基金会收集的。历史上,克利夫兰数据集一直是众多研究的主题,因此我们将使用该数据集。原始数据集有 76 个属性,但其中只有 14 个用于 ML 研究(表 2-1 )。我们将进行二项式分类,确定患者是否患有心脏病(列表 2-2 )。

表 2-1

克利夫兰心脏病数据集属性信息
|

属性

|

描述

|
| — | — |
| 年龄 | 年龄 |
| 性 | 性 |
| 丙酸纤维素 | 胸痛型 |
| treatbps | 静息血压 |
| 胆固醇 | 血清胆固醇(毫克/分升) |
| 前沿系统 | 空腹血糖> 120 毫克/分升 |
| 尊重 | 静息心电图结果 |
| 塔尔巴赫 | 达到最大心率 |
| 考试 | 运动诱发的心绞痛 |
| 旧峰 | 相对于静息运动诱发的 ST 段压低 |
| 倾斜 | 运动 ST 段峰值的斜率 |
| 大约 | 荧光镜染色的主要血管数量(0-3) |
| 塔尔 | 铊压力测试结果 |
| 数字 | 预测属性——心脏病的诊断 |

我们开始吧。下载文件,并将其复制到 HDFS。

wget http://archive.ics.uci.edu/ml/machine-learning-databases/heart-disease/cleveland.data

head -n 10 processed.cleveland.data

63.0,1.0,1.0,145.0,233.0,1.0,2.0,150.0,0.0,2.3,3.0,0.0,6.0,067.0,1.0,4.0,160.0,286.0,0.0,2.0,108.0,1.0,1.5,2.0,3.0,3.0,267.0,1.0,4.0,120.0,229.0,0.0,2.0,129.0,1.0,2.6,2.0,2.0,7.0,137.0,1.0,3.0,130.0,250.0,0.0,0.0,187.0,0.0,3.5,3.0,0.0,3.0,041.0,0.0,2.0,130.0,204.0,0.0,2.0,172.0,0.0,1.4,1.0,0.0,3.0,056.0,1.0,2.0,120.0,236.0,0.0,0.0,178.0,0.0,0.8,1.0,0.0,3.0,062.0,0.0,4.0,140.0,268.0,0.0,2.0,160.0,0.0,3.6,3.0,2.0,3.0,357.0,0.0,4.0,120.0,354.0,0.0,0.0,163.0,1.0,0.6,1.0,0.0,3.0,063.0,1.0,4.0,130.0,254.0,0.0,2.0,147.0,0.0,1.4,2.0,1.0,7.0,253.0,1.0,4.0,140.0,203.0,1.0,2.0,155.0,1.0,3.1,3.0,0.0,7.0,1

hadoop fs -put processed.cleveland.data /tmp/data

我们使用 spark-shell 来交互式地训练我们的模型。

spark-shell

val dataDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load(d("/tmp/data/processed.cleveland.data").toDF("id","age","sex","cp","trestbps","chol","fbs","restecg","thalach","exang","oldpeak","slope","ca","thal","num")

dataDF.printSchema
root
 |--id: string (nullable = false)|-- age:float(nullable = true)|-- sex:float(nullable = true)|-- cp:float(nullable = true)|-- trestbps:float(nullable = true)|-- chol:float(nullable = true)|-- fbs:float(nullable = true)|-- restecg:float(nullable = true)|-- thalach:float(nullable = true)|-- exang:float(nullable = true)|-- oldpeak:float(nullable = true)|-- slope:float(nullable = true)|-- ca:float(nullable = true)|-- thal:float(nullable = true)|-- num:float(nullable = true)

val myFeatures = Array("age","sex","cp","trestbps","chol","fbs","restecg","thalach","exang","oldpeak","slope","ca","thal","num")import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler().setInputCols(myFeatures).setOutputCol("features")

val dataDF2 = assembler.transform(dataDF)import org.apache.spark.ml.feature.StringIndexer

val labelIndexer = new StringIndexer().setInputCol("num").setOutputCol("label")

val dataDF3 = labelIndexer.fit(dataDF2).transform(dataDF2)

val dataDF4 = dataDF3.where(dataDF3("ca").isNotNull).where(dataDF3("thal").isNotNull).where(dataDF3("num").isNotNull)

val Array(trainingData, testData)= dataDF4.randomSplit(Array(0.8,0.2),101)import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier().setFeatureSubsetStrategy("auto").setSeed(101)import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")import org.apache.spark.ml.tuning.ParamGridBuilder

val pgrid = new ParamGridBuilder().addGrid(rf.maxBins, Array(10,20,30)).addGrid(rf.maxDepth, Array(5,10,15)).addGrid(rf.numTrees, Array(20,30,40)).addGrid(rf.impurity, Array("gini","entropy")).build()import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline().setStages(Array(rf))import org.apache.spark.ml.tuning.CrossValidator

val cv = new CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(pgrid).setNumFolds(3)

Listing 2-2Performing Binary Classification Using Random Forest

我们现在可以拟合模型了。

val model = cv.fit(trainingData)

对测试数据进行预测。

val prediction = model.transform(testData)

我们来评价一下模型。

import org.apache.spark.ml.param.ParamMap

val pm = ParamMap(evaluator.metricName ->"areaUnderROC")

val aucTestData = evaluator.evaluate(prediction, pm)

图形处理

Spark 包括一个名为 GraphX 的图形处理框架。有一个独立的包叫做 GraphFrames,它基于 DataFrames。GraphFrames 目前不是核心 Apache Spark 的一部分。在撰写本文时,GraphX 和 GraphFrames 仍在积极开发中。XV??【我盖 GraphX】第六章第六章。

超越 Spark MLlib:第三方机器学习集成

由于无数开源贡献者以及微软和谷歌等公司,Spark 可以访问第三方框架和库的丰富生态系统。虽然我涵盖了核心 Spark MLlib 算法,但这本书专注于更强大的下一代算法和框架,如 XGBoost、LightGBM、Isolation Forest、Spark NLP 和分布式深度学习。我将在接下来的章节中介绍它们。

使用 Alluxio 优化 Spark 和 Spark MLlib

Alluxio,原名 Tachyon,是加州大学伯克利分校 AMPLab 的一个开源项目。Alluxio 是一个以内存为中心的分布式存储系统,最初是由李皓原在 2012 年作为一个研究项目开发的,当时他是 AMPLab 的一名博士生和 Apache Spark 创始人。 xvi 该项目是 Berkeley 数据分析栈(BDAS)的存储层。2015 年,李创立了 Alluxio,Inc .以实现 Alluxio 的商业化,并获得了安德森·霍洛维茨(Andre essen Horowitz)750 万美元的现金注入。如今,Alluxio 拥有来自英特尔、IBM、雅虎和 Red Hat 等全球 50 个组织的 200 多名贡献者。几家知名公司目前正在生产中使用 Alluxio,如百度、阿里巴巴、Rackspace 和巴克莱。XVII

Alluxio 可用于优化 Spark 机器学习和深度学习工作负载,方法是将超快大数据存储到超大数据集。由 Alluxio 进行的深度学习基准测试显示,当从 Alluxio 而不是 S3 读取数据时,性能有显著提高。XVIII

体系结构

Alluxio 是一个以内存为中心的分布式存储系统,旨在成为大数据事实上的存储统一层。它提供了一个虚拟化层,统一了对不同存储引擎(如本地文件系统、HDFS、S3 和 NFS)和计算框架(如 Spark、MapReduce、Hive 和 Presto)的访问。图 2-4 给你一个 Alluxio 架构的概述。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-4

Alluxio 架构概述

Alluxio 是协调数据共享和指导数据访问的中间层,同时为计算框架和大数据应用程序提供高性能低延迟的内存速度。Alluxio 与 Spark 和 Hadoop 无缝集成,只需要少量的配置更改。通过利用 Alluxio 的统一命名空间功能,应用程序只需连接到 Alluxio 即可访问存储在任何受支持的存储引擎中的数据。Alluxio 有自己的原生 API 以及 Hadoop 兼容的文件系统接口。便利类使用户能够执行最初为 Hadoop 编写的代码,而无需任何代码更改。REST API 提供了对其他语言的访问。我们将在本章的后面探讨 API。

Alluxio 的统一命名空间特性不支持关系数据库和 MPP 引擎,如 Redshift 或 Snowflake,也不支持文档数据库,如 MongoDB。当然,支持向 Alluxio 和上面提到的存储引擎写入数据。开发人员可以使用 Spark 等计算框架从红移表创建数据帧,并以 Parquet 或 CSV 格式存储在 Alluxio 文件系统中,反之亦然(图 2-5 )。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-5

Alluxio 技术架构

为什么要用 Alluxio?

显著提高大数据处理性能和可扩展性

这些年来,内存变得越来越便宜,而其性能却变得越来越快。与此同时,硬盘驱动器的性能只是略有改善。毫无疑问,在内存中处理数据比在磁盘上处理数据快一个数量级。在几乎所有的编程范例中,我们都被建议在内存中缓存数据以提高性能。Apache Spark 优于 MapReduce 的一个主要优势是它能够缓存数据。Alluxio 将这一点提升到了一个新的水平,为大数据应用程序提供的不仅仅是一个缓存层,而是一个成熟的分布式高性能以内存为中心的存储系统。

百度正在运营世界上最大的 Alluxio 集群之一,1000 个工作节点处理超过 2PB 的数据。借助 Alluxio,百度在查询和处理时间方面的性能平均提高了 10 倍,最高可达 30 倍,显著提高了百度做出重要业务决策的能力。 xix 巴克莱发表文章描述了他们与 Alluxio 的经历。巴克莱数据科学家 Gianmario Spacagna 和高级分析主管 Harry Powell 能够使用 Alluxio 将他们的 Spark 工作从数小时调整到数秒。中国最大的旅游搜索引擎之一 Qunar.com 使用 Alluxio 后,性能提升了 15 到 300 倍。 xxi

多个框架和应用程序可以以内存速度共享数据

一个典型的大数据集群有多个会话运行不同的计算框架,如 Spark 和 MapReduce。对于 Spark,每个应用程序都有自己的执行器进程,执行器中的每个任务都运行在自己的 JVM 上,将 Spark 应用程序相互隔离。这意味着 Spark(和 MapReduce)应用程序无法共享数据,除了写入 HDFS 或 S3 等存储系统。如图 2-6 所示,Spark 作业和 MapReduce 作业使用存储在 HDFS 或 S3 的相同数据。在图 2-7 中,多个 Spark 作业使用相同的数据,每个作业在自己的堆空间中存储自己版本的数据。 xxii 不仅数据会重复,通过 HDFS 或 S3 共享数据也会很慢,尤其是当你共享大量数据时。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-7

不同的工作通过 HDFS 或 S3 共享数据

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-6

不同的框架通过 HDFS 或 S3 共享数据

通过使用 Alluxio 作为堆外存储(图 2-8 ),多个框架和作业可以以内存速度共享数据,减少数据重复,提高吞吐量,减少延迟。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-8

不同的作业和框架以内存速度共享数据

在应用程序终止或出现故障时提供高可用性和持久性

在 Spark 中,执行器进程和执行器内存驻留在同一个 JVM 中,所有缓存的数据都存储在 JVM 堆空间中(图 2-9 )。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-9

Spark 作业有自己的堆内存

当作业完成或由于某种原因 JVM 由于运行时异常而崩溃时,所有缓存在堆空间中的数据都将丢失,如图 2-10 和 2-11 所示。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-11

Spark 作业崩溃或完成。堆空间丢失

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-10

火花作业崩溃或完成

解决方案是使用 Alluxio 作为堆外存储(图 2-12 )。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-12

Spark 使用 Alluxio 作为堆外存储

在这种情况下,即使 Spark JVM 崩溃,数据在 Alluxio 中仍然可用(图 2-13 和 2-14 )。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-14

Spark 作业崩溃或完成。堆空间丢失。堆外内存仍然可用

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-13

火花作业崩溃或完成

优化整体内存使用并最大限度地减少垃圾收集

通过使用 Alluxio,内存使用效率大大提高,因为数据在作业和框架之间共享,并且因为数据存储在堆外,所以垃圾收集也被最小化,从而进一步提高了作业和应用程序的性能(图 2-15 )。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 2-15

多个 Spark 和 MapReduce 作业可以访问存储在 Alluxio 中的相同数据

降低硬件要求

Alluxio 的大数据处理速度明显快于 HDFS 和 S3。IBM 的测试显示,在写入 io 方面,Alluxio 比 HDFS 快 110 倍。 xxiii 有了这样的性能,对额外硬件的需求就会减少,从而节省基础设施和许可成本。

阿帕奇 Spark 和 Alluxio

您在 Alluxio 中访问数据的方式类似于从 Spark 中访问存储在 HDFS 和 S3 的数据。

val dataRDD = sc.textFile("alluxio://localhost:19998/test01.csv")

val parsedRDD = dataRDD.map{_.split(",")}caseclassCustomerData(userid: Long, city: String, state: String, age: Short)

val dataDF = parsedRDD.map{ a =>CustomerData(a(0).toLong, a(1).toString, a(2).toString, a(3).toShort)}.toDF

dataDF.show()+------+---------------+-----+---+|userid|           city|state|age|+------+---------------+-----+---+|300|       Torrance|   CA|23||302|Manhattan Beach|   CA|21|+------+---------------+-----+---+

摘要

本章向您简要介绍了 Spark 和 Spark MLlib,足以让您掌握执行常见数据处理和机器学习任务所需的技能。我的目标是让你尽快熟悉情况。为了更彻底的治疗,比尔钱伯斯和马泰扎哈里亚(O’Reilly,2018)的《火花:权威指南》提供了对火花的全面介绍。Irfan Elahi (Apress,2019)的 Scala 编程用于大数据分析,Jason Swartz (O’Reilly,2014)的学习 Scala ,以及 Martin Odersky、Lex Spoon 和 Bill Venners (Artima,2016)的Scala 编程都是对 Scala 的很好介绍。我还介绍了 Alluxio,这是一个内存分布式计算平台,可用于优化大规模机器学习和深度学习工作负载。

参考

  1. 彼得·诺维格等人;《数据的不合理有效性》,googleuserconent.com,2009, https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/35179.pdf
  2. 火花;《星火总览》,spark.apache.org,2019, https://spark.apache.org/docs/2.2.0/
  3. 阿帕奇软件基金会;“Apache 软件基金会宣布 Apache Spark 为顶级项目”,blogs.apache.org,2014, https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces50
  4. 火花;《星火新闻》,spark.apache.org,2019, https://spark.apache.org/news/
  5. reddit“Matei Zaharia AMA,”reddit.com,2015 年,
  6. 数据块;《阿帕奇星火》,Databricks.com,2019, https://databricks.com/spark/about
  7. 数据块;《如何在 Apache Spark 2.0 中使用 SparkSession》,Databricks.com,2016, https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
  8. Solr《利用索勒吉》,lucene.apache.org,2019, https://lucene.apache.org/solr/guide/6_6/using-solrj.html
  9. Lucidworks《Lucidworks Spark/Solr 集成》,github.com,2019, https://github.com/lucidworks/spark-solr
  10. 火花;《机器学习库(MLlib)指南》,spark.apache.org, http://spark.apache.org/docs/latest/ml-guide.html
  11. 火花;《OneHotEncoderEstimator》,spark.apache.org,2019, https://spark.apache.org/docs/latest/ml-features#onehotencoderestimator
  12. 谷歌;“分类:ROC 曲线和 AUC”,developers.google.com,2019, https://developers.google.com/machine-learning/crash-course/classification/roc-and-auc
  13. 斯蒂芬妮·格伦;《均方误差:定义与实例》,statisticshowto.datasciencecentral.com,2013, www.statisticshowto.datasciencecentral.com/mean-squared-error/
  14. 安朵斯·雅诺西,威廉·施泰因布鲁恩,马蒂亚斯·普菲斯特勒,罗伯特·德特拉诺;《心脏病数据集》,archive.ics.uci.edu,1988, http://archive.ics.uci.edu/ml/datasets/heart+Disease
  15. 火花;“GraphX”,spark.apache.org,2019, https://spark.apache.org/graphx/
  16. 克里斯·马特曼;“孵化器的阿帕奇火花”,mail-archives.apache.org,2013 年,
[`http://mail-archives.apache.org/mod_mbox/incubator-general/201306.mbox/%3CCDD80F64.D5F9D%[email protected]%3E`](http://mail-archives.apache.org/mod_mbox/incubator-general/201306.mbox/%253CCDD80F64.D5F9D%2525chris.a.mattmann%2540jpl.nasa.gov%253E)
  1. 李皓原;“Alluxio,原名超光速粒子,随着 1.0 版本进入新时代,”alluxio.io,2016, www.alluxio.com/blog/alluxio-formerly-tachyon-is-entering-a-new-era-with-10-release
  2. 傅;《用 Alluxio 实现深度学习的灵活快速存储》,alluxio.io,2018, www.alluxio.io/blog/flexible-and-fast-storage-for-deep-learning-with-alluxio/
  3. Alluxio“Alluxio 虚拟化分布式存储,以内存速度进行 Pb 级计算,”globenewswire.com,2016, www.marketwired.com/press-release/alluxio-virtualizes-distributed-storage-petabyte-scale-computing-in-memory-speeds-2099053.html
  4. 亨利·鲍威尔和吉安马里奥·斯帕卡尼亚;“用超光速粒子让不可能成为可能:将火花工作从数小时加速到数秒,”dzone.com,2016, https://dzone.com/articles/Accelerate-In-Memory-Processing-with-Spark-from-Hours-to-Seconds-With-Tachyon
  5. 李皓原;“Alluxio 在 Strata+Hadoop World Beijing 2016 上的主题演讲”,slideshare.net,2016, www.slideshare.net/Alluxio/alluxio-keynote-at-stratahadoop-world-beijing-2016-65172341
  6. 费明·s。《用用例入门超光速粒子》,intel.com,2016, https://software.intel.com/en-us/blogs/2016/02/04/getting-started-with-tachyon-by-use-cases
  7. 吉尔·韦尔尼克;“用于超快大数据处理的超光速粒子”,ibm.com,2015, www.ibm.com/blogs/research/2015/08/tachyon-for-ultra-fast-big-data-processing/
标签: VKDoc

本文转载自: https://blog.csdn.net/wizardforcel/article/details/141337881
版权归原作者 绝不原创的飞龙 所有, 如有侵权,请联系我们删除。

“Spark 下一代机器学习教程(一)”的评论:

还没有评论