Skip to content

Commit

Permalink
[SPARK-15260] Atomically resize memory pools (branch 1.6)
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

(This is the branch-1.6 version of apache#13039)

When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call memoryStore.evictBlocksToFreeSpace, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be in a bad state.

This patch minimizes the things we do between the two calls to make the resizing more atomic.

## How was this patch tested?

Jenkins.

Author: Andrew Or <[email protected]>

Closes apache#13058 from andrewor14/safer-pool-1.6.

(cherry picked from commit fd2da7b)
  • Loading branch information
Andrew Or authored and zzcclp committed May 12, 2016
1 parent 7acd3e9 commit e8f1497
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
}

/**
* Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
* of bytes removed from the pool's capacity.
* Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
* Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
*
* @return number of bytes to be removed from the pool's capacity.
*/
def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
// First, shrink the pool by reclaiming free memory:
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
Expand All @@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
storageRegionSize,
maxMemory - storageRegionSize) {

assertInvariant()

// We always maintain this invariant:
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
private def assertInvariant(): Unit = {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
}

override def maxStorageMemory: Long = synchronized {
maxMemory - onHeapExecutionMemoryPool.memoryUsed
Expand All @@ -77,7 +81,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assertInvariant()
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
Expand All @@ -99,9 +103,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
storageMemoryPool.decrementPoolSize(spaceToReclaim)
onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
}
}
}
Expand Down Expand Up @@ -137,7 +142,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assertInvariant()
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
ms
}

/**
* Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is
* stubbed to always throw [[RuntimeException]].
*/
protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
override def answer(invocation: InvocationOnMock): Long = {
throw new RuntimeException("bad memory store!")
}
})
mm.setMemoryStore(ms)
ms
}

/**
* Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(evictedBlocks.nonEmpty)
}

test("SPARK-15260: atomically resize memory pools") {
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
.set("spark.testing.memory", "1000")
val mm = UnifiedMemoryManager(conf, numCores = 2)
makeBadMemoryStore(mm)
val memoryMode = MemoryMode.ON_HEAP
// Acquire 1000 then release 600 bytes of storage memory, leaving the
// storage memory pool at 1000 bytes but only 400 bytes of which are used.
assert(mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
mm.releaseStorageMemory(600L)
// Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of
// unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool
// by the same amount. If the eviction threw an exception, then we would shrink one pool
// without enlarging the other, resulting in an assertion failure.
intercept[RuntimeException] {
mm.acquireExecutionMemory(1000L, 0, memoryMode)
}
val assertInvariant = PrivateMethod[Unit]('assertInvariant)
mm.invokePrivate[Unit](assertInvariant())
}

}

0 comments on commit e8f1497

Please sign in to comment.