Skip to content

Commit

Permalink
Add failing regression test to trigger Kryo re-use bug
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 20, 2015
1 parent eb4632f commit 71845e3
Showing 1 changed file with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.serializer

import java.io.ByteArrayOutputStream

import scala.collection.mutable
import scala.reflect.ClassTag

Expand Down Expand Up @@ -319,6 +321,35 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(!ser2.getAutoReset)
}

private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = {
val conf = new SparkConf(loadDefaults = false)
.set("spark.kryo.referenceTracking", referenceTracking.toString)
if (!autoReset) {
conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
}
val ser = new KryoSerializer(conf)
val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
assert (serInstance.getAutoReset() === autoReset)
val obj = ("Hello", "World")
def serializeObjects(): Array[Byte] = {
val baos = new ByteArrayOutputStream()
val serStream = serInstance.serializeStream(baos)
serStream.writeObject(obj)
serStream.writeObject(obj)
serStream.close()
baos.toByteArray
}
val output1: Array[Byte] = serializeObjects()
val output2: Array[Byte] = serializeObjects()
assert (output1 === output2)
}

for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
}
}
}


Expand Down

0 comments on commit 71845e3

Please sign in to comment.