Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
LantaoJin committed Aug 10, 2020
1 parent cbc2d29 commit 612a6f1
Showing 1 changed file with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}

def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf)
private def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf)

/* Create container directories on YARN to persist the temporary files.
* (temp_local, temp_shuffle)
Expand All @@ -71,10 +71,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea

private val shutdownHook = addShutdownHook()

/** Looks up a file by hashing it into one of our local/container subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
def getFile(localDirs: Array[File], subDirs: Array[Array[File]],
private def getFile(localDirs: Array[File], subDirs: Array[Array[File]],
subDirsPerLocalDir: Int, filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
Expand All @@ -99,10 +96,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
new File(subDir, filename)
}

/**
* Used only for testing.
*/
private[spark] def getFile(filename: String): File =
/** Looks up a file by hashing it into one of our local/container subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
def getFile(filename: String): File =
getFile(localDirs, subDirs, subDirsPerLocalDir, filename)

def getFile(blockId: BlockId): File = {
Expand All @@ -115,7 +112,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea

/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {
getFile(blockId).exists()
getFile(blockId.name).exists()
}

/** List all the files currently stored on disk by the disk manager. */
Expand Down

0 comments on commit 612a6f1

Please sign in to comment.