Wednesday, August 4, 2010

Word Count using Plume

Plume is working for toy programs!

You can get the source code at http://github.com/tdunning/Plume

Here is a quick description of how to code up the perennial map-reduce demo program for counting words.  The idea is that we have lines of text that we have to tokenize and then count the words.  This example is to be found in the class WordCountTest in Plume.

So we start with a PCollection<String> called lines as input.  We split each line
into words and emit each word as a separate record:

    
  PCollection<String> words = lines
    .map(new DoFn<String, String>() {
      @Override
      public void process(String x, EmitFn<String> emitter) {
        for (String word : onNonWordChar.split(x)) {
          emitter.emit(word);
        }
      }
    }, collectionOf(strings()));
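
For readers who want to try the tokenization step without Plume on the classpath, here is a minimal plain-Java sketch of the same idea: one input line produces zero or more output records. The class name TokenizeSketch and the regex `\W+` are assumptions made for illustration; the definition of onNonWordChar is not shown in this post and may well split differently. Unlike the snippet above, this sketch also drops empty tokens.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TokenizeSketch {
  // Assumption: split on runs of non-word characters.
  // Plume's onNonWordChar may be defined differently.
  private static final Pattern ON_NON_WORD_CHAR = Pattern.compile("\\W+");

  // Each line can yield zero or more words, like the DoFn in the post.
  static List<String> tokenize(List<String> lines) {
    List<String> words = new ArrayList<>();
    for (String line : lines) {
      for (String word : ON_NON_WORD_CHAR.split(line)) {
        // split() returns a leading empty string when the line
        // starts with a delimiter, so skip empty tokens.
        if (!word.isEmpty()) {
          words.add(word);
        }
      }
    }
    return words;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("the quick fox", "the lazy dog");
    System.out.println(tokenize(lines)); // [the, quick, fox, the, lazy, dog]
  }
}
```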


Then we emit each word as a key for a PTable with a count of 1.  This is just the same as most word-count implementations except that we have separated the tokenization from the emission of the original counts.  We could have put them together into a single map operation, but the optimizer will do that for us (when it exists) so keeping the functions modular is probably better.


  PTable<String, Integer> wc = words
    .map(new DoFn<String, Pair<String, Integer>>() {
      @Override
      public void process(String x,
                          EmitFn<Pair<String, Integer>> emitter) {
        emitter.emit(Pair.create(x, 1));
      }
    }, tableOf(strings(), integers()))
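
As an aside on the fusion mentioned above, the following plain-Java sketch shows why keeping the two map steps separate costs nothing in principle: composing the tokenizing function with the pairing function gives one function that produces the same records in a single pass. This is not Plume code and says nothing about how a Plume optimizer would actually be built; FusionSketch, flatMap, fuse, twoPasses and onePass are hypothetical names, and the `\W+` tokenizer is an assumption.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class FusionSketch {
  // Stage 1: one line in, zero or more words out (assumed tokenizer).
  static final Function<String, List<String>> TOKENIZE = line -> {
    List<String> words = new ArrayList<>();
    for (String w : line.split("\\W+")) {
      if (!w.isEmpty()) {
        words.add(w);
      }
    }
    return words;
  };

  // Stage 2: one word in, exactly one (word, 1) pair out.
  static final Function<String, List<Map.Entry<String, Integer>>> PAIR = word -> {
    Map.Entry<String, Integer> pair = new SimpleEntry<>(word, 1);
    return Collections.singletonList(pair);
  };

  // Local stand-in for a parallel map: apply fn to every element
  // and concatenate the outputs in order.
  static <A, B> List<B> flatMap(List<A> input, Function<A, List<B>> fn) {
    List<B> out = new ArrayList<>();
    for (A a : input) {
      out.addAll(fn.apply(a));
    }
    return out;
  }

  // Fuse two stages into one function, so the input is traversed once.
  static <A, B, C> Function<A, List<C>> fuse(Function<A, List<B>> first,
                                             Function<B, List<C>> second) {
    return a -> flatMap(first.apply(a), second);
  }

  // Two separate map operations, as written in the post.
  static List<Map.Entry<String, Integer>> twoPasses(List<String> lines) {
    return flatMap(flatMap(lines, TOKENIZE), PAIR);
  }

  // One fused map operation, as an optimizer might produce.
  static List<Map.Entry<String, Integer>> onePass(List<String> lines) {
    return flatMap(lines, fuse(TOKENIZE, PAIR));
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("a b", "b c");
    System.out.println(twoPasses(lines).equals(onePass(lines))); // true
  }
}
```
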


Then we group by word


  .groupByKey()


And do the counting.  Note how we don't have to worry about the details of using a combiner or a reducer.


  .combine(new CombinerFn<Integer>() {
     @Override
     public Integer combine(Iterable<Integer> counts) {
       int sum = 0;
       for (Integer k : counts) {
         sum += k;
       }
       return sum;
     }
   });
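
To make the grouping and counting steps concrete, here is a small plain-Java sketch of what they compute on a single machine. It is only an illustration under stated assumptions, not Plume's implementation: CombineSketch, groupByKey and countWords are hypothetical names. The last lines of main hint at why the combiner-versus-reducer question can be left to the framework: addition is associative, so summing partial sums gives the same answer as summing all the counts at once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CombineSketch {
  // Same logic as the CombinerFn in the post: add up the counts.
  static int combine(Iterable<Integer> counts) {
    int sum = 0;
    for (int k : counts) {
      sum += k;
    }
    return sum;
  }

  // Local stand-in for groupByKey: collect a 1 for each occurrence of a word.
  static Map<String, List<Integer>> groupByKey(List<String> words) {
    Map<String, List<Integer>> groups = new TreeMap<>();
    for (String w : words) {
      groups.computeIfAbsent(w, k -> new ArrayList<>()).add(1);
    }
    return groups;
  }

  // Group, then combine each group into a single count.
  static Map<String, Integer> countWords(List<String> words) {
    Map<String, Integer> result = new TreeMap<>();
    for (Map.Entry<String, List<Integer>> e : groupByKey(words).entrySet()) {
      result.put(e.getKey(), combine(e.getValue()));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> words =
        Arrays.asList("the", "quick", "fox", "the", "lazy", "dog", "the");
    System.out.println(countWords(words)); // {dog=1, fox=1, lazy=1, quick=1, the=3}

    // Summing partial sums matches summing everything in one go.
    int direct = combine(Arrays.asList(1, 1, 1, 1, 1));
    int viaPartials = combine(Arrays.asList(
        combine(Arrays.asList(1, 1)),
        combine(Arrays.asList(1, 1, 1))));
    System.out.println(direct == viaPartials); // true
  }
}
```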

In all, it takes 27 lines to implement a slightly more general word-count than the one in the Hadoop tutorial.  If we were to compare apples to apples, this code would probably be a few lines shorter.  The original word-count demo was 210 lines to do the same thing.

5 comments:

John said...

Hi Ted,
Read your blog about the need for a Groovy 'hello world' example for mapReduce, so I coded up a little example (inspired by Ken Barclay) which counts the words in a dictionary, partitioning the count by dictionary entry. No threads or Hadoop involved, just mapping the computation by entry and then reducing the entry counts to a single number.
Hello world. 27 lines.

Best regards,
John Day
Palm Bay, Florida


/*-------------- 'mapReduce' as a Groovy closure --------------*/
def mapReduce(data, mapFunction, reduceFunction)
{
    mappedData = data.inject([:], mapFunction)
    mappedData.inject(0, reduceFunction)
}
/*---------- Create 'key-valued' data (a dictionary) ----------*/
def dictionary =
[
    abacus: "a device for making arithmetic calculations",
    arc: "any unbroken part of the circumference of a circle",
    beaver: "a large, amphibious rodent of the genus Castor"
]
/*--- map: partition problem into separate counting problems --*/
def mapWordCounts =
{map, entry ->
    def count = 0
    entry.value.split(' ').each{count++}
    map.put(entry.key, count)
    map
}

/*---------- reduce: combine sub-counts into one result -------*/
def reduceWordCounts= {sum, entry -> sum + entry.value }

/*===== Launch mapReduce on counting words in dictionary ======*/
println mapReduce(dictionary, mapWordCounts, reduceWordCounts)

Ted Dunning ... apparently Bayesian said...

John,

This is nice, but the point of my interest in a hello world program for groovy was to be able to write map-reduce programs that actually run on Hadoop in a very simple way.

There would be extra points if the same program could be run on a variety of execution models without change.

Simulating map/reduce is nice for illustrating what map-reduce is, but it doesn't solve the need for writing simple programs in simple form.

The FlumeJava paper that I cited is exactly what I am talking about. The very interesting thing about FlumeJava is that it is actually easier to implement this in Java than it is to implement in a more flexible language like Groovy.

John said...

Ted,
I'm a novice at Hadoop/Mahout, still struggling with some of the basic concepts. That's why I wrote the word counter, just to start getting my head around the Map-Reduce idea (also trying to learn a little more about Groovy).

So, I didn't understand most of the Chambers paper. But if I understand correctly, FlumeJava will provide some parallelized container classes which will transparently perform the map-reduce under the hood.

Has FlumeJava been released under a free license? I couldn't find it. In the meantime I'll look at your Plume code and see if I can learn from it.

Thanks,
John

Ted Dunning ... apparently Bayesian said...

John,

Your code is great as a didactic exercise.

Yes, FlumeJava automagically does the map-reduce stuff under the hood and no it hasn't been released in any form other than the article (that I know of).

Plume is intended to fill that gap by being a nearly identical clone with multiple execution models.

Right now, Plume won't be very understandable since it is mostly just a bunch of code. There are some examples of how to use it, however, in the test cases.

John said...

Thanks, I will go off and study the Plume test cases. BTW, I rewrote some lines in my code, added another 'inject', so now it's officially 25 lines :-]

/*------------- 'mapReduce' as Groovy closures --------------*/
def mapReduce=
{data, mapFunction, reduceFunction ->
    mappedData = data.inject([:], mapFunction)
    mappedData.inject(0, reduceFunction)
}
/*--------- Create 'key-valued' data (a dictionary) ----------*/
def dictionary =
[
    abacus: "a device for making arithmetic calculations",
    arc: "any unbroken part of the circumference of a circle",
    beaver: "a large, amphibious rodent of the genus Castor"
]
/*-- map: partition problem into separate counting problems --*/
def mapWordCounts =
{map, entry ->
    count=entry.value.split(' ').inject(0){i,j->i+1}
    println "entry: ${entry.key} count=$count"
    map.put(entry.key, count)
    return map
}
/*--------- reduce: combine sub-counts into one result -------*/
def reduceWordCounts= {sum, entry -> sum + entry.value }
/*==== Launch mapReduce on counting words in dictionary ======*/
println mapReduce(dictionary, mapWordCounts, reduceWordCounts)