Skip to content

Commit

Permalink
Move Channel Stages from Alpakka to main project. (akkadotnet#6268)
Browse files Browse the repository at this point in the history
* Move Channel Stages from Alpakka to main project.

* added API approvals

Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
to11mtm and Aaronontheweb committed Dec 20, 2022
1 parent 791bd10 commit a84ab32
Show file tree
Hide file tree
Showing 8 changed files with 801 additions and 0 deletions.
239 changes: 239 additions & 0 deletions src/core/Akka.Streams.Tests/Implementation/ChannelSinkSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// //-----------------------------------------------------------------------
// // <copyright file="ChannelSinkSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Implementation
{
public class ChannelSinkSpec : Akka.TestKit.Xunit2.TestKit
{
private readonly ActorMaterializer _materializer;

public ChannelSinkSpec(ITestOutputHelper output) : base(output: output)
{
_materializer = Sys.Materializer();
}

#region from writer

[Fact]
public async Task ChannelSink_writer_when_isOwner_should_complete_channel_with_success_when_upstream_completes()
{
var probe = this.CreateManualPublisherProbe<int>();
var channel = Channel.CreateBounded<int>(10);

Source.FromPublisher(probe)
.To(ChannelSink.FromWriter(channel.Writer, true))
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.SendComplete();

channel.Reader.Completion.Wait(1.Seconds()).Should().BeTrue();
}

[Fact]
public async Task ChannelSink_writer_isOwner_should_complete_channel_with_failure_when_upstream_fails()
{
var exception = new Exception("BOOM!");

try
{
var probe = this.CreateManualPublisherProbe<int>();
var channel = Channel.CreateBounded<int>(10);

Source.FromPublisher(probe)
.To(ChannelSink.FromWriter(channel.Writer, true))
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.SendError(exception);

await channel.Reader.Completion;
}
catch (Exception e)
{
e.Should().Be(exception);
}
}

[Fact]
public async Task ChannelSink_writer_when_NOT_owner_should_leave_channel_active()
{
var probe = this.CreateManualPublisherProbe<int>();
var channel = Channel.CreateBounded<int>(10);

Source.FromPublisher(probe)
.To(ChannelSink.FromWriter(channel.Writer, false))
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.SendComplete();

channel.Reader.Completion.Wait(TimeSpan.FromSeconds(1)).Should().BeFalse();

var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await channel.Writer.WriteAsync(11, cancel.Token);
var value = await channel.Reader.ReadAsync(cancel.Token);
value.Should().Be(11);
}

[Fact]
public async Task ChannelSink_writer_NOT_owner_should_leave_channel_active()
{
var exception = new Exception("BOOM!");

var probe = this.CreateManualPublisherProbe<int>();
var channel = Channel.CreateBounded<int>(10);

Source.FromPublisher(probe)
.To(ChannelSink.FromWriter(channel.Writer, false))
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.SendError(exception);

channel.Reader.Completion.Wait(TimeSpan.FromSeconds(1)).Should().BeFalse();

var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await channel.Writer.WriteAsync(11, cancel.Token);
var value = await channel.Reader.ReadAsync(cancel.Token);
value.Should().Be(11);
}

[Fact]
public async Task ChannelSink_writer_should_propagate_elements_to_channel()
{
var probe = this.CreateManualPublisherProbe<int>();
var channel = Channel.CreateBounded<int>(10);

Source.FromPublisher(probe)
.To(ChannelSink.FromWriter(channel.Writer, true))
.Run(_materializer);

var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var subscription = probe.ExpectSubscription();
var n = subscription.ExpectRequest();

Sys.Log.Info("Requested for {0} elements", n);

var i = 1;

for (; i <= n; i++)
subscription.SendNext(i);

for (int j = 0; j < n; j++)
{
var value = await channel.Reader.ReadAsync(cancel.Token);
value.Should().Be(j + 1);
}

var m = subscription.ExpectRequest() + n;
Sys.Log.Info("Requested for {0} elements", m - n);

for (; i <= m; i++)
{
subscription.SendNext(i);
var value = await channel.Reader.ReadAsync(cancel.Token);
value.Should().Be(i);
}
}

#endregion

#region as reader

[Fact]
public async Task ChannelSink_reader_should_complete_channel_with_success_when_upstream_completes()
{
var probe = this.CreateManualPublisherProbe<int>();

var reader = Source.FromPublisher(probe)
.ToMaterialized(ChannelSink.AsReader<int>(10), Keep.Right)
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.SendComplete();

reader.Completion.Wait(1.Seconds()).Should().BeTrue();
}

[Fact]
public async Task ChannelSink_reader_should_complete_channel_with_failure_when_upstream_fails()
{
var exception = new Exception("BOOM!");

try
{
var probe = this.CreateManualPublisherProbe<int>();

var reader = Source.FromPublisher(probe)
.ToMaterialized(ChannelSink.AsReader<int>(10), Keep.Right)
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.SendError(exception);

await reader.Completion;
}
catch (Exception e)
{
e.Should().Be(exception);
}
}

[Fact]
public async Task ChannelSink_reader_should_propagate_elements_to_channel()
{
var probe = this.CreateManualPublisherProbe<int>();

var reader = Source.FromPublisher(probe)
.ToMaterialized(ChannelSink.AsReader<int>(10), Keep.Right)
.Run(_materializer);

var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var subscription = probe.ExpectSubscription();
var n = subscription.ExpectRequest();

Sys.Log.Info("Requested for {0} elements", n);

var i = 1;

for (; i <= n; i++)
subscription.SendNext(i);

for (int j = 0; j < n; j++)
{
Sys.Log.Info("Request: {0}",j);
var value = await reader.ReadAsync(cancel.Token);
Sys.Log.Info("Received: {0}",value);
value.Should().Be(j + 1);
}

var m = subscription.ExpectRequest() + n;
Sys.Log.Info("Requested for {0} elements", m - n);

for (; i <= m; i++)
{
subscription.SendNext(i);
var value = await reader.ReadAsync(cancel.Token);
value.Should().Be(i);
}
}

#endregion
}
}
96 changes: 96 additions & 0 deletions src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// //-----------------------------------------------------------------------
// // <copyright file="ChannelSourceSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Implementation
{
public class ChannelSourceSpec : Akka.TestKit.Xunit2.TestKit
{
private readonly ActorMaterializer _materializer;

public ChannelSourceSpec(ITestOutputHelper output) : base(output: output)
{
_materializer = Sys.Materializer();
}

[Fact]
public void ChannelSource_must_complete_after_channel_completes()
{
var channel = Channel.CreateUnbounded<int>();
var probe = this.CreateManualSubscriberProbe<int>();

ChannelSource.FromReader<int>(channel)
.To(Sink.FromSubscriber(probe))
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.Request(2);

channel.Writer.Complete();

probe.ExpectComplete();
}


[Fact]
public void ChannelSource_must_complete_after_channel_fails()
{
var channel = Channel.CreateUnbounded<int>();
var probe = this.CreateManualSubscriberProbe<int>();
var failure = new Exception("BOOM!");

ChannelSource.FromReader<int>(channel)
.To(Sink.FromSubscriber(probe))
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.Request(2);

channel.Writer.Complete(failure);

probe.ExpectError().InnerException.Should().Be(failure);
}

[Fact]
public async Task ChannelSource_must_read_incoming_events()
{
var tcs = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var channel = Channel.CreateBounded<int>(3);
await channel.Writer.WriteAsync(1, tcs.Token);
await channel.Writer.WriteAsync(2, tcs.Token);
await channel.Writer.WriteAsync(3, tcs.Token);

var probe = this.CreateManualSubscriberProbe<int>();

ChannelSource.FromReader<int>(channel)
.To(Sink.FromSubscriber(probe))
.Run(_materializer);

var subscription = probe.ExpectSubscription();
subscription.Request(5);

probe.ExpectNext(1);
probe.ExpectNext(2);

await channel.Writer.WriteAsync(4, tcs.Token);
await channel.Writer.WriteAsync(5, tcs.Token);

probe.ExpectNext(3);
probe.ExpectNext(4);
probe.ExpectNext(5);
}
}
}
51 changes: 51 additions & 0 deletions src/core/Akka.Streams/Dsl/ChannelSink.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// //-----------------------------------------------------------------------
// // <copyright file="ChannelSink.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Akka.Streams.Implementation;

namespace Akka.Streams.Dsl
{
public static class ChannelSink
{
/// <summary>
/// Creates a Sink, that will emit incoming events directly into provider <see cref="ChannelWriter{T}"/>.
/// It will handle problems such as backpressure and <paramref name="writer"/>. When <paramref name="isOwner"/>
/// is set to <c>true</c>, it will also take responsibility to complete given <paramref name="writer"/>.
/// </summary>
/// <typeparam name="T">Type of events passed to <paramref name="writer"/>.</typeparam>
/// <param name="writer">A <see cref="ChannelWriter{T}"/> to pass events emitted from materialized graph to.</param>
/// <param name="isOwner">Determines materialized graph should be responsible for completing given <paramref name="writer"/>.</param>
/// <returns></returns>
public static Sink<T, NotUsed> FromWriter<T>(ChannelWriter<T> writer, bool isOwner)
{
if (writer is null)
ThrowArgumentNullException("writer");

return Sink.FromGraph(new ChannelWriterSink<T>(writer, isOwner));
}

/// <summary>
/// Creates a sink that upon materialization, returns a <see cref="ChannelReader{T}"/> connected with
/// this materialized graph. It can then be used to consume events incoming from the graph. It will
/// also be completed once upstream completes.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="bufferSize"></param>
/// <param name="singleReader"></param>
/// <param name="fullMode"></param>
/// <returns></returns>
public static Sink<T, ChannelReader<T>> AsReader<T>(int bufferSize, bool singleReader = false, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait) =>
Sink.FromGraph(new ChannelReaderSink<T>(bufferSize, singleReader));

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowArgumentNullException(string name) =>
throw new ArgumentNullException(name, "ChannelSink.FromWriter received null instead of ChannelWriter`1.");
}
}
Loading

0 comments on commit a84ab32

Please sign in to comment.