Skip to content

Commit

Permalink
[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to e…
Browse files Browse the repository at this point in the history
…nsure deleting the temp file

This PR adds a `finalize` method in DiskMapIterator to clean up the resources even if some exception happens during processing data.

Author: zsxwing <[email protected]>

Closes apache#4219 from zsxwing/SPARK-5423 and squashes the following commits:

d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
  • Loading branch information
zsxwing authored and Ubuntu committed Feb 19, 2015
1 parent 38e624a commit 90095bf
Showing 1 changed file with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null

@volatile private var closed = false

// A volatile variable to remember which DeserializationStream is using. Need to set it when we
// open a DeserializationStream. But we should use `deserializeStream` rather than
// `deserializeStreamToBeClosed` to read the content because touching a volatile variable will
// reduce the performance. It must be volatile so that we can see its correct value in the
// `finalize` method, which could run in any thread.
@volatile private var deserializeStreamToBeClosed: DeserializationStream = null

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
Expand All @@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream != null) {
deserializeStreamToBeClosed = null
deserializeStream.close()
fileStream.close()
deserializeStream = null
Expand All @@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](

val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
ser.deserializeStream(compressedStream)
// Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
// close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed`
// during reading the (K, C) pairs.
deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
deserializeStreamToBeClosed
} else {
// No more batches left
cleanup()
Expand Down Expand Up @@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
item
}

// TODO: Ensure this gets called even if the iterator isn't drained.
private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
deserializeStream = null
fileStream = null
ds.close()
file.delete()
// TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the
// future, we need some mechanism to ensure this gets called once the resources are not used.
private def cleanup(): Unit = {
if (!closed) {
closed = true
batchIndex = batchOffsets.length // Prevent reading any other batch
fileStream = null
try {
val ds = deserializeStreamToBeClosed
deserializeStreamToBeClosed = null
deserializeStream = null
if (ds != null) {
ds.close()
}
} finally {
if (file.exists()) {
file.delete()
}
}
}
}

override def finalize(): Unit = {
try {
cleanup()
} finally {
super.finalize()
}
}
}

Expand Down

0 comments on commit 90095bf

Please sign in to comment.