Fix BlockManagerUI bug by introducing new event
Previously, the storage information of persisted RDDs relied on the old SparkContext,
which is no longer accessible if the UI is rendered from disk. This fix introduces a new
event, SparkListenerGetRDDInfo, which captures that information.

Per discussion with Patrick, an alternative is to encapsulate this information within
SparkListenerTaskEnd. This would bypass the need to create a new event, but would also require
a non-trivial refactor of BlockManager / BlockStore.
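
As a rough sketch of the consumer side (using only names introduced in this patch), a
listener can now track persisted-RDD storage state from events alone:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerGetRDDInfo}
    import org.apache.spark.storage.RDDInfo

    // Hypothetical consumer: keeps the latest RDD storage snapshot delivered by
    // the new event, with no reference to a live SparkContext.
    class PersistedRDDTracker extends SparkListener {
      @volatile var rddInfoList: Seq[RDDInfo] = Seq.empty
      override def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) {
        rddInfoList = getRDDInfo.rddInfoList
      }
    }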
andrewor14 committed Feb 19, 2014
1 parent 4273013 commit 64d2ce1
Showing 15 changed files with 125 additions and 42 deletions.
45 changes: 34 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.StorageStatus
import org.apache.spark.storage.{RDDInfo, StorageStatus}

import net.liftweb.json.JsonDSL._
import net.liftweb.json.JsonAST._
@@ -123,7 +123,7 @@ private[spark] case class SparkListenerLoadEnvironment(
}
}

/** An event used in the ExecutorUI to fetch storage status from SparkEnv */
/** An event used in the ExecutorsUI and BlockManagerUI to fetch storage status from SparkEnv */
private[spark] case class SparkListenerStorageStatusFetch(storageStatusList: Seq[StorageStatus])
extends SparkListenerEvent {
override def toJson = {
@@ -133,6 +133,16 @@ private[spark] case class SparkListenerStorageStatusFetch(storageStatusList: Seq
}
}

/** An event used in the BlockManagerUI to query information of persisted RDDs */
private[spark] case class SparkListenerGetRDDInfo(rddInfoList: Seq[RDDInfo])
extends SparkListenerEvent {
override def toJson = {
val rddInfoListJson = JArray(rddInfoList.map(_.toJson).toList)
super.toJson ~
("RDD Info List" -> rddInfoListJson)
}
}

/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object SparkListenerShutdown extends SparkListenerEvent

@@ -151,6 +161,7 @@ object SparkListenerEvent {
val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd)
val loadEnvironment = Utils.getFormattedClassName(SparkListenerLoadEnvironment)
val storageStatusFetch = Utils.getFormattedClassName(SparkListenerStorageStatusFetch)
val getRDDInfo = Utils.getFormattedClassName(SparkListenerGetRDDInfo)
val shutdown = Utils.getFormattedClassName(SparkListenerShutdown)

(json \ "Event").extract[String] match {
@@ -163,32 +174,33 @@ object SparkListenerEvent {
case `jobEnd` => jobEndFromJson(json)
case `loadEnvironment` => loadEnvironmentFromJson(json)
case `storageStatusFetch` => storageStatusFetchFromJson(json)
case `getRDDInfo` => getRDDInfoFromJson(json)
case `shutdown` => SparkListenerShutdown
}
}

private def stageSubmittedFromJson(json: JValue) = {
private def stageSubmittedFromJson(json: JValue): SparkListenerEvent = {
new SparkListenerStageSubmitted(
StageInfo.fromJson(json \ "Stage Info"),
Utils.propertiesFromJson(json \ "Properties"))
}

private def stageCompletedFromJson(json: JValue) = {
private def stageCompletedFromJson(json: JValue): SparkListenerEvent = {
new SparkListenerStageCompleted(StageInfo.fromJson(json \ "Stage Info"))
}

private def taskStartFromJson(json: JValue) = {
private def taskStartFromJson(json: JValue): SparkListenerEvent = {
implicit val format = DefaultFormats
new SparkListenerTaskStart(
(json \ "Stage ID").extract[Int],
TaskInfo.fromJson(json \ "Task Info"))
}

private def taskGettingResultFromJson(json: JValue) = {
private def taskGettingResultFromJson(json: JValue): SparkListenerEvent = {
new SparkListenerTaskGettingResult(TaskInfo.fromJson(json \ "Task Info"))
}

private def taskEndFromJson(json: JValue) = {
private def taskEndFromJson(json: JValue): SparkListenerEvent = {
implicit val format = DefaultFormats
new SparkListenerTaskEnd(
(json \ "Stage ID").extract[Int],
@@ -198,7 +210,7 @@
TaskMetrics.fromJson(json \ "Task Metrics"))
}

private def jobStartFromJson(json: JValue) = {
private def jobStartFromJson(json: JValue): SparkListenerEvent = {
implicit val format = DefaultFormats
val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
new SparkListenerJobStart(
@@ -207,14 +219,14 @@
Utils.propertiesFromJson(json \ "Properties"))
}

private def jobEndFromJson(json: JValue) = {
private def jobEndFromJson(json: JValue): SparkListenerEvent = {
implicit val format = DefaultFormats
new SparkListenerJobEnd(
(json \ "Job ID").extract[Int],
JobResult.fromJson(json \ "Job Result"))
}

private def loadEnvironmentFromJson(json: JValue) = {
private def loadEnvironmentFromJson(json: JValue): SparkListenerEvent = {
implicit val format = DefaultFormats
new SparkListenerLoadEnvironment(
Utils.mapFromJson(json \ "JVM Information").toSeq,
@@ -223,12 +235,19 @@
Utils.mapFromJson(json \ "Classpath Entries").toSeq)
}

private def storageStatusFetchFromJson(json: JValue) = {
private def storageStatusFetchFromJson(json: JValue): SparkListenerEvent = {
implicit val format = DefaultFormats
val storageStatusList =
(json \ "Storage Status List").extract[List[JValue]].map(StorageStatus.fromJson)
new SparkListenerStorageStatusFetch(storageStatusList)
}

private def getRDDInfoFromJson(json: JValue): SparkListenerEvent = {
implicit val format = DefaultFormats
val rddInfoList =
(json \ "RDD Info List").extract[List[JValue]].map(RDDInfo.fromJson)
new SparkListenerGetRDDInfo(rddInfoList)
}
}


@@ -282,6 +301,10 @@ trait SparkListener {
*/
def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) { }

/**
* Called when Spark queries the statuses of persisted RDDs
*/
def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) { }
}

/**
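For illustration, a round trip through these methods might look as follows (a sketch: it
assumes the factory shown above is exposed as SparkListenerEvent.fromJson, and the
RDDInfo values are made up):

    import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerGetRDDInfo}
    import org.apache.spark.storage.{RDDInfo, StorageLevel}

    val event = SparkListenerGetRDDInfo(Seq(
      RDDInfo(0, "cachedPairs", StorageLevel.MEMORY_ONLY, 4, 4, 1024L, 0L)))
    val json = event.toJson                          // JValue carrying "RDD Info List"
    val restored = SparkListenerEvent.fromJson(json) // an equivalent SparkListenerGetRDDInfo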
core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -73,6 +73,8 @@ private[spark] class SparkListenerBus extends Logging {
listeners.foreach(_.onLoadEnvironment(loadEnvironment))
case storageStatusFetch: SparkListenerStorageStatusFetch =>
listeners.foreach(_.onStorageStatusFetch(storageStatusFetch))
case getRDDInfo: SparkListenerGetRDDInfo =>
listeners.foreach(_.onGetRDDInfo(getRDDInfo))
case SparkListenerShutdown =>
return true
case _ =>
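A minimal sketch of the dispatch pattern shown here (simplified; not the bus's full
implementation):

    // Each posted event is matched on type and fanned out to every listener;
    // the shutdown sentinel tells the daemon thread to stop (returns true).
    def postToAll(event: SparkListenerEvent, listeners: Seq[SparkListener]): Boolean =
      event match {
        case e: SparkListenerGetRDDInfo =>
          listeners.foreach(_.onGetRDDInfo(e)); false
        case SparkListenerShutdown => true
        case _ => false
      }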
41 changes: 35 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -76,19 +76,48 @@ case object StorageStatus {
}
}

case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
extends Ordered[RDDInfo] {
case class RDDInfo(
id: Int,
name: String,
storageLevel: StorageLevel,
numCachedPartitions: Int,
numPartitions: Int,
memSize: Long,
diskSize: Long)
extends JsonSerializable with Ordered[RDDInfo] {
override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
"DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions,
numPartitions, bytesToString(memSize), bytesToString(diskSize))
numPartitions, Utils.bytesToString(memSize), Utils.bytesToString(diskSize))
}

override def compare(that: RDDInfo) = {
this.id - that.id
}

override def toJson = {
("RDD ID" -> id) ~
("Name" -> name) ~
("Storage Level" -> storageLevel.toJson) ~
("Number of Cached Partitions" -> numCachedPartitions) ~
("Number of Partitions" -> numPartitions) ~
("Memory Size" -> memSize) ~
("Disk Size" -> diskSize)
}
}

case object RDDInfo {
def fromJson(json: JValue): RDDInfo = {
implicit val format = DefaultFormats
new RDDInfo(
(json \ "RDD ID").extract[Int],
(json \ "Name").extract[String],
StorageLevel.fromJson(json \ "Storage Level"),
(json \ "Number of Cached Partitions").extract[Int],
(json \ "Number of Partitions").extract[Int],
(json \ "Memory Size").extract[Long],
(json \ "Disk Size").extract[Long])
}
}
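
For reference, rendering one of these with lift-json would produce roughly the following
(a sketch; values made up, nested "Storage Level" elided):

    // compact(render(info.toJson)) would yield approximately:
    // {"RDD ID":0,"Name":"cachedPairs","Storage Level":{...},
    //  "Number of Cached Partitions":4,"Number of Partitions":4,
    //  "Memory Size":1024,"Disk Size":0}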

/* Helper methods for storage-related objects */
@@ -114,7 +143,7 @@ object StorageUtils {
sc: SparkContext) : Array[RDDInfo] = {

// Group by rddId, ignore the partition name
val groupedRddBlocks = infos.groupBy { case(k, v) => k.rddId }.mapValues(_.values.toArray)
val groupedRddBlocks = infos.groupBy { case (k, v) => k.rddId }.mapValues(_.values.toArray)

// For each RDD, generate an RDDInfo object
val rddInfos = groupedRddBlocks.map { case (rddId, rddBlocks) =>
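A self-contained sketch of this grouping step, with toy block names standing in for
Spark's block keys:

    // Per-partition entries are regrouped by RDD id; the partition suffix is dropped.
    val blocks = Map("rdd_0_0" -> 10L, "rdd_0_1" -> 20L, "rdd_1_0" -> 30L)
    val grouped = blocks.groupBy { case (name, _) => name.split('_')(1).toInt }
      .mapValues(_.values.toArray)
    // grouped: Map(0 -> Array(10, 20), 1 -> Array(30))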
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -129,7 +129,9 @@ private[spark] class SparkUI(val sc: SparkContext, live: Boolean = true) extends
.format(dirPath))
return false
}
val logFiles = logDir.listFiles.filter(_.isFile)
// Maintaining the order of log files is important because the information for one job
// may depend on that of an earlier one
val logFiles = logDir.listFiles.filter(_.isFile).sortBy(_.getName)
if (logFiles.size == 0) {
logWarning("No logs found in given directory %s when rendering persisted Spark Web UI!"
.format(dirPath))
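For example (file names hypothetical), listFiles gives no ordering guarantee, so the sort
is what makes replay deterministic:

    val unordered = Seq("EVENT_LOG_2", "EVENT_LOG_0", "EVENT_LOG_1")
    val ordered = unordered.sorted
    // ordered: List(EVENT_LOG_0, EVENT_LOG_1, EVENT_LOG_2)
    // (lexicographic; assumes file names sort in log order)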
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/UISparkListener.scala
@@ -111,6 +111,12 @@ private[spark] class GatewayUISparkListener(live: Boolean) extends SparkListener
logEvent(storageStatusFetch)
logger.foreach(_.flush())
}

override def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) {
listeners.foreach(_.onGetRDDInfo(getRDDInfo))
logEvent(getRDDInfo)
logger.foreach(_.flush())
}
}

/**
@@ -140,9 +146,6 @@ private[spark] class StorageStatusFetchSparkListener(
}
}

/**
* Update local state with fetch result, and log the appropriate event
*/
override def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) {
storageStatusList = storageStatusFetch.storageStatusList
}
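A minimal sketch of the gateway's forward-then-log pattern (the EventLog trait here is a
stand-in, not the patch's actual logger type):

    import net.liftweb.json.JsonAST.render
    import net.liftweb.json.Printer.compact
    import org.apache.spark.scheduler.{SparkListener, SparkListenerGetRDDInfo}

    trait EventLog { def log(line: String): Unit; def flush(): Unit }

    class Gateway(children: Seq[SparkListener], logger: Option[EventLog]) {
      // Fan the event out to attached listeners, then persist and flush it,
      // mirroring GatewayUISparkListener.onGetRDDInfo above.
      def onGetRDDInfo(e: SparkListenerGetRDDInfo): Unit = {
        children.foreach(_.onGetRDDInfo(e))
        logger.foreach { l => l.log(compact(render(e.toJson))); l.flush() }
      }
    }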
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -32,7 +32,7 @@ private[spark] object UIUtils {
def prependBaseUri(resource: String = "") = uiRoot + resource

/** Returns a spark page with correctly formatted headers */
def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
def headerSparkPage(content: => Seq[Node], appName: String, title: String, page: Page.Value)
: Seq[Node] = {
val jobs = page match {
case Stages => <li class="active"><a href={prependBaseUri("/stages")}>Stages</a></li>
@@ -60,7 +60,7 @@
type="text/css" />
<link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{sc.appName} - {title}</title>
<title>{appName} - {title}</title>
</head>
<body>
<div class="navbar navbar-static-top">
@@ -74,7 +74,7 @@
{environment}
{executors}
</ul>
<p class="navbar-text pull-right"><strong>{sc.appName}</strong> application UI</p>
<p class="navbar-text pull-right"><strong>{appName}</strong> application UI</p>
</div>
</div>

core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -64,7 +64,7 @@ private[spark] class EnvironmentUI(parent: SparkUI, live: Boolean) {
<h4>Classpath Entries</h4> {classpathEntriesTable}
</span>

UIUtils.headerSparkPage(content, sc, "Environment", Environment)
UIUtils.headerSparkPage(content, sc.appName, "Environment", Environment)
}

private def propertyHeader = Seq("Name", "Value")
@@ -120,7 +120,6 @@ private[spark] class EnvironmentListener(
}
}

/** Prepare environment information for UI to render */
override def onLoadEnvironment(loadEnvironment: SparkListenerLoadEnvironment) {
jvmInformation = loadEnvironment.jvmInformation
sparkProperties = loadEnvironment.sparkProperties
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -72,7 +72,7 @@ private[spark] class ExecutorsUI(parent: SparkUI, live: Boolean) {
</div>
</div>;

UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
UIUtils.headerSparkPage(content, sc.appName, "Executors (" + execInfo.size + ")", Executors)
}

/** Header fields for the executors table */
core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -80,7 +80,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
failedStagesTable.toNodeSeq

UIUtils.headerSparkPage(content, sc, "Spark Stages", Stages)
UIUtils.headerSparkPage(content, sc.appName, "Spark Stages", Stages)
}
}
}
core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -45,7 +45,7 @@ private[spark] class PoolPage(parent: JobProgressUI) {
val content = <h4>Summary </h4> ++ poolTable.toNodeSeq ++
<h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq

UIUtils.headerSparkPage(content, sc, "Fair Scheduler Pool: " + poolName, Stages)
UIUtils.headerSparkPage(content, sc.appName, "Fair Scheduler Pool: " + poolName, Stages)
}
}
}
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -43,7 +43,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
<h4>Summary Metrics</h4> No tasks have started yet
<h4>Tasks</h4> No tasks have started yet
</div>
return UIUtils.headerSparkPage(content, sc, "Details for Stage %s".format(stageId), Stages)
return UIUtils.headerSparkPage(
content, sc.appName, "Details for Stage %s".format(stageId), Stages)
}

val tasks = listener.stageIdToTaskInfos(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
@@ -202,7 +203,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
<h4>Tasks</h4> ++ taskTable

UIUtils.headerSparkPage(content, sc, "Details for Stage %d".format(stageId), Stages)
UIUtils.headerSparkPage(content, sc.appName, "Details for Stage %d".format(stageId), Stages)
}
}

core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler

import org.apache.spark.SparkContext
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageUtils, RDDInfo}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.{GatewayUISparkListener, SparkUI, StorageStatusFetchSparkListener}

@@ -52,4 +54,28 @@ private[spark] class BlockManagerListener(
sc: SparkContext,
gateway: GatewayUISparkListener,
live: Boolean)
extends StorageStatusFetchSparkListener(sc, gateway, live)
extends StorageStatusFetchSparkListener(sc, gateway, live) {
var rddInfoList: Seq[RDDInfo] = Seq()

def getRDDInfo() {
if (live) {
val rddInfo = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
val getRDDInfo = new SparkListenerGetRDDInfo(rddInfo)
gateway.onGetRDDInfo(getRDDInfo)
}
}

override def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) {
rddInfoList = getRDDInfo.rddInfoList
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
super.onStageSubmitted(stageSubmitted)
getRDDInfo()
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = {
super.onStageCompleted(stageCompleted)
getRDDInfo()
}
}
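
On the replay side, the same callback can be driven from a logged event, with no live
SparkContext (a sketch; listener construction elided, values made up):

    // An event read back from the event log...
    val logged = SparkListenerGetRDDInfo(Seq(
      RDDInfo(0, "cachedPairs", StorageLevel.MEMORY_ONLY, 4, 4, 1024L, 0L)))
    // ...repopulates the listener's local snapshot for the BlockManagerUI to render.
    blockManagerListener.onGetRDDInfo(logged)
    blockManagerListener.rddInfoList // now reflects the persisted storage information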