[SPARK-1395] Allow "local:" URIs to work on Yarn.
Don't distribute resources that have the "local" scheme
in their URIs.

Also fix an issue where the classpath for custom log4j
config files was incorrectly set to include the
file's path (instead of its parent directory), and propagate
the user's log4j configuration to the AM / workers. This has
the side effect of eliminating the "SPARK_YARN_LOG4J_PATH"
environment variable, which used to be read by the
ExecutorRunnableUtil trait, in favor of the "SPARK_LOG4J_CONF"
variable used by the Yarn client class.
Marcelo Vanzin committed Apr 16, 2014
1 parent 725925c commit 82219c1
Showing 6 changed files with 142 additions and 79 deletions.
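
Before the file-by-file diff, here is a minimal sketch of the idea behind the patch. This is a hypothetical helper for illustration, not code from the commit: a resource is copied to the YARN distributed cache only when its URI does not use the "local" scheme, since "local" means the file is already present at that path on every node.

import java.net.URI

// Illustrative only: the distribution decision this commit introduces.
object LocalSchemeSketch {
  val LOCAL_SCHEME = "local"

  def shouldDistribute(resource: String): Boolean = {
    val uri = new URI(resource.trim())
    // "local:/opt/libs/dep.jar" -> scheme "local" -> already on each node.
    !LOCAL_SCHEME.equals(uri.getScheme())
  }

  def main(args: Array[String]): Unit = {
    println(shouldDistribute("local:/opt/libs/dep.jar"))  // false: skip upload
    println(shouldDistribute("hdfs:///jars/dep.jar"))     // true: copy to cache
    println(shouldDistribute("/home/user/dep.jar"))       // true: no scheme
  }
}
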
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{PrintStream, File}
-import java.net.URL
+import java.net.{URI, URL}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
 
@@ -216,7 +216,7 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(localJar)
+    val localJarFile = new File(new URI(localJar).getPath())
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
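
The SparkSubmit change above exists because addJarToClasspath now receives URI strings. A short sketch (illustrative, not from the patch) of why java.io.File must not be handed a raw URI string:

import java.io.File
import java.net.URI

// Passing a URI string straight to java.io.File keeps the scheme in the
// file name, so the existence check always fails; extracting the URI's
// path first yields the real filesystem path.
object FileVsUriSketch {
  def main(args: Array[String]): Unit = {
    val jar = "file:/tmp/my.jar"
    println(new File(jar).getPath)                    // "file:/tmp/my.jar" (bogus path)
    println(new File(new URI(jar).getPath()).getPath) // "/tmp/my.jar"
  }
}
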
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -82,7 +82,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 
190 changes: 128 additions & 62 deletions yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
@@ -209,53 +209,35 @@ trait ClientBase extends Logging {

     Map(
       ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
     ).foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
+        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
+          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
+          distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+            destName, statCache)
+        }
       }
     }
 
-    // Handle jars local to the ApplicationMaster.
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        // Only add the resource to the Spark ApplicationMaster.
-        val appMasterOnly = true
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, appMasterOnly)
-      }
-    }
-
-    // Handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // Handle any distributed cache archives
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
-      }
-    }
+    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false) )
+    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+      if (flist != null && !flist.isEmpty()) {
+        flist.split(',').foreach { case file: String =>
+          val localURI = new URI(file.trim())
+          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+            val localPath = new Path(localURI)
+            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyRemoteFile(dst, localPath, replication)
+            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
+              linkname, statCache, appMasterOnly)
+          }
+        }
+      }
+    }
 
@@ -269,12 +251,14 @@
     logInfo("Setting up the launch environment")
 
     val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
-      env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
@@ -345,10 +329,7 @@
     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
       JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
     }
-
-    if (!localResources.contains(ClientBase.LOG4J_PROP)) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = List[String](
@@ -377,6 +358,8 @@ object ClientBase {
   val SPARK_JAR: String = "spark.jar"
   val APP_JAR: String = "app.jar"
   val LOG4J_PROP: String = "log4j.properties"
+  val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
+  val LOCAL_SCHEME = "local"
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
@@ -428,30 +411,113 @@
     }
   }
 
-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+  /**
+   * Returns the java command line argument for setting up log4j. If there is a log4j.properties
+   * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
+   * is checked.
+   */
+  def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
+    var log4jConf = LOG4J_PROP
+    if (!localResources.contains(log4jConf)) {
+      log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
+        case conf: String =>
+          val confUri = new URI(conf)
+          if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
+            "file://" + confUri.getPath()
+          } else {
+            ClientBase.LOG4J_PROP
+          }
+        case null => "log4j-spark-container.properties"
+      }
+    }
+    " -Dlog4j.configuration=" + log4jConf
+  }
+
+  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+      log4jConf: String, env: HashMap[String, String]) {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
       File.pathSeparator)
-    // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
+    if (log4jConf != null) {
+      // If a custom log4j config file is provided as a local: URI, add its parent directory to the
+      // classpath. Note that this only works if the custom config's file name is
+      // "log4j.properties".
+      val localPath = getLocalPath(log4jConf)
+      if (localPath != null) {
+        val parentPath = new File(localPath).getParent()
+        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
+          File.pathSeparator)
+      }
     }
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
+    addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
     ClientBase.populateHadoopClasspath(conf, env)
 
     if (!userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*", File.pathSeparator)
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
   }
 
+  /**
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
+   */
+  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
+    if (args != null) {
+      addClasspathEntry(args.userJar, APP_JAR, env)
+    }
+
+    if (args != null && args.addJars != null) {
+      args.addJars.split(",").foreach { case file: String =>
+        addClasspathEntry(file, null, env)
+      }
+    }
+  }
+
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path Path to add to classpath (optional).
+   * @param fileName Alternate name for the file (optional).
+   * @param env Map holding the environment variables.
+   */
+  private def addClasspathEntry(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
+            File.pathSeparator)
+          return
+        }
+      }
+    }
+    if (fileName != null) {
+      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
+    }
+  }
+
+  /**
+   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+   */
+  private def getLocalPath(resource: String): String = {
+    val uri = new URI(resource)
+    if (LOCAL_SCHEME.equals(uri.getScheme())) {
+      return uri.getPath()
+    }
+    null
+  }
 
 }
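
To illustrate the new ClientBase helpers, here is a standalone sketch. The object and the classpathEntry method are hypothetical names introduced for illustration; only LOCAL_SCHEME and the getLocalPath logic mirror the patch. A "local:" URI contributes its own path to the classpath; anything else falls back to the alternate name, resolved inside the container's working directory.

import java.net.URI

// Standalone sketch, not the patched class.
object ClasspathSketch {
  val LOCAL_SCHEME = "local"

  // Mirrors getLocalPath(): path for "local:" URIs, null otherwise.
  def getLocalPath(resource: String): String = {
    val uri = new URI(resource)
    if (LOCAL_SCHEME.equals(uri.getScheme())) uri.getPath() else null
  }

  // Mirrors the decision in addClasspathEntry(), returning the entry
  // instead of mutating an environment map. "$PWD" stands in for the
  // container's work directory.
  def classpathEntry(path: String, fileName: String): Option[String] = {
    Option(path).map(getLocalPath).filter(_ != null)
      .orElse(Option(fileName).map(n => "$PWD/" + n))
  }

  def main(args: Array[String]): Unit = {
    println(classpathEntry("local:/opt/spark/spark.jar", "spark.jar")) // Some(/opt/spark/spark.jar)
    println(classpathEntry("hdfs:///jars/app.jar", "app.jar"))         // Some($PWD/app.jar)
    println(classpathEntry(null, null))                                // None
  }
}
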
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      userSpecifiedLogFile: Boolean) = {
+      localResources: HashMap[String, LocalResource]) = {
     // Extra options for the JVM
     var JAVA_OPTS = ""
     // Set the JVM memory
@@ -64,10 +64,7 @@

     JAVA_OPTS += " -Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    if (!userSpecifiedLogFile) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
@@ -120,7 +117,7 @@
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
-      size: String, 
+      size: String,
       vis: String) = {
     val uri = new URI(file)
     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -153,7 +150,7 @@
     val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
     val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
     for( i <- 0 to distArchives.length - 1) {
-      setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+      setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
         timeStamps(i), fileSizes(i), visibilities(i))
     }
   }
@@ -165,7 +162,11 @@
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Allow users to specify some environment variables
     YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
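
Since the AM and the executors now share ClientBase.getLog4jConfiguration, the lookup order it implements can be shown with a small standalone sketch (a rewrite for illustration, not the patched code): a distributed log4j.properties wins, then SPARK_LOG4J_CONF (where a "local:" URI becomes a file:// URL), and finally the bundled container default.

import java.net.URI

// Sketch of the log4j resolution order introduced by this commit.
object Log4jResolutionSketch {
  def resolve(hasDistributedConfig: Boolean, sparkLog4jConf: Option[String]): String = {
    val conf =
      if (hasDistributedConfig) "log4j.properties"     // shipped as a local resource
      else sparkLog4jConf match {
        case Some(c) =>
          val uri = new URI(c)
          if ("local".equals(uri.getScheme())) "file://" + uri.getPath()
          else "log4j.properties"
        case None => "log4j-spark-container.properties" // bundled default
      }
    " -Dlog4j.configuration=" + conf
  }

  def main(args: Array[String]): Unit = {
    println(resolve(true, None))
    println(resolve(false, Some("local:/etc/spark/log4j.properties")))
    println(resolve(false, None))
  }
}
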
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = { 
+  override def getCurrentUserCredentials(): Credentials = {
     UserGroupInformation.getCurrentUser().getCredentials()
   }
 
@@ -76,10 +76,6 @@
 }
 
 object YarnSparkHadoopUtil {
-  def getLoggingArgsForContainerCommandLine(): String = {
-    "-Dlog4j.configuration=log4j-spark-container.properties"
-  }
-
   def addToEnvironment(
       env: HashMap[String, String],
       variable: String,
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -79,7 +79,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
 
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
