0


Flink 内容分享(三):Fink原理、实战与性能优化(三)

流处理 & 批处理

在我们深入探讨Flink之前,首先要掌握一些流计算的基础概念。

  • 流处理:流处理主要针对的是数据流,特点是无界、实时,对系统传输的每个数据依次执行操作,一般用于实时统计。在流处理中,数据被视为无限连续的流,并且会尽快地进行处理。Flink在此模型下可以提供秒级甚至毫秒级的延迟,使其成为需要快速反应和决策的场景(例如实时推荐、欺诈检测等)的理想选择。
  • 批处理:批处理,也叫作离线处理,一般用于离线统计。这是一种处理存储在系统中的静态数据集的模型。在批处理中,所有数据都被看作是一个有限集合,处理过程通常在非交互式模式下进行,即作业开始时所有数据都已经可用,作业结束时给出所有计算结果。由于批处理允许对整个数据集进行全面分析,因此它适合于需要长期深度分析的场景(如历史数据分析、大规模ETL任务等)。

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流。

在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流

无界流Unbounded Streams

无界流有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。

我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。

处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

有界流Bounded Streams

有界流有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算,有界流所有数据可以被排序,所以并不需要有序摄取。

有界流处理通常被称为批处理。所以在Flink里批计算其实指的就是有界流。

图片

Flink的特点和优势

Flink具有如下特点和优势:

  • 同时支持高吞吐、低延迟、高性能。
  • 支持事件时间(Event Time)概念,结合Watermark处理乱序数据。
  • 支持有状态计算,并且支持多种状态内存、 文件、RocksDB。
  • 支持高度灵活的窗口(Window) 操作 time、 count、 session等。
  • 基于轻量级分布式快照(CheckPoint) 实现的容错保证Exactly- Once语义。
  • 基于JVM实现独立的内存管理。

Flink VS Spark

Spark 和 Flink 在不同的应用领域上表现会有差别。

一般来说,Spark 基于微批处理的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处理的低延迟上做到极致。

在低延迟流处理场景,Flink 已经有明显的优势。而在海量数据的批处理领域,Spark 能够处理的吞吐量更大

另外,Spark Streaming中的流计算其实是微批计算,实时性不如Flink,还有一点很重要的是Spark Streaming不适合有状态的计算,得借助一些存储如:Redis,才能实现。而Flink天然支持有状态的计算。

Flink API

Flink 本身提供了多层 API:

图片

  • Stateful Stream Processing :最低级的抽象接口是状态化的数据流接口(stateful streaming)。这个接口是通过 ProcessFunction 集成到 DataStream API 中的。该接口允许用户自由的处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户也可以通过注册 event time 和 processing time 处理回调函数的方法来实现复杂的计算。
  • DataStream/DataSet API: DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
  • Table API: Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁,可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
  • SQL: Flink 提供的最高层级的抽象是 Flink SQL,这一层抽象在语法与表达能力上与 Table API 类似,SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

Dataflows数据流图

所有的 Flink 程序都可以归纳为由三部分构成:

Source

Transformation

Sink

  • Source 表示“源算子”,负责读取数据源。
  • Transformation 表示“转换算子”,利用各种算子进行处理加工。
  • Sink 表示“下沉算子”,负责数据的输出。

Source数据源会源源不断的产生数据,Transformation将产生的数据进行各种业务逻辑的数据处理,最终由Sink输出到外部(console、kafka、redis、DB......)。

所有基于Flink开发的程序都能够映射成一个Dataflows(数据流图):

图片

当Source数据源的数量比较大或计算逻辑相对比较复杂的情况下,需要提高并行度来处理数据,采用并行数据流。

通过设置不同算子的并行度,比如 Source并行度设置为2 ,map也是2。代表会启动2个并行的线程来处理数据:

图片

Job Manager & Task Manager

Flink是一个典型的Master-Slave架构,架构中包含了两个重要角色,分别是「JobManager」和「TaskManager」。

JobManager相当于是Master,TaskManager相当于是Slave。

图片

在Flink中,JobManager负责整个Flink集群任务的调度以及资源的管理。它从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlot资源并命令TaskManager启动从客户端中获取的应用。

TaskManager则负责执行作业流的Task,并且缓存和交换数据流。

在TaskManager中资源调度的最小单位是Task slot。TaskManager中Task slot的数量表示并发处理Task的数量。

一台机器节点可以运行多个TaskManager,TaskManager工作期间会向JobManager发送心跳保持连接

部署 & 运行

部署模式

