0


Flink CDC MySQL同步MySQL错误记录

1、启动 Flink SQL

[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

在这里插入图片描述

2、新建源表

问题1:Encountered “(”
处理方法:去掉

int(11)

,改为

int
Flink SQL>CREATETABLE`t_user`(>`uid`int(11)NOTNULLAUTO_INCREMENTCOMMENT'user id',>`did`int(11)DEFAULTNULLCOMMENT'dept id',>`username`varchar(14)DEFAULTNULL,>`add_time`datetimeDEFAULTNULL,>PRIMARYKEY(`uid`)NOT ENFORCED
>)WITH(>'connector'='mysql-cdc',>'hostname'='192.25.34.2',>'port'='3306',>'username'='*******',>'password'='*******',>'database-name'='test',>'table-name'='t_user'>);[ERROR] Could notexecuteSQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 2,column12.
Was expecting one of:
    "CONSTRAINT"..."NOT"..."NULL"..."PRIMARY"..."UNIQUE"..."COMMENT"..."METADATA"...")"...","..."MULTISET"..."ARRAY"...

Flink SQL>

问题2:Encountered “AUTO_INCREMENT”
处理方法:删除AUTO_INCREMENT

Flink SQL>CREATETABLE`t_user`(>`uid`intNOTNULLAUTO_INCREMENTCOMMENT'user id',>`did`intDEFAULTNULLCOMMENT'dept id',>`username`varchar(14)DEFAULTNULL,>`add_time`datetimeDEFAULTNULL,>PRIMARYKEY(`uid`)NOT ENFORCED
>)WITH(>'connector'='mysql-cdc',>'hostname'='192.25.34.2',>'port'='3306',>'username'='*******',>'password'='*******',>'database-name'='test',>'table-name'='t_user'>);[ERROR] Could notexecuteSQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2,column22.
Was expecting one of:
    "CONSTRAINT"..."PRIMARY"..."UNIQUE"..."COMMENT"..."METADATA"...")"...","..."MULTISET"..."ARRAY"...

Flink SQL>

问题3:Encountered “DEFAULT”
处理方法:删去DEFAULT

Flink SQL>CREATETABLE`t_user`(>`uid`intNOTNULLCOMMENT'user id',>`did`intDEFAULTNULLCOMMENT'dept id',>`username`varchar(14)DEFAULTNULL,>`add_time`datetimeDEFAULTNULL,>PRIMARYKEY(`uid`)NOT ENFORCED
>)WITH(>'connector'='mysql-cdc',>'hostname'='192.25.34.2',>'port'='3306',>'username'='*******',>'password'='*******',>'database-name'='test',>'table-name'='t_user'>);[ERROR] Could notexecuteSQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 3,column13.
Was expecting one of:
    "CONSTRAINT"..."NOT"..."NULL"..."PRIMARY"..."UNIQUE"..."COMMENT"..."METADATA"...")"...","..."MULTISET"..."ARRAY"...

Flink SQL>

问题4:Unknown identifier ‘datetime’
处理方法:改用 TIMESTAMP(3)

Flink SQL>CREATETABLE`t_user`(>`uid`intNOTNULLCOMMENT'user id',>`did`intCOMMENT'dept id',>`username`varchar(14),>`add_time`datetime,>PRIMARYKEY(`uid`)NOT ENFORCED
>)WITH(>'connector'='mysql-cdc',>'hostname'='192.25.34.2',>'port'='3306',>'username'='*******',>'password'='*******',>'database-name'='test',>'table-name'='t_user'>);[ERROR] Could notexecuteSQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'datetime'

Flink SQL>

创建成功:

Flink SQL>CREATETABLE`t_user`(>`uid`intNOTNULLCOMMENT'user id',>`did`intCOMMENT'dept id',>`username`varchar(14),>`add_time`TIMESTAMP(3),>PRIMARYKEY(`uid`)NOT ENFORCED
>)WITH(>'connector'='mysql-cdc',>'hostname'='192.25.34.2',>'port'='3306',>'username'='*******',>'password'='*******',>'database-name'='test',>'table-name'='t_user'>);[INFO]Execute statement succeed.

Flink SQL>

3、创建目标表

Flink SQL>CREATETABLE`ods_t_user`(>`uid`intNOTNULLCOMMENT'user id',>`did`intCOMMENT'dept id',>`username`varchar(14),>`add_time`TIMESTAMP(3),>PRIMARYKEY(`uid`)NOT ENFORCED
>)WITH(>'connector'='jdbc',>'url'='jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',>'driver'='com.mysql.cj.jdbc.Driver',>'username'='*******',>'password'='*******',>'table-name'='ods_t_user'>);

4、将源表加载到目标表

错误1:Connector ‘mysql-cdc’ can only be used as a source. It cannot be used as a sink.

Flink SQL> insert into t_user select * from ods_t_user;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.

Flink SQL> 

原因:方向搞反了,插入表应该是目标表

Flink SQL> insert into ods_t_user select * from t_user;
[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block data

Flink SQL> 

错误2:unexpected block data
解决办法:
(1)更新jar包如下

[appuser@whtpjfscpt01 flink-1.17.1]$ ll lib/
total 223320
-rw-r--r-- 1 appuser appuser    196491 May 19 18:56 flink-cep-1.17.1.jar
-rw-r--r-- 1 appuser appuser    542620 May 19 18:59 flink-connector-files-1.17.1.jar
-rw-r--r-- 1 appuser appuser    266420 Sep 25 14:21 flink-connector-jdbc-3.1.1-1.17.jar
-rw-r--r-- 1 appuser appuser    345711 Sep 25 15:45 flink-connector-mysql-cdc-2.4.1.jar
-rw-r--r-- 1 appuser appuser    102472 May 19 19:02 flink-csv-1.17.1.jar
-rw-r--r-- 1 appuser appuser 135975541 May 19 19:13 flink-dist-1.17.1.jar
-rw-r--r-- 1 appuser appuser   8452171 Sep 19 10:20 flink-doris-connector-1.17-1.4.0.jar
-rw-r--r-- 1 appuser appuser    180248 May 19 19:02 flink-json-1.17.1.jar
-rw-r--r-- 1 appuser appuser  21043319 May 19 19:12 flink-scala_2.12-1.17.1.jar
-rw-r--r-- 1 appuser appuser  15407424 May 19 19:13 flink-table-api-java-uber-1.17.1.jar
-rw-r--r-- 1 appuser appuser  38191226 May 19 19:08 flink-table-planner-loader-1.17.1.jar
-rw-r--r-- 1 appuser appuser   3146210 May 19 18:56 flink-table-runtime-1.17.1.jar
-rw-r--r-- 1 appuser appuser    208006 May 17 18:07 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser    301872 May 17 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser   1790452 May 17 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 appuser appuser     24279 May 17 18:07 log4j-slf4j-impl-2.17.1.jar
-rw-r--r-- 1 appuser appuser   2462364 Sep 19 11:30 mysql-connector-java-8.0.26.jar
[appuser@whtpjfscpt01 flink-1.17.1]$

(2)重启flink

[appuser@whtpjfscpt01 flink-1.17.1]$ bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 41993) on host whtpjfscpt01.
Stopping standalonesession daemon (pid: 41597) on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host whtpjfscpt01.
Starting taskexecutor daemon on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

(3)重新执行

Flink SQL>SET execution.checkpointing.interval=3s;[INFO]Execute statement succeed.

Flink SQL>CREATETABLE`t_user`(>`uid`intNOTNULLCOMMENT'user id',>`did`intCOMMENT'dept id',>`username`varchar(14),>`add_time`TIMESTAMP(3),>PRIMARYKEY(`uid`)NOT ENFORCED
>)WITH(>'connector'='mysql-cdc',>'hostname'='192.25.34.2',>'port'='3306',>'username'='*******',>'password'='*******',>'database-name'='test',>'table-name'='t_user'>);[INFO]Execute statement succeed.

Flink SQL>CREATETABLE`ods_t_user`(>`uid`intNOTNULLCOMMENT'user id',>`did`intCOMMENT'dept id',>`username`varchar(14),>`add_time`TIMESTAMP(3),>PRIMARYKEY(`uid`)NOT ENFORCED
>)WITH(>'connector'='jdbc',>'url'='jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',>'driver'='com.mysql.cj.jdbc.Driver',>'username'='*******',>'password'='*******',>'table-name'='ods_t_user'>);[INFO]Execute statement succeed.

Flink SQL>

(4)成功执行

Flink SQL>insertinto ods_t_user select*from t_user;[INFO] Submitting SQLupdate statement to the cluster...[INFO]SQLupdate statement has been successfully submitted to the cluster:
Job ID: c2e69d061f3777c031b0acb4ec03d13a

在这里插入图片描述

错误3:无目标表
在这里插入图片描述

CREATETABLE demo.ods_t_user (`uid`int(11)NOTNULLAUTO_INCREMENTCOMMENT'user id',`did`int(11)DEFAULTNULLCOMMENT'dept id',`username`varchar(14)DEFAULTNULL,`add_time`datetimeDEFAULTNULL,PRIMARYKEY(`uid`))

在这里插入图片描述
源表添加新纪录

INSERT INTO test.t_user(did,username)values('3','test'); 

目标表自动同步数据
在这里插入图片描述

标签: flink mysql CDC

本文转载自: https://blog.csdn.net/chengyuqiang/article/details/133275963
版权归原作者 程裕强 所有, 如有侵权,请联系我们删除。

“Flink CDC MySQL同步MySQL错误记录”的评论:

还没有评论