0


7.Flink数据管道 & ETL(无状态的转换、Keyed Stream 的聚合、有状态的转换)

目录


Flink专栏目录(点击进入…)


Flink数据管道 & ETL(无状态的转换、Keyed Stream 的聚合、有状态的转换)

Flink数据管道 & ETL

Flink 的一种常见应用场景是 ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。

注意:Flink 的 Table 和 SQL API 完全可以满足很多 ETL 使用场景。但无论最终是否直接使用 DataStream API,对这里介绍的基本知识有扎实的理解都是有价值的。

无状态的转换

①map()

map() 通常用于对数据进行简单的变换,比如将数据进行格式转换、提取某些字段、或者做简单的计算。

示例:简单的 map() 转换
假设有一组用户数据流,包含用户 ID 和年龄,现在我们需要将年龄加 1 作为新的年龄输出。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class MapExample {
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟数据源:Tuple2<用户ID, 年龄>
        DataStream<Tuple2<String, Integer>> userStream = env.fromElements(
            new Tuple2<>("user_1", 25),
            new Tuple2<>("user_2", 30),
            new Tuple2<>("user_3", 22)
        );

        // 使用 map() 转换,将年龄加1
        DataStream<Tuple2<String, Integer>> updatedUserStream = userStream.map(
            new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
                    return new Tuple2<>(value.f0, value.f1 + 1);  // 将年龄加1
                }
            }
        );

        // 打印结果
        updatedUserStream.print();

        // 执行程序
        env.execute("Flink Map Example");
    }
}

代码说明
(1)数据流 (userStream):这是一个包含用户 ID 和年龄的流,使用 Tuple2<String, Integer> 表示(用户ID,年龄)。
(2)map() 转换:我们通过 map() 操作,对输入的每个元组中的年龄字段进行加 1 操作,生成一个新的 Tuple2,并返回。
(3)无状态性:map() 操作对每个输入元素执行独立的转换,不依赖之前处理过的元素,也不会保存任何状态。

输出结果

(user_1,26)
(user_2,31)
(user_3,23)

可以看到,每个用户的年龄都增加了 1。

更多用例
(1)数据类型转换:将字符串转换为对象或将对象序列化。
(2)字段计算:如从传感器数据流中读取温度并将其从摄氏度转换为华氏度。
(3)简单过滤:虽然 filter() 更适合过滤场景,但也可以通过 map() 返回 null 或特定值来实现。

这种无状态的 map() 适用于简单的转换和处理,不涉及历史数据,也不需要保存状态。

②flatmap()

MapFunction 只适用于一对一的转换:对每个进入算子的流元素,map() 将仅输出一个转换后的元素。

flatMap() 是一种强大的无状态转换操作,它可以将输入的每个元素映射为零个、一个或多个输出元素。与 map() 不同,flatMap() 可以输出任意数量的结果(包括零个),因此它非常适合需要从一个输入产生多个输出的场景,比如数据拆分、过滤、清洗等。

flatMap() 的使用场景
(1)数据拆分:例如将一行文本拆分为多个单词。
(2)过滤并变换:可以过滤不需要的元素,同时对其他元素进行变换。
(3)嵌套数据展开:当数据包含嵌套结构时,可以使用 flatMap() 将其展开

示例:文本拆分(Word Count)
假设我们有一组句子构成的文本流,需要将每句话拆分成单词,并输出每个单词。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapExample {
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟数据源:输入的句子流
        DataStream<String> textStream = env.fromElements(
            "Flink is powerful",
            "Flink supports batch and stream processing"
        );

        // 使用 flatMap() 进行拆分操作:将句子拆分成单词
        DataStream<String> wordStream = textStream.flatMap(
            new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String sentence, Collector<String> out) {
                    // 按空格拆分句子
                    String[] words = sentence.split(" ");
                    // 收集每个单词
                    for (String word : words) {
                        out.collect(word);  // 将每个单词发出
                    }
                }
            }
        );

        // 打印结果
        wordStream.print();

        // 执行程序
        env.execute("Flink FlatMap Example");
    }
}

