0


flink执行环境和读取kafka以及自定义数据源操作

创建执行环境

    编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我 们 要 获 取 的 执 行 环 境 , 是 

StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的

方式,就是调用这个类的静态方法,具体有以下三种。

**1. getExecutionEnvironment **

    最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文 

直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar

包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方

法会根据当前运行的方式,自行决定该返回什么样的运行环境。

StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。

**2. createLocalEnvironment **

    这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果 

不传入,则默认并行度就是本地的 CPU 核心数。

StreamExecutionEnvironment localEnv =

StreamExecutionEnvironment.createLocalEnvironment();

**3. createRemoteEnvironment **

    这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定 

要在集群中运行的 Jar 包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment

.createRemoteEnvironment(

"host", // JobManager 主机名

1234, // JobManager 进程端口号

"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包

);

    在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程 

序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。关于时间语义和容错

机制,我们会在后续的章节介绍。

执行模式(Execution Mode)

    上节中我们获取到的执行环境,是一个 StreamExecutionEnvironment,顾名思义它应该是 

做流处理的。那对于批处理,又应该怎么获取执行环境呢?

    在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment 

的静态方法,返回它的对象:

// 批处理环境

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

// 流处理环境

StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

    基于 ExecutionEnvironment 读入数据创建的数据集合,就是 DataSet;对应的调用的一整 

套转换方法,就是 DataSet API。这些我们在第二章的批处理 word count 程序中已经有了基本

了解。

    而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特 

性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序

在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了。

**流执行模式(STREAMING) **

    这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式。

** 批执行模式(BATCH) **

    专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。 

对于不会持续计算的有界数据,我们用这种模式处理会更方便。

**自动模式(AUTOMATIC) **

    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。 

**1. BATCH ****模式的配置方法 **

    由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。 

主要有两种方式:

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

    在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。 

(2)通过代码配置

StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。 

    建议: 不要在代码中配置,而是使用命令行。这同设置并行度是类似的:在提交作业时指 

定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。而在

代码中硬编码(hard code)的方式可扩展性比较差,一般都不推荐。

**2. ****什么时候选择 ****BATCH ****模式 **

    我们知道,Flink 本身持有的就是流处理的世界观,即使是批量数据,也可以看作“有界 

流”来进行处理。所以 STREAMING 执行模式对于有界数据和无界数据都是有效的;而 BATCH

模式仅能用于有界数据。

    看起来 BATCH 模式似乎被 STREAMING 模式全覆盖了,那还有必要存在吗?我们能不 

能所有情况下都用流处理模式呢?

    当然是可以的,但是这样有时不够高效。 

    我们可以仔细回忆一下 word count 程序中,批处理和流处理输出的不同:在 STREAMING 

模式下,每来一条数据,就会输出一次结果(即使输入数据是有界的);而 BATCH 模式下,

只有数据全部处理完之后,才会一次性输出结果。最终的结果两者是一致的,但是流处理模式

会将更多的中间结果输出。在本来输入有界、只希望通过批处理得到最终的结果的场景下,

STREAMING 模式的逐个输出结果就没有必要了。

    所以总结起来,一个简单的原则就是:用 BATCH 模式处理批量数据,用 STREAMING 

模式处理流式数据。因为数据有界的时候,直接输出结果会更加高效;而当数据无界的时候, 我

们没得选择——只有 STREAMING 模式才能处理持续的数据流.

    当然,在后面的示例代码中,即使是有界的数据源,我们也会统一用 STREAMING 模式 

处理。这是因为我们的主要目标还是构建实时处理流数据的程序,有界数据源也只是我们用来

测试的手段。

触发程序执行

    有了执行环境,我们就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种 

转换操作,最后输出结果到外部系统。

    需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当 main()方法被调用 

时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据

——因为数据可能还没来。Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,

这也被称为“延迟执行”或“懒执行”(lazy execution)。

    所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一 

直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute();

