6.5840 Lab 1: MapReduce

January 13, 2024

1. Introduction

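Backend developers work with distributed systems almost every day: databases, message queues, service registries and all kinds of other middleware. But what are the principles behind them? I recently worked through MIT's 6.824 Distributed Systems course (since renumbered 6.5840, the number used by the lab repository), found it very good, and recommend it to any backend developer who is interested. Course homepage: https://pdos.csail.mit.edu/6.824/index.html. Searching Bilibili for "6.824分布式系统" ("6.824 distributed systems") turns up plenty of lecture videos, and readers who find the English difficult can use the course translation compiled by a Zhihu user: https://zhuanlan.zhihu.com/c_1273718607160393728 (the translator's WeChat official account: honghui_writing).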

The course has four Labs, and the repository is at git://g.csail.mit.edu/6.5840-golabs-2023. Lab 1 implements a simplified MapReduce (details: https://pdos.csail.mit.edu/6.824/labs/lab-mr.html). Lab 2 implements Raft (details: https://pdos.csail.mit.edu/6.824/labs/lab-raft.html). Lab 3 builds a fault-tolerant key/value service (details: https://pdos.csail.mit.edu/6.824/labs/lab-kvraft.html). Lab 4 builds a sharded key/value service (details: https://pdos.csail.mit.edu/6.824/labs/lab-shard.html).

This series walks through the implementation of each Lab in detail, starting with Lab 1 in this article.

2. Workflow

Processes in this Lab take one of two roles: Coordinator or Worker. The Coordinator hands out and coordinates tasks; Workers execute them. The overall flow is as follows.

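Each Worker starts two goroutines, one running Map tasks and one running Reduce tasks. Reduce tasks may only start once every Map task has finished. The steps are: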

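  • The Map goroutine fetches a Map task from the Coordinator
  • The Map goroutine reads the input file
  • The Map goroutine writes the processed intermediate files
  • The Map goroutine reports the result to the Coordinator
  • The Reduce goroutine fetches a Reduce task from the Coordinator
  • The Reduce goroutine reads the intermediate files
  • The Reduce goroutine writes the processed output file
  • The Reduce goroutine reports the result to the Coordinator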
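To make the data flow concrete, here is a minimal single-process sketch of the same pipeline for word count. It is not the Lab code: there are no RPCs, files or goroutines, and the names wordCountMap, wordCountReduce and runMapReduce are illustrative. ihash mirrors the FNV-1a helper in the lab skeleton's mr/worker.go; every Map output pair is routed to partition ihash(key) % nReduce, and each partition is sorted and grouped by key before Reduce runs.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue mirrors the pair type used by the Lab's map and reduce functions.
type KeyValue struct {
	Key   string
	Value string
}

// ihash picks a Reduce partition for a key (FNV-1a, as in the lab skeleton).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// wordCountMap (illustrative) emits ("word", "1") for every word in contents.
func wordCountMap(filename, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{w, "1"})
	}
	return kva
}

// wordCountReduce (illustrative) returns the number of occurrences of a word.
func wordCountReduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// runMapReduce is a single-process illustration, not the Lab's distributed
// implementation: it runs one "Map task" per input, partitions the
// intermediate pairs into nReduce buckets, then runs each "Reduce task".
func runMapReduce(inputs map[string]string, nReduce int) map[string]string {
	buckets := make([][]KeyValue, nReduce)
	for name, contents := range inputs {
		for _, kv := range wordCountMap(name, contents) {
			r := ihash(kv.Key) % nReduce
			buckets[r] = append(buckets[r], kv)
		}
	}
	out := make(map[string]string)
	for _, kva := range buckets {
		// Sort so equal keys are adjacent, then call Reduce once per key.
		sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })
		for i := 0; i < len(kva); {
			j := i + 1
			for j < len(kva) && kva[j].Key == kva[i].Key {
				j++
			}
			values := make([]string, 0, j-i)
			for k := i; k < j; k++ {
				values = append(values, kva[k].Value)
			}
			out[kva[i].Key] = wordCountReduce(kva[i].Key, values)
			i = j
		}
	}
	return out
}

func main() {
	counts := runMapReduce(map[string]string{
		"a.txt": "the quick fox",
		"b.txt": "the lazy dog, the end",
	}, 3)
	fmt.Println(counts["the"], counts["fox"]) // 3 1
}
```

In the Lab, each bucket becomes a set of intermediate files on disk, and the two loops in runMapReduce run as separate tasks on different Workers.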

3. Task

The MapTask and ReduceTask structs are defined as follows:

type MapTask struct {
    FileName  string
    WorkerId  string
    StartTime int64
}

type ReduceTask struct {
    FileNames []string
    WorkerId  string
    StartTime int64
}
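The StartTime field is not read by the Coordinator code in this article, which arms one timer per task instead. As an illustration of how it could be used, the hypothetical helper below scans in-progress Map tasks and returns those that have run at least a given number of seconds; a single ticker goroutine in the Coordinator could call it while holding the lock and move the returned file names back to the pending queue. The helper name and the timeout value are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

type MapTask struct {
	FileName  string
	WorkerId  string
	StartTime int64 // Unix seconds when the task was handed out
}

// expiredMapTasks is a hypothetical helper: it returns, sorted, the file names
// of tasks whose StartTime is at least timeoutSec seconds before now.
// All times are Unix seconds.
func expiredMapTasks(inProgress map[string]*MapTask, now, timeoutSec int64) []string {
	var expired []string
	for name, t := range inProgress {
		if now-t.StartTime >= timeoutSec {
			expired = append(expired, name)
		}
	}
	sort.Strings(expired) // map iteration order is random; sort for stable output
	return expired
}

func main() {
	tasks := map[string]*MapTask{
		"pg-a.txt": {FileName: "pg-a.txt", WorkerId: "101", StartTime: 1000},
		"pg-b.txt": {FileName: "pg-b.txt", WorkerId: "102", StartTime: 1008},
	}
	fmt.Println(expiredMapTasks(tasks, 1012, 10)) // [pg-a.txt]
}
```

Compared with one goroutine per task, a single reaper keeps the number of goroutines constant, at the cost of detecting a timeout up to one tick late.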

4. RPC

The RPC request and response structs are defined as follows:

type GetMapTaskReq struct {
    WorkerId string
}

type GetMapTaskRsp struct {
    FileName string
    NReduce  int
}

type ReportMapTaskReq struct {
    WorkerId       string
    InputFileName  string
    OuputFileNames map[int]string
    State          string
}

type ReportMapTaskRsp struct {
}

type GetReduceTaskReq struct {
    WorkerId string
}

type GetReduceTaskRsp struct {
    Reduce    int
    FileNames []string
}

type ReportReduceTaskReq struct {
    WorkerId string
    Reduce   int
    State    string
}

type ReportReduceTaskRsp struct {
}

type IsFinishReq struct {
    Stage string
}

type IsFinishRsp struct {
    Finish bool
}

5. Coordinator

5.1 Structure

The Coordinator is defined as follows:

type Coordinator struct {
    mu                 sync.RWMutex
    PendingMapTasks    []string            // pending Map tasks: [filename, ...]
    ProcessMapTasks    map[string]*MapTask // in-progress Map tasks: {filename: MapTask, ...}
    PendingReduceTasks map[int][]string    // pending Reduce tasks: {reduce: [filename, ...], ...}
    ProcessReduceTasks map[int]*ReduceTask // in-progress Reduce tasks: {reduce: ReduceTask, ...}
    NReduce            int                 // number of Reduce tasks
}
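The article does not show how the Coordinator is created. A plausible constructor is sketched below, assuming it keeps the lab skeleton's MakeCoordinator(files []string, nReduce int) signature. The important detail is allocating all the maps: assigning to a nil map panics, so GetMapTask and ReportMapTask would crash on the first task otherwise. The skeleton's call that starts the RPC server (c.server()) is omitted to keep the sketch self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

type MapTask struct {
	FileName  string
	WorkerId  string
	StartTime int64
}

type ReduceTask struct {
	FileNames []string
	WorkerId  string
	StartTime int64
}

type Coordinator struct {
	mu                 sync.RWMutex
	PendingMapTasks    []string
	ProcessMapTasks    map[string]*MapTask
	PendingReduceTasks map[int][]string
	ProcessReduceTasks map[int]*ReduceTask
	NReduce            int
}

// MakeCoordinator (assumed to follow the lab skeleton's signature) queues every
// input file as a pending Map task and allocates the maps the RPC handlers
// write into.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		PendingMapTasks:    append([]string(nil), files...), // copy the caller's slice
		ProcessMapTasks:    make(map[string]*MapTask),
		PendingReduceTasks: make(map[int][]string),
		ProcessReduceTasks: make(map[int]*ReduceTask),
		NReduce:            nReduce,
	}
	// The lab skeleton would start the RPC server here (c.server()); omitted.
	return c
}

func main() {
	c := MakeCoordinator([]string{"pg-a.txt", "pg-b.txt"}, 10)
	fmt.Println(len(c.PendingMapTasks), c.NReduce) // 2 10
}
```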

5.2 GetMapTask

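When a Worker asks for a Map task, the Coordinator takes the first file off PendingMapTasks, records it in ProcessMapTasks together with the Worker's ID and start time, and returns the file name and NReduce. It also arms a 10-second timer: if the task is still in progress when the timer fires, it is moved back to the pending queue so another Worker can pick it up.

The code is as follows: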

func (c *Coordinator) GetMapTask(req *GetMapTaskReq, rsp *GetMapTaskRsp) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    if len(c.PendingMapTasks) == 0 {
        return ErrNoTask
    }
    fileName := c.PendingMapTasks[0]
    c.PendingMapTasks = c.PendingMapTasks[1:]
    mapTask := &MapTask{
        FileName:  fileName,
        WorkerId:  req.WorkerId,
        StartTime: time.Now().Unix(),
    }
    c.ProcessMapTasks[fileName] = mapTask
    rsp.FileName = fileName
    rsp.NReduce = c.NReduce
    timer := time.NewTimer(10 * time.Second)
    go func() {
        <-timer.C
        c.mu.Lock()
        defer c.mu.Unlock()
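        // Re-queue only if this exact assignment is still in progress; the task may
        // have been failed and handed to another Worker since this timer was armed.
        if t, ok := c.ProcessMapTasks[fileName]; ok && t == mapTask {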
            delete(c.ProcessMapTasks, fileName)
            c.PendingMapTasks = append(c.PendingMapTasks, fileName)
            log.Printf("map task timeout %v, %v", fileName, c.PendingMapTasks)
        }
    }()
    return nil
}
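One subtlety of arming a timer per assignment: the timer can fire after the task has been reported as failed, put back on the queue and handed to another Worker. If the timeout handler only checks whether the file name is present in the in-progress map, it re-queues the new assignment early, and the second Worker's eventual report is then rejected. The deterministic sketch below calls a hypothetical timeout handler directly (no real timers) on a pared-down, hypothetical queue type, and shows that comparing the stored *MapTask with the one captured at assignment time turns a stale timer into a no-op.

```go
package main

import "fmt"

type MapTask struct {
	FileName string
	WorkerId string
}

// queue is a hypothetical, lock-free stand-in for the Coordinator's Map state.
type queue struct {
	pending    []string
	inProgress map[string]*MapTask
}

// assign hands the next pending file to a worker and returns the assignment,
// which a real implementation would capture in its timeout goroutine.
func (q *queue) assign(workerID string) *MapTask {
	name := q.pending[0]
	q.pending = q.pending[1:]
	t := &MapTask{FileName: name, WorkerId: workerID}
	q.inProgress[name] = t
	return t
}

// fail mimics a StateFailure report: the file goes back to the pending queue.
func (q *queue) fail(t *MapTask) {
	delete(q.inProgress, t.FileName)
	q.pending = append(q.pending, t.FileName)
}

// onTimeout re-queues a task only if the assignment the timer was armed for
// is still the one in progress; it reports whether it re-queued anything.
func (q *queue) onTimeout(armed *MapTask) bool {
	if cur, ok := q.inProgress[armed.FileName]; ok && cur == armed {
		q.fail(cur)
		return true
	}
	return false
}

func main() {
	q := &queue{pending: []string{"pg-a.txt"}, inProgress: map[string]*MapTask{}}
	first := q.assign("w1")
	q.fail(first)                    // w1 reports failure
	second := q.assign("w2")         // the same file goes to w2
	fmt.Println(q.onTimeout(first))  // false: w1's stale timer leaves w2 alone
	fmt.Println(q.onTimeout(second)) // true: w2 really timed out
}
```

Pointer identity is enough here because every assignment allocates a fresh *MapTask, even when the same Worker receives the same file twice.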

5.3 ReportMapTask

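When a Worker reports a Map result, the Coordinator first checks that the task is still in progress and owned by the reporting Worker. On success it removes the task and adds each intermediate file to the pending Reduce task for its partition, skipping duplicates because a Worker reuses the same intermediate file across its Map tasks. On failure it puts the input file back on the pending queue.

The code is as follows: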

func (c *Coordinator) ReportMapTask(req *ReportMapTaskReq, rsp *ReportMapTaskRsp) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    mapTask, ok := c.ProcessMapTasks[req.InputFileName]
    if !ok {
        return ErrNotFound
    }
    if mapTask.WorkerId != req.WorkerId {
        return ErrInvalidWorker
    }
    if req.State == StateSuccess {
        delete(c.ProcessMapTasks, req.InputFileName)
        for reduce, fileName := range req.OuputFileNames {
            if _, ok := c.PendingReduceTasks[reduce]; !ok {
                c.PendingReduceTasks[reduce] = []string{}
            }
            exist := false
            for _, fn := range c.PendingReduceTasks[reduce] {
                if fn == fileName {
                    exist = true
                    break
                }
            }
            if !exist {
                c.PendingReduceTasks[reduce] = append(c.PendingReduceTasks[reduce], fileName)
            }
        }
        return nil
    } else if req.State == StateFailure {
        delete(c.ProcessMapTasks, req.InputFileName)
        c.PendingMapTasks = append(c.PendingMapTasks, req.InputFileName)
        return nil
    } else {
        return ErrInvalidState
    }
}

5.4 GetReduceTask

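When a Worker asks for a Reduce task, the Coordinator takes an arbitrary pending partition (Go's map iteration order is unspecified), moves it into ProcessReduceTasks and returns its reduce number and file list. As with Map tasks, a 10-second timer puts the partition back into PendingReduceTasks if it has not been completed by then.

The code is as follows: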

func (c *Coordinator) GetReduceTask(req *GetReduceTaskReq, rsp *GetReduceTaskRsp) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    if len(c.PendingReduceTasks) == 0 {
        return ErrNoTask
    }
    var reduce int
    var fileNames []string
    for r, fn := range c.PendingReduceTasks {
        reduce = r
        fileNames = fn
        break
    }
    delete(c.PendingReduceTasks, reduce)
    reduceTask := &ReduceTask{
        FileNames: fileNames,
        WorkerId:  req.WorkerId,
        StartTime: time.Now().Unix(),
    }
    c.ProcessReduceTasks[reduce] = reduceTask
    rsp.Reduce = reduce
    rsp.FileNames = fileNames
    timer := time.NewTimer(10 * time.Second)
    go func() {
        <-timer.C
        c.mu.Lock()
        defer c.mu.Unlock()
        // Re-queue only if this exact assignment is still in progress.
        if t, ok := c.ProcessReduceTasks[reduce]; ok && t == reduceTask {
            delete(c.ProcessReduceTasks, reduce)
            c.PendingReduceTasks[reduce] = fileNames
        }
    }()
    return nil
}
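Because Go's map iteration order is unspecified, the range loop in GetReduceTask hands out an arbitrary pending partition. That is correct; if reproducible logs help while debugging, a hypothetical helper like the one below picks the smallest pending reduce index instead.

```go
package main

import "fmt"

// nextReduceTask is a hypothetical helper: it returns the pending partition
// with the smallest index and its intermediate files; ok is false when
// nothing is pending.
func nextReduceTask(pending map[int][]string) (reduce int, files []string, ok bool) {
	for r := range pending {
		if !ok || r < reduce {
			reduce, ok = r, true
		}
	}
	if ok {
		files = pending[reduce]
	}
	return reduce, files, ok
}

func main() {
	pending := map[int][]string{
		7: {"mr-101-7"},
		2: {"mr-101-2", "mr-102-2"},
	}
	r, files, ok := nextReduceTask(pending)
	fmt.Println(r, files, ok) // 2 [mr-101-2 mr-102-2] true
}
```

The scan is O(number of pending partitions) per request, which is negligible for the handful of partitions the Lab uses.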

5.5 ReportReduceTask

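Handling a Reduce result mirrors the Map case: after the same existence and ownership checks, success removes the task, and failure puts its file list back into PendingReduceTasks.

The code is as follows: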

func (c *Coordinator) ReportReduceTask(req *ReportReduceTaskReq, rsp *ReportReduceTaskRsp) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    reduceTask, ok := c.ProcessReduceTasks[req.Reduce]
    if !ok {
        return ErrNotFound
    }
    if reduceTask.WorkerId != req.WorkerId {
        return ErrInvalidWorker
    }
    if req.State == StateSuccess {
        delete(c.ProcessReduceTasks, req.Reduce)
        return nil
    } else if req.State == StateFailure {
        delete(c.ProcessReduceTasks, req.Reduce)
        c.PendingReduceTasks[req.Reduce] = reduceTask.FileNames
        return nil
    } else {
        return ErrInvalidState
    }
}

5.6 IsFinish

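Workers poll the Coordinator to learn whether a stage has finished. A stage counts as finished when it has neither pending nor in-progress tasks; the "all" stage requires both the Map and Reduce stages to be finished.

The code is as follows: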

func (c *Coordinator) IsFinish(req *IsFinishReq, rsp *IsFinishRsp) error {
    c.mu.RLock()
    defer c.mu.RUnlock()
    switch strings.ToLower(req.Stage) {
    case "map":
        rsp.Finish = len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0
    case "reduce":
        rsp.Finish = len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
    case "all":
        rsp.Finish = len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0 &&
            len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
    default:
        return ErrInvalidStage
    }
    return nil
}
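The lab skeleton's mrcoordinator.go polls Coordinator.Done() once per second to decide when to exit. Assuming this Coordinator's Done simply reuses the "all" condition from IsFinish, it could look like the sketch below; the struct is trimmed (the task value types are replaced with struct{}) so the block compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Trimmed copy of the Coordinator for this sketch; value types are simplified.
type Coordinator struct {
	mu                 sync.RWMutex
	PendingMapTasks    []string
	ProcessMapTasks    map[string]struct{}
	PendingReduceTasks map[int][]string
	ProcessReduceTasks map[int]struct{}
}

// Done (hypothetical) reports whether no Map or Reduce task is pending or in
// progress, i.e. the same condition IsFinish uses for the "all" stage.
func (c *Coordinator) Done() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0 &&
		len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
}

func main() {
	c := &Coordinator{PendingMapTasks: []string{"pg-a.txt"}}
	fmt.Println(c.Done()) // false
	c.PendingMapTasks = nil
	fmt.Println(c.Done()) // true
}
```

This is safe in the design above because ReportMapTask fills PendingReduceTasks under the same lock that removes the last Map task, so there is no instant where all four collections are empty between the two stages.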

6. Worker

6.1 Mapper

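The Worker's Map goroutine loops until the Map stage is finished. In each iteration it fetches a task (sleeping for a second when none is available), reads the input file, runs mapf, partitions the resulting pairs with ihash(key) % NReduce into files named mr-<pid>-<reduce> (one set per Worker process, reused across all of that Worker's Map tasks), and reports the result.

The code is as follows: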

func mapper(mapf func(string, string) []KeyValue) {
    interfileMap := make(map[string]*os.File)
    for {
        // Stop once every Map task has finished.
        finish, err := IsFinish("map")
        if err != nil {
            log.Fatalf("is map finish error: %v", err)
        }
        if finish {
            break
        }
        mapTaskRsp, err := GetMapTask()
        if err != nil {
            // Errors crossing net/rpc arrive as rpc.ServerError strings, so compare messages.
            if err.Error() == ErrNoTask.Error() {
                time.Sleep(time.Second)
                continue
            }
            log.Fatalf("get map task error: %v", err)
        }
        input, err := os.Open(mapTaskRsp.FileName)
        if err != nil {
            log.Fatalf("cannot open %v: %v", mapTaskRsp.FileName, err)
        }
        content, err := io.ReadAll(input)
        if err != nil {
            log.Fatalf("cannot read %v: %v", mapTaskRsp.FileName, err)
        }
        input.Close()
        intermediate := mapf(mapTaskRsp.FileName, string(content))
        outputFileNames := make(map[int]string)
        for _, kv := range intermediate {
            reduce := ihash(kv.Key) % mapTaskRsp.NReduce
            fileName := fmt.Sprintf("mr-%d-%d", os.Getpid(), reduce)
            outputFileNames[reduce] = fileName
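            if _, ok := interfileMap[fileName]; !ok {
                f, err := os.Create(fileName)
                if err != nil {
                    log.Fatalf("cannot create %v: %v", fileName, err)
                }
                interfileMap[fileName] = f
            }
            enc := json.NewEncoder(interfileMap[fileName])
            if err := enc.Encode(&kv); err != nil {
                log.Fatalf("cannot encode %v: %v", kv, err)
            }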
        }
        for _, interfile := range interfileMap {
            interfile.Sync()
        }
        err = ReportMapTask(mapTaskRsp.FileName, outputFileNames, StateSuccess)
        if err != nil {
            log.Fatalf("report map task error: %v", err)
        }
    }
    for _, interfile := range interfileMap {
        interfile.Close()
    }
    log.Printf("mapper finish")
}

6.2 Reducer

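The Worker's Reduce goroutine first polls once per second until the Map stage is finished. It then loops until the Reduce stage is finished: it fetches a Reduce task, decodes all of its intermediate files, sorts the pairs by key, calls reducef once per distinct key, writes mr-out-<reduce>, and reports the result.

The code is as follows: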

func reducer(reducef func(string, []string) string) {
    // Wait until every Map task has finished, polling once per second.
    ticker := time.NewTicker(time.Second)
    for range ticker.C {
        finish, err := IsFinish("map")
        if err != nil {
            log.Fatalf("is map finish error: %v", err)
        }
        if finish {
            break
        }
    }
    ticker.Stop()
    for {
        // Stop once every Reduce task has finished.
        finish, err := IsFinish("reduce")
        if err != nil {
            log.Fatalf("is reduce finish error: %v", err)
        }
        if finish {
            break
        }
        reduceTaskRsp, err := GetReduceTask()
        log.Printf("get reduce task: %v, %v, %v", os.Getpid(), reduceTaskRsp, err)
        if err != nil {
            // Errors crossing net/rpc arrive as rpc.ServerError strings, so compare messages.
            if err.Error() == ErrNoTask.Error() {
                time.Sleep(time.Second)
                continue
            }
            log.Fatalf("get reduce task error: %v", err)
        }
        var kva []KeyValue
        for _, fileName := range reduceTaskRsp.FileNames {
            interfile, err := os.Open(fileName)
            if err != nil {
                log.Fatalf("cannot open %v: %v", fileName, err)
            }
            dec := json.NewDecoder(interfile)
            for {
                var kv KeyValue
                if err := dec.Decode(&kv); err != nil {
                    break
                }
                kva = append(kva, kv)
            }
            interfile.Close()
        }
        sort.Sort(ByKey(kva))
        oname := fmt.Sprintf("mr-out-%v", reduceTaskRsp.Reduce)
        ofile, err := os.Create(oname)
        if err != nil {
            log.Fatalf("cannot create %v: %v", oname, err)
        }

        i := 0
        for i < len(kva) {
            j := i + 1
            for j < len(kva) && kva[j].Key == kva[i].Key {
                j++
            }
            var values []string
            for k := i; k < j; k++ {
                values = append(values, kva[k].Value)
            }
            output := reducef(kva[i].Key, values)

            // this is the correct format for each line of Reduce output.
            fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)

            i = j
        }
        ofile.Close()
        err = ReportReduceTask(reduceTaskRsp.Reduce, StateSuccess)
        if err != nil {
            log.Fatalf("report reduce task error: %v", err)
        }
    }
    log.Printf("reducer finish")
}

6.3 Startup

The Worker starts the Mapper and Reducer goroutines and waits for both to finish:

func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        mapper(mapf)
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        reducer(reducef)
    }()
    wg.Wait()
    log.Printf("worker finish:%v", os.Getpid())
}

7. Testing

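The tests are run with src/main/test-mr.sh and src/main/test-mr-many.sh: the former runs the suite once, the latter runs it many times. The scripts exercise the implementation from several angles (reading the scripts shows exactly what is checked), and the Lab only counts as passed when every test case passes. While developing and debugging, you can edit the scripts to run just the tests you are working on.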

Last updated: January 13, 2024

Author: jemuel


COPYRIGHT © 2021 www.miaozhouguang.com. ALL RIGHTS RESERVED.
