Avoid logging all the blocks on each executor
SparkListenerExecutorsStateChange is refactored into two events:
SparkListenerBlockManagerGained and SparkListenerBlockManagerLost.
Both of these convey the minimum amount of information needed to
reconstruct the storage status (i.e. the BlockManagerId and, in the
registration case, the maximum memory associated with the block
manager).

Further, each executor state change no longer involves logging
storage statuses for ALL executors when only one has been updated.
andrewor14 committed Mar 7, 2014
1 parent d6e3b4a commit d59da5f
Showing 15 changed files with 167 additions and 105 deletions.
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -208,7 +208,8 @@ class SparkContext(
   @volatile private[spark] var dagScheduler = new DAGScheduler(this)
   dagScheduler.start()

-  postEnvironmentUpdateEvent()
+  postEnvironmentUpdate()
+  listenForBlockManagerUpdates()

   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
@@ -642,7 +643,7 @@ class SparkContext(
     Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf)

     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
-    postEnvironmentUpdateEvent()
+    postEnvironmentUpdate()
   }

   def addSparkListener(listener: SparkListener) {
@@ -791,7 +792,7 @@ class SparkContext(
         logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
       }
     }
-    postEnvironmentUpdateEvent()
+    postEnvironmentUpdate()
   }

   /**
@@ -1039,7 +1040,7 @@ class SparkContext(
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

   /** Post the environment update event if the listener bus is ready */
-  private def postEnvironmentUpdateEvent() {
+  private def postEnvironmentUpdate() {
     Option(listenerBus).foreach { bus =>
       val schedulingMode = getSchedulingMode.toString
       val addedJarPaths = addedJars.keys.toSeq
@@ -1051,6 +1052,11 @@ class SparkContext(
     }
   }

+  /** Start listening for block manager status update events */
+  private def listenForBlockManagerUpdates() {
+    env.blockManager.master.listener.map(_.setListenerBus(listenerBus))
+  }
+
   /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
   private[spark] def cleanup(cleanupTime: Long) {
     persistentRdds.clearOldValues(cleanupTime)
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -167,17 +167,19 @@ object SparkEnv extends Logging {
       }
     }

-    // Listen for block manager registration
-    val blockManagerListener = new BlockManagerRegistrationListener
+    val blockManagerStatusListener = new BlockManagerStatusListener

    // Lazy because an akka actor cannot be instantiated outside of Props
    lazy val blockManagerMasterActor = {
      val actor = new BlockManagerMasterActor(isLocal, conf)
-      actor.registerListener(blockManagerListener)
+      actor.registerListener(blockManagerStatusListener)
      actor
    }

-    val blockManagerMaster =
-      new BlockManagerMaster(registerOrLookup("BlockManagerMaster", blockManagerMasterActor), conf)
-    blockManagerMaster.registrationListener = Some(blockManagerListener)
+    val blockManagerMaster = new BlockManagerMaster(
+      registerOrLookup("BlockManagerMaster", blockManagerMasterActor),
+      conf,
+      blockManagerStatusListener)

    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
      serializer, conf)
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -150,9 +150,6 @@ class DAGScheduler(
         }
       }
     }))
-
-    // Start listening for block manager registration
-    blockManagerMaster.registrationListener.foreach(_.setListenerBus(listenerBus))
   }

   // Called by TaskScheduler to report task's starting.
@@ -194,7 +191,7 @@ class DAGScheduler(

   private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
     if (!cacheLocs.contains(rdd.id)) {
-      val blockIds = rdd.partitions.indices.map(index=> RDDBlockId(rdd.id, index)).toArray[BlockId]
+      val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
      val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
      cacheLocs(rdd.id) = blockIds.map { id =>
        locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
@@ -973,11 +970,6 @@ class DAGScheduler(
       logDebug("Additional executor lost message for " + execId +
         "(epoch " + currentEpoch + ")")
     }
-    // Block manager master actor should not be null except during tests
-    if (blockManagerMaster.driverActor != null) {
-      val storageStatusList = blockManagerMaster.getStorageStatus
-      listenerBus.post(SparkListenerExecutorsStateChange(storageStatusList))
-    }
   }

   private def handleExecutorGained(execId: String, host: String) {
@@ -986,8 +978,6 @@ class DAGScheduler(
       logInfo("Host gained which was in lost list earlier: " + host)
       failedEpoch -= execId
     }
-    // Do not trigger SparkListenerExecutorsStateChange, because it is already triggered in
-    // blockManagerMaster.registrationListener when a new BlockManager registers with the master
   }

   private def handleJobCancellation(jobId: Int) {
36 changes: 14 additions & 22 deletions core/src/main/scala/org/apache/spark/scheduler/EventBus.scala
@@ -36,38 +36,30 @@ private[spark] trait EventBus {
   /**
    * Post an event to all attached listeners. Return true if the shutdown event is posted.
    */
-  protected def postToAll(event: SparkListenerEvent): Boolean = {
-    postToListeners(event, sparkListeners)
-  }
-
-  /**
-   * Post an event to a given list of listeners. Return true if the shutdown event is posted.
-   */
-  protected def postToListeners(
-      event: SparkListenerEvent,
-      listeners: Seq[SparkListener]): Boolean = {
-
+  def postToAll(event: SparkListenerEvent): Boolean = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        listeners.foreach(_.onStageSubmitted(stageSubmitted))
+        sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
      case stageCompleted: SparkListenerStageCompleted =>
-        listeners.foreach(_.onStageCompleted(stageCompleted))
+        sparkListeners.foreach(_.onStageCompleted(stageCompleted))
      case jobStart: SparkListenerJobStart =>
-        listeners.foreach(_.onJobStart(jobStart))
+        sparkListeners.foreach(_.onJobStart(jobStart))
      case jobEnd: SparkListenerJobEnd =>
-        listeners.foreach(_.onJobEnd(jobEnd))
+        sparkListeners.foreach(_.onJobEnd(jobEnd))
      case taskStart: SparkListenerTaskStart =>
-        listeners.foreach(_.onTaskStart(taskStart))
+        sparkListeners.foreach(_.onTaskStart(taskStart))
      case taskGettingResult: SparkListenerTaskGettingResult =>
-        listeners.foreach(_.onTaskGettingResult(taskGettingResult))
+        sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
      case taskEnd: SparkListenerTaskEnd =>
-        listeners.foreach(_.onTaskEnd(taskEnd))
+        sparkListeners.foreach(_.onTaskEnd(taskEnd))
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
-        listeners.foreach(_.onEnvironmentUpdate(environmentUpdate))
-      case executorsStateChange: SparkListenerExecutorsStateChange =>
-        listeners.foreach(_.onExecutorsStateChange(executorsStateChange))
+        sparkListeners.foreach(_.onEnvironmentUpdate(environmentUpdate))
+      case blockManagerGained: SparkListenerBlockManagerGained =>
+        sparkListeners.foreach(_.onBlockManagerGained(blockManagerGained))
+      case blockManagerLost: SparkListenerBlockManagerLost =>
+        sparkListeners.foreach(_.onBlockManagerLost(blockManagerLost))
      case unpersistRDD: SparkListenerUnpersistRDD =>
-        listeners.foreach(_.onUnpersistRDD(unpersistRDD))
+        sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
      case SparkListenerShutdown =>
        return true
      case _ =>
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -79,7 +79,9 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
     logEvent(event, flushLogger = true)
   override def onJobEnd(event: SparkListenerJobEnd) =
     logEvent(event, flushLogger = true)
-  override def onExecutorsStateChange(event: SparkListenerExecutorsStateChange) =
+  override def onBlockManagerGained(event: SparkListenerBlockManagerGained) =
+    logEvent(event, flushLogger = true)
+  override def onBlockManagerLost(event: SparkListenerBlockManagerLost) =
     logEvent(event, flushLogger = true)
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
     logEvent(event, flushLogger = true)
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -24,7 +24,7 @@ import scala.collection.Map
 import org.apache.spark.util.{Utils, Distribution}
 import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.storage.StorageStatus
+import org.apache.spark.storage.{BlockManagerId, StorageStatus}

 sealed trait SparkListenerEvent

@@ -48,9 +48,11 @@ case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
 case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
   extends SparkListenerEvent

-case class SparkListenerExecutorsStateChange(storageStatusList: Seq[StorageStatus])
+case class SparkListenerBlockManagerGained(blockManagerId: BlockManagerId, maxMem: Long)
   extends SparkListenerEvent

+case class SparkListenerBlockManagerLost(blockManagerId: BlockManagerId) extends SparkListenerEvent
+
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

 /** An event used in the listener to shutdown the listener daemon thread. */
@@ -103,9 +105,14 @@ trait SparkListener {
   def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { }

   /**
-   * Called when a new executor has joined, or an existing executor is lost
+   * Called when a new block manager has joined
    */
+  def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) { }
+
+  /**
+   * Called when an existing block manager has been lost
+   */
-  def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) { }
+  def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) { }

   /**
    * Called when an RDD is manually unpersisted by the application
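
To make the new callbacks concrete, here is a minimal application-side listener built against the trait above; the class name BlockManagerLogger and the println bodies are illustrative, but the callback signatures and event fields are exactly those defined in this file:

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerGained,
  SparkListenerBlockManagerLost}

// A hypothetical listener that reports each block manager as it comes and
// goes, rather than diffing full storage-status snapshots on every change.
class BlockManagerLogger extends SparkListener {
  override def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) {
    println("Gained block manager " + blockManagerGained.blockManagerId +
      " with max memory " + blockManagerGained.maxMem)
  }

  override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) {
    println("Lost block manager " + blockManagerLost.blockManagerId)
  }
}

Such a listener would be attached through SparkContext.addSparkListener, like any other SparkListener.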
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,7 +28,14 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils

 private[spark]
-class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
+class BlockManagerMaster(
+    var driverActor: ActorRef,
+    conf: SparkConf,
+    val listener: Option[BlockManagerStatusListener] = None)
+  extends Logging {
+
+  def this(driverActor: ActorRef, conf: SparkConf, listener: BlockManagerStatusListener) =
+    this(driverActor, conf, Some(listener))

   val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
   val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
@@ -37,8 +44,6 @@ class BlockManagerMaster(

   val timeout = AkkaUtils.askTimeout(conf)

-  var registrationListener: Option[BlockManagerRegistrationListener] = None
-
   /** Remove a dead executor from the driver actor. This is only called on the driver side. */
   def removeExecutor(execId: String) {
     tell(RemoveExecutor(execId))
@@ -55,8 +60,7 @@ class BlockManagerMaster(
   }

   /** Register the BlockManager's id with the driver. */
-  def registerBlockManager(
-      blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     logInfo("Trying to register BlockManager")
     tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
     logInfo("Registered BlockManager")
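
A sketch of how the new constructor shape is meant to be used, mirroring the SparkEnv wiring earlier in this commit (illustrative only; these classes are private[spark], so real call sites live inside Spark itself, and driverActor here stands in for the ref obtained via registerOrLookup):

import akka.actor.ActorRef
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockManagerMaster, BlockManagerStatusListener}

object MasterWiring {
  // The three-arg auxiliary constructor wraps the listener in Some(...),
  // so existing two-arg call sites (e.g. tests) keep compiling with None.
  def wireMaster(driverActor: ActorRef, conf: SparkConf): BlockManagerMaster = {
    val statusListener = new BlockManagerStatusListener
    new BlockManagerMaster(driverActor, conf, statusListener)
  }
}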
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -29,6 +29,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
 import akka.pattern.ask

 import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.scheduler.{SparkListenerBlockManagerGained, SparkListenerBlockManagerLost}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}

@@ -50,7 +51,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor

   private val akkaTimeout = AkkaUtils.askTimeout(conf)

-  private val listeners = new ArrayBuffer[BlockManagerRegistrationListener]
+  private val listeners = new ArrayBuffer[BlockManagerStatusListener]

   val slaveTimeout = conf.get("spark.storage.blockManagerSlaveTimeoutMs",
     "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
@@ -69,7 +70,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor
     super.preStart()
   }

-  def registerListener(listener: BlockManagerRegistrationListener) = listeners += listener
+  def registerListener(listener: BlockManagerStatusListener) {
+    listeners += listener
+  }

   def receive = {
     case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
@@ -164,6 +167,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor
         blockLocations.remove(locations)
       }
     }
+    val blockManagerLost = SparkListenerBlockManagerLost(blockManagerId)
+    listeners.foreach(_.onBlockManagerLost(blockManagerLost))
   }

   private def expireDeadHosts() {
@@ -240,7 +245,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Actor
       blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
         maxMemSize, slaveActor)
     }
-    listeners.foreach(_.onBlockManagerRegister(storageStatus))
+    val blockManagerGained = SparkListenerBlockManagerGained(id, maxMemSize)
+    listeners.foreach(_.onBlockManagerGained(blockManagerGained))
   }

   private def updateBlockInfo(
core/src/main/scala/org/apache/spark/storage/BlockManagerStatusListener.scala
@@ -17,12 +17,24 @@

 package org.apache.spark.storage

-import org.apache.spark.scheduler._
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

-/** A listener for block manager registration */
-private[spark] class BlockManagerRegistrationListener {
+import org.apache.spark.scheduler._
+
+/**
+ * A listener for BlockManager status updates.
+ *
+ * This listener provides a way to post executor storage status information as soon as it
+ * is available (i.e. immediately after the associated BlockManager has registered with the
+ * driver). This is necessary because the SparkContext is only notified when an executor is
+ * launched, but by then the storage information is not ready yet.
+ *
+ * Further, it is possible for a BlockManager to be registered before the listener bus on the
+ * driver is initialized (e.g. the driver's own BlockManager), in which case the corresponding
+ * event should be buffered.
+ */
+private[spark] class BlockManagerStatusListener extends SparkListener {

   private var _listenerBus: Option[SparkListenerBus] = None

@@ -31,21 +43,24 @@ private[spark] class BlockManagerRegistrationListener
     with mutable.SynchronizedBuffer[SparkListenerEvent]

   /**
-   * Set the listener bus. If there are buffered events, post them all to the listener bus at once.
+   * Set the listener bus. If there are buffered events, post them all to the listener bus.
    */
   def setListenerBus(listenerBus: SparkListenerBus) = {
     _listenerBus = Some(listenerBus)
-    bufferedEvents.map(listenerBus.post)
+    bufferedEvents.map(listenerBus.postToAll)
   }

   /**
-   * Called when a new BlockManager is registered with the master. If the listener bus is ready,
-   * post the event; otherwise, buffer it.
+   * Post the event if the listener bus is ready; otherwise, buffer it.
    */
-  def onBlockManagerRegister(storageStatus: Array[StorageStatus]) {
-    val executorsStateChange = SparkListenerExecutorsStateChange(storageStatus)
-    _listenerBus.map(_.post(executorsStateChange)).getOrElse {
-      bufferedEvents += executorsStateChange
-    }
+  private def postOrBuffer(event: SparkListenerEvent) {
+    _listenerBus.map(_.post(event)).getOrElse { bufferedEvents += event }
   }
+
+  override def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) =
+    postOrBuffer(blockManagerGained)
+
+  override def onBlockManagerLost(blockManagerLost: SparkListenerBlockManagerLost) =
+    postOrBuffer(blockManagerLost)
 }
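
The post-or-buffer logic above distills to a self-contained toy (plain Scala, not Spark code) that makes the flush-on-attach behavior easy to see:

import scala.collection.mutable.ArrayBuffer

// Toy stand-ins for the listener bus and its events, for illustration only.
trait Bus { def postToAll(event: String): Unit }

class BufferingListener {
  private var bus: Option[Bus] = None
  private val buffered = new ArrayBuffer[String]

  // Post if a bus is attached; otherwise hold the event until one is.
  def postOrBuffer(event: String) {
    bus.map(_.postToAll(event)).getOrElse { buffered += event }
  }

  // Attaching a bus flushes everything buffered so far, in order.
  def setBus(b: Bus) {
    bus = Some(b)
    buffered.foreach(b.postToAll)
  }
}

object BufferingDemo extends App {
  val listener = new BufferingListener
  listener.postOrBuffer("blockManagerGained(driver)") // bus not ready: buffered
  listener.setBus(new Bus {
    def postToAll(event: String) = println("posted: " + event)
  })                                                  // prints the buffered event
  listener.postOrBuffer("blockManagerLost(driver)")   // bus ready: posted directly
}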
@@ -28,7 +28,7 @@ private[spark]
 class StorageStatus(
     val blockManagerId: BlockManagerId,
     val maxMem: Long,
-    val blocks: mutable.Map[BlockId, BlockStatus]) {
+    val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {

   def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
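
The new default for blocks is what lets the minimal SparkListenerBlockManagerGained event, carrying only the id and the maximum memory, be turned directly into a fresh StorageStatus, as the commit message describes. A sketch of that reconstruction (the statuses map is illustrative, and this assumes code inside the spark package, since StorageStatus is private[spark]):

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListenerBlockManagerGained, SparkListenerBlockManagerLost}
import org.apache.spark.storage.{BlockManagerId, StorageStatus}

// Illustrative driver-side bookkeeping: one StorageStatus per block manager,
// maintained incrementally rather than re-logged wholesale on each change.
object StorageStatusReconstruction {
  val statuses = mutable.Map[BlockManagerId, StorageStatus]()

  def onGained(event: SparkListenerBlockManagerGained) {
    // blocks defaults to an empty map; it fills in as block updates arrive
    statuses(event.blockManagerId) = new StorageStatus(event.blockManagerId, event.maxMem)
  }

  def onLost(event: SparkListenerBlockManagerLost) {
    statuses -= event.blockManagerId
  }
}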