1. 使用Apache Kafka构建实时数据流
参考文档链接:https://cloud.tencent.com/developer/article/1814030
2. 数据见UserBehavior.csv
数据解释:本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集
根据这一csv文档运用Kafka模拟实时数据流,作为Spark Streaming的输入源,两条记录实际的间隔时间如果是1分钟,那么Java应用在发送消息时也可以间隔一分钟再发送。
3. 处理要求
• 找出订单数量最多的日期。
• 找出最受欢迎的前三名商品ID
这个是老师根据某个比赛修改了赛题给大伙布置的任务,数据在上面方式可见,想着用java写实在是太麻烦了,改用了spark读取并模拟数据的实时性上传到Kafka,然后用sparkStreaming接收并处理数据。
代码如下:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json.JSONObject
import java.util.Properties
object KafkaProducer {
case class UserBehavior(User_ID: String, Item_ID: String, Category_ID: String, Behavior: String,Timestamp: String,Date: String)
//定义了一个样例类 UserBehavior,用于处理用户行为数据
def main(args:Array[String])={
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
//设置日志级别。
val spark:SparkSession = SparkSession.builder()
.appName("KafkaProducer")
.master("local[2]")
.getOrCreate()
//创建SparkSession对象,设置应用程序名和运行模式
val props = new Properties
props.put("bootstrap.servers", "127.0.0.1:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
//设置kafka的生产者属性并创建kafka的生产者实
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("ERROR")
val path=""
val lineRDD: RDD[Array[String]] = sc.textFile(path).map(_.split(","))
val UserBehaviorRDD: RDD[UserBehavior] = lineRDD.map(x => UserBehavior(x(0), x(1), x(2), x(3),x(4),x(5)))
import spark.implicits._
val UserBehaviorDF: DataFrame = UserBehaviorRDD.toDF
val jsonStringDF = UserBehaviorDF.toJSON.toDF("value") // 转换为JSON格式的DataFrame
val jsonStringArr = jsonStringDF.collect.map(_.getString(0)) // 获取JSON格式的DataFrame中的JSON字符串数组
val topic = "UserBehavior"
//或者你也可以直接这样发送数据更简单
//val path=""
//val df = spark.read.csv(path)
//val JsonDF = df.toJSON
//val data = JsonDF.collect()
//data.foreach{x=>
// val record = new ProducerRecord[String, String](topic,x) // producer.send(record)
//}
var lastTimestamp = 10000000000L
for (jsonString <- jsonStringArr) {
val jsonObject = new JSONObject(jsonString)
val timestamp = jsonObject.getString("Timestamp")
var currentTimestamp = timestamp.toLong
if (currentTimestamp - lastTimestamp >= 60000) { //模拟数据实时发送,如果当此时的时间与上一条的时间相隔超过60秒
Thread.sleep(60000) //等待1分钟发送
lastTimestamp=currentTimestamp
println(jsonString)
val record = new ProducerRecord[String, String](topic,jsonString)
producer.send(record)
} else {
lastTimestamp=currentTimestamp
println(jsonString)
val record = new ProducerRecord[String, String](topic,jsonString)
producer.send(record)
}
}
producer.close()
sc.stop()
spark.stop()
}
}
下面是SparkStreaming读取的代码:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
object SparkStreaming {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val spark = SparkSession.builder.appName("SparkStreaming").master("local[2]").getOrCreate()
val schema = StructType(Seq(
StructField("User_ID", StringType),
StructField("Item_ID", StringType),
StructField("Category_ID", StringType),
StructField("Behavior", StringType),
StructField("Timestamp", StringType),
StructField("Date", StringType),
))//定义数据模式
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe","UserBehavior")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"),schema).as("data"))
.select("data.User_ID","data.Item_ID","data.Category_ID","data.Behavior","data.Timestamp","data.Date")
//选择value列,并映射成DataFrame,解析JSON格式的数据成可读的列。
val newDF = df.withColumn("Timestamp", from_unixtime(df("Timestamp"), "yyyy-MM-dd"))
//将时间戳改成时间格式
val result = newDF.filter(col("Behavior") === "buy")
.groupBy(col("Timestamp"))
.agg(count(col("User_ID")).as("buy_count"))
.orderBy(col("buy_count").desc)
.limit(1)
// .cache()
val result2 = newDF.groupBy("Item_ID")
.agg(count("*").as("count"))
.orderBy(col("count").desc)
.limit(3)
// .cache()
// 启动流处理并等待处理结束
val query = result.writeStream
.outputMode("complete")
.format("console")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
val query2 = result2.writeStream
.outputMode("complete")
// .outputMode("update")
.format("console")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
query.awaitTermination()
query2.awaitTermination()
spark.stop()
}
}
这里可见我用过cache()将数据缓存到内存中,但是cache()对于这两个查询任务的性能提升不太明显。因为在这个例子中,数据是实时流式处理的,而不是一次处理一个批次的静态数据。对于流处理程序而言,常规的缓存方法对于提升性能的作用是非常有限的。流式数据的实时特性意味着数据不断更新,因此很难保持缓存的数据与最新的数据的一致性。所以在流处理中,更有效的性能优化方法是使用更高效的算法,并通过对流数据的精细控制来调整计算中的批大小和触发机制,而不是简单地使用缓存方法。
但是用dataframe格式输出的太慢了,所以下面试用rdd的形式:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.json.JSONObject
import java.util.Properties
object RDDStreaming {
case class UserBehavior(User_ID: String, Item_ID: String, Category_ID: String, Behavior: String,Timestamp: String,Date: String)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("KafkaSparkStreaming").setMaster("local[*]")
val spark = SparkSession.builder().config(conf).getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val kafkaParams = Map[String, Object](
//Kafka的broker列表,格式为host:port,host:port
"bootstrap.servers" -> "127.0.0.1:9092",
//key的反序列化方式
"key.deserializer" -> classOf[StringDeserializer],
//value的反序列化方式
"value.deserializer" -> classOf[StringDeserializer],
//消费者组ID
"group.id" -> "test-group",
//从最早的记录开始处理消息
"auto.offset.reset" -> "earliest",
//不自动提交偏移量
"enable.auto.commit" -> (false: java.lang.Boolean)
)
def writeToMySQL(df: DataFrame) = {
val properties: Properties = new Properties()
properties.setProperty("user", "账户")
properties.setProperty("password", "密码")
properties.setProperty("driver", "com.mysql.jdbc.Driver")
df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/Order", "Order.userbehavior", properties)
}
val topics = Array("UserBehavior")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
println("--------------------------------------")
stream.foreachRDD(rdd => {
val count = rdd.filter(row => row.value.contains("buy")).count()
val order = rdd.filter(row => row.value.contains("buy")).map(row => {
val json = new JSONObject(row.value())
UserBehavior(
json.getString("User_ID"),
json.getString("Item_ID"),
json.getString("Category_ID"),
json.getString("Behavior"),
json.getString("Timestamp"),
json.getString("Date")
)
})
// val current = order.map(x => (x.User_ID, x.Item_ID))
// current.foreach(x => println("用户ID:" + x._1 + " 商品ID: " + x._2))
val MostOrderCount = order.map(x=>(x.Date.split(" ")(0),1)).reduceByKey(_+_).sortBy(_._2,false)
if (!MostOrderCount.isEmpty()) {
println("订单数量最多的日期:"+MostOrderCount.first()._1+" 数量:"+MostOrderCount.first()._2)
} else {
print(" ")
}
val order1 = rdd.map(row => {
val json = new JSONObject(row.value())
UserBehavior(
json.getString("User_ID"),
json.getString("Item_ID"),
json.getString("Category_ID"),
json.getString("Behavior"),
json.getString("Timestamp"),
json.getString("Date")
)
})
val popular = order1.map(x=>(x.Item_ID,1)).reduceByKey(_+_).sortBy(_._2,false).take(3)
popular.foreach(x=>println("最受欢迎的商品id:"+x._1+" 用户操作数量:"+x._2))
println("订单总数为:"+count)
// order1.foreachPartition(partition => {
// val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/order", "root", "123456")
//
// // 获取数据库连接,可以使用连接池技术来管理数据库连接
// partition.foreach(record => {
// val insertStatement = connection.prepareStatement(
// "INSERT INTO user_behavior (User_ID, Item_ID, Category_ID, Behavior, Timestamp, Date) " +
// "VALUES (?, ?, ?, ?, ?, ?)")
// insertStatement.setString(1, record.User_ID)
// insertStatement.setString(2, record.Item_ID)
// insertStatement.setString(3, record.Category_ID)
// insertStatement.setString(4, record.Behavior)
// insertStatement.setString(5, record.Timestamp)
// insertStatement.setString(6, record.Date)
// insertStatement.executeUpdate()
// insertStatement.close()
// })
// connection.close()
// })
// println("数据写入成功")
//插入数据速度较慢,用批处理
import spark.implicits._
if(!order1.isEmpty()) {
writeToMySQL(order1.toDF)
println("数据写入成功")
}
else println("无数据传入")
println("--------------------------------------")
})
ssc.start()
ssc.awaitTermination()
}
}
很多人遇到个问题就是sparksession和sparkcontext不能一起创建,那是因为只能启动一个sparkcontext,在启动sparksession时会默认启动sparkContext,启动StreamingContext也一样会启动sparkContext,所以这时候只需要设置用一开始创建的那个sparkContext即可,然后对Stream中每一个rdd统计‘buy’的数量然后将所有数据写入到MYSQL中。
下面附带maven依赖(可能这个代码里有些没用上,挑选其中即可):
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
版权归原作者 茶树油酸梅酱 所有, 如有侵权,请联系我们删除。