Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-1395] Allow "local:" URIs to work on Yarn. #303

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.deploy

import java.io.{PrintStream, File}
import java.net.URL
import java.net.{URI, URL}

import org.apache.spark.executor.ExecutorURLClassLoader

Expand Down Expand Up @@ -216,7 +216,7 @@ object SparkSubmit {
}

private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(localJar)
val localJarFile = new File(new URI(localJar).getPath())
if (!localJarFile.exists()) {
printWarning(s"Jar $localJar does not exist, skipping.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class ExecutorRunnable(
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))

val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
localResources.contains(ClientBase.LOG4J_PROP))
localResources)
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)

Expand Down
190 changes: 128 additions & 62 deletions yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.deploy.yarn

import java.io.File
import java.net.{InetAddress, UnknownHostException, URI}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -209,53 +209,35 @@ trait ClientBase extends Logging {

Map(
ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
).foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (! localPath.isEmpty()) {
val localURI = new URI(localPath)
val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
destName, statCache)
if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
val setPermissions = if (destName.equals(ClientBase.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
destName, statCache)
}
}
}

// Handle jars local to the ApplicationMaster.
if ((args.addJars != null) && (!args.addJars.isEmpty())){
args.addJars.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
// Only add the resource to the Spark ApplicationMaster.
val appMasterOnly = true
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache, appMasterOnly)
}
}

// Handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
args.files.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
linkname, statCache)
}
}

// Handle any distributed cache archives
if ((args.archives != null) && (!args.archives.isEmpty())) {
args.archives.split(',').foreach { case file:String =>
val localURI = new URI(file.trim())
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
linkname, statCache)
val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
(args.files, LocalResourceType.FILE, false),
(args.archives, LocalResourceType.ARCHIVE, false) )
fileLists.foreach { case (flist, resType, appMasterOnly) =>
if (flist != null && !flist.isEmpty()) {
flist.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
linkname, statCache, appMasterOnly)
}
}
}
}

Expand All @@ -269,12 +251,14 @@ trait ClientBase extends Logging {
logInfo("Setting up the launch environment")

val env = new HashMap[String, String]()

ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
env)
val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
if (log4jConf != null) {
env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
}

// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
Expand Down Expand Up @@ -345,10 +329,7 @@ trait ClientBase extends Logging {
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
}

if (!localResources.contains(ClientBase.LOG4J_PROP)) {
JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
}
JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)

// Command for the ApplicationMaster
val commands = List[String](
Expand Down Expand Up @@ -377,6 +358,8 @@ object ClientBase {
val SPARK_JAR: String = "spark.jar"
val APP_JAR: String = "app.jar"
val LOG4J_PROP: String = "log4j.properties"
val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
val LOCAL_SCHEME = "local"

// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
Expand Down Expand Up @@ -428,30 +411,113 @@ object ClientBase {
}
}

def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
/**
 * Builds the `-Dlog4j.configuration=...` JVM argument (with a leading space) for a container.
 *
 * The configuration name is chosen as follows:
 *  - if `localResources` has an entry keyed by `LOG4J_PROP`, that name is used;
 *  - otherwise, if the `LOG4J_CONF_ENV_KEY` environment variable is set and holds a "local:" URI,
 *    a `file://` URL built from that URI's path is used;
 *  - otherwise, if the variable is set to anything else, `LOG4J_PROP` is used;
 *  - otherwise (variable unset), "log4j-spark-container.properties" is used.
 *
 * @param localResources resources registered for the container, keyed by link name.
 * @return the JVM option string, starting with a single space.
 */
def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = {
  val configName =
    if (localResources.contains(LOG4J_PROP)) {
      LOG4J_PROP
    } else {
      Option(System.getenv(LOG4J_CONF_ENV_KEY)) match {
        case Some(envValue) =>
          val envUri = new URI(envValue)
          if (ClientBase.LOCAL_SCHEME.equals(envUri.getScheme())) {
            // A local: config already lives on every node; point log4j straight at it.
            "file://" + envUri.getPath()
          } else {
            ClientBase.LOG4J_PROP
          }
        case None =>
          "log4j-spark-container.properties"
      }
    }
  " -Dlog4j.configuration=" + configName
}

def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
log4jConf: String, env: HashMap[String, String]) {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
File.pathSeparator)
// If log4j present, ensure ours overrides all others
if (addLog4j) {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
if (log4jConf != null) {
// If a custom log4j config file is provided as a local: URI, add its parent directory to the
// classpath. Note that this only works if the custom config's file name is
// "log4j.properties".
val localPath = getLocalPath(log4jConf)
if (localPath != null) {
val parentPath = new File(localPath).getParent()
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath,
File.pathSeparator)
}
}
// Normally the user's app.jar is last in case it conflicts with Spark jars
val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR, File.pathSeparator)
addUserClasspath(args, env)
}
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
ClientBase.populateHadoopClasspath(conf, env)

