Skip to content

Commit

Permalink
Reflect RDD unpersist on UI
Browse files Browse the repository at this point in the history
This introduces a new event, SparkListenerUnpersistRDD.
  • Loading branch information
andrewor14 committed Feb 27, 2014
1 parent 7b2f811 commit 996d7a2
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 8 deletions.
19 changes: 18 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ class SparkContext(
}

/**
* Return current scheduling mode
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
Expand All @@ -727,6 +727,23 @@ class SparkContext(
dagScheduler.getPreferredLocs(rdd, partition)
}

/**
 * Record the given RDD in this context's registry of persisted RDDs, keyed by its id.
 */
private[spark] def persistRDD(rdd: RDD[_]) {
  persistentRdds.update(rdd.id, rdd)
}

/**
 * Unpersist an RDD from memory and/or disk storage.
 *
 * Removes the RDD's blocks through the block manager master, drops it from the
 * registry of persisted RDDs, and posts a [[SparkListenerUnpersistRDD]] event so
 * listeners (e.g. the UI) can reflect the removal.
 */
private[spark] def unpersistRDD(rdd: RDD[_], blocking: Boolean = true) {
  val id = rdd.id
  env.blockManager.master.removeRdd(id, blocking)
  persistentRdds.remove(id)
  dagScheduler.post(new SparkListenerUnpersistRDD(id))
}

/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ abstract class RDD[T: ClassTag](
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
sc.persistRDD(this)
storageLevel = newLevel
// Register the RDD with the SparkContext
sc.persistentRdds(id) = this
this
}

Expand All @@ -165,8 +164,7 @@ abstract class RDD[T: ClassTag](
*/
def unpersist(blocking: Boolean = true): RDD[T] = {
  logInfo("Removing RDD " + id + " from persistence list")
  // Delegate the actual removal (block manager + registry + listener event) to the
  // SparkContext; do NOT also call blockManager.master.removeRdd / persistentRdds.remove
  // here directly, since that would duplicate the work unpersistRDD already performs.
  sc.unpersistRDD(this, blocking)
  storageLevel = StorageLevel.NONE
  this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
/** Event carrying the latest storage status of all executors. */
case class SparkListenerExecutorsStateChange(storageStatusList: Seq[StorageStatus])
  extends SparkListenerEvent

/** Event posted when an RDD is unpersisted, carrying the id of the removed RDD. */
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

/** An event used in the listener to shutdown the listener daemon thread. */
private[spark] case object SparkListenerShutdown extends SparkListenerEvent

Expand Down Expand Up @@ -112,6 +114,10 @@ trait SparkListener {
*/
def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) { }

/**
 * Called when an RDD is manually unpersisted by the application,
 * i.e. in response to a [[SparkListenerUnpersistRDD]] event.
 * Default implementation does nothing.
 */
def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ private[spark] class SparkListenerBus extends Logging {
listeners.foreach(_.onEnvironmentUpdate(environmentUpdate))
case executorsStateChange: SparkListenerExecutorsStateChange =>
listeners.foreach(_.onExecutorsStateChange(executorsStateChange))
case unpersistRDD: SparkListenerUnpersistRDD =>
listeners.foreach(_.onUnpersistRDD(unpersistRDD))
case SparkListenerShutdown =>
return true
case _ =>
Expand Down
31 changes: 28 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends
}
}

private def startLogger() = logger.foreach(_.start())
private def closeLogger() = logger.foreach(_.close())
private def restartLogger() = logger.foreach(_.start())

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
listeners.foreach(_.onStageSubmitted(stageSubmitted))
Expand All @@ -89,7 +89,7 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends

override def onJobStart(jobStart: SparkListenerJobStart) {
listeners.foreach(_.onJobStart(jobStart))
restartLogger()
startLogger()
logEvent(jobStart)
}

Expand All @@ -115,6 +115,13 @@ private[ui] class GatewayUISparkListener(parent: SparkUI, live: Boolean) extends
listeners.foreach(_.onExecutorsStateChange(executorsStateChange))
logEvent(executorsStateChange, flushLogger = true)
}

