0


Spark-环境启动

一、概览

从start-all.sh开始捋,一直捋到Master、Worker的启动并建立通信

二、宏观描述

Master端

1、start-all.sh调用start-master.sh启动Master

2、执行org.apache.spark.deploy.master.Master中main方法

3、通过工厂模式创建RpcEnv子类NettyRpcEnv

  1. a、创建TransportServer
  2. b、初始化TransportServer(通过ServerBootstrap去引导Netty并并绑定端口)
  3. cDispatcherMaster作为一个RpcEndpoint注册到RpcEnv中并返回RpcEndpointRef
  4. d、创建一个端点传递消息的MessageLoop,并将这个MessageLoopMaster绑定
  5. eNew一个带有待处理消息的收件箱(Inbox)同时也和Master绑定
  6. fInbox中有一个代码块会把OnStart方法添加到消息列表
  7. g、调用setActive(inbox)将其标记为活动以处理OnStart消息

4、Master中的OnStart的方法被调起

5、处理来自Workers的注册信息RegisterWorker,向Workers发送注册成功信息RegisterWorkerResponse

5、检查并删除超时的Worker

Worker端

1、start-all.sh调用start-workers.sh脚本

2、start-workers.sh去workers文件中查询节点ip或host依次ssh过去启动start-worker.sh脚本

3、各个节点执行org.apache.spark.deploy.worker.Worker中main方法

4、通过工厂模式创建RpcEnv子类NettyRpcEnv

  1. a、创建TransportServer
  2. b、初始化TransportServer(通过ServerBootstrap去引导Netty并并绑定端口)
  3. cDispatcherMaster作为一个RpcEndpoint注册到RpcEnv中并返回RpcEndpointRef
  4. d、创建一个端点传递消息的MessageLoop,并将这个MessageLoopMaster绑定
  5. eNew一个带有待处理消息的收件箱(Inbox)同时也和Master绑定
  6. fInbox中有一个代码块会把OnStart方法添加到消息列表
  7. g、调用setActive(inbox)将其标记为活动以处理OnStart消息

5、Worker中的OnStart的方法被调起

6、根据配置看是否启动外部ShuffleService

7、获取资源信息

8、根据Master的URL从RpcEnv中获取MasterEndpointRef

9、向所有的Master(可能会是HA的场景)发送注册信息RegisterWorker

10、接收到来自Master的RegisterWorkerResponse信息

11、向Master发送心跳Heartbeat

三、源码调用绘图

把图下载下来,放大可以看清楚

四、核心源码

脚本部分

1、start-all.sh

  1. #启动所有spark守护进程
  2. #在此节点上启动Master
  3. #在conf/workers中指定的每个节点上启动一个worker
  4. # Start Master
  5. "${SPARK_HOME}/sbin"/start-master.sh
  6. # Start Workers
  7. "${SPARK_HOME}/sbin"/start-workers.sh

2、start-master.sh

  1. CLASS="org.apache.spark.deploy.master.Master"
  2. #默认端口
  3. SPARK_MASTER_PORT=7077
  4. SPARK_MASTER_WEBUI_PORT=8080
  5. #获取Master节点的域名
  6. $SPARK_MASTER_HOST
  7. #启动Master守护进程
  8. "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
  9. --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  10. $ORIGINAL_ARGS

3、start-workers.sh

  1. #Master节点默认端口
  2. SPARK_MASTER_PORT=7077
  3. #获取Master节点的域名
  4. $SPARK_MASTER_HOST
  5. #启动所有的worker
  6. "${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"

4、start-worker.sh

  1. CLASS="org.apache.spark.deploy.worker.Worker"
  2. #启动几个Worker实例
  3. $SPARK_WORKER_INSTANCES
  4. #第一个worker的端口号。如果设置,后续的workers将递增此数字。如果未设置,Spark将找到一个有效的端口号,但不能保证模式是可预测的。
  5. $SPARK_WORKER_PORT
  6. #第一个worker的web界面的基本端口。后续的workers将增加这个数字。默认值为8081。
  7. $SPARK_WORKER_WEBUI_PORT
  8. #启动worker
  9. "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
  10. --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"

