Skip to content

Commit

Permalink
Add HistoryServer and scripts ++ Refactor WebUI interface
Browse files Browse the repository at this point in the history
HistoryServer can be launched with ./sbin/start-history-server.sh <log-dir>
and stopped with ./sbin/stop-history-server.sh. This commit also involves
refactoring all the UIs to avoid duplicate code.
  • Loading branch information
andrewor14 committed Mar 20, 2014
1 parent ffe272d commit c086bd5
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ private[spark] object DeployWebUI {
return "%.0f min".format(minutes)
}
val hours = minutes / 60
return "%.1f h".format(hours)
"%.1f h".format(hours)
}
}
50 changes: 50 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import org.apache.spark.ui.{SparkUI, WebUI}

private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {

  /**
   * Attach a SparkUI to this container by registering all of its handlers
   * with the bound server's root handler. Only valid after bind().
   */
  def attachUI(ui: SparkUI) {
    assert(serverInfo.isDefined,
      "%s must be bound to a server before attaching SparkUIs".format(name))
    val root = serverInfo.get.rootHandler
    ui.handlers.foreach { handler =>
      root.addHandler(handler)
      // Handlers added to an already-started server are not started automatically
      if (!handler.isStarted) {
        handler.start()
      }
    }
  }

  /**
   * Detach a SparkUI from this container, stopping and removing each of its
   * handlers from the bound server's root handler. Only valid after bind().
   */
  def detachUI(ui: SparkUI) {
    assert(serverInfo.isDefined,
      "%s must be bound to a server before detaching SparkUIs".format(name))
    val root = serverInfo.get.rootHandler
    ui.handlers.foreach { handler =>
      // Stop before removal so the handler releases its resources cleanly
      if (handler.isStarted) {
        handler.stop()
      }
      root.removeHandler(handler)
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.net.URI
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkUIContainer
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.apache.spark.scheduler.ReplayListenerBus

/**
 * A web server that re-renders SparkUIs of finished applications.
 *
 * For the standalone mode, MasterWebUI already achieves this functionality. Thus, the
 * main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos).
 *
 * The logging directory structure is as follows: Within the given base directory, each
 * application's event logs are maintained in the application's own sub-directory.
 *
 * @param baseLogDir The base directory in which event logs are found
 * @param requestedPort The requested port to which this server is to be bound
 */
class HistoryServer(baseLogDir: String, requestedPort: Int, conf: SparkConf)
  extends SparkUIContainer("History Server") with Logging {

  private val host = Utils.localHostName()
  private val port = requestedPort
  private val indexPage = new IndexPage(this)
  private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
  private val securityManager = new SecurityManager(conf)

  // Handlers served directly by this container (static resources + index page);
  // per-application SparkUI handlers are attached dynamically in maybeRenderUI()
  private val handlers = Seq[ServletContextHandler](
    createStaticHandler(HistoryServer.STATIC_RESOURCE_DIR, "/static"),
    createServletHandler("/",
      (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
  )

  // A mapping from an event log path to the associated, already rendered, SparkUI
  val logPathToUI = mutable.HashMap[String, SparkUI]()

  // A mapping from an event log path to a timestamp of when it was last updated
  val logPathToLastUpdated = mutable.HashMap[String, Long]()

  /** Bind to the HTTP server behind this web interface. Exits the JVM on failure. */
  override def bind() {
    try {
      serverInfo = Some(startJettyServer(host, port, handlers, conf))
      logInfo("Started HistoryServer at http://%s:%d".format(host, boundPort))
    } catch {
      case e: Exception =>
        logError("Failed to create HistoryServer", e)
        System.exit(1)
    }
    // Populate the UI cache once immediately after binding
    checkForLogs()
  }

  /**
   * Check for any updated event logs.
   *
   * If a new application is found, render the associated SparkUI and remember it.
   * If an existing application is updated, re-render the associated SparkUI.
   * If an existing application is removed, remove the associated SparkUI.
   */
  def checkForLogs() {
    // listStatus may return null if the base directory does not exist
    val logStatus = fileSystem.listStatus(new Path(baseLogDir))
    val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

    // Render any missing or outdated SparkUI
    logDirs.foreach { dir =>
      val path = dir.getPath.toString
      val lastUpdated = dir.getModificationTime
      if (!logPathToLastUpdated.contains(path) ||
          logPathToLastUpdated.getOrElse(path, -1L) < lastUpdated) {
        maybeRenderUI(path, lastUpdated)
      }
    }

    // Remove any outdated SparkUIs
    val logPaths = logDirs.map(_.getPath.toString)
    logPathToUI.keys.foreach { path =>
      if (!logPaths.contains(path)) {
        logPathToUI.remove(path)
        logPathToLastUpdated.remove(path)
      }
    }

    logDebug("Log directories currently tracked: %s".format(logPathToUI.keys.mkString(", ")))
  }

  /** Attempt to render a new SparkUI from event logs residing in the given log directory. */
  def maybeRenderUI(logPath: String, lastUpdated: Long) {
    logDebug("Rendering UI from event logs at %s".format(logPath))

    val appName = logPath.split("/").last
    val replayBus = new ReplayListenerBus(conf)
    val ui = new SparkUI(conf, replayBus, appName, "/history/%s".format(appName))

    // Do not call ui.bind() to avoid creating a new server for each application
    ui.start()
    val success = replayBus.replay(logPath)
    if (success) {
      attachUI(ui)
      logPathToUI(logPath) = ui
      logPathToLastUpdated(logPath) = lastUpdated
    } else {
      // Replay failed; stop the started UI so its resources are not leaked
      logWarning("Failed to replay event logs at %s".format(logPath))
      ui.stop()
    }
  }

}

object HistoryServer {
  // Serve the same static assets (CSS, JS) as the regular SparkUI
  val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR

  /** Entry point: parse arguments, start the server, and block forever. */
  def main(argStrings: Array[String]) {
    val conf = new SparkConf
    val parsedArgs = new HistoryServerArguments(argStrings, conf)
    val historyServer = new HistoryServer(parsedArgs.logDir, parsedArgs.port, conf)
    historyServer.bind()

    // Keep the main thread alive indefinitely; the process is only stopped
    // externally (e.g. via the stop-history-server.sh script)
    while (true) {
      Thread.sleep(Int.MaxValue)
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.net.URI

import org.apache.spark.SparkConf
import org.apache.spark.util.{Utils, IntParam}
import org.apache.hadoop.fs.Path

/**
 * Command-line parser for the HistoryServer.
 *
 * Recognized options:
 *   -p PORT / --port PORT   port for the web server (default: 18080)
 *   -d DIR  / --dir DIR     location of the event log files (required)
 */
private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf) {
  var port = 18080
  var logDir = ""

  parse(args.toList)
  // Validate exactly once, after all arguments have been consumed.
  // (Previously this was invoked inside the recursive parse, running
  // redundantly once per recursion level.)
  validateLogDir()

  /** Recursively consume the argument list, updating port/logDir as options are found. */
  def parse(args: List[String]): Unit = {
    args match {
      case ("--port" | "-p") :: IntParam(value) :: tail =>
        port = value
        parse(tail)

      case ("--dir" | "-d") :: value :: tail =>
        logDir = value
        parse(tail)

      case ("--help" | "-h") :: tail =>
        printUsageAndExit(0)

      case Nil => {}

      case _ =>
        printUsageAndExit(1)
    }
  }

  /** Ensure a log directory was given and that it exists as a directory; exit otherwise. */
  def validateLogDir() {
    if (logDir == "") {
      System.err.println("Logging directory must be specified.")
      printUsageAndExit(1)
    }
    val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
    val path = new Path(logDir)
    if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) {
      System.err.println("Logging directory specified is invalid: %s".format(logDir))
      printUsageAndExit(1)
    }
  }

  /**
   * Print usage and exit JVM with the given exit code.
   */
  def printUsageAndExit(exitCode: Int) {
    System.err.println(
      "Usage: HistoryServer [options]\n" +
      "\n" +
      "Options:\n" +
      "  -p PORT, --port PORT   Port for web server (default: 18080)\n" +
      "  -d DIR,  --dir DIR     Location of event log files")
    System.exit(exitCode)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.deploy.DeployWebUI
import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils

/** Renders the landing page of the HistoryServer, listing all rendered application UIs. */
private[spark] class IndexPage(parent: HistoryServer) {

  /** Render a simple list of the event log paths and the base path of each attached UI. */
  def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      <div class="row-fluid">
        <div class="span12">
          <ul class="unstyled">
            <li>
              <strong>Event logs of finished applications</strong>
            </li>
            {
              parent.logPathToUI.map { case (path, ui) =>
                <li>{path} at {ui.basePath}</li>
              }
            }
          </ul>
        </div>
      </div>

    UIUtils.basicSparkPage(content, "History Server")
  }

  // NOTE(review): appRow appears to be carried over from the Master's index page and is
  // not referenced by render() above; kept for interface compatibility — confirm callers.
  /** Render one table row describing the given application. */
  def appRow(app: ApplicationInfo): Seq[Node] = {
    <tr>
      <td>
        <a href={"app?appId=" + app.id}>{app.id}</a>
      </td>
      <td>
        <a href={app.desc.appUiUrl}>{app.desc.name}</a>
      </td>
      <td>
        {app.coresGranted}
      </td>
      <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
        {Utils.megabytesToString(app.desc.memoryPerSlave)}
      </td>
      <td>{DeployWebUI.formatDate(app.submitDate)}</td>
      <td>{app.desc.user}</td>
      <td>{app.state.toString}</td>
      <td>{DeployWebUI.formatDuration(app.duration)}</td>
    </tr>
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -671,16 +671,16 @@ private[spark] class Master(
appConf.set("spark.eventLog.compress", "true")
appConf.set("spark.io.compression.codec", codec)
}
val replayerBus = new ReplayListenerBus(appConf)
val replayBus = new ReplayListenerBus(appConf)
val ui = new SparkUI(
appConf,
replayerBus,
replayBus,
"%s (finished)".format(appName),
"/history/%s".format(app.id))

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
val success = replayerBus.replay(eventLogDir)
val success = replayBus.replay(eventLogDir)
if (!success) {
ui.stop()
None
Expand Down
Loading

0 comments on commit c086bd5

Please sign in to comment.