0


Flink 流处理流程 API详解

流处理API的衍变

Storm

TopologyBuilder

构建图的工具,然后往图中添加节点,指定节点与节点之间的有向边是什么。构建完成后就可以将这个图提交到远程的集群或者本地的集群运行。

Flink

:不同之处是面向数据本身的,会把

DataStream

抽象成一个本地集合,通过面向集合流的编程方式进行代码编写。两者没有好坏之分,

Storm

比较灵活自由。更好的控制。在工业界Flink会更好点。开发起来比较简单、高效。经过一些列优化、转化最终也会像

Storm

一样回到底层的抽象。

Strom API

是面向操作的,偏向底层。

Flink

面向数据,相对高层次一些。
在这里插入图片描述

流处理的简单流程

其他分布式处理引擎一样,Flink应用程序也遵循着一定的编程模式。不管是使用

DataStream API

还是

DataSet API

基本具有相同的程序结构,如下代码清单所示。通过流式计算的方式实现对文本文件中的单词数量进行统计,然后将结果输出在给定路径中。

publicclassFlinkWordCount{publicstaticvoidmain(String[] args)throwsException{//  1、获取运行环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//  2、通过socket获取源数据DataStreamSource<String> sourceData = env.socketTextStream("192.168.52.12",9000);/**
         *  3、数据源进行处理
         *  flatMap方法与spark一样,对数据进行扁平化处理
         *  将每行的单词处理为<word,1>
         */DataStream<Tuple2<String,Integer>> dataStream = sourceData.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){publicvoidflatMap(String s,Collector<Tuple2<String,Integer>> collector)throwsException{String[] words = s.split(" ");for(String word : words){
                    collector.collect(newTuple2<String,Integer>(word,1));}}})// 相同的单词进行分组.keyBy(0)//  聚合数据.sum(1);//  4、将数据流打印到控制台
        dataStream.print();/**
         *  5、Flink与Spark相似,通过action进行出发任务执行,其他的步骤均为lazy模式
         *  这里env.execute就是一个action操作,触发任务执行
         */
        env.execute("streaming word count");}}

整个

Flink

程序一共分为5步,分别为设定

Flink

执行环境、创建和加载数据集、对数据集指定转换操作逻辑、指定计算结果输出位置、调用

execute

方法触发程序执行。对于所有的

Flink

应用程序基本都含有这

5

个步骤,下面将详细介绍每个步骤。
在这里插入图片描述

操作概览

如果给你一串数据你会怎么去处理它?
在这里插入图片描述
【1】基于单条记录进行

Filter

Map

【2】基于窗口

window

进行计算,例如小时数,看到的就不一定是单数。
【3】有时会可能会合并多条流

union

(多个数据流合并成一个大的流)、

Join

(多条流按照一定的条件进行合并)、

connect

(针对多种不同类型的流进行合并)。
【4】有时候需要将一条流拆分成多个流,例如

split

,然后针对特殊的流进行特殊操作。

DataStream 基本转换

在这里插入图片描述
【1】对

DataStream

进行一对一转换,输入是

SataStream

输出也是

DataStream

。比较有代表性的,例如

map


【2】将一条

DataStream

拆分成多条,例如使用

split

,并给划分后的每一个结果都打上一个标签;
【3】通过调用

SplitStream

对象的

select

方法,根据标签抽取一个感兴趣的流,它也是一个

DataStream

对象。
【4】把两条流通过

connect

合并成一个

ConnectedSteam

,对

ConnectedSteam

流的操作可能与

DataStream

流的操作有不太一样的地方。

ConnectedSteam

中不同类型的流在处理的时候对应不同的 process 方法,他们都位于同一个 function中,会存在一些共享的数据信息。我们在后期做一些底层的join操作的时候都会用到这个

ConnectedSteam


【5】对

ConnectedSteam

也可以做类似于

Map

的一些操作,它的操作名叫

coMap

,但是在

API

中写法是

Map


【6】我们可以对流按照时间或者个数进行一些切分,可以理解为将无线的流分成一个一个的单位流,怎么切分根据用户自定的逻辑决定的。例如调用

windowAll

生成一个

AllWindowedStream


【7】我们对

AllWindowedStream

去应用自己的一些业务逻辑

apply

,最终形成原始的

DataStream


【8】对

DataStream

进行

keyBy

进行分组操作形成

KeyedStream


【9】我们不能对普通的

DataStream

reduce

操作,只能对

KeyedStream

进行

reduce

。主要出于计算量的考虑。
【10】我们也可以对

