App Engine, Meet MapReduce

Google’s AppEngine is a simple web framework that allows you to write web apps and run them on Google’s infrastructure, having them magically run fast on Google’s hardware. And while that’s all good and fine, what if you don’t want to write the kind of apps it forces you into? Although I do agree that, as Jeff Atwood puts it, most applications these days are “web applications” (that is, the app does something, but the interface to it is via the web), I’m talking about writing applications that exhibit very different behavior from your run-of-the-mill web app. What if I want to write compute-intensive applications and distribute the work around? AppEngine, meet MapReduce.

Let’s start off with a word of caution: there are two very good reasons AppEngine doesn’t let you write MapReduce programs:

  1. AppEngine is aimed at making traditional web application design simple, and since MapReduce does not fall into that realm, it is naturally left out.
  2. The original BigTable paper tells us that the Google team can use BigTable with MapReduce (page 3 of that paper), so by deploying AppEngine apps to Google, you may already be getting MapReduce under the hood on your app. Of course, since the Google implementation is proprietary / closed-source, there’s no way to know if they actually are doing it or not.

Then why mix AppEngine and MapReduce? I actually think it’s quite simple. If we consider the web framework as just a tool to generate a user interface around some other computation, then using AppEngine (the web framework) with MapReduce (the computation) is a natural enough fit, provided the hooks between the two are done in a relatively sane fashion.

With that said, let’s look at how to use AppEngine to do MapReduce tasks. The modified version of Google AppEngine can be found here, and if you’ve got Bazaar installed you can just run bzr branch lp:~cgb-cs/+junk/appengine-tasks-mr to get the latest version from Launchpad. You should also note that since this is the local SDK, you should definitely not expect high-performance results out of it; it’s just something simple to test and play around with. The version that we’ll be putting into AppScale will hook up to Hadoop’s MapReduce and run in a distributed environment, however, and we will talk about how that operates once it’s all set up. It also doesn’t work on Windows (although it may work if you have Cygwin installed, but no promises there), and that’s due to how running MapReduce jobs is done (more on that later).

Today we’ll be looking at the sample application we’ve bundled with this modified version of AppEngine, aptly named “mapreduce”. It is an implementation of one of the NAS Parallel Benchmarks, the aptly named Embarrassingly Parallel (EP) benchmark. As usual, Wikipedia has a good description of the benchmark:

Generate independent Gaussian random variates using the Marsaglia polar method.

Let’s boil it down for the programmer-layperson. They give you a pseudo-random number generator, and with it you generate a pair of numbers (floats) between zero and one. If they pass a simple test (x^2 + y^2 <= 1, where x and y are the pair of random numbers), you multiply them by a certain value and output them (they are now Gaussian random variates). Simple enough. The reason it’s a NAS Parallel Benchmark is the sheer quantity of numbers it requires you to generate (the “small” problem set requires 2^28 random numbers to be generated) and the fact that this specific algorithm is extremely parallelizable.
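If it helps, here’s a minimal sketch of one iteration in plain Python. Note that random.random() is just a stand-in for the benchmark’s own generator; we’ll meet the real, seeded sequence in the actual implementation below:

import math
import random

def polar_step():
  # Stand-in pair of uniform floats in [0, 1); the real benchmark
  # draws these from a specific seeded sequence instead.
  x = random.random()
  y = random.random()

  t = x * x + y * y
  if t > 1 or t == 0:
    return None  # pair fails the test; just move on to the next one

  # Accepted pairs are scaled into Gaussian random variates.
  factor = math.sqrt(-2 * math.log(t) / t)
  return (x * factor, y * factor)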

The implementation is not that difficult to write up, but if you haven’t programmed with MapReduce in mind, it can take a little getting used to. MapReduce is certainly not a new topic here, but the way it’s implemented in this SDK may make it a bit easier to understand if you don’t already know it. This is how we run MapReduce jobs under the hood in the non-distributed SDK, and I actually got it from this great Hadoop how-to:

cat inputFile | ./mapper | sort | ./reducer > outputFile

Of course, the distributed Hadoop version works quite differently than this, but this crude method works great for testing out your MapReduce code on smaller data sets. Basically, it works as follows. For each line of the input file, the Mapper performs some logic and outputs some number of lines of data in the form “key\tvalue”. The sort then puts together all the lines for the same key, and the Reducer reads all the values for each key and performs some computation on them, writing the results to the final output. You only write the code that goes in the “some logic” and “some computation” parts, and there you go! It’s not a whole lot in the serial version, but the parallel, distributed Hadoop version can achieve quite a speedup if you program in this fashion.
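To make that contract concrete, here’s a toy word-count pair (my own example; it has nothing to do with the benchmark) that drops straight into that pipeline. First, the “some logic” half:

#!/usr/bin/env python
# wc-mapper.py: emit "word\t1" for every word seen on stdin.
import sys

for line in sys.stdin:
  for word in line.split():
    sys.stdout.write("%s\t1\n" % word)

And the matching “some computation” half:

#!/usr/bin/env python
# wc-reducer.py: stdin arrives sorted by key, so counting a word
# just means watching for the key to change.
import sys

current = None
count = 0
for line in sys.stdin:
  word, val = line.rstrip("\n").split("\t")
  if word != current:
    if current is not None:
      sys.stdout.write("%s\t%d\n" % (current, count))
    current = word
    count = 0
  count += int(val)
if current is not None:
  sys.stdout.write("%s\t%d\n" % (current, count))

Running cat someFile | ./wc-mapper.py | sort | ./wc-reducer.py prints one count per word, and the same two scripts would work essentially unchanged under Hadoop Streaming.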

Now that you know how it works, let’s look at the MapReduce implementation of our Embarrassingly Parallel benchmark. Although AppEngine is written in Python, our implementation, as we showed before, doesn’t care what language the Mapper and Reducer are in as long as they are executable. Therefore, we’ll try out a MapReduce in Ruby. Let’s begin with how the input file is generated. Although we’re testing on the serial version, we want to write our code with the parallel implementation in mind, so we’ll take this approach: the random number generator takes as input a value dictating which number in the sequence we want (so it’s really a random sequence and not a random number generator, since asking for number one in the sequence always returns the same value). Each line of the input file thus holds two values, the starting value in the sequence to generate and the ending value to generate, and the idea is that in the parallel version, each process generates a decent chunk of the random numbers. The Python code that does this input generation is as follows:

def genInput(self):
  power = 10
  n = 2 ** power                  # how many random numbers to generate
  bucket_size = 2 ** (power / 2)  # how many numbers each input line covers

  # The end of each range: bucket_size, 2 * bucket_size, ..., n
  vals = range(1, n / bucket_size + 1)
  vals = [i * bucket_size for i in vals]

  buckets = ""
  index = 0
  for i in vals:
    if index == 0:
      start = 0
    else:
      start = vals[index - 1]

    # One line per range: "start\tend"
    this_range = str(start + 1) + "\t" + str(vals[index]) + "\n"
    buckets += this_range
    index += 1

  return taskqueue.putMRInput(buckets)

We then use one of the three AppEngine-MapReduce hook methods, putMRInput, which takes the string containing the input file’s contents and writes it to the file system (the parallel version writes to the Hadoop Distributed File System instead). It then returns the name of the file it wrote to (since AppEngine normally doesn’t allow writing files), which we use later.
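To see what that file actually holds, trace the code above: power = 10 gives n = 1024 numbers split into buckets of 32, so the tab-separated lines of the input file look like this:

1	32
33	64
65	96
...
993	1024

With our input generated, let’s look at our Mapper: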

#!/usr/local/bin/ruby -w
# Programmer: Chris Bunch
# mapper-ruby.rb: Solves part of the EP parallel benchmark via the
# MapReduce framework as follows:
# Input: Takes in ranges of k values to compute over STDIN.
# Output: list [l, X_k, Y_k]

A = 5 ** 13
S = 271828183
MIN_VAL = 2 ** -46
MAX_VAL = 2 ** 46

# The benchmark's random sequence: the k-th value is A^k * S mod 2^46,
# scaled into (0, 1).
def generate_random(k)
  xk = (A ** k) * S % MAX_VAL
  MIN_VAL * xk
end

def ep(k)
  k = Integer(k)

  xj = generate_random(k)
  yj = generate_random(k+1)

  t = xj * xj + yj * yj

  if t <= 1
    # Accepted pair: scale into Gaussian variates (the polar method),
    # keyed on the bucket, the floor of the larger magnitude.
    xk = xj * Math.sqrt(-2 * Math.log(t) / t)
    yk = yj * Math.sqrt(-2 * Math.log(t) / t)
    max = [xk.abs, yk.abs].max
    l = max.floor
    puts "#{l}\t#{xk}\t#{yk}"
  end
end

# Each input line is a tab-separated "start end" range of k values.
# We step by 2 since each call to ep consumes both k and k+1.
loop {
  input = STDIN.gets
  break if input.nil?
  start, fin = input.chomp.split
  start = Integer(start)
  fin = Integer(fin)
  current = start
  loop {
    ep(current)
    current = current + 2
    break if current > fin
  }
}
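If you want to poke at the Mapper by itself before wiring the whole pipeline together, you can feed it a single range by hand; something like this should print one l\tX_k\tY_k line per accepted pair:

printf "1\t32\n" | ./mapper-ruby.rb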

Once more, there’s neither a lot of code nor a lot of complexity going on. Like I said earlier, the Mapper just finds the beginning and ending of the range and generates the numbers in that range. If a pair passes the test, we multiply the numbers by a certain value and output them as floor(num)\tnum. We use the floor of the number as the key (the first part) since the benchmark requires us to report how many numbers fall in each bucket: the multiplication produces numbers between zero and ten, and we want to know how they are distributed. Our Reducer does just that, and also reports the sum of the values in each bucket (also required by the benchmark). The Reducer is thus as follows:

#!/usr/local/bin/ruby -w
# Programmer: Chris Bunch
# reducer-ruby.rb: Solves part of the EP parallel benchmark via the
# MapReduce framework as follows:
# Input: list [l, X_k, Y_k]
# Output: [l, sum(X_k), sum(Y_k)]

current_l = nil

x_count = 0
y_count = 0

sum_x = 0.0
sum_y = 0.0

loop {
  input = STDIN.gets
  break if input.nil?
  l, x, y = input.chomp.split
  l = Integer(l)
  x = Float(x)
  y = Float(y)

  current_l = l if current_l.nil?

  # stdin arrives sorted by l, so a change in l means the
  # previous bucket is finished and can be reported.
  if l != current_l
    puts "l = #{current_l}, |x| = #{x_count}, |y| = #{y_count}"
    current_l = l
    x_count = 0
    y_count = 0
  end

  sum_x = sum_x + x
  sum_y = sum_y + y

  abs_x = x.abs
  abs_y = y.abs

  if abs_x > abs_y
    x_count = x_count + 1
  else
    y_count = y_count + 1
  end
}

puts "l = #{current_l}, |x| = #{x_count}, |y| = #{y_count}"

puts "sum_x = #{sum_x}, sum_y = #{sum_y}"

You can then verify that the Mapper and Reducer are working with the bash pipeline we mentioned earlier, once you’ve run the initial Python code to generate the input file (note that l is the bucket, floor(val)):

Nirvana@bahamut:~/google_appengine/demos/mapreduce$ cat input | ./mapper-ruby.rb | sort | ./reducer-ruby.rb
l = 0, |x| = 88, |y| = 103
l = 1, |x| = 88, |y| = 87
l = 2, |x| = 17, |y| = 7
l = 3, |x| = 2, |y| = 5
sum_x = 317.845887803116, sum_y = 303.618961136228

All that’s left is to verify it works in AppEngine. The remaining two calls, runMRJob and getMROutput, are pretty straightforward now that you know how it all works. The former takes as parameters the paths to the Mapper file, the Reducer file, the input file, and where the output file should go. The latter simply reads the output file and returns its contents as a string (since reading the filesystem is also not normally allowed in AppEngine). The sample app shows it off simply enough: it runs the job, gets the output, formats it for reading on the web, and saves it in a variable we can grab from the view:

taskqueue.runMRJob(mapper, reducer, inputSource, outputSource)
resultStr = taskqueue.getMROutput(outputSource)
resultStr = resultStr.replace("\n", "<br />")
mrresult = MRResult(key_name="baz", result=resultStr)
mrresult.put()

And there you go! The distributed version will have the same semantics, but hopefully this will provide an easy way to deploy AppEngine apps and write compute-intensive applications with MapReduce. If you want to run the MapReduce demo we’ve included, it’s as simple as starting the dev server against the demo directory, which with the stock SDK layout should look something like this:
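python dev_appserver.py demos/mapreduce

Then point your browser at the app (http://localhost:8080 by default).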

Clicking on the “Run MapReduce Job” button should yield the same output as the command shown earlier, so you should be good to go! Once the distributed implementation via Hadoop is done, we’ll revisit this topic and see how the two approaches differ. Enjoy!