keyBy 如何指定key
DataStream
DataStream<…> input = // […]
DataStream<…> windowed = input
.keyBy(/define key here/)
.window(/window specification/);
类似于mysql中的join操作:select a.* , b.* from a join b on a.id=b.id
这里的keyBy就是a.id=b.id
有哪几种方式定义Key?
方式一:Tuple
DataStream<Tuple3<Integer,String,Long>> input = // […]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
可以传字段的位置
DataStream<Tuple3<Integer,String,Long>> input = // […]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
可以传字段位置的组合
这对于简单的使用时没问题的。但是对于内嵌的Tuple,如下所示:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
如果使用keyBy(0),那么他就会使用整个Tuple2<Integer, Float>作为key,(因为Tuple2<Integer, Float>是Tuple3<Tuple2<Integer, Float>,String,Long>的0号位置)。如果想要指定key到Tuple2<Integer, Float>内部中,可以使用下面的方式。
方式二:字段表达式
我们可以使用基于字符串字段表达式来引用内嵌字段去定义key。
之前我们的算子写法是这样的:
text.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Integer>> out)throwsException{String[] tokens = value.toLowerCase().split(",");for(String token: tokens){if(token.length()>0){
out.collect(newTuple2<String,Integer>(token,1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
其中的new FlatMapFunction<String, Tuple2<String, Integer>>表示输入是一个String,输出是一个Tuple2<String, Integer>。这里我们重新定义一个内部类:
publicstaticclassWC{privateString word;privateint count;publicWC(){}publicWC(String word,int count){this.word = word;this.count = count;}@OverridepublicStringtoString(){return"WC{"+"word='"+ word +'\''+", count="+ count +'}';}publicStringgetWord(){return word;}publicvoidsetWord(String word){this.word = word;}publicintgetCount(){return count;}publicvoidsetCount(int count){this.count = count;}}
修改算子的写法:
text.flatMap(newFlatMapFunction<String, WC>(){@OverridepublicvoidflatMap(String value,Collector<WC> out)throwsException{String[] tokens = value.toLowerCase().split(",");for(String token : tokens){if(token.length()>0){
out.collect(newWC(token,1));}}}}).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);
将原来的输出Tuple2<String, Integer>,修改为输出WC类型;将原来的keyBy(0)修改为keyBy(“word”);将原来的sum(1)修改为sum(“count”)
因此,在这个例子中我们有一个POJO类,有两个字段分别是"word"和"count",可以传递字段名到keyBy(“”)中。
语法:
字段名一定要与POJO类中的字段名一致。一定要提供默认的构造函数,和get与set方法。
使用Tuple时,0表示第一个字段
可以使用嵌套方式,举例如下:
publicstaticclassWC{publicComplexNestedClass complex;//nested POJOprivateint count;// getter / setter for private field (count)publicintgetCount(){return count;}publicvoidsetCount(int c){this.count = c;}}publicstaticclassComplexNestedClass{publicInteger someNumber;publicfloat someFloat;publicTuple3<Long,Long,String> word;publicIntWritable hadoopCitizen;}
“count”,指向的是WC中的字段count
“complex”,指向的是复杂数据类型,会递归选择所有ComplexNestedClass的字段
“complex.word.f2”,指向的是Tuple3中的最后一个字段。
“complex.hadoopCitizen”,指向的是Hadoop IntWritable type
scala写法:
object StreamingWCScalaApp{
def main(args:Array[String]):Unit={
val env =StreamExecutionEnvironment.getExecutionEnvironment
// 引入隐式转换importorg.apache.flink.api.scala._
val text = env.socketTextStream("192.168.152.45",9999)
text.flatMap(_.split(",")).map(x =>WC(x,1)).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1)
env.execute("StreamingWCScalaApp");}caseclassWC(word:String, count:Int)}
方式三:key选择器函数
.keyBy(newKeySelector<WC,Object>(){@OverridepublicObjectgetKey(WC value)throwsException{return value.word;}})
其中WC是输入类型,Object是数据类型
在这里插入代码片
版权归原作者 wzf_winner 所有, 如有侵权,请联系我们删除。