Writing writables is fun when you do it wrong

Your own writables

How making your own writables is the best thing you can do

Hadoop is a very nice tool that has, over the last few years, become the workhorse of the maturing data science industry. It does a lot of things for you, mostly shuffling your data to the computer where it is needed for computation. To do this it has a transport mechanism that carries messages across nodes. Each message has two parts: a routing address, which states where it should end up, and a payload, which is what you are transporting. This is as old as the world, and it also applies to Hadoop.

I will focus in this post on the payload, as the writables are effectively the entry point to make your application more reliable and maintainable.

The basic Writable interface consists of two very simple methods: a write, in which it is the developer's responsibility to write all the fields that need to travel to a DataOutput object, and a readFields, in which all fields should be read back into the object state. The nice thing about the DataInput and DataOutput interfaces is that they provide a read and a write method for each primitive type and for Strings, freeing you from most of the hassle of casting and converting the bytes you read into something usable.
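To make the two methods concrete, here is a minimal sketch. So that it runs without Hadoop on the classpath, it declares a local SimpleWritable interface that mirrors the two methods of org.apache.hadoop.io.Writable; that stand-in, the Visit class and its fields are all assumptions made for illustration, not code from the repository.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-in for org.apache.hadoop.io.Writable (same two methods), declared
// locally so the sketch runs without Hadoop on the classpath.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical payload: a user name, a timestamp and a hit count.
class Visit implements SimpleWritable {
    String user = "";
    long timestamp;
    int hits;

    @Override
    public void write(DataOutput out) throws IOException {
        // One typed call per field, no manual byte juggling.
        out.writeUTF(user);
        out.writeLong(timestamp);
        out.writeInt(hits);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields back in exactly the order they were written.
        user = in.readUTF();
        timestamp = in.readLong();
        hits = in.readInt();
    }

    // Serialises a Visit to bytes and reads it back into a fresh object,
    // roughly what happens when the payload travels between nodes.
    static Visit roundTrip(Visit original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        Visit copy = new Visit();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }
}
```

The only contract between the two methods is the field order: whatever write emits, readFields has to consume in the same sequence and with the same types.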

There are, however, a few idiosyncrasies with it. At first sight, the fact that you can expect an end-of-file exception (EOFException) when the input has been consumed sounds like a good idea. You just read your fields in a while(true) loop and you are a happy bunny. Until you make part of your state a collection of writables. Then you are in a pickle forever. Why? Very simple: the contained type will catch the end-of-file exception, react to it as expected, and not forward it upstream since, in all strict justice, the exception has been consumed. So now you have to change the behaviour of a writable depending on its location in the encapsulation. Is it the encapsulated one, or does it encapsulate? If it is encapsulated it has to forward the exception up the call chain; if it encapsulates it just consumes it. This is a textbook example of why it's a bad idea to rely on exceptions for program flow.

When storing collections in a DataOutputStream, the simplest and most straightforward way is to prepend the size of the collection to its elements. That way you do not have to loop forever when loading your data.
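A sketch of the size-prefix approach, under the same assumption as before that a local SimpleWritable interface stands in for Hadoop's Writable. Tag, TagList and the owner field are hypothetical names; the field after the collection is only there to show that the reader stops at the right byte without ever touching an EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.hadoop.io.Writable, declared locally (assumption).
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical element type held in the collection.
class Tag implements SimpleWritable {
    String label = "";

    Tag() { }

    Tag(String label) { this.label = label; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(label);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        label = in.readUTF();
    }
}

// Hypothetical container: a collection of writables followed by another field.
class TagList implements SimpleWritable {
    final List<Tag> tags = new ArrayList<>();
    long owner;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(tags.size());      // size first ...
        for (Tag tag : tags) {
            tag.write(out);             // ... then exactly that many elements
        }
        out.writeLong(owner);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        tags.clear();                   // the same instance may be refilled several times
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            Tag tag = new Tag();
            tag.readFields(in);
            tags.add(tag);
        }
        owner = in.readLong();          // the field after the collection is still reachable
    }

    // Serialises a TagList to bytes and reads it back into a fresh object.
    static TagList roundTrip(TagList original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        TagList copy = new TagList();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }
}
```

With the count written up front, a contained writable behaves the same whether it sits at the top level or inside another writable.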

As usual the code is over at GitHub; this time I decided to use Maven as the build mechanism. Once cloned, a simple mvn test will execute all the unit tests associated with the code. I hope you learn from this valuable code, which shows clearly how not to do things, and that you consider doing them properly.

Author: Juan Amiguet

Created: 2015-03-23 Mon 06:55

Emacs 24.3.1 (Org mode 8.2.4)



Coalescing Tools


One past Friday, whose date I refuse to remember, I hit a wall in my Hadoop experiments. A Reducer was dying with out-of-memory errors, and no matter how many of its peers came to the rescue, the error was consistent.

  • Nooo, I refuse to work on such large data. What do you think I am, a donkey?!? - My reducers were saying.

Obvious things first: I went for the two courses of action that most people take when they do not understand a software problem.

  • Here, have more reducers to help you out. -D mapreduce.job.reduces=600

Little did I realise that if your reducer is stateless that helps little.

  • Here, have more memory -

As a manager would say, if more developers won't cut it I still have to throw more money at the problem so that it goes away. I have always had little empathy with such people, but really it's not their fault if they do not understand the problem and just want it to go away. Their decisions, in most cases, are still rational and full of good will, but they come from a completely different angle.

By the time I started throwing memory at the problem with the almighty -D mapreduce.reduce.memory.mb=14336 and -D mapreduce.reduce.java.opts=-Xmx14g, I knew that even if that worked, I was only buying time. This was still the smallest of my production datasets, so a quick fix would not cut it for the larger ones.

So then I took my week-end away from the computer. I did the usual shopping and other chores, but already on Friday evening the solution was brewing at the back of my mind.

1 Problem description

What the reducer was supposed to do was merge the collections held in the state of a bunch of objects into one object of the same type, but with all the data. One of those collections was in the form of a string, which had to be concatenated. Further, since this is Hadoop, the containing object had to be writable and capable of flying through the whole infrastructure for subsequent processing. That meant the data had to be encoded to occupy as little space as possible and the representation had to be unique; that is, the same string had to have, unequivocally, the same representation.

2 Ideal theoretical solution

I started looking at hashes, but I had another constraint in my problem. The string representations have to be comparable, and the comparator needs to order the strings in a given way that I want. If I were to use a non-invertible hash function, which most are, it would at least have to be monotonic. Good luck with that. A collision-free hash would have done the trick for the first criterion, but then I needed the second. Now, if there are any mathematicians out there with time who would like to design such a function, I will gladly speak with you. Such a function or class of functions would make my life much easier.
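A small illustration of why monotonicity is the sticking point. It uses String.hashCode() and the natural String ordering purely as stand-ins (my actual comparator and candidate hashes are different), and HashOrderDemo is a hypothetical name.

```java
class HashOrderDemo {
    // True when the hash codes of a and b sort the same way as the strings themselves.
    static boolean orderPreserved(String a, String b) {
        int byString = Integer.signum(a.compareTo(b));
        int byHash = Integer.signum(Integer.compare(a.hashCode(), b.hashCode()));
        return byString == byHash;
    }

    public static void main(String[] args) {
        System.out.println(orderPreserved("a", "b"));   // true:  97 < 98, and "a" < "b"
        System.out.println(orderPreserved("aa", "b"));  // false: "aa" < "b", but 3104 > 98
    }
}
```

An ordinary hash keeps equal strings equal, but it scrambles the order, so sorting by hash is not the same as sorting by string.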

In the absence of such a nice thing, which may well exist without my knowing about it, I had to apply another strategy.

3 Brute force and Ignorance

The choice of data representation was clear: it had to be lossless compression. Nice, java.util.zip is full of goodies to do just that.

Since memory was not big enough, I went for disk. The idea was to coalesce the state of the objects into files: concatenate the state in a file, then read the compressed version of the file into memory. I am lucky in that the state of the object, especially the string, compresses well since it has a lot of repetition.

Now, there may be a need to coalesce coalesced objects; that is, my merging of state can be done in stages. To solve that problem I needed to be careful, because most compression mechanisms need a dictionary: they compress what they have seen, not something that may potentially arrive later. I could potentially have used something like a Bloom filter, but if my strings are not preserved verbatim I need to rethink some of the theory of what I do with them, and that may take yet another PhD.

4 Rules of engagement

State is stored in objects uncompressed at first. When I need to merge, I dump that state to a file; UUIDs make for nice file names. When I am done with merging, I read the contents of the file through a DeflaterInputStream, which hands me the compressed representation of the file as bytes. To make things transport ready I Base64 encode it and store it in my object state, making sure I remember it is compressed. When I next have to merge the state of objects which are compressed, I write the state back to a file through an InflaterInputStream, which puts the uncompressed version of the data in the file. I can then append all the data I want to that file, which will then be slurped back into memory and compressed in the process.
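The cycle described above can be sketched as follows. This is not the code from the repository: Coalescer and its method names are hypothetical, temporary files stand in for wherever the scratch files really live, and Base64 comes from java.util.Base64, which assumes Java 8 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Base64;
import java.util.UUID;
import java.util.zip.DeflaterInputStream;
import java.util.zip.InflaterInputStream;

class Coalescer {
    // Copies everything from in to out through a small fixed-size buffer.
    static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
    }

    // Reads a file through a DeflaterInputStream and Base64 encodes the compressed bytes.
    static String compressFile(Path file) throws IOException {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (InputStream in = new DeflaterInputStream(Files.newInputStream(file))) {
            copy(in, compressed);
        }
        return Base64.getEncoder().encodeToString(compressed.toByteArray());
    }

    // Appends the uncompressed form of a Base64 encoded, deflated state to a file.
    static void inflateToFile(String encoded, Path file) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(encoded);
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(compressed));
             OutputStream out = Files.newOutputStream(file,
                     StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            copy(in, out);
        }
    }

    // Merges two compressed states: inflate both into one scratch file, then recompress it.
    static String merge(String left, String right) throws IOException {
        Path scratch = Files.createTempFile(UUID.randomUUID().toString(), ".state");
        try {
            inflateToFile(left, scratch);
            inflateToFile(right, scratch);
            return compressFile(scratch);
        } finally {
            Files.deleteIfExists(scratch);
        }
    }

    // Convenience for the example: compresses an in-memory string via a scratch file.
    static String compress(String plain) throws IOException {
        Path scratch = Files.createTempFile(UUID.randomUUID().toString(), ".state");
        try {
            Files.write(scratch, plain.getBytes(StandardCharsets.UTF_8));
            return compressFile(scratch);
        } finally {
            Files.deleteIfExists(scratch);
        }
    }

    // Convenience for the example: inflates a state fully into memory.
    static String inflate(String encoded) throws IOException {
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        byte[] compressed = Base64.getDecoder().decode(encoded);
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            copy(in, plain);
        }
        return new String(plain.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because the merged file is recompressed as a whole, the compressor sees all the data at once, which sidesteps the dictionary concern when coalescing objects that are already coalesced. The inflate helper is only for small examples; on real data it would defeat the point of going to disk.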

Since I play with several types, what I described above is what happens to strings. For the longs and ints that I also use I do the same, only I write them to a DataOutputStream so that I do not have to handle their string representation with toString(), valueOf() and field delimiters explicitly.

Of course you then have to use the data, preferably without blowing up your memory. For the string, a buffered-reader way of accessing it will appear in the code base shortly; it will come into play in later parts of the processing. For the ints and longs, a DataInputStream makes them available to you, so you can consume them, and as long as you do not hold on to too many of them in memory you are good.

To read the compressed versions you use the read function. To read the usable, non compressed versions, you use the readData function which hands over to you a DataInputStream.
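For the numeric case, a rough sketch of the idea. LongStore, pack and sum are hypothetical names, and the readData here only imitates the shape of the function mentioned above; it is not the repository's implementation. For brevity it compresses in memory with a DeflaterOutputStream instead of going through a file and a DeflaterInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

class LongStore {
    // Writes the values as binary longs through a deflater; no toString(), no delimiters.
    static byte[] pack(long... values) throws IOException {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new DeflaterOutputStream(compressed))) {
            for (long value : values) {
                out.writeLong(value);
            }
        }
        return compressed.toByteArray();
    }

    // Hypothetical counterpart of readData: a DataInputStream over the inflated bytes.
    static DataInputStream readData(byte[] compressed) {
        return new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(compressed)));
    }

    // Consumes the values one at a time, keeping only a running total in memory.
    static long sum(byte[] compressed, int count) throws IOException {
        long total = 0;
        try (DataInputStream in = readData(compressed)) {
            for (int i = 0; i < count; i++) {
                total += in.readLong();
            }
        }
        return total;
    }
}
```

The caller passes the number of values explicitly, in the same spirit as the size prefix for collections: the reader knows when to stop without waiting for an exception.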

5 Here is one I made earlier

There is no need for you to program all that: you can view, review, inspect, poke, use, abuse and extend an implementation of this over at GitHub. It ships with a test class, for my own sanity and so that usage is documented. And it is all blended with an old-school Makefile, just because.

Enjoy and let me know in the comments if you run into any problems or have a better solution.

Author: Juan Amiguet

Created: 2015-03-07 Sat 10:09