override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) {
  // Forward the event to all attached child listeners
  listeners.foreach(_.onUnpersistRDD(unpersistRDD))
  // In case logger has not already started, as unpersist may be called between jobs
  startLogger()
  // Record the event, flushing the logger eagerly
  logEvent(unpersistRDD, flushLogger = true)
}
}

/**
Expand All @@ -133,6 +140,16 @@ private[ui] class StorageStatusSparkListener extends UISparkListener {
}
}

/** Update storage status list to reflect the removal of an RDD from the cache. */
def updateStorageStatus(unpersistedRDDId: Int) {
  for (status <- storageStatusList) {
    // Find every cached block belonging to the unpersisted RDD on this executor
    val blocksToClear = status.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
    // Mark each such block as no longer stored (no level, zero memory/disk usage)
    for (blockId <- blocksToClear) {
      status.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L)
    }
  }
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val execId = taskEnd.taskInfo.executorId
val metrics = taskEnd.taskMetrics
Expand All @@ -144,6 +161,10 @@ private[ui] class StorageStatusSparkListener extends UISparkListener {
}
}

override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) {
  // Clear the cached block statuses of all blocks belonging to the unpersisted RDD
  updateStorageStatus(unpersistRDD.rddId)
}

override def onExecutorsStateChange(executorsStateChange: SparkListenerExecutorsStateChange) {
  // Replace the whole storage status list with the snapshot carried by the event
  storageStatusList = executorsStateChange.storageStatusList
}
Expand Down Expand Up @@ -179,7 +200,11 @@ private[ui] class RDDInfoSparkListener extends StorageStatusSparkListener {

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
  // Remove all partitions that are no longer cached
  // (explicit unpersists are reflected separately via onUnpersistRDD)
  _rddInfoMap.retain { case (id, info) => info.numCachedPartitions > 0 }
}

override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) {
  // Let the parent clear the RDD's block statuses first, then refresh the RDD info view
  super.onUnpersistRDD(unpersistRDD)
  updateRDDInfo()
}
}
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ private[spark] object JsonProtocol {
environmentUpdateToJson(environmentUpdate)
case executorsStateChange: SparkListenerExecutorsStateChange =>
executorsStateChangeToJson(executorsStateChange)
case unpersistRDD: SparkListenerUnpersistRDD =>
unpersistRDDToJson(unpersistRDD)
case SparkListenerShutdown =>
shutdownToJson()
}
Expand Down Expand Up @@ -147,6 +149,11 @@ private[spark] object JsonProtocol {
("Storage Status List" -> storageStatusList)
}

/** Render a [[SparkListenerUnpersistRDD]] event as JSON: its event name and the RDD id. */
def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
  val eventName = Utils.getFormattedClassName(unpersistRDD)
  ("Event" -> eventName) ~
  ("RDD ID" -> unpersistRDD.rddId)
}

/** Serialize the singleton [[SparkListenerShutdown]] event: only the event name is recorded. */
def shutdownToJson(): JValue = {
  "Event" -> Utils.getFormattedClassName(SparkListenerShutdown)
}
Expand Down Expand Up @@ -381,6 +388,7 @@ private[spark] object JsonProtocol {
val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart)
val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
val executorsStateChanged = Utils.getFormattedClassName(SparkListenerExecutorsStateChange)
val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
val shutdown = Utils.getFormattedClassName(SparkListenerShutdown)

(json \ "Event").extract[String] match {
Expand All @@ -394,6 +402,7 @@ private[spark] object JsonProtocol {
case `applicationStart` => applicationStartFromJson(json)
case `environmentUpdate` => environmentUpdateFromJson(json)
case `executorsStateChanged` => executorsStateChangeFromJson(json)
case `unpersistRDD` => unpersistRDDFromJson(json)
case `shutdown` => SparkListenerShutdown
}
}
Expand Down Expand Up @@ -461,6 +470,10 @@ private[spark] object JsonProtocol {
new SparkListenerExecutorsStateChange(storageStatusList)
}

/** Reconstruct a [[SparkListenerUnpersistRDD]] event from its JSON representation. */
def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
  val rddId = (json \ "RDD ID").extract[Int]
  new SparkListenerUnpersistRDD(rddId)
}

/**
* JSON deserialization methods for classes SparkListenerEvent's depend on
*/
Expand Down

0 comments on commit 996d7a2

Please sign in to comment.