From 8aac16355329809b11c76430fa8737d328f2e962 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 20 Mar 2014 14:34:34 -0700 Subject: [PATCH] Add basic application table --- .../spark/deploy/history/HistoryServer.scala | 21 +++---- .../spark/deploy/history/IndexPage.scala | 58 +++++++++---------- 2 files changed, 38 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 09f5d63b6c907..623e136d8b7a3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -44,10 +44,10 @@ import org.apache.spark.scheduler.ReplayListenerBus * @param baseLogDir The base directory in which event logs are found * @param requestedPort The requested port to which this server is to be bound */ -class HistoryServer(baseLogDir: String, requestedPort: Int, conf: SparkConf) +class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf) extends SparkUIContainer("History Server") with Logging { - private val host = Utils.localHostName() + private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName()) private val port = requestedPort private val indexPage = new IndexPage(this) private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir)) @@ -101,29 +101,24 @@ class HistoryServer(baseLogDir: String, requestedPort: Int, conf: SparkConf) // Remove any outdated SparkUIs val logPaths = logDirs.map(_.getPath.toString) - logPathToUI.keys.foreach { path => + logPathToUI.foreach { case (path, ui) => if (!logPaths.contains(path)) { + detachUI(ui) logPathToUI.remove(path) logPathToLastUpdated.remove(path) } } - - logWarning("By the end of check for logs, the map looks like") - logPathToUI.foreach { case (k, v) => logWarning("* %s".format(k)) } } /** Attempt to render a new SparkUI from event logs residing in the given log directory. */ def maybeRenderUI(logPath: String, lastUpdated: Long) { - logWarning("Maybe rendering UI %s".format(logPath)) - - val appName = logPath.split("/").last + val appName = getAppName(logPath) val replayBus = new ReplayListenerBus(conf) val ui = new SparkUI(conf, replayBus, appName, "/history/%s".format(appName)) // Do not call ui.bind() to avoid creating a new server for each application ui.start() val success = replayBus.replay(logPath) - logWarning("Just replayed the events. Successful? %s".format(success)) if (success) { attachUI(ui) logPathToUI(logPath) = ui @@ -131,6 +126,12 @@ class HistoryServer(baseLogDir: String, requestedPort: Int, conf: SparkConf) } } + /** Parse app name from the given log path. */ + def getAppName(logPath: String): String = logPath.split("/").last + + /** Return the address of this server. */ + def getAddress = "http://" + host + ":" + boundPort + } object HistoryServer { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala index 2200e41898942..acd8ff064d3bd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala @@ -17,30 +17,36 @@ package org.apache.spark.deploy.history +import java.text.SimpleDateFormat +import java.util.Date import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.deploy.DeployWebUI -import org.apache.spark.deploy.master.ApplicationInfo -import org.apache.spark.ui.UIUtils -import org.apache.spark.util.Utils +import org.apache.spark.ui.{SparkUI, UIUtils} private[spark] class IndexPage(parent: HistoryServer) { + val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") def render(request: HttpServletRequest): Seq[Node] = { + // Check if logs have been updated + parent.checkForLogs() + + // Populate app table, with most recently modified first + val appRows = parent.logPathToLastUpdated.toSeq + .sortBy { case (path, lastUpdated) => -lastUpdated } + .map { case (path, lastUpdated) => + // (appName, lastUpdated, UI) + (parent.getAppName(path), lastUpdated, parent.logPathToUI(path)) + } + val appTable = UIUtils.listingTable(appHeader, appRow, appRows) + val content =
    -
  • - Welcome to the Fastest and Furious-est HistoryServer in the World! -
  • - { - parent.logPathToUI.map { case (path, ui) => -
  • {path} at {ui.basePath}
  • - } - } +
  • Event Log Location: {parent.baseLogDir}
  • +

    Applications

    {appTable}
@@ -48,24 +54,14 @@ private[spark] class IndexPage(parent: HistoryServer) { UIUtils.basicSparkPage(content, "History Server") } - def appRow(app: ApplicationInfo): Seq[Node] = { - - - {app.id} - - - {app.desc.name} - - - {app.coresGranted} - - - {Utils.megabytesToString(app.desc.memoryPerSlave)} - - {DeployWebUI.formatDate(app.submitDate)} - {app.desc.user} - {app.state.toString} - {DeployWebUI.formatDuration(app.duration)} - + private val appHeader = Seq[String]("App Name", "Last Updated") + + private def appRow(info: (String, Long, SparkUI)): Seq[Node] = { + info match { case (appName, lastUpdated, ui) => + + {appName} + {dateFmt.format(new Date(lastUpdated))} + + } } }