Introduction
MIT 6.5840, also known as 6.824, is a well-known distributed systems course offered by MIT. The course covers a wide range of topics in distributed systems, including fault tolerance, consistency, replication, and sharding. It also delves into the design and implementation of distributed systems, with a focus on real-world systems like MapReduce, Raft, and ZooKeeper.
In this series of notes, I will document my learnings from the labs and lectures of MIT 6.5840. These notes are intended to serve as a reference for myself and others who are interested in distributed systems.
In this article, we will discuss Lab 1 of MIT 6.5840, which focuses on implementing a simple MapReduce framework.
MapReduce
In the lectures, we learned about the MapReduce programming model, which simplifies the development of large-scale data processing applications. The MapReduce model consists of two main functions:
- Map Function: This function processes input key-value pairs and generates intermediate key-value pairs.
- Reduce Function: This function processes intermediate key-value pairs and produces the final output.
More precisely, a complete MapReduce job runs in three phases:
- Map: The Map function processes input key-value pairs and generates intermediate key-value pairs.
- Shuffle: The intermediate key-value pairs are shuffled and sorted to group all values associated with the same key.
- Reduce: The Reduce function processes the grouped key-value pairs and produces the final output.
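For word count, for example, Map emits a `(word, "1")` pair for every word it sees, the shuffle phase gathers all the `"1"`s emitted for the same word (possibly by different Map tasks), and Reduce counts them.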
Here are some points to remember about MapReduce:
- MapReduce reads its input from and writes its output to a distributed file system, such as GFS or HDFS.
- Reduce tasks only run after all the Map tasks have completed.
- Map and Reduce run in parallel across multiple machines.
- MapReduce handles machine failures by re-executing tasks on other machines.
Lab
In Lab 1 of MIT 6.5840, we are tasked with implementing the coordinator and worker processes for a simple MapReduce framework. The coordinator is responsible for scheduling tasks and handling failures, while the workers execute the Map and Reduce tasks.
Map and Reduce Functions
Though the Map and Reduce functions are provided to us, understanding how they work can help us implement the lab more effectively. Here is the word-count example:
```go
// src/mrapps/wc.go
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }

	// split contents into an array of words.
	words := strings.FieldsFunc(contents, ff)

	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{w, "1"}
		kva = append(kva, kv)
	}
	return kva
}

func Reduce(key string, values []string) string {
	// return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}
```
The `Map` function processes the input file and generates key-value pairs where the key is a word and the value is `"1"`. The `Reduce` function processes the intermediate key-value pairs for one key and returns the count of occurrences of that word. In addition, these functions will be loaded by workers using Go's `plugin` package.
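For reference, here is roughly how `src/main/mrworker.go` (shipped with the lab) loads these functions; `mr.KeyValue` is simply a struct with `Key` and `Value` string fields:

```go
// src/main/mrworker.go
// loadPlugin loads the application Map and Reduce functions from a plugin
// file, e.g. one built with: go build -buildmode=plugin ../mrapps/wc.go
func loadPlugin(filename string) (func(string, string) []mr.KeyValue,
	func(string, []string) string) {
	p, err := plugin.Open(filename)
	if err != nil {
		log.Fatalf("cannot load plugin %v", filename)
	}
	xmapf, err := p.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", filename)
	}
	mapf := xmapf.(func(string, string) []mr.KeyValue)
	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", filename)
	}
	reducef := xreducef.(func(string, []string) string)
	return mapf, reducef
}
```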
RPC
The communication between the coordinator and workers is done using Remote Procedure Calls (RPC). The RPC plumbing is provided to us; we just need to learn how to use it. Here is the example that ships with the skeleton:
```go
// src/mr/coordinator.go
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

// src/mr/rpc.go
type ExampleArgs struct {
	X int
}

type ExampleReply struct {
	Y int
}

// src/mr/worker.go
func CallExample() {
	// declare an argument structure.
	args := ExampleArgs{}

	// fill in the argument(s).
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	// the "Coordinator.Example" tells the
	// receiving server that we'd like to call
	// the Example() method of struct Coordinator.
	ok := call("Coordinator.Example", &args, &reply)
	if ok {
		// reply.Y should be 100.
		fmt.Printf("reply.Y %v\n", reply.Y)
	} else {
		fmt.Printf("call failed!\n")
	}
}
```
This example shows how to make an RPC call from a worker to the coordinator. The coordinator's `Example` method reads `X` from the arguments and returns `X + 1` by setting the `Y` field of the reply.
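Under the hood, the provided `call` helper dials the coordinator's Unix-domain socket and issues the request through Go's `net/rpc` package; it looks roughly like this in the skeleton:

```go
// src/mr/worker.go
// call sends an RPC request to the coordinator and waits for the response.
// It returns false if something went wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
	sockname := coordinatorSock() // per-user Unix socket path, defined in rpc.go
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	if err := c.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}
```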
Initializing the Coordinator
The coordinator is initialized by calling the `MakeCoordinator` function, which takes the list of input files and the number of reduce tasks as arguments. It builds the task lists and starts the RPC server; workers are launched separately and ask it for tasks.
```go
// src/mr/task.go
type MapTask struct {
	Id       int
	FileName string
	NReduce  int
	Status   TaskState
}

type ReduceTask struct {
	Id     int
	NMap   int
	Status TaskState
}

type TaskState int

const (
	TaskStatus_Idle TaskState = iota
	TaskStatus_InProgress
	TaskStatus_Completed
)

// src/mr/coordinator.go
type Coordinator struct {
	// Your definitions here.
	sync.Mutex
	*sync.Cond
	MapTasks    []*MapTask
	ReduceTasks []*ReduceTask
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.
	c.Mutex = sync.Mutex{}
	c.Cond = sync.NewCond(&c.Mutex)

	nMap := len(files)
	c.MapTasks = make([]*MapTask, nMap)
	for i, filename := range files {
		c.MapTasks[i] = &MapTask{
			Id:       i,
			FileName: filename,
			NReduce:  nReduce,
			Status:   TaskStatus_Idle,
		}
	}

	c.ReduceTasks = make([]*ReduceTask, nReduce)
	for i := range nReduce { // Go 1.22+ range over int
		c.ReduceTasks[i] = &ReduceTask{
			Id:     i,
			NMap:   nMap,
			Status: TaskStatus_Idle,
		}
	}

	c.server()
	return &c
}
```
During initialization, the coordinator creates one map task for every input file and `nReduce` reduce tasks. Then it starts the RPC server to listen for incoming worker connections.
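The `server` method is part of the lab skeleton: it registers the coordinator with `net/rpc` and serves requests over a Unix-domain socket, roughly as follows:

```go
// src/mr/coordinator.go
// server starts a thread that listens for RPCs from worker.go.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	sockname := coordinatorSock() // per-user Unix socket path, defined in rpc.go
	os.Remove(sockname)
	l, err := net.Listen("unix", sockname)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(l, nil)
}
```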
Worker
The worker fetches tasks from the coordinator and executes them until there are no more tasks to process. The main loop can be implemented as follows:
```go
// src/mr/worker.go
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		typ, mapTask, reduceTask, err := CallFetchTask()
		if err != nil {
			log.Fatalf("fetch task failed: %v", err)
		}
		switch typ {
		case TaskType_None:
			return
		case TaskType_Map:
			doMapTask(mapTask, mapf)
		case TaskType_Reduce:
			doReduceTask(reduceTask, reducef)
		default:
			log.Fatalf("unknown task type: %v", typ)
		}
	}
}
```
The `Worker` function fetches tasks from the coordinator using the `CallFetchTask` RPC wrapper. It then processes each task based on its type (Map or Reduce) by calling the `doMapTask` or `doReduceTask` functions.
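`CallFetchTask` is not part of the skeleton; it is a thin wrapper around the provided `call` helper. A minimal sketch, assuming the `FetchTaskArgs` and `FetchTaskReply` types defined in the next section:

```go
// src/mr/worker.go
// CallFetchTask asks the coordinator for the next task to run.
func CallFetchTask() (TaskType, *MapTask, *ReduceTask, error) {
	args := FetchTaskArgs{}
	reply := FetchTaskReply{}
	if ok := call("Coordinator.FetchTask", &args, &reply); !ok {
		return TaskType_None, nil, nil, errors.New("FetchTask RPC failed")
	}
	return reply.TaskType, reply.MapTask, reply.ReduceTask, nil
}
```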
Fetching Tasks
The coordinator provides a `FetchTask` RPC handler for workers to fetch tasks. It hands out map tasks first and then reduce tasks; once all tasks are done, it returns `TaskType_None`. Because a worker may fail mid-task, the handler does not return immediately when every task has been handed out but some are still incomplete; instead it waits on a condition variable until either a timed-out task becomes idle again or the phase finishes.
```go
// src/mr/rpc.go
type FetchTaskArgs struct{}

type FetchTaskReply struct {
	TaskType   TaskType
	MapTask    *MapTask
	ReduceTask *ReduceTask
}

type TaskType int

const (
	TaskType_None TaskType = iota
	TaskType_Map
	TaskType_Reduce
)

// src/mr/coordinator.go
// FetchTask RPC handler for worker to fetch a task
func (c *Coordinator) FetchTask(_ *FetchTaskArgs, reply *FetchTaskReply) error {
	c.Lock()
	defer c.Unlock()

	// Map tasks
	for {
		if c.MapDone() {
			break
		}
		if task := c.fetchMapTask(); task != nil {
			reply.TaskType = TaskType_Map
			reply.MapTask = task
			c.startMapTask(task)
			return nil
		} else {
			c.Wait() // wait until a map task becomes idle again or the map phase finishes
		}
	}

	// Reduce tasks
	for {
		if c.Done() {
			break
		}
		if task := c.fetchReduceTask(); task != nil {
			reply.TaskType = TaskType_Reduce
			reply.ReduceTask = task
			c.startReduceTask(task)
			return nil
		} else {
			c.Wait()
		}
	}

	// All tasks are done; reply.TaskType defaults to TaskType_None.
	return nil
}

// fetch a map task that is idle
func (c *Coordinator) fetchMapTask() *MapTask {
	for _, task := range c.MapTasks {
		if task.Status == TaskStatus_Idle {
			return task
		}
	}
	return nil
}
```
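The handler relies on two helpers that are not shown above, `MapDone` and `Done`. A minimal sketch, assuming the caller already holds the mutex (note that `main/mrcoordinator.go` also polls `Done` from outside, so a real implementation needs a locking wrapper for that path):

```go
// src/mr/coordinator.go
// MapDone reports whether every map task has completed.
// The caller must hold c.Mutex.
func (c *Coordinator) MapDone() bool {
	for _, task := range c.MapTasks {
		if task.Status != TaskStatus_Completed {
			return false
		}
	}
	return true
}

// Done reports whether every reduce task has completed,
// i.e. the whole job is finished.
func (c *Coordinator) Done() bool {
	for _, task := range c.ReduceTasks {
		if task.Status != TaskStatus_Completed {
			return false
		}
	}
	return true
}
```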
The `startMapTask` and `startReduceTask` functions mark a task as in progress and start a timeout timer for it. If the task is not completed within the timeout, the coordinator marks the task idle again and broadcasts on the condition variable so that waiting workers can fetch it.
```go
// src/mr/coordinator.go
func (c *Coordinator) startMapTask(task *MapTask) {
	task.Status = TaskStatus_InProgress
	go func() {
		timeoutTimer := time.NewTimer(10 * time.Second)
		<-timeoutTimer.C
		c.Lock()
		defer c.Unlock()
		if task.Status != TaskStatus_Completed { // task is still in progress, worker is presumed dead
			task.Status = TaskStatus_Idle
			c.Broadcast()
		}
	}()
}
```
Mapping
The `doMapTask` function reads the input file, calls the `mapf` function to generate intermediate key-value pairs, and writes the intermediate files to disk, partitioned by the hash of each key modulo the number of reduce tasks. After that, the worker reports to the coordinator that the task is done.
To prevent a failed worker's half-written intermediate files from conflicting with those of the next worker to run the same task, each intermediate file is written as a temporary file first and then renamed to its final name.
To facilitate the shuffle and reduce phases, the intermediate files are named using the format `mr-X-Y`, where `X` represents the Map task number and `Y` represents the Reduce task number.
In total, `nMap * nReduce` intermediate files are generated, which makes them easy to assign to the `nReduce` reduce tasks: reduce task `Y` simply reads every `mr-*-Y` file.
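The partition index comes from the `ihash` helper that ships with the lab skeleton:

```go
// src/mr/worker.go
// use ihash(key) % NReduce to choose the reduce task number
// for each KeyValue emitted by Map.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
```

With that in place, `doMapTask` looks like this: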
```go
// src/mr/worker.go
func doMapTask(task *MapTask, mapf func(string, string) []KeyValue) {
	filename := task.FileName
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := io.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()

	kva := mapf(filename, string(content))

	// partition intermediate data into nReduce buckets
	intermediate := make([][]KeyValue, task.NReduce)
	for _, kv := range kva {
		idx := ihash(kv.Key) % task.NReduce
		intermediate[idx] = append(intermediate[idx], kv)
	}

	// write intermediate files to disk
	for i, inkva := range intermediate {
		// Hints: A reasonable naming convention for intermediate files is mr-X-Y,
		// where X is the Map task number, and Y is the reduce task number.
		filename = fmt.Sprintf("mr-%v-%v", task.Id, i)
		// create the temp file in the current directory so the rename below
		// stays on the same filesystem; an existing file with the final
		// name will simply be replaced.
		file, err = os.CreateTemp("./", "mr-*.tmp")
		if err != nil {
			log.Fatalf("cannot create %v", filename)
		}
		for _, kv := range inkva {
			fmt.Fprintf(file, "%v %v\n", kv.Key, kv.Value)
		}
		file.Close() // close before the rename so all writes are flushed
		if err := os.Rename(file.Name(), filename); err != nil {
			log.Fatalf("cannot rename %v", filename)
		}
	}

	// report task done
	if err := CallReportTaskDone(false, task.Id); err != nil {
		log.Fatalf("report task done failed: %v", err)
	}
}
```
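The coordinator-side handler behind `CallReportTaskDone` is ours to write as well; a minimal sketch, with hypothetical `ReportTaskDoneArgs` fields matching the two parameters used above:

```go
// src/mr/rpc.go (hypothetical shape)
type ReportTaskDoneArgs struct {
	IsReduce bool
	TaskId   int
}
type ReportTaskDoneReply struct{}

// src/mr/coordinator.go
// ReportTaskDone marks a task completed and wakes up any FetchTask
// handlers blocked on the condition variable.
func (c *Coordinator) ReportTaskDone(args *ReportTaskDoneArgs, _ *ReportTaskDoneReply) error {
	c.Lock()
	defer c.Unlock()
	if args.IsReduce {
		c.ReduceTasks[args.TaskId].Status = TaskStatus_Completed
	} else {
		c.MapTasks[args.TaskId].Status = TaskStatus_Completed
	}
	c.Broadcast()
	return nil
}
```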
Reducing
The `doReduceTask` function is similar to the `doMapTask` function. It reads the intermediate files, groups the key-value pairs by key, and calls the `reducef` function on each group to generate the final output, which is written to the output file `mr-out-Y`.
The shuffle step is the same as in the provided example `src/main/mrsequential.go`: sort the intermediate key-value pairs by key, then use two pointers to group the values that share the same key before reducing.
```go
// src/mr/worker.go
func doReduceTask(task *ReduceTask, reducef func(string, []string) string) {
	intermediate := make([]KeyValue, 0)
	for i := range task.NMap {
		filename := fmt.Sprintf("mr-%v-%v", i, task.Id) // intermediate file
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		reader := bufio.NewReader(file)
		// read intermediate file line by line
		for {
			line, err := reader.ReadString('\n')
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Fatalf("cannot read %v", filename)
			}
			var kv KeyValue
			// %s stops at whitespace, which is fine for word-count-style keys
			if _, err := fmt.Sscanf(line, "%s %s", &kv.Key, &kv.Value); err != nil {
				log.Fatalf("cannot parse %v", line)
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}

	// ByKey implements sort.Interface for []KeyValue on Key, as in mrsequential.go
	sort.Sort(ByKey(intermediate))

	// write to a temp file first, then rename, for the same crash-safety
	// reason as in doMapTask
	tmpFile, err := os.CreateTemp("./", "mr-*.tmp")
	if err != nil {
		log.Fatal(err)
	}
	outputFilename := fmt.Sprintf("mr-out-%v", task.Id)

	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(tmpFile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}

	tmpFile.Close() // flush before renaming to the final name
	if err := os.Rename(tmpFile.Name(), outputFilename); err != nil {
		log.Fatalf("cannot rename %v", outputFilename)
	}

	// report task done
	if err := CallReportTaskDone(true, task.Id); err != nil {
		log.Fatalf("report task done failed: %v", err)
	}
}
```
Testing
To test the MapReduce framework, we just need to run the `test-mr.sh` script provided by the course from the `src/main` directory. When it prints `PASSED ALL TESTS`, the implementation is correct.
Conclusion
In this article, we discussed Lab 1 of MIT 6.5840, which focuses on implementing a simple MapReduce framework. We learned about the MapReduce programming model, the coordinator and worker processes, and the RPC communication between them. By implementing the Map and Reduce functions, fetching tasks, and processing the tasks, we can build a basic MapReduce framework that can process large-scale data in a distributed manner.
I have published the full source code of this MapReduce implementation on GitHub; I hope it helps you understand the implementation details better.