diff --git a/src/core/Akka.Streams.TestKit/ChainSetup.cs b/src/core/Akka.Streams.TestKit/ChainSetup.cs index ec5b931eb0d..d83bb797228 100644 --- a/src/core/Akka.Streams.TestKit/ChainSetup.cs +++ b/src/core/Akka.Streams.TestKit/ChainSetup.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Dsl; using Akka.TestKit; @@ -15,6 +16,9 @@ namespace Akka.Streams.TestKit { public class ChainSetup { + private readonly Func, ActorMaterializer, IPublisher> _toPublisher; + private readonly Func, Flow> _stream; + private readonly ActorMaterializer _materializer; protected readonly TestKitBase System; public ChainSetup( @@ -24,18 +28,11 @@ public ChainSetup( Func, ActorMaterializer, IPublisher> toPublisher, TestKitBase system) { - Settings = settings; System = system; - - Upstream = system.CreateManualPublisherProbe(); - Downstream = system.CreateSubscriberProbe(); - - var s = Source.FromPublisher(Upstream).Via(stream(Flow.Identity().Select(x => x).Named("buh"))); - Publisher = toPublisher(s, materializer); - UpstreamSubscription = Upstream.ExpectSubscription(); - Publisher.Subscribe(Downstream); - DownstreamSubscription = Downstream.ExpectSubscription(); + _toPublisher = toPublisher; + _stream = stream; + _materializer = materializer; } public ChainSetup( @@ -57,11 +54,87 @@ public ChainSetup( { } + [Obsolete("Will be removed after async_testkit conversion is complete. Use InitializeAsync instead")] + public ChainSetup Initialize() + => InitializeAsync() + .ConfigureAwait(false).GetAwaiter().GetResult(); + + public virtual async Task> InitializeAsync() + { + _upstream = System.CreateManualPublisherProbe(); + _downstream = System.CreateSubscriberProbe(); + + var s = Source.FromPublisher(_upstream).Via(_stream(Flow.Identity().Select(x => x).Named("buh"))); + _publisher = _toPublisher(s, _materializer); + _upstreamSubscription = await _upstream.ExpectSubscriptionAsync(); + _publisher.Subscribe(_downstream); + _downstreamSubscription = await _downstream.ExpectSubscriptionAsync(); + + Initialized = true; + + return this; + } + + private TestPublisher.ManualProbe _upstream; + private TestSubscriber.ManualProbe _downstream; + private IPublisher _publisher; + private StreamTestKit.PublisherProbeSubscription _upstreamSubscription; + private ISubscription _downstreamSubscription; + + public bool Initialized { get; private set; } + public ActorMaterializerSettings Settings { get; } - public TestPublisher.ManualProbe Upstream { get; } - public TestSubscriber.ManualProbe Downstream { get; } - public IPublisher Publisher { get; } - public StreamTestKit.PublisherProbeSubscription UpstreamSubscription { get; } - public ISubscription DownstreamSubscription { get; } + + public TestPublisher.ManualProbe Upstream + { + get + { + EnsureInitialized(); + return _upstream; + } + } + + public TestSubscriber.ManualProbe Downstream + { + get + { + EnsureInitialized(); + return _downstream; + } + } + + public IPublisher Publisher + { + get + { + EnsureInitialized(); + return _publisher; + } + } + + public StreamTestKit.PublisherProbeSubscription UpstreamSubscription + { + get + { + EnsureInitialized(); + return _upstreamSubscription; + } + } + + public ISubscription DownstreamSubscription + { + get + { + EnsureInitialized(); + return _downstreamSubscription; + } + } + + protected virtual void EnsureInitialized() + { + if (!Initialized) + throw new InvalidOperationException( + $"ChainSetup has not been initialized. Please make sure to call {nameof(InitializeAsync)} first."); + } } } diff --git a/src/core/Akka.Streams.TestKit/ScriptedTest.cs b/src/core/Akka.Streams.TestKit/ScriptedTest.cs index f402f9c228e..9919f6ff0a2 100644 --- a/src/core/Akka.Streams.TestKit/ScriptedTest.cs +++ b/src/core/Akka.Streams.TestKit/ScriptedTest.cs @@ -15,6 +15,7 @@ using Akka.Streams.Dsl; using Akka.TestKit; using Akka.Util; +using Akka.Util.Internal; using Reactive.Streams; using Xunit.Abstractions; @@ -306,7 +307,8 @@ protected void RunScript(Script script, ActorMa Func, Flow> op, int maximumOverrun = 3, int maximumRequest = 3, int maximumBuffer = 3) { - new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer, this).Run(); + new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer, this) + .Initialize().AsInstanceOf>().Run(); } protected static IPublisher ToPublisher(Source source, IMaterializer materializer) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSpec.cs index e95a1ff3d2b..d09d420afc9 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSpec.cs @@ -63,10 +63,12 @@ public void A_flow_must_request_initial_elements_from_upstream(string name, int if (name.Equals("identity")) setup = new ChainSetup(Identity, Settings.WithInputBuffer(n, n), - (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this); + (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this) + .Initialize(); else setup = new ChainSetup(Identity2, Settings.WithInputBuffer(n, n), - (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this); + (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this) + .Initialize(); setup.Upstream.ExpectRequest(setup.UpstreamSubscription, setup.Settings.MaxInputBufferSize); } @@ -75,7 +77,8 @@ public void A_flow_must_request_initial_elements_from_upstream(string name, int public void A_Flow_must_request_more_elements_from_upstream_when_downstream_requests_more_elements() { var setup = new ChainSetup(Identity, Settings, - (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this); + (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this) + .Initialize(); setup.Upstream.ExpectRequest(setup.UpstreamSubscription, Settings.MaxInputBufferSize); setup.DownstreamSubscription.Request(1); setup.Upstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); @@ -96,7 +99,8 @@ public void A_Flow_must_request_more_elements_from_upstream_when_downstream_requ public void A_Flow_must_deliver_events_when_publisher_sends_elements_and_then_completes() { var setup = new ChainSetup(Identity, Settings, - (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this); + (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this) + .Initialize(); setup.DownstreamSubscription.Request(1); setup.UpstreamSubscription.SendNext("test"); setup.UpstreamSubscription.SendComplete(); @@ -108,7 +112,8 @@ public void A_Flow_must_deliver_events_when_publisher_sends_elements_and_then_co public void A_Flow_must_deliver_complete_signal_when_publisher_immediately_completes() { var setup = new ChainSetup(Identity, Settings, - (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this); + (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this) + .Initialize(); setup.UpstreamSubscription.SendComplete(); setup.Downstream.ExpectComplete(); } @@ -117,7 +122,8 @@ public void A_Flow_must_deliver_complete_signal_when_publisher_immediately_compl public void A_Flow_must_deliver_error_signal_when_publisher_immediately_fails() { var setup = new ChainSetup(Identity, Settings, - (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this); + (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this) + .Initialize(); var weirdError = new Exception("weird test exception"); setup.UpstreamSubscription.SendError(weirdError); setup.Downstream.ExpectError().Should().Be(weirdError); @@ -127,7 +133,8 @@ public void A_Flow_must_deliver_error_signal_when_publisher_immediately_fails() public void A_Flow_must_cancel_upstream_when_single_subscriber_cancels_subscription_while_receiving_data() { var setup = new ChainSetup(Identity, Settings.WithInputBuffer(1, 1), - (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this); + (settings, factory) => ActorMaterializer.Create(factory, settings), ToPublisher, this) + .Initialize(); setup.DownstreamSubscription.Request(5); setup.UpstreamSubscription.ExpectRequest(1); setup.UpstreamSubscription.SendNext("test"); @@ -350,7 +357,8 @@ public void A_Flow_with_multiple_subscribers_FanOutBox_must_adapt_speed_to_the_c { var setup = new ChainSetup(Identity, Settings.WithInputBuffer(1, 1), (settings, factory) => ActorMaterializer.Create(factory, settings), - (source, materializer) => ToFanoutPublisher(source, materializer, 1), this); + (source, materializer) => ToFanoutPublisher(source, materializer, 1), this) + .Initialize(); var downstream2 = this.CreateManualSubscriberProbe(); setup.Publisher.Subscribe(downstream2); var downstream2Subscription = downstream2.ExpectSubscription(); @@ -379,7 +387,8 @@ public void A_Flow_with_multiple_subscribers_FanOutBox_must_support_slow_subscri { var setup = new ChainSetup(Identity, Settings.WithInputBuffer(1, 1), (settings, factory) => ActorMaterializer.Create(factory, settings), - (source, materializer) => ToFanoutPublisher(source, materializer, 2), this); + (source, materializer) => ToFanoutPublisher(source, materializer, 2), this) + .Initialize(); var downstream2 = this.CreateManualSubscriberProbe(); setup.Publisher.Subscribe(downstream2); var downstream2Subscription = downstream2.ExpectSubscription(); @@ -422,7 +431,8 @@ public void A_Flow_with_multiple_subscribers_FanOutBox_must_support_incoming_sub { var setup = new ChainSetup(Identity, Settings.WithInputBuffer(1, 1), (settings, factory) => ActorMaterializer.Create(factory, settings), - (source, materializer) => ToFanoutPublisher(source, materializer, 1), this); + (source, materializer) => ToFanoutPublisher(source, materializer, 1), this) + .Initialize(); setup.DownstreamSubscription.Request(5); setup.Upstream.ExpectRequest(setup.UpstreamSubscription, 1); @@ -463,7 +473,8 @@ public void A_Flow_with_multiple_subscribers_FanOutBox_must_be_unblocked_when_bl { var setup = new ChainSetup(Identity, Settings.WithInputBuffer(1, 1), (settings, factory) => ActorMaterializer.Create(factory, settings), - (source, materializer) => ToFanoutPublisher(source, materializer, 1), this); + (source, materializer) => ToFanoutPublisher(source, materializer, 1), this) + .Initialize(); var downstream2 = this.CreateManualSubscriberProbe(); setup.Publisher.Subscribe(downstream2); var downstream2Subscription = downstream2.ExpectSubscription(); @@ -503,7 +514,8 @@ public void A_Flow_with_multiple_subscribers_FanOutBox_must_call_future_subscrib { var setup = new ChainSetup(Identity, Settings.WithInputBuffer(1, 1), (settings, factory) => ActorMaterializer.Create(factory, settings), - (source, materializer) => ToFanoutPublisher(source, materializer, 1), this); + (source, materializer) => ToFanoutPublisher(source, materializer, 1), this) + .Initialize(); var downstream2 = this.CreateManualSubscriberProbe(); // don't link it just yet @@ -547,7 +559,8 @@ public void A_Flow_with_multiple_subscribers_FanOutBox_must_call_future_subscrib throw new TestException("test"); }), Settings.WithInputBuffer(1, 1), (settings, factory) => ActorMaterializer.Create(factory, settings), - (source, materializer) => ToFanoutPublisher(source, materializer, 1), this); + (source, materializer) => ToFanoutPublisher(source, materializer, 1), this) + .Initialize(); setup.DownstreamSubscription.Request(1); setup.UpstreamSubscription.ExpectRequest(1); @@ -567,7 +580,8 @@ public void A_Flow_with_multiple_subscribers_FanOutBox_must_call_future_subscrib { var setup = new ChainSetup(Identity, Settings.WithInputBuffer(1, 1), (settings, factory) => ActorMaterializer.Create(factory, settings), - (source, materializer) => ToFanoutPublisher(source, materializer, 16), this); + (source, materializer) => ToFanoutPublisher(source, materializer, 16), this) + .Initialize(); // make sure stream is initialized before canceling downstream Thread.Sleep(100); @@ -596,7 +610,8 @@ public void A_broken_Flow_must_cancel_upstream_and_call_onError_on_current_and_f { var setup = new ChainSetup(FaultyFlow, Settings.WithInputBuffer(1, 1), (settings, factory) => ActorMaterializer.Create(factory, settings), - (source, materializer) => ToFanoutPublisher(source, materializer, 16), this); + (source, materializer) => ToFanoutPublisher(source, materializer, 16), this) + .Initialize(); Action> checkError = sprobe => {