0


Spark-环境启动

一、概览

从start-all.sh开始捋,一直捋到Master、Worker的启动并建立通信

二、宏观描述

Master端

1、start-all.sh调用start-master.sh启动Master

2、执行org.apache.spark.deploy.master.Master中main方法

3、通过工厂模式创建RpcEnv子类NettyRpcEnv

    a、创建TransportServer

    b、初始化TransportServer(通过ServerBootstrap去引导Netty并并绑定端口)

    c、Dispatcher将Master作为一个RpcEndpoint注册到RpcEnv中并返回RpcEndpointRef

    d、创建一个端点传递消息的MessageLoop,并将这个MessageLoop和Master绑定

    e、New一个带有待处理消息的收件箱(Inbox)同时也和Master绑定

    f、Inbox中有一个代码块会把OnStart方法添加到消息列表

    g、调用setActive(inbox)将其标记为活动以处理OnStart消息

4、Master中的OnStart的方法被调起

5、处理来自Workers的注册信息RegisterWorker,向Workers发送注册成功信息RegisterWorkerResponse

5、检查并删除超时的Worker

Worker端

1、start-all.sh调用start-workers.sh脚本

2、start-workers.sh去workers文件中查询节点ip或host依次ssh过去启动start-worker.sh脚本

3、各个节点执行org.apache.spark.deploy.worker.Worker中main方法

4、通过工厂模式创建RpcEnv子类NettyRpcEnv

    a、创建TransportServer

    b、初始化TransportServer(通过ServerBootstrap去引导Netty并并绑定端口)

    c、Dispatcher将Master作为一个RpcEndpoint注册到RpcEnv中并返回RpcEndpointRef

    d、创建一个端点传递消息的MessageLoop,并将这个MessageLoop和Master绑定

    e、New一个带有待处理消息的收件箱(Inbox)同时也和Master绑定

    f、Inbox中有一个代码块会把OnStart方法添加到消息列表

    g、调用setActive(inbox)将其标记为活动以处理OnStart消息

5、Worker中的OnStart的方法被调起

6、根据配置看是否启动外部ShuffleService

7、获取资源信息

8、根据Master的URL从RpcEnv中获取MasterEndpointRef

9、向所有的Master(可能会是HA的场景)发送注册信息RegisterWorker

10、接收到来自Master的RegisterWorkerResponse信息

11、向Master发送心跳Heartbeat

三、源码调用绘图

把图下载下来,放大可以看清楚

四、核心源码

脚本部分

1、start-all.sh

#启动所有spark守护进程
#在此节点上启动Master
#在conf/workers中指定的每个节点上启动一个worker

# Start Master
"${SPARK_HOME}/sbin"/start-master.sh

# Start Workers
"${SPARK_HOME}/sbin"/start-workers.sh

2、start-master.sh

CLASS="org.apache.spark.deploy.master.Master"
#默认端口
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
#获取Master节点的域名
$SPARK_MASTER_HOST

#启动Master守护进程
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
  --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  $ORIGINAL_ARGS

3、start-workers.sh

#Master节点默认端口
SPARK_MASTER_PORT=7077
#获取Master节点的域名
$SPARK_MASTER_HOST
#启动所有的worker
"${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"

4、start-worker.sh

CLASS="org.apache.spark.deploy.worker.Worker"

#启动几个Worker实例
$SPARK_WORKER_INSTANCES
#第一个worker的端口号。如果设置,后续的workers将递增此数字。如果未设置,Spark将找到一个有效的端口号,但不能保证模式是可预测的。
$SPARK_WORKER_PORT
#第一个worker的web界面的基本端口。后续的workers将增加这个数字。默认值为8081。
$SPARK_WORKER_WEBUI_PORT
#启动worker
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
     --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"

scala代码部分

1、Master

