Ride-Hailing Big Data Project: Data Analysis with Spark
Task 1: Count the top 10 cancellation reasons among cancelled orders
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CancelReasonTop10 {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("CancelReasonTop10")
                .getOrCreate();
        // Read the cancelled-order file ("|"-delimited) and name its columns
        Dataset<Row> cancelData = spark.read()
                .option("delimiter", "|")
                .csv("/data/workspace/myshixun/data/canceldata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "ordertime",
                        "canceltime", "operator", "canceltypecode", "cancelreason");
        cancelData.registerTempTable("data");
        // Count each cancellation reason (excluding '未知'), keep the top 10,
        // and append the result to the MySQL table cancelreason
        spark.sql("select cancelreason,count(*) num from data where cancelreason != '未知' group by cancelreason order by num desc limit 10")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "cancelreason")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        /********** End **********/
    }
}
Task 2: Find the 10 districts with the most completed orders
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class OrderByCreateTop10 {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("OrderByCreateTop10")
                .getOrCreate();
        // Read the completed-order file (tab-delimited) and name its columns
        Dataset<Row> orderdata = spark.read()
                .option("delimiter", "\t")
                .csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime",
                        "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        orderdata.registerTempTable("data");
        // Count orders per district, keep the top 10, and append to the MySQL table order_district
        spark.sql("select districtname,count(*) num from data group by districtname order by num desc limit 10")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "order_district")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        /********** End **********/
    }
}
Task 3: Find the five most-traveled routes among all orders
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class LinesTop5 {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("LinesTop5")
                .getOrCreate();
        Dataset<Row> orderdata = spark.read()
                .option("delimiter", "\t")
                .csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime",
                        "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        orderdata.registerTempTable("data");
        // UDF "compare": normalises a "*"-joined route string so that both directions of the
        // same route map to one key. In the 2-field form only the two place names are reordered;
        // in the 6-field form the coordinates are reordered together with their place names.
        spark.udf().register("compare", (UDF1<String, String>) s -> {
            String ss = "";
            int i = s.split("\\*")[0].compareTo(s.split("\\*")[1]);
            if (s.split("\\*").length == 2) {
                if (i >= 0) {
                    ss = s.split("\\*")[0] + "*" + s.split("\\*")[1];
                } else {
                    ss = s.split("\\*")[1] + "*" + s.split("\\*")[0];
                }
            } else if (s.split("\\*").length == 6) {
                if (i >= 0) {
                    ss = s.split("\\*")[0] + "*" + s.split("\\*")[1] + "*" + s.split("\\*")[2] + "*"
                            + s.split("\\*")[3] + "*" + s.split("\\*")[4] + "*" + s.split("\\*")[5];
                } else {
                    ss = s.split("\\*")[1] + "*" + s.split("\\*")[0] + "*" + s.split("\\*")[4] + "*"
                            + s.split("\\*")[5] + "*" + s.split("\\*")[2] + "*" + s.split("\\*")[3];
                }
            }
            return ss;
        }, DataTypes.StringType);
        // t1: the five most-traveled lines, counted on the direction-normalised place-name pair
        spark.sql("select compare(concat_ws('*',departure,destination)) line,count(*) num from data where departure is not null and destination is not null group by compare(concat_ws('*',departure,destination)) order by num desc limit 5").registerTempTable("t1");
        // t2: every distinct normalised place-and-coordinate combination, with its count and its place-name pair
        spark.sql("select concat_ws('*',split(compare(concat_ws('*',departure,destination,deplongitude,deplatitude,destlongitude,destlatitude)),'[*]')[0],split(compare(concat_ws('*',departure,destination,deplongitude,deplatitude,destlongitude,destlatitude)),'[*]')[1]) line,compare(concat_ws('*',departure,destination,deplongitude,deplatitude,destlongitude,destlatitude)) bb,count(*) num from data where departure is not null and destination is not null group by compare(concat_ws('*',departure,destination,deplongitude,deplatitude,destlongitude,destlatitude)) order by num desc").registerTempTable("t2");
        // For each top-5 line keep its most frequent coordinate combination (rank = 1)
        // and append the result to the MySQL table orderline
        spark.sql("select split(bb,'[*]')[0] departure,split(bb,'[*]')[2] deplongitude,split(bb,'[*]')[3] deplatitude,split(bb,'[*]')[1] destination,split(bb,'[*]')[4] destlongitude,split(bb,'[*]')[5] destlatitude,num from(select t1.line,t2.bb,t2.num count,t1.num, Row_Number() OVER (partition by t1.line ORDER BY t2.num desc) rank from t1 left join t2 on t1.line = t2.line order by t1.num desc) where rank=1")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "orderline")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        /********** End **********/
    }
}
Task 4: Total order volume for each city in Hunan
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class OrderCountByCity {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("OrderCountByCity")
                .getOrCreate();
        // Completed orders (tab-delimited)
        Dataset<Row> orderdata = spark.read()
                .option("delimiter", "\t")
                .csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime",
                        "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        orderdata.registerTempTable("data");
        // Cancelled orders ("|"-delimited)
        Dataset<Row> canceldata = spark.read()
                .option("delimiter", "|")
                .csv("/data/workspace/myshixun/data/canceldata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "ordertime",
                        "canceltime", "operator", "canceltypecode", "cancelreason");
        canceldata.registerTempTable("data1");
        // UDF "city": keep everything up to and including the first "市",
        // or up to and including "自治州" for autonomous prefectures
        spark.udf().register("city", (UDF1<String, String>) s -> {
            String city = "";
            if (s.contains("自治州")) {
                city = s.split("自治州")[0] + "自治州";
            } else {
                city = s.split("市")[0] + "市";
            }
            return city;
        }, DataTypes.StringType);
        // Completed orders per city in Hunan
        spark.sql("select city(districtname) city,count(*) count from data where districtname like '湖南省%' group by city(districtname)").registerTempTable("order");
        // Cancelled orders per city in Hunan
        spark.sql("select city(districtname) city,count(*) count from data1 where districtname like '湖南省%' group by city(districtname)").registerTempTable("cancel");
        // Total (completed + cancelled) orders per city, appended to the MySQL table orderbycity
        spark.sql("select order.city,(order.count+cancel.count) num from order left join cancel on order.city == cancel.city order by num desc")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "orderbycity")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        spark.stop();
        /********** End **********/
    }
}
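The city UDF reduces a full district name to its prefecture-level prefix so that orders can be grouped per city. A standalone sketch of the same string handling; the two sample district names are made up for illustration.

public class CityUdfDemo {
    // Same extraction as the "city" UDF above: keep everything up to and including
    // the first "市", or up to and including "自治州" for autonomous prefectures
    static String city(String s) {
        if (s.contains("自治州")) {
            return s.split("自治州")[0] + "自治州";
        }
        return s.split("市")[0] + "市";
    }

    public static void main(String[] args) {
        // Sample inputs are illustrative, not taken from the data files
        System.out.println(city("湖南省长沙市岳麓区"));               // prints 湖南省长沙市
        System.out.println(city("湖南省湘西土家族苗族自治州吉首市")); // prints 湖南省湘西土家族苗族自治州
    }
}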
Task 5: Order totals per time period for Hunan Province and per time period for each city on the day
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class OrderHourCity {
    public static void main(String[] args) {
        /********** Begin **********/
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("OrderHourCity")
                .getOrCreate();
        // Completed orders (tab-delimited)
        Dataset<Row> orderdata = spark.read()
                .option("delimiter", "\t")
                .csv("/data/workspace/myshixun/data/createdata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "departtime", "ordertime",
                        "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
        orderdata.registerTempTable("data");
        // Cancelled orders ("|"-delimited)
        Dataset<Row> canceldata = spark.read()
                .option("delimiter", "|")
                .csv("/data/workspace/myshixun/data/canceldata.txt")
                .toDF("companyid", "address", "districtname", "orderid", "ordertime",
                        "canceltime", "operator", "canceltypecode", "cancelreason");
        canceldata.registerTempTable("data1");
        // UDF "city": same prefecture-level extraction as in Task 4
        spark.udf().register("city", (UDF1<String, String>) s -> {
            String city = "";
            if (s.contains("自治州")) {
                city = s.split("自治州")[0] + "自治州";
            } else {
                city = s.split("市")[0] + "市";
            }
            return city;
        }, DataTypes.StringType);
        // t1: cancelled orders per hour and city in Hunan
        spark.sql("select hour(ordertime) hour,city(districtname) city,count(*) count from data1 where districtname like '湖南省%' group by hour(ordertime),city(districtname) order by hour").registerTempTable("t1");
        // t2: completed orders per hour and city in Hunan
        spark.sql("select hour(ordertime) hour,city(districtname) city,count(*) count from data where districtname like '湖南省%' group by hour(ordertime),city(districtname) order by hour").registerTempTable("t2");
        // Full join of the two hourly counts: take whichever side is present, sum when both are,
        // and append the result to the MySQL table order_city_hour
        spark.sql("select (case when t1.hour is null then t2.hour when t2.hour is null then t1.hour else t2.hour end) hour,(case when t1.city is null then t2.city when t2.city is null then t1.city else t2.city end) city,(case when t1.count is null then t2.count when t2.count is null then t1.count else t2.count+t1.count end) num from t1 full join t2 on concat_ws('*',t1.hour,t1.city) = concat_ws('*',t2.hour,t2.city) order by hour,city")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "order_city_hour")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        // Order totals (completed + cancelled) per minute-level timestamp, built from a
        // full outer join on the formatted time and appended to the MySQL table order_quantity_time
        spark.sql("select (case when t1 is null then t2 when t2 is null then t1 else t2 end) as time,(case when count1 is null then count2 when count2 is null then count1 else count2+count1 end) as num from(select * from (SELECT DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm') as t1,count(DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm')) as count1 FROM data GROUP BY DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm')) as a FULL OUTER JOIN (SELECT DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm') as t2,count(DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm')) as count2 FROM data1 GROUP BY DATE_FORMAT(ordertime,'yyyy-MM-dd HH:mm')) as b on a.t1=b.t2) as c order by time")
                .write()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "order_quantity_time")
                .option("user", "root")
                .option("password", "123123")
                .mode(SaveMode.Append)
                .save();
        spark.stop();
        /********** End **********/
    }
}
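The three CASE WHEN ... IS NULL chains in the hourly query are a hand-written coalesce: whichever side of the full join is missing, take the other, and sum the counts when both exist. A self-contained sketch of that pattern on made-up data; the class name, the sample rows, and the plain two-column join condition (instead of the concat_ws key used above) are all illustrative, not part of the exercise.

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class FullJoinCoalesceDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").appName("FullJoinCoalesceDemo").getOrCreate();
        StructType schema = new StructType(new StructField[]{
                new StructField("hour", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("city", DataTypes.StringType, false, Metadata.empty()),
                new StructField("count", DataTypes.LongType, false, Metadata.empty())
        });
        // Tiny made-up hourly counts: cancelled orders (t1) and completed orders (t2)
        Dataset<Row> t1 = spark.createDataFrame(Arrays.asList(
                RowFactory.create(8, "湖南省长沙市", 3L),
                RowFactory.create(9, "湖南省株洲市", 1L)), schema);
        Dataset<Row> t2 = spark.createDataFrame(Arrays.asList(
                RowFactory.create(8, "湖南省长沙市", 10L),
                RowFactory.create(10, "湖南省长沙市", 4L)), schema);
        t1.registerTempTable("t1");
        t2.registerTempTable("t2");
        // coalesce() returns the first non-null argument, so it stands in for each
        // hand-written CASE WHEN ... IS NULL chain used in the solution above
        spark.sql("select coalesce(t2.hour, t1.hour) hour,"
                + " coalesce(t2.city, t1.city) city,"
                + " coalesce(t2.count, 0) + coalesce(t1.count, 0) num"
                + " from t1 full join t2 on t1.hour = t2.hour and t1.city = t2.city"
                + " order by hour, city").show(false);
        spark.stop();
    }
}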
This article is reposted from: https://blog.csdn.net/qq_52792570/article/details/130970721
Copyright belongs to the original author, Wa_Automata. In case of infringement, please contact us for removal.