Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 11, 2016
1 parent 632ae14 commit df67bda
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
ms
}

/**
* Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is
* stubbed to always throw [[RuntimeException]].
*/
protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
override def answer(invocation: InvocationOnMock): Long = {
throw new RuntimeException("bad memory store!")
}
})
mm.setMemoryStore(ms)
ms
}

/**
* Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(evictedBlocks.nonEmpty)
}

test("SPARK-15260: atomically resize memory pools") {
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
.set("spark.testing.memory", "1000")
val mm = UnifiedMemoryManager(conf, numCores = 2)
makeBadMemoryStore(mm)
val memoryMode = MemoryMode.ON_HEAP
// Acquire 1000 then release 600 bytes of storage memory, leaving the
// storage memory pool at 1000 bytes but only 400 bytes of which are used.
assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode))
mm.releaseStorageMemory(600L, memoryMode)
// Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of
// unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool
// by the same amount. If the eviction threw an exception, then we would shrink one pool
// without enlarging the other, resulting in an assertion failure.
intercept[RuntimeException] {
mm.acquireExecutionMemory(1000L, 0, memoryMode)
}
val assertInvariants = PrivateMethod[Unit]('assertInvariants)
mm.invokePrivate[Unit](assertInvariants())
}

}

0 comments on commit df67bda

Please sign in to comment.