diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index bdab8c2332fae..17aae6e1bbddb 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -264,8 +264,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 - // Previous unroll memory held by this task, for releasing later (only at the very end) - val previousMemoryReserved = currentUnrollMemoryForThisTask + // Keep track of pending unroll memory reserved by this method. + var pendingMemoryReserved = 0L // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] @@ -275,6 +275,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") + } else { + pendingMemoryReserved += initialMemoryThreshold } // Unroll this block safely, checking whether we have exceeded our threshold periodically @@ -288,6 +290,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong keepUnrolling = reserveUnrollMemoryForThisTask( blockId, amountToRequest, droppedBlocks) + if (keepUnrolling) { + pendingMemoryReserved += amountToRequest + } // New threshold is currentSize * memoryGrowthFactor memoryThreshold += amountToRequest } @@ -314,10 +319,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // release the unroll memory yet. Instead, we transfer it to pending unroll memory // so `tryToPut` can further transfer it to normal storage memory later. // TODO: we can probably express this without pending unroll memory (SPARK-10907) - val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved - unrollMemoryMap(taskAttemptId) -= amountToTransferToPending + unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved } } else { // Otherwise, if we return an iterator, we can only release the unroll memory when