0


Flink Connector 开发

Flink Streaming Connector

Flink

是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。**

Connector

的作用就相当于一个连接器**,连接

Flink

计算引擎跟外界存储系统。

Flink

里有以下几种方式,当然也不限于这几种方式可以跟外界进行数据交换:
【1】

Flink

里面预定义了一些

source

sink


【2】

Flink

内部也提供了一些

Boundled connectors


【3】可以使用第三方

Apache Bahir

项目中提供的连接器;
【4】是通过异步

IO

方式;

预定义的 source 和 sink

Flink

里预定义了一部分

source

sink

。在这里分了几类。
[点击并拖拽以移动] ​

基于文件的 source 和 sink

如果要从文本文件中读取数据,可以直接使用:

env.readTextFile(path)

就可以以文本的形式读取该文件中的内容。当然也可以使用:根据指定的

fileInputFormat

格式读取文件中的内容。

env.readFile(fileInputFormat, path)

如果数据在

Flink

内进行了一系列的计算,想把结果写出到文件里,也可以直接使用内部预定义的一些

sink

,比如将结果已文本或

csv

格式写出到文件中,可以使用

DataStream

writeAsText(path)

DataSet

writeAsCsv(path)

基于 Socket 的 Source 和 Sink

提供

Socket

host name

port

,可以直接用

StreamExecutionEnvironment

预定的接口

socketTextStream

创建基于

Socket

source

,从该

socket

中以文本的形式读取数据。当然如果想把结果写出到另外一个

Socket

,也可以直接调用

DataStream writeToSocket

//从 socket 中读取数据流
env.socketTextStream("localhost",777);//输出至 socket 
resultDataStream.writeToSocket("hadoop1",6666,newSimpleStringSchema())

基于内存 Collections、Iterators 的 Source

可以直接基于内存中的集合或者迭代器,调用

StreamExecutionEnvironment fromCollection

fromElements

构建相应的

source

。结果数据也可以直接

print

printToError

的方式写出到标准输出或标准错误。详细也可以参考

Flink

源码中提供的一些相对应的

Examples

来查看异常预定义

source

sink

的使用方法,例如

WordCount

SocketWindowWordCount

//从Java.util.Collection集合中读取数据作为数据源ArrayList<String> list =newArrayList<>(5);
list.add("flink");
list.add("scala");
list.add("spark");
list.add("hadoop");
list.add("hive");
env.fromCollection(list).print();//从Java.util.Collection集合中读取数据作为数据源
 env.fromElements("flink","scala","spark","hadoop","hive").print();

Bundled Connectors

Flink

里已经提供了一些绑定的

Connector

,例如

kafka source

sink

Es sink

等。读写

kafka

es

rabbitMQ

时可以直接使用相应

connector

api

即可。

虽然该部分是

Flink

项目源代码里的一部分,但是真正意义上不算作

Flink

引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交

Job

时候需要注意,

job

代码

jar

包中一定要将相应的

connetor

相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
[点击并拖拽以移动] ​

Apache Bahir 中的连接器

Apache Bahir

最初是从

Apache Spark

中独立出来项目提供,以提供不限于

Spark

相关的扩展 / 插件、连接器和其他可插入组件的实现。通过提供多样化的流连接器

streaming connectors

SQL

数据源扩展分析平台的覆盖面。如有需要写到

flume

redis

的需求的话,可以使用该项目提供的

connector


[点击并拖拽以移动] ​

Async I/O

流计算中经常需要与外部存储系统交互,比如需要关联

MySQL

中的某个表。一般来说,如果用同步

I/O

的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步

I/O

可以并发处理多个请求,提高吞吐,减少延迟。

Async

的原理可参考官方文档
[点击并拖拽以移动] ​

标签: flink 大数据 java

本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/135397552
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“Flink Connector 开发”的评论:

还没有评论