
[SPARK-30069][CORE][YARN] Clean up non-shuffle disk block manager files following executor exits on YARN #29378

Closed
wants to merge 8 commits
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -43,6 +43,7 @@ sealed abstract class BlockId {
isInstanceOf[ShuffleDataBlockId] || isInstanceOf[ShuffleIndexBlockId])
}
def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
def isTemp: Boolean = isInstanceOf[TempLocalBlockId] || isInstanceOf[TempShuffleBlockId]

override def toString: String = name
}
@@ -111,6 +112,12 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
override def name: String = "test_" + id
}

// Intended only for testing purposes
// An UnmanagedFile isn't treated as a BlockId, so we don't match it in BlockId.apply()
private[spark] case class UnmanagedFile(id: String) extends BlockId {
override def name: String = "unmanaged_" + id
}

@DeveloperApi
class UnrecognizedBlockId(name: String)
extends SparkException(s"Failed to parse $name into a block ID")
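As a quick, hypothetical illustration (not part of the diff) of how the new isTemp flag classifies block ids, assuming the snippet runs with access to the private[spark] id classes:

import java.util.UUID

// isTemp is true only for the two temporary block id types; all other blocks,
// including shuffle and RDD blocks, keep their current on-disk location.
val blocks: Seq[BlockId] = Seq(
  TempLocalBlockId(UUID.randomUUID()),   // name: temp_local_<uuid>,   isTemp == true
  TempShuffleBlockId(UUID.randomUUID()), // name: temp_shuffle_<uuid>, isTemp == true
  RDDBlockId(0, 0))                      // name: rdd_0_0,             isTemp == false
blocks.foreach(b => println(s"${b.name} isTemp=${b.isTemp}"))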
73 changes: 41 additions & 32 deletions core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import java.io.{File, IOException}
import java.io.File
import java.nio.file.Files
import java.util.UUID

@@ -40,7 +40,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
private[spark] val localDirs: Array[File] = createLocalDirs(conf)
private[spark] val localDirs: Array[File] = StorageUtils.createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -50,14 +50,31 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean

// The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private val subDirs = StorageUtils.createSubDirs(conf, parent = localDirs)

/* Directories that persist the temporary files (temp_local, temp_shuffle).
* We separate the storage directories of temp blocks from non-temp blocks since
* the cleaning process for temp blocks may differ between deploy modes.
* For example, these files have no opportunity to be cleaned before application end on YARN.
* This is a real issue, especially for long-lived Spark applications like the Spark thrift-server.
* So in YARN mode, we persist these files in YARN container directories, which are
* cleaned by YARN when the container exits. */
private val tempDirs: Array[File] = StorageUtils.createTempDirs(conf, replacement = localDirs)
if (tempDirs.isEmpty) {
logError("Failed to create any temp dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}

// Similar to subDirs, but tempSubDirs are used only for temp blocks.
private val tempSubDirs =
StorageUtils.createTempSubDirs(conf, parent = tempDirs, replacement = subDirs)

private val shutdownHook = addShutdownHook()

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
Member:

@LantaoJin. This comment reminds me that we need to handle ExecutorDiskUtils properly. Do we need to update ExecutorDiskUtils.getFile? Or do we need to remove this comment?

Contributor Author:

No, I didn't change any behaviour for non-temp files. ExecutorDiskUtils.getFile handles the non-temp files, such as those starting with "shuffle_" or "rdd_", so it should still be kept in sync with DiskBlockManager.getFile.

Contributor Author (LantaoJin, Aug 10, 2020):

You can see the method body is not changed; only the signature changes, from def getFile(filename: String): File to private def getFile(localDirs: Array[File], subDirs: Array[Array[File]], subDirsPerLocalDir: Int, filename: String): File.

Member (dongjoon-hyun, Aug 10, 2020):

Why does this function need to have unused parameters? Maybe I'm still confused by this change.

Contributor Author:

  def getFile(blockId: BlockId): File = {
    if (containerDirEnabled && blockId.isTemp) {
      getFile(containerDirs, subContainerDirs, subDirsPerLocalDir, blockId.name)
    } else {
      getFile(localDirs, subDirs, subDirsPerLocalDir, blockId.name)
    }
  }

Now we just store temp files and non-temp files under different root paths. The algorithm for finding a file doesn't change.
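For readers following the thread, here is a condensed sketch of that unchanged lookup algorithm, assuming it mirrors the existing DiskBlockManager/ExecutorDiskUtils logic (error handling trimmed):

private def getFile(localDirs: Array[File], subDirs: Array[Array[File]],
    subDirsPerLocalDir: Int, filename: String): File = {
  // Hash the file name into a (dirId, subDirId) pair so files spread evenly
  // over the configured local dirs and their hex-named sub-directories.
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  // Create the sub-directory lazily, caching it in subDirs under the per-row lock.
  val subDir = subDirs(dirId).synchronized {
    val old = subDirs(dirId)(subDirId)
    if (old != null) old else {
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists()) newDir.mkdirs()
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }
  new File(subDir, filename)
}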

Contributor Author (LantaoJin, Aug 10, 2020):

> Why does this function need to have unused parameters?

Which parameters?

Contributor Author (LantaoJin, Aug 10, 2020):

> Why does this function need to have unused parameters?

Ah, you must mean that getFile in DiskBlockManager has 4 parameters while getFile in ExecutorDiskUtils has 3. ExecutorDiskUtils is a utility, so 3 parameters are enough to define a file path structure. The subDirs parameter in DiskBlockManager is only a local variable that is also used in other parts; it does not affect the result of the lookup. I could use 3 parameters in DiskBlockManager.getFile as well, but then I would need to pick the right array based on a condition, like:

private def getFile(localDirs: Array[File],  subDirsPerLocalDir: Int, filename: String): File = {
...
    val properSubDirs = if (someConditions) {
      subDirs
    } else {
      subContainerDirs
    }
    val old = properSubDirs(dirId)(subDirId)
...

But someConditions is a little hard to determine inside this method.

Contributor Author:

So the getFile in DiskBlockManager has 4 parameters:

private def getFile(localDirs: Array[File], subDirs: Array[Array[File]],
      subDirsPerLocalDir: Int, filename: String): File

Contributor Author:

I think we don't need to pay too much attention to how many parameters getFile has. Before this PR, there was only 1 parameter (filename: String) in DiskBlockManager and 3 parameters in ExecutorDiskUtils.

def getFile(filename: String): File = {
private def getFile(localDirs: Array[File], subDirs: Array[Array[File]],
subDirsPerLocalDir: Int, filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
@@ -81,17 +98,26 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean
new File(subDir, filename)
}

def getFile(blockId: BlockId): File = getFile(blockId.name)
/** Looks up a file by hashing it into one of our local/temp subdirectories. */
def getFile(blockId: BlockId): File = {
if (blockId.isTemp) {
getFile(tempDirs, tempSubDirs, subDirsPerLocalDir, blockId.name)
} else {
getFile(localDirs, subDirs, subDirsPerLocalDir, blockId.name)
}
}

/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {
getFile(blockId.name).exists()
Member:

Is this a performance optimization? I guess we can leave the original style because def getFile(blockId: BlockId): File still exists.

Contributor Author (LantaoJin, Aug 10, 2020):

def getFile(blockId: BlockId) checks whether the blockId is a temp block; if it is, the block is stored in the container directory.
def getFile(filename: String) just stores a block in a local directory.

Looks like we can change def getFile(filename: String) to private since this PR changes all invokers outside the class to def getFile(blockId: BlockId), such as:

-   val targetFile = diskManager.getFile(targetBlockId.name)
+   val targetFile = diskManager.getFile(targetBlockId)
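A hypothetical caller-side sketch of the resulting interface (diskBlockManager stands for any DiskBlockManager instance; the block ids are illustrative):

// Temp blocks are routed by DiskBlockManager to the temp (YARN container) dirs,
// while non-temp blocks keep resolving under the regular localDirs.
val (tempId, tempFile) = diskBlockManager.createTempShuffleBlock()   // a TempShuffleBlockId
assert(diskBlockManager.getFile(tempId).getAbsolutePath == tempFile.getAbsolutePath)
val rddFile = diskBlockManager.getFile(RDDBlockId(0, 0))             // non-temp, unchanged path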

Contributor Author (LantaoJin, Aug 10, 2020):

So def getFile(filename: String) is only used for testing now, so I added private[spark] to it. Actually, it would be better to remove it, but the test "SPARK-22227: non-block files are skipped" still uses this interface.

Contributor Author:

I updated the PR description to cover the interface change.

Contributor Author:

I have now refactored the test case and removed def getFile(filename: String)!

Member:

Thank you for the explanation. It's a little confusing to me, but I'll take a closer look at it that way later.

getFile(blockId).exists()
}

/** List all the files currently stored on disk by the disk manager. */
def getAllFiles(): Seq[File] = {
// Get all the files inside the array of array of directories
subDirs.flatMap { dir =>
// if tempSubDirs is the same array instance as subDirs, avoid listing it twice
val allSubDirs = if (subDirs eq tempSubDirs) subDirs else subDirs ++ tempSubDirs
allSubDirs.flatMap { dir =>
dir.synchronized {
// Copy the content of dir because it may be modified in other threads
dir.clone()
@@ -134,25 +160,6 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean
(blockId, getFile(blockId))
}

/**
* Create local directories for storing block data. These directories are
* located inside configured local directories and won't
* be deleted on JVM exit when using the external shuffle service.
*/
private def createLocalDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
try {
val localDir = Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")
Some(localDir)
} catch {
case e: IOException =>
logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
None
}
}
}

private def addShutdownHook(): AnyRef = {
logDebug("Adding shutdown hook") // force eager creation of logger
ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
@@ -175,15 +182,17 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean

private def doStop(): Unit = {
if (deleteFilesOnStop) {
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
// if tempDirs is the same array instance as localDirs, avoid deleting it twice
val toDelete = if (localDirs eq tempDirs) localDirs else localDirs ++ tempDirs
toDelete.foreach { dir =>
if (dir.isDirectory() && dir.exists()) {
try {
if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
Utils.deleteRecursively(localDir)
if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(dir)) {
Utils.deleteRecursively(dir)
}
} catch {
case e: Exception =>
logError(s"Exception while deleting local spark dir: $localDir", e)
logError(s"Exception while deleting local spark dir: $dir", e)
}
}
}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -96,7 +96,7 @@ private[spark] class DiskStore(
}

def getBytes(blockId: BlockId): BlockData = {
getBytes(diskManager.getFile(blockId.name), getSize(blockId))
getBytes(diskManager.getFile(blockId), getSize(blockId))
}

def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match {
@@ -111,7 +111,7 @@ private[spark] class DiskStore(

def remove(blockId: BlockId): Boolean = {
blockSizes.remove(blockId)
val file = diskManager.getFile(blockId.name)
val file = diskManager.getFile(blockId)
if (file.exists()) {
val ret = file.delete()
if (!ret) {
@@ -129,12 +129,12 @@ private[spark] class DiskStore(
*/
def moveFileToBlock(sourceFile: File, blockSize: Long, targetBlockId: BlockId): Unit = {
blockSizes.put(targetBlockId, blockSize)
val targetFile = diskManager.getFile(targetBlockId.name)
val targetFile = diskManager.getFile(targetBlockId)
FileUtils.moveFile(sourceFile, targetFile)
}

def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
val file = diskManager.getFile(blockId)
file.exists()
}

70 changes: 70 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -17,6 +17,7 @@

package org.apache.spark.storage

import java.io.{File, IOException}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.Map
@@ -253,4 +254,73 @@ private[spark] object StorageUtils extends Logging {
tmpPort
}
}

/**
* Create container directories for storing block data in YARN mode.
* These directories are located inside the configured local directories and
* will be deleted when YARN cleans up the container.
*/
def createContainerDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}"
try {
val containerDir = Utils.createDirectory(containerDirPath, "blockmgr")
logInfo(s"Created YARN container directory at $containerDir")
Some(containerDir)
} catch {
case e: IOException =>
logError(s"Failed to create YARN container dir in $containerDirPath." +
s" Ignoring this directory.", e)
None
}
}
}

/**
* Create local directories for storing block data. These directories are
* located inside configured local directories and won't
* be deleted on JVM exit when using the external shuffle service.
*/
def createLocalDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
try {
val localDir = Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")
Some(localDir)
} catch {
case e: IOException =>
logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
None
}
}
}

def createSubDirs(conf: SparkConf, parent: Array[File]): Array[Array[File]] = {
Array.fill(parent.length)(new Array[File](conf.get(config.DISKSTORE_SUB_DIRECTORIES)))
}

/**
* Create local temp directories for storing temp block data. These directories are
* located inside the configured local directories. If executors are running on YARN,
* these directories will be deleted when the YARN container exits. Otherwise the
* existing localDirs are reused, so they won't be deleted on JVM exit when using the
* external shuffle service.
*/
def createTempDirs(conf: SparkConf, replacement: Array[File]): Array[File] = {
if (Utils.isRunningInYarnContainer(conf)) {
createContainerDirs(conf)
} else {
// To be compatible with current implementation
replacement
}
}

def createTempSubDirs(conf: SparkConf, parent: Array[File],
replacement: Array[Array[File]]): Array[Array[File]] = {
if (Utils.isRunningInYarnContainer(conf)) {
Array.fill(parent.length)(new Array[File](conf.get(config.DISKSTORE_SUB_DIRECTORIES)))
} else {
// To be compatible with current implementation
replacement
}
}
}
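To make the resulting directory layout concrete, here is a hypothetical illustration; the local dir, container id, sub-dir name and UUID placeholders below are invented for the example:

// Assumed values (invented): LOCAL_DIRS=/data/yarn/local, CONTAINER_ID=container_e11_0001_01_000002
val localRoot = "/data/yarn/local"
val containerId = "container_e11_0001_01_000002"
// Non-temp blocks (unchanged by this PR): cleaned at application end or by the external shuffle service.
println(s"$localRoot/blockmgr-<uuid>/0c/shuffle_0_0_0.data")
// Temp blocks on YARN after this PR: removed by YARN's container cleanup when the container exits.
println(s"$localRoot/$containerId/blockmgr-<uuid>/0c/temp_shuffle_<uuid>")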
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

import java.io.{File, FileWriter}

import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -80,7 +81,8 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll
}

test("SPARK-22227: non-block files are skipped") {
val file = diskBlockManager.getFile("unmanaged_file")
val unmanagedFile = new UnmanagedFile("file")
val file = diskBlockManager.getFile(unmanagedFile)
writeToFile(file, 10)
assert(diskBlockManager.getAllBlocks().isEmpty)
}
@@ -90,4 +92,17 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll
for (i <- 0 until numBytes) writer.write(i)
writer.close()
}

test("SPARK-30069: test write temp file into container dir") {
val conf = spy(testConf.clone)
val containerId = "container_e1987_1564558112805_31178_01_000131"
conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1")
when(conf.getenv("CONTAINER_ID")).thenReturn(containerId)
when(conf.getenv("LOCAL_DIRS")).thenReturn(System.getProperty("java.io.tmpdir"))
val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2
val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2
assert(tempShuffleFile1.getAbsolutePath.contains(containerId))
assert(tempLocalFile1.getAbsolutePath.contains(containerId))
}
}
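A possible complementary check (not part of this PR), reusing the diskBlockManager and containerId from the test above:

// Non-temp blocks should still resolve under the regular blockmgr local dirs,
// i.e. outside the YARN container directory.
val rddFile = diskBlockManager.getFile(RDDBlockId(0, 0))
assert(!rddFile.getAbsolutePath.contains(containerId))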
core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -150,7 +150,7 @@ class DiskStoreSuite extends SparkFunSuite {

assert(diskStore.getSize(blockId) === testData.length)

val diskData = Files.toByteArray(diskBlockManager.getFile(blockId.name))
val diskData = Files.toByteArray(diskBlockManager.getFile(blockId))
assert(!Arrays.equals(testData, diskData))

val blockData = diskStore.getBytes(blockId)