0


Flink SQL 中处理 MySql Unsigned BIGINT 类型的方式

背景

我是 Porterxie,一个爱问为什么的程序员。最近在使用 Flink 1.17 SQL做数据接入(可以理解为从一个数据源抽取数据写入到目标数据源中的一个过程)的时候,发现处理 MySql Unsigned BIGINT 出现了如下报错

java.lang.ClassCastException:java.math.BigInteger cannot be cast tojava.lang.Long
    at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at BatchExecCalc$2.processElement(UnknownSource)~[?:?]

下面我将从问题排查方式及源码层面对该问题进行分析

首先明确一下问题出现的场景(以下简化了众多信息,只是列出关键信息)

  • Mysql 表:A(value unsigned BIGINT) B(value unsigned BIGINT)
  • Flink SQL 中注册表:A(value BIGINT) B(value BIGINT)
  • 数据迁移 SQL :insert into A select value from B

分析过程

  • 从异常栈来看(上面我给出的异常栈并不全),并不能得到太多信息,而且在不了解 flink sql 详细执行过程的情况下,通过异常栈及源码阅读分析出是哪一步的问题是不可能的。因此我采取的是本地复现 & Debug 方式,以下是复现代码
// 创建 Flink Table 环境StreamExecutionEnvironment streamEnvironment =StreamExecutionEnvironment.createLocalEnvironment();StreamTableEnvironment tableEnvironment =StreamTableEnvironment.create(streamEnvironment);// 注册源表
        tableEnvironment.executeSql("CREATE TABLE source ("+"  value BIGINT"+") WITH ("+"  'connector' = 'jdbc',"+"  'url' = '"+ sourceUrl +"',"+"  'table-name' = 'source',"+"  'username' = '"+ sourceUser +"',"+"  'password' = '"+ sourcePassword +"'"+")");// 注册目标表
        tableEnvironment.executeSql("CREATE TABLE target ("+"  value BIGINT"+") WITH ("+"  'connector' = 'jdbc',"+"  'url' = '"+ sinkUrl +"',"+"  'table-name' = 'target',"+"  'username' = '"+ sinkUser +"',"+"  'password' = '"+ sinkPassword +"'"+")");// 执行插入查询,将源表数据插入目标表
        tableEnvironment.executeSql("INSERT INTO target SELECT value FROM source").print();
  • 首先找到 GenericRowData 的 getLong 方法,打上断点,往上跳,会到如下代码处,如下所示

在这里插入图片描述

  • 这个地方的 serializer 是 RowDataSerializer,继续往下跟,会到如下代码处

在这里插入图片描述

  • 这里的 fieldGetter 是一个 lambda 表达式,不好看具体实现是什么,因此看一下这个 fieldSerializers 数组是怎么初始化的

在这里插入图片描述

这个地方打个断点,重新跑一下,就知道此处是 Long 值的 FieldGetter

在这里插入图片描述
这里 RowData 是 GenericRowData,同时值是 BigInteger ,因此在做强转的时候自然会报错

在这里插入图片描述

  • 通过上述分析,我们大致清楚报错的原因,但是问题的根源是什么以及如何去解决问题,是我们更应该知道的。跟随 Debug,我们往上走,来到如下图代码处,此处 Source 的实现是 JdbcRowDataInputFormat

在这里插入图片描述

现在我们进入 JdbcRowDataInputFormat 类中,找到 nextRecord 生成方式

在这里插入图片描述

最终我们会来到这个位置,找到 BIGINT 生成转换方法是直接将原值返回,此时整个脉络就十分清晰了。

在这里插入图片描述
最终问题原因是:Mysql ResultSet.getObject 返回的就是 BigInteger(当然这并不是 Mysql 驱动的 BUG)
返回 BigInteger 的原因其实很简单:java 的 Long 类型是有符号的,无符号 Long 最大值的范围是超过有符号 Long,因此 Mysql 驱动采用的是 BigInteger 去表示。

解决方案

事实上 flink 中这个 BIGINT 指的是 flink 逻辑表字段类型的 BIGINT,因此我们可以采用 DECIMAL 来表示因此 flink sql 中表字段类型,修改之后就为 A(value DECIMAL) B(value DECIMAL),而不是去修改 Mysql 的字段类型

作者介绍

PorterXie

Java 后端工程师,曾涉猎 Android、Web、Eclipse 插件、小程序、大数据等领域。热爱技术,乐于尝试,买过显卡挖过矿。对 AI 充满热情,致力于提升工作效率的工具开发者。

个人工具网站

https://pxtools.top

个人 gitee

https://gitee.com/porterxie

技术观:

  • 用机器代替人工
  • 追求卓越
  • 做有意义的事
标签: flink sql mysql

本文转载自: https://blog.csdn.net/qq_26921975/article/details/141361938
版权归原作者 qq_26921975 所有, 如有侵权,请联系我们删除。

“Flink SQL 中处理 MySql Unsigned BIGINT 类型的方式”的评论:

还没有评论