0


Flink中的JDBC SQL Connector

Flink中的JDBC SQL Connector

JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将数据写入数据。 本文档介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。

如果在 DDL 上定义了主键,则 JDBC sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。

引入依赖

为了使用 JDBC 连接器,使用构建自动化工具(例如 Maven 或 SBT)的项目和带有 SQL JAR 包的 SQL 客户端都需要以下依赖项。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${SCALA_VERSION}</artifactId><version>${FLINK_VERSION}1.13.6</version></dependency>

连接到指定的数据库还需要驱动程序依赖项。 以下是当前支持的驱动程序:
DriverGroup IdArtifact IdMySQLmysqlmysql-connector-javaPostgreSQLorg.postgresqlpostgresqlDerbyorg.apache.derbyderby
JDBC 连接器和驱动程序目前不是 Flink 二进制发行版的一部分。 点此 查看如何与它们链接以进行集群执行。

如何创建一个JDBC表

jdbc表可以这样定义:

-- register a MySQL table 'users' in Flink SQLCREATETABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,statusBOOLEAN,PRIMARYKEY(id)NOT ENFORCED
)WITH('connector'='jdbc','url'='jdbc:mysql://localhost:3306/mydatabase','table-name'='users');-- write data into the JDBC table from the other table "T"INSERTINTO MyUserTable
SELECT id, name, age,statusFROM T;-- scan data from the JDBC tableSELECT id, name, age,statusFROM MyUserTable;-- temporal join the JDBC table as a dimension tableSELECT*FROM myTopic
LEFTJOIN MyUserTable FOR SYSTEM_TIME ASOF myTopic.proctime
ON myTopic.key= MyUserTable.id;

Connector 的全部配置项

OptionRequiredDefaultTypeDescriptionconnectorrequired(none)StringSpecify what connector to use, here should be

'jdbc'

.urlrequired(none)StringThe JDBC database url.table-namerequired(none)StringThe name of JDBC table to connect.driveroptional(none)StringThe class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.usernameoptional(none)StringThe JDBC user name.

'username'

and

'password'

must both be specified if any of them is specified.passwordoptional(none)StringThe JDBC password.connection.max-retry-timeoutoptional60sDurationMaximum timeout between retries. The timeout should be in second granularity and shouldn’t be smaller than 1 second.scan.partition.columnoptional(none)StringThe column name used for partitioning the input. See the following Partitioned Scan section for more details.scan.partition.numoptional(none)IntegerThe number of partitions.scan.partition.lower-boundoptional(none)IntegerThe smallest value of the first partition.scan.partition.upper-boundoptional(none)IntegerThe largest value of the last partition.scan.fetch-sizeoptional0IntegerThe number of rows that should be fetched from the database when reading per round trip. If the value specified is zero, then the hint is ignored.scan.auto-commitoptionaltrueBooleanSets the auto-commit flag on the JDBC driver, which determines whether each statement is committed in a transaction automatically. Some JDBC drivers, specifically Postgres, may require this to be set to false in order to stream results.lookup.cache.max-rowsoptional(none)IntegerThe max number of rows of lookup cache, over this value, the oldest rows will be expired. Lookup cache is disabled by default. See the following Lookup Cache section for more details.lookup.cache.ttloptional(none)DurationThe max time to live for each rows in lookup cache, over this time, the oldest rows will be expired. Lookup cache is disabled by default. See the following Lookup Cache section for more details.lookup.max-retriesoptional3IntegerThe max retry times if lookup database failed.sink.buffer-flush.max-rowsoptional100IntegerThe max size of buffered records before flush. Can be set to zero to disable it.sink.buffer-flush.intervaloptional1sDurationThe flush interval mills, over this time, asynchronous threads will flush data. Can be set to

'0'

to disable it. Note,

'sink.buffer-flush.max-rows'

can be set to

'0'