代码说明
(1)数据流 (textStream):这是一个句子流,每个元素是一行文本。
(2)flatMap() 转换:使用 flatMap() 将每个句子按空格拆分为单词。Collector<String> out 用来输出结果,每次调用 out.collect() 将一个新元素发送到输出流。
(3)无状态性:flatMap() 对每个句子独立处理,不依赖之前的输入数据,也不存储状态。

输出结果

Flink
is
powerful
Flink
supports
batch
and
stream
processing

可以看到,输入的两句文本被拆分成了 8 个单词,并逐一输出。

Keyed Stream 的聚合

①keyBy()

keyBy() 是一种将数据流按键分组的无状态转换操作,它根据指定的键对数据流进行逻辑上的分区,并且会将具有相同键的元素发送到同一个分区。在 keyBy() 之后,所有的后续操作(如 reduce, sum, fold 等)都会基于这个键进行计算。

例如:如果你想找到从每个网格单元出发的最远的出租车行程。按 SQL 查询的方式来考虑,这意味着要对 startCell 进行 GROUP BY 再排序,在 Flink 中这部分可以用keyBy(KeySelector) 实现。

keyBy() 操作虽然对数据进行了逻辑分区,但并不会对数据流产生任何状态变化,因此它本身是无状态的。

每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。
在这里插入图片描述

keyBy() 的使用场景
(1)分组聚合:在数据流上按某个字段进行分组,并对每个分组进行计算(如累加、平均等)。
(2)分组操作:对于具有相同键的元素,应用进一步的操作,如 window 或 reduce。
(3)动态路由:将具有相同键的元素路由到同一个任务槽(task slot)。

示例:基于用户 ID 的分组统计
假设有一个包含用户 ID 和消费金额的数据流,我们希望按用户 ID 进行分组,并计算每个用户的总消费金额

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByExample {

    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟数据源:Tuple2<用户ID, 消费金额>
        DataStream<Tuple2<String, Double>> purchases = env.fromElements(
            new Tuple2<>("user_1", 50.0),
            new Tuple2<>("user_2", 20.0),
            new Tuple2<>("user_1", 30.0),
            new Tuple2<>("user_3", 40.0),
            new Tuple2<>("user_2", 60.0)
        );

        // 使用 keyBy() 按用户 ID 分组,进行消费金额的累加
        DataStream<Tuple2<String, Double>> totalPurchases = purchases
            .keyBy(value -> value.f0)  // 按用户ID进行分组
            .sum(1);  // 累加消费金额

        // 打印结果
        totalPurchases.print();

        // 执行程序
        env.execute("Flink KeyBy Example");
    }
    
}

代码说明
(1)数据流 (temperatureStream):包含传感器 ID 和温度数据的流,用 Tuple2<String, Double> 表示(传感器ID, 温度)。
(2)keyBy() 分组:按传感器 ID 分组。
(3)min() 计算最小值:计算每个传感器的最低温度(第二个字段 f1 表示温度)。
无状态性:keyBy() 只是按键进行分组,没有存储状态,后续的 min() 操作会根据键计算每个组的最小值。

输出结果

(sensor_1,35.0)
(sensor_2,36.5)
(sensor_1,34.8)
(sensor_3,40.0)
(sensor_2,33.9)

可以看到,对于 sensor_1 和 sensor_2,温度数据流中的最低值被实时计算和更新。

总结
(1)keyBy() 是无状态的,它将数据流按指定的键进行分组,不涉及状态管理。
(2)在 keyBy() 分组之后,数据流被逻辑上分区,后续的操作(如 sum(), min(), reduce() 等)将基于分组后的数据进行计算。
(3)keyBy() 非常适合分组聚合场景,如根据用户、设备或其他关键字段进行分组统计或计算。

通过 keyBy(),Flink 能高效地处理大规模数据流中的分组操作,将具有相同键的数据发送到同一个分区以便后续的聚合或计算操作。

②reduce()

