0


Flink Sql(二) Kafka连接器

Kafka连接器

​ 在 Table API 和 SQL 编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器 (connector),这样就可以连接到外部系统进行数据交互了。

​ 架构中的 TableSource 负责从外部系统中读取数据并转换成表,TableSink 则负责将结果表 写入外部系统。在 Flink 1.13 的 API 调用中,已经不去区分 TableSource 和 TableSink,我们只要建立到外部系统的连接并创建表就可以,Flink 自动会从程序的处理逻辑中解析出它们的用途。

​ Flink 的 Table API 和 SQL 支持了各种不同的连接器。当然,最简单的其实就是连接到控制台打印输出:

CREATETABLE ResultTable (user STRING,
    cnt BIGINTWITH('connector'='print');

​ 这里只需要在 WITH 中定义 connector 为 print 就可以了。而对于其它的外部系统,则需要增加一些配置项。下面我们就分别进行介绍。

​ Kafka 的 SQL 连接器可以从 Kafka 的主题(topic)读取数据转换成表,也可以将表数据 写入 Kafka 的主题。换句话说,创建表的时候指定连接器为 Kafka,则这个表既可以作为输入表,也可以作为输出表。

1.引入依赖

想要在 Flink 程序中使用 Kafka 连接器,需要引入如下依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

​ 这里我们引入的 Flink 和 Kafka 的连接器,与之前 DataStream API 中引入的连接器是一样的。如果想在 SQL 客户端里使用 Kafka 连接器,还需要下载对应的 jar 包放到 lib 目录下。

​ 另外,Flink 为各种连接器提供了一系列的“表格式”(table formats),比如 CSV、JSON、 Avro、Parquet 等等。这些表格式定义了底层存储的二进制数据和表的列之间的转换方式,相当于表的序列化工具。对于 Kafka 而言,CSV、JSON、Avro 等主要格式都是支持的, 根据 Kafka 连接器中配置的格式,我们可能需要引入对应的依赖支持。

以 CSV 为例:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency>

​ 由于 SQL 客户端中已经内置了 CSV、JSON 的支持,因此使用时无需专门引入;而对于 没有内置支持的格式(比如 Avro),则仍然要下载相应的 jar 包。关于连接器的格式细节详见官网说明,我们后面就不再讨论了。

2. 创建连接到 Kafka 的表

​ 创建一个连接到 Kafka 表,需要在 CREATE TABLE 的 DDL 中在 WITH 子句里指定连接 器为 Kafka,并定义必要的配置参数。

​ 下面是一个具体示例:

CREATETABLE KafkaTable (`user` STRING,`url` STRING,`ts`TIMESTAMP(3) METADATA FROM'timestamp')WITH('connector'='kafka','topic'='events','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv')

​ 这里定义了 Kafka 连接器对应的主题(topic),Kafka 服务器,消费者组 ID,消费者起始 模式以及表格式。需要特别说明的是,在 KafkaTable 的字段中有一个 ts,它的声明中用到了 METADATA FROM,这是表示一个“元数据列”(metadata column),它是由 Kafka 连接器的 元数据“timestamp”生成的。这里的 timestamp 其实就是 Kafka 中数据自带的时间戳,我们把 它直接作为元数据提取出来,转换成一个新的字段 ts.

3. Upsert Kafka

​ 正常情况下,Kafka 作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对 应在表中就是仅追加(append-only)模式。如果我们想要将有更新操作(比如分组聚合)的结 果表写入 Kafka,就会因为 Kafka 无法识别撤回(retract)或更新插入(upsert)消息而导致异常。

​ 为了解决这个问题,Flink 专门增加了一个“更新插入 Kafka”(Upsert Kafka)连接器。这 个连接器支持以更新插入(UPSERT)的方式向 Kafka 的 topic 中读写数据。

​ 具体来说,Upsert Kafka 连接器处理的是更新日志(changlog)流。如果作为 TableSource, 连接器会将读取到的 topic中的数据(key, value),解释为对当前 key 的数据值的更新(UPDATE), 也就是查找动态表中 key 对应的一行数据,将 value 更新为最新的值;因为是 Upsert 操作,所 以如果没有 key 对应的行,那么也会执行插入(INSERT)操作。另外,如果遇到 value 为空 (null),连接器就把这条数据理解为对相应 key 那一行的删除(DELETE)操作。

​ 如果作为 TableSink,Upsert Kafka 连接器会将有更新操作的结果表,转换成更新日志 (changelog)流。如果遇到插入(INSERT)或者更新后(UPDATE_AFTER)的数据,对应 的是一个添加(add)消息,那么就直接正常写入 Kafka 主题;如果是删除(DELETE)或者 更新前的数据,对应是一个撤回(retract)消息,那么就把 value 为空(null)的数据写入 Kafka。 由于 Flink 是根据键(key)的值对数据进行分区的,这样就可以保证同一个 key 上的更新和删除消息都会落到同一个分区中。

​ 下面是一个创建和使用 Upsert Kafka 表的例子:

CREATETABLE pageviews_per_region (
 user_region STRING,
 pv BIGINT,
 uv BIGINT,PRIMARYKEY(user_region)NOT ENFORCED
)WITH('connector'='upsert-kafka','topic'='pageviews_per_region','properties.bootstrap.servers'='...','key.format'='avro','value.format'='avro');CREATETABLE pageviews (
 user_id BIGINT,
 page_id BIGINT,
 viewtime TIMESTAMP,
 user_region STRING,
 WATERMARK FOR viewtime AS viewtime -INTERVAL'2'SECOND)WITH('connector'='kafka','topic'='pageviews','properties.bootstrap.servers'='...','format'='json');-- 计算 pv、uv 并插入到 upsert-kafka 表中INSERTINTO pageviews_per_region
SELECT
 user_region,COUNT(*),COUNT(DISTINCT user_id)FROM pageviews
GROUPBY user_region;

​ 这里我们从 Kafka 表 pageviews 中读取数据,统计每个区域的 PV(全部浏览量)和 UV (对用户去重),这是一个分组聚合的更新查询,得到的结果表会不停地更新数据。为了将结 果表写入 Kafka 的 pageviews_per_region 主题,我们定义了一个 Upsert Kafka 表,它的字段中 需要用PRIMARY KEY来指定主键,并且在WITH子句中分别指定key和value的序列化格式。

标签: kafka flink sql

本文转载自: https://blog.csdn.net/qq_42575907/article/details/125836153
版权归原作者 ambitfly 所有, 如有侵权,请联系我们删除。

“Flink Sql(二) Kafka连接器”的评论:

还没有评论