with the flush interval set allowing for complete async processing of buffered actions.sink.max-retriesoptional3IntegerThe max retry times if writing records to database failed.sink.parallelismoptional(none)IntegerDefines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.

Flink SQL Connetor特点

主键处理(Key handling)

Flink 在将数据写入外部数据库时使用 DDL 中定义的主键。 如果定义了主键,则连接器以 upsert 模式运行,否则,连接器以追加(append)模式运行。

在 upsert 模式下,Flink 会根据主键插入新行或更新现有行,Flink 可以通过这种方式保证幂等性。 为保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。 在 append 模式下,Flink 会将所有记录解释为 INSERT 消息,如果底层数据库发生主键或唯一约束违规,INSERT 操作可能会失败。

有关 PRIMARY KEY 语法的更多详细信息,请参阅 CREATE TABLE DDL。

分区扫描(Partitioned Scan )

为了加速并行 Source 任务实例中的数据读取,Flink 提供了 JDBC 表的分区扫描功能。

如果指定了以下所有扫描分区选项,则必须全部指定。 它们描述了从多个任务并行读取时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。 请注意,scan.partition.lower-bound 和 scan.partition.upper-bound 用于决定分区步长和过滤表中的行。 如果是批处理作业,也可以在提交 flink 作业之前先获取最大值和最小值。

  • scan.partition.column:用于对输入进行分区的列名。
  • scan.partition.num:分区数。
  • scan.partition.lower-bound:第一个分区的最小值。
  • scan.partition.upper-bound:最后一个分区的最大值。

查找缓存(Lookup Cache )

在流式计算中,维表(dim_dic)是一个很常见的概念,一般用于sql的join中,对流式数据进行数据补全,或在不复杂的模型中对事实表做轻量化的维度退化。

比如我们的source stream是来自日志的订单数据,但日志中我们只是记录了订单商品的id,却没有其他的附加信息(如sku、促销活动、优惠券信息等),但我们把订单数据存入实时数仓进行数据分析的时候,却需要同步获取sku名称、优惠券等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数据补全。

维表一般存储在外部存储中,如mysql、hbase(使用phoenix操作)、redis等等。

JDBC 连接器可以在时间连接中用作查找源(又名维度表)。目前,仅支持同步查找模式。

默认情况下,查找缓存未启用。您可以通过设置如下2个属性来启用它。

  • lookup.cache.max-rows
  • lookup.cache.ttl

查找缓存用于提高临时连接 JDBC 连接器的性能。默认情况下,查找缓存未启用,因此所有请求都发送到外部数据库。启用查找缓存后,每个进程(即 TaskManager)都会持有一个缓存。 Flink 会先查找缓存,只有在缓存缺失时才会向外部数据库发送请求,并根据返回的行更新缓存。当缓存达到最大缓存行

lookup.cache.max-rows

或行超过最大存活时间

lookup.cache.ttl 

时,缓存中最旧的行将过期。缓存的行可能不是最新的,

用户可以将 lookup.cache.ttl 调整为较小的值以获得更快的数据更新

,但

这可能会增加发送到数据库的请求数量

。所以这是

吞吐量和正确性之间的平衡

幂等写入(Idempotent Writes)

如果在 DDL 中定义了主键,则 JDBC 接收器将使用 upsert 语义而不是普通的 INSERT 语句。 Upsert 语义是指如果底层数据库中存在唯一约束违规,则原子地添加新行或更新现有行,这提供了幂等性。

如果出现故障,Flink 作业将从上一个成功的检查点恢复并重新处理,这可能导致恢复期间重新处理消息。 强烈建议使用 upsert 模式,因为如果需要重新处理记录,它有助于避免违反约束或重复数据。

除了故障恢复之外,随着时间的推移,源主题自然也可能包含多个具有相同主键的记录,这使得 upserts 是可取的。

