Skip to content

Commit

Permalink
Merge pull request alteryx#49 from mateiz/kryo-fix-2
Browse files Browse the repository at this point in the history
Fix Chill serialization of Range objects

It used to write out each element one by one, creating very large objects.

(cherry picked from commit 320418f)
Signed-off-by: Reynold Xin <[email protected]>
  • Loading branch information
rxin committed Oct 9, 2013
1 parent 0b6f047 commit dfc62e2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.esotericsoftware.kryo.{KryoException, Kryo}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.twitter.chill.ScalaKryoInstantiator
import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}

import org.apache.spark.{SerializableWritable, Logging}
import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel}
Expand All @@ -39,7 +39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
def newKryoOutput() = new KryoOutput(bufferSize)

def newKryo(): Kryo = {
val instantiator = new ScalaKryoInstantiator
val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()
val classLoader = Thread.currentThread.getContextClassLoader

Expand All @@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
StorageLevel.MEMORY_ONLY,
PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock("1", ByteBuffer.allocate(1)),
GetBlock("1")
GetBlock("1"),
1 to 10,
1 until 10,
1L to 10L,
1L until 10L
)

for (obj <- toRegister) kryo.register(obj.getClass)
Expand All @@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
case _: Exception => println("Failed to register spark.kryo.registrator")
}

// Register Chill's classes; we do this after our ranges and the user's own classes to let
// our code override the generic serialziers in Chill for things like Seq
new AllScalaRegistrar().apply(kryo)

kryo.setClassLoader(classLoader)

// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
}

test("ranges") {
val ser = (new KryoSerializer).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
assert(ser.serialize(t).limit < 100)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
check(1 until 1000000)
check(1 until 1000000 by 2)
check(1L to 1000000L)
check(1L to 1000000L by 2L)
check(1L until 1000000L)
check(1L until 1000000L by 2L)
check(1.0 to 1000000.0 by 1.0)
check(1.0 to 1000000.0 by 2.0)
check(1.0 until 1000000.0 by 1.0)
check(1.0 until 1000000.0 by 2.0)
}

test("custom registrator") {
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)

Expand Down

0 comments on commit dfc62e2

Please sign in to comment.