Flink Study Notes 19: The keyBy Operator

1. Introduction to keyBy

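Main purpose: keyBy routes records that share the same key to the same partition.

(Records start out spread across different slots; keyBy redistributes them so that all records with the same key end up in the same slot.)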
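The idea of "same key, same partition" can be sketched in plain Scala without Flink. This is only an illustration: `KeyPartitionSketch`, `partitionFor`, and the parallelism of 4 are hypothetical, and the plain hash-modulo rule is a simplification that stands in for Flink's actual key-to-subtask assignment.

```scala
object KeyPartitionSketch {
  // Simplified stand-in (assumption): map a key to a partition by hash modulo.
  // Flink's real assignment is more involved; only the principle is the same:
  // equal keys always map to the same partition.
  def partitionFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val records = List(("张三", 4), ("张三", 2), ("leo", 5), ("leo", 1), ("raj", 8), ("giao", 7))
    val parallelism = 4 // hypothetical number of parallel partitions

    // Group the records by the partition their key maps to
    val byPartition = records.groupBy { case (name, _) => partitionFor(name, parallelism) }

    // Print the contents of each partition in partition order
    byPartition.toSeq.sortBy(_._1).foreach { case (partition, rs) =>
      println(s"partition $partition: ${rs.mkString(", ")}")
    }
  }
}
```

Different keys may still land in the same partition; the only guarantee is that records with the same key are never split across partitions.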

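2. Using keyBy

When calling keyBy, you pass it an argument that tells it which field to group the records by.

There are two ways to pass this argument:

1. Pass the position (index) of the field

Example: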

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object keyByTest {
  def main(args: Array[String]): Unit = {
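    // Create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build a DataStream of (name, value) tuples
    val ds = env.fromElements(("张三", 4), ("张三", 2), ("leo", 5), ("leo", 1), ("raj", 8), ("giao", 7))

    // Key the stream by tuple position 0, i.e. the name field
    val keyByedDs = ds.keyBy(0)

    // Print every record of the keyed stream
    keyByedDs.print()

    // Submit the job; nothing runs until execute() is called
    env.execute()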

  }

}

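Output: each tuple is printed once. When the job runs with a parallelism greater than 1, every printed line is prefixed with the index of the subtask that handled it, and tuples with the same name share the same prefix. The exact indices depend on the parallelism and on how the keys hash.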

2. keyBy by field name

Example:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object keyByNameTest {

  // Define the record type of the data source
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  def main(args: Array[String]): Unit = {

    // Create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build a DataStream from an in-memory list of stock prices
    val pricesList = List(
      StockPrice("stock1", 154545454, 1212.23),
      StockPrice("stock1", 154545454, 1212.23),
      StockPrice("stock2", 154545454, 666.23),
      StockPrice("stock3", 154545454, 888.23))

    val ds = env.fromCollection(pricesList)

    // Transformation: key the stream by the case class field named "stockId"
    val keyByedDs = ds.keyBy("stockId")

    // Print every record of the keyed stream
    keyByedDs.print()

    // Submit the job; nothing runs until execute() is called
    env.execute()

  }

}

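Output: each StockPrice record is printed once. When the job runs with a parallelism greater than 1, records with the same stockId carry the same subtask-index prefix; the exact indices depend on the parallelism and on how the keys hash.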
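Both forms above identify the key indirectly, by position or by name. In recent Flink releases these two overloads are, to my knowledge, marked as deprecated in favor of passing a key selector function, which is also checked at compile time. A minimal sketch of that variant follows, assuming the same StockPrice type as above; the object name `KeyBySelectorSketch` is hypothetical.

```scala
import org.apache.flink.streaming.api.scala._

object KeyBySelectorSketch {

  // Same record type as in the field-name example
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build a small DataStream of stock prices
    val ds = env.fromElements(
      StockPrice("stock1", 154545454L, 1212.23),
      StockPrice("stock1", 154545454L, 1212.23),
      StockPrice("stock2", 154545454L, 666.23))

    // Key the stream with a function that extracts the key from each record
    val keyed = ds.keyBy(_.stockId)

    keyed.print()

    env.execute()
  }
}
```

For a tuple stream such as the one in the first example, the equivalent selector would be `ds.keyBy(_._1)`. A misspelled field in a selector function fails at compile time, whereas a misspelled field name in a string is only detected when the job is built.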

Reposted from: https://blog.csdn.net/hzp666/article/details/126267055
Copyright belongs to the original author, hzp666.