[SPARK-2974] [SPARK-2975] Fix two bugs related to spark.local.dirs
This PR fixes two bugs related to `spark.local.dirs` and `SPARK_LOCAL_DIRS`, one where `Utils.getLocalDir()` might return an invalid directory (SPARK-2974) and another where the `SPARK_LOCAL_DIRS` override didn't affect the driver, which could cause problems when running tasks in local mode (SPARK-2975).

This patch fixes both issues: the new `Utils.getOrCreateLocalRootDirs(conf: SparkConf)` utility method manages the creation of local directories and handles the precedence among the different configuration options, so we should see the same behavior whether we're running in local mode or on a worker.
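
In code terms, the precedence boils down to: YARN-provided directories when running in a YARN container, then SPARK_LOCAL_DIRS, then spark.local.dir, then java.io.tmpdir. The following is a minimal sketch of that resolution order, not the committed code — the real implementation is `Utils.getOrCreateLocalRootDirs` in the Utils.scala diff below, which additionally creates each directory and drops any that can't be created. The sketch assumes it lives under the org.apache.spark package (here org.apache.spark.util), since `SparkConf.getenv` is `private[spark]`:

package org.apache.spark.util

import org.apache.spark.SparkConf

object LocalDirsSketch {
  def candidateLocalDirs(conf: SparkConf): Array[String] = {
    // YARN sets these variables inside containers (CONTAINER_ID on Hadoop 2.x,
    // YARN_LOCAL_DIRS on 0.23.x), so their presence signals a YARN container.
    val runningOnYarn =
      conf.getenv("CONTAINER_ID") != null || conf.getenv("YARN_LOCAL_DIRS") != null
    val raw =
      if (runningOnYarn) {
        // Inside a YARN container, only the YARN-approved directories may be used.
        Option(conf.getenv("YARN_LOCAL_DIRS"))
          .orElse(Option(conf.getenv("LOCAL_DIRS")))
          .getOrElse(throw new Exception("Yarn Local dirs can't be empty"))
      } else {
        // Otherwise SPARK_LOCAL_DIRS wins over spark.local.dir, which wins over java.io.tmpdir.
        Option(conf.getenv("SPARK_LOCAL_DIRS"))
          .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
      }
    raw.split(',')
  }
}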

It's kind of a pain to mock out environment variables in tests (no easy way to mock System.getenv), so I added a `private[spark]` method to SparkConf for accessing environment variables (by default, it just delegates to System.getenv). By subclassing SparkConf and overriding this method, we can mock out SPARK_LOCAL_DIRS in tests.
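
For illustration, a minimal sketch of that pattern — not the committed test, which is the MySparkConf class in LocalDirsSuite.scala below. FakeEnvSparkConf is a hypothetical name, and like the real test it has to live under the org.apache.spark package (here org.apache.spark.storage) to override the private[spark] method:

package org.apache.spark.storage

import org.apache.spark.SparkConf

// Hypothetical helper: injects fake environment variables by overriding
// SparkConf.getenv, falling back to the real environment for anything else.
class FakeEnvSparkConf(fakeEnv: Map[String, String]) extends SparkConf(false) {
  override def getenv(name: String): String =
    fakeEnv.getOrElse(name, super.getenv(name))
}

// Usage inside a test: make SPARK_LOCAL_DIRS appear to point at the JVM temp dir.
// val conf = new FakeEnvSparkConf(Map("SPARK_LOCAL_DIRS" -> System.getProperty("java.io.tmpdir")))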

I also fixed a typo in PySpark where we used `SPARK_LOCAL_DIR` instead of `SPARK_LOCAL_DIRS` (I think this was technically innocuous, but it seemed worth fixing).

Author: Josh Rosen <[email protected]>

Closes apache#2002 from JoshRosen/local-dirs and squashes the following commits:

efad8c6 [Josh Rosen] Address review comments:
1dec709 [Josh Rosen] Minor updates to Javadocs.
7f36999 [Josh Rosen] Use env vars to detect if running in YARN container.
399ac25 [Josh Rosen] Update getLocalDir() documentation.
bb3ad89 [Josh Rosen] Remove duplicated YARN getLocalDirs() code.
3e92d44 [Josh Rosen] Move local dirs override logic into Utils; fix bugs:
b2c4736 [Josh Rosen] Add failing tests for SPARK-2974 and SPARK-2975.
007298b [Josh Rosen] Allow environment variables to be mocked in tests.
6d9259b [Josh Rosen] Fix typo in PySpark: SPARK_LOCAL_DIR should be SPARK_LOCAL_DIRS
JoshRosen authored and conviva-zz committed Sep 4, 2014
1 parent 8bdba3e commit ad76b3b
Showing 14 changed files with 145 additions and 118 deletions.
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -45,7 +45,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

private val settings = new HashMap[String, String]()
private[spark] val settings = new HashMap[String, String]()

if (loadDefaults) {
// Load any spark.* system properties
@@ -210,6 +210,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
new SparkConf(false).setAll(settings)
}

/**
* By using this instead of System.getenv(), environment variables can be mocked
* in unit tests.
*/
private[spark] def getenv(name: String): String = System.getenv(name)

/** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
private[spark] def validateSettings() {

@@ -62,7 +62,7 @@ private[spark] class PythonRDD(
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
envVars += ("SPARK_LOCAL_DIR" -> localdir) // it's also used in monitor thread
envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)

// Start a thread to feed the process input from our parent's iterator
25 changes: 0 additions & 25 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -62,16 +62,6 @@ private[spark] class Executor(
val conf = new SparkConf(true)
conf.setAll(properties)

// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
if (java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
conf.set("spark.local.dir", getYarnLocalDirs())
} else if (sys.env.contains("SPARK_LOCAL_DIRS")) {
conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS"))
}

if (!isLocal) {
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
@@ -134,21 +124,6 @@
threadPool.shutdown()
}

/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
.getOrElse(""))

if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")
}
localDirs
}

class TaskRunner(
execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
extends Runnable {

@@ -67,8 +67,7 @@ private[spark] class BlockManager(

private val port = conf.getInt("spark.blockManager.port", 0)
val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
val connectionManager =
new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")


@@ -21,7 +21,7 @@ import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random, UUID}

import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.PathResolver
import org.apache.spark.util.Utils
@@ -33,9 +33,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
* However, it is also possible to have a block map to only a segment of a file, by calling
* mapBlockToFileSegment().
*
* @param rootDirs The directories to use for storing block files. Data will be hashed among these.
* Block files are hashed among the directories listed in spark.local.dir (or in
* SPARK_LOCAL_DIRS, if it's set).
*/
private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf)
extends PathResolver with Logging {

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@@ -46,7 +47,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
val localDirs: Array[File] = createLocalDirs()
val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -130,10 +131,9 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
(blockId, getFile(blockId))
}

private def createLocalDirs(): Array[File] = {
logDebug(s"Creating local directories at root dirs '$rootDirs'")
private def createLocalDirs(conf: SparkConf): Array[File] = {
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").flatMap { rootDir =>
Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
var foundLocalDir = false
var localDir: File = null
var localDirId: String = null
67 changes: 63 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -449,12 +449,71 @@ private[spark] object Utils extends Logging {
}

/**
* Get a temporary directory using Spark's spark.local.dir property, if set. This will always
* return a single directory, even though the spark.local.dir property might be a list of
* multiple paths.
* Get the path of a temporary directory. Spark's local directories can be configured through
* multiple settings, which are used with the following precedence:
*
* - If called from inside of a YARN container, this will return a directory chosen by YARN.
* - If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
* - Otherwise, if the spark.local.dir is set, this will return a directory from it.
* - Otherwise, this will return java.io.tmpdir.
*
* Some of these configuration options might be lists of multiple paths, but this method will
* always return a single directory.
*/
def getLocalDir(conf: SparkConf): String = {
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
getOrCreateLocalRootDirs(conf)(0)
}

private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
// These environment variables are set by YARN.
// For Hadoop 0.23.X, we check for YARN_LOCAL_DIRS (we use this below in getYarnLocalDirs())
// For Hadoop 2.X, we check for CONTAINER_ID.
conf.getenv("CONTAINER_ID") != null || conf.getenv("YARN_LOCAL_DIRS") != null
}

/**
* Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
* and returns only the directories that exist / could be created.
*
* If no directories could be created, this will return an empty list.
*/
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
val confValue = if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available.
getYarnLocalDirs(conf)
} else {
Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
}
val rootDirs = confValue.split(',')
logDebug(s"Getting/creating local root dirs at '$confValue'")

rootDirs.flatMap { rootDir =>
val localDir: File = new File(rootDir)
val foundLocalDir = localDir.exists || localDir.mkdirs()
if (!foundLocalDir) {
logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.")
None
} else {
Some(rootDir)
}
}
}

/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(conf: SparkConf): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(conf.getenv("LOCAL_DIRS"))
.getOrElse(""))

if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")
}
localDirs
}

/**

@@ -825,8 +825,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val blockManager = mock(classOf[BlockManager])
val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
when(shuffleBlockManager.conf).thenReturn(conf)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
System.getProperty("java.io.tmpdir"))
val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)

when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)

@@ -71,7 +71,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
}

override def beforeEach() {
diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
shuffleBlockManager.idToSegmentMap.clear()
}

61 changes: 61 additions & 0 deletions core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import java.io.File

import org.apache.spark.util.Utils
import org.scalatest.FunSuite

import org.apache.spark.SparkConf


/**
* Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
*/
class LocalDirsSuite extends FunSuite {

test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
// Regression test for SPARK-2974
assert(!new File("/NONEXISTENT_DIR").exists())
val conf = new SparkConf(false)
.set("spark.local.dir", s"/NONEXISTENT_PATH,${System.getProperty("java.io.tmpdir")}")
assert(new File(Utils.getLocalDir(conf)).exists())
}

test("SPARK_LOCAL_DIRS override also affects driver") {
// Regression test for SPARK-2975
assert(!new File("/NONEXISTENT_DIR").exists())
// SPARK_LOCAL_DIRS is a valid directory:
class MySparkConf extends SparkConf(false) {
override def getenv(name: String) = {
if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
else super.getenv(name)
}

override def clone: SparkConf = {
new MySparkConf().setAll(settings)
}
}
// spark.local.dir only contains invalid directories, but that's not a problem since
// SPARK_LOCAL_DIRS will override it on both the driver and workers:
val conf = new MySparkConf().set("spark.local.dir", "/NONEXISTENT_PATH")
assert(new File(Utils.getLocalDir(conf)).exists())
}

}
2 changes: 1 addition & 1 deletion python/pyspark/shuffle.py
@@ -214,7 +214,7 @@ def __init__(self, aggregator, memory_limit=512, serializer=None,

def _get_dirs(self):
""" Get all the directories """
path = os.environ.get("SPARK_LOCAL_DIR", "/tmp")
path = os.environ.get("SPARK_LOCAL_DIRS", "/tmp")
dirs = path.split(",")
return [os.path.join(d, "python", str(os.getpid()), str(id(self)))
for d in dirs]

@@ -72,10 +72,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false

def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

// set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -138,20 +134,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
params)
}

/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))

localDirs match {
case None => throw new Exception("Yarn Local dirs can't be empty")
case Some(l) => l
}
}

private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)

@@ -95,11 +95,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}

def run() {

// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()

@@ -152,20 +147,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
System.exit(0)
}

/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))

localDirs match {
case None => throw new Exception("Yarn Local dirs can't be empty")
case Some(l) => l
}
}

private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)

@@ -72,10 +72,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false

def run() {
// Setup the directories so things go to YARN approved directories rather
// than user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -144,20 +140,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
"spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
}

// Get the Yarn approved local directories.
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))

localDirs match {
case None => throw new Exception("Yarn local dirs can't be empty")
case Some(l) => l
}
}

private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)