Hadoop rant

Hadoop’s rise to fame is based on a fundamental optimization principle in computer science: data locality. Translated into Hadoop speak, that becomes: move computation to data, not the other way around.

In this post I will rant about one core Hadoop area where this principle is broken (or at least not implemented yet). Before that, though, I will outline the submission process of a MapReduce job that processes data residing in HDFS:

On the client:
1. gather all the correct confs, user input etc ...
2. contact NameNode to get a list of files that need to be processed
3. generate a list of splits to run Map tasks on, by:
3.1 for each file returned in (2):
3.2     ask NameNode for a list of the file's blocks and their locations
3.3     compute splits based on config settings etc ...
4. send all the split info and some metadata to the JobTracker
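
To make the cost of steps 2 and 3 concrete, here is a minimal Python sketch of the client-side flow above. Everything in it is a toy: `ToyNameNode`, `list_files`, `get_block_locations` and `make_files` are hypothetical names invented for illustration, not the real Hadoop API, and the split policy is assumed to be the simplest possible one (one split per block). The only thing it measures is how many simulated round trips to the NameNode the client makes.

```python
from dataclasses import dataclass


@dataclass
class Block:
    offset: int   # byte offset of the block within its file
    length: int   # block length in bytes
    hosts: tuple  # DataNodes holding a replica


class ToyNameNode:
    """In-memory stand-in for the NameNode that counts simulated RPCs."""

    def __init__(self, files):
        self.files = files      # path -> list of Block
        self.rpc_calls = 0

    def list_files(self, input_dir):
        self.rpc_calls += 1     # step 2: one round trip
        return sorted(p for p in self.files if p.startswith(input_dir))

    def get_block_locations(self, path):
        self.rpc_calls += 1     # step 3.2: one round trip per file
        return self.files[path]


def client_side_splits(nn, input_dir):
    """Steps 2 and 3 as run on the client (assumed: one split per block)."""
    splits = []
    for path in nn.list_files(input_dir):
        for block in nn.get_block_locations(path):
            splits.append((path, block.offset, block.length, block.hosts))
    return splits


def make_files(n_files, blocks_per_file, block_size=64 * 1024 * 1024):
    """Build a fake namespace of equally sized files under /in/."""
    return {
        f"/in/part-{i:05d}": [
            Block(b * block_size, block_size, ("dn1", "dn2", "dn3"))
            for b in range(blocks_per_file)
        ]
        for i in range(n_files)
    }


small = ToyNameNode(make_files(n_files=1000, blocks_per_file=1))
large = ToyNameNode(make_files(n_files=10, blocks_per_file=100))
small_splits = client_side_splits(small, "/in/")
large_splits = client_side_splits(large, "/in/")

# Same number of splits, very different number of NameNode round trips.
print(len(small_splits), small.rpc_calls)
print(len(large_splits), large.rpc_calls)
```

In this toy setup both jobs end up with 1000 splits, but the many-small-files job needs 1001 round trips while the few-large-files job needs 11.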

Let’s repeat the principle: move computation to data, not the other way around.

My problem with the pseudocode above revolves around steps 2 and 3. If we think of file metadata as just another kind of data, then the NameNode is where the data for steps (2) and (3) resides. So why is this data being moved to the client?! The same split logic could be executed much more efficiently in the NameNode, where the data is local, without the high cost of RPC calls to get the block locations. This is also the root cause of the slow startup of MR jobs that process a lot of small files, compared to MR jobs with the same number of splits but much larger files.

Yes, someone could argue that the reason for not performing the computation in the NN is to keep the load on the NN minimal, so that it can serve the DNs in the cluster as well as other clients. Well, imagine a job processing N files (say 10K). Is it computationally cheaper for the NN to serve N block location requests or a single request to compute the splits? I’ll speculate the latter.
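
One way to put that speculation into numbers is a back-of-the-envelope cost model. The sketch below assumes every request costs the NN a fixed overhead (deserialization, handler thread, locking) plus some per-file lookup work. Both figures are made-up placeholders, not measurements, and the function names are hypothetical.

```python
def nn_cost_per_file_requests(n_files, overhead_us, lookup_us):
    """NN work when the client sends one block-location request per file."""
    return n_files * (overhead_us + lookup_us)


def nn_cost_single_request(n_files, overhead_us, lookup_us):
    """NN work when one request asks it to compute all the splits."""
    return overhead_us + n_files * lookup_us


N = 10_000          # files in the job, as in the text
OVERHEAD_US = 50    # assumed fixed cost per request, in microseconds
LOOKUP_US = 5       # assumed metadata lookup cost per file, in microseconds

many = nn_cost_per_file_requests(N, OVERHEAD_US, LOOKUP_US)
one = nn_cost_single_request(N, OVERHEAD_US, LOOKUP_US)
saved = many - one  # always (N - 1) * OVERHEAD_US under this model
print(many, one, saved)
```

Under this model the lookup work is identical either way, so the single request is cheaper by exactly (N - 1) times the per-request overhead. Whether that holds on a real NameNode depends on costs this model ignores, such as how long the namespace lock is held by one large request.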

Some better job submission models would be:

Model 1

1. gather all the correct confs, user input etc ...
2. contact NameNode and ask it to generate a list of splits and store them locally (on the NameNode) under some id
3. submit job to JobTracker and ask it to fetch the list of splits from the NameNode under the given id

This would both (a) achieve data locality when computing the splits and (b) minimize network traffic between NN -> Client and Client -> JT.
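
Model 1 could look roughly like the sketch below. Note that `compute_splits` and `fetch_splits` are hypothetical operations that do not exist in the NameNode today, the classes are in-memory toys, and the split policy is again assumed to be one split per block. The point is only that the whole exchange costs two round trips no matter how many files the job reads.

```python
import itertools


class SplitServingNameNode:
    """Toy NameNode that computes splits next to the metadata (hypothetical)."""

    def __init__(self, files):
        self.files = files            # path -> list of (offset, length, hosts)
        self.rpc_calls = 0
        self._stored = {}             # split id -> list of splits
        self._ids = itertools.count(1)

    def compute_splits(self, input_dir):
        """Step 2: build the splits locally and return only a small id."""
        self.rpc_calls += 1
        splits = [
            (path, offset, length, hosts)
            for path in sorted(self.files)
            if path.startswith(input_dir)
            for (offset, length, hosts) in self.files[path]
        ]
        split_id = f"splits-{next(self._ids)}"
        self._stored[split_id] = splits
        return split_id

    def fetch_splits(self, split_id):
        """Step 3: hand the stored splits to the JobTracker, then forget them."""
        self.rpc_calls += 1
        return self._stored.pop(split_id)


class ToyJobTracker:
    def __init__(self, nn):
        self.nn = nn

    def submit(self, job_conf, split_id):
        splits = self.nn.fetch_splits(split_id)
        return len(splits)            # number of map tasks to schedule


files = {f"/in/part-{i:05d}": [(0, 64, ("dn1",))] for i in range(10_000)}
nn = SplitServingNameNode(files)
split_id = nn.compute_splits("/in/")             # client -> NN
map_tasks = ToyJobTracker(nn).submit({}, split_id)  # client -> JT -> NN
print(map_tasks, nn.rpc_calls)
```

The client only ever sees the id, so the split list itself travels over the network once, from the NN to the JT.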

Model 2

1. gather all the correct confs, user input etc ...
2. submit job to JobTracker and let it:
2.1 figure out the splits by asking the NameNode as it's done today, *but*
2.2 start scheduling the Map tasks in a streaming fashion as it computes the splits

This would (a) reduce the time it takes to start an MR job by starting to execute tasks while the splits are still being computed, (b) smooth out the load on the NameNode, as the JT would not need to know all the splits before it can load up the cluster, and most importantly (c) allow extremely large jobs (millions of map tasks) to be executed.
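
The streaming idea in Model 2 maps naturally onto a lazy iterator. In the sketch below, `stream_splits` and `schedule_streaming` are hypothetical names, the "NameNode" is just a dict, and "launching" a task only appends to a log. It is meant to show the interleaving of lookups and launches, not a real scheduler.

```python
def stream_splits(files, log):
    """Yield splits lazily, with one simulated NameNode lookup per file."""
    for path in sorted(files):
        log.append(("lookup", path))       # step 2.1: ask for this file's blocks
        for offset, length in files[path]:
            yield (path, offset, length)


def schedule_streaming(splits, log):
    """Launch a map task for each split as soon as it arrives (step 2.2)."""
    launched = 0
    for path, offset, length in splits:
        log.append(("launch", path, offset))
        launched += 1
    return launched


files = {
    "/in/a": [(0, 64), (64, 64)],
    "/in/b": [(0, 64)],
}
log = []
launched = schedule_streaming(stream_splits(files, log), log)

for event in log:
    print(event)
```

The log shows tasks for `/in/a` being launched before the lookup for `/in/b` has even been issued, and at no point does the scheduler hold the full list of splits in memory.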

A thought experiment

I’m certain that you’ve heard from different Hadoop vendors that their versions can scale to peta/exa/zetta/yotta/whatever bytes. What they refer to is usually the storage side of things. However, what good is having data when you can’t efficiently process it? Here’s a thought experiment: imagine running a MapReduce job that processes “just” 10 petabytes. Even with a generous 1GB block size in HDFS, that would mean 10 million splits and the same number of map tasks. Anyone out there brave/willing to take this experiment beyond just a thought experiment? If you succeed, can you do the same for 10x or 100x that data size?
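
The arithmetic behind those numbers is easy to check. The snippet below assumes decimal units (1 PB = 10^15 bytes, 1 GB = 10^9 bytes) and one split per block. `num_splits` is just a helper name made up for this example.

```python
PB = 10**15  # decimal petabyte, in bytes
GB = 10**9   # decimal gigabyte, in bytes


def num_splits(data_bytes, block_bytes):
    """Number of blocks (and so splits) needed, rounding up."""
    return -(-data_bytes // block_bytes)


for scale in (1, 10, 100):
    total = 10 * scale * PB
    print(f"{10 * scale} PB -> {num_splits(total, GB):,} splits")
```

So the 10x and 100x versions of the experiment mean 100 million and 1 billion map tasks respectively.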

Ledion Bitincka
