Skip to content

Commit

Permalink
chore: Reduce memory usage for many graphs.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 22, 2024
1 parent 798a54c commit 35b1a00
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import pekko.util.OptionVal
.mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
.propagateToNestedMaterialization
val matPromise = Promise[M]()
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
val accumulated = collection.mutable.Buffer.empty[In]
object logic extends GraphStageLogic(shape) with InHandler with OutHandler {
private val accumulated = collection.mutable.Buffer.empty[In]

private var subSource = OptionVal.none[SubSourceOutlet[In]]
private var subSink = OptionVal.none[SubSinkInlet[Out]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ import pekko.stream.stage._
* * a cancellation cause, if elem.isInstanceOf[Cancelled]
*/
var slot: Any = Empty

/**
* Clean up the connection after it has been completed or cancelled
*/
def cleanAfterCompleteOrCancelled(): Unit = {
inOwner = null
outOwner = null
inHandler = null
outHandler = null
}
}

private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] {
Expand Down Expand Up @@ -332,6 +342,7 @@ import pekko.stream.stage._
var i = 0
while (i < logics.length) {
val logic = logics(i)
logics(i) = null
if (!isStageCompleted(logic)) finalizeStage(logic)
i += 1
}
Expand Down Expand Up @@ -522,6 +533,7 @@ import pekko.stream.stage._
val cause = connection.slot.asInstanceOf[Cancelled].cause
connection.slot = Empty
connection.outHandler.onDownstreamFinish(cause)
connection.cleanAfterCompleteOrCancelled()
} else if ((code & (OutClosed | InClosed)) == OutClosed) {
// COMPLETIONS

Expand All @@ -536,6 +548,7 @@ import pekko.stream.stage._
completeConnection(connection.inOwner.stageId)
if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish()
else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex)
connection.cleanAfterCompleteOrCancelled()
} else {
// Push is pending, first process push, then re-enqueue closing event
processPush(connection)
Expand Down

0 comments on commit 35b1a00

Please sign in to comment.