[SPARK-6175] Fix standalone executor log links when ephemeral ports or SPARK_PUBLIC_DNS are used

This patch fixes two issues with the executor log viewing links added in Spark 1.3.  In standalone mode, the log URLs might include a port value of 0 rather than the actual bound port of the UI, which broke the ability to view logs from workers whose web UIs had been configured to bind to ephemeral ports.  In addition, the URLs used workers' local hostnames instead of respecting SPARK_PUBLIC_DNS, which prevented this feature from working properly on Spark EC2 clusters because the links would point to internal DNS names instead of external ones.
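
In outline, the fix resolves the externally visible hostname through SparkConf and builds log URLs from the port the web UI actually bound to. A minimal sketch of that logic (the helper names here are illustrative; the real changes live in Worker.scala and ExecutorRunner.scala in the diff below):

    import org.apache.spark.SparkConf

    // Prefer SPARK_PUBLIC_DNS when set; otherwise fall back to the local hostname.
    def publicAddress(conf: SparkConf, host: String): String =
      Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(host)

    // A web UI configured with port 0 binds to an ephemeral port, so the URL
    // must use the port reported after binding (webUi.boundPort), not the
    // requested port.
    def logBaseUrl(address: String, boundPort: Int, appId: String, execId: Int): String =
      s"http://$address:$boundPort/logPage/?appId=$appId&executorId=$execId&logType="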

I included tests for both of these bugs:

- We now browse to the URLs and verify that they point to the expected pages.
- To test SPARK_PUBLIC_DNS, I changed the code that reads the environment variable to do so via `SparkConf.getenv`, then used a custom SparkConf subclass to mock the environment variable (this pattern is used elsewhere in Spark's tests).
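
For reference, a condensed sketch of that mocking pattern (it appears in full in LogUrlsStandaloneSuite in the diff below):

    import org.apache.spark.SparkConf

    // Subclass SparkConf so that reads of SPARK_PUBLIC_DNS can be faked in tests;
    // passing `false` skips loading defaults from system properties.
    class MySparkConf extends SparkConf(false) {
      override def getenv(name: String): String =
        if (name == "SPARK_PUBLIC_DNS") "public_dns" else super.getenv(name)

      // SparkContext clones its conf, so the override must survive cloning.
      override def clone: SparkConf = new MySparkConf().setAll(getAll)
    }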

Author: Josh Rosen <[email protected]>

Closes apache#4903 from JoshRosen/SPARK-6175 and squashes the following commits:

5577f41 [Josh Rosen] Remove println
cfec135 [Josh Rosen] Use webUi.boundPort and publicAddress in log links
27918c7 [Josh Rosen] Add failing unit tests for standalone log URL viewing
c250fbe [Josh Rosen] Respect SparkConf in local-cluster Workers.
422a2ef [Josh Rosen] Use conf.getenv to read SPARK_PUBLIC_DNS
JoshRosen committed Mar 5, 2015
1 parent 0bfacd5 commit 424a86a
Showing 8 changed files with 57 additions and 20 deletions.
core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -96,7 +96,7 @@ private[spark] class Master(
   val webUi = new MasterWebUI(this, webUiPort)
 
   val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val webUiPort: Int,
+    val publicAddress: String,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
 
     // Add webUI log urls
-    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+    val baseUrl =
+      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -121,7 +121,7 @@ private[spark] class Worker(
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
 
   val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
   var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@ private[spark] class Worker(
       self,
       workerId,
       host,
-      webUiPort,
+      webUi.boundPort,
+      publicAddress,
       sparkHome,
       executorDir,
       akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
       memory: Int,
       masterUrls: Array[String],
       workDir: String,
-      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workerNumber: Option[Int] = None,
+      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val actorName = "Worker"
     val securityMgr = new SecurityManager(conf)

core/src/main/scala/org/apache/spark/ui/WebUI.scala (1 addition, 1 deletion)
@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
-  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)
 
   def getBasePath: String = basePath

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
-      new File("sparkHome"), new File("workDir"), "akka://worker",
+      "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
       new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }

core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -17,35 +17,69 @@
 
 package org.apache.spark.deploy
 
+import java.net.URL
+
 import scala.collection.mutable
 import scala.io.Source
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
 
-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }
 
-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
 
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
       ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