//Master 是一个多线程安全的RpcEndpoint
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

  private val forwardMessageThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
  //Master端有所有worker的信息 并且有它们的 Rpc通信地址
  val workers = new HashSet[WorkerInfo]
  private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]

  override def onStart(): Unit = {
    logInfo("Starting Spark master at " + masterUrl)
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    webUi = new MasterWebUI(this, webUiPort)
    webUi.bind()
    masterWebUiUrl = webUi.webUrl
    //检测所有Worker的超时任务
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
      () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
      0, workerTimeoutMs, TimeUnit.MILLISECONDS)
  }

  override def onStop(): Unit = {...}

  //HA下的Leader选举
  override def electedLeader(): Unit = {
    self.send(ElectedLeader)
  }

  //接收其他 Endpoint 的 send 消息
  override def receive: PartialFunction[Any, Unit] = {
    case ElectedLeader => Leader选举
    case RegisterWorker(...)=> 
        worker 此时会携带它的资源信息 xx核 xx 内存
        master 会添加这个 worker 并向这个 worker send  RegisterWorkerResponse 消息
    case RegisterApplication(description, driver) => derver端的应用注册
    case Heartbeat(workerId, worker) => 接收worker端的心跳
    case CheckForWorkerTimeOut => 检测worker的心跳
    ......
  }

  //接收其他 Endpoint 的 ask 消息
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RequestSubmitDriver(description) => 提交 driver 
    case RequestKillDriver(driverId) => 杀掉 driver
    case RequestDriverStatus(driverId) => driver 状态
    case RequestMasterState => master 状态
    case RequestExecutors(appId, requestedTotal) => 启动 executors
    case KillExecutors(appId, executorIds) => 杀掉 executors
    ......
  }

  //在workers上启动executors 
  //返回一个数组,其中包含分配给每个worker的核心数。
  //有两种启动executors 的模式。
  //第一种方法试图将应用程序的executors 分散到尽可能多的worker进程上, 更适合数据局部性,是默认设置
  //第二种方法则相反(即在尽可能少的worker进程中启动它们)。
  //分配给每个executor的核心数量是可配置的。当显式设置此选项时,如果工作进程有足够的内核和内存,则可以在同一worker进程上启动来自同一应用程序的多个executor。否则,
  //默认情况下,每个executor都会抓取worker上可用的所有内核,在这种情况下,在一次单独的计划迭代中,每个应用程序只能在每个worker上启动一个executor。
  //请注意,当未设置“spark.executor.cores”时,我们仍然可以在同一个worker上从同一个应用程序启动多个executor。
  //假设appA和appB都有一个executor在worker1上运行,并且appA.cores>0,则appB完成并释放worker1上的所有内核,因此对于下一个计划迭代,appA启动一个新的executor,抓取worker1上所有空闲的内核,因此我们从运行在worker1的appA中获得多个executor。
  private def scheduleExecutorsOnWorkers(...){
    ......
  }

  //在workers上启动executors
  private def startExecutorsOnWorkers(): Unit = {
    .......
  }

  //分配worker上的资源给 1个 或多个 executor
  private def allocateWorkerResourceToExecutors(){
    .......
  }

  //在等待的应用程序之间安排当前可用的资源。每当有新应用加入或资源可用性发生变化时,都会调用此方法。
  private def schedule(): Unit = {
    .......
  }

  ......

}

private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"
  def main(argStrings: Array[String]): Unit = {
    val args = new MasterArguments(argStrings, conf)
    //启动Master 并返回一个 tuple 3
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }
  
  def startRpcEnvAndEndpoint(...){
    //启动一个RpcEnv
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    //将Master Endpoint 注册到 RpcEnv 中
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }

}

2、Worker

