0


企业Spark案例--酒店数据分析实战提交

第1关:数据清洗--过滤字段长度不足的且将出生日期转:

package com.yy

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object edu{

  1. /**********Begin**********/
  2. // 此处可填写相关代码
  3. case class Person(id:String,Name:String,CtfTp:String,CtfId:String,Gender:String,Birthday:String,Address:String,Zip:String,Duty:String,Mobile:String,Tel:String,Fax:String,EMail:String,Nation:String,Taste:String,Education:String,Company:String,Family:String,Version:String,Hotel:String,Grade:String,Duration:String,City:String)
  4. /**********End**********/
  5. def main(args: Array[String]): Unit = {
  6. val spark = SparkSession
  7. .builder()
  8. .appName("Spark SQL")
  9. .master("local")
  10. .config("spark.some.config.option", "some-value")
  11. .getOrCreate()
  12. val rdd = spark.sparkContext.textFile("file:///root/files/part-00000-4ead9570-10e5-44dc-80ad-860cb072a9ff-c000.csv")
  13. /**********Begin**********/
  14. // 清洗脏数据(字段长度不足 23 的数据视为脏数据)
  15. val rdd1: RDD[String] = rdd.filter(x=>{
  16. val e=x.split(",",-1)
  17. e.length==23 })
  18. // 将出生日期改为 xxxx-xx-xx 格式(例如 19000101:1900-01-01,如果该属性为空不做处理,结果只取前 10 行)
  19. val rdd2: RDD[Person] = rdd1.map(x=>{val str=x.split(",",-1)
  20. if (str(5).trim != "" && str(5).length == 8) {
  21. str(5) = str(5).substring(0,4)+"-"+str(5).substring(4,6)+"-"+str(5).substring(6,8)
  22. }
  23. Person(str(0),str(1),str(2),str(3),str(4),str(5),str(6),str(7),str(8),str(9),str(10),str(11),str(12),str(13),str(14),str(15),str(16),str(17),str(18),str(19),str(20),str(21),str(22))
  24. })
  25. import spark.implicits._
  26. val df =rdd2.toDS()
  27. df.createOrReplaceTempView("yy")
  28. val out= spark.sql("select * from yy limit 10")
  29. // 将结果保存成 csv 格式到 file:///root/files-out 目录下
  30. out.write.csv("file:///root/files-out")
  31. /**********End**********/
  32. spark.stop()
  33. }

}

第二关:数据分析--通过入住时间和入住总时长计算用户

package com.yy

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object edu1{

def main(args: Array[String]): Unit = {

  1. val spark = SparkSession
  2. .builder()
  3. .appName("Spark SQL")
  4. .master("local")
  5. .config("spark.some.config.option", "some-value")
  6. .getOrCreate()
  7. /**********Begin**********/
  8. //加载第一关处理后的数据,数据位于/root/files2目录下,文件名为part-00000-f9f4bd23-1776-4f84-9a39-f83840fa1973-c000.csv
  9. val df = spark.read.option("header", true).csv("file:///root/files2/part-00000-f9f4bd23-1776-4f84-9a39-f83840fa1973-c000.csv")
  10. //通过入住时间和入住总时长计算用户离开时间(入住时间或者入住总时长为空的不做计算)
  11. df.createOrReplaceTempView("yy")
  12. val df2: DataFrame =spark.sql("select Name,from_unixtime(unix_timestamp(Version)+Duration*3600,'yyyy-MM-dd HH:mm:ss') from yy where Version != '' and Duration != '' limit 10")
  13. //将结果保存成csv格式到file:///root/files-out1目录下
  14. df2.write.csv("file:///root/files-out1")
  15. /**********End**********/
  16. spark.stop()

}

}

第三关:数据分析--酒店被入住次数最多的3家和他们的平

package com.yy

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object edu2{

def main(args: Array[String]): Unit = {

  1. val spark = SparkSession
  2. .builder()
  3. .appName("Spark SQL")
  4. .master("local")
  5. .config("spark.some.config.option", "some-value")
  6. .getOrCreate()
  7. /**********Begin**********/

//加载第一关处理后的数据,数据位于/root/files3目录下,文件名为part-00000-f9f4bd23-1776-4f84-9a39-f83840fa1973-c000.csv

val df = spark.read.option("header", true).csv("file:///root/files3/part-00000-f9f4bd23-1776-4f84-9a39-f83840fa1973-c000.csv")

  1. //酒店被入住次数最多的10家和他们的平均得分以及所在城市(评分为空的不做计算,注意考虑连锁酒店的情况,即同一家酒店开设在不同的城市)
  2. df.createOrReplaceTempView("yy")
  3. val df2: DataFrame =spark.sql("select City,Hotel,avg from (select count(Hotel)as num ,Hotel,City ,round(avg(Grade),2) as avg from yy where Grade != '' group by Hotel,City ) aa order by num desc limit 3")

//将结果保存成csv格式到file:///root/files-out2目录下

df2.write.csv("file:///root/files-out2")

  1. /**********End**********/
  2. spark.stop()

}

}

第四关:数据分析--每个用户每年去酒店次数及入住总时长

package com.yy

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object edu3{

  1. def main(args: Array[String]): Unit = {
  2. val spark = SparkSession
  3. .builder()
  4. .appName("Spark SQL")
  5. .master("local")
  6. .config("spark.some.config.option", "some-value")
  7. .getOrCreate()
  8. /**********Begin**********/
  9. //加载第一关处理后的数据,数据位于/root/files4目录下,文件名为part-00000-f9f4bd23-1776-4f84-9a39-f83840fa1973-c000.csv
  10. val df = spark.read.option("header", true).csv("file:///root/files4/part-00000-f9f4bd23-1776-4f84-9a39-f83840fa1973-c000.csv")
  11. //每个用户每年去酒店次数及入住总时长
  12. df.createOrReplaceTempView("yy")
  13. val df2: DataFrame =spark.sql(" select Name ,count(Id),sum(Duration),time from ( select Name ,Id,Duration,year(Version) as time from yy where Version != '' ) a group by time,Name limit 10")
  14. //将结果保存成csv格式到file:///root/files-out3目录下
  15. df2.write.csv("file:///root/files-out3")
  16. /**********End**********/
  17. spark.stop()

}

}

觉得有帮助的小伙伴给个好评点赞吧转发也可以w!


本文转载自: https://blog.csdn.net/czczczs9/article/details/124964291
版权归原作者 cz学java 所有, 如有侵权,请联系我们删除。

“企业Spark案例--酒店数据分析实战提交”的评论:

还没有评论