Also deserialize new events
This includes SparkListenerLoadEnvironment and SparkListenerStorageStatusFetch
andrewor14 committed Feb 15, 2014
1 parent 8a2ebe6 commit c4cd480
Showing 6 changed files with 201 additions and 10 deletions.
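The effect of the change is easiest to see from the consumer side: once SparkListenerLoadEnvironment and SparkListenerStorageStatusFetch dispatch through SparkListenerEvent.fromJson, a replay tool can rebuild them from logged JSON. The sketch below is illustrative only and not part of this commit: the replayEvents helper and the one-event-per-line assumption are hypothetical, and the code would have to live inside Spark's own packages since most of these types are private[spark].

import net.liftweb.json.JsonParser
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerLoadEnvironment, SparkListenerStorageStatusFetch}

// Hypothetical replay loop: parse each logged line with lift-json, then dispatch
// on the reconstructed event type.
def replayEvents(loggedLines: Iterator[String]): Unit = {
  loggedLines.foreach { line =>
    SparkListenerEvent.fromJson(JsonParser.parse(line)) match {
      case e: SparkListenerLoadEnvironment => // rebuild the environment UI from e
      case e: SparkListenerStorageStatusFetch => // rebuild the storage UI from e
      case _ => // events that were already deserializable before this commit
    }
  }
}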
23 changes: 21 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -147,7 +147,6 @@ object SparkListenerEvent {

/**
* Deserialize a SparkListenerEvent from JSON
* TODO: include newly added events!
*/
def fromJson(json: JValue): SparkListenerEvent = {
implicit val format = DefaultFormats
@@ -159,6 +158,8 @@ object SparkListenerEvent {
val jobStart = Utils.getFormattedClassName(SparkListenerJobStart)
val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd)
val shutdown = Utils.getFormattedClassName(SparkListenerShutdown)
val loadEnvironment = Utils.getFormattedClassName(SparkListenerLoadEnvironment)
val storageStatusFetch = Utils.getFormattedClassName(SparkListenerStorageStatusFetch)

(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -169,6 +170,8 @@
case `jobStart` => jobStartFromJson(json)
case `jobEnd` => jobEndFromJson(json)
case `shutdown` => SparkListenerShutdown
case `loadEnvironment` => loadEnvironmentFromJson(json)
case `storageStatusFetch` => storageStatusFetchFromJson(json)
}
}

@@ -219,6 +222,22 @@ object SparkListenerEvent {
(json \ "Job ID").extract[Int],
JobResult.fromJson(json \ "Job Result"))
}

private def loadEnvironmentFromJson(json: JValue) = {
implicit val format = DefaultFormats
new SparkListenerLoadEnvironment(
Utils.mapFromJson(json \ "JVM Information").toSeq,
Utils.mapFromJson(json \ "Spark Properties").toSeq,
Utils.mapFromJson(json \ "System Properties").toSeq,
Utils.mapFromJson(json \ "Classpath Entries").toSeq)
}

private def storageStatusFetchFromJson(json: JValue) = {
implicit val format = DefaultFormats
val storageStatusList =
(json \ "Storage Status List").extract[List[JValue]].map(StorageStatus.fromJson)
new SparkListenerStorageStatusFetch(storageStatusList)
}
}


@@ -320,7 +339,7 @@ private[spark] object StatsReportListener extends Logging {

def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
val stats = d.statCounter
val quantiles = d.getQuantiles(probabilities).map{formatNumber}
val quantiles = d.getQuantiles(probabilities).map(formatNumber)
logInfo(heading + stats)
logInfo(percentilesHeader)
logInfo("\t" + quantiles.mkString("\t"))
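Each new *FromJson helper above follows the same lift-json idiom as the existing ones: bring DefaultFormats into implicit scope, walk the tree with \, and extract a typed value. A standalone sketch of that pattern, with field names chosen purely for illustration:

import net.liftweb.json._

implicit val format = DefaultFormats
val json = JsonParser.parse("""{"Job ID": 42, "Properties": {"spark.app.name": "demo"}}""")
val jobId = (json \ "Job ID").extract[Int]                              // 42
val appName = (json \ "Properties" \ "spark.app.name").extract[String] // "demo"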
122 changes: 121 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,8 +19,11 @@ package org.apache.spark.storage

import java.util.UUID
import org.apache.spark.scheduler.JsonSerializable
import org.apache.spark.util.Utils

import net.liftweb.json.JsonDSL._
import net.liftweb.json.JsonAST.JValue
import net.liftweb.json.DefaultFormats

/**
* Identifies a particular Block of data, usually associated with a single file.
@@ -46,43 +49,89 @@ private[spark] sealed abstract class BlockId extends JsonSerializable {
case _ => false
}

override def toJson = ("Name" -> name)
override def toJson = "Type" -> Utils.getFormattedClassName(this)
}

private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
def name = "rdd_" + rddId + "_" + splitIndex

override def toJson = {
super.toJson ~
("RDD ID" -> rddId) ~
("Split Index" -> splitIndex)
}
}

private[spark]
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId

override def toJson = {
super.toJson ~
("Shuffle ID" -> shuffleId) ~
("Map ID" -> mapId) ~
("Reduce ID" -> reduceId)
}
}

private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
def name = "broadcast_" + broadcastId

override def toJson = {
super.toJson ~
("Broadcast ID" -> broadcastId)
}
}

private[spark]
case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
def name = broadcastId.name + "_" + hType

override def toJson = {
super.toJson ~
("Broadcast Block ID" -> broadcastId.toJson) ~
("Helper Type" -> hType)
}
}

private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
def name = "taskresult_" + taskId

override def toJson = {
super.toJson ~
("Task ID" -> taskId)
}
}

private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
def name = "input-" + streamId + "-" + uniqueId

override def toJson = {
super.toJson ~
("Stream ID" -> streamId) ~
("Unique ID" -> uniqueId)
}
}

/** Id associated with temporary data managed as blocks. Not serializable. */
private[spark] case class TempBlockId(id: UUID) extends BlockId {
def name = "temp_" + id

override def toJson = {
val UUIDJson = Utils.UUIDToJson(id)
super.toJson ~
("Temp ID" -> UUIDJson)
}
}

// Intended only for testing purposes
private[spark] case class TestBlockId(id: String) extends BlockId {
def name = "test_" + id

override def toJson = {
super.toJson ~
("Test ID" -> id)
}
}

private[spark] object BlockId {
@@ -113,4 +162,75 @@ private[spark] object BlockId {
case _ =>
throw new IllegalStateException("Unrecognized BlockId: " + id)
}

def fromJson(json: JValue): BlockId = {
implicit val format = DefaultFormats
val rddBlockId = Utils.getFormattedClassName(RDDBlockId)
val shuffleBlockId = Utils.getFormattedClassName(ShuffleBlockId)
val broadcastBlockId = Utils.getFormattedClassName(BroadcastBlockId)
val broadcastHelperBlockId = Utils.getFormattedClassName(BroadcastHelperBlockId)
val taskResultBlockId = Utils.getFormattedClassName(TaskResultBlockId)
val streamBlockId = Utils.getFormattedClassName(StreamBlockId)
val tempBlockId = Utils.getFormattedClassName(TempBlockId)
val testBlockId = Utils.getFormattedClassName(TestBlockId)

(json \ "Type").extract[String] match {
case `rddBlockId` => rddBlockIdFromJson(json)
case `shuffleBlockId` => shuffleBlockIdFromJson(json)
case `broadcastBlockId` => broadcastBlockIdFromJson(json)
case `broadcastHelperBlockId` => broadcastHelperBlockIdFromJson(json)
case `taskResultBlockId` => taskResultBlockIdFromJson(json)
case `streamBlockId` => streamBlockIdFromJson(json)
case `tempBlockId` => tempBlockIdFromJson(json)
case `testBlockId` => testBlockIdFromJson(json)
}
}

private def rddBlockIdFromJson(json: JValue) = {
implicit val format = DefaultFormats
new RDDBlockId(
(json \ "RDD ID").extract[Int],
(json \ "Split Index").extract[Int])
}

private def shuffleBlockIdFromJson(json: JValue) = {
implicit val format = DefaultFormats
new ShuffleBlockId(
(json \ "Shuffle ID").extract[Int],
(json \ "Map ID").extract[Int],
(json \ "Reduce ID").extract[Int])
}

private def broadcastBlockIdFromJson(json: JValue) = {
implicit val format = DefaultFormats
new BroadcastBlockId((json \ "Broadcast ID").extract[Long])
}

private def broadcastHelperBlockIdFromJson(json: JValue) = {
implicit val format = DefaultFormats
new BroadcastHelperBlockId(
broadcastBlockIdFromJson(json \ "Broadcast Block ID"),
(json \ "Helper Type").extract[String])
}

private def taskResultBlockIdFromJson(json: JValue) = {
implicit val format = DefaultFormats
new TaskResultBlockId((json \ "Task ID").extract[Long])
}

private def streamBlockIdFromJson(json: JValue) = {
implicit val format = DefaultFormats
new StreamBlockId(
(json \ "Stream ID").extract[Int],
(json \ "Unique ID").extract[Long])
}

private def tempBlockIdFromJson(json: JValue) = {
new TempBlockId(Utils.UUIDFromJson(json \ "Temp ID"))
}

private def testBlockIdFromJson(json: JValue) = {
implicit val format = DefaultFormats
new TestBlockId((json \ "Test ID").extract[String])
}
}
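With every BlockId subtype now writing a "Type" discriminator in toJson and BlockId.fromJson matching on the same formatted class name, block identifiers are meant to round-trip through JSON. A hedged sanity check, not part of the commit (it would need to run inside Spark, since these classes are private[spark]):

import org.apache.spark.storage._

// Expected to hold for each concrete BlockId, because fromJson reads back exactly
// the fields that the corresponding toJson writes.
assert(BlockId.fromJson(RDDBlockId(1, 2).toJson) == RDDBlockId(1, 2))
assert(BlockId.fromJson(ShuffleBlockId(0, 3, 7).toJson) == ShuffleBlockId(0, 3, 7))
assert(BlockId.fromJson(BroadcastBlockId(5L).toJson) == BroadcastBlockId(5L))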
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -33,6 +33,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.scheduler.JsonSerializable

import net.liftweb.json.JsonDSL._
import net.liftweb.json.JsonAST.JValue
import net.liftweb.json.DefaultFormats

/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -321,6 +323,16 @@ object BlockManagerMasterActor {
}
}

case object BlockStatus {
def fromJson(json: JValue): BlockStatus = {
implicit val format = DefaultFormats
new BlockStatus(
StorageLevel.fromJson(json \ "Storage Level"),
(json \ "Memory Size").extract[Long],
(json \ "Disk Size").extract[Long])
}
}

class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -21,6 +21,8 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}

import net.liftweb.json.JsonDSL._
import org.apache.spark.scheduler.JsonSerializable
import net.liftweb.json.JsonAST.JValue
import net.liftweb.json.DefaultFormats

/**
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
@@ -157,4 +159,13 @@ object StorageLevel {
storageLevelCache.putIfAbsent(level, level)
storageLevelCache.get(level)
}

def fromJson(json: JValue): StorageLevel = {
implicit val format = DefaultFormats
new StorageLevel(
(json \ "Use Disk").extract[Boolean],
(json \ "Use Memory").extract[Boolean],
(json \ "Deserialize").extract[Boolean],
(json \ "Replication").extract[Int])
}
}
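StorageLevel.fromJson reads back the same four fields that the existing toJson writes, so a level should survive the round trip. A sketch under that assumption (and assuming StorageLevel equality compares the flags and replication, which the cache above already relies on):

import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK_SER
// Expected to compare equal field-for-field; the cache interning above is not needed for equality.
assert(StorageLevel.fromJson(level.toJson) == level)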
21 changes: 18 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -24,6 +24,7 @@ import org.apache.spark.scheduler.JsonSerializable

import net.liftweb.json.JsonDSL._
import net.liftweb.json.JsonAST._
import net.liftweb.json.DefaultFormats

private[spark]
case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
@@ -54,13 +55,27 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
})
("Block Manager ID" -> blockManagerId.toJson) ~
("Maximum Memory" -> maxMem) ~
("Memory Used" -> memUsed) ~
("Memory Remaining" -> memRemaining) ~
("Disk Used" -> diskUsed) ~
("Blocks" -> blocksJson)
}
}

private[spark]
case object StorageStatus {
def fromJson(json: JValue): StorageStatus = {
implicit val format = DefaultFormats
val blocks = (json \ "Blocks").extract[List[JValue]].map { block =>
val id = BlockId.fromJson(block \ "Block ID")
val status = BlockStatus.fromJson(block \ "Status")
(id, status)
}.toMap
new StorageStatus(
BlockManagerId.fromJson(json \ "Block Manager ID"),
(json \ "Maximum Memory").extract[Long],
blocks
)
}
}

case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
extends Ordered[RDDInfo] {
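StorageStatus.fromJson is where the lower-level deserializers meet: it delegates to BlockManagerId.fromJson for the owner and to BlockId.fromJson and BlockStatus.fromJson for each entry under "Blocks". A hedged sketch of how a logged SparkListenerStorageStatusFetch payload could flow through it; the helper name and the one-line-per-event assumption are illustrative, not from the commit:

import net.liftweb.json._
import net.liftweb.json.JsonAST.JValue
import org.apache.spark.storage.StorageStatus

// Mirrors storageStatusFetchFromJson: pull the statuses out of one logged event payload.
def storageStatusesFrom(eventLogLine: String): Seq[StorageStatus] = {
  implicit val format = DefaultFormats
  val event = JsonParser.parse(eventLogLine)
  (event \ "Storage Status List").extract[List[JValue]].map(StorageStatus.fromJson)
}

// For example, the memory still free across all block managers in that snapshot.
def memoryRemaining(statuses: Seq[StorageStatus]): Long = statuses.map(_.memRemaining).sum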
22 changes: 18 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -896,7 +896,7 @@ private[spark] object Utils extends Logging {
}.getOrElse(JNothing)
}

/** Convert a Json object to a java Properties */
/** Convert a JSON object to a java Properties */
def propertiesFromJson(json: JValue): Properties = {
val properties = new Properties()
mapFromJson(json).map { case (k, v) =>
@@ -905,7 +905,21 @@
properties
}

/** Convert a java stack trace to a Json object */
/** Convert a java UUID to a JSON object */
def UUIDToJson(id: UUID): JValue = {
("Least Significant Bits" -> id.getLeastSignificantBits) ~
("Most Significant Bits" -> id.getMostSignificantBits)
}

/** Convert a JSON object to a java UUID */
def UUIDFromJson(json: JValue): UUID = {
implicit val format = DefaultFormats
new UUID(
(json \ "Least Significant Bits").extract[Long],
(json \ "Most Significant Bits").extract[Long])
}

/** Convert a java stack trace to a JSON object */
def stackTraceToJson(stackTrace: Array[StackTraceElement]): JValue = {
JArray(stackTrace.map { case line =>
("Declaring Class" -> line.getClassName) ~
@@ -927,13 +941,13 @@
}.toArray
}

/** Convert an Exception to a Json object */
/** Convert an Exception to a JSON object */
def exceptionToJson(exception: Exception): JValue = {
("Message" -> exception.toString) ~
("Stack Trace" -> stackTraceToJson(exception.getStackTrace))
}

/** Convert a Json object to an Exception */
/** Convert a JSON object to an Exception */
def exceptionFromJson(json: JValue): Exception = {
implicit val format = DefaultFormats
val e = new Exception((json \ "Message").extract[String])
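The new UUID helpers are symmetric, which is what lets the TempBlockId JSON support above rely on a lossless round trip. A quick sketch, not part of the commit:

import java.util.UUID
import org.apache.spark.util.Utils

val id = UUID.randomUUID()
// Both 64-bit halves are written out and read back, so the identifier is preserved exactly.
assert(Utils.UUIDFromJson(Utils.UUIDToJson(id)) == id)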