Flink支持多种部署模式,包括本地模式、Standalone模式、YARN模式、Mesos模式和Kubernetes模式。

  • 本地模式:本地模式是在单个JVM中启动Flink,主要用于开发和测试。它不需要任何集群管理器,但也不能跨多台机器运行。本地模式的优点是部署简单,缺点是不能利用分布式计算的优势。
  • Standalone模式:Standalone模式是在一个独立的集群中运行Flink。它需要手动启动Flink集群,并且需要手动管理资源。Standalone模式的优点是部署简单,可以跨多台机器运行,缺点是需要手动管理资源。
  • YARN模式:在这个模式下,Flink作为YARN的一个应用程序运行在YARN集群中。Flink会从YARN获取所需的资源来运行JobManager和TaskManager。如果你已经有了一个运行Hadoop/YARN的大数据平台,选择这个模式可以方便地利用已有的资源,这是企业中用的比较多的方式。
  • Mesos模式:Mesos是一个更通用的集群管理框架,可以运行非Java应用程序,并具有良好的容错性和伸缩性。Flink在Mesos上的运行方式与在YARN上类似,也是从Mesos请求资源来运行JobManager和TaskManager。
  • Kubernetes模式:Kubernetes是一个开源的容器编排平台。Flink可以作为一组分布式的Docker容器在Kubernetes集群上运行。Kubernetes提供了自动化、扩展和管理应用程序容器的功能,对于云原生应用程序部署非常合适。

每种部署模式都有其优缺点,选择哪种部署模式取决于具体的应用场景和需求。

运行模式

Flink 有三种不同的运行模式:Session、Per-Job 和 Application。

  • Session:在这种模式下,一个 Flink 集群会被启动且会一直运行,直到明确地被终止。用户可以在这个集群中提交多个作业。这个模式适合多个短作业的场景。
  • Per-Job:在这种模式下,对于每个提交的作业,都会启动一个新的 Flink 集群,然后再执行该作业。作业完成后,相应的 Flink 集群也会被终止。这种模式适合长时间运行的作业。
  • Application:这种模式是一种特殊的 Per-Job 模式,它允许用户以反应式的方式与作业进行交互(比如,使用 DataStream API)。这是 Flink 1.11 版本引入的新模式,它结合了Session模式和Per-Job模式的优点。在Application模式下,每个作业都会启动一个独立的Flink集群,但是作业提交快。

以上所述的部署环境可以与任何一种运行模式结合使用。例如,你可以在本地模式、Standalone 模式或 YARN 模式下运行 Session、Per-Job 或 Application 模式的 Flink 作业。

提交和执行作业流程

Flink在不同运行模型下的作业提交和执行流程大致如下:

  • Session 模式:当你的作业完成运行后,该作业的JobManager会被停止,但是Flink集群(包括Dispatcher和其余的TaskManager)仍然处于运行状态,等待新的作业提交。这就是所谓的Session模式,它允许在同一个Flink集群上连续运行多个作业。- 启动Flink集群:在Session模式下,首先需要启动一个运行中的Flink集群。这个集群可以是Standalone Session Cluster,也可以是在Yarn或Kubernetes等资源管理器上的Session Cluster。- 作业提交:然后,用户通过Flink客户端(例如CLI、REST API或Web UI)将作业提交给Flink Dispatcher服务。Dispatcher服务是Flink集群的主要入口点,负责接收和协调作业请求。- 作业解析与优化:一旦Flink Dispatcher接收到作业,它会对作业执行图(JobGraph)进行解析,并使用Flink的优化器对执行图进行优化。- 创建作业执行环境:Dispatcher会为新的作业创建一个JobManager,这个JobManager就是一次作业的执行环境。并且,每个Job都有属于自己的JobManager。- 作业执行:JobManager将优化后的执行图发送到TaskManager节点来执行具体的任务。TaskManager节点包含若干个slot,每个slot可以运行作业图中的一个并行操作。- 结果和状态管理:作业执行过程中,输出结果被发送回JobManager,并提供给用户。同时,作业的状态也由JobManager管理,以支持故障恢复。
  • Per-Job 模式:在Per-Job模式下,每个作业都有自己的资源隔离,互不干扰,资源利用率较高,但是如果作业数量大,则可能会因为每个作业都需要单独申请、释放资源导致效率较低。- 用户通过命令行或者UI将程序包含所有依赖提交到Flink集群。- Flink Master节点接收到用户提交的作业后,会启动一个新的JobManager来负责这个作业的资源管理与任务调度。- JobManager通过ResourceManager向Flink集群请求所需的TaskManager资源。- ResourceManager分配TaskManager给JobManager,并启动TaskManager进程。- TaskManager向JobManager注册并提供自己的状态及可用的slot信息。- JobManager根据程序的DAG图计算出ExecutionGraph,然后按照stages将相应的tasks分配到TaskManager的Slots中去执行。- 如果作业执行完毕或执行失败,JobManager会释放所有资源,并将结果返回给用户。
  • Application 模式:- 构建Flink Job:客户端或者用户在本地环境上构建Flink作业。- 提交Flink Job:通过Flink命令行工具或者Web UI,将序列化后的JobGraph提交到Flink集群。也可以通过REST API直接提交作业。- JobManager接收Job:JobManager是Flink中负责任务调度和协调的组件。它会接收到提交的JobGraph,并将其封装成ExecutionGraph。- 任务调度:JobManager会根据ExecutionGraph对任务进行调度,决定何时启动任务,以及哪个TaskManager上启动任务。- 任务执行:TaskManager接收到JobManager分配的任务后开始执行。每个TaskManager包含一到多个Slot,这些Slot用于运行任务。- 状态反馈:TaskManager在执行任务过程中会将状态信息(如进度、日志等)反馈给JobManager。- 结果返回:当所有任务执行完成后,JobManager会将执行结果返回给客户端。

