Flink中的keyBy操作是用于对数据进行分区的一种操作,它通过指定一个或多个键(key)将数据流逻辑上划分为不同的分区,以便进行并行处理。
在Flink中,当我们需要对海量数据进行聚合处理时,通常会先进行分区,以提高处理效率。keyBy操作就是实现这一目的的关键步骤。通过keyBy操作,我们可以根据指定的键将数据流划分为不同的分区,每个分区内的数据将发送到同一个分区进行处理。这种分区的方式是通过计算键的哈希值,并通过对分区数取模运算来实现的。因此,具有相同键的数据会被发送到同一个分区进行处理,从而实现数据的并行聚合。
在指定键的方式上,Flink提供了多种灵活性。例如,对于Tuple数据类型,可以指定字段的位置或多个位置的组合作为键;对于POJO类型,可以指定字段的名称;此外,还可以使用Lambda表达式或实现KeySelector接口来定义从数据中提取键的逻辑。这种灵活性使得keyBy操作能够适应各种复杂的数据处理场景。
此外,窗口操作是Flink中另一个重要的概念,用于处理无界数据流。窗口操作可以将连续的数据流切割成有限大小的多个“存储桶”,每个数据都会被分发到对应的桶中。当达到窗口结束时间时,对每个桶中收集的数据进行计算处理。这种机制使得Flink能够高效地处理实时数据流,实现数据的聚合和统计。
综上所述,Flink中的keyBy操作是数据处理中不可或缺的一步,它通过指定键对数据进行分区,以实现并行处理和聚合操作的高效执行。同时,结合窗口操作,Flink能够更好地处理实时数据流,满足各种数据处理需求
对数据分组主要是为了进行后续的聚合操作,即对同组数据进行聚合分析。
keyBy
会将一个
DataStream
转化为一个
KeyedStream
绝大多数情况,我们要根据事件的某种属性或数据的某个字段进行分组,对一个分组内的数据进行处理。如下图所示,
keyBy
算子根据元素的形状对数据进行分组,相同形状的元素被分到了一起,可被后续算子统一处理。比如,多支股票数据流处理时,可以根据股票代号进行分组,然后对同一股票代号的数据统计其价格变动。又如,电商用户行为日志把所有用户的行为都记录了下来,如果要分析某一个用户行为,需要先按用户ID进行分组
keyBy
算子将
DataStream
转换成一个
KeyedStream
。
KeyedStream
是一种特殊的
DataStream
,事实上,
KeyedStream
继承了
DataStream
,
DataStream
的各元素随机分布在各Task Slot中,
KeyedStream
的各元素按照Key分组,分配到各Task Slot中。我们需要向
keyBy
算子传递一个参数,以告知Flink以什么字段作为Key进行分组
DataStream<Tuple2<String, Integer>> dataStream = ...;
// 使用KeyBy算子将数据流分区
DataStream<Tuple2<String, Integer>> keyedStream = dataStream.keyBy(0); // 根据第一个元素作为Key
比如如下需求假设我们有一份订单数据流,包含订单ID、用户ID和订单金额三个字段,我们希望根据用户ID将数据流分区,并对每个用户的订单金额求和。代码如下所示:
// 定义订单数据类
public class Order {
public String orderId;
public String userId;
public double amount;
public Order(String orderId, String userId, double amount) {
this.orderId = orderId;
this.userId = userId;
this.amount = amount;
}
}
// 生成订单数据流
List<Order> orders = new ArrayList<>();
orders.add(new Order("1", "user1", 200.0));
orders.add(new Order("2", "user2", 200.0));
orders.add(new Order("3", "user1", 300.0));
orders.add(new Order("4", "user2", 400.0));
DataStream<Order> orderStream = env.fromCollection(orders);
// 使用KeyBy算子将数据流按照用户ID分区,并对每个用户的订单金额求和
DataStream<Tuple2<String, Double>> sumStream = orderStream
.keyBy(order -> order.userId) // 根据用户ID作为Key进行分区
.sum("amount"); // 对每个分区的订单金额求和
// 输出分区后的结果
sumStream.print();
在上述代码中,我们首先定义了一个Order类,用于表示订单数据。然后生成一份包含4个订单的数据流,并使用KeyBy算子将数据流按照用户ID分区。接着,我们调用sum算子对每个分区的订单金额求和,并将计算结果打印输出。 运行代码后,我们可以得到如下的输出结果:
(user1, 500.0)
(user2, 600.0)
从输出结果可以看出,KeyBy算子将订单数据流按照用户ID分成了两个分区,分别对应用户user1和user2,而sum算子则对每个分区的订单金额进行了求和。这样,我们就成功地完成了基于Key的聚合操作。 需要注意的是,KeyBy算子只能将数据流按照指定的Key进行分区,而无法对分区进行任何修改。如果需要对分区进行修改或者调整,可以使用其他分区算子,例如Shuffle、Rebalance等。
实例1:要求按照名称分组,相同名称的放在一个分区里,有数据如下:
abc,1
jek,2
abc,3
代码如下:
package flink.transform.keyby;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**abc,1
* jek,2
* abc,3
* 分组,第一个逗号前面的单词相同的分在一个线程里,以姓名为key进行分组
*
*/
public class Test {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> numbersString = env.socketTextStream("127.0.0.1",9999);
KeyedStream<String,String> keyedStream =numbersString.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String s) throws Exception {
return s.split(",")[0];
}
});
keyedStream.print();
env.execute("Flink Filter Example");
}
}
实例2:要求按照名称分组,并分装在person类里,数据如下
tom,23
rose,28
tom,25
package flink.transform.keyby;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* tom,23
* rose,28
* tom,25
* 通过MAP操作封装成JAVAbean ,映射变换
* keyby分组
*/
public class Test1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> numbersString = env.socketTextStream("127.0.0.1",9999);
DataStream<Person> ss = numbersString.map(new MapFunction<String, Person>() {
@Override
public Person map(String s) throws Exception {
String[] array = s.split(",");
Person person = new Person();
person.setName(array[0]);
person.setSex(Integer.parseInt(array[1]));
return person;
}
});
KeyedStream<Person,String> sss = ss.keyBy(new KeySelector<Person, String>() {
@Override
public String getKey(Person person) throws Exception {
return person.getName();
}
});
sss.print();
env.execute("Flink Filter Example");
}
}
实例3,要求按照名称分组,组装成tupple元组,需要熟悉Tupple2的用法
数据如下:
tom,23
rose,28
tom,25
代码如下:
package flink.transform.keyby;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* tom,23
* rose,28
* tom,25
* 通过MAP操作映射变换成元组
*
*/
public class Test2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> numbersString = env.socketTextStream("127.0.0.1",9999);
DataStream<Tuple2<String, Integer>> ss = numbersString.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] array = s.split(",");
return new Tuple2<>(array[0],Integer.parseInt(array[1]));
}
});
KeyedStream<Tuple2<String,Integer>,String> sss = ss.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
@Override
public String getKey(Tuple2<String,Integer> tp) throws Exception {
return tp.f0 ;
}
});
sss.print();
env.execute("Flink Filter Example");
}
}
版权归原作者 Allen019 所有, 如有侵权,请联系我们删除。