使用FlinkSql进行实时工作流开发
引言
在大数据时代,实时数据分析和处理变得越来越重要。Apache Flink,作为流处理领域的佼佼者,提供了一套强大的工具集来处理无界和有界数据流。其中,Flink SQL是其生态系统中一个重要的组成部分,允许用户以SQL语句的形式执行复杂的数据流操作,极大地简化了实时数据处理的开发流程。
什么是Apache Flink?
Apache Flink是一个开源框架,用于处理无边界(无尽)和有边界(有限)数据流。它提供了低延迟、高吞吐量和状态一致性,使开发者能够构建复杂的实时应用和微服务。Flink的核心是流处理引擎,它支持事件时间处理、窗口操作以及精确一次的状态一致性。
为什么选择Flink SQL?
易用性:Flink SQL使得非专业程序员也能快速上手,使用熟悉的SQL语法进行实时数据查询和处理。
灵活性:可以无缝地将SQL与Java/Scala API结合使用,为用户提供多种编程模型的选择。
性能:利用Flink的高性能流处理引擎,Flink SQL能够实现实时响应和低延迟处理。
集成能力:支持多种数据源和数据接收器,如Kafka、JDBC、HDFS等,易于集成到现有的数据生态系统中。
Flink SQL实战
常用的Connector
在配置FlinkSQL实时开发时,使用mysql-cdc、Kafka、jdbc和rabbitmq作为连接器是一个很常见的场景。以下是详细的配置说明,你可以基于这些信息来撰写你的博客:
1. MySQL-CDC 连接器配置
MySQL-CDC(Change Data Capture)连接器用于捕获MySQL数据库中的变更数据。配置示例如下:
CREATETABLE mysql_table (-- 定义表结构
id INT,
name STRING,-- 其他列)WITH('connector'='mysql-cdc',-- 使用mysql-cdc连接器'hostname'='mysql-host',-- MySQL服务器主机名'port'='3306',-- MySQL端口号'username'='user',-- MySQL用户名'password'='password',-- MySQL密码'database-name'='db',-- 数据库名'table-name'='table'-- 表名'server-time-zone'='GMT+8',-- 服务器时区'debezium.snapshot.mode'='initial',-- 初始快照模式,initial表示从头开始读取所有数据;latest-offset表示从最近的偏移量开始读取;timestamp则可以指定一个时间戳,从该时间戳之后的数据开始读取。'scan.incremental.snapshot.enabled'='true'-- 可选,设置为true时,Flink会尝试维护一个数据库表的增量快照。这意味着Flink不会每次都重新读取整个表,而是只读取自上次读取以来发生变化的数据。这样可以显著提高读取效率,尤其是在处理大量数据且频繁更新的场景下。'scan.incremental.snapshot.chunk.size'='1024'-- 可选, 增量快照块大小'debezium.snapshot.locking.mode'='none',-- 可选,控制在快照阶段锁定表的方式,以防止数据冲突。none表示不锁定,lock-tables表示锁定整个表,transaction表示使用事务来锁定。'debezium.properties.include-schema-changes'='true',-- 可选,如果设置为true,则在CDC事件中会包含模式变更信息。'debezium.properties.table.whitelist'='mydatabase.mytable',-- 可选,指定要监控的表的白名单。如果table-name未设置,可以通过这个属性来指定。'debezium.properties.database.history'='io.debezium.relational.history.FileDatabaseHistory'-- 可选,设置数据库历史记录的实现类,通常使用FileDatabaseHistory来保存历史记录,以便在重启后能恢复状态。);
2. Kafka 连接器配置
Kafka连接器用于读写Kafka主题中的数据。配置示例如下:
CREATETABLE kafka_table (-- 定义表结构
id INT,
name STRING,-- 其他列)WITH('connector'='kafka',-- 使用kafka连接器'topic'='topic_name',-- Kafka主题名'properties.bootstrap.servers'='kafka-broker:9092',-- Kafka服务器地址'format'='json'-- 数据格式,例如json'properties.group.id'='flink-consumer-group',-- 消费者组ID'scan.startup.mode'='earliest-offset',-- 启动模式(earliest-offset, latest-offset, specific-offset, timestamp)'format'='json',-- 数据格式'json.fail-on-missing-field'='false',-- 是否在字段缺失时失败'json.ignore-parse-errors'='true',-- 是否忽略解析错误'properties.security.protocol'='SASL_SSL',-- 安全协议(可选)'properties.sasl.mechanism'='PLAIN',-- SASL机制(可选)'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";'-- SASL配置(可选));
3. JDBC 连接器配置
JDBC连接器用于与其他关系型数据库进行交互。配置示例如下:
CREATETABLE jdbc_table (-- 定义表结构
id INT,
name STRING,-- 其他列)WITH('connector'='jdbc',-- 使用jdbc连接器'url'='jdbc:mysql://mysql-host:3306/db',-- JDBC连接URL'table-name'='table_name',-- 数据库表名'username'='user',-- 数据库用户名'password'='password'-- 数据库密码'driver'='com.mysql.cj.jdbc.Driver',-- JDBC驱动类'lookup.cache.max-rows'='5000',-- 可选,查找缓存的最大行数'lookup.cache.ttl'='10min',-- 可选,查找缓存的TTL(时间到期)'lookup.max-retries'='3',-- 可选,查找的最大重试次数'sink.buffer-flush.max-rows'='1000',-- 可选,缓冲区刷新最大行数'sink.buffer-flush.interval'='2s'-- 可选,缓冲区刷新间隔);
4. RabbitMQ 连接器配置
RabbitMQ连接器用于与RabbitMQ消息队列进行交互。配置示例如下:
CREATETABLE rabbitmq_table (-- 定义表结构
id INT,
name STRING,-- 其他列)WITH('connector'='rabbitmq',-- 使用rabbitmq连接器'host'='rabbitmq-host',-- RabbitMQ主机名'port'='5672',-- RabbitMQ端口号'username'='user',-- RabbitMQ用户名'password'='password',-- RabbitMQ密码'queue'='queue_name',-- RabbitMQ队列名'exchange'='exchange_name'-- RabbitMQ交换机名'routing-key'='routing_key',-- 路由键'delivery-mode'='2',-- 投递模式(2表示持久)'format'='json',-- 数据格式'json.fail-on-missing-field'='false',-- 是否在字段缺失时失败'json.ignore-parse-errors'='true'-- 是否忽略解析错误);
5. REST Lookup 连接器配置
REST Lookup 连接器允许在 SQL 查询过程中,通过 REST API 进行查找操作。
CREATETABLE rest_table (
id INT,
name STRING,
price DECIMAL(10,2),PRIMARYKEY(id)NOT ENFORCED
)WITH('connector'='rest-lookup','url'='http://api.example.com/user/{id}',-- REST API URL,使用占位符 {product_id}'lookup-method'='POST'-- 'GET' 或 'POST''format'='json',-- 数据格式'asyncPolling'='false'-- 可选,指定查找操作是否使用异步轮询模式。默认值为 'false'。当设置为 'true' 时,查找操作会以异步方式执行,有助于提高性能。'gid.connector.http.source.lookup.header.Content-Type'='application/json'-- 可选,设置 Content-Type 请求头。用于指定请求体的媒体类型。例如,设置为 application/json 表示请求体是 JSON 格式。'gid.connector.http.source.lookup.header.Origin'='*'-- 可选,设置 Origin 请求头。通常用于跨域请求。'gid.connector.http.source.lookup.header.X-Content-Type-Options'='nosniff'-- 可选,设置 X-Content-Type-Options 请求头。用于防止 MIME 类型混淆攻击。'json.fail-on-missing-field'='false',-- 可选,是否在字段缺失时失败'json.ignore-parse-errors'='true'-- 可选,是否忽略解析错误'lookup.cache.max-rows'='5000',-- 可选,查找缓存的最大行数'lookup.cache.ttl'='10min',-- 可选,查找缓存的TTL(时间到期)'lookup.max-retries'='3'-- 可选,查找的最大重试次数);
6. HDFS 连接器配置
HDFS connector用于读取或写入Hadoop分布式文件系统中的数据。
创建HDFS Source
CREATETABLE hdfsSource (
line STRING
)WITH('connector'='filesystem','path'='hdfs://localhost:9000/data/input',-- HDFS上的路径。'format'='csv'-- 文件格式。);
创建HDFS Sink
CREATETABLE hdfsSink (
line STRING
)WITH('connector'='filesystem','path'='hdfs://localhost:9000/data/output','format'='csv');
FlinkSql数据类型
在FlinkSQL中,数据类型的选择和定义是非常重要的,因为它们直接影响数据的存储和处理方式。FlinkSQL提供了多种数据类型,可以满足各种业务需求。以下是FlinkSQL中的常见数据类型及其详细介绍:
1. 基本数据类型
- BOOLEAN: 布尔类型,表示
TRUE
或FALSE
。CREATETABLE example_table ( is_active BOOLEAN);
- TINYINT: 8位带符号整数,范围是
-128
到127
。CREATETABLE example_table ( tiny_value TINYINT);
- SMALLINT: 16位带符号整数,范围是
-32768
到32767
。CREATETABLE example_table ( small_value SMALLINT);
- INT: 32位带符号整数,范围是
-2147483648
到2147483647
。CREATETABLE example_table ( int_value INT);
- BIGINT: 64位带符号整数,范围是
-9223372036854775808
到9223372036854775807
。CREATETABLE example_table ( big_value BIGINT);
- FLOAT: 单精度浮点数。
CREATETABLE example_table ( float_value FLOAT);
- DOUBLE: 双精度浮点数。
CREATETABLE example_table ( double_value DOUBLE);
- DECIMAL(p, s): 精确数值类型,
p
表示总精度,s
表示小数位数。CREATETABLE example_table ( decimal_value DECIMAL(10,2));
2. 字符串数据类型
- CHAR(n): 定长字符串,
n
表示字符串的长度。CREATETABLE example_table ( char_value CHAR(10));
- VARCHAR(n): 可变长字符串,
n
表示最大长度。CREATETABLE example_table ( varchar_value VARCHAR(255));
- STRING: 可变长字符串,无长度限制。
CREATETABLE example_table ( string_value STRING);
3. 日期和时间数据类型
- DATE: 日期类型,格式为
YYYY-MM-DD
。CREATETABLE example_table ( date_value DATE);
- TIME§: 时间类型,格式为
HH:MM:SS
,p
表示秒的小数位精度。CREATETABLE example_table ( time_value TIME(3));
- TIMESTAMP§: 时间戳类型,格式为
YYYY-MM-DD HH:MM:SS.sss
,p
表示秒的小数位精度。CREATETABLE example_table ( timestamp_value TIMESTAMP(3));
- TIMESTAMP§ WITH LOCAL TIME ZONE: 带有本地时区的时间戳类型。
CREATETABLE example_table ( local_timestamp_value TIMESTAMP(3)WITHLOCALTIME ZONE);
4. 复杂数据类型
- ARRAY: 数组类型,
T
表示数组中的元素类型。CREATETABLE example_table ( array_value ARRAY<INT>);
- MAP<K, V>: 键值对映射类型,
K
表示键的类型,V
表示值的类型。CREATETABLE example_table ( map_value MAP<STRING,INT>);
- ROW<…>: 行类型,可以包含多个字段,每个字段可以有不同的类型。
CREATETABLE example_table ( row_value ROW<name STRING, age INT>);
5. 特殊数据类型
- BINARY(n): 定长字节数组,
n
表示长度。CREATETABLE example_table ( binary_value BINARY(10));
- VARBINARY(n): 可变长字节数组,
n
表示最大长度。CREATETABLE example_table ( varbinary_value VARBINARY(255));
数据类型的使用示例
以下是一个包含各种数据类型的表的定义示例:
CREATETABLE example_table (
id INT,
name STRING,
is_active BOOLEAN,
salary DECIMAL(10,2),
birth_date DATE,
join_time TIMESTAMP(3),
preferences ARRAY<STRING>,
attributes MAP<STRING, STRING>,
address ROW<street STRING, city STRING, zip INT>);
为了详细介绍 Flink SQL 中所有内置函数,以下是它们的分类、功能描述以及使用案例:
1. 数学函数
- ABS(x)- 描述:返回 x 的绝对值。- 示例:
SELECT ABS(-5);
返回 5。 - CEIL(x)- 描述:返回不小于 x 的最小整数。- 示例:
SELECT CEIL(4.3);
返回 5。 - FLOOR(x)- 描述:返回不大于 x 的最大整数。- 示例:
SELECT FLOOR(4.7);
返回 4。 - EXP(x)- 描述:返回 e 的 x 次方。- 示例:
SELECT EXP(1);
返回 2.71828。 - LOG(x)- 描述:返回 x 的自然对数。- 示例:
SELECT LOG(2.71828);
返回 1。 - LOG2(x)- 描述:返回 x 以 2 为底的对数。- 示例:
SELECT LOG2(8);
返回 3。 - LOG10(x)- 描述:返回 x 以 10 为底的对数。- 示例:
SELECT LOG10(100);
返回 2。 - POWER(x, y)- 描述:返回 x 的 y 次方。- 示例:
SELECT POWER(2, 3);
返回 8。 - SQRT(x)- 描述:返回 x 的平方根。- 示例:
SELECT SQRT(16);
返回 4。
2. 字符串函数
- LENGTH(str)- 描述:返回字符串 str 的长度。- 示例:
SELECT LENGTH('Flink');
返回 5。 - UPPER(str)- 描述:将字符串转为大写。- 示例:
SELECT UPPER('flink');
返回 ‘FLINK’。 - LOWER(str)- 描述:将字符串转为小写。- 示例:
SELECT LOWER('FLINK');
返回 ‘flink’。 - SUBSTRING(str, pos, len)- 描述:返回从字符串 str 的 pos 位置开始长度为 len 的子字符串。- 示例:
SELECT SUBSTRING('Flink', 1, 2);
返回 ‘Fl’。 - CONCAT(str1, str2, …)- 描述:将多个字符串连接成一个字符串。- 示例:
SELECT CONCAT('Hello', ' ', 'World');
返回 ‘Hello World’。 - TRIM(str)- 描述:去除字符串两端的空白字符。- 示例:
SELECT TRIM(' Flink ');
返回 ‘Flink’。
3. 日期时间函数
- CURRENT_TIMESTAMP()- 描述:返回当前的时间戳。- 示例:
SELECT CURRENT_TIMESTAMP();
返回类似 ‘2024-08-07 12:34:56.789’。 - CURRENT_DATE()- 描述:返回当前的日期。- 示例:
SELECT CURRENT_DATE();
返回类似 ‘2024-08-07’。 - DATE_FORMAT(timestamp, format)- 描述:格式化日期时间戳为指定格式的字符串。- 示例:
SELECT DATE_FORMAT(TIMESTAMP '2024-08-07 12:34:56', 'yyyy-MM-dd HH:mm:ss');
返回 ‘2024-08-07 12:34:56’。 - TIMESTAMPDIFF(unit, ts1, ts2)- 描述:返回两个时间戳之间的差值,单位可以是 DAY、HOUR、MINUTE 等。- 示例:
SELECT TIMESTAMPDIFF(DAY, TIMESTAMP '2024-08-01 00:00:00', TIMESTAMP '2024-08-07 00:00:00');
返回 6。
4. 聚合函数
- COUNT(expr)- 描述:计算符合条件的行数。- 示例:
SELECT COUNT(*) FROM table;
返回表中的行数。 - SUM(expr)- 描述:计算 expr 的总和。- 示例:
SELECT SUM(salary) FROM employees;
返回员工薪资的总和。 - AVG(expr)- 描述:计算 expr 的平均值。- 示例:
SELECT AVG(salary) FROM employees;
返回员工薪资的平均值。 - MIN(expr)- 描述:返回 expr 的最小值。- 示例:
SELECT MIN(salary) FROM employees;
返回员工薪资的最小值。 - MAX(expr)- 描述:返回 expr 的最大值。- 示例:
SELECT MAX(salary) FROM employees;
返回员工薪资的最大值。
5. 条件函数
- CASE WHEN condition THEN result [WHEN …] [ELSE result] END- 描述:类似于编程语言中的条件语句,根据条件返回不同的结果。- 示例:
SELECT CASE WHEN salary > 5000 THEN 'High' ELSE 'Low' END FROM employees;
根据薪资返回 ‘High’ 或 ‘Low’。
除了上述基本的数学、字符串、日期时间和聚合函数外,Flink SQL 还提供了一些实际使用中的实用功能,这些功能在数据处理和分析中非常有用。以下是一些实用功能的介绍和示例:
高阶函数
1. 窗口函数
窗口函数允许用户在指定的窗口范围内进行计算,如滑动窗口、滚动窗口和会话窗口。以下是几种常见的窗口函数:
- TUMBLE- 描述:创建一个固定大小的滚动窗口。- 示例:
SELECT TUMBLE_START(ts,INTERVAL'10'MINUTE)AS window_start,COUNT(*)AS cntFROMtableGROUPBY TUMBLE(ts,INTERVAL'10'MINUTE);
- HOP- 描述:创建一个滑动窗口。- 示例:
SELECT HOP_START(ts,INTERVAL'5'MINUTE,INTERVAL'10'MINUTE)AS window_start,COUNT(*)AS cntFROMtableGROUPBY HOP(ts,INTERVAL'5'MINUTE,INTERVAL'10'MINUTE);
- SESSION- 描述:创建一个会话窗口。- 示例:
SELECT SESSION_START(ts,INTERVAL'15'MINUTE)AS window_start,COUNT(*)AS cntFROMtableGROUPBYSESSION(ts,INTERVAL'15'MINUTE);
2. 集合操作函数
集合操作函数用于对多个表进行集合运算,如 UNION、INTERSECT 和 EXCEPT。
- UNION- 描述:合并两个表,去重。- 示例:
SELECT*FROM table1UNIONSELECT*FROM table2;
- UNION ALL- 描述:合并两个表,不去重。- 示例:
SELECT*FROM table1UNIONALLSELECT*FROM table2;
- INTERSECT- 描述:返回两个表的交集。- 示例:
SELECT*FROM table1INTERSECTSELECT*FROM table2;
- EXCEPT- 描述:返回存在于第一个表但不存在于第二个表的记录。- 示例:
SELECT*FROM table1EXCEPTSELECT*FROM table2;
3. JSON 函数
Flink SQL 支持处理 JSON 数据的函数,非常适用于处理嵌套结构的数据。
- JSON_VALUE(json_string, path)- 描述:从 JSON 字符串中提取指定路径的值。- 示例:
SELECT JSON_VALUE('{"name": "John", "age": 30}','$.name')AS name;
- JSON_QUERY(json_string, path)- 描述:从 JSON 字符串中提取指定路径的子 JSON。- 示例:
SELECT JSON_QUERY('{"name": "John", "info": {"age": 30, "city": "New York"}}','$.info')AS info;
4. 数据类型转换函数
这些函数用于在不同的数据类型之间进行转换。
- CAST(expr AS type)- 描述:将 expr 转换为指定的数据类型。- 示例:
SELECT CAST('2024-08-07'ASTIMESTAMP)AS ts;
- TRY_CAST(expr AS type)- 描述:尝试将 expr 转换为指定的数据类型,如果失败则返回 NULL。- 示例:
SELECT TRY_CAST('abc'ASINT)AS number;-- 返回 NULL
5. 复杂类型函数
处理数组、映射等复杂数据类型的函数。
- ARRAY- 描述:创建一个数组。- 示例:
SELECT ARRAY[1,2,3]AS numbers;
- CARDINALITY(array)- 描述:返回数组的元素个数。- 示例:
SELECT CARDINALITY(ARRAY[1,2,3])AS size;
- ELEMENT(array, index)- 描述:返回数组中指定索引的元素。- 示例:
SELECT ELEMENT(ARRAY[1,2,3],2)AS second_element;-- 返回 2
6. 用户自定义函数 (UDF)
Flink SQL 允许用户定义自己的函数以满足特定需求。
- 创建和使用 UDF- 示例:
// Java 代码示例publicstaticclassMyUdfextendsScalarFunction{publicinteval(int x){return x * x;}}// 注册和使用 UDFtableEnv.createTemporarySystemFunction("MyUdf",MyUdf.class);tableEnv.sqlQuery("SELECT MyUdf(age) FROM people");
版权归原作者 LOVE_DDZ 所有, 如有侵权,请联系我们删除。