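1. Introduction to keyBy
Main purpose: gather records that share the same key into the same partition.
(Records start out spread across different slots; keyBy pulls records with the same key into the same slot.)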
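The idea can be sketched in plain Scala, without Flink. This is a simplified model, not Flink's actual implementation: the helper partitionFor is hypothetical, and Flink's real key assignment goes through key groups and its own hash function. The sketch only shows that equal keys always map to the same partition, while different keys may still share one.

```scala
object KeyPartitionSketch {
  // Hypothetical helper (not a Flink API): map a key to one of `parallelism` partitions.
  // floorMod keeps the result non-negative even when hashCode is negative.
  def partitionFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val records = List(("张三", 4), ("张三", 2), ("leo", 5), ("leo", 1), ("raj", 8), ("giao", 7))
    val parallelism = 4 // assumed number of partitions, chosen for illustration

    // Group the records by the partition their key (the name) maps to.
    val byPartition = records.groupBy { case (name, _) => partitionFor(name, parallelism) }

    byPartition.toSeq.sortBy(_._1).foreach { case (partition, rs) =>
      println(s"partition $partition: ${rs.mkString(", ")}")
    }
  }
}
```

Both 张三 records land in one partition and both leo records land in one partition, whichever partition numbers the hash happens to produce.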
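2. Using keyBy
When calling keyBy, you pass it an argument that tells it which field to group the records by.
There are two ways to pass this argument:
1. Pass the position of the field
Example: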
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object keyByTest {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // create the data stream of (name, value) tuples
    val ds = env.fromElements(("张三", 4), ("张三", 2), ("leo", 5), ("leo", 1), ("raj", 8), ("giao", 7))
    // key by tuple position 0, i.e. the name
    // (position-based keys are deprecated in newer Flink releases)
    val keyByedDs = ds.keyBy(0)
    keyByedDs.print()
    env.execute()
  }
}
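Output: when the job runs with parallelism greater than 1, print() prefixes every line with the index of the subtask that printed it, so records with the same key (for example the two 张三 records) show the same prefix.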
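The same grouping can also be written with a key-extractor function instead of a position. The following is a minimal sketch, assuming a Flink version that still ships the Scala DataStream API; the object name KeyByFunctionTest and the value nameOf are made up for illustration.

```scala
import org.apache.flink.streaming.api.scala._

object KeyByFunctionTest {
  // Illustrative key extractor: the first tuple field (the name).
  val nameOf: ((String, Int)) => String = _._1

  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // same sample data as the positional example
    val ds = env.fromElements(("张三", 4), ("张三", 2), ("leo", 5), ("leo", 1), ("raj", 8), ("giao", 7))
    // Groups by name like keyBy(0), but the key type (String) is known at compile time.
    val keyed = ds.keyBy(nameOf)
    keyed.print()
    env.execute()
  }
}
```

Because the key is produced by an ordinary function, the resulting stream is typed as KeyedStream[(String, Int), String] rather than carrying a generic tuple key.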
2. keyBy by field name
Example:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object keyByNameTest {
  // define the data type of the records in the data source
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // create the data stream from a local collection
    val pricesList = List(
      StockPrice("stock1", 154545454, 1212.23),
      StockPrice("stock1", 154545454, 1212.23),
      StockPrice("stock2", 154545454, 666.23),
      StockPrice("stock3", 154545454, 888.23)
    )
    val ds = env.fromCollection(pricesList)
    // transformation: key by the case class field named "stockId"
    // (name-based keys are deprecated in newer Flink releases)
    val keyByedDs = ds.keyBy("stockId")
    keyByedDs.print()
    env.execute()
  }
}
Output: both stock1 records are printed by the same subtask, since they share the key stock1.
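A field name passed as a string is only checked when the job runs, so a typo such as "stockID" is not caught by the compiler. As a hedged alternative, the key can be taken with a function on the case class. The sketch below assumes the Scala DataStream API is available; KeyByFieldFunctionTest and stockIdOf are illustrative names.

```scala
import org.apache.flink.streaming.api.scala._

object KeyByFieldFunctionTest {
  // Same record type as in the name-based example.
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  // Illustrative key extractor: a misspelled field here is a compile error.
  val stockIdOf: StockPrice => String = _.stockId

  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(List(
      StockPrice("stock1", 154545454L, 1212.23),
      StockPrice("stock1", 154545454L, 1212.23),
      StockPrice("stock2", 154545454L, 666.23),
      StockPrice("stock3", 154545454L, 888.23)
    ))
    // Groups by stockId, like keyBy("stockId"), with a String-typed key.
    val keyed = ds.keyBy(stockIdOf)
    keyed.print()
    env.execute()
  }
}
```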
This article is reposted from: https://blog.csdn.net/hzp666/article/details/126267055
Copyright belongs to the original author, hzp666. In case of infringement, please contact us for removal.