Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix leak in FlatMapPrefix operator. #1622

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Dec 22, 2024

extracted from #1621

Motivation:
In #1566 , @queimadus points out a leak

Modification:

  1. change the builder from a buffer to a vector builder.
  2. free that builder to null after the prefix is generated.

Result:
leak fixed.

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import org.apache.pekko.util.ByteString

import scala.concurrent.Await


object PekkoQuickstart extends App {
  private implicit val system: ActorSystem = ActorSystem()

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .flatMapPrefix(50000) { _ => Flow[ByteString] }

  val r = Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore)

  Await.result(r, scala.concurrent.duration.Duration.Inf)
  println(r.value)

//  Source
//    .repeat(s)
//    .take(30000)
//    .flatMapConcat(x => x)
//    .runWith(Sink.ignore)
//    .onComplete(println(_))

//  Source.empty
//    .concatAllLazy(List.tabulate(30000)(_ => Source.lazySource(() => s)): _*)
//    .runWith(Sink.ignore).onComplete(println(_))
}

@He-Pin He-Pin added t:stream Pekko Streams bug Something isn't working backport labels Dec 22, 2024
@He-Pin He-Pin added this to the 1.1.3 milestone Dec 22, 2024
@He-Pin He-Pin changed the title chore: Fix leak in FlatMapPrefix operator. fix: Fix leak in FlatMapPrefix operator. Dec 22, 2024
val prefix = accumulated.toVector
accumulated.clear()
val prefix = builder.result()
builder = null // free for GC
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@queimadus would you like to give this patch a try?

Copy link

@queimadus queimadus Dec 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give it a try but I'm not sure this will work because prefixAndTail uses a similar implementation and has the same problem described in #1566.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@queimadus There is a fix for prefixAndTail too #1623

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There do have leaks in the interpreter, eg the logic is still being refs.
I did a try in #1621, which needs changing more code to clean up the logics.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change does seem to help when running the provided example. This crash by lack of memory. Previous run would crash out at around the 2 minute mark.
Screenshot 2024-12-22 at 11 34 34

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original code before I simplified it with what looked to be the MRE, was more of a:

  def myLogic(prefix: Seq[ByteString]): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) =>
      Source(prefix).concatLazy(tail).via(myLogic(prefix))
    }

  Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore).onComplete(println(_))

Where I would need to use the prefix to generate the remaining flow logic and then emit the prefix back again. It still runs out of heap in this case, but it may be a different problem?

Nonetheless, I think this change makes sense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefixAndTail fix is in another PR, could you verify that too?
@queimadus #1623 this one

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference, this would be the flatMapPrefix equivalent that wouldn't work with Concats but work with flatMapConcat (the commented code):

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .flatMapPrefix(50000) { prefix =>
      Flow[ByteString].prepend(Source(prefix))
    }

  Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore).onComplete(println(_))

//  Source
//    .repeat(s)
//    .take(30000)
//    .flatMapConcat(x => x)
//    .runWith(Sink.ignore)
//    .onComplete(println(_))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the concat should be reimplemented to reduce memory usage somehow.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport bug Something isn't working t:stream Pekko Streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants