[SPARK-15736][CORE] Gracefully handle loss of DiskStore files #13473

Closed · wants to merge 6 commits
20 changes: 14 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -402,6 +402,17 @@ private[spark] class BlockManager(
locations
}

/**
* Cleanup code run in response to a failed local read.
* Must be called while holding a read lock on the block.
*/
private def handleLocalReadFailure(blockId: BlockId): Nothing = {
releaseLock(blockId)
// Remove the missing block so that its unavailability is reported to the driver
removeBlock(blockId)
Contributor:

Should this be called before the releaseLock() call?

Contributor Author:

No, I don't think so: internally, removeBlock acquires a write lock on the block, so if we called it before the releaseLock call then we'd be calling it while holding a read lock, which would cause us to deadlock.

Contributor:

Looking at BlockInfoManager#lockForWriting(), I think you're right.

throw new SparkException(s"Block $blockId was not found even though it's read-locked")
}

/**
* Get block from local block manager as an iterator of Java objects.
*/
@@ -441,8 +452,7 @@ private[spark] class BlockManager(
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
- releaseLock(blockId)
- throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+ handleLocalReadFailure(blockId)
}
}
}
@@ -489,8 +499,7 @@ private[spark] class BlockManager(
// The block was not found on disk, so serialize an in-memory copy:
serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
} else {
- releaseLock(blockId)
- throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+ handleLocalReadFailure(blockId)
}
} else { // storage level is serialized
if (level.useMemory && memoryStore.contains(blockId)) {
@@ -499,8 +508,7 @@
val diskBytes = diskStore.getBytes(blockId)
maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
} else {
- releaseLock(blockId)
- throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+ handleLocalReadFailure(blockId)
}
}
}
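The ordering inside the new handleLocalReadFailure helper is deliberate, as the review thread above discusses: removeBlock needs exclusive (write-lock) access to the block, so it can only run after the read lock held during the failed read has been released. Below is a minimal, self-contained sketch of that constraint built on a plain ReentrantReadWriteLock; it is an analogy rather than Spark's BlockInfoManager, and the object and method bodies are illustrative only.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Illustrative stand-in for the block-lock ordering (not Spark's BlockInfoManager).
object LockOrderingSketch {
  private val blockLock = new ReentrantReadWriteLock()

  // Analogue of releaseLock(blockId): drop the read lock held during the local read.
  def releaseLock(): Unit = blockLock.readLock().unlock()

  // Analogue of removeBlock(blockId): needs exclusive access before it can
  // delete the block's metadata, like BlockInfoManager#lockForWriting.
  def removeBlock(): Unit = {
    blockLock.writeLock().lock() // blocks while any read lock is still held
    try {
      // remove block metadata here
    } finally {
      blockLock.writeLock().unlock()
    }
  }

  def main(args: Array[String]): Unit = {
    blockLock.readLock().lock() // simulate the read lock held during a failed local read
    // Calling removeBlock() at this point would hang: the write lock cannot be
    // acquired while this thread still holds the read lock (no lock upgrade).
    releaseLock() // the ordering used in handleLocalReadFailure: release first...
    removeBlock() // ...then remove, which can now take the write lock
    println("releaseLock-then-removeBlock completed without deadlock")
  }
}
```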
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.io.{IOException, NotSerializableException, ObjectInputStream}

import org.apache.spark.memory.TestMemoryConsumer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.NonSerializable

// Common state shared by FailureSuite-launched tasks. We use a global object
@@ -241,6 +242,17 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
FailureSuiteState.clear()
}

test("failure because cached RDD partitions are missing from DiskStore (SPARK-15736)") {
sc = new SparkContext("local[1,2]", "test")
val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
rdd.count()
// Directly delete all files from the disk store, triggering failures when reading cached data:
SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
// Each task should fail once due to missing cached data, but then should succeed on its second
// attempt because the missing cache locations will be purged and the blocks will be recomputed.
rdd.count()
}

// TODO: Need to add tests with shuffle fetch failures.
}
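The test above exercises the fix end to end: each task's first attempt fails when its cached partition's file is gone, the failed read removes the block so the stale location stops being advertised, and the retry recomputes the partition. As a rough user-level illustration (a hypothetical standalone app, not part of this PR), the same recovery would look like the sketch below; the master string "local[1,2]" means one worker thread with up to two attempts per task.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical driver program (not part of this PR) showing the recovery
// behavior that the regression test asserts.
object CachedRddRecoveryExample {
  def main(args: Array[String]): Unit = {
    // "local[1,2]": one worker thread, each task may be attempted twice.
    val conf = new SparkConf().setMaster("local[1,2]").setAppName("cached-rdd-recovery")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 1000, 4).persist(StorageLevel.DISK_ONLY)
      rdd.count() // materializes the cached partitions in the local DiskStore
      // If the on-disk block files are lost here (disk failure, external cleanup),
      // the next action's first task attempts fail; with this patch the failed
      // reads drop the stale block entries and the retries recompute the data.
      rdd.count()
    } finally {
      sc.stop()
    }
  }
}
```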

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1139,6 +1139,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getSingle("a3").isDefined, "a3 was not in store")
}

private def testReadWithLossOfOnDiskFiles(
storageLevel: StorageLevel,
readMethod: BlockManager => Option[_]): Unit = {
store = makeBlockManager(12000)
assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
assert(store.getStatus("blockId").isDefined)
// Directly delete all files from the disk store, triggering failures when reading blocks:
store.diskBlockManager.getAllFiles().foreach(_.delete())
// The BlockManager still thinks that these blocks exist:
assert(store.getStatus("blockId").isDefined)
// Because the BlockManager's metadata claims that the block exists (i.e. that it's present
// in at least one store), the read attempts to read it and fails when the on-disk file is
// missing.
intercept[SparkException] {
readMethod(store)
}
// Subsequent read attempts will succeed; the block isn't present but we return an expected
// "block not found" response rather than a fatal error:
assert(readMethod(store).isEmpty)
// The reason why this second read succeeded is because the metadata entry for the missing
// block was removed as a result of the read failure:
assert(store.getStatus("blockId").isEmpty)
}

test("remove block if a read fails due to missing DiskStore files (SPARK-15736)") {
val storageLevels = Seq(
StorageLevel(useDisk = true, useMemory = false, deserialized = false),
StorageLevel(useDisk = true, useMemory = false, deserialized = true))
val readMethods = Map[String, BlockManager => Option[_]](
"getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
"getLocalValues" -> ((m: BlockManager) => m.getLocalValues("blockId"))
)
testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId"))
for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) {
withClue(s"$readMethodName $storageLevel") {
testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
}
}
}

test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") {
val mockBlockTransferService =
new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))