Add @mridulm's fixes to ExternalAppendOnlyMap for batch sizes
All these changes are from @mridulm's work in apache#1609, but extracted here
to fix this specific issue. This particular set of changes makes sure that we
read exactly the right range of bytes from each spill file in
ExternalAppendOnlyMap: some serializers can write bytes after the last object
(e.g. the TC_RESET marker in Java serialization), and the previous code would
mistakenly read those trailing bytes as part of the next batch. There are also
improvements to the cleanup logic to make sure files are closed.
mateiz committed Aug 1, 2014
1 parent 78f2af5 · commit 9a78e4b
Showing 1 changed file with 77 additions and 20 deletions.
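
To see the failure mode the commit message describes, and the shape of the fix, here is a minimal self-contained sketch. It is illustrative only, not the Spark code: the object name and data are made up, and it assumes Guava's ByteStreams on the classpath (which this file already uses). Each batch records its exact byte size at write time, and the reader confines itself to that byte range instead of trusting the serialization stream to stop cleanly after the last object:

import java.io._
import scala.collection.mutable.ArrayBuffer
import com.google.common.io.ByteStreams

object BatchBoundarySketch extends App {
  val file = File.createTempFile("spill", ".bin")
  val batchSizes = new ArrayBuffer[Long]

  // Write each batch through its own ObjectOutputStream and record its byte size.
  val out = new FileOutputStream(file)
  for (batch <- Seq(Seq("a", "b"), Seq("c", "d"))) {
    val buf = new ByteArrayOutputStream
    val oos = new ObjectOutputStream(buf)
    batch.foreach(oos.writeObject)
    oos.reset()        // writes a TC_RESET marker *after* the last object, as
                       // Spark's JavaSerializer does periodically
    oos.close()
    out.write(buf.toByteArray)
    batchSizes += buf.size.toLong   // exact number of bytes this batch occupies
  }
  out.close()

  // Read back: prefix-sum the sizes into offsets and cap each batch's reader at
  // its byte range so trailing marker bytes cannot bleed into the next batch.
  val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
  for (i <- batchSizes.indices) {
    val in = new FileInputStream(file)
    in.getChannel.position(batchOffsets(i))
    val ois = new ObjectInputStream(
      ByteStreams.limit(in, batchOffsets(i + 1) - batchOffsets(i)))
    println((ois.readObject(), ois.readObject()))   // exactly batch i's two objects
    in.close()
  }
  file.delete()
}

Without the ByteStreams.limit cap, a reader that simply counted objects on one long stream could consume the trailing TC_RESET byte as the start of the next batch, desynchronizing every batch after it.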
@@ -17,7 +17,7 @@

 package org.apache.spark.util.collection
 
-import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
+import java.io._
 import java.util.Comparator
 
 import scala.collection.BufferedIterator
@@ -28,7 +28,7 @@ import com.google.common.io.ByteStreams

 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator

@@ -199,13 +199,16 @@ class ExternalAppendOnlyMap[K, V, C](

     // Flush the disk writer's contents to disk, and update relevant variables
     def flush() = {
-      writer.commitAndClose()
-      val bytesWritten = writer.bytesWritten
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      val bytesWritten = w.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
       objectsWritten = 0
     }
 
+    var success = false
     try {
       val it = currentMap.destructiveSortedIterator(keyComparator)
       while (it.hasNext) {
@@ -215,16 +218,28 @@

         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
         }
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {
+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
       }
+      success = true
     } finally {
-      // Partial failures cannot be tolerated; do not revert partial writes
-      writer.close()
+      if (!success) {
+        // This code path only happens if an exception was thrown above before we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
+      }
     }
 
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
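
One subtlety worth calling out in the hunk above: flush() nulls out the shared writer reference before calling commitAndClose(), so the finally block can never revert a batch whose commit has already begun. A minimal illustration of that hand-off pattern (illustrative names, not the Spark code):

object HandOffSketch extends App {
  class Writer {
    def commitAndClose(): Unit = ()
    def revertPartialWritesAndClose(): Unit = ()
  }

  var writer: Writer = new Writer
  var success = false
  try {
    val w = writer
    writer = null        // hand off: the error path below can no longer see this writer
    w.commitAndClose()   // if this throws, we reach the finally block with writer == null
    success = true
  } finally {
    if (!success && writer != null) {
      writer.revertPartialWritesAndClose()   // only reverts a writer that was never committed
    }
  }
}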
@@ -390,26 +405,49 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
     extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be batchSizes.length + 1
+    assert(file.length() == batchOffsets(batchOffsets.length - 1))
+
+    private var batchIndex = 0  // Which batch we're in
+    private var fileStream: FileInputStream = null
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    private var batchStream = nextBatchStream()
-    private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-    private var deserializeStream = ser.deserializeStream(compressedStream)
+    private var deserializeStream = nextBatchStream()
     private var nextItem: (K, C) = null
    private var objectsRead = 0
 
     /**
      * Construct a stream that reads only from the next batch.
      */
-    private def nextBatchStream(): InputStream = {
-      if (batchSizes.length > 0) {
-        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+    private def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+      // we're still in a valid batch.
+      if (batchIndex < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }
+
+        val start = batchOffsets(batchIndex)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
+        batchIndex += 1
+
+        val end = batchOffsets(batchIndex)
+
+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+        ser.deserializeStream(compressedStream)
       } else {
         // No more batches left
-        bufferedStream
+        cleanup()
+        null
       }
     }
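
To make the offset arithmetic above concrete, here is a tiny worked example (hypothetical batch sizes) of how the prefix sum from scanLeft turns per-batch byte sizes into the [start, end) ranges that nextBatchStream() seeks to:

object OffsetSketch extends App {
  import scala.collection.mutable.ArrayBuffer

  val batchSizes = ArrayBuffer(10L, 20L, 15L)        // bytes written per batch
  val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // ArrayBuffer(0, 10, 30, 45)

  // Batch i occupies bytes [batchOffsets(i), batchOffsets(i + 1)) of the spill file,
  // and the final offset equals the file length -- the invariant the assert checks.
  for (i <- batchSizes.indices) {
    println(s"batch $i: [${batchOffsets(i)}, ${batchOffsets(i + 1)})")
  }
}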

@@ -424,10 +462,8 @@ class ExternalAppendOnlyMap[K, V, C](
         val item = deserializeStream.readObject().asInstanceOf[(K, C)]
         objectsRead += 1
         if (objectsRead == serializerBatchSize) {
-          batchStream = nextBatchStream()
-          compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-          deserializeStream = ser.deserializeStream(compressedStream)
           objectsRead = 0
+          deserializeStream = nextBatchStream()
         }
         item
       } catch {
@@ -439,6 +475,9 @@

     override def hasNext: Boolean = {
       if (nextItem == null) {
+        if (deserializeStream == null) {
+          return false
+        }
         nextItem = readNextItem()
       }
       nextItem != null
@@ -455,7 +494,25 @@

     // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
-      deserializeStream.close()
+      batchIndex = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      val fs = fileStream
+      deserializeStream = null
+      fileStream = null
+
+      if (ds != null) {
+        try {
+          ds.close()
+        } catch {
+          case e: IOException =>
+            // Make sure we at least close the file handle
+            if (fs != null) {
+              try { fs.close() } catch { case e2: IOException => }
+            }
+            throw e
+        }
+      }
+
       file.delete()
     }
   }
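
The cleanup() rewrite above follows a common two-level close pattern: closing the outer deserialization stream normally closes the file handle it wraps, but if that close throws, the raw FileInputStream is still closed so no file descriptor leaks. A generic sketch of the pattern (illustrative, not the Spark code):

import java.io.{Closeable, IOException}

object CloseBothSketch {
  // Close `outer`, which wraps `inner`; on failure, still release the inner
  // handle before rethrowing, so the underlying resource is never leaked.
  def closeBoth(outer: Closeable, inner: Closeable): Unit = {
    try {
      outer.close()   // normally closes inner as well
    } catch {
      case e: IOException =>
        try inner.close() catch { case _: IOException => }   // best effort
        throw e
    }
  }
}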
