Skip to content

Commit

Permalink
splitAfter should emit substreams in a lazy way (#3242)
Browse files Browse the repository at this point in the history
ported from akka/akka#21306
closes 3222
  • Loading branch information
marcpiechura authored and Aaronontheweb committed Jan 5, 2018
1 parent feecd5e commit d4962b1
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 21 deletions.
66 changes: 66 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowSplitAfterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using Akka.TestKit;
Expand Down Expand Up @@ -261,5 +265,67 @@ public void SplitAfter_must_support_eager_cancellation_of_master_stream_on_cance
});
}, Materializer);
}

[Fact]
public void SplitAfter_should_work_when_last_element_is_split_by() => this.AssertAllStagesStopped(() =>
{
WithSubstreamsSupport(splitAfter: 3, elementCount: 3,
run: (masterSubscriber, masterSubscription, expectSubFlow) =>
{
var s1 = new StreamPuppet(expectSubFlow()
.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
s1.Request(3);
s1.ExpectNext(1);
s1.ExpectNext(2);
s1.ExpectNext(3);
s1.ExpectComplete();
masterSubscription.Request(1);
masterSubscriber.ExpectComplete();
});
}, Materializer);

[Fact]
public void SplitAfter_should_fail_stream_if_substream_not_materialized_in_time() => this.AssertAllStagesStopped(() =>
{
var timeout = new StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.CancelTermination, TimeSpan.FromMilliseconds(500));
var settings = ActorMaterializerSettings.Create(Sys).WithSubscriptionTimeoutSettings(timeout);
var tightTimeoutMaterializer = ActorMaterializer.Create(Sys, settings);
var testSource = Source.Single(1).ConcatMaterialized(Source.Maybe<int>(), Keep.Left).SplitAfter(_ => true);
Action a = () =>
{
testSource.Lift().Delay(TimeSpan.FromSeconds(1)).ConcatMany(x => x)
.RunWith(Sink.Ignore<int>(), tightTimeoutMaterializer)
.Wait(TimeSpan.FromSeconds(3));
};
a.ShouldThrow<SubscriptionTimeoutException>();
}, Materializer);

