Skip to content

Commit

Permalink
[15-74]FlowInitialDelaySpec (#6671)
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba authored Apr 25, 2023
1 parent b63df56 commit 342ea04
Showing 1 changed file with 23 additions and 23 deletions.
46 changes: 23 additions & 23 deletions src/core/Akka.Streams.Tests/Dsl/FlowInitialDelaySpec.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
//-----------------------------------------------------------------------
// <copyright file="FlowInitialDelaySpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand All @@ -30,48 +31,47 @@ public FlowInitialDelaySpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void Flow_InitialDelay_must_work_with_zero_delay()
public async Task Flow_InitialDelay_must_work_with_zero_delay()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var task = Source.From(Enumerable.Range(1, 10))
.InitialDelay(TimeSpan.Zero)
.Grouped(100)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
.InitialDelay(TimeSpan.Zero)
.Grouped(100)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
task.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue();
task.Result.Should().BeEquivalentTo(Enumerable.Range(1,10));
task.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void Flow_InitialDelay_must_delay_elements_by_the_specified_time_but_not_more()
public async Task Flow_InitialDelay_must_delay_elements_by_the_specified_time_but_not_more()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var task = Source.From(Enumerable.Range(1, 10))
.InitialDelay(TimeSpan.FromSeconds(2))
.InitialTimeout(TimeSpan.FromSeconds(1))
.RunWith(Sink.Ignore<int>(), Materializer);
.InitialDelay(TimeSpan.FromSeconds(2))
.InitialTimeout(TimeSpan.FromSeconds(1))
.RunWith(Sink.Ignore<int>(), Materializer);
task.Invoking(t => t.Wait(TimeSpan.FromSeconds(2))).Should().Throw<TimeoutException>();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void Flow_InitialDelay_must_properly_ignore_timer_while_backpressured()
public async Task Flow_InitialDelay_must_properly_ignore_timer_while_backpressured()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe = this.CreateSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 10))
.InitialDelay(TimeSpan.FromSeconds(0.5))
.RunWith(Sink.FromSubscriber(probe), Materializer);
probe.EnsureSubscription();
probe.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
probe.Request(20);
probe.ExpectNextN(Enumerable.Range(1, 10));
await probe.EnsureSubscriptionAsync();
await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await probe.RequestAsync(20);
await probe.ExpectNextNAsync(Enumerable.Range(1, 10));
probe.ExpectComplete();
await probe.ExpectCompleteAsync();
}, Materializer);
}
}
Expand Down

0 comments on commit 342ea04

Please sign in to comment.