创建执行环境
编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我 们 要 获 取 的 执 行 环 境 , 是
StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的
方式,就是调用这个类的静态方法,具体有以下三种。
**1. getExecutionEnvironment **
最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文
直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar
包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方
法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。
**2. createLocalEnvironment **
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果
不传入,则默认并行度就是本地的 CPU 核心数。
StreamExecutionEnvironment localEnv =
StreamExecutionEnvironment.createLocalEnvironment();
**3. createRemoteEnvironment **
这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定
要在集群中运行的 Jar 包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager 主机名
1234, // JobManager 进程端口号
"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);
在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程
序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。关于时间语义和容错
机制,我们会在后续的章节介绍。
执行模式(Execution Mode)
上节中我们获取到的执行环境,是一个 StreamExecutionEnvironment,顾名思义它应该是
做流处理的。那对于批处理,又应该怎么获取执行环境呢?
在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment
的静态方法,返回它的对象:
// 批处理环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
基于 ExecutionEnvironment 读入数据创建的数据集合,就是 DataSet;对应的调用的一整
套转换方法,就是 DataSet API。这些我们在第二章的批处理 word count 程序中已经有了基本
了解。
而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特
性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序
在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了。
**流执行模式(STREAMING) **
这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式。
** 批执行模式(BATCH) **
专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。
对于不会持续计算的有界数据,我们用这种模式处理会更方便。
**自动模式(AUTOMATIC) **
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
**1. BATCH ****模式的配置方法 **
由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。
主要有两种方式:
(1)通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。
(2)通过代码配置
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。
建议: 不要在代码中配置,而是使用命令行。这同设置并行度是类似的:在提交作业时指
定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。而在
代码中硬编码(hard code)的方式可扩展性比较差,一般都不推荐。
**2. ****什么时候选择 ****BATCH ****模式 **
我们知道,Flink 本身持有的就是流处理的世界观,即使是批量数据,也可以看作“有界
流”来进行处理。所以 STREAMING 执行模式对于有界数据和无界数据都是有效的;而 BATCH
模式仅能用于有界数据。
看起来 BATCH 模式似乎被 STREAMING 模式全覆盖了,那还有必要存在吗?我们能不
能所有情况下都用流处理模式呢?
当然是可以的,但是这样有时不够高效。
我们可以仔细回忆一下 word count 程序中,批处理和流处理输出的不同:在 STREAMING
模式下,每来一条数据,就会输出一次结果(即使输入数据是有界的);而 BATCH 模式下,
只有数据全部处理完之后,才会一次性输出结果。最终的结果两者是一致的,但是流处理模式
会将更多的中间结果输出。在本来输入有界、只希望通过批处理得到最终的结果的场景下,
STREAMING 模式的逐个输出结果就没有必要了。
所以总结起来,一个简单的原则就是:用 BATCH 模式处理批量数据,用 STREAMING
模式处理流式数据。因为数据有界的时候,直接输出结果会更加高效;而当数据无界的时候, 我
们没得选择——只有 STREAMING 模式才能处理持续的数据流.
当然,在后面的示例代码中,即使是有界的数据源,我们也会统一用 STREAMING 模式
处理。这是因为我们的主要目标还是构建实时处理流数据的程序,有界数据源也只是我们用来
测试的手段。
触发程序执行
有了执行环境,我们就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种
转换操作,最后输出结果到外部系统。
需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当 main()方法被调用
时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据
——因为数据可能还没来。Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,
这也被称为“延迟执行”或“懒执行”(lazy execution)。
所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一
直等待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute();
数据源操作
读取kafka数据源操作
Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传
输方式,恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对,是当前处理流式
数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分
析计算,这样的架构已经成为众多企业的首选,如图 5-3 所示。
略微遗憾的是,与 Kafka 的连接比较复杂,Flink 内部并没有提供预实现的方法。所以我
们只能采用通用的 addSource 方式、实现一个 SourceFunction 了。
好在Kafka与Flink确实是非常契合,所以Flink官方提供了连接工具flink-connector-kafka,
直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的
SourceFunction。
所以想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官
方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本
只支持 0.10.0 版本以上的 Kafka,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依
赖版本。这里我们需要导入的依赖如下。
<dependency><groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class SourceKafkaTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stream = env.addSource(new
FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
stream.print("Kafka");
env.execute();
}
}
创建 FlinkKafkaConsumer 时需要传入三个参数:
第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic
列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据
时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消
息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中
使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数
组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是
公共接口,所以我们也可以自定义反序列化逻辑。
第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。
自定义Source
大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,
我们想要读取的数据源来自某个外部系统,而 flink 既没有预实现的方法、也没有提供连接器,
又该怎么办呢?
那就只好自定义实现 SourceFunction 了。
接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:
run()和 cancel()。
run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
⚫ cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。
代码如下:
我们先来自定义一下数据源:
package com.atmk.stream.app;
import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
/**
* @author:lss
* @date:2022/11/3 17:18
* @description:some
*/
public class ClickSource implements SourceFunction<Event> {
//声明一个变量,作为控制数据生成的标识位
private Boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
//在指定数据集中随机选取数据
Random random = new Random();
String[] users = {"Mary","Bob","Alice","Cary"};
String[] urls = {"./home","./cart","./fav","./prod?id=1"};
while (running){
ctx.collect(new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()
));
//隔一秒生成一个点击事件,方面观测
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
这个数据源,我们后面会频繁使用,所以在后面的代码中涉及到 ClickSource()数据源,使
用上面的代码就可以了。
下面的代码我们来读取一下自定义的数据源。有了自定义的 source function,接下来只要
调用 addSource()就可以了:
env.addSource(new ClickSource())
下面是完整的代码:
package com.atmk.stream.app;
import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author:lss
* @date:2022/11/3 17:26
* @description:some
*/
public class SourceCustom {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//使用自定义的source function,调用addSource方法
DataStreamSource<Event> stream = env.addSource(new ClickSource());
stream.print("SourceCustom");
env.execute();
}
}
这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1,如果数据源设
置为大于 1 的并行度,则会抛出异常。如下程序所示:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class SourceThrowException {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new ClickSource()).setParallelism(2).print();
env.execute();
}
}
输出的异常如下:
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism
of non parallel operator must be 1.
所以如果我们想要自定义并行的数据源的话,需要使用 ParallelSourceFunction,示例程序
如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
public class ParallelSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CustomSource()).setParallelism(2).print();
env.execute();
}
public static class CustomSource implements ParallelSourceFunction<Integer>
{
private boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
while (running) {
sourceContext.collect(random.nextInt());
}
}
@Override
public void cancel() {
running = false;
}
}
}
输出结果如下:
2> -686169047
2> 429515397
2> -223516288
2> 1137907312
2> -380165730
2> 2082090389
版权归原作者 weixin_lss 所有, 如有侵权,请联系我们删除。