reduce() 是一种聚合操作,用于连续地对数据流中的元素进行聚合。它是 DataStream API 中的一部分,适用于需要将一组元素逐步合并为一个单一元素的场景。

reduce() 的用法
reduce() 函数通常会接受一个匿名函数或自定义的 ReduceFunction,定义如何合并两个元素。Flink 会以流的方式处理数据,逐个对元素进行累积处理。

代码示例
以下是一个简单的使用 reduce() 进行聚合的示例,它对输入流中的整数进行累加:

// 导入 Flink 相关类
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.ReduceFunction;

public class ReduceExample {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟数据流
        DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4, 5);

        // 使用 reduce 对数据进行累加
        DataStream<Integer> reducedStream = numberStream
            .keyBy(value -> 1)  // 将所有元素分配到同一个组
            .reduce(new ReduceFunction<Integer>() {
                @Override
                public Integer reduce(Integer value1, Integer value2) {
                    return value1 + value2;  // 聚合逻辑:累加两个元素
                }
            });

        // 输出结果
        reducedStream.print();

        // 执行程序
        env.execute("Flink Reduce Example");
    }
}

关键点:
(1)keyBy():在使用 reduce() 之前,通常需要将数据按某种键进行分组。这样可以确保同一组中的元素会被应用 reduce()。
(2)reduce():reduce() 接受两个参数(即数据流中的两个元素),并返回一个聚合后的结果。这个结果会与流中的下一个元素继续进行聚合。

适用场景:
(1)对数据进行增量聚合,比如求和、求最小值、最大值等。
(2)reduce() 的聚合操作是有状态的,因此适用于需要在流数据上做持续累积计算的场景。

有状态的转换

Flink 为什么要参与状态管理?

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:
(1)本地性:Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
(2)持久性:Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
(3)纵向可扩展性:Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
(4)横向可扩展性:Flink 状态可以随着集群的扩缩容重新分布

Rich Functions

至此已经有了 Flink 的几种函数接口,包括 FilterFunction, MapFunction,和 FlatMapFunction。这些都是单一抽象方法模式。

对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction,其中增加了以下方法,包括:

**①open(Configuration c)**:仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。
②close()
**③getRuntimeContext()**:为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。

Keyed State

Keyed State 是一种专用于基于键的流处理状态(state),适用于在 keyBy() 操作之后的数据流处理场景。通过 Keyed State,Flink 可以为每个唯一的键(key)维护独立的状态,确保不同键之间的状态互不影响。

Keyed State 是有状态流处理中的核心概念,允许开发者存储和更新每个键相关联的状态,进而实现像窗口操作、去重、聚合等复杂的流处理逻辑。

关键的 Keyed State 类型:
(1)ValueState:存储单个值的状态。
(2)ListState:存储多个元素的状态(类似列表)。
(3)MapState:存储键值对的状态。
(4)ReducingState:允许指定归约函数的聚合状态。

ValueState 例子
下面是一个使用 ValueState 的简单例子,模拟一个计数器,计算每个键分组下的整数总和。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedStateExample {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟输入数据流
        DataStream<Tuple2<String, Integer>> stream = env.fromElements(
            Tuple2.of("key1", 1),
            Tuple2.of("key2", 2),
            Tuple2.of("key1", 3),
            Tuple2.of("key2", 4),
            Tuple2.of("key1", 5)
        );

        // 应用 Keyed State
        DataStream<String> resultStream = stream
            .keyBy(value -> value.f0)  // 按照 key 进行分组
            .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {

                // 定义一个 ValueState 用于保存每个 key 的累积和
                private transient ValueState<Integer> sumState;

                @Override
                public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                    // 初始化状态
                    ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
                        "sumState", // 状态名称
                        Types.INT   // 状态类型
                    );
                    sumState = getRuntimeContext().getState(descriptor);
                }

                @Override
                public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                    // 获取当前的状态值
                    Integer currentSum = sumState.value();

                    // 如果状态为空,初始化为 0
                    if (currentSum == null) {
                        currentSum = 0;
                    }

                    // 更新状态:累加当前元素的值
                    currentSum += value.f1;
                    sumState.update(currentSum);

                    // 输出当前 key 和它的累积和
                    out.collect("Key: " + value.f0 + ", Sum: " + currentSum);
                }
            });

        // 打印结果
        resultStream.print();

        // 执行程序
        env.execute("Flink Keyed State Example");
    }
}

