diff --git a/assembly/pom.xml b/assembly/pom.xml index 54a25910ced7d..dcd9601fe4a90 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -30,6 +30,13 @@ Spark Project Assembly http://spark.incubator.apache.org/ + + ${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar + spark + /usr/share/spark + root + + @@ -79,7 +86,7 @@ maven-shade-plugin false - ${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar + ${spark.jar} *:* @@ -171,5 +178,112 @@ + + deb + + + + org.codehaus.mojo + buildnumber-maven-plugin + 1.1 + + + validate + + create + + + 8 + + + + + + org.vafer + jdeb + 0.11 + + + package + + jdeb + + + ${project.build.directory}/${deb.pkg.name}_${project.version}-${buildNumber}_all.deb + false + gzip + + + ${spark.jar} + file + + perm + ${deb.user} + ${deb.user} + ${deb.install.path}/jars + + + + ${basedir}/src/deb/RELEASE + file + + perm + ${deb.user} + ${deb.user} + ${deb.install.path} + + + + ${basedir}/../conf + directory + + perm + ${deb.user} + ${deb.user} + ${deb.install.path}/conf + 744 + + + + ${basedir}/../bin + directory + + perm + ${deb.user} + ${deb.user} + ${deb.install.path}/bin + 744 + + + + ${basedir}/../sbin + directory + + perm + ${deb.user} + ${deb.user} + ${deb.install.path}/sbin + 744 + + + + ${basedir}/../python + directory + + perm + ${deb.user} + ${deb.user} + ${deb.install.path}/python + 744 + + + + + + + + + + diff --git a/assembly/src/deb/RELEASE b/assembly/src/deb/RELEASE new file mode 100644 index 0000000000000..aad50ee73aa45 --- /dev/null +++ b/assembly/src/deb/RELEASE @@ -0,0 +1,2 @@ +compute-classpath.sh uses the existence of this file to decide whether to put the assembly jar on the +classpath or instead to use classfiles in the source tree. 
\ No newline at end of file diff --git a/repl-bin/src/deb/control/control b/assembly/src/deb/control/control similarity index 100% rename from repl-bin/src/deb/control/control rename to assembly/src/deb/control/control diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 0c823104215aa..278969655de48 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -39,6 +39,7 @@ if [ -f "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-dep CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes" DEPS_ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-deps.jar` @@ -59,6 +60,7 @@ if [[ $SPARK_TESTING == 1 ]]; then CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes" fi diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd old mode 100644 new mode 100755 index 460e6614766f8..80818c78ec24b --- a/bin/spark-class2.cmd +++ b/bin/spark-class2.cmd @@ -73,7 +73,7 @@ for %%d in ("%TOOLS_DIR%\target\scala-%SCALA_VERSION%\spark-tools*assembly*.jar" rem Compute classpath using external script set DONT_PRINT_CLASSPATH=1 -call "%FWDIR%sbin\compute-classpath.cmd" +call "%FWDIR%bin\compute-classpath.cmd" set DONT_PRINT_CLASSPATH=0 set CLASSPATH=%CLASSPATH%;%SPARK_TOOLS_JAR% diff --git a/bin/spark-shell.cmd b/bin/spark-shell.cmd old mode 100644 new mode 100755 index 23973e3e3dd43..99799128eb734 --- a/bin/spark-shell.cmd +++ b/bin/spark-shell.cmd @@ -18,6 +18,6 @@ rem limitations under the License. rem rem Find the path of sbin -set SBIN=%~dp0..\sbin\ +set BIN=%~dp0..\bin\ -cmd /V /E /C %SBIN%spark-class2.cmd org.apache.spark.repl.Main %* +cmd /V /E /C %BIN%spark-class2.cmd org.apache.spark.repl.Main %* diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index e89ac28b8eedf..df01b2e942180 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -17,17 +17,17 @@ package org.apache.spark -import java.io._ +import java.io.{ObjectInputStream, Serializable} import scala.collection.mutable.Map import scala.collection.generic.Growable import org.apache.spark.serializer.JavaSerializer /** - * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation, + * A data type that can be accumulated, ie has an commutative and associative "add" operation, * but where the result type, `R`, may be different from the element type being added, `T`. * - * You must define how to add data, and how to merge two of these together. For some datatypes, + * You must define how to add data, and how to merge two of these together. For some data types, * such as a counter, these might be the same operation. In that case, you can use the simpler * [[org.apache.spark.Accumulator]]. 
They won't always be the same, though -- e.g., imagine you are * accumulating a set. You will add items to the set, and you will union two sets together. @@ -45,7 +45,7 @@ class Accumulable[R, T] ( val id = Accumulators.newId @transient private var value_ = initialValue // Current value on master val zero = param.zero(initialValue) // Zero value to be passed to workers - var deserialized = false + private var deserialized = false Accumulators.register(this, true) @@ -127,7 +127,7 @@ class Accumulable[R, T] ( /** * Helper object defining how to accumulate values of a particular type. An implicit - * AccumulableParam needs to be available when you create Accumulables of a specific type. + * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type. * * @tparam R the full accumulated data (result type) * @tparam T partial data that can be added in @@ -185,8 +185,30 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser } /** - * A simpler value of [[org.apache.spark.Accumulable]] where the result type being accumulated is the same - * as the types of elements being merged. + * A simpler value of [[Accumulable]] where the result type being accumulated is the same + * as the types of elements being merged, i.e. variables that are only "added" to through an + * associative operation and can therefore be efficiently supported in parallel. They can be used + * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type + * `Int` and `Double`, and programmers can add support for new types. + * + * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]]. + * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator. + * However, they cannot read its value. Only the driver program can read the accumulator's value, + * using its value method. + * + * The interpreter session below shows an accumulator being used to add up the elements of an array: + * + * {{{ + * scala> val accum = sc.accumulator(0) + * accum: spark.Accumulator[Int] = 0 + * + * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) + * ... + * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s + * + * scala> accum.value + * res2: Int = 10 + * }}} * * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `T` @@ -196,9 +218,9 @@ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param) /** - * A simpler version of [[org.apache.spark.AccumulableParam]] where the only datatype you can add in is the same type - * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create - * Accumulators of a specific type. + * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add + * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be + * available when you create Accumulators of a specific type. 
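To make that last point concrete, a minimal sketch of adding accumulator support for a new type, assuming an active SparkContext named `sc`; the object name `VectorAccumulatorParam` and the choice of `Array[Double]` are illustrative only, and the param is passed explicitly rather than implicitly:

{{{
import org.apache.spark.AccumulatorParam

// Element-wise sums of fixed-length Array[Double] vectors.
object VectorAccumulatorParam extends AccumulatorParam[Array[Double]] {
  def zero(initialValue: Array[Double]): Array[Double] =
    new Array[Double](initialValue.length)
  def addInPlace(v1: Array[Double], v2: Array[Double]): Array[Double] = {
    for (i <- v1.indices) v1(i) += v2(i)
    v1
  }
}

val vecAccum = sc.accumulator(new Array[Double](3))(VectorAccumulatorParam)
sc.parallelize(Seq(Array(1.0, 0.0, 0.0), Array(0.0, 2.0, 3.0))).foreach(v => vecAccum += v)
// vecAccum.value on the driver is now Array(1.0, 2.0, 3.0)
}}}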
* * @tparam T type of value to accumulate */ diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 8b30cd4bfe69d..c4579cf6ad560 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -17,6 +17,8 @@ package org.apache.spark +import scala.{Option, deprecated} + import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap} /** @@ -31,10 +33,14 @@ case class Aggregator[K, V, C] ( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { - private val sparkConf = SparkEnv.get.conf - private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true) + private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) + + @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") + def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = + combineValuesByKey(iter, null) - def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { + def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], + context: TaskContext): Iterator[(K, C)] = { if (!externalSorting) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null @@ -47,17 +53,23 @@ case class Aggregator[K, V, C] ( } combiners.iterator } else { - val combiners = - new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) + val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) while (iter.hasNext) { val (k, v) = iter.next() combiners.insert(k, v) } + // TODO: Make this non optional in a future release + Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled) + Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled) combiners.iterator } } - def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = { + @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0") + def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = + combineCombinersByKey(iter, null) + + def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = { if (!externalSorting) { val combiners = new AppendOnlyMap[K,C] var kc: Product2[K, C] = null @@ -75,6 +87,9 @@ case class Aggregator[K, V, C] ( val (k, c) = iter.next() combiners.insert(k, c) } + // TODO: Make this non optional in a future release + Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled) + Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled) combiners.iterator } } diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index c6b4ac5192d14..d7d10285dadcb 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -27,8 +27,8 @@ import org.apache.spark.rdd.RDD /** - * A future for the result of an action. This is an extension of the Scala Future interface to - * support cancellation. + * A future for the result of an action to support cancellation. This is an extension of the + * Scala Future interface to support cancellation. 
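As a usage sketch of that cancellation support (assuming an active SparkContext `sc` and the `countAsync` conversion pulled in by `import org.apache.spark.SparkContext._`):

{{{
import org.apache.spark.SparkContext._
import org.apache.spark.FutureAction

val future: FutureAction[Long] = sc.parallelize(1 to 1000000, 10).countAsync()

// The driver can keep doing other work and later abandon the job if the
// result is no longer needed; cancel() interrupts the underlying Spark job.
if (!future.isCompleted) {
  future.cancel()
}
}}}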
*/ trait FutureAction[T] extends Future[T] { // Note that we redefine methods of the Future trait here explicitly so we can specify a different @@ -86,7 +86,7 @@ trait FutureAction[T] extends Future[T] { /** - * The future holding the result of an action that triggers a single job. Examples include + * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include * count, collect, reduce. */ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) @@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: /** - * A FutureAction for actions that could trigger multiple Spark jobs. Examples include take, + * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take, * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala index 56e0b8d2c0b9b..9b1601d5b95fa 100644 --- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala +++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala @@ -19,7 +19,7 @@ package org.apache.spark /** * An iterator that wraps around an existing iterator to provide task killing functionality. - * It works by checking the interrupted flag in TaskContext. + * It works by checking the interrupted flag in [[TaskContext]]. */ class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T]) extends Iterator[T] { diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index 9063cae87e140..b749e5414dab6 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -122,7 +122,7 @@ trait Logging { } } -object Logging { +private object Logging { @volatile private var initialized = false val initLock = new Object() } diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 2de32231e8714..951bfd79d0d6a 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -1,20 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap -import com.typesafe.config.ConfigFactory +import java.io.{ObjectInputStream, ObjectOutputStream, IOException} /** * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. 
* * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load - * values from both the `spark.*` Java system properties and any `spark.conf` on your application's - * classpath (if it has one). In this case, system properties take priority over `spark.conf`, and - * any parameters you set directly on the `SparkConf` object take priority over both of those. + * values from any `spark.*` Java system properties set in your application as well. In this case, + * parameters you set directly on the `SparkConf` object take priority over system properties. * * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and - * get the same configuration no matter what is on the classpath. + * get the same configuration no matter what the system properties are. * * All setter methods in this class support chaining. For example, you can write * `new SparkConf().setMaster("local").setAppName("My app")`. @@ -22,9 +38,9 @@ import com.typesafe.config.ConfigFactory * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified * by the user. Spark does not support modifying the configuration at runtime. * - * @param loadDefaults whether to load values from the system properties and classpath + * @param loadDefaults whether to also load values from Java system properties */ -class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging { +class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { /** Create a SparkConf that loads defaults from system properties and the classpath */ def this() = this(true) @@ -32,11 +48,9 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with private val settings = new HashMap[String, String]() if (loadDefaults) { - ConfigFactory.invalidateCaches() - val typesafeConfig = ConfigFactory.systemProperties() - .withFallback(ConfigFactory.parseResources("spark.conf")) - for (e <- typesafeConfig.entrySet().asScala if e.getKey.startsWith("spark.")) { - settings(e.getKey) = e.getValue.unwrapped.toString + // Load any spark.* system properties + for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) { + settings(k) = v } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 55ac76bf63909..923b4ed68839c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -340,8 +340,8 @@ class SparkContext( * Hadoop-supported file system URI, and return it as an RDD of Strings. */ def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { - hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) - .map(pair => pair._2.toString) + hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], + minSplits, cloneRecords = false).map(pair => pair._2.toString) } /** @@ -708,8 +708,11 @@ class SparkContext( env.httpFileServer.addJar(new File(fileName)) } catch { case e: Exception => { + // For now just log an error but allow to go through so spark examples work. + // The spark examples don't really need the jar distributed since its also + // the app jar. 
logError("Error adding jar (" + e + "), was the --addJars option used?") - throw e + null } } } else { @@ -722,8 +725,10 @@ class SparkContext( path } } - addedJars(key) = System.currentTimeMillis - logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) + if (key != null) { + addedJars(key) = System.currentTimeMillis + logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) + } } } @@ -956,6 +961,8 @@ class SparkContext( } } + def getCheckpointDir = checkpointDir + /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */ def defaultParallelism: Int = taskScheduler.defaultParallelism @@ -1125,7 +1132,7 @@ object SparkContext { if (sparkHome != null) { res.setSparkHome(sparkHome) } - if (!jars.isEmpty) { + if (jars != null && !jars.isEmpty) { res.setJars(jars) } res.setExecutorEnv(environment.toSeq) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 7a6f044965027..50ac700823fba 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -34,11 +34,11 @@ import org.apache.spark.SparkContext.IntAccumulatorParam import org.apache.spark.SparkContext.DoubleAccumulatorParam import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD -import scala.Tuple2 + /** - * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns [[org.apache.spark.api.java.JavaRDD]]s and - * works with Java collections instead of Scala ones. + * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns + * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones. */ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround { /** @@ -137,7 +137,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork */ def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits) - /**Get an RDD for a Hadoop SequenceFile with given key and value types. */ + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -148,7 +148,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits)) } - /**Get an RDD for a Hadoop SequenceFile. */ + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ + def sequenceFile[K, V](path: String, + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int, + cloneRecords: Boolean + ): JavaPairRDD[K, V] = { + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) + new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits, cloneRecords)) + } + + /** Get an RDD for a Hadoop SequenceFile. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): JavaPairRDD[K, V] = { implicit val kcm: ClassTag[K] = ClassTag(keyClass) @@ -156,6 +168,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass)) } + /** Get an RDD for a Hadoop SequenceFile. 
*/ + def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], + cloneRecords: Boolean): + JavaPairRDD[K, V] = { + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) + new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, cloneRecords)) + } + /** * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and * BytesWritable values that contain a serialized partition. This is still an experimental storage @@ -197,6 +218,37 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits)) } + + /** + * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other + * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable), + * using the older MapReduce API (`org.apache.hadoop.mapred`). + * + * @param conf JobConf for setting up the dataset + * @param inputFormatClass Class of the [[InputFormat]] + * @param keyClass Class of the keys + * @param valueClass Class of the values + * @param minSplits Minimum number of Hadoop Splits to generate. + * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader. + * Most RecordReader implementations reuse wrapper objects across multiple + * records, and can cause problems in RDD collect or aggregation operations. + * By default the records are cloned in Spark. However, application + * programmers can explicitly disable the cloning for better performance. + */ + def hadoopRDD[K, V, F <: InputFormat[K, V]]( + conf: JobConf, + inputFormatClass: Class[F], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int, + cloneRecords: Boolean + ): JavaPairRDD[K, V] = { + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) + new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits, + cloneRecords)) + } + /** * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any * other necessary info (e.g. 
file name for a filesystem-based dataset, table name for HyperTable, @@ -226,6 +278,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)) } + /** Get an RDD for a Hadoop file with an arbitrary InputFormat */ + def hadoopFile[K, V, F <: InputFormat[K, V]]( + path: String, + inputFormatClass: Class[F], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int, + cloneRecords: Boolean + ): JavaPairRDD[K, V] = { + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) + new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, + minSplits, cloneRecords)) + } + /** Get an RDD for a Hadoop file with an arbitrary InputFormat */ def hadoopFile[K, V, F <: InputFormat[K, V]]( path: String, @@ -239,6 +306,20 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork inputFormatClass, keyClass, valueClass)) } + /** Get an RDD for a Hadoop file with an arbitrary InputFormat */ + def hadoopFile[K, V, F <: InputFormat[K, V]]( + path: String, + inputFormatClass: Class[F], + keyClass: Class[K], + valueClass: Class[V], + cloneRecords: Boolean + ): JavaPairRDD[K, V] = { + implicit val kcm: ClassTag[K] = ClassTag(keyClass) + implicit val vcm: ClassTag[V] = ClassTag(valueClass) + new JavaPairRDD(sc.hadoopFile(path, + inputFormatClass, keyClass, valueClass, cloneRecords = cloneRecords)) + } + /** * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat * and extra configuration options to pass to the input format. @@ -254,6 +335,22 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)) } + /** + * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat + * and extra configuration options to pass to the input format. + */ + def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( + path: String, + fClass: Class[F], + kClass: Class[K], + vClass: Class[V], + conf: Configuration, + cloneRecords: Boolean): JavaPairRDD[K, V] = { + implicit val kcm: ClassTag[K] = ClassTag(kClass) + implicit val vcm: ClassTag[V] = ClassTag(vClass) + new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf, cloneRecords)) + } + /** * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat * and extra configuration options to pass to the input format. @@ -268,6 +365,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)) } + /** + * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat + * and extra configuration options to pass to the input format. + */ + def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( + conf: Configuration, + fClass: Class[F], + kClass: Class[K], + vClass: Class[V], + cloneRecords: Boolean): JavaPairRDD[K, V] = { + implicit val kcm: ClassTag[K] = ClassTag(kClass) + implicit val vcm: ClassTag[V] = ClassTag(vClass) + new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass, cloneRecords)) + } + /** Build the union of two or more RDDs. 
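The overloads above that take a `cloneRecords` flag all forward to the corresponding `SparkContext` methods. A hedged sketch of the Scala-side call, assuming the signature shown in these hunks; the path and Writable types are placeholders:

{{{
import org.apache.hadoop.io.{IntWritable, Text}

// Hadoop RecordReaders reuse the same Writable instances across records, so the
// records are cloned (the default) before they are collected or cached.
val counts = sc.sequenceFile("/data/counts.seq", classOf[Text], classOf[IntWritable], 4, true)
counts.map { case (word, n) => (word.toString, n.get) }.collect()
}}}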
*/ override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = { val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) @@ -333,8 +445,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork sc.accumulable(initialValue)(param) /** - * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.Broadcast]] object for - * reading it in distributed functions. The variable will be sent to each cluster only once. + * Broadcast a read-only variable to the cluster, returning a + * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. + * The variable will be sent to each cluster only once. */ def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value) @@ -400,6 +513,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork sc.setCheckpointDir(dir) } + def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir) + protected def checkpointFile[T](path: String): JavaRDD[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] diff --git a/core/src/main/scala/org/apache/spark/api/java/package.scala b/core/src/main/scala/org/apache/spark/api/java/package.scala new file mode 100644 index 0000000000000..8ec770046abe9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/java/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api + +/** Spark Java programming APIs. */ +package object java { + // For package docs only +} diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala index 0fc478a41967c..d113d4040594d 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala @@ -17,12 +17,40 @@ package org.apache.spark.broadcast -import java.io._ +import java.io.Serializable import java.util.concurrent.atomic.AtomicLong import org.apache.spark._ -abstract class Broadcast[T](private[spark] val id: Long) extends Serializable { +/** + * A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable + * cached on each machine rather than shipping a copy of it with tasks. They can be used, for + * example, to give every node a copy of a large input dataset in an efficient manner. Spark also + * attempts to distribute broadcast variables using efficient broadcast algorithms to reduce + * communication cost. + * + * Broadcast variables are created from a variable `v` by calling [[SparkContext#broadcast]]. 
+ * The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the + * `value` method. The interpreter session below shows this: + * + * {{{ + * scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) + * broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) + * + * scala> broadcastVar.value + * res0: Array[Int] = Array(1, 2, 3) + * }}} + * + * After the broadcast variable is created, it should be used instead of the value `v` in any + * functions run on the cluster so that `v` is not shipped to the nodes more than once. + * In addition, the object `v` should not be modified after it is broadcast in order to ensure + * that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped + * to a new node later). + * + * @param id A unique identifier for the broadcast variable. + * @tparam T Type of the data contained in the broadcast variable. + */ +abstract class Broadcast[T](val id: Long) extends Serializable { def value: T // We cannot have an abstract readObject here due to some weird issues with diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala index fb161ce69d40b..940e5ab805100 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala @@ -25,7 +25,7 @@ import org.apache.spark.SparkConf * BroadcastFactory implementation to instantiate a particular broadcast for the * entire Spark job. */ -private[spark] trait BroadcastFactory { +trait BroadcastFactory { def initialize(isDriver: Boolean, conf: SparkConf): Unit def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T] def stop(): Unit diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 0eacda3d7dc2b..39ee0dbb92841 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -63,7 +63,10 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea } } -private[spark] class HttpBroadcastFactory extends BroadcastFactory { +/** + * A [[BroadcastFactory]] implementation that uses a HTTP server as the broadcast medium. + */ +class HttpBroadcastFactory extends BroadcastFactory { def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) } def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) = diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 1d295c62bcb6c..d351dfc1f56a2 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -236,8 +236,10 @@ private[spark] case class TorrentInfo( @transient var hasBlocks = 0 } -private[spark] class TorrentBroadcastFactory - extends BroadcastFactory { +/** + * A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast. 
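Because both factory classes are now public, an alternative implementation can be selected through configuration. A sketch, assuming the existing `spark.broadcast.factory` setting read by the broadcast manager:

{{{
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("torrent-broadcast-demo")
  // Replace the default HTTP-based broadcast with the torrent-based implementation.
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

val sc = new SparkContext(conf)
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
sc.parallelize(Seq("a", "b", "a")).map(lookup.value).reduce(_ + _)   // 4
}}}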
+ */ +class TorrentBroadcastFactory extends BroadcastFactory { def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) } diff --git a/core/src/main/scala/org/apache/spark/broadcast/package.scala b/core/src/main/scala/org/apache/spark/broadcast/package.scala new file mode 100644 index 0000000000000..01bf88629a7dd --- /dev/null +++ b/core/src/main/scala/org/apache/spark/broadcast/package.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * Package for broadcast variables. See [[broadcast.Broadcast]] for details. + */ +package object broadcast { + // For package docs only +} diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index e133893f6ca5b..9987e2300ceb7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -29,13 +29,12 @@ import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.util.{AkkaUtils, Utils} -import akka.actor.Actor.emptyBehavior import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} /** * Proxy that relays messages to the driver. */ -class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging { +private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging { var masterActor: ActorSelection = _ val timeout = AkkaUtils.askTimeout(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 27dc42bf7e50e..b479225b45ee9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -28,7 +28,6 @@ import org.apache.spark.{SparkContext, SparkException} /** * Contains util methods to interact with Hadoop from Spark. */ -private[spark] class SparkHadoopUtil { val conf = newConfiguration() UserGroupInformation.setConfiguration(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 7507bf8ad0e6c..460883ec7ae24 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy.worker import java.io.{File, FileOutputStream, IOException, InputStream} @@ -10,8 +27,9 @@ import org.apache.spark.util.Utils /** ** Utilities for running commands with the spark classpath. */ +private[spark] object CommandUtils extends Logging { - private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = { + def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = { val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java") // SPARK-698: do not call the run.cmd script, as process.destroy() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index 1640d5fee0f77..6f6c101547c3c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy.worker import akka.actor._ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 5182dcbb2abfd..312560d7063a4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -209,8 +209,11 @@ private[spark] class Worker( logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + // TODO (pwendell): We shuld make sparkHome an Option[String] in + // ApplicationDescription to be more explicit about this. 
+ val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(execSparkHome_), workDir, akkaUrl, ExecutorState.RUNNING) + self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala index 0e0d0cd6264cf..1dc39c450ea16 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy.worker import akka.actor.{Actor, Address, AddressFromURIString} diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 7f31d7e6f8aec..c1b57f74d7e9a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -279,7 +279,7 @@ private[spark] class Executor( //System.exit(1) } } finally { - // TODO: Unregister shuffle memory only for ShuffleMapTask + // TODO: Unregister shuffle memory only for ResultTask val shuffleMemoryMap = env.shuffleMemoryMap shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(Thread.currentThread().getId) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index bb1471d9ee16a..0c8f4662a5f3a 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -48,6 +48,16 @@ class TaskMetrics extends Serializable { */ var resultSerializationTime: Long = _ + /** + * The number of in-memory bytes spilled by this task + */ + var memoryBytesSpilled: Long = _ + + /** + * The number of on-disk bytes spilled by this task + */ + var diskBytesSpilled: Long = _ + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 70a5a8caff839..2625a7f6a575a 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -29,6 +29,9 @@ package org.apache * be saved as SequenceFiles. These operations are automatically available on any RDD of the right * type (e.g. 
RDD[(Int, Int)] through implicit conversions when you * `import org.apache.spark.SparkContext._`. + * + * Java programmers should reference the [[spark.api.java]] package + * for Spark programming APIs in Java. */ package object spark { // For package docs only diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index a73714abcaf72..0e47f2e022610 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -66,7 +66,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: private type CoGroupValue = (Any, Int) // Int is dependency number private type CoGroupCombiner = Seq[CoGroup] - private val sparkConf = SparkEnv.get.conf private var serializerClass: String = null def setSerializer(cls: String): CoGroupedRDD[K] = { @@ -106,7 +105,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: override val partitioner = Some(part) override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { - val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true) + val sparkConf = SparkEnv.get.conf + val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true) val split = s.asInstanceOf[CoGroupPartition] val numRdds = split.deps.size @@ -150,6 +150,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: map.insert(kv._1, new CoGroupValue(kv._2, depNum)) } } + context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled + context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled new InterruptibleIterator(context, map.iterator) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 5cdb80be1ddd8..dbe76f34316ae 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -78,7 +78,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag]( keyClass: Class[K], valueClass: Class[V], minSplits: Int, - cloneRecords: Boolean) + cloneRecords: Boolean = true) extends RDD[(K, V)](sc, Nil) with Logging { def this( diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 1248409e3513a..4148581f527fe 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -88,20 +88,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { self.mapPartitionsWithContext((context, iter) => { - new InterruptibleIterator(context, aggregator.combineValuesByKey(iter)) + new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else if (mapSideCombine) { - val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) + val combined = self.mapPartitionsWithContext((context, iter) => { + aggregator.combineValuesByKey(iter, context) + }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializerClass) partitioned.mapPartitionsWithContext((context, iter) => { - new InterruptibleIterator(context, 
aggregator.combineCombinersByKey(iter)) + new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) } else { // Don't apply map-side combiner. val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) => { - new InterruptibleIterator(context, aggregator.combineValuesByKey(iter)) + new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } } @@ -286,7 +288,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } - new ShuffledRDD[K, V, (K, V)](self, partitioner) + if (self.partitioner == partitioner) self else new ShuffledRDD[K, V, (K, V)](self, partitioner) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index d4f396afb5d2b..8ef919c4b58cb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -27,7 +27,6 @@ import scala.io.Source import scala.reflect.ClassTag import org.apache.spark.{SparkEnv, Partition, TaskContext} -import org.apache.spark.broadcast.Broadcast /** @@ -113,7 +112,7 @@ class PipedRDD[T: ClassTag]( } } -object PipedRDD { +private object PipedRDD { // Split a string into words using a standard StringTokenizer def tokenize(command: String): Seq[String] = { val buf = new ArrayBuffer[String] diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index edd4f381db50c..cd90a1561a975 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -548,6 +548,11 @@ abstract class RDD[T: ClassTag]( * *same number of partitions*, but does *not* require them to have the same number * of elements in each partition. 
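A small usage sketch of the `preservesPartitioning` overload added below; the RDDs and the element-wise zip are illustrative, and `sc` is assumed:

{{{
val keys   = sc.parallelize(1 to 4, 2)
val labels = sc.parallelize(Seq("a", "b", "c", "d"), 2)

// Both RDDs have two partitions; the per-partition element counts do not matter
// as long as the zip function can handle them, which Iterator.zip does here.
val zipped = keys.zipPartitions(labels, preservesPartitioning = false) {
  (it1, it2) => it1.zip(it2)
}
zipped.collect()   // Array((1,a), (2,b), (3,c), (4,d)) with the default slicing
}}}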
*/ + def zipPartitions[B: ClassTag, V: ClassTag] + (rdd2: RDD[B], preservesPartitioning: Boolean) + (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = + new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) + def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 55a40a92c9652..d8e97c3b7c7b0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.util.Properties import org.apache.spark.util.{Utils, Distribution} -import org.apache.spark.{Logging, SparkContext, TaskEndReason} +import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.executor.TaskMetrics sealed trait SparkListenerEvents @@ -27,7 +27,7 @@ sealed trait SparkListenerEvents case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties) extends SparkListenerEvents -case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents +case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents @@ -46,6 +46,9 @@ case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) /** An event used in the listener to shutdown the listener daemon thread. */ private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents +/** + * Interface for listening to events from the Spark scheduler. + */ trait SparkListener { /** * Called when a stage is completed, with information on the completed stage @@ -115,7 +118,7 @@ class StatsReportListener extends SparkListener with Logging { } -object StatsReportListener extends Logging { +private[spark] object StatsReportListener extends Logging { //for profiling, the extremes are more interesting val percentiles = Array[Int](0,5,10,25,50,75,90,95,100) @@ -202,9 +205,9 @@ object StatsReportListener extends Logging { } } +private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) -case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) -object RuntimePercentage { +private object RuntimePercentage { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { val denom = totalTime.toDouble val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d4f74d3e18543..6cc608ea5bc69 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -352,9 +352,8 @@ private[spark] class TaskSchedulerImpl( taskResultGetter.stop() } - // sleeping for an arbitrary 5 seconds : to ensure that messages are sent out. - // TODO: Do something better ! - Thread.sleep(5000L) + // sleeping for an arbitrary 1 seconds to ensure that messages are sent out. 
+ Thread.sleep(1000L) } override def defaultParallelism() = backend.defaultParallelism() diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index fc0ee070897dd..5ad00a1ed1e10 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -629,7 +629,7 @@ private[spark] class TaskSetManager( } // Also re-enqueue any tasks that were running on the node for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - handleFailedTask(tid, TaskState.KILLED, None) + handleFailedTask(tid, TaskState.FAILED, None) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6f1345c57a295..ed53558566edf 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -80,6 +80,8 @@ private[spark] class BlockManager( val compressShuffle = conf.getBoolean("spark.shuffle.compress", true) // Whether to compress RDD partitions that are stored serialized val compressRdds = conf.getBoolean("spark.rdd.compress", false) + // Whether to compress shuffle output temporarily spilled to disk + val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true) val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf) @@ -790,6 +792,7 @@ private[spark] class BlockManager( case ShuffleBlockId(_, _, _) => compressShuffle case BroadcastBlockId(_) => compressBroadcast case RDDBlockId(_, _) => compressRdds + case TempBlockId(_) => compressShuffleSpill case _ => false } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 369a277232b19..48cec4be4111c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer} * * This interface does not support concurrent writes. */ -abstract class BlockObjectWriter(val blockId: BlockId) { +private[spark] abstract class BlockObjectWriter(val blockId: BlockId) { def open(): BlockObjectWriter @@ -69,7 +69,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) { } /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */ -class DiskBlockObjectWriter( +private[spark] class DiskBlockObjectWriter( blockId: BlockId, file: File, serializer: Serializer, diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 0f84810d6be06..1b7934d59fa1d 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -108,6 +108,10 @@ class StorageLevel private( } +/** + * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating + * new storage levels. 
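For reference, a minimal sketch of choosing one of these levels when persisting an RDD (`sc` is assumed and the input path is a placeholder):

{{{
import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("/tmp/input.txt")

// Keep deserialized partitions in memory and spill to disk whatever does not fit.
lines.persist(StorageLevel.MEMORY_AND_DISK)

// The same level built through the companion object's factory method.
val sameLevel = StorageLevel(true, true, true)   // useDisk, useMemory, deserialized
}}}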
+ */ object StorageLevel { val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 3c53e88380193..64e22a30b48f9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -24,4 +24,6 @@ private[spark] class ExecutorSummary { var succeededTasks : Int = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 + var memoryBytesSpilled : Long = 0 + var diskBytesSpilled : Long = 0 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 0dd876480afa0..ab03eb5ce1ab4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -48,6 +48,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) Succeeded Tasks Shuffle Read Shuffle Write + Shuffle Spill (Memory) + Shuffle Spill (Disk) {createExecutorTable()} @@ -80,6 +82,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {v.succeededTasks} {Utils.bytesToString(v.shuffleRead)} {Utils.bytesToString(v.shuffleWrite)} + {Utils.bytesToString(v.memoryBytesSpilled)} + {Utils.bytesToString(v.diskBytesSpilled)} } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index bcd282445050d..858a10ce750ff 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -52,6 +52,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val stageIdToTime = HashMap[Int, Long]() val stageIdToShuffleRead = HashMap[Int, Long]() val stageIdToShuffleWrite = HashMap[Int, Long]() + val stageIdToMemoryBytesSpilled = HashMap[Int, Long]() + val stageIdToDiskBytesSpilled = HashMap[Int, Long]() val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]() val stageIdToTasksComplete = HashMap[Int, Int]() val stageIdToTasksFailed = HashMap[Int, Int]() @@ -78,6 +80,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageIdToTime.remove(s.stageId) stageIdToShuffleRead.remove(s.stageId) stageIdToShuffleWrite.remove(s.stageId) + stageIdToMemoryBytesSpilled.remove(s.stageId) + stageIdToDiskBytesSpilled.remove(s.stageId) stageIdToTasksActive.remove(s.stageId) stageIdToTasksComplete.remove(s.stageId) stageIdToTasksFailed.remove(s.stageId) @@ -149,6 +153,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList Option(taskEnd.taskMetrics).foreach { taskMetrics => taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } + y.memoryBytesSpilled += taskMetrics.memoryBytesSpilled + y.diskBytesSpilled += taskMetrics.diskBytesSpilled } } case _ => {} @@ -184,6 +190,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageIdToShuffleWrite(sid) += shuffleWrite totalShuffleWrite += shuffleWrite + stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L) + val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L) + 
stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled + + stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L) + val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L) + stageIdToDiskBytesSpilled(sid) += diskBytesSpilled + val taskList = stageIdToTaskInfos.getOrElse( sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList -= ((taskEnd.taskInfo, None, None)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index d1e58016beaac..cfaf121895ec2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -56,6 +56,9 @@ private[spark] class StagePage(parent: JobProgressUI) { val hasShuffleRead = shuffleReadBytes > 0 val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L) val hasShuffleWrite = shuffleWriteBytes > 0 + val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L) + val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L) + val hasBytesSpilled = (memoryBytesSpilled > 0 && diskBytesSpilled > 0) var activeTime = 0L listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now)) @@ -81,6 +84,16 @@ private[spark] class StagePage(parent: JobProgressUI) { {Utils.bytesToString(shuffleWriteBytes)} } + {if (hasBytesSpilled) +
+              <li>
+                <strong>Shuffle spill (memory): </strong>
+                {Utils.bytesToString(memoryBytesSpilled)}
+              </li>
+              <li>
+                <strong>Shuffle spill (disk): </strong>
+                {Utils.bytesToString(diskBytesSpilled)}
+              </li>
  • + } @@ -89,9 +102,10 @@ private[spark] class StagePage(parent: JobProgressUI) { Seq("Duration", "GC Time", "Result Ser Time") ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ + {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++ Seq("Errors") - val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks) + val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined)) @@ -153,13 +167,29 @@ private[spark] class StagePage(parent: JobProgressUI) { } val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) + val memoryBytesSpilledSizes = validTasks.map { + case(info, metrics, exception) => + metrics.get.memoryBytesSpilled.toDouble + } + val memoryBytesSpilledQuantiles = "Shuffle spill (memory)" +: + getQuantileCols(memoryBytesSpilledSizes) + + val diskBytesSpilledSizes = validTasks.map { + case(info, metrics, exception) => + metrics.get.diskBytesSpilled.toDouble + } + val diskBytesSpilledQuantiles = "Shuffle spill (disk)" +: + getQuantileCols(diskBytesSpilledSizes) + val listings: Seq[Seq[String]] = Seq( serializationQuantiles, serviceQuantiles, gettingResultQuantiles, schedulerDelayQuantiles, if (hasShuffleRead) shuffleReadQuantiles else Nil, - if (hasShuffleWrite) shuffleWriteQuantiles else Nil) + if (hasShuffleWrite) shuffleWriteQuantiles else Nil, + if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil, + if (hasBytesSpilled) diskBytesSpilledQuantiles else Nil) val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", "Max") @@ -178,8 +208,7 @@ private[spark] class StagePage(parent: JobProgressUI) { } } - - def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean) + def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean) (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = { def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] = trace.map(e => {e.toString}) @@ -205,6 +234,14 @@ private[spark] class StagePage(parent: JobProgressUI) { val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms => if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("") + val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled} + val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") + val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("") + + val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled} + val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") + val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("") + {info.index} {info.taskId} @@ -234,6 +271,14 @@ private[spark] class StagePage(parent: JobProgressUI) { {shuffleWriteReadable} }} + {if (bytesSpilled) { + + {memoryBytesSpilledReadable} + + + {diskBytesSpilledReadable} + + }} {exception.map(e => {e.className} ({e.description})
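The listener and stage-page changes above all follow the same bookkeeping pattern: per-stage spill totals keyed by stage ID, bumped once for every completed task. The following self-contained Scala sketch shows that pattern in isolation. TaskMetricsSketch and SpillAggregatorSketch are stand-in names invented for this illustration only; they are not part of the patch or of Spark's API.

    import scala.collection.mutable.HashMap

    // Simplified stand-in for TaskMetrics; only the two new spill counters are modeled.
    case class TaskMetricsSketch(memoryBytesSpilled: Long, diskBytesSpilled: Long)

    class SpillAggregatorSketch {
      val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
      val stageIdToDiskBytesSpilled = HashMap[Int, Long]()

      def onTaskEnd(stageId: Int, metrics: Option[TaskMetricsSketch]): Unit = {
        // Initialize the per-stage counters the first time a stage is seen, then add
        // whatever the finished task reported (zero if its metrics are missing).
        stageIdToMemoryBytesSpilled.getOrElseUpdate(stageId, 0L)
        stageIdToMemoryBytesSpilled(stageId) += metrics.map(_.memoryBytesSpilled).getOrElse(0L)

        stageIdToDiskBytesSpilled.getOrElseUpdate(stageId, 0L)
        stageIdToDiskBytesSpilled(stageId) += metrics.map(_.diskBytesSpilled).getOrElse(0L)
      }
    }

A task that spilled would be recorded with a call such as aggregator.onTaskEnd(stageId, Some(TaskMetricsSketch(mem, disk))); the page code above then renders the accumulated totals with Utils.bytesToString.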
    diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala index dc15a38b29d70..fcc1ca9502aa1 100644 --- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala @@ -18,14 +18,15 @@ package org.apache.spark.util /** - * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements + * Wrapper around an iterator which calls a completion method after it successfully iterates + * through all the elements. */ -abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{ - def next = sub.next +private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{ + def next() = sub.next() def hasNext = { val r = sub.hasNext if (!r) { - completion + completion() } r } @@ -33,7 +34,7 @@ abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterato def completion() } -object CompletionIterator { +private[spark] object CompletionIterator { def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = { new CompletionIterator[A,I](sub) { def completion() = completionFunction diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index ac07a55cb9101..b0febe906ade3 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -18,13 +18,13 @@ package org.apache.spark.util import java.util.{TimerTask, Timer} -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{SparkConf, Logging} /** * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries) */ -class MetadataCleaner( +private[spark] class MetadataCleaner( cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit, conf: SparkConf) @@ -60,7 +60,7 @@ class MetadataCleaner( } } -object MetadataCleanerType extends Enumeration { +private[spark] object MetadataCleanerType extends Enumeration { val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value @@ -72,7 +72,7 @@ object MetadataCleanerType extends Enumeration { // TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the // initialization of StreamingContext. It's okay for users trying to configure stuff themselves. -object MetadataCleaner { +private[spark] object MetadataCleaner { def getDelaySeconds(conf: SparkConf) = { conf.getInt("spark.cleaner.ttl", -1) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index a1a452315d143..856eb772a1084 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -22,10 +22,72 @@ package org.apache.spark.util.collection * A simple, fixed-size bit set implementation. This implementation is fast because it avoids * safety/bound checking. 
*/ -class BitSet(numBits: Int) { +class BitSet(numBits: Int) extends Serializable { - private[this] val words = new Array[Long](bit2words(numBits)) - private[this] val numWords = words.length + private val words = new Array[Long](bit2words(numBits)) + private val numWords = words.length + + /** + * Compute the capacity (number of bits) that can be represented + * by this bitset. + */ + def capacity: Int = numWords * 64 + + /** + * Set all the bits up to a given index + */ + def setUntil(bitIndex: Int) { + val wordIndex = bitIndex >> 6 // divide by 64 + var i = 0 + while(i < wordIndex) { words(i) = -1; i += 1 } + if(wordIndex < words.size) { + // Set the remaining bits (note that the mask could still be zero) + val mask = ~(-1L << (bitIndex & 0x3f)) + words(wordIndex) |= mask + } + } + + /** + * Compute the bit-wise AND of the two sets returning the + * result. + */ + def &(other: BitSet): BitSet = { + val newBS = new BitSet(math.max(capacity, other.capacity)) + val smaller = math.min(numWords, other.numWords) + assert(newBS.numWords >= numWords) + assert(newBS.numWords >= other.numWords) + var ind = 0 + while( ind < smaller ) { + newBS.words(ind) = words(ind) & other.words(ind) + ind += 1 + } + newBS + } + + /** + * Compute the bit-wise OR of the two sets returning the + * result. + */ + def |(other: BitSet): BitSet = { + val newBS = new BitSet(math.max(capacity, other.capacity)) + assert(newBS.numWords >= numWords) + assert(newBS.numWords >= other.numWords) + val smaller = math.min(numWords, other.numWords) + var ind = 0 + while( ind < smaller ) { + newBS.words(ind) = words(ind) | other.words(ind) + ind += 1 + } + while( ind < numWords ) { + newBS.words(ind) = words(ind) + ind += 1 + } + while( ind < other.numWords ) { + newBS.words(ind) = other.words(ind) + ind += 1 + } + newBS + } /** * Sets the bit at the specified index to true. @@ -36,6 +98,11 @@ class BitSet(numBits: Int) { words(index >> 6) |= bitmask // div by 64 and mask } + def unset(index: Int) { + val bitmask = 1L << (index & 0x3f) // mod 64 and shift + words(index >> 6) &= ~bitmask // div by 64 and mask + } + /** * Return the value of the bit with the specified index. The value is true if the bit with * the index is currently set in this BitSet; otherwise, the result is false. @@ -48,6 +115,20 @@ class BitSet(numBits: Int) { (words(index >> 6) & bitmask) != 0 // div by 64 and mask } + /** + * Get an iterator over the set bits. + */ + def iterator = new Iterator[Int] { + var ind = nextSetBit(0) + override def hasNext: Boolean = ind >= 0 + override def next() = { + val tmp = ind + ind = nextSetBit(ind+1) + tmp + } + } + + /** Return the number of bits set to true in this BitSet. 
*/ def cardinality(): Int = { var sum = 0 diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index e3bcd895aa28f..64e9b436f04a2 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -26,8 +26,8 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.serializer.Serializer -import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter} +import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer} +import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter} /** * An append-only map that spills sorted content to disk when there is insufficient space for it @@ -60,7 +60,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, serializer: Serializer = SparkEnv.get.serializerManager.default, - diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager) + blockManager: BlockManager = SparkEnv.get.blockManager) extends Iterable[(K, C)] with Serializable with Logging { import ExternalAppendOnlyMap._ @@ -68,6 +68,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( private var currentMap = new SizeTrackingAppendOnlyMap[K, C] private val spilledMaps = new ArrayBuffer[DiskMapIterator] private val sparkConf = SparkEnv.get.conf + private val diskBlockManager = blockManager.diskBlockManager // Collective memory threshold shared across all running tasks private val maxMemoryThreshold = { @@ -77,14 +78,26 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( } // Number of pairs in the in-memory map - private var numPairsInMemory = 0 + private var numPairsInMemory = 0L // Number of in-memory pairs inserted before tracking the map's shuffle memory usage private val trackMemoryThreshold = 1000 + // Size of object batches when reading/writing from serializers. Objects are written in + // batches, with each batch using its own serialization stream. This cuts down on the size + // of reference-tracking maps constructed when deserializing a stream. + // + // NOTE: Setting this too low can cause excess copying when serializing, since some serializers + // grow internal data structures by growing + copying every time the number of objects doubles. 
+ private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000) + // How many times we have spilled so far private var spillCount = 0 + // Number of bytes spilled in total + private var _memoryBytesSpilled = 0L + private var _diskBytesSpilled = 0L + private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false) private val comparator = new KCComparator[K, C] @@ -139,21 +152,35 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)" .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else "")) val (blockId, file) = diskBlockManager.createTempBlock() - val writer = - new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites) + + val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _) + def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, + compressStream, syncWrites) + + var writer = getNewWriter + var objectsWritten = 0 try { val it = currentMap.destructiveSortedIterator(comparator) while (it.hasNext) { val kv = it.next() writer.write(kv) + objectsWritten += 1 + + if (objectsWritten == serializerBatchSize) { + writer.commit() + writer = getNewWriter + objectsWritten = 0 + } } - writer.commit() + + if (objectsWritten > 0) writer.commit() } finally { // Partial failures cannot be tolerated; do not revert partial writes + _diskBytesSpilled += writer.bytesWritten writer.close() } currentMap = new SizeTrackingAppendOnlyMap[K, C] - spilledMaps.append(new DiskMapIterator(file)) + spilledMaps.append(new DiskMapIterator(file, blockId)) // Reset the amount of shuffle memory used by this map in the global pool val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap @@ -161,8 +188,12 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( shuffleMemoryMap(Thread.currentThread().getId) = 0 } numPairsInMemory = 0 + _memoryBytesSpilled += mapSize } + def memoryBytesSpilled: Long = _memoryBytesSpilled + def diskBytesSpilled: Long = _diskBytesSpilled + /** * Return an iterator that merges the in-memory map with the spilled maps. * If no spill has occurred, simply return the in-memory map's iterator. @@ -297,16 +328,35 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( /** * An iterator that returns (K, C) pairs in sorted order from an on-disk map */ - private class DiskMapIterator(file: File) extends Iterator[(K, C)] { + private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] { val fileStream = new FileInputStream(file) - val bufferedStream = new FastBufferedInputStream(fileStream) - val deserializeStream = ser.deserializeStream(bufferedStream) + val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize) + val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) + var deserializeStream = ser.deserializeStream(compressedStream) + var objectsRead = 0 + var nextItem: (K, C) = null var eof = false def readNextItem(): (K, C) = { if (!eof) { try { + if (objectsRead == serializerBatchSize) { + val newInputStream = deserializeStream match { + case stream: KryoDeserializationStream => + // Kryo's serializer stores an internal buffer that pre-fetches from the underlying + // stream. We need to capture this buffer and feed it to the new serialization + // stream so that the bytes are not lost. 
+ val kryoInput = stream.input + val remainingBytes = kryoInput.limit() - kryoInput.position() + val extraBuf = kryoInput.readBytes(remainingBytes) + new SequenceInputStream(new ByteArrayInputStream(extraBuf), compressedStream) + case _ => compressedStream + } + deserializeStream = ser.deserializeStream(newInputStream) + objectsRead = 0 + } + objectsRead += 1 return deserializeStream.readObject().asInstanceOf[(K, C)] } catch { case e: EOFException => diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 87e009a4de93d..5ded5d0b6da84 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -84,6 +84,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( protected var _bitset = new BitSet(_capacity) + def getBitSet = _bitset + // Init of the array in constructor (instead of in declaration) to work around a Scala compiler // specialization bug that would generate two arrays (one for Object and one for specialized T). protected var _data: Array[T] = _ @@ -161,7 +163,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( def getPos(k: T): Int = { var pos = hashcode(hasher.hash(k)) & _mask var i = 1 - while (true) { + val maxProbe = _data.size + while (i < maxProbe) { if (!_bitset.get(pos)) { return INVALID_POS } else if (k == _data(pos)) { @@ -179,6 +182,22 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( /** Return the value at the specified position. */ def getValue(pos: Int): T = _data(pos) + def iterator = new Iterator[T] { + var pos = nextPos(0) + override def hasNext: Boolean = pos != INVALID_POS + override def next(): T = { + val tmp = getValue(pos) + pos = nextPos(pos+1) + tmp + } + } + + /** Return the value at the specified position. */ + def getValueSafe(pos: Int): T = { + assert(_bitset.get(pos)) + _data(pos) + } + /** * Return the next position with an element stored, starting from the given position inclusively. */ @@ -259,7 +278,7 @@ object OpenHashSet { * A set of specialized hash function implementation to avoid boxing hash code computation * in the specialized implementation of OpenHashSet. 
*/ - sealed class Hasher[@specialized(Long, Int) T] { + sealed class Hasher[@specialized(Long, Int) T] extends Serializable { def hash(o: T): Int = o.hashCode() } diff --git a/core/src/test/resources/spark.conf b/core/src/test/resources/spark.conf deleted file mode 100644 index aa4e7512354d3..0000000000000 --- a/core/src/test/resources/spark.conf +++ /dev/null @@ -1,8 +0,0 @@ -# A simple spark.conf file used only in our unit tests - -spark.test.intTestProperty = 1 - -spark.test { - stringTestProperty = "hi" - listTestProperty = ["a", "b"] -} diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index d9cb7fead5b88..8de7a328d1cf5 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -125,6 +125,23 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(thrown.getMessage.contains("failed 4 times")) } + test("repeatedly failing task that crashes JVM") { + // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather + // than hanging due to retrying the failed task infinitely many times (eventually the + // standalone scheduler will remove the application, causing the job to hang waiting to + // reconnect to the master). + sc = new SparkContext(clusterUrl, "test") + failAfter(Span(100000, Millis)) { + val thrown = intercept[SparkException] { + // One of the tasks always fails. + sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) } + } + assert(thrown.getClass === classOf[SparkException]) + System.out.println(thrown.getMessage) + assert(thrown.getMessage.contains("failed 4 times")) + } + } + test("caching") { sc = new SparkContext(clusterUrl, "test") val data = sc.parallelize(1 to 1000, 10).cache() diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index ef5936dd2f588..87e9012622456 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -1,37 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark import org.scalatest.FunSuite class SparkConfSuite extends FunSuite with LocalSparkContext { - // This test uses the spark.conf in core/src/test/resources, which has a few test properties - test("loading from spark.conf") { - val conf = new SparkConf() - assert(conf.get("spark.test.intTestProperty") === "1") - assert(conf.get("spark.test.stringTestProperty") === "hi") - // NOTE: we don't use list properties yet, but when we do, we'll have to deal with this syntax - assert(conf.get("spark.test.listTestProperty") === "[a, b]") - } - - // This test uses the spark.conf in core/src/test/resources, which has a few test properties - test("system properties override spark.conf") { + test("loading from system properties") { try { - System.setProperty("spark.test.intTestProperty", "2") + System.setProperty("spark.test.testProperty", "2") val conf = new SparkConf() - assert(conf.get("spark.test.intTestProperty") === "2") - assert(conf.get("spark.test.stringTestProperty") === "hi") + assert(conf.get("spark.test.testProperty") === "2") } finally { - System.clearProperty("spark.test.intTestProperty") + System.clearProperty("spark.test.testProperty") } } test("initializing without loading defaults") { try { - System.setProperty("spark.test.intTestProperty", "2") + System.setProperty("spark.test.testProperty", "2") val conf = new SparkConf(false) - assert(!conf.contains("spark.test.intTestProperty")) - assert(!conf.contains("spark.test.stringTestProperty")) + assert(!conf.contains("spark.test.testProperty")) } finally { - System.clearProperty("spark.test.intTestProperty") + System.clearProperty("spark.test.testProperty") } } @@ -107,4 +112,25 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { assert(sc.master === "local[2]") assert(sc.appName === "My other app") } + + test("nested property names") { + // This wasn't supported by some external conf parsing libraries + try { + System.setProperty("spark.test.a", "a") + System.setProperty("spark.test.a.b", "a.b") + System.setProperty("spark.test.a.b.c", "a.b.c") + val conf = new SparkConf() + assert(conf.get("spark.test.a") === "a") + assert(conf.get("spark.test.a.b") === "a.b") + assert(conf.get("spark.test.a.b.c") === "a.b.c") + conf.set("spark.test.a.b", "A.B") + assert(conf.get("spark.test.a") === "a") + assert(conf.get("spark.test.a.b") === "A.B") + assert(conf.get("spark.test.a.b.c") === "a.b.c") + } finally { + System.clearProperty("spark.test.a") + System.clearProperty("spark.test.a.b") + System.clearProperty("spark.test.a.b.c") + } + } } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index 45dbcaffae94f..0c502612647a2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy.worker import java.io.File diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala index 94d88d307a163..1f1d8d138005b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy.worker diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index ef957bb0e5d17..bb4dc0fcd31a3 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.util.collection import scala.collection.mutable.ArrayBuffer @@ -9,22 +26,19 @@ import org.apache.spark.SparkContext._ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { - override def beforeEach() { - val conf = new SparkConf(false) - conf.set("spark.shuffle.externalSorting", "true") - sc = new SparkContext("local", "test", conf) - } - - val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i) - val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => { + private val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i) + private val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => { buffer += i } - val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] = + private val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] = (buf1, buf2) => { buf1 ++= buf2 } test("simple insert") { + val conf = new SparkConf(false) + sc = new SparkContext("local", "test", conf) + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) @@ -48,6 +62,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("insert with collision") { + val conf = new SparkConf(false) + sc = new SparkContext("local", "test", conf) + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) @@ -67,6 +84,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("ordering") { + val conf = new SparkConf(false) + sc = new SparkContext("local", "test", conf) + val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) map1.insert(1, 10) @@ -109,6 +129,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("null keys and values") { + val conf = new SparkConf(false) + sc = new SparkContext("local", "test", conf) + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) map.insert(1, 5) @@ -147,6 +170,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("simple aggregator") { + val conf = new SparkConf(false) + sc = new SparkContext("local", "test", conf) + // reduceByKey val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1)) val result1 = rdd.reduceByKey(_+_).collect() @@ -159,6 +185,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("simple cogroup") { + val conf = new SparkConf(false) + sc = new SparkContext("local", "test", conf) val rdd1 = sc.parallelize(1 to 4).map(i => (i, i)) val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i)) val result = rdd1.cogroup(rdd2).collect() @@ -175,56 +203,56 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("spilling") { - // TODO: Figure out correct memory parameters to actually induce spilling - // System.setProperty("spark.shuffle.buffer.mb", "1") - // System.setProperty("spark.shuffle.buffer.fraction", "0.05") + // TODO: Use SparkConf (which currently throws connection reset exception) + System.setProperty("spark.shuffle.memoryFraction", "0.001") + sc = new SparkContext("local-cluster[1,1,512]", "test") - // reduceByKey - should spill exactly 6 times - val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i)) + // reduceByKey - should spill ~8 times + val rddA = sc.parallelize(0 
until 100000).map(i => (i/2, i)) val resultA = rddA.reduceByKey(math.max(_, _)).collect() - assert(resultA.length == 5000) + assert(resultA.length == 50000) resultA.foreach { case(k, v) => k match { case 0 => assert(v == 1) - case 2500 => assert(v == 5001) - case 4999 => assert(v == 9999) + case 25000 => assert(v == 50001) + case 49999 => assert(v == 99999) case _ => } } - // groupByKey - should spill exactly 11 times - val rddB = sc.parallelize(0 until 10000).map(i => (i/4, i)) + // groupByKey - should spill ~17 times + val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i)) val resultB = rddB.groupByKey().collect() - assert(resultB.length == 2500) + assert(resultB.length == 25000) resultB.foreach { case(i, seq) => i match { case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3)) - case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003)) - case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999)) + case 12500 => assert(seq.toSet == Set[Int](50000, 50001, 50002, 50003)) + case 24999 => assert(seq.toSet == Set[Int](99996, 99997, 99998, 99999)) case _ => } } - // cogroup - should spill exactly 7 times - val rddC1 = sc.parallelize(0 until 1000).map(i => (i, i)) - val rddC2 = sc.parallelize(0 until 1000).map(i => (i%100, i)) + // cogroup - should spill ~7 times + val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i)) + val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i)) val resultC = rddC1.cogroup(rddC2).collect() - assert(resultC.length == 1000) + assert(resultC.length == 10000) resultC.foreach { case(i, (seq1, seq2)) => i match { case 0 => assert(seq1.toSet == Set[Int](0)) - assert(seq2.toSet == Set[Int](0, 100, 200, 300, 400, 500, 600, 700, 800, 900)) - case 500 => - assert(seq1.toSet == Set[Int](500)) + assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000)) + case 5000 => + assert(seq1.toSet == Set[Int](5000)) assert(seq2.toSet == Set[Int]()) - case 999 => - assert(seq1.toSet == Set[Int](999)) + case 9999 => + assert(seq1.toSet == Set[Int](9999)) assert(seq2.toSet == Set[Int]()) case _ => } } - } - // TODO: Test memory allocation for multiple concurrently running tasks + System.clearProperty("spark.shuffle.memoryFraction") + } } diff --git a/docs/_config.yml b/docs/_config.yml index 11d18f0ac2093..ce0fdf5fb4f03 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -5,6 +5,6 @@ markdown: kramdown # of Spark, Scala, and Mesos. SPARK_VERSION: 0.9.0-incubating-SNAPSHOT SPARK_VERSION_SHORT: 0.9.0 -SCALA_VERSION: 2.10 +SCALA_VERSION: "2.10" MESOS_VERSION: 0.13.0 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index ad7969d012283..c529d89ffd192 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -21,7 +21,7 @@ - + @@ -68,9 +68,10 @@
                              <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
                              <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
                              <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
+                              <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
  • - + @@ -161,7 +163,7 @@

    Heading

    - +