数据源操作

读取kafka数据源操作

    Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传 

输方式,恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对,是当前处理流式

数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分

析计算,这样的架构已经成为众多企业的首选,如图 5-3 所示。

    略微遗憾的是,与 Kafka 的连接比较复杂,Flink 内部并没有提供预实现的方法。所以我 

们只能采用通用的 addSource 方式、实现一个 SourceFunction 了。

    好在Kafka与Flink确实是非常契合,所以Flink官方提供了连接工具flink-connector-kafka, 

直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的

SourceFunction。

    所以想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官 

方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本

只支持 0.10.0 版本以上的 Kafka,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依

赖版本。这里我们需要导入的依赖如下。

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class SourceKafkaTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stream = env.addSource(new
FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
stream.print("Kafka");
 env.execute();
 }
}

创建 FlinkKafkaConsumer 时需要传入三个参数:

     第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic 

列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据

时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。

     第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消 

息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中

使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数

组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是

公共接口,所以我们也可以自定义反序列化逻辑。

     第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。

自定义Source

    大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况, 

我们想要读取的数据源来自某个外部系统,而 flink 既没有预实现的方法、也没有提供连接器,

又该怎么办呢?

    那就只好自定义实现 SourceFunction 了。 

    接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法: 

run()和 cancel()。

     run()方法:使用运行时上下文对象(SourceContext)向下游发送数据; 

⚫ cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

代码如下:

我们先来自定义一下数据源:

package com.atmk.stream.app;

import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author:lss
 * @date:2022/11/3 17:18
 * @description:some
 */
public class ClickSource implements SourceFunction<Event> {
    //声明一个变量,作为控制数据生成的标识位
    private Boolean running = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
       //在指定数据集中随机选取数据
        Random random = new Random();
        String[] users = {"Mary","Bob","Alice","Cary"};
        String[] urls = {"./home","./cart","./fav","./prod?id=1"};
        while (running){
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            //隔一秒生成一个点击事件,方面观测
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
    这个数据源,我们后面会频繁使用,所以在后面的代码中涉及到 ClickSource()数据源,使 

用上面的代码就可以了。

    下面的代码我们来读取一下自定义的数据源。有了自定义的 source function,接下来只要 

调用 addSource()就可以了:

env.addSource(new ClickSource())

下面是完整的代码:

package com.atmk.stream.app;

import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author:lss
 * @date:2022/11/3 17:26
 * @description:some
 */
public class SourceCustom {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用自定义的source function,调用addSource方法
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        stream.print("SourceCustom");
        env.execute();
    }
}
    这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1,如果数据源设 

置为大于 1 的并行度,则会抛出异常。如下程序所示:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class SourceThrowException {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.addSource(new ClickSource()).setParallelism(2).print();
 env.execute();
 }
}

输出的异常如下:

Exception in thread "main" java.lang.IllegalArgumentException: The parallelism

of non parallel operator must be 1.

所以如果我们想要自定义并行的数据源的话,需要使用 ParallelSourceFunction,示例程序

如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
public class ParallelSourceExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.addSource(new CustomSource()).setParallelism(2).print();
 env.execute();
 }
 public static class CustomSource implements ParallelSourceFunction<Integer> 
{
 private boolean running = true;
 private Random random = new Random();
 @Override
 public void run(SourceContext<Integer> sourceContext) throws Exception {
 while (running) {
 sourceContext.collect(random.nextInt());
 }
 }
 @Override
 public void cancel() {
 running = false;
 }
 }
}

输出结果如下:

2> -686169047

2> 429515397

2> -223516288

2> 1137907312

2> -380165730

2> 2082090389

标签: flink 大数据

本文转载自: https://blog.csdn.net/weixin_40659514/article/details/127671568
版权归原作者 weixin_lss 所有, 如有侵权,请联系我们删除。

“flink执行环境和读取kafka以及自定义数据源操作”的评论:

还没有评论