Skip to content

Commit

Permalink
[SPARK-21090][CORE] Optimize the unified memory manager code
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
1.In `acquireStorageMemory`, when the Memory Mode is OFF_HEAP ,the `maxOffHeapMemory` should be modified to `maxOffHeapStorageMemory`. after this PR,it will same as ON_HEAP Memory Mode.
Because when acquire memory is between `maxOffHeapStorageMemory` and `maxOffHeapMemory`,it will fail surely, so if acquire memory is greater than  `maxOffHeapStorageMemory`(not greater than `maxOffHeapMemory`),we should fail fast.
2. Borrow memory from execution, `numBytes` modified to `numBytes - storagePool.memoryFree` will be more reasonable.
Because we just acquire `(numBytes - storagePool.memoryFree)`, unnecessary borrowed `numBytes` from execution

## How was this patch tested?
added unit test case

Author: liuxian <[email protected]>

Closes #18296 from 10110346/wip-lx-0614.

(cherry picked from commit 112bd9b)
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
10110346 authored and cloud-fan committed Jun 19, 2017
1 parent c0d4acc commit d3c79b7
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapMemory)
maxOffHeapStorageMemory)
}
if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
Expand All @@ -171,7 +171,8 @@ private[spark] class UnifiedMemoryManager private[memory] (
if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
numBytes - storagePool.memoryFree)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
evictBlocksToFreeSpaceCalled.set(numBytesToFree)
if (numBytesToFree <= mm.storageMemoryUsed) {
// We can evict enough blocks to fulfill the request for space
mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP)
mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode)
evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))
numBytesToFree
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,36 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
mm.invokePrivate[Unit](assertInvariants())
}

test("not enough free memory in the storage pool --OFF_HEAP") {
val conf = new SparkConf()
.set("spark.memory.offHeap.size", "1000")
.set("spark.testing.memory", "1000")
.set("spark.memory.offHeap.enabled", "true")
val taskAttemptId = 0L
val mm = UnifiedMemoryManager(conf, numCores = 1)
val ms = makeMemoryStore(mm)
val memoryMode = MemoryMode.OFF_HEAP

assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 400L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 400L)

// Fail fast
assert(!mm.acquireStorageMemory(dummyBlock, 700L, memoryMode))
assert(mm.storageMemoryUsed === 0L)

assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
assert(mm.storageMemoryUsed === 100L)
assertEvictBlocksToFreeSpaceNotCalled(ms)

// Borrow 50 from execution memory
assert(mm.acquireStorageMemory(dummyBlock, 450L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 550L)

// Borrow 50 from execution memory and evict 50 to free space
assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
assertEvictBlocksToFreeSpaceCalled(ms, 50)
assert(mm.storageMemoryUsed === 600L)
}
}

0 comments on commit d3c79b7

Please sign in to comment.