From cd7a465e864670dc3f5311101679261ba202ac5f Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 5 Apr 2014 15:47:12 -0700
Subject: [PATCH] Code review feedback

---
 .../src/main/scala/org/apache/spark/Aggregator.scala | 2 +-
 .../main/scala/org/apache/spark/FutureAction.scala | 2 ++
 .../org/apache/spark/InterruptibleIterator.scala | 2 +-
 .../main/scala/org/apache/spark/SparkContext.scala | 11 ++++++++++-
 .../apache/spark/broadcast/BroadcastFactory.scala | 2 ++
 core/src/main/scala/org/apache/spark/package.scala | 9 +++++++++
 .../scala/org/apache/spark/rdd/CoGroupedRDD.scala | 8 ++++++--
 .../main/scala/org/apache/spark/rdd/EmptyRDD.scala | 2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12 ++++++++++--
 .../scala/org/apache/spark/scheduler/JobLogger.scala | 2 ++
 .../scala/org/apache/spark/scheduler/SplitInfo.scala | 1 +
 .../org/apache/spark/util/BoundedPriorityQueue.scala | 4 +---
 .../main/scala/org/apache/spark/util/Vector.scala | 1 +
 .../main/scala/org/apache/spark/graphx/package.scala | 3 ++-
 .../org/apache/spark/tools}/StoragePerfTester.scala | 2 +-
 15 files changed, 50 insertions(+), 13 deletions(-)
 rename {core/src/main/scala/org/apache/spark/storage => tools/src/main/scala/org/apache/spark/tools}/StoragePerfTester.scala (99%)

diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 32c0e8228705c..f31a6318f69a6 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -21,13 +21,13 @@ import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
 
 /**
  * DEVELOPER API - UNSTABLE
- *
  * A set of functions used to aggregate data.
  *
  * @param createCombiner function to create the initial value of the aggregation.
  * @param mergeValue function to merge a new value into the aggregation result.
  * @param mergeCombiners function to merge outputs from multiple mergeValue function.
  */
+
 case class Aggregator[K, V, C] (
     createCombiner: V => C,
     mergeValue: (C, V) => C,
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 392150e682c47..53ec7d129d7fa 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -86,6 +86,8 @@ trait FutureAction[T] extends Future[T] {
 
 
 /**
+ * EXPERIMENTAL API
+ *
  * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
  */
diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
index 9b1601d5b95fa..fd1802ba2f984 100644
--- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
+++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -21,7 +21,7 @@ package org.apache.spark
  * An iterator that wraps around an existing iterator to provide task killing functionality.
  * It works by checking the interrupted flag in [[TaskContext]].
  */
-class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
+private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
   extends Iterator[T] {
 
   def hasNext: Boolean = !context.interrupted && delegate.hasNext
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b23accbbb9410..e579c503362b5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -184,7 +184,7 @@ class SparkContext(
     jars.foreach(addJar)
   }
 
-  def warnSparkMem(value: String): String = {
+  private def warnSparkMem(value: String): String = {
     logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
       "deprecated, please use spark.executor.memory instead.")
     value
@@ -665,6 +665,11 @@ class SparkContext(
     postEnvironmentUpdate()
   }
 
+  /**
+   * DEVELOPER API - UNSTABLE
+   *
+   * Register a listener to receive up-calls from events that happen during execution.
+   */
   def addSparkListener(listener: SparkListener) {
     listenerBus.addListener(listener)
   }
@@ -974,6 +979,8 @@ class SparkContext(
   }
 
   /**
+   * DEVELOPER API - UNSTABLE
+   *
    * Run a job that can return approximate results.
    */
   def runApproximateJob[T, U, R](
@@ -991,6 +998,8 @@ class SparkContext(
   }
 
   /**
+   * EXPERIMENTAL API
+   *
    * Submit a job for execution and return a FutureJob holding the result.
    */
   def submitJob[T, U, R](
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index 6beecaeced5be..dda8387a16cbe 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -21,6 +21,8 @@ import org.apache.spark.SecurityManager
 import org.apache.spark.SparkConf
 
 /**
+ * DEVELOPER API - UNSTABLE
+ *
  * An interface for all the broadcast implementations in Spark (to allow
  * multiple broadcast implementations). SparkContext uses a user-specified
  * BroadcastFactory implementation to instantiate a particular broadcast for the
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 2625a7f6a575a..344397cb5b2af 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -32,6 +32,15 @@ package org.apache
  *
  * Java programmers should reference the [[spark.api.java]] package
  * for Spark programming APIs in Java.
+ *
+ * Classes and methods marked with EXPERIMENTAL API are
+ * user-facing features which have not been officially adopted by the Spark project. These are
+ * subject to change or removal in minor releases.
+ *
+ * Classes and methods marked with DEVELOPER API - UNSTABLE
+ * are intended for advanced users who want to extend Spark through lower level interfaces. These are
+ * subject to change or removal in minor releases.
+ *
  */
 package object spark {
   // For package docs only
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index ad35f43dd9825..408071262d158 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -51,13 +51,17 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
 }
 
 /**
+ * DEVELOPER API - UNSTABLE
+ *
  * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
  * tuple with the list of values for that key.
  *
+ * Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of
+ * instantiating this directly.
+ *
  * @param rdds parent RDDs.
- * @param part partitioner used to partition the shuffle output.
+ * @param part partitioner used to partition the shuffle output
  */
-private[spark]
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index 6f28ac3ed37f6..e4bd9ebf4fb68 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 
 /**
- * An RDD that is empty, i.e. has no element in it.
+ * An RDD that has no partitions and no elements.
  */
 private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ce2b8ac27206b..e269deefa9d17 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -513,6 +513,8 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * DEVELOPER API - UNSTABLE
+   *
    * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
    * mapPartitions that also passes the TaskContext into the closure.
    */
@@ -775,7 +777,9 @@ abstract class RDD[T: ClassTag](
   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
 
   /**
-   * (Experimental) Approximate version of count() that returns a potentially incomplete result
+   * EXPERIMENTAL API
+   *
+   * Approximate version of count() that returns a potentially incomplete result
    * within a timeout, even if not all tasks have finished.
    */
   def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
@@ -821,7 +825,9 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * (Experimental) Approximate version of countByValue().
+   * EXPERIMENTAL API
+   *
+   * Approximate version of countByValue().
    */
   def countByValueApprox(
       timeout: Long,
@@ -843,6 +849,8 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * EXPERIMENTAL API
+   *
    * Return approximate number of distinct elements in the RDD.
    *
    * The accuracy of approximation can be controlled through the relative standard deviation
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 5cecf9416b32c..8fcc9c9d855b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -28,6 +28,8 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 
 /**
+ * DEVELOPER API - UNSTABLE
+ *
  * A logger class to record runtime information for jobs in Spark. This class outputs one log file
 * for each Spark job, containing tasks start/stop and shuffle information. JobLogger is a subclass
 * of SparkListener, use addSparkListener to add JobLogger to a SparkContext after the SparkContext
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
index 5b40a3eb29b30..f742291deb9e9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -21,6 +21,7 @@ import collection.mutable.ArrayBuffer
 
 // information about a specific split instance : handles both split instances.
 // So that we do not need to worry about the differences.
+private[spark]
 class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
                 val length: Long, val underlyingSplit: Any) {
   override def toString(): String = {
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index 6423d6948f2d7..b9f4a5d720b93 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -24,13 +24,11 @@ import scala.collection.JavaConverters._
 import scala.collection.generic.Growable
 
 /**
- * DEVELOPER API - UNSTABLE
- *
  * Bounded priority queue. This class wraps the original PriorityQueue
 * class and modifies it such that only the top K elements are retained.
 * The top K elements are defined by an implicit Ordering[A].
 */
-class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
   extends Iterable[A] with Growable[A] with Serializable {
 
   private val underlying = new JPriorityQueue[A](maxSize, ord)
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index dc4b8f253f259..643366db9c78b 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -21,6 +21,7 @@ import scala.util.Random
 
 import org.apache.spark.util.random.XORShiftRandom
 
+@deprecated("Use Vector from Spark's mllib.linalg package instead.", "1.0.0")
 class Vector(val elements: Array[Double]) extends Serializable {
   def length = elements.length
 
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
index 6d0e3cde812b1..099e19a161a40 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -22,7 +22,8 @@ import org.apache.spark.util.collection.OpenHashSet
 /**
  * ALPHA COMPONENT
  *
- * GraphX is a graph processing framework built on top of Spark. */
+ * GraphX is a graph processing framework built on top of Spark.
+ */
 package object graphx {
   /**
    * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
similarity index 99%
rename from core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
rename to tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 5ba0363a14b3d..8e8c35615a711 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.storage
+package org.apache.spark.tools
 
 import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
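
Illustration (not part of the patch): SparkContext.addSparkListener, which this change documents as DEVELOPER API - UNSTABLE, is easiest to see in a small driver program. The sketch below is only an assumption-laden example: the object name, app name, and master URL are made up, and the callback signatures assume the org.apache.spark.scheduler.SparkListener trait as it existed around the time of this change.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

// Hypothetical driver program, for illustration only.
object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("listener-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Register callbacks through the developer-level hook documented in this patch.
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
        println("Stage completed")
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        println("Job ended")
    })

    // Any action will exercise the listener registered above.
    sc.parallelize(1 to 100).map(_ * 2).count()
    sc.stop()
  }
}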