流处理 & 批处理
在我们深入探讨Flink之前,首先要掌握一些流计算的基础概念。
- 流处理:流处理主要针对的是数据流,特点是无界、实时,对系统传输的每个数据依次执行操作,一般用于实时统计。在流处理中,数据被视为无限连续的流,并且会尽快地进行处理。Flink在此模型下可以提供秒级甚至毫秒级的延迟,使其成为需要快速反应和决策的场景(例如实时推荐、欺诈检测等)的理想选择。
- 批处理:批处理,也叫作离线处理,一般用于离线统计。这是一种处理存储在系统中的静态数据集的模型。在批处理中,所有数据都被看作是一个有限集合,处理过程通常在非交互式模式下进行,即作业开始时所有数据都已经可用,作业结束时给出所有计算结果。由于批处理允许对整个数据集进行全面分析,因此它适合于需要长期深度分析的场景(如历史数据分析、大规模ETL任务等)。
事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流。
在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流
无界流Unbounded Streams
无界流有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。
我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。
处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
有界流Bounded Streams
有界流有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算,有界流所有数据可以被排序,所以并不需要有序摄取。
有界流处理通常被称为批处理。所以在Flink里批计算其实指的就是有界流。
Flink的特点和优势
Flink具有如下特点和优势:
- 同时支持高吞吐、低延迟、高性能。
- 支持事件时间(Event Time)概念,结合Watermark处理乱序数据。
- 支持有状态计算,并且支持多种状态内存、 文件、RocksDB。
- 支持高度灵活的窗口(Window) 操作 time、 count、 session等。
- 基于轻量级分布式快照(CheckPoint) 实现的容错保证Exactly- Once语义。
- 基于JVM实现独立的内存管理。
Flink VS Spark
Spark 和 Flink 在不同的应用领域上表现会有差别。
一般来说,Spark 基于微批处理的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处理的低延迟上做到极致。
在低延迟流处理场景,Flink 已经有明显的优势。而在海量数据的批处理领域,Spark 能够处理的吞吐量更大
另外,Spark Streaming中的流计算其实是微批计算,实时性不如Flink,还有一点很重要的是Spark Streaming不适合有状态的计算,得借助一些存储如:Redis,才能实现。而Flink天然支持有状态的计算。
Flink API
Flink 本身提供了多层 API:
- Stateful Stream Processing :最低级的抽象接口是状态化的数据流接口(stateful streaming)。这个接口是通过 ProcessFunction 集成到 DataStream API 中的。该接口允许用户自由的处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户也可以通过注册 event time 和 processing time 处理回调函数的方法来实现复杂的计算。
- DataStream/DataSet API: DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
- Table API: Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁,可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
- SQL: Flink 提供的最高层级的抽象是 Flink SQL,这一层抽象在语法与表达能力上与 Table API 类似,SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。
Dataflows数据流图
所有的 Flink 程序都可以归纳为由三部分构成:
Source
、
Transformation
和
Sink
。
- Source 表示“源算子”,负责读取数据源。
- Transformation 表示“转换算子”,利用各种算子进行处理加工。
- Sink 表示“下沉算子”,负责数据的输出。
Source数据源会源源不断的产生数据,Transformation将产生的数据进行各种业务逻辑的数据处理,最终由Sink输出到外部(console、kafka、redis、DB......)。
所有基于Flink开发的程序都能够映射成一个Dataflows(数据流图):
当Source数据源的数量比较大或计算逻辑相对比较复杂的情况下,需要提高并行度来处理数据,采用并行数据流。
通过设置不同算子的并行度,比如 Source并行度设置为2 ,map也是2。代表会启动2个并行的线程来处理数据:
Job Manager & Task Manager
Flink是一个典型的Master-Slave架构,架构中包含了两个重要角色,分别是「JobManager」和「TaskManager」。
JobManager相当于是Master,TaskManager相当于是Slave。
在Flink中,JobManager负责整个Flink集群任务的调度以及资源的管理。它从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlot资源并命令TaskManager启动从客户端中获取的应用。
TaskManager则负责执行作业流的Task,并且缓存和交换数据流。
在TaskManager中资源调度的最小单位是Task slot。TaskManager中Task slot的数量表示并发处理Task的数量。
一台机器节点可以运行多个TaskManager,TaskManager工作期间会向JobManager发送心跳保持连接
部署 & 运行
部署模式
Flink支持多种部署模式,包括本地模式、Standalone模式、YARN模式、Mesos模式和Kubernetes模式。
- 本地模式:本地模式是在单个JVM中启动Flink,主要用于开发和测试。它不需要任何集群管理器,但也不能跨多台机器运行。本地模式的优点是部署简单,缺点是不能利用分布式计算的优势。
- Standalone模式:Standalone模式是在一个独立的集群中运行Flink。它需要手动启动Flink集群,并且需要手动管理资源。Standalone模式的优点是部署简单,可以跨多台机器运行,缺点是需要手动管理资源。
- YARN模式:在这个模式下,Flink作为YARN的一个应用程序运行在YARN集群中。Flink会从YARN获取所需的资源来运行JobManager和TaskManager。如果你已经有了一个运行Hadoop/YARN的大数据平台,选择这个模式可以方便地利用已有的资源,这是企业中用的比较多的方式。
- Mesos模式:Mesos是一个更通用的集群管理框架,可以运行非Java应用程序,并具有良好的容错性和伸缩性。Flink在Mesos上的运行方式与在YARN上类似,也是从Mesos请求资源来运行JobManager和TaskManager。
- Kubernetes模式:Kubernetes是一个开源的容器编排平台。Flink可以作为一组分布式的Docker容器在Kubernetes集群上运行。Kubernetes提供了自动化、扩展和管理应用程序容器的功能,对于云原生应用程序部署非常合适。
每种部署模式都有其优缺点,选择哪种部署模式取决于具体的应用场景和需求。
运行模式
Flink 有三种不同的运行模式:Session、Per-Job 和 Application。
- Session:在这种模式下,一个 Flink 集群会被启动且会一直运行,直到明确地被终止。用户可以在这个集群中提交多个作业。这个模式适合多个短作业的场景。
- Per-Job:在这种模式下,对于每个提交的作业,都会启动一个新的 Flink 集群,然后再执行该作业。作业完成后,相应的 Flink 集群也会被终止。这种模式适合长时间运行的作业。
- Application:这种模式是一种特殊的 Per-Job 模式,它允许用户以反应式的方式与作业进行交互(比如,使用 DataStream API)。这是 Flink 1.11 版本引入的新模式,它结合了Session模式和Per-Job模式的优点。在Application模式下,每个作业都会启动一个独立的Flink集群,但是作业提交快。
以上所述的部署环境可以与任何一种运行模式结合使用。例如,你可以在本地模式、Standalone 模式或 YARN 模式下运行 Session、Per-Job 或 Application 模式的 Flink 作业。
提交和执行作业流程
Flink在不同运行模型下的作业提交和执行流程大致如下:
- Session 模式:当你的作业完成运行后,该作业的JobManager会被停止,但是Flink集群(包括Dispatcher和其余的TaskManager)仍然处于运行状态,等待新的作业提交。这就是所谓的Session模式,它允许在同一个Flink集群上连续运行多个作业。- 启动Flink集群:在Session模式下,首先需要启动一个运行中的Flink集群。这个集群可以是Standalone Session Cluster,也可以是在Yarn或Kubernetes等资源管理器上的Session Cluster。- 作业提交:然后,用户通过Flink客户端(例如CLI、REST API或Web UI)将作业提交给Flink Dispatcher服务。Dispatcher服务是Flink集群的主要入口点,负责接收和协调作业请求。- 作业解析与优化:一旦Flink Dispatcher接收到作业,它会对作业执行图(JobGraph)进行解析,并使用Flink的优化器对执行图进行优化。- 创建作业执行环境:Dispatcher会为新的作业创建一个JobManager,这个JobManager就是一次作业的执行环境。并且,每个Job都有属于自己的JobManager。- 作业执行:JobManager将优化后的执行图发送到TaskManager节点来执行具体的任务。TaskManager节点包含若干个slot,每个slot可以运行作业图中的一个并行操作。- 结果和状态管理:作业执行过程中,输出结果被发送回JobManager,并提供给用户。同时,作业的状态也由JobManager管理,以支持故障恢复。
- Per-Job 模式:在Per-Job模式下,每个作业都有自己的资源隔离,互不干扰,资源利用率较高,但是如果作业数量大,则可能会因为每个作业都需要单独申请、释放资源导致效率较低。- 用户通过命令行或者UI将程序包含所有依赖提交到Flink集群。- Flink Master节点接收到用户提交的作业后,会启动一个新的JobManager来负责这个作业的资源管理与任务调度。- JobManager通过ResourceManager向Flink集群请求所需的TaskManager资源。- ResourceManager分配TaskManager给JobManager,并启动TaskManager进程。- TaskManager向JobManager注册并提供自己的状态及可用的slot信息。- JobManager根据程序的DAG图计算出ExecutionGraph,然后按照stages将相应的tasks分配到TaskManager的Slots中去执行。- 如果作业执行完毕或执行失败,JobManager会释放所有资源,并将结果返回给用户。
- Application 模式:- 构建Flink Job:客户端或者用户在本地环境上构建Flink作业。- 提交Flink Job:通过Flink命令行工具或者Web UI,将序列化后的JobGraph提交到Flink集群。也可以通过REST API直接提交作业。- JobManager接收Job:JobManager是Flink中负责任务调度和协调的组件。它会接收到提交的JobGraph,并将其封装成ExecutionGraph。- 任务调度:JobManager会根据ExecutionGraph对任务进行调度,决定何时启动任务,以及哪个TaskManager上启动任务。- 任务执行:TaskManager接收到JobManager分配的任务后开始执行。每个TaskManager包含一到多个Slot,这些Slot用于运行任务。- 状态反馈:TaskManager在执行任务过程中会将状态信息(如进度、日志等)反馈给JobManager。- 结果返回:当所有任务执行完成后,JobManager会将执行结果返回给客户端。
配置开发环境
每个 Flink 应用都需要依赖一组 Flink 类库。Flink 应用至少需要依赖 Flink APIs。许多应用还会额外依赖连接器类库(比如 Kafka、Cassandra 等)。 当用户运行 Flink 应用时(无论是在 IDEA 环境下进行测试,还是部署在分布式环境下),运行时类库都必须可用。
开发工具:IntelliJ IDEA
以Java语言为例,配置开发Maven依赖:
<properties>
<flink.version>1.13.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
注意点:
- 如果要将程序打包提交到集群运行,打包的时候不需要包含这些依赖,因为集群环境已经包含了这些依赖,此时依赖的作用域应该设置为
provided
。 - Flink 应用在 IntelliJ IDEA 中运行,这些 Flink 核心依赖的作用域需要设置为
compile
而不是provided
。 否则 IntelliJ 不会添加这些依赖到 classpath,会导致应用运行时抛出NoClassDefFountError
异常。
添加打包插件:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
WordCount程序
配置好开发环境之后我们来写一个简单的Flink程序。
需求:统计单词出现的次数
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为 1
env.setParallelism(1);
// 构建输入数据流
DataStream<String> text = env.fromElements(
"Hello World",
"Hello Flink",
"Hello Java");
// 对输入数据进行操作,包括分割、映射和聚合
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
//keyBy(0) 操作按照元组的第一个字段(索引为0)进行分组。在这个例子中,这就表示根据每个单词(字符串)进行分组。
.keyBy(0)
//sum(1) 是一个聚合操作,它对每个分组内的元素进行求和。在这个例子中,对元组的第二个字段(索引为1)进行求和,表示每个单词的出现次数。
.sum(1);
// 输出结果
counts.print();
// 执行任务
env.execute("Flink Streaming Java WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 规范化并分割行
String[] words = value.toLowerCase().split("\\W+");
// 为每个单词发出 Tuple2<String, Integer>(word, 1)
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
输出如下:
(hello,1)
(world,1)
(hello,2)
(flink,1)
(hello,3)
(java,1)
对代码简要解析一下:
这是一个基本的单词计数程序,它使用Apache Flink的流处理环境。这个程序读入一系列的字符串,然后把每个字符串分割成单词,对每个单词进行计数,并且输出计数结果。
在提供的例子中,有三个输入字符串:"Hello World", "Hello Flink", "Hello Java",'Hello'这个单词出现了三次,其余单词 ('World', 'Flink', 'Java') 各出现了一次。
由于Flink是一个流处理框架,它实时地处理数据,所以它会在每一次遇到一个新的单词时就更新和输出计数。因此,每当 'Hello' 出现时,都会更新和输出其计数。
对于这个例子:
- 首先遇到 'Hello' 和 'World',所以输出 (hello,1) 和 (world,1)。
- 然后再次遇到 'Hello' 和第一次遇到 'Flink',所以输出 (hello,2) 和 (flink,1)。
- 最后再次遇到 'Hello' 和第一次遇到 'Java',所以输出 (hello,3) 和 (java,1)。
这段代码已在本地运行和测试过,且相关部分已添加注释,大家可以实际运行感受一下。
并行度
特定算子的子任务(subtask)的个数称之为并行度(parallel),并行度是几,这个task内部就有几个subtask
怎样实现算子并行呢?其实也很简单,我们把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。
整个流处理程序的并行度,理论上是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量
如果我们将上面WordCount程序的并行度设置为3
env.setParallelism(3);
就会看到如下输出:
2> (world,1)
3> (flink,1)
1> (hello,1)
1> (hello,2)
1> (java,1)
1> (hello,3)
前面的数字代表线程,Flink会将相同的 key 分配到不同的 slot 进行处理。
并行度设置
在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。
代码中设置
- 我们在代码中,可以很简单地在算子后跟着调用
setParallelism()
方法,来设置当前算子的并行度:stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
这种方式设置的并行度,只针对当前算子有效。 - 我们也可以直接调用执行环境的
setParallelism()
方法,全局设定并行度:env.setParallelism(2);
这样代码中所有算子,默认的并行度就都为2了。
提交应用时设置
在使用 flink run 命令提交应用时,可以增加
-p
参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置。如果我们直接在 Web UI 上提交作业,也可以在对应输入框中直接添加并行度。
配置文件中设置
我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:
parallelism.default: 2
(初始值为 1)
这个设置对于整个集群上提交的所有作业有效。
并行度生效优先级
- 对于一个算子,首先看在代码中是否单独指定了它的并行度,这个特定的设置优先级最高,会覆盖后面所有的设置。
- 如果没有单独设置,那么采用当前代码中执行环境全局设置的并行度。
- 如果代码中完全没有设置,那么采用提交时
-p
参数指定的并行度。 - 如果提交时也未指定
-p
参数,那么采用集群配置文件中的默认并行度。
这里需要说明的是,算子的并行度有时会受到自身具体实现的影响。比如读取 socket 文本流的算子 socketTextStream,它本身就是非并行的 Source 算子,所以无论怎么设置,它在运行时的并行度都是 1
Task
在 Flink 中,Task 是一个阶段多个功能相同 subTask 的集合,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
要是之前学过Spark,这里可以用Spark的思想来看,Flink的Task就好比Spark中的Stage,而我们知道Spark的Stage是根据宽依赖来拆分的。所以我们也可以认为Flink的Task也是根据宽依赖拆分的(尽管Flink中并没有宽依赖的概念),这样会更好理解
如下图:
Operator Chain(算子链)
在Flink中,为了分布式执行,Flink会将算子子任务链接在一起形成任务。每个任务由一个线程执行。将算子链接在一起形成任务是一种有用的优化:它减少了线程间切换和缓冲的开销,并增加了整体吞吐量,同时降低了延迟
举个例子,假设我们有一个简单的Flink流处理程序,它从一个源读取数据,然后应用
map
和
filter
操作,最后将结果写入到一个接收器。这个程序可能看起来像这样:
DataStream<String> data = env.addSource(new CustomSource());
data.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
})
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("A");
}
})
.addSink(new CustomSink());
**在这个例子中,
map
和
filter
操作可以被链接在一起形成一个任务,被优化为算子链,这意味着它们将在同一个线程中执行,而不是在不同的线程中执行并通过网络进行数据传输**
Task Slots
Task Slots即是任务槽,slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 slot 来并行执行程序,我们可以通过集群的配置文件来设定 TaskManager 的 slot 数量:
taskmanager.numberOfTaskSlots : 8
。
例如,如果 Task Manager 有2个 slot,那么它将为每个 slot 分配 50% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。
需要注意的是,slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。这也是开发环境默认并行度设为机器 CPU 数量的原因
分发规则
- 不同的Task下的subtask要分发到同一个TaskSlot中,降低数据传输、提高执行效率
- 相同的Task下的subtask要分发到不同的TaskSlot
Slot共享组
如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,在Flink中,可以通过在代码中使用
slotSharingGroup
方法来设置slot共享组。
Flink会将具有相同slot共享组的操作放入同一个slot中,同时保持不具有slot共享组的操作在其他slot中。这可以用来隔离slot。
例如,你可以这样设置:
dataStream.map(...).slotSharingGroup("group1");
默认情况下,所有操作都被分配相同的SlotSharingGroup。
这样,只有属于同一个 slot 共享组的子任务,才会开启 slot 共享,不同组之间的任务是完全隔离的,必须分配到不同的 slot 上。
并行度和Slots解释
听了上面并行度和Slots的理论,可能还是有点疑惑,下面通过例子解释说明下:
假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示最多能并行执行9个任务。
假设我们写了一个WordCount程序,有四个转换算子:source —> flatMap —> reduce —> sink
当所有算子并行度相同时,很容易看出source和flatMap可以优化合并算子链,于是最终有三个任务节点:source & flatMap,reduce 和sink。 如果我们没有任何并行度设置,而配置文件中默认
parallelism.default:1
,那么默认并行度为1,总共有3个任务。由于不同算子的任务可以共享任务槽,所以最终占用的slot只有1个。9个slot只用了1个,有8个空闲
如图所示:
我们可以直接把并行度设置为 9,这样所有 3*9=27 个任务就会完全占用 9 个 slot。这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用。
另外再考虑对于某个算子单独设置并行度的场景。例如,如果我们考虑到输出可能是写入文件,那会希望不要并行写入多个文件,就需要设置 sink 算子的并行度为 1。这时其他的算子并行度依然为 9,所以总共会有 19 个子任务。
根据 slot 共享的原则,它们最终还是会占用全部的 9 个 slot,而 sink 任务只在其中一个 slot 上执行,通过这个例子也可以明确地看到,整个流处理程序的并行度,就应该是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量
DataSource数据源
Flink内嵌支持的数据源非常多,比如HDFS、Socket、Kafka、Collections。Flink也提供了addSource方式,可以自定义数据源,下面介绍一些常用的数据源。
File Source
- 通过读取本地、HDFS文件创建一个数据源。
如果读取的是HDFS上的文件,那么需要导入Hadoop依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
</dependency>
代码示例:每隔10s去读取HDFS指定目录下的新增文件内容,并且进行WordCount。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String filePath = "hdfs://node01:9000/flink/data/";
FileInputFormat<String> textInputFormat = new TextInputFormat(new Path(filePath));
//PROCESS_CONTINUOUSLY模式时,Flink会持续监视给定的路径,并在发现新数据时将其引入流中进行处理。
DataStream<String> textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000);
DataStream<Tuple2<String, Integer>> result = textStream
.flatMap(new WordSplitter())
.map(new WordMapper())
.keyBy(0)
.sum(1);
result.print();
env.execute();
}
public static class WordSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
public static class WordMapper implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> wordCountTuple) {
//f0, f1 等是用来访问元组中的元素的字段。Tuple2<String, Integer> 表示这是一个大小为 2 的元组,其中 f0 是 String 类型,f1 是 Integer 类型。
// 在代码中,wordCountTuple.f0 表示的就是单词(即String类型的值),wordCountTuple.f1 则表示的是这个单词的计数(即 Integer 类型的值)。
return new Tuple2<>(wordCountTuple.f0, wordCountTuple.f1);
}
}
Collection Source
基于本地集合的数据源,一般用于测试场景,对于线上环境没有太大意义。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> data = Arrays.asList("hello word flink","hello java flink");
DataStream<String> text = env.fromCollection(data);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
counts.print();
env.execute("WordCount Example");
}
Socket Source
接受Socket Server中的数据:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 连接 socket 获取输入数据
DataStream<String> text = env.socketTextStream("localhost", 9999);
// 解析数据、对数据进行分组、窗口处理和聚合计算
DataStream<Tuple2<String, Integer>> wordCount = text.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
wordCount.print();
env.execute("WordCount from Socket TextStream Example");
}
Kafka Source
Flink想要接受Kafka中的数据,首先要配置flink与kafka的连接器依赖。
Maven依赖:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.6</version>
</dependency>
示例代码:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Kafka的相关参数并从Kafka中读取数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(flinkKafkaConsumer);
// 对接收的每一行数据进行处理,分割出每个单词并初始化其数量为1
DataStream<Tuple2<String, Integer>> words = stream.flatMap(new Tokenizer());
DataStream<Tuple2<String, Integer>> wordCounts = words.keyBy(0).sum(1);
wordCounts.print().setParallelism(1);
env.execute("WordCountFromKafka");
}
版权归原作者 之乎者也· 所有, 如有侵权,请联系我们删除。