From 35b1a009aff5ad28df45fea9b7cebfe9e46c0b22 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 22 Dec 2024 17:35:53 +0800 Subject: [PATCH] chore: Reduce memory usage for many graphs. --- .../pekko/stream/impl/fusing/FlatMapPrefix.scala | 4 ++-- .../pekko/stream/impl/fusing/GraphInterpreter.scala | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala index c3672ae1eaa..5f033fb702d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala @@ -43,8 +43,8 @@ import pekko.util.OptionVal .mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy] .propagateToNestedMaterialization val matPromise = Promise[M]() - val logic = new GraphStageLogic(shape) with InHandler with OutHandler { - val accumulated = collection.mutable.Buffer.empty[In] + object logic extends GraphStageLogic(shape) with InHandler with OutHandler { + private val accumulated = collection.mutable.Buffer.empty[In] private var subSource = OptionVal.none[SubSourceOutlet[In]] private var subSink = OptionVal.none[SubSinkInlet[Out]] diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala index b3be4fbe5bd..3d46a554d5d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala @@ -111,6 +111,16 @@ import pekko.stream.stage._ * * a cancellation cause, if elem.isInstanceOf[Cancelled] */ var slot: Any = Empty + + /** + * Clean up the connection after it has been completed or cancelled + */ + def cleanAfterCompleteOrCancelled(): Unit = { + inOwner = null + outOwner = null + inHandler = null + outHandler = null + } } private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] { @@ -332,6 +342,7 @@ import pekko.stream.stage._ var i = 0 while (i < logics.length) { val logic = logics(i) + logics(i) = null if (!isStageCompleted(logic)) finalizeStage(logic) i += 1 } @@ -522,6 +533,7 @@ import pekko.stream.stage._ val cause = connection.slot.asInstanceOf[Cancelled].cause connection.slot = Empty connection.outHandler.onDownstreamFinish(cause) + connection.cleanAfterCompleteOrCancelled() } else if ((code & (OutClosed | InClosed)) == OutClosed) { // COMPLETIONS @@ -536,6 +548,7 @@ import pekko.stream.stage._ completeConnection(connection.inOwner.stageId) if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish() else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex) + connection.cleanAfterCompleteOrCancelled() } else { // Push is pending, first process push, then re-enqueue closing event processPush(connection)