Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的。
与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的 addSink()方法实现的。
stream.addSink(new SinkFunction(…))
SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。
像 Kafka 之类流式系统,Flink 提供了完美对接,Source/Sink 两端都能连接,可读可写;而对于 Elasticsearch、文件系统(FileSystem)、JDBC 等数据存储系统,则只提供了输出写入的 Sink 连接器。
除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器。
除此以外,就需要用户自定义实现 Sink 连接器了。
输出到文件
Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。
StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件输出的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件。
StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:
⚫ 行编码:StreamingFileSink.forRowFormat t(basePath,rowEncoder)。
⚫ 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。
在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。
val stream = env.addSource(new ClickSource)
val fileSink = StreamingFileSink
.forRowFormat(new Path("./output"), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) //至少包含 15 分钟的数据
.withMaxPartSize(128 * 1024 * 1024)//文件大小已达到 128m
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // 最近 5 分钟没有收到新的数据
.build()
)
.build()
stream.map(_.toString).addSink(fileSink)
输出到kafka
Flink 官方为 Kafka 提供了 Source 和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。
val topic = "mytest"
val bootstrap = "192.168.0.30:9092"
stream.map(_.toString).addSink(new FlinkKafkaProducer[String](
bootstrap,
topic,
new SimpleStringSchema()
))
输出到redis
增加pom文件中依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
示例代码如下:
class MyRedisSink extends RedisMapper[(String, String)] {
override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.SET)
override def getKeyFromData(data: (String, String)): String = data._1
override def getValueFromData(data: (String, String)): String = data._2
}
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("192.168.0.30")
.setDatabase(2)
.build()
stream.addSink(new RedisSink[(String, String)](conf, new MyRedisSink))
这里 RedisSink 的构造方法需要传入两个参数:
⚫ JFlinkJedisConfigBase:Jedis 的连接配置。
⚫ RedisMapper:Redis 映射类接口,说明怎样将数据转换成可以写入 Redis 的类型。
输出到ElasticSearch
增加pom文件中依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
代码如下:
//输出到es
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("192.168.0.30", 9200, "http"))
// 定义一个ES sink function
val esFun = new ElasticsearchSinkFunction[Event] {
override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
val data = new util.HashMap[String, String]()
data.put("url", t.url)
data.put("user", t.user)
// 包装要发送的http请求
val indexRequest = Requests.indexRequest()
.index("clicks")
.source(data)
.`type`("_doc")
.id(t.user + Random.nextInt(111))
// 发送请求
requestIndexer.add(indexRequest)
}
}
stream.addSink(new ElasticsearchSink.Builder[Event](httpHosts, esFun).build())
与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。
而 Builder 的构造方法中又有两个参数:
⚫ httpHosts:连接到的 Elasticsearch 集群主机列表。
⚫ elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数。
具体的操作需要重写中 elasticsearchSinkFunction 中的 process()方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求。
输出到MySQL(JDBC)
增加pom文件中依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
默认已经创建好相关表。
// 输出到MySQL
stream.addSink(
JdbcSink.sink(
"insert into clicks(user,url) values(?,?)",
new JdbcStatementBuilder[Event] {
override def accept(t: PreparedStatement, u: Event): Unit = {
t.setString(1, u.user)
t.setString(2, u.url)
}
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/test")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
)
)
自定义输出
与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkFunction抽象类,只要实现它,通过简单地调用 DataStream 的 addSink()方法就可以自定义写入任何外部存储。
版权归原作者 不加班程序员 所有, 如有侵权,请联系我们删除。