配置开发环境

每个 Flink 应用都需要依赖一组 Flink 类库。Flink 应用至少需要依赖 Flink APIs。许多应用还会额外依赖连接器类库(比如 Kafka、Cassandra 等)。 当用户运行 Flink 应用时(无论是在 IDEA 环境下进行测试,还是部署在分布式环境下),运行时类库都必须可用。

开发工具:IntelliJ IDEA

以Java语言为例,配置开发Maven依赖:

    <properties>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

注意点

  • 如果要将程序打包提交到集群运行,打包的时候不需要包含这些依赖,因为集群环境已经包含了这些依赖,此时依赖的作用域应该设置为provided
  • Flink 应用在 IntelliJ IDEA 中运行,这些 Flink 核心依赖的作用域需要设置为 compile 而不是 provided 。 否则 IntelliJ 不会添加这些依赖到 classpath,会导致应用运行时抛出 NoClassDefFountError 异常。

添加打包插件:

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

WordCount程序

配置好开发环境之后我们来写一个简单的Flink程序。

需求:统计单词出现的次数

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为 1
        env.setParallelism(1);
        // 构建输入数据流
        DataStream<String> text = env.fromElements(
                "Hello World",
                "Hello Flink",
                "Hello Java");
        // 对输入数据进行操作,包括分割、映射和聚合
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                //keyBy(0) 操作按照元组的第一个字段(索引为0)进行分组。在这个例子中,这就表示根据每个单词(字符串)进行分组。         
                .keyBy(0)
                //sum(1) 是一个聚合操作,它对每个分组内的元素进行求和。在这个例子中,对元组的第二个字段(索引为1)进行求和,表示每个单词的出现次数。
                .sum(1);
        // 输出结果
        counts.print();
        // 执行任务
        env.execute("Flink Streaming Java WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // 规范化并分割行
            String[] words = value.toLowerCase().split("\\W+");
            // 为每个单词发出 Tuple2<String, Integer>(word, 1)
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }

输出如下:

(hello,1)
(world,1)
(hello,2)
(flink,1)
(hello,3)
(java,1)

对代码简要解析一下:

这是一个基本的单词计数程序,它使用Apache Flink的流处理环境。这个程序读入一系列的字符串,然后把每个字符串分割成单词,对每个单词进行计数,并且输出计数结果。

在提供的例子中,有三个输入字符串:"Hello World", "Hello Flink", "Hello Java",'Hello'这个单词出现了三次,其余单词 ('World', 'Flink', 'Java') 各出现了一次。

由于Flink是一个流处理框架,它实时地处理数据,所以它会在每一次遇到一个新的单词时就更新和输出计数。因此,每当 'Hello' 出现时,都会更新和输出其计数。

对于这个例子:

  • 首先遇到 'Hello' 和 'World',所以输出 (hello,1) 和 (world,1)。
  • 然后再次遇到 'Hello' 和第一次遇到 'Flink',所以输出 (hello,2) 和 (flink,1)。
  • 最后再次遇到 'Hello' 和第一次遇到 'Java',所以输出 (hello,3) 和 (java,1)。

这段代码已在本地运行和测试过,且相关部分已添加注释,大家可以实际运行感受一下。

