MapReduce Architecture Implementation

This article and the accompanying code follow the requirements of Lab 1 of MIT 6.824. You can find the original MapReduce paper here. A helpful lecture video from the same MIT course is on YouTube, as is a Computerphile video on the same topic.

A crude and simple implementation using Go channels is here. The original problem set, however, implements it as separate processes, with the workers communicating over RPC.

Overview of MapReduce

MapReduce is a programming model for processing and generating large datasets on a cluster in a parallel, distributed fashion. Several implementations of MapReduce are available, with Apache Hadoop being one of the most popular.

Schematic of the Map Reduce framework showing input data splits, map workers, intermediate files, reduce workers, and final output files.

The MapReduce programming model consists of the following:

  • A Map() function that emits intermediate (key, value) pairs to be picked up by the reduce phase later.
  • A Reduce() function that collects and aggregates the values passed on from Map(), grouped by key.
  • An implicit Shuffle() procedure that groups the emitted intermediate values by key and routes each group to the correct reducer.
  • Optionally, a Combine() function that transforms the data much like Reduce() does, but runs on the mapper before the intermediate data is sent to the actual reduce workers. Often, this ends up being the same function as Reduce().
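
As a rough illustration of the shuffle step, the sketch below groups intermediate pairs by key and picks a reducer for each key by hashing it. The names KeyValue, partition, and shuffle, and the choice of FNV-1a as the hash, are assumptions made for this example; the paper only requires that every occurrence of a key reaches the same reducer.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// KeyValue is a hypothetical intermediate pair emitted by a map function.
type KeyValue struct {
	Key   string
	Value string
}

// partition picks which of nReduce reducers should receive a key.
// Hashing ensures that every occurrence of a key lands on the same reducer.
func partition(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(nReduce))
}

// shuffle groups values by key and keeps one group table per reducer.
func shuffle(pairs []KeyValue, nReduce int) []map[string][]string {
	buckets := make([]map[string][]string, nReduce)
	for i := range buckets {
		buckets[i] = make(map[string][]string)
	}
	for _, kv := range pairs {
		r := partition(kv.Key, nReduce)
		buckets[r][kv.Key] = append(buckets[r][kv.Key], kv.Value)
	}
	return buckets
}

func main() {
	pairs := []KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}}
	for r, groups := range shuffle(pairs, 2) {
		fmt.Println("reducer", r, "receives", groups)
	}
}
```

Whichever reducer a key hashes to, both values for "a" end up in the same group, which is what lets a reducer see all the values for a key at once.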

With this programming model, several tasks that deal with distributed and large scale data mapping and processing can be expressed in very simple terms.

Word Count

We can model the map() function as reading the contents of a document and emitting a 1 each time a word is encountered.

On the reducer side, we can group the values by the word from the document (the key) and sum up the number of occurrences.

Pseudocode
map(documentName):
	documentContents = read(documentName)
	for each word in documentContents:
		emitIntermediate(word, 1)

reduce(key, values):
	# values are grouped by key when shuffled
	result = 0
	for each v in values:
		result += int(v)

	# output final count
	emit(result)
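
The pseudocode above could be written in Go roughly as follows. This is a minimal sketch: the names wcMap and wcReduce, the KeyValue type, and the use of string values are assumptions chosen for illustration, and the in-memory grouping in main merely stands in for the shuffle step.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// KeyValue is a hypothetical intermediate pair.
type KeyValue struct {
	Key   string
	Value string
}

// wcMap emits (word, "1") for every whitespace-separated word.
func wcMap(contents string) []KeyValue {
	var out []KeyValue
	for _, w := range strings.Fields(contents) {
		out = append(out, KeyValue{Key: w, Value: "1"})
	}
	return out
}

// wcReduce sums the values collected for a single key.
func wcReduce(key string, values []string) string {
	total := 0
	for _, v := range values {
		n, err := strconv.Atoi(v)
		if err != nil {
			continue // skip malformed values in this sketch
		}
		total += n
	}
	return strconv.Itoa(total)
}

func main() {
	// Group by key in memory, standing in for the shuffle step.
	grouped := make(map[string][]string)
	for _, kv := range wcMap("the quick fox jumps over the lazy dog") {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	fmt.Println("the:", wcReduce("the", grouped["the"])) // prints "the: 2"
}
```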

Distributed Grep

We can model the map() function as emitting a line if the line contains the word we are trying to match against.

The reduce() function can be an identity function that just outputs what its input was.

This way we can get the lines where the search term occurs.

Pseudocode
map(documentName, searchterm):
	documentContents = readlines(documentName)
	for each line in documentContents:
		if searchterm in line:
			emitIntermediate(line)

reduce(key, line):
	emit(line)
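
A Go version of the same idea might look like the sketch below. The names grepMap and grepReduce are hypothetical, the match is a plain substring test rather than a regular expression, and the document is passed in as a string to keep the example self-contained.

```go
package main

import (
	"fmt"
	"strings"
)

// grepMap emits every line of contents that contains term.
// The matching line itself serves as the intermediate key.
func grepMap(contents, term string) []string {
	var matches []string
	for _, line := range strings.Split(contents, "\n") {
		if strings.Contains(line, term) {
			matches = append(matches, line)
		}
	}
	return matches
}

// grepReduce is the identity function: it passes the line through unchanged.
func grepReduce(line string) string {
	return line
}

func main() {
	doc := "map emits pairs\nshuffle groups them\nreduce folds each group"
	for _, line := range grepMap(doc, "group") {
		fmt.Println(grepReduce(line))
	}
}
```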

More examples, such as counting URL access frequency, building a reverse web-link graph, and distributed sort, are given in the original paper.

My MapReduce Implementation in Go

I took some time to write this simple model in Go. MapReduce is supposed to be a simple model to program and it should feel the same way while writing the code. I remember deleting the code I initially had as it was getting a bit complex and I felt it was unnecessary. Sometimes, simplicity is the key.

For this crude and simple MapReduce implementation, I loosely followed the setup provided in Lab 1 of the MIT 6.824 problem set.

They provide some starter code and some files which you can use for this lab.

$ git clone git://g.csail.mit.edu/6.5840-golabs-2025 6.5840
$ # a starter is under src/mr and src/mrapps

Using that as a reference, I wrote my own simple implementation that is architecturally the same.

They provide some text files under the data/ directory to test your program against. In this case, they are a collection of classic stories.

In a real scenario (and in the MIT lab's code), RPC calls are used to transfer data between workers; I simply use Go's channels.

$ ls -1 data/
pg-being_ernest.txt
pg-dorian_gray.txt
pg-frankenstein.txt
pg-grimm.txt
pg-huckleberry_finn.txt
pg-metamorphosis.txt
pg-sherlock_holmes.txt
pg-tom_sawyer.txt

Details

I create a type to handle and pass around the intermediate values:

type KV struct {
	key   string
	value int
}

The main function accepts files as command-line arguments, and we run the word count against them. Each file is processed by its own map worker, which passes its data on to the reduce worker.

package main

import (
	"fmt"
	"os"
)

func main() {
	// Everything after the program name is a file to process.
	args := os.Args[1:]

	if len(args) < 1 {
		fmt.Println("Pass in files to process")
		return
	}

	fmt.Println(len(args), "files passed")
}

Here, main plays the role of the coordinator (the master in the MapReduce paper).

We make two channels: mapChan carries intermediate values to the reducer, and mapperDoneChan lets each mapper signal that it has finished. Once every mapper has signalled, there will be no more data and we only have to wait for the reducer to finish.

mapChan := make(chan KV)
mapperDoneChan := make(chan bool)

We can spin off the map workers for all the files in parallel using the go keyword.

for _, f := range args {
	go mapWorker(f, &mapChan, mapperDoneChan)
}

We expect to get back the words and the number of times each one occurs. This can be modelled using a map. The reducer passes the result back when it is done reducing over all the data.

resultChan := make(chan map[string]int)

go reduceWorker(&mapChan, resultChan)

The next part is crucial. We keep mapChan open until all the mappers are done. Closing it then tells the reducer that no more intermediate data is coming, so it can finish up while we wait for its result.

cnt := 0
for cnt != len(args) {
	<-mapperDoneChan
	cnt++
}

// close the intermediate channel to signal that all mappers are done,
// no more sending to reducers is needed.
close(mapChan)
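
The same "wait for every mapper, then close" pattern can also be expressed with sync.WaitGroup from the standard library instead of a done channel and a counter. The sketch below is an alternative under that assumption, not the approach used in this article; runMappers and its emit callback are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// runMappers starts one goroutine per input, waits for all of them,
// and then closes the shared channel so the consumer's range loop ends.
func runMappers(inputs []string, emit func(input string, out chan<- string)) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in string) {
			defer wg.Done()
			emit(in, out)
		}(in)
	}
	// Wait and close in a separate goroutine so the caller can start receiving right away.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	out := runMappers([]string{"a", "b", "c"}, func(in string, out chan<- string) {
		out <- "mapped " + in
	})
	n := 0
	for range out {
		n++
	}
	fmt.Println(n, "values received") // prints "3 values received"
}
```

Either way, the important property is that the channel is closed exactly once, and only after the last sender has finished.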

We can now wait and receive the result from our reducer on the result channel.

result := <-resultChan

I write the output to a file for persistence and testing later.

outfile, err := os.Create("output.txt")
if err != nil {
	log.Println("Could not create output file, length of the count map is", len(result))
	// Without a file there is nothing to write to, so stop here.
	return
}
defer outfile.Close()

// One "word - count" line per entry.
for k, v := range result {
	fmt.Fprintf(outfile, "%v - %v\n", k, v)
}
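
One thing to keep in mind is that Go does not guarantee any particular order when ranging over a map, so the lines can come out in a different order on each run. If a stable order is wanted, the keys can be sorted first. The helper below, with the hypothetical name sortedLines, is one way to sketch that.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedLines renders the counts as "word - count" lines in ascending key order.
func sortedLines(counts map[string]int) []string {
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	lines := make([]string, 0, len(keys))
	for _, k := range keys {
		lines = append(lines, fmt.Sprintf("%v - %v", k, counts[k]))
	}
	return lines
}

func main() {
	for _, line := range sortedLines(map[string]int{"abash": 1, "abandon": 3}) {
		fmt.Println(line)
	}
}
```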

So far, it has all been plumbing code. A framework like Hadoop takes care of all this file handling and message passing for us. All we need to do is supply the map and the reduce functions.

Let's define them next.

func mapWorker(fn string, intermediateChan *chan KV, mapperDoneChan chan bool) {
	f, err := os.Open(fn)
	if err != nil {
		log.Printf("Could not open file %v", fn)
		mapperDoneChan <- true
		return
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	// Set the split function so that each Scan() yields one word.
	scanner.Split(bufio.ScanWords)

	for scanner.Scan() {
		word := scanner.Text()
		*intermediateChan <- KV{word, 1}
	}
	if err := scanner.Err(); err != nil {
		fmt.Println("Error reading input file:", fn, err)
	}

	// Tell the coordinator that this mapper has finished.
	mapperDoneChan <- true
}

In the mapWorker, I open the file for reading and set the bufio.Scanner to scan one word at a time.

While reading in words, I pass along a struct of {key: word, value: 1} into the channel to the reduce worker. This corresponds to the emitIntermediate(word, 1) call in our pseudocode.

func reduceWorker(intermediateChan *chan KV, resultChan chan map[string]int) {
	result := make(map[string]int)

	// The loop ends once the coordinator closes the channel.
	for imdt := range *intermediateChan {
		// A missing key reads as 0, so no existence check is needed.
		// Every mapper emits a value of 1, so this adds 1 per occurrence.
		result[imdt.key] += imdt.value
	}

	resultChan <- result
}

For the reduce worker, I keep track of the counts in a map as the words come in from the various mappers over the channel, incrementing the count of each word as it arrives.
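
The Combine() step mentioned in the overview is not part of this implementation, but it could be sketched as each mapper pre-aggregating its own words before sending anything on. The functions combine and reduce below are hypothetical; note that this only works with a reducer that adds the emitted value rather than a fixed 1.

```go
package main

import (
	"fmt"
	"strings"
)

// KV mirrors the intermediate type used in this article.
type KV struct {
	key   string
	value int
}

// combine pre-aggregates one mapper's words so that each distinct word
// is emitted once with its local count instead of once per occurrence.
func combine(words []string) []KV {
	local := make(map[string]int)
	for _, w := range words {
		local[w]++
	}
	out := make([]KV, 0, len(local))
	for k, v := range local {
		out = append(out, KV{key: k, value: v})
	}
	return out
}

// reduce adds the emitted values, so it works with or without a combiner.
func reduce(pairs []KV) map[string]int {
	result := make(map[string]int)
	for _, kv := range pairs {
		result[kv.key] += kv.value
	}
	return result
}

func main() {
	words := strings.Fields("to be or not to be")
	pairs := combine(words)
	fmt.Println(len(words), "words collapsed into", len(pairs), "pairs")
	fmt.Println("to =", reduce(pairs)["to"])
}
```

The benefit is less traffic between mappers and reducers: a word that appears thousands of times in one file crosses the channel once instead of thousands of times.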

How it all works

Map workers are spun off for each file provided. Each one scans its document word by word and, for each word, emits a struct containing the word as the key and 1 as the value.

The intermediate values are collected over a channel in a Reduce worker which is keeping track of the counts from the different map workers.

In the end, we pass this final map of word counts back to the main goroutine, which writes out the result.


Final results

Running the code produces output in the following format.

abandon - 3
abandoned, - 1
abandoned - 10
abandoned. - 2
abandoning - 2
abandons - 1
a-barking - 1
abash - 1
...

To test this, we can compare our program's output with that of the Unix wc program. First, however, we need the sum of all the counts. We can get it by using awk to pull the third column from the above output, joining the values with + to form an addition expression, and passing that expression to bc for evaluation.

There is a slight problem: the output ends with a trailing newline, so the tr command leaves an extra + at the end, which makes bc complain about a syntax error. Appending a 0 to the expression fixes the addition.

$ echo $(cat output.txt | awk '{ print $3}' | tr '\n' '+')0 | bc -ql
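
For comparison, the same total could also be computed in Go. The sketch below uses a hypothetical sumCounts helper that takes the file contents as a string and adds up the last field of each line, which is the count in lines of the form "word - count"; in practice the string would come from reading output.txt.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// sumCounts adds up the last whitespace-separated field of every
// non-blank line, which is the count in "word - count" lines.
func sumCounts(text string) (int, error) {
	total := 0
	for _, line := range strings.Split(text, "\n") {
		fields := strings.Fields(line)
		if len(fields) == 0 {
			continue // skip blank lines, including the trailing one
		}
		n, err := strconv.Atoi(fields[len(fields)-1])
		if err != nil {
			return 0, fmt.Errorf("bad count in line %q: %w", line, err)
		}
		total += n
	}
	return total, nil
}

func main() {
	sample := "abandon - 3\nabandoned, - 1\nabandoned - 10\n"
	total, err := sumCounts(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(total) // prints 14 for the sample above
}
```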

We can compare this with the total words output from wc.

$ wc -w data/*.txt --total=only

Looks like our modelling is correct and our simple implementation works.

~/c/m/g/mr main* % ./test-mr-wc.sh
Program output
608645
Output from wc
608645