0


flink sqlserver cdc实时同步(含sqlserver安装配置等)

文章目录

01 引言

官方文档:https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/sqlserver-cdc.md

如果要使用flink cdc做sqlserver的实时同步,需要满足以下条件:

  1. 需要安装SQLServer(需要支持CDC的功能,SQLServer 2008之后的版本都支持)
  2. 需要开启SQL Server代理;
  3. 启用CDC功能。

ok,接下来开始讲解。

02 SQLServer安装

首先需要先安装SqlServer(使用的是2019版本),有兴趣的同学可以参考博主之前写的《Docker下安装SqlServer2019》。

主要就是两个步骤:

## 拉取最新镜像docker pull mcr.microsoft.com/mssql/server:2019-latest
## 运行 SQL Server 容器(密码必须是8个字符,并包含字母、数字和特殊字符,如:abc@123456 ,下面映射主机端口为30027)docker run -e'ACCEPT_EULA=Y'-e'SA_PASSWORD=abc@123456'-p30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest

03 开启SQLServer代理

首先使用root用户进入容器:

dockerexec-it--user root sql_server_2019 bash

进入容器后,执行命令启用SqlServeragent:

/opt/mssql/bin/mssql-conf set sqlagent.enabled true

退出,并重启容器:

exitdocker restart sql_server_2019

具体操作如下:
在这里插入图片描述

04 开启CDC功能


step1:创建’cdc_test’数据库,并使用连接工具登录该数据库,使用以下 SQL 命令启用 CDC 功能:

-- 创建数据库CREATEDATABASE cdc_test;-- 启用CDC功能EXEC sys.sp_cdc_enable_db;-- 判断当前数据库是否启用了CDC(如果返回1,表示已启用)SELECT is_cdc_enabled FROM sys.databasesWHERE name ='cdc_test';

在这里插入图片描述


step2:选择要进行 CDC 跟踪的表(这里使用

orders表作为演示

-- 创建示例表(orders)CREATETABLE orders (
     id int,
     order_date date,
     purchaser int,
     quantity int,
     product_id int,PRIMARYKEY([id]));-- schema_name 是表所属的架构(schema)的名称。-- table_name 是要启用 CDC 跟踪的表的名称。-- cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。EXEC sys.sp_cdc_enable_table
  @source_schema='dbo',@source_name='orders',@role_name='cdc_role';

执行结果如下:
在这里插入图片描述


step3:启用 CDC 后,SQL Server 将自动跟踪启用了 CDC 的表上的数据更改,并将更改信息存储在 CDC 相关的表中,您可以使用这些信息进行数据更改追踪和同步。

-- 查询在当前数据库下所有的表:SELECT*FROM INFORMATION_SCHEMA.TABLES

在这里插入图片描述

05 Flink SQL

ok,现在可以写FlinkSQL了,如下:

-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据CREATETABLE t_source_sqlserver (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,PRIMARYKEY(id)NOT ENFORCED -- 主键定义(可选))WITH('connector'='sqlserver-cdc',-- 使用SQL Server CDC连接器'hostname'='10.194.183.120',-- SQL Server主机名'port'='30027',-- SQL Server端口'username'='sa',-- SQL Server用户名'password'='abc@123456',-- SQL Server密码'database-name'='cdc_test',-- 数据库名称'schema-name'='dbo',-- 模式名称'table-name'='orders'-- 要捕获更改的表名);-- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库CREATETABLE table_sink_mysql (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,PRIMARYKEY(id)NOT ENFORCED  -- 主键定义(可选))WITH('connector'='jdbc',-- 使用JDBC连接器'url'='jdbc:mysql://10.194.183.120:30025/test',-- MySQL的JDBC URL'username'='root',-- MySQL用户名'password'='root',-- MySQL密码'table-name'='orders'-- 要写入的MySQL表名);-- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中INSERTINTO table_sink_mysql SELECT*FROM t_source_sqlserver;

启动程序,一切正常:
在这里插入图片描述

06 验证

验证新增:
在这里插入图片描述
在这里插入图片描述


验证修改:
在这里插入图片描述
在这里插入图片描述


验证删除:
在这里插入图片描述
在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_20042935/article/details/131959277
版权归原作者 杨林伟 所有, 如有侵权,请联系我们删除。

“flink sqlserver cdc实时同步(含sqlserver安装配置等)”的评论:

还没有评论