PDF版报告请见github地址:https://github.com/xrervip/HIT_BigDataAnalysisProject
目录
第1章 需求分析
1.1 研究问题的背景
随着线上电商业务有业务的发展,根据客户已有的购买信息为客户推荐他们可能在未来购买的商品这一需求逐渐成为了数据驱动商业智能的重要趋势;如果做好产品推荐与引导,是评估一个在线电子商务平台发展潜力的重要指标。在做产品推荐时,一个很重要的手段便是对产品进行关联分析;;关联分析做推荐时,主要用于个性化不强的场景。比如根据购买记录,通过关联分析发现群体购买习惯的内在共性,指导超市产品摆放。对于偏个性化场景,比如给目标用户推荐产品,可以先找出购买习惯与目标用户相似的人群,对此特定人群的购买记录进行关联分析,然后将分析出的规则与目标用户的购买记录结合,进行推荐。
1.2 问题的需求分析
通过发掘潜在客户,精准营销,其主要是思路是通过关联分析,发现许多购买A的用户还会购买B,即有规则A—>B,可通过有购买B产品行为的用户,找到A产品的潜在意向用户,进行精准营销。
关联分析有个最著名的案例就是:啤酒与尿布的故事:
“原来,在美国,妇女们经常会嘱咐她们的丈夫下班以后给孩子买一点尿布回来,而丈夫在买完尿布后,大都会顺手买回一瓶自己爱喝的啤酒(由此看出美国人爱喝酒)。商家通过对一年多的原始交易记录进行详细的分析,发现了这对神奇的组合。于是就毫不犹豫地将尿布与啤酒摆放在一起售卖,通过它们的关联性,互相促进销售。“啤酒与尿布”的故事一度是营销界的神话”。
输入的待分析关联规则挖掘数据如下记录所示:
示例:
\1. 柑橘类水果 人造黄油 即食汤 半成品面包
\2. 咖啡 热带水果 酸奶
\3. 全脂牛奶
\4. 奶油乳酪 肉泥 仁果类水果 酸奶
\5. 炼乳 长面包 其他蔬菜 全脂牛奶
\6. 腐蚀性清洁剂 黄油 白饭 全脂牛奶 酸奶
\7. 面包卷
\8. 瓶装啤酒 开胃酒 其他蔬菜 面包卷 超高温杀菌的牛奶
\9. 盆栽
\10. 谷物 全脂牛奶
因此我们的分析目的便是在一定的正确率要求下,获取某种购物关联规则,例如:瓶装啤酒 —>长面包,全脂牛奶 —>肉泥 等关联规则。
随着大数据时代的到来,海量数据对于以频繁项集挖掘为代表的很多传统数据挖掘算法的有效性和效率提出了挑战,因此我们可以通过大数据计算框架的手段来解决这些问题。
1.3 研究问题的挑战
频繁项集关联规则挖掘中有多种可供选择的算法,其中一种便是Apriori算法,这种算法简单、易理解、数据要求低,但是 Apriori算法本身也有很多不足之处,对数据库的扫描次数过多,而且扫描后可能产生大量的候选项集,在频繁项目集长度变大的情况下,运算时间显著增加,如何在大数据处理框架解决该问题是一个值得研究的话题。
第二章 系统设计
2.1 apriori算法概论
Apriori 算法是一种最有影响力的挖掘布尔关联规则的频繁项集算法,它是由Rakesh Agrawal 和Ramakrishnan Skrikant 提出的。它使用一种称作逐层搜索的迭代方法,k 项集用于探索(k+1) 项集。首先,找出频繁 1 项集的集合。该集合记作。用于找频繁2项集的集合 ,而 用于找,….,如此下去,直到不能找到 项集。每轮寻找 需要进行一次数据库扫描。为提高频繁项集逐层产生的效率,通常采用一种称作Apriori 性质用于压缩搜索空间。性质具体内容为:一是频繁项集的所有非空子集都必须也是频繁的,二是非频繁项集的所有父集都是非频繁的。
2.2 apriori算法基本流程
首先找出所有的频集,这些项集出现的频繁性至少不限于设定的的最小支持度。然后由频集产生强关联规则,这些规则必须满足最小支持度和最小可信度。然后使用第1步找到的频集产生期望的规则,产生只包含集合的项的所有规则,其中每一条规则的右部只有一项,这里采用的是中规则的定义。一旦这些规则被生成,那么只有那些大于预设的最小可信度的规则才被留下来。
流程如下:
第一步通过迭代,检索出事务数据库中的所有频繁项集,即支持度不低于用户设定的阈值的项集;
第二步利用频繁项集构造出满足用户最小信任度的规则。
apriori算法的基本流程图
2.3 Apache Spark简介
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark采用 RDD机制,也就是弹性分布式数据集,spark中的很多特性都和他有关,所以先谈一个RDD是一个分布式对象集合,本质上是一个只读的分区记录集合 ,提供了一个抽象的数据架构,不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理 ,一个RDD的不同分区可以被保存到集群中不同的节点上, 从而可以在集群中的不同节点上进行并行计算,不同RDD之间的转换操作形成依赖关系,可以实现数据流水处理,避免中间数据存储。
RDD提供了一种高度受限的共享内存模型,其是只读的记录分区的集合,不能直接修改 ,类似软件构造中的不可变类,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和 group by)而创建得到新的RDD ,这里,RDD提供了一组丰富的操作以支持常见的数据运算,分为 “动作”(Action)和“转换”(Transformation)两种类型
因此RDD具有很多的优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单,计算的中间结果持久化到内存,数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销,放的数据可以是Java对象,避免了不必要的对象序列化和反序列化。
Apache Spark的架构设计图
2.4 系统各部分简介
2.4.1数据预处理模块
读取来自数据集的购物篮数据,然后对其进行预处理其,包含的过程有数据抽样、数据过滤、数据标准化和数据清洗,对缺失值进行补全等。
2.4.2 传统关联规则挖掘算法模块
本文为了便于与大数据处理框架下运行的算法进行比较,将传统频繁项集挖掘算法Apriori基于scala进行实现,并期望在单机单线程的环境下运行,作为后续实验分析的baseline存在。
2.4.3基于大数据处理框架的关联规则挖掘模块
在第一部分的基础上,将传统频繁项集挖掘Apriori算法在Apache Spark大数据处理框架下进行实现,并算法中使用的数据结构使用RDD进行重写,并期望在单机伪分布式的环境下运行。
2.4.4基于大数据处理框架的有限扫描关联规则挖掘模块
在第二部分大数据处理框架下的Apriori算法的基础上,面向海量数据的频繁项集改进挖掘算法,通过随机取样的方式,使算法其能够在不损失过多效果的同时,尽可能解决大规模数据带来的问题,期望在第二部分相同的环境下运行并比对效果。
第3章 工程实现:传统Apriori关联规则挖掘算法
3.1 简介
Apriori算法是第一个关联规则挖掘算法,也是最经典的算法。它利用逐层搜索的迭代方法找出数据库中项集的关系,以形成规则,其过程由连接(类矩阵运算)与剪枝(去掉那些没必要的中间结果)组成。该算法中项集的概念即为项的集合。包含K个项的集合为k项集。项集出现的频率是包含项集的事务数,称为项集的频率。如果某项集满足最小支持度,则称它为频繁项集,在本次大作业中,传统Apriori算法仅作为评估时的baseline作为对照。
3.2 算法思想
该算法的基本思想是:首先找出所有的频集,这些项集出现的频繁性至少和预定义的最小支持度一样。然后由频集产生强关联规则,这些规则必须满足最小支持度和最小可信度。然后使用第1步找到的频集产生期望的规则,产生只包含集合的项的所有关联规则。
当寻找频繁项集时,我们首先需要考虑的是频繁的定义,在Apriori中与频繁有关的是两个属性:一个是项集的支持度,另一个是置信度;支持度被定义为数据集中包含该项集的记录所占的比例;这个度量是针对项集来说的,因此可以定义一个最小支持度,在进行挖掘时只保留满足最小支持度的项集。置信度是针对一条关联规则来定义的,这条规则的可信度被定义为关联规则整体项集的概率除以关联规则左面项集的概率。
3.3 代码实现
scala实现:
主要算法描述:该算法有多个迭代组成,每个迭代可以分为两步,第一步首先检索出事务数据库中的所有频繁项集,即支持度不低于用户设定的阈值的项集;第二步再利用频繁项集构造出满足用户最小信任度的规则,按照这个规则进行多次迭代,直到不能再产生新的频繁项集时终止。
- def runApriori(minSupport: Double = 0.02, minConfidence: Double = 0.35) = {
- var itemCombs = itemSet.map(word => (Set(word), getSupport(Set(word))))
- .filter(wordSupportPair => (wordSupportPair._2 > minSupport))
- var k: Int = 1
- for (itemComb <- itemCombs) {
- this.toRetItems += (itemComb._1 -> itemComb._2)
- }
- var currentLSet: Set[Set[String]] = itemCombs.map(wordSupportPair => wordSupportPair._1).toSet
- k = k + 1
- while (currentLSet.size > 0) {
- val currentCSet: Set[Set[String]] = currentLSet.map(wordSet => currentLSet.map(wordSet1 => wordSet | wordSet1))
- .reduceRight((set1, set2) => set1 | set2)
- .filter(wordSet => (wordSet.size == k))
- val currentItemCombs = currentCSet.map(wordSet => (wordSet, getSupport(wordSet)))
- .filter(wordSupportPair => (wordSupportPair._2 > minSupport))
- currentLSet = currentItemCombs.map(wordSupportPair => wordSupportPair._1).toSet
- var associationRules: List[(Set[String], Set[String], Double)] = List()
- var CurrentToRetItems : Map[Set[String], Double] = Map()
- for (itemComb <- currentItemCombs) {
- CurrentToRetItems += (itemComb._1 -> itemComb._2)
- }
- this.toRetItems = this.toRetItems ++ CurrentToRetItems
- CurrentToRetItems.keys.foreach(item =>
- item.subsets.filter(wordSet => (wordSet.size < item.size & wordSet.size > 0))
- .foreach(subset => {
- associationRules = associationRules :+ (subset, item diff subset,
- this.toRetItems(item).toDouble / this.toRetItems(subset).toDouble)
- }
- )
- )
- this.associationRules=this.associationRules ++ associationRules.filter(rule => rule._3 > minConfidence)
- k += 1 }
在runApriori()方法中:
currentCSet代表当前的候选集合
currentItemCombs 代表从候选集合中选出的频繁项到该项支持度的映射
associationRules是根据频繁项生成的关联规则到置信度的映射
如何设置最小支持度与最小置信度并没有统一的标准,大部分情况下都是根据业务经验和现实需求设置初始值,然后经过多次调整,获取与业务相符的关联规则结果。本文中选取模型的预先输入参数为:最小支持度0.02、最小置信度0.35,可以通过实际测试以及考虑预期结果来获得几个比较好的参数。
3.4 程序运行输出
在
java version “1.8.0_251”
scala version 2.12.12的环境下运行
最小支持度0.02
最小置信度0.35
我们选择购物篮数据集进行测试,这个数据集数据总条目为9835条,我们取前10条展示:
- 柑橘类水果 人造黄油 即食汤 半成品面包
- 咖啡 热带水果 酸奶
- 全脂牛奶
- 奶油乳酪 肉泥 仁果类水果 酸奶
- 炼乳 长面包 其他蔬菜 全脂牛奶
- 腐蚀性清洁剂 黄油 白饭 全脂牛奶 酸奶
- 面包卷
- 瓶装啤酒 开胃酒 其他蔬菜 面包卷 超高温杀菌的牛奶
- 盆栽
- 谷物 全脂牛奶
算法运行产生的输出结果:
- (Set(本地蛋类),Set(其他蔬菜),0.35096153846153844)
- (Set(本地蛋类),Set(全脂牛奶),0.47275641025641024)
- (Set(仁果类水果),Set(全脂牛奶),0.3978494623655914)
- (Set(黑面包),Set(全脂牛奶),0.3887147335423197)
- (Set(酸奶油),Set(其他蔬菜),0.40283687943262414)
- (Set(柑橘类水果),Set(全脂牛奶),0.36855036855036855)
- (Set(热带水果),Set(全脂牛奶),0.40310077519379844)
- (Set(黄油),Set(全脂牛奶),0.4972477064220184)
- (Set(牛肉),Set(全脂牛奶),0.4050387596899225)
- (Set(猪肉),Set(其他蔬菜),0.37566137566137564)
- (Set(根茎类蔬菜),Set(全脂牛奶),0.44869402985074625)
- (Set(冷冻蔬菜),Set(全脂牛奶),0.4249471458773784)
- (Set(其他蔬菜),Set(全脂牛奶),0.38675775091960063)
- (Set(黄油),Set(其他蔬菜),0.3614678899082569)
- (Set(根茎类蔬菜),Set(其他蔬菜),0.43470149253731344)
- (Set(猪肉),Set(全脂牛奶),0.3844797178130512)
- (Set(酸奶),Set(全脂牛奶),0.40160349854227406)
- (Set(人造黄油),Set(全脂牛奶),0.4131944444444444)
- (Set(凝乳),Set(全脂牛奶),0.4904580152671756)
- (Set(酸奶油),Set(全脂牛奶),0.449645390070922)
- (Set(水果/蔬菜汁),Set(全脂牛奶),0.36849507735583686)
- (Set(糕点),Set(全脂牛奶),0.3737142857142857)
- (Set(其他蔬菜, 酸奶),Set(全脂牛奶),0.5128805620608898)
- (Set(全脂牛奶, 酸奶),Set(其他蔬菜),0.39745916515426494)
- (Set(其他蔬菜, 根茎类蔬菜),Set(全脂牛奶),0.4892703862660944)
- (Set(全脂牛奶, 根茎类蔬菜),Set(其他蔬菜),0.47401247401247404)
从输出中选择一项来说明,例如(Set(其他蔬菜, 根茎类蔬菜),Set(全脂牛奶),0.4892703862660944),说明顾客购买其他蔬菜,根茎类蔬菜的前提下,再购买全脂牛奶的概率大约有48.92%
程序运行时间统计:
运行时间统计:
- 数据总条目9835
- 加载总用时: 843ms
- 生成k=1层级的频繁项用时: 481ms
- -——---------
- 产生k=2层级的候选项数目: 1711
- 生成k=2的所有可能的候选项集用时: 27ms
- 生成k=2频繁项用时: 9474ms
- 产生k=2层级的频繁项目: 61条
- 生成k=2层级的关联规则用时: :52ms
- -——---------
- 产生k=3层级的候选项数目: 477
- 生成k=3的所有可能的候选项集用时: 11ms
- 生成k=3频繁项用时: 2703ms
- 产生k=3层级的频繁项目: 2条
- 生成k=3层级的关联规则用时: :1ms
- -——---------
- 产生k=4层级的候选项数目: 1
- 生成k=4的所有可能的候选项集用时: 0ms
- 生成k=4频繁项用时: 7ms
- 产生k=4层级的频繁项目: 0条
- 生成k=4层级的关联规则用时: :0ms
可以看出算法运行中大部分时间用于生成频繁项
3.5 缺点和不足
首先,在每一步产生侯选项目集时循环产生的组合过多,没有排除不应该参与组合的元素; 其次,在每次计算项集的支持度时,都对数据集中的全部记录进行了一遍扫描比较,这种扫描会大大增加计算系统的I/O开销,代价是随着数据集中记录数目的增加访存时间会呈现出大幅度的增加,因此需要设计更好性能的算法或是使用大数据计算框架进行处理。
第4章 工程实现:基于大数据处理框架关联规则挖掘算法
4.1 简介
Spark框架是一种分布式计算框架,相对于Hadoop其具有改进的数据流处理的批处理框架,基于内存计算及处理优化,保证了集群计算的效率,同时Spark 提供了更多灵活可用的数据操作,比如 filter, union, join, 以及各种对 key value pair 的方便操作,甚至提供了一个通用接口,让用户根据需要开发定制的数据操作,因此我们可以在 Spark平台上实现 Apriori 频繁项集挖掘的并行化算法。
4.2算法思想
再次分析Apriori算法,我们可以看出它产生频繁项集有两个特点:第一,它是逐层的,即从频繁1-项集逐个产生到频繁k-项集;第二,每次迭代后都由前一次产生的频繁项来产生新的候选项,然后对新产生的候选项集进行支持度计数进而得到新的频繁项集。
根据以上的特点,我们可以将算法分为两个阶段:第一个阶段,从HDFS上获取原始的数据集,将所有的事务转化为Spark RDD并利用cache()缓存到分布式内存中。然后在该RDD上扫描所有的事务,进行支持度计数,产生频繁1-项集。第二个阶段则从频繁1-项集开始,使用频繁k-项集去产生频繁k+1项集,不断迭代下去,直到不能产生新的频繁项集。在算法运行时,主节点每次迭代时需要将候选项集以broadcast()的形式分发到每个从节点,每个从节点收到之后进行一系列的MapReduce操作进而得到新的频繁项集,如此反复直至求得最大频繁项集。
4.3代码实现
scala on spark
首先配置sparkContent与HDFS,使用伪分布式实现算法
//配置sparkContent
- val conf = new SparkConf().setAppName(“Apriori”).setMaster(“local”)
- val sc = new SparkContext(conf)
- //配置HDFS
- val hdfsConf = new Configuration()
- val hdfs = FileSystem.get(hdfsConf)
- //设置输出文件
- val path: Path = new Path(output)
- if (hdfs.exists(path)) {
- hdfs.delete(path, true)
- }
利用sparkContent.textFile()读入数据,然后使用将其利用cache()缓存到分布式内存
- val transactions = sc.textFile(input)
- .map(x => {
- val elementSet = x.split(“\s+”).toSet
- elementSet
- }).cache()
- val transactions_num = transactions.count().toDouble
首先通过支持度计数来计算频繁1项集,以用于后续的迭代计算,在每一个事务中执行flatMap函数,将其中项集的所有元素都取出来铺平,再在该基础上执行map函数,将原有的Item映射为:<Set(Item), 1/数据数目>的key/value形式,接下来执行reduceByKey函数,用于统计每一个候选1-项集的支持度,并利用支持度阈值进行过滤,将支持度大于支持度阈值的项集生成频繁1-项集,完成第一阶段,下面是第一阶段的算法伪代码:
- val L_1 = transactions.flatMap(_.seq)
- .map(item => (Set(item), 1 / transactions_num))
- .reduceByKey((supportA, supportB) => supportA + supportB)
- .filter(item => item._2 >= minSupport)
在第二阶段,当频繁k-项集计算结束后,首先对计算得到分布式RDD利用collect()方法进行收集,再将其转化为< itemSet >的集合形式进行存储 ,在生成候选项时,先通过map函数将频繁k-项集中的项集进行组合,并使用reduceRight() 从末尾向前将集合进行合并,使用items.size == k的条件对生成的候选项进行过滤,选出频繁k+1-项集的候选集合,最后利用 broadcast()广播到每个工作节点
- var currentLSet = L_1.collect().map(item => item._1).toSet
- //当候选项集不为空时,持续迭代
- while (!currentLSet.isEmpty) {
- //从之前挖掘生成的频繁项中组合生成候选项
- var currentCSet: Set[Set[String]] = currentLSet
- .map(itemsA => currentLSet.map(itemsB => itemsA | itemsB))
- .reduceRight((set1, set2) => set1 | set2)
- .filter(items => (items.size == k))
- // 将候选项集广播到各个分区
- val broadcastCurrentC = sc.broadcast(currentCSet)
接下来,通过flatMap函数获取每个候选项集在原始事务集中的支持度,进一步对每个候选项使用map函数得到<ItemSet, 1/数据数目>的key/value形式,之后通过reduceBykey函数对每个事务的最终的支持度进行收集计数,并利用支持度阈值进行过滤,将支持度大于支持度阈值的项集生成频繁(k+1)-项集
- //生成新的k-频繁项组合
- val ItemFreqCombs = transactions
- .flatMap(transaction => broadcastCurrentC.value.filter(Candidate => Candidate.subsetOf(transaction)).map(Candidate => (Candidate, 1 / transactions_num)))
- .reduceByKey((supportA, supportB) => supportA + supportB)
- .filter(items => items._2 >= minSupport)
- //ItemFreqCombs.saveAsTextFile(output + “/” + “freq” + k)
- //利用计算结果收集新的频繁项集合
- currentLSet = ItemFreqCombs.collect().map(item => item._1).toSet
如果计算得到的频繁(k+1)-项集非空的话,需要根据迭代生成的频繁项集来计算关联规则,利用频繁(k+1)-项集建立频繁项索引toRetItems,并利用 broadcast()广播将其到每个工作节点,对于每个频繁(k+1)-项集,我们都可以将其划分为一个大小为k的频繁项集以及一个频繁项,可以通过遍历对应阶数产生的关联规则左项的长度来构造;这里以频繁项集A与繁项集b为例,首先需要从数据集中计算A->b中左项A的频度,再从toRetItems中索引Ab频繁项的频度即可计算规则A->b的置信度,再利用置信度阈值进行过滤,将置信度大于支持度阈值的项集生成需要的关联规则。
- //产生的频繁项组合数目>0,计算关联规则
- if (currentLSet.size > 0) {
- var toRetItems: Map[Set[String], Double] = Map()
- for (elems <- ItemFreqCombs.collect()) {
- toRetItems += (elems._1 -> elems._2)
- }
- val broadcastToRetItems = sc.broadcast(toRetItems)
- val TimeFlagD = System.nanoTime()
- //生成关联规则
- val associationRules = transactions
- .flatMap(transaction =>
- for {
- set <- broadcastToRetItems.value.keys.toArray
- i <- 1 until k
- subset <- set.subsets(i)
- if (subset.subsetOf(transaction))
- diff = set.diff(subset)
- } yield ((set – diff, diff), 1 / transactions_num))
- .reduceByKey(_ + _)
- .map(x => ((x._1._1, x._1._2), broadcastToRetItems.value.get(x._1._1.union(x._1._2)).getOrElse(0.0) / x._2))
- .filter(x => x._2 >= minConfidence)
4.4程序运行与输出
在
java version “1.8.0_251”
hadoop-2.7.7
spark-3.0.1-bin-hadoop2.7
scala version 2.12.12的环境下运行
最小支持度0.02
最小置信度0.35,
同样选择小规模的购物篮数据集进行测试,这个数据集数据总条目为9835条
k=2阶关联规则
- ((Set(凝乳),Set(全脂牛奶)),0.4904580152671819)
- ((Set(糕点),Set(全脂牛奶)),0.3737142857142852)
- ((Set(猪肉),Set(全脂牛奶)),0.38447971781305657)
- ((Set(本地蛋类),Set(全脂牛奶)),0.47275641025641746)
- ((Set(柑橘类水果),Set(全脂牛奶)),0.36855036855036977)
- ((Set(黄油),Set(其他蔬菜)),0.3614678899082616)
- ((Set(猪肉),Set(其他蔬菜)),0.3756613756613809)
- ((Set(热带水果),Set(全脂牛奶)),0.40310077519379306)
- ((Set(酸奶油),Set(全脂牛奶)),0.4496453900709258)
- ((Set(水果/蔬菜汁),Set(全脂牛奶)),0.3684950773558401)
- ((Set(酸奶油),Set(其他蔬菜)),0.40283687943262786)
- ((Set(冷冻蔬菜),Set(全脂牛奶)),0.4249471458773828)
- ((Set(黄油),Set(全脂牛奶)),0.4972477064220252)
- ((Set(根茎类蔬菜),Set(其他蔬菜)),0.434701492537306)
- ((Set(根茎类蔬菜),Set(全脂牛奶)),0.4486940298507383)
- ((Set(黑面包),Set(全脂牛奶)),0.3887147335423251)
- ((Set(其他蔬菜),Set(全脂牛奶)),0.386757750919609)
- ((Set(酸奶),Set(全脂牛奶)),0.40160349854226896)
- ((Set(本地蛋类),Set(其他蔬菜)),0.3509615384615436)
- ((Set(人造黄油),Set(全脂牛奶)),0.41319444444445047)
- ((Set(仁果类水果),Set(全脂牛奶)),0.39784946236559415)
- ((Set(牛肉),Set(全脂牛奶)),0.4050387596899274)
k=3阶关联规则
- ((Set(其他蔬菜, 酸奶),Set(全脂牛奶)),0.5128805620608942)
- ((Set(全脂牛奶, 酸奶),Set(其他蔬菜)),0.3974591651542704)
- ((Set(其他蔬菜, 根茎类蔬菜),Set(全脂牛奶)),0.4892703862660995)
- ((Set(全脂牛奶, 根茎类蔬菜),Set(其他蔬菜)),0.47401247401247926)
可以看出运行结果和单机算法是近乎相同的
程序运行时间统计
- spark启动总用时: 7065ms
- 数据总条目9835
- 加载总用时: 1592ms
- 生成k=1层级的频繁项用时: 866ms
- 产生k=2层级的候选项数目: 1711
- 生成k=2层级的所有可能的候选项集用时: 39ms
- 生成k=2频繁项用时: 1917ms
- 产生k=2层级的频繁项目: 61条
- 生成k=2层级的关联规则用时: :12ms
- 生成的关联规则数目: 22
- 产生k=3层级的候选项数目: 477
- 生成k=3层级的所有可能的候选项集用时: 15ms
- 生成k=3频繁项用时: 725ms
- 产生k=3层级的频繁项目: 2条
- 生成k=3层级的关联规则用时: :8
- ms
- 生成的关联规则数目: 4
- 产生k=4层级的候选项数目: 1
- 生成k=4层级的所有可能的候选项集用时: 2ms
- 生成k=4频繁项用时: 319ms
- 产生k=4层级的频繁项目: 0条
- 进程已结束,退出代码0
可以看出相较于单机版的算法,在相同的机器配置与运行环境下,基于大数据处理框架(即时是伪分布式)的挖掘算法在运行时间方面显著优于前者。
4.5缺点和不足
Apriori算法本身对数据库的扫描次数过多,而且扫描后可能产生大量的候选项集,在频繁项目集长度变大的情况下,运算时间显著增加,这个问题并不能仅凭大数据处理框架解决,如果数据量过大超过了主存的大小,频繁的换入换出造成的时间浪费同样也是不可忽视的。同时有许多应用并不需要发现所有的频繁项:比方说在某些情况下,我们只要找到大部分的销售频繁关联项就够了,而不必找出所有的频繁项。
第5章 工程实现:大数据框架下有限扫描关联挖掘算法
5.1简介
基于大数据处理框架的有限扫描挖掘算法通过选择原始数据的一个样本,在这个样本上再使用Apriori算法进行频繁的挖掘,以牺牲精确度的代价来减少算法开销,为了提高效率,样本大小应该以可以放在内存中为宜,可以适当降低最小支持度来减少遗漏的频繁模式,尽可能发现全部或大部分的频繁项集。即先使用从全体数据集中抽取出来的采样得到一些在整个数据库中可能成立的规则,然后对数据库的剩余部分验证这个结果。
5.2算法思想
从给定数据集上选取随机样本S,然后在S而不是整个数据集中执行Apriori算法搜索频繁项集。用这种方法是牺牲了一些精度换取有效性。样本S的大小选取使得频繁项目集产生的数目大大减少。这样算法不需要扫描整个数据集中的事务。由于算法只是搜索随机样本中的数据,因此可能会丢失一些全局频繁项集。为了减少这样的情况,使用比最小支持度更低的支持度阈值来找出局部于S的频繁项集(记做pS)。然后,原数据集的其余部分用于计算pS中每个项集的实际频率。使用一种机制来确定是否所有的频繁项集都包含在pS中。如果pS实际包含了整个数据集中的所有频繁项集,则只需扫描一次数据集。否则,需要进行重新采样,继续重复上面的步骤,直到出现满足pS实际包含了整个数据集中的所有频繁项集。
我们定义一个概念:Negative border(非频繁项边界,或者说是反例边界):是采样集中的一个非频繁项的集合,当且仅当这些项集中去掉任意一个项后得到的所有直接子集是在采样集中都是频繁集。
使用Negative border的目的是充当一个类似于金丝雀的边界探测。Negative border中的项在整个数据集中没有一个是频繁的,因为我们在采样样本中选择了明显低于比例阈值的支持度阈值,如果项集在采样样本中不是频繁项集,那么其在全体数据集上也就不可能是频繁项集。
但是,如果一个或多个Negative border中的项集在全体数据集中被证明是频繁的,那么我们就认为该样本相对于全体数据集不具有代表性,我们需要进行重新采样。
因此,我们可以得到有限抽样算法的流程:
首先在样本上计算频繁项和Negative border项,得到的Negative border项我们再将其在全体数据集上验证其是否也是非频繁项。
1、 如果Negative border中所有项集在整个数据集上计算为都为频繁项集。这种情况下,样本中的频繁项集可以近似正确的频繁项集。
2、 如果存在Negative border中的项集在整个数据集中是频繁项集。此时,我们不能确定是否存在更大的项集,这个项集既不在样本的Negative border中,又不在样本的的频繁项集中,但是在整个数据集是频繁项集。这样,我们在此次的抽样中得不到结果,算法只能在重新抽样,继续重复上面的步骤,直到出现Negative border中没有一个项集在整个数据集上计算为频繁项集的情况停止。
其余在局部样本上从k阶频繁项集生成k+1阶候选项集,k+1阶候选项集计算频繁项集,生成关联规则均与第四章中的算法思想相同。
然后我们对算法产生的结果的有效性进行分析
有限扫描挖掘算法不会产生False Negative,就是在也是就是在全体数据集上为频繁项的数据在抽样后表现为非频繁项,下面我们给出证明:
首先,在每轮的样本采样和频繁项计算中,会出现三种结果:
\1. 项集在样本中是频繁项
\2. 项集在样本中不是频繁项,而在Negative border中
\3. 项集在样本中是不是频繁项,也不在Negative border中。
如果在全体数据集上为频繁项的数据在抽样后表现为非频繁项,那么它就有上面提到的2,3两者可能,从算法的流程可以看到:我们保证所有Negative border中的集合在整个数据集都是不频繁的,那么不可能有某个项集满足:它在整个数据集上是频繁的,而且现在样本采样的Negative border中,因此2的情况可以排除。接下来再看3的情况,如果项集在样本中是不是频繁项,也不在Negative border中,而在全体数据集上是频繁项,我们可以通过证明否定这种结果:
假设集合S 在全体数据集上是频繁项集,但是不是样本的频繁项集,并且不在Negative border中,根据Apriori算法的性质:1.是频繁项集的所有非空子集都必须也是频繁的,2.是非频繁项集的所有父集都是非频繁的。S在样本上是非频繁集,假设T是S所有子集中在样本数据上非频繁项的最小子集,那么T一定在 Negative border中,首先T在样本数据集上是非频繁的,其次T的所有子集在样本数据集上是频繁的,否则就与T是S所有子集中在样本数据上非频繁项的最小子集的假设相矛盾;又因为S 在全体数据集上是频繁项集,所以T作为S的子集也是频繁项集(基于Apriori算法的性质1)
我们可以发现,T即在Negative border,又在全体数据集上是频繁集,所以随机取样算法不能产生结果,需要重新运行采样;因此可以证明:项集在样本中是不是频繁项,也不在Negative border中的情况不会出现。
综上述可知:项集要么在样本数据集中是频繁项 ,要么项集在样本中不是频繁项,而在Negative border中,此时在全体数据集中一定不会是频繁项,所以说有限扫描挖掘算法会不会产生False Negative。
5.3代码实现
配置spark和HDFS等步骤均与第四章中实现相同。
利用sparkContent.textFile()读入数据,然后使用将其利用cache()缓存到分布式内存也与与第四章中实现相同。
为了实现算法思想中的重复采样的需求,利用一个while循环实现,并维护一个布尔型遍历SampleValidity ,表示采样是否有效,当运行全部的频繁项计算且满足有限扫码的采样条件时,将其置为真,算法结束。
利用spark中的RDD. sample()方法进行采样,置换采样设置为false,fractionOfTransactionUsed为预先设置的采样比例,设置采样随机种子为scala.util.Random.nextInt(1000)
- var SampleValidity : Boolean = false
- val loop = new Breaks;
- while(SampleValidity!=true){
- if (hdfs.exists(path)) {
- hdfs.delete(path, true)
- }
- loop.breakable {
\8. val sample = transactions.sample(false, fractionOfTransactionUsed, scala.util.Random.nextInt(1000))
生成1-频繁项和第四章中相同,除了要生成频繁项之外,还需要计算Negative border项集 ,再和全数据集上的频繁项进行对照。
- //抽样下的 1-非频繁项,又称为NegativeBorder
- val NegativeBorderItem = FrequentCountMap.filter(item => item._2 < minSupport).map(item=>item._1)
- //计算全数据集的 1-非频繁项
- val NegativeItems = transactions.flatMap(_.seq)
- .map(item => (Set(item), 1 / transactions_num))
- .reduceByKey((supportA, supportB) => supportA + supportB)
- .filter(item => item._2 < minSupport).map(item=>item._1)
和全数据集上的频繁项进行对照采用先取全数据集上的非频繁项集,再与Negative border项集取差集,如果得到差集中有元素的话,说明存在Negative border中的项集在全数据集上是频繁项,这时候就需要重新运行采样算法,重新采样了。
- if( NegativeItems.subtract(NegativeBorderItem).count()>0) {
- println(“重新运行采样算法 k=”+k+“\n”+“-”12+“\n”+“-”12)
- //NegativeBorderItem.subtract(NegativeItems).foreach(print)
- //println(“-----”)
- NegativeItems.subtract(NegativeBorderItem).foreach(print)
- loop.break()
- //重新采样运行算法
- }
除此之外,因为我们选择了一个抽样样本,所有的采样与频繁项计算都需要在这个样本之上,而不是在全数据集transactions 上
对于由k 项集生成(k+1)项集时,生成时不仅要计算频繁项集randItemFreqCombs,还需要计算Negative border项集randNegativeBorderItems,
计算这个项集的思路是先将sampleCountMap 中不满足items._2 (支持度)>= minSupport的项集提取出来,再用filter方法进行筛选,选出项集中去掉任意一个项后得到的所有直接子集是在采样集中都是频繁集的项集加入Negative border中。
- sampleCountMap = sample
- .flatMap(transaction => broadcastCurrentC.value.filter(Candidate => Candidate.subsetOf(transaction)).map(Candidate => (Candidate, 1 / sample_num)))
- .reduceByKey((supportA, supportB) => supportA + supportB)
- val randItemFreqCombs = sampleCountMap
- .filter(items => items._2 >= minSupport)
- randItemFreqCombs.saveAsTextFile(output + “/” + “randFreq” + k)
- //利用计算结果收集新的频繁项集合
- currentLSet = randItemFreqCombs.collect().map(item => item._1).toSet
- //计算Negative border:是样品的一个非频繁项集合,并且这些项集去掉任意一个项后的直接子集是频繁集。
- val randNegativeBorderItems = sampleCountMap.filter(items => items._2 < minSupport).
- map(x=>x._1).filter(items=>items.subsets.filter(item=>(item.size==k-1))
- .forall(x=>currentLSet.contains(x)))
- //将randNegativeBorderItems广播到所有工作节点
- val broadcastRNI = sc.broadcast(randNegativeBorderItems.collect())
- //验证negative border在全部数据集上也是非频繁项集
- val existFalsePositive = transactions
- .flatMap(transaction=>broadcastRNI.value.filter(rNI=>rNI.subsetOf(transaction)).map(rNI=>(rNI,1 / transactions_num)))
- .reduceByKey((supportA, supportB) => supportA + supportB)
- .collect().forall(items => items._2 < minSupport)
- if(existFalsePositive == false){
- println(“重新运行采样算法 k=”+k+“\n”+“-”12+“\n”+“-”12)
- loop.break()
- //重新采样运行算法
- }
计算关联规则的方法和第四章中相同,只不过将全数据集集合替换为样本集即可,这里不再赘述。
5.4程序运行与输出
同样选择小规模的购物篮数据集进行测试,这个数据集数据总条目为9835条
在第四章相同的环境下运行:
最小支持度0.02
最小置信度0.35,
抽样大小 0.5
样本最小支持度:0.9*最小支持度
k=2阶关联规则
- ((Set(凝乳),Set(全脂牛奶)),0.4922480620155052)
- ((Set(糕点),Set(全脂牛奶)),0.37354988399071676)
- ((Set(猪肉),Set(全脂牛奶)),0.373188405797103)
- ((Set(香肠),Set(面包卷)),0.3675213675213643)
- ((Set(本地蛋类),Set(全脂牛奶)),0.4478527607361969)
- ((Set(柑橘类水果),Set(全脂牛奶)),0.36231884057970815)
- ((Set(猪肉),Set(其他蔬菜)),0.36594202898550876)
- ((Set(热带水果),Set(全脂牛奶)),0.39525691699604304)
- ((Set(酸奶油),Set(全脂牛奶)),0.428954423592492)
- ((Set(酸奶油),Set(其他蔬菜)),0.4182305630026797)
- ((Set(餐巾),Set(全脂牛奶)),0.39926739926740074)
- ((Set(冷冻蔬菜),Set(全脂牛奶)),0.422764227642278)
- ((Set(黄油),Set(全脂牛奶)),0.4726562500000014)
- ((Set(根茎类蔬菜),Set(其他蔬菜)),0.42418426103646323)
- ((Set(根茎类蔬菜),Set(全脂牛奶)),0.443378119001914)
- ((Set(黑面包),Set(全脂牛奶)),0.39755351681957257)
- ((Set(其他蔬菜),Set(全脂牛奶)),0.3640167364016669)
- ((Set(酸奶),Set(全脂牛奶)),0.39410939691443897)
- ((Set(人造黄油),Set(全脂牛奶)),0.4137931034482772)
- ((Set(仁果类水果),Set(全脂牛奶)),0.3977272727272724)
- ((Set(牛肉),Set(全脂牛奶)),0.4078431372549035)
k=3阶关联规则
- ((Set(酸奶, 其他蔬菜),Set(全脂牛奶)),0.509174311926607)
- ((Set(全脂牛奶, 酸奶),Set(其他蔬菜)),0.3950177935943075)
- ((Set(其他蔬菜, 根茎类蔬菜),Set(全脂牛奶)),0.4615384615384632)
- ((Set(根茎类蔬菜, 全脂牛奶),Set(其他蔬菜)),0.44155844155844315)
对关联规则进行比较,k=3阶关联规则与精确算法生成的关联规则是完全相同的,而k=2阶的关联规则与精确算法的存在一些误差,准确率大概在85%左右。
运行时间:
- spark启动总用时: 7422ms
- 数据总条目9835
- 加载总用时: 1909ms
- 抽样总条目4926
- 生成k=1层级的频繁项用时: 401ms
- 重新运行采样算法 k=1
- Set(防腐用品)抽样总条目5020
- 生成k=1层级的频繁项用时: 93ms
- 产生k=2层级的候选项数目: 595
- 生成k=2层级的所有可能的候选项集用时: 31ms
- 生成k=2频繁项用时: 588ms
- 产生k=2层级的频繁项目: 59条
- 生成k=2层级的关联规则用时: :8ms
- 生成的关联规则数目: 21
- 产生k=3层级的候选项数目: 447
- 生成k=3层级的所有可能的候选项集用时: 13ms
- 生成k=3频繁项用时: 606ms
- 产生k=3层级的频繁项目: 2条
- 生成k=3层级的关联规则用时: :6ms
- 生成的关联规则数目: 4
- 产生k=4层级的候选项数目: 1
- 生成k=4层级的所有可能的候选项集用时: 13ms
- 生成k=4频繁项用时: 323ms
- 产生k=4层级的频繁项目: 0条
- 进程已结束,退出代码0
可以看出和全数据集上的频繁项挖掘算法对比,生成候选项和频繁条目的用时显著的减小了,这是因为我们在抽样集上进行挖掘,生成候选项的数目大大地减少了,同时对数据集的遍历次数也减少了。
5.5缺点和不足
随机采样算法存在一个很大的缺点就是产生的结果不精确,即存在所谓的数据扭曲,因为在一些实际情况下,分布在同一页面上的数据时常是高度相关的,对这些数据的采样可能不能表示整个数据集中模式与潜在规则的分布,由此而导致的是多次重复采样数据所花费的代价有可能是较大的。
第6章 实验评估与分析
6.1 实验设计
6.1.1实验环境
本次实验的软件环境使用java version “1.8.0_251”, scala version 2.12.12,hadoop-2.7.7,spark-3.0.1-bin-hadoop2.7,运行在Microsoft Windows 10 专业版,10.0.19042 Build 19042,在IntelliJ IDEA 2020.3.1 (Ultimate Edition)下运行,其中spark运行在StandAlone模式
硬件环境安装有16,258 MB物理内存,使用Intel® Core™ i5-8300H CPU @ 2.30GHz (8 CPUs), ~2.3GHz处理器
6.1.2 数据集介绍
本次实验共采用三个规模不同数据集:
\1. data_act_combat5529_Retail,
示例:
- 柑橘类水果 人造黄油 即食汤 半成品面包
- 咖啡 热带水果 酸奶
- 全脂牛奶
- 奶油乳酪 肉泥 仁果类水果 酸奶
- 炼乳 长面包 其他蔬菜 全脂牛奶
- 腐蚀性清洁剂 黄油 白饭 全脂牛奶 酸奶
- 面包卷
- 瓶装啤酒 开胃酒 其他蔬菜 面包卷 超高温杀菌的牛奶
- 盆栽
- 谷物 全脂牛奶
\2. Retail_Market_Basket_Data_Set数据集由Tom Brijs提供,包含来自匿名比利时零售商店的零售市场篮子数据,共有88163条购物篮数据,所有购物篮共计1076195件商品,购物篮的平均大小为13,且大部分购物篮的大小都在7-11的范围之间
示例:
\3. IBM_Almaden_Quest T40I10D100K 数据集,是使用IBM Almaden Quest research group的生成器生成的数据,共有100000条,数据集中涉及1000种不同的挖掘项类型
示例:
6.1.3实验内容
分布在多个不同规模的数据集上,运行大数据处理框架的挖掘算法和有限扫描挖掘算法,对大数据计算框架下的随机与精确算法的性能与正确率进行对照与分析。
统计算法性能方式:对程序运行进行计时
统计正确率的方式:对生成的关联规则与精确算法的进行比较
6.2 对比实验
本次实验选择的对比实验是运行传统Apriori算法,记录传统频繁项集挖掘算法面对大规模数据时的表现和限制。
6.3 实验结果
6.3.1 数据集1上实验
首先在数据集1上进行实验
最小支持度0.02
最小置信度0.35,
抽样大小 0.5
样本最小支持度:0.9*最小支持度
运行结果:
数据集1的规模较小,尚在常规算法接受的范围内
可以看出大数据计算框架在计算频繁项时能够节省较多的时间,而频繁项计算占用了算法运行大部分时间,相比于精确算法,随机抽样能够节省更多时间,这与抽样的比例有密切的关系。
6.3.2 数据集2上实验
最小支持度0.02
最小置信度0.35,
抽样大小 0.5
样本最小支持度:0.9*最小支持度
运行结果:
随着数据集中数据的增多,传统挖掘算法需要的时间会显著增加,几乎是基于spark上算法的数倍。
从前两个实验中我们可以看出,算法花费的主要时间是在频繁项搜索上,因此之后我们只统计频繁项搜索花费的时间来确定算法的时间代价
6.3.3 数据集3上实验
最小支持度0.02
最小置信度0.35,
抽样大小 0.5
样本最小支持度:0.9*最小支持度
由于数据集3数据量极大,每个篮中包含的项目较多,因此传统挖掘算法几乎无法完成挖掘任务,我们从100k(100 000)的数据集中选择10k条进行实验
可以发现基于大数据处理框架的算法可以大幅度地较少处理时间(结果约90%),再进行随机采样可以进一步减少时间
然后我们扩大数据集选取50%的数据
很可惜,选取50%的数据,传统挖掘算法显然不能在有效的时间内完成挖掘任务,因此我们跳过对其的测试
可以看出,因为数据量的增多,即使是使用了spark计算框架,也需要消耗大量的时间,而随机抽样的方法可以在一定程度上减少时间的消耗。
6.4 实验受参数的影响
6.4.1支持度
由于在三种算法中,和支持度有关的操作(判断是否为频繁项)都是类似的,仅对传统挖掘算法的实验也具有代表性,因此该实验在传统挖掘算法上执行,运行数据集.
最小支持度 0.01 0.02 0.05 0.07 0.1 0.2
最小置信度 0.35
实验的结果是可以理解的,较大的支持度下会产生更小的频繁项集,进而导致在下一轮迭代计算中产生更小的候选集,更小的候选集在较大的支持度的作用下就会又产生更小的频繁项集,更小的候选集和频繁项集都会减少算法运行消耗的时间。
较大的支持度下会产生更小的频繁项集会产生更少的关联规则,这也是很容易理解的。
6.4.2置信度
因为置信度涉及关联规则的计算,这次选择spark下的频繁项挖掘算法 ,在能产生更多关联项集的数据集2上运行
最小支持度0.02
最小置信度0.1 0.2 0.35 0.5 0.7
实验结果:
从结果可以看出,置信度这个参数和频繁项计算时间与关联规则计算时间没有太多的关联,仅仅对产生关联规则数目有影响,这是因为最小置信度仅在生成关联规则时用到,不涉及其他的计算。
6.4.3取样比例
选择spark下的频繁项挖掘算法与随机抽样挖掘抽样算法进行比较
最小支持度0.02
最小置信度0.35
抽样比例:0.1 0.2 0.3 0.4 0.5 0.8 1
显然抽样比例为1即退化为全数据集上的频繁项挖掘算法
选择从数据集3 :100k(100 000)的数据集中选择10k条进行实验,因为这个数据集规模较大,更容易体现出随机抽样算法的效果。
从结果中可以看出,不同的抽样大小会导致在抽样上花费的时间也不一样,对于一个较小的抽样,在小样本集上遍历生成频繁项所需要的代价就会更小,所需要的运算时间也就越小,进而性能表现也会更好。
但是图表中并没有对小样本导致的样本相对于全体数据集不具有代表性(例如存在Negative border中的项集在整个数据集中是频繁项集)的情况进行体现,只对成功计算采样的情况进行了性能分析,在这种情况下我们需要进行重新采样,小样本导致的多次重复采样数据所花费的代价可能会很大。
- 19 -
版权归原作者 FZQuantum 所有, 如有侵权,请联系我们删除。