代码说明:
(1)keyBy():首先对流中的元素按键进行分组。每个分组的元素会被独立处理,并拥有各自的状态。
(2)ValueState:为每个键保存一个累积和的状态。
ValueStateDescriptor 用于定义状态的名称和类型。
状态在每个键内是独立的,不同的键(例如 key1 和 key2)会维护各自的状态。

(3)processElement():处理每个输入元素,更新其累积和状态,并输出结果。

输出示例:

Key: key1, Sum: 1
Key: key2, Sum: 2
Key: key1, Sum: 4
Key: key2, Sum: 6
Key: key1, Sum: 9

在这个例子中,key1 和 key2 各自维护了自己的累积和。通过 Flink 的 Keyed State,你可以轻松地对不同键进行独立的状态管理。

扩展:
如果需要在窗口中使用 Keyed State 或使用更复杂的状态管理,可以结合 Flink 的窗口操作和其他类型的状态(如 ListState、MapState)来实现更加复杂的逻辑。

清理状态

Flink 中,状态(state)是流处理任务中的重要组成部分,尤其在有状态的操作(如 Keyed State 和窗口)中。然而,状态的积累会导致内存和存储的消耗不断增加,因此对状态的清理(cleanup)显得非常重要。Flink 提供了多种方式来自动或手动清理状态,以防止状态的无限增长。

(1)TTL(Time-to-Live) 机制

Flink 允许为状态设置过期时间(TTL),在 TTL 到期后,状态将被标记为过期,并会在后续访问时被清理。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// 创建 TTL 配置
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))  // 设置 TTL 时间为 1 小时
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // 配置何时更新 TTL(创建或写入时)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // 配置状态过期后是否能被访问
    .build();

// 创建状态描述符并绑定 TTL 配置
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("myState", Integer.class);
stateDescriptor.enableTimeToLive(ttlConfig);

// 将描述符绑定到状态
ValueState<Integer> state = getRuntimeContext().getState(stateDescriptor);

关键点:
(1)StateTtlConfig.UpdateType:指定何时更新 TTL,常见的选项有 OnCreateAndWrite 和 OnReadAndWrite。
(2)StateTtlConfig.StateVisibility:配置状态过期后是否还能被读取。
(3)TTL 清理不是立即的,而是在状态被访问时清理过期状态。

(2)窗口状态的自动清理

Flink 的窗口操作(Windowing API)内置了状态清理机制。当窗口触发后,Flink 会自动清理对应的状态。一般来说,当窗口计算结束且结果输出后,窗口对应的状态就会被自动删除。

例如:在使用 TumblingWindow 或 SlidingWindow 时,窗口的状态会在窗口关闭时自动清理

(3)手动清理 Keyed State

在某些情况下,可能需要手动清理特定的键的状态。通过 State.clear() 方法可以手动清理状态。

public class MyProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
    private transient ValueState<Integer> sumState;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("sumState", Integer.class);
        sumState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
        Integer currentSum = sumState.value();
        if (currentSum == null) {
            currentSum = 0;
        }

        currentSum += value.f1;
        sumState.update(currentSum);

        out.collect("Key: " + value.f0 + ", Sum: " + currentSum);

        // 清理状态的条件:比如累加到某个阈值时
        if (currentSum > 100) {
            sumState.clear();  // 手动清理状态
        }
    }
}

在上面的示例中,当某个键的累积和超过100时,状态会被手动清除。

(4)通过定时器 (Timer) 触发状态清理

Flink 支持通过定时器(TimerService)在特定时间点触发事件,并可以在这些事件触发时执行状态清理。

public class TimerStateCleanupFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {

