Skip to content

Commit

Permalink
Bugfix RestartFlow issue; #5165 (#5181)
Browse files Browse the repository at this point in the history
* Extracted RestartFlow,Sink and Source to their own files

Helps me keep overview on what im working on.

* Ported latest changes from Scala master RestartFlow

* Made Delay attribute public and sort of namespaced it

* Use attributes and sourcefactory when building Source

* Use attributes when building Sink

* Fixing current tests

* More specs

* Api Approval

Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
Danthar and Aaronontheweb authored Aug 16, 2021
1 parent 2a8278b commit d37fdbb
Show file tree
Hide file tree
Showing 5 changed files with 548 additions and 333 deletions.
13 changes: 13 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,19 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, Akka.Streams.RestartSettings settings) { }
}
public class RestartWithBackoffFlow
{
public RestartWithBackoffFlow() { }
public class Delay : Akka.Streams.Attributes.IAttribute, System.IEquatable<Akka.Streams.Dsl.RestartWithBackoffFlow.Delay>
{
public readonly System.TimeSpan Duration;
public Delay(System.TimeSpan duration) { }
public bool Equals(Akka.Streams.Dsl.RestartWithBackoffFlow.Delay other) { }
public override bool Equals(object obj) { }
public override int GetHashCode() { }
public override string ToString() { }
}
}
public class static Retry
{
[Akka.Annotations.ApiMayChangeAttribute()]
Expand Down
119 changes: 92 additions & 27 deletions src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Event;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
Expand Down Expand Up @@ -132,11 +133,11 @@ public void A_restart_with_backoff_source_should_backoff_before_restart()
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" });
}, _restartSettings)
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" });
}, _restartSettings)
.RunWith(this.SinkProbe<string>(), Materializer);
probe.RequestNext("a");
probe.RequestNext("b");
Expand All @@ -161,11 +162,11 @@ public void A_restart_with_backoff_source_should_reset_exponential_backoff_back_
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" });
}, _restartSettings)
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" });
}, _restartSettings)
.RunWith(this.SinkProbe<string>(), Materializer);
probe.RequestNext("a");
probe.RequestNext("b");
Expand Down Expand Up @@ -316,11 +317,11 @@ public void A_restart_with_backoff_source_should_not_restart_the_source_when_max
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.Single("a");
}, _shortRestartSettings.WithMaxRestarts(1, _shortMinBackoff))
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.Single("a");
}, _shortRestartSettings.WithMaxRestarts(1, _shortMinBackoff))
.RunWith(this.SinkProbe<string>(), Materializer);
probe.RequestNext("a");
probe.RequestNext("a");
Expand All @@ -339,11 +340,11 @@ public void A_restart_with_backoff_source_should_reset_maxRestarts_when_source_r
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.Single("a");
}, _restartSettings.WithMaxRestarts(2, _minBackoff))
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.Single("a");
}, _restartSettings.WithMaxRestarts(2, _minBackoff))
.RunWith(this.SinkProbe<string>(), Materializer);
probe.RequestNext("a");
// There should be minBackoff delay
Expand Down Expand Up @@ -371,11 +372,11 @@ public void A_restart_with_backoff_source_should_allow_using_withMaxRestarts_ins
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" }).TakeWhile(c => c != "b");
}, _shortRestartSettings.WithMaxRestarts(2, TimeSpan.FromSeconds(1)))
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" }).TakeWhile(c => c != "b");
}, _shortRestartSettings.WithMaxRestarts(2, TimeSpan.FromSeconds(1)))
.RunWith(this.SinkProbe<string>(), Materializer);
probe.RequestNext("a");
probe.RequestNext("a");
Expand Down Expand Up @@ -765,6 +766,70 @@ public void A_restart_with_backoff_flow_should_run_normally()
}, Materializer);
}

[Fact]
public void Simplified_restart_flow_restarts_stages_test()
{
var created = new AtomicCounter(0);
var restarts = 4;
this.AssertAllStagesStopped(() =>
{
var flow = RestartFlowFactory<int, int, NotUsed>(() =>
{
created.IncrementAndGet();
return Flow.Create<int>()
.Select(i =>
{
if (i == 6)
{
throw new ArgumentException($"BOOM");
}
return i;
});
}, true,
//defaults to unlimited restarts
RestartSettings.Create(TimeSpan.FromMilliseconds(10), TimeSpan.FromSeconds(30), 0));
var (source, sink) = this.SourceProbe<int>().Select(x =>
{
Log.Debug($"Processing: {x}");
return x;
})
.Via(flow)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
source.SendNext(1);
source.SendNext(2);
source.SendNext(3);
source.SendNext(4);
source.SendNext(5);
for (int i = 0; i < restarts; i++)
{
source.SendNext(6);
}
source.SendNext(7);
source.SendNext(8);
source.SendNext(9);
source.SendNext(10);
sink.RequestNext(1);
sink.RequestNext(2);
sink.RequestNext(3);
sink.RequestNext(4);
sink.RequestNext(5);
//6 is never received since RestartFlow's do not retry
sink.RequestNext(7);
sink.RequestNext(8);
sink.RequestNext(9);
sink.RequestNext(10);
source.SendComplete();
}, Materializer);
created.Current.Should().Be(restarts + 1);
}

[Fact]
public void A_restart_with_backoff_flow_should_restart_on_cancellation()
{
Expand Down Expand Up @@ -1010,8 +1075,8 @@ public void A_restart_with_backoff_flow_should_restart_on_failure_when_using_onl
flowInProbe.RequestNext("c");
flowOutProbe.SendNext("d");
sink.RequestNext("d");

sink.Request(1);
created.Current.Should().Be(2);
}
}
}
}
Loading

0 comments on commit d37fdbb

Please sign in to comment.