由于 upsert 没有标准语法,下表描述了使用的特定于数据库的 DML。
DatabaseUpsert GrammarMySQLINSERT … ON DUPLICATE KEY UPDATE …PostgreSQLINSERT … ON CONFLICT … DO UPDATE SET …

数据类型映射

Flink 支持连接多个使用方言的数据库,如 MySQL、PostgresSQL、Derby。 Derby 数据库通常用于测试目的。 从关系数据库数据类型到 Flink SQL 数据类型的字段数据类型映射如下表所示,映射表可以帮助在 Flink 中轻松定义 JDBC 表。
MySQL 类型PostgreSQL 类型Flink SQL 类型

TINYINT
TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INTEGER
SERIAL
INT
BIGINT
INT UNSIGNED
BIGINT
BIGSERIAL
BIGINT
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT
BIGINT
BIGINT
FLOAT
REAL
FLOAT4
FLOAT
DOUBLE
DOUBLE PRECISION
FLOAT8
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
TINYINT(1)
BOOLEAN
BOOLEAN
DATE
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIMEZONE]
TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n)
VARCHAR(n)
TEXT
CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
STRING
BINARY
VARBINARY
BLOB
BYTEA
BYTES
ARRAY
ARRAY

与Hive集成

更详细信息参考 这里

集成说明

输入输出:

Source:Bounded

Sink:Batch & Streaming Append / Upsert Mode

LookupSource: SyncMode

Flink 与 Hive 的集成包含两个层面。

一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过 HiveCatalog 将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用 HiveCatalog 将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

二是利用 Flink 来读写 Hive 的表。HiveCatalog 的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive 仓库。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

官方强烈建议用户使用 Blink planner 与 Hive 集成。

Flink 支持以下 Hive 版本:

1.0.x(1.0.0、1.0.1),1.1.x(1.1.0、1.1.1),1.2.x(1.2.0、1.2.1、1.2.2)

2.0.x(2.0.0、2.0.1),2.1.x(2.1.0、2.1.1),2.2.0,2.3.x(2.3.0、2.3.1、2.3.2、2.3.4、2.3.5、2.3.6)

3.1.x(3.1.0、3.1.1、3.1.2)

某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

  • Hive 内置函数在使用 Hive-1.2.0 及更高版本时支持。
  • Column 约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-1.2.0 及更高版本时支持。
  • DATE 列统计信息,在使用 Hive-1.2.0 及更高版时支持。
  • 使用 Hive-2.0.x 版本时不支持写入 ORC 表。

依赖项

与 Hive 集成,需要在 Flink 下的 /lib/ 目录中添加一些额外的依赖包,以便通过 Table API 或 SQL Client 与 Hive 进行交互。或者,可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的 -C 或 -l 选项将它们添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上构建的,需要设置 Hadoop 环境变量

exportHADOOP_CLASSPATH=`hadoop classpath`

有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar 包。可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包(如果使用的 Hive 版本没有在下方列出,则这种方法会更适合)。

建议优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足需求时,再考虑使用分开添加 jar 包的方式。

使用 Flink 提供的 Hive jar
下表列出了所有可用的 Hive jar。可以选择一个并放在 Flink 的 /lib/ 目录中。
Metastore version****Maven dependency1.0.0 – 1.2.2flink-sql-connector-hive-1.2.22.0.0 – 2.2.0flink-sql-connector-hive-2.2.02.3.0 – 2.3.6flink-sql-connector-hive-2.3.63.0.0 – 3.1.2flink-sql-connector-hive-3.1.2

Maven 依赖

构建应用程序,则需要在 mvn 文件中添加以下依赖项。 应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们(scope: provide)。

<!-- Flink Dependency --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive{{ site.scala_version_suffix }}</artifactId><version>{{site.version}}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge{{ site.scala_version_suffix }}</artifactId><version>{{site.version}}</version><scope>provided</scope></dependency><!-- Hive Dependency --><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>${hive.version}</version><scope>provided</scope></dependency>

连接到Hive

