From 5a8f64f33632fbf89d16cade2e0e66c5ed60760b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Oct 2014 00:49:11 -0700 Subject: [PATCH 01/22] [SPARK-3958] TorrentBroadcast cleanup / debugging improvements. This PR makes several changes to TorrentBroadcast in order to make it easier to reason about, which should help when debugging SPARK-3958. The key changes: - Remove all state from the global TorrentBroadcast object. This state consisted mainly of configuration options, like the block size and compression codec, and was read by the blockify / unblockify methods. Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block size was always determined by the first SparkConf that TorrentBroadcast was initialized with; as a result, unit tests could not properly test TorrentBroadcast with different block sizes. Instead, blockifyObject and unBlockifyObject now accept compression codecs and blockSizes as arguments. These arguments are supplied at the call sites inside of TorrentBroadcast instances. Each TorrentBroadcast instance determines these values from SparkEnv's SparkConf. I was careful to ensure that we do not accidentally serialize CompressionCodec or SparkConf objects as part of the TorrentBroadcast object. - Remove special-case handling of local-mode in TorrentBroadcast. I don't think that broadcast implementations should know about whether we're running in local mode. If we want to optimize the performance of broadcast in local mode, then we should detect this at a higher level and use a dummy LocalBroadcastFactory implementation instead. Removing this code fixes a subtle error condition: in the old local mode code, a failure to find the broadcast in the local BlockManager would lead to an attempt to deblockify zero blocks, which could lead to confusing deserialization or decompression errors when we attempted to decompress an empty byte array. This should never have happened, though: a failure to find the block in local mode is evidence of some other error. The changes here will make it easier to debug those errors if they ever happen. - Add a check that throws an exception when attempting to deblockify an empty array. - Use ScalaCheck to add a test to check that TorrentBroadcast's blockifyObject and unBlockifyObject methods are inverses. - Misc. cleanup and logging improvements. Author: Josh Rosen Closes #2844 from JoshRosen/torrentbroadcast-bugfix and squashes the following commits: 1e8268d [Josh Rosen] Address Reynold's review comments 2a9fdfd [Josh Rosen] Address Reynold's review comments. c3b08f9 [Josh Rosen] Update TorrentBroadcast tests to reflect removal of special local-mode optimizations. 5c22782 [Josh Rosen] Store broadcast variable's value in the driver. 33fc754 [Josh Rosen] Change blockify/unblockifyObject to accept serializer as argument. 618a872 [Josh Rosen] [SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
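To make the inverse property concrete, here is a minimal, self-contained sketch of the same chunk-and-reassemble idea. It deliberately uses plain JDK serialization and hand-rolled `blockify`/`unBlockify` helpers rather than Spark's private `TorrentBroadcast.blockifyObject`/`unBlockifyObject` API, so every name in it is illustrative only:

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object BlockifyRoundTripSketch {
  /** Serialize an object, then split the bytes into blockSize-sized chunks. */
  def blockify(obj: AnyRef, blockSize: Int): Array[ByteBuffer] = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    bos.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  /** Reassemble the chunks and deserialize; rejects an empty block array,
    * mirroring the new require() check described above. */
  def unBlockify[T](blocks: Array[ByteBuffer]): T = {
    require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
    val bytes = blocks.flatMap { b =>
      val chunk = new Array[Byte](b.remaining())
      b.get(chunk)
      chunk
    }
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val data = Array.tabulate[Byte](10000)(i => (i % 127).toByte)
    val blocks = blockify(data, blockSize = 1024)
    val roundTripped = unBlockify[Array[Byte]](blocks)
    assert(roundTripped.sameElements(data), "blockify and unBlockify should be inverses")
    println(s"Round-tripped ${data.length} bytes through ${blocks.length} blocks")
  }
}
```

The real test below additionally varies the serializer and compression codec, which is exactly what passing them as arguments makes possible.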
--- .../spark/broadcast/TorrentBroadcast.scala | 136 +++++++++--------- .../broadcast/TorrentBroadcastFactory.scala | 11 +- .../spark/broadcast/BroadcastSuite.scala | 42 ++++-- 3 files changed, 97 insertions(+), 92 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 42d58682a1e23..99af2e9608ea7 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -26,6 +26,7 @@ import scala.util.Random import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec +import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BroadcastBlockId, StorageLevel} import org.apache.spark.util.ByteBufferInputStream import org.apache.spark.util.io.ByteArrayChunkOutputStream @@ -46,14 +47,12 @@ import org.apache.spark.util.io.ByteArrayChunkOutputStream * This prevents the driver from being the bottleneck in sending out multiple copies of the * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]]. * + * When initialized, TorrentBroadcast objects read SparkEnv.get.conf. + * * @param obj object to broadcast - * @param isLocal whether Spark is running in local mode (single JVM process). * @param id A unique identifier for the broadcast variable. */ -private[spark] class TorrentBroadcast[T: ClassTag]( - obj : T, - @transient private val isLocal: Boolean, - id: Long) +private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) extends Broadcast[T](id) with Logging with Serializable { /** @@ -62,6 +61,20 @@ private[spark] class TorrentBroadcast[T: ClassTag]( * blocks from the driver and/or other executors. */ @transient private var _value: T = obj + /** The compression codec to use, or None if compression is disabled */ + @transient private var compressionCodec: Option[CompressionCodec] = _ + /** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */ + @transient private var blockSize: Int = _ + + private def setConf(conf: SparkConf) { + compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) { + Some(CompressionCodec.createCodec(conf)) + } else { + None + } + blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024 + } + setConf(SparkEnv.get.conf) private val broadcastId = BroadcastBlockId(id) @@ -76,23 +89,20 @@ private[spark] class TorrentBroadcast[T: ClassTag]( * @return number of blocks this broadcast variable is divided into */ private def writeBlocks(): Int = { - // For local mode, just put the object in the BlockManager so we can find it later. - SparkEnv.get.blockManager.putSingle( - broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false) - - if (!isLocal) { - val blocks = TorrentBroadcast.blockifyObject(_value) - blocks.zipWithIndex.foreach { case (block, i) => - SparkEnv.get.blockManager.putBytes( - BroadcastBlockId(id, "piece" + i), - block, - StorageLevel.MEMORY_AND_DISK_SER, - tellMaster = true) - } - blocks.length - } else { - 0 + // Store a copy of the broadcast variable in the driver so that tasks run on the driver + // do not create a duplicate copy of the broadcast variable's value. 
+ SparkEnv.get.blockManager.putSingle(broadcastId, _value, StorageLevel.MEMORY_AND_DISK, + tellMaster = false) + val blocks = + TorrentBroadcast.blockifyObject(_value, blockSize, SparkEnv.get.serializer, compressionCodec) + blocks.zipWithIndex.foreach { case (block, i) => + SparkEnv.get.blockManager.putBytes( + BroadcastBlockId(id, "piece" + i), + block, + StorageLevel.MEMORY_AND_DISK_SER, + tellMaster = true) } + blocks.length } /** Fetch torrent blocks from the driver and/or other executors. */ @@ -104,29 +114,24 @@ private[spark] class TorrentBroadcast[T: ClassTag]( for (pid <- Random.shuffle(Seq.range(0, numBlocks))) { val pieceId = BroadcastBlockId(id, "piece" + pid) - - // First try getLocalBytes because there is a chance that previous attempts to fetch the + logDebug(s"Reading piece $pieceId of $broadcastId") + // First try getLocalBytes because there is a chance that previous attempts to fetch the // broadcast blocks have already fetched some of the blocks. In that case, some blocks // would be available locally (on this executor). - var blockOpt = bm.getLocalBytes(pieceId) - if (!blockOpt.isDefined) { - blockOpt = bm.getRemoteBytes(pieceId) - blockOpt match { - case Some(block) => - // If we found the block from remote executors/driver's BlockManager, put the block - // in this executor's BlockManager. - SparkEnv.get.blockManager.putBytes( - pieceId, - block, - StorageLevel.MEMORY_AND_DISK_SER, - tellMaster = true) - - case None => - throw new SparkException("Failed to get " + pieceId + " of " + broadcastId) - } + def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId) + def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block => + // If we found the block from remote executors/driver's BlockManager, put the block + // in this executor's BlockManager. + SparkEnv.get.blockManager.putBytes( + pieceId, + block, + StorageLevel.MEMORY_AND_DISK_SER, + tellMaster = true) + block } - // If we get here, the option is defined. - blocks(pid) = blockOpt.get + val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse( + throw new SparkException(s"Failed to get $pieceId of $broadcastId")) + blocks(pid) = block } blocks } @@ -156,6 +161,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( private def readObject(in: ObjectInputStream) { in.defaultReadObject() TorrentBroadcast.synchronized { + setConf(SparkEnv.get.conf) SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match { case Some(x) => _value = x.asInstanceOf[T] @@ -167,7 +173,8 @@ private[spark] class TorrentBroadcast[T: ClassTag]( val time = (System.nanoTime() - start) / 1e9 logInfo("Reading broadcast variable " + id + " took " + time + " s") - _value = TorrentBroadcast.unBlockifyObject[T](blocks) + _value = + TorrentBroadcast.unBlockifyObject[T](blocks, SparkEnv.get.serializer, compressionCodec) // Store the merged copy in BlockManager so other tasks on this executor don't // need to re-fetch it. SparkEnv.get.blockManager.putSingle( @@ -179,43 +186,29 @@ private[spark] class TorrentBroadcast[T: ClassTag]( private object TorrentBroadcast extends Logging { - /** Size of each block. Default value is 4MB. 
*/ - private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024 - private var initialized = false - private var conf: SparkConf = null - private var compress: Boolean = false - private var compressionCodec: CompressionCodec = null - - def initialize(_isDriver: Boolean, conf: SparkConf) { - TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests - synchronized { - if (!initialized) { - compress = conf.getBoolean("spark.broadcast.compress", true) - compressionCodec = CompressionCodec.createCodec(conf) - initialized = true - } - } - } - def stop() { - initialized = false - } - - def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = { - val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE) - val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos - val ser = SparkEnv.get.serializer.newInstance() + def blockifyObject[T: ClassTag]( + obj: T, + blockSize: Int, + serializer: Serializer, + compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = { + val bos = new ByteArrayChunkOutputStream(blockSize) + val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos) + val ser = serializer.newInstance() val serOut = ser.serializeStream(out) serOut.writeObject[T](obj).close() bos.toArrays.map(ByteBuffer.wrap) } - def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer]): T = { + def unBlockifyObject[T: ClassTag]( + blocks: Array[ByteBuffer], + serializer: Serializer, + compressionCodec: Option[CompressionCodec]): T = { + require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks") val is = new SequenceInputStream( asJavaEnumeration(blocks.iterator.map(block => new ByteBufferInputStream(block)))) - val in: InputStream = if (compress) compressionCodec.compressedInputStream(is) else is - - val ser = SparkEnv.get.serializer.newInstance() + val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is) + val ser = serializer.newInstance() val serIn = ser.deserializeStream(in) val obj = serIn.readObject[T]() serIn.close() @@ -227,6 +220,7 @@ private object TorrentBroadcast extends Logging { * If removeFromDriver is true, also remove these persisted blocks on the driver. 
*/ def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = { + logDebug(s"Unpersisting TorrentBroadcast $id") SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking) } } diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index ad0f701d7a98f..fb024c12094f2 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -28,14 +28,13 @@ import org.apache.spark.{SecurityManager, SparkConf} */ class TorrentBroadcastFactory extends BroadcastFactory { - override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { - TorrentBroadcast.initialize(isDriver, conf) - } + override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { } - override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = - new TorrentBroadcast[T](value_, isLocal, id) + override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = { + new TorrentBroadcast[T](value_, id) + } - override def stop() { TorrentBroadcast.stop() } + override def stop() { } /** * Remove all persisted state associated with the torrent broadcast with the given ID. diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index acaf321de52fb..e096c8c3e9b46 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -17,12 +17,15 @@ package org.apache.spark.broadcast +import scala.util.Random + import org.scalatest.FunSuite import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException} +import org.apache.spark.io.SnappyCompressionCodec +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage._ - class BroadcastSuite extends FunSuite with LocalSparkContext { private val httpConf = broadcastConf("HttpBroadcastFactory") @@ -84,6 +87,24 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet) } + test("TorrentBroadcast's blockifyObject and unblockifyObject are inverses") { + import org.apache.spark.broadcast.TorrentBroadcast._ + val blockSize = 1024 + val conf = new SparkConf() + val compressionCodec = Some(new SnappyCompressionCodec(conf)) + val serializer = new JavaSerializer(conf) + val seed = 42 + val rand = new Random(seed) + for (trial <- 1 to 100) { + val size = 1 + rand.nextInt(1024 * 10) + val data: Array[Byte] = new Array[Byte](size) + rand.nextBytes(data) + val blocks = blockifyObject(data, blockSize, serializer, compressionCodec) + val unblockified = unBlockifyObject[Array[Byte]](blocks, serializer, compressionCodec) + assert(unblockified === data) + } + } + test("Unpersisting HttpBroadcast on executors only in local mode") { testUnpersistHttpBroadcast(distributed = false, removeFromDriver = false) } @@ -193,26 +214,17 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { blockId = BroadcastBlockId(broadcastId, "piece0") statuses = bmm.getBlockStatus(blockId, askSlaves = true) - assert(statuses.size === (if (distributed) 1 else 0)) + assert(statuses.size === 1) } // Verify that blocks are persisted in both the executors and the driver def 
afterUsingBroadcast(broadcastId: Long, bmm: BlockManagerMaster) { var blockId = BroadcastBlockId(broadcastId) - var statuses = bmm.getBlockStatus(blockId, askSlaves = true) - if (distributed) { - assert(statuses.size === numSlaves + 1) - } else { - assert(statuses.size === 1) - } + val statuses = bmm.getBlockStatus(blockId, askSlaves = true) + assert(statuses.size === numSlaves + 1) blockId = BroadcastBlockId(broadcastId, "piece0") - statuses = bmm.getBlockStatus(blockId, askSlaves = true) - if (distributed) { - assert(statuses.size === numSlaves + 1) - } else { - assert(statuses.size === 0) - } + assert(statuses.size === numSlaves + 1) } // Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver @@ -224,7 +236,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { assert(statuses.size === expectedNumBlocks) blockId = BroadcastBlockId(broadcastId, "piece0") - expectedNumBlocks = if (removeFromDriver || !distributed) 0 else 1 + expectedNumBlocks = if (removeFromDriver) 0 else 1 statuses = bmm.getBlockStatus(blockId, askSlaves = true) assert(statuses.size === expectedNumBlocks) } From 85708168341a9406c451df20af3374c0850ce166 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 21 Oct 2014 09:29:45 -0700 Subject: [PATCH 02/22] [SPARK-4023] [MLlib] [PySpark] convert rdd into RDD of Vector Convert the input rdd to RDD of Vector. cc mengxr Author: Davies Liu Closes #2870 from davies/fix4023 and squashes the following commits: 1eac767 [Davies Liu] address comments 0871576 [Davies Liu] convert rdd into RDD of Vector --- python/pyspark/mllib/stat.py | 9 +++++---- python/pyspark/mllib/tests.py | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py index a6019dadf781c..84baf12b906df 100644 --- a/python/pyspark/mllib/stat.py +++ b/python/pyspark/mllib/stat.py @@ -22,7 +22,7 @@ from functools import wraps from pyspark import PickleSerializer -from pyspark.mllib.linalg import _to_java_object_rdd +from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd __all__ = ['MultivariateStatisticalSummary', 'Statistics'] @@ -107,7 +107,7 @@ def colStats(rdd): array([ 2., 0., 0., -2.]) """ sc = rdd.ctx - jrdd = _to_java_object_rdd(rdd) + jrdd = _to_java_object_rdd(rdd.map(_convert_to_vector)) cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd) return MultivariateStatisticalSummary(sc, cStats) @@ -163,14 +163,15 @@ def corr(x, y=None, method=None): if type(y) == str: raise TypeError("Use 'method=' to specify method name.") - jx = _to_java_object_rdd(x) if not y: + jx = _to_java_object_rdd(x.map(_convert_to_vector)) resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method) bytes = sc._jvm.SerDe.dumps(resultMat) ser = PickleSerializer() return ser.loads(str(bytes)).toArray() else: - jy = _to_java_object_rdd(y) + jx = _to_java_object_rdd(x.map(float)) + jy = _to_java_object_rdd(y.map(float)) return sc._jvm.PythonMLLibAPI().corr(jx, jy, method) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 463faf7b6f520..d6fb87b378b4a 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -36,6 +36,8 @@ from pyspark.serializers import PickleSerializer from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, _convert_to_vector from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.random import RandomRDDs +from pyspark.mllib.stat import Statistics from pyspark.tests import ReusedPySparkTestCase as 
PySparkTestCase @@ -202,6 +204,23 @@ def test_regression(self): self.assertTrue(dt_model.predict(features[3]) > 0) +class StatTests(PySparkTestCase): + # SPARK-4023 + def test_col_with_different_rdds(self): + # numpy + data = RandomRDDs.normalVectorRDD(self.sc, 1000, 10, 10) + summary = Statistics.colStats(data) + self.assertEqual(1000, summary.count()) + # array + data = self.sc.parallelize([range(10)] * 10) + summary = Statistics.colStats(data) + self.assertEqual(10, summary.count()) + # array + data = self.sc.parallelize([pyarray.array("d", range(10))] * 10) + summary = Statistics.colStats(data) + self.assertEqual(10, summary.count()) + + @unittest.skipIf(not _have_scipy, "SciPy not installed") class SciPyTests(PySparkTestCase): From 2aeb84bc79b643912d26e08ec2d87e444027fefc Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Oct 2014 09:37:17 -0700 Subject: [PATCH 03/22] replace awaitTransformation with awaitTermination in scaladoc/javadoc Author: Holden Karau Closes #2861 from holdenk/SPARK-4015-Documentation-in-the-streaming-context-references-non-existent-function and squashes the following commits: 081db8a [Holden Karau] fix pyspark streaming doc too 0e03863 [Holden Karau] replace awaitTransformation with awaitTermination --- python/pyspark/streaming/context.py | 2 +- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 +- .../apache/spark/streaming/api/java/JavaStreamingContext.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index dc9dc41121935..2f53fbd27b17a 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -79,7 +79,7 @@ class StreamingContext(object): L{DStream} various input sources. It can be from an existing L{SparkContext}. After creating and transforming DStreams, the streaming computation can be started and stopped using `context.start()` and `context.stop()`, - respectively. `context.awaitTransformation()` allows the current thread + respectively. `context.awaitTermination()` allows the current thread to wait for the termination of the context by `stop()` or by an exception. """ _transformerSerializer = None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 5a8eef1372e23..23d6d1c5e50fa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -47,7 +47,7 @@ import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab * The associated SparkContext can be accessed using `context.sparkContext`. After * creating and transforming DStreams, the streaming computation can be started and stopped * using `context.start()` and `context.stop()`, respectively. - * `context.awaitTransformation()` allows the current thread to wait for the termination + * `context.awaitTermination()` allows the current thread to wait for the termination * of the context by `stop()` or by an exception. 
*/ class StreamingContext private[streaming] ( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 9dc26dc6b32a1..7db66c69a6d73 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -46,7 +46,7 @@ import org.apache.spark.streaming.receiver.Receiver * org.apache.spark.api.java.JavaSparkContext (see core Spark documentation) can be accessed * using `context.sparkContext`. After creating and transforming DStreams, the streaming * computation can be started and stopped using `context.start()` and `context.stop()`, - * respectively. `context.awaitTransformation()` allows the current thread to wait for the + * respectively. `context.awaitTermination()` allows the current thread to wait for the * termination of a context by `stop()` or by an exception. */ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { From c262cd5ddce75333ec936e90b81278c3992841ec Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 21 Oct 2014 10:37:13 -0700 Subject: [PATCH 04/22] [SPARK-4035] Fix a wrong format specifier Just found a typo. Should not use "%f" for Long. Author: zsxwing Closes #2875 from zsxwing/SPARK-4035 and squashes the following commits: ce347e2 [zsxwing] Fix a wrong format specifier --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 3f5d06e1aeee7..0ce2a3f631b15 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -870,7 +870,7 @@ private[spark] class BlockManager( logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") blockTransferService.uploadBlockSync( peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %d ms" .format((System.currentTimeMillis - onePeerStartTime))) peersReplicatedTo += peer peersForReplication -= peer From 61ca7742d21dd66f5a7b3bb826e3aaca6f049b68 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 21 Oct 2014 11:22:25 -0700 Subject: [PATCH 05/22] [SPARK-4020] Do not rely on timeouts to remove failed block managers If an executor fails without being scheduled to run any tasks, then `DAGScheduler` won't notify `BlockManagerMasterActor` that the associated block manager should be removed. Instead, the associated block manager will be expired only after a few rounds of heartbeat timeouts. In terms of removal treatment, there should really be no distinction between executors that have been scheduled tasks and those that have not. The fix, then, is to add all known executors to `TaskSchedulerImpl`'s `activeExecutorIds` whether or not they have been scheduled tasks. In fact, the existing comment above `activeExecutorIds` is ``` // Which executor IDs we have executors on val activeExecutorIds = new HashSet[String] ``` not "Which executors have been scheduled tasks thus far."
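As an illustration of the bookkeeping change, here is a simplified standalone sketch — not the actual `TaskSchedulerImpl` code; the `Offer` type and method bodies are hypothetical stand-ins for `WorkerOffer` and the real scheduler methods:

```
import scala.collection.mutable

// Hypothetical, simplified stand-in for WorkerOffer.
case class Offer(executorId: String, host: String, cores: Int)

class SchedulerBookkeepingSketch {
  val activeExecutorIds = mutable.HashSet[String]()
  val executorsByHost = mutable.HashMap[String, mutable.HashSet[String]]()

  // After the fix, an executor is recorded as soon as it makes a resource
  // offer, not only once a task is actually launched on it.
  def resourceOffers(offers: Seq[Offer]): Unit = {
    for (o <- offers) {
      activeExecutorIds += o.executorId
      executorsByHost.getOrElseUpdate(o.host, mutable.HashSet[String]()) += o.executorId
    }
  }

  // Removal therefore works even for an executor that failed before running
  // any task; returns true if the executor was known.
  def executorLost(executorId: String): Boolean =
    activeExecutorIds.remove(executorId)
}
```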
Author: Andrew Or Closes #2865 from andrewor14/active-executors and squashes the following commits: ff3172b [Andrew Or] Add all known executors to `activeExecutorIds` --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6d697e3d003f6..2b39c7fc872da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -221,6 +221,7 @@ private[spark] class TaskSchedulerImpl( var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host + activeExecutorIds += o.executorId if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) @@ -261,7 +262,6 @@ private[spark] class TaskSchedulerImpl( val tid = task.taskId taskIdToTaskSetId(tid) = taskSet.taskSet.id taskIdToExecutorId(tid) = execId - activeExecutorIds += execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) From 1a623b2e163da3a9112cb9b68bda22b9e398ed5c Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Tue, 21 Oct 2014 11:49:39 -0700 Subject: [PATCH 06/22] SPARK-3770: Make userFeatures accessible from python https://issues.apache.org/jira/browse/SPARK-3770 We need access to the underlying latent user features from python. However, the userFeatures RDD from the MatrixFactorizationModel isn't accessible from the python bindings. I've added a method to the underlying scala class to turn the RDD[(Int, Array[Double])] to an RDD[String]. This is then accessed from the python recommendation.py Author: Michelangelo D'Agostino Closes #2636 from mdagost/mf_user_features and squashes the following commits: c98f9e2 [Michelangelo D'Agostino] Added unit tests for userFeatures and productFeatures and merged master. d5eadf8 [Michelangelo D'Agostino] Merge branch 'master' into mf_user_features 2481a2a [Michelangelo D'Agostino] Merged master and resolved conflict. a6ffb96 [Michelangelo D'Agostino] Eliminated a function from our first approach to this problem that is no longer needed now that we added the fromTuple2RDD function. 2aa1bf8 [Michelangelo D'Agostino] Implemented a function called fromTuple2RDD in PythonMLLibAPI and used it to expose the MF userFeatures and productFeatures in python. 34cb2a2 [Michelangelo D'Agostino] A couple of lint cleanups and a comment. cdd98e3 [Michelangelo D'Agostino] It's working now. e1fbe5e [Michelangelo D'Agostino] Added scala function to stringify userFeatures for access in python. 
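For reference, these latent factors have always been reachable from Scala; this patch only adds the Python plumbing. A rough Scala sketch of reading them follows (a local toy example with made-up ratings, not part of the patch):

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object UserFeaturesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("als-sketch"))
    // Toy ratings: (user, product, rating).
    val ratings = sc.parallelize(Seq(
      Rating(1, 1, 5.0), Rating(1, 2, 1.0),
      Rating(2, 1, 4.0), Rating(2, 2, 2.0)))
    val model = ALS.train(ratings, 4, 10)

    // userFeatures is an RDD[(Int, Array[Double])]; the new fromTuple2RDD
    // helper is what makes this pair RDD consumable from the Python side.
    model.userFeatures.take(1).foreach { case (user, latents) =>
      println(s"user $user has ${latents.length} latent factors")
    }
    sc.stop()
  }
}
```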
--- .../mllib/api/python/PythonMLLibAPI.scala | 5 +++ python/pyspark/mllib/recommendation.py | 31 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 9a100170b75c6..b478c21537c2a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -673,6 +673,11 @@ private[spark] object SerDe extends Serializable { rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int])) } + /* convert RDD[Tuple2[,]] to RDD[Array[Any]] */ + def fromTuple2RDD(rdd: RDD[Tuple2[Any, Any]]): RDD[Array[Any]] = { + rdd.map(x => Array(x._1, x._2)) + } + /** * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by * PySpark. diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 17f96b8700bd7..22872dbbe3b55 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -53,6 +53,23 @@ class MatrixFactorizationModel(object): >>> model = ALS.train(ratings, 1) >>> model.predictAll(testset).count() == 2 True + + >>> model = ALS.train(ratings, 4) + >>> model.userFeatures().count() == 2 + True + + >>> first_user = model.userFeatures().take(1)[0] + >>> latents = first_user[1] + >>> len(latents) == 4 + True + + >>> model.productFeatures().count() == 2 + True + + >>> first_product = model.productFeatures().take(1)[0] + >>> latents = first_product[1] + >>> len(latents) == 4 + True """ def __init__(self, sc, java_model): @@ -83,6 +100,20 @@ def predictAll(self, user_product): return RDD(sc._jvm.SerDe.javaToPython(jresult), sc, AutoBatchedSerializer(PickleSerializer())) + def userFeatures(self): + sc = self._context + juf = self._java_model.userFeatures() + juf = sc._jvm.SerDe.fromTuple2RDD(juf).toJavaRDD() + return RDD(sc._jvm.PythonRDD.javaToPython(juf), sc, + AutoBatchedSerializer(PickleSerializer())) + + def productFeatures(self): + sc = self._context + jpf = self._java_model.productFeatures() + jpf = sc._jvm.SerDe.fromTuple2RDD(jpf).toJavaRDD() + return RDD(sc._jvm.PythonRDD.javaToPython(jpf), sc, + AutoBatchedSerializer(PickleSerializer())) + class ALS(object): From 5fdaf52a9df21cac69e2a4612aeb4e760e4424e7 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Tue, 21 Oct 2014 13:15:29 -0700 Subject: [PATCH 07/22] [SPARK-3994] Use standard Aggregator code path for countByKey and countByValue See [JIRA](https://issues.apache.org/jira/browse/SPARK-3994) for more information. Also adds a note which warns against using these methods. 
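To make the new warning concrete, here is a small sketch (assuming a Spark 1.x local context; the data is a toy example) contrasting the driver-side result with the distributed alternative the updated docs recommend:

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits on Spark 1.x

object CountByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("count-sketch"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Convenient, but the whole key -> count map lands in driver memory.
    val counts = pairs.countByKey()  // Map(a -> 2, b -> 1)
    println(counts)

    // For very large key sets, keep the result distributed instead,
    // as the updated scaladoc recommends.
    val distributed = pairs.mapValues(_ => 1L).reduceByKey(_ + _)
    distributed.collect().foreach(println)
    sc.stop()
  }
}
```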
Author: Aaron Davidson Closes #2839 from aarondav/countByKey and squashes the following commits: d6fdb2a [Aaron Davidson] Respond to comments e1f06d3 [Aaron Davidson] [SPARK-3994] Use standard Aggregator code path for countByKey and countByValue --- .../apache/spark/rdd/PairRDDFunctions.scala | 11 +++++-- .../main/scala/org/apache/spark/rdd/RDD.scala | 31 +++++-------------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index ac96de86dd6d4..da89f634abaea 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -315,8 +315,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) @deprecated("Use reduceByKeyLocally", "1.0.0") def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func) - /** Count the number of elements for each key, and return the result to the master as a Map. */ - def countByKey(): Map[K, Long] = self.map(_._1).countByValue() + /** + * Count the number of elements for each key, collecting the results to a local Map. + * + * Note that this method should only be used if the resulting map is expected to be small, as + * the whole thing is loaded into the driver's memory. + * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which + * returns an RDD[(K, Long)] instead of a map. + */ + def countByKey(): Map[K, Long] = self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap /** * :: Experimental :: diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 71cabf61d4ee0..b7f125d01dfaf 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -927,32 +927,15 @@ abstract class RDD[T: ClassTag]( } /** - * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final - * combine step happens locally on the master, equivalent to running a single reduce task. + * Return the count of each unique value in this RDD as a local map of (value, count) pairs. + * + * Note that this method should only be used if the resulting map is expected to be small, as + * the whole thing is loaded into the driver's memory. + * To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which + * returns an RDD[(T, Long)] instead of a map. */ def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = { - if (elementClassTag.runtimeClass.isArray) { - throw new SparkException("countByValue() does not support arrays") - } - // TODO: This should perhaps be distributed by default.
- val countPartition = (iter: Iterator[T]) => { - val map = new OpenHashMap[T,Long] - iter.foreach { - t => map.changeValue(t, 1L, _ + 1L) - } - Iterator(map) - }: Iterator[OpenHashMap[T,Long]] - val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => { - m2.foreach { case (key, value) => - m1.changeValue(key, value, _ + value) - } - m1 - }: OpenHashMap[T,Long] - val myResult = mapPartitions(countPartition).reduce(mergeMaps) - // Convert to a Scala mutable map - val mutableResult = scala.collection.mutable.Map[T,Long]() - myResult.foreach { case (k, v) => mutableResult.put(k, v) } - mutableResult + map(value => (value, null)).countByKey() } /** From 814a9cd7fabebf2a06f7e2e5d46b6a2b28b917c2 Mon Sep 17 00:00:00 2001 From: coderxiang Date: Tue, 21 Oct 2014 15:45:47 -0700 Subject: [PATCH 08/22] SPARK-3568 [mllib] add ranking metrics Add common metrics for ranking algorithms (http://www-nlp.stanford.edu/IR-book/), including: - Mean Average Precision - Precision@n: top-n precision - Discounted cumulative gain (DCG) and NDCG The following methods and the corresponding tests are implemented: ``` class RankingMetrics[T](predictionAndLabels: RDD[(Array[T], Array[T])]) { /* Returns the precision@k for each query */ lazy val precAtK: RDD[Array[Double]] /** * @param k the position to compute the truncated precision * @return the average precision at the first k ranking positions */ def precision(k: Int): Double /* Returns the average precision for each query */ lazy val avePrec: RDD[Double] /* Returns the mean average precision (MAP) of all the queries */ lazy val meanAvePrec: Double /* Returns the normalized discounted cumulative gain for each query */ lazy val ndcgAtK: RDD[Array[Double]] /** * @param k the position to compute the truncated ndcg * @return the average ndcg at the first k ranking positions */ def ndcg(k: Int): Double } ``` Author: coderxiang Closes #2667 from coderxiang/rankingmetrics and squashes the following commits: d881097 [coderxiang] update doc 14d9cd9 [coderxiang] remove unexpected files d7fb93f [coderxiang] style change and remove ignored files f113ee1 [coderxiang] modify doc for displaying superscript and subscript f626896 [coderxiang] improve doc and remove unnecessary computation while labSet is empty be6645e [coderxiang] set the precision of empty labset to 0.0 d64c120 [coderxiang] add logWarning for empty ground truth set dfae292 [coderxiang] handle empty labSet for map. add test 62047c4 [coderxiang] style change and add documentation f66612d [coderxiang] add additional test of precisionAt b794cb2 [coderxiang] move private members precAtK, ndcgAtK into public methods. style change 77c9e5d [coderxiang] set precAtK and ndcgAtK as private member.
Improve documentation 5f87bce [coderxiang] add API to calculate precision and ndcg at each ranking position b7851cc [coderxiang] Use generic type to represent IDs e443fee [coderxiang] change style and use alternative builtin methods 3a5a6ff [coderxiang] add ranking metrics --- .../mllib/evaluation/RankingMetrics.scala | 152 ++++++++++++++++++ .../evaluation/RankingMetricsSuite.scala | 54 +++++++ 2 files changed, 206 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala new file mode 100644 index 0000000000000..93a7353e2c070 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation + +import scala.reflect.ClassTag + +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD + +/** + * ::Experimental:: + * Evaluator for ranking algorithms. + * + * @param predictionAndLabels an RDD of (predicted ranking, ground truth set) pairs. + */ +@Experimental +class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]) + extends Logging with Serializable { + + /** + * Compute the average precision of all the queries, truncated at ranking position k. + * + * If for a query, the ranking algorithm returns n (n < k) results, the precision value will be + * computed as #(relevant items retrieved) / k. This formula also applies when the size of the + * ground truth set is less than k. + * + * If a query has an empty ground truth set, zero will be used as precision together with + * a log warning. + * + * See the following paper for detail: + * + * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. 
Kekalainen + * + * @param k the position to compute the truncated precision, must be positive + * @return the average precision at the first k ranking positions + */ + def precisionAt(k: Int): Double = { + require(k > 0, "ranking position k should be positive") + predictionAndLabels.map { case (pred, lab) => + val labSet = lab.toSet + + if (labSet.nonEmpty) { + val n = math.min(pred.length, k) + var i = 0 + var cnt = 0 + while (i < n) { + if (labSet.contains(pred(i))) { + cnt += 1 + } + i += 1 + } + cnt.toDouble / k + } else { + logWarning("Empty ground truth set, check input data") + 0.0 + } + }.mean + } + + /** + * Returns the mean average precision (MAP) of all the queries. + * If a query has an empty ground truth set, the average precision will be zero and a log + * warning is generated. + */ + lazy val meanAveragePrecision: Double = { + predictionAndLabels.map { case (pred, lab) => + val labSet = lab.toSet + + if (labSet.nonEmpty) { + var i = 0 + var cnt = 0 + var precSum = 0.0 + val n = pred.length + while (i < n) { + if (labSet.contains(pred(i))) { + cnt += 1 + precSum += cnt.toDouble / (i + 1) + } + i += 1 + } + precSum / labSet.size + } else { + logWarning("Empty ground truth set, check input data") + 0.0 + } + }.mean + } + + /** + * Compute the average NDCG value of all the queries, truncated at ranking position k. + * The discounted cumulative gain at position k is computed as: + * sum,,i=1,,^k^ (2^{relevance of ''i''th item}^ - 1) / log(i + 1), + * and the NDCG is obtained by dividing the DCG value on the ground truth set. In the current + * implementation, the relevance value is binary. + * + * If a query has an empty ground truth set, zero will be used as ndcg together with + * a log warning. + * + * See the following paper for detail: + * + * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen + * + * @param k the position to compute the truncated ndcg, must be positive + * @return the average ndcg at the first k ranking positions + */ + def ndcgAt(k: Int): Double = { + require(k > 0, "ranking position k should be positive") + predictionAndLabels.map { case (pred, lab) => + val labSet = lab.toSet + + if (labSet.nonEmpty) { + val labSetSize = labSet.size + val n = math.min(math.max(pred.length, labSetSize), k) + var maxDcg = 0.0 + var dcg = 0.0 + var i = 0 + while (i < n) { + val gain = 1.0 / math.log(i + 2) + if (labSet.contains(pred(i))) { + dcg += gain + } + if (i < labSetSize) { + maxDcg += gain + } + i += 1 + } + dcg / maxDcg + } else { + logWarning("Empty ground truth set, check input data") + 0.0 + } + }.mean + } + +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala new file mode 100644 index 0000000000000..a2d4bb41484b8 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.mllib.util.LocalSparkContext + +class RankingMetricsSuite extends FunSuite with LocalSparkContext { + test("Ranking metrics: map, ndcg") { + val predictionAndLabels = sc.parallelize( + Seq( + (Array[Int](1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array[Int](1, 2, 3, 4, 5)), + (Array[Int](4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array[Int](1, 2, 3)), + (Array[Int](1, 2, 3, 4, 5), Array[Int]()) + ), 2) + val eps: Double = 1E-5 + + val metrics = new RankingMetrics(predictionAndLabels) + val map = metrics.meanAveragePrecision + + assert(metrics.precisionAt(1) ~== 1.0/3 absTol eps) + assert(metrics.precisionAt(2) ~== 1.0/3 absTol eps) + assert(metrics.precisionAt(3) ~== 1.0/3 absTol eps) + assert(metrics.precisionAt(4) ~== 0.75/3 absTol eps) + assert(metrics.precisionAt(5) ~== 0.8/3 absTol eps) + assert(metrics.precisionAt(10) ~== 0.8/3 absTol eps) + assert(metrics.precisionAt(15) ~== 8.0/45 absTol eps) + + assert(map ~== 0.355026 absTol eps) + + assert(metrics.ndcgAt(3) ~== 1.0/3 absTol eps) + assert(metrics.ndcgAt(5) ~== 0.328788 absTol eps) + assert(metrics.ndcgAt(10) ~== 0.487913 absTol eps) + assert(metrics.ndcgAt(15) ~== metrics.ndcgAt(10) absTol eps) + + } +} From 856b081729057f9da31a86e4bfa0dc0013492042 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 21 Oct 2014 16:20:58 -0700 Subject: [PATCH 09/22] [SQL]redundant methods for broadcast redundant methods for broadcast in ```TableReader``` Author: wangfei Closes #2862 from scwf/TableReader and squashes the following commits: 414cc24 [wangfei] unnecessary methods for broadcast --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 0de29d5cffd0e..fd4f65e488259 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -67,10 +67,6 @@ class HadoopTableReader( private val _broadcastedHiveConf = sc.sparkContext.broadcast(new SerializableWritable(hiveExtraConf)) - def broadcastedHiveConf = _broadcastedHiveConf - - def hiveConf = _broadcastedHiveConf.value.value - override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] = makeRDDForTable( hiveTable, From 6bb56faea8d238ea22c2de33db93b1b39f492b3a Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 21 Oct 2014 21:53:09 -0700 Subject: [PATCH 10/22] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy Author: Sandy Ryza Closes #789 from sryza/sandy-spark-1813 and squashes the following commits: 48b05e9 [Sandy Ryza] Simplify b824932 [Sandy Ryza] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time 6a15bb7 [Sandy Ryza] Small fix a2278c0 [Sandy Ryza] Respond to review comments 6ef592e [Sandy Ryza] SPARK-1813. 
Add a utility to SparkConf that makes using Kryo really easy --- .../scala/org/apache/spark/SparkConf.scala | 17 ++++- .../spark/serializer/KryoSerializer.scala | 43 ++++++++----- .../java/org/apache/spark/JavaAPISuite.java | 12 ++++ .../org/apache/spark/SparkConfSuite.scala | 62 +++++++++++++++++++ .../serializer/KryoSerializerSuite.scala | 6 +- docs/configuration.md | 15 ++++- docs/tuning.md | 17 +---- .../spark/examples/bagel/PageRankUtils.scala | 17 ----- .../examples/bagel/WikipediaPageRank.scala | 3 +- .../spark/examples/graphx/Analytics.scala | 6 +- .../examples/graphx/SynthBenchmark.scala | 5 +- .../spark/examples/mllib/MovieLensALS.scala | 12 +--- .../spark/graphx/GraphKryoRegistrator.scala | 2 +- .../org/apache/spark/graphx/GraphXUtils.scala | 47 ++++++++++++++ .../spark/graphx/LocalSparkContext.scala | 6 +- .../graphx/impl/EdgePartitionSuite.scala | 6 +- .../graphx/impl/VertexPartitionSuite.scala | 6 +- 17 files changed, 195 insertions(+), 87 deletions(-) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 605df0e929faa..dbbcc23305c50 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -18,7 +18,8 @@ package org.apache.spark import scala.collection.JavaConverters._ -import scala.collection.mutable.HashMap +import scala.collection.mutable.{HashMap, LinkedHashSet} +import org.apache.spark.serializer.KryoSerializer /** * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. @@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { this } + /** + * Use Kryo serialization and register the given set of classes with Kryo. + * If called multiple times, this will append the classes from all calls together. 
+ */ + def registerKryoClasses(classes: Array[Class[_]]): SparkConf = { + val allClassNames = new LinkedHashSet[String]() + allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty) + allClassNames ++= classes.map(_.getName) + + set("spark.kryo.classesToRegister", allClassNames.mkString(",")) + set("spark.serializer", classOf[KryoSerializer].getName) + this + } + /** Remove a parameter from the configuration */ def remove(key: String): SparkConf = { settings.remove(key) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index d6386f8c06fff..621a951c27d07 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -53,7 +53,18 @@ class KryoSerializer(conf: SparkConf) private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val registrator = conf.getOption("spark.kryo.registrator") + private val userRegistrator = conf.getOption("spark.kryo.registrator") + private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") + .split(',') + .filter(!_.isEmpty) + .map { className => + try { + Class.forName(className) + } catch { + case e: Exception => + throw new SparkException("Failed to load class to register with Kryo", e) + } + } def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) @@ -80,22 +91,20 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) - // Allow the user to register their own classes by setting spark.kryo.registrator - for (regCls <- registrator) { - logDebug("Running user registrator: " + regCls) - try { - val reg = Class.forName(regCls, true, classLoader).newInstance() - .asInstanceOf[KryoRegistrator] - - // Use the default classloader when calling the user registrator. - Thread.currentThread.setContextClassLoader(classLoader) - reg.registerClasses(kryo) - } catch { - case e: Exception => - throw new SparkException(s"Failed to invoke $regCls", e) - } finally { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } + try { + // Use the default classloader when calling the user registrator. + Thread.currentThread.setContextClassLoader(classLoader) + // Register classes given through spark.kryo.classesToRegister. + classesToRegister.foreach { clazz => kryo.register(clazz) } + // Allow the user to register their own classes by setting spark.kryo.registrator. 
+ userRegistrator + .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]) + .foreach { reg => reg.registerClasses(kryo) } + } catch { + case e: Exception => + throw new SparkException(s"Failed to register classes with Kryo", e) + } finally { + Thread.currentThread.setContextClassLoader(oldClassLoader) } // Register Chill's classes; we do this after our ranges and the user's own classes to let diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 3190148fb5f43..814e40c4f77cc 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1418,4 +1418,16 @@ public Optional call(Integer i) { } } + static class Class1 {} + static class Class2 {} + + @Test + public void testRegisterKryoClasses() { + SparkConf conf = new SparkConf(); + conf.registerKryoClasses(new Class[]{ Class1.class, Class2.class }); + Assert.assertEquals( + Class1.class.getName() + "," + Class2.class.getName(), + conf.get("spark.kryo.classesToRegister")); + } + } diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 87e9012622456..5d018ea9868a7 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark import org.scalatest.FunSuite +import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer} +import com.esotericsoftware.kryo.Kryo class SparkConfSuite extends FunSuite with LocalSparkContext { test("loading from system properties") { @@ -133,4 +135,64 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { System.clearProperty("spark.test.a.b.c") } } + + test("register kryo classes through registerKryoClasses") { + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + + conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2])) + assert(conf.get("spark.kryo.classesToRegister") === + classOf[Class1].getName + "," + classOf[Class2].getName) + + conf.registerKryoClasses(Array(classOf[Class3])) + assert(conf.get("spark.kryo.classesToRegister") === + classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) + + conf.registerKryoClasses(Array(classOf[Class2])) + assert(conf.get("spark.kryo.classesToRegister") === + classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. + val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new Class1()) + serializer.newInstance().serialize(new Class2()) + serializer.newInstance().serialize(new Class3()) + } + + test("register kryo classes through registerKryoClasses and custom registrator") { + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + + conf.registerKryoClasses(Array(classOf[Class1])) + assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName) + + conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. 
+ val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new Class1()) + serializer.newInstance().serialize(new Class2()) + } + + test("register kryo classes through conf") { + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer") + conf.set("spark.serializer", classOf[KryoSerializer].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. + val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new StringBuffer()) + } + +} + +class Class1 {} +class Class2 {} +class Class3 {} + +class CustomRegistrator extends KryoRegistrator { + def registerClasses(kryo: Kryo) { + kryo.register(classOf[Class2]) + } } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index e1e35b688d581..64ac6d2d920d2 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -210,13 +210,13 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("kryo with nonexistent custom registrator should fail") { - import org.apache.spark.{SparkConf, SparkException} + import org.apache.spark.SparkException val conf = new SparkConf(false) conf.set("spark.kryo.registrator", "this.class.does.not.exist") - + val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance()) - assert(thrown.getMessage.contains("Failed to invoke this.class.does.not.exist")) + assert(thrown.getMessage.contains("Failed to register classes with Kryo")) } test("default class loader can be set by a different thread") { diff --git a/docs/configuration.md b/docs/configuration.md index 96fa1377ec399..66738d3ca754e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -124,12 +124,23 @@ of the most common options to set are: org.apache.spark.Serializer. + + spark.kryo.classesToRegister + (none) + + If you use Kryo serialization, give a comma-separated list of custom class names to register + with Kryo. + See the tuning guide for more details. + + spark.kryo.registrator (none) - If you use Kryo serialization, set this class to register your custom classes with Kryo. - It should be set to a class that extends + If you use Kryo serialization, set this class to register your custom classes with Kryo. This + property is useful if you need to register your classes in a custom way, e.g. to specify a custom + field serializer. Otherwise spark.kryo.classesToRegister is simpler. It should be + set to a class that extends KryoRegistrator. See the tuning guide for more details. diff --git a/docs/tuning.md b/docs/tuning.md index 8fb2a0433b1a8..9b5c9adac6a4f 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -47,24 +47,11 @@ registration requirement, but we recommend trying it in any network-intensive ap Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the [Twitter chill](https://github.com/twitter/chill) library. 
-To register your own custom classes with Kryo, create a public class that extends -[`org.apache.spark.serializer.KryoRegistrator`](api/scala/index.html#org.apache.spark.serializer.KryoRegistrator) and set the -`spark.kryo.registrator` config property to point to it, as follows: +To register your own custom classes with Kryo, use the `registerKryoClasses` method. {% highlight scala %} -import com.esotericsoftware.kryo.Kryo -import org.apache.spark.serializer.KryoRegistrator - -class MyRegistrator extends KryoRegistrator { - override def registerClasses(kryo: Kryo) { - kryo.register(classOf[MyClass1]) - kryo.register(classOf[MyClass2]) - } -} - val conf = new SparkConf().setMaster(...).setAppName(...) -conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") -conf.set("spark.kryo.registrator", "mypackage.MyRegistrator") +conf.registerKryoClasses(Seq(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf) {% endhighlight %} diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala index e06f4dcd54442..e322d4ce5a745 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala @@ -18,17 +18,7 @@ package org.apache.spark.examples.bagel import org.apache.spark._ -import org.apache.spark.SparkContext._ -import org.apache.spark.serializer.KryoRegistrator - import org.apache.spark.bagel._ -import org.apache.spark.bagel.Bagel._ - -import scala.collection.mutable.ArrayBuffer - -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} - -import com.esotericsoftware.kryo._ class PageRankUtils extends Serializable { def computeWithCombiner(numVertices: Long, epsilon: Double)( @@ -99,13 +89,6 @@ class PRMessage() extends Message[String] with Serializable { } } -class PRKryoRegistrator extends KryoRegistrator { - def registerClasses(kryo: Kryo) { - kryo.register(classOf[PRVertex]) - kryo.register(classOf[PRMessage]) - } -} - class CustomPartitioner(partitions: Int) extends Partitioner { def numPartitions = partitions diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index e4db3ec51313d..859abedf2a55e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -38,8 +38,7 @@ object WikipediaPageRank { } val sparkConf = new SparkConf() sparkConf.setAppName("WikipediaPageRank") - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) + sparkConf.registerKryoClasses(Array(classOf[PRVertex], classOf[PRMessage])) val inputFile = args(0) val threshold = args(1).toDouble diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala index 45527d9382fd0..d70d93608a57c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala @@ -46,10 +46,8 @@ object Analytics extends Logging { } val options = mutable.Map(optionsList: _*) - val conf = new SparkConf() - .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") - .set("spark.locality.wait", "100000") + val conf = new SparkConf().set("spark.locality.wait", "100000") + GraphXUtils.registerKryoClasses(conf) val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { println("Set the number of edge partitions using --numEPart.") diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala index 5f35a5836462e..05676021718d9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -18,7 +18,7 @@ package org.apache.spark.examples.graphx import org.apache.spark.SparkContext._ -import org.apache.spark.graphx.PartitionStrategy +import org.apache.spark.graphx.{GraphXUtils, PartitionStrategy} import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.graphx.util.GraphGenerators import java.io.{PrintWriter, FileOutputStream} @@ -80,8 +80,7 @@ object SynthBenchmark { val conf = new SparkConf() .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)") - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + GraphXUtils.registerKryoClasses(conf) val sc = new SparkContext(conf) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index fc6678013b932..8796c28db8a66 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -19,7 +19,6 @@ package org.apache.spark.examples.mllib import scala.collection.mutable -import com.esotericsoftware.kryo.Kryo import org.apache.log4j.{Level, Logger} import scopt.OptionParser @@ -27,7 +26,6 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} import org.apache.spark.rdd.RDD -import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} /** * An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/). 
@@ -40,13 +38,6 @@ import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} */ object MovieLensALS { - class ALSRegistrator extends KryoRegistrator { - override def registerClasses(kryo: Kryo) { - kryo.register(classOf[Rating]) - kryo.register(classOf[mutable.BitSet]) - } - } - case class Params( input: String = null, kryo: Boolean = false, @@ -108,8 +99,7 @@ object MovieLensALS { def run(params: Params) { val conf = new SparkConf().setAppName(s"MovieLensALS with $params") if (params.kryo) { - conf.set("spark.serializer", classOf[KryoSerializer].getName) - .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) + conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating])) .set("spark.kryoserializer.buffer.mb", "8") } val sc = new SparkContext(conf) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala index 1948c978c30bf..563c948957ecf 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -27,10 +27,10 @@ import org.apache.spark.graphx.impl._ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap import org.apache.spark.util.collection.OpenHashSet - /** * Registers GraphX classes with Kryo for improved performance. */ +@deprecated("Register GraphX classes with Kryo using GraphXUtils.registerKryoClasses", "1.2.0") class GraphKryoRegistrator extends KryoRegistrator { def registerClasses(kryo: Kryo) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala new file mode 100644 index 0000000000000..2cb07937eaa2a --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx + +import org.apache.spark.SparkConf + +import org.apache.spark.graphx.impl._ +import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap + +import org.apache.spark.util.collection.{OpenHashSet, BitSet} +import org.apache.spark.util.BoundedPriorityQueue + +object GraphXUtils { + /** + * Registers classes that GraphX uses with Kryo. 
+ */ + def registerKryoClasses(conf: SparkConf) { + conf.registerKryoClasses(Array( + classOf[Edge[Object]], + classOf[(VertexId, Object)], + classOf[EdgePartition[Object, Object]], + classOf[BitSet], + classOf[VertexIdToIndexMap], + classOf[VertexAttributeBlock[Object]], + classOf[PartitionStrategy], + classOf[BoundedPriorityQueue[Object]], + classOf[EdgeDirection], + classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]], + classOf[OpenHashSet[Int]], + classOf[OpenHashSet[Long]])) + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala index 47594a800a3b1..a3e28efc75a98 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala @@ -17,9 +17,6 @@ package org.apache.spark.graphx -import org.scalatest.Suite -import org.scalatest.BeforeAndAfterEach - import org.apache.spark.SparkConf import org.apache.spark.SparkContext @@ -31,8 +28,7 @@ trait LocalSparkContext { /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */ def withSpark[T](f: SparkContext => T) = { val conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + GraphXUtils.registerKryoClasses(conf) val sc = new SparkContext("local", "test", conf) try { f(sc) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala index 9d00f76327e4c..db1dac6160080 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -129,9 +129,9 @@ class EdgePartitionSuite extends FunSuite { val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) val a: EdgePartition[Int, Int] = makeEdgePartition(aList) val javaSer = new JavaSerializer(new SparkConf()) - val kryoSer = new KryoSerializer(new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")) + val conf = new SparkConf() + GraphXUtils.registerKryoClasses(conf) + val kryoSer = new KryoSerializer(conf) for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) { val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a)) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala index f9e771a900013..fe8304c1cdc32 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala @@ -125,9 +125,9 @@ class VertexPartitionSuite extends FunSuite { val verts = Set((0L, 1), (1L, 1), (2L, 1)) val vp = VertexPartition(verts.iterator) val javaSer = new JavaSerializer(new SparkConf()) - val kryoSer = new KryoSerializer(new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")) + val conf = new SparkConf() + GraphXUtils.registerKryoClasses(conf) + val kryoSer = new KryoSerializer(conf) for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) { val vpSer: VertexPartition[Int] = 
s.deserialize(s.serialize(vp)) From bae4ca3bbf75cf5b57a098a95848b0fd65bc241e Mon Sep 17 00:00:00 2001 From: Karthik Date: Wed, 22 Oct 2014 00:08:53 -0700 Subject: [PATCH 11/22] Update JavaCustomReceiver.java Changed the usage string to correctly reflect the file name. Author: Karthik Closes #2699 from namelessnerd/patch-1 and squashes the following commits: 8570e33 [Karthik] Update JavaCustomReceiver.java --- .../org/apache/spark/examples/streaming/JavaCustomReceiver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index 5622df5ce03ff..981bc4f0613a9 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -57,7 +57,7 @@ public class JavaCustomReceiver extends Receiver { public static void main(String[] args) { if (args.length < 2) { - System.err.println("Usage: JavaNetworkWordCount "); + System.err.println("Usage: JavaCustomReceiver "); System.exit(1); } From f05e09b4c95d799bdda3c3ff7fb76a4cd656415d Mon Sep 17 00:00:00 2001 From: CrazyJvm Date: Wed, 22 Oct 2014 00:51:33 -0700 Subject: [PATCH 12/22] use isRunningLocally rather than runningLocally runningLocally is deprecated now Author: CrazyJvm Closes #2879 from CrazyJvm/runningLocally and squashes the following commits: bec0b3e [CrazyJvm] use isRunningLocally rather than runningLocally --- core/src/main/scala/org/apache/spark/CacheManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index d89bb50076c9a..80da62c44edc5 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -61,7 +61,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val computedValues = rdd.computeOrReadCheckpoint(partition, context) // If the task is running locally, do not persist the result - if (context.runningLocally) { + if (context.isRunningLocally) { return computedValues } From 97cf19f64e924569892e0a0417de19329855f4af Mon Sep 17 00:00:00 2001 From: freeman Date: Wed, 22 Oct 2014 09:33:12 -0700 Subject: [PATCH 13/22] Fix for sampling error in NumPy v1.9 [SPARK-3995][PYSPARK] Change maximum value for default seed during RDD sampling so that it is strictly less than 2 ** 32. This prevents a bug in the most recent version of NumPy, which cannot accept random seeds above this bound. Adds an extra test that uses the default seed (instead of setting it manually, as in the docstrings). 
mengxr Author: freeman Closes #2889 from freeman-lab/pyspark-sampling and squashes the following commits: dc385ef [freeman] Change maximum value for default seed --- python/pyspark/rddsampler.py | 4 ++-- python/pyspark/tests.py | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 55e247da0e4dc..528a181e8905a 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -31,7 +31,7 @@ def __init__(self, withReplacement, seed=None): "Falling back to default random generator for sampling.") self._use_numpy = False - self._seed = seed if seed is not None else random.randint(0, sys.maxint) + self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1) self._withReplacement = withReplacement self._random = None self._split = None @@ -47,7 +47,7 @@ def initRandomGenerator(self, split): for _ in range(0, split): # discard the next few values in the sequence to have a # different seed for the different splits - self._random.randint(0, sys.maxint) + self._random.randint(0, 2 ** 32 - 1) self._split = split self._rand_initialized = True diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index f5ccf31abb3fa..1a8e4150e63c3 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -433,6 +433,12 @@ def test_deleting_input_files(self): os.unlink(tempFile.name) self.assertRaises(Exception, lambda: filtered_data.count()) + def test_sampling_default_seed(self): + # Test for SPARK-3995 (default seed setting) + data = self.sc.parallelize(range(1000), 1) + subset = data.takeSample(False, 10) + self.assertEqual(len(subset), 10) + def testAggregateByKey(self): data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2) From 813effc701fc27121c6f23ab32882932016fdbe0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 22 Oct 2014 14:49:58 -0700 Subject: [PATCH 14/22] [SPARK-3426] Fix sort-based shuffle error when spark.shuffle.compress and spark.shuffle.spill.compress settings are different This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the `spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have different values. The problem is that sort-based shuffle's read and write paths use different settings for determining whether to apply compression. ExternalSorter writes runs to files using `TempBlockId` ids, which causes `spark.shuffle.spill.compress` to be used for enabling compression, but these spilled files end up being shuffled over the network and read as shuffle files using `ShuffleBlockId` by BlockStoreShuffleFetcher, which causes `spark.shuffle.compress` to be used for enabling decompression. As a result, this leads to errors when these settings disagree. Based on the discussions in #2247 and #2178, it sounds like we don't want to remove the `spark.shuffle.spill.compress` setting. Therefore, I've tried to come up with a fix where `spark.shuffle.spill.compress` is used to compress data that's read and written locally and `spark.shuffle.compress` is used to compress any data that will be fetched / read as shuffle blocks. To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId` and `TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and `spark.shuffle.compress`, respectively. ExternalAppendOnlyMap also used temp blocks for spilling data. 
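For concreteness, the failing combination described above can be sketched as follows: a minimal repro mirroring the regression test added in this patch. The sketch assumes sort-based shuffle is enabled via `spark.shuffle.manager`, and the tiny `spark.shuffle.memoryFraction` is only there to force ExternalSorter to spill.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal repro sketch: spill files are written under one compression setting
// but fetched back as shuffle blocks under the other, so any disagreement
// between the two settings corrupts the stream.
val conf = new SparkConf()
  .setAppName("SPARK-3426 repro sketch")
  .setMaster("local")
  .set("spark.shuffle.manager", "sort")         // assumption: sort-based shuffle
  .set("spark.shuffle.spill.compress", "true")
  .set("spark.shuffle.compress", "false")
  .set("spark.shuffle.memoryFraction", "0.001") // force ExternalSorter to spill
val sc = new SparkContext(conf)
sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
```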
It looks like ExternalSorter was designed to be a generic sorter but its configuration already happens to be tied to sort-based shuffle, so I think it's fine if we use `spark.shuffle.compress` to compress its spills; we can move the compression configuration to the constructor in a later commit if we find that ExternalSorter is being used in other contexts where we want different configuration options to control compression. To summarize: **Before:** | | ExternalAppendOnlyMap | ExternalSorter | |-------|------------------------------|------------------------------| | Read | spark.shuffle.spill.compress | spark.shuffle.compress | | Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress | **After:** | | ExternalAppendOnlyMap | ExternalSorter | |-------|------------------------------|------------------------| | Read | spark.shuffle.spill.compress | spark.shuffle.compress | | Write | spark.shuffle.spill.compress | spark.shuffle.compress | Thanks to andrewor14 for debugging this with me! Author: Josh Rosen Closes #2890 from JoshRosen/SPARK-3426 and squashes the following commits: 1921cf6 [Josh Rosen] Minor edit for clarity. c8dd8f2 [Josh Rosen] Add comment explaining use of createTempShuffleBlock(). 2c687b9 [Josh Rosen] Fix SPARK-3426. 91e7e40 [Josh Rosen] Combine tests into single test of all combinations 76ca65e [Josh Rosen] Add regression test for SPARK-3426. --- .../org/apache/spark/storage/BlockId.scala | 11 ++++++--- .../apache/spark/storage/BlockManager.scala | 3 ++- .../spark/storage/DiskBlockManager.scala | 17 +++++++++---- .../collection/ExternalAppendOnlyMap.scala | 2 +- .../util/collection/ExternalSorter.scala | 15 ++++++++++-- .../scala/org/apache/spark/ShuffleSuite.scala | 24 +++++++++++++++++++ 6 files changed, 61 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index a83a3f468ae5f..8df5ec6bde184 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -83,9 +83,14 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId { def name = "input-" + streamId + "-" + uniqueId } -/** Id associated with temporary data managed as blocks. Not serializable. */ -private[spark] case class TempBlockId(id: UUID) extends BlockId { - def name = "temp_" + id +/** Id associated with temporary local data managed as blocks. Not serializable. */ +private[spark] case class TempLocalBlockId(id: UUID) extends BlockId { + def name = "temp_local_" + id +} + +/** Id associated with temporary shuffle data managed as blocks. Not serializable. 
*/ +private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId { + def name = "temp_shuffle_" + id } // Intended only for testing purposes diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0ce2a3f631b15..4cc97923658bc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1071,7 +1071,8 @@ private[spark] class BlockManager( case _: ShuffleBlockId => compressShuffle case _: BroadcastBlockId => compressBroadcast case _: RDDBlockId => compressRdds - case _: TempBlockId => compressShuffleSpill + case _: TempLocalBlockId => compressShuffleSpill + case _: TempShuffleBlockId => compressShuffle case _ => false } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index a715594f198c2..6633a1db57e59 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -98,11 +98,20 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon getAllFiles().map(f => BlockId(f.getName)) } - /** Produces a unique block id and File suitable for intermediate results. */ - def createTempBlock(): (TempBlockId, File) = { - var blockId = new TempBlockId(UUID.randomUUID()) + /** Produces a unique block id and File suitable for storing local intermediate results. */ + def createTempLocalBlock(): (TempLocalBlockId, File) = { + var blockId = new TempLocalBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { - blockId = new TempBlockId(UUID.randomUUID()) + blockId = new TempLocalBlockId(UUID.randomUUID()) + } + (blockId, getFile(blockId)) + } + + /** Produces a unique block id and File suitable for storing shuffled intermediate results. */ + def createTempShuffleBlock(): (TempShuffleBlockId, File) = { + var blockId = new TempShuffleBlockId(UUID.randomUUID()) + while (getFile(blockId).exists()) { + blockId = new TempShuffleBlockId(UUID.randomUUID()) } (blockId, getFile(blockId)) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 0c088da46aa5e..26fa0cb6d7bde 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -153,7 +153,7 @@ class ExternalAppendOnlyMap[K, V, C]( * Sort the existing contents of the in-memory map and spill them to a temporary file on disk. 
*/ override protected[this] def spill(collection: SizeTracker): Unit = { - val (blockId, file) = diskBlockManager.createTempBlock() + val (blockId, file) = diskBlockManager.createTempLocalBlock() curWriteMetrics = new ShuffleWriteMetrics() var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize, curWriteMetrics) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index d1b06d14acbd2..c1ce13683b569 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -38,6 +38,11 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId} * * If combining is disabled, the type C must equal V -- we'll cast the objects at the end. * + * Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied + * to its use in sort-based shuffle (for example, its block compression is controlled by + * `spark.shuffle.compress`). We may need to revisit this if ExternalSorter is used in other + * non-shuffle contexts where we might want to use different configuration settings. + * * @param aggregator optional Aggregator with combine functions to use for merging data * @param partitioner optional Partitioner; if given, sort by partition ID and then key * @param ordering optional Ordering to sort keys within each partition; should be a total ordering @@ -259,7 +264,10 @@ private[spark] class ExternalSorter[K, V, C]( private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = { assert(!bypassMergeSort) - val (blockId, file) = diskBlockManager.createTempBlock() + // Because these files may be read during shuffle, their compression must be controlled by + // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use + // createTempShuffleBlock here; see SPARK-3426 for more context. + val (blockId, file) = diskBlockManager.createTempShuffleBlock() curWriteMetrics = new ShuffleWriteMetrics() var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics) var objectsWritten = 0 // Objects written since the last flush @@ -338,7 +346,10 @@ private[spark] class ExternalSorter[K, V, C]( if (partitionWriters == null) { curWriteMetrics = new ShuffleWriteMetrics() partitionWriters = Array.fill(numPartitions) { - val (blockId, file) = diskBlockManager.createTempBlock() + // Because these files may be read during shuffle, their compression must be controlled by + // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use + // createTempShuffleBlock here; see SPARK-3426 for more context. 
+      val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 15aa4d83800fa..2bdd84ce69ab8 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -242,6 +242,30 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     assert(thrown.getClass === classOf[SparkException])
     assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
+
+  test("shuffle with different compression settings (SPARK-3426)") {
+    for (
+      shuffleSpillCompress <- Set(true, false);
+      shuffleCompress <- Set(true, false)
+    ) {
+      val conf = new SparkConf()
+        .setAppName("test")
+        .setMaster("local")
+        .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
+        .set("spark.shuffle.compress", shuffleCompress.toString)
+        .set("spark.shuffle.memoryFraction", "0.001")
+      resetSparkContext()
+      sc = new SparkContext(conf)
+      try {
+        sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
+      } catch {
+        case e: Exception =>
+          val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
+            s" spark.shuffle.compress=$shuffleCompress"
+          throw new Exception(errMsg, e)
+      }
+    }
+  }
 }

 object ShuffleSuite {

From 137d94235383cc49ccf8a7bb7f314f578aa1dede Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 22 Oct 2014 15:04:41 -0700
Subject: [PATCH 15/22] [SPARK-3877][YARN] Throw an exception when application is not successful so that the exit code will be set to 1

When a YARN application fails (yarn-cluster mode), the exit code of spark-submit is still 0. That makes it hard to write automation scripts around Spark jobs on YARN, because the failure cannot be detected from the exit code.

This PR adds a status check after `monitorApplication`. If an application is not successful, `run()` will throw a `SparkException`, so Client.scala will exit with code 1. Therefore, the exit code of `spark-submit` can be used by automation scripts.
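To make the new contract concrete, the check added after `monitorApplication` boils down to the following condensed sketch of the `ClientBase.run()` change in the diff below, with the state pair passed in explicitly:

```scala
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}
import org.apache.spark.SparkException

// Condensed sketch: map a non-successful terminal state to an exception so
// that an unhandled failure surfaces as a non-zero spark-submit exit code.
def assertApplicationSucceeded(
    state: YarnApplicationState,
    finalStatus: FinalApplicationStatus): Unit = {
  if (state == YarnApplicationState.FAILED || finalStatus == FinalApplicationStatus.FAILED) {
    throw new SparkException("Application finished with failed status")
  }
  if (state == YarnApplicationState.KILLED || finalStatus == FinalApplicationStatus.KILLED) {
    throw new SparkException("Application is killed")
  }
  if (finalStatus == FinalApplicationStatus.UNDEFINED) {
    throw new SparkException("The final status of application is undefined")
  }
}
```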
Author: zsxwing Closes #2732 from zsxwing/SPARK-3877 and squashes the following commits: 1f89fa5 [zsxwing] Fix the unit test a0498e1 [zsxwing] Update the docs and the error message e1cb9ef [zsxwing] Fix the hacky way of calling Client ff16fec [zsxwing] Remove System.exit in Client.scala and add a test 6a2c103 [zsxwing] [SPARK-3877] Throw an exception when application is not successful so that the exit code wil be set to 1 --- .../org/apache/spark/deploy/yarn/Client.scala | 12 ++------ .../apache/spark/deploy/yarn/ClientBase.scala | 29 +++++++++++++++---- .../cluster/YarnClientSchedulerBackend.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 12 ++------ .../spark/deploy/yarn/YarnClusterSuite.scala | 24 ++++++++++----- 5 files changed, 44 insertions(+), 35 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 5c7bca4541222..9c66c785848a5 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -137,15 +137,7 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - try { - val args = new ClientArguments(argStrings, sparkConf) - new Client(args, sparkConf).run() - } catch { - case e: Exception => - Console.err.println(e.getMessage) - System.exit(1) - } - - System.exit(0) + val args = new ClientArguments(argStrings, sparkConf) + new Client(args, sparkConf).run() } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 0efac4ea63702..fb0e34bf5985e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -417,17 +417,19 @@ private[spark] trait ClientBase extends Logging { /** * Report the state of an application until it has exited, either successfully or - * due to some failure, then return the application state. + * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED, + * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED, + * or KILLED). * * @param appId ID of the application to monitor. * @param returnOnRunning Whether to also return the application state when it is RUNNING. * @param logApplicationReport Whether to log details of the application report every iteration. - * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING. + * @return A pair of the yarn application state and the final application state. 
*/ def monitorApplication( appId: ApplicationId, returnOnRunning: Boolean = false, - logApplicationReport: Boolean = true): YarnApplicationState = { + logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) var lastState: YarnApplicationState = null while (true) { @@ -468,11 +470,11 @@ private[spark] trait ClientBase extends Logging { if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { - return state + return (state, report.getFinalApplicationStatus) } if (returnOnRunning && state == YarnApplicationState.RUNNING) { - return state + return (state, report.getFinalApplicationStatus) } lastState = state @@ -485,8 +487,23 @@ private[spark] trait ClientBase extends Logging { /** * Submit an application to the ResourceManager and monitor its state. * This continues until the application has exited for any reason. + * If the application finishes with a failed, killed, or undefined status, + * throw an appropriate SparkException. */ - def run(): Unit = monitorApplication(submitApplication()) + def run(): Unit = { + val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication()) + if (yarnApplicationState == YarnApplicationState.FAILED || + finalApplicationStatus == FinalApplicationStatus.FAILED) { + throw new SparkException("Application finished with failed status") + } + if (yarnApplicationState == YarnApplicationState.KILLED || + finalApplicationStatus == FinalApplicationStatus.KILLED) { + throw new SparkException("Application is killed") + } + if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) { + throw new SparkException("The final status of application is undefined") + } + } /* --------------------------------------------------------------------------------------- * | Methods that cannot be implemented here due to API differences across hadoop versions | diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 6bb4b82316ad4..d948a2aeedd45 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -99,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend( */ private def waitForApplication(): Unit = { assert(client != null && appId != null, "Application has not been submitted yet!") - val state = client.monitorApplication(appId, returnOnRunning = true) // blocking + val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0b43e6ee20538..addaddb711d3c 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -135,15 +135,7 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - try { - val args = new ClientArguments(argStrings, sparkConf) - new Client(args, sparkConf).run() - } catch { - case e: Exception => - 
Console.err.println(e.getMessage)
-        System.exit(1)
-    }
-
-    System.exit(0)
+    val args = new ClientArguments(argStrings, sparkConf)
+    new Client(args, sparkConf).run()
   }
 }
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a826b2a78a8f5..d79b85e867fcd 100644
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster

-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils

@@ -123,21 +123,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
     var result = File.createTempFile("result", null, tempDir)

-    // The Client object will call System.exit() after the job is done, and we don't want
-    // that because it messes up the scalatest monitoring. So replicate some of what main()
-    // does here.
     val args = Array("--class", main,
       "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
       "--arg", "yarn-cluster",
       "--arg", result.getAbsolutePath(),
       "--num-executors", "1")
-    val sparkConf = new SparkConf()
-    val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    val clientArgs = new ClientArguments(args, sparkConf)
-    new Client(clientArgs, yarnConf, sparkConf).run()
+    Client.main(args)
     checkResult(result)
   }

+  test("run Spark in yarn-cluster mode unsuccessfully") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+
+    // Use only one argument so the driver will fail
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--num-executors", "1")
+    val exception = intercept[SparkException] {
+      Client.main(args)
+    }
+    assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
+  }
+
   /**
    * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
    * any sort of error when the job process finishes successfully, but the job itself fails. So

From c5882c663e054adcd3ecd9f11e91a1929dbc14a3 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Wed, 22 Oct 2014 19:44:00 -0700
Subject: [PATCH 16/22] [SPARK-3812] [BUILD] Adapt maven build to publish effective pom.

I tried the maven help plugin first, but it published all projects in the top-level pom, so I was left with no choice but to roll my own trivial plugin. This patch basically installs an effective pom after maven install finishes.

The problem it fixes is as follows: if you install using maven
` mvn install -DskipTests -Dhadoop.version=2.2.0 -Phadoop-2.2 `
then without this patch the published pom(s) will have the hadoop version as 1.0.4. This can be a problem at some point.

Author: Prashant Sharma

Closes #2673 from ScrapCodes/build-changes-effective-pom and squashes the following commits:

aa7b91d [Prashant Sharma] used an unused dep.
0300dac [Prashant Sharma] improved comment messages..
28f891e [Prashant Sharma] Added a useless dependency, so that we can shade it. And realized fake shading works for us.
553d96b [Prashant Sharma] Shaded some unused class of an unused dep, to generate effective pom(s)
---
 pom.xml | 33 ++++++++++++++++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 288bbf1114bea..687cc6352d5c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,7 +248,17 @@
-
+
+
+      org.spark-project.spark
+      unused
+      1.0.0
+
@@ -992,6 +1002,27 @@
+
+
+        org.apache.maven.plugins
+        maven-shade-plugin
+
+
+            package
+
+            shade
+
+
+          false
+
+
+              org.spark-project.spark:unused
+
+
+
+
+
+
       org.apache.maven.plugins
       maven-enforcer-plugin

From d6a302539213e8cdb51ca14b1769aeb33f9f435f Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Wed, 22 Oct 2014 22:13:11 -0700
Subject: [PATCH 17/22] [BUILD] Fixed resolver for scalastyle plugin and upgraded sbt version.

Author: Prashant Sharma

Closes #2877 from ScrapCodes/scalastyle-fix and squashes the following commits:

a17b9fe [Prashant Sharma] [BUILD] Fixed resolver for scalastyle plugin.
---
 project/build.properties | 2 +-
 project/plugins.sbt      | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/project/build.properties b/project/build.properties
index c12ef652adfcb..32a3aeefaf9fb 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-sbt.version=0.13.5
+sbt.version=0.13.6
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 678f5ed1ba610..9d50a50b109af 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,6 +4,8 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.

 resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

+resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
+
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")

From f799700eec4a5e33db9b2d6a4bee60a50fd5a099 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Thu, 23 Oct 2014 09:19:32 -0700
Subject: [PATCH 18/22] [SPARK-4055][MLlib] Inconsistent spelling 'MLlib' and 'MLLib'

There are some inconsistent spellings of 'MLlib' and 'MLLib' in some documents and source code.

Author: Kousuke Saruta

Closes #2903 from sarutak/SPARK-4055 and squashes the following commits:

b031640 [Kousuke Saruta] Fixed inconsistent spelling "MLlib and MLLib"
---
 docs/mllib-feature-extraction.md | 2 +-
 docs/mllib-statistics.md | 2 +-
 .../src/main/java/org/apache/spark/examples/mllib/JavaALS.java | 2 +-
 .../main/java/org/apache/spark/examples/mllib/JavaKMeans.java | 2 +-
 .../main/scala/org/apache/spark/mllib/api/python/package.scala | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md
index 1511ae6dda4ed..11622414494e4 100644
--- a/docs/mllib-feature-extraction.md
+++ b/docs/mllib-feature-extraction.md
@@ -83,7 +83,7 @@ val idf = new IDF().fit(tf)
 val tfidf: RDD[Vector] = idf.transform(tf)
 {% endhighlight %}

-MLLib's IDF implementation provides an option for ignoring terms which occur in less than a
+MLlib's IDF implementation provides an option for ignoring terms which occur in less than a
 minimum number of documents. In such cases, the IDF for these terms is set to 0. This feature
 can be used by passing the `minDocFreq` value to the IDF constructor.
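As a usage sketch of the option described above (assuming, as in the surrounding docs, a term-frequency RDD `tf: RDD[Vector]` produced by `HashingTF`; the helper name is hypothetical):

```scala
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sketch: terms appearing in fewer than 2 documents get an IDF of 0 and thus
// contribute nothing to the resulting TF-IDF vectors.
def tfidfWithMinDocFreq(tf: RDD[Vector]): RDD[Vector] = {
  tf.cache() // fit() and transform() each traverse the data
  val idfModel = new IDF(minDocFreq = 2).fit(tf)
  idfModel.transform(tf)
}
```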
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index c4632413991f1..10a5131c07414 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -197,7 +197,7 @@ print Statistics.corr(data, method="pearson")

 ## Stratified sampling

-Unlike the other statistics functions, which reside in MLLib, stratified sampling methods,
+Unlike the other statistics functions, which reside in MLlib, stratified sampling methods,
 `sampleByKey` and `sampleByKeyExact`, can be performed on RDD's of key-value pairs. For
 stratified sampling, the keys can be thought of as a label and the value as a specific
 attribute. For example the key can be man or woman, or document ids, and the respective
 values can be the list of ages
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
index 8d381d4e0a943..95a430f1da234 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
@@ -32,7 +32,7 @@
 import scala.Tuple2;

 /**
- * Example using MLLib ALS from Java.
+ * Example using MLlib ALS from Java.
  */
 public final class JavaALS {
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
index f796123a25727..e575eedeb465c 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
@@ -30,7 +30,7 @@
 import org.apache.spark.mllib.linalg.Vectors;

 /**
- * Example using MLLib KMeans from Java.
+ * Example using MLlib KMeans from Java.
  */
 public final class JavaKMeans {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/package.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/package.scala
index 87bdc8558aaf5..c67a6d3ae6cce 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/package.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/package.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.mllib.api

 /**
- * Internal support for MLLib Python API.
+ * Internal support for MLlib Python API.
  *
  * @see [[org.apache.spark.mllib.api.python.PythonMLLibAPI]]
  */

From 6b485225271a3c616c4fa1231c20090a95c86f32 Mon Sep 17 00:00:00 2001
From: Tal Sliwowicz
Date: Thu, 23 Oct 2014 10:51:06 -0700
Subject: [PATCH 19/22] [SPARK-4006] In long running contexts, we encountered the situation of double registe...

...r without a remove in between. The cause is unknown, and is assumed to be a transient network issue.

However, since the second register arrives with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us.

The fix: simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones.

Also added some logging for register and unregister.
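The shape of the fix, condensed from the `BlockManagerMasterActor.register` change in the diff below (the method wrapper and parameters are added here only to make the sketch self-contained):

```scala
import scala.collection.mutable
import org.apache.spark.storage.BlockManagerId

// Condensed sketch of the new register() path: a re-registration from a known
// executor (e.g. on a new port) purges the stale entry via removeExecutor
// (which cleans both maps, like expireDeadHosts()), instead of calling
// System.exit(1) on the inconsistency.
def registerSketch(
    id: BlockManagerId,
    blockManagerIdByExecutor: mutable.HashMap[String, BlockManagerId],
    removeExecutor: String => Unit): Unit = {
  blockManagerIdByExecutor.get(id.executorId) match {
    case Some(oldId) => removeExecutor(id.executorId) // assume the old one is dead
    case None => // first registration from this executor; nothing to clean up
  }
  blockManagerIdByExecutor(id.executorId) = id
}
```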
This is just like https://github.com/apache/spark/pull/2854 except it's on master Author: Tal Sliwowicz Closes #2886 from tsliwowicz/master-block-mgr-removal and squashes the following commits: 094d508 [Tal Sliwowicz] some more white space change undone 41a2217 [Tal Sliwowicz] some more whitspaces change undone 7bcfc3d [Tal Sliwowicz] whitspaces fix df9d98f [Tal Sliwowicz] Code review comments fixed f48bce9 [Tal Sliwowicz] In long running contexts, we encountered the situation of double register without a remove in between. The cause for that is unknown, and assumed a temp network issue. --- .../storage/BlockManagerMasterActor.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 088f06e389d83..5e375a2553979 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -203,6 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } } listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId)) + logInfo(s"Removing block manager $blockManagerId") } private def expireDeadHosts() { @@ -327,20 +328,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus val time = System.currentTimeMillis() if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { - case Some(manager) => - // A block manager of the same executor already exists. - // This should never happen. Let's just quit. - logError("Got two different block manager registrations on " + id.executorId) - System.exit(1) + case Some(oldId) => + // A block manager of the same executor already exists, so remove it (assumed dead) + logError("Got two different block manager registrations on same executor - " + + s" will replace old one $oldId with new one $id") + removeExecutor(id.executorId) case None => - blockManagerIdByExecutor(id.executorId) = id } - - logInfo("Registering block manager %s with %s RAM".format( - id.hostPort, Utils.bytesToString(maxMemSize))) - - blockManagerInfo(id) = - new BlockManagerInfo(id, time, maxMemSize, slaveActor) + logInfo("Registering block manager %s with %s RAM, %s".format( + id.hostPort, Utils.bytesToString(maxMemSize), id)) + + blockManagerIdByExecutor(id.executorId) = id + + blockManagerInfo(id) = new BlockManagerInfo( + id, System.currentTimeMillis(), maxMemSize, slaveActor) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) } From 293672c499911328eb27b48dbd7bdef4f4cc8adb Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 23 Oct 2014 13:46:55 -0700 Subject: [PATCH 20/22] specify unidocGenjavadocVersion of 0.8 Fixes an issue with being too strict generating javadoc causing errors. 
Author: Holden Karau

Closes #2893 from holdenk/SPARK-3359-sbtunidoc-java8 and squashes the following commits:

9379a70 [Holden Karau] specify unidocGenjavadocVersion of 0.8
---
 project/SparkBuild.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 01a5b20e7c51d..705937e3016e2 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -22,6 +22,7 @@ import sbt._
 import sbt.Classpaths.publishTask
 import sbt.Keys._
 import sbtunidoc.Plugin.genjavadocSettings
+import sbtunidoc.Plugin.UnidocKeys.unidocGenjavadocVersion
 import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}
 import com.typesafe.sbt.pom.{PomBuild, SbtPomKeys}
 import net.virtualvoid.sbt.graph.Plugin.graphSettings
@@ -116,6 +117,7 @@ object SparkBuild extends PomBuild {
     retrieveManaged := true,
     retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
     publishMavenStyle := true,
+    unidocGenjavadocVersion := "0.8",

     resolvers += Resolver.mavenLocal,
     otherResolvers <<= SbtPomKeys.mvnLocalRepository(dotM2 => Seq(Resolver.file("dotM2", dotM2))),

From 222fa47f0dfd6c53aac513655a519521d9396e72 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Thu, 23 Oct 2014 16:01:38 -0700
Subject: [PATCH 21/22] Revert "[SPARK-3812] [BUILD] Adapt maven build to publish effective pom."

This reverts commit c5882c663e054adcd3ecd9f11e91a1929dbc14a3.

I am reverting this because it appears to cause the maven tests to hang.
---
 pom.xml | 33 +--------------------------------
 1 file changed, 1 insertion(+), 32 deletions(-)

diff --git a/pom.xml b/pom.xml
index 687cc6352d5c1..288bbf1114bea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,17 +248,7 @@
-
-
-      org.spark-project.spark
-      unused
-      1.0.0
-
+
@@ -1002,27 +992,6 @@
-
-
-        org.apache.maven.plugins
-        maven-shade-plugin
-
-
-            package
-
-            shade
-
-
-          false
-
-
-              org.spark-project.spark:unused
-
-
-
-
-
-
       org.apache.maven.plugins
       maven-enforcer-plugin

From 83b7a1c6503adce1826fc537b4db47e534da5cae Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 23 Oct 2014 16:39:32 -0700
Subject: [PATCH 22/22] [SPARK-4019] [SPARK-3740] Fix MapStatus compression bug that could lead to empty results or Snappy errors

This commit fixes a bug in MapStatus that could cause jobs to wrongly return empty results if those jobs contained stages with more than 2000 partitions where most of those partitions were empty.

For jobs with > 2000 partitions, MapStatus uses HighlyCompressedMapStatus, which only stores the average size of blocks. If the average block size is zero, then this will cause all blocks to be reported as empty, causing BlockFetcherIterator to mistakenly skip them. For example, this would return an empty result:

    sc.makeRDD(0 until 10, 1000).repartition(2001).collect()

This can also lead to deserialization errors (e.g. Snappy decoding errors) for jobs with > 2000 partitions where the average block size is non-zero but there is at least one empty block. In this case, the BlockFetcher attempts to fetch empty blocks and fails when trying to deserialize them.

The root problem here is that MapStatus has a (previously undocumented) correctness property that was violated by HighlyCompressedMapStatus: if a block is non-empty, then getSizeForBlock must be non-zero.

I fixed this by modifying HighlyCompressedMapStatus to store the average size of _non-empty_ blocks and to use a compressed bitmap to track which blocks are empty.
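The resulting invariant-preserving lookup can be summarized in a few lines (a simplified, standalone sketch with a hypothetical class name; the real `HighlyCompressedMapStatus` in the diff below also handles Externalizable serialization):

```scala
import org.roaringbitmap.RoaringBitmap

// Simplified sketch: record which blocks are empty in a compressed bitmap and
// report the average size of only the non-empty blocks, so a non-empty block
// can never be reported as 0 bytes and silently skipped by the fetcher.
class ApproxMapOutputSizes(uncompressedSizes: Array[Long]) {
  private val emptyBlocks = new RoaringBitmap()
  private var numNonEmpty = 0
  private var totalSize = 0L
  uncompressedSizes.zipWithIndex.foreach { case (size, i) =>
    if (size > 0) { numNonEmpty += 1; totalSize += size } else emptyBlocks.add(i)
  }
  private val avgSize = if (numNonEmpty > 0) totalSize / numNonEmpty else 0L

  def getSizeForBlock(reduceId: Int): Long =
    if (emptyBlocks.contains(reduceId)) 0L else avgSize
}
```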
I also removed a test which was broken as originally written: it attempted to check that HighlyCompressedMapStatus's size estimation error was < 10%, but this check was ineffective because HighlyCompressedMapStatus is only used for map statuses with > 2000 partitions, while the test only created 50.

Author: Josh Rosen

Closes #2866 from JoshRosen/spark-4019 and squashes the following commits:

fc8b490 [Josh Rosen] Roll back hashset change, which didn't improve performance.
5faa0a4 [Josh Rosen] Incorporate review feedback
c8b8cae [Josh Rosen] Two performance fixes:
3b892dd [Josh Rosen] Address Reynold's review comments
ba2e71c [Josh Rosen] Add missing newline
609407d [Josh Rosen] Use Roaring Bitmap to track non-empty blocks.
c23897a [Josh Rosen] Use sets when comparing collect() results
91276a3 [Josh Rosen] [SPARK-4019] Fix MapStatus compression bug that could lead to empty results.
---
 core/pom.xml | 4 +
 .../apache/spark/scheduler/MapStatus.scala | 76 ++++++++++++++++---
 .../scala/org/apache/spark/rdd/RDDSuite.scala | 5 ++
 .../spark/scheduler/MapStatusSuite.scala | 53 +++++++------
 pom.xml | 5 ++
 5 files changed, 109 insertions(+), 34 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index a5a178079bc57..7b68dbaea4789 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -132,6 +132,10 @@
       com.twitter
       chill-java
+
+      org.roaringbitmap
+      RoaringBitmap
+
       commons-net
       commons-net
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index e25096ea92d70..2ab5d9637b593 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler

 import java.io.{Externalizable, ObjectInput, ObjectOutput}

+import org.roaringbitmap.RoaringBitmap
+
 import org.apache.spark.storage.BlockManagerId

 /**
@@ -29,7 +31,12 @@ private[spark] sealed trait MapStatus {
   /** Location where this task was run. */
   def location: BlockManagerId

-  /** Estimated size for the reduce block, in bytes. */
+  /**
+   * Estimated size for the reduce block, in bytes.
+   *
+   * If a block is non-empty, then this method MUST return a non-zero size. This invariant is
+   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
+   */
   def getSizeForBlock(reduceId: Int): Long
 }

@@ -38,7 +45,7 @@ private[spark] object MapStatus {

   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
     if (uncompressedSizes.length > 2000) {
-      new HighlyCompressedMapStatus(loc, uncompressedSizes)
+      HighlyCompressedMapStatus(loc, uncompressedSizes)
     } else {
       new CompressedMapStatus(loc, uncompressedSizes)
     }
@@ -112,35 +119,80 @@ private[spark] class CompressedMapStatus(
   }
 }

-
 /**
- * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
+ * plus a bitmap for tracking which blocks are non-empty. During serialization, this bitmap
+ * is compressed.
  *
- * @param loc location where the task is being executed
- * @param avgSize average size of all the blocks + * @param loc location where the task is being executed + * @param numNonEmptyBlocks the number of non-empty blocks + * @param emptyBlocks a bitmap tracking which blocks are empty + * @param avgSize average size of the non-empty blocks */ -private[spark] class HighlyCompressedMapStatus( +private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, + private[this] var numNonEmptyBlocks: Int, + private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long) extends MapStatus with Externalizable { - def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) { - this(loc, uncompressedSizes.sum / uncompressedSizes.length) - } + // loc could be null when the default constructor is called during deserialization + require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, + "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, 0L) // For deserialization only + protected def this() = this(null, -1, null, -1) // For deserialization only override def location: BlockManagerId = loc - override def getSizeForBlock(reduceId: Int): Long = avgSize + override def getSizeForBlock(reduceId: Int): Long = { + if (emptyBlocks.contains(reduceId)) { + 0 + } else { + avgSize + } + } override def writeExternal(out: ObjectOutput): Unit = { loc.writeExternal(out) + emptyBlocks.writeExternal(out) out.writeLong(avgSize) } override def readExternal(in: ObjectInput): Unit = { loc = BlockManagerId(in) + emptyBlocks = new RoaringBitmap() + emptyBlocks.readExternal(in) avgSize = in.readLong() } } + +private[spark] object HighlyCompressedMapStatus { + def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { + // We must keep track of which blocks are empty so that we don't report a zero-sized + // block as being non-empty (or vice-versa) when using the average block size. + var i = 0 + var numNonEmptyBlocks: Int = 0 + var totalSize: Long = 0 + // From a compression standpoint, it shouldn't matter whether we track empty or non-empty + // blocks. From a performance standpoint, we benefit from tracking empty blocks because + // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions. 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 79e04f046e4c4..950c6dc58e332 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.SparkConf
 import org.apache.spark.serializer.JavaSerializer
+import scala.util.Random
 
 class MapStatusSuite extends FunSuite {
 
@@ -46,6 +47,26 @@ class MapStatusSuite extends FunSuite {
     }
   }
 
+  test("MapStatus should never report non-empty blocks' sizes as 0") {
+    import Math._
+    for (
+      numSizes <- Seq(1, 10, 100, 1000, 10000);
+      mean <- Seq(0L, 100L, 10000L, Int.MaxValue.toLong);
+      stddev <- Seq(0.0, 0.01, 0.5, 1.0)
+    ) {
+      val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean)
+      val status = MapStatus(BlockManagerId("a", "b", 10), sizes)
+      val status1 = compressAndDecompressMapStatus(status)
+      for (i <- 0 until numSizes) {
+        if (sizes(i) != 0) {
+          val failureMessage = s"Failed with $numSizes sizes with mean=$mean, stddev=$stddev"
+          assert(status.getSizeForBlock(i) !== 0, failureMessage)
+          assert(status1.getSizeForBlock(i) !== 0, failureMessage)
+        }
+      }
+    }
+  }
+
   test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
     val sizes = Array.fill[Long](2001)(150L)
     val status = MapStatus(null, sizes)
@@ -56,37 +77,25 @@ class MapStatusSuite extends FunSuite {
     assert(status.getSizeForBlock(2000) === 150L)
   }
 
-  test(classOf[HighlyCompressedMapStatus].getName + ": estimated size is within 10%") {
-    val sizes = Array.tabulate[Long](50) { i => i.toLong }
+  test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") {
+    val sizes = Array.tabulate[Long](3000) { i => i.toLong }
+    val avg = sizes.sum / sizes.filter(_ != 0).length
     val loc = BlockManagerId("a", "b", 10)
     val status = MapStatus(loc, sizes)
-    val ser = new JavaSerializer(new SparkConf)
-    val buf = ser.newInstance().serialize(status)
-    val status1 = ser.newInstance().deserialize[MapStatus](buf)
+    val status1 = compressAndDecompressMapStatus(status)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
     assert(status1.location == loc)
-    for (i <- 0 until sizes.length) {
-      // make sure the estimated size is within 10% of the input; note that we skip the very small
-      // sizes because the compression is very lossy there.
+    for (i <- 0 until 3000) {
       val estimate = status1.getSizeForBlock(i)
-      if (estimate > 100) {
-        assert(math.abs(estimate - sizes(i)) * 10 <= sizes(i),
-          s"incorrect estimated size $estimate, original was ${sizes(i)}")
+      if (sizes(i) > 0) {
+        assert(estimate === avg)
       }
     }
   }
 
-  test(classOf[HighlyCompressedMapStatus].getName + ": estimated size should be the average size") {
-    val sizes = Array.tabulate[Long](3000) { i => i.toLong }
-    val avg = sizes.sum / sizes.length
-    val loc = BlockManagerId("a", "b", 10)
-    val status = MapStatus(loc, sizes)
+  def compressAndDecompressMapStatus(status: MapStatus): MapStatus = {
     val ser = new JavaSerializer(new SparkConf)
     val buf = ser.newInstance().serialize(status)
-    val status1 = ser.newInstance().deserialize[MapStatus](buf)
-    assert(status1.location == loc)
-    for (i <- 0 until 3000) {
-      val estimate = status1.getSizeForBlock(i)
-      assert(estimate === avg)
-    }
+    ser.newInstance().deserialize[MapStatus](buf)
   }
 }
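The randomized test above pins down the invariant documented on getSizeForBlock: it must never return 0 for a non-empty block, because shuffle fetchers treat a zero-size block as empty and skip it. A toy illustration of that failure mode follows (BlockInfo and blocksToFetch are hypothetical stand-ins, not Spark's actual fetcher API):

    case class BlockInfo(reduceId: Int, estimatedSize: Long)

    // Fetchers are allowed to skip blocks whose estimated size is zero.
    def blocksToFetch(statuses: Seq[BlockInfo]): Seq[BlockInfo] =
      statuses.filter(_.estimatedSize > 0)

    // If a non-empty block is misreported as 0 (the SPARK-4019 bug), it is never
    // fetched, and its records silently vanish from the job's results.
    val misreported = Seq(BlockInfo(0, 0L), BlockInfo(1, 250L))
    assert(blocksToFetch(misreported).map(_.reduceId) == Seq(1))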
diff --git a/pom.xml b/pom.xml
index 288bbf1114bea..a7e71f9ca5596 100644
--- a/pom.xml
+++ b/pom.xml
@@ -428,6 +428,11 @@
       </dependency>
+      <dependency>
+        <groupId>org.roaringbitmap</groupId>
+        <artifactId>RoaringBitmap</artifactId>
+        <version>0.4.1</version>
+      </dependency>
       <dependency>
         <groupId>commons-net</groupId>
         <artifactId>commons-net</artifactId>
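As an aside, the dependency added above can also be pulled into a standalone project to experiment with the bitmap behavior; the sbt equivalent of this Maven coordinate (version taken from the root pom) would be:

    libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.1"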