diff --git a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala index 8e0f82ddb8897..110bd0a9a0c41 100644 --- a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala @@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo sc.stop() sc = null } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") } test("halting by voting") { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 4c8f9ed6fbc02..7dcfbf741c4f1 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. Uses the provided - * Partitioner to partition the output RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + * @param partitioner partitioner of the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD, partitioner) + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] = + { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)) } /** - * Return approximate number of distinct values for each key this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism - * level. + * Return approximate number of distinct values for each key in this RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + * @param numPartitions number of partitions of the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD) + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)) } - /** * Return approximate number of distinct values for each key in this RDD. 
- * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. HashPartitions the - * output RDD into numPartitions. * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. */ - def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD, numPartitions) + def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD)) } /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 619bfd75be8eb..330569a8d8837 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return approximate number of distinct elements in the RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. */ - def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) + def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD) def name(): String = rdd.name diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 223fef79261d0..f2ce3cbd47f93 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -28,7 +28,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import com.clearspring.analytics.stream.cardinality.HyperLogLog +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType @@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.SparkContext._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer -import org.apache.spark.util.SerializableHyperLogLog /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. 
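For readers scanning the reworked API, a minimal usage sketch of the `relativeSD`-based overloads from the Scala side, mirroring the wrappers above; the sample data, the 4-partition choice, and the existing `SparkContext` named `sc` are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // implicit conversion providing countApproxDistinctByKey

// Ten keys, each with 1000 distinct values.
val pairs = sc.parallelize(1 to 10000).map(i => (i % 10, i))

// Relative accuracy of roughly 5%, hash-partitioned into 4 partitions.
val approx1 = pairs.countApproxDistinctByKey(0.05, 4)

// Same accuracy, but with an explicit partitioner.
val approx2 = pairs.countApproxDistinctByKey(0.05, new HashPartitioner(4))

approx1.collect().foreach { case (k, c) => println(s"key $k -> ~$c distinct values") }
```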
@@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** + * :: Experimental :: + * * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. Uses the provided - * Partitioner to partition the output RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p` + * would trigger sparse representation of registers, which may reduce the memory consumption + * and increase accuracy when the cardinality is small. + * + * @param p The precision value for the normal set. + * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If `sp` equals 0, the sparse representation is skipped. + * @param partitioner Partitioner to use for the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { - val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) - val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) - val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) + @Experimental + def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { + require(p >= 4, s"p ($p) must be >= 4") + require(sp <= 32, s"sp ($sp) must be <= 32") + require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") + val createHLL = (v: V) => { + val hll = new HyperLogLogPlus(p, sp) + hll.offer(v) + hll + } + val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => { + hll.offer(v) + hll + } + val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { + h1.addAll(h2) + h1 + } + + combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality()) + } - combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) + /** + * Return approximate number of distinct values for each key in this RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + * @param partitioner partitioner of the resulting RDD + */ + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017") + val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt + assert(p <= 32) + countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner) } /** * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. 
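To make the `relativeSD`-to-precision conversion used above concrete, here is a small standalone sketch of the same formula with a few sample values; the helper name `precisionFor` is ours, not part of the Spark API:

```scala
// Same mapping as the relativeSD overloads above: pick the smallest HyperLogLog++ precision p
// whose theoretical error 1.054 / sqrt(2^p) is no worse than the requested accuracy.
def precisionFor(relativeSD: Double): Int = {
  val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
  math.max(p, 4)
}

precisionFor(0.05)   // 9   (error ~ 1.054 / sqrt(2^9)  ~ 0.047)
precisionFor(0.01)   // 14  (error ~ 1.054 / sqrt(2^14) ~ 0.008)
precisionFor(0.001)  // 21  (error ~ 1.054 / sqrt(2^21) ~ 0.0007)
```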
HashPartitions the - * output RDD into numPartitions. * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + * @param numPartitions number of partitions of the resulting RDD */ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) } /** - * Return approximate number of distinct values for each key this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. The default value of - * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism - * level. + * Return approximate number of distinct values for each key in this RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. */ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index aa03e9276fb34..54bdc3e7cbc7a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -19,12 +19,11 @@ package org.apache.spark.rdd import java.util.Random -import scala.collection.Map -import scala.collection.mutable +import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer import scala.reflect.{classTag, ClassTag} -import com.clearspring.analytics.stream.cardinality.HyperLogLog +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.NullWritable @@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils} +import org.apache.spark.util.{BoundedPriorityQueue, Utils} import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler} @@ -655,7 +654,19 @@ abstract class RDD[T: ClassTag]( * partitions* and the *same number of elements in each partition* (e.g. one was made through * a map on the other). 
*/ - def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other) + def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = { + zipPartitions(other, true) { (thisIter, otherIter) => + new Iterator[(T, U)] { + def hasNext = (thisIter.hasNext, otherIter.hasNext) match { + case (true, true) => true + case (false, false) => false + case _ => throw new SparkException("Can only zip RDDs with " + + "same number of elements in each partition") + } + def next = (thisIter.next, otherIter.next) + } + } + } /** * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by @@ -921,15 +932,49 @@ abstract class RDD[T: ClassTag]( * :: Experimental :: * Return approximate number of distinct elements in the RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p` + * would trigger sparse representation of registers, which may reduce the memory consumption + * and increase accuracy when the cardinality is small. + * + * @param p The precision value for the normal set. + * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If `sp` equals 0, the sparse representation is skipped. */ @Experimental + def countApproxDistinct(p: Int, sp: Int): Long = { + require(p >= 4, s"p ($p) must be >= 4") + require(sp <= 32, s"sp ($sp) cannot be greater than 32") + require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") + val zeroCounter = new HyperLogLogPlus(p, sp) + aggregate(zeroCounter)( + (hll: HyperLogLogPlus, v: T) => { + hll.offer(v) + hll + }, + (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { + h1.addAll(h2) + h1 + }).cardinality() + } + + /** + * Return approximate number of distinct elements in the RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + */ def countApproxDistinct(relativeSD: Double = 0.05): Long = { - val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) - aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() + val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt + countApproxDistinct(p, 0) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala deleted file mode 100644 index b8110ffc42f2d..0000000000000 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import java.io.{IOException, ObjectOutputStream} - -import scala.reflect.ClassTag - -import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} - -private[spark] class ZippedPartition[T: ClassTag, U: ClassTag]( - idx: Int, - @transient rdd1: RDD[T], - @transient rdd2: RDD[U] - ) extends Partition { - - var partition1 = rdd1.partitions(idx) - var partition2 = rdd2.partitions(idx) - override val index: Int = idx - - def partitions = (partition1, partition2) - - @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { - // Update the reference to parent partition at the time of task serialization - partition1 = rdd1.partitions(idx) - partition2 = rdd2.partitions(idx) - oos.defaultWriteObject() - } -} - -private[spark] class ZippedRDD[T: ClassTag, U: ClassTag]( - sc: SparkContext, - var rdd1: RDD[T], - var rdd2: RDD[U]) - extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) { - - override def getPartitions: Array[Partition] = { - if (rdd1.partitions.size != rdd2.partitions.size) { - throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") - } - val array = new Array[Partition](rdd1.partitions.size) - for (i <- 0 until rdd1.partitions.size) { - array(i) = new ZippedPartition(i, rdd1, rdd2) - } - array - } - - override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = { - val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions - rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context)) - } - - override def getPreferredLocations(s: Partition): Seq[String] = { - val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions - val pref1 = rdd1.preferredLocations(partition1) - val pref2 = rdd2.preferredLocations(partition2) - // Check whether there are any hosts that match both RDDs; otherwise return the union - val exactMatchLocations = pref1.intersect(pref2) - if (!exactMatchLocations.isEmpty) { - exactMatchLocations - } else { - (pref1 ++ pref2).distinct - } - } - - override def clearDependencies() { - super.clearDependencies() - rdd1 = null - rdd2 = null - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index cbe9bb093d1c9..9f45400bcf852 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -207,10 +207,12 @@ private[spark] class CoarseMesosSchedulerBackend( .addResources(createResource("cpus", cpusToUse)) .addResources(createResource("mem", sc.executorMemory)) .build() - d.launchTasks(offer.getId, Collections.singletonList(task), filters) + d.launchTasks( + 
Collections.singleton(offer.getId), Collections.singletonList(task), filters) } else { // Filter it out - d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters) + d.launchTasks( + Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index f08b19e6782e3..a089a02d42170 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -223,7 +223,7 @@ private[spark] class MesosSchedulerBackend( // Reply to the offers val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? for (i <- 0 until offers.size) { - d.launchTasks(offers(i).getId, mesosTasks(i), filters) + d.launchTasks(Collections.singleton(offers(i).getId), mesosTasks(i), filters) } } } finally { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6e450081dcb11..a41286d3e4a00 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1015,8 +1015,26 @@ private[spark] class BlockManager( bytes: ByteBuffer, serializer: Serializer = defaultSerializer): Iterator[Any] = { bytes.rewind() - val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) - serializer.newInstance().deserializeStream(stream).asIterator + + def getIterator = { + val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) + serializer.newInstance().deserializeStream(stream).asIterator + } + + if (blockId.isShuffle) { + // Reducer may need to read many local shuffle blocks and will wrap them into Iterators + // at the beginning. The wrapping will cost some memory (compression instance + // initialization, etc.). Reducer read shuffle blocks one by one so we could do the + // wrapping lazily to save memory. + class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] { + lazy val proxy = f + override def hasNext: Boolean = proxy.hasNext + override def next(): Any = proxy.next() + } + new LazyProxyIterator(getIterator) + } else { + getIterator + } } def stop() { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 363de93e067b8..2d8ff1194a5dc 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -147,6 +147,27 @@ object StorageLevel { val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) + /** + * :: DeveloperApi :: + * Return the StorageLevel object with the specified name. 
+ */ + @DeveloperApi + def fromString(s: String): StorageLevel = s match { + case "NONE" => NONE + case "DISK_ONLY" => DISK_ONLY + case "DISK_ONLY_2" => DISK_ONLY_2 + case "MEMORY_ONLY" => MEMORY_ONLY + case "MEMORY_ONLY_2" => MEMORY_ONLY_2 + case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER + case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2 + case "MEMORY_AND_DISK" => MEMORY_AND_DISK + case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2 + case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER + case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2 + case "OFF_HEAP" => OFF_HEAP + case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s) + } + /** * :: DeveloperApi :: * Create a new StorageLevel object without setting useOffHeap. diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala deleted file mode 100644 index 21a88eea3bbc2..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.io.{Externalizable, ObjectInput, ObjectOutput} - -import com.clearspring.analytics.stream.cardinality.{HyperLogLog, ICardinality} - -/** - * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is - * serializable. 
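As background on the lazy shuffle-block wrapping added to `BlockManager.dataDeserialize` above: the idea is a proxy iterator that defers building the relatively expensive deserialization stream until the first element is requested. A self-contained sketch of that pattern, independent of Spark:

```scala
// Generic sketch of the lazy-proxy pattern: the by-name argument `f` (which may allocate
// buffers, compression codecs, etc.) is evaluated only when the iterator is first consumed.
class LazyProxyIterator[A](f: => Iterator[A]) extends Iterator[A] {
  private lazy val proxy = f
  override def hasNext: Boolean = proxy.hasNext
  override def next(): A = proxy.next()
}

val it = new LazyProxyIterator({
  println("building the underlying iterator")  // printed on first hasNext/next only
  Iterator(1, 2, 3)
})
println("wrapper created")  // printed before the underlying iterator is built
println(it.toList)          // List(1, 2, 3)
```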
- */ -private[spark] -class SerializableHyperLogLog(var value: ICardinality) extends Externalizable { - - def this() = this(null) // For deserialization - - def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) - - def add[T](elem: T) = { - this.value.offer(elem) - this - } - - def readExternal(in: ObjectInput) { - val byteLength = in.readInt() - val bytes = new Array[Byte](byteLength) - in.readFully(bytes) - value = HyperLogLog.Builder.build(bytes) - } - - def writeExternal(out: ObjectOutput) { - val bytes = value.getBytes() - out.writeInt(bytes.length) - out.write(bytes) - } -} diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 3dd79243ab5bd..b78309f81cb8c 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -68,9 +68,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); - Utils.deleteRecursively(tempDir); } static class ReverseIntComparator implements Comparator, Serializable { @@ -1031,27 +1028,23 @@ public void countApproxDistinct() { arrayData.add(i % size); } JavaRDD simpleRdd = sc.parallelize(arrayData, 10); - Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2); - Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05); - Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01); + Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1); } @Test public void countApproxDistinctByKey() { - double relativeSD = 0.001; - List> arrayData = new ArrayList>(); for (int i = 10; i < 100; i++) for (int j = 0; j < i; j++) arrayData.add(new Tuple2(i, j)); JavaPairRDD pairRdd = sc.parallelizePairs(arrayData); - List> res = pairRdd.countApproxDistinctByKey(relativeSD).collect(); + List> res = pairRdd.countApproxDistinctByKey(8, 0).collect(); for (Tuple2 resItem : res) { double count = (double)resItem._1(); Long resCount = (Long)resItem._2(); Double error = Math.abs((resCount - count) / count); - Assert.assertTrue(error < relativeSD); + Assert.assertTrue(error < 0.1); } } diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala index c645e4cbe8132..4ab870e751778 100644 --- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala @@ -39,7 +39,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -77,7 +76,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext 
System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === false) @@ -129,7 +127,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) @@ -182,7 +179,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = securityManager) - System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext System.setProperty("spark.hostPort", hostname + ":" + boundPort) assert(securityManager.isAuthenticationEnabled() === true) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 64933f4b1046d..f64f3c9036034 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -167,26 +167,28 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { }) } - test("ZippedRDD") { - testRDD(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x))) - testRDDPartitions(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x))) + test("ZippedPartitionsRDD") { + testRDD(rdd => rdd.zip(rdd.map(x => x))) + testRDDPartitions(rdd => rdd.zip(rdd.map(x => x))) - // Test that the ZippedPartition updates parent partitions - // after the parent RDD has been checkpointed and parent partitions have been changed. - // Note that this test is very specific to the current implementation of ZippedRDD. + // Test that ZippedPartitionsRDD updates parent partitions after parent RDDs have + // been checkpointed and parent partitions have been changed. + // Note that this test is very specific to the implementation of ZippedPartitionsRDD. 
val rdd = generateFatRDD() - val zippedRDD = new ZippedRDD(sc, rdd, rdd.map(x => x)) + val zippedRDD = rdd.zip(rdd.map(x => x)).asInstanceOf[ZippedPartitionsRDD2[_, _, _]] zippedRDD.rdd1.checkpoint() zippedRDD.rdd2.checkpoint() val partitionBeforeCheckpoint = - serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]]) + serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartitionsPartition]) zippedRDD.count() val partitionAfterCheckpoint = - serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]]) + serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartitionsPartition]) assert( - partitionAfterCheckpoint.partition1.getClass != partitionBeforeCheckpoint.partition1.getClass && - partitionAfterCheckpoint.partition2.getClass != partitionBeforeCheckpoint.partition2.getClass, - "ZippedRDD.partition1 and ZippedRDD.partition2 not updated after parent RDD is checkpointed" + partitionAfterCheckpoint.partitions(0).getClass != + partitionBeforeCheckpoint.partitions(0).getClass && + partitionAfterCheckpoint.partitions(1).getClass != + partitionBeforeCheckpoint.partitions(1).getClass, + "ZippedPartitionsRDD partition 0 (or 1) not updated after parent RDDs are checkpointed" ) } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 6b2571cd9295e..95ba273f16a71 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -124,9 +124,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf, securityManager = new SecurityManager(conf)) - // Will be cleared by LocalSparkContext - System.setProperty("spark.driver.port", boundPort.toString) - val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 1230565ea5b7e..9ddafc451878d 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -119,28 +119,30 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { * relatively tight error bounds to check correctness of functionality rather than checking * whether the approximation conforms with the requested bound. */ - val relativeSD = 0.001 + val p = 20 + val sp = 0 + // When p = 20, the relative accuracy is about 0.001. So with high probability, the + // relative error should be smaller than the threshold 0.01 we use here. + val relativeSD = 0.01 // For each value i, there are i tuples with first element equal to i. // Therefore, the expected count for key i would be i. 
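A quick sanity check of the accuracy claim in the comment above (p = 20 giving roughly 0.001 relative error), using the `1.054 / sqrt(2^p)` bound quoted in the new Scaladoc:

```scala
// 1.054 / sqrt(2^p), the accuracy bound quoted in the new Scaladoc.
def expectedError(p: Int): Double = 1.054 / math.sqrt(math.pow(2, p))

expectedError(20)  // ~0.00103, so the 0.01 threshold used in this test leaves ample headroom
expectedError(8)   // ~0.066, matching the looser 0.1 bounds in the Java and RDD suites
```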
val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j))) val rdd1 = sc.parallelize(stacked) - val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect() - counted1.foreach{ - case(k, count) => assert(error(count, k) < relativeSD) - } + val counted1 = rdd1.countApproxDistinctByKey(p, sp).collect() + counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) } - val rnd = new Random() + val rnd = new Random(42) // The expected count for key num would be num val randStacked = (1 to 100).flatMap { i => - val num = rnd.nextInt % 500 + val num = rnd.nextInt() % 500 (1 to num).map(j => (num, j)) } val rdd2 = sc.parallelize(randStacked) - val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect() - counted2.foreach{ - case(k, count) => assert(error(count, k) < relativeSD) + val counted2 = rdd2.countApproxDistinctByKey(relativeSD).collect() + counted2.foreach { case (k, count) => + assert(error(count, k) < relativeSD, s"${error(count, k)} < $relativeSD") } } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index e686068f7a99a..286e221e33b78 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -73,10 +73,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { val size = 100 val uniformDistro = for (i <- 1 to 100000) yield i % size val simpleRdd = sc.makeRDD(uniformDistro) - assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2) - assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05) - assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01) - assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001) + assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4) + assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1) } test("SparkContext.union") { @@ -352,6 +350,10 @@ class RDDSuite extends FunSuite with SharedSparkContext { intercept[IllegalArgumentException] { nums.zip(sc.parallelize(1 to 4, 1)).collect() } + + intercept[SparkException] { + nums.zip(sc.parallelize(1 to 5, 2)).collect() + } } test("partition pruning") { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 00deecc1c3ca9..81bd8257bc155 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -78,8 +78,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } after { - System.clearProperty("spark.driver.port") - if (store != null) { store.stop() store = null diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index 7f744d5589ef7..e3ac32ef1a12e 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one or more diff --git a/docs/spark-debugger.md b/docs/spark-debugger.md deleted file mode 100644 index 35d06c51aaf0e..0000000000000 --- a/docs/spark-debugger.md +++ /dev/null @@ -1,121 +0,0 @@ ---- -layout: global -title: The Spark Debugger ---- -**Summary:** The Spark debugger provides replay debugging for deterministic (logic) errors in Spark programs. It's currently in development, but you can try it out in the [arthur branch](https://github.com/apache/spark/tree/arthur). 
- -## Introduction - -From a user's point of view, debugging a general distributed program can be tedious and confusing. Many distributed programs are nondeterministic; their outcome depends on the interleaving between computation and message passing across multiple machines. Also, the fact that a program is running on a cluster of hundreds or thousands of machines means that it's hard to understand the program state and pinpoint the location of problems. - -In order to tame nondeterminism, a distributed debugger has to log a lot of information, imposing a serious performance penalty on the application being debugged. - -But the Spark programming model lets us provide replay debugging for almost zero overhead. Spark programs are a series of RDDs and deterministic transformations, so when debugging a Spark program, we don't have to debug it all at once -- instead, we can debug each transformation individually. Broadly, the debugger lets us do the following two things: - -* Recompute and inspect intermediate RDDs after the program has finished. -* Re-run a particular task in a single-threaded debugger to find exactly what went wrong. - -For deterministic errors, debugging a Spark program is now as easy as debugging a single-threaded one. - -## Approach - -As your Spark program runs, the slaves report key events back to the master -- for example, RDD creations, RDD contents, and uncaught exceptions. (A full list of event types is in [EventLogging.scala](https://github.com/apache/spark/blob/arthur/core/src/main/scala/spark/EventLogging.scala).) The master logs those events, and you can load the event log into the debugger after your program is done running. - -_A note on nondeterminism:_ For fault recovery, Spark requires RDD transformations (for example, the function passed to `RDD.map`) to be deterministic. The Spark debugger also relies on this property, and it can also warn you if your transformation is nondeterministic. This works by checksumming the contents of each RDD and comparing the checksums from the original execution to the checksums after recomputing the RDD in the debugger. - -## Usage - -### Enabling the event log - -To turn on event logging for your program, set `$SPARK_JAVA_OPTS` in `conf/spark-env.sh` as follows: - -{% highlight bash %} -export SPARK_JAVA_OPTS='-Dspark.arthur.logPath=path/to/event-log' -{% endhighlight %} - -where `path/to/event-log` is where you want the event log to go relative to `$SPARK_HOME`. - -**Warning:** If `path/to/event-log` already exists, event logging will be automatically disabled. - -### Loading the event log into the debugger - -1. Run a Spark shell with `./bin/spark-shell --master hist`. -2. Use `EventLogReader` to load the event log as follows: - {% highlight scala %} -spark> val r = new spark.EventLogReader(sc, Some("path/to/event-log")) -r: spark.EventLogReader = spark.EventLogReader@726b37ad -{% endhighlight %} - - **Warning:** If the event log doesn't exist or is unreadable, this will silently fail and `r.events` will be empty. - -### Exploring intermediate RDDs - -Use `r.rdds` to get a list of intermediate RDDs generated during your program's execution. An RDD with id _x_ is located at r.rdds(x). 
For example: - -{% highlight scala %} -scala> r.rdds -res8: scala.collection.mutable.ArrayBuffer[spark.RDD[_]] = ArrayBuffer(spark.HadoopRDD@fe85adf, spark.MappedRDD@5fa5eea1, spark.MappedRDD@6d5bd16, spark.ShuffledRDD@3a70f2db, spark.FlatMappedValuesRDD@4d5825d6, spark.MappedValuesRDD@561c2c45, spark.CoGroupedRDD@539e922d, spark.MappedValuesRDD@4f8ef33e, spark.FlatMappedRDD@32039440, spark.ShuffledRDD@8fa0f67, spark.MappedValuesRDD@590937cb, spark.CoGroupedRDD@6c2e1e17, spark.MappedValuesRDD@47b9af7d, spark.FlatMappedRDD@6fb05c54, spark.ShuffledRDD@237dc815, spark.MappedValuesRDD@16daece7, spark.CoGroupedRDD@7ef73d69, spark.MappedValuesRDD@19e0f99e, spark.FlatMappedRDD@1240158, spark.ShuffledRDD@62d438fd, spark.MappedValuesRDD@5ae99cbb, spark.FilteredRDD@1f30e79e, spark.MappedRDD@43b64611) -{% endhighlight %} - -Use `r.printRDDs()` to get a formatted list of intermediate RDDs, along with the source location where they were created. For example: - -{% highlight scala %} -scala> r.printRDDs -#00: HadoopRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:31) -#01: MappedRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:31) -#02: MappedRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:35) -#03: ShuffledRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:35) -#04: FlatMappedValuesRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:35) -#05: MappedValuesRDD spark.bagel.examples.WikipediaPageRankStandalone$.pageRank(WikipediaPageRankStandalone.scala:91) -#06: CoGroupedRDD spark.bagel.examples.WikipediaPageRankStandalone$.pageRank(WikipediaPageRankStandalone.scala:92) -[...] -{% endhighlight %} - -Use `r.visualizeRDDs()` to visualize the RDDs as a dependency graph. For example: - -{% highlight scala %} -scala> r.visualizeRDDs -/tmp/spark-rdds-3758182885839775712.pdf -{% endhighlight %} - -![Example RDD dependency graph](http://www.ankurdave.com/images/rdd-dep-graph.png) - -Iterate over the `RDDCreation` entries in `r.events` (e.g. `for (RDDCreation(rdd, location) <- events)`) to access the RDD creation locations as well as the RDDs themselves. - -### Debugging a particular task - -1. Find the task you want to debug. If the task threw an exception, the `ExceptionEvent` that was created will have a reference to the task. For example: - {% highlight scala %} -spark> val task = r.events.collect { case e: ExceptionEvent => e }.head.task -{% endhighlight %} - Otherwise, look through the list of all tasks in `r.tasks`, or browse tasks by RDD using r.tasksForRDD(rdd), which returns a list of tasks whose input is the given RDD. - -2. Run the task by calling r.debugTask(taskStageId, taskPartition). The task should contain these two values; you can extract them as follows: - {% highlight scala %} -val (taskStageId, taskPartition) = task match { - case rt: ResultTask[_, _] => (rt.stageId, rt.partition) - case smt: ShuffleMapTask => (smt.stageId, smt.partition) - case _ => throw new UnsupportedOperationException -}) -{% endhighlight %} - The Spark debugger will launch the task in a separate JVM, but you will see the task's stdout and stderr inline with the Spark shell. If you want to pass custom debugging arguments to the task's JVM (for example, to change the debugging port), set the optional `debugOpts` argument to `r.debugTask`. 
When `debugOpts` is left unset, it defaults to: - {% highlight scala %} --Xdebug -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000 -{% endhighlight %} - -3. In another terminal, attach your favorite conventional debugger to the Spark shell. For example, if you want to use jdb, run `jdb -attach 8000`. - -4. Debug the task as you would debug a normal program. For example, to break when an exception is thrown: - {% highlight scala %} -> catch org.xml.sax.SAXParseException -{% endhighlight %} - -5. When the task ends, its JVM will quit and control will return to the main Spark shell. To stop it prematurely, you can kill it from the debugger, or interrupt it from the terminal with Ctrl-C. - -### Detecting nondeterminism in your transformations - -When a task gets run more than once, Arthur is able to compare the checksums of the task's output. If they are different, Arthur will insert a `ChecksumEvent` into `r.checksumMismatches` and print a warning like the following: - {% highlight scala %} -12/04/07 11:42:44 WARN spark.EventLogWriter: Nondeterminism detected in shuffle output on RDD 2, partition 3, output split 0 -{% endhighlight %} - diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 8b056f5ea734c..3af9f66e17dc2 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -83,7 +83,7 @@ def parse_args(): "between zones applies)") parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use") parser.add_option( - "-v", "--spark-version", default="0.9.1", + "-v", "--spark-version", default="1.0.0", help="Version of Spark to use: 'X.Y.Z' or a specific git hash") parser.add_option( "--spark-git-repo", @@ -191,7 +191,8 @@ def is_active(instance): # Return correct versions of Spark and Shark, given the supplied Spark version def get_spark_shark_version(opts): spark_shark_map = { - "0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", "0.9.0": "0.9.0", "0.9.1": "0.9.1" + "0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", "0.9.0": "0.9.0", "0.9.1": "0.9.1", + "1.0.0": "1.0.0" } version = opts.spark_version.replace("v", "") if version not in spark_shark_map: @@ -199,7 +200,6 @@ def get_spark_shark_version(opts): sys.exit(1) return (version, spark_shark_map[version]) - # Attempt to resolve an appropriate AMI given the architecture and # region of the request. def get_spark_ami(opts): diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala new file mode 100644 index 0000000000000..551c339b19523 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.graphx + +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx.PartitionStrategy +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.graphx.util.GraphGenerators +import java.io.{PrintWriter, FileOutputStream} + +/** + * The SynthBenchmark application can be used to run various GraphX algorithms on + * synthetic log-normal graphs. The intent of this code is to enable users to + * profile the GraphX system without access to large graph datasets. + */ +object SynthBenchmark { + + /** + * To run this program use the following: + * + * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank + * + * Options: + * -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank) + * -niters the number of iterations of pagerank to use (Default: 10) + * -numVertices the number of vertices in the graph (Default: 1000000) + * -numEPart the number of edge partitions in the graph (Default: number of cores) + * -partStrategy the graph partitioning strategy to use + * -mu the mean parameter for the log-normal graph (Default: 4.0) + * -sigma the stdev parameter for the log-normal graph (Default: 1.3) + * -degFile the local file to save the degree information (Default: Empty) + */ + def main(args: Array[String]) { + val options = args.map { + arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + + var app = "pagerank" + var niter = 10 + var numVertices = 100000 + var numEPart: Option[Int] = None + var partitionStrategy: Option[PartitionStrategy] = None + var mu: Double = 4.0 + var sigma: Double = 1.3 + var degFile: String = "" + + options.foreach { + case ("app", v) => app = v + case ("niter", v) => niter = v.toInt + case ("nverts", v) => numVertices = v.toInt + case ("numEPart", v) => numEPart = Some(v.toInt) + case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v)) + case ("mu", v) => mu = v.toDouble + case ("sigma", v) => sigma = v.toDouble + case ("degFile", v) => degFile = v + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + val conf = new SparkConf() + .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + + val sc = new SparkContext(conf) + + // Create the graph + println(s"Creating graph...") + val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices, + numEPart.getOrElse(sc.defaultParallelism), mu, sigma) + // Repartition the graph + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache() + + var startTime = System.currentTimeMillis() + val numEdges = graph.edges.count() + println(s"Done creating graph. 
Num Vertices = $numVertices, Num Edges = $numEdges") + val loadTime = System.currentTimeMillis() - startTime + + // Collect the degree distribution (if desired) + if (!degFile.isEmpty) { + val fos = new FileOutputStream(degFile) + val pos = new PrintWriter(fos) + val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0)) + .map(p => p._2).countByValue() + hist.foreach { + case (deg, count) => pos.println(s"$deg \t $count") + } + } + + // Run PageRank + startTime = System.currentTimeMillis() + if (app == "pagerank") { + println("Running PageRank") + val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum() + println(s"Total PageRank = $totalPR") + } else if (app == "cc") { + println("Running Connected Components") + val numComponents = graph.connectedComponents.vertices.map(_._2).distinct() + println(s"Number of components = $numComponents") + } + val runTime = System.currentTimeMillis() - startTime + + println(s"Num Vertices = $numVertices") + println(s"Num Edges = $numEdges") + println(s"Creation time = ${loadTime/1000.0} seconds") + println(s"Run time = ${runTime/1000.0} seconds") + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index 61c460c6b1de8..63db688bfb8c0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -43,7 +43,7 @@ object RDDRelation { sql("SELECT * FROM records").collect().foreach(println) // Aggregation queries are also supported. - val count = sql("SELECT COUNT(*) FROM records").collect().head.getInt(0) + val count = sql("SELECT COUNT(*) FROM records").collect().head.getLong(0) println(s"COUNT(*): $count") // The results of SQL queries are themselves RDDs and support all normal RDD functions. The diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index 84d3b6f243c72..c366c10b15a20 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -58,8 +58,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); } @Test diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index a8fc095072512..899a3cbd62b60 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx.impl.EdgePartition +import org.apache.spark.graphx.impl.EdgePartitionBuilder /** * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each @@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition * `impl.ReplicatedVertexView`. 
*/ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( - val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]) + val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], + val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { partitionsRDD.setName("EdgeRDD") @@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() + /** + * Persists the edge partitions at the specified storage level, ignoring any existing target + * storage level. + */ override def persist(newLevel: StorageLevel): this.type = { partitionsRDD.persist(newLevel) this @@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( this } + /** Persists the vertex partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */ + override def cache(): this.type = { + partitionsRDD.persist(targetStorageLevel) + this + } + private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = { - new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => + this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => if (iter.hasNext) { val (pid, ep) = iter.next() Iterator(Tuple2(pid, f(pid, ep))) @@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = { val ed2Tag = classTag[ED2] val ed3Tag = classTag[ED3] - new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) { + this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) { (thisIter, otherIter) => val (pid, thisEPart) = thisIter.next() val (_, otherEPart) = otherIter.next() Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) }) } + + /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */ + private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag]( + partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = { + new EdgeRDD(partitionsRDD, this.targetStorageLevel) + } + + /** + * Changes the target storage level while preserving all other properties of the + * EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level. + * + * This does not actually trigger a cache; to do this, call + * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD. + */ + private[graphx] def withTargetStorageLevel( + targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = { + new EdgeRDD(this.partitionsRDD, targetStorageLevel) + } + +} + +object EdgeRDD { + /** + * Creates an EdgeRDD from a set of edges. + * + * @tparam ED the edge attribute type + * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD + */ + def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = { + val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => + val builder = new EdgePartitionBuilder[ED, VD] + iter.foreach { e => + builder.add(e.srcId, e.dstId, e.attr) + } + Iterator((pid, builder.toEdgePartition)) + } + EdgeRDD.fromEdgePartitions(edgePartitions) + } + + /** + * Creates an EdgeRDD from already-constructed edge partitions. 
+ * + * @tparam ED the edge attribute type + * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD + */ + def fromEdgePartitions[ED: ClassTag, VD: ClassTag]( + edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = { + new EdgeRDD(edgePartitions) + } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index dc5dac4fdad57..14ae50e6657fd 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab @transient val triplets: RDD[EdgeTriplet[VD, ED]] /** - * Caches the vertices and edges associated with this graph at the specified storage level. + * Caches the vertices and edges associated with this graph at the specified storage level, + * ignoring any target storage levels previously set. * * @param newLevel the level at which to cache the graph. * * @@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] /** - * Caches the vertices and edges associated with this graph. This is used to - * pin a graph in memory enabling multiple queries to reuse the same - * construction process. + * Caches the vertices and edges associated with this graph at the previously-specified target + * storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling + * multiple queries to reuse the same construction process. */ def cache(): Graph[VD, ED] @@ -105,9 +106,19 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab /** * Repartitions the edges in the graph according to `partitionStrategy`. + * + * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph. */ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] + /** + * Repartitions the edges in the graph according to `partitionStrategy`. + * + * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph. + * @param numPartitions the number of edge partitions in the new graph. + */ + def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] + /** * Transforms each vertex attribute in the graph using the map function. * @@ -358,9 +369,12 @@ object Graph { * Construct a graph from a collection of edges encoded as vertex id pairs. * * @param rawEdges a collection of edges in (src, dst) form + * @param defaultValue the vertex attributes with which to create vertices referenced by the edges * @param uniqueEdges if multiple identical edges are found they are combined and the edge * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable * `uniqueEdges`, a [[PartitionStrategy]] must be provided. + * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary + * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary * * @return a graph with edge attributes containing either the count of duplicate edges or 1 * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
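A hedged sketch of the new two-argument partitionBy overload declared above; the strategy and the partition count of 64 are arbitrary choices for illustration:

  import org.apache.spark.graphx.{Graph, PartitionStrategy}

  def repartitionEdges[VD, ED](graph: Graph[VD, ED]): Graph[VD, ED] = {
    // The existing one-argument overload keeps the current number of edge partitions;
    // the new overload lets the caller change that count in the same step.
    graph.partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 64)
  }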
@@ -368,10 +382,12 @@ object Graph { def fromEdgeTuples[VD: ClassTag]( rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, - uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = + uniqueEdges: Option[PartitionStrategy] = None, + edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, + vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] = { val edges = rawEdges.map(p => Edge(p._1, p._2, 1)) - val graph = GraphImpl(edges, defaultValue) + val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel) uniqueEdges match { case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b) case None => graph @@ -383,14 +399,18 @@ object Graph { * * @param edges the RDD containing the set of edges in the graph * @param defaultValue the default vertex attribute to use for each vertex + * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary + * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary * * @return a graph with edge attributes described by `edges` and vertices * given by all vertices in `edges` with value `defaultValue` */ def fromEdges[VD: ClassTag, ED: ClassTag]( edges: RDD[Edge[ED]], - defaultValue: VD): Graph[VD, ED] = { - GraphImpl(edges, defaultValue) + defaultValue: VD, + edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, + vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = { + GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel) } /** @@ -405,12 +425,16 @@ object Graph { * @param edges the collection of edges in the graph * @param defaultVertexAttr the default vertex attribute to use for vertices that are * mentioned in edges but not in vertices + * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary + * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary */ def apply[VD: ClassTag, ED: ClassTag]( vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], - defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = { - GraphImpl(vertices, edges, defaultVertexAttr) + defaultVertexAttr: VD = null.asInstanceOf[VD], + edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, + vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = { + GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala index 389490c139848..2e814e34f9ad8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala @@ -17,6 +17,7 @@ package org.apache.spark.graphx +import org.apache.spark.storage.StorageLevel import org.apache.spark.{Logging, SparkContext} import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl} @@ -48,12 +49,16 @@ object GraphLoader extends Logging { * @param canonicalOrientation whether to orient edges in the positive * direction * @param minEdgePartitions the number of partitions for the edge RDD + * @param edgeStorageLevel the desired storage level for the edge partitions. To set the vertex + * storage level, call [[org.apache.spark.graphx.Graph#persistVertices]]. 
*/ def edgeListFile( sc: SparkContext, path: String, canonicalOrientation: Boolean = false, - minEdgePartitions: Int = 1) + minEdgePartitions: Int = 1, + edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, + vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) : Graph[Int, Int] = { val startTime = System.currentTimeMillis @@ -78,12 +83,13 @@ object GraphLoader extends Logging { } } Iterator((pid, builder.toEdgePartition)) - }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path)) + }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path)) edges.count() logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime)) - GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1) + GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel) } // end of edgeListFile } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 1526ccef06fd4..5e7e72a764cc8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -114,9 +114,20 @@ object PartitionStrategy { */ case object CanonicalRandomVertexCut extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { - val lower = math.min(src, dst) - val higher = math.max(src, dst) - math.abs((lower, higher).hashCode()) % numParts + if (src < dst) { + math.abs((src, dst).hashCode()) % numParts + } else { + math.abs((dst, src).hashCode()) % numParts + } } } + + /** Returns the PartitionStrategy with the specified name. */ + def fromString(s: String): PartitionStrategy = s match { + case "RandomVertexCut" => RandomVertexCut + case "EdgePartition1D" => EdgePartition1D + case "EdgePartition2D" => EdgePartition2D + case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut + case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + s) + } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 8b910fbc5a423..f1b6df9a3025e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._ * @tparam VD the vertex attribute associated with each vertex in the set. */ class VertexRDD[@specialized VD: ClassTag]( - val partitionsRDD: RDD[ShippableVertexPartition[VD]]) + val partitionsRDD: RDD[ShippableVertexPartition[VD]], + val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { require(partitionsRDD.partitioner.isDefined) @@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag]( * VertexRDD will be based on a different index and can no longer be quickly joined with this * RDD. 
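As a usage sketch of the storage-level parameters threaded through GraphLoader.edgeListFile and the Graph constructors above, together with the new PartitionStrategy.fromString helper; the path, partition count and chosen levels are placeholders:

  import org.apache.spark.SparkContext
  import org.apache.spark.graphx.{Graph, GraphLoader, PartitionStrategy}
  import org.apache.spark.storage.StorageLevel

  def loadEdgeList(sc: SparkContext, path: String): Graph[Int, Int] = {
    val unpartitioned = GraphLoader.edgeListFile(sc, path,
      minEdgePartitions = 8,
      edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
    // Strategy names can now come straight from configuration or the command line.
    unpartitioned.partitionBy(PartitionStrategy.fromString("EdgePartition2D"))
  }

Graph.fromEdgeTuples, Graph.fromEdges and Graph.apply accept the same pair of edgeStorageLevel/vertexStorageLevel arguments, so the whole construction path can spill to disk for graphs that do not fit in memory.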
*/ - def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex())) + def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex())) override val partitioner = partitionsRDD.partitioner @@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag]( } setName("VertexRDD") + /** + * Persists the vertex partitions at the specified storage level, ignoring any existing target + * storage level. + */ override def persist(newLevel: StorageLevel): this.type = { partitionsRDD.persist(newLevel) this @@ -95,6 +100,12 @@ class VertexRDD[@specialized VD: ClassTag]( this } + /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */ + override def cache(): this.type = { + partitionsRDD.persist(targetStorageLevel) + this + } + /** The number of vertices in the RDD. */ override def count(): Long = { partitionsRDD.map(_.size).reduce(_ + _) @@ -114,7 +125,7 @@ class VertexRDD[@specialized VD: ClassTag]( f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]) : VertexRDD[VD2] = { val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) - new VertexRDD(newPartitionsRDD) + this.withPartitionsRDD(newPartitionsRDD) } @@ -165,7 +176,7 @@ class VertexRDD[@specialized VD: ClassTag]( val otherPart = otherIter.next() Iterator(thisPart.diff(otherPart)) } - new VertexRDD(newPartitionsRDD) + this.withPartitionsRDD(newPartitionsRDD) } /** @@ -191,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag]( val otherPart = otherIter.next() Iterator(thisPart.leftJoin(otherPart)(f)) } - new VertexRDD(newPartitionsRDD) + this.withPartitionsRDD(newPartitionsRDD) } /** @@ -220,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag]( case other: VertexRDD[_] => leftZipJoin(other)(f) case _ => - new VertexRDD[VD3]( + this.withPartitionsRDD[VD3]( partitionsRDD.zipPartitions( other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) { (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f)) @@ -242,7 +253,7 @@ class VertexRDD[@specialized VD: ClassTag]( val otherPart = otherIter.next() Iterator(thisPart.innerJoin(otherPart)(f)) } - new VertexRDD(newPartitionsRDD) + this.withPartitionsRDD(newPartitionsRDD) } /** @@ -264,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag]( case other: VertexRDD[_] => innerZipJoin(other)(f) case _ => - new VertexRDD( + this.withPartitionsRDD( partitionsRDD.zipPartitions( other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) { (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f)) @@ -290,7 +301,7 @@ class VertexRDD[@specialized VD: ClassTag]( val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc)) } - new VertexRDD[VD2](parts) + this.withPartitionsRDD[VD2](parts) } /** @@ -309,7 +320,25 @@ class VertexRDD[@specialized VD: ClassTag]( if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty partIter.map(_.withRoutingTable(routingTable)) } - new VertexRDD(vertexPartitions) + this.withPartitionsRDD(vertexPartitions) + } + + /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */ + private[graphx] def withPartitionsRDD[VD2: ClassTag]( + partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = { + new VertexRDD(partitionsRDD, this.targetStorageLevel) + } + + /** + * Changes the target storage level while preserving all other properties of the + * VertexRDD. 
Operations on the returned VertexRDD will preserve this storage level. + * + * This does not actually trigger a cache; to do this, call + * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD. + */ + private[graphx] def withTargetStorageLevel( + targetStorageLevel: StorageLevel): VertexRDD[VD] = { + new VertexRDD(this.partitionsRDD, targetStorageLevel) } /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */ diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 1649b244d2881..15ea05cbe281d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -61,7 +61,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( this } - override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY) + override def cache(): Graph[VD, ED] = { + vertices.cache() + replicatedVertexView.edges.cache() + this + } override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) @@ -70,10 +74,14 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { - val numPartitions = replicatedVertexView.edges.partitions.size + partitionBy(partitionStrategy, edges.partitions.size) + } + + override def partitionBy( + partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = { val edTag = classTag[ED] val vdTag = classTag[VD] - val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e => + val newEdges = edges.withPartitionsRDD(edges.map { e => val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) // Should we be using 3-tuple or an optimized class @@ -256,24 +264,33 @@ object GraphImpl { /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */ def apply[VD: ClassTag, ED: ClassTag]( edges: RDD[Edge[ED]], - defaultVertexAttr: VD): GraphImpl[VD, ED] = { - fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr) + defaultVertexAttr: VD, + edgeStorageLevel: StorageLevel, + vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { + fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel) } /** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. */ def fromEdgePartitions[VD: ClassTag, ED: ClassTag]( edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])], - defaultVertexAttr: VD): GraphImpl[VD, ED] = { - fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr) + defaultVertexAttr: VD, + edgeStorageLevel: StorageLevel, + vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { + fromEdgeRDD(EdgeRDD.fromEdgePartitions(edgePartitions), defaultVertexAttr, edgeStorageLevel, + vertexStorageLevel) } /** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. 
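To make the cache/persist split in GraphImpl concrete, a small sketch using the new storage-level arguments on Graph.apply; the data and the chosen levels are illustrative:

  import org.apache.spark.SparkContext
  import org.apache.spark.graphx.{Edge, Graph}
  import org.apache.spark.storage.StorageLevel

  def cacheAtTargetLevels(sc: SparkContext): Graph[String, Int] = {
    val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
    val g = Graph(vertices, edges, defaultVertexAttr = "missing",
      edgeStorageLevel = StorageLevel.DISK_ONLY,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
    // cache() persists the vertices at MEMORY_AND_DISK_SER and the edges at DISK_ONLY,
    // whereas g.persist(StorageLevel.MEMORY_ONLY) would force MEMORY_ONLY for both.
    g.cache()
  }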
*/ def apply[VD: ClassTag, ED: ClassTag]( vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], - defaultVertexAttr: VD): GraphImpl[VD, ED] = { - val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache() + defaultVertexAttr: VD, + edgeStorageLevel: StorageLevel, + vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { + val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD]) + .withTargetStorageLevel(edgeStorageLevel).cache() val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr) + .withTargetStorageLevel(vertexStorageLevel).cache() GraphImpl(vertexRDD, edgeRDD) } @@ -309,23 +326,13 @@ object GraphImpl { */ private def fromEdgeRDD[VD: ClassTag, ED: ClassTag]( edges: EdgeRDD[ED, VD], - defaultVertexAttr: VD): GraphImpl[VD, ED] = { - edges.cache() - val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr) - fromExistingRDDs(vertices, edges) - } - - /** Create an EdgeRDD from a set of edges. */ - private def createEdgeRDD[ED: ClassTag, VD: ClassTag]( - edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = { - val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => - val builder = new EdgePartitionBuilder[ED, VD] - iter.foreach { e => - builder.add(e.srcId, e.dstId, e.attr) - } - Iterator((pid, builder.toEdgePartition)) - } - new EdgeRDD(edgePartitions) + defaultVertexAttr: VD, + edgeStorageLevel: StorageLevel, + vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { + val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache() + val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr) + .withTargetStorageLevel(vertexStorageLevel) + fromExistingRDDs(vertices, edgesCached) } } // end of object GraphImpl diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala index 3a0bba1b93b41..86b366eb9202b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -69,7 +69,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format( includeSrc, includeDst, shipSrc, shipDst)) .partitionBy(edges.partitioner.get) - val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { + val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { (ePartIter, shippedVertsIter) => ePartIter.map { case (pid, edgePartition) => (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))) @@ -91,7 +91,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)") .partitionBy(edges.partitioner.get) - val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) { + val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedActives) { (ePartIter, shippedActivesIter) => ePartIter.map { case (pid, edgePartition) => (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator))) @@ -111,7 +111,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( hasSrcId, hasDstId)) .partitionBy(edges.partitioner.get) - val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { + val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { (ePartIter, shippedVertsIter) => ePartIter.map 
{ case (pid, edgePartition) => (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index 069e042ed94a3..c1513a00453cf 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -17,7 +17,9 @@ package org.apache.spark.graphx.lib +import scala.collection.mutable import org.apache.spark._ +import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ import org.apache.spark.graphx.PartitionStrategy._ @@ -28,18 +30,20 @@ object Analytics extends Logging { def main(args: Array[String]): Unit = { if (args.length < 2) { - System.err.println("Usage: Analytics <taskType> <file> [other options]") + System.err.println( + "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]") System.exit(1) } val taskType = args(0) val fname = args(1) - val options = args.drop(2).map { arg => + val optionsList = args.drop(2).map { arg => arg.dropWhile(_ == '-').split('=') match { case Array(opt, v) => (opt -> v) case _ => throw new IllegalArgumentException("Invalid argument: " + arg) } } + val options = mutable.Map(optionsList: _*) def pickPartitioner(v: String): PartitionStrategy = { // TODO: Use reflection rather than listing all the partitioning strategies here. @@ -57,20 +61,24 @@ object Analytics extends Logging { .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") .set("spark.locality.wait", "100000") + val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { + println("Set the number of edge partitions using --numEPart.") + sys.exit(1) + } + val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") + .map(pickPartitioner(_)) + val edgeStorageLevel = options.remove("edgeStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + val vertexStorageLevel = options.remove("vertexStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + taskType match { case "pagerank" => - var tol: Float = 0.001F - var outFname = "" - var numEPart = 4 - var partitionStrategy: Option[PartitionStrategy] = None - var numIterOpt: Option[Int] = None - - options.foreach{ - case ("tol", v) => tol = v.toFloat - case ("output", v) => outFname = v - case ("numEPart", v) => numEPart = v.toInt - case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) - case ("numIter", v) => numIterOpt = Some(v.toInt) + val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F) + val outFname = options.remove("output").getOrElse("") + val numIterOpt = options.remove("numIter").map(_.toInt) + + options.foreach { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } @@ -81,7 +89,9 @@ object Analytics extends Logging { val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart).cache() + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) println("GRAPHX: Number of vertices " + graph.vertices.count) @@ -102,32 +112,19 @@ object Analytics extends Logging { sc.stop() case "cc" => - var numIter = Int.MaxValue - var numVPart = 4 - var numEPart = 4 - var isDynamic = false - var partitionStrategy:
Option[PartitionStrategy] = None - - options.foreach{ - case ("numIter", v) => numIter = v.toInt - case ("dynamic", v) => isDynamic = v.toBoolean - case ("numEPart", v) => numEPart = v.toInt - case ("numVPart", v) => numVPart = v.toInt - case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + options.foreach { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } - if (!isDynamic && numIter == Int.MaxValue) { - println("Set number of iterations!") - sys.exit(1) - } println("======================================") println("| Connected Components |") println("======================================") val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart).cache() + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) val cc = ConnectedComponents.run(graph) @@ -135,24 +132,25 @@ object Analytics extends Logging { sc.stop() case "triangles" => - var numEPart = 4 - // TriangleCount requires the graph to be partitioned - var partitionStrategy: PartitionStrategy = RandomVertexCut - - options.foreach{ - case ("numEPart", v) => numEPart = v.toInt - case ("partStrategy", v) => partitionStrategy = pickPartitioner(v) + options.foreach { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } + println("======================================") println("| Triangle Count |") println("======================================") + val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) - val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, - minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() + val graph = GraphLoader.edgeListFile(sc, fname, + canonicalOrientation = true, + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel) + // TriangleCount requires the graph to be partitioned + .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache() val triangles = TriangleCount.run(graph) println("Triangles: " + triangles.vertices.map { - case (vid,data) => data.toLong + case (vid, data) => data.toLong }.reduce(_ + _) / 3) sc.stop() diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index a3c8de3f9068f..635514f09ece0 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -38,19 +38,42 @@ object GraphGenerators { val RMATa = 0.45 val RMATb = 0.15 val RMATd = 0.25 + /** * Generate a graph whose vertex out degree is log normal. + * + * The default values for mu and sigma are taken from the Pregel paper: + * + * Grzegorz Malewicz, Matthew H. Austern, Aart J.C Bik, James C. Dehnert, + * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010. + * Pregel: a system for large-scale graph processing. SIGMOD '10. 
+ * + * @param sc + * @param numVertices + * @param mu + * @param sigma + * @return */ - def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = { - // based on Pregel settings - val mu = 4 - val sigma = 1.3 - - val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{ - src => (src, sampleLogNormal(mu, sigma, numVertices)) + def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int, + mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = { + val vertices = sc.parallelize(0 until numVertices, numEParts).map { src => + // Initialize the random number generator with the source vertex id + val rand = new Random(src) + val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong) + (src.toLong, degree) } - val edges = vertices.flatMap { v => - generateRandomEdges(v._1.toInt, v._2, numVertices) + val edges = vertices.flatMap { case (src, degree) => + new Iterator[Edge[Int]] { + // Initialize the random number generator with the source vertex id + val rand = new Random(src) + var i = 0 + override def hasNext(): Boolean = { i < degree } + override def next(): Edge[Int] = { + val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i) + i += 1 + nextEdge + } + } } Graph(vertices, edges, 0) } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala index 51f02f94e00d5..47594a800a3b1 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala @@ -38,8 +38,6 @@ trait LocalSparkContext { f(sc) } finally { sc.stop() - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 07dfadf2f7869..00d0b18c27a8d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -416,7 +416,7 @@ class RowMatrix( mat } - /** Updates or verfires the number of rows. */ + /** Updates or verifies the number of rows. */ private def updateNumRows(m: Long) { if (nRows <= 0) { nRows == m diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index cfc3b6860649a..d743bd7dd1825 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -201,6 +201,10 @@ class ALS private ( val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner) val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner) + userInLinks.setName("userInLinks") + userOutLinks.setName("userOutLinks") + productInLinks.setName("productInLinks") + productOutLinks.setName("productOutLinks") // Initialize user and product factors randomly, but use a deterministic seed for each // partition so that fault recovery works @@ -225,14 +229,14 @@ class ALS private ( // perform ALS update logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) // Persist users because it will be called twice. 
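The setName calls added to ALS above exist so that the intermediate link and factor RDDs can be told apart on the web UI's Storage tab; a small standalone sketch of the pattern, with stand-in data and an illustrative name:

  import org.apache.spark.SparkContext
  import org.apache.spark.storage.StorageLevel

  def persistWithName(sc: SparkContext): Long = {
    val users = sc.parallelize(0 until 4).map(block => (block, Array.fill(10)(math.random)))
    // setName returns the RDD itself, so it chains with persist(); the named, persisted RDD
    // then shows up as "users-0" in the UI once an action materializes it.
    users.setName("users-0").persist(StorageLevel.MEMORY_AND_DISK)
    users.count()
  }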
- users.persist() + users.setName(s"users-$iter").persist() val YtY = Some(sc.broadcast(computeYtY(users))) val previousProducts = products products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, alpha, YtY) previousProducts.unpersist() logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) - products.persist() + products.setName(s"products-$iter").persist() val XtX = Some(sc.broadcast(computeYtY(products))) val previousUsers = users users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, @@ -245,22 +249,24 @@ class ALS private ( logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, alpha, YtY = None) + products.setName(s"products-$iter") logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, alpha, YtY = None) + users.setName(s"users-$iter") } } // The last `products` will be used twice. One to generate the last `users` and the other to // generate `productsOut`. So we cache it for better performance. - products.persist() + products.setName("products").persist() // Flatten and cache the two final RDDs to un-block them val usersOut = unblockFactors(users, userOutLinks) val productsOut = unblockFactors(products, productOutLinks) - usersOut.persist() - productsOut.persist() + usersOut.setName("usersOut").persist() + productsOut.setName("productsOut").persist() // Materialize usersOut and productsOut. usersOut.count() diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java index d75d3a6b26730..faa675b59cd50 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java @@ -42,7 +42,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List validationData, LogisticRegressionModel model) { diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java index 743a43a139c0c..1c90522a0714a 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -44,7 +44,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } private static final List POINTS = Arrays.asList( diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java index 667f76a1bd55f..31b9f3e8d438e 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java @@ -41,7 +41,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List validationData, SVMModel model) { diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java 
b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java index 0c916ca378034..31676e64025d0 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java @@ -44,7 +44,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } @Test diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index b150334deb06c..bf2365f82044c 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -42,7 +42,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } static void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java index f725924a2d971..8950b48888b74 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java @@ -41,7 +41,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List validationData, LassoModel model) { diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java index 6dc6877691036..24c4c20d9af18 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -43,7 +43,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } int validatePrediction(List validationData, LinearRegressionModel model) { diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java index 03714ae7e4d00..7266eec235800 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java @@ -43,7 +43,6 @@ public void setUp() { public void tearDown() { sc.stop(); sc = null; - System.clearProperty("spark.driver.port"); } double predictionError(List validationData, RidgeRegressionModel model) { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala index 212fbe9288f0d..0d4868f3d9e42 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala @@ -34,7 +34,6 @@ trait LocalSparkContext extends BeforeAndAfterAll { self: Suite => if (sc != null) { sc.stop() } - System.clearProperty("spark.driver.port") super.afterAll() } } diff --git a/pom.xml b/pom.xml index fe43a9518b52e..fcd6f66b4414a 100644 --- a/pom.xml +++ b/pom.xml @@ -206,6 +206,17 @@ false + + spring-releases + Spring Release Repository + 
http://repo.spring.io/libs-release + + true + + + false + + @@ -289,9 +300,9 @@ com.clearspring.analytics stream - 2.5.1 + 2.7.0 - + it.unimi.dsi fastutil diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ecb389de5558f..dd7efceb23c96 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -16,7 +16,6 @@ */ import com.typesafe.tools.mima.core._ -import com.typesafe.tools.mima.core.ProblemFilters._ /** * Additional excludes for checking of Spark's binary compatibility. @@ -35,7 +34,29 @@ object MimaExcludes { val excludes = SparkBuild.SPARK_VERSION match { case v if v.startsWith("1.1") => - Seq() + Seq(MimaBuild.excludeSparkPackage("graphx")) ++ + Seq( + // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values + // for countApproxDistinct* functions, which does not work in Java. We later removed + // them, and use the following to tell Mima to not care about them. + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1") + ) ++ + MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++ + MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++ + MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") case v if v.startsWith("1.0") => Seq( MimaBuild.excludeSparkPackage("api.java"), @@ -58,4 +79,3 @@ object MimaExcludes { case _ => Seq() } } - diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 64c9441d8e3f8..efb0b9319be13 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -212,6 +212,7 @@ object SparkBuild extends Build { "JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/", "MQTT Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/", "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/", + "Pivotal Repository" at "http://repo.spring.io/libs-release/", // For Sonatype publishing // "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", // "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/", @@ -361,7 +362,7 @@ object SparkBuild extends Build { "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm), "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm), "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), - "com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil), + "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil), // Only HyperLogLogPlus is used, which does not depend on fastutil. 
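Since the streamlib `stream` dependency moves to 2.7.0 and, as noted above, only HyperLogLogPlus is used from it, here is a small standalone sketch of that estimator; the precision p = 14 is an arbitrary choice rather than a Spark default:

  import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

  val hll = new HyperLogLogPlus(14)        // 2^14 registers
  (1 to 100000).foreach(i => hll.offer(i)) // feed 100,000 distinct values
  println(hll.cardinality())               // prints an estimate close to 100000

This is the estimator behind the countApproxDistinct family after this change, which is why the old SerializableHyperLogLog wrapper can be excluded from the MiMa checks above.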
"org.spark-project" % "pyrolite" % "2.0.1", "net.sf.py4j" % "py4j" % "0.8.1" ), diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ad553a5024318..062bec2381a8f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -271,6 +271,20 @@ def parallelize(self, c, numSlices=None): jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices) return RDD(jrdd, self, serializer) + def pickleFile(self, name, minPartitions=None): + """ + Load an RDD previously saved using L{RDD.saveAsPickleFile} method. + + >>> tmpFile = NamedTemporaryFile(delete=True) + >>> tmpFile.close() + >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5) + >>> sorted(sc.pickleFile(tmpFile.name, 3).collect()) + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + """ + minPartitions = minPartitions or self.defaultMinPartitions + return RDD(self._jsc.objectFile(name, minPartitions), self, + BatchedSerializer(PickleSerializer())) + def textFile(self, name, minPartitions=None): """ Read a text file from HDFS, a local file system (available on all diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f3b1f1a665e5a..ca0a95578fd28 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -33,7 +33,8 @@ from random import Random from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ - BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long + BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ + PickleSerializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter @@ -427,11 +428,14 @@ def intersection(self, other): .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ .keys() - def _reserialize(self): - if self._jrdd_deserializer == self.ctx.serializer: + def _reserialize(self, serializer=None): + serializer = serializer or self.ctx.serializer + if self._jrdd_deserializer == serializer: return self else: - return self.map(lambda x: x, preservesPartitioning=True) + converted = self.map(lambda x: x, preservesPartitioning=True) + converted._jrdd_deserializer = serializer + return converted def __add__(self, other): """ @@ -897,6 +901,20 @@ def first(self): """ return self.take(1)[0] + def saveAsPickleFile(self, path, batchSize=10): + """ + Save this RDD as a SequenceFile of serialized objects. The serializer used is + L{pyspark.serializers.PickleSerializer}, default batch size is 10. + + >>> tmpFile = NamedTemporaryFile(delete=True) + >>> tmpFile.close() + >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3) + >>> sorted(sc.pickleFile(tmpFile.name, 5).collect()) + [1, 2, 'rdd', 'spark'] + """ + self._reserialize(BatchedSerializer(PickleSerializer(), + batchSize))._jrdd.saveAsObjectFile(path) + def saveAsTextFile(self, path): """ Save this RDD as a text file, using string representations of elements. @@ -1062,7 +1080,7 @@ def rightOuterJoin(self, other, numPartitions=None): return python_right_outer_join(self, other, numPartitions) # TODO: add option to control map-side combining - def partitionBy(self, numPartitions, partitionFunc=hash): + def partitionBy(self, numPartitions, partitionFunc=None): """ Return a copy of the RDD partitioned using the specified partitioner. 
@@ -1073,6 +1091,9 @@ def partitionBy(self, numPartitions, partitionFunc=hash): """ if numPartitions is None: numPartitions = self.ctx.defaultParallelism + + if partitionFunc is None: + partitionFunc = lambda x: 0 if x is None else hash(x) # Transferring O(n) objects to Java is too expensive. Instead, we'll # form the hash buckets in Python, transferring O(numPartitions) objects # to Java. Each object is a (splitNumber, [objects]) pair. @@ -1418,10 +1439,9 @@ def _jrdd(self): if self._jrdd_val: return self._jrdd_val if self._bypass_serializer: - serializer = NoOpSerializer() - else: - serializer = self.ctx.serializer - command = (self.func, self._prev_jrdd_deserializer, serializer) + self._jrdd_deserializer = NoOpSerializer() + command = (self.func, self._prev_jrdd_deserializer, + self._jrdd_deserializer) pickled_command = CloudPickleSerializer().dumps(command) broadcast_vars = ListConverter().convert( [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 01929bbe7372f..557fbb43f3261 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -57,10 +57,6 @@ def setUp(self): def tearDown(self): self.sc.stop() sys.path = self._old_sys_path - # To avoid Akka rebinding to the same port, since it doesn't unbind - # immediately on shutdown - self.sc._jvm.System.clearProperty("spark.driver.port") - class TestCheckpoint(PySparkTestCase): diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 95460aa205331..98cdfd0054713 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -51,8 +51,6 @@ class ReplSuite extends FunSuite { if (interp.sparkContext != null) { interp.sparkContext.stop() } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") return out.toString } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index fa7d010459c63..041e813598d1b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -58,7 +58,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { // By clearing the port we force Spark to pick a new one. This allows us to rerun tests // without restarting the JVM. 
- System.clearProperty("spark.driver.port") System.clearProperty("spark.hostPort") override lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index b3ed302db6a38..98e17ff92e205 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -187,7 +187,6 @@ object MasterFailureTest extends Logging { setupCalled = true // Setup the streaming computation with the given operation - System.clearProperty("spark.driver.port") val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 849bbf1299182..6e1f01900071b 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -27,7 +27,6 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { - System.clearProperty("spark.driver.port"); System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); ssc.checkpoint("checkpoint"); @@ -37,8 +36,5 @@ public void setUp() { public void tearDown() { ssc.stop(); ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index d20a7b728c741..10ad3c9e1adc9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -370,7 +370,6 @@ class CheckpointSuite extends TestSuiteBase { "\n-------------------------------------------\n" ) ssc = new StreamingContext(checkpointDir) - System.clearProperty("spark.driver.port") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 8036f77c973ae..cc178fba12c9d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -153,8 +153,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Default after function for any streaming test suite. Override this // if you want to add your stuff to "after" (i.e., don't call after { } ) def afterFunction() { - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") System.clearProperty("spark.streaming.clock") }