0


运行Flink作业报错:Job execution failed.

项目场景:

一个将本地文本文件写入到MySQL的Flink作业


问题描述

运行作业时报错:Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

因为错误导致Flink作业不能完成

报错信息:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.io.IOException: unable to open JDBC writer
    at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:56)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:115)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448)
    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
    at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:121)
    at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54)
    ... 12 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
    at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
    at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:378)
    at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:205)
    at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1433)
    at com.mysql.cj.NativeSession.connect(NativeSession.java:133)
    at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:948)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:818)
    ... 17 more
Caused by: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0
    at sun.security.ssl.InputRecord.checkRecordVersion(InputRecord.java:552)
    at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:565)
    at sun.security.ssl.InputRecord.read(InputRecord.java:529)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
    at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
    at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347)
    at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:191)
    at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101)
    at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:369)
    ... 22 more

Process finished with exit code 1
######################################################################################
原代码:
package com.lcvc

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import java.sql.PreparedStatement

case class Purchase(customerId: Int, productId: Int,quantity: Int)

object SinkToMysql {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 读取数据源
    val input = env.readTextFile("input/purchases.log")
    val purchases = input.map { line =>
        val fields = line.split(",")
        Purchase(fields(0).toInt,fields(1).toInt,fields(2).toInt)
    }
    // 创建 JDBC sink
    val sink = JdbcSink.sink(
        "INSERT INTO customer (customerId, productId, total) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE total = total + VALUES(total)",
      new JdbcStatementBuilder[Purchase]{
        override def accept(t: PreparedStatement,u: Purchase) = {
          t.setInt(1,u.customerId)
          t.setInt(2,u.productId)
          t.setInt(3,u.quantity)
        }
      },
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withUrl("jdbc:mysql://master:3306/eco")
          .withDriverName("com.mysql.cj.jdbc.Driver")
          .withUsername("root")
          .withPassword("Password123456$")
          .build()
    )
    // 发送作业到sink
    purchases.addSink(sink)

    // 启动Flink作业
    env.execute()
  }
}

原因分析:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

Flink 在运行作业时抛出的一个异常。这通常意味着在执行 Flink 作业时发生了某种错误,导致作业无法成功完成

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

Flink 在执行作业时发生了错误,并且由于配置了

NoRestartBackoffTimeStrategy

,所以不会尝试重启该作业

Caused by: java.io.IOException: unable to open JDBC writer

一个 Java IO 异常,表示在尝试打开 JDBC writer 时发生了问题。这通常意味着程序在连接数据库或进行写操作时遇到了困难

Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

尝试与 MySQL 数据库通信时失败了

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.

成功地将最后一个数据包发送到服务器,这发生在0毫秒前,

JDBC 驱动程序未能从服务器收到任何响应数据包

Caused by: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0

使用 SSL 进行安全连接时,客户端和服务器之间的 SSL 协议版本不兼容

根据报错来看问题是Flink作业和MySQL数据库的连接有问题

我的数据库版本是5.7.18,而我pom.xml文件里却写着8.0.31

为了兼容pom文件里的版本所以代码用的驱动类是com.mysql.cj.jdbc.Driver

而MySQL 8.0以下的用的是com.mysql.jdbc.Driver,这就因为兼容性问题导致无法连接数据库


解决方案:

pom.xml文件修改:

使用和集群数据库相同或相近版本的依赖

具体类代码修改:

useSSL=false:禁用SSL连接,防止出现SSL相关警告

修改后的源码:

package com.lcvc

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import java.sql.PreparedStatement

case class Purchase(customerId: Int, productId: Int,quantity: Int)

object SinkToMysql {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 读取数据源
 val input = env.readTextFile("input/purchases.log")
 val purchases = input.map { line =>
     val fields = line.split(",")
     Purchase(fields(0).toInt,fields(1).toInt,fields(2).toInt)
 }
 // 创建 JDBC sink
 val sink = JdbcSink.sink(
     "INSERT INTO customer (customerId, productId, total) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE total = total + VALUES(total)",
   new JdbcStatementBuilder[Purchase]{
     override def accept(t: PreparedStatement,u: Purchase) = {
       t.setInt(1,u.customerId)
       t.setInt(2,u.productId)
       t.setInt(3,u.quantity)
     }
   },
   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
       .withUrl("jdbc:mysql://master:3306/eco?useSSL=false")

// .withDriverName("com.mysql.cj.jdbc.Driver")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("Password123$")
.build()
)
// 发送作业到sink
purchases.addSink(sink)

// 启动Flink作业
 env.execute()

}
}

问题解决

标签: flink apache 大数据

本文转载自: https://blog.csdn.net/qq_54230492/article/details/140206714
版权归原作者 殺  所有, 如有侵权,请联系我们删除。

“运行Flink作业报错:Job execution failed.”的评论:

还没有评论