From cb26ad88c522070c66e979ab1ab0f040cd1bdbe7 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Thu, 4 May 2023 19:34:14 -0500 Subject: [PATCH] [SPARK-43378][CORE] Properly close stream objects in deserializeFromChunkedBuffer ### What changes were proposed in this pull request? Fixes a that SerializerHelper.deserializeFromChunkedBuffer does not call close on the deserialization stream. For some serializers like Kryo this creates a performance regressions as the kryo instances are not returned to the pool. ### Why are the changes needed? This causes a performance regression in Spark 3.4.0 for some workloads. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes #41049 from eejbyfeldt/SPARK-43378. Authored-by: Emil Ejbyfeldt Signed-off-by: Sean Owen --- .../scala/org/apache/spark/serializer/SerializerHelper.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala index 2cff87990a487..54a0b2e339e22 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala @@ -49,6 +49,8 @@ private[spark] object SerializerHelper extends Logging { serializerInstance: SerializerInstance, bytes: ChunkedByteBuffer): T = { val in = serializerInstance.deserializeStream(bytes.toInputStream()) - in.readObject() + val res = in.readObject() + in.close() + res } }