0


基于Hadoop的云计算与大数据处理技术

一、实验目的

1.了解Scala语言的基本语法

2.了解Spark Scala开发的原理

3.了解Spark Java API的使用

4.了解Spark的Scala API及Java API对数据处理的不同点

****二、实验内容 ****

某电商网站记录了大量用户对商品的收藏数据,并将数据存储在名为buyer_favorite1的文件中,数据格式以及数据内容如下:

用户ID(buyer_id),商品ID(goods_id),收藏日期(dt)

view plain copy

  1. 用户id 商品id 收藏日期
  2. 10181 1000481 2010-04-04 16:54:31
  3. 20001 1001597 2010-04-07 15:07:52
  4. 20001 1001560 2010-04-07 15:08:27
  5. 20042 1001368 2010-04-08 08:20:30
  6. 20067 1002061 2010-04-08 16:45:33
  7. 20056 1003289 2010-04-12 10:50:55
  8. 20056 1003290 2010-04-12 11:57:35
  9. 20056 1003292 2010-04-12 12:05:29
  10. 20054 1002420 2010-04-14 15:24:12
  11. 20055 1001679 2010-04-14 19:46:04
  12. 20054 1010675 2010-04-14 15:23:53
  13. 20054 1002429 2010-04-14 17:52:45
  14. 20076 1002427 2010-04-14 19:35:39
  15. 20054 1003326 2010-04-20 12:54:44
  16. 20056 1002420 2010-04-15 11:24:49
  17. 20064 1002422 2010-04-15 11:35:54
  18. 20056 1003066 2010-04-15 11:43:01
  19. 20056 1003055 2010-04-15 11:43:06
  20. 20056 1010183 2010-04-15 11:45:24
  21. 20056 1002422 2010-04-15 11:45:49
  22. 20056 1003100 2010-04-15 11:45:54
  23. 20056 1003094 2010-04-15 11:45:57
  24. 20056 1003064 2010-04-15 11:46:04
  25. 20056 1010178 2010-04-15 16:15:20
  26. 20076 1003101 2010-04-15 16:37:27
  27. 20076 1003103 2010-04-15 16:37:05
  28. 20076 1003100 2010-04-15 16:37:18
  29. 20076 1003066 2010-04-15 16:37:31
  30. 20054 1003103 2010-04-15 16:40:14
  31. 20054 1003100 2010-04-15 16:40:16

现分别使用Spark Scala API及Spark Java API对用户收藏数据,进行wordcount操作,统计每个用户收藏商品数量。

三、实验原理或流程

Spark的核心就是RDD,所有在RDD上的操作会被运行在Cluster上,Driver程序启动很多Workers,Workers在(分布式)文件系统中读取数据后转化为RDD(弹性分布式数据集),然后对RDD在内存中进行缓存和计算。

对于Spark中的API来说,它支持的语言有Scala、Java和Python,由于Scala是Spark的原生语言,各种新特性肯定是Scala最先支持的,Scala语言的优势在于语法丰富且代码简洁,开发效率高。缺点在于Scala的API符号标记复杂,某些语法太过复杂,不易上手。对Java开发者而言,也可以使用Spark Java API。

RDD有两种类型的操作 ,分别是Action(返回values)和Transformations(返回一个新的RDD)。

四、实验过程及源代码

1.在Linux上创建/data/spark4目录,用于存储实验所需的数据。

view plain copy

  1. mkdir -p /data/spark4

切换到/data/spark4目录下,并从http://172.16.103.12:60000/allfiles/spark4/下载实验数据buyer_favorite1及spark-assembly-1.6.0-hadoop2.6.0.jar。

view plain copy

  1. cd /data/spark4
  2. wget http://172.16.103.12:60000/allfiles/spark4/buyer_favorite1
  3. wget http://172.16.103.12:60000/allfiles/spark4/spark-assembly-1.6.0-hadoop2.6.0.jar

2.使用jps查看Hadoop以及Spark的相关进程是否已经启动,若未启动则执行启动命令。

view plain copy

  1. jps
  2. /apps/hadoop/sbin/start-all.sh

view plain copy

  1. /apps/spark/sbin/start-all.sh

将Linux本地/data/spark4/buyer_favorite文件,上传到HDFS上的/myspark4目录下。若HDFS上/myspark4目录不存在则创建。

view plain copy

  1. hadoop fs -mkdir -p /myspark4
  2. hadoop fs -put /data/spark4/buyer_favorite1 /myspark4

3.打开已安装完Scala插件的Eclipse,新建一个Scala项目。

将项目命名为spark4。

在spark4项目下新建包名,命名为my.scala。

右键点击包名, 新建scala Object。

将scala object命名为ScalaWordCount。

4.右键项目,创建一个文件夹,名为lib。

将Linux上的/data/spark4/spark-assembly-1.6.0-hadoop2.6.0.jar文件,拷贝到lib目录下。右键jar包,点击Build Path=>Add to Build Path。

5.在Eclipse中,打开ScalaWordCount.scala文件。编写Scala语句,并统计用户收藏数据中,每个用户收藏商品数量。

