diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index d43ef6c430cb1..06d957a1b2cb8 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -17,16 +17,18 @@ package org.apache.spark +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap} /** - * Developer API + * :: DeveloperApi :: * A set of functions used to aggregate data. * * @param createCombiner function to create the initial value of the aggregation. * @param mergeValue function to merge a new value into the aggregation result. * @param mergeCombiners function to merge outputs from multiple mergeValue function. */ +@DeveloperApi case class Aggregator[K, V, C] ( createCombiner: V => C, mergeValue: (C, V) => C, diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index ae3a921e61658..13f3dcd84d721 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -17,21 +17,24 @@ package org.apache.spark +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer /** - * Developer API + * :: DeveloperApi :: * Base class for dependencies. */ +@DeveloperApi abstract class Dependency[T](val rdd: RDD[T]) extends Serializable /** - * Developer API + * :: DeveloperApi :: * Base class for dependencies where each partition of the parent RDD is used by at most one * partition of the child RDD. Narrow dependencies allow for pipelined execution. */ +@DeveloperApi abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { /** * Get the parent partitions for a child partition. @@ -43,7 +46,7 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { /** - * Developer API + * :: DeveloperApi :: * Represents a dependency on the output of a shuffle stage. * @param rdd the parent RDD * @param partitioner partitioner used to partition the shuffle output @@ -51,6 +54,7 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { * the default serializer, as specified by `spark.serializer` config option, will * be used. */ +@DeveloperApi class ShuffleDependency[K, V]( @transient rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, @@ -62,22 +66,24 @@ class ShuffleDependency[K, V]( /** - * Developer API + * :: DeveloperApi :: * Represents a one-to-one dependency between partitions of the parent and child RDDs. */ +@DeveloperApi class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = List(partitionId) } /** - * Developer API + * :: DeveloperApi :: * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. 
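Stepping back to the Aggregator change above: the three functions compose exactly as combineByKey uses them. A minimal sketch, assuming a purely illustrative per-key (sum, count) combiner:

    import org.apache.spark.Aggregator

    // Fold Double values into (sum, count) pairs so a per-key mean can be derived later.
    val createCombiner = (v: Double) => (v, 1)
    val mergeValue = (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1)
    val mergeCombiners = (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2)

    val agg = Aggregator[String, Double, (Double, Int)](createCombiner, mergeValue, mergeCombiners)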
* @param rdd the parent RDD * @param inStart the start of the range in the parent RDD * @param outStart the start of the range in the child RDD * @param length the length of the range */ +@DeveloperApi class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 93b180d180ecf..07c499f49d2b1 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -21,14 +21,16 @@ import scala.concurrent._ import scala.concurrent.duration.Duration import scala.util.Try +import org.apache.spark.annotations.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter} /** - * Experimental + * :: Experimental :: * A future for the result of an action to support cancellation. This is an extension of the * Scala Future interface to support cancellation. */ +@Experimental trait FutureAction[T] extends Future[T] { // Note that we redefine methods of the Future trait here explicitly so we can specify a different // documentation (with reference to the word "action"). @@ -85,10 +87,11 @@ trait FutureAction[T] extends Future[T] { /** - * Experimental + * :: Experimental :: * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include * count, collect, reduce. */ +@Experimental class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) extends FutureAction[T] { @@ -150,11 +153,12 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: /** - * Experimental + * :: Experimental :: * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take, * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ +@Experimental class ComplexFutureAction[T] extends FutureAction[T] { // Pointer to the thread that is executing the action. It is set when the action is run. diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index f4cb7973e52a0..ed33c7d2be88b 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -21,8 +21,10 @@ import org.apache.log4j.{LogManager, PropertyConfigurator} import org.slf4j.{Logger, LoggerFactory} import org.slf4j.impl.StaticLoggerBinder +import org.apache.spark.annotations.DeveloperApi + /** - * Developer API + * :: DeveloperApi :: * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows * logging messages at different levels using methods that only evaluate parameters lazily if the * log level is enabled. @@ -30,6 +32,7 @@ import org.slf4j.impl.StaticLoggerBinder * NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility. * This will likely be changed or removed in future releases. 
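A note on the FutureAction hierarchy annotated above: instances normally come from the asynchronous actions (AsyncRDDActions, further down in this patch) rather than from direct construction. A sketch, assuming an existing SparkContext named sc:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import org.apache.spark.FutureAction
    import org.apache.spark.SparkContext._   // implicit conversion to AsyncRDDActions

    val future: FutureAction[Long] = sc.parallelize(1 to 1000000).countAsync()
    future.onComplete {
      case Success(n) => println(s"counted $n elements")
      case Failure(e) => println("count failed: " + e)
    }
    // Unlike a plain Scala Future, the action (and its underlying Spark jobs) can be cancelled.
    future.cancel()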
*/ +@DeveloperApi trait Logging { // Make the log field transient so that objects with Logging can // be serialized and used on another machine diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index 73694d09485b1..9c44340cacadb 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.ObjectWritable import org.apache.hadoop.io.Writable -/** Developer API */ +import org.apache.spark.annotations.DeveloperApi + +@DeveloperApi class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable { def value = t override def toString = t.toString diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index dd4833102b72e..6464b3bc4572f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.mesos.MesosNativeLibrary +import org.apache.spark.annotations.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} @@ -48,14 +49,16 @@ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} /** + * :: DeveloperApi :: * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. * * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. */ -class SparkContext(config: SparkConf) - extends Logging { + +@DeveloperApi +class SparkContext(config: SparkConf) extends Logging { // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It @@ -63,13 +66,14 @@ class SparkContext(config: SparkConf) private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map() /** - * Developer API + * :: DeveloperApi :: * Alternative constructor for setting preferred locations where Spark will create executors. * * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Ca * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ + @DeveloperApi def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) this.preferredNodeLocationData = preferredNodeLocationData @@ -714,9 +718,10 @@ class SparkContext(config: SparkConf) } /** - * Developer API + * :: DeveloperApi :: * Register a listener to receive up-calls from events that happen during execution. 
*/ + @DeveloperApi def addSparkListener(listener: SparkListener) { listenerBus.addListener(listener) } @@ -1026,9 +1031,10 @@ class SparkContext(config: SparkConf) } /** - * Developer API + * :: DeveloperApi :: * Run a job that can return approximate results. */ + @DeveloperApi def runApproximateJob[T, U, R]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, @@ -1044,9 +1050,9 @@ class SparkContext(config: SparkConf) } /** - * Experimental * Submit a job for execution and return a FutureJob holding the result. */ + @Experimental def submitJob[T, U, R]( rdd: RDD[T], processPartition: Iterator[T] => U, diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 599688092791e..e4e1f5e6378da 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -25,6 +25,7 @@ import scala.util.Properties import akka.actor._ import com.google.common.collect.MapMaker +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem @@ -35,13 +36,14 @@ import org.apache.spark.storage._ import org.apache.spark.util.{AkkaUtils, Utils} /** - * Developer API + * :: DeveloperApi :: * Holds all the runtime environment objects for a running Spark instance (either master or worker), * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these * objects needs to have the right SparkEnv set. You can get the current environment with * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set. */ +@DeveloperApi class SparkEnv ( val executorId: String, val actorSystem: ActorSystem, diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index faccafbdc9a1d..ec42f75598696 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -19,12 +19,14 @@ package org.apache.spark import scala.collection.mutable.ArrayBuffer +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.executor.TaskMetrics /** - * Developer API + * :: DeveloperApi :: * Contextual information about a task which can be read or mutated during execution. */ +@DeveloperApi class TaskContext( val stageId: Int, val partitionId: Int, diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 626f1260cff04..01312a44714a4 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -17,25 +17,26 @@ package org.apache.spark +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId /** - * Developer API + * :: DeveloperApi :: * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry * tasks several times for "ephemeral" failures, and only report back failures that require some * old stages to be resubmitted, such as shuffle map fetch failures. 
*/ - +@DeveloperApi sealed trait TaskEndReason -/** Developer API */ +@DeveloperApi case object Success extends TaskEndReason -/** Developer API */ +@DeveloperApi case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it -/** Developer API */ +@DeveloperApi case class FetchFailed( bmAddress: BlockManagerId, shuffleId: Int, @@ -43,7 +44,7 @@ case class FetchFailed( reduceId: Int) extends TaskEndReason -/** Developer API */ +@DeveloperApi case class ExceptionFailure( className: String, description: String, @@ -52,25 +53,28 @@ case class ExceptionFailure( extends TaskEndReason /** - * Developer API + * :: DeveloperApi :: * The task finished successfully, but the result was lost from the executor's block manager before * it was fetched. */ +@DeveloperApi case object TaskResultLost extends TaskEndReason -/** Developer API */ +@DeveloperApi case object TaskKilled extends TaskEndReason /** - * Developer API + * :: DeveloperApi :: * The task failed because the executor that it was running on was lost. This may happen because * the task crashed the JVM. */ +@DeveloperApi case object ExecutorLostFailure extends TaskEndReason /** - * Developer API + * :: DeveloperApi :: * We don't know why the task ended -- for example, because of a ClassNotFound exception when * deserializing the task result. */ +@DeveloperApi case object UnknownReason extends TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/annotations/AlphaComponent.java b/core/src/main/scala/org/apache/spark/annotations/AlphaComponent.java new file mode 100644 index 0000000000000..d54767ab84c7e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/annotations/AlphaComponent.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.annotations; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.SOURCE) +@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, + ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE}) +public @interface AlphaComponent {} diff --git a/core/src/main/scala/org/apache/spark/annotations/DeveloperApi.java b/core/src/main/scala/org/apache/spark/annotations/DeveloperApi.java new file mode 100644 index 0000000000000..74c63ea949b3e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/annotations/DeveloperApi.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.annotations; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.SOURCE) +@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, + ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE}) +public @interface DeveloperApi {} diff --git a/core/src/main/scala/org/apache/spark/annotations/Experimental.java b/core/src/main/scala/org/apache/spark/annotations/Experimental.java new file mode 100644 index 0000000000000..58445cc2c1e86 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/annotations/Experimental.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.annotations; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.SOURCE) +@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, + ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE}) +public @interface Experimental {} diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala index 051547a199cbb..fd44abc37e16a 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala @@ -16,17 +16,19 @@ */ package org.apache.spark.broadcast -import org.apache.spark.SecurityManager +import org.apache.spark.SecurityManager import org.apache.spark.SparkConf +import org.apache.spark.annotations.DeveloperApi /** - * Developer API - * An interface for all the broadcast implementations in Spark (to allow + * :: DeveloperApi :: + * An interface for all the broadcast implementations in Spark (to allow * multiple broadcast implementations). SparkContext uses a user-specified * BroadcastFactory implementation to instantiate a particular broadcast for the * entire Spark job. 
*/ +@DeveloperApi trait BroadcastFactory { def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T] diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index af8ff39313187..cbddcb3c34f7d 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,12 +17,14 @@ package org.apache.spark.executor +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.storage.{BlockId, BlockStatus} /** - * Developer API + * :: DeveloperApi :: * Metrics tracked during the execution of a task. */ +@DeveloperApi class TaskMetrics extends Serializable { /** * Host's name the task runs on @@ -87,9 +89,10 @@ private[spark] object TaskMetrics { /** - * Developer API + * :: DeveloperApi :: * Metrics pertaining to shuffle data read in a given task. */ +@DeveloperApi class ShuffleReadMetrics extends Serializable { /** * Absolute time when this task finished reading shuffle data @@ -125,9 +128,10 @@ class ShuffleReadMetrics extends Serializable { } /** - * Developer API + * :: DeveloperApi :: * Metrics pertaining to shuffle data written in a given task. */ +@DeveloperApi class ShuffleWriteMetrics extends Serializable { /** * Number of bytes written for the shuffle by this task diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 0a8fa9f680e5d..a0df43bafc30f 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -23,9 +23,10 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf +import org.apache.spark.annotations.DeveloperApi /** - * Developer API + * :: DeveloperApi :: * CompressionCodec allows the customization of choosing different compression implementations * to be used in block storage. * @@ -33,6 +34,7 @@ import org.apache.spark.SparkConf * This is intended for use as an internal compression utility within a single * Spark application. */ +@DeveloperApi trait CompressionCodec { def compressedOutputStream(s: OutputStream): OutputStream @@ -57,13 +59,14 @@ private[spark] object CompressionCodec { /** - * Developer API + * :: DeveloperApi :: * LZF implementation of [[org.apache.spark.io.CompressionCodec]]. * * Note: The wire protocol for this codec is not guaranteed to be compatible across versions * of Spark. This is intended for use as an internal compression utility within a single Spark * application. */ +@DeveloperApi class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedOutputStream(s: OutputStream): OutputStream = { @@ -75,7 +78,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { /** - * Developer API + * :: DeveloperApi :: * Snappy implementation of [[org.apache.spark.io.CompressionCodec]]. * Block size can be configured by spark.io.compression.snappy.block.size. * @@ -83,6 +86,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { * of Spark. This is intended for use as an internal compression utility within a single Spark * application. 
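For context on the CompressionCodec trait annotated above, the contract is just two stream wrappers. A sketch that round-trips a few bytes through the LZF implementation (the buffer names are illustrative):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.spark.SparkConf
    import org.apache.spark.io.LZFCompressionCodec

    val codec = new LZFCompressionCodec(new SparkConf())

    val buffer = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(buffer)
    out.write("hello".getBytes("UTF-8"))
    out.close()

    val in = codec.compressedInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val restored = scala.io.Source.fromInputStream(in, "UTF-8").mkString   // "hello"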
*/ +@DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedOutputStream(s: OutputStream): OutputStream = { diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala index 55b979bba0c49..ad11fccaf20a2 100644 --- a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala +++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala @@ -17,10 +17,13 @@ package org.apache.spark.partial +import org.apache.spark.annotations.Experimental + /** - * Experimental + * :: Experimental :: * A Double value with error bars and associated confidence. */ +@Experimental class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) { override def toString(): String = "[%.3f, %.3f]".format(low, high) } diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala index 9fa4c12326c11..6d5f4051c11f4 100644 --- a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala +++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala @@ -17,9 +17,9 @@ package org.apache.spark.partial -/** - * Experimental - */ +import org.apache.spark.annotations.Experimental + +@Experimental class PartialResult[R](initialVal: R, isFinal: Boolean) { private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None private var failure: Option[Exception] = None diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 05eda70db4545..85ea8f42b39ee 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -24,12 +24,14 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.reflect.ClassTag import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} +import org.apache.spark.annotations.Experimental /** - * Experimental + * :: Experimental :: * A set of asynchronous RDD actions available through an implicit conversion. * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions. */ +@Experimental class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging { /** diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 320a33f64ddd7..9f0ce8fe919c7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext} import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap} import org.apache.spark.serializer.Serializer @@ -51,7 +52,7 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep] } /** - * Developer API + * :: DeveloperApi :: * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a * tuple with the list of values for that key. * @@ -61,6 +62,7 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep] * @param rdds parent RDDs. 
* @param part partitioner used to partition the shuffle output */ +@DeveloperApi class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 8807e8ca9aa3b..1fad40bcf8242 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TaskID import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.NextIterator @@ -70,7 +71,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp } /** - * Developer API + * :: DeveloperApi :: * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`). * @@ -88,6 +89,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * @param valueClass Class of the value associated with the inputFormatClass. * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate. */ +@DeveloperApi class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableWritable[Configuration]], diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 5c97965556ac7..bdfe9a41041e6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.annotations.DeveloperApi private[spark] class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) @@ -36,7 +37,7 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS } /** - * Developer API + * :: DeveloperApi :: * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`). * @@ -49,6 +50,7 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS * @param valueClass Class of the value associated with the inputFormatClass. * @param conf The Hadoop configuration. 
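HadoopRDD and NewHadoopRDD are rarely constructed by hand; the SparkContext helpers build them. A sketch of the explicit old-API form, assuming an existing SparkContext sc and a placeholder input path:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // Roughly what sc.textFile(...) does, spelled out against the mapred API.
    val lines = sc
      .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/input")
      .map { case (_, text) => text.toString }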
*/ +@DeveloperApi class NewHadoopRDD[K, V]( sc : SparkContext, inputFormatClass: Class[_ <: InputFormat[K, V]], diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala index efeb28da4f083..5a42ba6a9d05a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag import org.apache.spark.{NarrowDependency, Partition, TaskContext} +import org.apache.spark.annotations.DeveloperApi private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition) extends Partition { @@ -46,12 +47,13 @@ private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterF /** - * Developer API + * :: DeveloperApi :: * A RDD used to prune RDD partitions/partitions so we can avoid launching tasks on * all partitions. An example use case: If we know the RDD is partitioned by range, * and the execution DAG has a filter on the key, we can avoid launching tasks * on partitions that don't have the range covering the key. */ +@DeveloperApi class PartitionPruningRDD[T: ClassTag]( @transient prev: RDD[T], @transient partitionFilterFunc: Int => Boolean) @@ -65,9 +67,7 @@ class PartitionPruningRDD[T: ClassTag]( } -/** - * Developer API - */ +@DeveloperApi object PartitionPruningRDD { /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 67f2fe65cf6d2..e9af8b358acaa 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ import org.apache.spark.Partitioner._ import org.apache.spark.SparkContext._ +import org.apache.spark.annotations.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator @@ -87,28 +88,33 @@ abstract class RDD[T: ClassTag]( // ======================================================================= /** - * Developer API + * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. */ + @DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T] /** - * Developer API + * :: DeveloperApi :: * Implemented by subclasses to return the set of partitions in this RDD. This method will only * be called once, so it is safe to implement a time-consuming computation in it. */ + @DeveloperApi protected def getPartitions: Array[Partition] /** - * Developer API + * :: DeveloperApi :: * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only * be called once, so it is safe to implement a time-consuming computation in it. */ + @DeveloperApi protected def getDependencies: Seq[Dependency[_]] = deps /** - * Developer API - * Optionally overridden by subclasses to specify placement preferences. */ + * :: DeveloperApi :: + * Optionally overridden by subclasses to specify placement preferences. + */ + @DeveloperApi protected def getPreferredLocations(split: Partition): Seq[String] = Nil /** Optionally overridden by subclasses to specify how they are partitioned. 
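To make the developer contract above concrete: a subclass only has to supply getPartitions and compute, while getDependencies and getPreferredLocations have defaults. A toy sketch with illustrative names:

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    class SlicePartition(val index: Int) extends Partition

    // Produces the numbers 0 until n, split across `slices` partitions.
    class SmallRangeRDD(sc: SparkContext, n: Int, slices: Int) extends RDD[Int](sc, Nil) {
      override protected def getPartitions: Array[Partition] =
        Array.tabulate(slices)(i => new SlicePartition(i): Partition)

      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator.range(split.index * n / slices, (split.index + 1) * n / slices)
    }

Calling new SmallRangeRDD(sc, 100, 4).collect() should then behave like sc.parallelize(0 until 100, 4).collect().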
*/ @@ -520,10 +526,11 @@ abstract class RDD[T: ClassTag]( } /** - * Developer API + * :: DeveloperApi :: * Return a new RDD by applying a function to each partition of this RDD. This is a variant of * mapPartitions that also passes the TaskContext into the closure. */ + @DeveloperApi def mapPartitionsWithContext[U: ClassTag]( f: (TaskContext, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { @@ -783,11 +790,11 @@ abstract class RDD[T: ClassTag]( def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum /** - * Experimental - * + * :: Experimental :: * Approximate version of count() that returns a potentially incomplete result * within a timeout, even if not all tasks have finished. */ + @Experimental def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) => var result = 0L @@ -831,10 +838,10 @@ abstract class RDD[T: ClassTag]( } /** - * Experimental - * + * :: Experimental :: * Approximate version of countByValue(). */ + @Experimental def countByValueApprox( timeout: Long, confidence: Double = 0.95 @@ -855,7 +862,7 @@ abstract class RDD[T: ClassTag]( } /** - * Experimental + * :: Experimental :: * Return approximate number of distinct elements in the RDD. * * The accuracy of approximation can be controlled through the relative standard deviation @@ -863,6 +870,7 @@ abstract class RDD[T: ClassTag]( * more accurate counts but increase the memory footprint and vise versa. The default value of * relativeSD is 0.05. */ + @Experimental def countApproxDistinct(relativeSD: Double = 0.05): Long = { val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala index 679759539cfd9..3661fe37f6f53 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, Partitioner, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.serializer.Serializer private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { @@ -28,13 +29,14 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { } /** - * Developer API + * :: DeveloperApi :: * The resulting RDD from a shuffle (e.g. repartitioning of data). * @param prev the parent RDD. * @param part the partitioner used to partition the RDD * @tparam K the key class. * @tparam V the value class. 
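The experimental approximate actions annotated above hand back a PartialResult whose value tightens as tasks finish. A sketch, assuming an existing SparkContext sc:

    import org.apache.spark.partial.{BoundedDouble, PartialResult}

    val rdd = sc.parallelize(1 to 1000000, 100)

    // Returns within roughly one second even if some tasks are still running.
    val approx: PartialResult[BoundedDouble] = rdd.countApprox(timeout = 1000, confidence = 0.90)
    println(approx.initialValue)      // a BoundedDouble, printed as [low, high]
    println(approx.getFinalValue())   // blocks until the exact count is known

    // HyperLogLog-based distinct count; smaller relativeSD means more accuracy and more memory.
    val distinct = rdd.countApproxDistinct(relativeSD = 0.05)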
*/ +@DeveloperApi class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag]( @transient var prev: RDD[P], part: Partitioner) diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index c40961497a34c..0313a12867374 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} +import org.apache.spark.annotations.DeveloperApi private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitIndex: Int) extends Partition { @@ -43,7 +44,7 @@ private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitInd } } -/** Developer API */ +@DeveloperApi class UnionRDD[T: ClassTag]( sc: SparkContext, @transient var rdds: Seq[RDD[T]]) diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 79eda49c6fdf9..3fd2c4ff3b570 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -27,12 +27,14 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.util.ReflectionUtils import org.apache.spark.Logging +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.deploy.SparkHadoopUtil /** - * Developer API + * :: DeveloperApi :: * Parses and holds information about inputFormat (and files) specified as a parameter. */ +@DeveloperApi class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_], val path: String) extends Logging { diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index f85ac36597e30..30d898c991548 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -25,10 +25,11 @@ import java.util.concurrent.LinkedBlockingQueue import scala.collection.mutable.HashMap import org.apache.spark._ +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.executor.TaskMetrics /** - * Developer API + * :: DeveloperApi :: * A logger class to record runtime information for jobs in Spark. This class outputs one log file * for each Spark job, containing tasks start/stop and shuffle information. JobLogger is a subclass * of SparkListener, use addSparkListener to add JobLogger to a SparkContext after the SparkContext @@ -39,7 +40,7 @@ import org.apache.spark.executor.TaskMetrics * to log application information as SparkListenerEvents. To enable this functionality, set * spark.eventLog.enabled to true. 
*/ - +@DeveloperApi @deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0") class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging { diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala index 1fb6196718445..dc41effd59950 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala @@ -17,13 +17,18 @@ package org.apache.spark.scheduler +import org.apache.spark.annotations.DeveloperApi + /** - * Developer API + * :: DeveloperApi :: * A result of a job in the DAGScheduler. */ +@DeveloperApi sealed trait JobResult +@DeveloperApi case object JobSucceeded extends JobResult // A failed stage ID of -1 means there is not a particular stage that caused the failure +@DeveloperApi case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index cbe8c25de5d55..50de7c2a9df8e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -23,27 +23,28 @@ import scala.collection.Map import scala.collection.mutable import org.apache.spark.{Logging, TaskEndReason} +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{Distribution, Utils} -/** Developer API */ +@DeveloperApi sealed trait SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerTaskEnd( stageId: Int, taskType: String, @@ -52,26 +53,26 @@ case class SparkListenerTaskEnd( taskMetrics: TaskMetrics) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]]) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) extends SparkListenerEvent -/** Developer API */ +@DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent /** An event used in the listener to shutdown the listener daemon thread. 
*/ @@ -79,10 +80,11 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent /** - * Developer API + * :: DeveloperApi :: * Interface for listening to events from the Spark scheduler. Note that this is an internal * interface which might change in different Spark releases. */ +@DeveloperApi trait SparkListener { /** * Called when a stage is completed, with information on the completed stage @@ -142,9 +144,10 @@ trait SparkListener { } /** - * Developer API + * :: DeveloperApi :: * Simple SparkListener that logs a few summary statistics when each stage completes */ +@DeveloperApi class StatsReportListener extends SparkListener with Logging { import org.apache.spark.scheduler.StatsReportListener._ diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala index 4f28bdd991d8c..e245ff8c25e0b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala @@ -19,9 +19,11 @@ package org.apache.spark.scheduler import collection.mutable.ArrayBuffer +import org.apache.spark.annotations.DeveloperApi + // information about a specific split instance : handles both split instances. // So that we do not need to worry about the differences. -/** Developer API */ +@DeveloperApi class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String, val length: Long, val underlyingSplit: Any) { override def toString(): String = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 419cd96376c04..e2e287666834b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -17,12 +17,14 @@ package org.apache.spark.scheduler +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.storage.RDDInfo /** - * Developer API + * :: DeveloperApi :: * Stores information about a stage to pass from the scheduler to SparkListeners. */ +@DeveloperApi class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */ var submissionTime: Option[Long] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 515755a93c6e6..a37ab5a338e4f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -17,10 +17,13 @@ package org.apache.spark.scheduler +import org.apache.spark.annotations.DeveloperApi + /** - * Developer API + * :: DeveloperApi :: * Information about a running task attempt inside a TaskSet. */ +@DeveloperApi class TaskInfo( val taskId: Long, val index: Int, diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala index d2d05a0b81bba..eaf736fb20ea1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala @@ -17,7 +17,9 @@ package org.apache.spark.scheduler -/** Developer API */ +import org.apache.spark.annotations.DeveloperApi + +@DeveloperApi object TaskLocality extends Enumeration { // Process local is expected to be used ONLY within TaskSetManager for now. 
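Tying the listener trait to the TaskEndReason hierarchy annotated earlier in this patch, a registered listener might look like the following sketch (the class name is illustrative; sc is an existing SparkContext):

    import org.apache.spark.{ExceptionFailure, FetchFailed}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    class TaskFailureLogger extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        taskEnd.reason match {
          case org.apache.spark.Success => // completed normally, nothing to log
          case FetchFailed(_, shuffleId, _, _) => println(s"fetch failed in shuffle $shuffleId")
          case ExceptionFailure(cls, desc, _, _) => println(s"task threw $cls: $desc")
          case other => println(s"task ended abnormally: $other")
        }
      }
    }

    sc.addSparkListener(new TaskFailureLogger)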
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index cf0576713f4ad..6c6463b1c8827 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -21,6 +21,7 @@ import java.io._ import java.nio.ByteBuffer import org.apache.spark.SparkConf +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.util.ByteBufferInputStream private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int) @@ -94,13 +95,14 @@ private[spark] class JavaSerializerInstance(counterReset: Int) extends Serialize } /** - * Developer API + * :: DeveloperApi :: * A Spark serializer that uses Java's built-in serialization. * * Note that this serializer is not guaranteed to be wire-compatible across different versions of * Spark. It is intended to be used to serialize/de-serialize data within a single * Spark application. */ +@DeveloperApi class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable { private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000) diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 015c4f62e110b..d404035646065 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -22,11 +22,12 @@ import java.nio.ByteBuffer import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -import org.apache.spark.util.{ByteBufferInputStream, NextIterator} import org.apache.spark.SparkEnv +import org.apache.spark.annotations.DeveloperApi +import org.apache.spark.util.{ByteBufferInputStream, NextIterator} /** - * Developer API + * :: DeveloperApi :: * A serializer. Because some serialization libraries are not thread safe, this class is used to * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual * serialization and are guaranteed to only be called from one thread at a time. @@ -41,6 +42,7 @@ import org.apache.spark.SparkEnv * Note that serializers are not required to be wire-compatible across different versions of Spark. * They are intended to be used to serialize/de-serialize data within a single Spark application. */ +@DeveloperApi trait Serializer { def newInstance(): SerializerInstance } @@ -54,9 +56,10 @@ object Serializer { /** - * Developer API + * :: DeveloperApi :: * An instance of a serializer, for use by one thread at a time. */ +@DeveloperApi trait SerializerInstance { def serialize[T](t: T): ByteBuffer @@ -87,9 +90,10 @@ trait SerializerInstance { /** - * Developer API + * :: DeveloperApi :: * A stream for writing serialized objects. */ +@DeveloperApi trait SerializationStream { def writeObject[T](t: T): SerializationStream def flush(): Unit @@ -105,9 +109,10 @@ trait SerializationStream { /** - * Developer API + * :: DeveloperApi :: * A stream for reading serialized objects. 
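The thread-safety note above is the main point of this API: hold one Serializer, and create a SerializerInstance per thread. A round-trip sketch with the Java serializer:

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.JavaSerializer

    val serializer = new JavaSerializer(new SparkConf())
    val instance = serializer.newInstance()   // do not share an instance across threads

    val bytes = instance.serialize(("answer", 42))
    val restored = instance.deserialize[(String, Int)](bytes)   // ("answer", 42)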
*/ +@DeveloperApi trait DeserializationStream { def readObject[T](): T def close(): Unit diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 63be6917d5131..d2df92ea7655d 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -21,6 +21,7 @@ import scala.collection.Map import scala.collection.mutable import org.apache.spark.SparkContext +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.util.Utils private[spark] @@ -48,7 +49,7 @@ class StorageStatus( } -/** Developer API */ +@DeveloperApi class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel) extends Ordered[RDDInfo] { diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala index 75ff7f68e51b7..a68859a1f7669 100644 --- a/core/src/main/scala/org/apache/spark/util/MutablePair.scala +++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala @@ -17,14 +17,17 @@ package org.apache.spark.util +import org.apache.spark.annotations.DeveloperApi + /** - * Developer API + * :: DeveloperApi :: * A tuple of 2 elements. This can be used as an alternative to Scala's Tuple2 when we want to * minimize object allocation. * * @param _1 Element 1 of this MutablePair * @param _2 Element 2 of this MutablePair */ +@DeveloperApi case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef */) T1, @specialized(Int, Long, Double, Char, Boolean/* , AnyRef */) T2] (var _1: T1, var _2: T2) diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index 6ee1d96bbc894..f1fbfdd51896d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -19,8 +19,10 @@ package org.apache.spark.util.collection import java.util.{Arrays, Comparator} +import org.apache.spark.annotations.DeveloperApi + /** - * Developer API + * :: DeveloperApi :: * A simple open hash table optimized for the append-only use case, where keys * are never removed, but the value for each key may be changed. * @@ -30,6 +32,7 @@ import java.util.{Arrays, Comparator} * * TODO: Cache the hash values of each key? java.util.HashMap does that. 
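A short sketch of the append-only contract described above: changeValue either installs a value for an unseen key or updates the existing one, and entries are never removed.

    import org.apache.spark.util.collection.AppendOnlyMap

    val counts = new AppendOnlyMap[String, Int]()
    Seq("a", "b", "a").foreach { word =>
      counts.changeValue(word, (hadValue, oldValue) => if (hadValue) oldValue + 1 else 1)
    }
    counts.iterator.foreach { case (k, v) => println(s"$k -> $v") }   // a -> 2, b -> 1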
*/ +@DeveloperApi class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable { require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements") diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index f255b258889c0..abd3a6d61731b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -27,11 +27,12 @@ import com.google.common.io.ByteStreams import it.unimi.dsi.fastutil.io.FastBufferedInputStream import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.annotations.DeveloperApi import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, BlockManager} /** - * Developer API + * :: DeveloperApi :: * An append-only map that spills sorted content to disk when there is insufficient space for it * to grow. * @@ -56,6 +57,7 @@ import org.apache.spark.storage.{BlockId, BlockManager} * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of * this threshold, in case map size estimation is not sufficiently accurate. */ +@DeveloperApi class ExternalAppendOnlyMap[K, V, C]( createCombiner: V => C, mergeValue: (C, V) => C, diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index 959fe44f5132a..73011361b0676 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -19,14 +19,17 @@ package org.apache.spark.util.collection import scala.reflect.ClassTag +import org.apache.spark.annotations.DeveloperApi + /** - * Developer API + * :: DeveloperApi :: * A fast hash map implementation for nullable keys. This hash map supports insertions and updates, * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less * space overhead. * * Under the hood, it uses our OpenHashSet implementation. */ +@DeveloperApi class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag]( initialCapacity: Int) extends Iterable[(K, V)] diff --git a/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala b/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala index e8adc28c2acda..745c38ef72143 100644 --- a/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/Pseudorandom.scala @@ -17,10 +17,13 @@ package org.apache.spark.util.random +import org.apache.spark.annotations.DeveloperApi + /** - * Developer API + * :: DeveloperApi :: * A class with pseudorandom behavior. */ +@DeveloperApi trait Pseudorandom { /** Set random seed. 
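Pseudorandom's setSeed is what makes the RandomSampler hierarchy that follows reproducible. A sketch using the Bernoulli sampler defined just below:

    import org.apache.spark.util.random.BernoulliSampler

    // Keeps items whose uniform draw lands in [0.0, 0.1): roughly a 10% sample.
    val sampler = new BernoulliSampler[Int](0.0, 0.1)
    sampler.setSeed(42L)   // same seed, same sample
    val sampled = sampler.sample(Iterator.range(0, 10000)).toArray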
*/ def setSeed(seed: Long) diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index a999a607d1b31..5306b89fddc86 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -22,8 +22,10 @@ import java.util.Random import cern.jet.random.Poisson import cern.jet.random.engine.DRand +import org.apache.spark.annotations.DeveloperApi + /** - * Developer API + * :: DeveloperApi :: * A pseudorandom sampler. It is possible to change the sampled item type. For example, we might * want to add weights for stratified sampling or importance sampling. Should only use * transformations that are tied to the sampler and cannot be applied after sampling. @@ -31,6 +33,7 @@ import cern.jet.random.engine.DRand * @tparam T item type * @tparam U sampled item type */ +@DeveloperApi trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable { /** take a random sample */ @@ -41,7 +44,7 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable } /** - * Developer API + * :: DeveloperApi :: * A sampler based on Bernoulli trials. * * @param lb lower bound of the acceptance range @@ -49,6 +52,7 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable * @param complement whether to use the complement of the range specified, default to false * @tparam T item type */ +@DeveloperApi class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) (implicit random: Random = new XORShiftRandom) extends RandomSampler[T, T] { @@ -69,12 +73,13 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) } /** - * Developer API + * :: DeveloperApi :: * A sampler based on values drawn from Poisson distribution. * * @param poisson a Poisson random number generator * @tparam T item type */ +@DeveloperApi class PoissonSampler[T](mean: Double) (implicit var poisson: Poisson = new Poisson(mean, new DRand)) extends RandomSampler[T, T] { diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb index d4ec15bb7d6fa..bbd56d2fd13bb 100644 --- a/docs/_plugins/copy_api_dirs.rb +++ b/docs/_plugins/copy_api_dirs.rb @@ -51,15 +51,15 @@ puts "cp -r " + source + "/. 
" + dest cp_r(source + "/.", dest) + # Append custom JavaScript + js = File.readlines("./js/api-docs.js") + js_file = dest + "/lib/template.js" + File.open(js_file, 'a') { |f| f.write("\n" + js.join()) } + # Append custom CSS + css = File.readlines("./css/api-docs.css") css_file = dest + "/lib/template.css" - extra_css = [ - "", - "/* Styles added by spark jekyll plug-in */", - ".developer {background-color: #44751E; float: right; text-transform: uppercase;}", - ".experimental {background-color: #257080; float: right; text-transform: uppercase;}", - ].join("\n") - File.open(css_file, 'a') { |f| f.write(extra_css) } + File.open(css_file, 'a') { |f| f.write("\n" + css.join()) } end # Build Epydoc for Python diff --git a/docs/css/api-docs.css b/docs/css/api-docs.css new file mode 100644 index 0000000000000..cc5f37bbdb42e --- /dev/null +++ b/docs/css/api-docs.css @@ -0,0 +1,14 @@ +/* Dynamically injected style for the API docs */ + +.developer { + background-color: #44751E; +} + +.experimental { + background-color: #257080; +} + +.badge { + font-family: Arial, san-serif; + float: right; +} diff --git a/docs/js/api-docs.js b/docs/js/api-docs.js new file mode 100644 index 0000000000000..2c3ca434ec46c --- /dev/null +++ b/docs/js/api-docs.js @@ -0,0 +1,27 @@ +/* Dynamically injected post-processing code for the API docs */ + +$(document).ready(function() { + var annotations = $("dt:contains('Annotations')").next("dd").children("span.name"); + addBadges(annotations, "AlphaComponent", ":: AlphaComponent ::", "ALPHA COMPONENT"); + addBadges(annotations, "DeveloperApi", ":: DeveloperApi ::", "Developer API"); + addBadges(annotations, "Experimental", ":: Experimental ::", "Experimental"); +}); + +function addBadges(allAnnotations, name, tag, html) { + var fullName = "org.apache.spark.annotations." + name; + var annotations = allAnnotations.children("a[name='" + fullName + "']"); + var tags = $("p.comment:contains(" + tag + ")").add( + $("div.comment p:contains(" + tag + ")")); + + // Remove identifier tags from comments + tags.each(function(index) { + var oldHTML = $(this).html(); + var newHTML = oldHTML.replace(tag, ""); + $(this).html(newHTML); + }); + + // Add badges to all containers + tags.prevAll("h4.signature").prepend(html); + annotations.closest("div.fullcomment").prevAll("h4.signature").prepend(html); + annotations.closest("div.fullcommenttop").prepend(html); +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5cdae65e29721..b8e4867f81a7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -21,6 +21,7 @@ import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag import org.apache.spark.SparkContext +import org.apache.spark.annotations.{AlphaComponent, Experimental} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl @@ -31,13 +32,14 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ /** - * ALPHA COMPONENT + * :: AlphaComponent :: * The entry point for running relational queries using Spark. Allows the creation of [[SchemaRDD]] * objects and the execution of SQL queries. * * @groupname userf Spark SQL Functions * @groupname Ungrouped Support functions for language integrated queries. 
*/ +@AlphaComponent class SQLContext(@transient val sparkContext: SparkContext) extends Logging with dsl.ExpressionConversions @@ -61,11 +63,12 @@ class SQLContext(@transient val sparkContext: SparkContext) new this.QueryExecution { val logical = plan } /** - * Experimental + * :: Experimental :: * Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan * interface is considered internal, and thus not guranteed to be stable. As a result, using * them directly is not reccomended. */ + @Experimental implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index b7ebce240578a..8eaddd5d0770e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.annotations.{AlphaComponent, Experimental} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ @@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.types.BooleanType import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} /** - * ALPHA COMPONENT + * :: AlphaComponent :: * An RDD of [[Row]] objects that has an associated schema. In addition to standard RDD functions, * SchemaRDDs can be used in relational queries, as shown in the examples below. * @@ -89,6 +90,7 @@ import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} * @groupprio schema -1 * @groupname Ungrouped Base RDD Functions */ +@AlphaComponent class SchemaRDD( @transient val sqlContext: SQLContext, @transient val logicalPlan: LogicalPlan) @@ -240,7 +242,7 @@ class SchemaRDD( Filter(ScalaUdf(udf, BooleanType, Seq(UnresolvedAttribute(arg1.name))), logicalPlan)) /** - * Experimental + * :: Experimental :: * Filters tuples using a function over a `Dynamic` version of a given Row. DynamicRows use * scala's Dynamic trait to emulate an ORM of in a dynamically typed language. Since the type of * the column is not known at compile time, all attributes are converted to strings before @@ -252,17 +254,19 @@ class SchemaRDD( * * @group Query */ + @Experimental def where(dynamicUdf: (DynamicRow) => Boolean) = new SchemaRDD( sqlContext, Filter(ScalaUdf(dynamicUdf, BooleanType, Seq(WrapDynamic(logicalPlan.output))), logicalPlan)) /** - * Experimental + * :: Experimental :: * Returns a sampled version of the underlying dataset. * * @group Query */ + @Experimental def sample( fraction: Double, withReplacement: Boolean = true, @@ -270,7 +274,7 @@ class SchemaRDD( new SchemaRDD(sqlContext, Sample(fraction, withReplacement, seed, logicalPlan)) /** - * Experimental + * :: Experimental :: * Applies the given Generator, or table generating function, to this relation. * * @param generator A table generating function. The API for such functions is likely to change @@ -286,6 +290,7 @@ class SchemaRDD( * * @group Query */ + @Experimental def generate( generator: Generator, join: Boolean = false, @@ -294,7 +299,7 @@ class SchemaRDD( new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan)) /** - * Experimental + * :: Experimental :: * Adds the rows from this RDD to the specified table. 
Note in a standard [[SQLContext]] there is * no notion of persistent tables, and thus queries that contain this operator will fail to * optimize. When working with an extension of a SQLContext that has a persistent catalog, such @@ -302,6 +307,7 @@ class SchemaRDD( * * @group schema */ + @Experimental def insertInto(tableName: String, overwrite: Boolean = false) = new SchemaRDD( sqlContext,
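Closing out the SQL changes, the alpha-component entry point and SchemaRDD are typically exercised along these lines (a sketch; the case class and table name are illustrative, and sc is an existing SparkContext):

    import org.apache.spark.sql.SQLContext

    case class Person(name: String, age: Int)

    val sqlContext = new SQLContext(sc)
    import sqlContext._   // implicit conversions, including RDD of case classes => SchemaRDD

    val people = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 17)))
    people.registerAsTable("people")

    val adults = sql("SELECT name FROM people WHERE age >= 18")   // returns a SchemaRDD
    adults.collect().foreach(println)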