From 62e60a56c1b1b06478a7df4025024dbbf2f90b62 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 22 Dec 2024 19:17:38 +0800 Subject: [PATCH] fix: Fix a leak in PrefixAndTail operator. --- .../pekko/stream/impl/fusing/StreamOfStreams.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index dc84291b2ef..c674e78a313 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -211,7 +211,6 @@ import pekko.util.ccompat.JavaConverters._ tailSource.setHandler(subHandler) setKeepGoing(true) scheduleOnce(SubscriptionTimer, timeout) - builder = null Source.fromGraph(tailSource.source) } @@ -222,13 +221,16 @@ import pekko.util.ccompat.JavaConverters._ builder += grab(in) left -= 1 if (left == 0) { - push(out, (builder.result(), openSubstream())) + val prefix = builder.result() + builder = null // free for GC + push(out, (prefix, openSubstream())) complete(out) } else pull(in) } } override def onPull(): Unit = { if (left == 0) { + builder = null // free for GC push(out, (Nil, openSubstream())) complete(out) } else pull(in) @@ -237,7 +239,9 @@ import pekko.util.ccompat.JavaConverters._ override def onUpstreamFinish(): Unit = { if (!prefixComplete) { // This handles the unpulled out case as well - emit(out, (builder.result(), Source.empty), () => completeStage()) + val prefix = builder.result(); + builder = null // free for GC + emit(out, (prefix, Source.empty), () => completeStage()) } else { if (!tailSource.isClosed) tailSource.complete() completeStage()