if (!userClasspathFirst) {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR, File.pathSeparator)
addUserClasspath(args, env)
}
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
}

/**
* Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
* to the classpath.
*/
private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
if (args != null) {
addClasspathEntry(args.userJar, APP_JAR, env)
}

if (args != null && args.addJars != null) {
args.addJars.split(",").foreach { case file: String =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a comment here stating this is to pick up the local: things would be nice for future reference.

addClasspathEntry(file, null, env)
}
}
}

/**
 * Adds the given path to the classpath, handling "local:" URIs correctly.
 *
 * Resolution order:
 *  - if `path` is a "local:" URI, its path component is added to the classpath as is;
 *  - otherwise (null path, a different or missing scheme, or a string that is not a valid URI),
 *    if an alternate name is given, that name is added relative to the job's work directory;
 *  - otherwise the environment is not modified.
 *
 * @param path Path to add to classpath (optional, may be null).
 * @param fileName Alternate name for the file (optional, may be null).
 * @param env Map holding the environment variables.
 */
private def addClasspathEntry(path: String, fileName: String,
    env: HashMap[String, String]): Unit = {
  // A malformed URI is treated like "not a local: resource" so the alternate name can still be
  // used. Only the parse is guarded; this also avoids a non-local `return` out of a closure.
  val localPath: Option[String] =
    try {
      Option(path).flatMap(p => Option(getLocalPath(p)))
    } catch {
      case _: URISyntaxException => None
    }

  localPath match {
    case Some(entry) =>
      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, entry,
        File.pathSeparator)
    case None =>
      if (fileName != null) {
        YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
          Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator)
      }
  }
}

/**
* Returns the local path if the URI is a "local:" URI, or null otherwise.
*/
private def getLocalPath(resource: String): String = {
val uri = new URI(resource)
if (LOCAL_SCHEME.equals(uri.getScheme())) {
return uri.getPath()
}
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*", File.pathSeparator)
null
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ trait ExecutorRunnableUtil extends Logging {
hostname: String,
executorMemory: Int,
executorCores: Int,
userSpecifiedLogFile: Boolean) = {
localResources: HashMap[String, LocalResource]) = {
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
Expand All @@ -64,10 +64,7 @@ trait ExecutorRunnableUtil extends Logging {

JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "

if (!userSpecifiedLogFile) {
JAVA_OPTS += " " + YarnSparkHadoopUtil.getLoggingArgsForContainerCommandLine()
}
JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)

// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once cpuset version is pushed out.
Expand Down Expand Up @@ -120,7 +117,7 @@ trait ExecutorRunnableUtil extends Logging {
rtype: LocalResourceType,
localResources: HashMap[String, LocalResource],
timestamp: String,
size: String,
size: String,
vis: String) = {
val uri = new URI(file)
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
Expand Down Expand Up @@ -153,7 +150,7 @@ trait ExecutorRunnableUtil extends Logging {
val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
for( i <- 0 to distArchives.length - 1) {
setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
timeStamps(i), fileSizes(i), visibilities(i))
}
}
Expand All @@ -165,7 +162,11 @@ trait ExecutorRunnableUtil extends Logging {
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()

ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
if (log4jConf != null) {
env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
}

// Allow users to specify some environment variables
YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}

override def getCurrentUserCredentials(): Credentials = {
override def getCurrentUserCredentials(): Credentials = {
UserGroupInformation.getCurrentUser().getCredentials()
}

Expand All @@ -76,10 +76,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}

object YarnSparkHadoopUtil {
/**
 * Returns the JVM command-line argument that sets `log4j.configuration` to
 * `log4j-spark-container.properties`.
 */
def getLoggingArgsForContainerCommandLine(): String = {
"-Dlog4j.configuration=log4j-spark-container.properties"
}

def addToEnvironment(
env: HashMap[String, String],
variable: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class ExecutorRunnable(
ctx.setTokens(ByteBuffer.wrap(dob.getData()))

val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
localResources.contains(ClientBase.LOG4J_PROP))
localResources)

logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
Expand Down