Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source that flattens a task source and keeps the materialized value #5338

Merged
merged 1 commit into from
Oct 24, 2021

Conversation

ismaelhamed
Copy link
Member

@ismaelhamed ismaelhamed commented Oct 23, 2021

Port of FromFutureSource operator

@ismaelhamed ismaelhamed force-pushed the task-source-22360 branch 2 times, most recently from 09ede26 to 1fcba5b Compare October 24, 2021 06:46
@ismaelhamed ismaelhamed marked this pull request as ready for review October 24, 2021 08:13
@ismaelhamed
Copy link
Member Author

Looks like unrelated tests failing...

@Aaronontheweb
Copy link
Member

Looks like unrelated tests failing...

The bane of my existence...

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to add that static member back on the StreamDetachedException - although I don't think that exception is used at all outside of Akka.Streams itself so it's probably fine.

@@ -916,7 +916,8 @@ namespace Akka.Streams
}
public class StreamDetachedException : System.Exception
{
public static readonly Akka.Streams.StreamDetachedException Instance;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bit of a breaking change here - should probably keep this member but also allow the public constructor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb should I send a new PR and add this back?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be great

@@ -1946,6 +1947,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromObservable<T>(System.IObservable<T> observable, int maxBufferCapacity = 128, Akka.Streams.OverflowStrategy overflowStrategy = 2) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromPublisher<T>(Reactive.Streams.IPublisher<T> publisher) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromTask<T>(System.Threading.Tasks.Task<T> task) { }
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.Task<M>> FromTaskSource<T, M>(System.Threading.Tasks.Task<Akka.Streams.Dsl.Source<T, M>> task) { }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

// should complete as soon as inner source has been materialized
sourceMatVal.Result.Should().Be("foo");
sinkMatVal.Result.Should().BeEquivalentTo(ImmutableList.CreateRange(new List<int>() { 1, 2, 3 }));
}, _materializer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb merged commit cd93348 into akkadotnet:dev Oct 24, 2021
@Aaronontheweb Aaronontheweb added this to the 1.4.28 milestone Oct 24, 2021
This was referenced Nov 10, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants