Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: clean up of dispose and delays in Dispatcher and Dispatcher tests #1487

Merged
merged 1 commit into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions src/Paramore.Brighter.ServiceActivator/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private void Start()

_logger.Value.InfoFormat("Dispatcher: Dispatcher starting {0} performers", _tasks.Count);

while (!_tasks.IsEmpty)
while (_tasks.Any())
{
try
{
Expand All @@ -228,19 +228,25 @@ private void Start()
if (consumer != null)
{
_logger.Value.DebugFormat("Dispatcher: Removing a consumer with subscription name {0}", consumer.Name);
consumer.Dispose();

_consumers.TryRemove(consumer.Name, out consumer);
if (_consumers.TryRemove(consumer.Name, out consumer))
{
consumer.Dispose();
}
}

Task removedTask;
_tasks.TryRemove(stoppingConsumer.Id, out removedTask);
if (_tasks.TryRemove(stoppingConsumer.Id, out Task removedTask))
{
removedTask.Dispose();
}

stoppingConsumer.Dispose();
}
catch (AggregateException ae)
{
ae.Handle(ex =>
{
_logger.Value.ErrorFormat("Dispatcher: Error on consumer; consumer shut down");
_logger.Value.Error(ex, "Dispatcher: Error on consumer; consumer shut down");
return true;
});
}
Expand All @@ -251,6 +257,11 @@ private void Start()
}
},
TaskCreationOptions.LongRunning);

while (State != DispatcherState.DS_RUNNING)
{
Task.Delay(100).Wait();
}
}

private IEnumerable<Consumer> CreateConsumers(IEnumerable<Subscription> connections)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Collections.Generic;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles;
Expand All @@ -9,7 +9,7 @@

