目录
Flink专栏目录(点击进入…)
Flink数据管道 & ETL(无状态的转换、Keyed Stream 的聚合、有状态的转换)
Flink数据管道 & ETL
Flink 的一种常见应用场景是 ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。
注意:Flink 的 Table 和 SQL API 完全可以满足很多 ETL 使用场景。但无论最终是否直接使用 DataStream API,对这里介绍的基本知识有扎实的理解都是有价值的。
无状态的转换
①map()
map() 通常用于对数据进行简单的变换,比如将数据进行格式转换、提取某些字段、或者做简单的计算。
示例:简单的 map() 转换
假设有一组用户数据流,包含用户 ID 和年龄,现在我们需要将年龄加 1 作为新的年龄输出。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class MapExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟数据源:Tuple2<用户ID, 年龄>
DataStream<Tuple2<String, Integer>> userStream = env.fromElements(
new Tuple2<>("user_1", 25),
new Tuple2<>("user_2", 30),
new Tuple2<>("user_3", 22)
);
// 使用 map() 转换,将年龄加1
DataStream<Tuple2<String, Integer>> updatedUserStream = userStream.map(
new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
return new Tuple2<>(value.f0, value.f1 + 1); // 将年龄加1
}
}
);
// 打印结果
updatedUserStream.print();
// 执行程序
env.execute("Flink Map Example");
}
}
代码说明
(1)数据流 (userStream):这是一个包含用户 ID 和年龄的流,使用 Tuple2<String, Integer> 表示(用户ID,年龄)。
(2)map() 转换:我们通过 map() 操作,对输入的每个元组中的年龄字段进行加 1 操作,生成一个新的 Tuple2,并返回。
(3)无状态性:map() 操作对每个输入元素执行独立的转换,不依赖之前处理过的元素,也不会保存任何状态。
输出结果
(user_1,26)
(user_2,31)
(user_3,23)
可以看到,每个用户的年龄都增加了 1。
更多用例
(1)数据类型转换:将字符串转换为对象或将对象序列化。
(2)字段计算:如从传感器数据流中读取温度并将其从摄氏度转换为华氏度。
(3)简单过滤:虽然 filter() 更适合过滤场景,但也可以通过 map() 返回 null 或特定值来实现。
这种无状态的 map() 适用于简单的转换和处理,不涉及历史数据,也不需要保存状态。
②flatmap()
MapFunction 只适用于一对一的转换:对每个进入算子的流元素,map() 将仅输出一个转换后的元素。
flatMap() 是一种强大的无状态转换操作,它可以将输入的每个元素映射为零个、一个或多个输出元素。与 map() 不同,flatMap() 可以输出任意数量的结果(包括零个),因此它非常适合需要从一个输入产生多个输出的场景,比如数据拆分、过滤、清洗等。
flatMap() 的使用场景
(1)数据拆分:例如将一行文本拆分为多个单词。
(2)过滤并变换:可以过滤不需要的元素,同时对其他元素进行变换。
(3)嵌套数据展开:当数据包含嵌套结构时,可以使用 flatMap() 将其展开
示例:文本拆分(Word Count)
假设我们有一组句子构成的文本流,需要将每句话拆分成单词,并输出每个单词。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlatMapExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟数据源:输入的句子流
DataStream<String> textStream = env.fromElements(
"Flink is powerful",
"Flink supports batch and stream processing"
);
// 使用 flatMap() 进行拆分操作:将句子拆分成单词
DataStream<String> wordStream = textStream.flatMap(
new FlatMapFunction<String, String>() {
@Override
public void flatMap(String sentence, Collector<String> out) {
// 按空格拆分句子
String[] words = sentence.split(" ");
// 收集每个单词
for (String word : words) {
out.collect(word); // 将每个单词发出
}
}
}
);
// 打印结果
wordStream.print();
// 执行程序
env.execute("Flink FlatMap Example");
}
}
代码说明
(1)数据流 (textStream):这是一个句子流,每个元素是一行文本。
(2)flatMap() 转换:使用 flatMap() 将每个句子按空格拆分为单词。Collector<String> out 用来输出结果,每次调用 out.collect() 将一个新元素发送到输出流。
(3)无状态性:flatMap() 对每个句子独立处理,不依赖之前的输入数据,也不存储状态。
输出结果
Flink
is
powerful
Flink
supports
batch
and
stream
processing
可以看到,输入的两句文本被拆分成了 8 个单词,并逐一输出。
Keyed Stream 的聚合
①keyBy()
keyBy() 是一种将数据流按键分组的无状态转换操作,它根据指定的键对数据流进行逻辑上的分区,并且会将具有相同键的元素发送到同一个分区。在 keyBy() 之后,所有的后续操作(如 reduce, sum, fold 等)都会基于这个键进行计算。
例如:如果你想找到从每个网格单元出发的最远的出租车行程。按 SQL 查询的方式来考虑,这意味着要对 startCell 进行 GROUP BY 再排序,在 Flink 中这部分可以用keyBy(KeySelector) 实现。
keyBy() 操作虽然对数据进行了逻辑分区,但并不会对数据流产生任何状态变化,因此它本身是无状态的。
每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。
keyBy() 的使用场景
(1)分组聚合:在数据流上按某个字段进行分组,并对每个分组进行计算(如累加、平均等)。
(2)分组操作:对于具有相同键的元素,应用进一步的操作,如 window 或 reduce。
(3)动态路由:将具有相同键的元素路由到同一个任务槽(task slot)。
示例:基于用户 ID 的分组统计
假设有一个包含用户 ID 和消费金额的数据流,我们希望按用户 ID 进行分组,并计算每个用户的总消费金额
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyByExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟数据源:Tuple2<用户ID, 消费金额>
DataStream<Tuple2<String, Double>> purchases = env.fromElements(
new Tuple2<>("user_1", 50.0),
new Tuple2<>("user_2", 20.0),
new Tuple2<>("user_1", 30.0),
new Tuple2<>("user_3", 40.0),
new Tuple2<>("user_2", 60.0)
);
// 使用 keyBy() 按用户 ID 分组,进行消费金额的累加
DataStream<Tuple2<String, Double>> totalPurchases = purchases
.keyBy(value -> value.f0) // 按用户ID进行分组
.sum(1); // 累加消费金额
// 打印结果
totalPurchases.print();
// 执行程序
env.execute("Flink KeyBy Example");
}
}
代码说明
(1)数据流 (temperatureStream):包含传感器 ID 和温度数据的流,用 Tuple2<String, Double> 表示(传感器ID, 温度)。
(2)keyBy() 分组:按传感器 ID 分组。
(3)min() 计算最小值:计算每个传感器的最低温度(第二个字段 f1 表示温度)。
无状态性:keyBy() 只是按键进行分组,没有存储状态,后续的 min() 操作会根据键计算每个组的最小值。
输出结果
(sensor_1,35.0)
(sensor_2,36.5)
(sensor_1,34.8)
(sensor_3,40.0)
(sensor_2,33.9)
可以看到,对于 sensor_1 和 sensor_2,温度数据流中的最低值被实时计算和更新。
总结
(1)keyBy() 是无状态的,它将数据流按指定的键进行分组,不涉及状态管理。
(2)在 keyBy() 分组之后,数据流被逻辑上分区,后续的操作(如 sum(), min(), reduce() 等)将基于分组后的数据进行计算。
(3)keyBy() 非常适合分组聚合场景,如根据用户、设备或其他关键字段进行分组统计或计算。
通过 keyBy(),Flink 能高效地处理大规模数据流中的分组操作,将具有相同键的数据发送到同一个分区以便后续的聚合或计算操作。
②reduce()
reduce() 是一种聚合操作,用于连续地对数据流中的元素进行聚合。它是 DataStream API 中的一部分,适用于需要将一组元素逐步合并为一个单一元素的场景。
reduce() 的用法
reduce() 函数通常会接受一个匿名函数或自定义的 ReduceFunction,定义如何合并两个元素。Flink 会以流的方式处理数据,逐个对元素进行累积处理。
代码示例
以下是一个简单的使用 reduce() 进行聚合的示例,它对输入流中的整数进行累加:
// 导入 Flink 相关类
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.ReduceFunction;
public class ReduceExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟数据流
DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4, 5);
// 使用 reduce 对数据进行累加
DataStream<Integer> reducedStream = numberStream
.keyBy(value -> 1) // 将所有元素分配到同一个组
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) {
return value1 + value2; // 聚合逻辑:累加两个元素
}
});
// 输出结果
reducedStream.print();
// 执行程序
env.execute("Flink Reduce Example");
}
}
关键点:
(1)keyBy():在使用 reduce() 之前,通常需要将数据按某种键进行分组。这样可以确保同一组中的元素会被应用 reduce()。
(2)reduce():reduce() 接受两个参数(即数据流中的两个元素),并返回一个聚合后的结果。这个结果会与流中的下一个元素继续进行聚合。
适用场景:
(1)对数据进行增量聚合,比如求和、求最小值、最大值等。
(2)reduce() 的聚合操作是有状态的,因此适用于需要在流数据上做持续累积计算的场景。
有状态的转换
Flink 为什么要参与状态管理?
在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:
(1)本地性:Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
(2)持久性:Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
(3)纵向可扩展性:Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
(4)横向可扩展性:Flink 状态可以随着集群的扩缩容重新分布
Rich Functions
至此已经有了 Flink 的几种函数接口,包括 FilterFunction, MapFunction,和 FlatMapFunction。这些都是单一抽象方法模式。
对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction,其中增加了以下方法,包括:
**①open(Configuration c)**:仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。
②close()
**③getRuntimeContext()**:为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。
Keyed State
Keyed State 是一种专用于基于键的流处理状态(state),适用于在 keyBy() 操作之后的数据流处理场景。通过 Keyed State,Flink 可以为每个唯一的键(key)维护独立的状态,确保不同键之间的状态互不影响。
Keyed State 是有状态流处理中的核心概念,允许开发者存储和更新每个键相关联的状态,进而实现像窗口操作、去重、聚合等复杂的流处理逻辑。
关键的 Keyed State 类型:
(1)ValueState:存储单个值的状态。
(2)ListState:存储多个元素的状态(类似列表)。
(3)MapState:存储键值对的状态。
(4)ReducingState:允许指定归约函数的聚合状态。
ValueState 例子
下面是一个使用 ValueState 的简单例子,模拟一个计数器,计算每个键分组下的整数总和。
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class KeyedStateExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟输入数据流
DataStream<Tuple2<String, Integer>> stream = env.fromElements(
Tuple2.of("key1", 1),
Tuple2.of("key2", 2),
Tuple2.of("key1", 3),
Tuple2.of("key2", 4),
Tuple2.of("key1", 5)
);
// 应用 Keyed State
DataStream<String> resultStream = stream
.keyBy(value -> value.f0) // 按照 key 进行分组
.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {
// 定义一个 ValueState 用于保存每个 key 的累积和
private transient ValueState<Integer> sumState;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
// 初始化状态
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
"sumState", // 状态名称
Types.INT // 状态类型
);
sumState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
// 获取当前的状态值
Integer currentSum = sumState.value();
// 如果状态为空,初始化为 0
if (currentSum == null) {
currentSum = 0;
}
// 更新状态:累加当前元素的值
currentSum += value.f1;
sumState.update(currentSum);
// 输出当前 key 和它的累积和
out.collect("Key: " + value.f0 + ", Sum: " + currentSum);
}
});
// 打印结果
resultStream.print();
// 执行程序
env.execute("Flink Keyed State Example");
}
}
代码说明:
(1)keyBy():首先对流中的元素按键进行分组。每个分组的元素会被独立处理,并拥有各自的状态。
(2)ValueState:为每个键保存一个累积和的状态。
ValueStateDescriptor 用于定义状态的名称和类型。
状态在每个键内是独立的,不同的键(例如 key1 和 key2)会维护各自的状态。
(3)processElement():处理每个输入元素,更新其累积和状态,并输出结果。
输出示例:
Key: key1, Sum: 1
Key: key2, Sum: 2
Key: key1, Sum: 4
Key: key2, Sum: 6
Key: key1, Sum: 9
在这个例子中,key1 和 key2 各自维护了自己的累积和。通过 Flink 的 Keyed State,你可以轻松地对不同键进行独立的状态管理。
扩展:
如果需要在窗口中使用 Keyed State 或使用更复杂的状态管理,可以结合 Flink 的窗口操作和其他类型的状态(如 ListState、MapState)来实现更加复杂的逻辑。
清理状态
Flink 中,状态(state)是流处理任务中的重要组成部分,尤其在有状态的操作(如 Keyed State 和窗口)中。然而,状态的积累会导致内存和存储的消耗不断增加,因此对状态的清理(cleanup)显得非常重要。Flink 提供了多种方式来自动或手动清理状态,以防止状态的无限增长。
(1)TTL(Time-to-Live) 机制
Flink 允许为状态设置过期时间(TTL),在 TTL 到期后,状态将被标记为过期,并会在后续访问时被清理。
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;
// 创建 TTL 配置
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(1)) // 设置 TTL 时间为 1 小时
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 配置何时更新 TTL(创建或写入时)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 配置状态过期后是否能被访问
.build();
// 创建状态描述符并绑定 TTL 配置
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("myState", Integer.class);
stateDescriptor.enableTimeToLive(ttlConfig);
// 将描述符绑定到状态
ValueState<Integer> state = getRuntimeContext().getState(stateDescriptor);
关键点:
(1)StateTtlConfig.UpdateType:指定何时更新 TTL,常见的选项有 OnCreateAndWrite 和 OnReadAndWrite。
(2)StateTtlConfig.StateVisibility:配置状态过期后是否还能被读取。
(3)TTL 清理不是立即的,而是在状态被访问时清理过期状态。
(2)窗口状态的自动清理
Flink 的窗口操作(Windowing API)内置了状态清理机制。当窗口触发后,Flink 会自动清理对应的状态。一般来说,当窗口计算结束且结果输出后,窗口对应的状态就会被自动删除。
例如:在使用 TumblingWindow 或 SlidingWindow 时,窗口的状态会在窗口关闭时自动清理。
(3)手动清理 Keyed State
在某些情况下,可能需要手动清理特定的键的状态。通过 State.clear() 方法可以手动清理状态。
public class MyProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
private transient ValueState<Integer> sumState;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("sumState", Integer.class);
sumState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
Integer currentSum = sumState.value();
if (currentSum == null) {
currentSum = 0;
}
currentSum += value.f1;
sumState.update(currentSum);
out.collect("Key: " + value.f0 + ", Sum: " + currentSum);
// 清理状态的条件:比如累加到某个阈值时
if (currentSum > 100) {
sumState.clear(); // 手动清理状态
}
}
}
在上面的示例中,当某个键的累积和超过100时,状态会被手动清除。
(4)通过定时器 (Timer) 触发状态清理
Flink 支持通过定时器(TimerService)在特定时间点触发事件,并可以在这些事件触发时执行状态清理。
public class TimerStateCleanupFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
private transient ValueState<Integer> sumState;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("sumState", Integer.class);
sumState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
Integer currentSum = sumState.value();
if (currentSum == null) {
currentSum = 0;
}
currentSum += value.f1;
sumState.update(currentSum);
// 注册一个 5 秒后的定时器
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 在定时器触发时清理状态
sumState.clear();
out.collect("State for key " + ctx.getCurrentKey() + " cleared at " + timestamp);
}
}
在这个例子中,registerProcessingTimeTimer() 注册了一个 5 秒的定时器,当时间到达时,onTimer() 方法会被触发,状态被清理。
(5)增量检查点 (Incremental Checkpoints) 和状态压缩
Flink 在管理状态的同时,还提供了增量检查点机制和 RocksDB 的状态压缩机制。即使没有清理状态,Flink 也能通过这些方式对状态进行高效的存储和管理,从而减少状态管理的资源消耗。
Non-keyed State
Non-keyed State 是指在不使用 keyBy() 进行分组的情况下进行状态管理。这种状态与 Keyed State 相比,是全局的,即状态不是基于特定键的,而是对于整个流或者每个并行任务都有独立的状态。Non-keyed State 通常用于需要在全局范围内或按并行实例管理状态的场景中,比如计时器、定时任务、统计全局信息等。
示例:使用 ListState 进行全局累加
以下是一个简单的例子,演示如何使用 Non-keyed State 实现全局累加(不基于键)。
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
public class NonKeyedStateExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟输入流
DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5, 6);
// 使用 Non-keyed State 进行全局累加
stream.flatMap(new RichFlatMapFunction<Integer, String>() {
// 定义 ListState 状态
private transient ListState<Integer> sumState;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
// 初始化 ListStateDescriptor,用于描述状态的类型和名称
ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>(
"sumState", // 状态名称
Types.INT // 状态的类型
);
sumState = getRuntimeContext().getListState(descriptor);
}
@Override
public void flatMap(Integer value, Collector<String> out) throws Exception {
// 将当前输入元素添加到状态中
sumState.add(value);
// 将状态中的所有元素累加起来
Iterable<Integer> currentState = sumState.get();
int sum = 0;
for (Integer i : currentState) {
sum += i;
}
// 输出当前的累加和
out.collect("Current Sum: " + sum);
}
}).print();
// 执行程序
env.execute("Non-keyed State Example");
}
}
代码说明:
(1)ListState:使用了 Non-keyed 状态 ListState 来存储每个输入元素的累加值。
(2)open() 方法:在 open() 方法中初始化状态描述符 ListStateDescriptor,并通过 getRuntimeContext() 获取运行时的状态句柄。
(3)flatMap():每次接收到新的元素时,将其添加到 ListState 中,然后遍历状态中的所有元素进行累加。
Non-keyed State 的使用场景
(1)全局统计信息:比如统计整个数据流中的总数、平均值、最大值等,不基于某个键进行分组。
(2)定时任务:可以在 Non-keyed State 中使用定时器管理一些全局的计时或清理任务。
(3)全局连接状态:当处理需要管理全局资源或者连接状态的操作时,Non-keyed State 可以帮助管理这些连接的状态。
和 Keyed State 的区别
Keyed State 是按键(key)管理的状态,在 keyBy() 后,每个键拥有独立的状态。
Non-keyed State 是按并行任务管理的状态,状态不依赖任何键,因此每个并行任务的状态是独立的。
Connected Streams:连接流
Connected Streams 是一种强大的机制,它允许将两个不同类型的流(DataStream)连接在一起,从而能够在流处理中进行复杂的操作。与合并两个相同类型的流不同,Connected Streams 保留了两个流的数据类型,并允许对它们应用不同的处理逻辑。
预定义转换
连接流
有时想要更灵活地调整转换的某些功能,比如:数据流的阈值、规则或者其他参数。Flink 支持这种需求的模式称为 connected streams ,一个单独的算子有两个输入流。
connected stream 也可以被用来实现流的关联。
使用场景
(1)控制流和数据流
其中一个流可以用作主数据流,而另一个流作为控制流,控制如何处理主数据流中的记录。
(2)合并不同类型的数据流
可以将两个类型不同的数据流连接起来,同时对它们分别处理。
(3)状态共享
两个流在连接后可以共享状态信息。
使用步骤
(1)创建两个流
可以创建两个不同类型的 DataStream。
(2)连接两个流
使用 connect() 方法连接两个流。
(3)应用 CoProcessFunction 或 CoFlatMapFunction
连接后的流需要使用 CoProcessFunction 或 CoFlatMapFunction 来处理不同流中的元素。
示例(相同类型):
一个控制流是用来指定哪些词需要从 streamOfWords 里过滤掉的。 一个称为 ControlFunction 的 RichCoFlatMapFunction 作用于连接的流来实现这个功能。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);
DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);
control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute();
}
这里注意两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。
在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;
@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}
RichCoFlatMapFunction 是一种可以被用于一对连接流的 FlatMapFunction,并且它可以调用 rich function 的接口。这意味着它可以是有状态的。
布尔变量 blocked 被用于记录在数据流 control 中出现过的键(在这个例子中是单词),并且这些单词从 streamOfWords 过滤掉。这是 keyed state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。
在 Flink 运行时中,flatMap1 和 flatMap2 在连接流有新元素到来时被调用 —— 在我们的例子中,control 流中的元素会进入 flatMap1,streamOfWords 中的元素会进入 flatMap2。这是由两个流连接的顺序决定的,本例中为 control.connect(streamOfWords)。
认识到你没法控制 flatMap1 和 flatMap2 的调用顺序是很重要的。这两个输入流是相互竞争的关系,Flink 运行时将根据从一个流或另一个流中消费的事件做它要做的。对于需要保证时间和/或顺序的场景,你会发现在 Flink 的管理状态中缓存事件一直到它们能够被处理是必须的。(注意:如果你真的迫切需要,可以使用自定义的算子实现 InputSelectable 接口,在两输入算子消费它的输入流时增加一些顺序上的限制。)
示例(不相同类型):
**connect()**:用于连接两个流,流的类型可以不同。
CoFlatMapFunction:它提供了两个 flatMap 方法,分别处理连接的两个流中的数据。
flatMap1 和 flatMap2:分别对应两个流中的元素处理逻辑。
import org.apache.flink.api.common.functions.CoFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class ConnectedStreamsExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建两个数据流,一个是整数流,一个是字符串流
DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
DataStream<String> stringStream = env.fromElements("a", "b", "c", "d");
// 将两个流连接在一起
ConnectedStreams<Integer, String> connectedStreams = intStream.connect(stringStream);
// 使用 CoFlatMapFunction 来处理两个流的数据
DataStream<String> resultStream = connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect("From int stream: " + value);
}
@Override
public void flatMap2(String value, Collector<String> out) {
out.collect("From string stream: " + value);
}
});
// 输出结果
resultStream.print();
// 执行流处理程序
env.execute("Flink Connected Streams Example");
}
}
适用的类和接口
CoProcessFunction:适用于状态更复杂的场景,允许在处理每个元素时同时维护和操作状态。
CoFlatMapFunction:更适合简单的元素转换场景。
版权归原作者 未禾 所有, 如有侵权,请联系我们删除。