0


Mit6.824 lab1全解析(推导历程+代码)

0.前言

mit 6.824分布式系统
课程主页

lab1是第一次作业,本菜鸡用了好几天独立完成,经过一次改版优化了数据结构和解决任务元数据并发环境下的data race问题,建议大家做之前有自己独立的思考,有很多可行方案都能完成任务。比如看到有的小伙伴采用master(coordinator)轮询slave(worker)进行交互,我是用slave定时发送请求触发master懒执行大部分任务(后面会聊到原因)。也有的小伙伴用队列增删加锁实现并发安全,本人用的golang自带的channel作为任务队列。不得不感叹人家本科生就有机会学这么有意思的课程,听说lab2更酸爽,后面会接着去冲塔。总之,集中一段时间做这个lab1挺有趣的。

我的代码在这里 :送餐员小李Gitee

1.实验准备

  • MapReduce论文阅读
  • Golang入门:需要掌握 基本语法/IDE配置/数组/面向对象/管道协程,小案例都实现一遍即可,十几个小时够了Golang核心-韩顺平
  • 阅读 lab1 note lab1 看下mrsequential.go的流程,全程一个worker干到底无coordinator,看下map,reduce函数的加载和数据的流向

2 goland配置调试环境

  • windows

windows不支持动态插件编译,硬塞一个linux编译好的so文件也不行,当然可以通过改代码的形式把map reduce 的wordcount函数硬编码进main/worker.go,但不建议这样做后面会很麻烦。

buildmode=plugin not supported on windows/amd64

VMware搞个Ubuntu,然后golang,goland配置好

Ubuntu 16 镜像下载
VMware
VMware如何安装Ubuntu

  • linux

GOPATH选择git clone下来的文件夹6.824

GOROOT是golang的安装位置

wc.so运行前每次都要重新build,因此在写个build-wc.sh脚本放在main路径下面 configuration取名"build wc"

也可以在脚本加一个去掉"mr-"开头文件的句子,把后面频繁调试mr程序生成的结果文件去掉,省的每次运行前都去手删

go build -buildmode=plugin ../mrapps/wc.go
# rm -f mr-*

在这里插入图片描述

mrsequential.go 最下面的Before launch点击加号Run Another Configuration添加"build wc",这样每次运行之前都build一下wc.so

在这里插入图片描述

  • 至此mrsequential.go 应该可以在goland运行起来了

在这里插入图片描述

同理mrworker.go也需要这样配置

在这里插入图片描述

mrcoordinator.go 需要将所有mian下面的txt文本加入进去

pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt

2.任务梳理

lab1 note 里面的内容,内容梳理,一开始比较模糊也没关系,做到哪里回看就行

2.1 描述

实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker请求任务,进行运算,写出结果到文件。coordinator需要关心worker的任务是否完成,在超时情况下将任务重新分配给别的worker。

2.2 规则

  • map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()
  • worker需要将第X个reduce task结果放到mr-out-X中。
  • mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了
  • main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。
  • 任务都完成后,worker也得关闭

2.3 提示

  • 一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。
  • map reduce函数都是通过go插件装载 (.so文件)
  • mr/ 文件变了就需要重新build
  • 都在一个文件系统,worker天然实现文件共享,先凑合着起步
  • 中间文件命名 mr-X-Y X是map任务号,y是reduce任务号
  • worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。
  • worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号
  • mrsequential.go 代码可以借鉴
  • coordinator里面的共享数据需要加锁
  • worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下
  • 如果任务重试机制,记得不要生成重复任务
  • mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。
  • 确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名

3.开始做任务

每一步提示思路,是我的心路历程,应该符合大多数人的逻辑

3.1 小目标:coordinator分配好任务worker取到后打印出来

首先目标锁定在coordinator和worker的互动,其他的都不需要考虑,worker打印出来自己要做任务的文件名

  • mrsequential.go 里面逻辑看懂
  • 看懂coordinator和worker的rpc交互流程call 函数通过1234端口传入args和reply的内存地址,调用rpcname(Coordinator.函数名),通过反射机制"远程"调用Coordinator的该函数,Coordinator通过内存地址读取入参写出结果。worker.go里面的Worker方法调用CallExample,先运行Coordinator,再运行worker,看看worker端打印返回来的经过Coordinator加工过的数字
  • 至此worker和coordinator可以互动了
  • 发放给worker任务的结构体Job类型(枚举常量):JobType,以便worker知道这是Map任务还是Reduce任务。InputFile 文件名数组 :map情况数组里面就一个分配给worker的文件(Hadoop里面大文件是需要切块的,但是这里面的文件都很小就不切了直接一个文件给一个worker),reduce情况下是worker需要选取的一些需要聚合到一起的中间文件JobId/ReducerNum worker需要知道这些以便生成中间结果文件"mr-tmp-x-y" x是jobid,y是经过hash后的reduce id, y用来标识哪些文件汇入同一个reduce
