small refactor to reduce the yarn specific logic
LantaoJin committed Aug 14, 2020
1 parent 39f27eb commit 21e6f60
Showing 2 changed files with 82 additions and 54 deletions.
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala (95 changes: 41 additions, 54 deletions)
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import java.io.{File, IOException}
import java.io.File
import java.nio.file.Files
import java.util.UUID

@@ -40,35 +40,34 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
private[spark] val localDirs: Array[File] = createLocalDirs(conf)
private[spark] val localDirs: Array[File] = StorageUtils.createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}

private def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf)

/* Create container directories on YARN to persist the temporary files
* (temp_local, temp_shuffle).
* These files have no opportunity to be cleaned before application end on YARN.
* This is a real issue, especially for long-lived Spark applications like the Spark thrift-server.
* So we persist these files in YARN container directories, which can be cleaned by YARN when
* the container exits. */
private[spark] val containerDirs: Array[File] =
if (containerDirEnabled) createContainerDirs(conf) else Array.empty[File]

private[spark] val localDirsString: Array[String] = localDirs.map(_.toString)

// The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

private val subContainerDirs = if (containerDirEnabled) {
Array.fill(containerDirs.length)(new Array[File](subDirsPerLocalDir))
} else {
Array.empty[Array[File]]
/* Directories that persist the temporary files (temp_local, temp_shuffle).
* We separate the storage directories of temp blocks from non-temp blocks since
* the cleaning process for temp blocks may differ between deploy modes.
* For example, these files have no opportunity to be cleaned before application end on YARN.
* This is a real issue, especially for long-lived Spark applications like the Spark thrift-server.
* So in YARN mode, we persist these files in YARN container directories, which can be
* cleaned by YARN when the container exits. */
private val tempDirs: Array[File] = createTempDirs(conf)
if (tempDirs.isEmpty) {
logError("Failed to create any temp dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}

// Similar to subDirs, but tempSubDirs is used only for temp blocks.
private val tempSubDirs = createTempSubDirs(conf)

private val shutdownHook = addShutdownHook()

// This method should be kept in sync with
@@ -98,10 +97,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
new File(subDir, filename)
}
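
For context, the lookup above spreads files across root directories and their sub-directories by hashing the filename, which keeps any single directory from accumulating huge numbers of entries. A minimal sketch of the placement arithmetic, simplified from the upstream method (Spark's code uses its Utils.nonNegativeHash helper rather than raw hashCode; inputs invented):

```scala
// Sketch of how a filename maps to (root dir, sub dir); illustration only.
def placement(filename: String, numRootDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
  val hash = filename.hashCode & Integer.MAX_VALUE          // non-negative hash
  val dirId = hash % numRootDirs                            // which root directory
  val subDirId = (hash / numRootDirs) % subDirsPerLocalDir  // which sub-directory under it
  (dirId, subDirId)
}

placement("temp_shuffle_4f2c", numRootDirs = 2, subDirsPerLocalDir = 64)
```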

/** Looks up a file by hashing it into one of our local/container subdirectories. */
/** Looks up a file by hashing it into one of our local/temp subdirectories. */
def getFile(blockId: BlockId): File = {
if (containerDirEnabled && blockId.isTemp) {
getFile(containerDirs, subContainerDirs, subDirsPerLocalDir, blockId.name)
if (blockId.isTemp) {
getFile(tempDirs, tempSubDirs, subDirsPerLocalDir, blockId.name)
} else {
getFile(localDirs, subDirs, subDirsPerLocalDir, blockId.name)
}
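
This dispatch is the heart of the refactor: temp blocks go to the temp tree, everything else to the regular tree, and all YARN-specific reasoning is pushed into how the temp tree is created. A self-contained sketch with stand-in types (BlockId.isTemp is used here but defined outside the two files shown):

```scala
// Stand-in types for illustration only; not the commit's code.
sealed trait SketchBlockId { def name: String; def isTemp: Boolean }
final case class SketchShuffleBlock(name: String) extends SketchBlockId {
  val isTemp = false
}
final case class SketchTempShuffleBlock(name: String) extends SketchBlockId {
  val isTemp = true // temp_local / temp_shuffle blocks
}

def treeFor(id: SketchBlockId): String =
  if (id.isTemp) "tempDirs/tempSubDirs" else "localDirs/subDirs"

assert(treeFor(SketchTempShuffleBlock("temp_shuffle_1")) == "tempDirs/tempSubDirs")
```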
@@ -115,7 +114,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
/** List all the files currently stored on disk by the disk manager. */
def getAllFiles(): Seq[File] = {
// Get all the files inside the array of array of directories
(subDirs ++ subContainerDirs).flatMap { dir =>
// Off YARN, tempSubDirs aliases subDirs; skip the concatenation to avoid listing each directory twice.
val allSubDirs = if (subDirs eq tempSubDirs) subDirs else subDirs ++ tempSubDirs
allSubDirs.flatMap { dir =>
dir.synchronized {
// Copy the content of dir because it may be modified in other threads
dir.clone()
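
The reference-equality check above is needed because, off YARN, createTempSubDirs returns subDirs itself; concatenating an array with itself would enumerate every sub-directory twice. A quick sketch of the guard:

```scala
// Off YARN, tempSubDirs is the very same array instance as subDirs.
val subDirs: Array[Array[java.io.File]] =
  Array.fill(4)(new Array[java.io.File](64))
val tempSubDirs = subDirs // alias, as in the non-YARN branch

val allSubDirs = if (subDirs eq tempSubDirs) subDirs else subDirs ++ tempSubDirs
assert(allSubDirs.length == 4) // would be 8 with a blind concatenation
```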
@@ -159,42 +160,26 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
}

/**
* Create local directories for storing block data. These directories are
* located inside configured local directories and won't
* be deleted on JVM exit when using the external shuffle service.
* Create local temp directories for storing temp block data. These directories are
* located inside the configured local directories. If executors are running on YARN,
* these directories are deleted when the YARN container exits. Otherwise temp blocks
* are stored in localDirs, where they won't be deleted on JVM exit when the external
* shuffle service is in use.
*/
private def createLocalDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
try {
val localDir = Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")
Some(localDir)
} catch {
case e: IOException =>
logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
None
}
private def createTempDirs(conf: SparkConf): Array[File] = {
if (Utils.isRunningInYarnContainer(conf)) {
StorageUtils.createContainerDirs(conf)
} else {
// To stay compatible with the current implementation, store temp blocks in localDirs
localDirs
}
}
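
Note that the non-YARN branch returns the localDirs instance itself, not a copy; the `eq` checks in getAllFiles and doStop rely on exactly that aliasing. A behavior sketch (flag and paths invented):

```scala
// Simplified stand-in for createTempDirs, illustration only.
val localDirs = Array(new java.io.File("/tmp/blockmgr-demo"))
def createTempDirsSketch(runningInYarnContainer: Boolean): Array[java.io.File] =
  if (runningInYarnContainer) Array(new java.io.File("/tmp/container_demo/blockmgr-demo"))
  else localDirs // alias, not a copy

assert(createTempDirsSketch(runningInYarnContainer = false) eq localDirs)
```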

/**
* Create container directories for storing block data in YARN mode.
* These directories are located inside configured local directories and
* will be deleted by YARN's container cleanup.
*/
private def createContainerDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}"
try {
val containerDir = Utils.createDirectory(containerDirPath, "blockmgr")
logInfo(s"Created YARN container directory at $containerDir")
Some(containerDir)
} catch {
case e: IOException =>
logError(s"Failed to create YARN container dir in $containerDirPath." +
s" Ignoring this directory.", e)
None
}
private def createTempSubDirs(conf: SparkConf): Array[Array[File]] = {
if (Utils.isRunningInYarnContainer(conf)) {
Array.fill(tempDirs.length)(new Array[File](subDirsPerLocalDir))
} else {
// To stay compatible with the current implementation, store temp blocks in subDirs
subDirs
}
}

@@ -220,7 +205,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea

private def doStop(): Unit = {
if (deleteFilesOnStop) {
(localDirs ++ containerDirs).foreach { dir =>
// Off YARN, tempDirs aliases localDirs; skip the concatenation to avoid visiting each directory twice.
val toDelete = if (localDirs eq tempDirs) localDirs else localDirs ++ tempDirs
toDelete.foreach { dir =>
if (dir.isDirectory() && dir.exists()) {
try {
if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(dir)) {
core/src/main/scala/org/apache/spark/storage/StorageUtils.scala (41 changes: 41 additions, 0 deletions)
@@ -17,6 +17,7 @@

package org.apache.spark.storage

import java.io.{File, IOException}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.Map
@@ -253,4 +254,44 @@ private[spark] object StorageUtils extends Logging {
tmpPort
}
}

/**
* Create local directories for storing block data. These directories are
* located inside configured local directories and won't
* be deleted on JVM exit when using the external shuffle service.
*/
def createLocalDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
try {
val localDir = Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")
Some(localDir)
} catch {
case e: IOException =>
logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
None
}
}
}
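
The flatMap-over-try pattern above makes directory creation best-effort: a root that cannot be used is logged and skipped rather than failing the whole set, and the caller exits only when every root fails. A generic sketch of the same pattern (names invented):

```scala
import java.io.IOException

// Attempt an action per root, keep the successes, log and drop the failures.
def bestEffort[A](roots: Seq[String])(create: String => A): Seq[A] =
  roots.flatMap { root =>
    try Some(create(root))
    catch {
      case e: IOException =>
        println(s"Skipping $root: ${e.getMessage}")
        None
    }
  }
```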

/**
* Create container directories for storing block data in YARN mode.
* These directories are located inside configured local directories and
* will be deleted by YARN's container cleanup.
*/
def createContainerDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}"
try {
val containerDir = Utils.createDirectory(containerDirPath, "blockmgr")
logInfo(s"Created YARN container directory at $containerDir")
Some(containerDir)
} catch {
case e: IOException =>
logError(s"Failed to create YARN container dir in $containerDirPath." +
s" Ignoring this directory.", e)
None
}
}
}
}
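
For a concrete picture of what createContainerDirs produces, here is the resulting path shape under one configured root (root and container id are invented; the blockmgr-<uuid> leaf is what Utils.createDirectory is expected to create):

```scala
// Illustration only: the YARN container directory layout.
val rootDir = "/data/yarn/local"                           // one spark.local.dir entry
val containerId = "container_1597363200000_0001_01_000002" // CONTAINER_ID env var (invented)
val containerDirPath = s"$rootDir/$containerId"            // removed by YARN on container exit
println(s"$containerDirPath/blockmgr-<uuid>")              // where temp blocks land
```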
