From e19726d56eaeb8ad5558f09a2f3d3ec6e70bd2f7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 20 May 2015 13:23:25 -0700 Subject: [PATCH] Add fix for SPARK-7766. --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 ++ .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 ++ 2 files changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 64ba27f34d2f1..217957963437d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -177,6 +177,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ override def serialize[T: ClassTag](t: T): ByteBuffer = { output.clear() + kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) try { kryo.writeClassAndObject(output, t) } catch { @@ -202,6 +203,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ } override def serializeStream(s: OutputStream): SerializationStream = { + kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) new KryoSerializationStream(kryo, s) } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c8c896188ee0a..0bd91a8dba2ab 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -345,6 +345,8 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert (output1 === output2) } + // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling + // reference-tracking would lead to corrupted output when serializer instances are re-used for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)