通过 TableEnvironment 或者 YAML 配置,连接到现有的 Hive 集群。

EnvironmentSettings settings =EnvironmentSettings.newInstance().useBlinkPlanner().build();TableEnvironment tableEnv =TableEnvironment.create(settings);String name            ="myhive";String defaultDatabase ="mydatabase";String hiveConfDir     ="/opt/hive-conf";HiveCatalog hive =newHiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");

Hive Catalog

在 Hadoop 生态系统中,Hive Metastore 多年来已经演变成事实上的元数据中心。

对于同时使用 Hive 和 Flink 的用户,HiveCatalog 允许使用 Hive Metastore 管理 Flink 的元数据。

对于只适用 Flink 的用户,HiveCatalog 是 Flink 唯一支持的开箱即用的持久化 Catalog。

如果没有持久化 Catalog,用户在创建 Kafka Table 时需要重复在每个应用中使用 Flink Create SQL 配置元数据。HiveCatalog 实现用户只创建一次表和元对象,并在以后的会话中方便地引用和管理它们。

使用 HiveCatalog
HiveCatalog可用于处理两种类型的表:Hive兼容表和通用表。 就存储层中的元数据和数据而言,兼容 Hive 的表是以兼容 Hive 的方式存储的表。 因此,通过 Flink 创建的 Hive 兼容表可以从 Hive 端查询。

另一方面,通用表特定于 Flink。 使用 HiveCatalog 创建通用表时,只是使用 HMS 来保留元数据。虽然这些表格对 Hive 可见,但 Hive 不太可能能够理解元数据。因此,在 Hive 中使用此类表会导致未定义的行为。

下面将通过一个简单的例子,演示将 Kafa 作为数据源,并将元数据保存到 Hive metastore 中,使用 Flink SQL 直接读取 Kafka。

step1. 确保 Hive Metastore 可用

安装 Hive 环境,设置 Hive 的 Metastore 配置(hive-site.xml 文件)。使用 Hive CLI 测试

hive>showdatabases;
OK
defaultTime taken: 0.032 seconds, Fetched: 1row(s)

hive>showtables;
OK
Time taken: 0.028 seconds, Fetched: 0row(s)

step2. 配置 Flink 集群和 SQL CLI

将 Hive 依赖 jar 添加到 Flink 安装目录下的 /lib 目录中。修改 SQL CLI 的 YARM 配置(…/conf/sql-cli-defaults.yaml)

execution:planner: blink
    type: streaming
    ...current-catalog: myhive  # set the HiveCatalog as the current catalog of the sessioncurrent-database: mydatabase

catalogs:-name: myhive
     type: hive
     hive-conf-dir: /opt/hive-conf  # contains hive-site.xml

step3. 确保 Kafka 集群可用

step4. 启动 SQL Client,使用 Flink SQL DDL 创建 kafka table

Flink SQL>CREATETABLE mykafka (name String, age Int)WITH('connector.type'='kafka','connector.version'='universal','connector.topic'='test','connector.properties.bootstrap.servers'='localhost:9092','format.type'='csv','update-mode'='append');[INFO]Table has been created.

Flink SQL>DESCRIBE mykafka;
root
 |-- name: STRING|-- age: INT

使用 Hive CLI 测试是否可以使用该表

hive>showtables;
OK
mykafka
Time taken: 0.038 seconds, Fetched: 1row(s)

hive>describe formatted mykafka;
OK
# col_name              data_type               comment...# Detailed Table InformationDatabase:               defaultTableType:             MANAGED_TABLE
Table Parameters:
    flink.connector.properties.bootstrap.servers    localhost:9092
    flink.connector.topic   test
    flink.connector.type    kafka
    flink.connector.version universal
    flink.format.type       csv
    flink.generic.table.schema.0.data-typeVARCHAR(2147483647)
    flink.generic.table.schema.0.name   name
    flink.generic.table.schema.1.data-typeINT
    flink.generic.table.schema.1.name   age
    flink.update-mode       append
    is_generic              true