namespace Paramore.Brighter.Core.Tests.MessageDispatch
{
public class MessageDispatcherRoutingAsyncTests
public class MessageDispatcherRoutingAsyncTests : IAsyncLifetime
{
private readonly Dispatcher _dispatcher;
private readonly FakeChannel _channel;
Expand Down Expand Up @@ -38,14 +38,12 @@ public MessageDispatcherRoutingAsyncTests()
_channel.Enqueue(message);

_dispatcher.State.Should().Be(DispatcherState.DS_AWAITING);
_dispatcher.Receive();
}

[Fact(Timeout = 50000)]
public void When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler_async()
[Fact]
public async Task When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler_async()
{
Task.Delay(5000).Wait();
_dispatcher.End().Wait();
await _dispatcher.End();

//should have consumed the messages in the channel
_channel.Length.Should().Be(0);
Expand All @@ -54,7 +52,23 @@ public void When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler_
//should have dispatched a request
_commandProcessor.Observe<MyEvent>().Should().NotBeNull();
//should have published async
_commandProcessor.Commands.Should().Contain(ctype => ctype == CommandType.PublishAsync);
_commandProcessor.Commands.Should().Contain(commandType => commandType == CommandType.PublishAsync);
}

public Task InitializeAsync()
{
_dispatcher.Receive();
var completionSource = new TaskCompletionSource<IDispatcher>();
completionSource.SetResult(_dispatcher);

return completionSource.Task;
}

public Task DisposeAsync()
{
var completionSource = new TaskCompletionSource<object>();
completionSource.SetResult(null);
return completionSource.Task;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Licence
#region Licence
/* The MIT License (MIT)
Copyright © 2014 Ian Cooper <[email protected]>

Expand Down Expand Up @@ -34,7 +34,7 @@ THE SOFTWARE. */

namespace Paramore.Brighter.Core.Tests.MessageDispatch
{
public class DispatcherAddNewConnectionTests : IDisposable
public class DispatcherAddNewConnectionTests : IAsyncLifetime
{
private readonly Dispatcher _dispatcher;
private readonly FakeChannel _channel;
Expand All @@ -59,20 +59,20 @@ public DispatcherAddNewConnectionTests()
_channel.Enqueue(message);

_dispatcher.State.Should().Be(DispatcherState.DS_AWAITING);
_dispatcher.Receive();
}

[Fact]
public void When_A_Message_Dispatcher_Has_A_New_Connection_Added_While_Running()
public async Task When_A_Message_Dispatcher_Has_A_New_Connection_Added_While_Running()
{
_dispatcher.Open(_newSubscription);

var @event = new MyEvent();
var message = new MyEventMessageMapper().MapToMessage(@event);
_channel.Enqueue(message);

Task.Delay(1000).Wait();

await Task.Delay(500);

//_should_have_consumed_the_messages_in_the_event_channel
_channel.Length.Should().Be(0);
//_should_have_a_running_state
Expand All @@ -83,10 +83,19 @@ public void When_A_Message_Dispatcher_Has_A_New_Connection_Added_While_Running()
_dispatcher.Connections.Should().HaveCount(2);
}

public void Dispose()

public Task InitializeAsync()
{
_dispatcher.Receive();
var completionSource = new TaskCompletionSource<IDispatcher>();
completionSource.SetResult(_dispatcher);

return completionSource.Task;
}

public Task DisposeAsync()
{
if (_dispatcher?.State == DispatcherState.DS_RUNNING)
_dispatcher.End().Wait();
return _dispatcher?.End();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Licence
#region Licence
/* The MIT License (MIT)
Copyright © 2014 Ian Cooper <[email protected]>

Expand Down Expand Up @@ -33,7 +33,7 @@ THE SOFTWARE. */

namespace Paramore.Brighter.Core.Tests.MessageDispatch
{
public class MessageDispatcherRoutingTests
public class MessageDispatcherRoutingTests : IAsyncLifetime
{
private readonly Dispatcher _dispatcher;
private readonly FakeChannel _channel;
Expand Down Expand Up @@ -61,14 +61,13 @@ public MessageDispatcherRoutingTests()
_channel.Enqueue(message);

_dispatcher.State.Should().Be(DispatcherState.DS_AWAITING);
_dispatcher.Receive();
}

[Fact]
public void When_A_Message_Dispatcher_Is_Asked_To_Connect_A_Channel_And_Handler()
public async Task When_A_Message_Dispatcher_Is_Asked_To_Connect_A_Channel_And_Handler()
{
Task.Delay(1000).Wait();
_dispatcher.End().Wait();

await _dispatcher.End();

//_should_have_consumed_the_messages_in_the_channel
_channel.Length.Should().Be(0);
Expand All @@ -79,5 +78,21 @@ public void When_A_Message_Dispatcher_Is_Asked_To_Connect_A_Channel_And_Handler(
//_should_have_published_async
_commandProcessor.Commands.Should().Contain(ctype => ctype == CommandType.Publish);
}

public Task InitializeAsync()
{
_dispatcher.Receive();
var completionSource = new TaskCompletionSource<IDispatcher>();
completionSource.SetResult(_dispatcher);

return completionSource.Task;
}

public Task DisposeAsync()
{
var completionSource = new TaskCompletionSource<object>();
completionSource.SetResult(null);
return completionSource.Task;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Licence
#region Licence
/* The MIT License (MIT)
Copyright © 2014 Ian Cooper <[email protected]>

Expand Down Expand Up @@ -33,14 +33,14 @@ THE SOFTWARE. */

namespace Paramore.Brighter.Core.Tests.MessageDispatch
{
public class MessageDispatcherResetConnection
public class MessageDispatcherResetConnection : IAsyncLifetime
{
private readonly Dispatcher _dispatcher;
private readonly FakeChannel _channel;
private readonly IAmACommandProcessor _commandProcessor;
private readonly Subscription _subscription;

public MessageDispatcherResetConnection()
public MessageDispatcherResetConnection()
{
_channel = new FakeChannel();
_commandProcessor = new SpyCommandProcessor();
Expand All @@ -57,25 +57,34 @@ public MessageDispatcherResetConnection()

_dispatcher.State.Should().Be(DispatcherState.DS_AWAITING);
_dispatcher.Receive();
Task.Delay(1000).Wait();
_dispatcher.Shut(_subscription);

}

[Fact(Skip = "Breaks test runner on Rider")]
public void When_A_Message_Dispatcher_Restarts_A_Connection()
[Fact]
public async Task When_A_Message_Dispatcher_Restarts_A_Connection()
{
_dispatcher.Open(_subscription);

var @event = new MyEvent();
var message = new MyEventMessageMapper().MapToMessage(@event);
_channel.Enqueue(message);

_dispatcher.End().Wait();
await _dispatcher.End();

//_should_have_consumed_the_messages_in_the_event_channel
_channel.Length.Should().Be(0);
//_should_have_a_stopped_state
_dispatcher.State.Should().Be(DispatcherState.DS_STOPPED);
}

public Task InitializeAsync()
{
return Task.Delay(500).ContinueWith(task => _dispatcher.Shut(_subscription));
}

public Task DisposeAsync()
{
return _dispatcher?.End();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ THE SOFTWARE. */

namespace Paramore.Brighter.Core.Tests.MessageDispatch
{
public class DispatcherRestartConnectionTests : IDisposable
public class DispatcherRestartConnectionTests : IAsyncLifetime
{
private readonly Dispatcher _dispatcher;
private readonly FakeChannel _channel;
Expand Down Expand Up @@ -63,18 +63,19 @@ public DispatcherRestartConnectionTests()
Task.Delay(250).Wait();
_dispatcher.Shut("test");
_dispatcher.Shut("newTest");
Task.Delay(1000).Wait();
Task.Delay(500).Wait();
_dispatcher.Consumers.Should().HaveCount(0);
}

[Fact]
public void When_A_Message_Dispatcher_Restarts_A_Connection_After_All_Connections_Have_Stopped()
public async Task When_A_Message_Dispatcher_Restarts_A_Connection_After_All_Connections_Have_Stopped()
{
_dispatcher.Open("newTest");
var @event = new MyEvent();
var message = new MyEventMessageMapper().MapToMessage(@event);
_channel.Enqueue(message);
Task.Delay(1000).Wait();

await Task.Delay(500);

//_should_have_consumed_the_messages_in_the_event_channel
_channel.Length.Should().Be(0);
Expand All @@ -85,11 +86,17 @@ public void When_A_Message_Dispatcher_Restarts_A_Connection_After_All_Connection
//_should_have_two_connections
_dispatcher.Connections.Should().HaveCount(2);
}
public Task InitializeAsync()
{
var completionSource = new TaskCompletionSource<IDispatcher>();
completionSource.SetResult(_dispatcher);

return completionSource.Task;
}

public void Dispose()
public Task DisposeAsync()
{
if (_dispatcher?.State == DispatcherState.DS_RUNNING)
_dispatcher.End().Wait();
return _dispatcher.End();
}
}
}
Loading