From 1af239b29cf41619ff4df684741ef1e4816ded87 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 9 Apr 2014 22:32:48 -0700 Subject: [PATCH] Changed streaming UI to attach itself as a tab with the Spark UI. --- .../scala/org/apache/spark/ui/FooTab.scala | 105 ------- .../scala/org/apache/spark/ui/UIUtils.scala | 15 +- .../spark/streaming/StreamingContext.scala | 26 +- .../dstream/NetworkInputDStream.scala | 6 +- .../scheduler/NetworkInputTracker.scala | 4 +- .../ui/StreamingProgressListener.scala | 131 +++++++++ .../spark/streaming/ui/StreamingUI.scala | 264 ++++++------------ .../apache/spark/streaming/ui/UIUtils.scala | 95 ------- 8 files changed, 241 insertions(+), 405 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/ui/FooTab.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala diff --git a/core/src/main/scala/org/apache/spark/ui/FooTab.scala b/core/src/main/scala/org/apache/spark/ui/FooTab.scala deleted file mode 100644 index 620fe8001a85c..0000000000000 --- a/core/src/main/scala/org/apache/spark/ui/FooTab.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ui - -import javax.servlet.http.HttpServletRequest - -import scala.collection.mutable -import scala.xml.Node - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.SparkContext._ -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd} - -/* - * This is an example of how to extend the SparkUI by adding new tabs to it. It is intended - * only as a demonstration and should be removed before merging into master! - * - * bin/spark-class org.apache.spark.ui.FooTab - */ - -/** A tab that displays basic information about jobs seen so far. */ -private[spark] class FooTab(parent: SparkUI) extends UITab("foo") { - val appName = parent.appName - val basePath = parent.basePath - - def start() { - listener = Some(new FooListener) - attachPage(new IndexPage(this)) - } - - def fooListener: FooListener = { - assert(listener.isDefined, "ExecutorsTab has not started yet!") - listener.get.asInstanceOf[FooListener] - } - - def headerTabs: Seq[UITab] = parent.getTabs -} - -/** A foo page. Enough said. */ -private[spark] class IndexPage(parent: FooTab) extends UIPage("") { - private val appName = parent.appName - private val basePath = parent.basePath - private val listener = parent.fooListener - - override def render(request: HttpServletRequest): Seq[Node] = { - val results = listener.jobResultMap.toSeq.sortBy { case (k, _) => k } - val content = -
-
- Foo Jobs: -
    - {results.map { case (k, v) =>
  • Job {k}: {v}
  • }} -
-
-
- UIUtils.headerSparkPage(content, basePath, appName, "Foo", parent.headerTabs, parent) - } -} - -/** A listener that maintains a mapping between job IDs and job results. */ -private[spark] class FooListener extends SparkListener { - val jobResultMap = mutable.Map[Int, String]() - - override def onJobEnd(end: SparkListenerJobEnd) { - jobResultMap(end.jobId) = end.jobResult.toString - } -} - - -/** - * Start a SparkContext and a SparkUI with a FooTab attached. - */ -private[spark] object FooTab { - def main(args: Array[String]) { - val sc = new SparkContext("local", "Foo Tab", new SparkConf) - val fooTab = new FooTab(sc.ui) - sc.ui.attachTab(fooTab) - - // Run a few jobs - sc.parallelize(1 to 1000).count() - sc.parallelize(1 to 2000).persist().count() - sc.parallelize(1 to 3000).map(i => (i/2, i)).groupByKey().count() - sc.parallelize(1 to 4000).map(i => (i/2, i)).groupByKey().persist().count() - sc.parallelize(1 to 5000).map(i => (i/2, i)).groupByKey().persist().count() - sc.parallelize(1 to 6000).map(i => (i/2, i)).groupByKey().persist().count() - sc.parallelize(1 to 7000).map(i => (i/2, i)).groupByKey().persist().count() - - readLine("\n> Started SparkUI with a Foo tab...") - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 7cf16b5ed29b1..fcda341ae5941 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -61,7 +61,9 @@ private[spark] object UIUtils { appName: String, title: String, tabs: Seq[UITab], - activeTab: UITab) : Seq[Node] = { + activeTab: UITab, + refreshInterval: Option[Int] = None + ) : Seq[Node] = { val header = tabs.map { tab =>
  • @@ -78,8 +80,17 @@ private[spark] object UIUtils { type="text/css" /> {appName} - {title} + - +
  • Overview
  • - } - - - - - - - - {appName} - {title} - - - - - -
    -
    -
    -

    - {title} -

    -
    -
    - {content} -
    - - - } - - def listingTable[T]( - headers: Seq[String], - makeRow: T => Seq[Node], - rows: Seq[T], - fixedWidth: Boolean = false): Seq[Node] = { - org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth) - } - - def listingTable[T]( - headers: Seq[String], - rows: Seq[Seq[String]], - fixedWidth: Boolean = false - ): Seq[Node] = { - def makeRow(data: Seq[String]): Seq[Node] = {data.map(d => {d})} - org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth) - } -}