// Probably covert by SplitAfter_should_work_when_last_element_is_split_by
// but we received a specific example which we want to cover too,
// see https://github.com/akkadotnet/akka.net/issues/3222
[Fact]
public void SplitAfter_should_not_create_a_subflow_when_no_element_is_left()
{
var result = new ConcurrentQueue<ImmutableList<Tuple<bool, int>>>();
Source.From(new[]
{
Tuple.Create(true, 1), Tuple.Create(true, 2), Tuple.Create(false, 0),
Tuple.Create(true, 3), Tuple.Create(true, 4), Tuple.Create(false, 0),
Tuple.Create(true, 5), Tuple.Create(false, 0)
})
.SplitAfter(t => !t.Item1)
.Where(t => t.Item1)
.Aggregate(ImmutableList.Create<Tuple<bool, int>>(), (list, b) => list.Add(b))
.To(Sink.ForEach<ImmutableList<Tuple<bool, int>>>(list => result.Enqueue(list)))
.Run(Materializer);

Thread.Sleep(500);
result.All(l => l.Count > 0).Should().BeTrue();
}
}
}
63 changes: 42 additions & 21 deletions src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -716,29 +716,33 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
private sealed class SubstreamHandler : InAndOutHandler
{
private readonly Logic _logic;
private readonly Inlet<T> _inlet;
private readonly Split.SplitDecision _decision;
private bool _willCompleteAfterInitialElement;

public SubstreamHandler(Logic logic)
{
_logic = logic;
_inlet = logic._stage._in;
_decision = _logic._stage._decision;
}

public bool HasInitialElement => FirstElement.HasValue;

public Option<T> FirstElement { private get; set; }

// Substreams are always assumed to be pushable position when we enter this method
private void CloseThis(SubstreamHandler handler, T currentElem)
{
var decision = _logic._stage._decision;
if (decision == Split.SplitDecision.SplitAfter)
if (_decision == Split.SplitDecision.SplitAfter)
{
if (!_logic._substreamCancelled)
{
_logic._substreamSource.Push(currentElem);
_logic._substreamSource.Complete();
}
}
else if (decision == Split.SplitDecision.SplitBefore)
else if (_decision == Split.SplitDecision.SplitBefore)
{
handler.FirstElement = currentElem;
if (!_logic._substreamCancelled)
Expand All @@ -748,6 +752,8 @@ private void CloseThis(SubstreamHandler handler, T currentElem)

public override void OnPull()
{
_logic.CancelTimer(SubscriptionTimer);

if (HasInitialElement)
{
_logic._substreamSource.Push(FirstElement.Value);
Expand All @@ -761,36 +767,43 @@ public override void OnPull()
}
}
else
_logic.Pull(_logic._stage._in);
_logic.Pull(_inlet);
}

public override void OnDownstreamFinish()
{
_logic._substreamCancelled = true;
if (_logic.IsClosed(_logic._stage._in) || _logic._stage._propagateSubstreamCancel)
if (_logic.IsClosed(_inlet) || _logic._stage._propagateSubstreamCancel)
_logic.CompleteStage();
else
// Start draining
if (!_logic.HasBeenPulled(_logic._stage._in))
_logic.Pull(_logic._stage._in);
// Start draining
if (!_logic.HasBeenPulled(_inlet))
_logic.Pull(_inlet);
}

public override void OnPush()
{
var elem = _logic.Grab(_logic._stage._in);
var elem = _logic.Grab(_inlet);
try
{
if (_logic._stage._predicate(elem))
{
var handler = new SubstreamHandler(_logic);
CloseThis(handler, elem);
_logic.HandOver(handler);
if(_decision == Split.SplitDecision.SplitBefore)
_logic.HandOver(handler);
else
{
_logic._substreamSource = null;
_logic.SetHandler(_inlet, _logic);
_logic.Pull(_inlet);
}
}
else
{
// Drain into the void
if (_logic._substreamCancelled)
_logic.Pull(_logic._stage._in);
_logic.Pull(_inlet);
else
_logic._substreamSource.Push(elem);
}
Expand Down Expand Up @@ -858,19 +871,19 @@ public void OnPush()
public void OnPull()
{
if (_substreamSource == null)
Pull(_stage._in);
else if (!_substreamWaitingToBePushed)
{
Push(_stage._out, Source.FromGraph(_substreamSource.Source));
ScheduleOnce(SubscriptionTimer, _timeout);
_substreamWaitingToBePushed = true;
//can be already pulled from substream in case split after
if (!HasBeenPulled(_stage._in))
Pull(_stage._in);
}
else if (_substreamWaitingToBePushed)
PushSubstreamSource();
}

public void OnDownstreamFinish()
{
// If the substream is already cancelled or it has not been handed out, we can go away
if (!_substreamWaitingToBePushed || _substreamCancelled)
if (_substreamSource == null || _substreamWaitingToBePushed || _substreamCancelled)
CompleteStage();
}

Expand All @@ -894,15 +907,23 @@ private void HandOver(SubstreamHandler handler)

if (IsAvailable(_stage._out))
{
Push(_stage._out, Source.FromGraph(_substreamSource.Source));
ScheduleOnce(SubscriptionTimer, _timeout);
_substreamWaitingToBePushed = true;
if(_stage._decision == Split.SplitDecision.SplitBefore || handler.HasInitialElement)
PushSubstreamSource();
else
Pull(_stage._in);
}
else
_substreamWaitingToBePushed = false;
_substreamWaitingToBePushed = true;
}
}

private void PushSubstreamSource()
{
Push(_stage._out, Source.FromGraph(_substreamSource.Source));
ScheduleOnce(SubscriptionTimer, _timeout);
_substreamWaitingToBePushed = false;
}

protected internal override void OnTimer(object timerKey) => _substreamSource.Timeout(_timeout);
}

Expand Down

0 comments on commit d4962b1

Please sign in to comment.