Skip to content

Commit

Permalink
fix: Fix a leak in PrefixAndTail operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 22, 2024
1 parent 798a54c commit 62e60a5
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ import pekko.util.ccompat.JavaConverters._
tailSource.setHandler(subHandler)
setKeepGoing(true)
scheduleOnce(SubscriptionTimer, timeout)
builder = null
Source.fromGraph(tailSource.source)
}

Expand All @@ -222,13 +221,16 @@ import pekko.util.ccompat.JavaConverters._
builder += grab(in)
left -= 1
if (left == 0) {
push(out, (builder.result(), openSubstream()))
val prefix = builder.result()
builder = null // free for GC
push(out, (prefix, openSubstream()))
complete(out)
} else pull(in)
}
}
override def onPull(): Unit = {
if (left == 0) {
builder = null // free for GC
push(out, (Nil, openSubstream()))
complete(out)
} else pull(in)
Expand All @@ -237,7 +239,9 @@ import pekko.util.ccompat.JavaConverters._
override def onUpstreamFinish(): Unit = {
if (!prefixComplete) {
// This handles the unpulled out case as well
emit(out, (builder.result(), Source.empty), () => completeStage())
val prefix = builder.result();
builder = null // free for GC
emit(out, (prefix, Source.empty), () => completeStage())
} else {
if (!tailSource.isClosed) tailSource.complete()
completeStage()
Expand Down

0 comments on commit 62e60a5

Please sign in to comment.