并行度

特定算子的子任务(subtask)的个数称之为并行度(parallel),并行度是几,这个task内部就有几个subtask

怎样实现算子并行呢?其实也很简单,我们把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。

整个流处理程序的并行度,理论上是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量

如果我们将上面WordCount程序的并行度设置为3

env.setParallelism(3);

就会看到如下输出:

2> (world,1)
3> (flink,1)
1> (hello,1)
1> (hello,2)
1> (java,1)
1> (hello,3)

前面的数字代表线程,Flink会将相同的 key 分配到不同的 slot 进行处理。

并行度设置

在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。

代码中设置

  • 我们在代码中,可以很简单地在算子后跟着调用 setParallelism()方法,来设置当前算子的并行度: stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);这种方式设置的并行度,只针对当前算子有效。
  • 我们也可以直接调用执行环境的 setParallelism()方法,全局设定并行度:env.setParallelism(2);这样代码中所有算子,默认的并行度就都为2了。

提交应用时设置

在使用 flink run 命令提交应用时,可以增加

-p

参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置。如果我们直接在 Web UI 上提交作业,也可以在对应输入框中直接添加并行度。

配置文件中设置

我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:

parallelism.default: 2

(初始值为 1)

这个设置对于整个集群上提交的所有作业有效。

并行度生效优先级

  1. 对于一个算子,首先看在代码中是否单独指定了它的并行度,这个特定的设置优先级最高,会覆盖后面所有的设置。
  2. 如果没有单独设置,那么采用当前代码中执行环境全局设置的并行度。
  3. 如果代码中完全没有设置,那么采用提交时-p 参数指定的并行度。
  4. 如果提交时也未指定-p 参数,那么采用集群配置文件中的默认并行度。

这里需要说明的是,算子的并行度有时会受到自身具体实现的影响。比如读取 socket 文本流的算子 socketTextStream,它本身就是非并行的 Source 算子,所以无论怎么设置,它在运行时的并行度都是 1

Task

在 Flink 中,Task 是一个阶段多个功能相同 subTask 的集合,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。

要是之前学过Spark,这里可以用Spark的思想来看,Flink的Task就好比Spark中的Stage,而我们知道Spark的Stage是根据宽依赖来拆分的。所以我们也可以认为Flink的Task也是根据宽依赖拆分的(尽管Flink中并没有宽依赖的概念),这样会更好理解

如下图:

图片

Operator Chain(算子链)

在Flink中,为了分布式执行,Flink会将算子子任务链接在一起形成任务。每个任务由一个线程执行。将算子链接在一起形成任务是一种有用的优化:它减少了线程间切换和缓冲的开销,并增加了整体吞吐量,同时降低了延迟

举个例子,假设我们有一个简单的Flink流处理程序,它从一个源读取数据,然后应用

map

filter

操作,最后将结果写入到一个接收器。这个程序可能看起来像这样:

DataStream<String> data = env.addSource(new CustomSource());
data.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }
})
.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return value.startsWith("A");
    }
})
.addSink(new CustomSink());

**在这个例子中,

map

filter

操作可以被链接在一起形成一个任务,被优化为算子链,这意味着它们将在同一个线程中执行,而不是在不同的线程中执行并通过网络进行数据传输**

Task Slots

Task Slots即是任务槽,slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 slot 来并行执行程序,我们可以通过集群的配置文件来设定 TaskManager 的 slot 数量:

taskmanager.numberOfTaskSlots : 8

例如,如果 Task Manager 有2个 slot,那么它将为每个 slot 分配 50% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。

需要注意的是,slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。这也是开发环境默认并行度设为机器 CPU 数量的原因

分发规则

  • 不同的Task下的subtask要分发到同一个TaskSlot中,降低数据传输、提高执行效率
  • 相同的Task下的subtask要分发到不同的TaskSlot

Slot共享组

如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,在Flink中,可以通过在代码中使用

slotSharingGroup

方法来设置slot共享组。

Flink会将具有相同slot共享组的操作放入同一个slot中,同时保持不具有slot共享组的操作在其他slot中。这可以用来隔离slot。

例如,你可以这样设置:

dataStream.map(...).slotSharingGroup("group1");

默认情况下,所有操作都被分配相同的SlotSharingGroup。

这样,只有属于同一个 slot 共享组的子任务,才会开启 slot 共享,不同组之间的任务是完全隔离的,必须分配到不同的 slot 上。

并行度和Slots解释