scala代码部分

1、Master

  1. //Master 是一个多线程安全的RpcEndpoint
  2. private[deploy] class Master(
  3. override val rpcEnv: RpcEnv,
  4. address: RpcAddress,
  5. webUiPort: Int,
  6. val securityMgr: SecurityManager,
  7. val conf: SparkConf)
  8. extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
  9. private val forwardMessageThread =
  10. ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
  11. //Master端有所有worker的信息 并且有它们的 Rpc通信地址
  12. val workers = new HashSet[WorkerInfo]
  13. private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
  14. override def onStart(): Unit = {
  15. logInfo("Starting Spark master at " + masterUrl)
  16. logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  17. webUi = new MasterWebUI(this, webUiPort)
  18. webUi.bind()
  19. masterWebUiUrl = webUi.webUrl
  20. //检测所有Worker的超时任务
  21. checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
  22. () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
  23. 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
  24. }
  25. override def onStop(): Unit = {...}
  26. //HA下的Leader选举
  27. override def electedLeader(): Unit = {
  28. self.send(ElectedLeader)
  29. }
  30. //接收其他 Endpoint 的 send 消息
  31. override def receive: PartialFunction[Any, Unit] = {
  32. case ElectedLeader => Leader选举
  33. case RegisterWorker(...)=>
  34. worker 此时会携带它的资源信息 xx xx 内存
  35. master 会添加这个 worker 并向这个 worker send RegisterWorkerResponse 消息
  36. case RegisterApplication(description, driver) => derver端的应用注册
  37. case Heartbeat(workerId, worker) => 接收worker端的心跳
  38. case CheckForWorkerTimeOut => 检测worker的心跳
  39. ......
  40. }
  41. //接收其他 Endpoint 的 ask 消息
  42. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  43. case RequestSubmitDriver(description) => 提交 driver
  44. case RequestKillDriver(driverId) => 杀掉 driver
  45. case RequestDriverStatus(driverId) => driver 状态
  46. case RequestMasterState => master 状态
  47. case RequestExecutors(appId, requestedTotal) => 启动 executors
  48. case KillExecutors(appId, executorIds) => 杀掉 executors
  49. ......
  50. }
  51. //在workers上启动executors
  52. //返回一个数组,其中包含分配给每个worker的核心数。
  53. //有两种启动executors 的模式。
  54. //第一种方法试图将应用程序的executors 分散到尽可能多的worker进程上, 更适合数据局部性,是默认设置
  55. //第二种方法则相反(即在尽可能少的worker进程中启动它们)。
  56. //分配给每个executor的核心数量是可配置的。当显式设置此选项时,如果工作进程有足够的内核和内存,则可以在同一worker进程上启动来自同一应用程序的多个executor。否则,
  57. //默认情况下,每个executor都会抓取worker上可用的所有内核,在这种情况下,在一次单独的计划迭代中,每个应用程序只能在每个worker上启动一个executor。
  58. //请注意,当未设置“spark.executor.cores”时,我们仍然可以在同一个worker上从同一个应用程序启动多个executor。
  59. //假设appA和appB都有一个executor在worker1上运行,并且appA.cores>0,则appB完成并释放worker1上的所有内核,因此对于下一个计划迭代,appA启动一个新的executor,抓取worker1上所有空闲的内核,因此我们从运行在worker1的appA中获得多个executor。
  60. private def scheduleExecutorsOnWorkers(...){
  61. ......
  62. }
  63. //在workers上启动executors
  64. private def startExecutorsOnWorkers(): Unit = {
  65. .......
  66. }
  67. //分配worker上的资源给 1个 或多个 executor
  68. private def allocateWorkerResourceToExecutors(){
  69. .......
  70. }
  71. //在等待的应用程序之间安排当前可用的资源。每当有新应用加入或资源可用性发生变化时,都会调用此方法。
  72. private def schedule(): Unit = {
  73. .......
  74. }
  75. ......
  76. }
  77. private[deploy] object Master extends Logging {
  78. val SYSTEM_NAME = "sparkMaster"
  79. val ENDPOINT_NAME = "Master"
  80. def main(argStrings: Array[String]): Unit = {
  81. val args = new MasterArguments(argStrings, conf)
  82. //启动Master 并返回一个 tuple 3
  83. val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  84. rpcEnv.awaitTermination()
  85. }
  86. def startRpcEnvAndEndpoint(...){
  87. //启动一个RpcEnv
  88. val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  89. //将Master Endpoint 注册到 RpcEnv 中
  90. val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
  91. new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  92. val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
  93. (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  94. }
  95. }

2、Worker

  1. //Worker 是一个多线程安全的RpcEndpoint
  2. private[deploy] class Worker(...){
  3. //Worker 端也有 Master 端的RpcEndpointRef
  4. private var master: Option[RpcEndpointRef] = None
  5. private var activeMasterUrl: String = ""
  6. //该Worker上有哪些 app driver executor
  7. val drivers = new HashMap[String, DriverRunner]
  8. val executors = new HashMap[String, ExecutorRunner]
  9. val appDirectories = new HashMap[String, Seq[String]]
  10. override def onStart(): Unit = {
  11. //根据配置启动外部ShuffleService
  12. startExternalShuffleService()
  13. //设置资源
  14. setupWorkerResources()
  15. //向所有Master注册自己
  16. registerWithMaster()
  17. ......
  18. }
  19. //使用个新的Master
  20. private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String,
  21. masterAddress: RpcAddress): Unit = {
  22. ......
  23. }
  24. //向所有的Master注册
  25. private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  26. ......
  27. }
  28. //向Master发送注册消息
  29. private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
  30. ......
  31. }
  32. //处理Master发送的注册响应
  33. private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  34. msg match {
  35. case RegisteredWorker(...) => 注册成功
  36. case RegisterWorkerFailed(message) => 注册失败
  37. case MasterInStandby => Master 还没准备好
  38. }
  39. }
  40. //处理来自其他 Endpoint 的 send 消息
  41. override def receive: PartialFunction[Any, Unit] = synchronized {
  42. .......
  43. }
  44. //处理来自其他 Endpoint 的 ask 消息
  45. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  46. .......
  47. }
  48. //向当前主机发送消息。如果尚未在任何主机上成功注册,则消息将被丢弃。
  49. private def sendToMaster(message: Any): Unit = {
  50. }
  51. ......
  52. }
  53. private[deploy] object Worker extends Logging {
  54. val SYSTEM_NAME = "sparkWorker"
  55. val ENDPOINT_NAME = "Worker"
  56. def main(argStrings: Array[String]): Unit = {
  57. val args = new WorkerArguments(argStrings, conf)
  58. //启动 RpcEnv 并将自己放进去
  59. val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
  60. args.memory, args.masters, args.workDir, conf = conf,
  61. resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE))
  62. ......
  63. rpcEnv.awaitTermination()
  64. }
  65. def startRpcEnvAndEndpoint(......){
  66. //一个节点可以启动多个worker 默认是1个
  67. val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  68. val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  69. val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
  70. rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
  71. masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
  72. rpcEnv
  73. }
  74. }

