Expose SparkListeners and relevant classes as DeveloperApi #648

Closed · wants to merge 2 commits
24 changes: 17 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,14 +19,18 @@ package org.apache.spark.storage

import java.util.UUID

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Identifies a particular Block of data, usually associated with a single file.
* A Block can be uniquely identified by its filename, but each type of Block has a different
* set of keys which produce its unique name.
*
* If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
*/
private[spark] sealed abstract class BlockId {
@DeveloperApi
sealed abstract class BlockId {
Contributor: Can we just expose BlockIds as strings instead of revealing this class?

Contributor Author: Do you mean exposing name and toString only?

Contributor: I just mean we should pass these around as Strings in the events. Would that be a big change?

Contributor: Maybe it makes sense to give users some more semantics about this... it might be good to leave it as is.

Contributor Author: It would be a fairly big change, since we technically don't need the BlockId classes anymore, as the strings are supposedly globally unique. I'm inclined to just leave it.

/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String

@@ -44,24 +48,29 @@ private[spark] sealed abstract class BlockId {
}
}

private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
def name = "rdd_" + rddId + "_" + splitIndex
}

private[spark] case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
extends BlockId {
def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

private[spark] case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
@DeveloperApi
case class TaskResultBlockId(taskId: Long) extends BlockId {
def name = "taskresult_" + taskId
}

private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
@DeveloperApi
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
def name = "input-" + streamId + "-" + uniqueId
}

@@ -75,7 +84,8 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
def name = "test_" + id
}

private[spark] object BlockId {
@DeveloperApi
object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
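For illustration of the round trip debated in the review thread above — a minimal sketch using `RDDBlockId` and the companion `BlockId.apply` from this diff (the parse behavior is assumed to follow the regexes shown):

```scala
import org.apache.spark.storage.{BlockId, RDDBlockId}

// A block's name doubles as its globally unique string form.
val id = RDDBlockId(rddId = 1, splitIndex = 0)
assert(id.name == "rdd_1_0")

// BlockId.apply parses a name back into a typed BlockId, which is why
// the class doc asks that new serializable subtypes be registered there.
val parsed: BlockId = BlockId("rdd_1_0")
assert(parsed == RDDBlockId(1, 0))
```

Keeping the typed classes rather than raw strings preserves this parsing step and the per-type fields such as `rddId` and `splitIndex`.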
core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -20,17 +20,20 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
* This class represents a unique identifier for a BlockManager.
* The first 2 constructors of this class are made private to ensure that
* BlockManagerId objects can be created only using the apply method in
* the companion object. This allows de-duplication of ID objects.
* Also, constructor parameters are private to ensure that parameters cannot
* be modified from outside this class.
*
* The first 2 constructors of this class are made private to ensure that BlockManagerId objects
* can be created only using the apply method in the companion object. This allows de-duplication
* of ID objects. Also, constructor parameters are private to ensure that parameters cannot be
* modified from outside this class.
*/
private[spark] class BlockManagerId private (
@DeveloperApi
class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int,
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -28,6 +28,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -411,7 +412,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}

private[spark] case class BlockStatus(
@DeveloperApi
case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
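With `BlockStatus` public, listener code can describe individual blocks. A small sketch (the `describeBlock` helper is ours; `storageLevel`, `memSize`, and `diskSize` are the fields shown above, and `StorageLevel.description` is assumed available as a readable summary of the flags):

```scala
import org.apache.spark.storage.BlockStatus

// Report where a block lives and how much space it occupies.
def describeBlock(status: BlockStatus): String =
  s"level=${status.storageLevel.description} " +
    s"mem=${status.memSize}B disk=${status.diskSize}B"
```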
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -22,14 +22,17 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory
* or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon, whether to
* keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
* multiple nodes.
*
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
*/
@DeveloperApi
class StorageLevel private(
private var useDisk_ : Boolean,
private var useMemory_ : Boolean,
@@ -54,9 +57,9 @@ class StorageLevel(
assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")

if (useOffHeap) {
require(useDisk == false, "Off-heap storage level does not support using disk")
require(useMemory == false, "Off-heap storage level does not support using heap memory")
require(deserialized == false, "Off-heap storage level does not support deserialized storage")
require(!useDisk, "Off-heap storage level does not support using disk")
require(!useMemory, "Off-heap storage level does not support using heap memory")
require(!deserialized, "Off-heap storage level does not support deserialized storage")
require(replication == 1, "Off-heap storage level does not support multiple replication")
}

@@ -146,7 +149,7 @@ object StorageLevel {

/**
* :: DeveloperApi ::
* Create a new StorageLevel object without setting useOffHeap
* Create a new StorageLevel object without setting useOffHeap.
*/
@DeveloperApi
def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
@@ -155,7 +158,7 @@

/**
* :: DeveloperApi ::
* Create a new StorageLevel object
* Create a new StorageLevel object.
*/
@DeveloperApi
def apply(useDisk: Boolean, useMemory: Boolean,
@@ -164,15 +167,15 @@

/**
* :: DeveloperApi ::
* Create a new StorageLevel object from its integer representation
* Create a new StorageLevel object from its integer representation.
*/
@DeveloperApi
def apply(flags: Int, replication: Int): StorageLevel =
getCachedStorageLevel(new StorageLevel(flags, replication))

/**
* :: DeveloperApi ::
* Read StorageLevel object from ObjectInput stream
* Read StorageLevel object from ObjectInput stream.
*/
@DeveloperApi
def apply(in: ObjectInput): StorageLevel = {
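To illustrate the factory usage described in the class comment — a minimal sketch, assuming the two-flag overload above completes with `deserialized` and `replication` parameters (its full signature is truncated in this hunk):

```scala
import org.apache.spark.storage.StorageLevel

// Roughly MEMORY_AND_DISK_2: spill to disk, keep in-memory data
// deserialized, and replicate each partition to two nodes.
val level = StorageLevel(
  useDisk = true,
  useMemory = true,
  deserialized = true,
  replication = 2)

// someRdd.persist(level)  // hypothetical RDD usage
```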
core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,12 +19,15 @@ package org.apache.spark.storage

import scala.collection.mutable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._

/**
* A SparkListener that maintains executor storage status
* :: DeveloperApi ::
* A SparkListener that maintains executor storage status.
*/
private[spark] class StorageStatusListener extends SparkListener {
@DeveloperApi
class StorageStatusListener extends SparkListener {
private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()

def storageStatusList = executorIdToStorageStatus.values.toSeq
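Once public, the listener can be used outside the UI to track per-executor storage. A sketch, assuming a running `SparkContext` named `sc` and the usual public accessors (e.g. `executorId`) for the private fields shown in this diff:

```scala
import org.apache.spark.storage.StorageStatusListener

// Register the listener so the listener bus feeds it
// block manager events, just like the UI's own instance.
val storageStatusListener = new StorageStatusListener
sc.addSparkListener(storageStatusListener)

// Later, e.g. after caching some RDDs:
storageStatusListener.storageStatusList.foreach { status =>
  println(s"${status.blockManagerId.executorId}: " +
    s"${status.blocks.size} blocks, maxMem=${status.maxMem}")
}
```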
core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -21,9 +21,14 @@ import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi

/** Storage information for each BlockManager. */
private[spark] class StorageStatus(
/**
* :: DeveloperApi ::
* Storage information for each BlockManager.
*/
@DeveloperApi
class StorageStatus(
val blockManagerId: BlockManagerId,
val maxMem: Long,
val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -17,6 +17,7 @@

package org.apache.spark.ui.env

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.ui._

@@ -30,9 +31,11 @@ private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "envi
}

/**
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the EnvironmentTab
*/
private[ui] class EnvironmentListener extends SparkListener {
@DeveloperApi
class EnvironmentListener extends SparkListener {
var jvmInformation = Seq[(String, String)]()
var sparkProperties = Seq[(String, String)]()
var systemProperties = Seq[(String, String)]()
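For example — a sketch assuming a running `SparkContext` named `sc`; the three sequences are the fields shown above, populated when the environment-update event arrives:

```scala
import org.apache.spark.ui.env.EnvironmentListener

val envListener = new EnvironmentListener
sc.addSparkListener(envListener)

// Filled in once an environment update event has been processed.
envListener.sparkProperties.foreach { case (key, value) =>
  println(s"$key = $value")
}
```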
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui.exec
import scala.collection.mutable.HashMap

import org.apache.spark.ExceptionFailure
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.{SparkUI, WebUITab}
@@ -34,9 +35,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
}

/**
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the ExecutorsTab
*/
private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
@DeveloperApi
class ExecutorsListener(storageStatusListener: StorageStatusListener)
extends SparkListener {

val executorToTasksActive = HashMap[String, Int]()
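Note the constructor dependency: `ExecutorsListener` reads executor state through a `StorageStatusListener`, so both belong on the bus. A sketch, again assuming a `SparkContext` named `sc`:

```scala
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.exec.ExecutorsListener

// ExecutorsListener derives its view from a StorageStatusListener,
// so register both; each receives events independently.
val storageStatus = new StorageStatusListener
val executors = new ExecutorsListener(storageStatus)
sc.addSparkListener(storageStatus)
sc.addSparkListener(executors)

// executorToTasksActive is one of the HashMaps shown above.
executors.executorToTasksActive.foreach { case (execId, n) =>
  println(s"$execId: $n active tasks")
}
```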
core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -17,8 +17,14 @@

package org.apache.spark.ui.jobs

/** class for reporting aggregated metrics for each executors in stageUI */
private[ui] class ExecutorSummary {
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Class for reporting aggregated metrics for each executor in stage UI.
*/
@DeveloperApi
class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -20,19 +20,22 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable.{HashMap, ListBuffer}

import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId

/**
* :: DeveloperApi ::
* Tracks task-level information to be displayed in the UI.
*
* All access to the data structures in this class must be synchronized on the
* class, since the UI thread and the EventBus loop may otherwise be reading and
* updating the internal data structures concurrently.
*/
private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
@DeveloperApi
class JobProgressListener(conf: SparkConf) extends SparkListener {

import JobProgressListener._

@@ -246,7 +249,7 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {

}

private[ui] case class TaskUIData(
@DeveloperApi
case class TaskUIData(
taskInfo: TaskInfo,
taskMetrics: Option[TaskMetrics] = None,
exception: Option[ExceptionFailure] = None)
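The same pattern extends to user-defined listeners, which is the point of this PR: with the event and metrics classes exposed, third-party code can aggregate task data much as `JobProgressListener` does. A minimal sketch (the class and its bookkeeping are ours, not part of the diff; `TaskInfo.duration` is assumed available):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulates total task time, synchronizing for the same reason
// JobProgressListener does: the event bus loop and external readers
// may otherwise touch this state concurrently.
class TaskTimeListener extends SparkListener {
  private var totalTaskTime = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    totalTaskTime += taskEnd.taskInfo.duration
  }

  def total: Long = synchronized { totalTaskTime }
}

// sc.addSparkListener(new TaskTimeListener)
```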
core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.storage

import scala.collection.mutable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ui._
import org.apache.spark.scheduler._
import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
@@ -35,9 +36,11 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage
}

/**
* A SparkListener that prepares information to be displayed on the BlockManagerUI
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the BlockManagerUI.
*/
private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
@DeveloperApi
class StorageListener(storageStatusListener: StorageStatusListener)
extends SparkListener {

private val _rddInfoMap = mutable.Map[Int, RDDInfo]()