听了上面并行度和Slots的理论,可能还是有点疑惑,下面通过例子解释说明下:

假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示最多能并行执行9个任务。

假设我们写了一个WordCount程序,有四个转换算子:source —> flatMap —> reduce —> sink

当所有算子并行度相同时,很容易看出source和flatMap可以优化合并算子链,于是最终有三个任务节点:source & flatMap,reduce 和sink。 如果我们没有任何并行度设置,而配置文件中默认

parallelism.default:1

,那么默认并行度为1,总共有3个任务。由于不同算子的任务可以共享任务槽,所以最终占用的slot只有1个。9个slot只用了1个,有8个空闲

如图所示:

图片

我们可以直接把并行度设置为 9,这样所有 3*9=27 个任务就会完全占用 9 个 slot。这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用。

另外再考虑对于某个算子单独设置并行度的场景。例如,如果我们考虑到输出可能是写入文件,那会希望不要并行写入多个文件,就需要设置 sink 算子的并行度为 1。这时其他的算子并行度依然为 9,所以总共会有 19 个子任务。

根据 slot 共享的原则,它们最终还是会占用全部的 9 个 slot,而 sink 任务只在其中一个 slot 上执行,通过这个例子也可以明确地看到,整个流处理程序的并行度,就应该是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量

DataSource数据源

Flink内嵌支持的数据源非常多,比如HDFS、Socket、Kafka、Collections。Flink也提供了addSource方式,可以自定义数据源,下面介绍一些常用的数据源。

File Source

  • 通过读取本地、HDFS文件创建一个数据源。

如果读取的是HDFS上的文件,那么需要导入Hadoop依赖

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
        </dependency>

代码示例:每隔10s去读取HDFS指定目录下的新增文件内容,并且进行WordCount。

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String filePath = "hdfs://node01:9000/flink/data/";
        FileInputFormat<String> textInputFormat = new TextInputFormat(new Path(filePath));
        //PROCESS_CONTINUOUSLY模式时,Flink会持续监视给定的路径,并在发现新数据时将其引入流中进行处理。
        DataStream<String> textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000);
        DataStream<Tuple2<String, Integer>> result = textStream
                .flatMap(new WordSplitter())
                .map(new WordMapper())
                .keyBy(0)
                .sum(1);
        result.print();
        env.execute();
    }

    public static class WordSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
    public static class WordMapper implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> wordCountTuple) {
            //f0, f1 等是用来访问元组中的元素的字段。Tuple2<String, Integer> 表示这是一个大小为 2 的元组,其中 f0 是 String 类型,f1 是 Integer 类型。
            // 在代码中,wordCountTuple.f0 表示的就是单词(即String类型的值),wordCountTuple.f1 则表示的是这个单词的计数(即 Integer 类型的值)。
            return new Tuple2<>(wordCountTuple.f0, wordCountTuple.f1);
        }
    }

Collection Source

基于本地集合的数据源,一般用于测试场景,对于线上环境没有太大意义。

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> data = Arrays.asList("hello word flink","hello java flink");
        DataStream<String> text = env.fromCollection(data);
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(0)
                .sum(1);
        counts.print();
        env.execute("WordCount Example");
    }

Socket Source

接受Socket Server中的数据:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 连接 socket 获取输入数据
        DataStream<String> text = env.socketTextStream("localhost", 9999);
        // 解析数据、对数据进行分组、窗口处理和聚合计算
        DataStream<Tuple2<String, Integer>> wordCount = text.flatMap(new Tokenizer())
                .keyBy(0)
                .sum(1);
        wordCount.print();
        env.execute("WordCount from Socket TextStream Example");
    }

Kafka Source

Flink想要接受Kafka中的数据,首先要配置flink与kafka的连接器依赖。

Maven依赖:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.6</version>
</dependency>

示例代码:

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置Kafka的相关参数并从Kafka中读取数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(flinkKafkaConsumer);
        // 对接收的每一行数据进行处理,分割出每个单词并初始化其数量为1
        DataStream<Tuple2<String, Integer>> words = stream.flatMap(new Tokenizer());
        DataStream<Tuple2<String, Integer>> wordCounts = words.keyBy(0).sum(1);
        wordCounts.print().setParallelism(1);
        env.execute("WordCountFromKafka");
    }
标签: flink 大数据

本文转载自: https://blog.csdn.net/qq_45038038/article/details/135253126
版权归原作者 之乎者也· 所有, 如有侵权,请联系我们删除。

“Flink 内容分享(三):Fink原理、实战与性能优化(三)”的评论:

还没有评论