3、NettyRpcEnv

  1. //父类是RpcEnv
  2. //RPC环境。[[RpcEndpoint]]需要在[[RpcEnv]]中注册一个名称来接收消息。
  3. //然后[[RpcEnv]]将处理从[[RpcEndpointRef]]或远程节点发送的消息,并将其传递给相应的[[RpcEndpoint]]s。
  4. //对于[[RpcEN]]捕获的未捕获异常,[[Rpcen]]将使用[[RpcCallContext.sendFailure]]将异常发送回发送方,或者在没有此类发送方或“NoterializableException”的情况下记录它们。
  5. //[[RpcEnv]]还提供了一些方法来检索[[RpcEndpointRef]]的给定名称或uri
  6. //它是各种角色通信的基础,是基于Netty的
  7. private[netty] class NettyRpcEnv(...) extends RpcEnv {
  8. //分发器
  9. private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
  10. //基于Netty 其初始化时会通过bootstrap引导启动Netty
  11. @volatile private var server: TransportServer = _
  12. //[[RpcAddress]]和[[发件箱]]的map。当我们连接到远程[[RpcAddress]]时,我们只需将消息放入其[[Outbox]]即可实现非阻塞的“send”方法。
  13. private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
  14. //启动这个NettyRpcEnv
  15. def startServer(bindAddress: String, port: Int): Unit = {
  16. val bootstraps: java.util.List[TransportServerBootstrap] =
  17. if (securityManager.isAuthenticationEnabled()) {
  18. java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
  19. } else {
  20. java.util.Collections.emptyList()
  21. }
  22. //创建一个将尝试绑定到特定主机和端口的Netty服务器
  23. server = transportContext.createServer(bindAddress, port, bootstraps)
  24. //注册一个RpcEndpointVerifier 用于给远程RpcEndpoint查询是否存在对应的RpcEndpoint
  25. dispatcher.registerRpcEndpoint(
  26. RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  27. }
  28. //设置一个RpcEndpoint
  29. override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  30. dispatcher.registerRpcEndpoint(name, endpoint)
  31. }
  32. //往Outbox中放信息(且指明了收件人)
  33. private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  34. ......
  35. }
  36. //向远程 RPC endpoint 发送信息 其实就是放到 Outbox中
  37. private[netty] def send(message: RequestMessage): Unit = {
  38. ......
  39. }
  40. //异步发送消息,设置了超时时间,需要等待响应
  41. private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
  42. ......
  43. }
  44. ......
  45. }
  46. //RpcEndpointRef的NettyRpcEnv版本。
  47. //此类的行为因创建位置而异。在“拥有”RpcEndpoint的节点上,它是一个围绕RpcEndpointAddress实例的简单包装器。
  48. //在接收引用序列化版本的其他机器上,行为会发生变化。实例将跟踪发送引用的TransportClient,以便通过客户端连接向端点发送消息,而不需要打开新的连接。
  49. //此引用的RpcAddress可以为空;这意味着ref只能通过客户端连接使用,因为承载端点的进程不监听传入连接。这些引用不应与第三方共享,因为它们将无法向端点发送消息。
  50. private[netty] class NettyRpcEndpointRef(...){
  51. .......
  52. }
  53. //从发送方发送到接收方的消息。
  54. private[netty] class RequestMessage(...){
  55. .......
  56. }
  57. //将传入的RPC分派到已注册的端点。
  58. //处理程序跟踪与其通信的所有客户端实例,以便RpcEnv在向客户端端点(即不监听传入连接,而是需要通过客户端Socket联系的端点)发送RPC时知道要使用哪个“TransportClient”实例。
  59. //事件是按每个连接发送的,因此,如果客户端打开多个到RpcEnv的连接,将为该客户端创建多个连接/断开连接事件(尽管具有不同的“RpcAddress”信息)。
  60. private[netty] class NettyRpcHandler(...}
  61. .......
  62. }
  63. //通过工厂创建NettyRpcEnv
  64. private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
  65. def create(config: RpcEnvConfig): RpcEnv = {
  66. ......
  67. }
  68. }

