Embed storage status and RDD info in Task events
This commit achieves three main things. First and foremost, it embeds the information
from the SparkListenerFetchStorageStatus and SparkListenerGetRDDInfo events into events
that are more descriptive of the SparkListenerInterface. In particular, every Task now
maintains a list of blocks whose storage statuses have been updated as a result of the task.
Previously, this information was retrieved by fetching storage status from the driver,
an action arbitrarily associated with a stage. This change involves keeping track of
which blocks are dropped during each call to an RDD persist. A big TODO is to also capture
the behavior of an RDD unpersist in a SparkListenerEvent.
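
As a rough illustration of how a consumer might use the new per-task block information, here
is a minimal SparkListener sketch that reads the updatedBlocks field this commit adds to
TaskMetrics. The listener class is hypothetical, and the exact fields assumed on
SparkListenerTaskEnd (taskMetrics) and BlockStatus (memSize, diskSize) may differ in this branch.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical consumer of TaskMetrics.updatedBlocks; field names on
// SparkListenerTaskEnd and BlockStatus are assumed for illustration.
class BlockUpdateLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val updatedBlocks = Option(taskEnd.taskMetrics).flatMap(_.updatedBlocks).getOrElse(Seq())
    updatedBlocks.foreach { case (blockId, status) =>
      println("Task updated block %s (memory: %d bytes, disk: %d bytes)"
        .format(blockId, status.memSize, status.diskSize))
    }
  }
}

Such a listener would be registered with sc.addSparkListener, as in the SparkContext diff below.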

Second, the SparkListenerEvent interface now handles the dynamic nature of Executors.
In particular, a new event, SparkListenerExecutorStateChange, is introduced, which triggers
a storage status fetch from the driver. This mainly serves to decouple the driver-side
storage status fetch from any particular Stage. Note that storage status is not ready until
the remote BlockManagers have been registered, so this involves attaching a registration
listener to the BlockManagerMasterActor.
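
The wiring for this appears in the SparkEnv diff below. As a rough sketch of the idea only
(the real BlockManagerRegistrationListener in this branch may look different), the listener
amounts to a small callback registry that the BlockManagerMasterActor notifies once a remote
BlockManager has registered, at which point a storage status fetch from the driver becomes safe:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.BlockManagerId

// Sketch only; the actual BlockManagerRegistrationListener in this commit may differ.
class BlockManagerRegistrationListener {
  private val callbacks = new ArrayBuffer[BlockManagerId => Unit]

  // Register a callback, e.g. one that triggers a storage status fetch from the driver.
  def withCallback(callback: BlockManagerId => Unit) {
    callbacks.synchronized { callbacks += callback }
  }

  // Invoked by BlockManagerMasterActor when a remote BlockManager registers.
  def onBlockManagerRegister(id: BlockManagerId) {
    callbacks.synchronized { callbacks.foreach(_(id)) }
  }
}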

Third, changes in environment properties are now supported. This accounts for the fact that
the user can invoke sc.addFile and sc.addJar in their own application, which should be
reflected appropriately on the EnvironmentUI. In the previous implementation, coupling this
information with application start prevented this from happening.
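
Concretely, addFile, addJar, clearFiles, and clearJars now call a private
updateEnvironmentProperties() that posts a SparkListenerEnvironmentUpdate carrying a
Map[String, Seq[(String, String)]] of environment details (see the SparkContext diff below).
A hedged sketch of consuming that payload follows; the helper object is illustrative, the
"Added By User" tag comes from the SparkEnv diff, and the "Classpath Entries" section name
is assumed.

// Illustrative helper, not part of this commit: prints the user-added classpath
// entries carried by an environment update. The "Classpath Entries" key is assumed.
object EnvironmentUpdateExample {
  def handleEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]]) {
    val classPaths = environmentDetails.getOrElse("Classpath Entries", Seq())
    classPaths.collect { case (path, "Added By User") => path }
      .foreach(path => println("User-added jar/file: " + path))
  }
}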

Other relatively minor changes include: 1) Refactoring BlockStatus and BlockManagerInfo to
not be a part of the BlockManagerMasterActor object, 2) Formatting changes, especially those
involving multi-line arguments, and 3) Making all UI widgets and listeners private[ui] instead
of private[spark].
andrewor14 committed Feb 26, 2014
1 parent 6631c02 commit bbe3501
Showing 36 changed files with 754 additions and 544 deletions.
16 changes: 12 additions & 4 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -18,13 +18,15 @@
package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.storage.{BlockManager, StorageLevel, RDDBlockId}
import org.apache.spark.rdd.RDD


/** Spark class responsible for passing RDDs split contents to the BlockManager and making
sure a node doesn't load two copies of an RDD at once.
*/
/**
* Spark class responsible for passing RDDs split contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD splits that are being computed/loaded. */
@@ -69,11 +71,17 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// If we got here, we have to load the split
logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)

// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
val updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)

// Update task metrics to include any updated blocks
val metrics = context.taskMetrics
metrics.updatedBlocks = Some(updatedBlocks ++ metrics.updatedBlocks.getOrElse(Seq()))

elements.iterator.asInstanceOf[Iterator[T]]
} finally {
loading.synchronized {
49 changes: 37 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -161,7 +161,7 @@ class SparkContext(

// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
jars.foreach { jar => addJar(jar, updateEnvironment = false) }
}

private[spark] val executorMemory = conf.getOption("spark.executor.memory")
@@ -204,14 +204,14 @@
taskScheduler.start()

@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()

// Start the UI before the DAG scheduler, because the UI listens for Spark events
ui.start()

// Trigger application start
val environmentDetails = SparkEnv.environmentDetails(this)
val applicationStart = new SparkListenerApplicationStart(environmentDetails)
dagScheduler.post(applicationStart)
dagScheduler.start()
dagScheduler.post(new SparkListenerApplicationStart(appName))

updateEnvironmentProperties()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
@@ -631,7 +631,7 @@ class SparkContext(
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(path)` to find its download location.
*/
def addFile(path: String) {
def addFile(path: String, updateEnvironment: Boolean = true) {
val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
@@ -644,6 +644,9 @@
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
if (updateEnvironment) {
updateEnvironmentProperties()
}
}

def addSparkListener(listener: SparkListener) {
@@ -711,8 +714,11 @@
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
def clearFiles() {
def clearFiles(updateEnvironment: Boolean = true) {
addedFiles.clear()
if (updateEnvironment) {
updateEnvironmentProperties()
}
}

/**
@@ -730,7 +736,7 @@
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
def addJar(path: String, updateEnvironment: Boolean = true) {
if (path == null) {
logWarning("null specified as parameter to addJar")
} else {
@@ -774,14 +780,20 @@
logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
}
}
if (updateEnvironment) {
updateEnvironmentProperties()
}
}

/**
* Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
* any new nodes.
*/
def clearJars() {
def clearJars(updateEnvironment: Boolean = true) {
addedJars.clear()
if (updateEnvironment) {
updateEnvironmentProperties()
}
}

/** Shut down the SparkContext. */
@@ -798,8 +810,8 @@
// TODO: Cache.stop()?
env.stop()
// Clean up locally linked files
clearFiles()
clearJars()
clearFiles(updateEnvironment = false)
clearJars(updateEnvironment = false)
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
@@ -1022,6 +1034,19 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/** Update environment properties and post the corresponding event to the DAG scheduler */
private def updateEnvironmentProperties() {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val environmentDetails =
SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
val environmentUpdate = new SparkListenerEnvironmentUpdate(environmentDetails)

// In case the DAG scheduler is not ready yet, first check whether its reference is valid
Option(dagScheduler).foreach(_.post(environmentUpdate))
}

/** Called by MetadataCleaner to clean up the persistentRdds map periodically */
private[spark] def cleanup(cleanupTime: Long) {
persistentRdds.clearOldValues(cleanupTime)
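
A usage note on the new updateEnvironment flag introduced above: callers that add several
dependencies in a row can suppress the intermediate environment updates and let the final
call post a single event. A minimal sketch, with illustrative paths:

import org.apache.spark.SparkContext

object AddDependenciesExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "env-update-example")
    // Suppress intermediate updates while adding dependencies in bulk...
    sc.addJar("/tmp/dep-a.jar", updateEnvironment = false)
    sc.addJar("/tmp/dep-b.jar", updateEnvironment = false)
    // ...and let the last call post a single SparkListenerEnvironmentUpdate.
    sc.addFile("/tmp/lookup-table.txt")
    sc.stop()
  }
}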
48 changes: 29 additions & 19 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -24,15 +24,15 @@ import scala.util.Properties

import akka.actor._

import com.google.common.collect.MapMaker

import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
import org.apache.spark.network.ConnectionManager
import org.apache.spark.serializer.{Serializer, SerializerManager}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.api.python.PythonWorkerFactory

import com.google.common.collect.MapMaker

/**
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -167,9 +167,18 @@ object SparkEnv extends Logging {
}
}

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf)), conf)
// Listen for block manager registration
val blockManagerListener = new BlockManagerRegistrationListener
lazy val blockManagerMasterActor = {
val actor = new BlockManagerMasterActor(isLocal, conf)
actor.registerListener(blockManagerListener)
actor
}

val blockManagerMaster =
new BlockManagerMaster(registerOrLookup("BlockManagerMaster", blockManagerMasterActor), conf)
blockManagerMaster.registrationListener = Some(blockManagerListener)

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf)

@@ -243,23 +252,25 @@
* attributes as a sequence of KV pairs.
*/
private[spark]
def environmentDetails(sc: SparkContext): Map[String, Seq[(String, String)]] = {
def environmentDetails(
conf: SparkConf,
schedulingMode: String,
addedJars: Seq[String],
addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {

val jvmInformation = Seq(
("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
("Java Home", Properties.javaHome),
("Scala Version", Properties.versionString),
("Scala Home", Properties.scalaHome)
).sorted

// Spark properties, including scheduling mode and app name whether or not they are configured
// Spark properties, including scheduling mode whether or not it is configured
var additionalFields = Seq[(String, String)]()
sc.conf.getOption("spark.scheduler.mode").getOrElse {
additionalFields ++= Seq(("spark.scheduler.mode", sc.getSchedulingMode.toString))
}
sc.conf.getOption("spark.app.name").getOrElse {
additionalFields ++= Seq(("spark.app.name", sc.appName))
conf.getOption("spark.scheduler.mode").getOrElse {
additionalFields ++= Seq(("spark.scheduler.mode", schedulingMode))
}
val sparkProperties = sc.conf.getAll.sorted ++ additionalFields
val sparkProperties = conf.getAll.sorted ++ additionalFields

val systemProperties = System.getProperties.iterator.toSeq
val classPathProperty = systemProperties.find { case (k, v) =>
@@ -273,12 +284,11 @@

// Class paths including all added jars and files
val classPathEntries = classPathProperty._2
.split(sc.conf.get("path.separator", ":"))
.split(conf.get("path.separator", ":"))
.filterNot(e => e.isEmpty)
.map(e => (e, "System Classpath"))
val addedJars = sc.addedJars.iterator.toSeq.map{ case (path, _) => (path, "Added By User") }
val addedFiles = sc.addedFiles.iterator.toSeq.map{ case (path, _) => (path, "Added By User") }
val classPaths = (addedJars ++ addedFiles ++ classPathEntries).sorted
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

Map[String, Seq[(String, String)]](
"JVM Information" -> jvmInformation,
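
For reference, a sketch of calling the reworked SparkEnv.environmentDetails with its new
explicit arguments. It is private[spark], so this only compiles from inside the
org.apache.spark package; the argument values are illustrative.

package org.apache.spark

// Sketch only: environmentDetails is private[spark], so this caller lives in its package.
object EnvironmentDetailsExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("env-details-example")
    val details = SparkEnv.environmentDetails(
      conf,
      schedulingMode = "FIFO",
      addedJars = Seq("/tmp/dep-a.jar"),
      addedFiles = Seq("/tmp/lookup-table.txt"))
    details.foreach { case (section, entries) =>
      println(section)
      entries.foreach { case (name, value) => println("  " + name + " = " + value) }
    }
  }
}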
@@ -31,7 +31,6 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -17,6 +17,8 @@

package org.apache.spark.executor

import org.apache.spark.storage.{BlockId, BlockStatus}

class TaskMetrics extends Serializable {
/**
* Host's name the task runs on
@@ -68,6 +70,11 @@
* here
*/
var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None

/**
* If blocks have been updated as a result of this task, collect the statuses of these blocks here
*/
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
}

object TaskMetrics {