KeyedStream

进行

window

操作形成

WindowedStream


【11】我们对

WindowedStream

进行

apply

操作,形成原始的

DataStream

操作。

Environment 执行环境

【1】

getExecutionEnvironment

:创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,与就是说,执行环境决定了程序执行在什么环境

getExecutionEnvironment

会根据查询运行的方式返回不同的运行环境,是最常用的一种创建执行环境的方式。批量处理作业和流式处理作业分别使用的是不同的

ExecutionEnvironment

。例如

StreamExecutionEnvironment

是用来做流式数据处理环境,

ExecutionEnvironment

是批量数据处理环境。

//流处理StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//块梳理ExecutionEnvironment executionEnvironment =ExecutionEnvironment.getExecutionEnvironment();

如果没有设置并行度,会以

flink-conf.yaml

中的配置为准,默认为

1

parallelism.default:1
//可以设置并行度(优先级最高)
env.setParallelism(1);

如果是本地执行环境底层调用的是

createLocalEnvironment

:需要在调用时指定默认的并行度

val env =StreamExecutionEnvironment.createLocalEnvironment(1)

如果是集群执行环境

createRemoteEnvironment

:将

Jar

提交到远程服务器,需要在调用时指定

JobManager

IP

和端口号,并指定要在集群中运行的Jar包。

flink

将这两种都进行了包装,方便我们使用。

var env =ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname",6123,"YOURPATH//wordcount.jar")

Source 初始化数据

创建完成

ExecutionEnvironment

后,需要将数据引入到

Flink

系统中。

ExecutionEnvironment

提供不同的数据接入接口完成数据的初始化,将外部数据转换成

DataStream

DataSet

数据集。如以下代码所示,通过调用

readTextFile()

方法读取

file:///pathfile

路径中的数据并转换成

DataStream

数据集。我们可以吧

streamSource

看做一个集合进行处理。

//readTextFile读取文本文件的连接器 streamSource 可以想象成一个集合DataStreamSource<String> streamSource = env.readTextFile("file:///path/file");//从集合中读取数据 scala
val stream1:DataStream[类]= env.fromCollection(list(类,类))//socket文本流      使用的比较少
val stream3 = env.socketTextStream("localhost",777);//直接传数据,测试用,可以传入任何数据类型,最终会转化为 TypeInformation
val stream5 = env.fromElements(1,4,"333");/**重要,常见的是从 kafka 中读取,需要引入插件。
 *<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
 *<dependency>
 *    <groupId>org.apache.flink</groupId>
 *    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
 *    <version>1.10.0</version>
 *</dependency>
 */// kafkaConsumer 需要的配置参数
val props =newProperties()// 定义kakfa 服务的地址,不需要将所有broker指定上
props.put("bootstrap.servers","hadoop1:9092")// 制定consumer group
props.put("group.id","test")// 是否自动确认offset
props.put("enable.auto.commit","true")// 自动确认offset的时间间隔
props.put("auto.commit.interval.ms","1000")// key的序列化类
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")// value的序列化类
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")//从kafka读取数据,需要实现 SourceFunction 他给我们提供了一个
env.addSource(newFlinkKafkaConsumer011[String]("sensor",newSimpleStringSchema(),props));
Flink

输出至

Reids

Flink

输出至

ES

Flink

输入输出至

Kafka

通过读取文件并转换为

DataStream[String]

数据集,这样就完成了从本地文件到分布式数据集的转换,同时在

Flink

中提供了多种从外部读取数据的连接器,包括批量和实时的数据连接器,能够将

Flink

系统和其他第三方系统连接,直接获取外部数据。批处理读取文件的时候,是读取完之后进行输出的。流处理是读一个处理一个。

Topic

测试:启动

zk

kafka

并创建

Topic="sensor"
[root@hadoop3 kafka_2.11-2.2.2]# ./bin/kafka-console-producer.sh --broker-list hadoop2:9092--topic sensor
>333
Idea

项目启动后,就会接收到传送过来的信息:
在这里插入图片描述
用户自定义一个数据来源类针对特殊的数据源,或者制造测试数据。这里重要针对测试数据。

packagecom.zzx.flinkimportjava.util.Propertiesimportorg.apache.flink.api.common.serialization.SimpleStringSchemaimportorg.apache.flink.api.java.utils.ParameterToolimportorg.apache.flink.streaming.api.functions.source.SourceFunctionimportorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011importscala.util.Random

