

Spark Project in Practice: Data Cleaning

Log file: https://pan.baidu.com/s/1Eve8GmGi21JLV70fqJjmQw
Extraction code: 3xsp

Tools: IntelliJ IDEA, Maven

This walkthrough uses Spark to clean the log data and then compute daily user retention:

1. Setting up the environment

Configure pom.xml:

    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.8</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>3.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
    </dependencies>

Install the Scala plugin:

File -> Settings -> Plugins
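
Once the plugin and dependencies are in place, a minimal smoke test can confirm the setup. This is only a sketch: the object name EnvCheck is made up, and it assumes the Maven dependencies above have resolved.

    // A sketch: create a local SparkSession and print the Spark version.
    import org.apache.spark.sql.SparkSession

    object EnvCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("EnvCheck")
          .getOrCreate()
        println(s"Spark version: ${spark.version}") // should print 3.2.1 with the pom above
        spark.stop()
      }
    }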

2. Data cleaning

Using the DataFrame abstraction in Spark SQL, the cleaned data can be written to MySQL. The overall transformation of the log data can be understood as:

RDD[String] -> RDD[Array[String]] -> RDD[Row] -> DataFrame -> write to MySQL

Before cleaning, it helps to know the layout of the web log. In this dataset, the fields within a record are separated by "\t" (the Tab character). A typical record breaks down into the following 8 fields (a quick split check follows the list):

  1. event_time = 2018-09-04T20:27:31+08:00
  2. url = http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451540&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F58.0.3029.110+Safari%2F537.36+SE+2.X+MetaSr+1.0&actionEnd=1536150451668&actionName=startEval&actionTest=0&actionType=3&actionValue=272090&clientType=001_kgc&examType=001&ifEquipment=web&isFromContinue=false&skillIdCount=0&skillLevel=0&testType=jineng&userSID=B842B843AE317425D53D0C567A903EF7.exam-tomcat-node3.exam-tomcat-node3&userUID=272090&userUIP=1.180.18.157
  3. method = GET
  4. status = 200
  5. sip = 192.168.168.64
  6. user_uip = -
  7. action_prepend = -
  8. action_client = Apache-HttpClient/4.1.2 (java 1.5)
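
As a quick check of this layout (a sketch using abbreviated placeholder values rather than a real log line), splitting one tab-separated record should yield exactly 8 fields:

    // A sketch: verify that one tab-separated record splits into 8 fields.
    // The sample values are shortened placeholders, not real log data.
    val sample = Seq(
      "2018-09-04T20:27:31+08:00", "http://datacenter.bdqn.cn/logs/user?userUID=272090",
      "GET", "200", "192.168.168.64", "-", "-", "Apache-HttpClient/4.1.2 (java 1.5)"
    ).mkString("\t")
    val fields = sample.split("\t")
    println(fields.length) // 8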

1) Convert RDD[String] to RDD[Row], keeping only records that split into exactly 8 fields

    val linesRDD = sc.textFile("C:/Users/Lenovo/Desktop/Working/Python/data/test.log")
    import spark.implicits._
    val line1 = linesRDD.map(x => x.split("\t"))
    // line1.foreach(println)
    val rdd = line1
      .filter(x => x.length == 8)
      .map(x => Row(x(0).trim, x(1).trim, x(2).trim, x(3).trim, x(4).trim, x(5).trim, x(6).trim, x(7).trim))
    // rdd.foreach(println)
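
Optionally, comparing the raw line count with the count of well-formed records shows how many lines the length filter dropped. This is just a spot-check sketch reusing linesRDD and rdd from above; both counts scan the whole file.

    // A sketch: compare raw line count with the count of well-formed records.
    val total = linesRDD.count()
    val kept  = rdd.count()
    println(s"kept $kept of $total lines, dropped ${total - kept}")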

2) Convert RDD[Row] to a DataFrame by defining an initial schema

    // Map the RDD onto a table schema
    val schema = StructType(Array(
      StructField("event_time", StringType),
      StructField("url", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("user_uip", StringType),
      StructField("action_prepend", StringType),
      StructField("action_client", StringType)
    ))
    val orgDF = spark.createDataFrame(rdd, schema)
    // orgDF.show(5)
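
Calling printSchema() on orgDF is a cheap way to confirm the mapping; with the schema above it should print roughly the following:

    orgDF.printSchema()
    // root
    //  |-- event_time: string (nullable = true)
    //  |-- url: string (nullable = true)
    //  |-- method: string (nullable = true)
    //  |-- status: string (nullable = true)
    //  |-- sip: string (nullable = true)
    //  |-- user_uip: string (nullable = true)
    //  |-- action_prepend: string (nullable = true)
    //  |-- action_client: string (nullable = true)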

3) Deduplicate, filter, and split the url query string on "&" and "="

    // Deduplicate, keep only status 200, and drop records with an empty event_time.
    // distinct compares the full row for deduplication, while dropDuplicates
    // deduplicates on the specified columns only.
    val ds1 = orgDF.dropDuplicates("event_time", "url")
      .filter(x => x(3) == "200")
      .filter(x => StringUtils.isNotEmpty(x(0).toString))
    // Split the url query string on "&" and "=" to extract the parameters
    // (userSID, userUIP, actionClient, actionBegin, actionEnd, actionType,
    //  actionPrepend, actionTest, ifEquipment, actionName, id, progress)
    // and collect them into a Map.
    val dfDetail = ds1.map(row => {
      val urlArray = row.getAs[String]("url").split("\\?")
      var map = Map("params" -> "null")
      if (urlArray.length == 2) {
        map = urlArray(1).split("&")
          .map(x => x.split("="))
          .filter(_.length == 2)
          .map(x => (x(0), x(1)))
          .toMap
      }
      (
        // map holds the url parameters, row holds the original DataFrame columns
        row.getAs[String]("event_time"),
        row.getAs[String]("user_uip"),
        row.getAs[String]("method"),
        row.getAs[String]("status"),
        row.getAs[String]("sip"),
        map.getOrElse("actionBegin", ""),
        map.getOrElse("actionEnd", ""),
        map.getOrElse("userUID", ""),
        map.getOrElse("userSID", ""),
        map.getOrElse("userUIP", ""),
        map.getOrElse("actionClient", ""),
        map.getOrElse("actionType", ""),
        map.getOrElse("actionPrepend", ""),
        map.getOrElse("actionTest", ""),
        map.getOrElse("ifEquipment", ""),
        map.getOrElse("actionName", ""),
        map.getOrElse("progress", ""),
        map.getOrElse("id", "")
      )
    }).toDF()
    // dfDetail.show(5)
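
The query-string parsing inside the map is easiest to see in isolation. Below is a minimal sketch of the same split-and-filter logic on a made-up URL (sampleUrl is not from the real log):

    // A sketch of the split logic on a made-up URL.
    val sampleUrl = "http://example.com/logs/user?actionName=startEval&userUID=272090&flag"
    val parts = sampleUrl.split("\\?")
    val params = if (parts.length == 2) {
      parts(1).split("&")
        .map(_.split("="))
        .filter(_.length == 2)        // "flag" has no "=", so it is dropped
        .map(kv => (kv(0), kv(1)))
        .toMap
    } else Map("params" -> "null")
    println(params)                    // Map(actionName -> startEval, userUID -> 272090)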

4) Rebuild the schema with the flattened columns and write the result to MySQL

    val detailRDD = dfDetail.rdd
    val detailSchema = StructType(Array(
      StructField("event_time", StringType),
      StructField("user_uip", StringType),
      StructField("method", StringType),
      StructField("status", StringType),
      StructField("sip", StringType),
      StructField("actionBegin", StringType),
      StructField("actionEnd", StringType),
      StructField("userUID", StringType),
      StructField("userSID", StringType),
      StructField("userUIP", StringType),
      StructField("actionClient", StringType),
      StructField("actionType", StringType),
      StructField("actionPrepend", StringType),
      StructField("actionTest", StringType),
      StructField("ifEquipment", StringType),
      StructField("actionName", StringType),
      StructField("progress", StringType),
      StructField("id", StringType)
    ))
    val detailDF = spark.createDataFrame(detailRDD, detailSchema)
    // mode("overwrite") replaces the table, mode("append") adds to it
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "******")
    prop.put("driver", "com.mysql.cj.jdbc.Driver") // driver class for mysql-connector-java 8.x
    val url = "jdbc:mysql://localhost:3306/python_db"
    println("Start writing to the database")
    detailDF.write.mode("overwrite").jdbc(url, "logDetail", prop)
    println("Finished writing to the database")
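
After the write finishes, reading the table back with the same connection properties is a quick sanity check (a sketch; the row count should match the number of cleaned records):

    // A sketch: read the table back and make sure the row count looks right.
    val written = spark.read.jdbc(url, "logDetail", prop)
    println(s"rows written: ${written.count()}")
    written.show(5, truncate = false)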

3. Daily user retention analysis

  1. Count the new users who register on day N; call this m.
  2. Count how many of those day-N new users sign in again on day N+1; call this n.
  3. Retention rate = n / m * 100% (a small worked example follows this list).
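
With made-up numbers: if 200 users register on day N and 30 of them sign in again on day N+1, the day-1 retention is 30 / 200 = 15%. The last step below computes exactly this division per register date:

    // A sketch with hypothetical numbers, mirroring the signcount / regcount division used later.
    val regcount  = 200.0 // new users on day N (hypothetical)
    val signcount = 30.0  // of those, users who sign in again on day N+1 (hypothetical)
    println(s"day-1 retention = ${signcount / regcount * 100}%") // 15.0%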

1) Build DataFrames of registration and sign-in events

    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "******")
    prop.put("driver", "com.mysql.cj.jdbc.Driver") // driver class for mysql-connector-java 8.x
    val url = "jdbc:mysql://localhost:3306/python_db"
    val dataFrame = spark.read.jdbc(url, "logDetail", prop)
    // All registration records (userUID, register_time, action name)
    val registerDF = dataFrame
      .filter(dataFrame("actionName") === ("Registered"))
      .select("userUID", "event_time", "actionName")
      .withColumnRenamed("event_time", "register_time")
      .withColumnRenamed("userUID", "regUID")
    // registerDF.show(5)
    // event_time has the form 2018-09-04T20:27:31+08:00;
    // only the first 10 characters (yyyy-MM-dd) are needed
    val registDF2 = registerDF
      .select(registerDF("regUID"), registerDF("register_time")
        .substr(1, 10).as("register_date"), registerDF("actionName"))
      .distinct()
    // registDF2.show(5)
    // All sign-in records (userUID, signing_time, action name)
    val signinDF = dataFrame.filter(dataFrame("actionName") === ("Signin"))
      .select("userUID", "event_time", "actionName")
      .withColumnRenamed("event_time", "signing_time")
      .withColumnRenamed("userUID", "signUID")
    // signinDF.show(5)
    val signiDF2 = signinDF
      .select(signinDF("signUID"), signinDF("signing_time")
        .substr(1, 10).as("signing_date"), signinDF("actionName"))
      .distinct()
    // signiDF2.show(5)

2) Compute n (day-N new users who also sign in on day N+1) and m (total new users on day N)

    // Inner join on userUID to line up each registration with that user's sign-ins
    val joinDF = registDF2
      .join(signiDF2, signiDF2("signUID") === registDF2("regUID"), joinType = "inner")
    // joinDF.show(5)
    // Use Spark's built-in datediff to keep only sign-ins exactly one day after
    // registration, then count them per register_date (this is n)
    val frame = joinDF
      .filter(datediff(joinDF("signing_date"), joinDF("register_date")) === 1)
      .groupBy(joinDF("register_date")).count()
      .withColumnRenamed("count", "signcount")
    // frame.show(5)
    // Count the new users per register_date (this is m)
    val frame1 = registDF2
      .groupBy(registDF2("register_date")).count()
      .withColumnRenamed("count", "regcount")
    // frame1.show(5)
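
datediff accepts yyyy-MM-dd strings and casts them to dates, which is why the 10-character substring taken earlier is enough. A quick sketch of its behaviour:

    // A sketch: datediff on date strings returns the difference in days; this should print 1.
    spark.sql("SELECT datediff('2018-09-05', '2018-09-04') AS diff").show()
    // +----+
    // |diff|
    // +----+
    // |   1|
    // +----+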

3) Retention rate = n / m * 100%

    // Put m and n in one table, keyed by register_date
    val frame2 = frame
      .join(frame1, "register_date")
    frame2.show()
    // Add a retention-rate column ("留存率", n / m) to get the day-1 retention for each date
    frame2.withColumn("留存率", frame2("signcount") / frame2("regcount"))
      .show()
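
If a percentage is easier to read than a raw ratio, the same column can be scaled and rounded with Spark's built-in round function (a sketch reusing frame2 from above):

    import org.apache.spark.sql.functions.round
    // A sketch: express the retention rate as a percentage rounded to two decimals.
    frame2.withColumn("留存率", round(frame2("signcount") / frame2("regcount") * 100, 2))
      .show()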

4. Full source code:

DataClear.scala

    package spark

    import org.apache.commons.lang.StringUtils
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}

    import java.util.Properties

    object DataClear {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("DataClear").getOrCreate()
        val sc = spark.sparkContext
        val linesRDD = sc.textFile("C:/Users/Lenovo/Desktop/Working/Python/data/test.log")
        import spark.implicits._
        val line1 = linesRDD.map(x => x.split("\t"))
        // line1.foreach(println)
        val rdd = line1
          .filter(x => x.length == 8)
          .map(x => Row(x(0).trim, x(1).trim, x(2).trim, x(3).trim, x(4).trim, x(5).trim, x(6).trim, x(7).trim))
        // rdd.foreach(println)
        // Map the RDD onto a table schema
        val schema = StructType(Array(
          StructField("event_time", StringType),
          StructField("url", StringType),
          StructField("method", StringType),
          StructField("status", StringType),
          StructField("sip", StringType),
          StructField("user_uip", StringType),
          StructField("action_prepend", StringType),
          StructField("action_client", StringType)
        ))
        val orgDF = spark.createDataFrame(rdd, schema)
        // orgDF.show(5)
        // Deduplicate, keep only status 200, and drop records with an empty event_time.
        // distinct compares the full row, dropDuplicates deduplicates on the given columns.
        val ds1 = orgDF.dropDuplicates("event_time", "url")
          .filter(x => x(3) == "200")
          .filter(x => StringUtils.isNotEmpty(x(0).toString))
        // Split the url query string on "&" and "=" to extract userUID, userSID, userUIP,
        // actionClient, actionBegin, actionEnd, actionType, actionPrepend, actionTest,
        // ifEquipment, actionName, id and progress into a Map.
        val dfDetail = ds1.map(row => {
          val urlArray = row.getAs[String]("url").split("\\?")
          var map = Map("params" -> "null")
          if (urlArray.length == 2) {
            map = urlArray(1).split("&")
              .map(x => x.split("="))
              .filter(_.length == 2)
              .map(x => (x(0), x(1)))
              .toMap
          }
          (
            row.getAs[String]("event_time"),
            row.getAs[String]("user_uip"),
            row.getAs[String]("method"),
            row.getAs[String]("status"),
            row.getAs[String]("sip"),
            map.getOrElse("actionBegin", ""),
            map.getOrElse("actionEnd", ""),
            map.getOrElse("userUID", ""),
            map.getOrElse("userSID", ""),
            map.getOrElse("userUIP", ""),
            map.getOrElse("actionClient", ""),
            map.getOrElse("actionType", ""),
            map.getOrElse("actionPrepend", ""),
            map.getOrElse("actionTest", ""),
            map.getOrElse("ifEquipment", ""),
            map.getOrElse("actionName", ""),
            map.getOrElse("progress", ""),
            map.getOrElse("id", "")
          )
        }).toDF()
        // dfDetail.show(5)
        val detailRDD = dfDetail.rdd
        val detailSchema = StructType(Array(
          StructField("event_time", StringType),
          StructField("user_uip", StringType),
          StructField("method", StringType),
          StructField("status", StringType),
          StructField("sip", StringType),
          StructField("actionBegin", StringType),
          StructField("actionEnd", StringType),
          StructField("userUID", StringType),
          StructField("userSID", StringType),
          StructField("userUIP", StringType),
          StructField("actionClient", StringType),
          StructField("actionType", StringType),
          StructField("actionPrepend", StringType),
          StructField("actionTest", StringType),
          StructField("ifEquipment", StringType),
          StructField("actionName", StringType),
          StructField("progress", StringType),
          StructField("id", StringType)
        ))
        val detailDF = spark.createDataFrame(detailRDD, detailSchema)
        detailDF.show(10)
        // mode("overwrite") replaces the table, mode("append") adds to it
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "******")
        prop.put("driver", "com.mysql.cj.jdbc.Driver") // driver class for mysql-connector-java 8.x
        val url = "jdbc:mysql://localhost:3306/python_db"
        println("Start writing to the database")
        detailDF.write.mode("overwrite").jdbc(url, "logDetail", prop)
        println("Finished writing to the database")
      }
    }

UserAnalysis.scala

    package spark

    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.datediff

    object UserAnalysis {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("userAnalysis").master("local").getOrCreate()
        val sc = spark.sparkContext
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "******")
        prop.put("driver", "com.mysql.cj.jdbc.Driver") // driver class for mysql-connector-java 8.x
        val url = "jdbc:mysql://localhost:3306/python_db"
        val dataFrame = spark.read.jdbc(url, "logDetail", prop)
        dataFrame.show(10)
        // All registration records (userUID, register_time, action name)
        val registerDF = dataFrame.filter(dataFrame("actionName") === ("Registered"))
          .select("userUID", "event_time", "actionName")
          .withColumnRenamed("event_time", "register_time")
          .withColumnRenamed("userUID", "regUID")
        // registerDF.show(5)
        // event_time has the form 2018-09-04T20:27:31+08:00; keep only the first 10 characters (yyyy-MM-dd)
        val registDF2 = registerDF
          .select(registerDF("regUID"), registerDF("register_time")
            .substr(1, 10).as("register_date"), registerDF("actionName"))
          .distinct()
        // registDF2.show(5)
        // All sign-in records (userUID, signing_time, action name)
        val signinDF = dataFrame.filter(dataFrame("actionName") === ("Signin"))
          .select("userUID", "event_time", "actionName")
          .withColumnRenamed("event_time", "signing_time")
          .withColumnRenamed("userUID", "signUID")
        // signinDF.show(5)
        val signiDF2 = signinDF
          .select(signinDF("signUID"), signinDF("signing_time")
            .substr(1, 10).as("signing_date"), signinDF("actionName"))
          .distinct()
        // signiDF2.show(5)
        // Inner join on userUID to line up registrations with that user's sign-ins
        val joinDF = registDF2
          .join(signiDF2, signiDF2("signUID") === registDF2("regUID"), joinType = "inner")
        // joinDF.show(5)
        // Keep only sign-ins exactly one day after registration and count them per register_date (n)
        val frame = joinDF
          .filter(datediff(joinDF("signing_date"), joinDF("register_date")) === 1)
          .groupBy(joinDF("register_date")).count()
          .withColumnRenamed("count", "signcount")
        // frame.show(5)
        // Count the new users per register_date (m)
        val frame1 = registDF2
          .groupBy(registDF2("register_date")).count()
          .withColumnRenamed("count", "regcount")
        // frame1.show(5)
        // Put m and n in one table, keyed by register_date
        val frame2 = frame
          .join(frame1, "register_date")
        // frame2.show()
        // Add a retention-rate column ("留存率", n / m) to get the day-1 retention per date
        frame2.withColumn("留存率", frame2("signcount") / frame2("regcount"))
          .show()
        sc.stop()
      }
    }

Reposted from: https://blog.csdn.net/Legosnow/article/details/124169161
Copyright belongs to the original author, Legosnow.
