Merge pull request apache#533 from andrewor14/master. Closes apache#533.
External spilling - generalize batching logic

The existing implementation is a Kryo-specific hack and works only with LZF compression. Introducing an intermediate batch-level stream takes care of pre-fetching and other arbitrary behavior of higher-level streams in a more general way (a toy sketch follows).
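
To make the batching scheme concrete, here is a self-contained toy sketch, not code from the patch: GZIP stands in for whatever codec spark.io.compression.codec selects, plain strings stand in for serialized records, and Guava's ByteStreams.limit (which the patch itself uses on the read path) carves the spill file into per-batch streams, so a decompressor's read-ahead can never cross a batch boundary.

    import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}
    import scala.collection.mutable.ArrayBuffer
    import com.google.common.io.ByteStreams

    object BatchedSpillSketch {
      def main(args: Array[String]): Unit = {
        val file = File.createTempFile("spill", ".bin")
        val batchSizes = new ArrayBuffer[Long]  // byte size of each batch, in write order

        // Write path: one compression stream per batch, finished (not closed) at the
        // batch boundary, so each batch is a self-contained compressed segment.
        val fileOut = new FileOutputStream(file)
        val channel = fileOut.getChannel
        var lastPos = 0L
        for (batch <- Seq(Seq("apple", "banana"), Seq("cherry"))) {
          val out = new GZIPOutputStream(fileOut)
          batch.foreach(s => out.write((s + "\n").getBytes("UTF-8")))
          out.finish()  // flush this segment without closing the shared fileOut
          val pos = channel.position()
          batchSizes += pos - lastPos  // record the batch boundary for the read path
          lastPos = pos
        }
        fileOut.close()

        // Read path: expose exactly one batch to the decompressor at a time. The
        // length-limited view returns EOF at the batch boundary, so pre-fetching
        // by higher-level streams cannot consume bytes of the next batch.
        val bufferedIn = new BufferedInputStream(new FileInputStream(file))
        for (size <- batchSizes) {
          val batchIn = ByteStreams.limit(bufferedIn, size)
          scala.io.Source.fromInputStream(new GZIPInputStream(batchIn), "UTF-8")
            .getLines().foreach(println)
          while (batchIn.read() != -1) {}  // drain any unread remainder of the batch
        }
        bufferedIn.close()
        file.delete()
      }
    }

The batch granularity is configurable via spark.shuffle.spill.batchSize (10,000 objects by default), as shown in the diff below.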

Author: Andrew Or <[email protected]>

== Merge branch commits ==

commit 3ddeb7e
Author: Andrew Or <[email protected]>
Date:   Wed Feb 5 12:09:32 2014 -0800

    Also privatize fields

commit 090544a
Author: Andrew Or <[email protected]>
Date:   Wed Feb 5 10:58:23 2014 -0800

    Privatize methods

commit 13920c9
Author: Andrew Or <[email protected]>
Date:   Tue Feb 4 16:34:15 2014 -0800

    Update docs

commit bd5a1d7
Author: Andrew Or <[email protected]>
Date:   Tue Feb 4 13:44:24 2014 -0800

    Typo: phyiscal -> physical

commit 287ef44
Author: Andrew Or <[email protected]>
Date:   Tue Feb 4 13:38:32 2014 -0800

    Avoid reading the entire batch into memory; also simplify streaming logic

    Additionally, address formatting comments.

commit 3df7005
Merge: a531d2e 164489d
Author: Andrew Or <[email protected]>
Date:   Mon Feb 3 18:27:49 2014 -0800

    Merge branch 'master' of github.com:andrewor14/incubator-spark

commit a531d2e
Author: Andrew Or <[email protected]>
Date:   Mon Feb 3 18:18:04 2014 -0800

    Relax assumptions on compressors and serializers when batching

    This commit introduces an intermediate layer of an input stream on the batch level.
    This guards against interference from higher level streams (i.e. compression and
    deserialization streams), especially pre-fetching, without specifically targeting
    particular libraries (Kryo) and forcing shuffle spill compression to use LZF.

commit 164489d
Author: Andrew Or <[email protected]>
Date:   Mon Feb 3 18:18:04 2014 -0800

    Relax assumptions on compressors and serializers when batching

    This commit introduces an intermediate layer of an input stream on the batch level.
    This guards against interference from higher level streams (i.e. compression and
    deserialization streams), especially pre-fetching, without specifically targeting
    particular libraries (Kryo) and forcing shuffle spill compression to use LZF.
(cherry picked from commit 1896c6e)

Signed-off-by: Patrick Wendell <[email protected]>
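
The two "Relax assumptions on compressors and serializers" commits boil down to a small piece of write-path bookkeeping: each batch is committed separately and its on-disk size recorded, so the read path can reconstruct batch boundaries. A hedged sketch of that loop follows; BatchWriter is a hypothetical stand-in for the patch's DiskBlockObjectWriter, and spillBatches is illustrative, not Spark API.

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-in for DiskBlockObjectWriter: commit() forces the current
    // batch to disk; bytesWritten then reports that batch's size in bytes.
    trait BatchWriter {
      def commit(): Unit
      def bytesWritten: Long
    }

    // Write records in fixed-size batches, returning each batch's byte size so the
    // read path can later carve the file into per-batch length-limited streams.
    def spillBatches[T](records: Iterator[T], batchSize: Int, newWriter: () => BatchWriter)
                       (write: (BatchWriter, T) => Unit): ArrayBuffer[Long] = {
      val batchSizes = new ArrayBuffer[Long]
      var writer = newWriter()
      var objectsWritten = 0
      def flush(): Unit = {                // mirrors the patch's flush() helper
        writer.commit()
        batchSizes += writer.bytesWritten
        objectsWritten = 0
      }
      for (record <- records) {
        write(writer, record)
        objectsWritten += 1
        if (objectsWritten == batchSize) { // batch boundary: commit, open a fresh writer
          flush()
          writer = newWriter()
        }
      }
      if (objectsWritten > 0) flush()      // commit the final partial batch
      batchSizes
    }

On the read path, DiskMapIterator (in the diff below) consumes these recorded sizes in order via ByteStreams.limit, so the compression and deserialization streams each see exactly one batch.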
andrewor14 authored and James Z.M. Gao committed Mar 10, 2014
1 parent 8f302e0 commit 4deeb95
Showing 4 changed files with 93 additions and 100 deletions.
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -66,6 +66,11 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
* Cumulative time spent performing blocking writes, in ns.
*/
def timeWriting(): Long

/**
* Number of bytes written so far
*/
def bytesWritten: Long
}

/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
@@ -183,7 +188,8 @@ private[spark] class DiskBlockObjectWriter(
// Only valid if called after close()
override def timeWriting() = _timeWriting

def bytesWritten: Long = {
// Only valid if called after commit()
override def bytesWritten: Long = {
lastValidPosition - initialPosition
}
}
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -50,7 +50,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
addShutdownHook()

/**
* Returns the phyiscal file segment in which the given BlockId is located.
* Returns the physical file segment in which the given BlockId is located.
* If the BlockId has been mapped to a specific FileSegment, that will be returned.
* Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
*/
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -24,11 +24,11 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import com.google.common.io.ByteStreams

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.io.LZFCompressionCodec
import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BlockManager}

/**
* An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -84,12 +84,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
private val trackMemoryThreshold = 1000

// Size of object batches when reading/writing from serializers. Objects are written in
// batches, with each batch using its own serialization stream. This cuts down on the size
// of reference-tracking maps constructed when deserializing a stream.
//
// NOTE: Setting this too low can cause excess copying when serializing, since some serializers
// grow internal data structures by growing + copying every time the number of objects doubles.
/**
* Size of object batches when reading/writing from serializers.
*
* Objects are written in batches, with each batch using its own serialization stream. This
* cuts down on the size of reference-tracking maps constructed when deserializing a stream.
*
* NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
* grow internal data structures by growing + copying every time the number of objects doubles.
*/
private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000)

// How many times we have spilled so far
@@ -100,7 +103,6 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var _diskBytesSpilled = 0L

private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
private val comparator = new KCComparator[K, C]
private val ser = serializer.newInstance()

@@ -153,37 +155,21 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
var objectsWritten = 0

/* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
* closes and re-opens serialization and compression streams within each file. This makes some
* assumptions about the way that serialization and compression streams work, specifically:
*
* 1) The serializer input streams do not pre-fetch data from the underlying stream.
*
* 2) Several compression streams can be opened, written to, and flushed on the write path
* while only one compression input stream is created on the read path
*
* In practice (1) is only true for Java, so we add a special fix below to make it work for
* Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
*
* To avoid making these assumptions we should create an intermediate stream that batches
* objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
* This is a bit tricky because, within each segment, you'd need to track the total number
* of bytes written and then re-wind and write it at the beginning of the segment. This will
* most likely require using the file channel API.
*/
// List of batch sizes (bytes) in the order they are written to disk
val batchSizes = new ArrayBuffer[Long]

val shouldCompress = blockManager.shouldCompress(blockId)
val compressionCodec = new LZFCompressionCodec(sparkConf)
def wrapForCompression(outputStream: OutputStream) = {
if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
// Flush the disk writer's contents to disk, and update relevant variables
def flush() = {
writer.commit()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
objectsWritten = 0
}

def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
wrapForCompression, syncWrites)

var writer = getNewWriter
var objectsWritten = 0
try {
val it = currentMap.destructiveSortedIterator(comparator)
while (it.hasNext) {
@@ -192,22 +178,21 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
objectsWritten += 1

if (objectsWritten == serializerBatchSize) {
writer.commit()
flush()
writer.close()
_diskBytesSpilled += writer.bytesWritten
writer = getNewWriter
objectsWritten = 0
writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
}
}

if (objectsWritten > 0) writer.commit()
if (objectsWritten > 0) {
flush()
}
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
writer.close()
_diskBytesSpilled += writer.bytesWritten
}

currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file, blockId))
spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))

// Reset the amount of shuffle memory used by this map in the global pool
val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
@@ -239,24 +224,25 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private class ExternalIterator extends Iterator[(K, C)] {

// A fixed-size queue that maintains a buffer for each stream we are currently merging
val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]

// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
val sortedMap = currentMap.destructiveSortedIterator(comparator)
val inputStreams = Seq(sortedMap) ++ spilledMaps
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
private val inputStreams = Seq(sortedMap) ++ spilledMaps

inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
mergeHeap.enqueue(StreamBuffer(it, kcPairs))
}

/**
* Fetch from the given iterator until a key of different hash is retrieved. In the
* event of key hash collisions, this ensures no pairs are hidden from being merged.
* Fetch from the given iterator until a key of different hash is retrieved.
*
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assume the given iterator is in sorted order.
*/
def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
var kc = it.next()
@@ -274,7 +260,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
* If the given buffer contains a value for the given key, merge that value into
* baseCombiner and remove the corresponding (K, C) pair from the buffer
*/
def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
var i = 0
while (i < buffer.pairs.size) {
val (k, c) = buffer.pairs(i)
@@ -293,7 +279,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty)

/**
* Select a key with the minimum hash, then combine all values with the same key from all input streams.
* Select a key with the minimum hash, then combine all values with the same key from all
* input streams
*/
override def next(): (K, C) = {
// Select a key from the StreamBuffer that holds the lowest key hash
@@ -333,7 +320,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {

def minKeyHash: Int = {
@@ -355,51 +342,53 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
/**
* An iterator that returns (K, C) pairs in sorted order from an on-disk map
*/
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)

val shouldCompress = blockManager.shouldCompress(blockId)
val compressionCodec = new LZFCompressionCodec(sparkConf)
val compressedStream =
if (shouldCompress) {
compressionCodec.compressedInputStream(bufferedStream)
private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
extends Iterator[(K, C)] {
private val fileStream = new FileInputStream(file)
private val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var batchStream = nextBatchStream()
private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
private var deserializeStream = ser.deserializeStream(compressedStream)
private var nextItem: (K, C) = null
private var objectsRead = 0

/**
* Construct a stream that reads only from the next batch
*/
private def nextBatchStream(): InputStream = {
if (batchSizes.length > 0) {
ByteStreams.limit(bufferedStream, batchSizes.remove(0))
} else {
// No more batches left
bufferedStream
}
var deserializeStream = ser.deserializeStream(compressedStream)
var objectsRead = 0

var nextItem: (K, C) = null
var eof = false

def readNextItem(): (K, C) = {
if (!eof) {
try {
if (objectsRead == serializerBatchSize) {
val newInputStream = deserializeStream match {
case stream: KryoDeserializationStream =>
// Kryo's serializer stores an internal buffer that pre-fetches from the underlying
// stream. We need to capture this buffer and feed it to the new serialization
// stream so that the bytes are not lost.
val kryoInput = stream.input
val remainingBytes = kryoInput.limit() - kryoInput.position()
val extraBuf = kryoInput.readBytes(remainingBytes)
new SequenceInputStream(new ByteArrayInputStream(extraBuf), compressedStream)
case _ => compressedStream
}
deserializeStream = ser.deserializeStream(newInputStream)
objectsRead = 0
}
objectsRead += 1
return deserializeStream.readObject().asInstanceOf[(K, C)]
} catch {
case e: EOFException =>
eof = true
cleanup()
}

/**
* Return the next (K, C) pair from the deserialization stream.
*
* If the current batch is drained, construct a stream for the next batch and read from it.
* If no more pairs are left, return null.
*/
private def readNextItem(): (K, C) = {
try {
val item = deserializeStream.readObject().asInstanceOf[(K, C)]
objectsRead += 1
if (objectsRead == serializerBatchSize) {
batchStream = nextBatchStream()
compressedStream = blockManager.wrapForCompression(blockId, batchStream)
deserializeStream = ser.deserializeStream(compressedStream)
objectsRead = 0
}
item
} catch {
case e: EOFException =>
cleanup()
null
}
null
}

override def hasNext: Boolean = {
Expand All @@ -419,7 +408,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
}

// TODO: Ensure this gets called even if the iterator isn't drained.
def cleanup() {
private def cleanup() {
deserializeStream.close()
file.delete()
}
4 changes: 1 addition & 3 deletions docs/configuration.md
@@ -158,9 +158,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.shuffle.spill.compress</td>
<td>true</td>
<td>
Whether to compress data spilled during shuffles. If enabled, spill compression
always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
regardless of the value of `spark.io.compression.codec`.
Whether to compress data spilled during shuffles.
</td>
</tr>
<tr>