diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index a7fa057ee05f7..34bc3124097bb 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -35,16 +35,15 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
   /**
    * Calling reset to avoid memory leak:
    * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
-   * But only call it every 10,000th time to avoid bloated serialization streams (when
+   * But only call it every 100th time to avoid bloated serialization streams (when
    * the stream 'resets' object class descriptions have to be re-written)
    */
   def writeObject[T: ClassTag](t: T): SerializationStream = {
     objOut.writeObject(t)
+    counter += 1
     if (counterReset > 0 && counter >= counterReset) {
       objOut.reset()
       counter = 0
-    } else {
-      counter += 1
     }
     this
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index cb67a1c039f20..5d10a1f84493c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.util.collection

-import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
+import java.io._
 import java.util.Comparator

 import scala.collection.BufferedIterator
@@ -28,7 +28,7 @@ import com.google.common.io.ByteStreams

 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator

@@ -199,13 +199,16 @@ class ExternalAppendOnlyMap[K, V, C](

     // Flush the disk writer's contents to disk, and update relevant variables
     def flush() = {
-      writer.commitAndClose()
-      val bytesWritten = writer.bytesWritten
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      val bytesWritten = w.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
       objectsWritten = 0
     }

+    var success = false
     try {
       val it = currentMap.destructiveSortedIterator(keyComparator)
       while (it.hasNext) {
@@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](

         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
         }
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {
+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
       }
+      success = true
     } finally {
-      // Partial failures cannot be tolerated; do not revert partial writes
-      writer.close()
+      if (!success) {
+        // This code path only happens if an exception was thrown above before we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
+      }
     }

     currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
    * An iterator that returns (K, C) pairs in sorted order from an on-disk map
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
-    extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    extends Iterator[(K, C)]
+  {
+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be batchSize.length + 1
+    assert(file.length() == batchOffsets(batchOffsets.length - 1))
+
+    private var batchIndex = 0  // Which batch we're in
+    private var fileStream: FileInputStream = null
+
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    private var batchStream = nextBatchStream()
-    private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-    private var deserializeStream = ser.deserializeStream(compressedStream)
+    private var deserializeStream = nextBatchStream()
     private var nextItem: (K, C) = null
     private var objectsRead = 0

     /**
      * Construct a stream that reads only from the next batch.
      */
-    private def nextBatchStream(): InputStream = {
-      if (batchSizes.length > 0) {
-        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+    private def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+      // we're still in a valid batch.
+      if (batchIndex < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }
+
+        val start = batchOffsets(batchIndex)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
+        batchIndex += 1
+
+        val end = batchOffsets(batchIndex)
+
+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+        ser.deserializeStream(compressedStream)
       } else {
         // No more batches left
-        bufferedStream
+        cleanup()
+        null
       }
     }
@@ -424,10 +463,8 @@ class ExternalAppendOnlyMap[K, V, C](
         val item = deserializeStream.readObject().asInstanceOf[(K, C)]
         objectsRead += 1
         if (objectsRead == serializerBatchSize) {
-          batchStream = nextBatchStream()
-          compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-          deserializeStream = ser.deserializeStream(compressedStream)
           objectsRead = 0
+          deserializeStream = nextBatchStream()
         }
         item
       } catch {
@@ -439,6 +476,9 @@ class ExternalAppendOnlyMap[K, V, C](

     override def hasNext: Boolean = {
       if (nextItem == null) {
+        if (deserializeStream == null) {
+          return false
+        }
         nextItem = readNextItem()
       }
       nextItem != null
@@ -455,7 +495,11 @@ class ExternalAppendOnlyMap[K, V, C](

     // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
-      deserializeStream.close()
+      batchIndex = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      deserializeStream = null
+      fileStream = null
+      ds.close()
       file.delete()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 6e415a2bd8ce2..b04c50bd3e196 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import com.google.common.io.ByteStreams

 import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.BlockId

 /**
@@ -273,13 +273,16 @@ private[spark] class ExternalSorter[K, V, C](
     // Flush the disk writer's contents to disk, and update relevant variables.
     // The writer is closed at the end of this process, and cannot be reused.
     def flush() = {
-      writer.commitAndClose()
-      val bytesWritten = writer.bytesWritten
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      val bytesWritten = w.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
       objectsWritten = 0
     }

+    var success = false
     try {
       val it = collection.destructiveSortedIterator(partitionKeyComparator)
       while (it.hasNext) {
@@ -299,13 +302,23 @@ private[spark] class ExternalSorter[K, V, C](
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {
+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
+      }
+      success = true
+    } finally {
+      if (!success) {
+        // This code path only happens if an exception was thrown above before we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
       }
-      writer.close()
-    } catch {
-      case e: Exception =>
-        writer.close()
-        file.delete()
-        throw e
     }

     if (usingMap) {
@@ -472,36 +485,58 @@ private[spark] class ExternalSorter[K, V, C](
    * partitions to be requested in order.
    */
   private[this] class SpillReader(spill: SpilledFile) {
-    val fileStream = new FileInputStream(spill.file)
-    val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    // Serializer batch offsets; size will be batchSize.length + 1
+    val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _)

     // Track which partition and which batch stream we're in. These will be the indices of
     // the next element we will read. We'll also store the last partition read so that
     // readNextPartition() can figure out what partition that was from.
     var partitionId = 0
     var indexInPartition = 0L
-    var batchStreamsRead = 0
+    var batchId = 0
     var indexInBatch = 0
     var lastPartitionId = 0

     skipToNextPartition()

-    // An intermediate stream that reads from exactly one batch
+
+    // Intermediate file and deserializer streams that read from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    var batchStream = nextBatchStream()
-    var compressedStream = blockManager.wrapForCompression(spill.blockId, batchStream)
-    var deserStream = serInstance.deserializeStream(compressedStream)
+    var fileStream: FileInputStream = null
+    var deserializeStream = nextBatchStream()  // Also sets fileStream
+
     var nextItem: (K, C) = null
     var finished = false

     /** Construct a stream that only reads from the next batch */
-    def nextBatchStream(): InputStream = {
-      if (batchStreamsRead < spill.serializerBatchSizes.length) {
-        batchStreamsRead += 1
-        ByteStreams.limit(bufferedStream, spill.serializerBatchSizes(batchStreamsRead - 1))
+    def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+      // we're still in a valid batch.
+      if (batchId < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }
+
+        val start = batchOffsets(batchId)
+        fileStream = new FileInputStream(spill.file)
+        fileStream.getChannel.position(start)
+        batchId += 1
+
+        val end = batchOffsets(batchId)
+
+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(spill.blockId, bufferedStream)
+        serInstance.deserializeStream(compressedStream)
       } else {
-        // No more batches left; give an empty stream
-        bufferedStream
+        // No more batches left
+        cleanup()
+        null
       }
     }
@@ -525,19 +560,17 @@ private[spark] class ExternalSorter[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      if (finished) {
+      if (finished || deserializeStream == null) {
         return null
       }
-      val k = deserStream.readObject().asInstanceOf[K]
-      val c = deserStream.readObject().asInstanceOf[C]
+      val k = deserializeStream.readObject().asInstanceOf[K]
+      val c = deserializeStream.readObject().asInstanceOf[C]
       lastPartitionId = partitionId
       // Start reading the next batch if we're done with this one
       indexInBatch += 1
       if (indexInBatch == serializerBatchSize) {
-        batchStream = nextBatchStream()
-        compressedStream = blockManager.wrapForCompression(spill.blockId, batchStream)
-        deserStream = serInstance.deserializeStream(compressedStream)
         indexInBatch = 0
+        deserializeStream = nextBatchStream()
       }
       // Update the partition location of the element we're reading
       indexInPartition += 1
@@ -545,7 +578,9 @@ private[spark] class ExternalSorter[K, V, C](
       // If we've finished reading the last partition, remember that we're done
       if (partitionId == numPartitions) {
         finished = true
-        deserStream.close()
+        if (deserializeStream != null) {
+          deserializeStream.close()
+        }
       }
       (k, c)
     }
@@ -578,6 +613,17 @@ private[spark] class ExternalSorter[K, V, C](
         item
       }
     }
+
+    // Clean up our open streams and put us in a state where we can't read any more data
+    def cleanup() {
+      batchId = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      deserializeStream = null
+      fileStream = null
+      ds.close()
+      // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop().
+      // This should also be fixed in ExternalAppendOnlyMap.
+    }
   }

   /**
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 7de5df6e1c8bd..04d7338488628 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
   private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2

+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+    val conf = new SparkConf(loadDefaults)
+    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+    conf.set("spark.serializer.objectStreamReset", "1")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    // Ensure that we actually have multiple batches per spill file
+    conf.set("spark.shuffle.spill.batchSize", "10")
+    conf
+  }
+
   test("simple insert") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)

     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -57,7 +68,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }

   test("insert with collision") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     sc = new SparkContext("local", "test", conf)

     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -80,7 +91,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   }

   test("ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
SparkContext("local", "test", conf) val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, @@ -125,7 +136,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } test("null keys and values") { - val conf = new SparkConf(false) + val conf = createSparkConf(false) sc = new SparkContext("local", "test", conf) val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, @@ -166,7 +177,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } test("simple aggregator") { - val conf = new SparkConf(false) + val conf = createSparkConf(false) sc = new SparkContext("local", "test", conf) // reduceByKey @@ -181,7 +192,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } test("simple cogroup") { - val conf = new SparkConf(false) + val conf = createSparkConf(false) sc = new SparkContext("local", "test", conf) val rdd1 = sc.parallelize(1 to 4).map(i => (i, i)) val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i)) @@ -199,7 +210,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } test("spilling") { - val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found + val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) @@ -249,7 +260,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } test("spilling with hash collisions") { - val conf = new SparkConf(true) + val conf = createSparkConf(true) conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) @@ -304,7 +315,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } test("spilling with many hash collisions") { - val conf = new SparkConf(true) + val conf = createSparkConf(true) conf.set("spark.shuffle.memoryFraction", "0.0001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) @@ -329,7 +340,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } test("spilling with hash collisions using the Int.MaxValue key") { - val conf = new SparkConf(true) + val conf = createSparkConf(true) conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) @@ -347,7 +358,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } test("spilling with null keys and values") { - val conf = new SparkConf(true) + val conf = createSparkConf(true) conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 65a71e5a83698..57dcb4ffabac1 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -25,6 +25,17 @@ import org.apache.spark._ import org.apache.spark.SparkContext._ class ExternalSorterSuite extends FunSuite with LocalSparkContext { + private def createSparkConf(loadDefaults: Boolean): SparkConf = { + val conf = new SparkConf(loadDefaults) + // Make the Java serializer write a reset instruction (TC_RESET) after each object to test + // for a bug we had with bytes written past the last object in a batch 
+    conf.set("spark.serializer.objectStreamReset", "1")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    // Ensure that we actually have multiple batches per spill file
+    conf.set("spark.shuffle.spill.batchSize", "10")
+    conf
+  }
+
   test("empty data stream") {
     val conf = new SparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
@@ -60,7 +71,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("few elements per partition") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -102,7 +113,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("empty partitions with spilling") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -127,7 +138,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling in local cluster") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -198,7 +209,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling in local cluster with many reduce tasks") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
@@ -269,7 +280,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("cleanup of intermediate files in sorter") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -290,7 +301,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("cleanup of intermediate files in sorter if there are errors") {
-    val conf = new SparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -311,7 +322,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("cleanup of intermediate files in shuffle") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -326,7 +337,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("cleanup of intermediate files in shuffle with errors") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -348,7 +359,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("no partial aggregation or sorting") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -363,7 +374,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("partial aggregation without spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -379,7 +390,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("partial aggregation with spill, no ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -395,7 +406,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("partial aggregation with spill, with ordering") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -412,7 +423,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("sorting without aggregation, no spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -429,7 +440,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("sorting without aggregation, with spill") {
-    val conf = new SparkConf(false)
+    val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -446,7 +457,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling with hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

@@ -503,7 +514,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling with many hash collisions") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

@@ -526,7 +537,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

@@ -547,7 +558,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
   }

   test("spilling with null keys and values") {
-    val conf = new SparkConf(true)
+    val conf = createSparkConf(true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)

diff --git a/docs/configuration.md b/docs/configuration.md
index 2a71d7b820e5f..870343f1c0bd2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -385,7 +385,7 @@ Apart from these, the following properties are also available, and may be useful
     When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches
     objects to prevent writing redundant data, however that stops garbage collection of those
     objects. By calling 'reset' you flush that info from the serializer, and allow old
-    objects to be collected. To turn off this periodic reset set it to a value <= 0.
+    objects to be collected. To turn off this periodic reset set it to -1.
     By default it will reset the serializer every 100 objects.
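
The core idea of this patch is easiest to see outside Spark: record each serialized batch's byte size while writing, turn the sizes into absolute offsets with scanLeft, and read each batch through its own FileInputStream positioned at the batch's start and capped at its end, so a trailing TC_RESET marker (written when spark.serializer.objectStreamReset is small) can never be consumed as the beginning of the next batch. The sketch below is illustrative only, not Spark code: it uses plain java.io object streams plus Guava's ByteStreams.limit (the same call the patch uses), and names such as writeBatches and readBatch are invented for this example.

import java.io._
import com.google.common.io.ByteStreams

object BatchOffsetsSketch {
  // Write each batch with its own ObjectOutputStream and record how many bytes it occupied,
  // including any trailing TC_RESET marker produced by reset().
  def writeBatches(file: File, batches: Seq[Seq[String]]): Seq[Long] = {
    val out = new FileOutputStream(file)
    try {
      batches.map { batch =>
        val before = out.getChannel.position()
        val objOut = new ObjectOutputStream(out)   // fresh stream header per batch
        batch.foreach { s =>
          objOut.writeObject(s)
          objOut.reset()                           // emits TC_RESET, like objectStreamReset = 1
        }
        objOut.flush()                             // flush, but keep the shared FileOutputStream open
        out.getChannel.position() - before         // exact on-disk size of this batch
      }
    } finally {
      out.close()
    }
  }

  // Read one batch in isolation: position at its start offset and bound the stream at its end,
  // so bytes written after the batch's last object can never leak into the next batch.
  def readBatch(file: File, offsets: Seq[Long], batchIndex: Int): Seq[String] = {
    val start = offsets(batchIndex)
    val end = offsets(batchIndex + 1)
    val fileStream = new FileInputStream(file)
    try {
      fileStream.getChannel.position(start)
      val bounded = ByteStreams.limit(fileStream, end - start)
      val objIn = new ObjectInputStream(new BufferedInputStream(bounded))
      val items = scala.collection.mutable.ArrayBuffer[String]()
      try {
        while (true) {
          items += objIn.readObject().asInstanceOf[String]
        }
      } catch {
        case _: EOFException => // end of this batch (possibly after a TC_RESET byte)
      }
      items.toSeq
    } finally {
      fileStream.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("spill-sketch", ".bin")
    val batches = Seq(Seq("a", "b", "c"), Seq("d", "e"), Seq("f"))
    val sizes = writeBatches(file, batches)
    val offsets = sizes.scanLeft(0L)(_ + _)        // length = numBatches + 1
    assert(file.length() == offsets.last)          // same invariant the patch asserts
    batches.indices.foreach { i =>
      assert(readBatch(file, offsets, i) == batches(i))
    }
    file.delete()
  }
}

Chaining ByteStreams.limit over one shared BufferedInputStream, as the old code did, fails precisely because each limit counts from wherever the previous reader happened to stop; absolute offsets make every batch independent of how many bytes its predecessor actually consumed.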