//Worker 是一个多线程安全的RpcEndpoint
private[deploy] class Worker(...){

  //Worker 端也有 Master 端的RpcEndpointRef
  private var master: Option[RpcEndpointRef] = None
  private var activeMasterUrl: String = ""

  //该Worker上有哪些 app driver executor 
  val drivers = new HashMap[String, DriverRunner]
  val executors = new HashMap[String, ExecutorRunner]
  val appDirectories = new HashMap[String, Seq[String]]

  override def onStart(): Unit = {
    //根据配置启动外部ShuffleService
    startExternalShuffleService()
    //设置资源
    setupWorkerResources()
    //向所有Master注册自己
    registerWithMaster()
    ......
  }

  //使用个新的Master
  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String,
      masterAddress: RpcAddress): Unit = {
    ......
  }

  //向所有的Master注册
  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    ......
  }

  //向Master发送注册消息
  private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
    ......
  }

  //处理Master发送的注册响应
  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
    msg match {
      case RegisteredWorker(...) => 注册成功
      case RegisterWorkerFailed(message) => 注册失败
      case MasterInStandby => Master 还没准备好
    }
  }

  //处理来自其他 Endpoint 的 send 消息
  override def receive: PartialFunction[Any, Unit] = synchronized {
    .......
  }
  
  //处理来自其他 Endpoint 的 ask 消息
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    .......
  }

  //向当前主机发送消息。如果尚未在任何主机上成功注册,则消息将被丢弃。
  private def sendToMaster(message: Any): Unit = {

  }

  ......

}

private[deploy] object Worker extends Logging {
  val SYSTEM_NAME = "sparkWorker"
  val ENDPOINT_NAME = "Worker"
  def main(argStrings: Array[String]): Unit = {
    val args = new WorkerArguments(argStrings, conf)
    //启动 RpcEnv 并将自己放进去
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf,
      resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE))
      ......
      rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(......){
    //一个节点可以启动多个worker 默认是1个
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
    rpcEnv
  }

}

3、NettyRpcEnv

//父类是RpcEnv
//RPC环境。[[RpcEndpoint]]需要在[[RpcEnv]]中注册一个名称来接收消息。
//然后[[RpcEnv]]将处理从[[RpcEndpointRef]]或远程节点发送的消息,并将其传递给相应的[[RpcEndpoint]]s。
//对于[[RpcEN]]捕获的未捕获异常,[[Rpcen]]将使用[[RpcCallContext.sendFailure]]将异常发送回发送方,或者在没有此类发送方或“NoterializableException”的情况下记录它们。
//[[RpcEnv]]还提供了一些方法来检索[[RpcEndpointRef]]的给定名称或uri
//它是各种角色通信的基础,是基于Netty的
private[netty] class NettyRpcEnv(...) extends RpcEnv {

  //分发器
  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
  //基于Netty 其初始化时会通过bootstrap引导启动Netty
  @volatile private var server: TransportServer = _
  //[[RpcAddress]]和[[发件箱]]的map。当我们连接到远程[[RpcAddress]]时,我们只需将消息放入其[[Outbox]]即可实现非阻塞的“send”方法。
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
  
  //启动这个NettyRpcEnv
  def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
    //创建一个将尝试绑定到特定主机和端口的Netty服务器
    server = transportContext.createServer(bindAddress, port, bootstraps)
    //注册一个RpcEndpointVerifier 用于给远程RpcEndpoint查询是否存在对应的RpcEndpoint
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

  //设置一个RpcEndpoint
  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }

  //往Outbox中放信息(且指明了收件人)
  private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
    ......
  }

  //向远程 RPC endpoint 发送信息 其实就是放到 Outbox中 
  private[netty] def send(message: RequestMessage): Unit = {
    ......
  }

  //异步发送消息,设置了超时时间,需要等待响应
  private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
    ......
  }

  ......
}

//RpcEndpointRef的NettyRpcEnv版本。
//此类的行为因创建位置而异。在“拥有”RpcEndpoint的节点上,它是一个围绕RpcEndpointAddress实例的简单包装器。
//在接收引用序列化版本的其他机器上,行为会发生变化。实例将跟踪发送引用的TransportClient,以便通过客户端连接向端点发送消息,而不需要打开新的连接。
//此引用的RpcAddress可以为空;这意味着ref只能通过客户端连接使用,因为承载端点的进程不监听传入连接。这些引用不应与第三方共享,因为它们将无法向端点发送消息。
private[netty] class NettyRpcEndpointRef(...){
    .......
}