    private transient ValueState<Integer> sumState;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("sumState", Integer.class);
        sumState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
        Integer currentSum = sumState.value();
        if (currentSum == null) {
            currentSum = 0;
        }
        currentSum += value.f1;
        sumState.update(currentSum);

        // 注册一个 5 秒后的定时器
        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 在定时器触发时清理状态
        sumState.clear();
        out.collect("State for key " + ctx.getCurrentKey() + " cleared at " + timestamp);
    }
}

在这个例子中,registerProcessingTimeTimer() 注册了一个 5 秒的定时器,当时间到达时,onTimer() 方法会被触发,状态被清理。

(5)增量检查点 (Incremental Checkpoints) 和状态压缩

Flink 在管理状态的同时,还提供了增量检查点机制和 RocksDB 的状态压缩机制。即使没有清理状态,Flink 也能通过这些方式对状态进行高效的存储和管理,从而减少状态管理的资源消耗。

Non-keyed State

Non-keyed State 是指在不使用 keyBy() 进行分组的情况下进行状态管理。这种状态与 Keyed State 相比,是全局的,即状态不是基于特定键的,而是对于整个流或者每个并行任务都有独立的状态。Non-keyed State 通常用于需要在全局范围内或按并行实例管理状态的场景中,比如计时器、定时任务、统计全局信息等。

示例:使用 ListState 进行全局累加
以下是一个简单的例子,演示如何使用 Non-keyed State 实现全局累加(不基于键)。

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

public class NonKeyedStateExample {

    public static void main(String[] args) throws Exception {

        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟输入流
        DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5, 6);

        // 使用 Non-keyed State 进行全局累加
        stream.flatMap(new RichFlatMapFunction<Integer, String>() {

            // 定义 ListState 状态
            private transient ListState<Integer> sumState;

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                // 初始化 ListStateDescriptor,用于描述状态的类型和名称
                ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>(
                        "sumState",  // 状态名称
                        Types.INT    // 状态的类型
                );
                sumState = getRuntimeContext().getListState(descriptor);
            }

            @Override
            public void flatMap(Integer value, Collector<String> out) throws Exception {
                // 将当前输入元素添加到状态中
                sumState.add(value);

                // 将状态中的所有元素累加起来
                Iterable<Integer> currentState = sumState.get();
                int sum = 0;
                for (Integer i : currentState) {
                    sum += i;
                }

                // 输出当前的累加和
                out.collect("Current Sum: " + sum);
            }
        }).print();

        // 执行程序
        env.execute("Non-keyed State Example");
    }
}

代码说明:
(1)ListState:使用了 Non-keyed 状态 ListState 来存储每个输入元素的累加值。
(2)open() 方法:在 open() 方法中初始化状态描述符 ListStateDescriptor,并通过 getRuntimeContext() 获取运行时的状态句柄。
(3)flatMap():每次接收到新的元素时,将其添加到 ListState 中,然后遍历状态中的所有元素进行累加。

Non-keyed State 的使用场景
(1)全局统计信息:比如统计整个数据流中的总数、平均值、最大值等,不基于某个键进行分组。
(2)定时任务:可以在 Non-keyed State 中使用定时器管理一些全局的计时或清理任务。
(3)全局连接状态:当处理需要管理全局资源或者连接状态的操作时,Non-keyed State 可以帮助管理这些连接的状态。

和 Keyed State 的区别
Keyed State 是按键(key)管理的状态,在 keyBy() 后,每个键拥有独立的状态。
Non-keyed State 是按并行任务管理的状态,状态不依赖任何键,因此每个并行任务的状态是独立的。

Connected Streams:连接流

Connected Streams 是一种强大的机制,它允许将两个不同类型的流(DataStream)连接在一起,从而能够在流处理中进行复杂的操作。与合并两个相同类型的流不同,Connected Streams 保留了两个流的数据类型,并允许对它们应用不同的处理逻辑。

预定义转换
在这里插入图片描述

连接流
有时想要更灵活地调整转换的某些功能,比如:数据流的阈值、规则或者其他参数。Flink 支持这种需求的模式称为 connected streams ,一个单独的算子有两个输入流。
在这里插入图片描述
connected stream 也可以被用来实现流的关联。

