0


TiDB系列之:使用Flink TiDB CDC Connector采集数据

TiDB系列之:使用Flink TiDB CDC Connector采集数据

TiDB CDC 连接器允许从 TiDB 数据库读取快照数据和增量数据。本文档介绍如何设置 TiDB CDC 连接器以对 TiDB 数据库运行 SQL 查询。

  • TiDB系列之:使用TiCDC增量同步TiDB数据库数据
  • TiDB系列之:TiCDC同步数据到Kafka集群使用Debezium数据格式
  • TiDB系列之:TiCDC同步TiDB数据库数据到Kafka集群Topic

一、依赖项

为了设置 TiDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)的项目和带有 SQL JAR 包的 SQL Client 的依赖信息。

二、Maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-tidb-cdc</artifactId><version>3.0.1</version></dependency>

三、SQL Client JAR

下载链接仅适用于稳定版本。

下载 flink-sql-connector-tidb-cdc-3.0.1.jar 并将其放在 <FLINK_HOME>/lib/ 下。

四、如何创建 TiDB CDC 表

TiDB CDC 表可以定义如下:

-- checkpoint every 3000 milliseconds                       
Flink SQL>SET'execution.checkpointing.interval'='3s';-- register a TiDB table 'orders' in Flink SQL
Flink SQL>CREATETABLE orders (
     order_id INT,
     order_date TIMESTAMP(3),
     customer_name STRING,
     price DECIMAL(10,5),
     product_id INT,
     order_status BOOLEAN,PRIMARYKEY(order_id)NOT ENFORCED
     )WITH('connector'='tidb-cdc','tikv.grpc.timeout_in_ms'='20000','pd-addresses'='localhost:2379','database-name'='mydb','table-name'='orders');-- read snapshot and binlogs from orders table
Flink SQL>SELECT*FROM orders;

五、连接器选项

参数是否必须默认值类型描述connectorrequired(none)String指定使用什么连接器,这里应该是“tidb-cdc”。database-namerequired(none)String要监控的 TiDB 服务器的数据库名称。table-namerequired(none)String要监控的 TiDB 数据库的表名。scan.startup.modeoptionalinitialStringTiDB CDC Consumer 可选的启动模式,有效枚举为“initial”和“latest-offset”。pd-addressesrequired(none)StringTiKV 集群的 PD 地址。tikv.grpc.timeout_in_msoptional(none)LongTiKV GRPC 超时(以毫秒为单位)。tikv.grpc.scan_timeout_in_msoptional(none)LongTiKV GRPC 扫描超时(以毫秒为单位)。tikv.batch_get_concurrencyoptional20IntegerTiKV GRPC 批量获取并发。tikv.*optional(none)String传递 TiDB 客户端的属性。

六、可用元数据

以下格式元数据可以在表定义中公开为只读(虚拟)列。
keyDataType描述table_nameSTRING NOT NULL包含该行的表的名称。database_nameSTRING NOT NULL包含该行的数据库的名称。op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。如果记录是从表的快照而不是binlog中读取的,则该值始终为0。
扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector'='tidb-cdc',
    'tikv.grpc.timeout_in_ms'='20000',
    'pd-addresses'='localhost:2379',
    'database-name'='mydb',
    'table-name'='orders');

七、特征

一次性处理

TiDB CDC 连接器是一个 Flink Source 连接器,它会先读取数据库快照,然后继续读取更改事件,即使发生故障也只处理一次。

启动阅读位置

配置选项 scan.startup.mode 指定 TiDB CDC Consumer 的启动模式。有效的枚举是:

  • initial(默认):拍摄捕获表的结构和数据的快照;如果您想从捕获的表中获取数据的完整表示,则很有用。
  • latest-offset:仅对捕获的表的结构进行快照;如果只需要获取从现在开始发生的更改,则很有用。

多线程读取

TiDB CDC 源可以并行读取工作,因为有多个任务可以接收更改事件。

DataStream Source

TiDB CDC 连接器也可以是 DataStream 源。您可以创建一个 SourceFunction,如下所示:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.util.Collector;import org.apache.flink.cdc.connectors.tidb.TDBSourceOptions;import org.apache.flink.cdc.connectors.tidb.TiDBSource;import org.apache.flink.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;import org.apache.flink.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;import org.tikv.kvproto.Cdcpb;import org.tikv.kvproto.Kvrpcpb;import java.util.HashMap;public class TiDBSourceExample {

    public static void main(String[] args) throws Exception {

        SourceFunction<String> tidbSource =
            TiDBSource.<String>builder().database("mydb")// set captured database.tableName("products")// set captured table.tiConf(
                    TDBSourceOptions.getTiConfiguration("localhost:2399", new HashMap<>())).snapshotEventDeserializer(
                    new TiKVSnapshotEventDeserializationSchema<String>() {
                        @Overridepublic void deserialize(
                            Kvrpcpb.KvPair record, Collector<String>out)
                            throws Exception {
                            out.collect(record.toString());
                        }

                        @Overridepublic TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    }).changeEventDeserializer(
                    new TiKVChangeEventDeserializationSchema<String>() {
                        @Overridepublic void deserialize(
                            Cdcpb.Event.Row record, Collector<String>out)
                            throws Exception {
                            out.collect(record.toString());
                        }

                        @Overridepublic TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    }).build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpoint
        env.enableCheckpointing(3000);
        env.addSource(tidbSource).print().setParallelism(1);

        env.execute("Print TiDB Snapshot + Binlog");
    }
}

八、数据类型映射

TiDB typeFlink SQL typeNOTETINYINTTINYINTSMALLINT、TINYINT UNSIGNEDSMALLINTINT、MEDIUMINT、SMALLINT UNSIGNEDINTBIGINT、INT UNSIGNEDBIGINTBIGINT UNSIGNEDDECIMAL(20, 0)FLOATFLOATREAL、DOUBLEDOUBLENUMERIC(p, s) DECIMAL(p, s) where p <= 38DECIMAL(p, s)NUMERIC(p, s) DECIMAL(p, s) where 38 < p <= 65STRING在 TiDB 中 DECIMAL 数据类型的精度最高为 65,但在 Flink 中 DECIMAL 的精度限制为 38。因此,如果定义精度大于 38 的十进制列,则应将其映射到 STRING 以避免精度损失。BOOLEAN、TINYINT(1)、BIT(1)BOOLEANDATEDATETIME [§]TIME [§]TIMESTAMP [§]TIMESTAMP_LTZ [§]DATETIME [§]TIMESTAMP [§]CHAR(n)CHAR(n)VARCHAR(n)VARCHAR(n)BIT(n)BINARY(⌈n/8⌉)BINARY(n)BINARY(n)TINYTEXT、TEXT、MEDIUMTEXT、LONGTEXTSTRINGTINYBLOB、BLOB、MEDIUMBLOB、LONGBLOBBYTES目前,TiDB 中的 BLOB 数据类型仅支持长度不大于 2,147,483,647(2 ** 31 - 1) 的 Blob。YEARINTENUMSTRINGJSONSTRINGJSON 数据类型在 Flink 中会被转换为 JSON 格式的 STRING。SETARRAY由于 TiDB 中的 SET 数据类型是一个字符串对象,可以有零个或多个值,因此它应该始终映射到字符串数组


本文转载自: https://blog.csdn.net/zhengzaifeidelushang/article/details/140844033
版权归原作者 最笨的羊羊 所有, 如有侵权,请联系我们删除。

“TiDB系列之:使用Flink TiDB CDC Connector采集数据”的评论:

还没有评论