From 56cc7fbcaf04a5aab88296d20da2cfc5b84a7651 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 28 Mar 2014 14:45:46 -0700 Subject: [PATCH 1/7] First cut implementation of Streaming UI. --- .../spark/streaming/StreamingContext.scala | 4 + .../spark/streaming/ui/StreamingUI.scala | 131 ++++++++++++++++++ .../apache/spark/streaming/ui/UIUtils.scala | 72 ++++++++++ 3 files changed, 207 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index e198c69470c1f..d45cdac5bef41 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -40,6 +40,7 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receivers._ import org.apache.spark.streaming.scheduler._ import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.ui.StreamingUI /** * Main entry point for Spark Streaming functionality. It provides methods used to create @@ -158,6 +159,9 @@ class StreamingContext private[streaming] ( private[streaming] val waiter = new ContextWaiter + private[streaming] val ui = new StreamingUI(this) + ui.bind() + /** * Return the associated Spark context */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala new file mode 100644 index 0000000000000..e9f8d21faab45 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.ui + +import scala.collection.mutable.SynchronizedQueue +import scala.xml.Node + +import javax.servlet.http.HttpServletRequest +import org.eclipse.jetty.servlet.ServletContextHandler + +import org.apache.spark.Logging +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListener, StreamingListenerBatchCompleted} +import org.apache.spark.ui.{ServerInfo, SparkUI} +import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.util.{Distribution, Utils} + +private[spark] class StreamingUIListener() extends StreamingListener { + + private val batchInfos = new SynchronizedQueue[BatchInfo] + private val maxBatchInfos = 100 + + override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) { + batchInfos.enqueue(batchStarted.batchInfo) + if (batchInfos.size > maxBatchInfos) batchInfos.dequeue() + } + + def processingDelayDistribution = extractDistribution(_.processingDelay) + + def schedulingDelayDistribution = extractDistribution(_.schedulingDelay) + + def totalDelay = extractDistribution(_.totalDelay) + + def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } + + def numBatchInfos = batchInfos.size +} + +private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { + + private val sc = ssc.sparkContext + private val conf = sc.conf + private val appName = sc.appName + private val bindHost = Utils.localHostName() + private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) + private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT) + private val securityManager = sc.env.securityManager + private val listener = new StreamingUIListener() + private val handlers: Seq[ServletContextHandler] = { + Seq( + createServletHandler("/", + (request: HttpServletRequest) => render(request), securityManager), + createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static") + ) + } + + private var serverInfo: Option[ServerInfo] = None + + ssc.addStreamingListener(listener) + + def bind() { + try { + serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) + logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort)) + } catch { + case e: Exception => + logError("Failed to create Spark JettyUtils", e) + System.exit(1) + } + } + + def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) + + private def render(request: HttpServletRequest): Seq[Node] = { + val batchStatsTable = generateBatchStatsTable() + val content = batchStatsTable + UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview") + } + + private def generateBatchStatsTable(): Seq[Node] = { + def getQuantiles(timeDistributionOption: Option[Distribution]) = { + timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) } + } + val numBatches = listener.numBatchInfos + val table = if (numBatches > 0) { + val processingDelayQuantilesRow = + "Processing Times" +: getQuantiles(listener.processingDelayDistribution) + val schedulingDelayQuantilesRow = + "Scheduling Delay:" +: getQuantiles(listener.processingDelayDistribution) + val totalDelayQuantilesRow = + "End-to-end Delay:" +: getQuantiles(listener.totalDelay) + + val headerRow = Seq("Metric", "Min", "25th percentile", + "Median", "75th percentile", "Max") + val dataRows: Seq[Seq[String]] = Seq( + processingDelayQuantilesRow, + 
schedulingDelayQuantilesRow, + totalDelayQuantilesRow + ) + Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true)) + } else { + None + } + + val content = +

+      <h4>Batch Processing Statistics</h4> ++
+      <div>{table.getOrElse("No statistics have been generated yet.")}</div>
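+    // The five numeric columns in headerRow (Min .. Max) come from Distribution.getQuantiles(),
+    // whose default quantiles are the min, 25th percentile, median, 75th percentile and max.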
+ content + } +} + +object StreamingUI { + val DEFAULT_PORT = 6060 +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala new file mode 100644 index 0000000000000..62e95135fa5c5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -0,0 +1,72 @@ +package org.apache.spark.streaming.ui + +import scala.xml.Node +import org.apache.spark.ui.Page + +private[spark] object UIUtils { + + import org.apache.spark.ui.UIUtils.prependBaseUri + + def headerStreamingPage( + content: => Seq[Node], + basePath: String, + appName: String, + title: String): Seq[Node] = { + val overview = { +
+      <ul class="unstyled">
+        <li><strong>Overview</strong></li>
+      </ul>
+    }
+
+    <html>
+      <head>
+        <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+        <link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
+        <title>{appName} - {title}</title>
+      </head>
+      <body>
+        <div class="container-fluid">
+          <div class="row-fluid">
+            <h3>{title}</h3>
+          </div>
+          {content}
+        </div>
+      </body>
+    </html>
    + + + } + + def listingTable[T]( + headers: Seq[String], + makeRow: T => Seq[Node], + rows: Seq[T], + fixedWidth: Boolean = false): Seq[Node] = { + org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth) + } + + def listingTable[T]( + headers: Seq[String], + rows: Seq[Seq[String]], + fixedWidth: Boolean = false + ): Seq[Node] = { + def makeRow(data: Seq[String]): Seq[Node] = {data.map(d => {d})} + org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth) + } +} From 93f1c69e067fb02bcbb1dcab93d1dff4905c2e17 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 31 Mar 2014 16:31:48 -0700 Subject: [PATCH 2/7] Added network receiver information to the Streaming UI. --- .../spark/streaming/dstream/DStream.scala | 9 - .../dstream/NetworkInputDStream.scala | 61 +++++-- .../spark/streaming/scheduler/BatchInfo.scala | 1 + .../streaming/scheduler/JobGenerator.scala | 10 +- .../streaming/scheduler/JobScheduler.scala | 11 +- .../spark/streaming/scheduler/JobSet.scala | 7 +- .../scheduler/NetworkInputTracker.scala | 68 ++++--- .../scheduler/StreamingListener.scala | 13 +- .../spark/streaming/ui/StreamingUI.scala | 171 +++++++++++++++--- .../spark/streaming/InputStreamsSuite.scala | 6 +- .../org/apache/spark/streaming/UISuite.scala | 52 ++++++ 11 files changed, 301 insertions(+), 108 deletions(-) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index d48b51aa69565..36aae958019ce 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -351,15 +351,6 @@ abstract class DStream[T: ClassTag] ( dependencies.foreach(_.clearMetadata(time)) } - /* Adds metadata to the Stream while it is running. - * This method should be overwritten by sublcasses of InputDStream. - */ - private[streaming] def addMetadata(metadata: Any) { - if (metadata != null) { - logInfo("Dropping Metadata: " + metadata.toString) - } - } - /** * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of * this stream. This is an internal method that should not be called directly. 
This is diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 0dc6704603f82..8da4309daf4ca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -17,23 +17,23 @@ package org.apache.spark.streaming.dstream -import java.util.concurrent.ArrayBlockingQueue import java.nio.ByteBuffer +import java.util.concurrent.ArrayBlockingQueue -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.Await import scala.concurrent.duration._ import scala.reflect.ClassTag -import akka.actor.{Props, Actor} +import akka.actor.{Actor, Props} import akka.pattern.ask -import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} -import org.apache.spark.streaming._ import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.rdd.{RDD, BlockRDD} +import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} -import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver} +import org.apache.spark.streaming._ +import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver} +import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -48,8 +48,10 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { - // This is an unique identifier that is used to match the network receiver with the - // corresponding network input stream. + /** Keeps all received blocks information */ + private val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]] + + /** This is an unique identifier for the network input stream. */ val id = ssc.getNewNetworkStreamId() /** @@ -64,23 +66,45 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte def stop() {} + /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */ override def compute(validTime: Time): Option[RDD[T]] = { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // master failure if (validTime >= graph.startTime) { - val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime) + val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id) + receivedBlockInfo(validTime) = blockInfo + val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) Some(new BlockRDD[T](ssc.sc, blockIds)) } else { Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } } + + /** Get information on received blocks. */ + private[streaming] def getReceivedBlockInfo(time: Time) = { + receivedBlockInfo(time) + } + + /** + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This + * implementation overrides the default implementation to clear received + * block information. 
+ */ + private[streaming] override def clearMetadata(time: Time) { + super.clearMetadata(time) + val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration)) + receivedBlockInfo --= oldReceivedBlocks.keys + logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", ")) + } } private[streaming] sealed trait NetworkReceiverMessage private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) +private[streaming] case class ReportBlock(blockId: StreamBlockId, numRecords: Long, metadata: Any) extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage @@ -156,21 +180,20 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging actor ! ReportError(e.toString) } - /** * Pushes a block (as an ArrayBuffer filled with data) into the block manager. */ - def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + def pushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) - actor ! ReportBlock(blockId, metadata) + actor ! ReportBlock(blockId, arrayBuffer.size, metadata) } /** * Pushes a block (as bytes) into the block manager. */ - def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + def pushBlock(blockId: StreamBlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { env.blockManager.putBytes(blockId, bytes, level) - actor ! ReportBlock(blockId, metadata) + actor ! ReportBlock(blockId, -1 , metadata) } /** A helper actor that communicates with the NetworkInputTracker */ @@ -188,8 +211,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging } override def receive() = { - case ReportBlock(blockId, metadata) => - tracker ! AddBlocks(streamId, Array(blockId), metadata) + case ReportBlock(blockId, numRecords, metadata) => + tracker ! AddBlocks(ReceivedBlockInfo(streamId, blockId, numRecords, metadata)) case ReportError(msg) => tracker ! 
DeregisterReceiver(streamId, msg) case StopReceiver(msg) => @@ -211,7 +234,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging class BlockGenerator(storageLevel: StorageLevel) extends Serializable with Logging { - case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) + case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 7f3cd2f8eb1fd..9c69a2a4e21f5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time */ case class BatchInfo( batchTime: Time, + receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]], submissionTime: Long, processingStartTime: Option[Long], processingEndTime: Option[Long] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index c7306248b1950..80888c755c6bc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -147,7 +147,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => - jobScheduler.runJobs(time, graph.generateJobs(time)) + jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time))) ) // Restart the timer @@ -159,7 +159,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { private def generateJobs(time: Time) { SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { - case Success(jobs) => jobScheduler.runJobs(time, jobs) + case Success(jobs) => + val receivedBlockInfo = graph.getNetworkInputStreams.map { stream => + val streamId = stream.id + val receivedBlockInfo = stream.getReceivedBlockInfo(time) + (streamId, receivedBlockInfo) + }.toMap + jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventActor ! 
DoCheckpoint(time) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index de675d3c7fb94..ae99454cf8b86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -82,14 +82,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } } - def runJobs(time: Time, jobs: Seq[Job]) { - if (jobs.isEmpty) { - logInfo("No jobs added for time " + time) + def submitJobSet(jobSet: JobSet) { + if (jobSet.jobs.isEmpty) { + logInfo("No jobs added for time " + jobSet.time) } else { - val jobSet = new JobSet(time, jobs) - jobSets.put(time, jobSet) + jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => executor.execute(new JobHandler(job))) - logInfo("Added jobs for time " + time) + logInfo("Added jobs for time " + jobSet.time) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index fcf303aee6cd7..a69d74362173e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time * belong to the same batch. */ private[streaming] -case class JobSet(time: Time, jobs: Seq[Job]) { +case class JobSet( + time: Time, + jobs: Seq[Job], + receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty + ) { private val incompleteJobs = new HashSet[Job]() private val submissionTime = System.currentTimeMillis() // when this jobset was submitted @@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) { def toBatchInfo: BatchInfo = { new BatchInfo( time, + receivedBlockInfo, submissionTime, if (processingStartTime >= 0 ) Some(processingStartTime) else None, if (processingEndTime >= 0 ) Some(processingEndTime) else None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index e4fa163f2e069..74a7644d1c7ad 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -17,26 +17,33 @@ package org.apache.spark.streaming.scheduler -import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} -import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} -import org.apache.spark.{SparkException, Logging, SparkEnv} -import org.apache.spark.SparkContext._ - -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue -import scala.concurrent.duration._ +import scala.collection.mutable.{HashMap, SynchronizedQueue, SynchronizedMap} import akka.actor._ -import akka.pattern.ask -import akka.dispatch._ -import org.apache.spark.storage.BlockId -import org.apache.spark.streaming.{Time, StreamingContext} + +import org.apache.spark.{Logging, SparkEnv, SparkException} +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StreamBlockId +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver} import org.apache.spark.util.AkkaUtils +/** Information about block received 
by the network receiver */ +case class ReceivedBlockInfo( + streamId: Int, + blockId: StreamBlockId, + numRecords: Long, + metadata: Any + ) + +/** + * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate + * with each other. + */ private[streaming] sealed trait NetworkInputTrackerMessage private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) +private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage @@ -53,9 +60,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverExecutor() val receiverInfo = new HashMap[Int, ActorRef] - val receivedBlockIds = new HashMap[Int, Queue[BlockId]] + val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]] + with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]] val timeout = AkkaUtils.askTimeout(ssc.conf) - + val listenerBus = ssc.scheduler.listenerBus // actor is created when generator starts. // This not being null means the tracker has been started and not stopped @@ -87,15 +95,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } /** Return all the blocks received from a receiver. */ - def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized { - val queue = receivedBlockIds.synchronized { - receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]()) - } - val result = queue.synchronized { - queue.dequeueAll(x => true) - } - logInfo("Stream " + receiverId + " received " + result.size + " blocks") - result.toArray + def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = { + val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true) + logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks") + receivedBlockInfo.toArray + } + + private def getReceivedBlockInfoQueue(streamId: Int) = { + receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo]) } /** Actor to receive messages from the receivers. */ @@ -110,17 +117,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { + sender.path.address) sender ! 
true } - case AddBlocks(streamId, blockIds, metadata) => { - val tmp = receivedBlockIds.synchronized { - if (!receivedBlockIds.contains(streamId)) { - receivedBlockIds += ((streamId, new Queue[BlockId])) - } - receivedBlockIds(streamId) - } - tmp.synchronized { - tmp ++= blockIds - } - networkInputStreamMap(streamId).addMetadata(metadata) + case AddBlocks(receivedBlockInfo) => { + getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo } case DeregisterReceiver(streamId, msg) => { receiverInfo -= streamId diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 461ea3506477f..0c1edff9c8616 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -23,6 +23,7 @@ import org.apache.spark.util.Distribution /** Base trait for events related to StreamingListener */ sealed trait StreamingListenerEvent +case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent @@ -34,14 +35,14 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen * computation. */ trait StreamingListener { - /** - * Called when processing of a batch has completed - */ + + /** Called when a batch of jobs has been submitted for processing. */ + def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } + + /** Called when processing of a batch of jobs has completed. */ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } - /** - * Called when processing of a batch has started - */ + /** Called when processing of a batch of jobs has started. 
*/ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala index e9f8d21faab45..b574cb103f766 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala @@ -17,40 +17,86 @@ package org.apache.spark.streaming.ui -import scala.collection.mutable.SynchronizedQueue +import scala.collection.mutable.{HashMap, Queue} import scala.xml.Node import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.servlet.ServletContextHandler -import org.apache.spark.Logging -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListener, StreamingListenerBatchCompleted} +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.scheduler._ import org.apache.spark.ui.{ServerInfo, SparkUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{Distribution, Utils} +import java.util.Locale -private[spark] class StreamingUIListener() extends StreamingListener { +private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListener { - private val batchInfos = new SynchronizedQueue[BatchInfo] - private val maxBatchInfos = 100 + private val waitingBatchInfos = new HashMap[Time, BatchInfo] + private val runningBatchInfos = new HashMap[Time, BatchInfo] + private val completedaBatchInfos = new Queue[BatchInfo] + private val batchInfoLimit = conf.getInt("spark.steaming.ui.maxBatches", 100) - override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) { - batchInfos.enqueue(batchStarted.batchInfo) - if (batchInfos.size > maxBatchInfos) batchInfos.dequeue() + override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized { + runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo } - def processingDelayDistribution = extractDistribution(_.processingDelay) + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized { + runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo + waitingBatchInfos.remove(batchStarted.batchInfo.batchTime) + } + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized { + waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime) + runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) + completedaBatchInfos.enqueue(batchCompleted.batchInfo) + if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue() + } + + def numNetworkReceivers: Int = synchronized { + completedaBatchInfos.headOption.map(_.receivedBlockInfo.size).getOrElse(0) + } + + def waitingBatches: Seq[BatchInfo] = synchronized { + waitingBatchInfos.values.toSeq + } + + def runningBatches: Seq[BatchInfo] = synchronized { + runningBatchInfos.values.toSeq + } - def schedulingDelayDistribution = extractDistribution(_.schedulingDelay) + def completedBatches: Seq[BatchInfo] = synchronized { + completedaBatchInfos.toSeq + } - def totalDelay = extractDistribution(_.totalDelay) + def processingDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.processingDelay) + } - def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { - 
Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble)) + def schedulingDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.schedulingDelay) } - def numBatchInfos = batchInfos.size + def totalDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.totalDelay) + } + + def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized { + val allBatcheInfos = waitingBatchInfos.values.toSeq ++ + runningBatchInfos.values.toSeq ++ completedaBatchInfos + val latestBatchInfos = allBatcheInfos.sortBy(_.batchTime)(Time.ordering).reverse.take(batchInfoLimit) + val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) + (0 until numNetworkReceivers).map { receiverId => + val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty)) + val distributionOption = Distribution(blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble)) + (receiverId, distributionOption) + }.toMap + } + + private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } } private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { @@ -62,7 +108,7 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT) private val securityManager = sc.env.securityManager - private val listener = new StreamingUIListener() + private val listener = new StreamingUIListener(conf) private val handlers: Seq[ServletContextHandler] = { Seq( createServletHandler("/", @@ -89,23 +135,19 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) private def render(request: HttpServletRequest): Seq[Node] = { - val batchStatsTable = generateBatchStatsTable() - val content = batchStatsTable + val content = generateBatchStatsTable() ++ generateNetworkStatsTable() UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview") } - private def generateBatchStatsTable(): Seq[Node] = { - def getQuantiles(timeDistributionOption: Option[Distribution]) = { - timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) } - } - val numBatches = listener.numBatchInfos + private def generateBatchStatsTable(): Seq[Node] = { + val numBatches = listener.completedBatches.size val table = if (numBatches > 0) { val processingDelayQuantilesRow = "Processing Times" +: getQuantiles(listener.processingDelayDistribution) val schedulingDelayQuantilesRow = - "Scheduling Delay:" +: getQuantiles(listener.processingDelayDistribution) + "Scheduling Delay:" +: getQuantiles(listener.schedulingDelayDistribution) val totalDelayQuantilesRow = - "End-to-end Delay:" +: getQuantiles(listener.totalDelay) + "End-to-end Delay:" +: getQuantiles(listener.totalDelayDistribution) val headerRow = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", "Max") @@ -119,11 +161,86 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { None } + val batchCounts = + + + val batchStats = + + val content =

       <h4>Batch Processing Statistics</h4> ++
-      <div>{table.getOrElse("No statistics have been generated yet.")}</div>
+      <div>{batchCounts}</div> ++
+      <div>{batchStats}</div>
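+      // batchCounts is assumed to summarize the listener's waiting/running batch counts;
+      // batchStats holds the quantile table built above.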
    + content } + + private def generateNetworkStatsTable(): Seq[Node] = { + val receivedRecordDistributions = listener.receivedRecordsDistributions + val numNetworkReceivers = receivedRecordDistributions.size + val table = if (receivedRecordDistributions.size > 0) { + val headerRow = Seq("Receiver", "Min", "25th percentile", + "Median", "75th percentile", "Max") + val dataRows = (0 until numNetworkReceivers).map { receiverId => + val receiverName = s"Receiver-$receiverId" + val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => + d.getQuantiles().map(r => numberToString(r.toLong) + " records/batch") + }.getOrElse { + Seq("-", "-", "-", "-", "-") + } + receiverName +: receivedRecordStats + } + Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true)) + } else { + None + } + + val content = +

+      <h4>Network Input Statistics</h4> ++
+      <div>{table.getOrElse("No network receivers")}</div>
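+      // Each receiver's row summarizes the records received per batch, computed in
+      // listener.receivedRecordsDistributions from ReceivedBlockInfo.numRecords.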
    + + content + } + + private def getQuantiles(timeDistributionOption: Option[Distribution]) = { + timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) } + } + + private def numberToString(records: Double): String = { + val trillion = 1e12 + val billion = 1e9 + val million = 1e6 + val thousand = 1e3 + + val (value, unit) = { + if (records >= 2*trillion) { + (records / trillion, "T") + } else if (records >= 2*billion) { + (records / billion, "B") + } else if (records >= 2*million) { + (records / million, "M") + } else if (records >= 2*thousand) { + (records / thousand, "K") + } else { + (records, "") + } + } + "%.1f%s".formatLocal(Locale.US, value, unit) + } } object StreamingUI { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 74e73ebb342fe..723ea18e91dbf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -238,11 +238,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { /** This is a server to test the network input stream */ -class TestServer() extends Logging { +class TestServer(portToBind: Int = 0) extends Logging { val queue = new ArrayBlockingQueue[String](100) - val serverSocket = new ServerSocket(0) + val serverSocket = new ServerSocket(portToBind) val servingThread = new Thread() { override def run() { @@ -281,7 +281,7 @@ class TestServer() extends Logging { def start() { servingThread.start() } - def send(msg: String) { queue.add(msg) } + def send(msg: String) { queue.put(msg) } def stop() { servingThread.interrupt() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala new file mode 100644 index 0000000000000..204041def7dfc --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -0,0 +1,52 @@ +package org.apache.spark.streaming + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} +import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag +import org.apache.spark.rdd.RDD +import scala.util.Random + +class UISuite extends FunSuite with BeforeAndAfterAll { + + test("Testing") { + runStreaming(1000000) + } + + def runStreaming(duration: Long) { + val ssc = new StreamingContext("local[10]", "test", Seconds(1)) + val servers = (1 to 5).map { i => new TestServer(10000 + i) } + + val inputStream = ssc.union(servers.map(server => ssc.socketTextStream("localhost", server.port))) + inputStream.count.print + + ssc.start() + servers.foreach(_.start()) + val startTime = System.currentTimeMillis() + while (System.currentTimeMillis() - startTime < duration) { + servers.map(_.send(Random.nextString(10) + "\n")) + Thread.sleep(1) + } + ssc.stop() + servers.foreach(_.stop()) + } +} + +class FunctionBasedInputDStream[T: ClassTag]( + ssc_ : StreamingContext, + function: (StreamingContext, Time) => Option[RDD[T]] + ) extends InputDStream[T](ssc_) { + + def start(): Unit = {} + + def stop(): Unit = {} + + def compute(validTime: Time): Option[RDD[T]] = function(ssc, validTime) +} + + + + + + + + From 4d86e985cb7bbc7f4f125e52d72f4e4bd560677e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 1 Apr 2014 11:02:23 -0700 Subject: [PATCH 3/7] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to 
transition to using SparkUI later. --- .../spark/streaming/ui/StreamingUI.scala | 186 +++++++++++++----- .../apache/spark/streaming/ui/UIUtils.scala | 9 +- .../org/apache/spark/streaming/UISuite.scala | 2 +- 3 files changed, 147 insertions(+), 50 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala index b574cb103f766..545c5cb8e3f61 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala @@ -29,14 +29,17 @@ import org.apache.spark.streaming.scheduler._ import org.apache.spark.ui.{ServerInfo, SparkUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{Distribution, Utils} -import java.util.Locale +import java.util.{Calendar, Locale} -private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListener { +private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingListener { private val waitingBatchInfos = new HashMap[Time, BatchInfo] private val runningBatchInfos = new HashMap[Time, BatchInfo] private val completedaBatchInfos = new Queue[BatchInfo] - private val batchInfoLimit = conf.getInt("spark.steaming.ui.maxBatches", 100) + private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100) + private var totalBatchesCompleted = 0L + + val batchDuration = ssc.graph.batchDuration.milliseconds override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized { runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo @@ -52,6 +55,11 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) completedaBatchInfos.enqueue(batchCompleted.batchInfo) if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue() + totalBatchesCompleted += 1L + } + + def numTotalBatchesCompleted: Long = synchronized { + totalBatchesCompleted } def numNetworkReceivers: Int = synchronized { @@ -89,7 +97,8 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) (0 until numNetworkReceivers).map { receiverId => val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty)) - val distributionOption = Distribution(blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble)) + val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble * 1000 / batchDuration) + val distributionOption = Distribution(recordsOfParticularReceiver) (receiverId, distributionOption) }.toMap } @@ -99,44 +108,42 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe } } -private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { - private val sc = ssc.sparkContext - private val conf = sc.conf - private val appName = sc.appName - private val bindHost = Utils.localHostName() - private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) - private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT) - private val securityManager = sc.env.securityManager - private val listener = new StreamingUIListener(conf) - private val handlers: Seq[ServletContextHandler] = { - Seq( - createServletHandler("/", - (request: HttpServletRequest) => 
render(request), securityManager), - createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static") - ) - } +private[ui] class StreamingPage(parent: StreamingUI) extends Logging { - private var serverInfo: Option[ServerInfo] = None + private val listener = parent.listener + private val calendar = Calendar.getInstance() + private val startTime = calendar.getTime() - ssc.addStreamingListener(listener) - def bind() { - try { - serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) - logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Spark JettyUtils", e) - System.exit(1) - } - } + def render(request: HttpServletRequest): Seq[Node] = { - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) + val content = + generateBasicStats() ++ +

+      <h4>Statistics over last {listener.completedBatches.size} processed batches</h4>

    ++ + generateNetworkStatsTable() ++ + generateBatchStatsTable() + UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview") + } - private def render(request: HttpServletRequest): Seq[Node] = { - val content = generateBatchStatsTable() ++ generateNetworkStatsTable() - UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview") + private def generateBasicStats(): Seq[Node] = { + + val timeSinceStart = System.currentTimeMillis() - startTime.getTime +
+    <ul class="unstyled">
+      <li><strong>Started at: </strong>{startTime.toString}</li>
+      <li><strong>Time since start: </strong>{msDurationToString(timeSinceStart)}</li>
+      <li><strong>Batch interval: </strong>{msDurationToString(listener.batchDuration)}</li>
+      <li><strong>Processed batches: </strong>{listener.numTotalBatchesCompleted}</li>
+    </ul>
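+    // Durations above are formatted with the msDurationToString helper defined later in
+    // this class.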
    } private def generateBatchStatsTable(): Seq[Node] = { @@ -173,18 +180,12 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { val batchStats =
-        <li>
-          Statistics over last {numBatches} processed batches
-        </li>
-        <li>
-          {table.getOrElse("No statistics have been generated yet.")}
-        </li>
+        <li>{table.getOrElse("No statistics have been generated yet.")}</li>
    val content = -

-      <h4>Batch Processing Statistics</h4> ++
-      <div>{batchCounts}</div> ++
-      <div>{batchStats}</div>
+      <h5>Batch Processing Statistics</h5> ++
+      <div>{batchStats}</div>
    content } @@ -198,7 +199,7 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { val dataRows = (0 until numNetworkReceivers).map { receiverId => val receiverName = s"Receiver-$receiverId" val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => - d.getQuantiles().map(r => numberToString(r.toLong) + " records/batch") + d.getQuantiles().map(r => numberToString(r.toLong) + " records/second") }.getOrElse { Seq("-", "-", "-", "-", "-") } @@ -210,8 +211,8 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { } val content = -

-      <h4>Network Input Statistics</h4> ++
-      <div>{table.getOrElse("No network receivers")}</div>
+      <h5>Network Input Statistics</h5> ++
+      <div>{table.getOrElse("No network receivers")}</div>
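+      // With this patch the per-receiver quantiles are rates (records/second): the listener
+      // scales each batch's record count by 1000 / batchDuration (in ms).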
    content } @@ -241,6 +242,95 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { } "%.1f%s".formatLocal(Locale.US, value, unit) } + + /** + * Returns a human-readable string representing a duration such as "5 second 35 ms" + */ + private def msDurationToString(ms: Long): String = { + try { + val second = 1000L + val minute = 60 * second + val hour = 60 * minute + val day = 24 * hour + val week = 7 * day + val year = 365 * day + + def toString(num: Long, unit: String): String = { + if (num == 0) { + "" + } else if (num == 1) { + s"$num $unit" + } else { + s"$num ${unit}s" + } + } + + val millisecondsString = if (ms % second == 0) "" else s"${ms % second} ms" + val secondString = toString((ms % minute) / second, "second") + val minuteString = toString((ms % hour) / minute, "minute") + val hourString = toString((ms % day) / hour, "hour") + val dayString = toString((ms % week) / day, "day") + val weekString = toString((ms % year) / week, "week") + val yearString = toString(ms / year, "year") + + Seq( + second -> millisecondsString, + minute -> s"$secondString $millisecondsString", + hour -> s"$minuteString $secondString", + day -> s"$hourString $minuteString $secondString", + week -> s"$dayString $hourString $minuteString", + year -> s"$weekString $dayString $hourString" + ).foreach { + case (durationLimit, durationString) if (ms < durationLimit) => + return durationString + case e: Any => // matcherror is thrown without this + } + return s"$yearString $weekString $dayString" + } catch { + case e: Exception => + logError("Error converting time to string", e) + return "" + } + } +} + + +private[spark] class StreamingUI(val ssc: StreamingContext) extends Logging { + + val sc = ssc.sparkContext + val conf = sc.conf + val appName = sc.appName + val listener = new StreamingUIListener(ssc) + val overviewPage = new StreamingPage(this) + + private val bindHost = Utils.localHostName() + private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) + private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT) + private val securityManager = sc.env.securityManager + private val handlers: Seq[ServletContextHandler] = { + Seq( + createServletHandler("/", + (request: HttpServletRequest) => overviewPage.render(request), securityManager), + createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static") + ) + } + + private var serverInfo: Option[ServerInfo] = None + + ssc.addStreamingListener(listener) + + def bind() { + try { + serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) + logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort)) + } catch { + case e: Exception => + logError("Failed to create Spark JettyUtils", e) + System.exit(1) + } + } + + private def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) } object StreamingUI { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala index 62e95135fa5c5..b87bba87129b5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -25,8 +25,15 @@ private[spark] object UIUtils { type="text/css" /> {appName} - {title} + - +