Spark Log Analysis in Practice
I. User Behavior Logs
1. Concept
All behavior logs generated each time a user visits the site (visits, browsing, searches, clicks).
Also called the user behavior trail, or traffic log.
2. Purpose
Analyzing the logs gives us:
page view counts for the site
site stickiness (user retention)
input for recommendations
3. Where the logs are produced
(1) Nginx
(2) Ajax
4. Log contents
A log record typically contains:
1. Client system attributes: operating system, browser, etc.
2. Access characteristics: clicked URL, referring page (referer), time spent on the page
3. Access information: session_id, visitor location info (city / carrier)
Note: the Nginx log format can be configured to capture exactly the fields you need.
5. Significance
(1) The site's eyes
ad placement and revenue
(2) The site's nerves
page layout (which affects user experience)
(3) The site's brain
II. Offline Data Processing
1. Processing pipeline
1) Data collection
Flume:
collects the web logs produced by the servers and writes them to HDFS (see the Flume sketch after this list)
2) Data cleansing
Spark / Hive / MapReduce --> HDFS (Hive / Spark SQL tables)
3) Data processing
statistical analysis according to the business logic
Spark / Hive / MapReduce --> HDFS (Hive / Spark SQL tables)
4) Persisting the results
RDBMS (MySQL) / NoSQL (HBase, Redis)
5) Data visualization
graphical presentation: pie charts, bar charts, maps, line charts
ECharts, HUE, Zeppelin
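To make step 1) concrete, below is a minimal sketch of a Flume agent that tails an Nginx access log and writes it to HDFS. The agent name, log path, and HDFS directory are illustrative assumptions, not values taken from this project.
# sketch only: exec source -> memory channel -> HDFS sink
access-agent.sources = r1
access-agent.channels = c1
access-agent.sinks = k1
# tail the web server access log (path is an assumption)
access-agent.sources.r1.type = exec
access-agent.sources.r1.command = tail -F /var/log/nginx/access.log
access-agent.sources.r1.channels = c1
access-agent.channels.c1.type = memory
# write to HDFS, bucketed by day so downstream jobs can read one day at a time
access-agent.sinks.k1.type = hdfs
access-agent.sinks.k1.channel = c1
access-agent.sinks.k1.hdfs.path = hdfs://hadoop01:9000/imooc/access/%Y%m%d
access-agent.sinks.k1.hdfs.fileType = DataStream
access-agent.sinks.k1.hdfs.useLocalTimeStamp = true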
III. Project Requirements
code/video
Requirement 1:
Count the Top N most-visited courses/articles on the imooc main site
Requirement 2:
Top N most popular courses on the imooc main site, broken down by city
a. resolve the city from the IP address
b. use window functions in Spark SQL
Requirement 3:
Top N most popular courses on the imooc main site by traffic
IV. Log Contents
Fields needed:
access time, accessed URL, traffic consumed by the request, client IP address
Log processing:
Logs are normally partitioned, usually by the access time in the log,
for example by day (d), hour (h), or five-minute bucket (m5).
Input: access time, accessed URL, traffic consumed, client IP address
Output: URL, cmsType (video/article), cmsId (content id), traffic, ip, city, access time, day
Maven: install the ipdatabase jar into the local repository
mvn install:install-file -Dfile=D:\ipdatabase-master\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar2 -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
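Once installed, the jar can be pulled into the project as a normal Maven dependency; the coordinates below simply mirror the install-file command above, so adjust them if you used different ones.
<dependency>
    <groupId>com.ggstar2</groupId>
    <artifactId>ipdatabase</artifactId>
    <version>1.0</version>
</dependency>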
V. Data Cleansing
1. Parsing the raw log
package com.saddam.spark.MuKe.ImoocProject

import org.apache.spark.sql.SparkSession

/**
 * First cleansing pass: extract only the columns we need.
 * Add a breakpoint to inspect the individual fields.
 */
object SparkStatFormatJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkStatFormatJob")
      .master("local[2]")
      .getOrCreate()

    val logRDD = spark.sparkContext.textFile("D:\\Spark\\DataSets\\access.20161111.log")
    // logRDD.take(10).foreach(println)

    val result = logRDD.map(line => {
      val split = line.split(" ")
      val ip = split(0)

      /**
       * The 3rd and 4th fields of the raw log together form the full time field:
       * [10/Nov/2016:00:01:02 +0800] ==> yyyy-MM-dd HH:mm:ss
       */
      // TODO use the date-parsing utility class
      val time = split(3) + " " + split(4)

      // "http://www.imooc.com/code/1852" -- strip the surrounding quotes
      val url = split(11).replaceAll("\"", "")
      val traffic = split(9)

      // a tuple would also work:
      // (ip, DateUtils.parse(time), url, traffic)
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    })

    result.take(20).foreach(println)
    // result.saveAsTextFile("D:\\Spark\\OutPut\\log_local_2")

    /*
    (10.100.0.1,[10/Nov/2016:00:01:02 +0800])
    (117.35.88.11,[10/Nov/2016:00:01:02 +0800])
    (182.106.215.93,[10/Nov/2016:00:01:02 +0800])
    (10.100.0.1,[10/Nov/2016:00:01:02 +0800])
    */

    spark.stop()
  }
}
2. Date utility class
package com.saddam.spark.MuKe.ImoocProject

import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

/**
 * Date/time parsing utility.
 */
object DateUtils {

  // date/time format used in the input file: [10/Nov/2016:00:01:02 +0800]
  val YYYYMMDDHHMM_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // target format: 2016-11-10 00:01:02
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /**
   * Parse the raw time string into the target format.
   */
  def parse(time: String) = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

  /**
   * Extract the epoch time (Long) from the raw log time field,
   * e.g. [10/Nov/2016:00:01:02 +0800]
   */
  def getTime(time: String) = {
    try {
      YYYYMMDDHHMM_TIME_FORMAT.parse(
        time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case e: Exception => 0L
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("[10/Nov/2016:00:01:02 +0800]"))
  }
}
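The notes jump from this first formatting pass straight to code that reads an already-cleaned parquet dataset (clean_city) with the schema url / cmsType / cmsId / traffic / ip / city / time / day described in Section IV. The second cleansing pass that produces it is not included here; the sketch below shows roughly what an AccessConvertUtil (the object referenced later by SparkStatCleanJobYarn) could look like. IpUtils is a hypothetical placeholder for the city lookup backed by the ipdatabase jar installed above, not a real API.
package com.saddam.spark.MuKe.ImoocProject.LogClean

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

/**
 * Sketch only: turns one formatted log line
 * (time \t url \t traffic \t ip) into a Row matching `struct`.
 */
object AccessConvertUtil {

  // output schema of the cleaned dataset (see Section IV)
  val struct = StructType(Array(
    StructField("url", StringType),
    StructField("cmsType", StringType),
    StructField("cmsId", LongType),
    StructField("traffic", LongType),
    StructField("ip", StringType),
    StructField("city", StringType),
    StructField("time", StringType),
    StructField("day", StringType)
  ))

  def parseLog(log: String): Row = {
    try {
      val splits = log.split("\t")
      val time = splits(0)
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      // e.g. http://www.imooc.com/video/4500 -> cmsType=video, cmsId=4500
      val domain = "http://www.imooc.com/"
      val cmsPath = if (url.startsWith(domain)) url.substring(domain.length) else ""
      val cmsSplit = cmsPath.split("/")
      val (cmsType, cmsId) =
        if (cmsSplit.length > 1) (cmsSplit(0), cmsSplit(1).toLong) else ("", 0L)

      // city lookup; IpUtils below is only a stub standing in for the ipdatabase library
      val city = IpUtils.getCity(ip)

      // day partition derived from the time, e.g. 2017-05-11 -> 20170511
      val day = time.substring(0, 10).replaceAll("-", "")

      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row("", "", 0L, 0L, "", "", "", "")
    }
  }
}

object IpUtils {
  // placeholder: plug in the ipdatabase lookup here
  def getCity(ip: String): String = "未知"
}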
VI. Project Requirements: Implementation
Requirement 1
Count the Top N most-visited courses/articles on the imooc main site.
Complete the statistics and write the results to the database
-- using the DataFrame API
-- using the SQL API
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object PopularVideoVisits {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
    accessDF.printSchema()
    accessDF.show(false)
    /*
    +----------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
    |url                               |cmsType|cmsId|traffic|ip             |city|time               |day     |
    +----------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
    |http://www.imooc.com/video/4500   |video  |4500 |304    |218.75.35.226  |未知 |2017-05-11 14:09:14|20170511|
    |http://www.imooc.com/video/14623  |video  |14623|69     |202.96.134.133 |未知 |2017-05-11 15:25:05|20170511|
    |http://www.imooc.com/article/17894|article|17894|115    |202.96.134.133 |未知 |2017-05-11 07:50:01|20170511|
    */

    val day = "20170511"
    videoAccessTopNStat(spark, accessDF, day)

    // quick test of the MySQL utility class
    println(MySQLUtils.getConnection())

    spark.stop()
  }

  /**
   * Top N courses by number of visits.
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String) = {
    // implicit conversions
    import spark.implicits._

    // TODO approach 1: DataFrame API, video statistics
    val videoAccessTopNDF = accessDF
      .filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId")
      .agg(count("cmsId").as("times"))
    videoAccessTopNDF.printSchema()
    videoAccessTopNDF.show(false)

    // TODO approach 2: SQL API, article statistics
    accessDF.createOrReplaceTempView("temp")
    val videoAccessTopNSQL = spark.sql(
      "select day, cmsId, count(1) as times " +
        "from temp " +
        "where day='20170511' and cmsType='article' " +
        "group by day, cmsId " +
        "order by times desc")
    videoAccessTopNSQL.show(false)

    /**
     * TODO write the Top N course statistics to MySQL
     */
    try {
      videoAccessTopNSQL.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")
          list.append(DayVideoAccessStat(day, cmsId, times))
        })
        StatDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

/**
 * Entity class: per-day course visit counts.
 */
case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)

/**
 * TODO MySQL utility class
 */
object MySQLUtils {

  def getConnection() = {
    DriverManager.getConnection(
      "jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
  }

  /**
   * Release the database connection and related resources.
   */
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) connection.close()
    }
  }
}

/**
 * TODO DAO layer
 */
object StatDAO {

  /**
   * Batch-save DayVideoAccessStat records (one day's video Top N) to the database.
   */
  def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // manual commit

      val sql = "insert into day_video_access_topn_stat2(day,cms_id,times) values (?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // run the batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
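The notes never show the DDL for the target table. A possible definition, with column names taken from the insert statement above and types inferred from the entity class (treat it as an assumption, not the original schema). Note that this job writes to day_video_access_topn_stat2 while the cleanup helper later in the notes references day_video_access_topn_stat, so keep the names consistent when you create the table.
create table day_video_access_topn_stat2 (
  day varchar(8) not null,
  cms_id bigint not null,
  times bigint not null,
  primary key (day, cms_id)
) engine=InnoDB default charset=utf8;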
Requirement 2
Top N most popular courses on the imooc main site, broken down by city.
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object PopularCityVideoVisits {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
    accessDF.printSchema()
    accessDF.show(false)

    val day = "20170511"
    // TODO per-city Top N courses
    cityAccessTopNStat(spark, accessDF, day)

    spark.stop()
  }

  /**
   * Top N courses per city.
   */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String) = {
    import spark.implicits._

    val cityAccessTopNDF = accessDF
      .filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "city", "cmsId")
      .agg(count("cmsId").as("times"))
      .orderBy($"times".desc)
    cityAccessTopNDF.printSchema()
    cityAccessTopNDF.show(false)

    // window functions in Spark SQL: rank the courses within each city
    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(
        Window.partitionBy(cityAccessTopNDF("city"))
          .orderBy(cityAccessTopNDF("times").desc)
      ).as("times_rank")
    ).filter("times_rank <= 3") //.show(false)  // Top 3

    /**
     * Write the per-city Top N course statistics to MySQL.
     */
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })
        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

/**
 * Entity class.
 */
case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)

/**
 * TODO MySQL utility class
 */
object MySQLUtils {

  def getConnection() = {
    DriverManager.getConnection(
      "jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
  }

  /**
   * Release the database connection and related resources.
   */
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) connection.close()
    }
  }
}

/**
 * TODO DAO layer
 */
object StatDAO {

  /**
   * Batch-save DayCityVideoAccessStat records to the database.
   */
  def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // manual commit

      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // run the batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
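As above, the target table's DDL is not part of the notes; a possible definition inferred from the insert statement and the entity class (an assumption, not the original schema):
create table day_video_city_access_topn_stat (
  day varchar(8) not null,
  cms_id bigint not null,
  city varchar(20) not null,
  times bigint not null,
  times_rank int not null,
  primary key (day, cms_id, city)
) engine=InnoDB default charset=utf8;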
Requirement 3
Top N most popular courses on the imooc main site by traffic.
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object VideoTrafficVisits {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
    accessDF.printSchema()
    accessDF.show(false)

    val day = "20170511"
    // TODO Top N courses by traffic
    videoTrafficsTopNStat(spark, accessDF, day)

    spark.stop()
  }

  /**
   * Top N courses by traffic.
   */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val videoTrafficsTopNDF = accessDF
      .filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId")
      .agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
    //.show(false)

    /**
     * Write the traffic-based Top N course statistics to MySQL.
     */
    try {
      videoTrafficsTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
          list.append(DayVideoTrafficsStat(day, cmsId, traffics))
        })
        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

/**
 * Entity class.
 */
case class DayVideoTrafficsStat(day: String, cmsId: Long, traffics: Long)

/**
 * TODO MySQL utility class
 */
object MySQLUtils {

  def getConnection() = {
    DriverManager.getConnection(
      "jdbc:mysql://121.37.2x.2xx1:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
  }

  /**
   * Release the database connection and related resources.
   */
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) connection.close()
    }
  }
}

/**
 * TODO DAO layer
 */
object StatDAO {

  /**
   * Batch-save DayVideoTrafficsStat records to the database.
   */
  def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // manual commit

      val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // run the batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
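A possible DDL for this table as well, inferred from the insert statement and the entity class (an assumption, not the original schema):
create table day_video_traffics_topn_stat (
  day varchar(8) not null,
  cms_id bigint not null,
  traffics bigint not null,
  primary key (day, cms_id)
) engine=InnoDB default charset=utf8;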
Deleting existing data (so a day's job can be re-run without duplicating rows)
/**
 * Delete the data for the given day from all three result tables.
 */
def deleteData(day: String): Unit = {
  val tables = Array(
    "day_video_access_topn_stat",
    "day_video_city_access_topn_stat",
    "day_video_traffics_topn_stat")

  var connection: Connection = null
  var pstmt: PreparedStatement = null
  try {
    connection = MySQLUtils.getConnection()
    for (table <- tables) {
      // delete from <table> where day = ?
      val deleteSQL = s"delete from $table where day = ?"
      pstmt = connection.prepareStatement(deleteSQL)
      pstmt.setString(1, day)
      pstmt.executeUpdate()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MySQLUtils.release(connection, pstmt)
  }
}
VII. Zeppelin
Official site
https://zeppelin.apache.org/
1. Extract the archive
[root@hadoop src]# tar zxvf zeppelin-0.7.1-bin-all.tgz
2. Rename the directory
[root@hadoop src]# mv zeppelin-0.7.1-bin-all zeppelin
3. Start the daemon
[root@hadoop bin]# ./zeppelin-daemon.sh start
4. Web UI
http://121.37.2xx.xx:8080
5. Configure the JDBC interpreter for MySQL
Driver class: com.mysql.jdbc.Driver
URL: jdbc:mysql://121.37.2x.xx:3306/imooc_project?
User: root
Password: xxxxxx
# MySQL driver jar, added as an interpreter dependency
/usr/local/src/mysql-connector-java-5.1.27-bin.jar
6. Create a note
7. Query the tables
%jdbc
show tables;
8. Charting
%jdbc
select cms_id,times from day_video_access_topn_stat;
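Any of the result tables can be charted the same way. For example, to chart the per-city Top 3 from Requirement 2 (assuming the day_video_city_access_topn_stat table defined earlier):
%jdbc
select city, cms_id, times from day_video_city_access_topn_stat where times_rank <= 3;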
VIII. Spark on YARN
Spark deployment modes:
1) Local: used during development
2) Standalone: Spark's own cluster manager; a standalone cluster requires Spark to be deployed on every machine
3) YARN: recommended for production, so that YARN handles resource scheduling for all jobs on the cluster (MapReduce, Spark)
4) Mesos
Whichever mode is used, the Spark application code stays exactly the same; only the --master option given at submission time changes.
1. Overview
Spark supports pluggable cluster managers.
As far as YARN is concerned, a Spark application is just another client.
2. Client mode
The Driver runs on the client side (the machine that submits the Spark job).
The client communicates with the allocated containers to schedule and execute the job, so the client process cannot exit while the job is running.
Logs are printed to the submitting console, which is convenient for testing.
3. Cluster mode
The Driver runs inside the ApplicationMaster.
The client can exit as soon as the job is submitted, because the job is already running on YARN.
The run logs are not visible on the submitting console, since they are produced on the Driver side in the cluster; fetch them with yarn logs -applicationId <appId>.
4. Comparison of the two modes
Where the Driver runs: on the client (client mode) vs. inside the ApplicationMaster (cluster mode).
What the ApplicationMaster does: only requests executor containers from YARN (client mode) vs. requests containers and also hosts the Driver (cluster mode).
Where the run output appears: on the submitting console (client mode) vs. in the Driver/AM container logs on the cluster (cluster mode).
5. Example
Prerequisite: set HADOOP_CONF_DIR (or YARN_CONF_DIR) to the Hadoop configuration directory, e.g. $HADOOP_HOME/etc/hadoop.
Client mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
/usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar \
4
Pi is roughly 3.1411378528446323
22/02/28 18:52:26 INFO server.ServerConnector: Stopped Spark@1b0a7baf{HTTP/1.1}{0.0.0.0:4040}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@8a589a2{/stages/stage/kill,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@192f2f27{/jobs/job/kill,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@1bdf8190{/api,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@4f8969b0{/,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6fefce9e{/static,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@74cec793{/executors/threadDump/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@f9b7332{/executors/threadDump,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@18e7143f{/executors/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@209775a9{/executors,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5db4c359{/environment/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2c177f9e{/environment,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@33617539{/storage/rdd/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@47874b25{/storage/rdd,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@290b1b2e{/storage/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@1fc0053e{/storage,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@77307458{/stages/pool/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@389adf1d{/stages/pool,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@7bf9b098{/stages/stage/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@72e34f77{/stages/stage,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6e9319f{/stages/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6fa590ba{/stages,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2416a51{/jobs/job/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@293bb8a5{/jobs/job,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@37ebc9d8{/jobs/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5217f3d0{/jobs,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.184.135:4040
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
22/02/28 18:52:26 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/02/28 18:52:26 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Stopped
22/02/28 18:52:26 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/02/28 18:52:26 INFO memory.MemoryStore: MemoryStore cleared
22/02/28 18:52:26 INFO storage.BlockManager: BlockManager stopped
22/02/28 18:52:26 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
22/02/28 18:52:26 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/02/28 18:52:26 INFO spark.SparkContext: Successfully stopped SparkContext
22/02/28 18:52:26 INFO util.ShutdownHookManager: Shutdown hook called
22/02/28 18:52:26 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a95834c0-d38b-457b-89b2-fed00d5bef56
Cluster mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--executor-memory 1G \
--num-executors 1 \
/usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar \
4
[root@hadoop01 spark]# ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --executor-memory 1G --num-executors 1 /usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar 4
Warning: Master yarn-cluster is deprecated since 2.0. Please use master "yarn" with specified deploy mode instead.
22/02/28 18:54:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/28 18:54:32 WARN util.Utils: Your hostname, hadoop01.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.184.135 instead (on interface ens33)
22/02/28 18:54:32 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/02/28 18:54:32 INFO client.RMProxy: Connecting to ResourceManager at hadoop01/192.168.184.135:8032
22/02/28 18:54:32 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
22/02/28 18:54:33 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
22/02/28 18:54:33 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
22/02/28 18:54:33 INFO yarn.Client: Setting up container launch context for our AM
22/02/28 18:54:33 INFO yarn.Client: Setting up the launch environment for our AM container
22/02/28 18:54:33 INFO yarn.Client: Preparing resources for our AM container
22/02/28 18:54:33 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144/__spark_libs__3085975169933820625.zip -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/__spark_libs__3085975169933820625.zip
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/spark-examples_2.11-2.1.1.jar
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144/__spark_conf__2818552262823480245.zip -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/__spark_conf__.zip
22/02/28 18:54:35 INFO spark.SecurityManager: Changing view acls to: root
22/02/28 18:54:35 INFO spark.SecurityManager: Changing modify acls to: root
22/02/28 18:54:35 INFO spark.SecurityManager: Changing view acls groups to:
22/02/28 18:54:35 INFO spark.SecurityManager: Changing modify acls groups to:
22/02/28 18:54:35 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
22/02/28 18:54:35 INFO yarn.Client: Submitting application application_1646041633964_0004 to ResourceManager
22/02/28 18:54:35 INFO impl.YarnClientImpl: Submitted application application_1646041633964_0004
22/02/28 18:54:36 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:36 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1646045675928
final status: UNDEFINED
tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
user: root
22/02/28 18:54:37 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:38 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:39 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:39 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 192.168.184.135
ApplicationMaster RPC port: 0
queue: default
start time: 1646045675928
final status: UNDEFINED
tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
user: root
22/02/28 18:54:40 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:41 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:42 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:43 INFO yarn.Client: Application report for application_1646041633964_0004 (state: FINISHED)
22/02/28 18:54:43 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 192.168.184.135
ApplicationMaster RPC port: 0
queue: default
start time: 1646045675928
final status: SUCCEEDED
tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
user: root
22/02/28 18:54:43 INFO util.ShutdownHookManager: Shutdown hook called
22/02/28 18:54:43 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144
6. Fetching the logs by application ID
[root@hadoop01 spark]# yarn logs -applicationId application_1646041633964_0003
22/02/28 18:59:05 INFO client.RMProxy: Connecting to ResourceManager at hadoop01/192.168.184.135:8032
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
/tmp/logs/root/logs/application_1646041633964_0003 does not exist.
Log aggregation has not completed or is not enabled.
# Nothing is shown because log aggregation has not been configured; view the logs through the Web UI instead.
7. Viewing the result in the Web UI
http://hadoop01:8042/node/containerlogs/container_1646041633964_0004_01_000001/root
IX. Running the Spark Project on YARN
Package the project and its dependencies with Maven (see the pom sketch after the word-count code below).
1. IDEA project code: word count
package com.bigdata

import org.apache.spark.sql.SparkSession

object WordCountYARN {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()

    if (args.length != 2) {
      println("Usage: WordCountYARN <inputPath> <outputPath>")
      System.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val rdd = spark.sparkContext.textFile(inputPath)
    val counts = rdd.flatMap(x => x.split("\t"))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    counts.saveAsTextFile(outputPath)

    spark.stop()
  }
}
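The notes only mention Maven packaging without showing the build section. A minimal sketch of what the pom.xml build section might contain for compiling the Scala sources into the jar that is submitted below (the plugin version is an assumption):
<build>
    <plugins>
        <!-- compile the Scala sources -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Run mvn clean package, then upload the resulting jar (the BYGJ.jar submitted below) to the cluster.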
2.spark-submit
spark-submit \
--class com.bigdata.WordCountYARN \
--name WordCount \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
/usr/local/src/spark/spark_jar/BYGJ.jar \
hdfs://hadoop01:9000/wordcount.txt hdfs://hadoop01:9000/wc_output
3. Checking the result
[root@hadoop01 spark_jar]# hadoop fs -cat /wc_output/part-*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(hive,5)
(spark,5)
(hadoop,2)
(hbase,3)
-----------------------------------------
1. IDEA project code: log cleansing
package com.saddam.spark.MuKe.ImoocProject.LogClean

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkStatCleanJobYarn {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()

    if (args.length != 2) {
      println("Usage: SparkStatCleanJobYarn <inputPath> <outputPath>")
      System.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val accessRDD = spark.sparkContext.textFile(inputPath)

    // TODO RDD -> DataFrame using the AccessConvertUtil schema
    val accessDF = spark.createDataFrame(
      accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)

    accessDF
      .coalesce(1)
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("day")
      .save(outputPath)

    spark.stop()
  }
}
2.spark-submit
spark-submit \
--class com.saddam.spark.MuKe.ImoocProject.LogClean.SparkStatCleanJobYarn \
--name SparkStatCleanJobYarn \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--files /usr/local/src/spark/spark_jar/ipDatabase.csv,/usr/local/src/spark/spark_jar/ipRegion.xlsx \
/usr/local/src/spark/spark_jar/Spark.jar \
hdfs://hadoop01:9000/access.log hdfs://hadoop01:9000/log_output
3. Checking the result
Start spark-shell:
[root@hadoop01 datas]# spark-shell --master local[2] --jars /usr/local/src/mysql-connector-java-5.1.27-bin.jar
Locate the output file on HDFS:
/log_output/day=20170511/part-00000-36e30abb-3e42-4237-ad9f-a9f93258d4b2.snappy.parquet
Read the file:
scala> spark.read.format("parquet").parquet("/log_output/day=20170511/part-00000-36e30abb-3e42-4237-ad9f-a9f93258d4b2.snappy.parquet").show(false)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
|url                               |cmsType|cmsId|traffic|ip             |city|time               |
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
|http://www.imooc.com/video/4500   |video  |4500 |304    |218.75.35.226  |    |2017-05-11 14:09:14|
|http://www.imooc.com/video/14623  |video  |14623|69     |202.96.134.133 |    |2017-05-11 15:25:05|
|http://www.imooc.com/article/17894|article|17894|115    |202.96.134.133 |    |2017-05-11 07:50:01|
|http://www.imooc.com/article/17896|article|17896|804    |218.75.35.226  |    |2017-05-11 02:46:43|
|http://www.imooc.com/article/17893|article|17893|893    |222.129.235.182|    |2017-05-11 09:30:25|
|http://www.imooc.com/article/17891|article|17891|407    |218.75.35.226  |    |2017-05-11 08:07:35|
|http://www.imooc.com/article/17897|article|17897|78     |202.96.134.133 |    |2017-05-11 19:08:13|
|http://www.imooc.com/article/17894|article|17894|658    |222.129.235.182|    |2017-05-11 04:18:47|
|http://www.imooc.com/article/17893|article|17893|161    |58.32.19.255   |    |2017-05-11 01:25:21|
|http://www.imooc.com/article/17895|article|17895|701    |218.22.9.56    |    |2017-05-11 13:37:22|
|http://www.imooc.com/article/17892|article|17892|986    |218.75.35.226  |    |2017-05-11 05:53:47|
|http://www.imooc.com/video/14540  |video  |14540|987    |58.32.19.255   |    |2017-05-11 18:44:56|
|http://www.imooc.com/article/17892|article|17892|610    |218.75.35.226  |    |2017-05-11 17:48:51|
|http://www.imooc.com/article/17893|article|17893|0      |218.22.9.56    |    |2017-05-11 16:20:03|
|http://www.imooc.com/article/17891|article|17891|262    |58.32.19.255   |    |2017-05-11 00:38:01|
|http://www.imooc.com/video/4600   |video  |4600 |465    |218.75.35.226  |    |2017-05-11 17:38:16|
|http://www.imooc.com/video/4600   |video  |4600 |833    |222.129.235.182|    |2017-05-11 07:11:36|
|http://www.imooc.com/article/17895|article|17895|320    |222.129.235.182|    |2017-05-11 19:25:04|
|http://www.imooc.com/article/17898|article|17898|460    |202.96.134.133 |    |2017-05-11 15:14:28|
|http://www.imooc.com/article/17899|article|17899|389    |222.129.235.182|    |2017-05-11 02:43:15|
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
only showing top 20 rows
X. Project Performance Tuning
1. Cluster-level tuning
Choice of storage format: https://www.infoq.cn/article/bigdata-store-choose/
Choice of compression codec:
Default: snappy
Change it with .config("spark.sql.parquet.compression.codec", "gzip")
2. Code-level tuning
Prefer high-performance operators
Reuse data that has already been computed
3. Parameter tuning
Parallelism:
spark.sql.shuffle.partitions
Default: 200
Configures the number of partitions used when shuffling data for joins or aggregations.
spark-submit:
--conf spark.sql.shuffle.partitions=500
IDEA:
.config("spark.sql.shuffle.partitions", "500")
Partition column type inference:
spark.sql.sources.partitionColumnTypeInference.enabled
spark-submit:
--conf spark.sql.sources.partitionColumnTypeInference.enabled=false
IDEA:
.config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")