Running Dinky 0.7.5 in IDEA: Troubleshooting a Kafka Data Source While Debugging
1. First build
The IDEA version used in this article is as follows:
IntelliJ IDEA 2023.1.3 (Community Edition)
Build #IC-231.9161.38, built on June 20, 2023
Runtime version: 17.0.7+10-b829.16 amd64
VM: OpenJDK 64-Bit Server VM by JetBrains s.r.o.
Windows 10.0
GC: G1 Young Generation, G1 Old Generation
Memory: 2048M
Cores: 4
Non-Bundled Plugins:
de.netnexus.camelcaseplugin (3.0.12)
com.dguner.lombok-builder-helper (1.5.0)
MavenRunHelper (4.25.1-IJ2022.2)
io.github.hyuga0410.lombok-enums-component (1.2)
Kotlin: 231-1.8.21-IJ9161.38
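After cloning the code from https://gitee.com/DataLinkDC/Dinky.git, switch to the 0.7.5 branch and make no changes.
Set up the basics such as the JDK and Maven yourself.
In the Maven tool window on the right side of IDEA, tick the Maven profiles dev, flink-1.17, jdk1.8, scala-2.12, and web. Then run maven clean followed by maven install from the same window and wait for the build to finish. It takes a long time, depending on the machine's hardware.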
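2. Runtime environment
Environment details (item: value):
- Hadoop version: hadoop-3.1.4
- Flink job execution mode: Yarn Session
- Flink version: flink-1.17.0
- dlink version: dlink-release-0.7.5
- Kafka version: kafka_2.12-3.0.0
- Kafka run mode: zookeeper
- MySQL version: 5.7.28
Setting up and starting the HDFS cluster, the YARN cluster, and the Dlink environment is skipped here; it is assumed to be done already.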
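3. Initialize the database
In MySQL, create the dlink_075 user, then execute the dlink-doc/sql/dinky.sql file in the dlink_075 database.
4. IDEA run configuration
Open /dlink-admin/src/main/resources/application.yml under the dlink root directory. At the top of the file you can see: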
spring:
  datasource:
    url: jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dlink}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: ${MYSQL_USERNAME:dlink}
    password: ${MYSQL_PASSWORD:dlink}
    driver-class-name: com.mysql.cj.jdbc.Driver
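The ${} placeholders contain variables (MYSQL_ADDR, MYSQL_DATABASE, MYSQL_USERNAME, MYSQL_PASSWORD) that can be supplied from outside. If a variable is not supplied, the value after the first colon is used.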
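The fallback rule for these placeholders can be illustrated with a minimal Python sketch. It is not Spring's actual resolver, only an imitation of the `${NAME:default}` behavior. The function name `resolve_placeholders` is hypothetical, and leaving an unresolved placeholder untouched is a simplification made for this sketch.

```python
import re

# Matches ${NAME} or ${NAME:default}; the name ends at the first colon,
# so a default such as 127.0.0.1:3306 may itself contain colons.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def resolve_placeholders(text, env):
    """Replace each ${NAME:default} with env[NAME] if set, else the default."""

    def substitute(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        # Simplification for this sketch: keep the placeholder unchanged.
        return match.group(0)

    return _PLACEHOLDER.sub(substitute, text)


url = "jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dlink}"
# Nothing set: both defaults are used.
print(resolve_placeholders(url, {}))
# Only the database is overridden: the address still uses its default.
print(resolve_placeholders(url, {"MYSQL_DATABASE": "dlink_075"}))
```

Passing a plain dict instead of reading the process environment keeps the sketch easy to try with different combinations of set and unset variables.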
Now set these parameters for the yml configuration file in IDEA. Open the Run/Debug Configurations dialog (Run -> Edit Configurations...):
In Environment variables, enter the values as name=value pairs, separated by semicolons:
MYSQL_ADDR=192.168..;MYSQL_DATABASE=dlink_075;MYSQL_USERNAME=root;MYSQL_PASSWORD=****
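To make the name=value;name=value layout concrete, here is a small Python sketch that splits such a string into a dictionary. It is not IDEA's own parser: the helper name `parse_env_string` and the sample values are hypothetical, and values that themselves contain semicolons are not handled.

```python
def parse_env_string(spec):
    """Parse 'NAME=value;NAME=value' into a dict, skipping empty parts."""
    variables = {}
    for part in spec.split(";"):
        part = part.strip()
        if not part:
            continue
        # Split on the first '=' only, so values may contain '='.
        name, sep, value = part.partition("=")
        if not sep:
            raise ValueError(f"missing '=' in {part!r}")
        variables[name.strip()] = value
    return variables


# Hypothetical values, for illustration only.
spec = "MYSQL_ADDR=192.168.0.10:3306;MYSQL_DATABASE=dlink_075;MYSQL_USERNAME=root"
print(parse_env_string(spec))
```

Printing the parsed result before pasting the string into the run configuration is a quick way to spot a missing semicolon or a misspelled variable name.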
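5. Run FlinkSQL jobs
5.1 Request a Yarn Session to start the Flink cluster
From the Flink home directory, run the following command to request resources from the YARN cluster, open a YARN session, and start the Flink cluster:
./bin/yarn-session.sh -d -nm ww
The newly started YARN session is visible in the YARN Web UI.
Parameter description:
- -d: detached mode. Use this option if you do not want the Flink YARN client to keep running in the foreground; the YARN session keeps running in the background even after you close the current terminal window.
- -nm (--name): sets the application name shown in the YARN UI.
5.2 Run a basic FlinkSQL job
Enter the following in the editor: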
DROP TABLE IF EXISTS employees;
CREATE TABLE IF NOT EXISTS employees (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE,
    proctime AS PROCTIME(),
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.*.*',
    'port' = '3306',
    'username' = 'root',
    'password' = '****',
    'scan.incremental.snapshot.enabled' = 'true',
    'debezium.snapshot.mode' = 'latest-offset',
    'database-name' = 'nfp_ep',
    'table-name' = 'employees_dinky'
);
DROP TABLE IF EXISTS dim_sex;
CREATE TABLE dim_sex (
    sex STRING,
    caption STRING,
    PRIMARY KEY (sex) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.*.*:3306/employees',
    'table-name' = 'dim_sex',
    'username' = 'root',
    'password' = '****'
);
select *
from employees
left join dim_sex FOR SYSTEM_TIME AS OF employees.proctime
    ON employees.gender = dim_sex.sex;
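This is a very basic FlinkSQL task, and it runs normally. When rows in the source table are inserted or updated, the latest results appear in the result sheet below the editor; click the "Get latest data" button to see the data.
5.3 Run a FlinkSQL job with Kafka as the data source
Enter the following in the editor: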
DROP TABLE IF EXISTS employees_kafka;
CREATE TABLE IF NOT EXISTS employees_kafka (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink-cdc-kafka',
    'properties.bootstrap.servers' = 'bd171:9092,bd172:9092,bd173:9092',
    'properties.group.id' = 'flink-cdc-kafka-group',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE IF NOT EXISTS employees_sink (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.*.*:3306/employees?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
    'table-name' = 'employees_kafka_sink',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = '****'
);
insert into
    employees_sink
select
    emp_no,
    birth_date,
    first_name,
    last_name,
    gender,
    hire_date
from
    employees_kafka;
Running it fails with the following error:
[dlink] 2023-11-30 21:36:27.751 ERROR 16072 --- [nio-8888-exec-9] com.dlink.utils.LogUtil: 2023-11-30T21:36:27.750: Exception in executing FlinkSQL:
insert into
employees_sink
select
emp_no,
birth_date,
first_name,
last_name,
gender,
hire_date
from
employees_kafka
Error message:
org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.employees_kafka'.
Table options are:
'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='bd171:9092,bd172:9092,bd173:9092'
'properties.group.id'='flink-cdc-kafka-group'
'scan.startup.mode'='latest-offset'
'topic'='flink-cdc-kafka'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:413)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:857)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:374)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:758)
at com.dlink.executor.Executor.executeSql(Executor.java:249)
at com.dlink.job.JobManager.executeSql(JobManager.java:516)
at com.dlink.service.impl.StudioServiceImpl.executeFlinkSql(StudioServiceImpl.java:203)
at com.dlink.service.impl.StudioServiceImpl.executeSql(StudioServiceImpl.java:190)
at com.dlink.service.impl.StudioServiceImpl$$FastClassBySpringCGLIB$$e3eb787.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:89)
at com.dlink.aop.UdfClassLoaderAspect.round(UdfClassLoaderAspect.java:65)
at sun.reflect.GeneratedMethodAccessor154.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:634)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
at com.dlink.service.impl.StudioServiceImpl$$EnhancerBySpringCGLIB$$aef944af.executeSql(<generated>)
at com.dlink.controller.StudioController.executeSql(StudioController.java:78)
at com.dlink.controller.StudioController$$FastClassBySpringCGLIB$$e6483d87.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.adapter.AfterReturningAdviceInterceptor.invoke(AfterReturningAdviceInterceptor.java:57)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
at com.dlink.controller.StudioController$$EnhancerBySpringCGLIB$$8bbf8e16.executeSql(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1071)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:964)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:696)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:779)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at com.alibaba.druid.support.http.WebStatFilter.doFilter(WebStatFilter.java:124)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:360)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:399)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1789)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
Available factory identifiers are:
raw
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:1130)
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:1046)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.getValueDecodingFormat(KafkaDynamicTableFactory.java:330)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:183)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:164)
... 114 more
The JSON format dependency is missing, so add the following dependency to the pom.xml of dlink-admin:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
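The "Available factory identifiers are: raw" message reflects how Flink finds formats: table factories are discovered through Java's ServiceLoader, which reads the resource META-INF/services/org.apache.flink.table.factories.Factory from each jar on the classpath. As a rough check of whether a given jar registers any factories, the following Python sketch lists the entries of that file. The helper name `list_registered_factories` and the demo class name are hypothetical; point the function at a real jar path (for example a flink-json jar in your local Maven repository) to inspect it.

```python
import io
import zipfile

# Flink discovers table factories through Java's ServiceLoader, which reads
# this resource from every jar on the classpath.
SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.Factory"


def list_registered_factories(jar):
    """Return the factory class names a jar registers, or [] if it has none."""
    with zipfile.ZipFile(jar) as archive:
        if SERVICE_FILE not in archive.namelist():
            return []
        text = archive.read(SERVICE_FILE).decode("utf-8")
    factories = []
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and blank lines
        if line:
            factories.append(line)
    return factories


# Self-contained demo: build a tiny in-memory jar with a made-up entry.
demo_jar = io.BytesIO()
with zipfile.ZipFile(demo_jar, "w") as archive:
    archive.writestr(SERVICE_FILE, "# license header\ncom.example.DemoFormatFactory\n")
demo_jar.seek(0)
print(list_registered_factories(demo_jar))
```

An empty result for every jar on the module's classpath would be consistent with the error above, where only the built-in raw format was found.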
Next, rebuild the whole Dinky project. You may ask why not build dlink-admin on its own: because that fails with the following error:
[INFO] --- spotless-maven-plugin:2.27.1:check (default) @ dlink-admin ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 36.993 s
[INFO] Finished at: 2023-11-30T22:02:29+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.27.1:check (default) on project dlink-admin: Execution default of goal com.diffplug.spotless:spotless-maven-plugin:2.27.1:check failed: Unable to locate file with path: style/spotless_dlink_formatter.xml: Could not find resource 'style/spotless_dlink_formatter.xml'. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
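Before rebuilding the whole Dinky project, untick the "web" profile under Maven Profiles in the Maven tool window on the right side of IDEA. Rebuilding the static web project is unnecessary, and it is very time-consuming.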
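For readers who prefer the command line, ticking profiles in IDEA corresponds to Maven's -P option with a comma-separated profile list. The Python sketch below only assembles and prints such a command; it does not run Maven. The helper name `maven_command` is hypothetical, and it is an assumption that the command-line build behaves the same as the IDEA build for this project.

```python
import shlex

# Profile names taken from the build described earlier in this article.
ALL_PROFILES = ["dev", "flink-1.17", "jdk1.8", "scala-2.12", "web"]


def maven_command(profiles, skip=()):
    """Build a 'mvn clean install' argument list with the chosen profiles."""
    active = [p for p in profiles if p not in skip]
    return ["mvn", "clean", "install", "-P" + ",".join(active)]


# Rebuild without the slow front-end profile.
print(shlex.join(maven_command(ALL_PROFILES, skip={"web"})))
```

Keeping the profile list in one place makes it harder to drop one of the other profiles by accident when leaving out web.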
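After rebuilding the whole Dinky project, start Dinky and run the FlinkSQL above again. The previous problem no longer appears, but a new one does; IDEA shows the following error: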
[dlink] 2023-11-30 22:22:52.386 INFO 5668 --- [ent-IO-thread-1] org.apache.flink.client.program.rest.RestClusterClient: Submitting job 'kafka到mysql单表employees_savepoint' (16258731657846a524dd565dcfbef607).
[dlink] 2023-11-30 22:22:54.522 INFO 5668 --- [ent-IO-thread-4] org.apache.flink.client.program.rest.RestClusterClient: Successfully submitted job 'kafka到mysql单表employees_savepoint' (16258731657846a524dd565dcfbef607) to 'http://bd171:18081'.
[dlink] 2023-11-30 22:22:54.698 ERROR 5668 --- [nio-8888-exec-8] com.dlink.utils.LogUtil: 2023-11-30T22:22:54.698: Exception in executing FlinkSQL:
insert into
employees_sink
select
emp_no,
birth_date,
first_name,
last_name,
gender,
hire_date
from
employees_kafka
Error message:
org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
at com.dlink.executor.Executor.executeSql(Executor.java:249)
at com.dlink.job.JobManager.executeSql(JobManager.java:516)
at com.dlink.service.impl.StudioServiceImpl.executeFlinkSql(StudioServiceImpl.java:203)
at com.dlink.service.impl.StudioServiceImpl.executeSql(StudioServiceImpl.java:190)
at com.dlink.service.impl.StudioServiceImpl$$FastClassBySpringCGLIB$$e3eb787.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:89)
at com.dlink.aop.UdfClassLoaderAspect.round(UdfClassLoaderAspect.java:65)
at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:634)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
at com.dlink.service.impl.StudioServiceImpl$$EnhancerBySpringCGLIB$$ca979efe.executeSql(<generated>)
at com.dlink.controller.StudioController.executeSql(StudioController.java:78)
at com.dlink.controller.StudioController$$FastClassBySpringCGLIB$$e6483d87.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.adapter.AfterReturningAdviceInterceptor.invoke(AfterReturningAdviceInterceptor.java:57)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
at com.dlink.controller.StudioController$$EnhancerBySpringCGLIB$$9b916e1f.executeSql(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1071)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:964)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:696)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:779)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at com.alibaba.druid.support.http.WebStatFilter.doFilter(WebStatFilter.java:124)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:360)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:399)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1789)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'kafka到mysql单表employees_savepoint'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921)
... 94 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: employees_kafka[1] -> ConstraintEnforcer[2] -> Sink: employees_sink[2]
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:914)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:896)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:852)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
... 20 more
The core problem turns out to be:
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
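A standalone Dinky deployment runs this FlinkSQL without hitting the problem. The error looks like a type mismatch between the original kafka-clients classes and their shaded counterparts (the ones relocated by maven-shade-plugin). In the Flink 1.17 source, the pom.xml of the flink-sql-connector-kafka subproject contains: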
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <id>shade-flink</id>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <artifactSet>
              <includes>
                <include>org.apache.flink:flink-connector-base</include>
                <include>org.apache.flink:flink-connector-kafka</include>
                <include>org.apache.kafka:*</include>
              </includes>
            </artifactSet>
            <filters>
              <filter>
                <artifact>org.apache.kafka:*</artifact>
                <excludes>
                  <exclude>kafka/kafka-version.properties</exclude>
                  <exclude>LICENSE</exclude>
                  <!-- Does not contain anything relevant.
                       Cites a binary dependency on jersey, but this is neither reflected in the
                       dependency graph, nor are any jersey files bundled. -->
                  <exclude>NOTICE</exclude>
                  <exclude>common/**</exclude>
                </excludes>
              </filter>
            </filters>
            <relocations>
              <relocation>
                <pattern>org.apache.kafka</pattern>
                <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
In other words, the classes from kafka-clients are relocated from the package prefix "org.apache.kafka" to "org.apache.flink.kafka.shaded.org.apache.kafka".
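To make the effect of this relocation concrete, here is a minimal sketch in Python. It is a simplified model of prefix relocation, not the actual maven-shade-plugin implementation; the function name relocate is made up for illustration. It shows why the class name in the exception message differs from the one the shaded connector expects.

```python
# Simplified model of a shade relocation: rewrite the package prefix of a
# fully qualified class name. This is an illustration only, not the real
# maven-shade-plugin logic.

PATTERN = "org.apache.kafka"
SHADED_PATTERN = "org.apache.flink.kafka.shaded.org.apache.kafka"


def relocate(class_name: str, pattern: str, shaded_pattern: str) -> str:
    """Return class_name with its package prefix relocated, if it matches."""
    if class_name == pattern or class_name.startswith(pattern + "."):
        return shaded_pattern + class_name[len(pattern):]
    return class_name  # classes outside the pattern are left untouched


original = "org.apache.kafka.clients.consumer.OffsetResetStrategy"
shaded = relocate(original, PATTERN, SHADED_PATTERN)

# prints org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy
print(shaded)
# prints False: to the JVM these are two unrelated classes
print(shaded == original)
```

The two names printed here are exactly the two types in the ClassCastException: the serialized object carries the unshaded class, while the field in the shaded connector is declared with the relocated one.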
So why does the FlinkSQL job above, which uses Kafka as its source, fail? Going back to the key parts of the error in IDEA:
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
......
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
......
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348)
......
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
... 20 more
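The JobMaster on the remote Flink cluster fails while deserializing the job objects. Evidently this is caused by Dinky and the remote Flink cluster using inconsistent classes. Now look at the Flink JobMaster log:
Clearly, part of the error shown in IDEA is propagated from the JobMaster error on the remote Flink cluster.
Next, check which Kafka-related jars the Dinky project in IDEA uses, via File -> Project Structure -> Project Settings -> Libraries: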
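At runtime after compilation, the Dinky source project mainly uses flink-connector-kafka-1.17.1.jar and kafka-clients-3.0.2.jar.
Now check which Kafka-related jars Flink uses:
Flink uses flink-sql-connector-kafka-1.17.0.jar. As a first attempt, replace the Kafka jar on the Flink side with flink-connector-kafka-1.17.1.jar, the same one used on the Dinky side.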
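The jar inspection on both sides can also be scripted. The following is a minimal sketch in Python, assuming the jars of interest sit in a single directory; the function name classify_jars and the labels it returns are made up for illustration. It reports whether each jar bundles the unshaded OffsetResetStrategy class, the shaded one, or both.

```python
import zipfile
from pathlib import Path

# Entry names inside a jar for the class named in the ClassCastException.
PLAIN_ENTRY = "org/apache/kafka/clients/consumer/OffsetResetStrategy.class"
SHADED_ENTRY = "org/apache/flink/kafka/shaded/" + PLAIN_ENTRY


def classify_jars(lib_dir):
    """Map jar file name -> 'plain', 'shaded' or 'both'.

    Jars that contain neither variant of the class are omitted.
    """
    result = {}
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                names = set(zf.namelist())
        except zipfile.BadZipFile:
            continue  # skip files that are not valid archives
        has_plain = PLAIN_ENTRY in names
        has_shaded = SHADED_ENTRY in names
        if has_plain and has_shaded:
            result[jar.name] = "both"
        elif has_plain:
            result[jar.name] = "plain"
        elif has_shaded:
            result[jar.name] = "shaded"
    return result
```

Calling classify_jars on the Flink lib directory and on the directory holding the Dinky-side jars, then comparing the two results, should show the mismatch directly: one side reports "plain" and the other "shaded".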
Then restart the Flink cluster and, once it is up, run the earlier FlinkSQL again. On the IDEA side it now runs normally, with no errors:
[dlink] 2023-11-30 23:31:13.209 INFO 5668 --- [adPool-Worker-5] com.dlink.api.FlinkAPI: Unable to connect to Flink JobManager: http://FINISHED
[dlink] 2023-11-30 23:31:13.216 WARN 5668 --- [adPool-Worker-5] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 60278
[dlink] 2023-11-30 23:41:29.010 WARN 5668 --- [io-8888-exec-10] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 615551
[dlink] 2023-11-30 23:42:58.238 WARN 5668 --- [io-8888-exec-10] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 88776
[dlink] 2023-11-30 23:42:58.980 INFO 5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:58.996 INFO 5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.008 INFO 5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.017 INFO 5668 --- [io-8888-exec-10] com.dlink.executor.Executor: Simple authentication mode
[dlink] 2023-11-30 23:42:59.137 INFO 5668 --- [ent-IO-thread-1] org.apache.flink.client.program.rest.RestClusterClient: Submitting job 'kafka到mysql单表employees_savepoint' (2fa25e0cbab4e2ba11a6818fe2da2677).
[dlink] 2023-11-30 23:42:59.588 INFO 5668 --- [ent-IO-thread-4] org.apache.flink.client.program.rest.RestClusterClient: Successfully submitted job 'kafka到mysql单表employees_savepoint' (2fa25e0cbab4e2ba11a6818fe2da2677) to 'http://bd171:18081'.
[dlink] 2023-11-30 23:44:02.942 WARN 5668 --- [adPool-Worker-5] com.alibaba.druid.pool.DruidAbstractDataSource: discard long time none received connection. , jdbcUrl : jdbc:mysql://192.168.1.198/dlink_075?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true, version : 1.2.8, lastPacketReceivedIdleMillis : 60395
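But the remote Flink cluster still reports an error:
So instead, unify the jars on both the Dinky side and the Flink side to flink-sql-connector-kafka-1.17.0.jar. The steps for swapping the jar on the Flink side are omitted here.
In IDEA, do a global search for XML files that contain flink-connector-kafka.
Update only the one under dlink-flink-1.17, replacing the dependency with: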
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-kafka</artifactId>
  <version>${flink.version}</version>
</dependency>
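The global search for flink-connector-kafka can also be done outside the IDE. A minimal sketch in Python, assuming the project is a regular Maven tree of pom.xml files; the function name find_poms_with_artifact is made up for illustration, and it matches any artifactId element, not only those inside a dependency.

```python
import xml.etree.ElementTree as ET
from pathlib import Path


def find_poms_with_artifact(root_dir, artifact_id):
    """Return sorted relative paths of pom.xml files under root_dir that
    contain an <artifactId> element whose text equals artifact_id."""
    hits = []
    for pom in Path(root_dir).rglob("pom.xml"):
        try:
            tree = ET.parse(pom)
        except ET.ParseError:
            continue  # skip malformed XML
        for elem in tree.iter():
            # Maven POMs usually declare a default namespace, so the tag
            # looks like "{namespace}artifactId"; compare the local name only.
            local_name = elem.tag.rsplit("}", 1)[-1]
            if local_name == "artifactId" and (elem.text or "").strip() == artifact_id:
                hits.append(pom.relative_to(root_dir).as_posix())
                break
    return sorted(hits)
```

Running find_poms_with_artifact on the Dinky source root with "flink-connector-kafka" lists the candidate pom.xml files; as described above, only the one under dlink-flink-1.17 is changed.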
Then recompile the Dinky project, start the Flink cluster, start Dinky, and run the earlier FlinkSQL again. This time everything works:
2023-11-30 23:55:34,027 WARN org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'client.id.prefix' was supplied but isn't a known config.
2023-11-30 23:55:34,027 WARN org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
2023-11-30 23:55:34,033 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.7.2
2023-11-30 23:55:34,034 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: 37a1cc36bf4d76f3
2023-11-30 23:55:34,034 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1701335854027
2023-11-30 23:55:34,065 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2023-11-30 23:55:34,083 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Subscribed to partition(s): flink-cdc-kafka-0
2023-11-30 23:55:34,095 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Seeking to LATEST offset of partition flink-cdc-kafka-0
2023-11-30 23:55:35,618 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Resetting the last seen epoch of partition flink-cdc-kafka-0 to 3 since the associated topicId changed from null to oQYrIKJBRe-oWt7Q0nZi7A
2023-11-30 23:55:35,625 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Cluster ID: _nGd57n0QxGTp130IKGwDQ
2023-11-30 23:55:35,669 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-cdc-kafka-group-0, groupId=flink-cdc-kafka-group] Resetting offset for partition flink-cdc-kafka-0 to position FetchPosition{offset=1222, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[bd171:9092 (id: 171 rack: null)], epoch=3}}.
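After writing some records to the relevant Kafka topic, the data ends up in the database table.
7. Conclusion
The above condenses the whole path from discovering the problem to fixing its root cause. The actual process was nowhere near as easy as this write-up makes it look.
Keep going!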
Copyright belongs to the original author, 武舞悟. In case of infringement, please contact us for removal.