
The (deprecated) builder API

This section describes Summingbird’s deprecated builder API. This API has been superseded by The Producer API.

Configuration Options

Summingbird provides the following options to help you tune the behavior of your job. All of these options are specified by passing an instance of the described case class to the set function on your job’s builder:

source.set(FlatMapShards(10)).flatMap { t => .... }

set returns the builder with the new option applied, so you can keep mapping and flatMapping ‘til the cows come home.
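
For example, here’s a minimal sketch of chaining set calls between transformations (the event type and flatMap body are hypothetical; the option case classes are the real ones described below):

source
  .set(FlatMapShards(10))                         // returns the builder
  .flatMap { event => List(event.userId -> 1L) }  // keep transforming
  .set(FlatMapParallelism(20))                    // chain another option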

Scalding Options

The following options tune the behavior of the Scalding job generated by your Summingbird job class.

MonoidIsCommutative

By default, Summingbird assumes that your monoid is non-commutative. Use the MonoidIsCommutative option to tell your job that the monoid is, in fact, commutative:

source.groupAndSumTo(compoundStore)
  .set(MonoidIsCommutative(true))

This setting lets Summingbird apply a number of optimizations to its Scalding job, including aggressive map-side aggregation. Many monoids are commutative, but the option defaults to false so that jobs built on non-commutative monoids stay correct.
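
For intuition: integer addition forms a commutative monoid, while string concatenation forms a monoid that is not commutative:

assert(1 + 2 == 2 + 1)             // addition: order doesn't matter
assert("ab" + "cd" != "cd" + "ab") // concatenation: order matters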

FlatMapShards

In Scalding mode, some datasets show remarkable skew. If certain input files are much larger than others (say, because you received a massive burst of data during peak hours), the mappers assigned to those files will choke. Summingbird addresses this with the FlatMapShards option, which introduces an extra shuffle. The following job uses 10 reducers in a pre-processing MapReduce step that shuffles the data to mitigate the skew:

source.set(FlatMapShards(10))

You won’t use this often, but when you need it, it’s critical.

Storm Options

The following configuration options tune the Storm topology generated by your Summingbird AbstractJob extension.

FlatMapParallelism

FlatMapParallelism sets the number of executors that you want Storm to assign to the flatMap phase of your job. Use it by inserting a set call after your flatMap function:

import com.twitter.summingbird.builder._

source.flatMap { t => .... }
  .set(FlatMapParallelism(20)) // Storm will use 20 executors

SinkParallelism

SinkParallelism is similar, but controls the parallelism of the final sinking phase. Jack this up if you need more parallelism on your onlineStore writers.

source.groupAndSumTo(compoundStore)
  .set(SinkParallelism(20)) // Storm will use 20 executors

CacheSize

The CacheSize option controls Summingbird’s map-side aggregation. If you insert a CacheSize(n) option after the flatMap phase, Summingbird will buffer n key-value pairs on each flatMap bolt executor before emitting. For example:

source.flatMap { t => ... }
  .set(FlatMapParallelism(20))
  .set(CacheSize(100))

In this scenario, each of the 20 executors will buffer 100 key-value pairs. When each executor hits 100, it’ll sum all pairs up by key and emit the aggregated values. This is Summingbird’s realtime version of map-side aggregation, and can be really useful when your key space isn’t that large, or when you have huge skew in your data (since the skewed key will be pre-aggregated instead of kicking out large numbers of values).
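
To make the buffering concrete, here’s a minimal sketch of the idea (not Summingbird’s actual implementation; it assumes Algebird’s Monoid, which Summingbird builds on):

import com.twitter.algebird.Monoid

// Sketch: once the buffer reaches `cacheSize` pairs, sum the values by key
// and hand back the aggregates to emit; otherwise keep buffering.
def flushIfFull[K, V: Monoid](
  buffer: List[(K, V)],
  cacheSize: Int
): (List[(K, V)], List[(K, V)]) =
  if (buffer.size >= cacheSize) {
    val summed = buffer.groupBy(_._1).toList.map { case (k, vs) =>
      k -> Monoid.sum(vs.map(_._2))
    }
    (Nil, summed) // (emptied buffer, aggregated pairs to emit)
  } else (buffer, Nil)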

If you insert a CacheSize option after the groupAndSumTo, Summingbird will do the same sort of buffering in the SinkBolt on its commit out to the underlying onlineStore. Same exact idea as the flatMap cache size, just another knob that you get to control.
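
Following the same pattern, a sink-side cache looks like this (a sketch, reusing compoundStore from the examples above):

source.groupAndSumTo(compoundStore)
  .set(SinkParallelism(20))
  .set(CacheSize(100)) // each sink executor buffers 100 pairs before writing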

Other Options

All other Storm and Hadoop options can be set by overriding the transformConfig method in your job class. The following example configures Storm to use 60 workers total, and 60 executors for its acking processes:

import backtype.storm.Config
import com.twitter.bijection.Conversion.asMethod
import java.lang.{ Integer => JInt }

class MyJob(env: Env) extends AbstractJob(env) {
  override def transformConfig(m: Map[String,AnyRef]): Map[String,AnyRef] = {
    val basic = super.transformConfig(m)
    basic ++ Map(
      Config.TOPOLOGY_WORKERS -> 60.as[JInt],
      Config.TOPOLOGY_ACKER_EXECUTORS -> 60.as[JInt]
    )
  }

  // Continue the job below as usual
}

Be sure to call super.transformConfig when extending a class other than AbstractJob.