//从发送方发送到接收方的消息。
private[netty] class RequestMessage(...){
    .......
}

//将传入的RPC分派到已注册的端点。
//处理程序跟踪与其通信的所有客户端实例,以便RpcEnv在向客户端端点(即不监听传入连接,而是需要通过客户端Socket联系的端点)发送RPC时知道要使用哪个“TransportClient”实例。
//事件是按每个连接发送的,因此,如果客户端打开多个到RpcEnv的连接,将为该客户端创建多个连接/断开连接事件(尽管具有不同的“RpcAddress”信息)。
private[netty] class NettyRpcHandler(...}
    .......
}

//通过工厂创建NettyRpcEnv
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
  def create(config: RpcEnvConfig): RpcEnv = {
    ......
  }    
}

4、TransportServer

public class TransportServer implements Closeable {

  //Netty服务端的引导,用于创建ServerChannel
  private final List<TransportServerBootstrap> bootstraps;
  private ServerBootstrap bootstrap;

  //构造器
  public TransportServer(...){
    ...
    init(hostToBind, portToBind);
    ...
  }

  //初始化该TransportServer 
  private void init(String hostToBind, int portToBind) {
    //IO模式 NIO 、 EPOLL 模式是NIO 
    //如果是Linux操作系统可以在spark-defaults.conf中设置spark.rpc.io.mode = EPOLL 来实现
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    //基于Netty中Reactor模型的启动
    EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
      conf.getModuleName() + "-boss");
    EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
      conf.getModuleName() + "-server");

    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, pooledAllocator)
      .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
      .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
    
    //绑定端口启动一个Netty服务端
    channelFuture = bootstrap.bind(address);

  }

}

5、TransportClient

//客户端,用于获取预先协商的流的连续块。此API旨在实现大量数据的高效传输,这些数据被分解为大小从数百KB到几MB不等的块。
//请注意,虽然此客户端处理从流(即数据平面)中提取块,但流的实际设置是在传输层范围之外完成的。提供方便的方法“sendRPC”来实现客户端和服务器之间的控制平面通信,以执行此设置。
//例如:典型的工作流程是这样的:
//client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
//client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
//client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
//......
//client.sendRPC(new CloseStream(100))

//使用TransportClientFactory构造TransportClient的实例。单个TransportClient可用于多个流,但任何给定的流都必须限制在单个客户端,以避免乱序响应。
//注意:此类用于向服务器发出请求,而 TransportResponseHandler 负责处理来自服务器的响应。
//并发:线程安全,可以从多个线程调用。
public class TransportClient implements Closeable {

  //构造的时候给一个 Channel 就可以和Server端进行通信了
  //给一个TransportResponseHandler  就可以处理来自服务器的响应了
  public TransportClient(Channel channel, TransportResponseHandler handler) {
    this.channel = Preconditions.checkNotNull(channel);
    this.handler = Preconditions.checkNotNull(handler);
    this.timedOut = false;
  }

  public SocketAddress getSocketAddress() {
    return channel.remoteAddress();
  }

  //从远程端请求一个块,来自预先协商的streamId。
  //块指数从0开始。多次请求同一块是有效的,尽管某些流可能不支持此操作。
  //多个fetchChunk请求可能同时未完成,并且假设只使用单个TransportClient来获取块,则块保证会按照请求的顺序返回。
  public void fetchChunk(...){}

  //请求从远程端以给定的流ID流式传输数据。
  public void stream(String streamId, StreamCallback callback) {}

  //向服务器端的RpcHandler发送不透明消息。回调将随服务器的响应或在任何故障时调用。
  public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {}

  ......

}

6、Dispatcher