step5. 运行 Flink SQL 查询该表

Flink SQL>select*from mykafka;SQL Query Result (Table)
 Refresh: 1 s    Page: Lastof1     

        name                       age
         tom                        15
        john                        21
       kitty                        30
         amy                        24
       kaiky                       18

Hive Dialect

从 1.11.0 开始,在使用 Hive Dialect 时,Flink 允许用户用 Hive 语法来编写 SQL 语句。旨在改善与 Hive 的互操作性,并减少在 Flink 和 Hive 之间切换来执行不同语句的情况。

使用 Hive Dialect
Flink 目前支持两种 SQL Dialect:default 和 hive。需要先切换到 Hive 方言,然后才能使用 Hive 语法编写。下面介绍如何使用 SQL 客户端和 Table API 设置方言。可以为执行的每个语句动态切换方言。无需重新启动会话即可使用其他方言。

SQL Client
以通过 table.sql-dialect 属性指定。修改 SQL CLI 的 YARM 配置(…/conf/sql-cli-defaults.yaml)

execution:planner: blink
  type: batch
  result-mode: table

configuration:table.sql-dialect: hive

可以在 SQL 客户端启动后设置方言。

Flink SQL>settable.sql-dialect=hive;-- to use hive dialect[INFO]Session property has been set.

Flink SQL>settable.sql-dialect=default;-- to use default dialect[INFO]Session property has been set.

Table API

EnvironmentSettings settings =EnvironmentSettings.newInstance().useBlinkPlanner()...build();TableEnvironment tableEnv =TableEnvironment.create(settings);// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);// to use default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

注意事项

以下是使用 Hive 方言的一些注意事项:

  • Hive 方言只能用于操作 Hive 对象,并要求当前 Catalog 是一个 HiveCatalog。
  • Hive 方言只支持 db.table 这种两级的标识符,不支持带有 Catalog 名字的标识符。
  • 虽然所有 Hive 版本支持相同的语法,但是一些特定的功能是否可用仍取决于使用的 Hive 版本。
  • 执行 DML 和 DQL 时应该使用 HiveModule 。

Hive Read & Write

使用 HiveCatalog,Flink 可以统一 Hive 表的批处理和流处理。

Reading

Flink 支持以批处理和流处理模式从 Hive 读取数据。当作为批处理应用程序运行时,Flink 会处理的执行查询时的表数据。流式读取将持续监视表的更新,在新数据可用时以增量方式即时获取。默认情况,Flink 视为读取有界的表。

流式读取支持使用分区和非分区表。对于分区表,Flink 将监视新分区的生成,并在可用时以增量方式读取。对于非分区表,Flink 将监视文件夹中新文件的生成,并以增量方式读取新文件。

使用 SQL Hints 可以应用以下配置(无需修改 Hive Metastore 中的定义)

SELECT*FROM hive_table 
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

KeyDefaultType****Descriptionstreaming-source.enablefalseBoolean是否开启流式读取 Hive 数据。默认是批处理模式。streaming-source.partition.includeall可选值[‘all’, ‘latest’]设置要读取的分区,默认读所有分区。 可选项:latest:只读取最新分区数据all:读取全量分区数据latest 只能用在 temporal join 中,用于读取最新分区作为维表。streaming-source.monitor-intervalNoneDuration监听新分区生成的时间。 Hive streaming reading 默认间隔是1分钟。Hive streaming temporal join 默认间隔是1小时。 目前的实现是每个 TaskManager 都会查询 Hive Metastore,高频的查询可能会对 Hive Metastore 产生过大的压力。streaming-source.partition-orderpartition-name可选值[‘partition-name’, ‘create-time’, ‘partition-time’]加载分区顺序 可选项:partition-name:使用默认分区名称顺序加载最新分区create-time:使用分区文件创建时间顺序partition-time:使用分区时间顺序streaming-source.consume-start-offsetNoneString流式读取 Hive 表的起始偏移量。 取决于设置的分区顺序对于 create-time 和 partition-time,应该是时间字符串(yyyy-[m]m-[d]d [hh:mm:ss])对于 partition-time,应该是分区名称字符串(例如pt_year=2020/pt_mon=10/pt_day=01)
需要注意的

  • 监视策略会扫描当前位置路径中的所有目录/文件。太多的分区可能会导致性能下降。
  • 非分区表的流式读取要求将每个文件的生产者以原子方式写入目标目录。
  • 对分区表的流式读取要求每个分区都应以原子方式添加到 Hive Metastore 视图中。
  • 流读取不支持 Flink DDL 中的水印语法。这些表不能使用窗口运算。

