Skip to content

Commit

Permalink
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
Browse files Browse the repository at this point in the history
…ng-web-ui

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
	streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
  • Loading branch information
tdas committed Apr 11, 2014
2 parents 914b8ff + 585cd65 commit caa5e05
Show file tree
Hide file tree
Showing 32 changed files with 183 additions and 248 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ class SparkContext(config: SparkConf) extends Logging {

// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.start()
ui.bind()

// Optionally log Spark events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@

package org.apache.spark.deploy.history

import javax.servlet.http.HttpServletRequest

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler._
Expand All @@ -45,15 +42,15 @@ import org.apache.spark.util.Utils
*/
class HistoryServer(
val baseLogDir: String,
securityManager: SecurityManager,
conf: SparkConf)
extends WebUI(new SecurityManager(conf)) with Logging {
extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {

import HistoryServer._

private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
private val port = WEB_UI_PORT

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTime = -1L
Expand Down Expand Up @@ -90,30 +87,20 @@ class HistoryServer(
// A mapping of application ID to its history information, which includes the rendered UI
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()

initialize()

/**
* Start the history server.
* Initialize the history server.
*
* This starts a background thread that periodically synchronizes information displayed on
* this UI with the event logs in the provided base directory.
*/
def start() {
def initialize() {
attachPage(new IndexPage(this))
attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
logCheckingThread.start()
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
logError("Failed to bind HistoryServer", e)
System.exit(1)
}
}

/**
* Check for any updates to event logs in the base directory. This is only effective once
* the server has been bound.
Expand Down Expand Up @@ -179,12 +166,11 @@ class HistoryServer(
val path = logDir.getPath
val appId = path.getName
val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
replayBus.replay()
if (appListener.applicationStarted) {
attachUI(ui)
Expand Down Expand Up @@ -267,9 +253,9 @@ object HistoryServer {

def main(argStrings: Array[String]) {
val args = new HistoryServerArguments(argStrings)
val server = new HistoryServer(args.logDir, conf)
val securityManager = new SecurityManager(conf)
val server = new HistoryServer(args.logDir, securityManager, conf)
server.bind()
server.start()

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}

private[spark] class IndexPage(parent: HistoryServer) extends UIPage("") {
private[spark] class IndexPage(parent: HistoryServer) extends WebUIPage("") {

override def render(request: HttpServletRequest): Seq[Node] = {
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ private[spark] class Master(
logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
Expand Down Expand Up @@ -670,7 +669,6 @@ private[spark] class Master(
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(
new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
ui.start()
replayBus.replay()
app.desc.appUiUrl = ui.basePath
appIdToUI(app.id) = ui
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class ApplicationPage(parent: MasterWebUI)
extends UIPage("app", includeJson = true) {
extends WebUIPage("app", includeJson = true) {

private val master = parent.masterActorRef
private val timeout = parent.timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) {
private[spark] class IndexPage(parent: MasterWebUI) extends WebUIPage("", includeJson = true) {
private val master = parent.masterActorRef
private val timeout = parent.timeout

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{SparkUI, WebUI}
Expand All @@ -30,34 +28,22 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr) with Logging {
extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {

private val host = Utils.localHostName()
private val port = requestedPort
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)

initialize()

/** Initialize all components of the server. */
def start() {
def initialize() {
attachPage(new ApplicationPage(this))
attachPage(new IndexPage(this))
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Master web UI", e)
System.exit(1)
}
}

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ private[spark] class Worker(
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
webUi.bind()
registerWithMaster()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) {
private[spark] class IndexPage(parent: WorkerWebUI) extends WebUIPage("", includeJson = true) {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class LogPage(parent: WorkerWebUI) extends UIPage("logPage") {
private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
private val worker = parent.worker
private val workDir = parent.workDir

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,29 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
import javax.servlet.http.HttpServletRequest

import org.apache.spark.Logging
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.AkkaUtils

/**
* Web UI server for the standalone worker.
*/
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends WebUI(worker.securityMgr) with Logging {
class WorkerWebUI(
val worker: Worker,
val workDir: File,
port: Option[Int] = None)
extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
with Logging {

private val host = Utils.localHostName()
private val port = requestedPort.getOrElse(
worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
val timeout = AkkaUtils.askTimeout(worker.conf)

initialize()

/** Initialize all components of the server. */
def start() {
def initialize() {
val logPage = new LogPage(this)
attachPage(logPage)
attachPage(new IndexPage(this))
Expand All @@ -48,21 +51,13 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
(request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
worker.metricsSystem.getServletHandlers.foreach(attachHandler)
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf))
logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Worker web UI", e)
System.exit(1)
}
}
}

private[spark] object WorkerWebUI {
val DEFAULT_PORT = 8081
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR

def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
}
}
45 changes: 17 additions & 28 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ import org.apache.spark.ui.env.EnvironmentTab
import org.apache.spark.ui.exec.ExecutorsTab
import org.apache.spark.ui.jobs.JobProgressTab
import org.apache.spark.ui.storage.BlockManagerTab
import org.apache.spark.util.Utils

/**
* Top level user interface for Spark.
*/
private[spark] class SparkUI(
val sc: SparkContext,
conf: SparkConf,
val conf: SparkConf,
val securityManager: SecurityManager,
val listenerBus: SparkListenerBus,
var appName: String,
val basePath: String = "")
extends WebUI(securityManager, basePath) with Logging {
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
with Logging {

def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
Expand All @@ -46,21 +46,14 @@ private[spark] class SparkUI(
// If SparkContext is not provided, assume the associated application is not live
val live = sc != null

private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)

// Maintain executor storage status through Spark events
val storageStatusListener = new StorageStatusListener
listenerBus.addListener(storageStatusListener)

/** Set the app name for this UI. */
def setAppName(name: String) {
appName = name
}
initialize()

/** Initialize all components of the server. */
def start() {
def initialize() {
listenerBus.addListener(storageStatusListener)
attachTab(new JobProgressTab(this))
attachTab(new BlockManagerTab(this))
attachTab(new EnvironmentTab(this))
Expand All @@ -72,22 +65,14 @@ private[spark] class SparkUI(
}
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf))
logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Spark web UI", e)
System.exit(1)
}
/** Set the app name for this UI. */
def setAppName(name: String) {
appName = name
}

/** Attach a tab to this UI, along with its corresponding listener if it exists. */
override def attachTab(tab: UITab) {
super.attachTab(tab)
tab.listener.foreach(listenerBus.addListener)
/** Register the given listener with the listener bus. */
def registerListener(listener: SparkListener) {
listenerBus.addListener(listener)
}

/** Stop the server behind this web interface. Only valid after bind(). */
Expand All @@ -96,10 +81,14 @@ private[spark] class SparkUI(
logInfo("Stopped Spark web UI at %s".format(appUIAddress))
}

private[spark] def appUIAddress = "http://" + publicHost + ":" + boundPort
private[spark] def appUIAddress = "http://" + publicHostName + ":" + boundPort
}

private[spark] object SparkUI {
val DEFAULT_PORT = 4040
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"

def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
}
}
Loading

0 comments on commit caa5e05

Please sign in to comment.