diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index ceead59b79ed6..59fdf659c9e11 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,15 +17,18 @@
package org.apache.spark
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
/**
+ * :: DeveloperApi ::
* A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
+@DeveloperApi
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 1cd629c15bd46..2c31cc20211ff 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,19 +17,24 @@
package org.apache.spark
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
/**
+ * :: DeveloperApi ::
* Base class for dependencies.
*/
+@DeveloperApi
abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
/**
+ * :: DeveloperApi ::
* Base class for dependencies where each partition of the parent RDD is used by at most one
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
*/
+@DeveloperApi
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
* Get the parent partitions for a child partition.
@@ -41,6 +46,7 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
+ * :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
@@ -48,6 +54,7 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
+@DeveloperApi
class ShuffleDependency[K, V](
@transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
@@ -61,20 +68,24 @@ class ShuffleDependency[K, V](
/**
+ * :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
+@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int) = List(partitionId)
}
/**
+ * :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
+@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 2eec09cd1c795..1e4dec86a0530 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -21,13 +21,16 @@ import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try
+import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
/**
+ * :: Experimental ::
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
+@Experimental
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
@@ -84,9 +87,11 @@ trait FutureAction[T] extends Future[T] {
/**
+ * :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
+@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {
@@ -148,10 +153,12 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
/**
+ * :: Experimental ::
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
+@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {
// Pointer to the thread that is executing the action. It is set when the action is run.
diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
index 9b1601d5b95fa..fd1802ba2f984 100644
--- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
+++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -21,7 +21,7 @@ package org.apache.spark
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in [[TaskContext]].
*/
-class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
+private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {
def hasNext: Boolean = !context.interrupted && delegate.hasNext
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 7423082e34f47..e5e15617acb10 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -21,11 +21,18 @@ import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
* logging messages at different levels using methods that only evaluate parameters lazily if the
* log level is enabled.
+ *
+ * NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility.
+ * This will likely be changed or removed in future releases.
*/
+@DeveloperApi
trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index dff665cae6cb6..e50b9ac2291f9 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -23,6 +23,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable
+import org.apache.spark.annotation.DeveloperApi
+
+@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d7124616d3bfb..f7750514ae13d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
@@ -48,22 +49,35 @@ import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
/**
+ * :: DeveloperApi ::
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
- * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
- * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
- * from a list of input files or InputFormats for the application.
*/
-class SparkContext(
- config: SparkConf,
- // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
- // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
- // contains a map from hostname to a list of input format splits on the host.
- val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
- extends Logging {
+
+@DeveloperApi
+class SparkContext(config: SparkConf) extends Logging {
+
+ // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
+ // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
+ // contains a map from hostname to a list of input format splits on the host.
+ private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
+
+ /**
+ * :: DeveloperApi ::
+ * Alternative constructor for setting preferred locations where Spark will create executors.
+ *
+ * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Ca
+ * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+ * from a list of input files or InputFormats for the application.
+ */
+ @DeveloperApi
+ def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
+ this(config)
+ this.preferredNodeLocationData = preferredNodeLocationData
+ }
/**
* Alternative constructor that allows setting common Spark properties directly
@@ -93,10 +107,45 @@ class SparkContext(
environment: Map[String, String] = Map(),
preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
{
- this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
- preferredNodeLocationData)
+ this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
+ this.preferredNodeLocationData = preferredNodeLocationData
}
+ // NOTE: The below constructors could be consolidated using default arguments. Due to
+ // Scala bug SI-8479, however, this causes the compile step to fail when generating docs.
+ // Until we have a good workaround for that bug the constructors remain broken out.
+
+ /**
+ * Alternative constructor that allows setting common Spark properties directly
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI.
+ */
+ private[spark] def this(master: String, appName: String) =
+ this(master, appName, null, Nil, Map(), Map())
+
+ /**
+ * Alternative constructor that allows setting common Spark properties directly
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
+ */
+ private[spark] def this(master: String, appName: String, sparkHome: String) =
+ this(master, appName, sparkHome, Nil, Map(), Map())
+
+ /**
+ * Alternative constructor that allows setting common Spark properties directly
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ */
+ private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
+ this(master, appName, sparkHome, jars, Map(), Map())
+
private[spark] val conf = config.clone()
/**
@@ -189,7 +238,7 @@ class SparkContext(
jars.foreach(addJar)
}
- def warnSparkMem(value: String): String = {
+ private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
@@ -653,6 +702,9 @@ class SparkContext(
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)
+ /** Get an RDD that has no partitions or elements. */
+ def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)
+
// Methods for creating shared variables
/**
@@ -716,6 +768,11 @@ class SparkContext(
postEnvironmentUpdate()
}
+ /**
+ * :: DeveloperApi ::
+ * Register a listener to receive up-calls from events that happen during execution.
+ */
+ @DeveloperApi
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
@@ -1021,8 +1078,10 @@ class SparkContext(
}
/**
+ * :: DeveloperApi ::
* Run a job that can return approximate results.
*/
+ @DeveloperApi
def runApproximateJob[T, U, R](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
@@ -1040,6 +1099,7 @@ class SparkContext(
/**
* Submit a job for execution and return a FutureJob holding the result.
*/
+ @Experimental
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 9ea123f174b95..915315ed74436 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,6 +25,7 @@ import scala.util.Properties
import akka.actor._
import com.google.common.collect.MapMaker
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
@@ -35,13 +36,18 @@ import org.apache.spark.storage._
import org.apache.spark.util.{AkkaUtils, Utils}
/**
+ * :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
+ *
+ * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
+ * in a future release.
*/
-class SparkEnv private[spark] (
+@DeveloperApi
+class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
val serializer: Serializer,
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index be53ca2968cfb..dc5a19ecd738e 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -19,8 +19,14 @@ package org.apache.spark
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
+/**
+ * :: DeveloperApi ::
+ * Contextual information about a task which can be read or mutated during execution.
+ */
+@DeveloperApi
class TaskContext(
val stageId: Int,
val partitionId: Int,
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index f1a753b6ab8a9..a3074916d13e7 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -17,29 +17,35 @@
package org.apache.spark
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
/**
+ * :: DeveloperApi ::
* Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
* tasks several times for "ephemeral" failures, and only report back failures that require some
* old stages to be resubmitted, such as shuffle map fetch failures.
*/
-private[spark] sealed trait TaskEndReason
+@DeveloperApi
+sealed trait TaskEndReason
-private[spark] case object Success extends TaskEndReason
+@DeveloperApi
+case object Success extends TaskEndReason
-private[spark]
+@DeveloperApi
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
-private[spark] case class FetchFailed(
+@DeveloperApi
+case class FetchFailed(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int)
extends TaskEndReason
-private[spark] case class ExceptionFailure(
+@DeveloperApi
+case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
@@ -47,21 +53,28 @@ private[spark] case class ExceptionFailure(
extends TaskEndReason
/**
+ * :: DeveloperApi ::
* The task finished successfully, but the result was lost from the executor's block manager before
* it was fetched.
*/
-private[spark] case object TaskResultLost extends TaskEndReason
+@DeveloperApi
+case object TaskResultLost extends TaskEndReason
-private[spark] case object TaskKilled extends TaskEndReason
+@DeveloperApi
+case object TaskKilled extends TaskEndReason
/**
+ * :: DeveloperApi ::
* The task failed because the executor that it was running on was lost. This may happen because
* the task crashed the JVM.
*/
-private[spark] case object ExecutorLostFailure extends TaskEndReason
+@DeveloperApi
+case object ExecutorLostFailure extends TaskEndReason
/**
+ * :: DeveloperApi ::
* We don't know why the task ended -- for example, because of a ClassNotFound exception when
* deserializing the task result.
*/
-private[spark] case object UnknownReason extends TaskEndReason
+@DeveloperApi
+case object UnknownReason extends TaskEndReason
diff --git a/core/src/main/scala/org/apache/spark/annotation/AlphaComponent.java b/core/src/main/scala/org/apache/spark/annotation/AlphaComponent.java
new file mode 100644
index 0000000000000..af01fb7cfbd04
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/annotation/AlphaComponent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.annotation;
+
+import java.lang.annotation.*;
+
+/** A new component of Spark which may have unstable API's. */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
+ ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
+public @interface AlphaComponent {}
diff --git a/core/src/main/scala/org/apache/spark/annotation/DeveloperApi.java b/core/src/main/scala/org/apache/spark/annotation/DeveloperApi.java
new file mode 100644
index 0000000000000..5d546e7a63985
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/annotation/DeveloperApi.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * A lower-level, unstable API intended for developers.
+ *
+ * Developer API's might change or be removed in minor versions of Spark.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
+ ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
+public @interface DeveloperApi {}
diff --git a/core/src/main/scala/org/apache/spark/annotation/Experimental.java b/core/src/main/scala/org/apache/spark/annotation/Experimental.java
new file mode 100644
index 0000000000000..306b1418d8d0a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/annotation/Experimental.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * An experimental user-facing API.
+ *
+ * Experimental API's might change or be removed in minor versions of Spark, or be adopted as
+ * first-class Spark API's.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
+ ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
+public @interface Experimental {}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index f816bb43a5b44..537f410b0ca26 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
+import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD
@@ -184,14 +185,26 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
def meanApprox(timeout: Long, confidence: JDouble): PartialResult[BoundedDouble] =
srdd.meanApprox(timeout, confidence)
- /** (Experimental) Approximate operation to return the mean within a timeout. */
+ /**
+ * :: Experimental ::
+ * Approximate operation to return the mean within a timeout.
+ */
+ @Experimental
def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout)
- /** (Experimental) Approximate operation to return the sum within a timeout. */
+ /**
+ * :: Experimental ::
+ * Approximate operation to return the sum within a timeout.
+ */
+ @Experimental
def sumApprox(timeout: Long, confidence: JDouble): PartialResult[BoundedDouble] =
srdd.sumApprox(timeout, confidence)
- /** (Experimental) Approximate operation to return the sum within a timeout. */
+ /**
+ * :: Experimental ::
+ * Approximate operation to return the sum within a timeout.
+ */
+ @Experimental
def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)
/**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index e6c5d85917678..a41c7dbda2afc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -27,11 +27,12 @@ import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
+import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
@@ -201,16 +202,20 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
/**
- * (Experimental) Approximate version of countByKey that can return a partial result if it does
+ * :: Experimental ::
+ * Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
+ @Experimental
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
/**
- * (Experimental) Approximate version of countByKey that can return a partial result if it does
+ * :: Experimental ::
+ * Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
+ @Experimental
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
: PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index ae577b500ccb4..725c423a53e35 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -27,6 +27,7 @@ import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _}
@@ -343,16 +344,20 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def count(): Long = rdd.count()
/**
- * (Experimental) Approximate version of count() that returns a potentially incomplete result
+ * :: Experimental ::
+ * Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
*/
+ @Experimental
def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
rdd.countApprox(timeout, confidence)
/**
- * (Experimental) Approximate version of count() that returns a potentially incomplete result
+ * :: Experimental ::
+ * Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
*/
+ @Experimental
def countApprox(timeout: Long): PartialResult[BoundedDouble] =
rdd.countApprox(timeout)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index a2855d4db1d2e..1e8242a2cbbce 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -89,7 +89,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*/
def this(master: String, appName: String, sparkHome: String, jars: Array[String],
environment: JMap[String, String]) =
- this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment))
+ this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map()))
private[spark] val env = sc.env
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index ecbf18849ad48..22810cb1c662d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
import com.google.common.base.Optional
-object JavaUtils {
+private[spark] object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
option match {
case Some(value) => Optional.of(value)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index c7f7c59cfb449..8c8ce9b1691ac 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -16,16 +16,19 @@
*/
package org.apache.spark.broadcast
-import org.apache.spark.SecurityManager
+import org.apache.spark.SecurityManager
import org.apache.spark.SparkConf
+import org.apache.spark.annotation.DeveloperApi
/**
- * An interface for all the broadcast implementations in Spark (to allow
+ * :: DeveloperApi ::
+ * An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
+@DeveloperApi
trait BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 127f5e90f3e1a..0ed52cfe9df61 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.spark.metrics.source.Source
-class ExecutorSource(val executor: Executor, executorId: String) extends Source {
+private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 88625e79a5c68..e4f02a4be0b97 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,8 +17,14 @@
package org.apache.spark.executor
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.{BlockId, BlockStatus}
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked during the execution of a task.
+ */
+@DeveloperApi
class TaskMetrics extends Serializable {
/**
* Host's name the task runs on
@@ -77,11 +83,16 @@ class TaskMetrics extends Serializable {
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
}
-object TaskMetrics {
- private[spark] def empty(): TaskMetrics = new TaskMetrics
+private[spark] object TaskMetrics {
+ def empty(): TaskMetrics = new TaskMetrics
}
+/**
+ * :: DeveloperApi ::
+ * Metrics pertaining to shuffle data read in a given task.
+ */
+@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
* Absolute time when this task finished reading shuffle data
@@ -116,6 +127,11 @@ class ShuffleReadMetrics extends Serializable {
var remoteBytesRead: Long = _
}
+/**
+ * :: DeveloperApi ::
+ * Metrics pertaining to shuffle data written in a given task.
+ */
+@DeveloperApi
class ShuffleWriteMetrics extends Serializable {
/**
* Number of bytes written for the shuffle by this task
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 059e58824c39b..e1a5ee316bb69 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -23,11 +23,18 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
import org.apache.spark.SparkConf
+import org.apache.spark.annotation.DeveloperApi
/**
+ * :: DeveloperApi ::
* CompressionCodec allows the customization of choosing different compression implementations
* to be used in block storage.
+ *
+ * Note: The wire protocol for a codec is not guaranteed compatible across versions of Spark.
+ * This is intended for use as an internal compression utility within a single
+ * Spark application.
*/
+@DeveloperApi
trait CompressionCodec {
def compressedOutputStream(s: OutputStream): OutputStream
@@ -52,8 +59,14 @@ private[spark] object CompressionCodec {
/**
+ * :: DeveloperApi ::
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].
+ *
+ * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
+ * of Spark. This is intended for use as an internal compression utility within a single Spark
+ * application.
*/
+@DeveloperApi
class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
@@ -65,9 +78,15 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
/**
+ * :: DeveloperApi ::
* Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
* Block size can be configured by spark.io.compression.snappy.block.size.
+ *
+ * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
+ * of Spark. This is intended for use as an internal compression utility within a single Spark
+ * application.
*/
+@DeveloperApi
class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
index 64eac73605388..05852f1f98993 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -25,7 +25,7 @@ import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
-class ConsoleSink(val property: Properties, val registry: MetricRegistry,
+private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
index 544848d4150b6..42c1200926fea 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -26,7 +26,7 @@ import com.codahale.metrics.{CsvReporter, MetricRegistry}
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
-class CsvSink(val property: Properties, val registry: MetricRegistry,
+private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CSV_KEY_PERIOD = "period"
val CSV_KEY_UNIT = "unit"
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index 7f0a2fd16fa99..aeb4ad44a0647 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -27,7 +27,7 @@ import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
-class GraphiteSink(val property: Properties, val registry: MetricRegistry,
+private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val GRAPHITE_DEFAULT_PERIOD = 10
val GRAPHITE_DEFAULT_UNIT = "SECONDS"
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
index 3b5edd5c376f0..ed27234b4e760 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import com.codahale.metrics.{JmxReporter, MetricRegistry}
import org.apache.spark.SecurityManager
-class JmxSink(val property: Properties, val registry: MetricRegistry,
+private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 854b52c510e3d..571539ba5e467 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -30,7 +30,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.SecurityManager
import org.apache.spark.ui.JettyUtils._
-class MetricsServlet(val property: Properties, val registry: MetricRegistry,
+private[spark] class MetricsServlet(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
index 3a739aa563eae..6f2b5a06027ea 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
@@ -17,7 +17,7 @@
package org.apache.spark.metrics.sink
-trait Sink {
+private[spark] trait Sink {
def start: Unit
def stop: Unit
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala b/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala
index 75cb2b8973aa1..f865f9648a91e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala
@@ -20,7 +20,7 @@ package org.apache.spark.metrics.source
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
-class JvmSource extends Source {
+private[spark] class JvmSource extends Source {
val sourceName = "jvm"
val metricRegistry = new MetricRegistry()
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala
index 3fee55cc6dcd5..1dda2cd83b2a9 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala
@@ -19,7 +19,7 @@ package org.apache.spark.metrics.source
import com.codahale.metrics.MetricRegistry
-trait Source {
+private[spark] trait Source {
def sourceName: String
def metricRegistry: MetricRegistry
}
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 2625a7f6a575a..59bbb1171f239 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -32,7 +32,16 @@ package org.apache
*
* Java programmers should reference the [[spark.api.java]] package
* for Spark programming APIs in Java.
+ *
+ * Classes and methods marked with
+ * Experimental are user-facing features which have not been officially adopted by the
+ * Spark project. These are subject to change or removal in minor releases.
+ *
+ * Classes and methods marked with
+ * Developer API are intended for advanced users want to extend Spark through lower
+ * level interfaces. These are subject to changes or removal in minor releases.
*/
+
package object spark {
// For package docs only
}
diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
index 5f4450859cc9b..aed0353344427 100644
--- a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
+++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
@@ -17,9 +17,13 @@
package org.apache.spark.partial
+import org.apache.spark.annotation.Experimental
+
/**
- * A Double with error bars on it.
+ * :: Experimental ::
+ * A Double value with error bars and associated confidence.
*/
+@Experimental
class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
override def toString(): String = "[%.3f, %.3f]".format(low, high)
}
diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
index 812368e04ac0d..eade07fbcbe37 100644
--- a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
+++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
@@ -17,6 +17,9 @@
package org.apache.spark.partial
+import org.apache.spark.annotation.Experimental
+
+@Experimental
class PartialResult[R](initialVal: R, isFinal: Boolean) {
private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
private var failure: Option[Exception] = None
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index d1c74a5063510..aed951a40b40c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,11 +24,14 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
+import org.apache.spark.annotation.Experimental
/**
+ * :: Experimental ::
* A set of asynchronous RDD actions available through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
+@Experimental
class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 9aa454a5c8b88..c6e79557f08a1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
import org.apache.spark.serializer.Serializer
@@ -51,12 +52,17 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
}
/**
+ * :: DeveloperApi ::
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
*
+ * Note: This is an internal API. We recommend users use RDD.coGroup(...) instead of
+ * instantiating this directly.
+
* @param rdds parent RDDs.
- * @param part partitioner used to partition the shuffle output.
+ * @param part partitioner used to partition the shuffle output
*/
+@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 4e82b51313bf0..44401a663440c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark._
* @param parentsIndices list of indices in the parent that have been coalesced into this partition
* @param preferredLocation the preferred location for this partition
*/
-case class CoalescedRDDPartition(
+private[spark] case class CoalescedRDDPartition(
index: Int,
@transient rdd: RDD[_],
parentsIndices: Array[Int],
@@ -70,7 +70,7 @@ case class CoalescedRDDPartition(
* @param maxPartitions number of desired partitions in the coalesced RDD
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
-class CoalescedRDD[T: ClassTag](
+private[spark] class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index a7b6b3b5146ce..2306c9736b334 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.rdd
+import org.apache.spark.annotation.Experimental
import org.apache.spark.{TaskContext, Logging}
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.MeanEvaluator
@@ -63,14 +64,22 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
*/
def sampleVariance(): Double = stats().sampleVariance
- /** (Experimental) Approximate operation to return the mean within a timeout. */
+ /**
+ * :: Experimental ::
+ * Approximate operation to return the mean within a timeout.
+ */
+ @Experimental
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new MeanEvaluator(self.partitions.size, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}
- /** (Experimental) Approximate operation to return the sum within a timeout. */
+ /**
+ * :: Experimental ::
+ * Approximate operation to return the sum within a timeout.
+ */
+ @Experimental
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new SumEvaluator(self.partitions.size, confidence)
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index a84e5f9fd8ef8..a2d7e344cf1b2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -22,9 +22,9 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
/**
- * An RDD that is empty, i.e. has no element in it.
+ * An RDD that has no partitions and no elements.
*/
-class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
+private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
override def getPartitions: Array[Partition] = Array.empty
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3af008bd72378..6811e1abb8b70 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark._
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
@@ -70,9 +71,13 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}
/**
+ * :: DeveloperApi ::
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
* sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
*
+ * Note: Instantiating this class directly is not recommended, please use
+ * [[org.apache.spark.SparkContext.hadoopRDD()]]
+ *
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
@@ -84,6 +89,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
*/
+@DeveloperApi
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 1b503743ac117..a76a070b5b863 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.NextIterator
private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
override def index = idx
}
-
+// TODO: Expose a jdbcRDD function in SparkContext and mark this as semi-private
/**
* An RDD that executes an SQL query on a JDBC connection and reads results.
* For usage example, see test case JdbcRDDSuite.
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 461a749eac48b..2d8dfa5a1645a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
private[spark]
class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
@@ -36,15 +37,20 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
}
/**
+ * :: DeveloperApi ::
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
* sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
*
+ * Note: Instantiating this class directly is not recommended, please use
+ * [[org.apache.spark.SparkContext.newAPIHadoopRDD()]]
+ *
* @param sc The SparkContext to associate the RDD with.
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param conf The Hadoop configuration.
*/
+@DeveloperApi
class NewHadoopRDD[K, V](
sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index a92a84b5342d1..343e4325c0ef0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -39,6 +39,7 @@ RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.spark._
+import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.SparkHadoopWriter
import org.apache.spark.Partitioner.defaultPartitioner
@@ -201,9 +202,11 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
/**
- * (Experimental) Approximate version of countByKey that can return a partial result if it does
+ * :: Experimental ::
+ * Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
+ @Experimental
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
: PartialResult[Map[K, BoundedDouble]] = {
self.map(_._1).countByValueApprox(timeout, confidence)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index b0440ca7f32cf..f781a8d776f2a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -20,8 +20,10 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark.{NarrowDependency, Partition, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
-class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition) extends Partition {
+private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition)
+ extends Partition {
override val index = idx
}
@@ -30,7 +32,7 @@ class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition) extends
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
-class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {
@transient
@@ -45,11 +47,13 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
/**
+ * :: DeveloperApi ::
* A RDD used to prune RDD partitions/partitions so we can avoid launching tasks on
* all partitions. An example use case: If we know the RDD is partitioned by range,
* and the execution DAG has a filter on the key, we can avoid launching tasks
* on partitions that don't have the range covering the key.
*/
+@DeveloperApi
class PartitionPruningRDD[T: ClassTag](
@transient prev: RDD[T],
@transient partitionFilterFunc: Int => Boolean)
@@ -63,6 +67,7 @@ class PartitionPruningRDD[T: ClassTag](
}
+@DeveloperApi
object PartitionPruningRDD {
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index ce4c0d382baab..b4e3bb5d75e17 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -42,7 +42,7 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
* @tparam T input RDD item type
* @tparam U sampled RDD item type
*/
-class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
+private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
@transient seed: Long = System.nanoTime)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 41ae0fec823e7..e441d4a40ccd2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.Utils
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
*/
-class PipedRDD[T: ClassTag](
+private[spark] class PipedRDD[T: ClassTag](
prev: RDD[T],
command: Seq[String],
envVars: Map[String, String],
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 74fa2a4fcd401..3437b2cac19c2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
@@ -86,22 +87,34 @@ abstract class RDD[T: ClassTag](
// Methods that should be implemented by subclasses of RDD
// =======================================================================
- /** Implemented by subclasses to compute a given partition. */
+ /**
+ * :: DeveloperApi ::
+ * Implemented by subclasses to compute a given partition.
+ */
+ @DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
+ * :: DeveloperApi ::
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
+ @DeveloperApi
protected def getPartitions: Array[Partition]
/**
+ * :: DeveloperApi ::
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
+ @DeveloperApi
protected def getDependencies: Seq[Dependency[_]] = deps
- /** Optionally overridden by subclasses to specify placement preferences. */
+ /**
+ * :: DeveloperApi ::
+ * Optionally overridden by subclasses to specify placement preferences.
+ */
+ @DeveloperApi
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
@@ -518,9 +531,11 @@ abstract class RDD[T: ClassTag](
}
/**
+ * :: DeveloperApi ::
* Return a new RDD by applying a function to each partition of this RDD. This is a variant of
* mapPartitions that also passes the TaskContext into the closure.
*/
+ @DeveloperApi
def mapPartitionsWithContext[U: ClassTag](
f: (TaskContext, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
@@ -792,9 +807,11 @@ abstract class RDD[T: ClassTag](
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
/**
- * (Experimental) Approximate version of count() that returns a potentially incomplete result
+ * :: Experimental ::
+ * Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
*/
+ @Experimental
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
var result = 0L
@@ -838,8 +855,10 @@ abstract class RDD[T: ClassTag](
}
/**
- * (Experimental) Approximate version of countByValue().
+ * :: Experimental ::
+ * Approximate version of countByValue().
*/
+ @Experimental
def countByValueApprox(
timeout: Long,
confidence: Double = 0.95
@@ -860,6 +879,7 @@ abstract class RDD[T: ClassTag](
}
/**
+ * :: Experimental ::
* Return approximate number of distinct elements in the RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
@@ -867,6 +887,7 @@ abstract class RDD[T: ClassTag](
* more accurate counts but increase the memory footprint and vise versa. The default value of
* relativeSD is 0.05.
*/
+ @Experimental
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
index 4ceea557f569c..b097c30f8c231 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
@@ -33,7 +33,7 @@ class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition
}
@deprecated("Replaced by PartitionwiseSampledRDD", "1.0.0")
-class SampledRDD[T: ClassTag](
+private[spark] class SampledRDD[T: ClassTag](
prev: RDD[T],
withReplacement: Boolean,
frac: Double,
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 02660ea6a45c5..802b0bdfb2d59 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark.{Dependency, Partition, Partitioner, ShuffleDependency, SparkEnv, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.Serializer
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -28,12 +29,14 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
}
/**
+ * :: DeveloperApi ::
* The resulting RDD from a shuffle (e.g. repartitioning of data).
* @param prev the parent RDD.
* @param part the partitioner used to partition the RDD
* @tparam K the key class.
* @tparam V the value class.
*/
+@DeveloperApi
class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
@transient var prev: RDD[P],
part: Partitioner)
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index a447030752096..21c6e07d69f90 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitIndex: Int)
extends Partition {
@@ -43,6 +44,7 @@ private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitInd
}
}
+@DeveloperApi
class UnionRDD[T: ClassTag](
sc: SparkContext,
@transient var rdds: Seq[RDD[T]])
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index b56643444aa40..f3d30f6c9b32f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -41,7 +41,7 @@ private[spark] class ZippedPartitionsPartition(
}
}
-abstract class ZippedPartitionsBaseRDD[V: ClassTag](
+private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
sc: SparkContext,
var rdds: Seq[RDD[_]],
preservesPartitioning: Boolean = false)
@@ -74,7 +74,7 @@ abstract class ZippedPartitionsBaseRDD[V: ClassTag](
}
}
-class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
+private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B]) => Iterator[V],
var rdd1: RDD[A],
@@ -94,7 +94,7 @@ class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
}
}
-class ZippedPartitionsRDD3
+private[spark] class ZippedPartitionsRDD3
[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
@@ -119,7 +119,7 @@ class ZippedPartitionsRDD3
}
}
-class ZippedPartitionsRDD4
+private[spark] class ZippedPartitionsRDD4
[A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
index 2119e76f0e032..b8110ffc42f2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
@@ -44,7 +44,7 @@ private[spark] class ZippedPartition[T: ClassTag, U: ClassTag](
}
}
-class ZippedRDD[T: ClassTag, U: ClassTag](
+private[spark] class ZippedRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1: RDD[T],
var rdd2: RDD[U])
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index b3f2cb346f7da..bac37bfdaa23f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -27,11 +27,14 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
/**
+ * :: DeveloperApi ::
* Parses and holds information about inputFormat (and files) specified as a parameter.
*/
+@DeveloperApi
class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
val path: String) extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 7c5053998f1d6..713aebfa3ce00 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -25,9 +25,11 @@ import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.HashMap
import org.apache.spark._
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
/**
+ * :: DeveloperApi ::
* A logger class to record runtime information for jobs in Spark. This class outputs one log file
* for each Spark job, containing tasks start/stop and shuffle information. JobLogger is a subclass
* of SparkListener, use addSparkListener to add JobLogger to a SparkContext after the SparkContext
@@ -38,7 +40,7 @@ import org.apache.spark.executor.TaskMetrics
* to log application information as SparkListenerEvents. To enable this functionality, set
* spark.eventLog.enabled to true.
*/
-
+@DeveloperApi
@deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0")
class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
index 047bd27056120..4cd6cbe189aab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -17,11 +17,17 @@
package org.apache.spark.scheduler
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* A result of a job in the DAGScheduler.
*/
-private[spark] sealed trait JobResult
+@DeveloperApi
+sealed trait JobResult
-private[spark] case object JobSucceeded extends JobResult
+@DeveloperApi
+case object JobSucceeded extends JobResult
+@DeveloperApi
private[spark] case class JobFailed(exception: Exception) extends JobResult
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d42e67742a4f7..ced20350d5356 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -23,21 +23,28 @@ import scala.collection.Map
import scala.collection.mutable
import org.apache.spark.{Logging, TaskEndReason}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{Distribution, Utils}
+@DeveloperApi
sealed trait SparkListenerEvent
+@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerTaskEnd(
stageId: Int,
taskType: String,
@@ -46,20 +53,26 @@ case class SparkListenerTaskEnd(
taskMetrics: TaskMetrics)
extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
@@ -67,8 +80,11 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
/**
- * Interface for listening to events from the Spark scheduler.
+ * :: DeveloperApi ::
+ * Interface for listening to events from the Spark scheduler. Note that this is an internal
+ * interface which might change in different Spark releases.
*/
+@DeveloperApi
trait SparkListener {
/**
* Called when a stage completes successfully or fails, with information on the completed stage.
@@ -128,8 +144,10 @@ trait SparkListener {
}
/**
+ * :: DeveloperApi ::
* Simple SparkListener that logs a few summary statistics when each stage completes
*/
+@DeveloperApi
class StatsReportListener extends SparkListener with Logging {
import org.apache.spark.scheduler.StatsReportListener._
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
index 5b40a3eb29b30..b85eabd6bbdbc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -19,8 +19,11 @@ package org.apache.spark.scheduler
import collection.mutable.ArrayBuffer
+import org.apache.spark.annotation.DeveloperApi
+
// information about a specific split instance : handles both split instances.
// So that we do not need to worry about the differences.
+@DeveloperApi
class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
val length: Long, val underlyingSplit: Any) {
override def toString(): String = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index eec409b182ac6..9f732f7191465 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -17,12 +17,14 @@
package org.apache.spark.scheduler
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo
/**
+ * :: DeveloperApi ::
* Stores information about a stage to pass from the scheduler to SparkListeners.
*/
-private[spark]
+@DeveloperApi
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 6183b125def99..4c62e4dc0bac8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,10 +17,13 @@
package org.apache.spark.scheduler
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* Information about a running task attempt inside a TaskSet.
*/
-private[spark]
+@DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
@@ -46,15 +49,15 @@ class TaskInfo(
var serializedSize: Int = 0
- def markGettingResult(time: Long = System.currentTimeMillis) {
+ private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
gettingResultTime = time
}
- def markSuccessful(time: Long = System.currentTimeMillis) {
+ private[spark] def markSuccessful(time: Long = System.currentTimeMillis) {
finishTime = time
}
- def markFailed(time: Long = System.currentTimeMillis) {
+ private[spark] def markFailed(time: Long = System.currentTimeMillis) {
finishTime = time
failed = true
}
@@ -83,11 +86,11 @@ class TaskInfo(
def duration: Long = {
if (!finished) {
- throw new UnsupportedOperationException("duration() called on unfinished tasks")
+ throw new UnsupportedOperationException("duration() called on unfinished task")
} else {
finishTime - launchTime
}
}
- def timeRunning(currentTime: Long): Long = currentTime - launchTime
+ private[spark] def timeRunning(currentTime: Long): Long = currentTime - launchTime
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
index 308edb12edd5c..eb920ab0c0b67 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -17,7 +17,10 @@
package org.apache.spark.scheduler
-private[spark] object TaskLocality extends Enumeration {
+import org.apache.spark.annotation.DeveloperApi
+
+@DeveloperApi
+object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 18a68b05fa853..5e5883554fcc1 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -21,6 +21,7 @@ import java.io._
import java.nio.ByteBuffer
import org.apache.spark.SparkConf
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.ByteBufferInputStream
private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int)
@@ -94,8 +95,14 @@ private[spark] class JavaSerializerInstance(counterReset: Int) extends Serialize
}
/**
+ * :: DeveloperApi ::
* A Spark serializer that uses Java's built-in serialization.
+ *
+ * Note that this serializer is not guaranteed to be wire-compatible across different versions of
+ * Spark. It is intended to be used to serialize/de-serialize data within a single
+ * Spark application.
*/
+@DeveloperApi
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000)
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 926e71573be32..d1e8c3ef63622 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -33,6 +33,10 @@ import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
+ *
+ * Note that this serializer is not guaranteed to be wire-compatible across different versions of
+ * Spark. It is intended to be used to serialize/de-serialize data within a single
+ * Spark application.
*/
class KryoSerializer(conf: SparkConf)
extends org.apache.spark.serializer.Serializer
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 099143494b851..9f04dc6e427c0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -22,20 +22,27 @@ import java.nio.ByteBuffer
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
import org.apache.spark.SparkEnv
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
/**
+ * :: DeveloperApi ::
* A serializer. Because some serialization libraries are not thread safe, this class is used to
* create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual
* serialization and are guaranteed to only be called from one thread at a time.
*
* Implementations of this trait should implement:
+ *
* 1. a zero-arg constructor or a constructor that accepts a [[org.apache.spark.SparkConf]]
* as parameter. If both constructors are defined, the latter takes precedence.
*
* 2. Java serialization interface.
+ *
+ * Note that serializers are not required to be wire-compatible across different versions of Spark.
+ * They are intended to be used to serialize/de-serialize data within a single Spark application.
*/
+@DeveloperApi
trait Serializer {
def newInstance(): SerializerInstance
}
@@ -49,8 +56,10 @@ object Serializer {
/**
+ * :: DeveloperApi ::
* An instance of a serializer, for use by one thread at a time.
*/
+@DeveloperApi
trait SerializerInstance {
def serialize[T](t: T): ByteBuffer
@@ -81,8 +90,10 @@ trait SerializerInstance {
/**
+ * :: DeveloperApi ::
* A stream for writing serialized objects.
*/
+@DeveloperApi
trait SerializationStream {
def writeObject[T](t: T): SerializationStream
def flush(): Unit
@@ -98,8 +109,10 @@ trait SerializationStream {
/**
+ * :: DeveloperApi ::
* A stream for reading serialized objects.
*/
+@DeveloperApi
trait DeserializationStream {
def readObject[T](): T
def close(): Unit
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b021564477c47..a2a729130091f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,11 +35,11 @@ import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._
-sealed trait Values
+private[spark] sealed trait Values
-case class ByteBufferValues(buffer: ByteBuffer) extends Values
-case class IteratorValues(iterator: Iterator[Any]) extends Values
-case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
+private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends Values
+private[spark] case class IteratorValues(iterator: Iterator[Any]) extends Values
+private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
private[spark] class BlockManager(
executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index ff6e84cf9819a..07255aa366a6d 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -21,6 +21,7 @@ import scala.collection.Map
import scala.collection.mutable
import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
private[spark]
@@ -47,6 +48,7 @@ class StorageStatus(
}
}
+@DeveloperApi
private[spark]
class RDDInfo(
val id: Int,
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index c3692f2fd929b..b9f4a5d720b93 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -28,7 +28,7 @@ import scala.collection.generic.Growable
* class and modifies it such that only the top K elements are retained.
* The top K elements are defined by an implicit Ordering[A].
*/
-class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
extends Iterable[A] with Growable[A] with Serializable {
private val underlying = new JPriorityQueue[A](maxSize, ord)
diff --git a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala b/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
index db3db87e6618e..93235031f3ad5 100644
--- a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
+++ b/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
@@ -22,7 +22,7 @@ import java.util
import scala.Array
import scala.reflect._
-object CollectionsUtils {
+private[spark] object CollectionsUtils {
def makeBinarySearch[K <% Ordered[K] : ClassTag] : (Array[K], K) => Int = {
classTag[K] match {
case ClassTag.Float =>
diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala
index 5b347555fe708..a465298c8c5ab 100644
--- a/core/src/main/scala/org/apache/spark/util/Distribution.scala
+++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -29,7 +29,7 @@ import scala.collection.immutable.IndexedSeq
*
* Assumes you are giving it a non-empty set of data
*/
-class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {
+private[spark] class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {
require(startIdx < endIdx)
def this(data: Traversable[Double]) = this(data.toArray, 0, data.size)
java.util.Arrays.sort(data, startIdx, endIdx)
@@ -69,7 +69,7 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
}
}
-object Distribution {
+private[spark] object Distribution {
def apply(data: Traversable[Double]): Option[Distribution] = {
if (data.size > 0) {
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index a0c07e32fdc98..b5f2ec6831d26 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -36,7 +36,7 @@ import org.apache.spark.io.CompressionCodec
* @param compress Whether to compress output
* @param overwrite Whether to overwrite existing files
*/
-class FileLogger(
+private[spark] class FileLogger(
logDir: String,
conf: SparkConf = new SparkConf,
outputBufferSize: Int = 8 * 1024, // 8 KB
diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
index a6b39247a54ca..74fa77b68de0b 100644
--- a/core/src/main/scala/org/apache/spark/util/MutablePair.scala
+++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala
@@ -17,13 +17,17 @@
package org.apache.spark.util
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* A tuple of 2 elements. This can be used as an alternative to Scala's Tuple2 when we want to
* minimize object allocation.
*
* @param _1 Element 1 of this MutablePair
* @param _2 Element 2 of this MutablePair
*/
+@DeveloperApi
case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef */) T1,
@specialized(Int, Long, Double, Char, Boolean/* , AnyRef */) T2]
(var _1: T1, var _2: T2)
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
index 19bece86b36b4..7cd8f28b12dd6 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions
import scala.collection.mutable.Set
-class TimeStampedHashSet[A] extends Set[A] {
+private[spark] class TimeStampedHashSet[A] extends Set[A] {
val internalMap = new ConcurrentHashMap[A, Long]()
def contains(key: A): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index dc4b8f253f259..3c8f94a416c65 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -21,6 +21,7 @@ import scala.util.Random
import org.apache.spark.util.random.XORShiftRandom
+@deprecated("Use Vectors.dense from Spark's mllib.linalg package instead.", "1.0.0")
class Vector(val elements: Array[Double]) extends Serializable {
def length = elements.length
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index b8c852b4ff5c7..025492b177a77 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -19,7 +19,10 @@ package org.apache.spark.util.collection
import java.util.{Arrays, Comparator}
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* A simple open hash table optimized for the append-only use case, where keys
* are never removed, but the value for each key may be changed.
*
@@ -29,9 +32,9 @@ import java.util.{Arrays, Comparator}
*
* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
-private[spark]
-class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K,
- V)] with Serializable {
+@DeveloperApi
+class AppendOnlyMap[K, V](initialCapacity: Int = 64)
+ extends Iterable[(K, V)] with Serializable {
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
require(initialCapacity >= 1, "Invalid initial capacity")
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index caa06d5b445b4..dd01ae821f705 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -27,10 +27,12 @@ import com.google.common.io.ByteStreams
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BlockManager}
/**
+ * :: DeveloperApi ::
* An append-only map that spills sorted content to disk when there is insufficient space for it
* to grow.
*
@@ -55,8 +57,8 @@ import org.apache.spark.storage.{BlockId, BlockManager}
* `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
* this threshold, in case map size estimation is not sufficiently accurate.
*/
-
-private[spark] class ExternalAppendOnlyMap[K, V, C](
+@DeveloperApi
+class ExternalAppendOnlyMap[K, V, C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index c26f23d50024a..62f99f3981793 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -19,14 +19,17 @@ package org.apache.spark.util.collection
import scala.reflect.ClassTag
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
* but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
* space overhead.
*
* Under the hood, it uses our OpenHashSet implementation.
*/
-private[spark]
+@DeveloperApi
class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
initialCapacity: Int)
extends Iterable[(K, V)]
diff --git a/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala b/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala
index 98569143ee1e3..70f3dd62b9b19 100644
--- a/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala
@@ -17,9 +17,13 @@
package org.apache.spark.util.random
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* A class with pseudorandom behavior.
*/
+@DeveloperApi
trait Pseudorandom {
/** Set random seed. */
def setSeed(seed: Long)
diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
index 0f1fca4813ba9..37a6b04f5200f 100644
--- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
@@ -22,7 +22,10 @@ import java.util.Random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* A pseudorandom sampler. It is possible to change the sampled item type. For example, we might
* want to add weights for stratified sampling or importance sampling. Should only use
* transformations that are tied to the sampler and cannot be applied after sampling.
@@ -30,6 +33,7 @@ import cern.jet.random.engine.DRand
* @tparam T item type
* @tparam U sampled item type
*/
+@DeveloperApi
trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable {
/** take a random sample */
@@ -40,6 +44,7 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
}
/**
+ * :: DeveloperApi ::
* A sampler based on Bernoulli trials.
*
* @param lb lower bound of the acceptance range
@@ -47,6 +52,7 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
* @param complement whether to use the complement of the range specified, default to false
* @tparam T item type
*/
+@DeveloperApi
class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
(implicit random: Random = new XORShiftRandom)
extends RandomSampler[T, T] {
@@ -67,11 +73,13 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
}
/**
+ * :: DeveloperApi ::
* A sampler based on values drawn from Poisson distribution.
*
* @param poisson a Poisson random number generator
* @tparam T item type
*/
+@DeveloperApi
class PoissonSampler[T](mean: Double)
(implicit var poisson: Poisson = new Poisson(mean, new DRand))
extends RandomSampler[T, T] {
diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb
index 2245bcbc70f1e..bbd56d2fd13bb 100644
--- a/docs/_plugins/copy_api_dirs.rb
+++ b/docs/_plugins/copy_api_dirs.rb
@@ -24,7 +24,9 @@
external_projects = ["flume", "kafka", "mqtt", "twitter", "zeromq"]
sql_projects = ["catalyst", "core", "hive"]
- projects = core_projects + external_projects.map { |project_name| "external/" + project_name }
+ projects = core_projects
+ projects = projects + external_projects.map { |project_name| "external/" + project_name }
+ projects = projects + sql_projects.map { |project_name| "sql/" + project_name }
puts "Moving to project root and building scaladoc."
curr_dir = pwd
@@ -42,24 +44,22 @@
source = "../" + project_name + "/target/scala-2.10/api"
dest = "api/" + project_name
- puts "echo making directory " + dest
+ puts "making directory " + dest
mkdir_p dest
# From the rubydoc: cp_r('src', 'dest') makes src/dest, but this doesn't.
puts "cp -r " + source + "/. " + dest
cp_r(source + "/.", dest)
- end
-
- sql_projects.each do |project_name|
- source = "../sql/" + project_name + "/target/scala-2.10/api/"
- dest = "api/sql/" + project_name
- puts "echo making directory " + dest
- mkdir_p dest
+ # Append custom JavaScript
+ js = File.readlines("./js/api-docs.js")
+ js_file = dest + "/lib/template.js"
+ File.open(js_file, 'a') { |f| f.write("\n" + js.join()) }
- # From the rubydoc: cp_r('src', 'dest') makes src/dest, but this doesn't.
- puts "cp -r " + source + "/. " + dest
- cp_r(source + "/.", dest)
+ # Append custom CSS
+ css = File.readlines("./css/api-docs.css")
+ css_file = dest + "/lib/template.css"
+ File.open(css_file, 'a') { |f| f.write("\n" + css.join()) }
end
# Build Epydoc for Python
diff --git a/docs/css/api-docs.css b/docs/css/api-docs.css
new file mode 100644
index 0000000000000..b2d1d7f869790
--- /dev/null
+++ b/docs/css/api-docs.css
@@ -0,0 +1,18 @@
+/* Dynamically injected style for the API docs */
+
+.developer {
+ background-color: #44751E;
+}
+
+.experimental {
+ background-color: #257080;
+}
+
+.alphaComponent {
+ background-color: #bb0000;
+}
+
+.badge {
+ font-family: Arial, san-serif;
+ float: right;
+}
diff --git a/docs/js/api-docs.js b/docs/js/api-docs.js
new file mode 100644
index 0000000000000..1414b6d0b81a1
--- /dev/null
+++ b/docs/js/api-docs.js
@@ -0,0 +1,26 @@
+/* Dynamically injected post-processing code for the API docs */
+
+$(document).ready(function() {
+ var annotations = $("dt:contains('Annotations')").next("dd").children("span.name");
+ addBadges(annotations, "AlphaComponent", ":: AlphaComponent ::", "Alpha Component");
+ addBadges(annotations, "DeveloperApi", ":: DeveloperApi ::", "Developer API");
+ addBadges(annotations, "Experimental", ":: Experimental ::", "Experimental");
+});
+
+function addBadges(allAnnotations, name, tag, html) {
+ var annotations = allAnnotations.filter(":contains('" + name + "')")
+ var tags = $(".cmt:contains(" + tag + ")")
+
+ // Remove identifier tags from comments
+ tags.each(function(index) {
+ var oldHTML = $(this).html();
+ var newHTML = oldHTML.replace(tag, "");
+ $(this).html(newHTML);
+ });
+
+ // Add badges to all containers
+ tags.prevAll("h4.signature")
+ .add(annotations.closest("div.fullcommenttop"))
+ .add(annotations.closest("div.fullcomment").prevAll("h4.signature"))
+ .prepend(html);
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
index 425a5164cad24..ff17edeaf8f16 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -19,7 +19,10 @@ package org.apache.spark
import org.apache.spark.util.collection.OpenHashSet
-/** GraphX is a graph processing framework built on top of Spark. */
+/**
+ * ALPHA COMPONENT
+ * GraphX is a graph processing framework built on top of Spark.
+ */
package object graphx {
/**
* A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
index 0f06ea088e1a1..77b5429bad432 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -90,7 +90,7 @@ trait MutableRow extends Row {
def setString(ordinal: Int, value: String)
/**
- * EXPERIMENTAL
+ * Experimental
*
* Returns a mutable string builder for the specified column. A given row should return the
* result of any mutations made to the returned buffer next time getString is called for the same
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 3193787680d16..d3d4c56bafe41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -21,6 +21,7 @@ import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl
@@ -32,14 +33,14 @@ import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
import org.apache.spark.sql.execution._
/**
- * ALPHA COMPONENT
- *
+ * :: AlphaComponent ::
* The entry point for running relational queries using Spark. Allows the creation of [[SchemaRDD]]
* objects and the execution of SQL queries.
*
* @groupname userf Spark SQL Functions
* @groupname Ungrouped Support functions for language integrated queries.
*/
+@AlphaComponent
class SQLContext(@transient val sparkContext: SparkContext)
extends Logging
with dsl.ExpressionConversions
@@ -63,12 +64,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
new this.QueryExecution { val logical = plan }
/**
- * EXPERIMENTAL
- *
+ * :: Experimental ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
* interface is considered internal, and thus not guranteed to be stable. As a result, using
* them directly is not reccomended.
*/
+ @Experimental
implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index fc95781448569..16da7fd92bffe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -26,8 +27,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types.BooleanType
/**
- * ALPHA COMPONENT
- *
+ * :: AlphaComponent ::
* An RDD of [[Row]] objects that has an associated schema. In addition to standard RDD functions,
* SchemaRDDs can be used in relational queries, as shown in the examples below.
*
@@ -90,6 +90,7 @@ import org.apache.spark.sql.catalyst.types.BooleanType
* @groupprio schema -1
* @groupname Ungrouped Base RDD Functions
*/
+@AlphaComponent
class SchemaRDD(
@transient val sqlContext: SQLContext,
@transient protected[spark] val logicalPlan: LogicalPlan)
@@ -228,8 +229,7 @@ class SchemaRDD(
Filter(ScalaUdf(udf, BooleanType, Seq(UnresolvedAttribute(arg1.name))), logicalPlan))
/**
- * EXPERIMENTAL
- *
+ * :: Experimental ::
* Filters tuples using a function over a `Dynamic` version of a given Row. DynamicRows use
* scala's Dynamic trait to emulate an ORM of in a dynamically typed language. Since the type of
* the column is not known at compile time, all attributes are converted to strings before
@@ -241,18 +241,19 @@ class SchemaRDD(
*
* @group Query
*/
+ @Experimental
def where(dynamicUdf: (DynamicRow) => Boolean) =
new SchemaRDD(
sqlContext,
Filter(ScalaUdf(dynamicUdf, BooleanType, Seq(WrapDynamic(logicalPlan.output))), logicalPlan))
/**
- * EXPERIMENTAL
- *
+ * :: Experimental ::
* Returns a sampled version of the underlying dataset.
*
* @group Query
*/
+ @Experimental
def sample(
fraction: Double,
withReplacement: Boolean = true,
@@ -260,8 +261,7 @@ class SchemaRDD(
new SchemaRDD(sqlContext, Sample(fraction, withReplacement, seed, logicalPlan))
/**
- * EXPERIMENTAL
- *
+ * :: Experimental ::
* Applies the given Generator, or table generating function, to this relation.
*
* @param generator A table generating function. The API for such functions is likely to change
@@ -277,6 +277,7 @@ class SchemaRDD(
*
* @group Query
*/
+ @Experimental
def generate(
generator: Generator,
join: Boolean = false,
@@ -285,8 +286,7 @@ class SchemaRDD(
new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan))
/**
- * EXPERIMENTAL
- *
+ * :: Experimental ::
* Adds the rows from this RDD to the specified table. Note in a standard [[SQLContext]] there is
* no notion of persistent tables, and thus queries that contain this operator will fail to
* optimize. When working with an extension of a SQLContext that has a persistent catalog, such
@@ -294,6 +294,7 @@ class SchemaRDD(
*
* @group schema
*/
+ @Experimental
def insertInto(tableName: String, overwrite: Boolean = false) =
new SchemaRDD(
sqlContext,
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
similarity index 97%
rename from core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
rename to tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8cea302eb14c3..8e8c35615a711 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.storage
+package org.apache.spark.tools
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
@@ -25,7 +25,7 @@ import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
/**
- * Utility for micro-benchmarking shuffle write performance.
+ * Internal utility for micro-benchmarking shuffle write performance.
*
* Writes simulated shuffle output from several threads and records the observed throughput.
*/