0


2023_Spark_实验三十二:消费Kafka数据并保存到MySQL中

实验目的:掌握Scala开发工具消费Kafka数据,并将结果保存到关系型数据库中

实验方法:消费Kafka数据保存到MySQL中

实验步骤:

一、创建Job_ClickData_Process

代码如下:

  1. package exams
  2. import org.apache.kafka.clients.consumer.ConsumerRecord
  3. import org.apache.kafka.common.TopicPartition
  4. import org.apache.kafka.common.serialization.StringDeserializer
  5. import org.apache.spark.streaming.dstream.InputDStream
  6. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
  7. import org.apache.spark.{SparkConf, SparkContext}
  8. import org.apache.spark.streaming.{Seconds, StreamingContext}
  9. import java.sql.{Connection, DriverManager, PreparedStatement}
  10. import scala.collection.mutable
  11. /**
  12. * @projectName sparkGNU2023
  13. * @package exams
  14. * @className exams.Job_ClickData_Process
  15. * @description ${description}
  16. * @author pblh123
  17. * @date 2023/12/20 15:42
  18. * @version 1.0
  19. *
  20. */
  21. object Job_ClickData_Process {
  22. def main(args: Array[String]): Unit = {
  23. // 1. 创建spark,sc,sparkstreaming对象
  24. if (args.length != 3) {
  25. println("您需要输入三个参数")
  26. System.exit(5)
  27. }
  28. val musrl: String = args(0)
  29. val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster(musrl)
  30. val sc: SparkContext = new SparkContext(conf)
  31. sc.setLogLevel("WARN")
  32. val ckeckpointdir: String = args(1)
  33. val ssc = new StreamingContext(sc, Seconds(5)) //连续流批次处理的大小
  34. // 2. 代码主体
  35. // 设置ckeckpoint目录
  36. ssc.checkpoint(ckeckpointdir)
  37. //准备kafka的连接参数
  38. val kfkbst: String = args(2)
  39. val kafkaParams: Map[String, Object] = Map[String, Object](
  40. "bootstrap.servers" -> kfkbst,
  41. "group.id" -> "SparkKafka",
  42. //latest表示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,就从最新/或最后的位置开始消费
  43. //earliest表示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,就从最开始/最早的位置开始消费
  44. //none示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,则报错
  45. "auto.offset.reset" -> "latest", //偏移量的重置位置
  46. "enable.auto.commit" -> (false: java.lang.Boolean), //是否自动提交偏移量
  47. "key.deserializer" -> classOf[StringDeserializer],
  48. "value.deserializer" -> classOf[StringDeserializer]
  49. )
  50. val topics: Array[String] = Array("RealDataTopic")
  51. //从mysql中查询出offsets:Map[TopicPartition, Long]
  52. val offsetsMap: mutable.Map[TopicPartition, Long] = OffsetUtils.getOffsetMap("SparkKafka", "RealDataTopic")
  53. val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if (offsetsMap.size > 0) {
  54. println("MySql记录了offset信息,从offset处开始消费")
  55. //连接kafka的消息
  56. KafkaUtils.createDirectStream[String, String](
  57. ssc,
  58. LocationStrategies.PreferConsistent,
  59. ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetsMap)
  60. )
  61. } else {
  62. println("MySql没有记录了offset信息,从latest处开始消费")
  63. //连接kafka的消息
  64. KafkaUtils.createDirectStream[String, String](
  65. ssc,
  66. LocationStrategies.PreferConsistent,
  67. ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  68. )
  69. }
  70. //实时处理数据并手动维护offset
  71. val valueDS = kafkaDS.map(_.value()) //_表示从kafka中消费出来的每一条数据
  72. valueDS.print()
  73. kafkaDS.map(_.value())
  74. valueDS.foreachRDD(rdd => {
  75. rdd.foreachPartition(lines => {
  76. //将处理分析的结果存入mysql
  77. /*
  78. DROP TABLE IF EXISTS `job_real_time`;
  79. CREATE TABLE `job_real_time` (
  80. `datetime` varchar(8) DEFAULT NULL COMMENT '日期',
  81. `job_type` int(2) DEFAULT NULL COMMENT '1代表新招聘岗位,0代表找工作的人',
  82. `job_id` int(8) DEFAULT NULL COMMENT '岗位ID,匹配岗位名称',
  83. `count` int(8) DEFAULT NULL COMMENT '企业新增岗位数和找工作的人数'
  84. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  85. */
  86. //1.开启连接
  87. val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.137.110:3306/bigdata19?characterEncoding=UTF-8&serverTimezone=UTC", "lh", "Lh123456!")
  88. //2.编写sql并获取ps
  89. val sql: String = "replace into job_real_time(datetime,job_type,job_id,count) values(?,?,?,?)"
  90. val ps: PreparedStatement = conn.prepareStatement(sql)
  91. //3.设置参数并执行
  92. for (line <- lines) {
  93. var item = line.split(" ")
  94. ps.setString(1, item(0).toString)
  95. ps.setInt(2, item(1).toInt)
  96. ps.setInt(3, item(2).toInt)
  97. ps.setInt(4, item(3).toInt)
  98. ps.executeUpdate()
  99. }
  100. //4.关闭资源
  101. ps.close()
  102. conn.close()
  103. })
  104. })
  105. //手动提交偏移量
  106. kafkaDS.foreachRDD(rdd => {
  107. if (rdd.count() > 0) {
  108. //获取偏移量
  109. val offsets: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  110. OffsetUtils.saveOffsets(groupId = "SparkKafka", offsets)
  111. }
  112. })
  113. //开启sparkstreaming任务并等待结束,关闭ssc,sc
  114. ssc.start()
  115. ssc.awaitTermination()
  116. ssc.stop()
  117. sc.stop()
  118. }
  119. }

二、编写模拟点击量并消费Kafka数据

启动zookeeper集群

  1. zk.sh start

启动kafka集群

  1. kf.sh start

检查模拟的实时数据是否正常更新

不断正常更新的情况下,启动flume采集real-time-data.log的实时数据

启动flume

在mysql数据库中准备偏移表与实时数据表

启动Job_ClickData_Process方法消费kafka数据并保存到mysql中

检查mysql表是否存入数据

实验结果:通过scala开发spark代码实现消费kafka数据存储到MySQL中

标签: spark kafka mysql

本文转载自: https://blog.csdn.net/pblh123/article/details/135109828
版权归原作者 pblh123 所有, 如有侵权,请联系我们删除。

“2023_Spark_实验三十二:消费Kafka数据并保存到MySQL中”的评论:

还没有评论