4、TransportServer

  1. public class TransportServer implements Closeable {
  2. //Netty服务端的引导,用于创建ServerChannel
  3. private final List<TransportServerBootstrap> bootstraps;
  4. private ServerBootstrap bootstrap;
  5. //构造器
  6. public TransportServer(...){
  7. ...
  8. init(hostToBind, portToBind);
  9. ...
  10. }
  11. //初始化该TransportServer
  12. private void init(String hostToBind, int portToBind) {
  13. //IO模式 NIO 、 EPOLL 模式是NIO
  14. //如果是Linux操作系统可以在spark-defaults.conf中设置spark.rpc.io.mode = EPOLL 来实现
  15. IOMode ioMode = IOMode.valueOf(conf.ioMode());
  16. //基于Netty中Reactor模型的启动
  17. EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
  18. conf.getModuleName() + "-boss");
  19. EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
  20. conf.getModuleName() + "-server");
  21. bootstrap = new ServerBootstrap()
  22. .group(bossGroup, workerGroup)
  23. .channel(NettyUtils.getServerChannelClass(ioMode))
  24. .option(ChannelOption.ALLOCATOR, pooledAllocator)
  25. .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
  26. .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
  27. //绑定端口启动一个Netty服务端
  28. channelFuture = bootstrap.bind(address);
  29. }
  30. }

