1. Introduction
Backend developers touch distributed systems almost every day: databases, message queues, registries, and all kinds of other middleware. So how do these systems actually work under the hood? I recently worked through MIT's 6.824 Distributed Systems course and found it excellent; I recommend it to any backend developer who is interested. Course homepage: https://pdos.csail.mit.edu/6.824/index.html. Searching for "6.824 distributed systems" on Bilibili turns up plenty of lecture recordings, and if English is a barrier there is also a community-maintained Chinese translation of the lectures: https://zhuanlan.zhihu.com/c_1273718607160393728 (the translator's public account is honghui_writing).
The course has four labs; the code repository is git://g.csail.mit.edu/6.5840-golabs-2023. Lab 1 implements a simplified MapReduce (https://pdos.csail.mit.edu/6.824/labs/lab-mr.html). Lab 2 implements Raft (https://pdos.csail.mit.edu/6.824/labs/lab-raft.html). Lab 3 implements a fault-tolerant key/value service (https://pdos.csail.mit.edu/6.824/labs/lab-kvraft.html). Lab 4 implements a sharded key/value service (https://pdos.csail.mit.edu/6.824/labs/lab-shard.html).
This series walks through each lab in detail; this post covers Lab 1.
2. Workflow
This lab has two process roles: the Coordinator and the Worker. The Coordinator hands out and coordinates tasks; Workers execute them. The overall flow is as follows (a sketch of how the two processes are launched appears after the list):
Each Worker starts two goroutines, one for Map tasks and one for Reduce tasks; Reduce tasks may only start once all Map tasks have finished. The steps are:
- The Map goroutine fetches a Map task from the Coordinator
- The Map goroutine reads the input file
- The Map goroutine writes the processed intermediate files
- The Map goroutine reports the result to the Coordinator
- The Reduce goroutine fetches a Reduce task from the Coordinator
- The Reduce goroutine reads the intermediate files
- The Reduce goroutine writes the final output file
- The Reduce goroutine reports the result to the Coordinator
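For context on how these two roles come up in the first place: in the lab skeleton, the Coordinator and the Workers run as separate processes. main/mrcoordinator.go builds a Coordinator over the input files and waits for it to report completion, while main/mrworker.go loads the application's Map/Reduce functions from a Go plugin and calls Worker. The coordinator entry point looks roughly like this (a simplified sketch; the 6.5840/mr import path and nReduce = 10 are taken from my reading of the 2023 skeleton and may differ slightly):
package main

import (
	"fmt"
	"os"
	"time"

	"6.5840/mr"
)

// Simplified from the skeleton's main/mrcoordinator.go: build the Coordinator,
// then poll Done() until every Map and Reduce task has finished.
func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrcoordinator inputfiles...\n")
		os.Exit(1)
	}
	c := mr.MakeCoordinator(os.Args[1:], 10) // the skeleton uses 10 reduce tasks
	for !c.Done() {
		time.Sleep(time.Second)
	}
	time.Sleep(time.Second)
}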
3. Task
The MapTask and ReduceTask structures are defined as follows:
type MapTask struct {
	FileName  string // input file of this Map task
	WorkerId  string // id of the Worker the task is assigned to
	StartTime int64  // Unix timestamp of when the task was handed out
}
type ReduceTask struct {
	FileNames []string // intermediate files feeding this Reduce task
	WorkerId  string   // id of the Worker the task is assigned to
	StartTime int64    // Unix timestamp of when the task was handed out
}
4. RPC
The RPC request and response structures are defined as follows:
type GetMapTaskReq struct {
	WorkerId string
}
type GetMapTaskRsp struct {
	FileName string // input file of the assigned Map task
	NReduce  int    // total number of Reduce buckets
}
type ReportMapTaskReq struct {
	WorkerId        string
	InputFileName   string
	OutputFileNames map[int]string // reduce bucket -> intermediate file produced by this Map task
	State           string         // StateSuccess or StateFailure
}
type ReportMapTaskRsp struct {
}
type GetReduceTaskReq struct {
	WorkerId string
}
type GetReduceTaskRsp struct {
	Reduce    int      // reduce bucket number
	FileNames []string // intermediate files for this bucket
}
type ReportReduceTaskReq struct {
	WorkerId string
	Reduce   int
	State    string // StateSuccess or StateFailure
}
type ReportReduceTaskRsp struct {
}
type IsFinishReq struct {
	Stage string // "map", "reduce" or "all"
}
type IsFinishRsp struct {
	Finish bool
}
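The handlers and Worker code below also reference a few error values, state strings, and an RPC helper that are not shown in this post. A minimal sketch of what they might look like, assuming the lab skeleton's unix-domain-socket setup (coordinatorSock comes from the skeleton's mr/rpc.go; this fragment needs the errors and net/rpc imports):
var (
	ErrNoTask        = errors.New("no task available")
	ErrNotFound      = errors.New("task not found")
	ErrInvalidWorker = errors.New("task assigned to another worker")
	ErrInvalidState  = errors.New("invalid task state")
	ErrInvalidStage  = errors.New("invalid stage")
)

const (
	StateSuccess = "success"
	StateFailure = "failure"
)

// call sends one RPC to the Coordinator over the unix-domain socket used by
// the lab skeleton and returns the error reported by the remote handler, if any.
func call(rpcname string, args interface{}, reply interface{}) error {
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		return err
	}
	defer c.Close()
	return c.Call(rpcname, args, reply)
}
Note that net/rpc delivers a handler's error back to the client as an rpc.ServerError carrying the original message string, which is why the Worker code below can compare err.Error() against ErrNoTask.Error() across the RPC boundary.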
5. Coordinator
5.1 Structure
The Coordinator is defined as follows:
type Coordinator struct {
	mu                 sync.RWMutex
	PendingMapTasks    []string            // pending Map tasks, format: [filename, ...]
	ProcessMapTasks    map[string]*MapTask // in-progress Map tasks, format: {filename: MapTask, ...}
	PendingReduceTasks map[int][]string    // pending Reduce tasks, format: {reduce: [filename, ...]}
	ProcessReduceTasks map[int]*ReduceTask // in-progress Reduce tasks, format: {reduce: ReduceTask, ...}
	NReduce            int                 // number of Reduce tasks
}
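The post doesn't show how this structure is initialized. A sketch of MakeCoordinator and Done that matches the skeleton's signatures might look like this (server() is the skeleton's helper that registers the RPC handlers on a unix socket; the author's actual initialization may differ):
// MakeCoordinator queues every input file as a pending Map task and starts
// serving RPCs.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		PendingMapTasks:    append([]string{}, files...),
		ProcessMapTasks:    make(map[string]*MapTask),
		PendingReduceTasks: make(map[int][]string),
		ProcessReduceTasks: make(map[int]*ReduceTask),
		NReduce:            nReduce,
	}
	c.server() // skeleton helper: register the RPC server on a unix-domain socket
	return c
}

// Done reports whether every Map and Reduce task has completed;
// mrcoordinator polls it to decide when to exit.
func (c *Coordinator) Done() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0 &&
		len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
}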
5.2 GetMapTask
The Coordinator handles a Worker's request for a Map task as follows: pop one file name off the pending queue, record it as in progress together with the Worker's id and a start timestamp, and arm a 10-second timer. If the task has not been reported finished by the time the timer fires, it is removed from the in-progress set and put back into the pending queue so that another Worker can pick it up; this is how crashed or stalled workers are tolerated. The code is as follows:
func (c *Coordinator) GetMapTask(req *GetMapTaskReq, rsp *GetMapTaskRsp) error {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.PendingMapTasks) == 0 {
return ErrNoTask
}
fileName := c.PendingMapTasks[0]
c.PendingMapTasks = c.PendingMapTasks[1:]
mapTask := &MapTask{
FileName: fileName,
WorkerId: req.WorkerId,
StartTime: time.Now().Unix(),
}
c.ProcessMapTasks[fileName] = mapTask
rsp.FileName = fileName
rsp.NReduce = c.NReduce
timer := time.NewTimer(10 * time.Second)
go func() {
<-timer.C
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.ProcessMapTasks[fileName]; ok {
delete(c.ProcessMapTasks, fileName)
c.PendingMapTasks = append(c.PendingMapTasks, fileName)
log.Printf("map task timeout %v, %v", fileName, c.PendingMapTasks)
}
}()
return nil
}
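On the Worker side, the mapper in section 6.1 calls a GetMapTask helper that the post doesn't show. A hypothetical thin wrapper over the call helper sketched in section 4 might look like this (using the process id as the WorkerId, which matches the pid-based file naming in mapper; the author's actual id scheme isn't shown):
// GetMapTask asks the Coordinator for one pending Map task.
func GetMapTask() (*GetMapTaskRsp, error) {
	req := &GetMapTaskReq{WorkerId: strconv.Itoa(os.Getpid())}
	rsp := &GetMapTaskRsp{}
	err := call("Coordinator.GetMapTask", req, rsp)
	return rsp, err
}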
5.3 ReportMapTask
The Coordinator handles a Worker's Map-task report as follows: the task must still be in the in-progress set and still assigned to the reporting Worker. On success the task is removed, and each reported intermediate file is appended to the pending queue of its reduce bucket (skipping file names that are already queued); on failure the input file goes back to the pending Map queue. The code is as follows:
func (c *Coordinator) ReportMapTask(req *ReportMapTaskReq, rsp *ReportMapTaskRsp) error {
c.mu.Lock()
defer c.mu.Unlock()
mapTask, ok := c.ProcessMapTasks[req.InputFileName]
if !ok {
return ErrNotFound
}
if mapTask.WorkerId != req.WorkerId {
return ErrInvalidWorker
}
if req.State == StateSuccess {
delete(c.ProcessMapTasks, req.InputFileName)
		for reduce, fileName := range req.OutputFileNames {
if _, ok := c.PendingReduceTasks[reduce]; !ok {
c.PendingReduceTasks[reduce] = []string{}
}
exist := false
for _, fn := range c.PendingReduceTasks[reduce] {
if fn == fileName {
exist = true
break
}
}
if !exist {
c.PendingReduceTasks[reduce] = append(c.PendingReduceTasks[reduce], fileName)
}
}
return nil
} else if req.State == StateFailure {
delete(c.ProcessMapTasks, req.InputFileName)
c.PendingMapTasks = append(c.PendingMapTasks, req.InputFileName)
return nil
} else {
return ErrInvalidState
}
}
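Likewise, the ReportMapTask helper used by mapper could be a hypothetical wrapper of the form:
// ReportMapTask tells the Coordinator how a Map task ended and which
// intermediate files it produced, keyed by reduce bucket.
func ReportMapTask(inputFileName string, outputFileNames map[int]string, state string) error {
	req := &ReportMapTaskReq{
		WorkerId:        strconv.Itoa(os.Getpid()),
		InputFileName:   inputFileName,
		OutputFileNames: outputFileNames,
		State:           state,
	}
	return call("Coordinator.ReportMapTask", req, &ReportMapTaskRsp{})
}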
5.4 GetReduceTask
Handling a Worker's request for a Reduce task mirrors GetMapTask: pick an arbitrary pending reduce bucket, mark it as in progress, and re-queue it if it is not reported finished within 10 seconds. The code is as follows:
func (c *Coordinator) GetReduceTask(req *GetReduceTaskReq, rsp *GetReduceTaskRsp) error {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.PendingReduceTasks) == 0 {
return ErrNoTask
}
var reduce int
var fileNames []string
for r, fn := range c.PendingReduceTasks {
reduce = r
fileNames = fn
break
}
delete(c.PendingReduceTasks, reduce)
reduceTask := &ReduceTask{
FileNames: fileNames,
WorkerId: req.WorkerId,
StartTime: time.Now().Unix(),
}
c.ProcessReduceTasks[reduce] = reduceTask
rsp.Reduce = reduce
rsp.FileNames = fileNames
timer := time.NewTimer(10 * time.Second)
go func() {
<-timer.C
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.ProcessReduceTasks[reduce]; ok {
delete(c.ProcessReduceTasks, reduce)
c.PendingReduceTasks[reduce] = fileNames
}
}()
return nil
}
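The corresponding Worker-side helper used by the reducer in section 6.2 might be:
// GetReduceTask asks the Coordinator for one pending Reduce task.
func GetReduceTask() (*GetReduceTaskRsp, error) {
	req := &GetReduceTaskReq{WorkerId: strconv.Itoa(os.Getpid())}
	rsp := &GetReduceTaskRsp{}
	err := call("Coordinator.GetReduceTask", req, rsp)
	return rsp, err
}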
5.5 ReportReduceTask
Handling a Worker's Reduce-task report also mirrors the Map case: on success the task is simply removed from the in-progress set; on failure its intermediate files are put back into the pending Reduce queue. The code is as follows:
func (c *Coordinator) ReportReduceTask(req *ReportReduceTaskReq, rsp *ReportReduceTaskRsp) error {
c.mu.Lock()
defer c.mu.Unlock()
reduceTask, ok := c.ProcessReduceTasks[req.Reduce]
if !ok {
return ErrNotFound
}
if reduceTask.WorkerId != req.WorkerId {
return ErrInvalidWorker
}
if req.State == StateSuccess {
delete(c.ProcessReduceTasks, req.Reduce)
return nil
} else if req.State == StateFailure {
delete(c.ProcessReduceTasks, req.Reduce)
c.PendingReduceTasks[req.Reduce] = reduceTask.FileNames
return nil
} else {
return ErrInvalidState
}
}
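And a hypothetical Worker-side ReportReduceTask wrapper, matching the call site in reducer:
// ReportReduceTask tells the Coordinator how a Reduce task ended.
func ReportReduceTask(reduce int, state string) error {
	req := &ReportReduceTaskReq{
		WorkerId: strconv.Itoa(os.Getpid()),
		Reduce:   reduce,
		State:    state,
	}
	return call("Coordinator.ReportReduceTask", req, &ReportReduceTaskRsp{})
}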
5.6 IsFinish
The Coordinator answers a Worker's "is this stage finished" query as follows: a stage is finished once it has neither pending nor in-progress tasks, and "all" requires both stages to be done. The code is as follows:
func (c *Coordinator) IsFinish(req *IsFinishReq, rsp *IsFinishRsp) error {
c.mu.RLock()
defer c.mu.RUnlock()
switch strings.ToLower(req.Stage) {
case "map":
rsp.Finish = len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0
case "reduce":
rsp.Finish = len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
case "all":
rsp.Finish = len(c.PendingMapTasks) == 0 && len(c.ProcessMapTasks) == 0 &&
len(c.PendingReduceTasks) == 0 && len(c.ProcessReduceTasks) == 0
default:
return ErrInvalidStage
}
return nil
}
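The IsFinish helper that both Worker goroutines poll could be as simple as:
// IsFinish asks the Coordinator whether the given stage ("map", "reduce" or
// "all") has completed.
func IsFinish(stage string) (bool, error) {
	rsp := &IsFinishRsp{}
	err := call("Coordinator.IsFinish", &IsFinishReq{Stage: stage}, rsp)
	return rsp.Finish, err
}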
6. Worker
6.1 Mapper
The Worker's Map goroutine runs the following loop until the Coordinator reports the map stage finished: fetch a Map task (sleeping briefly when none is available), read the input file, run the application's map function, partition the emitted key/value pairs into per-reduce intermediate files named mr-<pid>-<reduce>, and report the produced files back to the Coordinator. The code is as follows:
func mapper(mapf func(string, string) []KeyValue) {
interfileMap := make(map[string]*os.File)
for {
		// check whether the map stage has finished
finish, err := IsFinish("map")
if err != nil {
log.Fatalf("is map finish error: %v", err)
}
if finish {
break
}
mapTaskRsp, err := GetMapTask()
if err != nil {
if err.Error() == ErrNoTask.Error() {
time.Sleep(time.Second)
continue
}
log.Fatalf("get map task error: %v", err)
}
input, err := os.Open(mapTaskRsp.FileName)
if err != nil {
log.Fatalf("cantnot open %v", mapTaskRsp.FileName)
}
content, err := io.ReadAll(input)
if err != nil {
log.Fatalf("cannot read %v", mapTaskRsp.FileName)
}
input.Close()
intermediate := mapf(mapTaskRsp.FileName, string(content))
outputFileNames := make(map[int]string)
for _, kv := range intermediate {
reduce := ihash(kv.Key) % mapTaskRsp.NReduce
fileName := fmt.Sprintf("mr-%d-%d", os.Getpid(), reduce)
outputFileNames[reduce] = fileName
if _, ok := interfileMap[fileName]; !ok {
interfileMap[fileName], _ = os.Create(fileName)
}
enc := json.NewEncoder(interfileMap[fileName])
enc.Encode(&kv)
}
for _, interfile := range interfileMap {
interfile.Sync()
}
err = ReportMapTask(mapTaskRsp.FileName, outputFileNames, StateSuccess)
if err != nil {
log.Fatalf("report map task error: %v", err)
}
}
for _, interfile := range interfileMap {
interfile.Close()
}
log.Printf("mapper finish")
}
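mapper relies on the KeyValue type and the ihash helper that ship with the lab skeleton (mr/worker.go); ihash(key) % NReduce picks the reduce bucket for each key. For reference (needs the hash/fnv import):
// KeyValue is the unit of data passed between Map and Reduce (from the skeleton).
type KeyValue struct {
	Key   string
	Value string
}

// ihash maps a key to a non-negative int; taking it modulo NReduce selects the
// reduce bucket for that key (from the skeleton).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}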
6.2 Reducer
The Worker's Reduce goroutine first polls the Coordinator until the map stage has finished, then loops until the reduce stage is done: fetch a Reduce task, decode all key/value pairs from its intermediate files, sort them by key, apply the application's reduce function to each group of values sharing a key, write the result to mr-out-<reduce>, and report back. The code is as follows:
func reducer(reducef func(string, []string) string) {
	// wait for the map stage to finish before starting any reduce work
ticker := time.NewTicker(time.Second)
waitmap:
for {
select {
case <-ticker.C:
finish, err := IsFinish("map")
if err != nil {
log.Fatalf("is map finish error: %v", err)
}
if finish {
break waitmap
}
}
}
for {
		// check whether the reduce stage has finished
finish, err := IsFinish("reduce")
if err != nil {
log.Fatalf("is reduce finish error: %v", err)
}
if finish {
break
}
reduceTaskRsp, err := GetReduceTask()
log.Printf("get reduce task: %v, %v, %v", os.Getpid(), reduceTaskRsp, err)
if err != nil {
if err.Error() == ErrNoTask.Error() {
time.Sleep(time.Second)
continue
}
log.Fatalf("get reduce task error: %v", err)
}
var kva []KeyValue
for _, fileName := range reduceTaskRsp.FileNames {
interfile, _ := os.Open(fileName)
dec := json.NewDecoder(interfile)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
interfile.Close()
}
sort.Sort(ByKey(kva))
oname := fmt.Sprintf("mr-out-%v", reduceTaskRsp.Reduce)
ofile, _ := os.Create(oname)
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
output := reducef(kva[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
i = j
}
ofile.Close()
err = ReportReduceTask(reduceTaskRsp.Reduce, StateSuccess)
if err != nil {
log.Fatalf("report reduce task error: %v", err)
}
}
log.Printf("reducer finish")
}
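The ByKey type that reducer sorts with comes from the course's sequential example (mrsequential.go); it simply orders KeyValue pairs by key so that equal keys become adjacent:
// ByKey implements sort.Interface so intermediate pairs can be grouped by key.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }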
6.3 Startup
The Worker starts the Mapper and Reducer goroutines and waits for both to finish:
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
mapper(mapf)
}()
wg.Add(1)
go func() {
defer wg.Done()
reducer(reducef)
}()
wg.Wait()
log.Printf("worker finish:%v", os.Getpid())
}
7. Testing
The test scripts are src/main/test-mr.sh and src/main/test-mr-many.sh; the former runs the test suite once (from the src/main directory, with bash test-mr.sh), while the latter runs it repeatedly. The scripts exercise the implementation from several angles; the details are worth studying on your own. The lab only counts as passed when every test case passes. During development and debugging you can trim the script to run targeted tests.