diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt index 96691adb052..16c22b566ab 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt @@ -1300,6 +1300,16 @@ namespace Akka.Streams.Dsl public Akka.Streams.Outlet Out(int id) { } public override string ToString() { } } + public class static ChannelSink + { + public static Akka.Streams.Dsl.Sink> AsReader(int bufferSize, bool singleReader = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Sink FromWriter(System.Threading.Channels.ChannelWriter writer, bool isOwner) { } + } + public class static ChannelSource + { + public static Akka.Streams.Dsl.Source> Create(int bufferSize, bool singleWriter = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Source FromReader(System.Threading.Channels.ChannelReader reader) { } + } public class static Concat { public static Akka.Streams.IGraph, Akka.NotUsed> Create(int inputPorts = 2) { } @@ -1938,6 +1948,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> AsObservable() { } public static Akka.Streams.Dsl.Sink> AsPublisher(bool fanout) { } public static Akka.Streams.Dsl.Sink Cancelled() { } + public static Akka.Streams.Dsl.Sink> ChannelReader(int bufferSize, bool singleReader, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } public static Akka.Streams.Dsl.Sink Combine(System.Func, TMat>> strategy, Akka.Streams.Dsl.Sink first, Akka.Streams.Dsl.Sink second, params Akka.Streams.Dsl.Sink<, >[] rest) { } public static Akka.Streams.Dsl.Sink Create(Reactive.Streams.ISubscriber subscriber) { } [Akka.Annotations.InternalApiAttribute()] @@ -1950,6 +1961,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> ForEachParallel(int parallelism, System.Action action) { } public static Akka.Streams.Dsl.Sink FromGraph(Akka.Streams.IGraph, TMat> graph) { } public static Akka.Streams.Dsl.Sink FromSubscriber(Reactive.Streams.ISubscriber subscriber) { } + public static Akka.Streams.Dsl.Sink FromWriter(System.Threading.Channels.ChannelWriter writer, bool isOwner) { } public static Akka.Streams.Dsl.Sink> Ignore() { } public static Akka.Streams.Dsl.Sink> Last() { } public static Akka.Streams.Dsl.Sink> LastOrDefault() { } @@ -1995,6 +2007,8 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source ActorPublisher(Akka.Actor.Props props) { } public static Akka.Streams.Dsl.Source ActorRef(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { } public static Akka.Streams.Dsl.Source> AsSubscriber() { } + public static Akka.Streams.Dsl.Source> Channel(int bufferSize, bool singleWriter = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Source ChannelReader(System.Threading.Channels.ChannelReader channelReader) { } public static Akka.Streams.Dsl.Source Combine(Akka.Streams.Dsl.Source first, Akka.Streams.Dsl.Source second, System.Func, Akka.NotUsed>> strategy, params Akka.Streams.Dsl.Source<, >[] rest) { } public static Akka.Streams.Dsl.Source CombineMaterialized(Akka.Streams.Dsl.Source first, Akka.Streams.Dsl.Source second, System.Func, Akka.NotUsed>> strategy, System.Func combineMaterializers) { } public static Akka.Streams.Dsl.Source Cycle(System.Func> enumeratorFactory) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index 6f67a52a798..ee8bdef7f68 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -1300,6 +1300,16 @@ namespace Akka.Streams.Dsl public Akka.Streams.Outlet Out(int id) { } public override string ToString() { } } + public class static ChannelSink + { + public static Akka.Streams.Dsl.Sink> AsReader(int bufferSize, bool singleReader = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Sink FromWriter(System.Threading.Channels.ChannelWriter writer, bool isOwner) { } + } + public class static ChannelSource + { + public static Akka.Streams.Dsl.Source> Create(int bufferSize, bool singleWriter = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Source FromReader(System.Threading.Channels.ChannelReader reader) { } + } public class static Concat { public static Akka.Streams.IGraph, Akka.NotUsed> Create(int inputPorts = 2) { } @@ -1938,6 +1948,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> AsObservable() { } public static Akka.Streams.Dsl.Sink> AsPublisher(bool fanout) { } public static Akka.Streams.Dsl.Sink Cancelled() { } + public static Akka.Streams.Dsl.Sink> ChannelReader(int bufferSize, bool singleReader, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } public static Akka.Streams.Dsl.Sink Combine(System.Func, TMat>> strategy, Akka.Streams.Dsl.Sink first, Akka.Streams.Dsl.Sink second, params Akka.Streams.Dsl.Sink<, >[] rest) { } public static Akka.Streams.Dsl.Sink Create(Reactive.Streams.ISubscriber subscriber) { } [Akka.Annotations.InternalApiAttribute()] @@ -1950,6 +1961,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> ForEachParallel(int parallelism, System.Action action) { } public static Akka.Streams.Dsl.Sink FromGraph(Akka.Streams.IGraph, TMat> graph) { } public static Akka.Streams.Dsl.Sink FromSubscriber(Reactive.Streams.ISubscriber subscriber) { } + public static Akka.Streams.Dsl.Sink FromWriter(System.Threading.Channels.ChannelWriter writer, bool isOwner) { } public static Akka.Streams.Dsl.Sink> Ignore() { } public static Akka.Streams.Dsl.Sink> Last() { } public static Akka.Streams.Dsl.Sink> LastOrDefault() { } @@ -1995,6 +2007,8 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source ActorPublisher(Akka.Actor.Props props) { } public static Akka.Streams.Dsl.Source ActorRef(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { } public static Akka.Streams.Dsl.Source> AsSubscriber() { } + public static Akka.Streams.Dsl.Source> Channel(int bufferSize, bool singleWriter = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Source ChannelReader(System.Threading.Channels.ChannelReader channelReader) { } public static Akka.Streams.Dsl.Source Combine(Akka.Streams.Dsl.Source first, Akka.Streams.Dsl.Source second, System.Func, Akka.NotUsed>> strategy, params Akka.Streams.Dsl.Source<, >[] rest) { } public static Akka.Streams.Dsl.Source CombineMaterialized(Akka.Streams.Dsl.Source first, Akka.Streams.Dsl.Source second, System.Func, Akka.NotUsed>> strategy, System.Func combineMaterializers) { } public static Akka.Streams.Dsl.Source Cycle(System.Func> enumeratorFactory) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 96691adb052..16c22b566ab 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -1300,6 +1300,16 @@ namespace Akka.Streams.Dsl public Akka.Streams.Outlet Out(int id) { } public override string ToString() { } } + public class static ChannelSink + { + public static Akka.Streams.Dsl.Sink> AsReader(int bufferSize, bool singleReader = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Sink FromWriter(System.Threading.Channels.ChannelWriter writer, bool isOwner) { } + } + public class static ChannelSource + { + public static Akka.Streams.Dsl.Source> Create(int bufferSize, bool singleWriter = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Source FromReader(System.Threading.Channels.ChannelReader reader) { } + } public class static Concat { public static Akka.Streams.IGraph, Akka.NotUsed> Create(int inputPorts = 2) { } @@ -1938,6 +1948,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> AsObservable() { } public static Akka.Streams.Dsl.Sink> AsPublisher(bool fanout) { } public static Akka.Streams.Dsl.Sink Cancelled() { } + public static Akka.Streams.Dsl.Sink> ChannelReader(int bufferSize, bool singleReader, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } public static Akka.Streams.Dsl.Sink Combine(System.Func, TMat>> strategy, Akka.Streams.Dsl.Sink first, Akka.Streams.Dsl.Sink second, params Akka.Streams.Dsl.Sink<, >[] rest) { } public static Akka.Streams.Dsl.Sink Create(Reactive.Streams.ISubscriber subscriber) { } [Akka.Annotations.InternalApiAttribute()] @@ -1950,6 +1961,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Sink> ForEachParallel(int parallelism, System.Action action) { } public static Akka.Streams.Dsl.Sink FromGraph(Akka.Streams.IGraph, TMat> graph) { } public static Akka.Streams.Dsl.Sink FromSubscriber(Reactive.Streams.ISubscriber subscriber) { } + public static Akka.Streams.Dsl.Sink FromWriter(System.Threading.Channels.ChannelWriter writer, bool isOwner) { } public static Akka.Streams.Dsl.Sink> Ignore() { } public static Akka.Streams.Dsl.Sink> Last() { } public static Akka.Streams.Dsl.Sink> LastOrDefault() { } @@ -1995,6 +2007,8 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source ActorPublisher(Akka.Actor.Props props) { } public static Akka.Streams.Dsl.Source ActorRef(int bufferSize, Akka.Streams.OverflowStrategy overflowStrategy) { } public static Akka.Streams.Dsl.Source> AsSubscriber() { } + public static Akka.Streams.Dsl.Source> Channel(int bufferSize, bool singleWriter = False, System.Threading.Channels.BoundedChannelFullMode fullMode = 0) { } + public static Akka.Streams.Dsl.Source ChannelReader(System.Threading.Channels.ChannelReader channelReader) { } public static Akka.Streams.Dsl.Source Combine(Akka.Streams.Dsl.Source first, Akka.Streams.Dsl.Source second, System.Func, Akka.NotUsed>> strategy, params Akka.Streams.Dsl.Source<, >[] rest) { } public static Akka.Streams.Dsl.Source CombineMaterialized(Akka.Streams.Dsl.Source first, Akka.Streams.Dsl.Source second, System.Func, Akka.NotUsed>> strategy, System.Func combineMaterializers) { } public static Akka.Streams.Dsl.Source Cycle(System.Func> enumeratorFactory) { } diff --git a/src/core/Akka.Streams.Tests/Implementation/ChannelSinkSpec.cs b/src/core/Akka.Streams.Tests/Implementation/ChannelSinkSpec.cs new file mode 100644 index 00000000000..d7e243ebfdb --- /dev/null +++ b/src/core/Akka.Streams.Tests/Implementation/ChannelSinkSpec.cs @@ -0,0 +1,239 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Streams.Dsl; +using Akka.Streams.TestKit; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests.Implementation +{ + public class ChannelSinkSpec : Akka.TestKit.Xunit2.TestKit + { + private readonly ActorMaterializer _materializer; + + public ChannelSinkSpec(ITestOutputHelper output) : base(output: output) + { + _materializer = Sys.Materializer(); + } + + #region from writer + + [Fact] + public async Task ChannelSink_writer_when_isOwner_should_complete_channel_with_success_when_upstream_completes() + { + var probe = this.CreateManualPublisherProbe(); + var channel = Channel.CreateBounded(10); + + Source.FromPublisher(probe) + .To(ChannelSink.FromWriter(channel.Writer, true)) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.SendComplete(); + + channel.Reader.Completion.Wait(1.Seconds()).Should().BeTrue(); + } + + [Fact] + public async Task ChannelSink_writer_isOwner_should_complete_channel_with_failure_when_upstream_fails() + { + var exception = new Exception("BOOM!"); + + try + { + var probe = this.CreateManualPublisherProbe(); + var channel = Channel.CreateBounded(10); + + Source.FromPublisher(probe) + .To(ChannelSink.FromWriter(channel.Writer, true)) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.SendError(exception); + + await channel.Reader.Completion; + } + catch (Exception e) + { + e.Should().Be(exception); + } + } + + [Fact] + public async Task ChannelSink_writer_when_NOT_owner_should_leave_channel_active() + { + var probe = this.CreateManualPublisherProbe(); + var channel = Channel.CreateBounded(10); + + Source.FromPublisher(probe) + .To(ChannelSink.FromWriter(channel.Writer, false)) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.SendComplete(); + + channel.Reader.Completion.Wait(TimeSpan.FromSeconds(1)).Should().BeFalse(); + + var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await channel.Writer.WriteAsync(11, cancel.Token); + var value = await channel.Reader.ReadAsync(cancel.Token); + value.Should().Be(11); + } + + [Fact] + public async Task ChannelSink_writer_NOT_owner_should_leave_channel_active() + { + var exception = new Exception("BOOM!"); + + var probe = this.CreateManualPublisherProbe(); + var channel = Channel.CreateBounded(10); + + Source.FromPublisher(probe) + .To(ChannelSink.FromWriter(channel.Writer, false)) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.SendError(exception); + + channel.Reader.Completion.Wait(TimeSpan.FromSeconds(1)).Should().BeFalse(); + + var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await channel.Writer.WriteAsync(11, cancel.Token); + var value = await channel.Reader.ReadAsync(cancel.Token); + value.Should().Be(11); + } + + [Fact] + public async Task ChannelSink_writer_should_propagate_elements_to_channel() + { + var probe = this.CreateManualPublisherProbe(); + var channel = Channel.CreateBounded(10); + + Source.FromPublisher(probe) + .To(ChannelSink.FromWriter(channel.Writer, true)) + .Run(_materializer); + + var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var subscription = probe.ExpectSubscription(); + var n = subscription.ExpectRequest(); + + Sys.Log.Info("Requested for {0} elements", n); + + var i = 1; + + for (; i <= n; i++) + subscription.SendNext(i); + + for (int j = 0; j < n; j++) + { + var value = await channel.Reader.ReadAsync(cancel.Token); + value.Should().Be(j + 1); + } + + var m = subscription.ExpectRequest() + n; + Sys.Log.Info("Requested for {0} elements", m - n); + + for (; i <= m; i++) + { + subscription.SendNext(i); + var value = await channel.Reader.ReadAsync(cancel.Token); + value.Should().Be(i); + } + } + + #endregion + + #region as reader + + [Fact] + public async Task ChannelSink_reader_should_complete_channel_with_success_when_upstream_completes() + { + var probe = this.CreateManualPublisherProbe(); + + var reader = Source.FromPublisher(probe) + .ToMaterialized(ChannelSink.AsReader(10), Keep.Right) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.SendComplete(); + + reader.Completion.Wait(1.Seconds()).Should().BeTrue(); + } + + [Fact] + public async Task ChannelSink_reader_should_complete_channel_with_failure_when_upstream_fails() + { + var exception = new Exception("BOOM!"); + + try + { + var probe = this.CreateManualPublisherProbe(); + + var reader = Source.FromPublisher(probe) + .ToMaterialized(ChannelSink.AsReader(10), Keep.Right) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.SendError(exception); + + await reader.Completion; + } + catch (Exception e) + { + e.Should().Be(exception); + } + } + + [Fact] + public async Task ChannelSink_reader_should_propagate_elements_to_channel() + { + var probe = this.CreateManualPublisherProbe(); + + var reader = Source.FromPublisher(probe) + .ToMaterialized(ChannelSink.AsReader(10), Keep.Right) + .Run(_materializer); + + var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var subscription = probe.ExpectSubscription(); + var n = subscription.ExpectRequest(); + + Sys.Log.Info("Requested for {0} elements", n); + + var i = 1; + + for (; i <= n; i++) + subscription.SendNext(i); + + for (int j = 0; j < n; j++) + { + Sys.Log.Info("Request: {0}",j); + var value = await reader.ReadAsync(cancel.Token); + Sys.Log.Info("Received: {0}",value); + value.Should().Be(j + 1); + } + + var m = subscription.ExpectRequest() + n; + Sys.Log.Info("Requested for {0} elements", m - n); + + for (; i <= m; i++) + { + subscription.SendNext(i); + var value = await reader.ReadAsync(cancel.Token); + value.Should().Be(i); + } + } + + #endregion + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs b/src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs new file mode 100644 index 00000000000..56e2e096bad --- /dev/null +++ b/src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs @@ -0,0 +1,96 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Streams.Dsl; +using Akka.Streams.TestKit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests.Implementation +{ + public class ChannelSourceSpec : Akka.TestKit.Xunit2.TestKit + { + private readonly ActorMaterializer _materializer; + + public ChannelSourceSpec(ITestOutputHelper output) : base(output: output) + { + _materializer = Sys.Materializer(); + } + + [Fact] + public void ChannelSource_must_complete_after_channel_completes() + { + var channel = Channel.CreateUnbounded(); + var probe = this.CreateManualSubscriberProbe(); + + ChannelSource.FromReader(channel) + .To(Sink.FromSubscriber(probe)) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.Request(2); + + channel.Writer.Complete(); + + probe.ExpectComplete(); + } + + + [Fact] + public void ChannelSource_must_complete_after_channel_fails() + { + var channel = Channel.CreateUnbounded(); + var probe = this.CreateManualSubscriberProbe(); + var failure = new Exception("BOOM!"); + + ChannelSource.FromReader(channel) + .To(Sink.FromSubscriber(probe)) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.Request(2); + + channel.Writer.Complete(failure); + + probe.ExpectError().InnerException.Should().Be(failure); + } + + [Fact] + public async Task ChannelSource_must_read_incoming_events() + { + var tcs = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var channel = Channel.CreateBounded(3); + await channel.Writer.WriteAsync(1, tcs.Token); + await channel.Writer.WriteAsync(2, tcs.Token); + await channel.Writer.WriteAsync(3, tcs.Token); + + var probe = this.CreateManualSubscriberProbe(); + + ChannelSource.FromReader(channel) + .To(Sink.FromSubscriber(probe)) + .Run(_materializer); + + var subscription = probe.ExpectSubscription(); + subscription.Request(5); + + probe.ExpectNext(1); + probe.ExpectNext(2); + + await channel.Writer.WriteAsync(4, tcs.Token); + await channel.Writer.WriteAsync(5, tcs.Token); + + probe.ExpectNext(3); + probe.ExpectNext(4); + probe.ExpectNext(5); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/ChannelSink.cs b/src/core/Akka.Streams/Dsl/ChannelSink.cs new file mode 100644 index 00000000000..f32dab038db --- /dev/null +++ b/src/core/Akka.Streams/Dsl/ChannelSink.cs @@ -0,0 +1,51 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Akka.Streams.Implementation; + +namespace Akka.Streams.Dsl +{ + public static class ChannelSink + { + /// + /// Creates a Sink, that will emit incoming events directly into provider . + /// It will handle problems such as backpressure and . When + /// is set to true, it will also take responsibility to complete given . + /// + /// Type of events passed to . + /// A to pass events emitted from materialized graph to. + /// Determines materialized graph should be responsible for completing given . + /// + public static Sink FromWriter(ChannelWriter writer, bool isOwner) + { + if (writer is null) + ThrowArgumentNullException("writer"); + + return Sink.FromGraph(new ChannelWriterSink(writer, isOwner)); + } + + /// + /// Creates a sink that upon materialization, returns a connected with + /// this materialized graph. It can then be used to consume events incoming from the graph. It will + /// also be completed once upstream completes. + /// + /// + /// + /// + /// + /// + public static Sink> AsReader(int bufferSize, bool singleReader = false, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait) => + Sink.FromGraph(new ChannelReaderSink(bufferSize, singleReader)); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowArgumentNullException(string name) => + throw new ArgumentNullException(name, "ChannelSink.FromWriter received null instead of ChannelWriter`1."); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/ChannelSource.cs b/src/core/Akka.Streams/Dsl/ChannelSource.cs new file mode 100644 index 00000000000..8bd7d62b75b --- /dev/null +++ b/src/core/Akka.Streams/Dsl/ChannelSource.cs @@ -0,0 +1,51 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Akka.Streams.Implementation; + +namespace Akka.Streams.Dsl +{ + /// + /// Container class for Akka.Streams factory methods, + /// which can be used to create sources from readable channels. + /// + public static class ChannelSource + { + /// + /// Creates an Akka.Streams from a given instance of + /// . It will propagate backpressure from the downstream + /// to guarantee resource-safe communication as well as will react when + /// will complete (successfully or with failure) and finish downstream accordingly. + /// + /// + /// + /// + public static Source FromReader(ChannelReader reader) + { + if (reader is null) + ThrowArgumentNullException("reader"); + + return Source.FromGraph(new ChannelReaderSource(reader)); + } + + public static Source> Create(int bufferSize, + bool singleWriter = false, + BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait) + { + return Source.FromGraph( + new ChannelReaderWithMaterializedWriterSource(bufferSize, + singleWriter, fullMode)); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowArgumentNullException(string name) => + throw new ArgumentNullException(name, "ChannelSource.FromReader expected ChannelReader`1 but received null."); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/Sink.cs b/src/core/Akka.Streams/Dsl/Sink.cs index 70270060642..34a59570d13 100644 --- a/src/core/Akka.Streams/Dsl/Sink.cs +++ b/src/core/Akka.Streams/Dsl/Sink.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Immutable; +using System.Threading.Channels; using System.Threading.Tasks; using Akka.Actor; using Akka.Annotations; @@ -682,5 +683,16 @@ public static Sink> AsPublisher(bool fanout) /// /// public static Sink> AsObservable() => FromGraph(new ObservableSinkStage()); + + public static Sink> ChannelReader(int bufferSize, bool singleReader, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait) + { + return ChannelSink.AsReader(bufferSize, singleReader, fullMode); + } + + public static Sink FromWriter(ChannelWriter writer, + bool isOwner) + { + return ChannelSink.FromWriter(writer, isOwner); + } } } diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index 73beb211057..1c6d31127a3 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -11,6 +11,7 @@ using System.Linq; using System.Reflection; using System.Runtime.ExceptionServices; +using System.Threading.Channels; using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Dsl.Internal; @@ -1075,5 +1076,31 @@ public static Source FromObservable( { return FromGraph(new ObservableSourceStage(observable, maxBufferCapacity, overflowStrategy)); } + + public static Source ChannelReader( + ChannelReader channelReader) + { + return ChannelSource.FromReader(channelReader); + } + + /// + /// Creates a Source that materializes a + /// that may be used to write items to the stream. + /// + /// This works similarly to , + /// The main difference being that you are allowed to have multiple + /// Writes in flight. Allowing multiple writes makes Multi-producer + /// scenarios easier but is still an important semantic difference. + /// + /// + /// The size of the channel's buffer + /// If true, expects only one writer + /// How the channel behaves when full + public static Source> Channel(int bufferSize, + bool singleWriter = false, + BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait) + { + return ChannelSource.Create(bufferSize, singleWriter, fullMode); + } } } diff --git a/src/core/Akka.Streams/Implementation/ChannelSinks.cs b/src/core/Akka.Streams/Implementation/ChannelSinks.cs new file mode 100644 index 00000000000..33011e3d8eb --- /dev/null +++ b/src/core/Akka.Streams/Implementation/ChannelSinks.cs @@ -0,0 +1,170 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Streams.Stage; + +namespace Akka.Streams.Implementation +{ + internal sealed class ChannelSinkLogic : InGraphStageLogic + { + private readonly Inlet _inlet; + private readonly ChannelWriter _writer; + private readonly Action _onWriteAvailable; + private readonly Action _onWriteFailed; + private readonly Action> _onWriteReady; + private T _awaitingElement; + private readonly bool _isOwner; + + public ChannelSinkLogic(SinkShape shape, Inlet inlet, + ChannelWriter writer, bool isOwner) : base(shape) + { + _inlet = inlet; + _writer = writer; + _isOwner = isOwner; + _onWriteAvailable = GetAsyncCallback(OnWriteAvailable); + _onWriteFailed = GetAsyncCallback(OnWriteFailed); + _onWriteReady = OnWriteReady; + SetHandler(_inlet, this); + } + + private void OnWriteFailed(Exception cause) => TryFail(cause); + + private void OnWriteAvailable(bool available) + { + if (available && _writer.TryWrite(_awaitingElement)) + Pull(_inlet); + else + TryComplete(); + } + + private void TryComplete() + { + CompleteStage(); + if (_isOwner) + _writer.TryComplete(); + } + + private void TryFail(Exception e) + { + FailStage(e); + if (_isOwner) + _writer.TryComplete(e); + } + + public override void OnUpstreamFinish() + { + base.OnUpstreamFinish(); + if (_isOwner) + _writer.TryComplete(); + } + + public override void OnUpstreamFailure(Exception e) + { + base.OnUpstreamFailure(e); + if (_isOwner) + _writer.TryComplete(e); + } + + public override void PreStart() + { + Pull(_inlet); + base.PreStart(); + } + + public override void OnPush() + { + var element = Grab(_inlet); + if (_writer.TryWrite(element)) + { + Pull(_inlet); + } + else + { + var continuation = _writer.WaitToWriteAsync(); + if (continuation.IsCompletedSuccessfully) + { + var available = continuation.GetAwaiter().GetResult(); + if (available && _writer.TryWrite(element)) + { + Pull(_inlet); + } + else + { + TryComplete(); + } + } + else + { + var task = continuation.AsTask(); + _awaitingElement = element; + task.ContinueWith(_onWriteReady); + } + } + + } + + private void OnWriteReady(Task t) + { + if (t.IsFaulted) _onWriteFailed(t.Exception); + else if (t.IsCanceled) + _onWriteFailed(new TaskCanceledException(t)); + else _onWriteAvailable(t.Result); + } + } + + internal sealed class ChannelReaderSink : GraphStageWithMaterializedValue, ChannelReader> + { + private readonly int _bufferSize; + private readonly bool _singleReader; + private readonly BoundedChannelFullMode _fullMode; + + public ChannelReaderSink(int bufferSize, bool singleReader = false, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait) + { + _bufferSize = bufferSize; + _singleReader = singleReader; + _fullMode = fullMode; + Inlet = new Inlet("channelReader.in"); + Shape = new SinkShape(Inlet); + } + + public Inlet Inlet { get; } + public override SinkShape Shape { get; } + + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + { + var channel = Channel.CreateBounded(new BoundedChannelOptions(_bufferSize) + { + SingleWriter = true, + AllowSynchronousContinuations = false, + SingleReader = _singleReader, + FullMode = _fullMode + }); + return new LogicAndMaterializedValue>(new ChannelSinkLogic(this.Shape, this.Inlet, channel.Writer, true), channel); + } + } + + internal sealed class ChannelWriterSink : GraphStage> + { + private readonly ChannelWriter _writer; + private readonly bool _isOwner; + + public ChannelWriterSink(ChannelWriter writer, bool isOwner) + { + _writer = writer; + _isOwner = isOwner; + Inlet = new Inlet("channelReader.in"); + Shape = new SinkShape(Inlet); + } + + public Inlet Inlet { get; } + public override SinkShape Shape { get; } + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new ChannelSinkLogic(this.Shape, this.Inlet, _writer, _isOwner); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/ChannelSources.cs b/src/core/Akka.Streams/Implementation/ChannelSources.cs new file mode 100644 index 00000000000..818e5e5981a --- /dev/null +++ b/src/core/Akka.Streams/Implementation/ChannelSources.cs @@ -0,0 +1,155 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Streams.Stage; + +namespace Akka.Streams.Implementation +{ + sealed class ChannelSourceLogic : OutGraphStageLogic + { + private readonly Outlet _outlet; + private readonly ChannelReader _reader; + private readonly Action _onValueRead; + private readonly Action _onValueReadFailure; + private readonly Action _onReaderComplete; + private readonly Action> _onReadReady; + + public ChannelSourceLogic(SourceShape source, Outlet outlet, + ChannelReader reader) : base(source) + { + _outlet = outlet; + _reader = reader; + _onValueRead = GetAsyncCallback(OnValueRead); + _onValueReadFailure = + GetAsyncCallback(OnValueReadFailure); + _onReaderComplete = GetAsyncCallback(OnReaderComplete); + _onReadReady = ContinueAsyncRead; + _reader.Completion.ContinueWith(t => + { + if (t.IsFaulted) _onReaderComplete(t.Exception); + else if (t.IsCanceled) + _onReaderComplete(new TaskCanceledException(t)); + else _onReaderComplete(null); + }); + + SetHandler(_outlet, this); + } + + private void OnReaderComplete(Exception reason) + { + if (reason is null) + CompleteStage(); + else + FailStage(reason); + } + + private void OnValueReadFailure(Exception reason) => FailStage(reason); + + private void OnValueRead(bool dataAvailable) + { + if (dataAvailable && _reader.TryRead(out var element)) + Push(_outlet, element); + else + CompleteStage(); + } + + public override void OnPull() + { + if (_reader.TryRead(out var element)) + { + Push(_outlet, element); + } + else + { + var continuation = _reader.WaitToReadAsync(); + if (continuation.IsCompletedSuccessfully) + { + var dataAvailable = continuation.GetAwaiter().GetResult(); + if (dataAvailable && _reader.TryRead(out element)) + Push(_outlet, element); + else + CompleteStage(); + } + else + continuation.AsTask().ContinueWith(_onReadReady); + } + } + + private void ContinueAsyncRead(Task t) + { + if (t.IsFaulted) + _onValueReadFailure(t.Exception); + else if (t.IsCanceled) + _onValueReadFailure(new TaskCanceledException(t)); + else + _onValueRead(t.Result); + } + } + + internal sealed class ChannelReaderWithMaterializedWriterSource : + GraphStageWithMaterializedValue, ChannelWriter> + { + private readonly int _bufferSize; + private readonly bool _singleWriter; + private readonly BoundedChannelFullMode _fullMode; + + public ChannelReaderWithMaterializedWriterSource(int bufferSize, + bool singleWriter = false, + BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait) + { + _bufferSize = bufferSize; + _singleWriter = singleWriter; + _fullMode = fullMode; + Outlet = new Outlet("channelReader.out"); + Shape = new SourceShape(Outlet); + } + + public Outlet Outlet { get; } + public override SourceShape Shape { get; } + + public override ILogicAndMaterializedValue> + CreateLogicAndMaterializedValue( + Attributes inheritedAttributes) + { + + var channel = Channel.CreateBounded( + new BoundedChannelOptions(_bufferSize) + { + SingleWriter = _singleWriter, + AllowSynchronousContinuations = false, + SingleReader = true, + FullMode = _fullMode + }); + return new LogicAndMaterializedValue>( + new ChannelSourceLogic(this.Shape, this.Outlet, + channel.Reader), channel); + } + } + + internal sealed class ChannelReaderSource : GraphStage> + { + + private readonly ChannelReader _reader; + + public ChannelReaderSource(ChannelReader reader) + { + _reader = reader; + Outlet = new Outlet("channelReader.out"); + Shape = new SourceShape(Outlet); + } + + public Outlet Outlet { get; } + public override SourceShape Shape { get; } + + protected override GraphStageLogic + CreateLogic(Attributes inheritedAttributes) => + new ChannelSourceLogic(Shape, Outlet, _reader); + } +} \ No newline at end of file