From dfc62e294dc942303c01dc17a9c2974e8fa9668e Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 9 Oct 2013 16:55:30 -0700 Subject: [PATCH] Merge pull request #49 from mateiz/kryo-fix-2 Fix Chill serialization of Range objects It used to write out each element one by one, creating very large objects. (cherry picked from commit 320418f7c8b42d4ce781b32c9ee47a9b54550b89) Signed-off-by: Reynold Xin --- .../spark/serializer/KryoSerializer.scala | 14 ++++++++++--- .../serializer/KryoSerializerSuite.scala | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 6c500bad92648..e936b1cfedd91 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.esotericsoftware.kryo.{KryoException, Kryo} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} -import com.twitter.chill.ScalaKryoInstantiator +import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar} import org.apache.spark.{SerializableWritable, Logging} import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel} @@ -39,7 +39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging def newKryoOutput() = new KryoOutput(bufferSize) def newKryo(): Kryo = { - val instantiator = new ScalaKryoInstantiator + val instantiator = new EmptyScalaKryoInstantiator val kryo = instantiator.newKryo() val classLoader = Thread.currentThread.getContextClassLoader @@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging StorageLevel.MEMORY_ONLY, PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY), GotBlock("1", ByteBuffer.allocate(1)), - GetBlock("1") + GetBlock("1"), + 1 to 10, + 1 until 10, + 1L to 10L, + 1L until 10L ) for (obj <- toRegister) kryo.register(obj.getClass) @@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging case _: Exception => println("Failed to register spark.kryo.registrator") } + // Register Chill's classes; we do this after our ranges and the user's own classes to let + // our code override the generic serialziers in Chill for things like Seq + new AllScalaRegistrar().apply(kryo) + kryo.setClassLoader(classLoader) // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 0164dda0ba5e3..c016c5117149f 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three"))) } + test("ranges") { + val ser = (new KryoSerializer).newInstance() + def check[T](t: T) { + assert(ser.deserialize[T](ser.serialize(t)) === t) + // Check that very long ranges don't get written one element at a time + assert(ser.serialize(t).limit < 100) + } + check(1 to 1000000) + check(1 to 1000000 by 2) + check(1 until 1000000) + check(1 until 1000000 by 2) + check(1L to 1000000L) + check(1L to 1000000L by 2L) + check(1L until 1000000L) + check(1L until 1000000L by 2L) + check(1.0 to 1000000.0 by 1.0) + check(1.0 to 1000000.0 by 2.0) + check(1.0 until 1000000.0 by 1.0) + check(1.0 until 1000000.0 by 2.0) + } + test("custom registrator") { System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)