From 82219c1e4cc3fa722c360f5b70b55a9191c73ad3 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 2 Apr 2014 13:22:07 -0700
Subject: [PATCH] [SPARK-1395] Allow "local:" URIs to work on Yarn.

Don't distribute resources that have the "local" scheme in their URIs.

Also fix an issue where the classpath for custom log4j config files was
incorrectly being set to include the file's path (instead of its parent),
and propagate the user log4j configuration to the AM / workers.

This has the side effect of eliminating the "SPARK_YARN_LOG4J_CONF"
environment variable, which used to be used by the ExecutorRunner class,
in favor of the "SPARK_LOG4J_CONF" variable used by the Yarn client class.
---
 .../org/apache/spark/deploy/SparkSubmit.scala |   4 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala  |   2 +-
 .../apache/spark/deploy/yarn/ClientBase.scala | 190 ++++++++++++------
 .../deploy/yarn/ExecutorRunnableUtil.scala    |  17 +-
 .../deploy/yarn/YarnSparkHadoopUtil.scala     |   6 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala  |   2 +-
 6 files changed, 142 insertions(+), 79 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e05fbfe321495..e5d593cade8b3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{PrintStream, File}
-import java.net.URL
+import java.net.{URI, URL}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
 
@@ -216,7 +216,7 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(localJar)
+    val localJarFile = new File(new URI(localJar).getPath())
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3469b7decedf6..7dae248e3e7db 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -82,7 +82,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 628dd98860639..566de712fc280 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
@@ -209,53 +209,35 @@ trait ClientBase extends Logging {
     Map(
       ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
     ).foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
+        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
+          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
+          distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+            destName, statCache)
+        }
       }
     }
 
-    // Handle jars local to the ApplicationMaster.
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        // Only add the resource to the Spark ApplicationMaster.
-        val appMasterOnly = true
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, appMasterOnly)
-      }
-    }
-
-    // Handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // Handle any distributed cache archives
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
+    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false) )
+    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+      if (flist != null && !flist.isEmpty()) {
+        flist.split(',').foreach { case file: String =>
+          val localURI = new URI(file.trim())
+          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+            val localPath = new Path(localURI)
+            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyRemoteFile(dst, localPath, replication)
+            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
+              linkname, statCache, appMasterOnly)
+          }
+        }
       }
     }
 
@@ -269,12 +251,14 @@ trait ClientBase extends Logging {
     logInfo("Setting up the launch environment")
 
     val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
-      env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
 
     env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() + if (log4jConf != null) { + env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf + } // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(env) @@ -345,10 +329,7 @@ trait ClientBase extends Logging { if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += " " + env("SPARK_JAVA_OPTS") } - - if (!localResources.contains(ClientBase.LOG4J_PROP)) { - JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine() - } + JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources) // Command for the ApplicationMaster val commands = List[String]( @@ -377,6 +358,8 @@ object ClientBase { val SPARK_JAR: String = "spark.jar" val APP_JAR: String = "app.jar" val LOG4J_PROP: String = "log4j.properties" + val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF" + val LOCAL_SCHEME = "local" // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) { @@ -428,30 +411,113 @@ object ClientBase { } } - def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) { + /** + * Returns the java command line argument for setting up log4j. If there is a log4j.properties + * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable + * is checked. + */ + def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = { + var log4jConf = LOG4J_PROP + if (!localResources.contains(log4jConf)) { + log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match { + case conf: String => + val confUri = new URI(conf) + if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) { + "file://" + confUri.getPath() + } else { + ClientBase.LOG4J_PROP + } + case null => "log4j-spark-container.properties" + } + } + " -Dlog4j.configuration=" + log4jConf + } + + def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf, + log4jConf: String, env: HashMap[String, String]) { YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(), File.pathSeparator) - // If log4j present, ensure ours overrides all others - if (addLog4j) { - YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + - Path.SEPARATOR + LOG4J_PROP, File.pathSeparator) + if (log4jConf != null) { + // If a custom log4j config file is provided as a local: URI, add its parent directory to the + // classpath. Note that this only works if the custom config's file name is + // "log4j.properties". 
+      val localPath = getLocalPath(log4jConf)
+      if (localPath != null) {
+        val parentPath = new File(localPath).getParent()
+        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
+          File.pathSeparator)
+      }
     }
 
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
+    addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
     ClientBase.populateHadoopClasspath(conf, env)
     if (!userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
+    }
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+  }
+
+  /**
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
+   */
+  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
+    if (args != null) {
+      addClasspathEntry(args.userJar, APP_JAR, env)
+    }
+
+    if (args != null && args.addJars != null) {
+      args.addJars.split(",").foreach { case file: String =>
+        addClasspathEntry(file, null, env)
+      }
+    }
+  }
+
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path Path to add to classpath (optional).
+   * @param fileName Alternate name for the file (optional).
+   * @param env Map holding the environment variables.
+   */
+  private def addClasspathEntry(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
+            File.pathSeparator)
+          return
+        }
+      }
+    }
+    if (fileName != null) {
+      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
+    }
+  }
+
+  /**
+   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+   */
+  private def getLocalPath(resource: String): String = {
+    val uri = new URI(resource)
+    if (LOCAL_SCHEME.equals(uri.getScheme())) {
+      return uri.getPath()
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*", File.pathSeparator)
+    null
   }
+
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 9159cc4ad5ee8..40b38661f794d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      userSpecifiedLogFile: Boolean) = {
+      localResources: HashMap[String, LocalResource]) = {
     // Extra options for the JVM
     var JAVA_OPTS = ""
     // Set the JVM memory
@@ -64,10 +64,7 @@ trait ExecutorRunnableUtil extends Logging {
     JAVA_OPTS += " -Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    if (!userSpecifiedLogFile) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
@@ -120,7 +117,7 @@ trait ExecutorRunnableUtil extends Logging {
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
-      size: String, 
+      size: String,
       vis: String) = {
     val uri = new URI(file)
     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -153,7 +150,7 @@ trait ExecutorRunnableUtil extends Logging {
     val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
     val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
     for( i <- 0 to distArchives.length - 1) {
-      setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+      setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
        timeStamps(i), fileSizes(i), visibilities(i))
     }
   }
@@ -165,7 +162,11 @@ trait ExecutorRunnableUtil extends Logging {
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Allow users to specify some environment variables
     YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4ceed95a25b60..832d45b3ad10e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = { 
+  override def getCurrentUserCredentials(): Credentials = {
     UserGroupInformation.getCurrentUser().getCredentials()
   }
 
@@ -76,10 +76,6 @@ class YarnSparkHadoopUtil {
 }
 
 object YarnSparkHadoopUtil {
-  def getLoggingArgsForContainerCommandLine(): String = {
-    "-Dlog4j.configuration=log4j-spark-container.properties"
-  }
-
   def addToEnvironment(
       env: HashMap[String, String],
       variable: String,
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 81d9d1b5c9280..117b33f466f85 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -79,7 +79,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
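
Appendix: a minimal, self-contained Scala sketch of the "local:" scheme handling
this patch introduces. LOCAL_SCHEME and getLocalPath mirror the new ClientBase
helpers; the LocalUriSketch object, its main(), and the example paths are
illustrative assumptions, not part of the patch.

    import java.net.URI

    object LocalUriSketch {
      val LOCAL_SCHEME = "local"

      // Returns the node-local path for "local:" URIs, or null for anything
      // else (null meaning: ship the file through the distributed cache).
      def getLocalPath(resource: String): String = {
        val uri = new URI(resource)
        if (LOCAL_SCHEME.equals(uri.getScheme())) uri.getPath() else null
      }

      def main(args: Array[String]): Unit = {
        // A "local:" jar is assumed to exist on every node, so it is added
        // straight to the container classpath and never copied to HDFS.
        println(getLocalPath("local:/opt/libs/dep.jar"))   // /opt/libs/dep.jar
        // An hdfs: (or plain) URI yields null and is uploaded as before.
        println(getLocalPath("hdfs:///user/test/dep.jar")) // null
      }
    }

The same scheme check gates both halves of the change: prepareLocalResources
skips uploading "local:" URIs to the staging directory, while addClasspathEntry
and getLog4jConfiguration turn them into plain container-local paths.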