view plain copy

  1. package my.scala
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.SparkContext
  4. object ScalaWordCount {
  5. def main(args: Array[String]) {  
    
  6.     val conf = ****new**** SparkConf()  
    
  7.     conf.setMaster("local")  
    
  8.         .setAppName("scalawordcount")  
    
  9.     val sc = ****new**** SparkContext(conf)  
    
  10.     val rdd = sc.textFile("hdfs://localhost:9000/myspark4/buyer_favorite1")  
    
  11.     rdd.map(line => (line.split("\t")(0), 1))  
    
  12.        .reduceByKey(_ + _)  
    
  13.        .collect()  
    
  14.        .foreach(println)  
    
  15.     sc.stop()  
    
  16. }  
    
  17. }

第一步:创建Spark的配置对象sparkConf,设置Spark程序运行时的配置信息;

第二步:创建SparkContext对象,SparkContext是Spark程序所有功能的唯一入口,无论采用Scala、Java还是Python都必须有一个SparkContext;

第三步:根据具体的数据来源,通过SparkContext来创建RDD;

第四步:对初始的RDD进行Transformation级别的处理。(首先将每一行的字符串拆分成单个的单词,然后在单词拆分的基础上对每个单词实例计数为1;最后,在每个单词实例计数为1的基础上统计每个单词在文件出现的总次数)。

6.在Eclipse中执行代码

在控制界面console中查看的输出结果。

view plain copy

  1. (用户id 收藏商品数量)
  2. (20042,1)
  3. (20054,6)
  4. (20055,1)
  5. (20064,1)
  6. (20001,2)
  7. (10181,1)
  8. (20067,1)
  9. (20056,12)
  10. (20076,5)

7.再次右键点击项目名,新建package,将包命名为my.java 。

右键点击包my.java,新建Class,命名为JavaWordCount。

8.打开JavaWordCount.java文件,编写Java代码,统计用户收藏数据中,每个用户收藏商品数量。

view plain copy

  1. package my.java;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.FlatMapFunction;
  7. import org.apache.spark.api.java.function.Function2;
  8. import org.apache.spark.api.java.function.PairFunction;
  9. import scala.Tuple2;
  10. import java.util.Arrays;
  11. import java.util.List;
  12. import java.util.regex.Pattern;
  13. public final class JavaWordCount {
  14. private static final Pattern SPACE = Pattern.compile("\t");
  15. public static void main(String[] args) throws Exception {
  16. SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
  17. JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  18. JavaRDD<String> lines = ctx.textFile("hdfs://localhost:9000/myspark4/buyer_favorite1");
  19. JavaRDD<String> words = lines.flatMap(****new**** FlatMapFunction<String, String>() {  
    
  20.     @Override  
    
  21.     ****public**** Iterable<String> call(String s) {  
    
  22.         String word[]=s.split("\t",2);  
    
  23.         ****return**** Arrays.asList(word[0]);  
    
  24.         }  
    
  25.         });  
    
  26.         JavaPairRDD<String, Integer> ones = words.mapToPair(****new**** PairFunction<String, String, Integer>() {  
    
  27.         @Override  
    
  28.         ****public**** Tuple2<String, Integer> call(String s) {  
    
  29.         ****return**** ****new**** Tuple2<String, Integer>(s, 1);  
    
  30.         }  
    
  31.         });  
    
  32.         JavaPairRDD<String, Integer> counts = ones.reduceByKey(****new**** Function2<Integer, Integer, Integer>() {  
    
  33.         @Override  
    
  34.         ****public**** Integer call(Integer i1, Integer i2) {  
    
  35.         ****return**** i1 + i2;  
    
  36.         }  
    
  37.         });  
    
  38.         List<Tuple2<String, Integer>> output = counts.collect();  
    
  39.         System.out.println(counts.collect());  
    
  40.         counts.saveAsTextFile("hdfs://localhost:9000/myspark4/out");  
    
  41.         ctx.stop();  
    
  42.         }  
    
  43.         }
    

9.在Eclipse上执行Java代码,并在Java代码指定输出目录下查看实验结果。

view plain copy

  1. hadoop fs -ls /myspark4/out
  2. hadoop fs -cat /myspark4/out/part-00000

五、实验结论及心得

通过这个实验,我们可以得出以下结论和心得:

Scala语言具有丰富的语法和简洁的代码,可以提高开发效率。但对于初学者来说,Scala的符号标记可能比较复杂,需要一定的学习成本。

Spark的工作原理是基于RDD的分布式计算,它充分利用了内存进行缓存和计算,提高了效率。了解这个原理有助于我们更好地使用Spark进行开发。

对于熟悉Java的开发者来说,Spark的Java API是一个很好的选择。它提供了与Scala相似的功能,但是更接近Java的语法和习惯。

在选择使用Spark的Scala API还是Java API时,需要根据项目需求和开发者的熟悉程度进行权衡。两者都有各自的优势和适用场景。

总的来说,掌握Scala语言和Spark的开发原理对于进行大数据处理和分析是非常重要的。通过实验我们对这些内容有了更深入的了解,为以后的开发工作打下了基础。


本文转载自: https://blog.csdn.net/2301_77118579/article/details/135112021
版权归原作者 昌航小马子 所有, 如有侵权,请联系我们删除。

“基于Hadoop的云计算与大数据处理技术”的评论:

还没有评论