Skip to content

Commit

Permalink
fix Spark compilation is broken with the latest hadoop-2.4.0 release
Browse files Browse the repository at this point in the history
  • Loading branch information
xuan committed Apr 11, 2014
1 parent 3bd3129 commit be89fa7
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.File
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer

Expand Down Expand Up @@ -280,7 +281,8 @@ trait ClientBase extends Logging {
distCacheMgr.setDistArchivesEnv(env)

// Allow users to specify some environment variables.
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
File.pathSeparator)

// Add each SPARK_* key to the environment.
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
Expand Down Expand Up @@ -382,15 +384,17 @@ object ClientBase {
YarnConfiguration.YARN_APPLICATION_CLASSPATH)).getOrElse(
getDefaultYarnApplicationClasspath())
for (c <- classpathEntries) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
File.pathSeparator)
}

val mrClasspathEntries = Option(conf.getStrings(
"mapreduce.application.classpath")).getOrElse(
getDefaultMRApplicationClasspath())
if (mrClasspathEntries != null) {
for (c <- mrClasspathEntries) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
File.pathSeparator)
}
}
}
Expand Down Expand Up @@ -425,28 +429,29 @@ object ClientBase {
}

def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
File.pathSeparator)
// If log4j present, ensure ours overrides all others
if (addLog4j) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
}
// Normally the users app.jar is last in case conflicts with spark jars
val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR, File.pathSeparator)
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
ClientBase.populateHadoopClasspath(conf, env)

if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR, File.pathSeparator)
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*", File.pathSeparator)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.File
import java.net.URI
import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
Expand Down Expand Up @@ -167,7 +168,8 @@ trait ExecutorRunnableUtil extends Logging {
ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)

// Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
File.pathSeparator)

System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
env
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@

package org.apache.spark.deploy.yarn

import java.util.Map
import java.util.regex.Matcher
import java.util.regex.Pattern

import scala.collection.mutable.HashMap
import scala.collection.mutable.Map

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.Shell
import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -73,4 +82,52 @@ object YarnSparkHadoopUtil {
def getLoggingArgsForContainerCommandLine(): String = {
"-Dlog4j.configuration=log4j-spark-container.properties"
}

def addToEnvironment(
env: HashMap[String, String],
variable: String,
value: String,
classPathSeparator: String) = {
var envVariable = ""
if (env.get(variable) == None) {
envVariable = value
} else {
envVariable = env.get(variable).get + classPathSeparator + value
}
env put (StringInterner.weakIntern(variable), StringInterner.weakIntern(envVariable))
}

def setEnvFromInputString(
env: HashMap[String, String],
envString: String,
classPathSeparator: String) = {
if (envString != null && envString.length() > 0) {
var childEnvs = envString.split(",")
var p = Pattern.compile(Shell.getEnvironmentVariableRegex())
for (cEnv <- childEnvs) {
var parts = cEnv.split("=") // split on '='
var m = p.matcher(parts(1))
val sb = new StringBuffer
while (m.find()) {
val variable = m.group(1)
var replace = ""
if (env.get(variable) != None) {
replace = env.get(variable).get
} else {
// if this key is not configured for the child .. get it
// from the env
replace = System.getenv(variable)
if (replace == null) {
// the env key is note present anywhere .. simply set it
replace = ""
}
}
m.appendReplacement(sb, Matcher.quoteReplacement(replace))
}
m.appendTail(sb)
addToEnvironment(env, parts(0), sb.toString(), classPathSeparator)
}
}
}

}

0 comments on commit be89fa7

Please sign in to comment.