
Commit

Code review feedback
pwendell committed Apr 5, 2014
1 parent 2f706f1 commit cd7a465
Showing 15 changed files with 50 additions and 13 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -21,13 +21,13 @@ import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
* <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
*
* A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/

case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
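For context, a minimal sketch (not part of this diff) of how the three functions fit together: an Aggregator that builds a per-key (sum, count) pair, from which an average can later be derived. It assumes nothing beyond the constructor shown above.

import org.apache.spark.Aggregator

// createCombiner builds the initial combiner from the first value seen for a key,
// mergeValue folds another value into an existing combiner, and mergeCombiners
// merges partial combiners produced on different partitions.
val avgAggregator = Aggregator[String, Double, (Double, Long)](
  createCombiner = v => (v, 1L),
  mergeValue = (acc, v) => (acc._1 + v, acc._2 + 1L),
  mergeCombiners = (a, b) => (a._1 + b._1, a._2 + b._2)
)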
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -86,6 +86,8 @@ trait FutureAction[T] extends Future[T] {


/**
* <span class="badge badge-red">EXPERIMENTAL API</span>
*
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
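A rough usage sketch, assuming the asynchronous actions (such as countAsync) that return a FutureAction via the implicit conversions in SparkContext._, and an existing SparkContext named sc:

import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark.SparkContext._  // assumed: brings rdd.countAsync() into scope

// countAsync submits the job without blocking and returns a FutureAction[Long],
// which can be waited on, cancelled, or given completion callbacks.
val future = sc.parallelize(1 to 100).countAsync()
val n = Await.result(future, 30.seconds)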
core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -21,7 +21,7 @@ package org.apache.spark
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in [[TaskContext]].
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {

def hasNext: Boolean = !context.interrupted && delegate.hasNext
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -184,7 +184,7 @@ class SparkContext(
jars.foreach(addJar)
}

def warnSparkMem(value: String): String = {
private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
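For reference, a sketch of the replacement configuration mentioned in the warning, setting spark.executor.memory through SparkConf instead of the SPARK_MEM environment variable (the app name and master below are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Set executor memory explicitly rather than relying on the deprecated SPARK_MEM.
val conf = new SparkConf()
  .setAppName("memory-config-example")
  .setMaster("local[2]")
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)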
@@ -665,6 +665,11 @@ class SparkContext(
postEnvironmentUpdate()
}

/**
* <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
*
* Register a listener to receive up-calls from events that happen during execution.
*/
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
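A hedged sketch of registering a custom listener; the event class names follow the org.apache.spark.scheduler listener API and may differ slightly between versions, and sc is an existing SparkContext:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// A listener that logs a line whenever a job finishes.
class JobEndLogger extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println("Job finished: " + jobEnd)
  }
}

sc.addSparkListener(new JobEndLogger)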
@@ -974,6 +979,8 @@
}

/**
* <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
*
* Run a job that can return approximate results.
*/
def runApproximateJob[T, U, R](
@@ -991,6 +998,8 @@
}

/**
* <span class="badge badge-red">EXPERIMENTAL API</span>
*
* Submit a job for execution and return a FutureJob holding the result.
*/
def submitJob[T, U, R](
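A sketch of how submitJob might be called, assuming the five-argument form (rdd, per-partition function, partitions, per-partition result handler, final result); the master and app name are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("submit-job-sketch").setMaster("local[2]"))
val data = sc.parallelize(1 to 1000, 4)
val sizes = new Array[Long](4)

// Count the elements of each partition without blocking the caller; the returned
// future completes once every requested partition has reported its result.
val future = sc.submitJob(
  data,
  (it: Iterator[Int]) => it.size.toLong,            // runs on each partition
  0 until 4,                                        // partitions to compute
  (index: Int, size: Long) => sizes(index) = size,  // called as each partition finishes
  sizes.sum                                         // overall result held by the FutureAction
)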
core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -21,6 +21,8 @@ import org.apache.spark.SecurityManager
import org.apache.spark.SparkConf

/**
* <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
*
* An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
* BroadcastFactory implementation to instantiate a particular broadcast for the
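For context, the implementation is chosen through configuration rather than instantiated directly; a hedged sketch, where the spark.broadcast.factory key and the factory class name are assumptions based on the Spark 1.x codebase:

import org.apache.spark.SparkConf

// Ask Spark to use the torrent-based broadcast implementation instead of the default.
val conf = new SparkConf()
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")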
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/package.scala
@@ -32,6 +32,15 @@ package org.apache
*
* Java programmers should reference the [[spark.api.java]] package
* for Spark programming APIs in Java.
*
* Classes and methods marked with <span class="badge badge-red">EXPERIMENTAL API</span> are
* user-facing features which have not been officially adopted by the Spark project. These are
* subject to change or removal in minor releases.
*
* Classes and methods marked with <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
* are intended for advanced users who want to extend Spark through lower-level interfaces. These are
* subject to change or removal in minor releases.
*
*/
package object spark {
// For package docs only
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -51,13 +51,17 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
}

/**
* <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
*
* An RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
*
* Note: This is an internal API. We recommend users call RDD.cogroup(...) instead of
* instantiating this directly.
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
* @param part partitioner used to partition the shuffle output
*/
private[spark]
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {

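As recommended above, user code would go through cogroup rather than constructing this RDD directly; a small sketch, assuming an existing SparkContext named sc:

import org.apache.spark.SparkContext._  // implicit conversion to pair-RDD operations

val scores = sc.parallelize(Seq(("alice", 1), ("bob", 2)))
val ages   = sc.parallelize(Seq(("alice", 30), ("carol", 25)))

// cogroup builds a CoGroupedRDD internally: each key is paired with the values
// found for it in both input RDDs.
val grouped = scores.cogroup(ages)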
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}

/**
* An RDD that is empty, i.e. has no element in it.
* An RDD that has no partitions and no elements.
*/
private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {

12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -513,6 +513,8 @@ abstract class RDD[T: ClassTag](
}

/**
* <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
*
* Return a new RDD by applying a function to each partition of this RDD. This is a variant of
* mapPartitions that also passes the TaskContext into the closure.
*/
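A short sketch of the variant described above; the TaskContext field name partitionId is an assumption that may differ by version, and sc is an existing SparkContext:

// Tag every element with the id of the partition that processed it.
val tagged = sc.parallelize(1 to 10, 2).mapPartitionsWithContext { (context, iter) =>
  iter.map(x => (context.partitionId, x))
}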
@@ -775,7 +777,9 @@ abstract class RDD[T: ClassTag](
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

/**
* (Experimental) Approximate version of count() that returns a potentially incomplete result
* <span class="badge badge-red">EXPERIMENTAL API</span>
*
* Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
*/
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
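For illustration, assuming an existing SparkContext named sc; the result is a PartialResult whose value is a bounded estimate rather than an exact count:

// Wait at most 10 seconds; initialValue holds the estimate computed within the
// timeout, as a BoundedDouble with low/high bounds.
val approx = sc.parallelize(1 to 1000000).countApprox(timeout = 10000L, confidence = 0.95)
println(approx.initialValue)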
@@ -821,7 +825,9 @@
}

/**
* (Experimental) Approximate version of countByValue().
* <span class="badge badge-red">EXPERIMENTAL API</span>
*
* Approximate version of countByValue().
*/
def countByValueApprox(
timeout: Long,
@@ -843,6 +849,8 @@
}

/**
* <span class="badge badge-red">EXPERIMENTAL API</span>
*
* Return approximate number of distinct elements in the RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
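A brief sketch of the method referenced above, with relativeSD as the relative standard deviation parameter and sc an existing SparkContext:

// Lower relativeSD means a more accurate estimate at the cost of more memory.
val estimate = sc.parallelize(Seq(1, 2, 2, 3, 3, 3)).countApproxDistinct(relativeSD = 0.05)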
core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -28,6 +28,8 @@ import org.apache.spark._
import org.apache.spark.executor.TaskMetrics

/**
* <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
*
* A logger class to record runtime information for jobs in Spark. This class outputs one log file
* for each Spark job, containing task start/stop and shuffle information. JobLogger is a subclass
* of SparkListener; use addSparkListener to add JobLogger to a SparkContext after the SparkContext
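As the doc above suggests, the logger is attached through addSparkListener; a sketch assuming the no-argument constructor and an existing SparkContext named sc:

import org.apache.spark.scheduler.JobLogger

// Write per-job log files for this SparkContext's jobs.
sc.addSparkListener(new JobLogger())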
core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -21,6 +21,7 @@ import collection.mutable.ArrayBuffer

// Information about a specific split instance: handles both split instance types
// so that we do not need to worry about the differences.
private[spark]
class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
val length: Long, val underlyingSplit: Any) {
override def toString(): String = {
core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -24,13 +24,11 @@ import scala.collection.JavaConverters._
import scala.collection.generic.Growable

/**
* <span class="badge badge-red">DEVELOPER API - UNSTABLE</span>
*
* Bounded priority queue. This class wraps the original PriorityQueue
* class and modifies it such that only the top K elements are retained.
* The top K elements are defined by an implicit Ordering[A].
*/
class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
extends Iterable[A] with Growable[A] with Serializable {

private val underlying = new JPriorityQueue[A](maxSize, ord)
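Since the class is now private[spark], here is only a conceptual sketch of the top-K idea it implements: a fixed-size min-heap where a new element replaces the current minimum only if it ranks higher. The helper below is standalone and not part of Spark.

import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.JavaConverters._

// Returns the k largest elements of xs, largest first (assumes k >= 1).
def topK(xs: Seq[Int], k: Int): Seq[Int] = {
  val heap = new JPriorityQueue[Int](k, Ordering.Int)  // smallest retained element on top
  xs.foreach { x =>
    if (heap.size < k) {
      heap.add(x)
    } else if (Ordering.Int.gt(x, heap.peek())) {
      heap.poll()   // drop the current minimum
      heap.add(x)   // and keep the larger element instead
    }
  }
  heap.asScala.toSeq.sorted(Ordering.Int.reverse)
}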
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -21,6 +21,7 @@ import scala.util.Random

import org.apache.spark.util.random.XORShiftRandom

@deprecated("Use Vector from Spark's mllib.linalg package instead.", "1.0.0")
class Vector(val elements: Array[Double]) extends Serializable {
def length = elements.length

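Per the deprecation message, new code would use the mllib linear algebra types instead; a hedged sketch, assuming the Vectors factory in org.apache.spark.mllib.linalg:

import org.apache.spark.mllib.linalg.Vectors

// A dense vector backed by an Array[Double], replacing org.apache.spark.util.Vector.
val v = Vectors.dense(1.0, 2.0, 3.0)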
3 changes: 2 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -22,7 +22,8 @@ import org.apache.spark.util.collection.OpenHashSet
/**
* <span class="badge badge-red">ALPHA COMPONENT</span>
*
* GraphX is a graph processing framework built on top of Spark. */
* GraphX is a graph processing framework built on top of Spark.
*/
package object graphx {
/**
* A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.storage
package org.apache.spark.tools

import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
Expand Down