object StreamWordCount{
  def main(args:Array[String]):Unit={// 创建一个流处理执行环境
    val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
    // 添加用户定义的数据源
    val stream5 = env.addSource(newMySensorSource())
    stream5.print();//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
    env.execute("stream word count word")}//实现一个自定义的 SourceFunction,自动生成测试数据classMySensorSource()extendsSourceFunction[SensorReading]{//定义一个 flag,表示数据源是否正常运行var running:Boolean=true;//运行,不停的通过 ctx 发出需要流式处理的数据,现在我们直接在内部生成
    override def run(sourceContext:SourceFunction.SourceContext[SensorReading]):Unit={//定义一个随机数发生器
      val rand =newRandom()//随机生成10个传感器的问题值,并且不断在之前温度基础上更新(随机上下波动)//首先生成10个传感器的初始温度var curTemps =1.to(10).map(
        i =>("sensor_"+i,60+ rand.nextGaussian()*10))//无线循环,生成随机数据流while(running){//在当前文段基础上,随机生成微小波动
        curTemps = curTemps.map(
          data =>(data._1,data._2+rand.nextGaussian()))//获取当前系统时间
        val curTs =System.currentTimeMillis()//包装成样例,用 ctx发出数据
        curTemps.foreach(
          data => sourceContext.collect(SensorReading(data._1,curTs,data._2)))//定义间隔时间Thread.sleep(1000);}}//停止
    override def cancel():Unit= running =false}caseclassSensorReading(id:String, timestamp:Long, temperature:Double)}

输出结果展示:
在这里插入图片描述

Transform 执行转换操作

Transform

可以理解为从

source

开始到

sink

输出之间的所有操作都是

Transform

。数据从外部系统读取并转换成

DataStream

或者

DataSet

数据集后,下一步就将对数据集进行各种转换操作。

Flink

中的

Transformation

操作都是通过不同的

Operator

来实现,每个

Operator

内部通过实现

Function

接口完成数据处理逻辑的定义。在

DataStream API

DataSet API

提供了大量的转换算子,例如

map

(一个输入一个输出转换)、

flatMap

(将数据打散,一个输入多个输出)、

filter

(添加过滤条件)、

keyBy

等,用户只需要定义每种算子执行的函数逻辑,然后应用在数据转换操作

Dperator

接口中即可。如下代码实现了对输入的文本数据集通过

FlatMap

算子转换成数组,然后过滤非空字段,将每个单词进行统计,得到最后的词频统计结果。