Reading Hive Views

Flink 可以从 Hive 定义的视图中读取,有以下限制:

  • 1、在查询视图之前,必须将 Hive catalog 设置为当前目录。可以通过 Table API 的 tableEnv.useCatalog() 或 SQL 语法 USE CATALOG 来完成。
  • 2、Hive 和 Flink 有不同的 SQL 语法。确保视图的查询与 Flink 语法兼容。

Vectorized Optimization upon Read

当满足以下条件时,Flink 将自动使用列式读取优化:

  • Table Format:ORC 或 Parquet
  • Column 没有复杂字段(List、Map、Struct、Union)

默认情况下是开启的,可以通过下面的配置关闭

table.exec.hive.fallback-mapred-reader=true

Source Parallelism Inference

默认情况下,Flink 将根据文件数和每个文件中的块数推断出 Hive Reader 的最佳并行度。

Flink 也支持灵活地配置并行推理策略。可以在 TableConfig 中配置以下参数(请注意,这些参数会影响 Job 的所有 Source):
KeyDefaultType****Descriptiontable.exec.hive.infer-source-parallelismtrueBoolean如果为 true,根据文件数和文件块推断并行度。如果为 false,则由 config 设置源的并行度。table.exec.hive.infer-source-parallelism.max1000Integer设置 Source 的最大推断并行度。

Temporal Table Join

可以将 Hive 表用作时态表(Temporal Table),流可以通过时态联接(Temporal Join)关联 Hive 表。

Flink 支持处理时间时态连接 Hive 表,处理时间时态连接总是关联最新版本的时态表。

Flink 支持 Hive 分区表和非分区表的时态连接,对于分区表,Flink 支持自动跟踪 Hive 表的最新分区。

Flink 还不支持事件时间临时连接 Hive 表。

Temporal Join The Latest Partition

对于一个随时间变化的分区表,可以把读作一个无界流,每个分区可以作为时态表的一个版本(如果每个分区都包含一个版本的完整数据)。表的最新版本保留 Hive 表的分区数据。

Flink 支持在处理时间时态连接时自动跟踪时态表的最新分区(版本),最新分区(版本)由 streaming source.partition-order 选项定义。

此功能仅在 Flink 流处理模式下支持。

下面代码演示了一个经典的业务处理,维度表来自 Hive,每天由批处理作业更新一次,kafka 流来自实时在线业务数据或日志,需要与维度表连接以丰富流。

假设 Hive 表中的数据每天更新,每天都包含最新的完整维度数据。

SETtable.sql-dialect=hive;CREATETABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10,4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,...) PARTITIONED BY(pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (-- 使用默认分区名顺序(partition-name),每12小时加载一次最新的分区(推荐用法)'streaming-source.enable'='true','streaming-source.partition.include'='latest','streaming-source.monitor-interval'='12 h','streaming-source.partition-order'='partition-name',-- 使用分区文件创建时间顺序,每12小时加载一次最新的分区'streaming-source.enable'='true','streaming-source.partition.include'='latest','streaming-source.partition-order'='create-time','streaming-source.monitor-interval'='12 h'-- 使用分区时间顺序,每12小时加载一次最新的分区'streaming-source.enable'='true','streaming-source.partition.include'='latest','streaming-source.monitor-interval'='12 h','streaming-source.partition-order'='partition-time','partition.time-extractor.kind'='default','partition.time-extractor.timestamp-pattern'='$pt_year-$pt_month-$pt_day 00:00:00');

创建 Kafka Table

SETtable.sql-dialect=default;CREATETABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME())WITH(...);

