From 83b7a1c6503adce1826fc537b4db47e534da5cae Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Oct 2014 16:39:32 -0700 Subject: [PATCH] [SPARK-4019] [SPARK-3740] Fix MapStatus compression bug that could lead to empty results or Snappy errors This commit fixes a bug in MapStatus that could cause jobs to wrongly return empty results if those jobs contained stages with more than 2000 partitions where most of those partitions were empty. For jobs with > 2000 partitions, MapStatus uses HighlyCompressedMapStatus, which only stores the average size of blocks. If the average block size is zero, then this will cause all blocks to be reported as empty, causing BlockFetcherIterator to mistakenly skip them. For example, this would return an empty result: sc.makeRDD(0 until 10, 1000).repartition(2001).collect() This can also lead to deserialization errors (e.g. Snappy decoding errors) for jobs with > 2000 partitions where the average block size is non-zero but there is at least one empty block. In this case, the BlockFetcher attempts to fetch empty blocks and fails when trying to deserialize them. The root problem here is that MapStatus has a (previously undocumented) correctness property that was violated by HighlyCompressedMapStatus: If a block is non-empty, then getSizeForBlock must be non-zero. I fixed this by modifying HighlyCompressedMapStatus to store the average size of _non-empty_ blocks and to use a compressed bitmap to track which blocks are empty. I also removed a test which was broken as originally written: it attempted to check that HighlyCompressedMapStatus's size estimation error was < 10%, but this was broken because HighlyCompressedMapStatus is only used for map statuses with > 2000 partitions, but the test only created 50. Author: Josh Rosen Closes #2866 from JoshRosen/spark-4019 and squashes the following commits: fc8b490 [Josh Rosen] Roll back hashset change, which didn't improve performance. 5faa0a4 [Josh Rosen] Incorporate review feedback c8b8cae [Josh Rosen] Two performance fixes: 3b892dd [Josh Rosen] Address Reynold's review comments ba2e71c [Josh Rosen] Add missing newline 609407d [Josh Rosen] Use Roaring Bitmap to track non-empty blocks. c23897a [Josh Rosen] Use sets when comparing collect() results 91276a3 [Josh Rosen] [SPARK-4019] Fix MapStatus compression bug that could lead to empty results. --- core/pom.xml | 4 + .../apache/spark/scheduler/MapStatus.scala | 76 ++++++++++++++++--- .../scala/org/apache/spark/rdd/RDDSuite.scala | 5 ++ .../spark/scheduler/MapStatusSuite.scala | 53 +++++++------ pom.xml | 5 ++ 5 files changed, 109 insertions(+), 34 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index a5a178079bc57..7b68dbaea4789 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -132,6 +132,10 @@ com.twitter chill-java + + org.roaringbitmap + RoaringBitmap + commons-net commons-net diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index e25096ea92d70..2ab5d9637b593 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -19,6 +19,8 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} +import org.roaringbitmap.RoaringBitmap + import org.apache.spark.storage.BlockManagerId /** @@ -29,7 +31,12 @@ private[spark] sealed trait MapStatus { /** Location where this task was run. */ def location: BlockManagerId - /** Estimated size for the reduce block, in bytes. */ + /** + * Estimated size for the reduce block, in bytes. + * + * If a block is non-empty, then this method MUST return a non-zero size. This invariant is + * necessary for correctness, since block fetchers are allowed to skip zero-size blocks. + */ def getSizeForBlock(reduceId: Int): Long } @@ -38,7 +45,7 @@ private[spark] object MapStatus { def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > 2000) { - new HighlyCompressedMapStatus(loc, uncompressedSizes) + HighlyCompressedMapStatus(loc, uncompressedSizes) } else { new CompressedMapStatus(loc, uncompressedSizes) } @@ -112,35 +119,80 @@ private[spark] class CompressedMapStatus( } } - /** - * A [[MapStatus]] implementation that only stores the average size of the blocks. + * A [[MapStatus]] implementation that only stores the average size of non-empty blocks, + * plus a bitmap for tracking which blocks are non-empty. During serialization, this bitmap + * is compressed. * - * @param loc location where the task is being executed. - * @param avgSize average size of all the blocks + * @param loc location where the task is being executed + * @param numNonEmptyBlocks the number of non-empty blocks + * @param emptyBlocks a bitmap tracking which blocks are empty + * @param avgSize average size of the non-empty blocks */ -private[spark] class HighlyCompressedMapStatus( +private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, + private[this] var numNonEmptyBlocks: Int, + private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long) extends MapStatus with Externalizable { - def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) { - this(loc, uncompressedSizes.sum / uncompressedSizes.length) - } + // loc could be null when the default constructor is called during deserialization + require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, + "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, 0L) // For deserialization only + protected def this() = this(null, -1, null, -1) // For deserialization only override def location: BlockManagerId = loc - override def getSizeForBlock(reduceId: Int): Long = avgSize + override def getSizeForBlock(reduceId: Int): Long = { + if (emptyBlocks.contains(reduceId)) { + 0 + } else { + avgSize + } + } override def writeExternal(out: ObjectOutput): Unit = { loc.writeExternal(out) + emptyBlocks.writeExternal(out) out.writeLong(avgSize) } override def readExternal(in: ObjectInput): Unit = { loc = BlockManagerId(in) + emptyBlocks = new RoaringBitmap() + emptyBlocks.readExternal(in) avgSize = in.readLong() } } + +private[spark] object HighlyCompressedMapStatus { + def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { + // We must keep track of which blocks are empty so that we don't report a zero-sized + // block as being non-empty (or vice-versa) when using the average block size. + var i = 0 + var numNonEmptyBlocks: Int = 0 + var totalSize: Long = 0 + // From a compression standpoint, it shouldn't matter whether we track empty or non-empty + // blocks. From a performance standpoint, we benefit from tracking empty blocks because + // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions. + val emptyBlocks = new RoaringBitmap() + val totalNumBlocks = uncompressedSizes.length + while (i < totalNumBlocks) { + var size = uncompressedSizes(i) + if (size > 0) { + numNonEmptyBlocks += 1 + totalSize += size + } else { + emptyBlocks.add(i) + } + i += 1 + } + val avgSize = if (numNonEmptyBlocks > 0) { + totalSize / numNonEmptyBlocks + } else { + 0 + } + new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize) + } +} diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 465c1a8a43a79..6d2e696dc2fc4 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -459,6 +459,11 @@ class RDDSuite extends FunSuite with SharedSparkContext { for (i <- 0 until sample.size) assert(sample(i) === checkSample(i)) } + test("collect large number of empty partitions") { + // Regression test for SPARK-4019 + assert(sc.makeRDD(0 until 10, 1000).repartition(2001).collect().toSet === (0 until 10).toSet) + } + test("take") { var nums = sc.makeRDD(Range(1, 1000), 1) assert(nums.take(0).size === 0) diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index 79e04f046e4c4..950c6dc58e332 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.FunSuite import org.apache.spark.SparkConf import org.apache.spark.serializer.JavaSerializer +import scala.util.Random class MapStatusSuite extends FunSuite { @@ -46,6 +47,26 @@ class MapStatusSuite extends FunSuite { } } + test("MapStatus should never report non-empty blocks' sizes as 0") { + import Math._ + for ( + numSizes <- Seq(1, 10, 100, 1000, 10000); + mean <- Seq(0L, 100L, 10000L, Int.MaxValue.toLong); + stddev <- Seq(0.0, 0.01, 0.5, 1.0) + ) { + val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean) + val status = MapStatus(BlockManagerId("a", "b", 10), sizes) + val status1 = compressAndDecompressMapStatus(status) + for (i <- 0 until numSizes) { + if (sizes(i) != 0) { + val failureMessage = s"Failed with $numSizes sizes with mean=$mean, stddev=$stddev" + assert(status.getSizeForBlock(i) !== 0, failureMessage) + assert(status1.getSizeForBlock(i) !== 0, failureMessage) + } + } + } + } + test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) { val sizes = Array.fill[Long](2001)(150L) val status = MapStatus(null, sizes) @@ -56,37 +77,25 @@ class MapStatusSuite extends FunSuite { assert(status.getSizeForBlock(2000) === 150L) } - test(classOf[HighlyCompressedMapStatus].getName + ": estimated size is within 10%") { - val sizes = Array.tabulate[Long](50) { i => i.toLong } + test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") { + val sizes = Array.tabulate[Long](3000) { i => i.toLong } + val avg = sizes.sum / sizes.filter(_ != 0).length val loc = BlockManagerId("a", "b", 10) val status = MapStatus(loc, sizes) - val ser = new JavaSerializer(new SparkConf) - val buf = ser.newInstance().serialize(status) - val status1 = ser.newInstance().deserialize[MapStatus](buf) + val status1 = compressAndDecompressMapStatus(status) + assert(status1.isInstanceOf[HighlyCompressedMapStatus]) assert(status1.location == loc) - for (i <- 0 until sizes.length) { - // make sure the estimated size is within 10% of the input; note that we skip the very small - // sizes because the compression is very lossy there. + for (i <- 0 until 3000) { val estimate = status1.getSizeForBlock(i) - if (estimate > 100) { - assert(math.abs(estimate - sizes(i)) * 10 <= sizes(i), - s"incorrect estimated size $estimate, original was ${sizes(i)}") + if (sizes(i) > 0) { + assert(estimate === avg) } } } - test(classOf[HighlyCompressedMapStatus].getName + ": estimated size should be the average size") { - val sizes = Array.tabulate[Long](3000) { i => i.toLong } - val avg = sizes.sum / sizes.length - val loc = BlockManagerId("a", "b", 10) - val status = MapStatus(loc, sizes) + def compressAndDecompressMapStatus(status: MapStatus): MapStatus = { val ser = new JavaSerializer(new SparkConf) val buf = ser.newInstance().serialize(status) - val status1 = ser.newInstance().deserialize[MapStatus](buf) - assert(status1.location == loc) - for (i <- 0 until 3000) { - val estimate = status1.getSizeForBlock(i) - assert(estimate === avg) - } + ser.newInstance().deserialize[MapStatus](buf) } } diff --git a/pom.xml b/pom.xml index 288bbf1114bea..a7e71f9ca5596 100644 --- a/pom.xml +++ b/pom.xml @@ -428,6 +428,11 @@ + + org.roaringbitmap + RoaringBitmap + 0.4.1 + commons-net commons-net