From 4d86e985cb7bbc7f4f125e52d72f4e4bd560677e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 1 Apr 2014 11:02:23 -0700 Subject: [PATCH] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later. --- .../spark/streaming/ui/StreamingUI.scala | 186 +++++++++++++----- .../apache/spark/streaming/ui/UIUtils.scala | 9 +- .../org/apache/spark/streaming/UISuite.scala | 2 +- 3 files changed, 147 insertions(+), 50 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala index b574cb103f766..545c5cb8e3f61 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala @@ -29,14 +29,17 @@ import org.apache.spark.streaming.scheduler._ import org.apache.spark.ui.{ServerInfo, SparkUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{Distribution, Utils} -import java.util.Locale +import java.util.{Calendar, Locale} -private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListener { +private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingListener { private val waitingBatchInfos = new HashMap[Time, BatchInfo] private val runningBatchInfos = new HashMap[Time, BatchInfo] private val completedaBatchInfos = new Queue[BatchInfo] - private val batchInfoLimit = conf.getInt("spark.steaming.ui.maxBatches", 100) + private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100) + private var totalBatchesCompleted = 0L + + val batchDuration = ssc.graph.batchDuration.milliseconds override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized { runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo @@ -52,6 +55,11 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) completedaBatchInfos.enqueue(batchCompleted.batchInfo) if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue() + totalBatchesCompleted += 1L + } + + def numTotalBatchesCompleted: Long = synchronized { + totalBatchesCompleted } def numNetworkReceivers: Int = synchronized { @@ -89,7 +97,8 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) (0 until numNetworkReceivers).map { receiverId => val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty)) - val distributionOption = Distribution(blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble)) + val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble * 1000 / batchDuration) + val distributionOption = Distribution(recordsOfParticularReceiver) (receiverId, distributionOption) }.toMap } @@ -99,44 +108,42 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe } } -private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { - private val sc = ssc.sparkContext - private val conf = sc.conf - private val appName = sc.appName - private val bindHost = Utils.localHostName() - private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) - private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT) - private val securityManager = sc.env.securityManager - private val listener = new StreamingUIListener(conf) - private val handlers: Seq[ServletContextHandler] = { - Seq( - createServletHandler("/", - (request: HttpServletRequest) => render(request), securityManager), - createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static") - ) - } +private[ui] class StreamingPage(parent: StreamingUI) extends Logging { - private var serverInfo: Option[ServerInfo] = None + private val listener = parent.listener + private val calendar = Calendar.getInstance() + private val startTime = calendar.getTime() - ssc.addStreamingListener(listener) - def bind() { - try { - serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) - logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Spark JettyUtils", e) - System.exit(1) - } - } + def render(request: HttpServletRequest): Seq[Node] = { - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) + val content = + generateBasicStats() ++ +

Statistics over last {listener.completedBatches.size} processed batches

++ + generateNetworkStatsTable() ++ + generateBatchStatsTable() + UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview") + } - private def render(request: HttpServletRequest): Seq[Node] = { - val content = generateBatchStatsTable() ++ generateNetworkStatsTable() - UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview") + private def generateBasicStats(): Seq[Node] = { + + val timeSinceStart = System.currentTimeMillis() - startTime.getTime + } private def generateBatchStatsTable(): Seq[Node] = { @@ -173,18 +180,12 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { val batchStats = val content = -

Batch Processing Statistics

++ -
{batchCounts}
++ -
{batchStats}
+
Batch Processing Statistics
++ +
{batchStats}
content } @@ -198,7 +199,7 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { val dataRows = (0 until numNetworkReceivers).map { receiverId => val receiverName = s"Receiver-$receiverId" val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => - d.getQuantiles().map(r => numberToString(r.toLong) + " records/batch") + d.getQuantiles().map(r => numberToString(r.toLong) + " records/second") }.getOrElse { Seq("-", "-", "-", "-", "-") } @@ -210,8 +211,8 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { } val content = -

Network Input Statistics

++ -
{table.getOrElse("No network receivers")}
+
Network Input Statistics
++ +
{table.getOrElse("No network receivers")}
content } @@ -241,6 +242,95 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { } "%.1f%s".formatLocal(Locale.US, value, unit) } + + /** + * Returns a human-readable string representing a duration such as "5 second 35 ms" + */ + private def msDurationToString(ms: Long): String = { + try { + val second = 1000L + val minute = 60 * second + val hour = 60 * minute + val day = 24 * hour + val week = 7 * day + val year = 365 * day + + def toString(num: Long, unit: String): String = { + if (num == 0) { + "" + } else if (num == 1) { + s"$num $unit" + } else { + s"$num ${unit}s" + } + } + + val millisecondsString = if (ms % second == 0) "" else s"${ms % second} ms" + val secondString = toString((ms % minute) / second, "second") + val minuteString = toString((ms % hour) / minute, "minute") + val hourString = toString((ms % day) / hour, "hour") + val dayString = toString((ms % week) / day, "day") + val weekString = toString((ms % year) / week, "week") + val yearString = toString(ms / year, "year") + + Seq( + second -> millisecondsString, + minute -> s"$secondString $millisecondsString", + hour -> s"$minuteString $secondString", + day -> s"$hourString $minuteString $secondString", + week -> s"$dayString $hourString $minuteString", + year -> s"$weekString $dayString $hourString" + ).foreach { + case (durationLimit, durationString) if (ms < durationLimit) => + return durationString + case e: Any => // matcherror is thrown without this + } + return s"$yearString $weekString $dayString" + } catch { + case e: Exception => + logError("Error converting time to string", e) + return "" + } + } +} + + +private[spark] class StreamingUI(val ssc: StreamingContext) extends Logging { + + val sc = ssc.sparkContext + val conf = sc.conf + val appName = sc.appName + val listener = new StreamingUIListener(ssc) + val overviewPage = new StreamingPage(this) + + private val bindHost = Utils.localHostName() + private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) + private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT) + private val securityManager = sc.env.securityManager + private val handlers: Seq[ServletContextHandler] = { + Seq( + createServletHandler("/", + (request: HttpServletRequest) => overviewPage.render(request), securityManager), + createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static") + ) + } + + private var serverInfo: Option[ServerInfo] = None + + ssc.addStreamingListener(listener) + + def bind() { + try { + serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) + logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort)) + } catch { + case e: Exception => + logError("Failed to create Spark JettyUtils", e) + System.exit(1) + } + } + + private def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) } object StreamingUI { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala index 62e95135fa5c5..b87bba87129b5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -25,8 +25,15 @@ private[spark] object UIUtils { type="text/css" /> {appName} - {title} + - +