

Spark Streaming + Kafka: Building a Real-Time Data Stream

1. Build a real-time data stream with Apache Kafka

Reference: https://cloud.tencent.com/developer/article/1814030

2. Data: see UserBehavior.csv

About the data: the dataset used in this walkthrough is a CSV file containing about 1.04 million Taobao user-behavior records, taken from the Alibaba Cloud Tianchi public datasets.

Based on this CSV file, Kafka is used to simulate a real-time data stream as the input source for Spark Streaming: if the actual interval between two records is one minute, the producer application should likewise wait one minute before sending the second message, as sketched below.
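A minimal sketch of that pacing idea (illustrative only, not part of the assignment code; it assumes the Timestamp column holds Unix epoch seconds, as in the Tianchi UserBehavior data):

// Sketch: the wait before sending a record equals the gap between its timestamp
// and the previous record's timestamp (Timestamp assumed to be epoch seconds).
def replayDelayMillis(previousTs: Long, currentTs: Long): Long =
  math.max(0L, currentTs - previousTs) * 1000L

// e.g. two records 60 seconds apart in the data => Thread.sleep(60000) before the second send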

3. Processing requirements

• Find the date with the largest number of orders.

• Find the top three most popular item IDs.

This is an assignment our instructor adapted from a competition problem, with the data available as described above. Writing the producer in plain Java seemed too tedious, so I use Spark to read the CSV, replay it to Kafka with its original timing to simulate a real-time feed, and then receive and process the data with Spark Streaming.

The producer code is as follows:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject
import java.util.Properties

object KafkaProducer {
  // Case class that models one user-behavior record
  case class UserBehavior(User_ID: String, Item_ID: String, Category_ID: String, Behavior: String, Timestamp: String, Date: String)

  def main(args: Array[String]): Unit = {
    // Set log levels to reduce noise
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    // Create the SparkSession with an application name and run mode
    val spark: SparkSession = SparkSession.builder()
      .appName("KafkaProducer")
      .master("local[2]")
      .getOrCreate()

    // Configure the Kafka producer properties and create the producer instance
    val props = new Properties
    props.put("bootstrap.servers", "127.0.0.1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("ERROR")

    // Path to UserBehavior.csv (fill in your own path)
    val path = ""
    val lineRDD: RDD[Array[String]] = sc.textFile(path).map(_.split(","))
    val UserBehaviorRDD: RDD[UserBehavior] = lineRDD.map(x => UserBehavior(x(0), x(1), x(2), x(3), x(4), x(5)))
    import spark.implicits._
    val UserBehaviorDF: DataFrame = UserBehaviorRDD.toDF
    val jsonStringDF = UserBehaviorDF.toJSON.toDF("value")        // convert each row to a JSON string
    val jsonStringArr = jsonStringDF.collect.map(_.getString(0))  // collect the JSON strings to the driver
    val topic = "UserBehavior"

    // Alternatively, you can send the data more simply like this:
    // val path = ""
    // val df = spark.read.csv(path)
    // val JsonDF = df.toJSON
    // val data = JsonDF.collect()
    // data.foreach { x =>
    //   val record = new ProducerRecord[String, String](topic, x)
    //   producer.send(record)
    // }

    var lastTimestamp = 10000000000L
    for (jsonString <- jsonStringArr) {
      val jsonObject = new JSONObject(jsonString)
      val currentTimestamp = jsonObject.getString("Timestamp").toLong
      // Simulate real-time sending: if this record is at least 60 seconds after the previous one
      // (the Timestamp field is in epoch seconds), wait one minute before sending it
      if (currentTimestamp - lastTimestamp >= 60) {
        Thread.sleep(60000)
      }
      lastTimestamp = currentTimestamp
      println(jsonString)
      producer.send(new ProducerRecord[String, String](topic, jsonString))
    }

    producer.close()
    sc.stop()
    spark.stop()
  }
}
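Before wiring up Spark Streaming, it can help to verify that messages are actually arriving on the UserBehavior topic. Here is a minimal plain-Kafka consumer sketch for that check (not from the original post; the object name and group id are arbitrary, and Scala 2.12's JavaConverters is assumed):

import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._

object TopicCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "127.0.0.1:9092")
    props.put("group.id", "topic-check") // arbitrary group id, used only for this check
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("UserBehavior"))

    // Poll a few times and print whatever JSON messages have arrived so far
    for (_ <- 1 to 10) {
      val records = consumer.poll(Duration.ofSeconds(1))
      records.asScala.foreach(r => println(r.value()))
    }
    consumer.close()
  }
}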

Below is the code that reads the stream with Spark (Structured Streaming):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

object SparkStreaming {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    val spark = SparkSession.builder.appName("SparkStreaming").master("local[2]").getOrCreate()

    // Define the schema of the incoming data
    val schema = StructType(Seq(
      StructField("User_ID", StringType),
      StructField("Item_ID", StringType),
      StructField("Category_ID", StringType),
      StructField("Behavior", StringType),
      StructField("Timestamp", StringType),
      StructField("Date", StringType)
    ))

    // Read from Kafka, cast the value column to a string, and parse the JSON into named columns
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "UserBehavior")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .select(from_json(col("value"), schema).as("data"))
      .select("data.User_ID", "data.Item_ID", "data.Category_ID", "data.Behavior", "data.Timestamp", "data.Date")

    // Convert the Unix timestamp into a date string
    val newDF = df.withColumn("Timestamp", from_unixtime(df("Timestamp"), "yyyy-MM-dd"))

    // Query 1: the date with the most "buy" records
    val result = newDF.filter(col("Behavior") === "buy")
      .groupBy(col("Timestamp"))
      .agg(count(col("User_ID")).as("buy_count"))
      .orderBy(col("buy_count").desc)
      .limit(1)
      // .cache()

    // Query 2: the three most popular item IDs
    val result2 = newDF.groupBy("Item_ID")
      .agg(count("*").as("count"))
      .orderBy(col("count").desc)
      .limit(3)
      // .cache()

    // Start both streaming queries and wait for them to finish
    val query = result.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()
    val query2 = result2.writeStream
      .outputMode("complete")
      // .outputMode("update")
      .format("console")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()

    // Block on both queries at once; query.awaitTermination() alone would block forever,
    // so a second awaitTermination() call would never be reached
    spark.streams.awaitAnyTermination()
    spark.stop()
  }
}
As you can see, I tried cache() to keep the data in memory, but cache() did not noticeably improve the performance of these two queries. In this example the data is processed as a continuous stream rather than as a single static batch, and conventional caching does very little for a streaming job: the data keeps changing, so it is hard to keep the cached data consistent with the latest records. In stream processing, more effective optimizations are to use more efficient algorithms and to exercise fine-grained control over the stream, tuning the micro-batch size and the trigger mechanism, rather than simply caching. A sketch of this kind of tuning follows.
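Here is a minimal, illustrative sketch of that kind of control on the Kafka source (not from the original post; it reuses the spark session and the Trigger import from the code above, and the numeric values are assumptions). The maxOffsetsPerTrigger option caps how many Kafka records each micro-batch reads, and the trigger interval sets how often a batch fires:

// Sketch: bounding micro-batch size and trigger frequency on the Kafka source.
val tunedDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", "UserBehavior")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "10000") // read at most 10,000 Kafka records per micro-batch
  .load()
  .selectExpr("CAST(value AS STRING)")

val tunedQuery = tunedDF.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds")) // fire a smaller batch every 10 seconds
  .start()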
However, producing the output through the DataFrame API was too slow, so I switched to the RDD (DStream) form below:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.json.JSONObject
import java.util.Properties

object RDDStreaming {
  case class UserBehavior(User_ID: String, Item_ID: String, Category_ID: String, Behavior: String, Timestamp: String, Date: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaSparkStreaming").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Reuse the SparkContext owned by the SparkSession; 5-second batch interval
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val kafkaParams = Map[String, Object](
      // Kafka broker list, in host:port,host:port format
      "bootstrap.servers" -> "127.0.0.1:9092",
      // Key deserializer
      "key.deserializer" -> classOf[StringDeserializer],
      // Value deserializer
      "value.deserializer" -> classOf[StringDeserializer],
      // Consumer group ID
      "group.id" -> "test-group",
      // Start from the earliest available records
      "auto.offset.reset" -> "earliest",
      // Do not auto-commit offsets
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Append a DataFrame to MySQL over JDBC (fill in your own credentials)
    def writeToMySQL(df: DataFrame) = {
      val properties: Properties = new Properties()
      properties.setProperty("user", "your_username")
      properties.setProperty("password", "your_password")
      properties.setProperty("driver", "com.mysql.jdbc.Driver")
      df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/Order", "Order.userbehavior", properties)
    }

    val topics = Array("UserBehavior")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    println("--------------------------------------")

    stream.foreachRDD(rdd => {
      // Records whose Behavior is "buy"
      val buys = rdd.filter(row => row.value.contains("buy"))
      val count = buys.count()
      val order = buys.map(row => {
        val json = new JSONObject(row.value())
        UserBehavior(
          json.getString("User_ID"),
          json.getString("Item_ID"),
          json.getString("Category_ID"),
          json.getString("Behavior"),
          json.getString("Timestamp"),
          json.getString("Date")
        )
      })
      // val current = order.map(x => (x.User_ID, x.Item_ID))
      // current.foreach(x => println("User ID: " + x._1 + " Item ID: " + x._2))

      // Date with the most "buy" records in this batch
      val MostOrderCount = order.map(x => (x.Date.split(" ")(0), 1)).reduceByKey(_ + _).sortBy(_._2, false)
      if (!MostOrderCount.isEmpty()) {
        println("Date with the most orders: " + MostOrderCount.first()._1 + "  count: " + MostOrderCount.first()._2)
      } else {
        println("No 'buy' records in this batch")
      }

      // All records in this batch, regardless of behavior type
      val order1 = rdd.map(row => {
        val json = new JSONObject(row.value())
        UserBehavior(
          json.getString("User_ID"),
          json.getString("Item_ID"),
          json.getString("Category_ID"),
          json.getString("Behavior"),
          json.getString("Timestamp"),
          json.getString("Date")
        )
      })

      // Top three item IDs by number of user actions
      val popular = order1.map(x => (x.Item_ID, 1)).reduceByKey(_ + _).sortBy(_._2, false).take(3)
      popular.foreach(x => println("Most popular item ID: " + x._1 + "  user actions: " + x._2))
      println("Total number of orders: " + count)

      // order1.foreachPartition(partition => {
      //   val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/order", "root", "123456")
      //   // Get a database connection; a connection pool could be used to manage connections
      //   partition.foreach(record => {
      //     val insertStatement = connection.prepareStatement(
      //       "INSERT INTO user_behavior (User_ID, Item_ID, Category_ID, Behavior, Timestamp, Date) " +
      //       "VALUES (?, ?, ?, ?, ?, ?)")
      //     insertStatement.setString(1, record.User_ID)
      //     insertStatement.setString(2, record.Item_ID)
      //     insertStatement.setString(3, record.Category_ID)
      //     insertStatement.setString(4, record.Behavior)
      //     insertStatement.setString(5, record.Timestamp)
      //     insertStatement.setString(6, record.Date)
      //     insertStatement.executeUpdate()
      //     insertStatement.close()
      //   })
      //   connection.close()
      // })
      // println("Data written successfully")
      // Row-by-row inserts are slow, so write the whole batch at once instead

      import spark.implicits._
      if (!order1.isEmpty()) {
        writeToMySQL(order1.toDF)
        println("Data written successfully")
      }
      else println("No data received")
      println("--------------------------------------")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
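Since enable.auto.commit is false, the code above never commits its Kafka offsets, so on restart the consumer group falls back to auto.offset.reset. If you want to commit offsets once each batch has been processed, the spark-streaming-kafka-0-10 API exposes HasOffsetRanges and CanCommitOffsets for exactly that. A minimal sketch (not part of the original code; the commit call belongs inside the existing foreachRDD, where stream is the DStream created above):

// Sketch: record each batch's offset ranges and commit them back to Kafka after processing.
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture this batch's offset ranges before transforming the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch as in the code above ...
  // Asynchronously commit the offsets for this consumer group
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}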
A problem many people run into is that a SparkSession and a SparkContext cannot both be created. That is because only one SparkContext can be active per JVM: creating a SparkSession starts a SparkContext, and creating a StreamingContext from a SparkConf would start another one. The fix is simply to build the StreamingContext on top of the SparkContext that the SparkSession already created, as in the short sketch below. Then, for each RDD in the stream, the code counts the 'buy' records and writes all the data to MySQL.
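A minimal sketch of that point (illustrative only; the "Demo" application name is arbitrary):

// Only one SparkContext may run per JVM, so reuse the one owned by the SparkSession.
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

val spark = SparkSession.builder().appName("Demo").master("local[*]").getOrCreate()

// Building a StreamingContext from a SparkConf here would try to start
// a second SparkContext in the same JVM and fail:
// val badSsc = new StreamingContext(new SparkConf().setAppName("Demo"), Seconds(5))

// Sharing the SparkContext that the SparkSession already created works:
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))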
The Maven dependencies are attached below (some of them may not actually be used by this code; pick the ones you need):
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.30</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

This article is reposted from: https://blog.csdn.net/weixin_63880640/article/details/130868533
Copyright belongs to the original author, 茶树油酸梅酱.
