About the Author

Michael Kopp is a Technical Product Manager at Compuware. Reach him at @mikopp

About the Performance of Map Reduce Jobs

One of the big topics in the BigData community is Map/Reduce. There are a lot of good blogs that explain what Map/Reduce does and how it works logically, so I won't repeat that (look here, here and here for a few). Very few of them, however, explain the technical flow of things, which I at least need in order to understand the performance implications. You can always throw more hardware at a Map/Reduce job to improve the overall time. I don't like that as a general solution, and many Map/Reduce programs can be optimized quite easily if you know what to look for. And optimizing a large Map/Reduce job can be instantly translated into ROI!

The Word Count Example

I went over some blogs and tutorials about the performance of Map/Reduce. Here is one that I liked. While there are a lot of good tips out there, none, except the one mentioned, talk about the Map/Reduce program itself. Most dive right into the various Hadoop options to improve distribution and utilization. While this is important, I think we should start with the actual problem we are trying to solve, and that means the Map/Reduce job itself.

To make things simple I am using Amazon's Elastic Map Reduce. In my setup I started a new Job Flow with multiple steps for every execution. The Job Flow consisted of one master node and two task nodes, all of them using the Small Standard instance.
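
For reference, this kind of Job Flow can also be scripted instead of being clicked together in the AWS console. Below is a rough sketch based on the boto library's EMR tutorial of that era; the bucket names are placeholders and the exact parameter names may differ slightly between boto versions:

#!/usr/bin/python
# Rough sketch of starting a word count Job Flow with boto (placeholder bucket names).
# num_instances=3 corresponds to my setup: one master plus two nodes, all m1.small
# ("Small Standard" instances).
import boto
from boto.emr.step import StreamingStep

step = StreamingStep(name='Word count step',
                     mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
                     reducer='aggregate',
                     input='s3n://elasticmapreduce/samples/wordcount/input',
                     output='s3n://my-output-bucket/wordcount/output')

conn = boto.connect_emr()
jobid = conn.run_jobflow(name='Word count job flow',
                         log_uri='s3://my-log-bucket/jobflow_logs',
                         master_instance_type='m1.small',
                         slave_instance_type='m1.small',
                         num_instances=3,
                         steps=[step])
print jobid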

While AWS Elastic Map/Reduce has its drawbacks in terms of startup and file latency (Amazon S3 has a high volatility), it is a very easy and consistent way to execute Map/Reduce jobs without needing to set up your own Hadoop cluster. And you only pay for what you need! I started out with the word count example that you see in every Map/Reduce documentation, tutorial or blog. The result of the job always produces files that look something like this:

the: 5967
all: 611
a: 21586

The idea is to count the occurrences of every word in a large number of text files. I processed around 30 files totaling somewhere around 200MB in size. I ran the original Python version and then made a very small change to it. Without touching the configuration of Hadoop I cut the execution time in half:

The Original Code:

#!/usr/bin/python
import sys
import re

def main(argv):
    line = sys.stdin.readline()
    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    try:
        while line:
            for word in pattern.findall(line):
                print "LongValueSum:" + word.lower() + "\t" + "1"
            line = sys.stdin.readline()
    except "end of file":
        return None

if __name__ == "__main__":
    main(sys.argv)

The Optimized Code:

#!/usr/bin/python
import sys
import re

def main(argv):
    line = sys.stdin.readline()
    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    map = dict()
    try:
        while line:
            for word in pattern.findall(line):
                map[word.lower()] = map.get(word.lower(), 0) + 1
                if ( len(map) > 10000 ):
                    for item in map.iteritems():
                        print "LongValueSum:" + item[0] + "\t" + str(item[1])
                    map.clear()
            line = sys.stdin.readline()
        for item in map.iteritems():
            print "LongValueSum:" + item[0] + "\t" + str(item[1])
    except "end of file":
        return None

if __name__ == "__main__":
    main(sys.argv)

Instead of "emitting" every word with value 1 to the OutputCollector, I did an internal reduce before emitting it. The result is that instead of emitting the word 'a' 1000 times with value 1, I emitted it one time with value 1000. The end result of the job is the same, but in half the time. To understand this we need to look at the execution flow of Map/Reduce.
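
To make that equivalence concrete, here is a small, purely hypothetical illustration (not part of the job itself): once the values per key are summed up, both emission styles produce the same totals, the optimized one just does it with a fraction of the records:

#!/usr/bin/python
# Hypothetical illustration: the pre-reduced output sums up to the same end result.
def sum_counts(lines):
    totals = {}
    for line in lines:
        key, value = line.split("\t")
        totals[key] = totals.get(key, 0) + int(value)
    return totals

original  = ["LongValueSum:a\t1"] * 1000   # 'a' emitted 1000 times with value 1
optimized = ["LongValueSum:a\t1000"]       # 'a' emitted once with value 1000

assert sum_counts(original) == sum_counts(optimized)   # same result, 1/1000th of the records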

Execution Path and Distribution

Look at the following Flow Diagram taken from the “Hadoop Tutorial from Yahoo!” (Yahoo! Inc.) / CC BY 3.0

Map Reduce Flow

Elastic Map Reduce first schedules a Map Task per file (or part of a file). It then feeds each line of the file into the map function. The map function emits each key/value pair, in this case each word of the line, to the OutputCollector. Each emitted key/value pair is written to an intermediate file for the later reduce. The Shuffle Process then makes sure that each key, in this case each word, is sent to the same reduce task (meaning Hadoop node) for aggregation. If we emit the same word multiple times, it also needs to be written and sent multiple times, which results in more I/O (disk and network). The logical conclusion is that we should "pre-reduce" the data on a per task node basis and send the minimal amount of data. This is what the Combiner is for, which is really a Reducer that is run locally on the same node after the mapping. So we should be fine, right? Not really.
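
To make the reduce/combine side more tangible, here is a minimal sketch of a streaming-style reducer; this is my own illustration of the general pattern, not Hadoop's actual aggregate reducer. After the shuffle, the input arrives sorted by key, so all counts for one word are adjacent and can simply be summed:

#!/usr/bin/python
# Minimal sketch of a streaming-style reducer. Assumes input lines of the form
# "key<TAB>count", sorted by key, as delivered after the shuffle phase.
import sys

def main():
    current_key = None
    total = 0
    for line in sys.stdin:
        key, value = line.rstrip("\n").split("\t", 1)
        if key != current_key:
            if current_key is not None:
                print current_key + "\t" + str(total)
            current_key = key
            total = 0
        total += int(value)
    if current_key is not None:
        print current_key + "\t" + str(total)

if __name__ == "__main__":
    main()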

Inside of Amazons Elastic Map Reduce

To get a better idea of where I spent the time, I deployed dynaTrace into Amazon's Map Reduce environment. This can be done in a fully automated fashion with a simple bootstrap action (I will publish the result as a FastPack on our community at a later time).

The original Python run lasted roughly 5 minutes (between 290 and 320 seconds), while the optimized one ran around 3 minutes (160-170 seconds). I used dynaTrace to split those run times into their different components to get a feel for where we spend the time. Some numbers have a rather high volatility which, as I found out, is due to Amazon S3 and, to a smaller degree, garbage collection. I executed the jobs several times and the volatility did not have a big impact on the overall job execution time.

Map Reduce Job Business Transaction that details where we spend our time

Click on the picture to analyze the details and you will see dramatic improvements in the mapping, combine and sort times. The Total Mapping time in this example is the overall execution time of all scheduled map tasks. The optimized code executed in less than 60% of the time. To a large degree this is due to the map function itself (Map Only Time), which is actually quite surprising; after all, we are not really doing any less work, are we?

The next thing we see is that the combine time has dropped dramatically; we could say it nearly vanished! That makes sense: after all, we made sure that we emitted fewer duplicates, so there is less to combine. In fact it might make sense to stop combining altogether, as we will see later on. Another item that has improved dramatically is the sort. Again that makes a lot of sense: less data to sort. While the majority of the combine and sort happens in a separate thread, it still saves a lot of CPU and I/O time!

On the other hand, neither the shuffle nor the reduce time itself has really changed. I identified the fluctuations that the table does show as AWS S3 volatility issues via a hotspot analysis, so I ignored them. The fact that we see no significant improvements here makes sense: the resulting intermediate files of each map task do not look much different, whether we combine or use the optimized code.

So it really was the optimization of the map operation itself that led to the overall improvement in job run time. While I might have achieved the same goal by doubling the number of map nodes, it would have cost me more to do so.

What happens during mapping

To understand why that simple change has such a large impact we need to look at what happens to emitted keys in a Map Job.

Flow of the data from the mapper to the memory buffer, sort & combine and finally the merge

What most Map/Reduce tutorials forget to mention is that the collect method called by the Mapper serializes the key/value directly to an in-memory buffer, as can be seen in the diagram above and the hotspot below.

When the Mapper emits a key via collect, it gets written to an in-memory buffer

Once that buffer reaches a certain saturation, the Spill Thread kicks in and writes the data to an intermediate file (this is controlled by several io.sort.* options, such as io.sort.mb and io.sort.spill.percent). Map/Reduce normally deals with a large amount of potentially never repeating data, so it has to spill to file eventually.

The Spill Thread sorts, combines and writes the data to file in parallel to the mapping

It is not enough to simply dump the data to file; the content has to be sorted and combined first. The sort is a preparation for the shuffle process and relatively efficient (it sorts based on the binary bytes, because the actual order is not important). The combine, however, needs to de-serialize the keys and values again prior to writing.

The combine in the spill thread needs to deserialize the data again

So emitting a key multiple times has:

  1. a direct negative impact on the map time and CPU usage, due to more serialization,
  2. an indirect negative impact on CPU, due to more spilling and the additional de-serialization in the combine step, and
  3. a direct impact on the map task, due to more intermediate files, which make the final merge more expensive.

Slower mapping obviously impacts the overall job time directly. The more data we emit, the more CPU and I/O is consumed by the Spill Thread. If the Spill Thread is too slow (e.g. an expensive combine, a slow disk), the in-memory buffer might get fully saturated, in which case the map task has to wait (this can be improved by adjusting the io.sort.spill.percent Hadoop option).
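
For completeness, these knobs live in the Hadoop job or cluster configuration. The following is a purely illustrative snippet using the Hadoop 0.20/1.x property names mentioned above (io.sort.mb sizes the in-memory buffer, default 100 MB; io.sort.spill.percent is the saturation at which the Spill Thread starts, default 0.80); the values are examples, not recommendations, and later Hadoop versions renamed these options:

<!-- Illustrative values only, not tuning advice -->
<property>
  <name>io.sort.mb</name>
  <value>200</value>
</property>
<property>
  <name>io.sort.spill.percent</name>
  <value>0.90</value>
</property>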

The Mapper was paused by the Spill Thread, because there was too much data to sort and combine

After the Map Task finishes the actual mapping, it writes, sorts and combines the remaining data to file. Finally, it merges all intermediate files into a single output file (which it might combine again). More emitted keys thus also mean more intermediate files to merge.

The complete Map Task shows the mapping itself and the flush at the end, which sorts, combines, merges and combines again

While the final flush only "slows" us down for 1.5 seconds, this still amounts to roughly 8 percent of the map task. So we see it really does make a lot of sense to optimize the output of the map operation prior to the combine or reduce step. It saves CPU, disk and network I/O, and that of course means fewer nodes are needed for the same work!

The one million X factor

Until now I have tried to explain the I/O and CPU implications of emitting many keys, but there is another factor that should be considered when writing Map/Reduce jobs: the map function is executed potentially millions of times. Every millisecond consumed there can potentially add minutes to the job time. Indeed, most of the gain of my "optimization" came from speeding up the mapping itself and not from more effective combines and disk writes. On average each map method call had a little less to do, and that paid off.
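
As a purely hypothetical illustration of how such per-call savings add up (this is not a version I measured), the inner loop of the optimized mapper can be tightened a bit further by lower-casing each word only once and caching the dictionary lookup:

#!/usr/bin/python
# Hypothetical tightening of the mapper's inner loop; the 10000-entry flush from
# the optimized version above is omitted for brevity.
import sys
import re

def main():
    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    counts = {}
    get = counts.get                 # cache the bound method, saves an attribute lookup per word
    for line in sys.stdin:
        for word in pattern.findall(line):
            w = word.lower()         # lower-case once instead of twice
            counts[w] = get(w, 0) + 1
    for word, count in counts.iteritems():
        print "LongValueSum:" + word + "\t" + str(count)

if __name__ == "__main__":
    main()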

What struck me when first looking at Map/Reduce was that most samples and tutorials use scripting languages like Python, Perl or something else. This is realized via the Hadoop Streaming framework. While I understand that this lowers the barrier to writing Map/Reduce jobs, it should not be used for serious tasks! To illustrate this I ran a randomly selected Java version of the word count sample. The result is another 50-60% improvement on top of the optimized Python (it might be even better in a larger task).

Several Java and Python Word Count Map Reduce Jobs, that show the vast differences in execution times

The table shows the various execution times for:

  • Optimized Java: ~1.5 minutes job execution time
    The same trick as in Python; if anybody really wants to have the code, let me know.
  • Optimized Java with no Combiner: roughly 10 seconds faster than the optimized one
    As pointed out, the pre-reduce in the map method makes the combine nearly redundant. The improvement in overall job time is minimal, however, because the job is small.
  • Original Java: ~2.5 minutes
    We see that all the times (mapping, combining, sorting, spilling) are a lot higher, as we have come to expect.
  • Optimized Python: ~3 minutes
  • Non-optimized Python: ~5 minutes

Java is faster than Python every time, and the optimized Java version is twice as fast as the optimized Python version. Remember that this is a small example and that the Hadoop parameters are the same for all runs. In addition, CPU was never a limiting factor. If you execute the same small piece of code millions of times, even small differences matter. The difference between a single line mapped in Java and in Python is maybe not even measurable, but with 200 MB of text it adds up to more than a minute! The same would be true for small changes in any Java Map/Reduce job. The difference between the original and the optimized Java version is still more than a 60% improvement!

Word Count is very simple compared to some of the Map/Reduce jobs that are out there, but it illustrates quite nicely that the performance of our own code still matters. The key takeaway is that we still need to analyze and optimize the map task and our own code. Only after that is satisfactory do we need to play around with the various Hadoop options to improve distribution and utilization.

Conclusion

Map Reduce is a very powerful and elegant way to distribute the processing of large amounts of data across many hosts. It is also a bit of a brute, and it pays off to analyze and optimize the performance of the map and reduce tasks before we start playing with Hadoop options. While Map/Reduce can reduce the job time by throwing more hardware at the problem, easy optimizations often achieve a similar effect. In the cloud and AWS Elastic Map Reduce that means less cost!

Comments

  1. @Janno

No, I haven't. But then I wasn't trying to show that Java is faster than Python, but that small changes in the map function can have a huge impact on the overall job time.

  2. thiago vieira says:

    Great text.
    Is the optimized Java version available? Where can I get it?

  3. Scott Giles says:

    This is a great optimization… until the Java Map (as opposed to a Python dict) takes up more space than the JVM has available and it has to start swapping. Did you try throwing a real dictionary at this job? Or even worse, an English/foreign language dictionary?

  4. @thiago You can have it if you want, send me an email: michael.kopp@dynatrace.com

    @Scott I ran this against documents using 100K random words from a German and an English dictionary. That is not really realistic, as both English and German have words that repeat very often, like 'a', 'and', 'to' and so on. I also ran it against real texts, which fare a lot better than the randomly generated ones!
    10K words is really not much for a modern machine anyway. But the point is not to make word count faster. It's that a Map/Reduce job runs the same thing millions of times and is not meant for fine-grained data. That is what we can optimize for.

