1.聚合算子简介
常见的聚合算子 sum,max,min等
聚合算子可以在在keyedStream 流上进行滚动的聚合(即累计的操作),而且同一个 keyedStream 流上只能调用一次 聚合算子
sum 示例:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object aggregationTest {
//defined the dataSource's type
case class StockPrice(stockId:String, timeStamp:Long, price:Double)
def main(args: Array[String]): Unit = {
//create env
val env = StreamExecutionEnvironment.getExecutionEnvironment
//generate ds
val stockList = List(StockPrice("stock_1", 66666, 1)
, StockPrice("stock_1", 8888, 2)
, StockPrice("stock_2", 77777, 1)
, StockPrice("stock_2", 999, 3)
, StockPrice("stock_3", 3333, 1)
)
val ds = env.fromCollection(stockList)
//transformation
val keyedStream = ds.keyBy("stockId")
val sumedStream = keyedStream.sum(2)
sumedStream.print()
env.execute()
}
}
输出结果:
max示例:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//defined the dataSource's type
case class StockPrice(stockId:String, timeStamp:Long, price:Double)
object maxTest {
def main(args: Array[String]): Unit = {
//create env
val env = StreamExecutionEnvironment.getExecutionEnvironment
//generate ds
val stockList = List(StockPrice("stock_1", 66666, 1)
, StockPrice("stock_1", 8888, 2)
, StockPrice("stock_2", 77777, 1)
, StockPrice("stock_2", 999, 3)
, StockPrice("stock_3", 3333, 1)
)
val ds = env.fromCollection(stockList)
//transformation
val keyedStream = ds.keyBy("stockId")
val maxedStream = keyedStream.max(2)
maxedStream.print()
env.execute()
}
}
输出结果:
本文转载自: https://blog.csdn.net/hzp666/article/details/126296749
版权归原作者 hzp666 所有, 如有侵权,请联系我们删除。
版权归原作者 hzp666 所有, 如有侵权,请联系我们删除。