From 2f8abb4afc08aa8dc4ed763bcb93ff6b1d6f0d78 Mon Sep 17 00:00:00 2001
From: Adam Budde
Date: Tue, 2 Feb 2016 19:35:33 -0800
Subject: [PATCH] [SPARK-13122] Fix race condition in MemoryStore.unrollSafely()

https://issues.apache.org/jira/browse/SPARK-13122

A race condition can occur in MemoryStore's unrollSafely() method if two
threads that return the same value for currentTaskAttemptId() execute this
method concurrently. This change makes the operation of reading the initial
amount of unroll memory used, performing the unroll, and updating the
associated memory maps atomic in order to avoid this race condition.

The initial proposed fix wraps all of unrollSafely() in a
memoryManager.synchronized { } block. A cleaner approach might be to
introduce a mechanism that synchronizes based on task attempt ID. An
alternative option might be to track unroll/pending unroll memory based on
block ID rather than task attempt ID.

Author: Adam Budde

Closes #11012 from budde/master.

(cherry picked from commit ff71261b651a7b289ea2312abd6075da8b838ed9)
Signed-off-by: Andrew Or

Conflicts:
	core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
---
 .../org/apache/spark/storage/MemoryStore.scala | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index bdab8c2332fae..17aae6e1bbddb 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -264,8 +264,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     var memoryThreshold = initialMemoryThreshold
     // Memory to request as a multiple of current vector size
     val memoryGrowthFactor = 1.5
-    // Previous unroll memory held by this task, for releasing later (only at the very end)
-    val previousMemoryReserved = currentUnrollMemoryForThisTask
+    // Keep track of pending unroll memory reserved by this method.
+    var pendingMemoryReserved = 0L
     // Underlying vector for unrolling the block
     var vector = new SizeTrackingVector[Any]
 
@@ -275,6 +275,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
         s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+    } else {
+      pendingMemoryReserved += initialMemoryThreshold
     }
 
     // Unroll this block safely, checking whether we have exceeded our threshold periodically
@@ -288,6 +290,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
           val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
           keepUnrolling = reserveUnrollMemoryForThisTask(
             blockId, amountToRequest, droppedBlocks)
+          if (keepUnrolling) {
+            pendingMemoryReserved += amountToRequest
+          }
           // New threshold is currentSize * memoryGrowthFactor
           memoryThreshold += amountToRequest
         }
@@ -314,10 +319,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
           // release the unroll memory yet. Instead, we transfer it to pending unroll memory
           // so `tryToPut` can further transfer it to normal storage memory later.
           // TODO: we can probably express this without pending unroll memory (SPARK-10907)
-          val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
-          unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
+          unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
           pendingUnrollMemoryMap(taskAttemptId) =
-            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
+            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
         }
       } else {
         // Otherwise, if we return an iterator, we can only release the unroll memory when
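
Note for reviewers (not part of the patch): the following is a minimal, self-contained
Scala sketch of the race described in the commit message. It does not use the real
MemoryStore API; UnrollRaceSketch, reserve, currentUnrollMemory, unrollRacy and
unrollFixed are illustrative names invented for this example, while unrollMemoryMap and
pendingMemoryReserved only mirror the identifiers in the diff. unrollRacy infers "memory
reserved by this call" by diffing a shared per-task counter before and after, so a sibling
thread that shares the same task attempt ID can be double-counted; unrollFixed keeps a
local running total, analogous to the pendingMemoryReserved variable introduced above.

// Illustrative sketch only -- simplified stand-ins, not Spark's MemoryStore.
import scala.collection.mutable

object UnrollRaceSketch {
  private val lock = new Object
  // taskAttemptId -> bytes of unroll memory currently reserved (shared state)
  private val unrollMemoryMap = mutable.HashMap[Long, Long]()

  private def reserve(taskAttemptId: Long, bytes: Long): Unit = lock.synchronized {
    unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + bytes
  }

  private def currentUnrollMemory(taskAttemptId: Long): Long = lock.synchronized {
    unrollMemoryMap.getOrElse(taskAttemptId, 0L)
  }

  // Racy pattern: infer "what this call reserved" from a before/after snapshot of the
  // shared counter. A sibling thread with the same task attempt ID also moves that
  // counter, so the diff can include the sibling's reservations.
  private def unrollRacy(taskAttemptId: Long, chunks: Seq[Long]): Long = {
    val before = currentUnrollMemory(taskAttemptId)
    chunks.foreach(reserve(taskAttemptId, _))
    currentUnrollMemory(taskAttemptId) - before
  }

  // Fixed pattern: accumulate only this call's own reservations in a local variable,
  // analogous to pendingMemoryReserved in the patch.
  private def unrollFixed(taskAttemptId: Long, chunks: Seq[Long]): Long = {
    var pendingMemoryReserved = 0L
    chunks.foreach { bytes =>
      reserve(taskAttemptId, bytes)
      pendingMemoryReserved += bytes
    }
    pendingMemoryReserved
  }

  def main(args: Array[String]): Unit = {
    val taskAttemptId = 42L
    val chunks = Seq.fill(10000)(1L) // each thread reserves 10000 bytes in total

    // Two threads sharing one task attempt ID, using the racy pattern.
    val racyAmounts = new Array[Long](2)
    val racyThreads = (0 until 2).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = { racyAmounts(i) = unrollRacy(taskAttemptId, chunks) }
      })
    }
    racyThreads.foreach(_.start())
    racyThreads.foreach(_.join())
    println(s"racy per-thread amounts: ${racyAmounts.mkString(", ")}, sum = ${racyAmounts.sum}")
    println(s"actually reserved: ${currentUnrollMemory(taskAttemptId)}")

    // Reset and repeat with the fixed pattern: each thread reports exactly its own 10000.
    lock.synchronized { unrollMemoryMap.clear() }
    val fixedAmounts = new Array[Long](2)
    val fixedThreads = (0 until 2).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = { fixedAmounts(i) = unrollFixed(taskAttemptId, chunks) }
      })
    }
    fixedThreads.foreach(_.start())
    fixedThreads.foreach(_.join())
    println(s"fixed per-thread amounts: ${fixedAmounts.mkString(", ")}, sum = ${fixedAmounts.sum}")
  }
}

When the two racy threads interleave, their reported amounts can sum to more than the
20000 bytes actually reserved for the task, which is the kind of per-task accounting error
unrollSafely() could hit before this change; the fixed pattern always reports 10000 per
thread regardless of interleaving.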