//消息调度器,负责将RPC消息路由到适当的端点。
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
  //端点和消息的映射
  private val endpoints: ConcurrentMap[String, MessageLoop] =
    new ConcurrentHashMap[String, MessageLoop]
  //端点和引用的映射
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

  private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)

  //使用名字注册RpcEndpoint 放回对应的引用 
  //就是在endpoints加一条记录 (name -> RpcEndpoint)
  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    ......
  }

  //获取对应的引用
  def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)

  //向所有已注册的[[RpcEndpoint]]发送消息。这可用于使所有端点都知道网络事件(例如“新增了节点”)。
  def postToAll(message: InboxMessage): Unit = {
    ......
  }

  //向远程端点发送消息
  def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
    ......
  }

  //向本地端点发送消息
  def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
    ......
  }

  //发送一个单向消息
  def postOneWayMessage(message: RequestMessage): Unit = {}

  //向特定端点发送消息
  private def postMessage(){}

}

7、MessageLoop

//Dispatcher 用于向端点传递消息的消息循环。
private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Logging {
  //待消息循环处理的带有待处理消息的收件箱列表。
  private val active = new LinkedBlockingQueue[Inbox]()

  //消息循环任务;应该在消息循环池的所有线程中运行。
  protected val receiveLoopRunnable = new Runnable() {
    override def run(): Unit = receiveLoop()
  }

 //不停的处理inbox中的信息,
 private def receiveLoop(): Unit = {
      while (true) {
          val inbox = active.take()
          if (inbox == MessageLoop.PoisonPill) {
            // Put PoisonPill back so that other threads can see it.
            setActive(MessageLoop.PoisonPill)
            return
          }
          //处理inbox存储的消息
          inbox.process(dispatcher)
      }
  }

}

//使用共享线程池为多个RPC端点提供服务的消息循环。
private class DedicatedMessageLoop(...){}

8、Inbox

//一个收件箱,用于存储 RpcEndpoint 的消息,并安全地向其线程发布消息。
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  inbox => 给它一个别名,这样我们就可以在闭包中更清楚地使用它。
 
  protected val messages = new java.util.LinkedList[InboxMessage]()

  //OnStart是要处理的第一条消息
  inbox.synchronized {
    messages.add(OnStart)
  }

  //处理Inbox中的消息
  def process(dispatcher: Dispatcher): Unit = {
    message = messages.poll()
    while (true) {
      safelyCall(endpoint) {
        message match {
          case RpcMessage(_sender, content, context) =>
            endpoint.receiveAndReply
          case OneWayMessage(_sender, content) =>
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
          case OnStart =>
            endpoint.onStart() //调用端点的Onstart() Master 、 Worker 的Onstart() 就是这样起来的
          case OnStop =>
            dispatcher.removeRpcEndpointRef(endpoint)
            endpoint.onStop()
          case RemoteProcessConnected(remoteAddress) =>
            endpoint.onConnected(remoteAddress)

          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

          case RemoteProcessConnectionError(cause, remoteAddress) =>
            endpoint.onNetworkError(cause, remoteAddress)
        }
      }
    }
  }

}

9、OutBox

private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {

  outbox =>  //给它一个别名,这样我们就可以在闭包中更清楚地使用它。

  private val messages = new java.util.LinkedList[OutboxMessage]

  //connectFuture指向连接任务。如果没有连接任务,connectFuture将为空
  private var connectFuture: java.util.concurrent.Future[Unit] = null

  //发送消息。如果没有活动连接,请缓存它并启动新连接
  def send(message: OutboxMessage): Unit = {}
  
  //清空消息队列
  private def drainOutbox(): Unit = {}

  //异步启动要给连接任务
  private def launchConnectTask(): Unit = {}

}

本文转载自: https://blog.csdn.net/lu070828/article/details/141249175
版权归原作者 隔着天花板看星星 所有, 如有侵权,请联系我们删除。

“Spark-环境启动”的评论:

还没有评论