Skip to content

Commit

Permalink
chore: Fix leak in FlatMapPrefix operator. (#1622)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Jan 3, 2025
1 parent 03712a9 commit ce3620f
Showing 1 changed file with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ import pekko.util.OptionVal
.mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
.propagateToNestedMaterialization
val matPromise = Promise[M]()
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
val accumulated = collection.mutable.Buffer.empty[In]
object FlatMapPrefixLogic extends GraphStageLogic(shape) with InHandler with OutHandler {
private var left = n
private var builder = Vector.newBuilder[In]
builder.sizeHint(left)

private var subSource = OptionVal.none[SubSourceOutlet[In]]
private var subSink = OptionVal.none[SubSinkInlet[Out]]
Expand All @@ -65,11 +67,12 @@ import pekko.util.OptionVal
subSource match {
case OptionVal.Some(s) => s.push(grab(in))
case _ =>
accumulated.append(grab(in))
if (accumulated.size == n) {
builder += grab(in)
left -= 1
if (left == 0) {
materializeFlow()
} else {
// gi'me some more!
// give me some more!
pull(in)
}
}
Expand Down Expand Up @@ -98,12 +101,12 @@ import pekko.util.OptionVal
// delegate to subSink
s.pull()
case _ =>
if (accumulated.size < n) pull(in)
else if (accumulated.size == n) {
if (left > 0) pull(in)
else if (left == 0) {
// corner case for n = 0, can be handled in FlowOps
materializeFlow()
} else {
throw new IllegalStateException(s"Unexpected accumulated size: ${accumulated.size} (n: $n)")
throw new IllegalStateException(s"Unexpected accumulated size, left : $left (n: $n)")
}
}
}
Expand All @@ -114,7 +117,7 @@ import pekko.util.OptionVal
case _ =>
if (propagateToNestedMaterialization) {
downstreamCause = OptionVal.Some(cause)
if (accumulated.size == n) {
if (left == 0) {
// corner case for n = 0, can be handled in FlowOps
materializeFlow()
} else if (!hasBeenPulled(in)) { // if in was already closed, nested flow would have already been materialized
Expand All @@ -128,8 +131,8 @@ import pekko.util.OptionVal

def materializeFlow(): Unit =
try {
val prefix = accumulated.toVector
accumulated.clear()
val prefix = builder.result()
builder = null // free for GC
subSource = OptionVal.Some(new SubSourceOutlet[In]("FlatMapPrefix.subSource"))
val theSubSource = subSource.get
subSink = OptionVal.Some(new SubSinkInlet[Out]("FlatMapPrefix.subSink"))
Expand Down Expand Up @@ -196,6 +199,6 @@ import pekko.util.OptionVal
case NonFatal(ex) => failStage(ex)
}
}
(logic, matPromise.future)
(FlatMapPrefixLogic, matPromise.future)
}
}

0 comments on commit ce3620f

Please sign in to comment.