想要在spark sql中对group by + concat_ws()的字段进行排序,可以参考如下方法。
原始数据如下:
+---+-----+----+|id |name |type|+---+-----+----+|1|name1|p ||2|name2|p ||3|name3|p ||1|x1 |q ||2|x2 |q ||3|x3 |q |+---+-----+----+
目标数据如下:
+----+---------------------+|type|value_list |+----+---------------------+|p |[name3, name2, name1]||q |[x3, x2, x1]|+----+---------------------+
spark-shell:
val df=Seq((1,"name1","p"),(2,"name2","p"),(3,"name3","p"),(1,"x1","q"),(2,"x2","q"),(3,"x3","q")).toDF("id","name","type")
df.show(false)
1.使用开窗函数
df.createOrReplaceTempView("test")
spark.sql("select type,max(c) as c1 from (select type,concat_ws('&',collect_list(trim(name)) over(partition by type order by id desc)) as c from test) as x group by type ")
因为使用开窗函数本身会使用比较多的资源,
这种方式在大数据量下性能会比较慢,所以尝试下面的操作。
2.使用struct和sort_array(array,asc?true,flase)的方式来进行,效率高些:
val df3=spark.sql("select type, concat_ws('&',sort_array(collect_list(struct(id,name)),false).name) as c from test group by type ")
df3.show(false)
例如:计算一个结果形如:
user_id stk_id:action_type:amount:price:time stk_id:action_type:amount:price:time stk_id:action_type:amount:price:time stk_id:action_type:amount:price:time
需要按照time 升序排,则:
Dataset<Row> splitStkView = session.sql("select client_id, innercode, entrust_bs, business_amount, business_price, trade_date from\n"+"(select client_id,\n"+" split(action,':')[0] as innercode,\n"+" split(action,':')[1] as entrust_bs,\n"+" split(action,':')[2] as business_amount,\n"+" split(action,':')[3] as business_price,\n"+" split(action,':')[4] as trade_date,\n"+" ROW_NUMBER() OVER(PARTITION BY split(action,':')[0] ORDER BY split(action,':')[4] DESC) AS rn\n"+"from stk_temp)\n"+"where rn <= 5000");
splitStkView.createOrReplaceTempView("splitStkView");Dataset<Row> groupStkView = session.sql("select client_id, CONCAT(innercode, ':', entrust_bs, ':', business_amount, ':', business_price, ':', trade_date) as behive, trade_date from splitStkView");
groupStkView.createOrReplaceTempView("groupStkView");Dataset<Row> resultData = session.sql("SELECT client_id, concat_ws('\t',sort_array(collect_list(struct(trade_date, behive)),true).behive) as behives FROM groupStkView GROUP BY client_id");
3.udf的方式
importorg.apache.spark.sql.functions._
importorg.apache.spark.sql._
val sortUdf =udf((rows:Seq[Row])=>{
rows.map {caseRow(id:Int, value:String)=>(id, value)}.sortBy {case(id, value)=>-id }//id if asc.map {case(id, value)=> value }})
val grouped = df.groupBy(col("type")).agg(collect_list(struct("id","name")) as "id_name")
val r1 = grouped.select(col("type"),sortUdf(col("id_name")).alias("value_list"))
r1.show(false)
版权归原作者 容若只如初见 所有, 如有侵权,请联系我们删除。