[SPARK-1395] Allow "local:" URIs to work on Yarn.
This only works for the three paths defined in the environment
(SPARK_JAR, SPARK_YARN_APP_JAR and SPARK_LOG4J_CONF).

Tested by running SparkPi with local: and file: URIs against a Yarn cluster (no "upload" shows up in the logs in the local: case).

Author: Marcelo Vanzin <[email protected]>

Closes apache#303 from vanzin/yarn-local and squashes the following commits:

82219c1 [Marcelo Vanzin] [SPARK-1395] Allow "local:" URIs to work on Yarn.
Marcelo Vanzin authored and pdeyhim committed Jun 25, 2014
1 parent 456f88d commit fd86fe8
Showing 6 changed files with 142 additions and 79 deletions.
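
The heart of the change is a URI-scheme check: resources whose URI uses the "local:" scheme are taken to already exist on every node, so the client references them in place instead of copying them into the YARN distributed cache. A minimal standalone sketch of that idea (the object and helper names here are hypothetical, not part of the diff):

import java.net.URI

object LocalSchemeSketch {
  val LocalScheme = "local"

  // A resource must be copied into the YARN distributed cache unless
  // its URI says it is already present on every node.
  def needsUpload(resource: String): Boolean =
    !LocalScheme.equals(new URI(resource.trim()).getScheme())

  def main(args: Array[String]): Unit = {
    println(needsUpload("local:/opt/spark/spark-assembly.jar")) // false: referenced in place
    println(needsUpload("file:/home/me/app.jar"))               // true: uploaded from the client
    println(needsUpload("hdfs:///user/me/app.jar"))             // true: copied within the cluster
  }
}
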
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{PrintStream, File}
-import java.net.URL
+import java.net.{URI, URL}
 
 import org.apache.spark.executor.ExecutorURLClassLoader
 
@@ -216,7 +216,7 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(localJar)
+    val localJarFile = new File(new URI(localJar).getPath())
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
    }
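
The SparkSubmit change above guards against jar arguments arriving as URIs rather than bare paths. A small sketch of why the round-trip through URI matters (a sketch only; it assumes /tmp/my-app.jar exists):

import java.io.File
import java.net.URI

object UriPathSketch {
  def main(args: Array[String]): Unit = {
    val jar = "file:/tmp/my-app.jar"
    // A "file:" URI is not a filesystem path, so the old new File(localJar)
    // check would report a present jar as missing.
    println(new File(jar).exists())                     // false: "file:/tmp/..." is not a path
    println(new File(new URI(jar).getPath()).exists())  // true when /tmp/my-app.jar exists
    // Bare paths still work: their URI has no scheme and getPath()
    // returns the string unchanged.
    println(new URI("/tmp/my-app.jar").getPath())       // "/tmp/my-app.jar"
  }
}
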
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -82,7 +82,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 
190 changes: 128 additions & 62 deletions yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
@@ -209,53 +209,35 @@ trait ClientBase extends Logging {
 
     Map(
       ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
-      ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+      ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
     ).foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
-        val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          destName, statCache)
+        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
+          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
+          distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+            destName, statCache)
+        }
       }
     }
 
-    // Handle jars local to the ApplicationMaster.
-    if ((args.addJars != null) && (!args.addJars.isEmpty())){
-      args.addJars.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        // Only add the resource to the Spark ApplicationMaster.
-        val appMasterOnly = true
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache, appMasterOnly)
-      }
-    }
-
-    // Handle any distributed cache files
-    if ((args.files != null) && (!args.files.isEmpty())){
-      args.files.split(',').foreach { case file: String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
-          linkname, statCache)
-      }
-    }
-
-    // Handle any distributed cache archives
-    if ((args.archives != null) && (!args.archives.isEmpty())) {
-      args.archives.split(',').foreach { case file:String =>
-        val localURI = new URI(file.trim())
-        val localPath = new Path(localURI)
-        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-        val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
-          linkname, statCache)
+    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false) )
+    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+      if (flist != null && !flist.isEmpty()) {
+        flist.split(',').foreach { case file: String =>
+          val localURI = new URI(file.trim())
+          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+            val localPath = new Path(localURI)
+            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyRemoteFile(dst, localPath, replication)
+            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
+              linkname, statCache, appMasterOnly)
+          }
+        }
+      }
+    }
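
The consolidated loop above also shows how link names for cached resources are derived: a URI fragment, when present, overrides the file's own name. A standalone sketch of just that logic (values illustrative):

import java.net.URI
import org.apache.hadoop.fs.Path

object LinkNameSketch {
  // Mirrors the loop above: the fragment wins; otherwise the file name is used.
  // Option.getOrElse is by-name, so the Path is only built when needed.
  def linkName(file: String): String = {
    val uri = new URI(file.trim())
    Option(uri.getFragment()).getOrElse(new Path(uri).getName())
  }

  def main(args: Array[String]): Unit = {
    println(linkName("hdfs:///jars/dep-1.0.jar#dep.jar")) // "dep.jar"
    println(linkName("hdfs:///jars/dep-1.0.jar"))         // "dep-1.0.jar"
  }
}
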

@@ -269,12 +251,14 @@
     logInfo("Setting up the launch environment")
 
     val env = new HashMap[String, String]()
-
-    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
-      env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
@@ -345,10 +329,7 @@
     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
       JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
     }
-
-    if (!localResources.contains(ClientBase.LOG4J_PROP)) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = List[String](
@@ -377,6 +358,8 @@ object ClientBase {
   val SPARK_JAR: String = "spark.jar"
   val APP_JAR: String = "app.jar"
   val LOG4J_PROP: String = "log4j.properties"
+  val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
+  val LOCAL_SCHEME = "local"
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
@@ -428,30 +411,113 @@
     }
   }
 
-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+  /**
+   * Returns the java command line argument for setting up log4j. If there is a log4j.properties
+   * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable
+   * is checked.
+   */
+  def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
+    var log4jConf = LOG4J_PROP
+    if (!localResources.contains(log4jConf)) {
+      log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match {
+        case conf: String =>
+          val confUri = new URI(conf)
+          if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) {
+            "file://" + confUri.getPath()
+          } else {
+            ClientBase.LOG4J_PROP
+          }
+        case null => "log4j-spark-container.properties"
+      }
+    }
+    " -Dlog4j.configuration=" + log4jConf
+  }
+
+  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
+      log4jConf: String, env: HashMap[String, String]) {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
       File.pathSeparator)
-    // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
+    if (log4jConf != null) {
+      // If a custom log4j config file is provided as a local: URI, add its parent directory to the
+      // classpath. Note that this only works if the custom config's file name is
+      // "log4j.properties".
+      val localPath = getLocalPath(log4jConf)
+      if (localPath != null) {
+        val parentPath = new File(localPath).getParent()
+        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
+          File.pathSeparator)
+      }
     }
     // Normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
+    addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
     ClientBase.populateHadoopClasspath(conf, env)
 
     if (!userClasspathFirst) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR, File.pathSeparator)
+      addUserClasspath(args, env)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*", File.pathSeparator)
+    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+      Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+  }
+
+  /**
+   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
+   * to the classpath.
+   */
+  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
+    if (args != null) {
+      addClasspathEntry(args.userJar, APP_JAR, env)
+    }
+
+    if (args != null && args.addJars != null) {
+      args.addJars.split(",").foreach { case file: String =>
+        addClasspathEntry(file, null, env)
+      }
+    }
+  }
+
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If not a "local:" file and no alternate name, the environment is not modified.
+   *
+   * @param path Path to add to classpath (optional).
+   * @param fileName Alternate name for the file (optional).
+   * @param env Map holding the environment variables.
+   */
+  private def addClasspathEntry(path: String, fileName: String,
+      env: HashMap[String, String]) : Unit = {
+    if (path != null) {
+      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
+        val localPath = getLocalPath(path)
+        if (localPath != null) {
+          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
+            File.pathSeparator)
+          return
+        }
+      }
+    }
+    if (fileName != null) {
+      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
+        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
+    }
+  }
+
+  /**
+   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+   */
+  private def getLocalPath(resource: String): String = {
+    val uri = new URI(resource)
+    if (LOCAL_SCHEME.equals(uri.getScheme())) {
+      return uri.getPath()
+    }
+    null
+  }
 
 }
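
To summarize what the new helpers do, here is a hedged standalone sketch of the classpath decision made by addClasspathEntry and getLocalPath (the real code appends to the CLASSPATH environment variable via YarnSparkHadoopUtil.addToEnvironment; "<PWD>" stands in for Environment.PWD.$()):

import java.net.URI

object ClasspathSketch {
  val LocalScheme = "local"

  // Returns the entry the helpers above would add, or None.
  def classpathEntry(path: String, fileName: String): Option[String] = {
    val uri = new URI(path)
    if (LocalScheme.equals(uri.getScheme())) {
      Some(uri.getPath())            // "local:" URI: use the node-local path as-is
    } else if (fileName != null) {
      Some("<PWD>/" + fileName)      // otherwise: the link name under the work dir
    } else {
      None                           // non-local and no alternate name: nothing added
    }
  }

  def main(args: Array[String]): Unit = {
    println(classpathEntry("local:/opt/spark/spark-assembly.jar", "spark.jar"))
    // Some(/opt/spark/spark-assembly.jar)
    println(classpathEntry("hdfs:///user/me/app.jar", "app.jar"))
    // Some(<PWD>/app.jar)
  }
}
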
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      userSpecifiedLogFile: Boolean) = {
+      localResources: HashMap[String, LocalResource]) = {
     // Extra options for the JVM
     var JAVA_OPTS = ""
     // Set the JVM memory
@@ -64,10 +64,7 @@
 
     JAVA_OPTS += " -Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
-    if (!userSpecifiedLogFile) {
-      JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
-    }
+    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
@@ -120,7 +117,7 @@
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
-      size: String, 
+      size: String,
       vis: String) = {
     val uri = new URI(file)
     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -153,7 +150,7 @@
     val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
     val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
     for( i <- 0 to distArchives.length - 1) {
-      setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+      setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
        timeStamps(i), fileSizes(i), visibilities(i))
     }
   }
@@ -165,7 +162,11 @@
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
+    ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+    if (log4jConf != null) {
+      env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
+    }
 
     // Allow users to specify some environment variables
    YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
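
Executors now derive their log4j setting through the same ClientBase.getLog4jConfiguration path as the ApplicationMaster. A standalone sketch of its fallback chain (a simplification: shippedLog4j stands in for the real check of the localResources map):

import java.net.URI

object Log4jResolutionSketch {
  def resolve(shippedLog4j: Boolean, sparkLog4jConf: String): String = {
    val conf =
      if (shippedLog4j) "log4j.properties"      // a config was shipped under the standard name
      else sparkLog4jConf match {
        case null => "log4j-spark-container.properties"  // bundled default
        case value =>
          val uri = new URI(value)
          if ("local".equals(uri.getScheme())) "file://" + uri.getPath() // read in place
          else "log4j.properties"               // a non-local config was uploaded
      }
    " -Dlog4j.configuration=" + conf
  }

  def main(args: Array[String]): Unit = {
    println(resolve(false, "local:/etc/spark/log4j.properties"))
    //  -Dlog4j.configuration=file:///etc/spark/log4j.properties
    println(resolve(false, null))
    //  -Dlog4j.configuration=log4j-spark-container.properties
  }
}
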
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
-  override def getCurrentUserCredentials(): Credentials = { 
+  override def getCurrentUserCredentials(): Credentials = {
     UserGroupInformation.getCurrentUser().getCredentials()
   }
 
@@ -76,10 +76,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 }
 
 object YarnSparkHadoopUtil {
-  def getLoggingArgsForContainerCommandLine(): String = {
-    "-Dlog4j.configuration=log4j-spark-container.properties"
-  }
-
   def addToEnvironment(
       env: HashMap[String, String],
       variable: String,
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -79,7 +79,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources.contains(ClientBase.LOG4J_PROP))
+      localResources)
 
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
