0


【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户

🚀 作者 :“大数据小禅”

🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容
🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪

手机流量日志处理

SparkSQL简介

  • Spark SQL是Apache Spark的一个模块,提供了一种基于结构化数据的编程接口。它允许用户使用SQL语句或DataFrame API来查询和操作数据,同时还支持使用Spark的分布式计算引擎进行高效的并行计算。
  • Spark SQL支持多种数据源,包括Hive、JSON、Parquet、Avro、ORC等,这些数据源可以通过DataFrame API或SQL语句进行查询和操作。同时,Spark SQL还提供了一些高级功能,如窗口函数、聚合函数、UDF等,以满足更复杂的数据分析需求。
  • Spark SQL还支持将SQL查询结果写入到外部数据源,如Hive表、JSON文件、Parquet文件等。此外,Spark SQL还提供了一些工具,如Spark SQL CLI、JDBC/ODBC驱动程序等,方便用户进行交互式查询和数据分析。
  • 使用前需要新引入对应依赖

依赖引入

使用Spark SQL需要在项目中添加以下依赖:

<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.1.2</version></dependency></dependencies>

其中,spark-sql_2.12是Spark SQL的核心依赖,spark-core_2.12是Spark的核心依赖。注意,版本号可以根据实际情况进行调整。

如果需要使用其他数据源,如MySQL、Hive等,则需要添加相应的依赖。例如,如果需要连接MySQL数据库,则需要添加以下依赖:

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>3.1.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency>

其中,spark-sql-kafka-0-10_2.12是连接Kafka数据源的依赖,mysql-connector-java是连接MySQL数据库的依赖。注意,版本号也可以根据实际情况进行调整。

以上是使用Maven进行依赖配置的方式。

SparkSQL快速入门案例

  • 准备数据
  • 我们假设有一个CSV文件employee.csv,包含了员工的信息,如下所示:
id,name,age,gender,salary
1,Jack,25,M,50002,Lucy,28,F,60003,Tom,30,M,80004,Lily,27,F,70005,David,32,M,9000

创建SparkSession对象
首先,我们需要创建一个SparkSession对象,它是Spark SQL的入口点。可以使用以下代码创建SparkSession对象:

importorg.apache.spark.sql.SparkSession

val spark =SparkSession.builder
  .appName("Spark SQL Demo").getOrCreate()//加载CSV文件//使用SparkSession对象的read方法加载CSV文件:

val df = spark.read
  .option("header","true").option("inferSchema","true").csv("employee.csv")//其中,header=true表示第一行是列名,inferSchema=true表示自动推断列的数据类型。//创建临时表//使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时表:

df.createOrReplaceTempView("employee")//执行SQL查询//使用SparkSession对象的sql方法执行SQL查询:
val result = spark.sql("SELECT * FROM employee WHERE age > 27")
这将返回所有年龄大于27岁的员工信息。

//输出结果//使用DataFrame的show方法输出查询结果:

result.show()//这将输出所有符合条件的员工信息。
  • 完整代码如下:
importorg.apache.spark.sql.SparkSession

val spark =SparkSession.builder
  .appName("Spark SQL Demo").getOrCreate()

val df = spark.read
  .option("header","true").option("inferSchema","true").csv("employee.csv")

df.createOrReplaceTempView("employee")

val result = spark.sql("SELECT * FROM employee WHERE age > 27")

result.show()
输出结果:

+---+----+---+------+-----+| id|name|age|gender|salary|+---+----+---+------+-----+|2|Lucy|28|F|6000||3|Tom|30|M|8000||5|David|32|M|9000|+---+----+---+------+-----+

手机流量日志数据格式与处理要求

  • 日志字段与字段说明如下在这里插入图片描述1.需要实现的需求1.按月统计流量使用量最多的用户(每个月使用流量最多的用户) 2.将结果数据持久化到硬盘

处理程序

/**
  * @Description
  * @Author xiaochan
  * @Version 1.0
  */// 时间戳         手机号码          基站物理地址             ip        接受数 接受数据包 上行流量  下行流量  状态码//2020-03-10    15707126156    QK-X7-7N-G2-1N-QZ:CMCC    212.188.187.220    33         40        67584       81920    200//使用量 =上+下  手机号码就是用户   RDD处理方式->((月,号码),(上行+下行))//1.下载手机流量日志//2.按月统计流量使用量最多的用户//3.将结果数据持久化到硬盘
object LogPhone{System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.3\\hadoop-2.7.3")
  def main(args:Array[String]):Unit={//1.创建sparksession
    val sc =newsql.SparkSession.Builder().appName("test").master("local[6]").config("spark.testing.memory","471859201").getOrCreate()// 读取输入文件
    val log = sc.sparkContext.textFile("dataset\\phone.log")
    val value = log.map(_.split("\t")).filter(arr =>{!(arr(1)==null)}).map(tmp =>{//处理日期 获取月份
      val month:String=tmp(0).split("-")(1)//号码
      val user =tmp(1)//使用流量数var use =tmp(6)+tmp(7)Log(user, use.toLong, month)})
    sc.createDataFrame(value).createOrReplaceTempView("log")//每个月流量使用做多的用户 group by行数会减少,开窗函数over()行数不会减少
    val data:DataFrame= sc.sql("select user,month,useall from "+"(select user,month,sum(use) over(partition by user,month order by use desc) as useall,"+"dense_rank() over(partition by month order by use desc) as rn from log)t1 where rn=1 order by month")
    data.show()
    data.write.parquet("dataset\\output\\directory")

    sc.close()}}/**
  * @Description
  * @Author xiaochan
  * @Version 1.0
  */caseclassLog(
    user:String,
    use:Long,
    month:String)
  • 结果如下在这里插入图片描述

在这里插入图片描述

标签: spark 大数据 hive

本文转载自: https://blog.csdn.net/weixin_45574790/article/details/129792909
版权归原作者 大数据小禅 所有, 如有侵权,请联系我们删除。

“【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户”的评论:

还没有评论