Skip to content

Rosetta Code

echen edited this page Apr 5, 2012 · 27 revisions

A collection of MapReduce tasks translated (from Pig, Hive, MapReduce streaming, etc.) into Scalding. For fully runnable code, see the repository here.

Word Count

Pig

tweets = LOAD 'tweets.tsv' AS (text:chararray);
words = FOREACH tweets GENERATE FLATTEN(TOKENIZE(text)) AS word;
word_groups = GROUP words BY word;
word_counts = FOREACH word_groups GENERATE group AS word, COUNT(words) AS count;

STORE word_counts INTO 'word_counts.tsv';

Scalding

Tsv("tweets.tsv", 'text)
  .flatMap('text -> 'word) { text : String => text.split("\\s+") }
  .groupBy('word) { _.size }
  .write(Tsv("word_counts.tsv"))

Distributed Grep

The map function emits a line if it matches a given pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output. (Example taken from Google Code University.)

Pig

%declare PATTERN '.*hello.*';

tweets = LOAD 'tweets.tsv' AS (text:chararray);
results = FILTER tweets BY (text MATCHES '$PATTERN');

Scalding

val Pattern = ".*hello.*";

Tsv("tweets.tsv", 'text)
  .filter('text) { text : String => text.matches(Pattern) }

Inverted Index

The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions. (Example taken from Google Code University.)

Pig

tweets = LOAD 'tweets.tsv' AS (tweet_id:int, text:chararray);

words = FOREACH tweets GENERATE tweet_id, FLATTEN(TOKENIZE(text)) AS word;
word_groups = GROUP words BY word;
inverted_index = FOREACH word_groups GENERATE group AS word, words.tweet_id;

Scalding

val tweets = Tsv("tweets.tsv", ('id, 'text))

val wordToTweets =
  tweets
    .flatMap(('id, 'text) -> ('word, 'tweetId)) { 
      fields : (Long, String) => 
      val (tweetId, text) = fields
      text.split("\\s+").map { word => (word, tweetId) }
    }

val invertedIndex =
  wordToTweets.groupBy('word) {  _.toList[Long]('tweetId -> 'tweetIds) }

Contents

Getting help

Documentation

Matrix API

Third Party Modules

Videos

How-tos

Tutorials

Articles

Other

Clone this wiki locally