5、TransportClient

  1. //客户端,用于获取预先协商的流的连续块。此API旨在实现大量数据的高效传输,这些数据被分解为大小从数百KB到几MB不等的块。
  2. //请注意,虽然此客户端处理从流(即数据平面)中提取块,但流的实际设置是在传输层范围之外完成的。提供方便的方法“sendRPC”来实现客户端和服务器之间的控制平面通信,以执行此设置。
  3. //例如:典型的工作流程是这样的:
  4. //client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
  5. //client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
  6. //client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
  7. //......
  8. //client.sendRPC(new CloseStream(100))
  9. //使用TransportClientFactory构造TransportClient的实例。单个TransportClient可用于多个流,但任何给定的流都必须限制在单个客户端,以避免乱序响应。
  10. //注意:此类用于向服务器发出请求,而 TransportResponseHandler 负责处理来自服务器的响应。
  11. //并发:线程安全,可以从多个线程调用。
  12. public class TransportClient implements Closeable {
  13. //构造的时候给一个 Channel 就可以和Server端进行通信了
  14. //给一个TransportResponseHandler 就可以处理来自服务器的响应了
  15. public TransportClient(Channel channel, TransportResponseHandler handler) {
  16. this.channel = Preconditions.checkNotNull(channel);
  17. this.handler = Preconditions.checkNotNull(handler);
  18. this.timedOut = false;
  19. }
  20. public SocketAddress getSocketAddress() {
  21. return channel.remoteAddress();
  22. }
  23. //从远程端请求一个块,来自预先协商的streamId。
  24. //块指数从0开始。多次请求同一块是有效的,尽管某些流可能不支持此操作。
  25. //多个fetchChunk请求可能同时未完成,并且假设只使用单个TransportClient来获取块,则块保证会按照请求的顺序返回。
  26. public void fetchChunk(...){}
  27. //请求从远程端以给定的流ID流式传输数据。
  28. public void stream(String streamId, StreamCallback callback) {}
  29. //向服务器端的RpcHandler发送不透明消息。回调将随服务器的响应或在任何故障时调用。
  30. public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {}
  31. ......
  32. }

6、Dispatcher

  1. //消息调度器,负责将RPC消息路由到适当的端点。
  2. private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
  3. //端点和消息的映射
  4. private val endpoints: ConcurrentMap[String, MessageLoop] =
  5. new ConcurrentHashMap[String, MessageLoop]
  6. //端点和引用的映射
  7. private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
  8. new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
  9. private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)
  10. //使用名字注册RpcEndpoint 放回对应的引用
  11. //就是在endpoints加一条记录 (name -> RpcEndpoint)
  12. def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  13. ......
  14. }
  15. //获取对应的引用
  16. def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)
  17. //向所有已注册的[[RpcEndpoint]]发送消息。这可用于使所有端点都知道网络事件(例如“新增了节点”)。
  18. def postToAll(message: InboxMessage): Unit = {
  19. ......
  20. }
  21. //向远程端点发送消息
  22. def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
  23. ......
  24. }
  25. //向本地端点发送消息
  26. def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
  27. ......
  28. }
  29. //发送一个单向消息
  30. def postOneWayMessage(message: RequestMessage): Unit = {}
  31. //向特定端点发送消息
  32. private def postMessage(){}
  33. }