DataStream<Tuple2<String,Integer>> dataStream = sourceData.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){publicvoidflatMap(String s,Collector<Tuple2<String,Integer>> collector)throwsException{String[] words = s.split(" ");for(String word : words){
            collector.collect(newTuple2<String,Integer>(word,1));}}// keyBy 相同的单词进行分组,sum聚合数据}).keyBy(0).sum(1);

在上述代码中,通过 Java接口处理数据,极大地简化数据处理逻辑的定义,只需要通过传入相应

Lambada

计算表达式,就能完成

Function

定义。特殊情况下用户也可以通过实现

Function

接口来完成定义数据处理逻辑。然后将定义好的

Function

应用在对应的算子中即可。

Flink

中定义

Funciton

的计算逻辑可以通过如下几种方式完成定义。
【1】通过创建

Class

实现

Funciton

接口

Flink

中提供了大量的函数供用户使用,例如以下代码通过定义

MyMapFunction Class

实现

MapFunction

接口,然后调用

DataStream

map()

方法将

MyMapFunction

实现类传入,完成对实现将数据集中字符串记录转换成大写的数据处理。

publicclassFlinkWordCount{publicstaticvoidmain(String[] args)throwsException{DataStreamSource<String> sourceData = env.socketTextStream("192.168.52.12",9000);//......//数据源进行处理
        sourceData.map(newMyMapFunciton());//......}}classMyMapFuncitonimplementsMapFunction<String,String>{@OverridepublicStringmap(String s)throwsException{return s.toUpperCase();}}

【2】通过创建匿名类实现

Funciton

接口
除了以上单独定义

Class

来实现

Function

接口之处,也可以直接在

map()

方法中创建匿名实现类的方式定义函数计算逻辑。

DataStreamSource<String> sourceData = env.socketTextStream("192.168.52.12",9000);//通过创建 MapFunction 匿名函数来定义 Map 函数计算逻辑
sourceData.map(newMapFunction<String,String>(){@OverridepublicStringmap(String s)throwsException{//实现字符串大写转换return s.toUpperCase();}});

【3】通过实现

RichFunciton

接口
前面提到的转换操作都实现了

Function

接口,例如

MapFunction

FlatMapFunction

接口,在

Flink

中同时提供了

RichFunction

接口,主要用于比较高级的数据处理场景,

RichFunction

接口中有

open

close

getRuntimeContext

setRuntimeContext

等方法来获取状态,缓存等系统内部数据。和

MapFunction

相似,

RichFunction

子类中也有

RichMapFunction

,如下代码通过实现

RichMapFunction

定义数据处理逻辑。

sourceData.map(newRichFunction(){@Overridepublicvoidopen(Configuration configuration)throwsException{}@Overridepublicvoidclose()throwsException{}@OverridepublicRuntimeContextgetRuntimeContext(){returnnull;}@OverridepublicIterationRuntimeContextgetIterationRuntimeContext(){returnnull;}@OverridepublicvoidsetRuntimeContext(RuntimeContext runtimeContext){}});

分区

Key

指定:在 DataStream数据经过不同的算子转换过程中,某些算子需要根据指定的key进行转换,常见的有

join

coGroup

groupBy

类算子,需要先将

DataStream

DataSet

数据集转换成对应的

KeyedStream

GroupedDataSet

,主要目的是将相同

key

值的数据路由到相同的

Pipeline

中,然后进行下一步的计算操作。需要注意的是,在

Flink

中这种操作并不是真正意义上将数据集转换成

Key-Value

结构,而是一种虚拟的

key

,目的仅仅是帮助后面的基于

Key

的算子使用,分区人

Key

可以通过两种方式指定:
【1】根据字段位置指定

DataStream API

中通过

 keyBy()

方法将

DataStream

数据集根据指定的

key

转换成重新分区的

KeyedStream

,如以下代码所示,对数据集按照相同

key

进行

sum()

聚合操作。

// 根据第一个字段进行重分区,相同的单词进行分组。第二个字段进行求和运算
dataStream.keyBy(0).sum(1);

DataSet API

中,如果对数据根据某一条件聚合数据,对数据进行聚合时候,也需要对数据进行重新分区。如以下代码所示,使用

DataSet API

对数据集根据第一个字段作为

GroupBy

key

,然后对第二个字段进行求和运算。

// 根据第一个字段进行重分区,相同的单词进行分组。max 求相同key下的最大值
dataStream.groupBy(0).max(1);

【2】根据字段名称指定

KeyBy

GroupBy

Key

除了能够通过字段位置来指定之外,也可以根据字段的名称来指定。使用字段名称需要

DataStream

中的数据结构类型必须是

Tuple

类或者

POJOs

类的。如以下代码所示,通过指定

name

字段名称来确定

groupby

key

字段。

DataStreamSource<Persion> sourceData = env.fromElements(newPersion("zzx",18));//使用 name 属性来确定 keyBy
sourceData.keyBy("name").sum("age");

如果程序中使用

Tuple

数据类型,通常情况下字段名称从1开始计算,字段位置索引从0开始计算,以下代码中两种方式是等价的。

//通过位置指定第一个字段
dataStream.keyBy(0).sum(1);//通过名称指定第一个字段名称
dataStream.keyBy("_1").sum("_2");

【3】通过

Key

选择器指定
另外一种方式是通过定义

Key Selector

来选择数据集中的

Key

,如下代码所示,定义

KeySelector

,然后复写

getKey

方法,从

Person

对象中获取

name

为指定的

Key

DataStreamSource<Persion> persionData = env.fromElements(newPersion("zzx",18));
persionData.keyBy("name").sum("age");
persionData.keyBy(newKeySelector<Persion,Object>(){@OverridepublicObjectgetKey(Persion persion)throwsException{return persion.getName();}});

理解 KeyedStream

基于

key

HashCode

重分区,同一个

key

只能在同一个分区内处理,一个分区内可以有不同的

key

DataStream -> KeyedStream

:逻辑地将一个

key

在这里插入图片描述

object StreamWordCount{
  def main(args:Array[String]):Unit={// 创建一个流处理执行环境
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile:DataStream[String]= env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换
    val dataStream:DataStream[SensorReading]= inputStreamFromFile
        .map( data =>{var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)}).keyBy(newMyIdSeletector())//  .sum("temperature")//reduce:传入一个函数,函数类型都一样,每一次都是在之前的基础上结合当前新输入的数据得到一个进一步聚合的结果//需求:输出最大的timestamp 最小的温度值 类型不能变.reduce(newMyReduce)/* .reduce((curRes,newData) =>
          SensorReading(curRes.id,curRes.timestamp.max(newData.timestamp),curRes.temperature.min(newData.temperature)))*///aggregate:都是private类型,所有的滚动算子都会调到 aggregate。//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
    env.execute("stream word count word")}}caseclassSensorReading(id:String, timestamp:Long, temperature:Double)//自定义函数类,key选择器 输入类型SensorReading  返回 StringclassMyIdSeletector()extendsKeySelector[SensorReading,String]{
  override def getKey(in:SensorReading):String= in.id
}//自定义 ReduceclassMyReduceextendsReduceFunction[SensorReading]{
  override def reduce(t:SensorReading, t1:SensorReading):SensorReading={SensorReading(t.id,t.timestamp.max(t1.timestamp),t.temperature.min(t1.temperature))}}

结果展示:分布式处理,可能得到的最后一条时间戳不是最大的。
在这里插入图片描述
假设有一条数据流,可以利用窗口的操作,进行一些竖向的切分,得到就是一个个大的

AllWindowedStream

,再根据

keyBy()

进行横向切分,把数据流中不同类别任务输入到不同的算子中进行处理,不同的算子之间是并行的操作。同时不同的节点只需要维护自己的状态。前提是

key

数 >> 并发度
在这里插入图片描述

Split 分流操作

DataStream->SplitStream

根据某些特征把一个

DataStream

拆分成两个或者多个

DataStream

。但它并不是一个完整的分流操作,只是从逻辑上按照某种特征进行分词了。
在这里插入图片描述

Select

SplitStream->DataStream

:从一个

SplitStream

中获取一个或者多个

DataStream

在这里插入图片描述
案例:按照温度大于

30

和小于

30

进行分类

object StreamWordCount{
  def main(args:Array[String]):Unit={// 创建一个流处理执行环境
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile:DataStream[String]= env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换
    val dataStream:DataStream[SensorReading]= inputStreamFromFile
        .map( data =>{var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)}).keyBy(newMyIdSeletector()).sum("temperature")//分流
      val splitStream = dataStream.split(
        data =>{if(data.temperature >30)Seq("high")elseSeq("low")})
    val highTempStream:DataStream[SensorReading]= splitStream.select("high")
    val lowTempStream:DataStream[SensorReading]= splitStream.select("low")
    val allTempStream:DataStream[SensorReading]= splitStream.select("low","high")
    highTempStream.print("highTempStream")
    lowTempStream.print("lowTempStream")
    allTempStream.print("allTempStream")//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
  env.execute("stream word count word")}

输出结果展示:
在这里插入图片描述

Connect 合流操作

DataStream->ConnectedStreams

:连接两个保持他们类型的数据流,两个数据流被

Connect

之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
在这里插入图片描述

CoMap CoFlatMap

ConnectedStreams->DataStream

:作用于

ConnectedStream

上,功能与

map

flatMap

一样,对

ConnectedStreams

中的每一个

Stream

分别进行

map

flatMap

处理。
在这里插入图片描述

object StreamWordCount{
  def main(args:Array[String]):Unit={// 创建一个流处理执行环境
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile:DataStream[String]= env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换
    val dataStream:DataStream[SensorReading]= inputStreamFromFile
        .map( data =>{var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)}).keyBy(newMyIdSeletector())//  .sum("temperature")//reduce:传入一个函数,函数类型都一样,每一次都是在之前的基础上结合当前新输入的数据得到一个进一步聚合的结果//需求:输出最大的timestamp 最小的温度值 类型不能变.reduce(newMyReduce)//分流
      val splitStream = dataStream.split(
        data =>{if(data.temperature >60)Seq("high")elseSeq("low")})
    val lowTempStream:DataStream[SensorReading]= splitStream.select("low")//合流
    val warningStream:DataStream[(String,Double)]= highTempStream.map(
      data =>(data.id,data.temperature))

    val connectedStreams:ConnectedStreams[(String,Double),SensorReading]= warningStream.connect(lowTempStream)
    val reslutStream:DataStream[Object]= connectedStreams.map(
      warningData =>(warningData._1,warningData._2,"high temp waring"),
      lowTempSata =>(lowTempSata.id,"normal"))
    reslutStream.print("result");//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
    env.execute("stream word count word")}}

输出结果:
在这里插入图片描述

Union

DataStream->DataStream

:对两个或者两个以上的

DataStream

进行

union

操作,产生一个包含所有

DataStream

元素的新

DataStream

。不能把类型不匹配的流合并在一起,可以

Union

两个或两个之上的流。
在这里插入图片描述

val highTempStream:DataStream[SensorReading]= splitStream.select("high")
val lowTempStream:DataStream[SensorReading]= splitStream.select("low")
val allTempStream:DataStream[SensorReading]= splitStream.select("low","high")//union
val unionStream:DataStream[SensorReading]= highTempStream.union(lowTempStream).union(allTempStream)

sink 输出结果

数据集经过转换操作之后,形成最终的结果数据集,一般需要将数据集输出在外部系统中或者输出在控制台之上。在

Flink DataStream

DataSet

接口中定义了基本的数据输出方法,例如基于文件输出

writeAsText()

,基于控制台输出

print()

等。同时

Flink

在系统中定义了大量的

Connector

,方便用户和外部系统交互,用户可以直接通过调用

addSink()

添加输出系统定义的

DataSink

类算子,这样就能将数据输出到外部系统。以下实例调用

DataStream API

中的

writeAsText()

print()

方法将数据集输出在文件和客户端中。

//将数据流打印到控制台
dataStream.print();//将数据输出到文件中
dataStream.writeAsText("file://path/to/savenfile");//将数据输出到socket
reslutStream.writeToSocket(hostname : _root_.scala.Predef.String, port :java.lang.Integer, schema :org.apache.flink.api.common.serialization.SerializationSchema[T]):org.apache.flink.streaming.api.datastream.DataStreamSink[T]={/* compiled code */})//批处理才能使用,即将被弃用
reslutStream.writeAsCsv(path : _root_.scala.Predef.String, writeMode :org.apache.flink.core.fs.FileSystem.WriteMode)//需要传入自定义的 Sink  写入文件
inputStreamFromFile.addSink(StreamingFileSink.forRowFormat(newPath("D:\\de"),newSimpleStringEncoder[String]("UTF-8")).build())//写出 kafka
dataStream.addSink(newFlinkKafkaProducer011[String]("localhost:9092","sinkTest",newSimpleStringSchema()))

