From b9624eefa111c012b013417a4c754358066e29ed Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 1 May 2015 14:43:56 -0700
Subject: [PATCH] Expand serializer API and use new function to help control
 when new UnsafeShuffle path is used.

Conflicts:
	core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
---
 .../spark/serializer/KryoSerializer.scala    |  5 ++++
 .../apache/spark/serializer/Serializer.scala | 26 ++++++++++++++++++-
 .../util/collection/ExternalSorter.scala     |  3 +--
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index b7bc087855b9f..f6c17e362f9b3 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -125,6 +125,11 @@ class KryoSerializer(conf: SparkConf)
   override def newInstance(): SerializerInstance = {
     new KryoSerializerInstance(this)
   }
+
+  override def supportsRelocationOfSerializedObjects: Boolean = {
+    // TODO: we should have a citation / explanatory comment here clarifying _why_ this is the case
+    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
+  }
 }
 
 private[spark]
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index c381672a4f588..144a1c51ac858 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{Experimental, DeveloperApi}
 import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}
 
 /**
@@ -63,6 +63,30 @@ abstract class Serializer {
 
   /** Creates a new [[SerializerInstance]]. */
   def newInstance(): SerializerInstance
+
+  /**
+   * Returns true if this serializer supports relocation of its serialized objects and false
+   * otherwise. This should return true if and only if reordering the bytes of serialized objects
+   * in serialization stream output results in re-ordered input that can be read with the
+   * deserializer. For instance, the following should work if the serializer supports relocation:
+   *
+   *   serOut.open()
+   *   position = 0
+   *   serOut.write(obj1)
+   *   serOut.flush()
+   *   position = # of bytes written to stream so far
+   *   obj1Bytes = [bytes 0 through position of stream]
+   *   serOut.write(obj2)
+   *   serOut.flush()
+   *   position2 = # of bytes written to stream so far
+   *   obj2Bytes = [bytes position through position2 of stream]
+   *
+   *   serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
+   *
+   * See SPARK-7311 for more discussion.
+   */
+  @Experimental
+  def supportsRelocationOfSerializedObjects: Boolean = false
 }
 
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b7306cd551918..7d5cf7b61e56a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
   private val useSerializedPairBuffer =
     !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-    ser.isInstanceOf[KryoSerializer] &&
-    serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+    ser.supportsRelocationOfSerializedObjects
 
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
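
Note (not part of the patch): the relocation property described in the new
Serializer doc comment can be exercised end-to-end against KryoSerializer.
Below is a minimal sketch using Spark's existing serializeStream /
deserializeStream APIs; the object name RelocationSmokeTest is hypothetical:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer

    // Serialize two objects, slice the output at the flush boundary, swap the
    // two byte ranges, and check that deserialization yields the swapped order.
    object RelocationSmokeTest {
      def main(args: Array[String]): Unit = {
        val ser = new KryoSerializer(new SparkConf())
        assert(ser.supportsRelocationOfSerializedObjects)

        val instance = ser.newInstance()
        val out = new ByteArrayOutputStream()
        val serOut = instance.serializeStream(out)

        serOut.writeObject("obj1")
        serOut.flush()
        val position = out.size()  // # of bytes written to the stream so far
        serOut.writeObject("obj2")
        serOut.flush()

        val bytes = out.toByteArray
        val obj1Bytes = bytes.slice(0, position)
        val obj2Bytes = bytes.slice(position, bytes.length)

        // serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
        val serIn = instance.deserializeStream(
          new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
        assert(serIn.readObject[String]() == "obj2")
        assert(serIn.readObject[String]() == "obj1")
        serIn.close()
      }
    }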
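
A second note on the Kryo side: supportsRelocationOfSerializedObjects
delegates to KryoSerializerInstance.getAutoReset(), so a user-supplied
registrator that disables Kryo's auto-reset (for example, to keep reference
tracking across objects) makes the method return false. A sketch of such a
registrator, with a hypothetical class name:

    import com.esotericsoftware.kryo.Kryo

    import org.apache.spark.serializer.KryoRegistrator

    // With auto-reset disabled, Kryo may back-reference earlier stream output,
    // so serialized objects can no longer be relocated independently.
    class NoAutoResetRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.setAutoReset(false)
      }
    }

With spark.kryo.registrator pointing at such a class,
supportsRelocationOfSerializedObjects returns false, so ExternalSorter's
useSerializedPairBuffer condition above no longer enables the serialized
map-output buffer.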