7、MessageLoop

  1. //Dispatcher 用于向端点传递消息的消息循环。
  2. private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Logging {
  3. //待消息循环处理的带有待处理消息的收件箱列表。
  4. private val active = new LinkedBlockingQueue[Inbox]()
  5. //消息循环任务;应该在消息循环池的所有线程中运行。
  6. protected val receiveLoopRunnable = new Runnable() {
  7. override def run(): Unit = receiveLoop()
  8. }
  9. //不停的处理inbox中的信息,
  10. private def receiveLoop(): Unit = {
  11. while (true) {
  12. val inbox = active.take()
  13. if (inbox == MessageLoop.PoisonPill) {
  14. // Put PoisonPill back so that other threads can see it.
  15. setActive(MessageLoop.PoisonPill)
  16. return
  17. }
  18. //处理inbox存储的消息
  19. inbox.process(dispatcher)
  20. }
  21. }
  22. }
  23. //使用共享线程池为多个RPC端点提供服务的消息循环。
  24. private class DedicatedMessageLoop(...){}

8、Inbox

  1. //一个收件箱,用于存储 RpcEndpoint 的消息,并安全地向其线程发布消息。
  2. private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  3. extends Logging {
  4. inbox => 给它一个别名,这样我们就可以在闭包中更清楚地使用它。
  5. protected val messages = new java.util.LinkedList[InboxMessage]()
  6. //OnStart是要处理的第一条消息
  7. inbox.synchronized {
  8. messages.add(OnStart)
  9. }
  10. //处理Inbox中的消息
  11. def process(dispatcher: Dispatcher): Unit = {
  12. message = messages.poll()
  13. while (true) {
  14. safelyCall(endpoint) {
  15. message match {
  16. case RpcMessage(_sender, content, context) =>
  17. endpoint.receiveAndReply
  18. case OneWayMessage(_sender, content) =>
  19. endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
  20. case OnStart =>
  21. endpoint.onStart() //调用端点的Onstart() Master 、 Worker 的Onstart() 就是这样起来的
  22. case OnStop =>
  23. dispatcher.removeRpcEndpointRef(endpoint)
  24. endpoint.onStop()
  25. case RemoteProcessConnected(remoteAddress) =>
  26. endpoint.onConnected(remoteAddress)
  27. case RemoteProcessDisconnected(remoteAddress) =>
  28. endpoint.onDisconnected(remoteAddress)
  29. case RemoteProcessConnectionError(cause, remoteAddress) =>
  30. endpoint.onNetworkError(cause, remoteAddress)
  31. }
  32. }
  33. }
  34. }
  35. }

9、OutBox

  1. private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
  2. outbox => //给它一个别名,这样我们就可以在闭包中更清楚地使用它。
  3. private val messages = new java.util.LinkedList[OutboxMessage]
  4. //connectFuture指向连接任务。如果没有连接任务,connectFuture将为空
  5. private var connectFuture: java.util.concurrent.Future[Unit] = null
  6. //发送消息。如果没有活动连接,请缓存它并启动新连接
  7. def send(message: OutboxMessage): Unit = {}
  8. //清空消息队列
  9. private def drainOutbox(): Unit = {}
  10. //异步启动要给连接任务
  11. private def launchConnectTask(): Unit = {}
  12. }

本文转载自: https://blog.csdn.net/lu070828/article/details/141249175
版权归原作者 隔着天花板看星星 所有, 如有侵权,请联系我们删除。

“Spark-环境启动”的评论:

还没有评论