type Job struct{
   JobType    JobType
   InputFile  []string
   JobId      int
   ReducerNum int//TmpFileList []string}
  • 任务存放

放channel里面就行,省得自己写队列的各种方法,还能天然并发安全

channel 是golang特有的类型化消息的队列,可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的通信方式保证了同步性。数据通过通道:同一时间只有一个协程可以访问数据:所以不会出现数据竞争,设计如此。数据的归属(可以读写数据的能力)被传递。

因此Coordinator定义了两个channel来存放做好的map和reduce,jobMetaHolder(元数据管理相关)和CoordinatorCondition(coordinator状态),暂时不用管可以注释掉

type Coordinator struct{// channel which hold uncompleted task
   JobChannelMap        chan*Job
   JobChannelReduce     chan*Job
   ReducerNum           int
   MapNum               int
   CoordinatorCondition Condition
   uniqueJobId          int
   jobMetaHolder        JobMetaHolder
}
  • map任务制作 Coordinator制作map任务,在一开始程序运行的时候就执行
func(c *Coordinator)makeMapJobs(files []string){for_, v :=range files {
      id := c.generateJobId()//fmt.Println("making map job :", id)
      job := Job{
          JobType:    MapJob,
          InputFile:[]string{v},
          JobId:      id,
          ReducerNum: c.ReducerNum,}//这下面暂时不需要
      jobMetaINfo := JobMetaInfo{
          condition: JobWaiting,
          JobPtr:&job,}
      c.jobMetaHolder.putJob(&jobMetaINfo)
      fmt.Println("making map job :",&job)
      c.JobChannelMap <-&job
  }//这上面暂时不需要
  fmt.Println("done making map jobs")
  c.jobMetaHolder.checkJobDone()}

照着把那个样例rpc交互函数写个distribute方法,把的coordinate端和worker端入参数据类型分别改下,加入这句话即可将JobChannelMap里面的一个job给reply

*reply =*<-c.JobChannelMap

worker端对reply取InputFile第一个元素,打印结果如下,worker线程取到任务了

worker get job which is pg-being_ernest.txt
  • 3.1小目标完成

3.2 worker通过传过来的文件名做map任务,写出结果

这个步骤简单粗暴,照着mrsequential.go里面写一下,记得用ihash处理下key分成Nreduce份用json编码后写出到"mr-tmp-x-y"文件。注意mr论文这步是有排序的,因为真正生产活动数据量是非常巨大的,map端提前排序好后,reduce的排序压力会减小很多。这里排不排序无所谓。

  • 3.2小目标完成

3.3 coordinator感知各个job任务运行完毕和map转reduce时机

  • 任务元数据管理从coordinator视角看任务分发> 制作任务 -> 放入队列 -> worker来取如果worker维护任务的状态显然不合理,每个任务的运行开始时间,任务状态。这些内容worker没必要知晓,是coordinator用来判断任务超时,或者map转reduce的。因此用JobMetaInfo把Job + Job状态包装下> 制作任务 -> 放入JobMetaInfo(元数据) -> 放入队列 -> worker来取

type JobMetaInfo struct {
condition JobCondition
StartTime time.Time
JobPtr *Job
}


 condition job状态: 包含等待,运行,完成
 StartTime : 开始运行的时间(等待变为运行)
 JobPtr  : job内存地址(golang是值传递,用地址更高效)

所有任务用map存储在JobMetaHolder,key是job的唯一id
```go
type JobMetaHolder struct {
  MetaMap map[int]*JobMetaInfo
}

针对这个JobMetaHolder的操作都可以安排上了,比如放入任务,任务状态更新,检查同一阶段任务是否完成

放入任务函数 putJob

func(j *JobMetaHolder)putJob(JobInfo *JobMetaInfo)bool{
  jobId := JobInfo.JobPtr.JobId
  meta,_:= j.MetaMap[jobId]if meta !=nil{
      fmt.Println("meta contains job which id = ", jobId)returnfalse}else{
      j.MetaMap[jobId]= JobInfo
  }returntrue}

任务发射方法,当channel给出任务后,元数据管理器对任务元数据进行状态变更和运行开始时间记录(后面超时任务有用)。和Kafka的InflightMessage有点像。

func(j *JobMetaHolder)fireTheJob(jobId int)bool{
    ok, jobInfo := j.getJobMetaInfo(jobId)if!ok || jobInfo.condition != JobWaiting {returnfalse}
    jobInfo.condition = JobWorking
    jobInfo.StartTime = time.Now()returntrue}

检查当前阶段任务是否完成。因为每次制作jobs后,实在加锁情况下一股脑更新到元数据的,因此这边通过遍历先检查reduce完成未完成数量再检查map就能判断两种情况下的
完成情况。每次也会print 任务的数量信息方便调试

func(j *JobMetaHolder)checkJobDone()bool{
  reduceDoneNum :=0
  reduceUndoneNum :=0
  mapDoneNum :=0
  mapUndoneNum :=0for_, v :=range j.MetaMap {if v.JobPtr.JobType == MapJob {if v.condition == JobDone {
              mapDoneNum +=1}else{
              mapUndoneNum++}}else{if v.condition == JobDone {
              reduceDoneNum++}else{
              reduceUndoneNum++}}}
  fmt.Printf("%d/%d map jobs are done, %d/%d reduce job are done\n",
      mapDoneNum, mapDoneNum+mapUndoneNum, reduceDoneNum, reduceDoneNum+reduceUndoneNum)return(reduceDoneNum >0&& reduceUndoneNum ==0)||(mapDoneNum >0&& mapUndoneNum ==0)}
  • coordinator的状态转换 每次当channel的长度为0的时候,去checkJobDone检查一下,当这个阶段所有任务都完成以后,进行状态转换并作出相应操作nextPhase()
func(c *Coordinator)nextPhase(){if c.CoordinatorCondition == MapPhase {
        c.makeReduceJobs()
        c.CoordinatorCondition = ReducePhase
    }elseif c.CoordinatorCondition == ReducePhase {
        c.CoordinatorCondition = AllDone
    }}
  • 3.3小目标完成

3.4 coordinator 的任务分配函数

  • 任务分配方法 任务分配方法是coordinator最核心的函数,worker每次来询问都会调用这个方法
func(c *Coordinator)DistributeJob(args *ExampleArgs, reply *Job)error{
    mu.Lock()defer mu.Unlock()
    fmt.Println("coordinator get a request from worker :")if c.CoordinatorCondition == MapPhase {iflen(c.JobChannelMap)>0{*reply =*<-c.JobChannelMap
            if!c.jobMetaHolder.fireTheJob(reply.JobId){
                fmt.Printf("[duplicated job id]job %d is running\n", reply.JobId)}}else{
            reply.JobType = WaittingJob
            if c.jobMetaHolder.checkJobDone(){
                c.nextPhase()}returnnil}}elseif c.CoordinatorCondition == ReducePhase {iflen(c.JobChannelReduce)>0{*reply =*<-c.JobChannelReduce
            if!c.jobMetaHolder.fireTheJob(reply.JobId){
                fmt.Printf("job %d is running\n", reply.JobId)}}else{
            reply.JobType = WaittingJob
            if c.jobMetaHolder.checkJobDone(){
                c.nextPhase()}returnnil}}else{
        reply.JobType = KillJob
    }returnnil}
  • worker完成任务调用的操作 这里使用了判断元数据的办法看看任务是否会重复完成,在后面的崩溃测试下,worker会失效,coordinator需要重新发放Jobg给另一个worker,加入刚才那个失效的worker恢复了然后写入回来,不能覆盖已经完成的数据(甚至都下一个阶段了)。func(c *Coordinator)JobIsDone(args *Job, reply *ExampleReply)error{ mu.Lock()defer mu.Unlock()switch args.JobType {case MapJob: ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)//prevent a duplicated work which returned from another workerif ok && meta.condition == JobWorking { meta.condition = JobDone fmt.Printf("Map task on %d complete\n", args.JobId)}else{ fmt.Println("[duplicated] job done", args.JobId)}breakcase ReduceJob: fmt.Printf("Reduce task on %d complete\n", args.JobId) ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)//prevent a duplicated work which returned from another workerif ok && meta.condition == JobWorking { meta.condition = JobDone }else{ fmt.Println("[duplicated] job done", args.JobId)}breakdefault:panic("wrong job done")}returnnil

``

3.5 加锁

由于我用了channel所以在任务分配队列实现了天然并发安全,但是在别的地方还是遇到了问题,比如Done函数通过mrcoordinator主线程去时不时读取coordinator的状态来判断是否结束死循环。还有在一个worker调coordinator拉取数据的时候,另一个worker调coordinator的checkJobDone()函数进行检查。因此在响应可能发生冲突的地方加锁。

3.6 crash test

这里我是又起了一个线程去检查JobMetaHolder里面超时的任务,具体可以参考代码
但是无法通过test-mr.sh

去看了crash.go和test-mr.sh里面的逻辑,发现起的worker线程太少了,crash.go里面的maybecrash方法很有可能瘫痪这个worker,只起一个甚至三个都会导致最后没有可用worker,即使将任务重新放到channel里面

因此我修改了test-mr.sh,起了更多的worker,通过了测试。


本文转载自: https://blog.csdn.net/ligen1112/article/details/120931874
版权归原作者 ligen1112 所有, 如有侵权,请联系我们删除。

“Mit6.824 lab1全解析(推导历程+代码)”的评论:

还没有评论