0


Flink cdc debug调试动态变更表结构

文章目录

前言

接着上一篇Flink cdc3.0动态变更表结构——源码解析,cdc debug部分官方没有特别说明,尝试踩了一些坑, 这里记录下。

调试流程

1. 拉取代码本地打包

通过 github 拉取

3.0.0

以上版本,本地maven打包

 mvn clean package -DskipTests

2. 配置启动参数

搜索启动类

CliFrontend

,修改运行配置。
在这里插入图片描述
需要配置环境变量

FLINK_HOME

pipeline connector依赖包

flink-dist 包

,以及指定配置文件,

--use-mini-cluster true

代表使用 local 集群。
在这里插入图片描述

mysql-to-doris.yaml
################################################################################# Description: Sync MySQL all tables to Doris################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 1

3. 日志配置

如果想debug时控制台有日志输出,需要修改主pom.xml中

org.apache.logging.log4j

相关的

scope

。然后再

flink-cdc-cli

模块添加对应的

log4j2.xml

日志配置。我这里为了测试效果,将日志级别调成

info


在这里插入图片描述

4. 启动验证

在测试前需要doris创建好对应的数据库,启动验证库表能正常同步,这次我们拿

app_db. products

表进行测试。目前表结构和数据能正常同步,接下来我们准备一条添加字段的sql在mysql执行
在这里插入图片描述

5. 断点验证

我们在

SchemaOperator

类的

processElement

方法中提前加上断点,在mysql中执行

alter table products add COLUMN name VARCHAR(64);

,可以看到断点处捕获到了对应的

SchemaChangeEvent

事件。在这里插入图片描述
从这里可以看到

SchemaChangeEvent

的生成及之后的处理,和上一篇分析的流程也是相同的。完整执行日志:
在这里插入图片描述
doris上也能查到添加的字段,不过是

VARCHAR(256)

,这里又是一个bug。
在这里插入图片描述

问题

1. Cannot find factory with identifier “mysql” in the classpath.

没有添加

flink-cdc-pipeline-connector-mysql-3.0-SNAPSHOT.jar

引起的异常
在这里插入图片描述

2.JsonFactory异常

下面这个异常网上很多解决方案是添加

com.fasterxml.jackson.core

相关包,而这里实际可能是classpath没有添加

flink-dist-1.18.0.jar
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/shaded/jackson2/com/fasterxml/jackson/core/JsonFactory
    at com.ververica.cdc.cli.utils.FlinkEnvironmentUtils.loadFlinkConfiguration(FlinkEnvironmentUtils.java:33)
    at com.ververica.cdc.cli.CliFrontend.createExecutor(CliFrontend.java:89)
    at com.ververica.cdc.cli.CliFrontend.main(CliFrontend.java:62)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

3. NoSuchMethodError异常

如果classpath引入的pipeline版本不匹配,可能会找出NoSuchMethodError异常, 比如我最开始使用的官网的链接:
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar

Caused by: java.lang.NoSuchMethodError: com.ververica.cdc.common.event.AddColumnEvent$ColumnWithPosition.<init>(Lcom/ververica/cdc/common/schema/Column;Lcom/ververica/cdc/common/event/AddColumnEvent$ColumnPosition;Lcom/ververica/cdc/common/schema/Column;)V
    at com.ververica.cdc.connectors.mysql.source.parser.CustomAlterTableParserListener.lambda$exitAlterByAddColumn$0(CustomAlterTableParserListener.java:120)
    at io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.runIfNotNull(MySqlAntlrDdlParser.java:358)
    at com.ververica.cdc.connectors.mysql.source.parser.CustomAlterTableParserListener.exitAlterByAddColumn(CustomAlterTableParserListener.java:98)
    at io.debezium.ddl.parser.mysql.generated.MySqlParser$AlterByAddColumnContext.exitRule(MySqlParser.java:15459)
    at io.debezium.antlr.ProxyParseTreeListenerUtil.delegateExitRule(ProxyParseTreeListenerUtil.java:64)
    at com.ververica.cdc.connectors.mysql.source.parser.CustomMySqlAntlrDdlParserListener.exitEveryRule(CustomMySqlAntlrDdlParserListener.java:122)
    at com.ververica.cdc.connectors.shaded.org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:48)

其他

调试环境准备的过程中,会遇到各种小问题。这里也很难罗列完,基本是根据日志来判断处理,既然有案例可以正常调试,相信大家也可以的。

结尾

参考文献:https://docs.google.com/document/d/1L6cJiqYkAsZ_nDa3MgRwV3SKQuw5OrMbqGC4YgzgKR4/edit#heading=h.aybxdd96r62i

标签: flink 大数据 debug

本文转载自: https://blog.csdn.net/yyoc97/article/details/136058801
版权归原作者 yyoc97 所有, 如有侵权,请联系我们删除。

“Flink cdc debug调试动态变更表结构”的评论:

还没有评论