

Notes on troubleshooting a Flink job failure caused by dirty data

A previously healthy Flink job suddenly failed. The job reads data from Kafka and writes it to MySQL.

The main errors were:

java.io.IOException: Failed to deserialize consumer record due to
........
    Suppressed: java.lang.RuntimeException: Writing records to JDBC failed.
    ........
    Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
    ........
    Caused by: java.sql.BatchUpdateException: Data truncation: #22007
    ........
    Caused by: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: #22007
    ........
    Caused by: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: #22007
........
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = xxxxx, partition = 2, leaderEpoch = 5, offset = 189180123, CreateTime = 1705403370519, serialized key size = -1, serialized value size = 554, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@15c184d1).
........
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    

From this we can see the failure happens while writing to MySQL, so dirty data was suspected.

MysqlDataTruncation: Data truncation: #22007

From past experience with this error, when a string value exceeds a column's length the log usually names the offending column. This time the log did not point to any specific column, and checking the table suggested column length was not the cause. Searching online turned up reports that a mismatched date/time value can also trigger this error.
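That theory is easy to check directly in MySQL. A hypothetical minimal repro (table and value are made up here; exact error text varies by server version and sql_mode):

```sql
-- Under strict SQL mode, a malformed or out-of-range datetime value is
-- rejected with SQLSTATE 22007 rather than silently truncated
CREATE TABLE t_demo (ts TIMESTAMP);
INSERT INTO t_demo VALUES ('2024-13-01');
-- rejected: Incorrect datetime value (ERROR 1292, SQLSTATE 22007)
```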

Based on the following error:

Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = xxxxx, partition = 2, leaderEpoch = 5, offset = 189180123, CreateTime = 1705403370519, serialized key size = -1, serialized value size = 554, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@15c184d1).

I consumed from Kafka starting at a specified timestamp.

Here,

2024-01-16T11:09:29.000

comes from converting CreateTime = 1705403370519 in the error (epoch milliseconds) to local time and then subtracting 1 second as a safety margin;

grep 1705403370519 

matches the CreateTime = 1705403370519 from the error, so the offending record can be picked out of the console output.

# -- Consume from a given timestamp: reset the consumer group's offsets to that point in time --
kafka-consumer-groups \
--bootstrap-server 10.x.x.x:xxxx \
--group group_test202401161132 \
--topic topicxxxxx \
--reset-offsets \
--to-datetime 2024-01-16T11:09:29.000 \
--execute

kafka-console-consumer \
--bootstrap-server 10.x.x.x:xxxx \
--topic topicxxxxx \
--group group_test202401161132 \
--property print.offset=true \
--property print.timestamp=true \
--max-messages 500 | grep 1705403370519

This finally caught the bad record: a date-related field held the value '0024-01-16' (it should have been 2024-01-16), and writing a value like that into a MySQL timestamp-typed column fails.
After handling such records in Flink SQL, the problem was resolved.
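The cleanup in Flink SQL can be sketched roughly as follows (the column name dt_col, the source table name, and the repair rule are all assumptions for illustration; the real job's handling may differ):

```sql
-- Hypothetical sketch: repair date strings with a mangled century such as
-- '0024-01-16' before they reach the JDBC sink
SELECT
  CASE
    WHEN dt_col LIKE '00%' THEN CONCAT('20', SUBSTRING(dt_col FROM 3))
    ELSE dt_col
  END AS dt_col
FROM kafka_source;
```

Alternatively, such records can simply be filtered out with a WHERE clause if they do not need to be preserved.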

One more note:
Oddly, consuming with the partition = 2, offset = 189180123 from the error (starting one offset earlier, at 189180122) did not catch the bad record; possibly the offset lead was not large enough.

kafka-console-consumer \
--bootstrap-server 10.x.x.x:xxxx \
--topic topicxxxxx \
--property print.offset=true \
--property print.timestamp=true \
--partition 2 \
--offset 189180122 \
--max-messages 1000 | grep 1705403370519

The full error log:

java.io.IOException: Failed to deserialize consumer record due to
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:32) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-table_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
    Suppressed: java.lang.RuntimeException: Writing records to JDBC failed.
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.close(JdbcOutputFormat.java:268) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:70) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
    Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.checkFlushException(JdbcOutputFormat.java:184) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:215) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.close(JdbcOutputFormat.java:265) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        ... 14 more
    Caused by: java.io.IOException: java.sql.BatchUpdateException: Data truncation: #22007
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:225) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:158) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_181]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        ... 1 more
    Caused by: java.sql.BatchUpdateException: Data truncation: #22007
        at sun.reflect.GeneratedConstructorAccessor20.newInstance(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_181]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_181]
        at com.mysql.cj.util.Util.handleNewInstance(Util.java:192) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.util.Util.getInstance(Util.java:167) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.util.Util.getInstance(Util.java:174) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:853) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:249) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:219) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:158) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_181]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        ... 1 more
    Caused by: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: #22007
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:104) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
        at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:249) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:219) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:158) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_181]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
        ... 1 more
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = xxxxxxx, partition = 2, leaderEpoch = 5, offset = 189180123, CreateTime = 1705403370519, serialized key size = -1, serialized value size = 554, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@15c184d1).

    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at StreamExecCalc$139.processElement_split5(Unknown Source) ~[?:?]
    at StreamExecCalc$139.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at StreamExecCalc$139.processElement_split5(Unknown Source) ~[?:?]
    at StreamExecCalc$139.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    ... 14 more
Caused by: java.io.IOException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:201) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at StreamExecCalc$139.processElement_split5(Unknown Source) ~[?:?]
    at StreamExecCalc$139.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    ... 14 more
Caused by: java.io.IOException: java.sql.BatchUpdateException: Data truncation: #22007
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:225) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at StreamExecCalc$139.processElement_split5(Unknown Source) ~[?:?]
    at StreamExecCalc$139.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    ... 14 more
Caused by: java.sql.BatchUpdateException: Data truncation: #22007
    at sun.reflect.GeneratedConstructorAccessor20.newInstance(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_181]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_181]
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.util.Util.getInstance(Util.java:167) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.util.Util.getInstance(Util.java:174) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:853) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:249) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:219) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at StreamExecCalc$139.processElement_split5(Unknown Source) ~[?:?]
    at StreamExecCalc$139.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    ... 14 more
Caused by: com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: #22007
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:104) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:249) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:219) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57) ~[flink-connector-jdbc_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at StreamExecCalc$139.processElement_split5(Unknown Source) ~[?:?]
    at StreamExecCalc$139.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-dist_2.11-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51) ~[flink-sql-connector-kafka_2.11-1.14.4.jar:1.14.4]
    ... 14 more
2024-01-17 01:16:20,076 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_2.
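From the `ConsumerRecord` in the error, `CreateTime = 1705403370519` is an epoch timestamp in milliseconds. A minimal sketch of deriving the `--to-datetime` value used above (the record's create time minus one second, so the consumer group restarts just before the bad record). This assumes GNU `date`; `-u` is used because the article's `2024-01-16T11:09:29.000` matches the UTC conversion:

```shell
# CreateTime from the error log, in epoch milliseconds
create_time_ms=1705403370519

# Drop the millisecond part and back off by one second
reset_ts=$(( create_time_ms / 1000 - 1 ))

# Format as the yyyy-MM-ddTHH:mm:SS.sss string that
# kafka-consumer-groups --to-datetime expects (GNU date, UTC)
to_datetime=$(date -u -d "@${reset_ts}" +%Y-%m-%dT%H:%M:%S.000)
echo "${to_datetime}"   # 2024-01-16T11:09:29.000
```

The resulting value is what gets passed to `kafka-consumer-groups --reset-offsets --to-datetime ... --execute`, after which `kafka-console-consumer` plus `grep 1705403370519` can fish out the dirty record itself.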
Tags: flink, big data

Reposted from: https://blog.csdn.net/weixin_41956627/article/details/135650663
Copyright belongs to the original author, weixin_41956627. If there is any infringement, please contact us for removal.