实现 Temporal Join 查询丰富流

SELECT*FROM orders_table AS o 
JOIN dimension_table FOR SYSTEM_TIME ASOF o.proctime AS dim
ON o.product_id = dim.product_id;

Temporal Join The Latest Table

对于 Hive 表,可以把读作一个有界流。在这种情况下,Hive 表只能在查询时跟踪其最新版本。表的最新版本保留 Hive 表的所有数据。

在对最新的 Hive 表执行时态联接时,Hive 表将被缓存在 Slot 内存中,并且流中的每条记录都与表联接进行匹配项。可以使用以下属性配置 Hive 表缓存的 TTL。缓存过期后,将再次扫描 Hive 表以加载最新数据。
KeyDefaultType****Descriptionlookup.join.cache.ttl60 minDuration缓存生存时间,默认60分钟。 配置只有再查找有界 Hive Table 时有效。
下面的演示将 hive 表的所有数据作为时态表加载。

-- 假设 Hive 表中的数据被批处理以 overwrite 的形式生成。SETtable.sql-dialect=hive;CREATETABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10,4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,...) TBLPROPERTIES ('streaming-source.enable'='false','streaming-source.partition.include'='all','lookup.join.cache.ttl'='12 h');SELECT*FROM orders_table AS o 
JOIN dimension_table FOR SYSTEM_TIME ASOF o.proctime AS dim
ON o.product_id = dim.product_id;

需要注意:

  • 每个 join 子任务都需要保留一份缓存。确保 Hive 表数据可以放入 TM Slot 的内存中。
  • 建议为 streaming source.monitor-interval 或 lookup.join.cache.ttl 设置一个相对较大的值。否则,表过于频繁地更新和重新加载,影响性能。
  • 一旦缓存需要刷新,就会重新加载整个 Hive 表。没有办法区分新旧数据。

Writing

Flink 支持以批处理和流处理模式向 Hive 写入数据。

当作为批处理运行时,Flink 将只在作业完成时才向 Hive 表写入这些记录。既支持 Append,也支持 Overwrite。

# ------ Append 追加数据
Flink SQL>INSERTINTO mytable SELECT'Tom',25;# ------ Overwrite 覆盖数据
Flink SQL>INSERT OVERWRITE mytable SELECT'Tom',25;

数据也可以插入到特定的分区中。

# ------ 插入静态分区
Flink SQL>INSERT OVERWRITE myparttable PARTITION(my_type='type_1', my_date='2019-08-08')SELECT'Tom',25;# ------ 插入动态分区
Flink SQL>INSERT OVERWRITE myparttable 
SELECT'Tom',25,'type_1','2019-08-08';# ------
Flink SQL>INSERT OVERWRITE myparttable PARTITION(my_type='type_1')SELECT'Tom',25,'2019-08-08';

流式写入不断以增量方式向 Hive 添加新数据,提交使其可见。用户通过多个属性控制何时/如何触发提交。流式写入不支持覆盖插入。

下面的示例演示将 Kafka 中的数据写入有分区的 Hive 表,并运行批处理查询将数据读出。

SETtable.sql-dialect=hive;CREATETABLE hive_table (
  user_id STRING,
  order_amount DOUBLE) PARTITIONED BY(dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file');SETtable.sql-dialect=default;CREATETABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),-- Watermark 定义在 TIMESTAMP 类型字段上
  WATERMARK FOR log_ts AS log_ts -INTERVAL'5'SECOND)WITH(...);-- 查询 kafka 插入 HiveINSERTINTOTABLE hive_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts,'yyyy-MM-dd'), DATE_FORMAT(log_ts,'HH')FROM kafka_table;-- 查询 Hive tableSELECT*FROM hive_table WHERE dt='2020-05-20'and hr='12';

