0


Apache Uniffle 学习 —— ShuffleManagerGrpcService

ShuffleManagerGrpcService 服务的开启条件

Spark Driver 当配置了 Uniffle 的 RssShuffleManager 客户端后,会在创建 RssShuffleManager 过程中,检查当满足以下

一个

条件时,会开启 ShuffleManagerGrpcService 这个 grpc 服务。

  • rss.stageRetry.enabled=true,开启了总的 stage retry 开关,当 write、fetch 失败时,都会进行 stageRetry
  • rss.stageRetry.writeFailureEnabled=true,开启了 write 失败时的 stage retry
  • rss.stageRetry.fetchFailureEnabled=true,开启了 fetch 失败时的 stage retry
  • rss.client.reassign.enabled=true,开启了 Uniffle 的 partition to server 的 reassign
  • rss.blockId.selfManagementEnabled=true,开启了 blockId 在 spark driver 中自管理的功能。

分区 shuffle server 重分配(shuffle reassign)

开启功能,spark driver registerShuffle 时,分配结果会被封装为一个 MutableShuffleHandleInfo,返回给 driver,最终广播给 executor。当 executor 发生 sendFailure 时,会向 ShuffleManagerGrpcService 发送 reassignOnBlockSendFailure 请求,进行重分配,并更新分配结果。从而避免在发送数据时,导致整个 stage 乃至 job 失败。

注:目前该功能与两副本功能不可同时使用。

spark stage 重试(stageRetry(resubmit))

开启功能,隐含着 reassign 功能自动开启。spark driver registerShuffle 时,分配结果会被封装为一个 StageAttemptShuffleHandleInfo,用来记录 stage retry 时,前一个 stage 的 shuffle server 列表。 将这个 StageAttemptShuffleHandleInfo 返回给 driver,最终广播给 executor。当 executor 发生 fetchFailure\WriteFailure 时,都会向 ShuffleManagerGrpcService 发送请求,进行 stage retry 和 shuffle 分区-> shuffleServer 重分配。

块 ID 自管理(BlockId Self Management)

开启该功能,在 Spark Driver 里会使用 Uniffle 实现的 BlockIdManager,进行自行块 ID 分配和管理,这会减少 Shuffle Server 的内存消耗,但是会增加 Driver 的内存消耗。

ShuffleManagerGrpcService 服务端口的设定和传递

ShuffleManagerGrpcService

服务的端口不由人为设定,是在 Spark Driver 以 0 号端口启动

ShuffleManagerGrpcService

服务,即随机使用可用端口,当服务启动成功后,将随机分配使用的端口号设置到 spark conf 的

rss.shuffle.manager.grpc.port

配置项中。
Spark Executor 启动的 Uniffle RssShuffleManager 客户端可以通过

driver.host

rss.shuffle.manager.grpc.port

这个配置项找到

ShuffleManagerGrpcService

host 和 port,并与之通信。

ShuffleManagerGrpcService 的 proto 定义

ShuffleManager service 是启动在计算引擎的 Application Master 中,为这个 Application 处理 RSS 特定的逻辑。

service ShuffleManager {
  // 当 Fetch 数据失败时,汇报 fetch 失败,由 ShuffleManagerGrpcService 决定是否要进行 stageRetry
  rpc reportShuffleFetchFailure (ReportShuffleFetchFailureRequest) returns (ReportShuffleFetchFailureResponse);
  // 当发生 stage retry 时,获取特定 shuffleId 的,由 RssShuffleManager 自己管理的 partition 到 ShuffleServer 的映射信息。
  rpc getPartitionToShufflerServerWithStageRetry(PartitionToShuffleServerRequest) returns (ReassignOnStageRetryResponse);
  // 当发生 Block retry 时,获取特定 shuffleId 的,由 RssShuffleManager 自己管理的 partition 到 ShuffleServer 的映射信息。
  rpc getPartitionToShufflerServerWithBlockRetry(PartitionToShuffleServerRequest) returns (ReassignOnBlockSendFailureResponse);
  // 当写入 shuffle 数据失败时,汇报 write 失败,由 ShuffleManagerGrpcService 决定是否要进行 stageRetry
  rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns (ReportShuffleWriteFailureResponse);
  // 当发生 stageResubmit(stage retry) 时,由 ShuffleManagerGrpcService 请求 coordinator,进行分区到 shuffleServer 的重分配(reassign)。
  rpc reassignOnStageResubmit(ReassignServersRequest) returns (ReassignServersResponse);
  // 当发生 block 发送错误时,由 ShuffleManagerGrpcService 请求 coordinator,进行分区到 shuffleServer 的重分配(reassign)。
  rpc reassignOnBlockSendFailure(RssReassignOnBlockSendFailureRequest) returns (ReassignOnBlockSendFailureResponse);
  
  // BlockIdSelfManagedShuffleWriteClient 
  // 
  // 报告 shuffle 结果
  rpc reportShuffleResult (ReportShuffleResultRequest) returns (ReportShuffleResultResponse);
  // 获取 shuffle 结果
  rpc getShuffleResult (GetShuffleResultRequest) returns (GetShuffleResultResponse);
  // 获取多个部分的 shuffle 结果
  rpc getShuffleResultForMultiPart (GetShuffleResultForMultiPartRequest) returns (GetShuffleResultForMultiPartResponse);
}
标签: apache 学习 spark

本文转载自: https://blog.csdn.net/maobaolong/article/details/141123640
版权归原作者 maobaolong 所有, 如有侵权,请联系我们删除。

“Apache Uniffle 学习 —— ShuffleManagerGrpcService”的评论:

还没有评论