Skip to content

Commit

Permalink
[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace
Browse files Browse the repository at this point in the history
MemoryStore.evictBlocksToFreeSpace acquires write locks for all the
blocks it intends to evict up front.  If there is a failure to evict
blocks (e.g., some failure dropping a block to disk), then we have to
release the lock.  Otherwise the lock is never released and an executor
trying to get the lock will wait forever.

Added unit test.

Author: Imran Rashid <[email protected]>

Closes apache#19311 from squito/SPARK-22083.
  • Loading branch information
squito authored and Vinitha Gankidi committed Nov 28, 2017
1 parent 8327341 commit 08d7467
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
import org.apache.spark.storage._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
Expand Down Expand Up @@ -526,20 +526,38 @@ private[spark] class MemoryStore(
}

if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
s"(${Utils.bytesToString(freedMemory)} bytes)")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
dropBlock(blockId, entry)
var lastSuccessfulBlock = -1
try {
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
s"(${Utils.bytesToString(freedMemory)} bytes)")
(0 until selectedBlocks.size).foreach { idx =>
val blockId = selectedBlocks(idx)
val entry = entries.synchronized {
entries.get(blockId)
}
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
dropBlock(blockId, entry)
afterDropAction(blockId)
}
lastSuccessfulBlock = idx
}
logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
freedMemory
} finally {
// like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
// with InterruptedException
if (lastSuccessfulBlock != selectedBlocks.size - 1) {
// the blocks we didn't process successfully are still locked, so we have to unlock them
(lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
val blockId = selectedBlocks(idx)
blockInfoManager.unlock(blockId)
}
}
}
logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
freedMemory
} else {
blockId.foreach { id =>
logInfo(s"Will not store $id")
Expand All @@ -552,6 +570,9 @@ private[spark] class MemoryStore(
}
}

// Test-only hook, invoked after each individual block is dropped during eviction,
// so a suite can simulate a racing thread taking a lock. No-op in production.
protected def afterDropAction(blockId: BlockId): Unit = ()

/** Whether an in-memory entry currently exists for `blockId` (reads `entries` under its lock). */
def contains(blockId: BlockId): Boolean =
entries.synchronized(entries.containsKey(blockId))
Expand Down
119 changes: 119 additions & 0 deletions core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,123 @@ class MemoryStoreSuite
})
assert(memoryStore.getSize(blockId) === 10000)
}

// Regression test for SPARK-22083: eviction acquires write locks on every block it
// intends to drop up front; if a drop fails partway through, the locks on the
// not-yet-processed blocks must still be released (otherwise later readers hang).
test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
// Setup a memory store with many blocks cached, and then one request which leads to multiple
// blocks getting evicted. We'll make the eviction throw an exception, and make sure that
// all locks are released.
val ct = implicitly[ClassTag[Array[Byte]]]
val numInitialBlocks = 10
val memStoreSize = 100
// Sized so that exactly numInitialBlocks small blocks fill the store.
val bytesPerSmallBlock = memStoreSize / numInitialBlocks
// numValidBlocks: how many drops succeed before the mock eviction handler throws.
// readLockAfterDrop: if true, simulate another thread taking a read lock on each
// block that was "moved" to disk, to check that lock survives the failure.
def testFailureOnNthDrop(numValidBlocks: Int, readLockAfterDrop: Boolean): Unit = {
val tc = TaskContext.empty()
val memManager = new StaticMemoryManager(conf, Long.MaxValue, memStoreSize, numCores = 1)
val blockInfoManager = new BlockInfoManager
blockInfoManager.registerTask(tc.taskAttemptId)
var droppedSoFar = 0
// Mock handler: succeeds for the first numValidBlocks drops, then throws to
// simulate a mid-eviction failure (e.g. a failure writing to disk).
val blockEvictionHandler = new BlockEvictionHandler {
var memoryStore: MemoryStore = _

override private[storage] def dropFromMemory[T: ClassTag](
blockId: BlockId,
data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
if (droppedSoFar < numValidBlocks) {
droppedSoFar += 1
memoryStore.remove(blockId)
if (readLockAfterDrop) {
// for testing purposes, we act like another thread gets the read lock on the new
// block
StorageLevel.DISK_ONLY
} else {
StorageLevel.NONE
}
} else {
throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
}
}
}
// Override the afterDropAction hook to grab a read lock in "another thread"
// right after each successful drop, when readLockAfterDrop is set.
val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
blockEvictionHandler) {
override def afterDropAction(blockId: BlockId): Unit = {
if (readLockAfterDrop) {
// pretend that we get a read lock on the block (now on disk) in another thread
TaskContext.setTaskContext(tc)
blockInfoManager.lockForReading(blockId)
TaskContext.unset()
}
}
}

