Flink Streaming Connector
Flink
是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。**
Connector
的作用就相当于一个连接器**,连接
Flink
计算引擎跟外界存储系统。
Flink
里有以下几种方式,当然也不限于这几种方式可以跟外界进行数据交换:
【1】
Flink
里面预定义了一些
source
和
sink
;
【2】
Flink
内部也提供了一些
Boundled connectors
;
【3】可以使用第三方
Apache Bahir
项目中提供的连接器;
【4】是通过异步
IO
方式;
预定义的 source 和 sink
Flink
里预定义了一部分
source
和
sink
。在这里分了几类。
基于文件的 source 和 sink
如果要从文本文件中读取数据,可以直接使用:
env.readTextFile(path)
就可以以文本的形式读取该文件中的内容。当然也可以使用:根据指定的
fileInputFormat
格式读取文件中的内容。
env.readFile(fileInputFormat, path)
如果数据在
Flink
内进行了一系列的计算,想把结果写出到文件里,也可以直接使用内部预定义的一些
sink
,比如将结果已文本或
csv
格式写出到文件中,可以使用
DataStream
的
writeAsText(path)
和
DataSet
的
writeAsCsv(path)
。
基于 Socket 的 Source 和 Sink
提供
Socket
的
host name
及
port
,可以直接用
StreamExecutionEnvironment
预定的接口
socketTextStream
创建基于
Socket
的
source
,从该
socket
中以文本的形式读取数据。当然如果想把结果写出到另外一个
Socket
,也可以直接调用
DataStream writeToSocket
。
//从 socket 中读取数据流
env.socketTextStream("localhost",777);//输出至 socket
resultDataStream.writeToSocket("hadoop1",6666,newSimpleStringSchema())
基于内存 Collections、Iterators 的 Source
可以直接基于内存中的集合或者迭代器,调用
StreamExecutionEnvironment fromCollection
、
fromElements
构建相应的
source
。结果数据也可以直接
print
、
printToError
的方式写出到标准输出或标准错误。详细也可以参考
Flink
源码中提供的一些相对应的
Examples
来查看异常预定义
source
和
sink
的使用方法,例如
WordCount
、
SocketWindowWordCount
。
//从Java.util.Collection集合中读取数据作为数据源ArrayList<String> list =newArrayList<>(5);
list.add("flink");
list.add("scala");
list.add("spark");
list.add("hadoop");
list.add("hive");
env.fromCollection(list).print();//从Java.util.Collection集合中读取数据作为数据源
env.fromElements("flink","scala","spark","hadoop","hive").print();
Bundled Connectors
Flink
里已经提供了一些绑定的
Connector
,例如
kafka source
和
sink
,
Es sink
等。读写
kafka
、
es
、
rabbitMQ
时可以直接使用相应
connector
的
api
即可。
虽然该部分是
Flink
项目源代码里的一部分,但是真正意义上不算作
Flink
引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交
Job
时候需要注意,
job
代码
jar
包中一定要将相应的
connetor
相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
Apache Bahir 中的连接器
Apache Bahir
最初是从
Apache Spark
中独立出来项目提供,以提供不限于
Spark
相关的扩展 / 插件、连接器和其他可插入组件的实现。通过提供多样化的流连接器
streaming connectors
和
SQL
数据源扩展分析平台的覆盖面。如有需要写到
flume
、
redis
的需求的话,可以使用该项目提供的
connector
。
Async I/O
流计算中经常需要与外部存储系统交互,比如需要关联
MySQL
中的某个表。一般来说,如果用同步
I/O
的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步
I/O
可以并发处理多个请求,提高吞吐,减少延迟。
Async
的原理可参考官方文档
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。