From c226dd6b8798b1b0009aa09d02b64510bd3cb025 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Fri, 29 Nov 2019 16:42:13 +0800 Subject: [PATCH 1/7] [SPARK-30069][CORE][YARN] Clean up non-shuffle disk block manager files following executor exists on YARN --- .../org/apache/spark/storage/BlockId.scala | 1 + .../spark/storage/DiskBlockManager.scala | 71 ++++++++++++++++--- .../org/apache/spark/storage/DiskStore.scala | 8 +-- .../spark/storage/DiskBlockManagerSuite.scala | 15 ++++ .../apache/spark/storage/DiskStoreSuite.scala | 2 +- 5 files changed, 82 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 68ed3aa5b062f..3a059d1b95b9e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -40,6 +40,7 @@ sealed abstract class BlockId { def isRDD: Boolean = isInstanceOf[RDDBlockId] def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId] def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId] + def isTemp: Boolean = isInstanceOf[TempLocalBlockId] || isInstanceOf[TempShuffleBlockId] override def toString: String = name } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index ee43b76e17010..d771997304989 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -47,18 +47,36 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } + def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf) + + /* Create container directories on YARN to persist the temporary files. + * (temp_local, temp_shuffle) + * These files have no opportunity to be cleaned before application end on YARN. + * This is a real issue, especially for long-lived Spark application like Spark thrift-server. + * So we persist these files in YARN container directories which could be cleaned by YARN when + * the container exists. */ + private[spark] val containerDirs: Array[File] = + if (containerDirEnabled) createContainerDirs(conf) else Array.empty[File] + private[spark] val localDirsString: Array[String] = localDirs.map(_.toString) // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) + private val subContainerDirs = if (containerDirEnabled) { + Array.fill(containerDirs.length)(new Array[File](subDirsPerLocalDir)) + } else { + Array.empty[Array[File]] + } + private val shutdownHook = addShutdownHook() - /** Looks up a file by hashing it into one of our local subdirectories. */ + /** Looks up a file by hashing it into one of our local/container subdirectories. */ // This method should be kept in sync with // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile(). 
- def getFile(filename: String): File = { + def getFile(localDirs: Array[File], subDirs: Array[Array[File]], + subDirsPerLocalDir: Int, filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) val dirId = hash % localDirs.length @@ -82,17 +100,29 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea new File(subDir, filename) } - def getFile(blockId: BlockId): File = getFile(blockId.name) + /** + * Used only for testing. + */ + private[spark] def getFile(filename: String): File = + getFile(localDirs, subDirs, subDirsPerLocalDir, filename) + + def getFile(blockId: BlockId): File = { + if (containerDirEnabled && blockId.isTemp) { + getFile(containerDirs, subContainerDirs, subDirsPerLocalDir, blockId.name) + } else { + getFile(localDirs, subDirs, subDirsPerLocalDir, blockId.name) + } + } /** Check if disk block manager has a block. */ def containsBlock(blockId: BlockId): Boolean = { - getFile(blockId.name).exists() + getFile(blockId).exists() } /** List all the files currently stored on disk by the disk manager. */ def getAllFiles(): Seq[File] = { // Get all the files inside the array of array of directories - subDirs.flatMap { dir => + (subDirs ++ subContainerDirs).flatMap { dir => dir.synchronized { // Copy the content of dir because it may be modified in other threads dir.clone() @@ -172,6 +202,27 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea } } + /** + * Create container directories for storing block data in YARN mode. + * These directories are located inside configured local directories and + * will be deleted in the processing of container clean of YARN. + */ + private def createContainerDirs(conf: SparkConf): Array[File] = { + Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => + val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}" + try { + val containerDir = Utils.createDirectory(containerDirPath, "blockmgr") + logInfo(s"Created YARN container directory at $containerDir") + Some(containerDir) + } catch { + case e: IOException => + logError(s"Failed to create YARN container dir in $containerDirPath." 
+ + s" Ignoring this directory.", e) + None + } + } + } + private def addShutdownHook(): AnyRef = { logDebug("Adding shutdown hook") // force eager creation of logger ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () => @@ -194,15 +245,15 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea private def doStop(): Unit = { if (deleteFilesOnStop) { - localDirs.foreach { localDir => - if (localDir.isDirectory() && localDir.exists()) { + (localDirs ++ containerDirs).foreach { dir => + if (dir.isDirectory() && dir.exists()) { try { - if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) { - Utils.deleteRecursively(localDir) + if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(dir)) { + Utils.deleteRecursively(dir) } } catch { case e: Exception => - logError(s"Exception while deleting local spark dir: $localDir", e) + logError(s"Exception while deleting local spark dir: $dir", e) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index fbda4912e15ad..c6fb9af72a796 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -96,7 +96,7 @@ private[spark] class DiskStore( } def getBytes(blockId: BlockId): BlockData = { - getBytes(diskManager.getFile(blockId.name), getSize(blockId)) + getBytes(diskManager.getFile(blockId), getSize(blockId)) } def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match { @@ -111,7 +111,7 @@ private[spark] class DiskStore( def remove(blockId: BlockId): Boolean = { blockSizes.remove(blockId) - val file = diskManager.getFile(blockId.name) + val file = diskManager.getFile(blockId) if (file.exists()) { val ret = file.delete() if (!ret) { @@ -129,12 +129,12 @@ private[spark] class DiskStore( */ def moveFileToBlock(sourceFile: File, blockSize: Long, targetBlockId: BlockId): Unit = { blockSizes.put(targetBlockId, blockSize) - val targetFile = diskManager.getFile(targetBlockId.name) + val targetFile = diskManager.getFile(targetBlockId) FileUtils.moveFile(sourceFile, targetFile) } def contains(blockId: BlockId): Boolean = { - val file = diskManager.getFile(blockId.name) + val file = diskManager.getFile(blockId) file.exists() } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index ccc525e854838..12297ec67e015 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.storage import java.io.{File, FileWriter} +import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.spark.{SparkConf, SparkFunSuite} @@ -129,6 +130,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B rootDir0.setExecutable(true) rootDir1.setExecutable(true) } + } + test("test write temp file into container dir") { + val conf = spy(testConf.clone) + val containerId = "container_e1987_1564558112805_31178_01_000131" + conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1") + when(conf.getenv("CONTAINER_ID")).thenReturn(containerId) + when(conf.getenv("LOCAL_DIRS")).thenReturn(System.getProperty("java.io.tmpdir")) + val diskBlockManager = new DiskBlockManager(conf, 
deleteFilesOnStop = true) + val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2 + val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2 + assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists") + assert(tempShuffleFile1.getAbsolutePath.contains(containerId)) + assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists") + assert(tempLocalFile1.getAbsolutePath.contains(containerId)) } } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 97b9c973e97f2..75eb5fa343267 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -150,7 +150,7 @@ class DiskStoreSuite extends SparkFunSuite { assert(diskStore.getSize(blockId) === testData.length) - val diskData = Files.toByteArray(diskBlockManager.getFile(blockId.name)) + val diskData = Files.toByteArray(diskBlockManager.getFile(blockId)) assert(!Arrays.equals(testData, diskData)) val blockData = diskStore.getBytes(blockId) From cbc2d29d81a71f58257d7486599b4101f5b7c886 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Fri, 7 Aug 2020 14:46:31 +0800 Subject: [PATCH 2/7] fix ut --- .../org/apache/spark/storage/DiskBlockManagerSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 3530ff76d1cdb..c0cebb1c18837 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -92,7 +92,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B writer.close() } - test("test write temp file into container dir") { + test("SPARK-30069: test write temp file into container dir") { val conf = spy(testConf.clone) val containerId = "container_e1987_1564558112805_31178_01_000131" conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1") @@ -101,9 +101,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2 val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2 - assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists") assert(tempShuffleFile1.getAbsolutePath.contains(containerId)) - assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists") assert(tempLocalFile1.getAbsolutePath.contains(containerId)) } } From 612a6f1b265b4315a1d5d49264a20fa974c07151 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 10 Aug 2020 10:05:18 +0800 Subject: [PATCH 3/7] address comments --- .../apache/spark/storage/DiskBlockManager.scala | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index e7a1dbde2ddf7..4d1656affe1d1 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -46,7 +46,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea 
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } - def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf) + private def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf) /* Create container directories on YARN to persist the temporary files. * (temp_local, temp_shuffle) @@ -71,10 +71,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea private val shutdownHook = addShutdownHook() - /** Looks up a file by hashing it into one of our local/container subdirectories. */ - // This method should be kept in sync with - // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile(). - def getFile(localDirs: Array[File], subDirs: Array[Array[File]], + private def getFile(localDirs: Array[File], subDirs: Array[Array[File]], subDirsPerLocalDir: Int, filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) @@ -99,10 +96,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea new File(subDir, filename) } - /** - * Used only for testing. - */ - private[spark] def getFile(filename: String): File = + /** Looks up a file by hashing it into one of our local/container subdirectories. */ + // This method should be kept in sync with + // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile(). + def getFile(filename: String): File = getFile(localDirs, subDirs, subDirsPerLocalDir, filename) def getFile(blockId: BlockId): File = { @@ -115,7 +112,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** Check if disk block manager has a block. */ def containsBlock(blockId: BlockId): Boolean = { - getFile(blockId).exists() + getFile(blockId.name).exists() } /** List all the files currently stored on disk by the disk manager. */ From 0e9c00429b0d32265a63797b1ea4cac56c296a70 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 10 Aug 2020 10:20:30 +0800 Subject: [PATCH 4/7] revert previous commit and add more comment --- .../apache/spark/storage/DiskBlockManager.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 4d1656affe1d1..669071d640f2a 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -71,6 +71,8 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea private val shutdownHook = addShutdownHook() + // This method should be kept in sync with + // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile(). private def getFile(localDirs: Array[File], subDirs: Array[Array[File]], subDirsPerLocalDir: Int, filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that @@ -96,12 +98,14 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea new File(subDir, filename) } - /** Looks up a file by hashing it into one of our local/container subdirectories. */ - // This method should be kept in sync with - // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile(). - def getFile(filename: String): File = + /** + * Used only for testing. + * We should invoke getFile(blockId: BlockId) in production code. 
+ */ + private[spark] def getFile(filename: String): File = getFile(localDirs, subDirs, subDirsPerLocalDir, filename) + /** Looks up a file by hashing it into one of our local/container subdirectories. */ def getFile(blockId: BlockId): File = { if (containerDirEnabled && blockId.isTemp) { getFile(containerDirs, subContainerDirs, subDirsPerLocalDir, blockId.name) @@ -112,7 +116,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** Check if disk block manager has a block. */ def containsBlock(blockId: BlockId): Boolean = { - getFile(blockId.name).exists() + getFile(blockId).exists() } /** List all the files currently stored on disk by the disk manager. */ From 39f27eb8c693ee2e98a2b71f71005e24698f8118 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 10 Aug 2020 10:44:37 +0800 Subject: [PATCH 5/7] remove def getFile(filename: String): File --- core/src/main/scala/org/apache/spark/storage/BlockId.scala | 6 ++++++ .../scala/org/apache/spark/storage/DiskBlockManager.scala | 7 ------- .../org/apache/spark/storage/DiskBlockManagerSuite.scala | 3 ++- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 7f75eed0932e8..1d3e2a851616a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -112,6 +112,12 @@ private[spark] case class TestBlockId(id: String) extends BlockId { override def name: String = "test_" + id } +// Intended only for testing purposes +// An UnmanagedFile isn't treated as a BlockId, we don't match it in BlockId.apply() +private[spark] case class UnmanagedFile(id: String) extends BlockId { + override def name: String = "unmanaged_" + id +} + @DeveloperApi class UnrecognizedBlockId(name: String) extends SparkException(s"Failed to parse $name into a block ID") diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 669071d640f2a..0aac9beb50cc7 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -98,13 +98,6 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea new File(subDir, filename) } - /** - * Used only for testing. - * We should invoke getFile(blockId: BlockId) in production code. - */ - private[spark] def getFile(filename: String): File = - getFile(localDirs, subDirs, subDirsPerLocalDir, filename) - /** Looks up a file by hashing it into one of our local/container subdirectories. 
*/ def getFile(blockId: BlockId): File = { if (containerDirEnabled && blockId.isTemp) { diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index c0cebb1c18837..a8c52c8bcd5e4 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -81,7 +81,8 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B } test("SPARK-22227: non-block files are skipped") { - val file = diskBlockManager.getFile("unmanaged_file") + val unmanagedFile = new UnmanagedFile("file") + val file = diskBlockManager.getFile(unmanagedFile) writeToFile(file, 10) assert(diskBlockManager.getAllBlocks().isEmpty) } From 21e6f609c4f66861c9b2e87d178fe160811dced5 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Fri, 14 Aug 2020 14:39:18 +0800 Subject: [PATCH 6/7] small refactor to reduce the yarn specific logic --- .../spark/storage/DiskBlockManager.scala | 95 ++++++++----------- .../apache/spark/storage/StorageUtils.scala | 41 ++++++++ 2 files changed, 82 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 0aac9beb50cc7..249404c6af34e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{File, IOException} +import java.io.File import java.nio.file.Files import java.util.UUID @@ -40,35 +40,34 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /* Create one local directory for each path mentioned in spark.local.dir; then, inside this * directory, create multiple subdirectories that we will hash files into, in order to avoid * having really large inodes at the top level. */ - private[spark] val localDirs: Array[File] = createLocalDirs(conf) + private[spark] val localDirs: Array[File] = StorageUtils.createLocalDirs(conf) if (localDirs.isEmpty) { logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } - private def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf) - - /* Create container directories on YARN to persist the temporary files. - * (temp_local, temp_shuffle) - * These files have no opportunity to be cleaned before application end on YARN. - * This is a real issue, especially for long-lived Spark application like Spark thrift-server. - * So we persist these files in YARN container directories which could be cleaned by YARN when - * the container exists. */ - private[spark] val containerDirs: Array[File] = - if (containerDirEnabled) createContainerDirs(conf) else Array.empty[File] - private[spark] val localDirsString: Array[String] = localDirs.map(_.toString) // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) - private val subContainerDirs = if (containerDirEnabled) { - Array.fill(containerDirs.length)(new Array[File](subDirsPerLocalDir)) - } else { - Array.empty[Array[File]] + /* Directories persist the temporary files (temp_local, temp_shuffle). 
+ * We separate the storage directories of temp block from non-temp block since + * the cleaning process for temp block may be different between deploy modes. + * For example, these files have no opportunity to be cleaned before application end on YARN. + * This is a real issue, especially for long-lived Spark application like Spark thrift-server. + * So for Yarn mode, we persist these files in YARN container directories which could be + * cleaned by YARN when the container exists. */ + private val tempDirs: Array[File] = createTempDirs(conf) + if (tempDirs.isEmpty) { + logError("Failed to create any temp dir.") + System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } + // Similar with subDirs, tempSubDirs are used only for temp block. + private val tempSubDirs = createTempSubDirs(conf) + private val shutdownHook = addShutdownHook() // This method should be kept in sync with @@ -98,10 +97,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea new File(subDir, filename) } - /** Looks up a file by hashing it into one of our local/container subdirectories. */ + /** Looks up a file by hashing it into one of our local/temp subdirectories. */ def getFile(blockId: BlockId): File = { - if (containerDirEnabled && blockId.isTemp) { - getFile(containerDirs, subContainerDirs, subDirsPerLocalDir, blockId.name) + if (blockId.isTemp) { + getFile(tempDirs, tempSubDirs, subDirsPerLocalDir, blockId.name) } else { getFile(localDirs, subDirs, subDirsPerLocalDir, blockId.name) } @@ -115,7 +114,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** List all the files currently stored on disk by the disk manager. */ def getAllFiles(): Seq[File] = { // Get all the files inside the array of array of directories - (subDirs ++ subContainerDirs).flatMap { dir => + // compare their references are same + val allSubDirs = if (subDirs eq tempSubDirs) subDirs else subDirs ++ tempSubDirs + allSubDirs.flatMap { dir => dir.synchronized { // Copy the content of dir because it may be modified in other threads dir.clone() @@ -159,42 +160,26 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea } /** - * Create local directories for storing block data. These directories are - * located inside configured local directories and won't - * be deleted on JVM exit when using the external shuffle service. + * Create local temp directories for storing temp block data. These directories are + * located inside configured local directories. If executors are running in Yarn, + * these directories will be deleted on the Yarn container exit. Or store them in localDirs, + * if that they won't be deleted on JVM exit when using the external shuffle service. */ - private def createLocalDirs(conf: SparkConf): Array[File] = { - Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => - try { - val localDir = Utils.createDirectory(rootDir, "blockmgr") - logInfo(s"Created local directory at $localDir") - Some(localDir) - } catch { - case e: IOException => - logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) - None - } + private def createTempDirs(conf: SparkConf): Array[File] = { + if (Utils.isRunningInYarnContainer(conf)) { + StorageUtils.createContainerDirs(conf) + } else { + // To be compatible with current implementation, store temp block in localDirs + localDirs } } - /** - * Create container directories for storing block data in YARN mode. 
- * These directories are located inside configured local directories and - * will be deleted in the processing of container clean of YARN. - */ - private def createContainerDirs(conf: SparkConf): Array[File] = { - Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => - val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}" - try { - val containerDir = Utils.createDirectory(containerDirPath, "blockmgr") - logInfo(s"Created YARN container directory at $containerDir") - Some(containerDir) - } catch { - case e: IOException => - logError(s"Failed to create YARN container dir in $containerDirPath." + - s" Ignoring this directory.", e) - None - } + private def createTempSubDirs(conf: SparkConf): Array[Array[File]] = { + if (Utils.isRunningInYarnContainer(conf)) { + Array.fill(tempDirs.length)(new Array[File](subDirsPerLocalDir)) + } else { + // To be compatible with current implementation, store temp block in subDirsDirs + subDirs } } @@ -220,7 +205,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea private def doStop(): Unit = { if (deleteFilesOnStop) { - (localDirs ++ containerDirs).foreach { dir => + // compare their references are same + val toDelete = if (localDirs eq tempDirs) localDirs else localDirs ++ tempDirs + toDelete.foreach { dir => if (dir.isDirectory() && dir.exists()) { try { if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(dir)) { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index fc426eee608c0..18fc974e87b3d 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.storage +import java.io.{File, IOException} import java.nio.{ByteBuffer, MappedByteBuffer} import scala.collection.Map @@ -253,4 +254,44 @@ private[spark] object StorageUtils extends Logging { tmpPort } } + + /** + * Create local directories for storing block data. These directories are + * located inside configured local directories and won't + * be deleted on JVM exit when using the external shuffle service. + */ + def createLocalDirs(conf: SparkConf): Array[File] = { + Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => + try { + val localDir = Utils.createDirectory(rootDir, "blockmgr") + logInfo(s"Created local directory at $localDir") + Some(localDir) + } catch { + case e: IOException => + logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) + None + } + } + } + + /** + * Create container directories for storing block data in YARN mode. + * These directories are located inside configured local directories and + * will be deleted in the processing of container clean of YARN. + */ + def createContainerDirs(conf: SparkConf): Array[File] = { + Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => + val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}" + try { + val containerDir = Utils.createDirectory(containerDirPath, "blockmgr") + logInfo(s"Created YARN container directory at $containerDir") + Some(containerDir) + } catch { + case e: IOException => + logError(s"Failed to create YARN container dir in $containerDirPath." 
+ + s" Ignoring this directory.", e) + None + } + } + } } From 3520e339e2a9a8daf256ab1396fef9cf85a16354 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Fri, 14 Aug 2020 14:59:03 +0800 Subject: [PATCH 7/7] small refactor --- .../spark/storage/DiskBlockManager.scala | 31 ++-------- .../apache/spark/storage/StorageUtils.scala | 61 ++++++++++++++----- 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 249404c6af34e..e9e2ae25b325f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -50,7 +50,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content // of subDirs(i) is protected by the lock of subDirs(i) - private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) + private val subDirs = StorageUtils.createSubDirs(conf, parent = localDirs) /* Directories persist the temporary files (temp_local, temp_shuffle). * We separate the storage directories of temp block from non-temp block since @@ -59,14 +59,15 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea * This is a real issue, especially for long-lived Spark application like Spark thrift-server. * So for Yarn mode, we persist these files in YARN container directories which could be * cleaned by YARN when the container exists. */ - private val tempDirs: Array[File] = createTempDirs(conf) + private val tempDirs: Array[File] = StorageUtils.createTempDirs(conf, replacement = localDirs) if (tempDirs.isEmpty) { logError("Failed to create any temp dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } // Similar with subDirs, tempSubDirs are used only for temp block. - private val tempSubDirs = createTempSubDirs(conf) + private val tempSubDirs = + StorageUtils.createTempSubDirs(conf, parent = tempDirs, replacement = subDirs) private val shutdownHook = addShutdownHook() @@ -159,30 +160,6 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea (blockId, getFile(blockId)) } - /** - * Create local temp directories for storing temp block data. These directories are - * located inside configured local directories. If executors are running in Yarn, - * these directories will be deleted on the Yarn container exit. Or store them in localDirs, - * if that they won't be deleted on JVM exit when using the external shuffle service. 
- */ - private def createTempDirs(conf: SparkConf): Array[File] = { - if (Utils.isRunningInYarnContainer(conf)) { - StorageUtils.createContainerDirs(conf) - } else { - // To be compatible with current implementation, store temp block in localDirs - localDirs - } - } - - private def createTempSubDirs(conf: SparkConf): Array[Array[File]] = { - if (Utils.isRunningInYarnContainer(conf)) { - Array.fill(tempDirs.length)(new Array[File](subDirsPerLocalDir)) - } else { - // To be compatible with current implementation, store temp block in subDirsDirs - subDirs - } - } - private def addShutdownHook(): AnyRef = { logDebug("Adding shutdown hook") // force eager creation of logger ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () => diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 18fc974e87b3d..eb68ed9ce8d90 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -255,6 +255,27 @@ private[spark] object StorageUtils extends Logging { } } + /** + * Create container directories for storing block data in YARN mode. + * These directories are located inside configured local directories and + * will be deleted in the processing of container clean of YARN. + */ + def createContainerDirs(conf: SparkConf): Array[File] = { + Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => + val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}" + try { + val containerDir = Utils.createDirectory(containerDirPath, "blockmgr") + logInfo(s"Created YARN container directory at $containerDir") + Some(containerDir) + } catch { + case e: IOException => + logError(s"Failed to create YARN container dir in $containerDirPath." + + s" Ignoring this directory.", e) + None + } + } + } + /** * Create local directories for storing block data. These directories are * located inside configured local directories and won't @@ -274,24 +295,32 @@ private[spark] object StorageUtils extends Logging { } } + def createSubDirs(conf: SparkConf, parent: Array[File]): Array[Array[File]] = { + Array.fill(parent.length)(new Array[File](conf.get(config.DISKSTORE_SUB_DIRECTORIES))) + } + /** - * Create container directories for storing block data in YARN mode. - * These directories are located inside configured local directories and - * will be deleted in the processing of container clean of YARN. + * Create local temp directories for storing temp block data. These directories are + * located inside configured local directories. If executors are running in Yarn, + * these directories will be deleted on the Yarn container exit. Or store them in localDirs, + * if that they won't be deleted on JVM exit when using the external shuffle service. */ - def createContainerDirs(conf: SparkConf): Array[File] = { - Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => - val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}" - try { - val containerDir = Utils.createDirectory(containerDirPath, "blockmgr") - logInfo(s"Created YARN container directory at $containerDir") - Some(containerDir) - } catch { - case e: IOException => - logError(s"Failed to create YARN container dir in $containerDirPath." 
+ - s" Ignoring this directory.", e) - None - } + def createTempDirs(conf: SparkConf, replacement: Array[File]): Array[File] = { + if (Utils.isRunningInYarnContainer(conf)) { + createContainerDirs(conf) + } else { + // To be compatible with current implementation + replacement + } + } + + def createTempSubDirs(conf: SparkConf, parent: Array[File], + replacement: Array[Array[File]]): Array[Array[File]] = { + if (Utils.isRunningInYarnContainer(conf)) { + Array.fill(parent.length)(new Array[File](conf.get(config.DISKSTORE_SUB_DIRECTORIES))) + } else { + // To be compatible with current implementation + replacement } } }
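
A few illustrative sketches of the mechanisms this series relies on follow; they are simplified
stand-alone code accompanying the series, not part of any patch.

Patch 1 reworks getFile to take the directory array, the sub-directory array and
subDirsPerLocalDir as parameters, but the placement scheme itself is unchanged: hash the file
name once, pick the root directory with the hash modulo the number of roots, and pick the
sub-directory with the quotient modulo subDirsPerLocalDir. The sub-directory computation sits
outside the visible hunk, so the sketch below assumes Spark's existing
(hash / rootDirs.length) % subDirsPerLocalDir layout with "%02x" sub-directory names, and uses
a local stand-in for Utils.nonNegativeHash:

    import java.io.File

    object BlockFilePlacementSketch {
      // Stand-in for org.apache.spark.util.Utils.nonNegativeHash: hashCode clamped to be non-negative.
      private def nonNegativeHash(s: String): Int = {
        val h = s.hashCode
        if (h != Int.MinValue) math.abs(h) else 0
      }

      // Mirrors the hashing in DiskBlockManager#getFile: root dir by hash, sub dir by quotient.
      def locate(rootDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
        val hash = nonNegativeHash(filename)
        val dirId = hash % rootDirs.length
        val subDirId = (hash / rootDirs.length) % subDirsPerLocalDir
        new File(new File(rootDirs(dirId), "%02x".format(subDirId)), filename)
      }

      def main(args: Array[String]): Unit = {
        val roots = Array(new File("/tmp/spark-local-0"), new File("/tmp/spark-local-1"))
        println(locate(roots, subDirsPerLocalDir = 64, filename = "temp_shuffle_1234"))
      }
    }

Because ordinary blocks and the new temp/container blocks both go through this helper, keeping
it in sync with org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile (as the comment in
the patch asks) is what lets the external shuffle service keep resolving shuffle files written
by executors.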
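
Patches 6 and 7 push the directory selection into StorageUtils: createTempDirs and
createTempSubDirs return per-container roots when running in a YARN container and otherwise
return the very same arrays used for ordinary blocks, which is why doStop() and getAllFiles()
compare the arrays with eq before combining them. The sketch below shows that
fallback-by-reference pattern; the container check and directory names are simplified stand-ins
(the real code keys off Utils.isRunningInYarnContainer and the CONTAINER_ID environment
variable, and Utils.createDirectory appends a random blockmgr-<uuid> suffix):

    import java.io.File
    import java.util.UUID

    object TempDirFallbackSketch {
      // On YARN, temp_local/temp_shuffle blocks get per-container roots
      // (<rootDir>/<CONTAINER_ID>/blockmgr-<uuid>); otherwise the array used for ordinary
      // blocks is returned as-is, deliberately sharing the reference rather than copying it.
      def tempDirsFor(rootDirs: Array[String], blockMgrDirs: Array[File],
          containerId: Option[String]): Array[File] = containerId match {
        case Some(id) =>
          rootDirs.map(root => new File(s"$root/$id", s"blockmgr-${UUID.randomUUID()}"))
        case None =>
          blockMgrDirs
      }

      // Mirrors the guards in doStop()/getAllFiles(): only widen the set when the temp dirs
      // are genuinely distinct from the ordinary ones.
      def dirsToClean(blockMgrDirs: Array[File], tempDirs: Array[File]): Array[File] =
        if (blockMgrDirs eq tempDirs) blockMgrDirs else blockMgrDirs ++ tempDirs
    }

The shared-reference fallback keeps non-YARN deployments on the old code path (temp blocks keep
landing in the blockmgr-* directories), while on YARN the whole <CONTAINER_ID> directory is
removed by the NodeManager when the container exits, which is what finally cleans up
temp_local/temp_shuffle files for long-lived applications such as the Spark Thrift server.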
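
The suite added in patches 1 and 2 cannot run inside a real container, so it fabricates one: a
Mockito spy of the SparkConf stubs getenv("CONTAINER_ID") and getenv("LOCAL_DIRS"), which is
enough for Utils.isRunningInYarnContainer and Utils.getConfiguredLocalDirs to take the YARN code
path. A trimmed stand-alone version of that arrangement is sketched below; it has to live in the
org.apache.spark.storage package because DiskBlockManager is private[spark], and the object name
is made up for illustration:

    package org.apache.spark.storage

    import org.mockito.Mockito.{spy, when}

    import org.apache.spark.SparkConf

    object ContainerDirSmokeSketch {
      def main(args: Array[String]): Unit = {
        val containerId = "container_e1987_1564558112805_31178_01_000131"
        val conf = spy(new SparkConf(false)
          .set("spark.local.dir", System.getProperty("java.io.tmpdir"))
          .set("spark.diskStore.subDirectories", "1"))
        when(conf.getenv("CONTAINER_ID")).thenReturn(containerId)
        when(conf.getenv("LOCAL_DIRS")).thenReturn(System.getProperty("java.io.tmpdir"))

        val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
        val tempShuffleFile = diskBlockManager.createTempShuffleBlock()._2
        val tempLocalFile = diskBlockManager.createTempLocalBlock()._2
        // Both kinds of temp file should sit under <LOCAL_DIRS>/<CONTAINER_ID>/blockmgr-*.
        assert(tempShuffleFile.getAbsolutePath.contains(containerId))
        assert(tempLocalFile.getAbsolutePath.contains(containerId))
        diskBlockManager.stop()
      }
    }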