diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs index 534beadec1..d9703ff584 100644 --- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs +++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs @@ -215,7 +215,7 @@ private void Start() _logger.Value.InfoFormat("Dispatcher: Dispatcher starting {0} performers", _tasks.Count); - while (!_tasks.IsEmpty) + while (_tasks.Any()) { try { @@ -228,19 +228,25 @@ private void Start() if (consumer != null) { _logger.Value.DebugFormat("Dispatcher: Removing a consumer with subscription name {0}", consumer.Name); - consumer.Dispose(); - _consumers.TryRemove(consumer.Name, out consumer); + if (_consumers.TryRemove(consumer.Name, out consumer)) + { + consumer.Dispose(); + } } - Task removedTask; - _tasks.TryRemove(stoppingConsumer.Id, out removedTask); + if (_tasks.TryRemove(stoppingConsumer.Id, out Task removedTask)) + { + removedTask.Dispose(); + } + + stoppingConsumer.Dispose(); } catch (AggregateException ae) { ae.Handle(ex => { - _logger.Value.ErrorFormat("Dispatcher: Error on consumer; consumer shut down"); + _logger.Value.Error(ex, "Dispatcher: Error on consumer; consumer shut down"); return true; }); } @@ -251,6 +257,11 @@ private void Start() } }, TaskCreationOptions.LongRunning); + + while (State != DispatcherState.DS_RUNNING) + { + Task.Delay(100).Wait(); + } } private IEnumerable CreateConsumers(IEnumerable connections) diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_Is_asked_to_connect_a_channel_and_handler_async.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_Is_asked_to_connect_a_channel_and_handler_async.cs index 16c4e1969a..cc969459b2 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_Is_asked_to_connect_a_channel_and_handler_async.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_Is_asked_to_connect_a_channel_and_handler_async.cs @@ -1,4 +1,4 @@ -using System.Collections.Generic; +using System.Collections.Generic; using System.Threading.Tasks; using FluentAssertions; using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; @@ -9,7 +9,7 @@ namespace Paramore.Brighter.Core.Tests.MessageDispatch { - public class MessageDispatcherRoutingAsyncTests + public class MessageDispatcherRoutingAsyncTests : IAsyncLifetime { private readonly Dispatcher _dispatcher; private readonly FakeChannel _channel; @@ -38,14 +38,12 @@ public MessageDispatcherRoutingAsyncTests() _channel.Enqueue(message); _dispatcher.State.Should().Be(DispatcherState.DS_AWAITING); - _dispatcher.Receive(); } - [Fact(Timeout = 50000)] - public void When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler_async() + [Fact] + public async Task When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler_async() { - Task.Delay(5000).Wait(); - _dispatcher.End().Wait(); + await _dispatcher.End(); //should have consumed the messages in the channel _channel.Length.Should().Be(0); @@ -54,7 +52,23 @@ public void When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler_ //should have dispatched a request _commandProcessor.Observe().Should().NotBeNull(); //should have published async - _commandProcessor.Commands.Should().Contain(ctype => ctype == CommandType.PublishAsync); + _commandProcessor.Commands.Should().Contain(commandType => commandType == CommandType.PublishAsync); + } + + public Task InitializeAsync() + { + _dispatcher.Receive(); + var completionSource = new TaskCompletionSource(); + completionSource.SetResult(_dispatcher); + + return completionSource.Task; + } + + public Task DisposeAsync() + { + var completionSource = new TaskCompletionSource(); + completionSource.SetResult(null); + return completionSource.Task; } - } + } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_has_a_new_connection_added_while_running.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_has_a_new_connection_added_while_running.cs index 794402760e..0437a08ecb 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_has_a_new_connection_added_while_running.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_has_a_new_connection_added_while_running.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -34,7 +34,7 @@ THE SOFTWARE. */ namespace Paramore.Brighter.Core.Tests.MessageDispatch { - public class DispatcherAddNewConnectionTests : IDisposable + public class DispatcherAddNewConnectionTests : IAsyncLifetime { private readonly Dispatcher _dispatcher; private readonly FakeChannel _channel; @@ -59,11 +59,10 @@ public DispatcherAddNewConnectionTests() _channel.Enqueue(message); _dispatcher.State.Should().Be(DispatcherState.DS_AWAITING); - _dispatcher.Receive(); } [Fact] - public void When_A_Message_Dispatcher_Has_A_New_Connection_Added_While_Running() + public async Task When_A_Message_Dispatcher_Has_A_New_Connection_Added_While_Running() { _dispatcher.Open(_newSubscription); @@ -71,8 +70,9 @@ public void When_A_Message_Dispatcher_Has_A_New_Connection_Added_While_Running() var message = new MyEventMessageMapper().MapToMessage(@event); _channel.Enqueue(message); - Task.Delay(1000).Wait(); + await Task.Delay(500); + //_should_have_consumed_the_messages_in_the_event_channel _channel.Length.Should().Be(0); //_should_have_a_running_state @@ -83,10 +83,19 @@ public void When_A_Message_Dispatcher_Has_A_New_Connection_Added_While_Running() _dispatcher.Connections.Should().HaveCount(2); } - public void Dispose() + + public Task InitializeAsync() + { + _dispatcher.Receive(); + var completionSource = new TaskCompletionSource(); + completionSource.SetResult(_dispatcher); + + return completionSource.Task; + } + + public Task DisposeAsync() { - if (_dispatcher?.State == DispatcherState.DS_RUNNING) - _dispatcher.End().Wait(); + return _dispatcher?.End(); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler.cs index 0fc677c75d..4a2a280cc7 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -33,7 +33,7 @@ THE SOFTWARE. */ namespace Paramore.Brighter.Core.Tests.MessageDispatch { - public class MessageDispatcherRoutingTests + public class MessageDispatcherRoutingTests : IAsyncLifetime { private readonly Dispatcher _dispatcher; private readonly FakeChannel _channel; @@ -61,14 +61,13 @@ public MessageDispatcherRoutingTests() _channel.Enqueue(message); _dispatcher.State.Should().Be(DispatcherState.DS_AWAITING); - _dispatcher.Receive(); } [Fact] - public void When_A_Message_Dispatcher_Is_Asked_To_Connect_A_Channel_And_Handler() + public async Task When_A_Message_Dispatcher_Is_Asked_To_Connect_A_Channel_And_Handler() { - Task.Delay(1000).Wait(); - _dispatcher.End().Wait(); + + await _dispatcher.End(); //_should_have_consumed_the_messages_in_the_channel _channel.Length.Should().Be(0); @@ -79,5 +78,21 @@ public void When_A_Message_Dispatcher_Is_Asked_To_Connect_A_Channel_And_Handler( //_should_have_published_async _commandProcessor.Commands.Should().Contain(ctype => ctype == CommandType.Publish); } + + public Task InitializeAsync() + { + _dispatcher.Receive(); + var completionSource = new TaskCompletionSource(); + completionSource.SetResult(_dispatcher); + + return completionSource.Task; + } + + public Task DisposeAsync() + { + var completionSource = new TaskCompletionSource(); + completionSource.SetResult(null); + return completionSource.Task; + } } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_restarts_a_connection.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_restarts_a_connection.cs index 956672e31f..547aca6fdd 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_restarts_a_connection.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_restarts_a_connection.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -33,14 +33,14 @@ THE SOFTWARE. */ namespace Paramore.Brighter.Core.Tests.MessageDispatch { - public class MessageDispatcherResetConnection + public class MessageDispatcherResetConnection : IAsyncLifetime { private readonly Dispatcher _dispatcher; private readonly FakeChannel _channel; private readonly IAmACommandProcessor _commandProcessor; private readonly Subscription _subscription; - public MessageDispatcherResetConnection() + public MessageDispatcherResetConnection() { _channel = new FakeChannel(); _commandProcessor = new SpyCommandProcessor(); @@ -57,12 +57,11 @@ public MessageDispatcherResetConnection() _dispatcher.State.Should().Be(DispatcherState.DS_AWAITING); _dispatcher.Receive(); - Task.Delay(1000).Wait(); - _dispatcher.Shut(_subscription); + } - [Fact(Skip = "Breaks test runner on Rider")] - public void When_A_Message_Dispatcher_Restarts_A_Connection() + [Fact] + public async Task When_A_Message_Dispatcher_Restarts_A_Connection() { _dispatcher.Open(_subscription); @@ -70,12 +69,22 @@ public void When_A_Message_Dispatcher_Restarts_A_Connection() var message = new MyEventMessageMapper().MapToMessage(@event); _channel.Enqueue(message); - _dispatcher.End().Wait(); + await _dispatcher.End(); //_should_have_consumed_the_messages_in_the_event_channel _channel.Length.Should().Be(0); //_should_have_a_stopped_state _dispatcher.State.Should().Be(DispatcherState.DS_STOPPED); } + + public Task InitializeAsync() + { + return Task.Delay(500).ContinueWith(task => _dispatcher.Shut(_subscription)); + } + + public Task DisposeAsync() + { + return _dispatcher?.End(); + } } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_restarts_a_connection_after_all_connections_have_stopped.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_restarts_a_connection_after_all_connections_have_stopped.cs index 8089ce9a9c..3183768c7f 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_restarts_a_connection_after_all_connections_have_stopped.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_restarts_a_connection_after_all_connections_have_stopped.cs @@ -34,7 +34,7 @@ THE SOFTWARE. */ namespace Paramore.Brighter.Core.Tests.MessageDispatch { - public class DispatcherRestartConnectionTests : IDisposable + public class DispatcherRestartConnectionTests : IAsyncLifetime { private readonly Dispatcher _dispatcher; private readonly FakeChannel _channel; @@ -63,18 +63,19 @@ public DispatcherRestartConnectionTests() Task.Delay(250).Wait(); _dispatcher.Shut("test"); _dispatcher.Shut("newTest"); - Task.Delay(1000).Wait(); + Task.Delay(500).Wait(); _dispatcher.Consumers.Should().HaveCount(0); } [Fact] - public void When_A_Message_Dispatcher_Restarts_A_Connection_After_All_Connections_Have_Stopped() + public async Task When_A_Message_Dispatcher_Restarts_A_Connection_After_All_Connections_Have_Stopped() { _dispatcher.Open("newTest"); var @event = new MyEvent(); var message = new MyEventMessageMapper().MapToMessage(@event); _channel.Enqueue(message); - Task.Delay(1000).Wait(); + + await Task.Delay(500); //_should_have_consumed_the_messages_in_the_event_channel _channel.Length.Should().Be(0); @@ -85,11 +86,17 @@ public void When_A_Message_Dispatcher_Restarts_A_Connection_After_All_Connection //_should_have_two_connections _dispatcher.Connections.Should().HaveCount(2); } + public Task InitializeAsync() + { + var completionSource = new TaskCompletionSource(); + completionSource.SetResult(_dispatcher); + + return completionSource.Task; + } - public void Dispose() + public Task DisposeAsync() { - if (_dispatcher?.State == DispatcherState.DS_RUNNING) - _dispatcher.End().Wait(); + return _dispatcher.End(); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_shuts_a_connection.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_shuts_a_connection.cs index 45cbe5239b..4124441e4f 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_shuts_a_connection.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_shuts_a_connection.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -33,10 +33,10 @@ THE SOFTWARE. */ namespace Paramore.Brighter.Core.Tests.MessageDispatch { - public class MessageDispatcherShutConnectionTests + public class MessageDispatcherShutConnectionTests : IAsyncLifetime { - private static Dispatcher _dispatcher; - private static Subscription _subscription; + private readonly Dispatcher _dispatcher; + private readonly Subscription _subscription; public MessageDispatcherShutConnectionTests() { @@ -55,15 +55,14 @@ public MessageDispatcherShutConnectionTests() channel.Enqueue(message); _dispatcher.State.Should().Be(DispatcherState.DS_AWAITING); - _dispatcher.Receive(); } [Fact] - public void When_A_Message_Dispatcher_Shuts_A_Connection() + public async Task When_A_Message_Dispatcher_Shuts_A_Connection() { - Task.Delay(1000).Wait(); + _dispatcher.State.Should().Be(DispatcherState.DS_RUNNING); _dispatcher.Shut(_subscription); - _dispatcher.End().Wait(); + await _dispatcher.End(); //_should_have_consumed_the_messages_in_the_channel _dispatcher.Consumers.Should().NotContain(consumer => consumer.Name == _subscription.Name && consumer.State == ConsumerState.Open); @@ -72,5 +71,20 @@ public void When_A_Message_Dispatcher_Shuts_A_Connection() //_should_have_no_consumers _dispatcher.Consumers.Should().BeEmpty(); } + + + public Task InitializeAsync() + { + _dispatcher.Receive(); + var completionSource = new TaskCompletionSource(); + completionSource.SetResult(_dispatcher); + + return completionSource.Task; + } + + public Task DisposeAsync() + { + return _dispatcher.End(); + } } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_starts_different_types_of_performers.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_starts_different_types_of_performers.cs index 6675e1092c..73717aad49 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_starts_different_types_of_performers.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_starts_different_types_of_performers.cs @@ -22,7 +22,6 @@ THE SOFTWARE. */ #endregion -using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -42,7 +41,7 @@ public class MessageDispatcherMultipleConnectionTests private readonly Dispatcher _dispatcher; private readonly FakeChannel _eventChannel; private readonly FakeChannel _commandChannel; - private static int _numberOfConsumers; + private int _numberOfConsumers; public MessageDispatcherMultipleConnectionTests() { @@ -77,11 +76,12 @@ public MessageDispatcherMultipleConnectionTests() [Fact] - public void When_A_Message_Dispatcher_Starts_Different_Types_Of_Performers() + public async Task When_A_Message_Dispatcher_Starts_Different_Types_Of_Performers() { - Task.Delay(1000).Wait(); + _dispatcher.State.Should().Be(DispatcherState.DS_RUNNING); _numberOfConsumers = _dispatcher.Consumers.Count(); - _dispatcher.End().Wait(); + + await _dispatcher.End(); //_should_have_consumed_the_messages_in_the_event_channel _eventChannel.Length.Should().Be(0); @@ -95,6 +95,8 @@ public void When_A_Message_Dispatcher_Starts_Different_Types_Of_Performers() _numberOfConsumers.Should().Be(2); } + + } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_starts_multiple_performers.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_starts_multiple_performers.cs index 2148dcd29b..ef6602210a 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_starts_multiple_performers.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_starts_multiple_performers.cs @@ -49,7 +49,7 @@ public MessageDispatcherMultiplePerformerTests() var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((_) => new MyEventMessageMapper())); messageMapperRegistry.Register(); - var connection = new Subscription(new SubscriptionName("test"), noOfPerformers: 3, timeoutInMilliseconds: 1000, channelFactory: new InMemoryChannelFactory(_channel), channelName: new ChannelName("fakeChannel"), routingKey: new RoutingKey("fakekey")); + var connection = new Subscription(new SubscriptionName("test"), noOfPerformers: 3, timeoutInMilliseconds: 100, channelFactory: new InMemoryChannelFactory(_channel), channelName: new ChannelName("fakeChannel"), routingKey: new RoutingKey("fakekey")); _dispatcher = new Dispatcher(_commandProcessor, messageMapperRegistry, new List { connection }); var @event = new MyEvent(); @@ -65,10 +65,10 @@ public MessageDispatcherMultiplePerformerTests() [Fact] public void WhenAMessageDispatcherStartsMultiplePerformers() { + _dispatcher.State.Should().Be(DispatcherState.DS_RUNNING); //should_have_multiple_consumers _dispatcher.Consumers.Count().Should().Be(3); - Task.Delay(1000).Wait(); _dispatcher.End().Wait(); //_should_have_consumed_the_messages_in_the_channel @@ -76,5 +76,6 @@ public void WhenAMessageDispatcherStartsMultiplePerformers() //_should_have_a_stopped_state _dispatcher.State.Should().Be(DispatcherState.DS_STOPPED); } + } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_fails_to_be_mapped_to_a_request.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_fails_to_be_mapped_to_a_request.cs index a1cce64cd0..4e56f0251f 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_fails_to_be_mapped_to_a_request.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_fails_to_be_mapped_to_a_request.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; using FluentAssertions; using Paramore.Brighter.Core.Tests.MessageDispatch.TestDoubles; @@ -31,7 +31,6 @@ public MessagePumpFailingMessageTranslationTests() public void When_A_Message_Fails_To_Be_Mapped_To_A_Request () { var task = Task.Factory.StartNew(() => _messagePump.Run(), TaskCreationOptions.LongRunning); - Task.Delay(1000).Wait(); _channel.Stop(); diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_fails_to_be_mapped_to_a_request_and_the_unacceptable_message_limit_is_reached.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_fails_to_be_mapped_to_a_request_and_the_unacceptable_message_limit_is_reached.cs index 06b99aad7a..ef47176749 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_fails_to_be_mapped_to_a_request_and_the_unacceptable_message_limit_is_reached.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_fails_to_be_mapped_to_a_request_and_the_unacceptable_message_limit_is_reached.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -53,13 +53,10 @@ public MessagePumpUnacceptableMessageLimitTests() } [Fact] - public void When_A_Message_Fails_To_Be_Mapped_To_A_Request_And_The_Unacceptable_Message_Limit_Is_Reached() + public async Task When_A_Message_Fails_To_Be_Mapped_To_A_Request_And_The_Unacceptable_Message_Limit_Is_Reached() { - var task = Task.Factory.StartNew(() => _messagePump.Run(), TaskCreationOptions.LongRunning); - Task.Delay(1000).Wait(); - - Task.WaitAll(new[] { task }); - + await Task.Factory.StartNew(() => _messagePump.Run(), TaskCreationOptions.LongRunning); + //should_have_acknowledge_the_3_messages _channel.AcknowledgeCount.Should().Be(3); //should_dispose_the_input_channel diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_requeue_count_threshold_for_commands_has_been_reached.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_requeue_count_threshold_for_commands_has_been_reached.cs index 8aa3d42612..4743f2857f 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_requeue_count_threshold_for_commands_has_been_reached.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_requeue_count_threshold_for_commands_has_been_reached.cs @@ -57,15 +57,15 @@ public MessagePumpCommandRequeueCountThresholdTests() } [Fact] - public void When_A_Requeue_Count_Threshold_For_Commands_Has_Been_Reached() + public async Task When_A_Requeue_Count_Threshold_For_Commands_Has_Been_Reached() { var task = Task.Factory.StartNew(() => _messagePump.Run(), TaskCreationOptions.LongRunning); - Task.Delay(1000).Wait(); + await Task.Delay(500); var quitMessage = new Message(new MessageHeader(Guid.Empty, "", MessageType.MT_QUIT), new MessageBody("")); _channel.Enqueue(quitMessage); - Task.WaitAll(new[] { task }); + await task; //_should_send_the_message_via_the_command_processor _commandProcessor.Commands[0].Should().Be(CommandType.Send); diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_requeue_count_threshold_for_events_has_been_reached.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_requeue_count_threshold_for_events_has_been_reached.cs index c727fc911a..65d05c9f84 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_requeue_count_threshold_for_events_has_been_reached.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_requeue_count_threshold_for_events_has_been_reached.cs @@ -57,15 +57,15 @@ public MessagePumpEventRequeueCountThresholdTests() } [Fact] - public void When_A_Requeue_Count_Threshold_For_Events_Has_Been_Reached() + public async Task When_A_Requeue_Count_Threshold_For_Events_Has_Been_Reached() { var task = Task.Factory.StartNew(() => _messagePump.Run(), TaskCreationOptions.LongRunning); - Task.Delay(1000).Wait(); + await Task.Delay(500); var quitMessage = new Message(new MessageHeader(Guid.Empty, "", MessageType.MT_QUIT), new MessageBody("")); _channel.Enqueue(quitMessage); - Task.WaitAll(new[] { task }); + await task; //_should_publish_the_message_via_the_command_processor _commandProcessor.Commands[0].Should().Be(CommandType.Publish); diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_an_unacceptable_message_is_recieved.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_an_unacceptable_message_is_recieved.cs index 599587ed85..31a98c9fb2 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_an_unacceptable_message_is_recieved.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_an_unacceptable_message_is_recieved.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -55,12 +55,11 @@ public MessagePumpUnacceptableMessageTests() public async Task When_An_Unacceptable_Message_Is_Received() { var task = Task.Factory.StartNew(() => _messagePump.Run(), TaskCreationOptions.LongRunning); - Task.Delay(1000).Wait(); - var quitMessage = new Message(new MessageHeader(Guid.Empty, "", MessageType.MT_QUIT), new MessageBody("")); + var quitMessage = new Message(new MessageHeader(Guid.NewGuid(), "", MessageType.MT_QUIT), new MessageBody("")); _channel.Enqueue(quitMessage); - Task.WaitAll(new[] { task }); + await task; //should_acknowledge_the_message _channel.AcknowledgeHappened.Should().BeTrue(); diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_an_unacceptable_message_limit_is_reached.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_an_unacceptable_message_limit_is_reached.cs index 9809d0bd4d..b7b3bf197e 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_an_unacceptable_message_limit_is_reached.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_an_unacceptable_message_limit_is_reached.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -61,7 +61,6 @@ public MessagePumpUnacceptableMessageLimitBreachedTests() public void When_An_Unacceptable_Message_Limit_Is_Reached() { var task = Task.Factory.StartNew(() => _messagePump.Run(), TaskCreationOptions.LongRunning); - Task.Delay(1000).Wait(); Task.WaitAll(new[] { task });