[SPARK-7657] [YARN] Add driver logs links in application UI, in cluster mode.

This PR adds the URLs of the driver logs to the `SparkListenerApplicationStart` event, which the `ExecutorsListener` later uses to populate the driver log URLs in its own state. That state is then used when the UI is rendered to display links to the logs on the Executors page.
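
For illustration, a listener along the lines of the `SaveExecutorInfo` helper added in the test below can pick these URLs up directly from the event. A minimal sketch (the listener class name is hypothetical, not code from this commit):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Minimal sketch: record the driver log links carried by the
// application-start event so UI code (or a test) can read them later.
class DriverLogUrlListener extends SparkListener {
  @volatile var driverLogs: Option[Map[String, String]] = None

  override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
    // `driverLogs` is the new field added by this PR, e.g.
    // Map("stdout" -> "<nm-url>/stdout?start=0", "stderr" -> "<nm-url>/stderr?start=0")
    // in YARN cluster mode; it stays None on cluster managers that do not provide it.
    driverLogs = appStart.driverLogs
  }
}
```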

Author: Hari Shreedharan <[email protected]>

Closes apache#6166 from harishreedharan/am-log-link and squashes the following commits:

943fc4f [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link
9e5c04b [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link
b3f9b9d [Hari Shreedharan] Updated comment based on feedback.
0840a95 [Hari Shreedharan] Move the result and sc.stop back to original location, minor import changes.
537a2f7 [Hari Shreedharan] Add test to ensure the log urls are populated and valid.
4033725 [Hari Shreedharan] Adding comments explaining how node reports are used to get the log urls.
6c5c285 [Hari Shreedharan] Import order.
346f4ea [Hari Shreedharan] Review feedback fixes.
629c1dc [Hari Shreedharan] Cleanup.
99fb1a3 [Hari Shreedharan] Send the log urls in App start event, to ensure that other listeners are not affected.
c0de336 [Hari Shreedharan] Ensure new unit test cleans up after itself.
50cdae3 [Hari Shreedharan] Added unit test, made the approach generic.
402e8e4 [Hari Shreedharan] Use `NodeReport` to get the URL for the logs. Also, make the environment variables generic so other cluster managers can use them as well.
1cf338f [Hari Shreedharan] [SPARK-7657][YARN] Add driver link in application UI, in cluster mode.
harishreedharan authored and squito committed May 22, 2015
1 parent 85b9637 commit 956c4c9
Showing 9 changed files with 136 additions and 12 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1991,7 +1991,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser, applicationAttemptId))
startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
}

/** Post the application end event */
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -49,4 +49,11 @@ private[spark] trait SchedulerBackend {
*/
def applicationAttemptId(): Option[String] = None

/**
* Get the URLs for the driver logs. These URLs are used to display the links in the UI
* Executors tab for the driver.
* @return Map containing the log names and their respective URLs
*/
def getDriverLogUrls: Option[Map[String, String]] = None

}
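
Since the hook lives on the generic `SchedulerBackend` trait, any cluster manager can supply driver log links. A hedged sketch of what an override might look like for some hypothetical backend (the class, the stub methods, and the log-server URL are illustrative assumptions, not part of this commit):

```scala
package org.apache.spark.scheduler

// Hypothetical backend used only to illustrate the new hook; the URL below
// is an assumed example, not something Spark provides.
private[spark] class ExampleClusterSchedulerBackend extends SchedulerBackend {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def reviveOffers(): Unit = {}
  override def defaultParallelism(): Int = 2

  override def getDriverLogUrls: Option[Map[String, String]] = {
    // Point the UI at wherever this cluster manager exposes the driver's
    // stdout/stderr; returning None (the default) simply hides the links.
    val base = "http://logs.example.com/driver-container-0001"
    Some(Map("stdout" -> s"$base/stdout", "stderr" -> s"$base/stderr"))
  }
}
```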
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,13 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, appId: Option[String],
time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent
case class SparkListenerApplicationStart(
appName: String,
appId: Option[String],
time: Long,
sparkUser: String,
appAttemptId: Option[String],
driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.exec

import scala.collection.mutable.HashMap

import org.apache.spark.ExceptionFailure
import org.apache.spark.{ExceptionFailure, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
@@ -73,6 +73,16 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
uiData.finishReason = Some(executorRemoved.reason)
}

override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
val storageStatus = storageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
}
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -196,7 +196,8 @@ private[spark] object JsonProtocol {
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser) ~
("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing))
}

def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -570,7 +571,8 @@ private[spark] object JsonProtocol {
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson)
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs)
}

def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
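
For reference, with these changes an application-start event that carries driver log links serializes with an extra "Driver Logs" object. A hedged sketch of building that fragment with the json4s DSL that `JsonProtocol` uses ("App ID", "Timestamp", "User", and "Driver Logs" are the field names visible above; the values and the remaining event fields are omitted or made up for illustration):

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Illustrative only: the "Driver Logs" map mirrors what applicationStartToJson
// now emits; the IDs and URLs here are invented.
val driverLogs = Map(
  "stdout" -> "http://nm-host:8042/node/containerlogs/container_0001/someuser/stdout?start=0",
  "stderr" -> "http://nm-host:8042/node/containerlogs/container_0001/someuser/stderr?start=0")

val appStartJsonFragment =
  ("App ID" -> "application_1432300000000_0001") ~
  ("Timestamp" -> 1432300000000L) ~
  ("User" -> "someuser") ~
  ("Driver Logs" -> driverLogs)

println(compact(render(appStartJsonFragment)))
```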
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -89,9 +89,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg

/** Returns the attempt ID. */
def getAttemptId(): ApplicationAttemptId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
val containerId = ConverterUtils.toContainerId(containerIdString)
containerId.getApplicationAttemptId()
YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
}

/** Returns the configuration for the AmIpFilter to add to the Spark UI. */
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.util.ConverterUtils

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -136,6 +137,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
tokenRenewer.foreach(_.stop())
}

private[spark] def getContainerId: ContainerId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
ConverterUtils.toContainerId(containerIdString)
}
}

object YarnSparkHadoopUtil {
yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,10 +17,19 @@

package org.apache.spark.scheduler.cluster

import java.net.NetworkInterface

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.NodeState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkContext
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.IntParam
import org.apache.spark.util.{IntParam, Utils}

private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -53,4 +62,70 @@ private[spark] class YarnClusterSchedulerBackend(
logError("Application attempt ID is not set.")
super.applicationAttemptId
}

override def getDriverLogUrls: Option[Map[String, String]] = {
var yarnClientOpt: Option[YarnClient] = None
var driverLogs: Option[Map[String, String]] = None
try {
val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
val containerId = YarnSparkHadoopUtil.get.getContainerId
yarnClientOpt = Some(YarnClient.createYarnClient())
yarnClientOpt.foreach { yarnClient =>
yarnClient.init(yarnConf)
yarnClient.start()

// For newer versions of YARN, we can find the HTTP address for a given node by getting a
// container report for a given container. But container reports came only in Hadoop 2.4,
// so we basically have to get the node reports for all nodes and find the one which runs
// this container. For that we have to compare the node's host against the current host.
// Since the host can have multiple addresses, we need to compare against all of them to
// find out if one matches.

// Get all the addresses of this node.
val addresses =
NetworkInterface.getNetworkInterfaces.asScala
.flatMap(_.getInetAddresses.asScala)
.toSeq

// Find a node report that matches one of the addresses
val nodeReport =
yarnClient.getNodeReports(NodeState.RUNNING).asScala.find { x =>
val host = x.getNodeId.getHost
addresses.exists { address =>
address.getHostAddress == host ||
address.getHostName == host ||
address.getCanonicalHostName == host
}
}

// Now that we have found the report for the Node Manager that the AM is running on, we
// can get the base HTTP address for the Node manager from the report.
// The format used for the logs for each container is well-known and can be constructed
// using the NM's HTTP address and the container ID.
// The NM may be running several containers, but we can build the URL for the AM using
// the AM's container ID, which we already know.
nodeReport.foreach { report =>
val httpAddress = report.getHttpAddress
// lookup appropriate http scheme for container log urls
val yarnHttpPolicy = yarnConf.get(
YarnConfiguration.YARN_HTTP_POLICY_KEY,
YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
)
val user = Utils.getCurrentUserName()
val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
logDebug(s"Base URL for logs: $baseUrl")
driverLogs = Some(
Map("stderr" -> s"$baseUrl/stderr?start=0", "stdout" -> s"$baseUrl/stdout?start=0"))
}
}
} catch {
case e: Exception =>
logInfo("Node Report API is not available in the version of YARN being used, so AM" +
" logs link will not appear in application UI", e)
} finally {
yarnClientOpt.foreach(_.close())
}
driverLogs
}
}
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.io.Source

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.ByteStreams
@@ -33,7 +34,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
SparkListenerExecutorAdded}
import org.apache.spark.util.Utils

/**
@@ -290,10 +292,15 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit

private[spark] class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
var driverLogs: Option[collection.Map[String, String]] = None

override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}

override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
driverLogs = appStart.driverLogs
}
}

private object YarnClusterDriver extends Logging with Matchers {
@@ -314,6 +321,7 @@ private object YarnClusterDriver extends Logging with Matchers {
val sc = new SparkContext(new SparkConf()
.set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
val conf = sc.getConf
val status = new File(args(0))
var result = "failure"
try {
@@ -335,6 +343,20 @@
executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}

// If we are running in yarn-cluster mode, verify that driver logs are downloadable.
if (conf.get("spark.master") == "yarn-cluster") {
assert(listener.driverLogs.nonEmpty)
val driverLogs = listener.driverLogs.get
assert(driverLogs.size === 2)
assert(driverLogs.containsKey("stderr"))
assert(driverLogs.containsKey("stdout"))
val stderr = driverLogs("stderr") // YARN puts everything in stderr.
val lines = Source.fromURL(stderr).getLines()
// Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
// cluster mode.
assert(lines.exists(_.contains("YarnClusterSchedulerBackend")))
}
}

}