Merge pull request apache#107 from shivaram/master
Merge pull request apache#107 from shivaram/master
[SPARKR-136] Use SPARK_LOCAL_DIRS to create tmp files
concretevitamin committed Nov 15, 2014
2 parents 08e24c3 + 7952180 commit e4217dd
Showing 1 changed file with 71 additions and 3 deletions.
pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala (71 additions, 3 deletions)
@@ -250,14 +250,14 @@ object RRDD {
       numPartitions: Int,
       splitIndex: Int) : File = {
 
-    val tempDir =
-      System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+    val env = SparkEnv.get
+    val conf = env.conf
+    val tempDir = getLocalDir(conf)
     val tempFile = File.createTempFile("rSpark", "out", new File(tempDir))
     val tempFileIn = File.createTempFile("rSpark", "in", new File(tempDir))
 
     val tempFileName = tempFile.getAbsolutePath()
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-    val env = SparkEnv.get
 
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for R") {
@@ -326,4 +326,72 @@ object RRDD {
 
     tempFile
   }
+
+  def isRunningInYarnContainer(conf: SparkConf): Boolean = {
+    // These environment variables are set by YARN.
+    // For Hadoop 0.23.X we check for YARN_LOCAL_DIRS (used below in getYarnLocalDirs());
+    // for Hadoop 2.X we check for CONTAINER_ID.
+    System.getenv("CONTAINER_ID") != null || System.getenv("YARN_LOCAL_DIRS") != null
+  }
+
+  /**
+   * Get the path of a temporary directory. Spark's local directories can be configured through
+   * multiple settings, which are used with the following precedence:
+   *
+   *   - If called from inside of a YARN container, this will return a directory chosen by YARN.
+   *   - If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
+   *   - Otherwise, if spark.local.dir is set, this will return a directory from it.
+   *   - Otherwise, this will return java.io.tmpdir.
+   *
+   * Some of these configuration options might be lists of multiple paths, but this method will
+   * always return a single directory.
+   */
+  def getLocalDir(conf: SparkConf): String = {
+    getOrCreateLocalRootDirs(conf)(0)
+  }
+
+  /**
+   * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
+   * and returns only the directories that exist or could be created.
+   *
+   * If no directories could be created, this will return an empty list.
+   */
+  def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+    val confValue = if (isRunningInYarnContainer(conf)) {
+      // In YARN mode, systems can have different disk layouts, so we must use
+      // the directories that YARN on this system reports as available.
+      getYarnLocalDirs(conf)
+    } else {
+      Option(System.getenv("SPARK_LOCAL_DIRS")).getOrElse(
+        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+    }
+    val rootDirs = confValue.split(',')
+
+    rootDirs.flatMap { rootDir =>
+      val localDir: File = new File(rootDir)
+      val foundLocalDir = localDir.exists || localDir.mkdirs()
+      if (!foundLocalDir) {
+        None
+      } else {
+        Some(rootDir)
+      }
+    }
+  }
+
+  /** Get the YARN-approved local directories. */
+  def getYarnLocalDirs(conf: SparkConf): String = {
+    // Hadoop 0.23 and 2.x use different environment variable names for the
+    // local dirs, so let's check both; we assume one of the two is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+      .getOrElse(""))
+
+    if (localDirs.isEmpty) {
+      throw new Exception("Yarn Local dirs can't be empty")
+    }
+    localDirs
+  }
+
 }
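
For reference, below is a minimal standalone sketch of the lookup order these helpers implement. LocalDirSketch and resolveLocalDir are hypothetical names invented for illustration; the sketch takes the value of spark.local.dir as a plain Option[String] instead of a SparkConf, and keeps only the first usable directory, mirroring getLocalDir's use of getOrCreateLocalRootDirs(conf)(0).

import java.io.File

// Hypothetical, self-contained mirror of the directory-resolution logic above.
object LocalDirSketch {

  // Stand-in for getLocalDir: sparkLocalDir plays the role of
  // conf.get("spark.local.dir", ...).
  def resolveLocalDir(sparkLocalDir: Option[String]): String = {
    // 1. Inside a YARN container, YARN dictates which directories are usable.
    val inYarn =
      System.getenv("CONTAINER_ID") != null || System.getenv("YARN_LOCAL_DIRS") != null

    val confValue =
      if (inYarn) {
        // YARN_LOCAL_DIRS => Hadoop 0.23.X, LOCAL_DIRS => Hadoop 2.X
        Option(System.getenv("YARN_LOCAL_DIRS"))
          .orElse(Option(System.getenv("LOCAL_DIRS")))
          .getOrElse(throw new Exception("Yarn Local dirs can't be empty"))
      } else {
        // 2. SPARK_LOCAL_DIRS, else 3. spark.local.dir, else 4. java.io.tmpdir.
        Option(System.getenv("SPARK_LOCAL_DIRS"))
          .orElse(sparkLocalDir)
          .getOrElse(System.getProperty("java.io.tmpdir"))
      }

    // Every setting may be a comma-separated list; keep the first directory
    // that exists or can be created, as getOrCreateLocalRootDirs does.
    confValue.split(',')
      .find { d => val f = new File(d); f.exists || f.mkdirs() }
      .getOrElse(sys.error("No usable local directory"))
  }

  def main(args: Array[String]): Unit = {
    // With no YARN variables and no SPARK_LOCAL_DIRS set, this falls through
    // to java.io.tmpdir (e.g. /tmp on most Linux systems).
    val dir = resolveLocalDir(sparkLocalDir = None)

    // The same pattern RRDD now uses for its worker pipe files:
    val tempFile = File.createTempFile("rSpark", "out", new File(dir))
    println(tempFile.getAbsolutePath)
    tempFile.delete()
  }
}

Note that even when a setting lists several comma-separated paths, only the first resolved directory is used for the rSpark temp files.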
