diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cdc0e5a34240e..745e3fa4e85f6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -832,13 +832,12 @@ class SparkContext(
     setLocalProperty("externalCallSite", null)
   }
 
+  /**
+   * Capture the current user call site and return a formatted version for printing. If the user
+   * has overridden the call site, this will return the user's version.
+   */
   private[spark] def getCallSite(): String = {
-    val callSite = getLocalProperty("externalCallSite")
-    if (callSite == null) {
-      Utils.formatSparkCallSite
-    } else {
-      callSite
-    }
+    Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 91bf404631f49..01d9357a2556d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
 
-  def generator: String = rdd.generator
-
   override def toString = rdd.toString
 
   /** Assign a name to this RDD */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index af0114bee3f49..a89419bbd10e7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList}
 
-import scala.Tuple2
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   def name(): String = rdd.name
 
-  /** Reset generator */
-  def setGenerator(_generator: String) = {
-    rdd.setGenerator(_generator)
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe56963e0008..4afa7523dd802 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
     this
   }
 
-  /** User-defined generator of this RDD*/
-  @transient var generator = Utils.getCallSiteInfo.firstUserClass
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    generator = _generator
-  }
-
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag](
 
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
-  /** Record user function generating this RDD. */
-  @transient private[spark] val origin = sc.getCallSite()
+  /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
+  @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
+  private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo)
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
@@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag](
   }
 
   override def toString: String = "%s%s[%d] at %s".format(
-    Option(name).map(_ + " ").getOrElse(""),
-    getClass.getSimpleName,
-    id,
-    origin)
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
 
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dc5b25d845dc2..d83d0341c61ab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
       } else {
         // Kind of ugly: need to register RDDs with the cache and map output tracker here
         // since we can't do it in the RDD constructor because # of partitions is unknown
-        logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+        logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
         mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
       }
       stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 80f9ec7d03007..01cbcc390c6cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
    * @param indent Indent number before info
    */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
     val rddInfo =
-      if (rdd.getStorageLevel != StorageLevel.NONE) {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
-          rdd.origin + " " + rdd.generator
-      } else {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
-          rdd.origin + " " + rdd.generator
-      }
+      s"RDD_ID=${rdd.id} ${getRddName(rdd)} $cacheStr " +
+        s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a78b0186b9eab..5c1fc30e4a557 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
     id
   }
 
-  val name = callSite.getOrElse(rdd.origin)
+  val name = callSite.getOrElse(rdd.getCreationSite)
 
   override def toString = "Stage " + id
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ac376fc403ada..38a275d438959 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -719,8 +719,8 @@ private[spark] object Utils extends Logging {
     new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
   }
 
-  def formatSparkCallSite = {
-    val callSiteInfo = getCallSiteInfo
+  /** Returns a printable version of the call site info suitable for logs. */
+  def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = {
     "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
       callSiteInfo.firstUserLine)
   }
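
For reviewers who want the refactoring in isolation: below is a minimal, self-contained sketch of the call-site pattern this diff converges on, namely capture once, format on demand through a default argument, and let a user override take precedence via Option(...).getOrElse. The names SimpleCallSiteInfo, captureCallSite, formatCallSite, and Demo are illustrative stand-ins, not Spark's actual API.

object Demo {
  // Stand-in for Spark's CallSiteInfo (hypothetical, simplified).
  case class SimpleCallSiteInfo(lastSparkMethod: String, firstUserFile: String, firstUserLine: Int)

  // Rough analogue of Utils.getCallSiteInfo: take a caller's frame from the stack.
  def captureCallSite(): SimpleCallSiteInfo = {
    val frame = Thread.currentThread.getStackTrace()(2)
    SimpleCallSiteInfo(frame.getMethodName, frame.getFileName, frame.getLineNumber)
  }

  // Analogue of the new Utils.formatCallSiteInfo: the default argument lets callers
  // pass a site captured earlier (as RDD.getCreationSite does) or capture one on
  // demand (as SparkContext.getCallSite does).
  def formatCallSite(info: SimpleCallSiteInfo = captureCallSite()): String =
    "%s at %s:%s".format(info.lastSparkMethod, info.firstUserFile, info.firstUserLine)

  // Analogue of the rewritten SparkContext.getCallSite: Option(...) absorbs the
  // possibly-null property value, replacing the explicit null check.
  def getCallSite(externalCallSite: String): String =
    Option(externalCallSite).getOrElse(formatCallSite())

  def main(args: Array[String]): Unit = {
    println(getCallSite(null))        // falls back to a site captured from the stack
    println(getCallSite("user site")) // the user override wins: "user site"
  }
}

Spark's real getCallSiteInfo does more work to skip framework frames and find the first user frame; the sketch only shows the shape of the API change (one formatter with a default argument instead of two duplicated branches around a null check).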