From e8f149706aa6a57699a02e5aba718e8f7c7e7b4c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 17:25:57 -0700 Subject: [PATCH] [SPARK-15260] Atomically resize memory pools (branch 1.6) ## What changes were proposed in this pull request? (This is the branch-1.6 version of #13039) When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call memoryStore.evictBlocksToFreeSpace, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be in a bad state. This patch minimizes the things we do between the two calls to make the resizing more atomic. ## How was this patch tested? Jenkins. Author: Andrew Or Closes #13058 from andrewor14/safer-pool-1.6. (cherry picked from commit fd2da7b91e33e8fc994c4a6a0524831807f1324f) --- .../spark/memory/StorageMemoryPool.scala | 11 ++++----- .../spark/memory/UnifiedMemoryManager.scala | 15 ++++++++---- .../spark/memory/MemoryManagerSuite.scala | 15 ++++++++++++ .../memory/UnifiedMemoryManagerSuite.scala | 23 +++++++++++++++++++ 4 files changed, 53 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 70af83b5ee092..89edaf58ebc29 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w } /** - * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number - * of bytes removed from the pool's capacity. + * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes. + * Note: this method doesn't actually reduce the pool size but relies on the caller to do so. + * + * @return number of bytes to be removed from the pool's capacity. */ - def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { - // First, shrink the pool by reclaiming free memory: + def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) - decrementPoolSize(spaceFreedByReleasingUnusedMemory) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: @@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. - decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 829f054dba0e9..802087c82b713 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] ( storageRegionSize, maxMemory - storageRegionSize) { + assertInvariant() + // We always maintain this invariant: - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + private def assertInvariant(): Unit = { + assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + } override def maxStorageMemory: Long = synchronized { maxMemory - onHeapExecutionMemoryPool.memoryUsed @@ -77,7 +81,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + assertInvariant() assert(numBytes >= 0) memoryMode match { case MemoryMode.ON_HEAP => @@ -99,9 +103,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: - val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace( + val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) - onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed) + storageMemoryPool.decrementPoolSize(spaceToReclaim) + onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim) } } } @@ -137,7 +142,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + assertInvariant() assert(numBytes >= 0) if (numBytes > maxStorageMemory) { // Fail fast if the block simply won't fit diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 555b640cb4244..6a195ef7fe5b3 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -76,6 +76,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft ms } + /** + * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is + * stubbed to always throw [[RuntimeException]]. + */ + protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = { + val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS) + when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] { + override def answer(invocation: InvocationOnMock): Long = { + throw new RuntimeException("bad memory store!") + } + }) + mm.setMemoryStore(ms) + ms + } + /** * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory. * diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 6cc48597d38f9..46b6916a12fc2 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -255,4 +255,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(evictedBlocks.nonEmpty) } + test("SPARK-15260: atomically resize memory pools") { + val conf = new SparkConf() + .set("spark.memory.fraction", "1") + .set("spark.memory.storageFraction", "0") + .set("spark.testing.memory", "1000") + val mm = UnifiedMemoryManager(conf, numCores = 2) + makeBadMemoryStore(mm) + val memoryMode = MemoryMode.ON_HEAP + // Acquire 1000 then release 600 bytes of storage memory, leaving the + // storage memory pool at 1000 bytes but only 400 bytes of which are used. + assert(mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks)) + mm.releaseStorageMemory(600L) + // Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of + // unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool + // by the same amount. If the eviction threw an exception, then we would shrink one pool + // without enlarging the other, resulting in an assertion failure. + intercept[RuntimeException] { + mm.acquireExecutionMemory(1000L, 0, memoryMode) + } + val assertInvariant = PrivateMethod[Unit]('assertInvariant) + mm.invokePrivate[Unit](assertInvariant()) + } + }