From e038b4b076aba23e56915b6184908b00476d4f0f Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 11 Apr 2014 12:03:43 -0700
Subject: [PATCH] Addressed Patrick's comments.

---
 project/MimaBuild.scala                               |  4 ++--
 .../streaming/ui/StreamingJobProgressListener.scala   | 10 +++++-----
 .../apache/spark/streaming/ui/StreamingPage.scala     | 12 ++++++------
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 4bfa9bf95ad78..9cb31d70444ff 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -75,8 +75,8 @@ object MimaBuild {
         excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++
         excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++
         excludeSparkClass("streaming.dstream.ReportError") ++
-        excludeSparkClass("org.apache.spark.streaming.dstream.ReportBlock") ++
-        excludeSparkClass("org.apache.spark.streaming.dstream.DStream")
+        excludeSparkClass("streaming.dstream.ReportBlock") ++
+        excludeSparkClass("streaming.dstream.DStream")

       case _ => Seq()
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 8921b99f53a23..8b025b09ed34d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -33,7 +33,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
   private val waitingBatchInfos = new HashMap[Time, BatchInfo]
   private val runningBatchInfos = new HashMap[Time, BatchInfo]
   private val completedaBatchInfos = new Queue[BatchInfo]
-  private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100)
+  private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
   private val receiverInfos = new HashMap[Int, ReceiverInfo]

@@ -82,7 +82,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
     runningBatchInfos.values.toSeq
   }

-  def completedBatches: Seq[BatchInfo] = synchronized {
+  def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
     completedaBatchInfos.toSeq
   }

@@ -99,7 +99,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
   }

   def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
-    val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
+    val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
     val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
     (0 until numNetworkReceivers).map { receiverId =>
       val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
@@ -134,10 +134,10 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
   }

   def lastReceivedBatch: Option[BatchInfo] = {
-    allBatches.lastOption
+    retainedBatches.lastOption
   }

-  private def allBatches: Seq[BatchInfo] = synchronized {
+  private def retainedBatches: Seq[BatchInfo] = synchronized {
     (waitingBatchInfos.values.toSeq ++ runningBatchInfos.values.toSeq ++
       completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 6fdfd8d05dcbb..290efaef51481 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -33,13 +33,13 @@ private[ui] class StreamingPage(parent: StreamingTab)

   private val listener = parent.listener
   private val startTime = Calendar.getInstance().getTime()
-  private val emptyCellTest = "-"
+  private val emptyCell = "-"

   /** Render the page */
   override def render(request: HttpServletRequest): Seq[Node] = {
     val content =
       generateBasicStats() ++
-      <h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
+      <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
       generateNetworkStatsTable() ++
       generateBatchStatsTable()
     UIUtils.headerSparkPage(
@@ -89,12 +89,12 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
       val receiverInfo = listener.receiverInfo(receiverId)
       val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
-      val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCellTest)
+      val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
       val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
       val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
         d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
       }.getOrElse {
-        Seq(emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest)
+        Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
       }
       Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
     }
@@ -112,7 +112,7 @@ private[ui] class StreamingPage(parent: StreamingTab)

   /** Generate stats of batch jobs of the streaming program */
   private def generateBatchStatsTable(): Seq[Node] = {
-    val numBatches = listener.completedBatches.size
+    val numBatches = listener.retainedCompletedBatches.size
     val lastCompletedBatch = listener.lastCompletedBatch
     val table = if (numBatches > 0) {
       val processingDelayQuantilesRow = {
@@ -161,7 +161,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
    * Returns a human-readable string representing a duration such as "5 second 35 ms"
    */
   private def formatDurationOption(msOption: Option[Long]): String = {
-    msOption.map(formatDurationVerbose).getOrElse(emptyCellTest)
+    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
   }

   /** Get quantiles for any time distribution */
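
Note on the renamed configuration key: a minimal sketch of how "spark.streaming.ui.retainedBatches"
might be set from application code, assuming a standalone Spark Streaming app. The key name and its
default of 100 come from StreamingJobProgressListener in the diff above; the app name, the value of
50, and the 2-second batch interval are illustrative assumptions, not part of this patch.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Retain only the last 50 completed batches in the streaming UI
    // (StreamingJobProgressListener falls back to 100 if the key is unset).
    val conf = new SparkConf()
      .setAppName("StreamingUIExample")                 // illustrative app name
      .set("spark.streaming.ui.retainedBatches", "50")  // key introduced by this patch

    val ssc = new StreamingContext(conf, Seconds(2))    // 2s batch interval, assumed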