[SPARK-7766] KryoSerializerInstance reuse is unsafe when auto-reset is disabled

SPARK-3386 / apache#5606 modified the shuffle write path to re-use serializer instances across multiple calls to DiskBlockObjectWriter. It turns out that this introduced a very rare bug when using `KryoSerializer`: if auto-reset is disabled and reference-tracking is enabled, then the same serializer instance is re-used to write multiple output streams without `reset()` being called between writes. As a result, objects in one file may contain references to objects in previous files, causing errors during deserialization.

This patch fixes this bug by calling `reset()` at the start of `serialize()` and `serializeStream()`. I also added a regression test which demonstrates that this problem only occurs when auto-reset is disabled and reference-tracking is enabled.
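The failure mode can be sketched with a toy model in plain Scala (this is not Kryo's actual API; `TrackingSerializer`, `write`, and the `obj:`/`ref:` encoding are illustrative inventions). Like Kryo with `spark.kryo.referenceTracking` enabled, the toy serializer replaces an already-seen object with a small back-reference id instead of re-encoding it. Without a `reset()` between streams, the second stream's first record is a back-reference into the *previous* stream, so the second stream cannot be deserialized on its own:

```scala
import scala.collection.mutable

// Toy stand-in for a reference-tracking serializer: remembers every object
// it has written and emits a back-reference id on repeat encounters.
class TrackingSerializer {
  private val seen = mutable.Map[AnyRef, Int]()

  // Encode an object as either its full payload or a back-reference id.
  def write(obj: AnyRef): String =
    seen.get(obj) match {
      case Some(id) => s"ref:$id" // back-reference to an earlier write
      case None =>
        seen(obj) = seen.size
        s"obj:${obj.toString}"
    }

  // Analogous to Kryo's reset(): forget all previously seen objects.
  def reset(): Unit = seen.clear()
}

object Demo extends App {
  val ser = new TrackingSerializer
  val pair = ("Hello", "World")

  // Stream 1: first write emits the payload, second a back-reference.
  val out1 = Seq(ser.write(pair), ser.write(pair))

  // Stream 2 WITHOUT reset(): even its first record is a back-reference
  // into stream 1, so this stream is unreadable by itself (the SPARK-7766
  // failure mode).
  val out2NoReset = Seq(ser.write(pair), ser.write(pair))

  ser.reset()
  // Stream 3 WITH reset(): identical to stream 1 -- self-contained output,
  // which is exactly what the regression test below asserts.
  val out3 = Seq(ser.write(pair), ser.write(pair))

  println(out1)        // List(obj:(Hello,World), ref:0)
  println(out2NoReset) // List(ref:0, ref:0)
  assert(out1 == out3)
  assert(out1 != out2NoReset)
}
```

This mirrors the regression test added in this commit, which serializes the same objects through two successive streams from one instance and asserts the two byte arrays are identical.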

Author: Josh Rosen <[email protected]>

Closes apache#6293 from JoshRosen/kryo-instance-reuse-bug and squashes the following commits:

e19726d [Josh Rosen] Add fix for SPARK-7766.
71845e3 [Josh Rosen] Add failing regression test to trigger Kryo re-use bug
JoshRosen authored and nemccarthy committed Jun 19, 2015
1 parent dce43d7 commit f6e9eef
Showing 2 changed files with 35 additions and 0 deletions.
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -177,6 +177,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ

override def serialize[T: ClassTag](t: T): ByteBuffer = {
output.clear()
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
try {
kryo.writeClassAndObject(output, t)
} catch {
@@ -202,6 +203,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
}

override def serializeStream(s: OutputStream): SerializationStream = {
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
new KryoSerializationStream(kryo, s)
}

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -17,6 +17,8 @@

package org.apache.spark.serializer

import java.io.ByteArrayOutputStream

import scala.collection.mutable
import scala.reflect.ClassTag

@@ -319,6 +321,37 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(!ser2.getAutoReset)
}

private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = {
val conf = new SparkConf(loadDefaults = false)
.set("spark.kryo.referenceTracking", referenceTracking.toString)
if (!autoReset) {
conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
}
val ser = new KryoSerializer(conf)
val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
assert (serInstance.getAutoReset() === autoReset)
val obj = ("Hello", "World")
def serializeObjects(): Array[Byte] = {
val baos = new ByteArrayOutputStream()
val serStream = serInstance.serializeStream(baos)
serStream.writeObject(obj)
serStream.writeObject(obj)
serStream.close()
baos.toByteArray
}
val output1: Array[Byte] = serializeObjects()
val output2: Array[Byte] = serializeObjects()
assert (output1 === output2)
}

// Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
// reference-tracking would lead to corrupted output when serializer instances are re-used
for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
}
}
}


