项目场景:
一个将本地文本文件写入到MySQL的Flink作业
问题描述
运行作业时报错:Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
因为错误导致Flink作业不能完成
报错信息:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.io.IOException: unable to open JDBC writer
at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:56)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:115)
at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:121)
at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54)
... 12 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:378)
at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:205)
at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1433)
at com.mysql.cj.NativeSession.connect(NativeSession.java:133)
at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:948)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:818)
... 17 more
Caused by: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0
at sun.security.ssl.InputRecord.checkRecordVersion(InputRecord.java:552)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:565)
at sun.security.ssl.InputRecord.read(InputRecord.java:529)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347)
at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:191)
at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101)
at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:369)
... 22 more
Process finished with exit code 1
######################################################################################
原代码:
package com.lcvc
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import java.sql.PreparedStatement
case class Purchase(customerId: Int, productId: Int,quantity: Int)
object SinkToMysql {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 读取数据源
val input = env.readTextFile("input/purchases.log")
val purchases = input.map { line =>
val fields = line.split(",")
Purchase(fields(0).toInt,fields(1).toInt,fields(2).toInt)
}
// 创建 JDBC sink
val sink = JdbcSink.sink(
"INSERT INTO customer (customerId, productId, total) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE total = total + VALUES(total)",
new JdbcStatementBuilder[Purchase]{
override def accept(t: PreparedStatement,u: Purchase) = {
t.setInt(1,u.customerId)
t.setInt(2,u.productId)
t.setInt(3,u.quantity)
}
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://master:3306/eco")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("Password123456$")
.build()
)
// 发送作业到sink
purchases.addSink(sink)
// 启动Flink作业
env.execute()
}
}
原因分析:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Flink 在运行作业时抛出的一个异常。这通常意味着在执行 Flink 作业时发生了某种错误,导致作业无法成功完成
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Flink 在执行作业时发生了错误,并且由于配置了
NoRestartBackoffTimeStrategy
,所以不会尝试重启该作业
Caused by: java.io.IOException: unable to open JDBC writer
一个 Java IO 异常,表示在尝试打开 JDBC writer 时发生了问题。这通常意味着程序在连接数据库或进行写操作时遇到了困难
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
尝试与 MySQL 数据库通信时失败了
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
成功地将最后一个数据包发送到服务器,这发生在0毫秒前,
JDBC 驱动程序未能从服务器收到任何响应数据包
Caused by: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0
使用 SSL 进行安全连接时,客户端和服务器之间的 SSL 协议版本不兼容
根据报错来看问题是Flink作业和MySQL数据库的连接有问题
我的数据库版本是5.7.18,而我pom.xml文件里却写着8.0.31
为了兼容pom文件里的版本所以代码用的驱动类是com.mysql.cj.jdbc.Driver
而MySQL 8.0以下的用的是com.mysql.jdbc.Driver,这就因为兼容性问题导致无法连接数据库
解决方案:
pom.xml文件修改:
使用和集群数据库相同或相近版本的依赖
具体类代码修改:
useSSL=false:禁用SSL连接,防止出现SSL相关警告
修改后的源码:
package com.lcvc
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import java.sql.PreparedStatementcase class Purchase(customerId: Int, productId: Int,quantity: Int)
object SinkToMysql {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取数据源 val input = env.readTextFile("input/purchases.log") val purchases = input.map { line => val fields = line.split(",") Purchase(fields(0).toInt,fields(1).toInt,fields(2).toInt) } // 创建 JDBC sink val sink = JdbcSink.sink( "INSERT INTO customer (customerId, productId, total) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE total = total + VALUES(total)", new JdbcStatementBuilder[Purchase]{ override def accept(t: PreparedStatement,u: Purchase) = { t.setInt(1,u.customerId) t.setInt(2,u.productId) t.setInt(3,u.quantity) } }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://master:3306/eco?useSSL=false")
// .withDriverName("com.mysql.cj.jdbc.Driver")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("Password123$")
.build()
)
// 发送作业到sink
purchases.addSink(sink)// 启动Flink作业 env.execute()
}
}
问题解决
版权归原作者 殺 所有, 如有侵权,请联系我们删除。