使用场景

(1)控制流和数据流

其中一个流可以用作主数据流,而另一个流作为控制流,控制如何处理主数据流中的记录。

(2)合并不同类型的数据流

可以将两个类型不同的数据流连接起来,同时对它们分别处理。

(3)状态共享

两个流在连接后可以共享状态信息。

使用步骤

(1)创建两个流

可以创建两个不同类型的 DataStream。

(2)连接两个流

使用 connect() 方法连接两个流。

(3)应用 CoProcessFunction 或 CoFlatMapFunction

连接后的流需要使用 CoProcessFunction 或 CoFlatMapFunction 来处理不同流中的元素。

示例(相同类型):
一个控制流是用来指定哪些词需要从 streamOfWords 里过滤掉的。 一个称为 ControlFunction 的 RichCoFlatMapFunction 作用于连接的流来实现这个功能。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env
        .fromElements("DROP", "IGNORE")
        .keyBy(x -> x);

    DataStream<String> streamOfWords = env
        .fromElements("Apache", "DROP", "Flink", "IGNORE")
        .keyBy(x -> x);
  
    control
        .connect(streamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

这里注意两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。

在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;
      
    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }
      
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }
      
    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}

RichCoFlatMapFunction 是一种可以被用于一对连接流的 FlatMapFunction,并且它可以调用 rich function 的接口。这意味着它可以是有状态的。

布尔变量 blocked 被用于记录在数据流 control 中出现过的键(在这个例子中是单词),并且这些单词从 streamOfWords 过滤掉。这是 keyed state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。

在 Flink 运行时中,flatMap1 和 flatMap2 在连接流有新元素到来时被调用 —— 在我们的例子中,control 流中的元素会进入 flatMap1,streamOfWords 中的元素会进入 flatMap2。这是由两个流连接的顺序决定的,本例中为 control.connect(streamOfWords)。

认识到你没法控制 flatMap1 和 flatMap2 的调用顺序是很重要的。这两个输入流是相互竞争的关系,Flink 运行时将根据从一个流或另一个流中消费的事件做它要做的。对于需要保证时间和/或顺序的场景,你会发现在 Flink 的管理状态中缓存事件一直到它们能够被处理是必须的。(注意:如果你真的迫切需要,可以使用自定义的算子实现 InputSelectable 接口,在两输入算子消费它的输入流时增加一些顺序上的限制。)

示例(不相同类型):
**connect()**:用于连接两个流,流的类型可以不同。
CoFlatMapFunction:它提供了两个 flatMap 方法,分别处理连接的两个流中的数据。
flatMap1 和 flatMap2:分别对应两个流中的元素处理逻辑。

import org.apache.flink.api.common.functions.CoFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ConnectedStreamsExample {

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建两个数据流,一个是整数流,一个是字符串流
        DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
        DataStream<String> stringStream = env.fromElements("a", "b", "c", "d");

        // 将两个流连接在一起
        ConnectedStreams<Integer, String> connectedStreams = intStream.connect(stringStream);

        // 使用 CoFlatMapFunction 来处理两个流的数据
        DataStream<String> resultStream = connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
            @Override
            public void flatMap1(Integer value, Collector<String> out) {
                out.collect("From int stream: " + value);
            }

            @Override
            public void flatMap2(String value, Collector<String> out) {
                out.collect("From string stream: " + value);
            }
        });

        // 输出结果
        resultStream.print();

        // 执行流处理程序
        env.execute("Flink Connected Streams Example");
    }
    
}

适用的类和接口
CoProcessFunction:适用于状态更复杂的场景,允许在处理每个元素时同时维护和操作状态。
CoFlatMapFunction:更适合简单的元素转换场景。


本文转载自: https://blog.csdn.net/qq_45305209/article/details/142962609
版权归原作者 未禾 所有, 如有侵权,请联系我们删除。

“7.Flink数据管道 & ETL(无状态的转换、Keyed Stream 的聚合、有状态的转换)”的评论:

还没有评论