SPARK-1205: Clean up callSite/origin/generator.
This patch removes the `generator` field and simplifies + documents
the tracking of callsites.

There are two places where we care about call sites: when a job is
run and when an RDD is created. This patch retains both of those
features but does a slight refactoring and renaming to make things
less confusing.
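To illustrate the job-side tracking, here is a minimal sketch of the
call-site override (assuming a live `sc: SparkContext`; the label and
path are invented; `setCallSite`/`clearCallSite` are the SparkContext
helpers backed by the "externalCallSite" property seen in the diff below):

    // Override the call site reported for jobs/RDDs created next:
    sc.setCallSite("MyApp: load events")
    val events = sc.textFile("hdfs:///data/events")  // reported as "MyApp: load events"
    sc.clearCallSite()  // back to automatic capture from the user's stack frame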

There was another feature of an RDD called the `generator`, which was
by default the user class in which the RDD was created. This is
used exclusively in the JobLogger. It has been subsumed by the ability
to name a job group. The job logger can later be refactored to
read the job group directly (that will require some work), but for now
this just preserves the default logged value of the user class.
I'm not sure any users ever used the ability to override this.
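As a usage sketch of the job-group naming that subsumes `generator`
(group id and description invented; `setJobGroup`/`clearJobGroup` are the
public SparkContext methods referred to above):

    // Jobs submitted after this call are tagged with the group:
    sc.setJobGroup("nightly-etl", "Nightly ETL over the events table")
    events.count()      // surfaced under "nightly-etl" instead of a per-RDD generator
    sc.clearJobGroup()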

Author: Patrick Wendell <[email protected]>

Closes apache#106 from pwendell/callsite and squashes the following commits:

fc1d009 [Patrick Wendell] Compile fix
e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite
62e77ef [Patrick Wendell] Review feedback
576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator.
pwendell committed Mar 10, 2014
1 parent a59419c commit 2a51617
Showing 8 changed files with 16 additions and 38 deletions.
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -832,13 +832,12 @@ class SparkContext(
     setLocalProperty("externalCallSite", null)
   }
 
+  /**
+   * Capture the current user callsite and return a formatted version for printing. If the user
+   * has overridden the call site, this will return the user's version.
+   */
   private[spark] def getCallSite(): String = {
-    val callSite = getLocalProperty("externalCallSite")
-    if (callSite == null) {
-      Utils.formatSparkCallSite
-    } else {
-      callSite
-    }
+    Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
   }
 
   /**
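The rewritten `getCallSite` folds the old null check into Scala's `Option`
factory; a self-contained illustration of the idiom (values invented):

    // Option(x) is None when x is null and Some(x) otherwise, so
    // Option(prop).getOrElse(fallback) replaces the explicit null branch.
    val prop: String = null
    val site = Option(prop).getOrElse("textFile at MyApp.scala:42")
    // site == "textFile at MyApp.scala:42"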
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
 
-  def generator: String = rdd.generator
-
   override def toString = rdd.toString
 
   /** Assign a name to this RDD */
5 changes: 0 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList}
 
-import scala.Tuple2
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   def name(): String = rdd.name
 
-  /** Reset generator */
-  def setGenerator(_generator: String) = {
-    rdd.setGenerator(_generator)
-  }
 }
18 changes: 4 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
     this
   }
 
-  /** User-defined generator of this RDD*/
-  @transient var generator = Utils.getCallSiteInfo.firstUserClass
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    generator = _generator
-  }
-
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,9 @@
 
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
-  /** Record user function generating this RDD. */
-  @transient private[spark] val origin = sc.getCallSite()
+  /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
+  @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
+  private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo)
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
@@ -1095,10 +1088,7 @@
   }
 
   override def toString: String = "%s%s[%d] at %s".format(
-    Option(name).map(_ + " ").getOrElse(""),
-    getClass.getSimpleName,
-    id,
-    origin)
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
 
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
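With `getCreationSite` now feeding `toString`, an RDD renders its optional
name, class, id, and creation site on one line; an illustrative example
(file name and line number invented):

    val events = sc.textFile("hdfs:///data/events").setName("events")
    // println(events) prints something like:
    //   events MappedRDD[1] at textFile at MyApp.scala:12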
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
     } else {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
-      logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
       mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
     }
     stage
10 changes: 3 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
    * @param indent Indent number before info
    */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
     val rddInfo =
-      if (rdd.getStorageLevel != StorageLevel.NONE) {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
-          rdd.origin + " " + rdd.generator
-      } else {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
-          rdd.origin + " " + rdd.generator
-      }
+      s"RDD_ID=${rdd.id} ${getRddName(rdd)} $cacheStr " +
+      s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
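One subtlety the interpolated version depends on: in an `s` string,
`$rdd.id` expands `$rdd` and then appends the literal text ".id", so field
accesses need `${...}` braces. A minimal, self-contained demonstration
(the case class is a stand-in, for illustration only):

    case class FakeRdd(id: Int)
    val fake = FakeRdd(7)
    s"RDD_ID=$fake.id"     // "RDD_ID=FakeRdd(7).id" -- not the intent
    s"RDD_ID=${fake.id}"   // "RDD_ID=7"             -- correct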
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
     id
   }
 
-  val name = callSite.getOrElse(rdd.origin)
+  val name = callSite.getOrElse(rdd.getCreationSite)
 
   override def toString = "Stage " + id
 
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -719,8 +719,8 @@ private[spark] object Utils extends Logging {
     new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
   }
 
-  def formatSparkCallSite = {
-    val callSiteInfo = getCallSiteInfo
+  /** Returns a printable version of the call site info suitable for logs. */
+  def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = {
     "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
       callSiteInfo.firstUserLine)
   }
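Because the call-site info is now a defaulted parameter, callers can either
capture-and-format in one step or format a site recorded earlier; a hedged
usage sketch (the `rdd` value is assumed to be in scope):

    // Walks the current stack via getCallSiteInfo:
    val here = Utils.formatCallSiteInfo()
    // Formats a previously captured site, as RDD.getCreationSite does:
    val origin = Utils.formatCallSiteInfo(rdd.creationSiteInfo)
    // Both render as "<lastSparkMethod> at <firstUserFile>:<firstUserLine>",
    // e.g. "textFile at MyApp.scala:42".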
