[SPARK-2974] [SPARK-2975] Fix two bugs related to spark.local.dirs #2002

Closed
wants to merge 9 commits
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -45,7 +45,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

private val settings = new HashMap[String, String]()
private[spark] val settings = new HashMap[String, String]()

if (loadDefaults) {
// Load any spark.* system properties
@@ -210,6 +210,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
new SparkConf(false).setAll(settings)
}

/**
* By using this instead of System.getenv(), environment variables can be mocked
* in unit tests.
*/
private[spark] def getenv(name: String): String = System.getenv(name)

/** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
private[spark] def validateSettings() {
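The point of the new getenv indirection is testability: a suite can subclass SparkConf and fake environment variables without mutating the real process environment. A minimal hedged sketch of the idea follows (the helper name is hypothetical; the concrete version this PR adds is MySparkConf in LocalDirsSuite further down):

```scala
package org.apache.spark

// Hypothetical test helper: answers getenv() from a supplied map, falling back
// to the real process environment for anything else. It is declared inside the
// org.apache.spark package because getenv is private[spark].
class FakeEnvSparkConf(env: Map[String, String]) extends SparkConf(false) {
  override def getenv(name: String): String =
    env.getOrElse(name, super.getenv(name))
}

object FakeEnvSparkConfExample {
  // Pretend SPARK_LOCAL_DIRS is set, without touching the real environment.
  val conf: SparkConf = new FakeEnvSparkConf(Map("SPARK_LOCAL_DIRS" -> "/tmp"))
}
```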
@@ -62,7 +62,7 @@ private[spark] class PythonRDD(
val env = SparkEnv.get
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
envVars += ("SPARK_LOCAL_DIR" -> localdir) // it's also used in monitor thread
envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
Contributor:

This was simply a typo before?

Contributor (Author):

I think so. It looks like it was innocuous because both the Java and Python sides had the same typo.

I suppose it might be cleaner here to read the local directories using the methods in Utils instead of reading them out of diskBlockManager, but I think this code is correct as written.
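For concreteness, a rough sketch of that Utils-based alternative (hypothetical, not proposed in this PR; it assumes the executor-side SparkConf is reachable via SparkEnv at this point in PythonRDD):

```scala
// Hypothetical alternative to reading diskBlockManager.localDirs: resolve the
// root directories directly from the configuration/environment via Utils.
val localdir = Utils.getOrCreateLocalRootDirs(env.conf).mkString(",")
envVars += ("SPARK_LOCAL_DIRS" -> localdir)  // it's also used in monitor thread
```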

val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)

// Start a thread to feed the process input from our parent's iterator
25 changes: 0 additions & 25 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -62,16 +62,6 @@ private[spark] class Executor(
val conf = new SparkConf(true)
conf.setAll(properties)

// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
if (java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
conf.set("spark.local.dir", getYarnLocalDirs())
} else if (sys.env.contains("SPARK_LOCAL_DIRS")) {
conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS"))
}

if (!isLocal) {
// Setup an uncaught exception handler for non-local mode.
// Make any thread terminations due to uncaught exceptions kill the entire
@@ -134,21 +124,6 @@ private[spark] class Executor(
threadPool.shutdown()
}

/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
.getOrElse(""))

if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")
}
localDirs
}

class TaskRunner(
execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
extends Runnable {
@@ -64,8 +64,7 @@ private[spark] class BlockManager(

private val port = conf.getInt("spark.blockManager.port", 0)
val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
val connectionManager =
new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")

@@ -21,7 +21,7 @@ import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random, UUID}

import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
import org.apache.spark.util.Utils
@@ -33,9 +33,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
* However, it is also possible to have a block map to only a segment of a file, by calling
* mapBlockToFileSegment().
*
* @param rootDirs The directories to use for storing block files. Data will be hashed among these.
* Block files are hashed among the directories listed in spark.local.dir (or in
* SPARK_LOCAL_DIRS, if it's set).
*/
private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf)
extends PathResolver with Logging {

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
Expand All @@ -46,7 +47,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
val localDirs: Array[File] = createLocalDirs()
val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -131,10 +132,9 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
(blockId, getFile(blockId))
}

private def createLocalDirs(): Array[File] = {
logDebug(s"Creating local directories at root dirs '$rootDirs'")
private def createLocalDirs(conf: SparkConf): Array[File] = {
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").flatMap { rootDir =>
Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
Contributor:

I think there is some logic below that also tries to create parent dirs - would it make sense to remove that now since you assume all directories given here exist?

Contributor:

Oh I see - I think that logic only applies to sub directories

Contributor (Author):

Yep; the only difference would be replacing localDir.mkdirs() with localDir.mkdir(); the code below creates subdirectories that are used only by the DiskBlockManager.

Contributor:

Okay I think there is still a difference. In the case where the top level directories cannot be created, the behavior now differs from what was there before. It will now be silent whereas previously there would be an exception.

Contributor (Author):

Actually, I think this code works correctly as written. If getOrCreateLocalRootDirs() returns an empty list, then createLocalDirs() will return an empty list, but its caller immediately checks whether the result is empty; if it is, it logs an error and exits the JVM. This is the same behavior as before (since the code below never actually throws any uncaught exceptions).
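To make that control flow concrete, a condensed sketch of the failure path being described (mirroring the hunks above, not new behavior):

```scala
// From DiskBlockManager: createLocalDirs(conf) now returns only the root dirs
// that exist or could be created, so the result may be empty ...
val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
  // ... and the caller catches that immediately and exits, as before.
  logError("Failed to create any local dir.")
  System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
```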

var foundLocalDir = false
var localDir: File = null
var localDirId: String = null
67 changes: 63 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -449,12 +449,71 @@ private[spark] object Utils extends Logging {
}

/**
* Get a temporary directory using Spark's spark.local.dir property, if set. This will always
* return a single directory, even though the spark.local.dir property might be a list of
* multiple paths.
* Get the path of a temporary directory. Spark's local directories can be configured through
* multiple settings, which are used with the following precedence:
*
* - If called from inside of a YARN container, this will return a directory chosen by YARN.
* - If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
* - Otherwise, if the spark.local.dir is set, this will return a directory from it.
* - Otherwise, this will return java.io.tmpdir.
*
* Some of these configuration options might be lists of multiple paths, but this method will
* always return a single directory.
*/
def getLocalDir(conf: SparkConf): String = {
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
getOrCreateLocalRootDirs(conf)(0)
}

private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
// These environment variables are set by YARN.
// For Hadoop 0.23.X, we check for YARN_LOCAL_DIRS (we use this below in getYarnLocalDirs())
// For Hadoop 2.X, we check for CONTAINER_ID.
conf.getenv("CONTAINER_ID") != null || conf.getenv("YARN_LOCAL_DIRS") != null
}

/**
* Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
Contributor:

might be worth adding note about YARN here.

* and returns only the directories that exist / could be created.
Contributor:

Even though it's redundant - it might be good to explicitly say that this can return an empty list.

*
* If no directories could be created, this will return an empty list.
*/
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
val confValue = if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available.
getYarnLocalDirs(conf)
} else {
Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
}
val rootDirs = confValue.split(',')
logDebug(s"Getting/creating local root dirs at '$confValue'")

rootDirs.flatMap { rootDir =>
val localDir: File = new File(rootDir)
val foundLocalDir = localDir.exists || localDir.mkdirs()
if (!foundLocalDir) {
logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.")
None
} else {
Some(rootDir)
}
}
}

/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(conf: SparkConf): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(conf.getenv("LOCAL_DIRS"))
.getOrElse(""))

if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")
}
localDirs
}

/**
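As an aside on the new Utils helpers above, a hedged usage sketch (both methods are private[spark], so this assumes code living under the org.apache.spark package; the configured paths are just examples):

```scala
package org.apache.spark

import org.apache.spark.util.Utils

object LocalDirsUsageSketch {
  // Resolution order implemented above: YARN-provided dirs (when inside a YARN
  // container) > SPARK_LOCAL_DIRS > spark.local.dir > java.io.tmpdir.
  val conf = new SparkConf(false)
    .set("spark.local.dir", "/tmp/spark-scratch,/data/spark-scratch")

  // All usable root dirs; may be empty if none exist and none could be created.
  val rootDirs: Array[String] = Utils.getOrCreateLocalRootDirs(conf)

  // A single usable directory (the first root dir), e.g. for small temp files.
  val oneDir: String = Utils.getLocalDir(conf)
}
```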
@@ -825,8 +825,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val blockManager = mock(classOf[BlockManager])
val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
when(shuffleBlockManager.conf).thenReturn(conf)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
System.getProperty("java.io.tmpdir"))
val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)

when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
@@ -71,7 +71,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
}

override def beforeEach() {
diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
shuffleBlockManager.idToSegmentMap.clear()
}

61 changes: 61 additions & 0 deletions core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import java.io.File

import org.apache.spark.util.Utils
import org.scalatest.FunSuite

import org.apache.spark.SparkConf


/**
* Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
*/
class LocalDirsSuite extends FunSuite {

test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
// Regression test for SPARK-2974
assert(!new File("/NONEXISTENT_DIR").exists())
val conf = new SparkConf(false)
.set("spark.local.dir", s"/NONEXISTENT_PATH,${System.getProperty("java.io.tmpdir")}")
assert(new File(Utils.getLocalDir(conf)).exists())
}

test("SPARK_LOCAL_DIRS override also affects driver") {
// Regression test for SPARK-2975
assert(!new File("/NONEXISTENT_DIR").exists())
// SPARK_LOCAL_DIRS is a valid directory:
class MySparkConf extends SparkConf(false) {
override def getenv(name: String) = {
if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
else super.getenv(name)
}

override def clone: SparkConf = {
new MySparkConf().setAll(settings)
}
}
// spark.local.dir only contains invalid directories, but that's not a problem since
// SPARK_LOCAL_DIRS will override it on both the driver and workers:
val conf = new MySparkConf().set("spark.local.dir", "/NONEXISTENT_PATH")
assert(new File(Utils.getLocalDir(conf)).exists())
}

}
2 changes: 1 addition & 1 deletion python/pyspark/shuffle.py
@@ -214,7 +214,7 @@ def __init__(self, aggregator, memory_limit=512, serializer=None,

def _get_dirs(self):
""" Get all the directories """
path = os.environ.get("SPARK_LOCAL_DIR", "/tmp")
path = os.environ.get("SPARK_LOCAL_DIRS", "/tmp")
dirs = path.split(",")
return [os.path.join(d, "python", str(os.getpid()), str(id(self)))
for d in dirs]
@@ -72,10 +72,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false

def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

// set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -138,20 +134,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
params)
}

/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))

localDirs match {
case None => throw new Exception("Yarn Local dirs can't be empty")
case Some(l) => l
}
}

private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
@@ -95,11 +95,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}

def run() {

// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()

@@ -152,20 +147,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
System.exit(0)
}

/** Get the Yarn approved local directories. */
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))

localDirs match {
case None => throw new Exception("Yarn Local dirs can't be empty")
case Some(l) => l
}
}

private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
@@ -72,10 +72,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false

def run() {
// Setup the directories so things go to YARN approved directories rather
// than user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
@@ -144,20 +140,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
"spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
}

// Get the Yarn approved local directories.
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))

localDirs match {
case None => throw new Exception("Yarn local dirs can't be empty")
case Some(l) => l
}
}

private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)