blockEvictionHandler.memoryStore = memoryStore
memManager.setMemoryStore(memoryStore)

// Put in some small blocks to fill up the memory store
val initialBlocks = (1 to numInitialBlocks).map { id =>
val blockId = BlockId(s"rdd_1_$id")
val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
assert(initialWriteLock)
val success = memoryStore.putBytes(blockId, bytesPerSmallBlock, MemoryMode.ON_HEAP, () => {
new ChunkedByteBuffer(ByteBuffer.allocate(bytesPerSmallBlock))
})
assert(success)
blockInfoManager.unlock(blockId, None)
}
assert(blockInfoManager.size === numInitialBlocks)


// Add one big block, which will require evicting everything in the memorystore. However our
// mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared.
val largeBlockId = BlockId(s"rdd_2_1")
val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
assert(initialWriteLock)
if (numValidBlocks < numInitialBlocks) {
// Eviction fails partway through: the put must propagate the mock exception.
val exc = intercept[RuntimeException] {
memoryStore.putBytes(largeBlockId, memStoreSize, MemoryMode.ON_HEAP, () => {
new ChunkedByteBuffer(ByteBuffer.allocate(memStoreSize))
})
}
assert(exc.getMessage().startsWith("Mock error dropping block"), exc)
// BlockManager.doPut takes care of releasing the lock for the newly written block -- not
// testing that here, so do it manually
blockInfoManager.removeBlock(largeBlockId)
} else {
// All drops succeed, so the large block is stored normally.
memoryStore.putBytes(largeBlockId, memStoreSize, MemoryMode.ON_HEAP, () => {
new ChunkedByteBuffer(ByteBuffer.allocate(memStoreSize))
})
// BlockManager.doPut takes care of releasing the lock for the newly written block -- not
// testing that here, so do it manually
blockInfoManager.unlock(largeBlockId)
}

// Expected count of registered blocks: dropped-to-NONE blocks disappear entirely,
// while dropped-to-disk blocks (readLockAfterDrop) keep their BlockInfo entries.
val largeBlockInMemory = if (numValidBlocks == numInitialBlocks) 1 else 0
val expBlocks = numInitialBlocks +
(if (readLockAfterDrop) 0 else -numValidBlocks) +
largeBlockInMemory
assert(blockInfoManager.size === expBlocks)

// Key assertion: no block is left holding a write lock after the failure.
val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) =>
assert(info.writerTask === BlockInfo.NO_WRITER, id)
// in this test, all the blocks in memory have no reader, but everything dropped to disk
// had another thread read the block. We shouldn't lose the other thread's reader lock.
if (memoryStore.contains(id)) {
assert(info.readerCount === 0, id)
true
} else {
assert(info.readerCount === 1, id)
false
}
}
assert(blocksStillInMemory.size ===
(numInitialBlocks - numValidBlocks + largeBlockInMemory))
}

// Exercise failure on the first drop, a middle drop, and the no-failure case,
// each with and without the simulated racing reader.
Seq(0, 3, numInitialBlocks).foreach { failAfterDropping =>
Seq(true, false).foreach { readLockAfterDropping =>
testFailureOnNthDrop(failAfterDropping, readLockAfterDropping)
}
}
}
}

0 comments on commit 08d7467

Please sign in to comment.