0.前言
mit 6.824分布式系统
课程主页
lab1是第一次作业,本菜鸡用了好几天独立完成,经过一次改版优化了数据结构和解决任务元数据并发环境下的data race问题,建议大家做之前有自己独立的思考,有很多可行方案都能完成任务。比如看到有的小伙伴采用master(coordinator)轮询slave(worker)进行交互,我是用slave定时发送请求触发master懒执行大部分任务(后面会聊到原因)。也有的小伙伴用队列增删加锁实现并发安全,本人用的golang自带的channel作为任务队列。不得不感叹人家本科生就有机会学这么有意思的课程,听说lab2更酸爽,后面会接着去冲塔。总之,集中一段时间做这个lab1挺有趣的。
我的代码在这里 :送餐员小李Gitee
1.实验准备
- MapReduce论文阅读
- Golang入门:需要掌握 基本语法/IDE配置/数组/面向对象/管道协程,小案例都实现一遍即可,十几个小时够了Golang核心-韩顺平
- 阅读 lab1 note lab1 看下mrsequential.go的流程,全程一个worker干到底无coordinator,看下map,reduce函数的加载和数据的流向
2 goland配置调试环境
- windows
windows不支持动态插件编译,硬塞一个linux编译好的so文件也不行,当然可以通过改代码的形式把map reduce 的wordcount函数硬编码进main/worker.go,但不建议这样做后面会很麻烦。
buildmode=plugin not supported on windows/amd64
VMware搞个Ubuntu,然后golang,goland配置好
Ubuntu 16 镜像下载
VMware
VMware如何安装Ubuntu
- linux
GOPATH选择git clone下来的文件夹6.824
GOROOT是golang的安装位置
wc.so运行前每次都要重新build,因此在写个build-wc.sh脚本放在main路径下面 configuration取名"build wc"
也可以在脚本加一个去掉"mr-"开头文件的句子,把后面频繁调试mr程序生成的结果文件去掉,省的每次运行前都去手删
go build -buildmode=plugin ../mrapps/wc.go
# rm -f mr-*
mrsequential.go 最下面的Before launch点击加号Run Another Configuration添加"build wc",这样每次运行之前都build一下wc.so
- 至此mrsequential.go 应该可以在goland运行起来了
同理mrworker.go也需要这样配置
mrcoordinator.go 需要将所有mian下面的txt文本加入进去
pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt
2.任务梳理
lab1 note 里面的内容,内容梳理,一开始比较模糊也没关系,做到哪里回看就行
2.1 描述
实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker请求任务,进行运算,写出结果到文件。coordinator需要关心worker的任务是否完成,在超时情况下将任务重新分配给别的worker。
2.2 规则
- map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()
- worker需要将第X个reduce task结果放到mr-out-X中。
- mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了
- main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。
- 任务都完成后,worker也得关闭
2.3 提示
- 一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。
- map reduce函数都是通过go插件装载 (.so文件)
- mr/ 文件变了就需要重新build
- 都在一个文件系统,worker天然实现文件共享,先凑合着起步
- 中间文件命名 mr-X-Y X是map任务号,y是reduce任务号
- worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。
- worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号
- mrsequential.go 代码可以借鉴
- coordinator里面的共享数据需要加锁
- worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下
- 如果任务重试机制,记得不要生成重复任务
- mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。
- 确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名
3.开始做任务
每一步提示思路,是我的心路历程,应该符合大多数人的逻辑
3.1 小目标:coordinator分配好任务worker取到后打印出来
首先目标锁定在coordinator和worker的互动,其他的都不需要考虑,worker打印出来自己要做任务的文件名
- mrsequential.go 里面逻辑看懂
- 看懂coordinator和worker的rpc交互流程call 函数通过1234端口传入args和reply的内存地址,调用rpcname(Coordinator.函数名),通过反射机制"远程"调用Coordinator的该函数,Coordinator通过内存地址读取入参写出结果。worker.go里面的Worker方法调用CallExample,先运行Coordinator,再运行worker,看看worker端打印返回来的经过Coordinator加工过的数字
- 至此worker和coordinator可以互动了
- 发放给worker任务的结构体Job类型(枚举常量):JobType,以便worker知道这是Map任务还是Reduce任务。InputFile 文件名数组 :map情况数组里面就一个分配给worker的文件(Hadoop里面大文件是需要切块的,但是这里面的文件都很小就不切了直接一个文件给一个worker),reduce情况下是worker需要选取的一些需要聚合到一起的中间文件JobId/ReducerNum worker需要知道这些以便生成中间结果文件"mr-tmp-x-y" x是jobid,y是经过hash后的reduce id, y用来标识哪些文件汇入同一个reduce
type Job struct{
JobType JobType
InputFile []string
JobId int
ReducerNum int//TmpFileList []string}
- 任务存放
放channel里面就行,省得自己写队列的各种方法,还能天然并发安全
channel 是golang特有的类型化消息的队列,可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的通信方式保证了同步性。数据通过通道:同一时间只有一个协程可以访问数据:所以不会出现数据竞争,设计如此。数据的归属(可以读写数据的能力)被传递。
因此Coordinator定义了两个channel来存放做好的map和reduce,jobMetaHolder(元数据管理相关)和CoordinatorCondition(coordinator状态),暂时不用管可以注释掉
type Coordinator struct{// channel which hold uncompleted task
JobChannelMap chan*Job
JobChannelReduce chan*Job
ReducerNum int
MapNum int
CoordinatorCondition Condition
uniqueJobId int
jobMetaHolder JobMetaHolder
}
- map任务制作 Coordinator制作map任务,在一开始程序运行的时候就执行
func(c *Coordinator)makeMapJobs(files []string){for_, v :=range files {
id := c.generateJobId()//fmt.Println("making map job :", id)
job := Job{
JobType: MapJob,
InputFile:[]string{v},
JobId: id,
ReducerNum: c.ReducerNum,}//这下面暂时不需要
jobMetaINfo := JobMetaInfo{
condition: JobWaiting,
JobPtr:&job,}
c.jobMetaHolder.putJob(&jobMetaINfo)
fmt.Println("making map job :",&job)
c.JobChannelMap <-&job
}//这上面暂时不需要
fmt.Println("done making map jobs")
c.jobMetaHolder.checkJobDone()}
照着把那个样例rpc交互函数写个distribute方法,把的coordinate端和worker端入参数据类型分别改下,加入这句话即可将JobChannelMap里面的一个job给reply
*reply =*<-c.JobChannelMap
worker端对reply取InputFile第一个元素,打印结果如下,worker线程取到任务了
worker get job which is pg-being_ernest.txt
- 3.1小目标完成
3.2 worker通过传过来的文件名做map任务,写出结果
这个步骤简单粗暴,照着mrsequential.go里面写一下,记得用ihash处理下key分成Nreduce份用json编码后写出到"mr-tmp-x-y"文件。注意mr论文这步是有排序的,因为真正生产活动数据量是非常巨大的,map端提前排序好后,reduce的排序压力会减小很多。这里排不排序无所谓。
- 3.2小目标完成
3.3 coordinator感知各个job任务运行完毕和map转reduce时机
- 任务元数据管理从coordinator视角看任务分发> 制作任务 -> 放入队列 -> worker来取如果worker维护任务的状态显然不合理,每个任务的运行开始时间,任务状态。这些内容worker没必要知晓,是coordinator用来判断任务超时,或者map转reduce的。因此用JobMetaInfo把Job + Job状态包装下> 制作任务 -> 放入JobMetaInfo(元数据) -> 放入队列 -> worker来取
type JobMetaInfo struct {
condition JobCondition
StartTime time.Time
JobPtr *Job
}
condition job状态: 包含等待,运行,完成
StartTime : 开始运行的时间(等待变为运行)
JobPtr : job内存地址(golang是值传递,用地址更高效)
所有任务用map存储在JobMetaHolder,key是job的唯一id
```go
type JobMetaHolder struct {
MetaMap map[int]*JobMetaInfo
}
针对这个JobMetaHolder的操作都可以安排上了,比如放入任务,任务状态更新,检查同一阶段任务是否完成
放入任务函数 putJob
func(j *JobMetaHolder)putJob(JobInfo *JobMetaInfo)bool{
jobId := JobInfo.JobPtr.JobId
meta,_:= j.MetaMap[jobId]if meta !=nil{
fmt.Println("meta contains job which id = ", jobId)returnfalse}else{
j.MetaMap[jobId]= JobInfo
}returntrue}
任务发射方法,当channel给出任务后,元数据管理器对任务元数据进行状态变更和运行开始时间记录(后面超时任务有用)。和Kafka的InflightMessage有点像。
func(j *JobMetaHolder)fireTheJob(jobId int)bool{
ok, jobInfo := j.getJobMetaInfo(jobId)if!ok || jobInfo.condition != JobWaiting {returnfalse}
jobInfo.condition = JobWorking
jobInfo.StartTime = time.Now()returntrue}
检查当前阶段任务是否完成。因为每次制作jobs后,实在加锁情况下一股脑更新到元数据的,因此这边通过遍历先检查reduce完成未完成数量再检查map就能判断两种情况下的
完成情况。每次也会print 任务的数量信息方便调试
func(j *JobMetaHolder)checkJobDone()bool{
reduceDoneNum :=0
reduceUndoneNum :=0
mapDoneNum :=0
mapUndoneNum :=0for_, v :=range j.MetaMap {if v.JobPtr.JobType == MapJob {if v.condition == JobDone {
mapDoneNum +=1}else{
mapUndoneNum++}}else{if v.condition == JobDone {
reduceDoneNum++}else{
reduceUndoneNum++}}}
fmt.Printf("%d/%d map jobs are done, %d/%d reduce job are done\n",
mapDoneNum, mapDoneNum+mapUndoneNum, reduceDoneNum, reduceDoneNum+reduceUndoneNum)return(reduceDoneNum >0&& reduceUndoneNum ==0)||(mapDoneNum >0&& mapUndoneNum ==0)}
- coordinator的状态转换 每次当channel的长度为0的时候,去checkJobDone检查一下,当这个阶段所有任务都完成以后,进行状态转换并作出相应操作nextPhase()
func(c *Coordinator)nextPhase(){if c.CoordinatorCondition == MapPhase {
c.makeReduceJobs()
c.CoordinatorCondition = ReducePhase
}elseif c.CoordinatorCondition == ReducePhase {
c.CoordinatorCondition = AllDone
}}
- 3.3小目标完成
3.4 coordinator 的任务分配函数
- 任务分配方法 任务分配方法是coordinator最核心的函数,worker每次来询问都会调用这个方法
func(c *Coordinator)DistributeJob(args *ExampleArgs, reply *Job)error{
mu.Lock()defer mu.Unlock()
fmt.Println("coordinator get a request from worker :")if c.CoordinatorCondition == MapPhase {iflen(c.JobChannelMap)>0{*reply =*<-c.JobChannelMap
if!c.jobMetaHolder.fireTheJob(reply.JobId){
fmt.Printf("[duplicated job id]job %d is running\n", reply.JobId)}}else{
reply.JobType = WaittingJob
if c.jobMetaHolder.checkJobDone(){
c.nextPhase()}returnnil}}elseif c.CoordinatorCondition == ReducePhase {iflen(c.JobChannelReduce)>0{*reply =*<-c.JobChannelReduce
if!c.jobMetaHolder.fireTheJob(reply.JobId){
fmt.Printf("job %d is running\n", reply.JobId)}}else{
reply.JobType = WaittingJob
if c.jobMetaHolder.checkJobDone(){
c.nextPhase()}returnnil}}else{
reply.JobType = KillJob
}returnnil}
- worker完成任务调用的操作 这里使用了判断元数据的办法看看任务是否会重复完成,在后面的崩溃测试下,worker会失效,coordinator需要重新发放Jobg给另一个worker,加入刚才那个失效的worker恢复了然后写入回来,不能覆盖已经完成的数据(甚至都下一个阶段了)。
func(c *Coordinator)JobIsDone(args *Job, reply *ExampleReply)error{ mu.Lock()defer mu.Unlock()switch args.JobType {case MapJob: ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)//prevent a duplicated work which returned from another workerif ok && meta.condition == JobWorking { meta.condition = JobDone fmt.Printf("Map task on %d complete\n", args.JobId)}else{ fmt.Println("[duplicated] job done", args.JobId)}breakcase ReduceJob: fmt.Printf("Reduce task on %d complete\n", args.JobId) ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)//prevent a duplicated work which returned from another workerif ok && meta.condition == JobWorking { meta.condition = JobDone }else{ fmt.Println("[duplicated] job done", args.JobId)}breakdefault:panic("wrong job done")}returnnil
``
3.5 加锁
由于我用了channel所以在任务分配队列实现了天然并发安全,但是在别的地方还是遇到了问题,比如Done函数通过mrcoordinator主线程去时不时读取coordinator的状态来判断是否结束死循环。还有在一个worker调coordinator拉取数据的时候,另一个worker调coordinator的checkJobDone()函数进行检查。因此在响应可能发生冲突的地方加锁。
3.6 crash test
这里我是又起了一个线程去检查JobMetaHolder里面超时的任务,具体可以参考代码
但是无法通过test-mr.sh
去看了crash.go和test-mr.sh里面的逻辑,发现起的worker线程太少了,crash.go里面的maybecrash方法很有可能瘫痪这个worker,只起一个甚至三个都会导致最后没有可用worker,即使将任务重新放到channel里面
因此我修改了test-mr.sh,起了更多的worker,通过了测试。
版权归原作者 ligen1112 所有, 如有侵权,请联系我们删除。