From a5f765cabb9f8f7f465ad2cd3d5a64d9ad165928 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 5 May 2014 18:32:14 -0700
Subject: [PATCH] Expose SparkListeners and relevant classes as DeveloperApi

Hopefully this can go into 1.0, as a few people on the user list have asked for this.

Author: Andrew Or

Closes #648 from andrewor14/expose-listeners and squashes the following commits:

e45e1ef [Andrew Or] Add missing colons (minor)
350d643 [Andrew Or] Expose SparkListeners and relevant classes as DeveloperApi

(cherry picked from commit ea10b3126167af3f50f7c2a70e1d942e839fcb66)
Signed-off-by: Patrick Wendell
---
 .../org/apache/spark/storage/BlockId.scala    | 24 +++++++++++++------
 .../apache/spark/storage/BlockManagerId.scala | 15 +++++++-----
 .../storage/BlockManagerMasterActor.scala     |  4 +++-
 .../apache/spark/storage/StorageLevel.scala   | 17 +++++++------
 .../spark/storage/StorageStatusListener.scala |  7 ++++--
 .../apache/spark/storage/StorageUtils.scala   |  9 +++++--
 .../apache/spark/ui/env/EnvironmentTab.scala  |  5 +++-
 .../apache/spark/ui/exec/ExecutorsTab.scala   |  5 +++-
 .../spark/ui/jobs/ExecutorSummary.scala       | 10 ++++++--
 .../spark/ui/jobs/JobProgressListener.scala   |  8 +++++--
 .../apache/spark/ui/storage/StorageTab.scala  |  7 ++++--
 11 files changed, 78 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index cffea28fbf794..42ec181b00bb3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,14 +19,18 @@ package org.apache.spark.storage
 
 import java.util.UUID
 
+import org.apache.spark.annotation.DeveloperApi
+
 /**
+ * :: DeveloperApi ::
  * Identifies a particular Block of data, usually associated with a single file.
  * A Block can be uniquely identified by its filename, but each type of Block has a different
  * set of keys which produce its unique name.
  *
  * If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
  */
-private[spark] sealed abstract class BlockId {
+@DeveloperApi
+sealed abstract class BlockId {
   /** A globally unique identifier for this Block. Can be used for ser/de. */
   def name: String
 
@@ -44,24 +48,29 @@ private[spark] sealed abstract class BlockId {
   }
 }
 
-private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
+@DeveloperApi
+case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
   def name = "rdd_" + rddId + "_" + splitIndex
 }
 
-private[spark] case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
+@DeveloperApi
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
   extends BlockId {
   def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
 }
 
-private[spark] case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
+@DeveloperApi
+case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
   def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
 }
 
-private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
+@DeveloperApi
+case class TaskResultBlockId(taskId: Long) extends BlockId {
   def name = "taskresult_" + taskId
 }
 
-private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
+@DeveloperApi
+case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
   def name = "input-" + streamId + "-" + uniqueId
 }
 
@@ -75,7 +84,8 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
   def name = "test_" + id
 }
 
-private[spark] object BlockId {
+@DeveloperApi
+object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
   val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
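
With BlockId and its case subclasses public, user code can parse the block names that appear in logs and in the storage UI via the BlockId.apply() factory referenced in the scaladoc above. A minimal sketch of that use; the object name here is illustrative, not part of the patch:

  import org.apache.spark.storage.{BlockId, RDDBlockId, ShuffleBlockId}

  object BlockIdExample {
    def main(args: Array[String]) {
      // "rdd_2_3" follows the RDD pattern "rdd_([0-9]+)_([0-9]+)" from the companion object
      BlockId("rdd_2_3") match {
        case RDDBlockId(rddId, splitIndex) =>
          println("RDD " + rddId + ", partition " + splitIndex)
        case ShuffleBlockId(shuffleId, mapId, reduceId) =>
          println("shuffle " + shuffleId + ", map " + mapId + ", reduce " + reduceId)
        case other =>
          println("other block: " + other.name)
      }
    }
  }
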
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index be537d77309bc..b1585bd8199d1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -20,17 +20,20 @@ package org.apache.spark.storage
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import java.util.concurrent.ConcurrentHashMap
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 /**
+ * :: DeveloperApi ::
  * This class represent an unique identifier for a BlockManager.
- * The first 2 constructors of this class is made private to ensure that
- * BlockManagerId objects can be created only using the apply method in
- * the companion object. This allows de-duplication of ID objects.
- * Also, constructor parameters are private to ensure that parameters cannot
- * be modified from outside this class.
+ *
+ * The first 2 constructors of this class are made private to ensure that BlockManagerId objects
+ * can be created only using the apply method in the companion object. This allows de-duplication
+ * of ID objects. Also, constructor parameters are private to ensure that parameters cannot be
+ * modified from outside this class.
  */
-private[spark] class BlockManagerId private (
+@DeveloperApi
+class BlockManagerId private (
     private var executorId_ : String,
     private var host_ : String,
     private var port_ : Int,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 63fa5d3eb6541..98fa0df6ec289 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -28,6 +28,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
 import akka.pattern.ask
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{AkkaUtils, Utils}
@@ -411,7 +412,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }
 }
 
-private[spark] case class BlockStatus(
+@DeveloperApi
+case class BlockStatus(
     storageLevel: StorageLevel,
     memSize: Long,
     diskSize: Long,
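
The scaladoc above describes the design: private constructors force all construction through the companion object's apply, which can then hand back one canonical instance per distinct ID. A self-contained sketch of that interning pattern; CachedId is an illustrative stand-in, not Spark's actual implementation:

  import java.util.concurrent.ConcurrentHashMap

  class CachedId private (val executorId: String, val host: String, val port: Int)

  object CachedId {
    private val cache = new ConcurrentHashMap[(String, String, Int), CachedId]()

    // Return one canonical instance per distinct (executorId, host, port) triple,
    // so repeated deserialization does not accumulate duplicate ID objects.
    def apply(executorId: String, host: String, port: Int): CachedId = {
      val id = new CachedId(executorId, host, port)
      val existing = cache.putIfAbsent((executorId, host, port), id)
      if (existing != null) existing else id
    }
  }
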
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index c9a52e0366d93..363de93e067b8 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -22,14 +22,17 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 
 import org.apache.spark.annotation.DeveloperApi
 
 /**
+ * :: DeveloperApi ::
  * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
  * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to
  * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
  * multiple nodes.
+ *
  * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
  * for commonly useful storage levels. To create your own storage level object, use the
  * factory method of the singleton object (`StorageLevel(...)`).
  */
+@DeveloperApi
 class StorageLevel private(
     private var useDisk_ : Boolean,
     private var useMemory_ : Boolean,
@@ -54,9 +57,9 @@ class StorageLevel private(
   assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
 
   if (useOffHeap) {
-    require(useDisk == false, "Off-heap storage level does not support using disk")
-    require(useMemory == false, "Off-heap storage level does not support using heap memory")
-    require(deserialized == false, "Off-heap storage level does not support deserialized storage")
+    require(!useDisk, "Off-heap storage level does not support using disk")
+    require(!useMemory, "Off-heap storage level does not support using heap memory")
+    require(!deserialized, "Off-heap storage level does not support deserialized storage")
     require(replication == 1, "Off-heap storage level does not support multiple replication")
   }
 
@@ -146,7 +149,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object without setting useOffHeap
+   * Create a new StorageLevel object without setting useOffHeap.
    */
   @DeveloperApi
   def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
@@ -155,7 +158,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object
+   * Create a new StorageLevel object.
    */
   @DeveloperApi
   def apply(useDisk: Boolean, useMemory: Boolean,
@@ -164,7 +167,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object from its integer representation
+   * Create a new StorageLevel object from its integer representation.
    */
   @DeveloperApi
   def apply(flags: Int, replication: Int): StorageLevel =
@@ -172,7 +175,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Read StorageLevel object from ObjectInput stream
+   * Read StorageLevel object from ObjectInput stream.
   */
   @DeveloperApi
   def apply(in: ObjectInput): StorageLevel = {
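
With the factory methods tagged DeveloperApi, applications can persist RDDs at levels beyond the predefined constants, as the scaladoc suggests. A sketch, assuming the first factory's full signature is apply(useDisk, useMemory, useOffHeap, deserialized, replication), of which the hunk above shows only the first line; the object name is illustrative:

  import org.apache.spark.SparkContext
  import org.apache.spark.storage.StorageLevel

  object CustomStorageLevelExample {
    def main(args: Array[String]) {
      val sc = new SparkContext("local", "custom-storage-level")
      // Disk + memory, serialized in memory, replicated twice
      val diskMemSer2 = StorageLevel(true, true, false, false, 2)
      val rdd = sc.parallelize(1 to 1000).persist(diskMemSer2)
      println(rdd.count())
      sc.stop()
    }
  }
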
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 7a174959037be..a6e6627d54e01 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,12 +19,15 @@ package org.apache.spark.storage
 
 import scala.collection.mutable
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 
 /**
- * A SparkListener that maintains executor storage status
+ * :: DeveloperApi ::
+ * A SparkListener that maintains executor storage status.
  */
-private[spark] class StorageStatusListener extends SparkListener {
+@DeveloperApi
+class StorageStatusListener extends SparkListener {
   private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
 
   def storageStatusList = executorIdToStorageStatus.values.toSeq
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 1eddd1cdc483b..6f3252a2f6d31 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -21,9 +21,14 @@ import scala.collection.Map
 import scala.collection.mutable
 
 import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
 
-/** Storage information for each BlockManager. */
-private[spark] class StorageStatus(
+/**
+ * :: DeveloperApi ::
+ * Storage information for each BlockManager.
+ */
+@DeveloperApi
+class StorageStatus(
     val blockManagerId: BlockManagerId,
     val maxMem: Long,
     val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
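
Exposing StorageStatusListener and StorageStatus lets an application observe per-executor storage directly instead of scraping the web UI. A sketch, assuming SparkContext.addSparkListener for registration; the object name is illustrative, and storageStatusList, blockManagerId, maxMem and blocks all come from the classes above:

  import org.apache.spark.SparkContext
  import org.apache.spark.storage.StorageStatusListener

  object StorageStatusExample {
    def main(args: Array[String]) {
      val sc = new SparkContext("local", "storage-status")
      val listener = new StorageStatusListener
      sc.addSparkListener(listener)

      sc.parallelize(1 to 10000).cache().count()

      // Print whatever storage status the listener has captured so far
      listener.storageStatusList.foreach { status =>
        println(status.blockManagerId.executorId + ": maxMem=" + status.maxMem +
          ", blocks=" + status.blocks.size)
      }
      sc.stop()
    }
  }
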
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
index 03b46e1bd59af..bbbe55ecf44a1 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ui.env
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.ui._
 
@@ -30,9 +31,11 @@ private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "envi
 }
 
 /**
+ * :: DeveloperApi ::
  * A SparkListener that prepares information to be displayed on the EnvironmentTab
  */
-private[ui] class EnvironmentListener extends SparkListener {
+@DeveloperApi
+class EnvironmentListener extends SparkListener {
   var jvmInformation = Seq[(String, String)]()
   var sparkProperties = Seq[(String, String)]()
   var systemProperties = Seq[(String, String)]()
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 5678bf34ac730..91d37b835b19d 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui.exec
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.ExceptionFailure
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.{SparkUI, WebUITab}
@@ -34,9 +35,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
 }
 
 /**
+ * :: DeveloperApi ::
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+@DeveloperApi
+class ExecutorsListener(storageStatusListener: StorageStatusListener)
   extends SparkListener {
 
   val executorToTasksActive = HashMap[String, Int]()
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 1dfe1d4f1fa11..2aaf6329b792d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -17,8 +17,14 @@
 
 package org.apache.spark.ui.jobs
 
-/** class for reporting aggregated metrics for each executors in stageUI */
-private[ui] class ExecutorSummary {
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Class for reporting aggregated metrics for each executor in stage UI.
+ */
+@DeveloperApi
+class ExecutorSummary {
   var taskTime : Long = 0
   var failedTasks : Int = 0
   var succeededTasks : Int = 0
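
ExecutorSummary is a plain mutable record, so once public it can back a custom per-executor aggregation outside the UI. A sketch driven by task-end events; the listener class name is hypothetical, while onTaskEnd, TaskInfo.executorId, TaskInfo.duration and the Success end reason are existing Spark APIs of this era:

  import scala.collection.mutable.HashMap
  import org.apache.spark.Success
  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
  import org.apache.spark.ui.jobs.ExecutorSummary

  class ExecutorMetricsListener extends SparkListener {
    val summaries = HashMap[String, ExecutorSummary]()

    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
      val info = taskEnd.taskInfo
      if (info != null) {
        // Accumulate task time and success/failure counts per executor
        val summary = summaries.getOrElseUpdate(info.executorId, new ExecutorSummary)
        summary.taskTime += info.duration
        if (taskEnd.reason == Success) {
          summary.succeededTasks += 1
        } else {
          summary.failedTasks += 1
        }
      }
    }
  }
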
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 0db4afa701b41..396cbcbc8d268 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -20,19 +20,22 @@ package org.apache.spark.ui.jobs
 import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 
 /**
+ * :: DeveloperApi ::
  * Tracks task-level information to be displayed in the UI.
  *
  * All access to the data structures in this class must be synchronized on the
  * class, since the UI thread and the EventBus loop may otherwise be reading and
  * updating the internal data structures concurrently.
  */
-private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
+@DeveloperApi
+class JobProgressListener(conf: SparkConf) extends SparkListener {
 
   import JobProgressListener._
 
@@ -246,7 +249,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
   }
 }
 
-private[ui] case class TaskUIData(
+@DeveloperApi
+case class TaskUIData(
     taskInfo: TaskInfo,
     taskMetrics: Option[TaskMetrics] = None,
     exception: Option[ExceptionFailure] = None)
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 07ec297841f20..c4bb7aab50393 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.storage
 
 import scala.collection.mutable
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.ui._
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
@@ -35,9 +36,11 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
 }
 
 /**
- * A SparkListener that prepares information to be displayed on the BlockManagerUI
+ * :: DeveloperApi ::
+ * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  */
-private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
+@DeveloperApi
+class StorageListener(storageStatusListener: StorageStatusListener)
   extends SparkListener {
 
   private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
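
Taken together, the point of the patch is that SparkListener implementations like the ones above can now be written and registered from outside the spark packages. An end-to-end sketch; the object name is illustrative, while addSparkListener and the stage-completed event are existing APIs:

  import org.apache.spark.SparkContext
  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

  object CustomListenerExample {
    def main(args: Array[String]) {
      val sc = new SparkContext("local", "custom-listener")
      sc.addSparkListener(new SparkListener {
        override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
          val info = stageCompleted.stageInfo
          println("Stage " + info.stageId + " (" + info.name + ") completed")
        }
      })
      sc.parallelize(1 to 100).map(_ * 2).count()
      sc.stop()
    }
  }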