[SPARK-15782][YARN] Fix spark.jars and spark.yarn.dist.jars handling
When `--packages` is specified with spark-shell, the classes from those packages cannot be found, which I think is due to some of the changes in SPARK-12343.

Tested manually with both the Scala 2.10 and 2.11 REPLs.

vanzin davies can you guys please review?

Author: Marcelo Vanzin <[email protected]>
Author: Nezih Yigitbasi <[email protected]>

Closes #13709 from nezihyigitbasi/SPARK-15782.

(cherry picked from commit 63470af)
Signed-off-by: Marcelo Vanzin <[email protected]>
nezihyigitbasi authored and Marcelo Vanzin committed Jun 17, 2016
1 parent 68e7a25 commit feaba97
Showing 5 changed files with 59 additions and 16 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 
-    _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    _jars = Utils.getUserJars(_conf)
     _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
       .toSeq.flatten
 
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2352,6 +2352,31 @@ private[spark] object Utils extends Logging {
     log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
     SignalUtils.registerLogger(log)
   }
+
+  /**
+   * Unions two comma-separated lists of files and filters out empty strings.
+   */
+  def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
+    var allFiles = Set[String]()
+    leftList.foreach { value => allFiles ++= value.split(",") }
+    rightList.foreach { value => allFiles ++= value.split(",") }
+    allFiles.filter { _.nonEmpty }
+  }
+
+  /**
+   * In YARN mode this method returns a union of the jar files pointed to by "spark.jars" and
+   * the "spark.yarn.dist.jars" properties, while in other modes it returns the jar files
+   * pointed to by only the "spark.jars" property.
+   */
+  def getUserJars(conf: SparkConf): Seq[String] = {
+    val sparkJars = conf.getOption("spark.jars")
+    if (conf.get("spark.master") == "yarn") {
+      val yarnJars = conf.getOption("spark.yarn.dist.jars")
+      unionFileLists(sparkJars, yarnJars).toSeq
+    } else {
+      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    }
+  }
 }
 
 /**
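To make the new behavior concrete, here is a minimal sketch, not part of this commit. It assumes it runs inside the org.apache.spark package (Utils is private[spark]), and all jar paths are invented:

  import org.apache.spark.SparkConf
  import org.apache.spark.util.Utils

  // YARN mode: spark.jars and spark.yarn.dist.jars are unioned and de-duplicated.
  val yarnConf = new SparkConf()
    .set("spark.master", "yarn")
    .set("spark.jars", "/tmp/a.jar,/tmp/b.jar")
    .set("spark.yarn.dist.jars", "/tmp/c.jar,/tmp/a.jar")
  Utils.getUserJars(yarnConf)   // Seq("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar") (set order, not guaranteed)

  // Any other master: only spark.jars is read; empty entries are dropped.
  val localConf = new SparkConf()
    .set("spark.master", "local[*]")
    .set("spark.jars", "/tmp/a.jar,,/tmp/b.jar")
  Utils.getUserJars(localConf)  // Seq("/tmp/a.jar", "/tmp/b.jar")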
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,6 +570,18 @@ class SparkSubmitSuite
       appArgs.executorMemory should be ("2.3g")
     }
   }
+
+  test("comma separated list of files are unioned correctly") {
+    val left = Option("/tmp/a.jar,/tmp/b.jar")
+    val right = Option("/tmp/c.jar,/tmp/a.jar")
+    val emptyString = Option("")
+    Utils.unionFileLists(left, right) should be (Set("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar"))
+    Utils.unionFileLists(emptyString, emptyString) should be (Set.empty)
+    Utils.unionFileLists(Option("/tmp/a.jar"), emptyString) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(emptyString, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(None, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(Option("/tmp/a.jar"), None) should be (Set("/tmp/a.jar"))
+  }
   // scalastyle:on println
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
32 changes: 20 additions & 12 deletions repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -201,10 +201,10 @@ class SparkILoop(
       if (Utils.isWindows) {
         // Strip any URI scheme prefix so we can add the correct path to the classpath
         // e.g. file:/C:/my/path.jar -> C:/my/path.jar
-        SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") }
+        getAddedJars().map { jar => new URI(jar).getPath.stripPrefix("/") }
       } else {
         // We need new URI(jar).getPath here for the case that `jar` includes encoded white space (%20).
-        SparkILoop.getAddedJars.map { jar => new URI(jar).getPath }
+        getAddedJars().map { jar => new URI(jar).getPath }
       }
     // work around for Scala bug
     val totalClassPath = addedJars.foldLeft(
@@ -1005,7 +1005,7 @@ class SparkILoop(
   @DeveloperApi
   def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val jars = SparkILoop.getAddedJars
+    val jars = getAddedJars()
     val conf = new SparkConf()
       .setMaster(getMaster())
       .setJars(jars)
@@ -1060,22 +1060,30 @@
 
   @deprecated("Use `process` instead", "2.9.0")
   private def main(settings: Settings): Unit = process(settings)
-}
-
-object SparkILoop extends Logging {
-  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
-  private def echo(msg: String) = Console println msg
 
-  def getAddedJars: Array[String] = {
+  private[repl] def getAddedJars(): Array[String] = {
+    val conf = new SparkConf().setMaster(getMaster())
     val envJars = sys.env.get("ADD_JARS")
     if (envJars.isDefined) {
       logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
     }
-    val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
-    val jars = propJars.orElse(envJars).getOrElse("")
+    val jars = {
+      val userJars = Utils.getUserJars(conf)
+      if (userJars.isEmpty) {
+        envJars.getOrElse("")
+      } else {
+        userJars.mkString(",")
+      }
+    }
     Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
   }
 
+}
+
+object SparkILoop extends Logging {
+  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
+  private def echo(msg: String) = Console println msg
+
   // Designed primarily for use by test code: take a String with a
   // bunch of code, and prints out a transcript of what it would look
   // like if you'd just typed it into the repl.
@@ -1109,7 +1117,7 @@ object SparkILoop extends Logging {
     if (settings.classpath.isDefault)
       settings.classpath.value = sys.props("java.class.path")
 
-    getAddedJars.map(jar => new URI(jar).getPath).foreach(settings.classpath.append(_))
+    repl.getAddedJars().map(jar => new URI(jar).getPath).foreach(settings.classpath.append(_))
 
     repl process settings
   }
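As a rough illustration of the final step in getAddedJars (not part of this commit; paths invented): Utils.resolveURIs turns scheme-less entries into absolute file: URIs, after which the string is split on commas and empty entries are dropped. As in SparkILoop itself, this assumes the org.apache.spark.repl package, where the private[spark] Utils is visible:

  import org.apache.spark.util.Utils

  // `jars` stands in for the merged list built above, e.g. Utils.getUserJars(conf).mkString(",").
  val jars = "my.jar,/opt/libs/other.jar,"
  Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
  // e.g. Array("file:/home/user/my.jar", "file:/opt/libs/other.jar") — relative paths are
  // resolved against the current working directory, so the exact output varies.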
4 changes: 1 addition & 3 deletions repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -54,9 +54,7 @@ object Main extends Logging {
   // Visible for testing
   private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
     interp = _interp
-    val jars = conf.getOption("spark.jars")
-      .map(_.replace(",", File.pathSeparator))
-      .getOrElse("")
+    val jars = Utils.getUserJars(conf).mkString(File.pathSeparator)
     val interpArguments = List(
       "-Yrepl-class-based",
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
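A small hypothetical illustration of the join above: the old code replaced commas with File.pathSeparator in place, which suggests the list feeds a classpath-style interpreter argument, so the new code joins the merged jar list the same way:

  import java.io.File

  // ':' on Unix-like systems, ';' on Windows.
  val jars = Seq("/tmp/a.jar", "/tmp/b.jar").mkString(File.pathSeparator)
  // e.g. "/tmp/a.jar:/tmp/b.jar" on Linux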
