[Feature - Akka.Streams] ForEachAsync #6391

Closed
george-zubrienko opened this issue Feb 7, 2023 · 5 comments · Fixed by #6538
Comments

@george-zubrienko
Contributor

george-zubrienko commented Feb 7, 2023

Is your feature request related to a problem? Please describe.
Imagine a scenario: consume input, process it in a stream, and sink to an external destination (write to a database, send an API request, etc.). In this case the sink performs an operation that is best run as a Task<TResult>. Currently there is no Sink.ForEachAsync to facilitate that, so the available options are:

  • var mySink = Sink.ForEach<TIn>(async myVar => ...). This creates an async void delegate, which does not behave like a loop in which each iteration awaits an async call. Exception propagation will not work either, and IDEs and the compiler will issue a warning for this kind of code.
  • This workaround:
var mySink = Flow.Create<TIn>()
 .SelectAsync(...)
 .ToMaterialized(Sink.ForEach<TOut>(_ => { }), Keep.Right)
 .Named(...) 

Which works fine, but is not super elegant :)

OG Akka has this: https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreachAsync.html

Describe the solution you'd like

private Task<TResult> MySinkLogic(..)
{
  ...
}

var mySink = Sink.ForEachAsync(4, myVar => MySinkLogic(myVar));
@george-zubrienko george-zubrienko changed the title [Feature] ForEachAsync [Feature - Akka.Streams] ForEachAsync Feb 7, 2023
@ismaelhamed
Member

@george-zubrienko ForeachAsync is no fancier than this:

Flow.Create<T>()
    .SelectAsyncUnordered(parallelism, f)
    .ToMaterialized(Sink.Ignore<T>(), Keep.Right);

which you can use in the meantime to achieve the exact same behavior.
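For readers who want to try it, here is a minimal sketch of that workaround wired into a runnable stream. The WriteAsync method, the actor system name, and the sample source are hypothetical stand-ins rather than anything from this issue, and the sketch assumes the Akka.Streams NuGet package is referenced.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class ForEachAsyncWorkaround
{
    // Hypothetical sink logic standing in for a database write or an API call.
    private static async Task<int> WriteAsync(int item)
    {
        await Task.Delay(10);
        Console.WriteLine($"wrote {item}");
        return item;
    }

    public static async Task Main()
    {
        using var system = ActorSystem.Create("example");
        using var materializer = system.Materializer();

        // At most 4 WriteAsync calls are in flight at once; results are
        // discarded, and Keep.Right exposes the sink's completion task.
        var sink = Flow.Create<int>()
            .SelectAsyncUnordered(4, item => WriteAsync(item))
            .ToMaterialized(Sink.Ignore<int>(), Keep.Right);

        // Awaiting the materialized task waits until every element has been
        // processed; a failure in the stream surfaces here as an exception.
        await Source.From(Enumerable.Range(1, 10)).RunWith(sink, materializer);
    }
}
```

Because the mapper returns a Task that the stage tracks, failures propagate through the stream instead of being lost as they would be with an async void delegate.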

@F0b0s
Contributor

F0b0s commented Mar 8, 2023

As far as I can see, we already have ForEachAsync there

Can you use it?

@F0b0s
Contributor

F0b0s commented Mar 12, 2023

@george-zubrienko

@george-zubrienko
Contributor Author

@F0b0s I don't see that in the pre-1.5 released packages; the code you referenced is in the dev branch. As for whether it can be used for the issue in question, I think yes: it is a 1-to-1 implementation of foreachAsync from the Scala API I referenced in the description.

@F0b0s
Contributor

F0b0s commented Mar 20, 2023

@george-zubrienko check it in 1.5.1 please
