
Cascalog for the Impatient, Part 4

In our third installment of this series we showed how to write a custom operation for a Cascalog application. If you haven’t read that yet, it’s probably best to start there.

Today's lesson takes that same Word Count app and expands on it to implement a stop words filter, which is a list of tokens to nix from the stream. We’ll show how to use a left join so we can perform that filtering at scale. Again, this code is leading toward an implementation of TF-IDF in Cascalog. We’ll show best practices for workflow orchestration and test-driven development (TDD) at scale.

Theory

The first question to consider is, why do we want to use a stop words list? After all, the TF-IDF algorithm is supposed to filter out the less significant words anyway. Why would we need additional filtering if TF-IDF is implemented correctly?

Use of a stop words list originated in work by Hans Peter Luhn at IBM Research, during the dawn of computing. The reasons for it are two-fold. On one hand, consider that the most common words in any given natural language are generally not useful for text analytics. For example in English, words such as “the”, “of”, and “and” are probably not what you want to search, and probably not interesting for Word Count metrics. They sit at the head of the token distribution: high frequency, low semantic value. Consequently, they cause the bulk of the processing. Natural languages tend to have on the order of 10^5 words, so the potential size of any stop words list is nicely bounded. Filtering those high-frequency words out of the token stream dramatically reduces the amount of processing required later in the workflow.

On the other hand, you may also want to remove some words from the token stream explicitly. This almost always comes up in practice, especially when working with public discussions such as social network comments. Think about it: what are some of the most common words posted online in comments, which are not among the most common words in “polite” English? Based on the math for TF-IDF, those words would tend to get ranked highest. Do you really want them to bubble up to the “most significant” positions in your text analytics? In automated systems which leverage unsupervised learning, this can lead to highly embarrassing situations. Caveat machinator.

Next, let’s consider working with a join in Cascalog. We will have two generators, one for the original token stream and another for the stop words list. We want to filter all instances of tokens from the stop words list out of the token stream. If we weren’t working in MapReduce, a naive approach would simply load the stop words list into a hashtable, then iterate through our token stream to look up each token in the hashtable and delete it if found. If we were coding in Hadoop directly, a less naive approach would be to put the stop words list into the distributed cache, have a job step load it during setup, then rinse/lather/repeat from the naive approach described above.
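
For comparison, here's a minimal sketch of that naive single-machine approach in plain Clojure, assuming the tokens and stop words are already in memory as sequences (the names here are hypothetical):

(defn filter-stop-words
  "Naive approach: load the stop words into a set (our 'hashtable'),
   then drop any token found in it."
  [tokens stop-words]
  (let [stop-set (set stop-words)]
    (remove stop-set tokens)))

;; (filter-stop-words ["the" "rain" "in" "spain"] ["the" "in"])
;; => ("rain" "spain")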

Instead we want to leverage the workflow orchestration in Cascalog. One might try to write a custom operation, as we did in Part 3, but that means extra work and extra code to verify and maintain, when the built-in primitives tend to be more efficient anyway.

Cascalog provides joins across generators, and conceptually a Left Outer Join would satisfy our requirement to filter stop words. Think of joining the token stream with the stop words list: wherever the result is non-null, the join has identified a stop word. Discard it.
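
For example, consider a hypothetical token stream and stop words list:

tokens: rain, the, spain, in
stop:   the, in

A left outer join of tokens against stop yields:

(rain,  null)  -> keep
(the,   the)   -> discard
(spain, null)  -> keep
(in,    in)    -> discard

Only "rain" and "spain" survive the filter.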

Understand that there’s a big problem with using joins in MapReduce. Outside the context of a relational database, arbitrary joins do not work efficiently. Suppose you have N items in one tuple stream and M items in another, and you want to join them. In the general case, an arbitrary join requires N x M operations and also introduces a data dependency, such that the join cannot be performed in parallel. If both N and M are relatively large, say in the millions of tuples, then we’d end up processing 10^12 operations on a single processor, which kind of defeats the purpose of leveraging MapReduce.

A conceptual diagram for this implementation of Word Count in Cascalog is shown below:

Conceptual Workflow Diagram

Source

Download the source for this example on GitHub. A log for this example is listed in this gist. The input data stays the same as in the earlier code.

This example in Part 4 uses an Outer Join in Cascalog to implement a stop words list, filtering those tokens out of the token stream prior to counting.

First, let's source the stop words list:

(hfs-delimited stop :skip-header? true)

Next we'll perform the actual outer join on the two sources, simply by adding the stop words generator into our query and specifying ?word as its anchoring field. An anchoring field is a field name common to both generators. In this case, since ?word names a field in both, the word tokens will be joined with the stop tokens by ?word.
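
To see the join mechanics in isolation, here's a minimal sketch of an implicit (inner) join on ?word, using hypothetical in-memory generators; it emits only the tokens that do appear in the stop list:

(use 'cascalog.api)

(let [words [["rain"] ["the"] ["spain"] ["in"]]
      stop  [["the"] ["in"]]]
  (?<- (stdout)
       [?word]
       (words ?word)
       (stop ?word)))
;; emits "the" and "in" -- the join matched those tokens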

For this outer join, we want every token that exists in the word stream but not in the stop list. Thus we negate the join by adding a :> false filter to the stop words generator, like so:

(?<- (hfs-delimited out)
     [?word ?count]
     ((hfs-delimited in :skip-header? true) _ ?line)
     (split ?line :> ?word-dirty)
     (scrub-text ?word-dirty :> ?word)
     ((hfs-delimited stop :skip-header? true) ?word :> false)
     (c/count ?count))
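
To convince yourself of the :> false semantics, you can try the same negation with hypothetical in-memory generators at the REPL (assuming cascalog.api is loaded, as in the earlier sketch); this keeps only the tokens absent from the stop list:

(let [words [["rain"] ["the"] ["spain"] ["in"]]
      stop  [["the"] ["in"]]]
  (?<- (stdout)
       [?word]
       (words ?word)
       (stop ?word :> false)))
;; emits "rain" and "spain" -- the stop words are filtered out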

That's it for filtering tokens against a stop words list. But let's do a bit more to clean up the query.

Since the source taps take up a lot of space in the query, let's bind the generators explicitly in a let:

(let [rain (hfs-delimited in :skip-header? true)
      stop (hfs-delimited stop :skip-header? true)]
    ...)

While we're at it, let's make use of a Predicate Macro to inline scrub-text. Note the #' var quotes: passing vars rather than bare functions lets Cascalog serialize plain Clojure functions such as s/trim and s/lower-case as operations.

((c/comp #'s/trim #'s/lower-case) ?word-dirty :> ?word)

Putting everything together, here's the query.

(let [rain (hfs-delimited in :skip-header? true)
      stop (hfs-delimited stop :skip-header? true)]
   (?<- (hfs-delimited out)
        [?word ?count]
        (rain _ ?line)
        (split ?line :> ?word-dirty)
        ((c/comp #'s/trim #'s/lower-case) ?word-dirty :> ?word)
        (stop ?word :> false)
        (c/count ?count)))

Modify the -main function to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in MapReduce for Part 4 still uses one mapper and one reducer.
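
For reference, here's a sketch of how -main might look after those changes. It assumes the namespace setup from the earlier parts (hfs-delimited from cascalog.more-taps, the split tokenizer already defined there); treat the details as illustrative rather than the exact source:

(ns impatient.core
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)])
  (:require [clojure.string :as s]
            [cascalog.ops :as c])
  (:gen-class))

;; `split` is the custom tokenizer operation defined in the earlier parts

(defn -main [in out stop & args]
  (let [rain (hfs-delimited in :skip-header? true)
        stop (hfs-delimited stop :skip-header? true)]
    (?<- (hfs-delimited out)
         [?word ?count]
         (rain _ ?line)
         (split ?line :> ?word-dirty)
         ((c/comp #'s/trim #'s/lower-case) ?word-dirty :> ?word)
         (stop ?word :> false)
         (c/count ?count))))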

Here we have annotated the Cascading flow diagram to show where the mapper and reducer phases are running, and also the section which was added since Part 3:

Word Count Flow Diagram

Build

To build the sample app from the command line use:

lein uberjar 

What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo. Almost. Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org

Run

Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:

$ hadoop version
Hadoop 1.0.3

Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists on this when you're running in standalone mode) and run the app:

rm -rf output
hadoop jar ./target/impatient.jar data/rain.txt output/wc data/en.stop

Output text gets stored in partition files under output/wc, which you can then verify:

more output/wc/part-00000

Here’s a log file from our run of the sample app, part 4. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascalog-user email forum.

Stay tuned for the next installments of our Cascalog for the Impatient series.
