On a past Friday whose date I refuse to remember, I hit a wall in my Hadoop experiments. A Reducer was dying out of memory, and no matter how many of its peers came to the rescue, the error was consistent.
- Nooo, I refuse to work on such large data. What do you think I am, a donkey?!? - my reducers were saying.
Obvious things first: I went for the two courses of action that most people who do not understand a software problem go for.
- Here, have more reducers to help you out. -D mapreduce.job.reduces=600
Little did I realise that, since all the data for one key still ends up in a single reduce call, extra peers help little.
- Here, have more memory -
As a manager would say: if more developers won't cut it, I still have to throw more money at the problem so that it goes away. I have always had little empathy with such kinds, but really it's not their fault if they do not understand the problem and just want it to go away. Their decisions, in most cases, are still rational and full of good will, but they come from a completely different angle.
By the time I started throwing memory at the problem with the almighty -D mapreduce.reduce.memory.mb=14336 and -D mapreduce.reduce.java.opts=-Xmx14g, I knew that even if that worked, I was only buying time. This was still the smallest of my production datasets, so a quick fix would not cut it for the larger ones.
So then I took my weekend away from the computer. I did the usual shopping and other chores, but already on Friday evening the solution was brewing at the back of my mind.
1 Problem description
What the reducer was supposed to do was merge the collections in the state of a bunch of objects into an object of the same type, but with all the data. One of those collections was in the form of a string which had to be concatenated. Further, since this is Hadoop, the containing object had to be Writable and capable of flying through the whole infrastructure for subsequent processing. That meant the data had to be encoded to occupy as little space as possible, and the representation had to be unique, that is, the same string had to have, unequivocally, the same representation.
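To make the merge concrete, here is a minimal sketch of the kind of object I mean. The class and method names are my illustration, not the actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the reducer's job: fold many partial states
// into one object of the same type that holds all the data.
public class PartialState {
    private final StringBuilder text = new StringBuilder(); // string state, concatenated
    private final List<Long> numbers = new ArrayList<>();   // other collection state

    public void append(String s)   { text.append(s); }
    public void addNumber(long n)  { numbers.add(n); }

    // Merge another partial state into this one: concatenate the string,
    // union the other collections.
    public void merge(PartialState other) {
        text.append(other.text);
        numbers.addAll(other.numbers);
    }

    public String getText()        { return text.toString(); }
    public List<Long> getNumbers() { return numbers; }
}
```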
2 Ideal theoretical solution
I started looking at hashes, but my problem had another constraint: the string representations have to be comparable, and the comparator needs to order the strings in a particular way that I want. A non-invertible hash function, which most are, would at least have to be monotonic; good luck with that. A collision-free hash would have done the trick for the first criterion, but then I needed the second. Now, if there are any mathematicians out there with time who would like to design such a function, I will gladly speak with you. Such a function, or class of functions, would make my life much easier.
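Stated precisely, the wished-for function would need two properties; this formalisation is mine, not anything from the code base:

```latex
% Let S be the set of strings, \prec the desired string ordering,
% and h the hash mapping strings to numbers.

% Collision-free (injective):
\forall x, y \in S : \quad x \neq y \;\Rightarrow\; h(x) \neq h(y)

% Order-preserving (monotonic with respect to \prec):
\forall x, y \in S : \quad x \prec y \;\Rightarrow\; h(x) < h(y)
```

Note that the second property implies the first (a strictly monotonic function cannot map two distinct inputs to the same value), so order preservation is really the hard part.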
In the absence of such a nice thing, which may well exist without my knowing about it, I had to apply another strategy.
3 Brute force and Ignorance
The choice for data representation was clear: it had to be a lossless compression. Nice, java.util.zip is full of goodies to do just that.
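As a quick sanity check of the lossless claim, a minimal round-trip with java.util.zip's Deflater and Inflater (the helper names are mine) looks like this:

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Minimal lossless round-trip with java.util.zip: deflate some bytes,
// inflate them again, and get back exactly what we started with.
public class ZipRoundTrip {

    public static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();                      // no more input is coming
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    public static byte[] decompress(byte[] input) throws DataFormatException {
        Inflater inflater = new Inflater();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!inflater.finished()) {
            out.write(buf, 0, inflater.inflate(buf));
        }
        inflater.end();
        return out.toByteArray();
    }
}
```

Repetitive input, like the strings in question, shrinks dramatically; the decompressed bytes are identical to the originals.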
Since memory was not big enough, I went for disk. The idea was to coalesce the state of the objects into files: concatenating the state in a file, then reading the compressed version of the file into memory. I am lucky in that the state of the object, especially the string, compresses well since it has a lot of repetition.
Now, there may be a need to coalesce coalesced objects; that is, my merging of state can be done in stages. To solve that problem I had to be careful, because most compression mechanisms need a dictionary: they compress what they have seen, not something that may potentially arrive later. I could potentially have used something like a Bloom filter, but if my strings are not preserved verbatim I need to re-think some of the theory of what I do with them, and that may take yet another PhD.
4 Rules of engagement
State is stored in objects uncompressed at first. When I need to merge, I dump that state to a file; UUIDs make for nice file names. When I am done with merging, I read the contents of the file through a DeflaterInputStream, which hands me the compressed representation of the file as bytes. To make things transport-ready I BASE64-encode it and store it in my object state, making sure I remember it is compressed. When I next have to merge the state of objects which are compressed, I write the state back to file through an InflaterInputStream, which writes the uncompressed version of the data to the file. I can then append all the data I want to that file, which will then be slurped back into memory and compressed in the process.
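The cycle above can be sketched roughly as follows. The class and method names are my own illustration of the idea, not the actual code base:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Base64;
import java.util.zip.DeflaterInputStream;
import java.util.zip.InflaterInputStream;

// Sketch of the merge cycle: state sits uncompressed in a file; reading it
// through a DeflaterInputStream yields the compressed bytes, which are
// Base64-encoded for transport; the reverse path inflates the stored bytes
// back out to a file, ready for appending more state.
public class CompressedStateStore {

    // Read the uncompressed file, receiving its deflated bytes, then Base64-encode.
    public static String fileToCompressedBase64(Path file) throws IOException {
        try (InputStream in = new DeflaterInputStream(Files.newInputStream(file))) {
            return Base64.getEncoder().encodeToString(in.readAllBytes());
        }
    }

    // Decode and inflate the stored state back into a file for the next merge.
    public static void compressedBase64ToFile(String b64, Path file) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(b64);
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            Files.copy(in, file, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

The nice property is that compression happens while streaming, so the uncompressed data never has to sit whole in memory.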
Since I have several types in play, what I described above is what happens to strings. For the longs and ints that I also use I do the same, except I write them to a DataOutputStream, so that I do not have to handle their string representation with toString(), valueOf() and field delimiters explicitly.
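For the primitives, a minimal sketch of that idea (names are mine) could look like:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Write primitives in their binary form with DataOutputStream, so there is
// no toString()/valueOf() round-trip and no field delimiters to manage;
// read them back with DataInputStream in the same order they were written.
public class PrimitiveState {

    public static byte[] write(long[] values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(values.length);        // how many longs follow
            for (long v : values) out.writeLong(v);
        }
        return bytes.toByteArray();
    }

    public static long[] read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long[] values = new long[in.readInt()];
            for (int i = 0; i < values.length; i++) values[i] = in.readLong();
            return values;
        }
    }
}
```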
Of course, you then have to use the data, preferably without it blowing up your memory. For the string, a buffered-reader way of accessing it will appear in the code base shortly; it will come into play in later parts of the processing. For the ints and longs, a DataInputStream makes them available to you, so you can consume them, and as long as you do not hold on to too many of them in memory you are good.
To read the compressed versions you use the read function. To read the usable, non-compressed versions, you use the readData function, which hands you a DataInputStream.
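Consuming that DataInputStream then looks something like this illustrative aggregator, which holds only one value in memory at a time:

```java
import java.io.DataInputStream;
import java.io.IOException;

// Illustrative consumer of the primitives: pull longs one at a time from
// the DataInputStream and aggregate on the fly, never materialising the
// whole collection in memory.
public class StreamingSum {
    public static long sum(DataInputStream in, int count) throws IOException {
        long total = 0;
        for (int i = 0; i < count; i++) {
            total += in.readLong();   // one value resident at a time
        }
        return total;
    }
}
```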
5 Here is one I made earlier
There is no need for you to program all that: you can view, review, inspect, poke, use, abuse and extend an implementation of this over at GitHub. It ships with a test class, for my own sanity and so that usage is documented. And it is all blended with an old-school Makefile, just because.
Enjoy and let me know in the comments if you run into any problems or have a better solution.