如果 Watermark 定义在 TIMESTAMP_LTZ 类型字段上,sink.partition-commit.watermark-time-zone 需要配置时区(如果使用 partition-time 作为提交策略)

SETtable.sql-dialect=hive;CREATETABLE hive_table (
  user_id STRING,
  order_amount DOUBLE) PARTITIONED BY(dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h',-- 配置时区'sink.partition-commit.watermark-time-zone'='Asia/Shanghai',-'sink.partition-commit.policy.kind'='metastore,success-file');SETtable.sql-dialect=default;CREATETABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, 
  ts_ltz AS TO_TIMESTAMP_LTZ(ts,3),
  WATERMARK FOR ts_ltz AS ts_ltz -INTERVAL'5'SECOND)WITH(...);

Formats

Flink 支持 Hive 使用以下数据格式:Text、CSV、SequenceFile、ORC、Parquet

Hive Functions

Built-in Function

HiveModule 提供了 Hive 可以使用的内置函数

tableEnv.loadModue("myhive",newHiveModule("2.3.4"));

User Defined Function

用户可以在 Flink 中使用 Hive UDF,支持的 UDF 类型包括:

  • UDF
  • Generic UDF
  • Generic UDTF UDAF
  • GenericUDAFResolver2

Hive 的 UDF 和 GenericUDF 自动转换为 Flink 的 ScalarFunction,Hive 的 GenericUDTF 自动转换为 Flink 的 TableFunction,Hive 的 UDAF 和 GenericUDAFResolver2 转换为 Flink 的 AggregateFunction。

要使用 Hive UDF,用户必须:

  • 设置 HiveCatalog 包含该函数作为会话的当前目录
  • 在 Flink 的类路径中包含包含该函数的jar
  • 使用 Blink Planner

创建一个 UDF,使用时注册名 “myudf”

publicclassTestHiveSimpleUDFextendsUDF{publicIntWritableevaluate(IntWritable i){returnnewIntWritable(i.get());}publicTextevaluate(Text text){returnnewText(text.toString());}}

创建一个 Generic UDF,使用时注册名 “mygenericudf”

publicclassTestHiveGenericUDFextendsGenericUDF{@OverridepublicObjectInspectorinitialize(ObjectInspector[] arguments)throwsUDFArgumentException{// ...}@OverridepublicObjectevaluate(DeferredObject[] arguments)throwsHiveException{return arguments[0].get();}@OverridepublicStringgetDisplayString(String[] children){return"TestHiveGenericUDF";}}

创建一个 Generic UDTF,使用时注册名 “mygenericudtf”

public class TestHiveUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
             // ...
    }

    @Override
    public void process(Object[] args) throws HiveException {
        String str = (String) args[0];
        for (String s : str.split(",")) {
            forward(s);
            forward(s);
        }
    }

    @Override
    public void close() {
    }
}

在 Hive CLI 中查看

hive>show functions;
OK
mygenericudf
myudf
myudtf

在 Flink SQL 中使用

select 
    mygenericudf(myudf(name),1)as a, 
    mygenericudf(myudf(age),1)as b, 
    s 
from mysourcetable, 
lateral table(myudtf(name,1))as T(s);

参考列表:

  • JDBC SQL Connector
  • [详解flink中Look up维表的使用](详解flink中Look up维表的使用)
标签: flink sql 数据库

本文转载自: https://blog.csdn.net/liuwei0376/article/details/124922603
版权归原作者 江畔独步 所有, 如有侵权,请联系我们删除。

“Flink中的JDBC SQL Connector”的评论:

还没有评论