execute 程序触发

所有的计算逻辑全部操作定义好之后,需要调用

ExecutionEnvironment

execute()

方法来触发应用程序的执行,因为

flink

在执行前会先构建执行图,再执行。其中

execute()

方法返回的结果类型为

JobExecutionResult

,里面包含了程序执行的时间和累加器等指标。需要注意的是,

execute

方法调用会因为应用的类型有所不同,

DataStream

流式应用需要显性地指定

execute()

方法运行程序,如果不调用则

Flink

流式程序不会执行,但对于

DataSet API

输出算子中已经包含对

execute()

方法的调用,则不需要显性调用

execute()

方法,否则会出现程序异常。

//调用 StreamExecutionEnvironment 的 execute 方法执行流式应用程序
env.execute("App Name");

物理分组

在这里插入图片描述
如上,有两个

DataSource

实例

A1

A2

。不同颜色代表不同的实例,

Flink

为我们提供了比较完整的物理分组方案:

global()

作用就是无论你下游有多少个实例(

B

),上游的数据(

A

)都会发往下游的第一个实例(

B1

);

broadcast() 

广播,对上游的数据(

A

)复制很多份发往下游的所有实例(

B

),数据指数级的增长,数据量大时要注意;

forward()

当上下游并发度一致的时候一对一发送,否则会报错;

shuffle()

随机均匀分配;

rebalance()

轮询;

recale()

本地轮流分配,例如上图

A1

只能看到两个实例

B1

B2

partitionCustom()

自定义单播;

类型系统

Flink

它里面的抽象都是强类型的,与它自身的序列化和反序列化机制有关。这个引擎对类型信息知道的越多,就可以对数据进行更充足的优化,序列化与反序列化就会越快。每一个

DataStream

里面都需要有一个明确的类型和

TypeInformation

Flink

内置了如下类型,都提供了对应的

TypeInfomation


在这里插入图片描述

API 原理

在这里插入图片描述
一个

DataStream

是如何转化成另一个

DataStream

的,其实我们调用

map

方法的时候,

Flink

会给我们创建一个

OneInputTransformation

,需要一个

StreamOperator

参数

Flink

内部会有预先定义好的

StreamMap

转换的算子。

Operator

内部我们需要自定义一个

MapFunction

,一般

Function

才是我们写代码需要关注的点。如果需要更深一点就会写一些

Operator

标签: flink c# 大数据

本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/135006162
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“Flink 流处理流程 API详解”的评论:

还没有评论