diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c9af0778bdb29..28923a1d8c340 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -212,7 +212,6 @@ class SparkContext(config: SparkConf) extends Logging { // Initialize the Spark UI, registering all associated listeners private[spark] val ui = new SparkUI(this) - ui.start() ui.bind() // Optionally log Spark events diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 16abfe920da72..df3c394bacfa9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -17,12 +17,9 @@ package org.apache.spark.deploy.history -import javax.servlet.http.HttpServletRequest - import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, Path} -import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.scheduler._ @@ -45,15 +42,15 @@ import org.apache.spark.util.Utils */ class HistoryServer( val baseLogDir: String, + securityManager: SecurityManager, conf: SparkConf) - extends WebUI(new SecurityManager(conf)) with Logging { + extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging { import HistoryServer._ private val fileSystem = Utils.getHadoopFileSystem(baseLogDir) private val localHost = Utils.localHostName() private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) - private val port = WEB_UI_PORT // A timestamp of when the disk was last accessed to check for log updates private var lastLogCheckTime = -1L @@ -90,30 +87,20 @@ class HistoryServer( // A mapping of application ID to its history information, which includes the rendered UI val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]() + initialize() + /** - * Start the history server. + * Initialize the history server. * * This starts a background thread that periodically synchronizes information displayed on * this UI with the event logs in the provided base directory. */ - def start() { + def initialize() { attachPage(new IndexPage(this)) attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static")) logCheckingThread.start() } - /** Bind to the HTTP server behind this web interface. */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf)) - logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to bind HistoryServer", e) - System.exit(1) - } - } - /** * Check for any updates to event logs in the base directory. This is only effective once * the server has been bound. @@ -179,12 +166,11 @@ class HistoryServer( val path = logDir.getPath val appId = path.getName val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec) - val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId) val appListener = new ApplicationEventListener replayBus.addListener(appListener) + val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId) // Do not call ui.bind() to avoid creating a new server for each application - ui.start() replayBus.replay() if (appListener.applicationStarted) { attachUI(ui) @@ -267,9 +253,9 @@ object HistoryServer { def main(argStrings: Array[String]) { val args = new HistoryServerArguments(argStrings) - val server = new HistoryServer(args.logDir, conf) + val securityManager = new SecurityManager(conf) + val server = new HistoryServer(args.logDir, securityManager, conf) server.bind() - server.start() // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala index eeb22ab000558..69a6baa4aaeab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala @@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} -private[spark] class IndexPage(parent: HistoryServer) extends UIPage("") { +private[spark] class IndexPage(parent: HistoryServer) extends WebUIPage("") { override def render(request: HttpServletRequest): Seq[Node] = { val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9c90c4b4d11ef..076bb92bf2a10 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -118,7 +118,6 @@ private[spark] class Master( logInfo("Starting Spark master at " + masterUrl) // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - webUi.start() webUi.bind() masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) @@ -670,7 +669,6 @@ private[spark] class Master( val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) val ui = new SparkUI( new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) - ui.start() replayBus.replay() app.desc.appUiUrl = ui.basePath appIdToUI(app.id) = ui diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 24282048b842e..d8c3321ea51ec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -28,11 +28,11 @@ import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorInfo -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils private[spark] class ApplicationPage(parent: MasterWebUI) - extends UIPage("app", includeJson = true) { + extends WebUIPage("app", includeJson = true) { private val master = parent.masterActorRef private val timeout = parent.timeout diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index f011c830a02da..3d2ad04110b77 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -28,10 +28,10 @@ import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) { +private[spark] class IndexPage(parent: MasterWebUI) extends WebUIPage("", includeJson = true) { private val master = parent.masterActorRef private val timeout = parent.timeout diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index d0f1a9bc9ffd1..965f7a0fac9e2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,8 +17,6 @@ package org.apache.spark.deploy.master.ui -import javax.servlet.http.HttpServletRequest - import org.apache.spark.Logging import org.apache.spark.deploy.master.Master import org.apache.spark.ui.{SparkUI, WebUI} @@ -30,15 +28,15 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) - extends WebUI(master.securityMgr) with Logging { + extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging { - private val host = Utils.localHostName() - private val port = requestedPort val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) + initialize() + /** Initialize all components of the server. */ - def start() { + def initialize() { attachPage(new ApplicationPage(this)) attachPage(new IndexPage(this)) attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) @@ -46,18 +44,6 @@ class MasterWebUI(val master: Master, requestedPort: Int) master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler) } - /** Bind to the HTTP server behind this web interface. */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf)) - logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Master web UI", e) - System.exit(1) - } - } - /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ def attachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 08ad87957c3d4..52c164ca3c574 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -130,7 +130,6 @@ private[spark] class Worker( createWorkDir() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) - webUi.start() webUi.bind() registerWithMaster() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index bf7d552101484..42ef8ed703779 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -28,10 +28,10 @@ import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) { +private[spark] class IndexPage(parent: WorkerWebUI) extends WebUIPage("", includeJson = true) { val workerActor = parent.worker.self val worker = parent.worker val timeout = parent.timeout diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala index f57900c99ce3d..8f6b36faf85ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -22,10 +22,10 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class LogPage(parent: WorkerWebUI) extends UIPage("logPage") { +private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") { private val worker = parent.worker private val workDir = parent.workDir diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index c1fdc5cea173c..34b5acd2f9b64 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -20,26 +20,29 @@ package org.apache.spark.deploy.worker.ui import java.io.File import javax.servlet.http.HttpServletRequest -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.worker.Worker import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.AkkaUtils /** * Web UI server for the standalone worker. */ private[spark] -class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) - extends WebUI(worker.securityMgr) with Logging { +class WorkerWebUI( + val worker: Worker, + val workDir: File, + port: Option[Int] = None) + extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf) + with Logging { - private val host = Utils.localHostName() - private val port = requestedPort.getOrElse( - worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) val timeout = AkkaUtils.askTimeout(worker.conf) + initialize() + /** Initialize all components of the server. */ - def start() { + def initialize() { val logPage = new LogPage(this) attachPage(logPage) attachPage(new IndexPage(this)) @@ -48,21 +51,13 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr)) worker.metricsSystem.getServletHandlers.foreach(attachHandler) } - - /** Bind to the HTTP server behind this web interface. */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf)) - logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Worker web UI", e) - System.exit(1) - } - } } private[spark] object WorkerWebUI { val DEFAULT_PORT = 8081 val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR + + def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = { + requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) + } } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index f2633dfa8abd7..2eda1aff5ac73 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -25,19 +25,19 @@ import org.apache.spark.ui.env.EnvironmentTab import org.apache.spark.ui.exec.ExecutorsTab import org.apache.spark.ui.jobs.JobProgressTab import org.apache.spark.ui.storage.BlockManagerTab -import org.apache.spark.util.Utils /** * Top level user interface for Spark. */ private[spark] class SparkUI( val sc: SparkContext, - conf: SparkConf, + val conf: SparkConf, val securityManager: SecurityManager, val listenerBus: SparkListenerBus, var appName: String, val basePath: String = "") - extends WebUI(securityManager, basePath) with Logging { + extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath) + with Logging { def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName) def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = @@ -46,21 +46,14 @@ private[spark] class SparkUI( // If SparkContext is not provided, assume the associated application is not live val live = sc != null - private val bindHost = Utils.localHostName() - private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) - private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) - // Maintain executor storage status through Spark events val storageStatusListener = new StorageStatusListener - listenerBus.addListener(storageStatusListener) - /** Set the app name for this UI. */ - def setAppName(name: String) { - appName = name - } + initialize() /** Initialize all components of the server. */ - def start() { + def initialize() { + listenerBus.addListener(storageStatusListener) attachTab(new JobProgressTab(this)) attachTab(new BlockManagerTab(this)) attachTab(new EnvironmentTab(this)) @@ -72,22 +65,14 @@ private[spark] class SparkUI( } } - /** Bind to the HTTP server behind this web interface. */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf)) - logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Spark web UI", e) - System.exit(1) - } + /** Set the app name for this UI. */ + def setAppName(name: String) { + appName = name } - /** Attach a tab to this UI, along with its corresponding listener if it exists. */ - override def attachTab(tab: UITab) { - super.attachTab(tab) - tab.listener.foreach(listenerBus.addListener) + /** Register the given listener with the listener bus. */ + def registerListener(listener: SparkListener) { + listenerBus.addListener(listener) } /** Stop the server behind this web interface. Only valid after bind(). */ @@ -96,10 +81,14 @@ private[spark] class SparkUI( logInfo("Stopped Spark web UI at %s".format(appUIAddress)) } - private[spark] def appUIAddress = "http://" + publicHost + ":" + boundPort + private[spark] def appUIAddress = "http://" + publicHostName + ":" + boundPort } private[spark] object SparkUI { val DEFAULT_PORT = 4040 val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" + + def getUIPort(conf: SparkConf): Int = { + conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) + } } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index b210c8d852898..d26109d06c186 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -135,10 +135,9 @@ private[spark] object UIUtils extends Logging { basePath: String, appName: String, title: String, - tabs: Seq[UITab], - activeTab: UITab, - refreshInterval: Option[Int] = None - ) : Seq[Node] = { + tabs: Seq[WebUITab], + activeTab: WebUITab, + refreshInterval: Option[Int] = None): Seq[Node] = { val header = tabs.map { tab =>
  • diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 126a7ff2f6080..655239089015c 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -25,8 +25,7 @@ import scala.xml.Node import org.eclipse.jetty.servlet.ServletContextHandler import org.json4s.JsonAST.{JNothing, JValue} -import org.apache.spark.SecurityManager -import org.apache.spark.scheduler.SparkListener +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -36,24 +35,31 @@ import org.apache.spark.util.Utils * Each WebUI represents a collection of tabs, each of which in turn represents a collection of * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly. */ -private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") { - protected val tabs = ArrayBuffer[UITab]() +private[spark] abstract class WebUI( + securityManager: SecurityManager, + port: Int, + conf: SparkConf, + basePath: String = "") + extends Logging { + + protected val tabs = ArrayBuffer[WebUITab]() protected val handlers = ArrayBuffer[ServletContextHandler]() protected var serverInfo: Option[ServerInfo] = None + protected val localHostName = Utils.localHostName() + protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) + private val className = Utils.getFormattedClassName(this) - def getTabs: Seq[UITab] = tabs.toSeq + def getTabs: Seq[WebUITab] = tabs.toSeq def getHandlers: Seq[ServletContextHandler] = handlers.toSeq - def getListeners: Seq[SparkListener] = tabs.flatMap(_.listener) /** Attach a tab to this UI, along with all of its attached pages. */ - def attachTab(tab: UITab) { - tab.start() + def attachTab(tab: WebUITab) { tab.pages.foreach(attachPage) tabs += tab } /** Attach a page to this UI. */ - def attachPage(page: UIPage) { + def attachPage(page: WebUIPage) { val pagePath = "/" + page.prefix attachHandler(createServletHandler(pagePath, (request: HttpServletRequest) => page.render(request), securityManager, basePath)) @@ -86,13 +92,20 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: } /** Initialize all components of the server. */ - def start() - - /** - * Bind to the HTTP server behind this web interface. - * Overridden implementation should set serverInfo. - */ - def bind() + def initialize() + + /** Bind to the HTTP server behind this web interface. */ + def bind() { + assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) + try { + serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf)) + logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) + } catch { + case e: Exception => + logError("Failed to bind %s".format(className), e) + System.exit(1) + } + } /** Return the actual port to which this server is bound. Only valid after bind(). */ def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) @@ -100,39 +113,41 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: /** Stop the server behind this web interface. Only valid after bind(). */ def stop() { assert(serverInfo.isDefined, - "Attempted to stop %s before binding to a server!".format(Utils.getFormattedClassName(this))) + "Attempted to stop %s before binding to a server!".format(className)) serverInfo.get.server.stop() } } /** - * A tab that represents a collection of pages and a unit of listening for Spark events. - * Associating each tab with a listener is arbitrary and need not be the case. + * A tab that represents a collection of pages. */ -private[spark] abstract class UITab(val prefix: String) { - val pages = ArrayBuffer[UIPage]() - var listener: Option[SparkListener] = None - var name = prefix.capitalize +private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) { + val pages = ArrayBuffer[WebUIPage]() + val name = prefix.capitalize /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */ - def attachPage(page: UIPage) { + def attachPage(page: WebUIPage) { page.prefix = (prefix + "/" + page.prefix).stripSuffix("/") pages += page } - /** Initialize listener and attach pages. */ - def start() + /** Initialize this tab and attach all relevant pages. */ + def initialize() + + /** Get a list of header tabs from the parent UI. */ + def headerTabs: Seq[WebUITab] = parent.getTabs } /** * A page that represents the leaf node in the UI hierarchy. * + * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab. * If includeJson is true, the parent WebUI (direct or indirect) creates handlers for both the * HTML and the JSON content, rather than just the former. */ -private[spark] abstract class UIPage(var prefix: String, val includeJson: Boolean = false) { +private[spark] abstract class WebUIPage(var prefix: String, val includeJson: Boolean = false) { def render(request: HttpServletRequest): Seq[Node] = Seq[Node]() def renderJson(request: HttpServletRequest): JValue = JNothing } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala index 6a2304f1ad42f..0f1ea7fa8d44d 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala @@ -20,21 +20,17 @@ package org.apache.spark.ui.env import org.apache.spark.scheduler._ import org.apache.spark.ui._ -private[ui] class EnvironmentTab(parent: SparkUI) extends UITab("environment") { +private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") { val appName = parent.appName val basePath = parent.basePath + val listener = new EnvironmentListener - def start() { - listener = Some(new EnvironmentListener) - attachPage(new IndexPage(this)) - } + initialize() - def environmentListener: EnvironmentListener = { - assert(listener.isDefined, "EnvironmentTab has not started yet!") - listener.get.asInstanceOf[EnvironmentListener] + def initialize() { + attachPage(new IndexPage(this)) + parent.registerListener(listener) } - - def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala index bde672909bbcc..55a19774ed02d 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala @@ -21,12 +21,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIUtils, UIPage} +import org.apache.spark.ui.{UIUtils, WebUIPage} -private[ui] class IndexPage(parent: EnvironmentTab) extends UIPage("") { +private[ui] class IndexPage(parent: EnvironmentTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath - private val listener = parent.environmentListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { val runtimeInformationTable = UIUtils.listingTable( diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index c1f5ca856ffe1..843db7c8d956d 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -22,23 +22,19 @@ import scala.collection.mutable.HashMap import org.apache.spark.ExceptionFailure import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener -import org.apache.spark.ui.{SparkUI, UITab} +import org.apache.spark.ui.{SparkUI, WebUITab} -private[ui] class ExecutorsTab(parent: SparkUI) extends UITab("executors") { +private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") { val appName = parent.appName val basePath = parent.basePath + val listener = new ExecutorsListener(parent.storageStatusListener) - def start() { - listener = Some(new ExecutorsListener(parent.storageStatusListener)) - attachPage(new IndexPage(this)) - } + initialize() - def executorsListener: ExecutorsListener = { - assert(listener.isDefined, "ExecutorsTab has not started yet!") - listener.get.asInstanceOf[ExecutorsListener] + def initialize() { + attachPage(new IndexPage(this)) + parent.registerListener(listener) } - - def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala index bc6a822b080c3..83c89c2fbca3e 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala @@ -21,13 +21,13 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[ui] class IndexPage(parent: ExecutorsTab) extends UIPage("") { +private[ui] class IndexPage(parent: ExecutorsTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath - private val listener = parent.executorsListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = listener.storageStatusList diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 31173e48d7a1e..c83e196c9c156 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.Utils /** Page showing executor summary */ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener def toNodeSeq: Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 2b54603af104e..f217965ea2053 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -22,15 +22,15 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{Node, NodeSeq} import org.apache.spark.scheduler.Schedulable -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ -private[ui] class IndexPage(parent: JobProgressTab) extends UIPage("") { +private[ui] class IndexPage(parent: JobProgressTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler override def render(request: HttpServletRequest): Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 5167e20ea3d7d..18559f732d2a3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -222,12 +222,10 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { synchronized { - val schedulingModeName = - environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode") - schedulingMode = schedulingModeName match { - case Some(name) => Some(SchedulingMode.withName(name)) - case None => None - } + environmentUpdate + .environmentDetails("Spark Properties").toMap + .get("spark.scheduler.mode") + .map(SchedulingMode.withName) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index 93d26f7dd3632..7fe06b39346f5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -19,29 +19,25 @@ package org.apache.spark.ui.jobs import org.apache.spark.SparkConf import org.apache.spark.scheduler.SchedulingMode -import org.apache.spark.ui.{SparkUI, UITab} +import org.apache.spark.ui.{SparkUI, WebUITab} /** Web UI showing progress status of all jobs in the given SparkContext. */ -private[ui] class JobProgressTab(parent: SparkUI) extends UITab("stages") { +private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") { val appName = parent.appName val basePath = parent.basePath val live = parent.live val sc = parent.sc + val conf = if (live) sc.conf else new SparkConf + val listener = new JobProgressListener(conf) - def start() { - val conf = if (live) sc.conf else new SparkConf - listener = Some(new JobProgressListener(conf)) + initialize() + + def initialize() { attachPage(new IndexPage(this)) attachPage(new StagePage(this)) attachPage(new PoolPage(this)) + parent.registerListener(listener) } - def jobProgressListener: JobProgressListener = { - assert(listener.isDefined, "JobProgressTab has not started yet!") - listener.get.asInstanceOf[JobProgressListener] - } - - def isFairScheduler = jobProgressListener.schedulingMode.exists(_ == SchedulingMode.FAIR) - - def headerTabs: Seq[UITab] = parent.getTabs + def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 7fffe2affb0f2..228bfb2881c53 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -22,15 +22,15 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.scheduler.{Schedulable, StageInfo} -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ -private[ui] class PoolPage(parent: JobProgressTab) extends UIPage("pool") { +private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index bb7a9c14f7761..f4b68f241966d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -26,7 +26,7 @@ import org.apache.spark.ui.UIUtils /** Table showing list of pools */ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { private val basePath = parent.basePath - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener def toNodeSeq: Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 372210919cd91..71eda45d253e1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -22,14 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ -private[ui] class StagePage(parent: JobProgressTab) extends UIPage("stage") { +private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index d918feafd97d0..5cc1fcd10a08d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -29,7 +29,7 @@ import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished stages */ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressTab) { private val basePath = parent.basePath - private lazy val listener = parent.jobProgressListener + private val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler def toNodeSeq: Seq[Node] = { diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala index ac83f71ed31de..492c223625e6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala @@ -24,22 +24,18 @@ import org.apache.spark.scheduler._ import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils} /** Web UI showing storage status of all RDD's in the given SparkContext. */ -private[ui] class BlockManagerTab(parent: SparkUI) extends UITab("storage") { +private[ui] class BlockManagerTab(parent: SparkUI) extends WebUITab(parent, "storage") { val appName = parent.appName val basePath = parent.basePath + val listener = new BlockManagerListener(parent.storageStatusListener) - def start() { - listener = Some(new BlockManagerListener(parent.storageStatusListener)) + initialize() + + def initialize() { attachPage(new IndexPage(this)) attachPage(new RddPage(this)) + parent.registerListener(listener) } - - def blockManagerListener: BlockManagerListener = { - assert(listener.isDefined, "BlockManagerTab has not started yet!") - listener.get.asInstanceOf[BlockManagerListener] - } - - def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index cb1b0dc7574f8..054369bc4730c 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -22,14 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.RDDInfo -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ -private[ui] class IndexPage(parent: BlockManagerTab) extends UIPage("") { +private[ui] class IndexPage(parent: BlockManagerTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.blockManagerListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { val rdds = listener.rddInfoList diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index a65ba0a020bcd..5eaf41c985ecf 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -22,14 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} -import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ -private[ui] class RddPage(parent: BlockManagerTab) extends UIPage("rdd") { +private[ui] class RddPage(parent: BlockManagerTab) extends WebUIPage("rdd") { private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.blockManagerListener + private val listener = parent.listener override def render(request: HttpServletRequest): Seq[Node] = { val rddId = request.getParameter("id").toInt diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 58960812e1205..4089285d7c287 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -29,9 +29,9 @@ import org.apache.spark.util.Distribution /** Page for Spark Web UI that shows statistics of a streaming job */ private[ui] class StreamingPage(parent: StreamingTab) - extends UIPage("") with Logging { + extends WebUIPage("") with Logging { - private val listener = parent.streamingListener + private val listener = parent.listener private val startTime = Calendar.getInstance().getTime() private val emptyCellTest = "-" diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index 5a817b067e4fe..be8e652899ebe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -17,35 +17,26 @@ package org.apache.spark.streaming.ui -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.ui.{SparkUI, UITab} -import org.apache.spark.Logging import java.util.concurrent.atomic.AtomicInteger -/** Streaming tab in the Spark web UI */ +import org.apache.spark.Logging +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.ui.WebUITab + +/** Spark Web UI tab that shows statistics of a streaming job */ private[spark] class StreamingTab(ssc: StreamingContext) - extends UITab(StreamingTab.streamingTabName) with Logging { + extends WebUITab(ssc.sc.ui, "streaming") with Logging { val parent = ssc.sc.ui - val streamingListener = new StreamingJobProgressListener(ssc) - val basePath = parent.basePath val appName = parent.appName + val basePath = parent.basePath + val listener = new StreamingJobProgressListener(ssc) - ssc.addStreamingListener(streamingListener) - attachPage(new StreamingPage(this)) - parent.attachTab(this) - - def headerTabs = parent.getTabs - - def start() { } -} - -object StreamingTab { - private val atomicInteger = new AtomicInteger(0) + initialize() - /** Generate the name of the streaming tab. For the first streaming tab it will be */ - def streamingTabName: String = { - val count = atomicInteger.getAndIncrement - if (count == 0) "streaming" else s"streaming-$count" + def initialize() { + ssc.addStreamingListener(listener) + attachPage(new StreamingPage(this)) + parent.attachTab(this) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala index 5bba5d9a39dd7..502896d76c494 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -59,37 +59,33 @@ class UISuite extends FunSuite with ShouldMatchers with BeforeAndAfterAll with B val ssc = new StreamingContext(sc, Seconds(1)) eventually(timeout(10 seconds), interval(50 milliseconds)) { val uiData = Source.fromURL( - ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString + ssc.sparkContext.ui.appUIAddress).mkString + assert(!uiData.contains("random data that should not be present")) assert(uiData.contains("streaming")) } } - test("multiple streaming tabs") { - val ssc1 = new StreamingContext(sc, Seconds(1)) - val ssc2 = new StreamingContext(sc, Seconds(2)) - ssc1.uiTab.prefix should not be ssc2.uiTab.prefix - } - ignore("Testing") { runStreaming(1000000) } def runStreaming(duration: Long) { - val ssc = new StreamingContext("local[10]", "test", Seconds(1)) - val servers = (1 to 5).map { i => new TestServer(10000 + i) } + val ssc1 = new StreamingContext(sc, Seconds(1)) + val servers1 = (1 to 3).map { i => new TestServer(10000 + i) } + + val inputStream1 = ssc1.union(servers1.map(server => ssc1.socketTextStream("localhost", server.port))) + inputStream1.count.print - val inputStream = ssc.union(servers.map(server => ssc.socketTextStream("localhost", server.port))) - inputStream.count.print + ssc1.start() + servers1.foreach(_.start()) - ssc.start() - servers.foreach(_.start()) val startTime = System.currentTimeMillis() while (System.currentTimeMillis() - startTime < duration) { - servers.map(_.send(Random.nextString(10) + "\n")) + servers1.map(_.send(Random.nextString(10) + "\n")) //Thread.sleep(1) } - ssc.stop() - servers.foreach(_.stop()) + ssc1.stop() + servers1.foreach(_.stop()) } }