1. Preface
For backend developers, distributed systems are part of daily work: databases, message queues, service registries, and all kinds of middleware. So what are the principles behind distributed systems? I recently worked through MIT's 6.824 Distributed Systems course and found it excellent, and I recommend it to any backend developer who is interested. Course homepage: https://pdos.csail.mit.edu/6.824/index.html. You can also find many video resources by searching for "6.824 distributed systems" on Bilibili, and readers who find the English difficult can refer to a community translation of the lectures: https://zhuanlan.zhihu.com/c_1273718607160393728 (the translator's WeChat public account is honghui_writing).
The course has four labs; the repository is git://g.csail.mit.edu/6.5840-golabs-2023. Lab 1 implements a simplified MapReduce (see https://pdos.csail.mit.edu/6.824/labs/lab-mr.html). Lab 2 implements Raft (see https://pdos.csail.mit.edu/6.824/labs/lab-raft.html). Lab 3 implements a fault-tolerant key/value service (see https://pdos.csail.mit.edu/6.824/labs/lab-kvraft.html). Lab 4 implements a sharded key/value service (see https://pdos.csail.mit.edu/6.824/labs/lab-shard.html).
This series walks through the development of each lab in detail; this article covers Lab 1.
2. Workflow
This lab has two process roles: the Coordinator and the Worker. The Coordinator distributes and coordinates tasks; Workers execute them. The overall execution flow is described below.
Each Worker starts two goroutines, one for Map tasks and one for Reduce tasks; Reduce tasks may only start after all Map tasks have completed. The steps are:
- The Map goroutine fetches a Map task from the Coordinator
- The Map goroutine reads the input file
- The Map goroutine writes the processed intermediate files
- The Map goroutine reports the result to the Coordinator
- The Reduce goroutine fetches a Reduce task from the Coordinator
- The Reduce goroutine reads the intermediate files
- The Reduce goroutine writes the final output file
- The Reduce goroutine reports the result to the Coordinator
3. Task
The MapTask and ReduceTask structures are defined as follows:
```go
type MapTask struct {
	FileName  string // input file of this Map task
	WorkerId  string // Worker currently handling the task
	StartTime int64  // Unix timestamp when the task was handed out
}

type ReduceTask struct {
	FileNames []string // intermediate files that feed this Reduce task
	WorkerId  string   // Worker currently handling the task
	StartTime int64    // Unix timestamp when the task was handed out
}
```
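The code in this article also references a few task-state constants and sentinel errors (StateSuccess, StateFailure, ErrNoTask, and so on) whose definitions are not shown. A minimal sketch of what they could look like, assuming plain string states and sentinels created with errors.New:

```go
package mr

import "errors"

// Hypothetical task-state constants and sentinel errors; the article
// does not show these definitions, so names and values are assumptions.
const (
	StateSuccess = "success"
	StateFailure = "failure"
)

var (
	ErrNoTask        = errors.New("no pending task")
	ErrNotFound      = errors.New("task not found")
	ErrInvalidWorker = errors.New("invalid worker")
	ErrInvalidState  = errors.New("invalid state")
	ErrInvalidStage  = errors.New("invalid stage")
)
```

Note that Go's net/rpc transports a server-side error back to the client as a string, which is why the Worker code later compares err.Error() against ErrNoTask.Error() instead of using errors.Is.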
4. RPC
The RPC request and response structures are defined as follows:
```go
type GetMapTaskReq struct {
	WorkerId string
}

type GetMapTaskRsp struct {
	FileName string // input file of the assigned Map task
	NReduce  int    // number of Reduce tasks, used to partition intermediate keys
}

type ReportMapTaskReq struct {
	WorkerId        string
	InputFileName   string
	OutputFileNames map[int]string // reduce number -> intermediate file name
	State           string
}

type ReportMapTaskRsp struct {
}

type GetReduceTaskReq struct {
	WorkerId string
}

type GetReduceTaskRsp struct {
	Reduce    int      // reduce number of the assigned task
	FileNames []string // intermediate files to process
}

type ReportReduceTaskReq struct {
	WorkerId string
	Reduce   int
	State    string
}

type ReportReduceTaskRsp struct {
}

type IsFinishReq struct {
	Stage string // "map", "reduce" or "all"
}

type IsFinishRsp struct {
	Finish bool
}
```
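The Worker code later in this article calls helpers such as IsFinish, GetMapTask, and ReportMapTask, whose client side is not shown. They are thin wrappers around an RPC call to the Coordinator over the Unix-domain socket created by the starter code's coordinatorSock(). The sketch below is hypothetical: rpcCall is my own helper (the starter code ships a similar call() that returns a bool rather than an error), and it assumes the Worker uses its pid as WorkerId, which matches the mr-<pid>-<reduce> file naming used later.

```go
package mr

import (
	"net/rpc"
	"os"
	"strconv"
)

// rpcCall dials the Coordinator over its Unix-domain socket and performs
// one RPC; returning the error lets callers distinguish ErrNoTask from
// real failures.
func rpcCall(rpcname string, args interface{}, reply interface{}) error {
	c, err := rpc.DialHTTP("unix", coordinatorSock())
	if err != nil {
		return err
	}
	defer c.Close()
	return c.Call(rpcname, args, reply)
}

// GetMapTask asks the Coordinator for a Map task.
func GetMapTask() (*GetMapTaskRsp, error) {
	req := &GetMapTaskReq{WorkerId: strconv.Itoa(os.Getpid())}
	rsp := &GetMapTaskRsp{}
	if err := rpcCall("Coordinator.GetMapTask", req, rsp); err != nil {
		return nil, err
	}
	return rsp, nil
}

// IsFinish asks the Coordinator whether the given stage has finished.
func IsFinish(stage string) (bool, error) {
	req := &IsFinishReq{Stage: stage}
	rsp := &IsFinishRsp{}
	if err := rpcCall("Coordinator.IsFinish", req, rsp); err != nil {
		return false, err
	}
	return rsp.Finish, nil
}
```

GetReduceTask, ReportMapTask, and ReportReduceTask follow the same pattern.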
5. Coordinator
5.1 Structure
The Coordinator is defined as follows:
```go
type Coordinator struct {
	mu                 sync.RWMutex
	PendingMapTasks    []string            // pending Map tasks: [filename, ...]
	ProcessMapTasks    map[string]*MapTask // in-progress Map tasks: {filename: MapTask, ...}
	PendingReduceTasks map[int][]string    // pending Reduce tasks: {reduce: [filename, ...]}
	ProcessReduceTasks map[int]*ReduceTask // in-progress Reduce tasks: {reduce: ReduceTask, ...}
	NReduce            int                 // number of Reduce tasks
}
```
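The article does not show how the Coordinator is constructed. The lab's main/mrcoordinator.go calls mr.MakeCoordinator(files, nReduce), so a minimal sketch that initializes the fields above and starts the RPC server provided by the starter code could look like this:

```go
// Hypothetical constructor; the article does not show it. files are the
// input files (one Map task each) and nReduce is the number of Reduce tasks.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		PendingMapTasks:    append([]string{}, files...),
		ProcessMapTasks:    make(map[string]*MapTask),
		PendingReduceTasks: make(map[int][]string),
		ProcessReduceTasks: make(map[int]*ReduceTask),
		NReduce:            nReduce,
	}
	c.server() // server() comes from the lab's starter code and registers the RPC handlers
	return c
}
```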
5.2 GetMapTask
The Coordinator handles a Worker's request for a Map task as follows: it pops the first pending task, records it as in progress under the requesting Worker's id, and starts a 10-second timer; if the task has not been reported finished when the timer fires, it is put back on the pending queue so another Worker can pick it up. The code is as follows:
```go
func (c *Coordinator) GetMapTask(req *GetMapTaskReq, rsp *GetMapTaskRsp) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.PendingMapTasks) == 0 {
		return ErrNoTask
	}
	// Pop the first pending task and mark it as in progress.
	fileName := c.PendingMapTasks[0]
	c.PendingMapTasks = c.PendingMapTasks[1:]
	mapTask := &MapTask{
		FileName:  fileName,
		WorkerId:  req.WorkerId,
		StartTime: time.Now().Unix(),
	}
	c.ProcessMapTasks[fileName] = mapTask
	rsp.FileName = fileName
	rsp.NReduce = c.NReduce
	// If the task is still in progress after 10 seconds, assume the Worker
	// has crashed and put the task back on the pending queue.
	timer := time.NewTimer(10 * time.Second)
	go func() {
		<-timer.C
		c.mu.Lock()
		defer c.mu.Unlock()
		if _, ok := c.ProcessMapTasks[fileName]; ok {
			delete(c.ProcessMapTasks, fileName)
			c.PendingMapTasks = append(c.PendingMapTasks, fileName)
			log.Printf("map task timeout %v, %v", fileName, c.PendingMapTasks)
		}
	}()
	return nil
}
```
5.3 ReportMapTask
The Coordinator handles a Worker's Map-task report as follows: it first checks that the task is in progress and owned by the reporting Worker; on success it removes the task and records the reported intermediate files as inputs of the corresponding Reduce tasks (skipping duplicates), and on failure it puts the input file back on the pending Map queue. The code is as follows:
```go
func (c *Coordinator) ReportMapTask(req *ReportMapTaskReq, rsp *ReportMapTaskRsp) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	mapTask, ok := c.ProcessMapTasks[req.InputFileName]
	if !ok {
		return ErrNotFound
	}
	if mapTask.WorkerId != req.WorkerId {
		return ErrInvalidWorker
	}
	if req.State == StateSuccess {
		// The Map task is done: record its intermediate files as inputs of
		// the corresponding Reduce tasks, skipping duplicates.
		delete(c.ProcessMapTasks, req.InputFileName)
		for reduce, fileName := range req.OutputFileNames {
			if _, ok := c.PendingReduceTasks[reduce]; !ok {
				c.PendingReduceTasks[reduce] = []string{}
			}
			exist := false
			for _, fn := range c.PendingReduceTasks[reduce] {
				if fn == fileName {
					exist = true
					break
				}
			}
			if !exist {
				c.PendingReduceTasks[reduce] = append(c.PendingReduceTasks[reduce], fileName)
			}
		}
		return nil
	} else if req.State == StateFailure {
		// The Map task failed: requeue the input file.
		delete(c.ProcessMapTasks, req.InputFileName)
		c.PendingMapTasks = append(c.PendingMapTasks, req.InputFileName)
		return nil
	} else {
		return ErrInvalidState
	}
}
```
5.4 GetReduceTask
The Coordinator handles a Worker's request for a Reduce task much like a Map task: it picks a pending Reduce bucket, marks it as in progress, and re-queues it if it is not reported finished within 10 seconds. The code is as follows:
```go
func (c *Coordinator) GetReduceTask(req *GetReduceTaskReq, rsp *GetReduceTaskRsp) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.PendingReduceTasks) == 0 {
		return ErrNoTask
	}
	// Pick an arbitrary pending Reduce task and mark it as in progress.
	var reduce int
	var fileNames []string
	for r, fn := range c.PendingReduceTasks {
		reduce = r
		fileNames = fn
		break
	}
	delete(c.PendingReduceTasks, reduce)
	reduceTask := &ReduceTask{
		FileNames: fileNames,
		WorkerId:  req.WorkerId,
		StartTime: time.Now().Unix(),
	}
	c.ProcessReduceTasks[reduce] = reduceTask
	rsp.Reduce = reduce
	rsp.FileNames = fileNames
	// Requeue the task if it has not completed within 10 seconds.
	timer := time.NewTimer(10 * time.Second)
	go func() {
		<-timer.C
		c.mu.Lock()
		defer c.mu.Unlock()
		if _, ok := c.ProcessReduceTasks[reduce]; ok {
			delete(c.ProcessReduceTasks, reduce)
			c.PendingReduceTasks[reduce] = fileNames
		}
	}()
	return nil
}
```
5.5 ReportReduceTask
The Coordinator handles a Worker's Reduce-task report as follows: on success the task is simply removed from the in-progress set; on failure its intermediate files are put back on the pending queue. The code is as follows:
```go
func (c *Coordinator) ReportReduceTask(req *ReportReduceTaskReq, rsp *ReportReduceTaskRsp) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	reduceTask, ok := c.ProcessReduceTasks[req.Reduce]
	if !ok {
		return ErrNotFound
	}
	if reduceTask.WorkerId != req.WorkerId {
		return ErrInvalidWorker
	}
	if req.State == StateSuccess {
		delete(c.ProcessReduceTasks, req.Reduce)
		return nil
	} else if req.State == StateFailure {
		// The Reduce task failed: requeue its intermediate files.
		delete(c.ProcessReduceTasks, req.Reduce)
		c.PendingReduceTasks[req.Reduce] = reduceTask.FileNames
		return nil
	} else {
		return ErrInvalidState
	}
}
```
5.6 IsFinish
The Coordinator answers a Worker's query about whether a stage has finished as follows: a stage is considered finished when it has neither pending nor in-progress tasks. The code is as follows:
```go
func (c *Coordinator) IsFinish(req *IsFinishReq, rsp *IsFinishRsp) error {
	c.mu.RLock()
	defer c.mu.RUnlock()
	switch strings.ToLower(req.Stage) {
	case "map":
		rsp.Finish = len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0
	case "reduce":
		rsp.Finish = len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
	case "all":
		rsp.Finish = len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0 &&
			len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
	default:
		return ErrInvalidStage
	}
	return nil
}
```
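The lab's main/mrcoordinator.go also polls the Coordinator's Done() method to decide when to exit. That method is not shown in this article, but it can reuse the same condition as the "all" stage above; a minimal sketch:

```go
// Hypothetical Done() implementation; main/mrcoordinator.go polls this to
// decide when the whole job is complete.
func (c *Coordinator) Done() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0 &&
		len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
}
```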
6. Worker
6.1 Mapper
The Worker's Map goroutine runs the following loop: it exits once the Coordinator reports that the Map stage has finished; otherwise it fetches a Map task, reads the input file, applies the map function, appends each key/value pair as a JSON line to the intermediate file of its Reduce bucket, and reports the result to the Coordinator. The code is as follows:
```go
func mapper(mapf func(string, string) []KeyValue) {
	interfileMap := make(map[string]*os.File) // open intermediate files, keyed by file name
	for {
		// Exit once all Map tasks have finished.
		finish, err := IsFinish("map")
		if err != nil {
			log.Fatalf("is map finish error: %v", err)
		}
		if finish {
			break
		}
		mapTaskRsp, err := GetMapTask()
		if err != nil {
			if err.Error() == ErrNoTask.Error() {
				time.Sleep(time.Second)
				continue
			}
			log.Fatalf("get map task error: %v", err)
		}
		input, err := os.Open(mapTaskRsp.FileName)
		if err != nil {
			log.Fatalf("cannot open %v", mapTaskRsp.FileName)
		}
		content, err := io.ReadAll(input)
		if err != nil {
			log.Fatalf("cannot read %v", mapTaskRsp.FileName)
		}
		input.Close()
		intermediate := mapf(mapTaskRsp.FileName, string(content))
		// Partition the key/value pairs into NReduce intermediate files,
		// encoding each pair as a JSON line.
		outputFileNames := make(map[int]string)
		for _, kv := range intermediate {
			reduce := ihash(kv.Key) % mapTaskRsp.NReduce
			fileName := fmt.Sprintf("mr-%d-%d", os.Getpid(), reduce)
			outputFileNames[reduce] = fileName
			if _, ok := interfileMap[fileName]; !ok {
				interfileMap[fileName], _ = os.Create(fileName)
			}
			enc := json.NewEncoder(interfileMap[fileName])
			enc.Encode(&kv)
		}
		for _, interfile := range interfileMap {
			interfile.Sync()
		}
		err = ReportMapTask(mapTaskRsp.FileName, outputFileNames, StateSuccess)
		if err != nil {
			log.Fatalf("report map task error: %v", err)
		}
	}
	for _, interfile := range interfileMap {
		interfile.Close()
	}
	log.Printf("mapper finish")
}
```
6.2 Reducer
The Worker's Reduce goroutine first waits until the Map stage has finished, then runs a similar loop: it exits once the Reduce stage has finished; otherwise it fetches a Reduce task, decodes all of the task's intermediate files, sorts the key/value pairs by key, applies the reduce function to each group of values, writes one line per key to mr-out-<reduce>, and reports the result to the Coordinator. The code is as follows:
```go
func reducer(reducef func(string, []string) string) {
	// Wait until all Map tasks have finished.
	ticker := time.NewTicker(time.Second)
waitmap:
	for {
		select {
		case <-ticker.C:
			finish, err := IsFinish("map")
			if err != nil {
				log.Fatalf("is map finish error: %v", err)
			}
			if finish {
				break waitmap
			}
		}
	}
	ticker.Stop()
	for {
		// Exit once all Reduce tasks have finished.
		finish, err := IsFinish("reduce")
		if err != nil {
			log.Fatalf("is reduce finish error: %v", err)
		}
		if finish {
			break
		}
		reduceTaskRsp, err := GetReduceTask()
		log.Printf("get reduce task: %v, %v, %v", os.Getpid(), reduceTaskRsp, err)
		if err != nil {
			if err.Error() == ErrNoTask.Error() {
				time.Sleep(time.Second)
				continue
			}
			log.Fatalf("get reduce task error: %v", err)
		}
		// Decode all intermediate files belonging to this Reduce task.
		var kva []KeyValue
		for _, fileName := range reduceTaskRsp.FileNames {
			interfile, _ := os.Open(fileName)
			dec := json.NewDecoder(interfile)
			for {
				var kv KeyValue
				if err := dec.Decode(&kv); err != nil {
					break
				}
				kva = append(kva, kv)
			}
			interfile.Close()
		}
		// Sort by key, then apply reducef to each group of values.
		sort.Sort(ByKey(kva))
		oname := fmt.Sprintf("mr-out-%v", reduceTaskRsp.Reduce)
		ofile, _ := os.Create(oname)
		i := 0
		for i < len(kva) {
			j := i + 1
			for j < len(kva) && kva[j].Key == kva[i].Key {
				j++
			}
			var values []string
			for k := i; k < j; k++ {
				values = append(values, kva[k].Value)
			}
			output := reducef(kva[i].Key, values)
			// This is the correct format for each line of Reduce output.
			fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
			i = j
		}
		ofile.Close()
		err = ReportReduceTask(reduceTaskRsp.Reduce, StateSuccess)
		if err != nil {
			log.Fatalf("report reduce task error: %v", err)
		}
	}
	log.Printf("reducer finish")
}
```
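The mapper and reducer rely on KeyValue, ihash, and ByKey, none of which are defined in this article. KeyValue and ihash come from the lab's starter code in mr/worker.go; ByKey mirrors the sort helper in main/mrsequential.go, redeclared here for the mr package:

```go
package mr

import "hash/fnv"

// KeyValue is the pair type passed between Map and Reduce, as provided by
// the lab's starter code (mr/worker.go).
type KeyValue struct {
	Key   string
	Value string
}

// ihash(key) % NReduce chooses the Reduce bucket for a key (starter code).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// ByKey sorts a KeyValue slice by key, mirroring main/mrsequential.go.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
```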
6.3 Startup
The Worker starts the Mapper and Reducer goroutines and waits for both to finish:
```go
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		mapper(mapf)
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		reducer(reducef)
	}()
	wg.Wait()
	log.Printf("worker finish:%v", os.Getpid())
}
```
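For context, the Worker function is invoked by the lab's main/mrworker.go, which loads the Map and Reduce functions from a Go plugin (for example wc.so built from mrapps/wc.go). A simplified sketch of what the starter code does; see main/mrworker.go in the repository for the real version:

```go
package main

import (
	"fmt"
	"log"
	"os"
	"plugin"

	"6.5840/mr"
)

// Simplified sketch of the lab's main/mrworker.go: load the Map and Reduce
// functions from a plugin such as wc.so and hand them to mr.Worker.
func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
		os.Exit(1)
	}
	p, err := plugin.Open(os.Args[1])
	if err != nil {
		log.Fatalf("cannot load plugin %v", os.Args[1])
	}
	xmapf, err := p.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", os.Args[1])
	}
	mapf := xmapf.(func(string, string) []mr.KeyValue)
	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", os.Args[1])
	}
	reducef := xreducef.(func(string, []string) string)

	mr.Worker(mapf, reducef)
}
```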
7. Testing
The test scripts are src/main/test-mr.sh and src/main/test-mr-many.sh; the former runs the test suite once, the latter runs it repeatedly. The suite exercises the implementation from several angles (word count and indexer output, parallelism, crash recovery, and so on); you can study the details of each test yourself. The lab only passes when every test case passes. During development and debugging you can trim the script to run targeted tests.