diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 73488e7758a77..ddcb2d29dc8b0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -208,7 +208,8 @@ class SparkContext(
 
   @volatile private[spark] var dagScheduler = new DAGScheduler(this)
   dagScheduler.start()
-  postEnvironmentUpdateEvent()
+  postEnvironmentUpdate()
+  listenForBlockManagerUpdates()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
@@ -642,7 +643,7 @@ class SparkContext(
       Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
-    postEnvironmentUpdateEvent()
+    postEnvironmentUpdate()
   }
 
   def addSparkListener(listener: SparkListener) {
@@ -791,7 +792,7 @@ class SparkContext(
         logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
       }
     }
-    postEnvironmentUpdateEvent()
+    postEnvironmentUpdate()
   }
 
   /**
@@ -1039,7 +1040,7 @@ class SparkContext(
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 
   /** Post the environment update event if the listener bus is ready */
-  private def postEnvironmentUpdateEvent() {
+  private def postEnvironmentUpdate() {
     Option(listenerBus).foreach { bus =>
       val schedulingMode = getSchedulingMode.toString
       val addedJarPaths = addedJars.keys.toSeq
@@ -1051,6 +1052,11 @@ class SparkContext(
     }
   }
 
+  /** Start listening for block manager status update events */
+  private def listenForBlockManagerUpdates() {
+    env.blockManager.master.listener.map(_.setListenerBus(listenerBus))
+  }
+
   /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
   private[spark] def cleanup(cleanupTime: Long) {
     persistentRdds.clearOldValues(cleanupTime)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index da684d81848bd..d4229a22a7b03 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -167,17 +167,19 @@ object SparkEnv extends Logging {
       }
     }
 
-    // Listen for block manager registration
-    val blockManagerListener = new BlockManagerRegistrationListener
+    val blockManagerStatusListener = new BlockManagerStatusListener
+
+    // Lazy because an akka actor cannot be instantiated outside of Props
     lazy val blockManagerMasterActor = {
       val actor = new BlockManagerMasterActor(isLocal, conf)
-      actor.registerListener(blockManagerListener)
+      actor.registerListener(blockManagerStatusListener)
       actor
     }
 
-    val blockManagerMaster =
-      new BlockManagerMaster(registerOrLookup("BlockManagerMaster", blockManagerMasterActor), conf)
-    blockManagerMaster.registrationListener = Some(blockManagerListener)
+    val blockManagerMaster = new BlockManagerMaster(
+      registerOrLookup("BlockManagerMaster", blockManagerMasterActor),
+      conf,
+      blockManagerStatusListener)
 
     val blockManager =
       new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
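
The "Lazy because an akka actor cannot be instantiated outside of Props" comment above captures an Akka constraint that is easy to trip over: constructing an Actor subclass directly throws an ActorInitializationException unless the construction happens inside a Props factory. A minimal sketch of the pattern SparkEnv relies on, with actorSystem and conf assumed to be in scope (registerOrLookup, not shown here, is what ultimately hands the lazy val to actorOf on the driver):

    import akka.actor.Props

    // Sketch only: the lazy val defers construction until Props evaluates its
    // by-name argument inside actorOf, the one place Akka permits `new`-ing an Actor.
    lazy val masterActor = new BlockManagerMasterActor(isLocal = true, conf)
    val driverActor = actorSystem.actorOf(Props(masterActor), name = "BlockManagerMaster")
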
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 59770f3566c93..9b3350e66d8d2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -150,9 +150,6 @@ class DAGScheduler(
         }
       }
     }))
-
-    // Start listening for block manager registration
-    blockManagerMaster.registrationListener.foreach(_.setListenerBus(listenerBus))
   }
 
   // Called by TaskScheduler to report task's starting.
@@ -194,7 +191,7 @@ class DAGScheduler(
 
   private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
     if (!cacheLocs.contains(rdd.id)) {
-      val blockIds = rdd.partitions.indices.map(index=> RDDBlockId(rdd.id, index)).toArray[BlockId]
+      val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
       val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
       cacheLocs(rdd.id) = blockIds.map { id =>
         locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
@@ -973,11 +970,6 @@ class DAGScheduler(
       logDebug("Additional executor lost message for " + execId +
                "(epoch " + currentEpoch + ")")
     }
-    // Block manager master actor should not be null except during tests
-    if (blockManagerMaster.driverActor != null) {
-      val storageStatusList = blockManagerMaster.getStorageStatus
-      listenerBus.post(SparkListenerExecutorsStateChange(storageStatusList))
-    }
   }
 
   private def handleExecutorGained(execId: String, host: String) {
@@ -986,8 +978,6 @@ class DAGScheduler(
       logInfo("Host gained which was in lost list earlier: " + host)
       failedEpoch -= execId
     }
-    // Do not trigger SparkListenerExecutorsStateChange, because it is already triggered in
-    // blockManagerMaster.registrationListener when a new BlockManager registers with the master
   }
 
   private def handleJobCancellation(jobId: Int) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventBus.scala b/core/src/main/scala/org/apache/spark/scheduler/EventBus.scala
index f2c1863a6e9e4..a98ec5f05710c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventBus.scala
@@ -36,38 +36,30 @@ private[spark] trait EventBus {
   /**
    * Post an event to all attached listeners. Return true if the shutdown event is posted.
    */
-  protected def postToAll(event: SparkListenerEvent): Boolean = {
-    postToListeners(event, sparkListeners)
-  }
-
-  /**
-   * Post an event to a given list of listeners. Return true if the shutdown event is posted.
-   */
-  protected def postToListeners(
-      event: SparkListenerEvent,
-      listeners: Seq[SparkListener]): Boolean = {
-
+  def postToAll(event: SparkListenerEvent): Boolean = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        listeners.foreach(_.onStageSubmitted(stageSubmitted))
+        sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
       case stageCompleted: SparkListenerStageCompleted =>
-        listeners.foreach(_.onStageCompleted(stageCompleted))
+        sparkListeners.foreach(_.onStageCompleted(stageCompleted))
       case jobStart: SparkListenerJobStart =>
-        listeners.foreach(_.onJobStart(jobStart))
+        sparkListeners.foreach(_.onJobStart(jobStart))
      case jobEnd: SparkListenerJobEnd =>
-        listeners.foreach(_.onJobEnd(jobEnd))
+        sparkListeners.foreach(_.onJobEnd(jobEnd))
       case taskStart: SparkListenerTaskStart =>
-        listeners.foreach(_.onTaskStart(taskStart))
+        sparkListeners.foreach(_.onTaskStart(taskStart))
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        listeners.foreach(_.onTaskGettingResult(taskGettingResult))
+        sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
       case taskEnd: SparkListenerTaskEnd =>
-        listeners.foreach(_.onTaskEnd(taskEnd))
+        sparkListeners.foreach(_.onTaskEnd(taskEnd))
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
-        listeners.foreach(_.onEnvironmentUpdate(environmentUpdate))
-      case executorsStateChange: SparkListenerExecutorsStateChange =>
-        listeners.foreach(_.onExecutorsStateChange(executorsStateChange))
+        sparkListeners.foreach(_.onEnvironmentUpdate(environmentUpdate))
+      case blockManagerGained: SparkListenerBlockManagerGained =>
+        sparkListeners.foreach(_.onBlockManagerGained(blockManagerGained))
+      case blockManagerLost: SparkListenerBlockManagerLost =>
+        sparkListeners.foreach(_.onBlockManagerLost(blockManagerLost))
       case unpersistRDD: SparkListenerUnpersistRDD =>
-        listeners.foreach(_.onUnpersistRDD(unpersistRDD))
+        sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
       case SparkListenerShutdown =>
         return true
       case _ =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 859ae74c23835..f562aede34640 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -79,7 +79,9 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
     logEvent(event, flushLogger = true)
   override def onJobEnd(event: SparkListenerJobEnd) =
     logEvent(event, flushLogger = true)
-  override def onExecutorsStateChange(event: SparkListenerExecutorsStateChange) =
+  override def onBlockManagerGained(event: SparkListenerBlockManagerGained) =
+    logEvent(event, flushLogger = true)
+  override def onBlockManagerLost(event: SparkListenerBlockManagerLost) =
     logEvent(event, flushLogger = true)
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
     logEvent(event, flushLogger = true)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8674af928e370..36873bfa90ba8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -24,7 +24,7 @@ import scala.collection.Map
 import org.apache.spark.util.{Utils, Distribution}
 import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.storage.StorageStatus
+import org.apache.spark.storage.{BlockManagerId, StorageStatus}
 
 sealed trait SparkListenerEvent
 
@@ -48,9 +48,11 @@ case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkLi
 case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
   extends SparkListenerEvent
 
-case class SparkListenerExecutorsStateChange(storageStatusList: Seq[StorageStatus])
+case class SparkListenerBlockManagerGained(blockManagerId: BlockManagerId, maxMem: Long)
   extends SparkListenerEvent
 
+case class SparkListenerBlockManagerLost(blockManagerId: BlockManagerId) extends SparkListenerEvent
+
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */
@@ -103,9 +105,14 @@ trait SparkListener {
   def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { }
 
   /**
-   * Called when a new executor has joined, or an existing executor is lost
+   * Called when a new block manager has joined
+   */
+  def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) { }
+
+  /**
+   * Called when an existing block manager has been lost
    */
-  def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) { }
+  def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) { }
 
   /**
    * Called when an RDD is manually unpersisted by the application
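
Since listeners now receive gained/lost deltas instead of a full storage status snapshot, a client listener maintains its own view. A hedged sketch of what that looks like from application code (the class name and the set it maintains are illustrative, not part of this patch); it would be attached via SparkContext.addSparkListener:

    import scala.collection.mutable
    import org.apache.spark.scheduler._

    // Illustrative only: tracks which executors currently have a live BlockManager.
    class LiveBlockManagerTracker extends SparkListener {
      private val liveExecutors = mutable.Set[String]()

      override def onBlockManagerGained(gained: SparkListenerBlockManagerGained) {
        liveExecutors += gained.blockManagerId.executorId
      }

      override def onBlockManagerLost(lost: SparkListenerBlockManagerLost) {
        liveExecutors -= lost.blockManagerId.executorId
      }
    }

    // sc.addSparkListener(new LiveBlockManagerTracker)
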
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index f33a0f8e511de..95e1ecb797440 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,7 +28,14 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
+class BlockManagerMaster(
+    var driverActor: ActorRef,
+    conf: SparkConf,
+    val listener: Option[BlockManagerStatusListener] = None)
+  extends Logging {
+
+  def this(driverActor: ActorRef, conf: SparkConf, listener: BlockManagerStatusListener) =
+    this(driverActor, conf, Some(listener))
 
   val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
   val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
@@ -37,8 +44,6 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
 
   val timeout = AkkaUtils.askTimeout(conf)
 
-  var registrationListener: Option[BlockManagerRegistrationListener] = None
-
   /** Remove a dead executor from the driver actor. This is only called on the driver side. */
   def removeExecutor(execId: String) {
     tell(RemoveExecutor(execId))
@@ -55,8 +60,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   }
 
   /** Register the BlockManager's id with the driver. */
-  def registerBlockManager(
-      blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     logInfo("Trying to register BlockManager")
     tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
     logInfo("Registered BlockManager")
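
The auxiliary constructor above only wraps the listener in Some(...), sparing callers the Option boilerplate while keeping the no-listener default for tests. Assuming a driverActor, conf, and statusListener in scope, the two forms are equivalent:

    val master1 = new BlockManagerMaster(driverActor, conf, statusListener)
    val master2 = new BlockManagerMaster(driverActor, conf, Some(statusListener))
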
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index f854674a18aa9..f35b002de61cf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -29,6 +29,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
 import akka.pattern.ask
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.scheduler.{SparkListenerBlockManagerGained, SparkListenerBlockManagerLost}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -50,7 +51,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
 
   private val akkaTimeout = AkkaUtils.askTimeout(conf)
 
-  private val listeners = new ArrayBuffer[BlockManagerRegistrationListener]
+  private val listeners = new ArrayBuffer[BlockManagerStatusListener]
 
   val slaveTimeout = conf.get("spark.storage.blockManagerSlaveTimeoutMs",
     "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
@@ -69,7 +70,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
     super.preStart()
   }
 
-  def registerListener(listener: BlockManagerRegistrationListener) = listeners += listener
+  def registerListener(listener: BlockManagerStatusListener) {
+    listeners += listener
+  }
 
   def receive = {
     case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
@@ -164,6 +167,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
         blockLocations.remove(locations)
       }
     }
+    val blockManagerLost = SparkListenerBlockManagerLost(blockManagerId)
+    listeners.foreach(_.onBlockManagerLost(blockManagerLost))
   }
 
   private def expireDeadHosts() {
@@ -240,7 +245,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
       blockManagerInfo(id) =
         new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
-    listeners.foreach(_.onBlockManagerRegister(storageStatus))
+    val blockManagerGained = SparkListenerBlockManagerGained(id, maxMemSize)
+    listeners.foreach(_.onBlockManagerGained(blockManagerGained))
   }
 
   private def updateBlockInfo(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStatusListener.scala
similarity index 53%
rename from core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala
rename to core/src/main/scala/org/apache/spark/storage/BlockManagerStatusListener.scala
index 4643ae29baa17..0a88ea79b2774 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerRegistrationListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStatusListener.scala
@@ -17,12 +17,24 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.scheduler._
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
-/** A listener for block manager registration */
-private[spark] class BlockManagerRegistrationListener {
+import org.apache.spark.scheduler._
+
+/**
+ * A listener for BlockManager status updates.
+ *
+ * This listener provides a way to post executor storage status information as soon as it
+ * is available (i.e. immediately after the associated BlockManager has registered with the
+ * driver). This is necessary because the SparkContext is only notified when an executor is
+ * launched, but by then the storage information is not ready yet.
+ *
+ * Further, it is possible for a BlockManager to be registered before the listener bus on the
+ * driver is initialized (e.g. the driver's own BlockManager), in which case the corresponding
+ * event should be buffered.
+ */
+private[spark] class BlockManagerStatusListener extends SparkListener {
 
   private var _listenerBus: Option[SparkListenerBus] = None
 
@@ -31,21 +43,24 @@ private[spark] class BlockManagerRegistrationListener {
     with mutable.SynchronizedBuffer[SparkListenerEvent]
 
   /**
-   * Set the listener bus. If there are buffered events, post them all to the listener bus at once.
+   * Set the listener bus. If there are buffered events, post them all to the listener bus.
    */
   def setListenerBus(listenerBus: SparkListenerBus) = {
     _listenerBus = Some(listenerBus)
-    bufferedEvents.map(listenerBus.post)
+    bufferedEvents.map(listenerBus.postToAll)
   }
 
   /**
-   * Called when a new BlockManager is registered with the master. If the listener bus is ready,
-   * post the event; otherwise, buffer it.
+   * Post the event if the listener bus is ready; otherwise, buffer it.
    */
-  def onBlockManagerRegister(storageStatus: Array[StorageStatus]) {
-    val executorsStateChange = SparkListenerExecutorsStateChange(storageStatus)
-    _listenerBus.map(_.post(executorsStateChange)).getOrElse {
-      bufferedEvents += executorsStateChange
-    }
+  private def postOrBuffer(event: SparkListenerEvent) {
+    _listenerBus.map(_.post(event)).getOrElse { bufferedEvents += event }
   }
+
+  override def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) =
+    postOrBuffer(blockManagerGained)
+
+  override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) =
+    postOrBuffer(blockManagerLost)
+
 }
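
The postOrBuffer/setListenerBus pair above is an instance of a general pattern: hold events until a sink is attached, then replay them in arrival order. A self-contained sketch of that pattern with invented names (the explicit synchronization here stands in for what the listener above gets from mutable.SynchronizedBuffer):

    import scala.collection.mutable.ArrayBuffer

    // Illustrative: buffers events until a sink is set, then replays them in order.
    class BufferingPublisher[E] {
      private var sink: Option[E => Unit] = None
      private val buffered = new ArrayBuffer[E]

      def setSink(s: E => Unit): Unit = synchronized {
        sink = Some(s)
        buffered.foreach(s)
        buffered.clear()
      }

      def publish(event: E): Unit = synchronized {
        sink match {
          case Some(s) => s(event)
          case None => buffered += event
        }
      }
    }
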
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 94ac8caeacb24..07e0ec1ffbced 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -28,7 +28,7 @@ private[spark]
 class StorageStatus(
     val blockManagerId: BlockManagerId,
     val maxMem: Long,
-    val blocks: mutable.Map[BlockId, BlockStatus]) {
+    val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
 
   def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
 
diff --git a/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala b/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
index 61ecf0e5e882d..405144bbbf9f0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
@@ -28,7 +28,9 @@ private[ui] trait UISparkListener extends SparkListener
  * A SparkListener that maintains executor storage status
  */
 private[ui] class StorageStatusSparkListener extends UISparkListener {
-  var storageStatusList = Seq[StorageStatus]()
+  val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+
+  def storageStatusList = executorIdToStorageStatus.values.toSeq
 
   /** Update storage status list to reflect updated block statuses */
   def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
@@ -68,8 +70,17 @@ private[ui] class StorageStatusSparkListener extends UISparkListener {
     updateStorageStatus(unpersistRDD.rddId)
   }
 
-  override def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) {
-    storageStatusList = executorsStateChange.storageStatusList
+  override def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) {
+    val blockManagerId = blockManagerGained.blockManagerId
+    val executorId = blockManagerId.executorId
+    val maxMem = blockManagerGained.maxMem
+    val storageStatus = new StorageStatus(blockManagerId, maxMem)
+    executorIdToStorageStatus(executorId) = storageStatus
+  }
+
+  override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) {
+    val executorId = blockManagerLost.blockManagerId.executorId
+    executorIdToStorageStatus.remove(executorId)
   }
 
   /**
@@ -81,6 +92,7 @@ private[ui] class StorageStatusSparkListener extends UISparkListener {
   protected def formatExecutorId(execId: String): String = {
     if (execId == "localhost") "" else execId
   }
+
 }
 
 /**
@@ -120,4 +132,5 @@ private[ui] class RDDInfoSparkListener extends StorageStatusSparkListener {
     super.onUnpersistRDD(unpersistRDD)
     updateRDDInfo()
   }
+
 }
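
Defaulting blocks to an empty map is what lets onBlockManagerGained above build a StorageStatus from just an id and a memory cap. For illustration, with the caveat that the BlockManagerId factory arguments below are assumed from the codebase of this era rather than shown in this patch:

    import org.apache.spark.storage.{BlockManagerId, StorageStatus}

    // Assumed factory shape: BlockManagerId(executorId, host, port, nettyPort)
    val id = BlockManagerId("executor-1", "host-a", 7077, 0)
    val status = new StorageStatus(id, maxMem = 512L * 1024 * 1024)
    assert(status.blocks.isEmpty && status.memUsed() == 0L)
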
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 49581bc6beb1e..c186c9fda9d21 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -56,9 +56,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
   private def createExecutorTable() : Seq[Node] = {
     // Make an executor-id -> address map
     val executorIdToAddress = mutable.HashMap[String, String]()
-    val storageStatusList = listener.storageStatusList
-    for (statusId <- 0 until storageStatusList.size) {
-      val blockManagerId = storageStatusList(statusId).blockManagerId
+    listener.blockManagerIds.foreach { blockManagerId =>
       val address = blockManagerId.hostPort
       val executorId = blockManagerId.executorId
       executorIdToAddress.put(executorId, address)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index e191883e433b7..f80576e0bcf48 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -19,11 +19,12 @@ package org.apache.spark.ui.jobs
 
 import scala.collection.mutable.{ListBuffer, HashMap}
 
-import org.apache.spark.{ExceptionFailure, SparkContext, Success}
+import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler._
-import org.apache.spark.ui.StorageStatusSparkListener
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.UISparkListener
 
 /**
  * Tracks task-level information to be displayed in the UI.
  *
@@ -32,21 +33,12 @@ import org.apache.spark.ui.StorageStatusSparkListener
  * class, since the UI thread and the DAGScheduler event loop may otherwise
 * be reading/updating the internal data structures concurrently.
 */
-private[ui] class JobProgressListener(sc: SparkContext, live: Boolean)
-  extends StorageStatusSparkListener {
+private[ui] class JobProgressListener(conf: SparkConf) extends UISparkListener {
 
   import JobProgressListener._
 
   // How many stages to remember
-  val retainedStages = if (live) {
-    sc.conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
-  } else {
-    DEFAULT_RETAINED_STAGES
-  }
-
-  val stageIdToPool = new HashMap[Int, String]()
-  val stageIdToDescription = new HashMap[Int, String]()
-  val poolToActiveStages = new HashMap[String, HashMap[Int, StageInfo]]()
+  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
 
   val activeStages = HashMap[Int, StageInfo]()
   val completedStages = ListBuffer[StageInfo]()
@@ -67,9 +59,16 @@ private[ui] class JobProgressListener(sc: SparkContext, live: Boolean)
   val stageIdToTasksFailed = HashMap[Int, Int]()
   val stageIdToTaskInfos = HashMap[Int, HashMap[Long, TaskUIData]]()
   val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
+  val stageIdToPool = HashMap[Int, String]()
+  val stageIdToDescription = HashMap[Int, String]()
+  val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
+
+  val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
 
   var schedulingMode: Option[SchedulingMode] = None
 
+  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
+
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
     val stageId = stage.stageId
@@ -232,7 +231,7 @@ private[ui] class JobProgressListener(sc: SparkContext, live: Boolean)
     }
   }
 
-  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) = {
+  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     val schedulingModeName =
       environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode")
     schedulingMode = schedulingModeName match {
@@ -240,6 +239,18 @@ private[ui] class JobProgressListener(sc: SparkContext, live: Boolean)
       case None => None
     }
   }
+
+  override def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) {
+    val blockManagerId = blockManagerGained.blockManagerId
+    val executorId = blockManagerId.executorId
+    executorIdToBlockManagerId(executorId) = blockManagerId
+  }
+
+  override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) {
+    val executorId = blockManagerLost.blockManagerId.executorId
+    executorIdToBlockManagerId.remove(executorId)
+  }
+
 }
 
 private[ui] case class TaskUIData(
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index 09b820c765149..8f4172dd8ff09 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.eclipse.jetty.server.Handler
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.SchedulingMode
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.SparkUI
@@ -42,7 +43,8 @@ private[ui] class JobProgressUI(parent: SparkUI) {
   private var _listener: Option[JobProgressListener] = None
 
   def start() {
-    _listener = Some(new JobProgressListener(sc, live))
+    val conf = if (live) sc.conf else new SparkConf
+    _listener = Some(new JobProgressListener(conf))
   }
 
   def formatDuration(ms: Long) = Utils.msDurationToString(ms)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 99e7c4b6d97c6..fc9a2b7e4658a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -57,8 +57,10 @@ private[spark] object JsonProtocol {
         jobEndToJson(jobEnd)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
         environmentUpdateToJson(environmentUpdate)
-      case executorsStateChange: SparkListenerExecutorsStateChange =>
-        executorsStateChangeToJson(executorsStateChange)
+      case blockManagerGained: SparkListenerBlockManagerGained =>
+        blockManagerGainedToJson(blockManagerGained)
+      case blockManagerLost: SparkListenerBlockManagerLost =>
+        blockManagerLostToJson(blockManagerLost)
       case unpersistRDD: SparkListenerUnpersistRDD =>
         unpersistRDDToJson(unpersistRDD)
       case SparkListenerShutdown =>
@@ -137,12 +139,17 @@ private[spark] object JsonProtocol {
     ("Classpath Entries" -> classpathEntries)
   }
 
-  def executorsStateChangeToJson(executorsStateChange: SparkListenerExecutorsStateChange)
-    : JValue = {
-    val storageStatusList =
-      JArray(executorsStateChange.storageStatusList.map(storageStatusToJson).toList)
-    ("Event" -> Utils.getFormattedClassName(executorsStateChange)) ~
-    ("Storage Status List" -> storageStatusList)
+  def blockManagerGainedToJson(blockManagerGained: SparkListenerBlockManagerGained): JValue = {
+    val blockManagerId = blockManagerIdToJson(blockManagerGained.blockManagerId)
+    ("Event" -> Utils.getFormattedClassName(blockManagerGained)) ~
+    ("Block Manager ID" -> blockManagerId) ~
+    ("Maximum Memory" -> blockManagerGained.maxMem)
+  }
+
+  def blockManagerLostToJson(blockManagerLost: SparkListenerBlockManagerLost): JValue = {
+    val blockManagerId = blockManagerIdToJson(blockManagerLost.blockManagerId)
+    ("Event" -> Utils.getFormattedClassName(blockManagerLost)) ~
+    ("Block Manager ID" -> blockManagerId)
   }
 
   def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
@@ -382,7 +389,8 @@ private[spark] object JsonProtocol {
     val jobStart = Utils.getFormattedClassName(SparkListenerJobStart)
     val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd)
     val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
-    val executorsStateChanged = Utils.getFormattedClassName(SparkListenerExecutorsStateChange)
+    val blockManagerGained = Utils.getFormattedClassName(SparkListenerBlockManagerGained)
+    val blockManagerLost = Utils.getFormattedClassName(SparkListenerBlockManagerLost)
     val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
     val shutdown = Utils.getFormattedClassName(SparkListenerShutdown)
 
@@ -395,7 +403,8 @@ private[spark] object JsonProtocol {
       case `jobStart` => jobStartFromJson(json)
       case `jobEnd` => jobEndFromJson(json)
       case `environmentUpdate` => environmentUpdateFromJson(json)
-      case `executorsStateChanged` => executorsStateChangeFromJson(json)
+      case `blockManagerGained` => blockManagerGainedFromJson(json)
+      case `blockManagerLost` => blockManagerLostFromJson(json)
       case `unpersistRDD` => unpersistRDDFromJson(json)
       case `shutdown` => SparkListenerShutdown
     }
@@ -454,10 +463,15 @@ private[spark] object JsonProtocol {
     SparkListenerEnvironmentUpdate(environmentDetails)
   }
 
-  def executorsStateChangeFromJson(json: JValue): SparkListenerExecutorsStateChange = {
-    val storageStatusList =
-      (json \ "Storage Status List").extract[List[JValue]].map(storageStatusFromJson)
-    SparkListenerExecutorsStateChange(storageStatusList)
+  def blockManagerGainedFromJson(json: JValue): SparkListenerBlockManagerGained = {
+    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
+    val maxMem = (json \ "Maximum Memory").extract[Long]
+    SparkListenerBlockManagerGained(blockManagerId, maxMem)
+  }
+
+  def blockManagerLostFromJson(json: JValue): SparkListenerBlockManagerLost = {
+    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
+    SparkListenerBlockManagerLost(blockManagerId)
   }
 
   def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
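
Assuming the surrounding dispatch methods in JsonProtocol are named sparkEventToJson and sparkEventFromJson (the hunks above show only their bodies, not their signatures), the new events should round-trip through JSON like the existing ones, which is worth asserting in a test:

    import org.apache.spark.scheduler.SparkListenerBlockManagerGained
    import org.apache.spark.storage.BlockManagerId
    import org.apache.spark.util.JsonProtocol

    // Hypothetical round-trip check; the BlockManagerId factory arguments are assumptions.
    val gained = SparkListenerBlockManagerGained(BlockManagerId("executor-1", "host-a", 7077, 0), 1024L)
    val json = JsonProtocol.sparkEventToJson(gained)
    assert(JsonProtocol.sparkEventFromJson(json) == gained)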