diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala
index 00fce5b57cf1f..ccebb6281e751 100644
--- a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala
@@ -250,14 +250,14 @@ object RRDD {
       numPartitions: Int,
       splitIndex: Int) : File = {
 
-    val tempDir =
-      System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+    val env = SparkEnv.get
+    val conf = env.conf
+    val tempDir = getLocalDir(conf)
     val tempFile = File.createTempFile("rSpark", "out", new File(tempDir))
     val tempFileIn = File.createTempFile("rSpark", "in", new File(tempDir))
     val tempFileName = tempFile.getAbsolutePath()
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
 
-    val env = SparkEnv.get
 
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for R") {
@@ -326,4 +326,72 @@ object RRDD {
 
     tempFile
   }
+
+  def isRunningInYarnContainer(conf: SparkConf): Boolean = {
+    // These environment variables are set by YARN.
+    // For Hadoop 0.23.X, we check for YARN_LOCAL_DIRS (we use this below in getYarnLocalDirs())
+    // For Hadoop 2.X, we check for CONTAINER_ID.
+    System.getenv("CONTAINER_ID") != null || System.getenv("YARN_LOCAL_DIRS") != null
+  }
+
+  /**
+   * Get the path of a temporary directory. Spark's local directories can be configured through
+   * multiple settings, which are used with the following precedence:
+   *
+   *   - If called from inside of a YARN container, this will return a directory chosen by YARN.
+   *   - If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
+   *   - Otherwise, if the spark.local.dir is set, this will return a directory from it.
+   *   - Otherwise, this will return java.io.tmpdir.
+   *
+   * Some of these configuration options might be lists of multiple paths, but this method will
+   * always return a single directory.
+   */
+  def getLocalDir(conf: SparkConf): String = {
+    getOrCreateLocalRootDirs(conf)(0)
+  }
+
+  /**
+   * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
+   * and returns only the directories that exist / could be created.
+   *
+   * If no directories could be created, this will return an empty list.
+   */
+  def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+    val confValue = if (isRunningInYarnContainer(conf)) {
+      // If we are in yarn mode, systems can have different disk layouts so we must set it
+      // to what Yarn on this system said was available.
+      getYarnLocalDirs(conf)
+    } else {
+      Option(System.getenv("SPARK_LOCAL_DIRS")).getOrElse(
+        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+    }
+    val rootDirs = confValue.split(',')
+
+    rootDirs.flatMap { rootDir =>
+      val localDir: File = new File(rootDir)
+      val foundLocalDir = localDir.exists || localDir.mkdirs()
+      if (!foundLocalDir) {
+        None
+      } else {
+        Some(rootDir)
+      }
+    }
+  }
+
+  /** Get the Yarn approved local directories. */
+  def getYarnLocalDirs(conf: SparkConf): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+      .getOrElse(""))
+
+    if (localDirs.isEmpty) {
+      throw new Exception("Yarn Local dirs can't be empty")
+    }
+    localDirs
+  }
+
+
 }
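
For reviewers, a minimal sketch of how the new helper might be exercised outside of RRDD itself; the object name, the SparkConf setup, and the example paths below are illustrative and not part of the patch:

import org.apache.spark.SparkConf
import edu.berkeley.cs.amplab.sparkr.RRDD

// Illustrative only: resolves a scratch directory with the precedence implemented by
// getOrCreateLocalRootDirs(): YARN container dirs, then SPARK_LOCAL_DIRS, then
// spark.local.dir, and finally java.io.tmpdir.
object LocalDirCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical configuration with two comma-separated local dirs.
    val conf = new SparkConf().set("spark.local.dir", "/tmp/sparkr-a,/tmp/sparkr-b")
    // Outside a YARN container and without SPARK_LOCAL_DIRS set, this returns the first
    // configured directory that exists or could be created, e.g. "/tmp/sparkr-a".
    val dir = RRDD.getLocalDir(conf)
    println(s"SparkR temp dir: $dir")
  }
}

Note that getLocalDir takes the first usable root directory, so if none of the configured directories can be created the indexing into the empty array will throw.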