2.6 The Java MapReduce Library
This page describes the major classes in the MapReduce library.
- The Input class
- The Mapping classes
- The Reducer class
- The Output class
- The Marshaller class
- The Counter class
- Size limits
The Input class
When you define a job, you specify an Input class that reads a particular type of input and returns records to the map stage. The MapReduce library contains several Input classes that handle data from different types of sources.
Each type of Input class is paired with a corresponding subclass of InputReader. The MapReduce library calls the Input class's createReaders() method, which determines how many map shards are needed (the shard count) and creates an InputReader instance for each shard. Each reader is initialized to process a non-overlapping subset of the input data. An Input class may set the shard count explicitly, or it can compute the count by analyzing the input data.
The available Input classes can be found in the MapReduce library's inputs package. Some of the most often-used classes are:
- BlobstoreInput - Reads an object from your app's Blobstore and splits it into records (byte arrays) delimited by a single-byte separator. You declare the shard count, and the input is divided into equal-sized shards. Each record is passed to the Mapper as a byte array. (A similar class, GoogleCloudStorageLineInput, shards files in Google Cloud Storage on separator boundaries.)
- DatastoreInput - Uses a datastore query to select entities from your app's datastore for processing. The shard count is given by the user. An Entity object is passed to the Mapper.
- ConsecutiveLongInput - Generates a specified number of consecutive long values; each shard starts from a different offset, so no value is generated twice. A Long object is passed to the Mapper. This is useful for testing applications.
- RandomLongInput - Generates a specified number of random long values across the specified number of shards. Specifying the random seed makes the job reproducible, which helps with debugging. A Long object is passed to the Mapper. This is useful for testing applications.
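The record splitting that BlobstoreInput performs can be pictured with a small standalone sketch. It is not the library's code: the class name SeparatorSplitter is hypothetical, and the rule that a trailing separator does not produce an extra empty record is an assumption made here for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the MapReduce library: splits a byte array
// into records delimited by a single separator byte.
final class SeparatorSplitter {
  private SeparatorSplitter() {}

  static List<byte[]> split(byte[] data, byte separator) {
    List<byte[]> records = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < data.length; i++) {
      if (data[i] == separator) {
        // Everything since the previous separator is one record (possibly empty).
        records.add(Arrays.copyOfRange(data, start, i));
        start = i + 1;
      }
    }
    // Assumption: a trailing separator does not create an extra empty record.
    if (start < data.length) {
      records.add(Arrays.copyOfRange(data, start, data.length));
    }
    return records;
  }

  public static void main(String[] args) {
    byte[] blob = "alpha\nbeta\n\ngamma\n".getBytes(StandardCharsets.UTF_8);
    for (byte[] record : split(blob, (byte) '\n')) {
      System.out.println("[" + new String(record, StandardCharsets.UTF_8) + "]");
    }
  }
}
```

Running main prints four records: alpha, beta, an empty record between the two consecutive separators, and gamma.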
If the existing Input classes don't serve your purpose, you can write your own, along with an associated InputReader class, to read and parse other types of input.
The Input class must define the createReaders() method:
List<? extends InputReader<I>> createReaders() throws IOException;
This method determines how many map shards are needed, then creates and initializes an InputReader instance for each shard. The number of readers may be set explicitly by the user, or the method can analyze the input and determine how many readers are needed to partition the data efficiently.
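As an illustration of how a createReaders() implementation might partition its input, here is a self-contained sketch in the spirit of ConsecutiveLongInput. The types SketchInputReader, RangeInput, and RangeReader are hypothetical stand-ins rather than the library's classes, and spreading the remainder over the first shards is just one reasonable policy.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the library's InputReader<I> (the real class has more methods).
abstract class SketchInputReader<I> implements Serializable {
  private static final long serialVersionUID = 1L;

  abstract I next() throws NoSuchElementException;
}

// Hypothetical Input: produces the numbers [0, total) split across shardCount readers.
class RangeInput {
  private final long total;
  private final int shardCount;

  RangeInput(long total, int shardCount) {
    this.total = total;
    this.shardCount = shardCount;
  }

  // One reader per shard; the ranges are contiguous and never overlap.
  List<RangeReader> createReaders() {
    List<RangeReader> readers = new ArrayList<>();
    long base = total / shardCount;
    long remainder = total % shardCount;
    long start = 0;
    for (int shard = 0; shard < shardCount; shard++) {
      // The first `remainder` shards take one extra value, so sizes differ by at most 1.
      long size = base + (shard < remainder ? 1 : 0);
      readers.add(new RangeReader(start, start + size));
      start += size;
    }
    return readers;
  }
}

// Reads the half-open range [start, end); its state is two longs, so it serializes cheaply.
class RangeReader extends SketchInputReader<Long> {
  private static final long serialVersionUID = 1L;
  private long cursor;
  private final long end;

  RangeReader(long start, long end) {
    this.cursor = start;
    this.end = end;
  }

  @Override
  Long next() {
    if (cursor >= end) {
      throw new NoSuchElementException();
    }
    return cursor++;
  }
}
```

With total = 10 and shardCount = 3, the readers cover [0, 4), [4, 7), and [7, 10). Keeping each reader's state tiny matters because readers are serialized between slices.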
The InputReader class implements the Serializable interface, so make sure your implementation is actually serializable (for example, every non-transient field must itself be serializable). The class must implement the next() method:
public I next() throws IOException, NoSuchElementException;
This method behaves like Iterator.next(): the library calls it repeatedly, and it returns a new record each time until the data source is exhausted, at which point it throws NoSuchElementException.
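The next() contract can be seen from the consumer's side in the sketch below: a loop keeps calling next() and treats NoSuchElementException as the end of input. RecordSource, ListSource, and Drain are hypothetical names; the library's own driver loop also handles slicing and progress reporting, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical interface with the same next() contract as InputReader.
interface RecordSource<I> {
  I next() throws NoSuchElementException;
}

// Hypothetical source that serves records from an in-memory list.
class ListSource<I> implements RecordSource<I> {
  private final List<I> records;
  private int position = 0;

  ListSource(List<I> records) {
    this.records = records;
  }

  @Override
  public I next() {
    if (position >= records.size()) {
      throw new NoSuchElementException(); // signals that the input is exhausted
    }
    return records.get(position++);
  }
}

final class Drain {
  private Drain() {}

  // Calls next() until the source is exhausted and returns everything it produced.
  static <I> List<I> all(RecordSource<I> source) {
    List<I> out = new ArrayList<>();
    while (true) {
      try {
        out.add(source.next());
      } catch (NoSuchElementException end) {
        return out;
      }
    }
  }
}
```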
InputReader may also override these methods; see the class reference for more details:
public Double getProgress();
public void beginSlice() throws IOException;
public void endSlice() throws IOException;
public void beginShard() throws IOException;
public void endShard() throws IOException;
public long estimateMemoryRequirement();
To see an example of an Input implementation, take a look at the source code for ConsecutiveLongInput. This file defines the ConsecutiveLongInput class and its nested InputReader class, Reader.
The Mapping classes
There are two mapper classes, one for each type of job: Mapper for MapReduce jobs and MapOnlyMapper for Map jobs.
The map stage of a MapReduce job must extend the Mapper class, which defines the map() method:
public abstract void map(I value);
The library calls map() once for each record produced by an InputReader. The method can call [Mapper.emit()](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/java/src/main/java/com/google/appengine/tools/mapreduce/Mapper.html#emit(K, V)) to emit zero or more key-value pairs. Each emitted key-value pair must be smaller than 1MB:
protected void emit(K key, V value);
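A word-count mapper is the classic example of emitting zero or more pairs per record. The sketch below is self-contained, so SketchMapper is a hypothetical stand-in for Mapper whose emit() merely collects pairs in a list; with the real library you would extend Mapper, and the emitted pairs would go to the shuffle stage instead.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for Mapper<I, K, V>: emit() records pairs locally instead of
// handing them to the shuffle stage.
abstract class SketchMapper<I, K, V> {
  final List<Map.Entry<K, V>> emitted = new ArrayList<>();

  abstract void map(I value);

  protected void emit(K key, V value) {
    emitted.add(new SimpleEntry<>(key, value));
  }
}

// Emits (word, 1) for each whitespace-separated word; a blank line emits nothing.
class WordCountMapper extends SketchMapper<String, String, Long> {
  @Override
  void map(String line) {
    for (String word : line.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        emit(word.toLowerCase(Locale.ROOT), 1L);
      }
    }
  }
}
```

Each emitted pair here is tiny, comfortably within the per-pair size limit.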
The map stage of a Map job must extend the MapOnlyMapper class, which defines the map() method:
public abstract void map(I value);
The library calls map() once for each record produced by an InputReader. The method can call MapOnlyMapper.emit() to emit zero or more values. Make sure the size of each emitted value is valid for the job's output type:
protected void emit(O output);
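A Map job's mapper emits plain values rather than key-value pairs. The hypothetical sketch below filters numbers, such as those a ConsecutiveLongInput would produce, keeping only the even ones; SketchMapOnlyMapper stands in for MapOnlyMapper and simply collects what is emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MapOnlyMapper<I, O>: emit() collects output values locally.
abstract class SketchMapOnlyMapper<I, O> {
  final List<O> emitted = new ArrayList<>();

  abstract void map(I value);

  protected void emit(O output) {
    emitted.add(output);
  }
}

// Emits only the even inputs, so each record yields zero or one output value.
class EvenFilterMapper extends SketchMapOnlyMapper<Long, Long> {
  @Override
  void map(Long value) {
    if (value % 2 == 0) {
      emit(value);
    }
  }
}
```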
Note: You may use an instance of the Mapper class in a Map job instead, provided its key type is Void. You can then pass the Mapper instance to the MapSpecification.Builder.setMapper() method.
Keep map() simple and small. If a single input item requires a lot of work, consider redefining what the input should be; it is usually possible to formulate a job so that the library does the hard work.
Both types of mappers handle input data in zero or more shards; each shard is subdivided into zero or more slices. A mapper may be serialized and deserialized before processing its first shard (or first slice in a shard), after processing its last shard (or last slice in a shard), or between any consecutive shards (or slices). To make it easy to initialize, save, and restore state, a mapper has these methods, which can be overridden:
public void beginShard();
public void endShard();
public void beginSlice();
public void endSlice();
public long estimateMemoryRequirement();
Because a Mapper may be serialized many times, avoid keeping references to large objects, or to any data you can easily reconstruct, in serialized fields; rebuild such data in beginSlice() instead (for example, by marking the field transient). If you use an inversion-of-control (IoC) framework to inject members into your mapper, make sure it works with MapReduce serialization; otherwise, injected members may not be preserved when the mapper is serialized between slices.
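The advice above amounts to keeping serialized state small and rebuilding helpers per slice. The sketch below shows one way to do that with a transient field and simulates a slice boundary with plain Java serialization. StopWordFilter and SliceBoundary are hypothetical; the library performs its own serialization between slices.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical mapper-like class: only the small counter is serialized; the lookup
// set is transient and rebuilt in beginSlice().
class StopWordFilter implements Serializable {
  private static final long serialVersionUID = 1L;

  private long kept;                        // survives serialization between slices
  private transient Set<String> stopWords;  // easily rebuilt, so never serialized

  public void beginSlice() {
    stopWords = new HashSet<>(Arrays.asList("a", "an", "the"));
  }

  public void map(String word) {
    if (!stopWords.contains(word)) {
      kept++;
    }
  }

  long kept() { return kept; }

  boolean hasStopWords() { return stopWords != null; }
}

final class SliceBoundary {
  private SliceBoundary() {}

  // Simulates a slice boundary: serialize the object, then deserialize a copy.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```

After the round trip the counter is intact but the stop-word set is null, which is why the sketch rebuilds it in beginSlice() before the next map() call.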
The Reducer class
MapReduce jobs must provide an implementation of the Reducer class, which defines the reduce() method:
public abstract void reduce(K key, ReducerInput<V> values);
The library calls reduce() once for each unique key. The values argument contains all the values emitted for that key by every Mapper instance. The method processes these values and produces the final output for the key. Note that values is a ReducerInput, which is an Iterator. The reduce() method calls Reducer.emit() to emit the final output for the key:
protected void emit(K key, V value);
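To see how reduce() fits after the map stage, the sketch below groups (key, value) pairs by key, as the shuffle stage does, and then calls reduce() once per unique key. SketchReducer, SumReducer, and Shuffle are hypothetical stand-ins: a plain Iterator takes the place of ReducerInput, and emit() mirrors the emit(key, value) signature shown above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Reducer; a plain Iterator plays the role of ReducerInput.
abstract class SketchReducer<K, V> {
  final Map<K, V> emitted = new LinkedHashMap<>();

  abstract void reduce(K key, Iterator<V> values);

  protected void emit(K key, V value) {
    emitted.put(key, value);
  }
}

// Sums all counts emitted for a word, e.g. by a word-count mapper.
class SumReducer extends SketchReducer<String, Long> {
  @Override
  void reduce(String key, Iterator<Long> values) {
    long total = 0;
    while (values.hasNext()) {
      total += values.next();
    }
    emit(key, total);
  }
}

final class Shuffle {
  private Shuffle() {}

  // Groups pairs by key (in sorted key order), then invokes reduce() once per unique key.
  static <K extends Comparable<K>, V> void run(List<Map.Entry<K, V>> pairs, SketchReducer<K, V> reducer) {
    Map<K, List<V>> groups = new TreeMap<>();
    for (Map.Entry<K, V> pair : pairs) {
      groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    for (Map.Entry<K, List<V>> group : groups.entrySet()) {
      reducer.reduce(group.getKey(), group.getValue().iterator());
    }
  }
}
```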
Like the Mapper class, Reducer handles its input in zero or more shards; each shard is subdivided into zero or more slices. A Reducer may be serialized and deserialized before processing its first shard (or first slice in a shard), after processing its last shard (or last slice in a shard), or between any consecutive shards (or slices). To make it easy to initialize, save, and restore state, Reducer has these methods, which can be overridden:
public void beginShard();
public void endShard();
public void beginSlice();
public void endSlice();
public long estimateMemoryRequirement();
As with map(), keep reduce() simple and small. If a single output item requires a lot of work, consider redefining what the output should be, and let the library do the hard work.
Because a Reducer may be serialized many times, avoid keeping references to large objects, or to any data you can easily reconstruct, in serialized fields; rebuild such data in beginSlice() instead. If you use an inversion-of-control (IoC) framework to inject members into your reducer, make sure it works with MapReduce serialization; otherwise, injected members may not be preserved when the reducer is serialized between slices.
The MapReduce library contains a few Reducer classes; they can be found in the MapReduce library's reducers package.
The Output class
When you define a job, you specify an Output class that takes the data emitted by the final stage of the job (the map stage of a Map job, or the reduce stage of a MapReduce job) and writes it to a specific type of destination.
The MapReduce library contains several Output classes that write to different types of output destinations. Each type of Output class is paired with a corresponding subclass of OutputWriter. The MapReduce library calls the Output class's createWriters(numShards) method with the number of shards requested in the MapReduceSpecification.
The available Output classes can be found in the MapReduce library's outputs package. Some of the most often-used classes are:
- GoogleCloudStorageFileOutput - Writes the data emitted by the reducer to a GCS file without inserting any delimiter between records, which lets the reducer define its own format.
- BlobFileOutput - Produces a Blobstore file for each reducer shard. Each file name is generated using a given format string that includes a zero-based integer parameter that is replaced with the shard number. The user specifies the number of shards.
- GoogleCloudStorageLevelDbOutput - Writes the data emitted by reduce to a GCS file. Each item is written as a single record in LevelDb format. This is useful for chaining jobs.
- MarshallingOutput - Can be used in conjunction with one of the outputs above. It lets the reducer write any type of record for which a Marshaller can be provided, which greatly simplifies coding.
- InMemoryOutput - Used to return an arbitrary Java object in memory. This is normally used for testing.
- NoOutput - Useful for Map jobs that do not need to produce any output.
- DatastoreOutput - Writes Entities to datastore.
If the existing Output classes don't serve your purpose, you can write your own, along with its associated OutputWriter class.
The Output class must define these methods:
List<? extends OutputWriter<O>> createWriters(int numShards);
R finish(Collection<? extends OutputWriter<O>> writers) throws IOException;
createWriters(numShards) creates and initializes an OutputWriter instance for each shard.
The finish() method returns the job's result (of type R), or null. In many cases, the result is a reference to the data written by the Output class.
The OutputWriter class must define the write() method, which writes a value to the output.
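The division of labor between Output and OutputWriter can be sketched with an in-memory output: createWriters() hands out one writer per shard, each writer buffers what it is given, and finish() assembles the result. ListOutput and ListWriter are hypothetical stand-ins that loosely resemble what InMemoryOutput is described to do; they omit the optional lifecycle methods.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for OutputWriter<O>: buffers the values written by one shard.
class ListWriter<O> implements Serializable {
  private static final long serialVersionUID = 1L;
  private final ArrayList<O> items = new ArrayList<>();

  void write(O value) {
    items.add(value);
  }

  List<O> items() {
    return items;
  }
}

// Hypothetical stand-in for Output<O, R> with R = List<List<O>> (one list per shard).
class ListOutput<O> {
  List<ListWriter<O>> createWriters(int numShards) {
    List<ListWriter<O>> writers = new ArrayList<>();
    for (int shard = 0; shard < numShards; shard++) {
      writers.add(new ListWriter<>());
    }
    return writers;
  }

  // Collects each writer's buffer, in the iteration order of the given collection.
  List<List<O>> finish(Collection<ListWriter<O>> writers) {
    List<List<O>> result = new ArrayList<>();
    for (ListWriter<O> writer : writers) {
      result.add(new ArrayList<>(writer.items()));
    }
    return result;
  }
}
```

A real OutputWriter would normally stream to its destination rather than buffer everything, since writers are serialized between slices and count toward the size limit.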
OutputWriter may also override these methods; see the class reference for more details:
public void beginSlice() throws IOException;
public void endSlice() throws IOException;
public void beginShard() throws IOException;
public void endShard() throws IOException;
public long estimateMemoryRequirement();
public boolean allowSliceRetry();
To see an example of an Output implementation, take a look at the source code for DatastoreOutput. This file defines the DatastoreOutput class and the nested DatastoreOutputWriter class.
The Marshaller class
After mapping, each key-value pair goes into the shuffle stage and then into a Reducer. Between stages the data is stored temporarily, so the keys and values are serialized and deserialized by marshallers. You must provide the MapReduceSpecification with a marshaller for keys and a marshaller for values; you may use the same Marshaller for both.
The MapReduce library provides several marshallers for you to use. Because they are private classes, you obtain them by calling a getXXXMarshaller() method of the Marshallers class:
- StringMarshaller - Encodes Java strings using UTF-8.
- SerializationMarshaller - Handles any Java object that implements the Serializable interface.
- LongMarshaller - An optimized marshaller for long values. Unlike SerializationMarshaller, which uses Java's built-in serialization, LongMarshaller produces a more compact representation because it knows every entry is a long. This marshaller does not accept null values.
- IntegerMarshaller - Like LongMarshaller, but for integers.
- ByteBufferMarshaller - A marshaller for ByteBuffers.
If SerializationMarshaller is not efficient enough for your types, you can write your own marshaller. It must implement the Marshaller interface, which requires two methods:
public ByteBuffer toBytes(T object);
public T fromBytes(ByteBuffer b) throws IOException;
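A custom marshaller for a single fixed-width type can be very small. The sketch below encodes a Long as exactly 8 bytes and rejects null, echoing the LongMarshaller description. SketchMarshaller is a hypothetical interface with the two methods listed above, and the fixed-width encoding is this sketch's choice, not necessarily what LongMarshaller does internally.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical interface mirroring the two Marshaller methods.
interface SketchMarshaller<T> {
  ByteBuffer toBytes(T object);

  T fromBytes(ByteBuffer b) throws IOException;
}

// Fixed-width 8-byte encoding for Long; null values are rejected.
class FixedLongMarshaller implements SketchMarshaller<Long> {
  @Override
  public ByteBuffer toBytes(Long value) {
    if (value == null) {
      throw new NullPointerException("null values are not supported");
    }
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
    buf.putLong(value);
    buf.flip(); // switch from writing to reading: position 0, limit 8
    return buf;
  }

  @Override
  public Long fromBytes(ByteBuffer b) throws IOException {
    if (b.remaining() != Long.BYTES) {
      throw new IOException("expected 8 bytes, got " + b.remaining());
    }
    return b.getLong();
  }
}
```

Signaling malformed input with IOException matches the throws clause of fromBytes() in the interface above, so callers can treat corrupt data as an I/O failure.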
The Counter class
You may find it convenient to use the Counter class. A counter is an integer value that is aggregated across all shards, which makes counters useful for statistical calculations. To acquire a counter, call getCounter() on the context from within map() or reduce(). For example:
Counter c = getContext().getCounter("name");
This retrieves an existing counter or creates one if it doesn't already exist.
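Because each shard updates its own counters, the job-wide value of a counter is the sum over all shards. The hypothetical sketch below models that aggregation with plain maps; ShardCounters is not a library class, and the real Counter API may differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-shard counter set, not the library's Counter class.
class ShardCounters {
  private final Map<String, Long> values = new HashMap<>();

  // A counter that does not exist yet starts at 0, mirroring the
  // "retrieve or create" behavior of getCounter().
  void increment(String name, long delta) {
    values.merge(name, delta, Long::sum);
  }

  Map<String, Long> values() {
    return Collections.unmodifiableMap(values);
  }

  // Job-wide totals: each counter's values summed over all shards.
  static Map<String, Long> aggregate(List<ShardCounters> shards) {
    Map<String, Long> totals = new TreeMap<>();
    for (ShardCounters shard : shards) {
      shard.values.forEach((name, value) -> totals.merge(name, value, Long::sum));
    }
    return totals;
  }
}
```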
Size limits
The total size of all the Mapper, InputReader, OutputWriter, and Counter instances must be less than 1MB between slices, because these instances are serialized and saved to the Datastore between slices.
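One way to keep an eye on this limit during development is to measure the serialized size of your objects locally. The sketch below uses standard Java serialization and treats 1MB as 1,048,576 bytes; both the helper name SerializedSize and that reading of the limit are assumptions, and the library's own encoding may differ, so treat the numbers as estimates.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper for estimating how large objects are once serialized.
final class SerializedSize {
  // Assumption: "1MB" interpreted as 1,048,576 bytes.
  static final long LIMIT_BYTES = 1024L * 1024L;

  private SerializedSize() {}

  // Size in bytes of obj under standard Java serialization.
  static long of(Serializable obj) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  // Estimates whether the combined size of several objects stays under the limit.
  // Each object is measured separately, so stream headers are counted more than once.
  static boolean fitsLimit(Serializable... parts) {
    long total = 0;
    for (Serializable part : parts) {
      total += of(part);
    }
    return total < LIMIT_BYTES;
  }
}
```

For example, passing your mapper, reader, and writer instances to fitsLimit() in a unit test gives an early warning before a job fails at a slice boundary.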