Frequently asked questions
If your job has dependencies that clash with Hadoop's, Hadoop can replace your version of a library (like log4j or ASM) with the version it bundles. You can fix this with an environment flag that ensures your jars appear on the classpath before Hadoop's. Set these environment variables:
export HADOOP_CLASSPATH=<your_jar_file>
export HADOOP_USER_CLASSPATH_FIRST=true
A. For a better stacktrace than the usual opaque dump, try submitting your job again with the extendedDebugInfo flag set:
export HADOOP_OPTS="-Dsun.io.serialization.extendedDebugInfo=true"; hadoop <your-commands>
You should see a much larger stacktrace, with many entries like this:
- field (class "com.twitter.scalding.MapsideReduce", name: "commutativeSemigroup", type: "interface com.twitter.algebird.Semigroup")
- object (class "com.twitter.scalding.MapsideReduce", MapsideReduce[decl:'key', 'value'])
- field (class "cascading.pipe.Operator", name: "operation", type: "interface cascading.operation.Operation")
- object (class "cascading.pipe.Each", Each(_pipe_2*_pipe_3)[MapsideReduce[decl:'key', 'value']])
- field (class "org.jgrapht.graph.IntrusiveEdge", name: "target", type: "class java.lang.Object")
- object (class "org.jgrapht.graph.IntrusiveEdge", org.jgrapht.graph.IntrusiveEdge@6ed95e60)
- custom writeObject data (class "java.util.HashMap")
- object (class "java.util.LinkedHashMap", {[{?}:UNKNOWN]
[{?}:UNKNOWN]=org.jgrapht.graph.IntrusiveEdge@6ce4ece3, [{2}:0:1]
Typically, if you read these entries from the bottom up, the first familiar class you see is the object that is being unexpectedly serialized and causing the problem. In this case, the culprit was Scalding's MapsideReduce class.
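If you want to hunt down the offending field without resubmitting a job, one option (a local sketch, not part of Summingbird) is to push the suspect object through plain Java serialization yourself, with the same extendedDebugInfo system property turned on:
import java.io.{ ByteArrayOutputStream, ObjectOutputStream }

// Minimal local check: attempts Java serialization of an object and lets any
// java.io.NotSerializableException propagate. Run the JVM with
// -Dsun.io.serialization.extendedDebugInfo=true to get the same
// field-by-field trace shown above.
object SerializationCheck {
  def check(obj: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(obj)
    finally oos.close()
  }
}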
A. This requires a brief explanation of how Summingbird serializes different types in the background.
Summingbird uses Injections and Bijections (from the com.twitter.bijection package) to convert one type into another. Generally, keys and values are converted to Thrift objects, serialized into Array[Byte], and, upon reception by another node, deserialized back into the Thrift object and finally converted back to their original type. An Injection is a one-way conversion from one type to another with a partial (possibly failing) reverse conversion; a Bijection is a lossless two-way conversion between two types.
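As a quick illustration of the difference (a minimal sketch, not Summingbird code; it assumes the build helpers on the Injection and Bijection companion objects, whose exact signatures vary across bijection versions):
import scala.util.Try
import com.twitter.bijection.{ Bijection, Injection }

object InjectionVsBijection {
  // Bijection: every Int maps to a String and back without loss.
  val intToString: Bijection[Int, String] =
    Bijection.build[Int, String](_.toString)(_.toInt)

  // Injection: every String maps to bytes, but decoding arbitrary bytes can
  // fail, so invert returns a Try (Option in older bijection releases).
  val stringToBytes: Injection[String, Array[Byte]] =
    Injection.build[String, Array[Byte]](_.getBytes("UTF-8")) { bytes =>
      Try(new String(bytes, "UTF-8"))
    }

  def main(args: Array[String]): Unit = {
    println(intToString(42))                               // "42"
    println(intToString.invert("42"))                      // 42
    println(stringToBytes.invert(stringToBytes("hello")))  // Success(hello)
  }
}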
Consider the case where we have (String, DecayedValue) as our key-value pair. Each of these types requires an Injection or Bijection to convert to/from a Thrift object and then to/from Array[Byte]. Therefore, we need to define the following implicit values:
implicit val stringThriftInjection: Injection[String, TsmObject] = { Injection definition }
implicit val decayedValueThriftBijection: Bijection[DecayedValue, TsmObject] = { Bijection definition }
implicit val tsmObjectCodec: Injection[TsmObject, Array[Byte]] = { Injection definition }
Here, TsmObject is a Thrift object.
Once these Injections are defined, they need to be connected so that, for instance, a String-to-TsmObject conversion is followed by a TsmObject-to-Array[Byte] conversion. You can think of this as a graph where nodes are types and edges are Injections or Bijections between types. The multi-hop connection between nodes is made with Injection.connect[A, B, C]. For instance, for DecayedValue:
implicit def dvToBytes: Injection[DecayedValue, Array[Byte]] = Injection.connect[DecayedValue, TsmObject, Array[Byte]]
Note that all Injections/Bijections are defined as implicit so they can be resolved automatically when needed.
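Putting the pieces together, here is a hedged sketch of the whole chain. TsmObjectLike is a hypothetical case class standing in for the generated Thrift class, and the codec bodies are toy string encodings rather than real Thrift serialization; the sketch also assumes bijection's usual implicit derivation of an Injection from a Bijection so that Injection.connect can use both hops:
import scala.util.Try
import com.twitter.algebird.DecayedValue
import com.twitter.bijection.{ Bijection, Injection }

object DecayedValueCodecs {
  // Hypothetical stand-in for the generated Thrift class from the example above.
  case class TsmObjectLike(payload: String)

  // Hop 1: DecayedValue <-> TsmObjectLike (lossless both ways, so a Bijection).
  implicit val decayedValueThriftBijection: Bijection[DecayedValue, TsmObjectLike] =
    Bijection.build[DecayedValue, TsmObjectLike] { dv =>
      TsmObjectLike(s"${dv.value}:${dv.scaledTime}")
    } { tsm =>
      val parts = tsm.payload.split(":")
      DecayedValue(parts(0).toDouble, parts(1).toDouble)
    }

  // Hop 2: TsmObjectLike -> Array[Byte] (decoding can fail, so an Injection).
  implicit val tsmObjectCodec: Injection[TsmObjectLike, Array[Byte]] =
    Injection.build[TsmObjectLike, Array[Byte]](_.payload.getBytes("UTF-8")) { bytes =>
      Try(TsmObjectLike(new String(bytes, "UTF-8")))
    }

  // Multi-hop: chain the implicits above into DecayedValue -> Array[Byte].
  implicit val dvToBytes: Injection[DecayedValue, Array[Byte]] =
    Injection.connect[DecayedValue, TsmObjectLike, Array[Byte]]
}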
What that means is that the output is keyed, but the value is (Option[V], V), where the first item in the tuple is the value the store held just before the aggregation, and the V is the delta added most recently.
These are weak semantics: we do not promise to deliver an output for every input to the sumByKey. Instead, the platform may batch up some items and send "steps".
So, if you send:
(k, v1), (k, v2), (k, v3)
into an empty store, the output will be one of:
(k, (None, v1 + v2 + v3))
(k, (None, v1 + v2)), (k, (Some(v1 + v2), v3))
(k, (None, v1)), (k, (Some(v1), v2 + v3))
(k, (None, v1)), (k, (Some(v1), v2)), (k, (Some(v1+v2), v3))
Exactly which one you get depends on details of caching and batch sizes. Currently, for batched Storm + Scalding, the aggregation never crosses the batch boundary.
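To make the possibilities above concrete, here is a small sketch (not Summingbird's implementation) that, given the prior store value for a key and the incoming values grouped into hypothetical steps, produces the (Option[V], V) pairs and the final store contents:
import com.twitter.algebird.Semigroup

object SumByKeySemantics {
  // For each non-empty step, emit (value before the step, sum of the step's
  // deltas) and fold the step's sum into the running store value.
  def steps[V: Semigroup](prior: Option[V], groups: List[List[V]]): (List[(Option[V], V)], Option[V]) =
    groups.filter(_.nonEmpty).foldLeft((List.empty[(Option[V], V)], prior)) {
      case ((emitted, before), group) =>
        val delta = Semigroup.sumOption(group).get
        val after = before.map(v => Semigroup.plus(v, delta)).orElse(Some(delta))
        (emitted :+ ((before, delta)), after)
    }

  def main(args: Array[String]): Unit = {
    // (k, 1), (k, 2), (k, 3) into an empty store, batched as [1, 2] then [3]:
    println(steps[Int](None, List(List(1, 2), List(3))))
    // (List((None,3), (Some(3),3)), Some(6)), i.e. the second possibility above:
    // (k, (None, v1 + v2)), (k, (Some(v1 + v2), v3))
  }
}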