MIT 6.5840 Notes: Lab 1 - MapReduce

Introduction

MIT 6.5840, also known as 6.824, is a well-known distributed systems course offered by MIT. The course covers a wide range of topics in distributed systems, including fault tolerance, consistency, replication, and sharding. It also delves into the design and implementation of distributed systems, with a focus on real-world systems like MapReduce, Raft, and ZooKeeper.

In this series of notes, I will document my learnings from the labs and lectures of MIT 6.5840. These notes are intended to serve as a reference for myself and others who are interested in distributed systems.

In this article, we will discuss Lab 1 of MIT 6.5840, which focuses on implementing a simple MapReduce framework.

MapReduce

In the lectures, we learned about the MapReduce programming model, which simplifies the development of large-scale data processing applications. The MapReduce model consists of two main functions:

  1. Map Function: This function processes input key-value pairs and generates intermediate key-value pairs.
  2. Reduce Function: This function processes intermediate key-value pairs and produces the final output.

More precisely, a MapReduce job runs in three phases, with a shuffle step between the two functions:

  1. Map: The Map function processes input key-value pairs and generates intermediate key-value pairs.
  2. Shuffle: The intermediate key-value pairs are shuffled and sorted to group all values associated with the same key.
  3. Reduce: The Reduce function processes the grouped key-value pairs and produces the final output.
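To make the three phases concrete, here is a minimal single-process sketch of word count. It is only an illustration, not the lab's framework: the names mapWords, reduceCount, and wordCount are made up for this example, and KeyValue is a local copy of the pair type the lab provides.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is a local copy of the pair type used by the lab.
type KeyValue struct {
	Key   string
	Value string
}

// mapWords emits one (word, "1") pair per word in contents.
func mapWords(contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// reduceCount returns how many values were collected for one key.
func reduceCount(values []string) string {
	return strconv.Itoa(len(values))
}

// wordCount runs map, shuffle, and reduce sequentially in one process.
func wordCount(docs []string) map[string]string {
	// Map: every document produces intermediate pairs.
	var intermediate []KeyValue
	for _, doc := range docs {
		intermediate = append(intermediate, mapWords(doc)...)
	}

	// Shuffle: sort by key so that equal keys become adjacent.
	sort.Slice(intermediate, func(i, j int) bool {
		return intermediate[i].Key < intermediate[j].Key
	})

	// Reduce: call reduceCount once per run of equal keys.
	result := make(map[string]string)
	for i := 0; i < len(intermediate); {
		j := i
		var values []string
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			values = append(values, intermediate[j].Value)
			j++
		}
		result[intermediate[i].Key] = reduceCount(values)
		i = j
	}
	return result
}

func main() {
	counts := wordCount([]string{"the quick fox", "the lazy dog", "the end"})
	fmt.Println(counts["the"], counts["fox"]) // prints "3 1"
}
```

In the real framework the same three steps are spread across processes, and the shuffle happens through intermediate files rather than an in-memory slice.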

Here are some points to remember about MapReduce:

  1. MapReduce reads its input from, and writes its output to, a distributed file system such as GFS or HDFS.
  2. Reduce tasks only run after all the Map tasks have completed.
  3. Map and Reduce run in parallel across multiple machines.
  4. MapReduce handles machine failures by re-executing tasks on other machines.

Lab

In Lab 1 of MIT 6.5840, we are tasked with implementing the coordinator and worker processes for a simple MapReduce framework. The coordinator is responsible for scheduling tasks and handling failures, while the workers execute the Map and Reduce tasks.

Map and Reduce Functions

Though the Map and Reduce functions are provided to us, understanding how they work can help us implement the lab more effectively. Here is an example of a Map function:

// src/mrapps/wc.go

func Map(filename string, contents string) []mr.KeyValue {
    // function to detect word separators.
    ff := func(r rune) bool { return !unicode.IsLetter(r) }

    // split contents into an array of words.
    words := strings.FieldsFunc(contents, ff)

    kva := []mr.KeyValue{}
    for _, w := range words {
        kv := mr.KeyValue{w, "1"}
        kva = append(kva, kv)
    }
    return kva
}

func Reduce(key string, values []string) string {
    // return the number of occurrences of this word.
    return strconv.Itoa(len(values))
}

The Map function splits the contents of the input file into words and emits one key-value pair per word, where the key is the word and the value is the string "1". The Reduce function receives all the values collected for one key and returns how many there are, which is the number of occurrences of that word. In addition, workers load these functions at run time using Go's plugin package.
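As a rough idea of what loading through the plugin package looks like, here is a sketch. The function name loadMapReduce is an assumption for this example, and KeyValue is a local stand-in: a real lab plugin returns []mr.KeyValue, so the type assertion below would have to use that type instead.

```go
package main

import (
	"fmt"
	"os"
	"plugin"
)

// KeyValue is a local stand-in so that this file compiles on its own.
// A real lab plugin uses the KeyValue type from the mr package.
type KeyValue struct {
	Key   string
	Value string
}

// loadMapReduce opens a Go plugin and looks up its Map and Reduce symbols.
func loadMapReduce(path string) (func(string, string) []KeyValue, func(string, []string) string, error) {
	p, err := plugin.Open(path)
	if err != nil {
		return nil, nil, fmt.Errorf("cannot load plugin %v: %w", path, err)
	}

	xmapf, err := p.Lookup("Map")
	if err != nil {
		return nil, nil, fmt.Errorf("cannot find Map in %v: %w", path, err)
	}
	mapf, ok := xmapf.(func(string, string) []KeyValue)
	if !ok {
		return nil, nil, fmt.Errorf("symbol Map in %v has an unexpected type", path)
	}

	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		return nil, nil, fmt.Errorf("cannot find Reduce in %v: %w", path, err)
	}
	reducef, ok := xreducef.(func(string, []string) string)
	if !ok {
		return nil, nil, fmt.Errorf("symbol Reduce in %v has an unexpected type", path)
	}
	return mapf, reducef, nil
}

func main() {
	if len(os.Args) != 2 {
		fmt.Println("usage: loader plugin.so")
		return
	}
	if _, _, err := loadMapReduce(os.Args[1]); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("loaded Map and Reduce")
}
```

Returning an error instead of exiting keeps the sketch easy to try with a missing or mismatched plugin file.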

RPC

The coordinator and the workers communicate using Remote Procedure Calls (RPC). The RPC library is provided to us, so we just need to learn how to use it. Here is the built-in example of an RPC call:

// src/mr/coordinator.go
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
    reply.Y = args.X + 1
    return nil
}

// src/mr/rpc.go
type ExampleArgs struct {
    X int
}

type ExampleReply struct {
    Y int
}

// src/mr/worker.go
func CallExample() {

    // declare an argument structure.
    args := ExampleArgs{}

    // fill in the argument(s).
    args.X = 99

    // declare a reply structure.
    reply := ExampleReply{}

    // send the RPC request, wait for the reply.
    // the "Coordinator.Example" tells the
    // receiving server that we'd like to call
    // the Example() method of struct Coordinator.
    ok := call("Coordinator.Example", &args, &reply)
    if ok {
        // reply.Y should be 100.
        fmt.Printf("reply.Y %v\n", reply.Y)
    } else {
        fmt.Printf("call failed!\n")
    }
}

This example shows how a worker makes an RPC call to the coordinator. The coordinator's Example method reads X from its first argument (args) and returns X + 1 by setting the Y field of its second argument (reply).
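To try the same request and reply flow without the lab skeleton, the sketch below wires Go's net/rpc client and server together inside one process. This is an assumption made to keep the example self-contained: it connects the two ends with net.Pipe, which is not how the lab's call helper connects to the coordinator, and callExampleInProcess is a made-up name.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

type ExampleArgs struct {
	X int
}

type ExampleReply struct {
	Y int
}

type Coordinator struct{}

// Example is the RPC handler: it reads args.X and fills in reply.Y.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

// callExampleInProcess registers a Coordinator, connects a client to it
// through an in-memory pipe, and performs one Coordinator.Example call.
func callExampleInProcess(x int) (int, error) {
	server := rpc.NewServer()
	if err := server.Register(&Coordinator{}); err != nil {
		return 0, err
	}

	clientConn, serverConn := net.Pipe()
	go server.ServeConn(serverConn) // serves until the client closes its end

	client := rpc.NewClient(clientConn)
	defer client.Close()

	args := ExampleArgs{X: x}
	reply := ExampleReply{}
	if err := client.Call("Coordinator.Example", &args, &reply); err != nil {
		return 0, err
	}
	return reply.Y, nil
}

func main() {
	y, err := callExampleInProcess(99)
	if err != nil {
		fmt.Println("call failed:", err)
		return
	}
	fmt.Println("reply.Y", y) // prints "reply.Y 100"
}
```

The service name in "Coordinator.Example" comes from the type name of the registered receiver, which is why the handler has to be an exported method on Coordinator.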

Initializing the Coordinator

The coordinator is initialized by calling the MakeCoordinator function, which takes the list of input files and the number of reduce tasks as arguments. It does not start the workers itself; they run as separate processes and ask the coordinator for tasks over RPC.

// src/mr/task.go

type MapTask struct {
    Id       int
    FileName string
    NReduce  int
    Status   TaskState
}

type ReduceTask struct {
    Id     int
    NMap   int
    Status TaskState
}

type TaskState int

const (
    TaskStatus_Idle TaskState = iota
    TaskStatus_InProgress
    TaskStatus_Completed
)

// src/mr/coordinator.go

type Coordinator struct {
    // Your definitions here.
    sync.Mutex
    *sync.Cond
    MapTasks    []*MapTask
    ReduceTasks []*ReduceTask
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{}

    // Your code here.
    c.Mutex = sync.Mutex{}
    c.Cond = sync.NewCond(&c.Mutex)

    nMap := len(files)
    c.MapTasks = make([]*MapTask, nMap)
    for i, filename := range files {
        c.MapTasks[i] = &MapTask{
            Id:       i,
            FileName: filename,
            NReduce:  nReduce,
            Status:   TaskStatus_Idle,
        }
    }

    c.ReduceTasks = make([]*ReduceTask, nReduce)
    for i := range nReduce {
        c.ReduceTasks[i] = &ReduceTask{
            Id:     i,
            NMap:   nMap,
            Status: TaskStatus_Idle,
        }
    }

    c.server()
    return &c
}

During initialization, the coordinator creates one map task for every input file and nReduce reduce tasks, all in the idle state. It then starts the RPC server to listen for incoming worker connections.

Worker

The worker fetches tasks from the coordinator and executes them until there are no more tasks to process. The worker can be implemented as follows:

// src/mr/worker.go

func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    for {
        typ, mapTask, reduceTask, err := CallFetchTask()
        if err != nil {
            log.Fatalf("fetch task failed: %v", err)
        }

        switch typ {
        case TaskType_None:
            return
        case TaskType_Map:
            doMapTask(mapTask, mapf)
        case TaskType_Reduce:
            doReduceTask(reduceTask, reducef)
        default:
            log.Fatalf("unknown task type: %v", typ)
        }
    }
}

The Worker function fetches tasks from the coordinator using the CallFetchTask RPC call. It then processes the tasks based on their type (Map or Reduce) by calling the doMapTask or doReduceTask functions.

Fetching Tasks

The coordinator provides a FetchTask RPC handler through which workers request tasks. It hands out map tasks first and reduce tasks only once every map task has completed. When no tasks remain, it returns TaskType_None. Because a worker may fail in the middle of a task, a task that has already been handed out may still need to be reassigned. So when every remaining task is in progress, the handler does not return immediately; it waits until a task either completes or becomes idle again.

// src/mr/rpc.go

type FetchTaskArgs struct{}
type FetchTaskReply struct {
    TaskType   TaskType
    MapTask    *MapTask
    ReduceTask *ReduceTask
}

type TaskType int

const (
    TaskType_None TaskType = iota
    TaskType_Map
    TaskType_Reduce
)

// src/mr/coordinator.go

// FetchTask RPC handler for worker to fetch a task
func (c *Coordinator) FetchTask(_ *FetchTaskArgs, reply *FetchTaskReply) error {
    c.Lock()
    defer c.Unlock()

    // Map tasks
    for {
        if c.MapDone() {
            break
        }

        if task := c.fetchMapTask(); task != nil {
            reply.TaskType = TaskType_Map
            reply.MapTask = task
            c.startMapTask(task)

            return nil
        } else {
            c.Wait() // wait for all map tasks to be completed
        }
    }

    // Reduce tasks
    for {
        if c.Done() {
            break
        }

        if task := c.fetchReduceTask(); task != nil {
            reply.TaskType = TaskType_Reduce
            reply.ReduceTask = task
            c.startReduceTask(task)

            return nil
        } else {
            c.Wait()
        }
    }

    // All tasks are done; reply.TaskType keeps its zero value, TaskType_None
    return nil
}

// fetch a map task that is idle
func (c *Coordinator) fetchMapTask() *MapTask {
    for _, task := range c.MapTasks {
        if task.Status == TaskStatus_Idle {
            return task
        }
    }

    return nil
}

The startMapTask and startReduceTask functions mark a task as in progress and start a timer for it. If the task has not completed when the timer fires (after 10 seconds here), the coordinator marks it as idle again and wakes up the waiting handlers so that another worker can fetch it.

func (c *Coordinator) startMapTask(task *MapTask) {
    task.Status = TaskStatus_InProgress

    go func() {
        timeoutTimer := time.NewTimer(10 * time.Second)
        <-timeoutTimer.C
        c.Lock()
        defer c.Unlock()

        if task.Status != TaskStatus_Completed { // task is still in progress, worker is dead
            task.Status = TaskStatus_Idle
            c.Broadcast()
        }
    }()
}
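The timer only works if completed tasks are recorded, which happens when a worker reports back. The report handler is not shown in this article, so here is a minimal sketch under assumptions: ReportTaskDoneArgs, ReportTaskDoneReply, newCoordinator, mapDone, and waitForMaps are hypothetical names, and only map tasks are handled. The point it illustrates is that the handler must call Broadcast after changing a status, otherwise handlers blocked in Wait never re-check their condition.

```go
package main

import (
	"fmt"
	"sync"
)

type TaskState int

const (
	TaskStatus_Idle TaskState = iota
	TaskStatus_InProgress
	TaskStatus_Completed
)

type MapTask struct {
	Id     int
	Status TaskState
}

// ReportTaskDoneArgs and ReportTaskDoneReply are hypothetical RPC types.
type ReportTaskDoneArgs struct {
	IsReduce bool
	Id       int
}

type ReportTaskDoneReply struct{}

type Coordinator struct {
	sync.Mutex
	*sync.Cond
	MapTasks []*MapTask
}

// newCoordinator creates nMap map tasks that are already in progress.
func newCoordinator(nMap int) *Coordinator {
	c := &Coordinator{}
	c.Cond = sync.NewCond(&c.Mutex)
	for i := 0; i < nMap; i++ {
		c.MapTasks = append(c.MapTasks, &MapTask{Id: i, Status: TaskStatus_InProgress})
	}
	return c
}

// mapDone reports whether every map task is completed.
// The caller must hold the coordinator's lock.
func (c *Coordinator) mapDone() bool {
	for _, task := range c.MapTasks {
		if task.Status != TaskStatus_Completed {
			return false
		}
	}
	return true
}

// ReportTaskDone marks a map task as completed and wakes every waiter.
func (c *Coordinator) ReportTaskDone(args *ReportTaskDoneArgs, _ *ReportTaskDoneReply) error {
	c.Lock()
	defer c.Unlock()

	if args.IsReduce {
		return fmt.Errorf("reduce tasks are omitted from this sketch")
	}
	if args.Id < 0 || args.Id >= len(c.MapTasks) {
		return fmt.Errorf("unknown map task %d", args.Id)
	}
	c.MapTasks[args.Id].Status = TaskStatus_Completed
	c.Broadcast()
	return nil
}

// waitForMaps blocks until every map task is completed.
func (c *Coordinator) waitForMaps() {
	c.Lock()
	defer c.Unlock()
	for !c.mapDone() {
		c.Wait() // releases the lock while sleeping, reacquires it on wake-up
	}
}

func main() {
	c := newCoordinator(2)
	go func() {
		for id := 0; id < 2; id++ {
			if err := c.ReportTaskDone(&ReportTaskDoneArgs{Id: id}, &ReportTaskDoneReply{}); err != nil {
				fmt.Println("report failed:", err)
			}
		}
	}()
	c.waitForMaps()
	fmt.Println("all map tasks completed")
}
```

Checking the condition in a loop around Wait matters because a wake-up only says that something changed, not that the condition the waiter cares about now holds.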

Mapping

The doMapTask function reads the input file, calls the mapf function to generate intermediate key-value pairs, and writes the intermediate files to disk. The intermediate pairs are partitioned by the hash of the key modulo the number of reduce tasks. After that, the worker reports to the coordinator that the task is done.

A worker may fail halfway through a task, and the next worker that runs the same task writes files with the same names. To keep a partially written file from ever being read, the intermediate files are first written to temporary files and then renamed to their final names once they are complete.
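The write-then-rename pattern can be sketched on its own as follows. The helper names writeFileAtomically and demo are made up for this example. The sketch creates the temporary file in the same directory as the final file, on the assumption that a rename within one directory stays on one file system.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomically writes data to a temporary file in dir and then
// renames it to name, so readers see either the old file or the new one.
func writeFileAtomically(dir, name string, data []byte) error {
	tmp, err := os.CreateTemp(dir, "mr-*.tmp")
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	// Close before renaming so that all data has been handed to the OS.
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

// demo writes the same file twice, as two attempts of one task would,
// and returns what a reader sees afterwards.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "mr-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	if err := writeFileAtomically(dir, "mr-0-0", []byte("first attempt\n")); err != nil {
		return "", err
	}
	if err := writeFileAtomically(dir, "mr-0-0", []byte("second attempt\n")); err != nil {
		return "", err
	}
	got, err := os.ReadFile(filepath.Join(dir, "mr-0-0"))
	return string(got), err
}

func main() {
	content, err := demo()
	if err != nil {
		fmt.Println("demo failed:", err)
		return
	}
	fmt.Print(content) // prints "second attempt"
}
```

Because both attempts of a task produce the same content for the same name, it does not matter which rename happens last.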

To facilitate the shuffle and reduce phases, the intermediate files are named mr-X-Y, where X is the Map task number and Y is the Reduce task number. This yields nMap * nReduce intermediate files in total, and reduce task Y simply reads every file whose name ends in -Y.

// src/mr/worker.go

func doMapTask(task *MapTask, mapf func(string, string) []KeyValue) {
    filename := task.FileName
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalf("cannot open %v", filename)
    }

    content, err := io.ReadAll(file)
    if err != nil {
        log.Fatalf("cannot read %v", filename)
    }
    file.Close()

    kva := mapf(filename, string(content))

    // partition intermediate data to nReduce files
    intermediate := make([][]KeyValue, task.NReduce)
    for _, kv := range kva {
        idx := ihash(kv.Key) % task.NReduce
        intermediate[idx] = append(intermediate[idx], kv)
    }

    // write intermediate files to disk
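    for i, inkva := range intermediate {
        // Hint from the lab: a reasonable naming convention for intermediate files is mr-X-Y,
        // where X is the Map task number and Y is the Reduce task number.
        filename = fmt.Sprintf("mr-%v-%v", task.Id, i)
        // create the temp file in the working directory so that the rename below
        // never has to cross file systems
        file, err = os.CreateTemp("./", "mr-*.tmp")
        if err != nil {
            log.Fatalf("cannot create temp file for %v", filename)
        }
        for _, kv := range inkva {
            fmt.Fprintf(file, "%v %v\n", kv.Key, kv.Value)
        }
        // close here rather than with defer, which would keep every file open
        // until the function returns
        if err := file.Close(); err != nil {
            log.Fatalf("cannot close %v", file.Name())
        }
        // an existing file with the same name will be replaced
        if err := os.Rename(file.Name(), filename); err != nil {
            log.Fatalf("cannot rename %v", filename)
        }
    }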
    // report task done
    if err := CallReportTaskDone(false, task.Id); err != nil {
        log.Fatalf("report task done failed: %v", err)
    }
}
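doMapTask relies on ihash to choose the reduce bucket for each key. The sketch below isolates that step. ihash follows the helper that ships with the lab's worker skeleton as far as I know, while reduceBucket and intermediateName are names invented for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash hashes a key to a non-negative int using 32-bit FNV-1a.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// reduceBucket picks the reduce task that will receive key.
func reduceBucket(key string, nReduce int) int {
	return ihash(key) % nReduce
}

// intermediateName builds the mr-X-Y name for one map/reduce task pair.
func intermediateName(mapID, reduceID int) string {
	return fmt.Sprintf("mr-%d-%d", mapID, reduceID)
}

func main() {
	const nReduce = 10
	for _, w := range []string{"apple", "banana", "apple"} {
		// The same word always lands in the same bucket.
		fmt.Println(w, "->", intermediateName(0, reduceBucket(w, nReduce)))
	}
}
```

Since the bucket depends only on the key, every occurrence of a word ends up with the same reduce task regardless of which map task saw it, and that is what makes the per-word counts correct.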

Reducing

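The doReduceTask function is similar to the doMapTask function. It reads the intermediate files that belong to its Reduce task, groups the key-value pairs by key, and calls the reducef function once per key to generate the final output. The output is written to mr-out-Y, where Y is the Reduce task number, again through a temporary file that is renamed at the end.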

The shuffle step is the same as the one in src/main/mrsequential.go: it sorts the intermediate key-value pairs by key and then uses two indices to group the values that share a key before reducing.

// src/mr/worker.go

func doReduceTask(task *ReduceTask, reducef func(string, []string) string) {
    intermediate := make([]KeyValue, 0)
    for i := range task.NMap {
        filename := fmt.Sprintf("mr-%v-%v", i, task.Id) // intermediate file
        file, err := os.Open(filename)
        if err != nil {
            log.Fatalf("cannot open %v", filename)
        }
        reader := bufio.NewReader(file)

        // read intermediate file by line
        for {
            line, err := reader.ReadString('\n')
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatalf("cannot read %v", filename)
            }
            var kv KeyValue
            if _, err := fmt.Sscanf(line, "%s %s", &kv.Key, &kv.Value); err != nil {
                log.Fatalf("cannot parse %v", line)
            }
            intermediate = append(intermediate, kv)
        }

        file.Close()
    }

    sort.Sort(ByKey(intermediate))

    tmpFile, err := os.CreateTemp("./", "mr-*.tmp")
    if err != nil {
        log.Fatal(err)
    }
    outputFilename := fmt.Sprintf("mr-out-%v", task.Id)

    i := 0
    for i < len(intermediate) {
        j := i + 1
        for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
            j++
        }
        values := []string{}
        for k := i; k < j; k++ {
            values = append(values, intermediate[k].Value)
        }
        output := reducef(intermediate[i].Key, values)

        // this is the correct format for each line of Reduce output.
        fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)

        i = j
    }

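    // close the temp file before renaming it to the final output name
    if err := tmpFile.Close(); err != nil {
        log.Fatalf("cannot close %v", tmpFile.Name())
    }
    if err := os.Rename(tmpFile.Name(), outputFilename); err != nil {
        log.Fatalf("cannot rename %v", outputFilename)
    }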

    // report task done
    if err := CallReportTaskDone(true, task.Id); err != nil {
        log.Fatalf("report task done failed: %v", err)
    }
}
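One caveat of the "key value" line format used for the intermediate files: it does not round-trip a key that contains a space, or an empty value. An alternative that the lab hints mention is Go's encoding/json package. Here is a small sketch of that approach, where writeKVs, readKVs, and roundTrip are names I made up and a bytes.Buffer stands in for the intermediate file.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

type KeyValue struct {
	Key   string
	Value string
}

// writeKVs encodes each pair as one JSON object per line.
func writeKVs(w io.Writer, kvs []KeyValue) error {
	enc := json.NewEncoder(w)
	for _, kv := range kvs {
		if err := enc.Encode(&kv); err != nil {
			return err
		}
	}
	return nil
}

// readKVs decodes pairs until the input is exhausted.
func readKVs(r io.Reader) ([]KeyValue, error) {
	dec := json.NewDecoder(r)
	var kvs []KeyValue
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err == io.EOF {
			return kvs, nil
		} else if err != nil {
			return nil, err
		}
		kvs = append(kvs, kv)
	}
}

// roundTrip writes the pairs to an in-memory buffer and reads them back.
func roundTrip(kvs []KeyValue) ([]KeyValue, error) {
	var buf bytes.Buffer
	if err := writeKVs(&buf, kvs); err != nil {
		return nil, err
	}
	return readKVs(&buf)
}

func main() {
	got, err := roundTrip([]KeyValue{
		{Key: "new york", Value: "1"},
		{Key: "empty", Value: ""},
	})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Printf("%q\n", got)
}
```

For the word count application the plain text format is enough, since keys are single words and values are always "1".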

Testing

To test the MapReduce framework, we just need to run the test-mr.sh script provided by the course. If it prints PASSED ALL TESTS, the implementation passes every test in the script.

Conclusion

In this article, we discussed Lab 1 of MIT 6.5840, which focuses on implementing a simple MapReduce framework. We learned about the MapReduce programming model, the coordinator and worker processes, and the RPC communication between them. By implementing the coordinator's task scheduling and the worker's handling of map and reduce tasks, we built a basic MapReduce framework that tolerates worker failures and runs tasks in parallel across workers.

I have published the full source code of the MapReduce framework on GitHub. I hope it helps you understand the implementation details better.
