Skip to content

Commit

Permalink
[SPARK-13122] Fix race condition in MemoryStore.unrollSafely()
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/SPARK-13122

A race condition can occur in MemoryStore's unrollSafely() method if two threads that
return the same value for currentTaskAttemptId() execute this method concurrently. This
change makes the operation of reading the initial amount of unroll memory used, performing
the unroll, and updating the associated memory maps atomic in order to avoid this race
condition.

Initial proposed fix wraps all of unrollSafely() in a memoryManager.synchronized { } block. A cleaner approach might be introduce a mechanism that synchronizes based on task attempt ID. An alternative option might be to track unroll/pending unroll memory based on block ID rather than task attempt ID.

Author: Adam Budde <[email protected]>

Closes #11012 from budde/master.

(cherry picked from commit ff71261)
Signed-off-by: Andrew Or <[email protected]>

Conflicts:
	core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
  • Loading branch information
Adam Budde authored and Andrew Or committed Feb 3, 2016
1 parent e81333b commit 2f8abb4
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = 1.5
// Previous unroll memory held by this task, for releasing later (only at the very end)
val previousMemoryReserved = currentUnrollMemoryForThisTask
// Keep track of pending unroll memory reserved by this method.
var pendingMemoryReserved = 0L
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[Any]

Expand All @@ -275,6 +275,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
pendingMemoryReserved += initialMemoryThreshold
}

// Unroll this block safely, checking whether we have exceeded our threshold periodically
Expand All @@ -288,6 +290,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(
blockId, amountToRequest, droppedBlocks)
if (keepUnrolling) {
pendingMemoryReserved += amountToRequest
}
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
Expand All @@ -314,10 +319,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// release the unroll memory yet. Instead, we transfer it to pending unroll memory
// so `tryToPut` can further transfer it to normal storage memory later.
// TODO: we can probably express this without pending unroll memory (SPARK-10907)
val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
pendingUnrollMemoryMap(taskAttemptId) =
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
}
} else {
// Otherwise, if we return an iterator, we can only release the unroll memory when
Expand Down

0 comments on commit 2f8abb4

Please sign in to comment.