

Spark Streaming Log Processing + SSM Front-End Display (Detailed)

1. Preface

I have recently been learning Spark Streaming and picked up some understanding of streaming log processing, and my school has just started a course on the SSM framework. On a whim, I decided to put the two together. The source code is on Gitee and you are welcome to grab it.

This project is an integration of a front end, a back end, and Spark data processing. It is not particularly difficult; it is a lateral extension of learning Spark and is suitable for Spark Streaming beginners.

Since the post is quite long, here is the source code first: Spark Streaming log processing

2. Project contents

  1. A Python script periodically generates pseudo-random logs.
  2. Spark Streaming picks up the data, cleans it and filters out invalid records, then analyses user traffic in the logs: it counts visits per course and clicks per search engine. The Spark Streaming results are first written to MySQL (they could be cached in HBase for faster reads). Spark then reads the MySQL data, aggregates the course and search-engine click counts, and writes them back to MySQL.
  3. The front end uses Spring, Spring MVC, and MyBatis to display the data.
  4. Since the charts use ECharts, the data is sent to the JSP asynchronously via Ajax.

The Spark workflow diagram is as follows:

3. Tools and stack

Tools: IntelliJ IDEA 2021 Professional (the Community edition has no Tomcat integration, and I have not actually tried the Smart Tomcat plugin; everything else works in the Community edition), PyCharm Community edition, JDK 1.8, Scala 2.13

Architecture and technology stack:

  • Python-3.9
  • Spark-3.2.1
  • Mybatis-3.5.9
  • Spring Mvc-5.3.16
  • Spring-5.3.16
  • Echarts

4. Project walkthrough

Note: this project keeps the Scala and Java parts in the same project, so you only need to create a single project.

1. Environment setup:

1) Create a Maven project and add the dependencies to pom.xml (the file in the source repo is complete):

    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mybatis/mybatis-spring -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>2.0.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-expression -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring_mvc.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-core -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-context -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.8</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

2) Install the Scala plugin, then right-click the project name, choose Add Framework Support, and add the Scala module.

2. Back-end development

1. Create a Python script in PyCharm to generate random logs

A complete run of the script produces 2000 log lines, appended and spread across the log files. By the script's design, the probability that a line is a correct, complete web log is only 20% (a 200 status code with probability 1/4 times a valid referer with probability 4/5), so a single run puts only about 400 lines into the database; see step 2 for details.

    import random
    import time

    # URL path pool
    url_paths = [
        "class/112.html",
        "class/128.html",
        "class/145.html",
        "class/146.html",
        "class/500.html",
        "class/250.html",
        "class/131.html",
        "learn/821",
        "learn/823",
        "learn/987",
        "learn/500",
        "course/list"
    ]

    # IP segment pool: four numbers are picked at random to build an IP such as 132.168.30.87
    ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46, 55, 63, 72, 87, 98, 168, 192, 134, 111, 54, 64, 110, 43, 127, 12, 53, 99]

    # Search-engine referer pool; {query} is the placeholder for the search keyword
    http_referers = [
        "http://www.baidu.com/s?wd={query}",
        "https://www.sogou.com/web?query={query}",
        "http://cn.bing.com/search?q={query}",
        "https://search.yahoo.com/search?p={query}",
    ]

    # Search keyword pool
    search_keyword = [
        "Spark SQL",
        "Hadoop",
        "Storm",
        "Spark Streaming",
        "10 HoursCrash",
        "Blink",
        "SpringBoot",
        "Linux",
        "Vue.js",
        "Hive",
        "Flink",
    ]

    # Status code pool
    status_codes = ["200", "504", "404", "403"]

    # pick a random URL
    def sample_url():
        return random.sample(url_paths, 1)[0]

    # build a random IP
    def sample_ip():
        slice = random.sample(ip_slices, 4)
        return ".".join([str(item) for item in slice])

    # build a random search referer
    def sample_referer():
        # with probability 0.2 produce an invalid referer, to simulate illegal user logs
        if random.uniform(0, 1) > 0.8:
            return "-"
        refer_str = random.sample(http_referers, 1)
        query_str = random.sample(search_keyword, 1)
        return refer_str[0].format(query=query_str[0])

    # pick a random status code
    def sample_status_code():
        return random.sample(status_codes, 1)[0]

    def generate_log(count=10, index=10):
        # use the local machine time as the access time written into the log
        time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        indexes = str(index)
        file = "./data/OfflineWareHouse/随即日志生成器" + indexes + ".log"
        # target log file (change the path to your own); "w" overwrites, "a" appends
        f = open(file, "a")
        # assemble the user log lines
        while count >= 1:
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(url=sample_url(),
                                                                                                       ip=sample_ip(),
                                                                                                       referer=sample_referer(),
                                                                                                       status_code=sample_status_code(),
                                                                                                       local_time=time_str)
            f.write(query_log + "\n")
            count = count - 1
        f.close()

    # main: each full run produces 100 * 20 user log lines
    counts = 20
    while counts:
        sleepTime = random.randint(6, 12)
        time.sleep(sleepTime)
        print("Web log file %d: added %d new lines" % (counts, 100))
        generate_log(100, counts)
        counts = counts - 1

2. Initial data cleaning with Spark Streaming

Spark Streaming can integrate many input sources, such as Kafka, Flume, HDFS, and even plain TCP socket streams. Here, in local mode, it watches a fixed directory: as soon as new file content appears, Spark Streaming reads it automatically and runs the custom processing logic on it.

①. Create the scala package in the project and mark it as a Sources Root:

②. Before cleaning the data we need to understand the web log layout. Fields are separated by "\t", i.e. the Tab character. Below is a typical web log broken into its fields; we need to split method and url further to obtain the individual GET, yahoo, and Spark Streaming values (a small standalone example of this split follows the field listing).

    userIp = 168.46.55.10
    local_time = 2022-04-03 19:03:52
    method = "GET /class/146.html HTTP/1.1"
    status = 404
    url = https://search.yahoo.com/search?p=Spark Streaming
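
A minimal standalone sketch of that split (plain Java; the class name LogLineDemo is made up for illustration and is not part of the project). The real cleaning is done inside the Spark job shown in the next step.

    public class LogLineDemo {
        public static void main(String[] args) {
            String log = "168.46.55.10\t2022-04-03 19:03:52\t\"GET /class/146.html HTTP/1.1\"\t404\t"
                    + "https://search.yahoo.com/search?p=Spark Streaming";
            String[] fields = log.replace("\"", "").split("\t"); // userIp, local_time, method, status, url
            String[] methods = fields[2].split(" ");             // GET | /class/146.html | HTTP/1.1
            String[] urls = fields[4].split("\\?");              // engine part | query string
            String engine = urls[0];                             // https://search.yahoo.com/search
            String keyword = urls[1].split("=")[1];              // Spark Streaming
            System.out.println(methods[0] + " | " + engine + " | " + keyword);
        }
    }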

③. The Spark Streaming code is as follows:

Spark Streaming's data abstraction is the DStream, and each abstraction has its own set of methods. To meet the requirement, the log data moves through DStream[String] -> RDD[String] -> DataFrame.

    package ssm

    import org.apache.spark.sql.SQLContext
    import java.util.Properties
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object WareHouseStreaming {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("WareHouseStreaming").setMaster("local")
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "******")
        prop.put("driver", "com.mysql.jdbc.Driver")
        val scc = new StreamingContext(sparkConf, Seconds(10))
        val file = "file:///C:/Users/Lenovo/Desktop/Working/Python/data/OfflineWareHouse/"
        val lineDStream = scc.textFileStream(file)
        val lines = lineDStream.flatMap(_.split("\n"))
        val line = lines.map(_.replaceAll("\"", "")).map(_.split("\t"))
        line.foreachRDD(rdd => {
          val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
          import sqlContext.implicits._
          rdd
            .map(x => (x(0).trim, x(1).trim, x(2).trim, x(3).trim, x(4).trim))
            .toDF("userIp", "local_time", "method", "status", "url")
            .registerTempTable("speedtable")
          val dataFrame = sqlContext.sql("select * from speedtable ")
          //dataFrame.show(5)
          //println(dataFrame.count())
          // filter out rows whose status code is not 200 and rows with an invalid url
          // P(status == 200) = 1/4 and P(valid referer) = 4/5, so only 1/5 of the lines survive
          val d1 = dataFrame
            .filter(x => x(3) == "200")
            .filter(x => x(4) != "-")
          val dfDetail = d1.map(row => {
            val methods = row.getAs[String]("method").split(" ")
            val urls = row.getAs[String]("url").split("\\?")
            val SE = urls(0)
            val names = urls(1).split("=")
            val name = names(1)
            // the keys looked up below are never present in this map,
            // so getOrElse always falls back to the freshly parsed values
            var map = Map("params" -> "null")
            val method = methods(0)
            val url = methods(1)
            val agreement = methods(2)
            (
              row.getAs[String]("userIp"),
              row.getAs[String]("local_time"),
              map.getOrElse("method", method),
              map.getOrElse("url", url),
              map.getOrElse("agreement", agreement),
              row.getAs[String]("status"),
              map.getOrElse("SE", SE),
              map.getOrElse("name", name)
            )
          }).toDF("userIp", "local_time", "method", "url", "agreement", "status", "SE", "name")
          dfDetail.show(5)
          val url = "jdbc:mysql://localhost:3306/python_db"
          println("Writing to the database...")
          dfDetail.write.mode("Append").jdbc(url, "warehouse", prop)
          println("Finished writing to the database, " + dfDetail.count() + " new rows")
        })
        scc.start()
        scc.awaitTermination()
      }
    }

3. Counting course clicks and search-engine visits

Compared with solving this with stream processing, the offline (batch) approach is simple and quick: the programs can run independently and are easy to maintain. The drawback is that the amount of computation is much larger than in the streaming model, because data gets recomputed. However, the front-end page refreshes every 10 s, so the data does not need to be especially fresh, and 10 s is plenty of time for the offline computation.

Course click-count code:

    package ssm

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import java.util.Properties

    object NameCount {
      def main(args: Array[String]): Unit = {
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "******")
        prop.put("driver", "com.mysql.jdbc.Driver")
        val url = "jdbc:mysql://localhost:3306/python_db"
        while (true) {
          Thread.sleep(10000)
          println("Updating course click counts...")
          lessCount(prop, url)
        }
      }

      def lessCount(prop: Properties, url: String): Unit = {
        val spark = SparkSession.builder().appName("NameCount").master("local").getOrCreate()
        val sc = spark.sparkContext
        /**
         * Overall data flow:
         * Mysql -> DataFrame -> { RDD[Row] -> RDD[String] }
         *   -> replace to merge course categories
         *   -> RDD[(String, Int)] counting the clicks
         *   -> RDD[Row(String, Integer)]
         *   -> DataFrame -> Mysql
         */
        import spark.implicits._
        val dataFrame = spark.read.jdbc(url, "warehouse", prop).select("name")
        // DataFrame -> { RDD[Row] -> RDD[String] }
        val dataRDD = dataFrame.rdd.map(x => x.mkString(""))
        // dataRDD.foreach(println)
        // replace to merge course categories
        val word = dataRDD
          .map(_.replaceAll("10 HoursCrash", "Hadoop"))
          .map(_.replaceAll("Spark Streaming", "Spark"))
          .map(_.replaceAll("Hive", "Hadoop"))
          .map(_.replaceAll("Spark SQL", "Spark"))
          .map(_.replaceAll("Blink", "Flink"))
        //word.foreach(println)
        // RDD[(String, Int)]: count the clicks
        val wordCount = word.map((_, 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
        //wordCount.foreach(println)
        // RDD[Row(String, Integer)]
        val dF = wordCount.map(x => { Row(x._1, x._2) })
        //dF.foreach(println)
        // DataFrame -> Mysql
        val schema = StructType(Array(
          StructField("lesson", StringType),
          StructField("counts", IntegerType)
        ))
        val orgDF = spark.createDataFrame(dF, schema)
        orgDF.show()
        println("Writing to the database...")
        orgDF.write.mode("overwrite").jdbc(url, "lesson_counts", prop)
        println("Finished writing to the database (an error here would stop the process)")
        sc.stop()
      }
    }

The search-engine visit-count code is as follows:

    package ssm

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import java.util.Properties

    object UrlCount {
      def main(args: Array[String]): Unit = {
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "******")
        prop.put("driver", "com.mysql.jdbc.Driver")
        val url = "jdbc:mysql://localhost:3306/python_db"
        while (true) {
          Thread.sleep(10000)
          println("Updating search-engine click counts...")
          SECounts(prop, url)
        }
      }

      def SECounts(prop: Properties, url: String): Unit = {
        val spark = SparkSession.builder().appName("urlCount").master("local").getOrCreate()
        val sc = spark.sparkContext
        val SEFrame = spark.read.jdbc(url, "warehouse", prop).select("SE")
        // SEFrame.show(5)
        val SERdd = SEFrame.rdd.map(_.mkString(""))
        // take the second dot-separated segment of the engine URL (baidu, sogou, bing, yahoo)
        // and map it to its display name
        val SE = SERdd
          .map(_.split("[.]"))
          .map(x => x(1))
          .map(_.replaceAll("baidu", "百度"))
          .map(_.replaceAll("sogou", "搜狗"))
          .map(_.replaceAll("bing", "必应"))
          .map(_.replaceAll("yahoo", "雅虎"))
        val seCount = SE.map((_, 1))
          .reduceByKey(_ + _)
          .sortBy(_._1)
          .map(x => { Row(x._1, x._2) })
        seCount.foreach(println)
        val schema = StructType(Array(
          StructField("Se", StringType),
          StructField("counts", IntegerType)
        ))
        val detailDF = spark.createDataFrame(seCount, schema)
        println("Writing to the database...")
        detailDF.write.mode("Overwrite").jdbc(url, "se_counts", prop)
        println("Finished writing to the database")
        sc.stop()
      }
    }

4. Verification steps

Run the Spark Streaming job and leave it running in the background.

Run UrlCount and NameCount in the background.

Run the Python script in the background.

Go to the Spark console window and check whether the data is being collected successfully.

A successful Spark Streaming run looks like this:

A successful NameCount run looks like this:

A successful urlCount run looks like this:

5. Notes:

①. The time Spark Streaming needs to process one batch of logs should not exceed the batch interval (currently 10 s). When Python generates too much data at once (currently 100 lines per batch) and this rule is violated, Spark's computation runs into serious trouble and may even crash. Balancing the batch interval against the data volume matters; it comes down to experience, and you tune it based on how much data your later workloads actually produce.

--- Once, just to see what would happen, I had Python produce 1500 logs every 5 s (30,000 per run) with a Spark batch interval of 5 s as well. The result was that roughly 30,000 rows were written to the database in one run (in theory it should have been about 6000). It turned out that before the first batch had finished computing, the second batch arrived, so the first batch was stored mixed together with the second. By the time I realised what was happening, the table already held more than 200,000 rows, and in a fit of anger I dropped the database...

This completes the Spark Streaming side of the development.

3. Front-end development (IDEA Professional)

Create the project files and set up the SSM framework.

1. Configure the mapper and pojo packages

The project layout is shown below; SE is the search-engine class and Lesson is the course click-count class.

Lesson.java (SE.java is analogous; you can look it up directly in the published source)

    package cn.pojo;

    public class Lesson {
        private String lesson = null;
        private Integer lesson_count = null;

        public String getLesson() { return lesson; }
        public void setLesson(String lesson) { this.lesson = lesson; }
        public Integer getLesson_count() { return lesson_count; }
        public void setLesson_count(Integer lesson_count) { this.lesson_count = lesson_count; }

        @Override
        public String toString() {
            return "Lesson{" +
                    " lesson= " + lesson +
                    " ,lesson_count= " + lesson_count +
                    '}';
        }
    }

The LessonMapper.java interface (SEMapper.java is analogous)

    package cn.mapper;

    import org.apache.ibatis.annotations.Param;
    import cn.pojo.Lesson;
    import java.util.List;

    public interface LessonMapper {
        Integer selectLessonCount(Lesson user);
        List<Lesson> selectAll();
    }

LessonMapper.xml, the mapping for the interface (the SE version is analogous)

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper
            PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    <mapper namespace="cn.mapper.LessonMapper">
        <select id="selectLessonCount" resultType="int">
            select counts from lesson_counts where lesson = CONCAT(#{lesson})
        </select>
        <!-- property is the field name in the Lesson class, column is the column name in the MySQL table;
             for join queries you can also point resultMap at this definition -->
        <resultMap type="cn.pojo.Lesson" id="userLesson">
            <result property="lesson" column="lesson"/>
            <result property="lesson_count" column="counts"/>
        </resultMap>
        <select id="selectAll" resultType="cn.pojo.Lesson" resultMap="userLesson">
            select * from lesson_counts
        </select>
    </mapper>

2. util configuration

Create MybatisUtils.java under util so MyBatis can connect to MySQL (a small standalone smoke test follows the class).

    package cn.util;

    import org.apache.ibatis.io.Resources;
    import org.apache.ibatis.session.SqlSession;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.apache.ibatis.session.SqlSessionFactoryBuilder;
    import java.io.IOException;
    import java.io.InputStream;

    public class MybatisUtils {
        private static SqlSessionFactory sqlSessionFactory;

        static {
            try {
                String resource = "mybatis-config.xml";
                InputStream inputStream = Resources.getResourceAsStream(resource);
                sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // obtain a SqlSession connection
        public static SqlSession getSession() {
            return sqlSessionFactory.openSession();
        }
    }
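
The Spring-managed DAO below goes through SqlSessionTemplate rather than this class, but MybatisUtils is handy for a quick standalone check that mybatis-config.xml and the database wiring work. A minimal sketch of such a smoke test (the class name MybatisUtilsDemo is made up for this post) could look like this:

    package cn.util;

    import cn.pojo.Lesson;
    import org.apache.ibatis.session.SqlSession;
    import java.util.List;

    public class MybatisUtilsDemo {
        public static void main(String[] args) {
            // open a session and run the selectAll statement defined in LessonMapper.xml
            try (SqlSession session = MybatisUtils.getSession()) {
                List<Lesson> lessons = session.selectList("cn.mapper.LessonMapper.selectAll");
                lessons.forEach(System.out::println);
            }
        }
    }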

3. Configure the DAO and service layer code

DAO layer code (a sketch of the matching service layer follows the DAO implementation below)

The CourseClickDao.java interface (the SE version is analogous)

    package cn.dao;

    import cn.pojo.Lesson;
    import java.util.List;

    public interface CourseClickDao {
        List<Lesson> selectLessonAll(Lesson user);
    }

CourseDaoImpl.java, the implementation of the CourseClickDao.java interface

    package cn.dao.Impl;

    import cn.dao.CourseClickDao;
    import cn.pojo.Lesson;
    import org.mybatis.spring.SqlSessionTemplate;
    import java.util.List;

    public class CourseDaoImpl implements CourseClickDao {
        public SqlSessionTemplate session;

        public SqlSessionTemplate getSession() { return session; }
        public void setSession(SqlSessionTemplate session) { this.session = session; }

        @Override
        public List<Lesson> selectLessonAll(Lesson user) {
            return session.selectList("cn.mapper.LessonMapper.selectAll");
        }
    }
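
The post itself does not print the service layer, but beans.xml below wires a cn.service.Impl.CourseServiceImpl with a dao property, and the controller calls service.selectLessonALlSer(...). A hedged sketch consistent with those references (two files; this is not necessarily the author's exact code) could look like this:

    // cn/service/CourseClickService.java -- the interface the controller depends on
    package cn.service;

    import cn.pojo.Lesson;
    import java.util.List;

    public interface CourseClickService {
        List<Lesson> selectLessonALlSer(Lesson user);
    }

    // cn/service/Impl/CourseServiceImpl.java -- delegates to the DAO; the "dao" property
    // is injected by the <bean id="clickService"> definition in beans.xml
    package cn.service.Impl;

    import cn.dao.CourseClickDao;
    import cn.pojo.Lesson;
    import cn.service.CourseClickService;
    import java.util.List;

    public class CourseServiceImpl implements CourseClickService {
        private CourseClickDao dao;

        public CourseClickDao getDao() { return dao; }
        public void setDao(CourseClickDao dao) { this.dao = dao; }

        @Override
        public List<Lesson> selectLessonALlSer(Lesson user) {
            // simply delegates to the DAO, which runs the selectAll mapper statement
            return dao.selectLessonAll(user);
        }
    }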

4. resources configuration

The Spring configuration file beans.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:p="http://www.springframework.org/schema/p"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.3.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-4.3.xsd"
           default-autowire="byType">
        <!-- data source; replace the url with your own database -->
        <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource"
              destroy-method="close">
            <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
            <property name="url" value="jdbc:mysql://localhost:3306/python_db"/>
            <property name="username" value="root"/>
            <property name="password" value="******"/>
        </bean>
        <!-- SessionFactory -->
        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
            <property name="dataSource" ref="dataSource"/>
            <property name="configLocation" value="classpath:mybatis-config.xml"/>
        </bean>
        <!-- sqlSessionTemplate; can be used as-is -->
        <bean id="sqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate">
            <constructor-arg name="sqlSessionFactory" ref="sqlSessionFactory"/>
        </bean>
        <bean id="clickDao" class="cn.dao.Impl.CourseDaoImpl">
            <property name="session" ref="sqlSessionTemplate"/>
        </bean>
        <bean id="searchDao" class="cn.dao.Impl.SearchDaoImpl">
            <property name="session" ref="sqlSessionTemplate"/>
        </bean>
        <bean id="clickService" class="cn.service.Impl.CourseServiceImpl">
            <property name="dao" ref="clickDao"/>
        </bean>
        <bean id="searchService" class="cn.service.Impl.SearchServiceImpl">
            <property name="dao" ref="searchDao"/>
        </bean>
        <bean id="Lesson" class="cn.pojo.Lesson"
              p:lesson="Hadoop" p:lesson_count="1"/>
        <bean id="SE" class="cn.pojo.SE"
              p:lesson="百度" p:lesson_count="1"/>
        <!-- package scanning -->
        <context:component-scan base-package="cn.dao,cn.service"/>
        <!-- transaction manager -->
        <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"/>
        </bean>
        <!-- transaction advice -->
        <tx:advice id="txAdvice" transaction-manager="txManager">
            <tx:attributes>
                <tx:method name="*" propagation="REQUIRED" isolation="DEFAULT"/>
            </tx:attributes>
        </tx:advice>
    </beans>

The Spring MVC configuration file dispatcher.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:mvc="http://www.springframework.org/schema/mvc"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.3.xsd">
        <!-- package scanning for controllers -->
        <context:component-scan base-package="cn.controller"/>
        <!-- enable annotation-driven MVC -->
        <mvc:annotation-driven/>
        <mvc:default-servlet-handler/>
        <!-- view resolver -->
        <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
            <property name="prefix" value="/WEB-INF/jsp/"/>
            <property name="suffix" value=".jsp"/>
        </bean>
    </beans>

The MyBatis configuration file mybatis-config.xml

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE configuration
            PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
            "http://mybatis.org/dtd/mybatis-3-config.dtd">
    <configuration>
        <!-- the dataSource part is optional here (Spring already defines one); only the mappers are required -->
        <environments default="development">
            <environment id="development">
                <transactionManager type="JDBC"/>
                <dataSource type="POOLED">
                    <!--<property name="driver" value="com.mysql.jdbc.Driver"/>-->
                    <property name="driver" value="com.mysql.cj.jdbc.Driver"/>
                    <property name="url" value="jdbc:mysql://localhost:3306/python_db"/>
                    <property name="username" value="root"/>
                    <property name="password" value="******"/>
                </dataSource>
            </environment>
        </environments>
        <mappers>
            <mapper class="cn.mapper.SEMapper"/>
            <mapper class="cn.mapper.LessonMapper"/>
        </mappers>
    </configuration>

5. Configure the controller

Configure the MVC controller CourseClickController.java as follows (SearchClickController.java is analogous). The /getCourseClickCount endpoint returns a JSON array of objects of the form {"name": ..., "count": ...}, which getCourseClickData.js consumes later.

    package cn.controller;

    import cn.pojo.Lesson;
    import cn.service.CourseClickService;
    import net.sf.json.JSONArray;
    import net.sf.json.JSONObject;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.ResponseBody;
    import java.util.List;

    /**
     * Course click Controller
     */
    @Controller("COURSECLICK")
    @RequestMapping("/")
    public class CourseClickController {

        // page navigation
        @RequestMapping("/courseClick")
        public String toGetCourseClick() {
            return "courseClick";
        }

        /**
         * @ResponseBody converts the object returned by the controller method with a suitable
         * converter and writes it into the body of the response; it is usually used to return
         * JSON or XML data.
         */
        @ResponseBody
        @RequestMapping(value = "/getCourseClickCount", method = RequestMethod.GET)
        public JSONArray courseClickCount() {
            JSONArray json = new JSONArray();
            // the Lesson class holds a course name (String) and its click count (Integer)
            ApplicationContext context = new ClassPathXmlApplicationContext("beans.xml");
            CourseClickService service = (CourseClickService) context.getBean("clickService");
            Lesson use = (Lesson) context.getBean("Lesson");
            // the list contains every course name with its click count
            List<Lesson> lessons = service.selectLessonALlSer(use);
            // wrap the result as JSON
            for (Lesson les : lessons) {
                JSONObject jo = new JSONObject();
                jo.put("name", les.getLesson());
                jo.put("count", les.getLesson_count());
                json.add(jo);
            }
            return json;
        }
    }

6. Configure the webapp

Create the webapp directory and add Web framework support.

The code for these files is as follows:

1. The WEB-INF files are as follows

web.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
             version="4.0">
        <!-- point the Spring container at its configuration file -->
        <context-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:dispatcher-servlet.xml</param-value>
        </context-param>
        <!-- make sure the Spring container is initialised when the web server starts -->
        <listener>
            <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
        </listener>
        <!-- dispatcher servlet -->
        <servlet>
            <servlet-name>dispatcher</servlet-name>
            <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        </servlet>
        <servlet-mapping>
            <servlet-name>dispatcher</servlet-name>
            <url-pattern>/</url-pattern>
        </servlet-mapping>
    </web-app>
courseClick.jsp:

    <%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
    <%@ page contentType="text/html;charset=UTF-8" language="java" %>
    <html>
    <head>
        <!-- refresh the page every 10 seconds -->
        <meta http-equiv="refresh" content="10">
        <script type="text/javascript" src="<c:url value="/js/echarts.js"/>"></script>
        <script src="<c:url value="/js/jquery-3.6.0.js"/>"></script>
    </head>
    <body>
        <div id="main1" style="width: 600px;height:500px;float: left;margin-top:100px"></div>
        <div id="main2" style="width: 600px;height:500px;float: right;margin-top:100px"></div>
    </body>
    <script src="<c:url value="/js/getCourseClickData.js"/>"></script>
    </html>

2. The js files are as follows

The echarts and jquery files are included in the source repo, or you can download them from their official sites.

getCourseClickData.js:

    function addScript(url) {
        // note: this helper is never actually called; the scripts are loaded from the JSP instead
        document.write("<script language=javascript src=" + url + "></script>");
    }

    var scources = [];
    var scources2 = [];
    var scources3 = [];
    var scources4 = [];

    // read the value of the "date" parameter from the url
    function GetQueryString() {
        var reg = new RegExp("(^|&)date=([^&]*)(&|$)");
        var r = window.location.search.substr(1).match(reg); // search holds the part after "?"
        if (r != null) return unescape(r[2]);
        return null;
    }
    var date = GetQueryString();

    $.ajax({
        type: "GET",
        url: "/getCourseClickCount",
        dataType: "json",
        async: false,
        success: function (result) {
            if (scources.length != 0) {
                // clear any previously loaded data
                scources.length = 0;
                scources2.length = 0;
                scources3.length = 0;
                scources4.length = 0;
            }
            for (var i = 0; i < result.length; i++) { // all data, for the outer pie chart
                scources3.push({"value": result[i].count, "name": result[i].name});
            }
            for (var i = 0; i < 5; i++) { // top five, for the bar chart
                scources.push(result[i].name);
                scources2.push(result[i].count);
            }
            for (var i = 0; i < 3; i++) { // top three, for the inner pie chart
                scources4.push({"value": result[i].count, "name": result[i].name});
            }
        }
    })

    // initialise the echarts instance on the prepared DOM node
    var myChart = echarts.init(document.getElementById('main1'));
    // chart options and data
    var option = {
        title: {
            text: '学习网实时实战课程访问量',
            subtext: '课程点击数',
            x: 'center'
        },
        tooltip: {
            legend: {
                data: ['点击数']
            }
        },
        xAxis: {
            data: scources
        },
        yAxis: {},
        series: [{
            name: '点击数',
            type: 'bar',
            data: scources2
        }]
    };
    // render the chart with the options above
    myChart.setOption(option);

    var myChart = echarts.init(document.getElementById('main2'));
    // chart options and data
    var option = {
        title: {
            text: '学习网实时实战课程访问量',
            subtext: '课程点击数',
            x: 'center'
        },
        tooltip: {
            trigger: 'item',
            formatter: "{a} <br/>{b}: {c} ({d}%)"
        },
        legend: {
            type: 'scroll',
            orient: 'vertical',
            right: 50,
            top: 20,
            data: scources
        },
        series: [
            {
                name: 'Access From',
                type: 'pie',
                selectedMode: 'single',
                radius: [0, '30%'],
                label: {
                    position: 'inner',
                    fontSize: 20
                },
                labelLine: {
                    show: false
                },
                data: scources4
            },
            {
                name: 'Access From',
                type: 'pie',
                radius: ['40%', '55%'],
                labelLine: {
                    length: 30
                },
                label: {
                    formatter: '{a|{a}}{abg|}\n{hr|}\n {b|{b} :}{c} {per|{d}%} ',
                    backgroundColor: '#F6F8FC',
                    borderColor: '#8C8D8E',
                    borderWidth: 1,
                    borderRadius: 5,
                    rich: {
                        a: {
                            color: '#6E7079',
                            lineHeight: 20,
                            align: 'center'
                        },
                        hr: {
                            borderColor: '#8C8D8E',
                            width: '100%',
                            borderWidth: 1,
                            height: 0
                        },
                        b: {
                            color: '#4C5058',
                            fontSize: 15,
                            fontWeight: 'bold',
                            lineHeight: 30
                        },
                        per: {
                            color: '#fff',
                            backgroundColor: '#4C5058',
                            padding: [3, 4],
                            borderRadius: 5
                        }
                    }
                },
                data: scources3
            }
        ]
    };
    // render the chart with the options above
    myChart.setOption(option);

7. Configure the Tomcat server

First download Tomcat from the official site; version 8.5 is used here.

At this point IDEA usually notices the web module and asks whether to configure a web framework. Click Configure, go to Project Structure, configure an artifact for the web module, and move the SSM-related elements of SSMWork on the right into the SSMWork_Web war.

Open Edit Configurations, add the artifact, configure it roughly as shown, and clear the Application context path under Deployment.

5. Results

After starting the Tomcat server, index.jsp opens by default; if it renders, the Tomcat configuration is fine.

Enter /courseClick or /searchClick in the address bar and the pages are displayed as follows.

The page refreshes every 10 s by default. Start the three processes in the scala package and then the Python script, and the page will update its data every 10 seconds, giving a near-real-time display.

Tags: spark, big data

Reposted from: https://blog.csdn.net/Legosnow/article/details/124227289
Copyright belongs to the original author, Legosnow. If there is any infringement, please contact us to remove it.
