diff --git a/src/RawRabbit/Common/BaseBusClient.cs b/src/RawRabbit/Common/BaseBusClient.cs index 77be2d9c..e70d3fbe 100644 --- a/src/RawRabbit/Common/BaseBusClient.cs +++ b/src/RawRabbit/Common/BaseBusClient.cs @@ -70,5 +70,14 @@ public void Dispose() (_requester as IDisposable)?.Dispose(); (_responder as IDisposable)?.Dispose(); } + + public Task ShutdownAsync() + { + var subTask = (_subscriber as IShutdown)?.ShutdownAsync() ?? Task.FromResult(true); + var pubTask = (_publisher as IShutdown)?.ShutdownAsync() ?? Task.FromResult(true); + var reqTask = (_requester as IShutdown)?.ShutdownAsync() ?? Task.FromResult(true); + var respTask = (_responder as IShutdown)?.ShutdownAsync() ?? Task.FromResult(true); + return Task.WhenAll(subTask, pubTask, reqTask, respTask); + } } } diff --git a/src/RawRabbit/Common/IShutdown.cs b/src/RawRabbit/Common/IShutdown.cs new file mode 100644 index 00000000..c2d94532 --- /dev/null +++ b/src/RawRabbit/Common/IShutdown.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace RawRabbit.Common +{ + public interface IShutdown + { + Task ShutdownAsync(); + } +} diff --git a/src/RawRabbit/Configuration/RawRabbitConfiguration.cs b/src/RawRabbit/Configuration/RawRabbitConfiguration.cs index 7f805e7e..0f91ea46 100644 --- a/src/RawRabbit/Configuration/RawRabbitConfiguration.cs +++ b/src/RawRabbit/Configuration/RawRabbitConfiguration.cs @@ -18,6 +18,12 @@ public class RawRabbitConfiguration /// public TimeSpan PublishConfirmTimeout { get; set; } + /// + /// The amount of time to wait for message handlers to process message before + /// shutting down. + /// + public TimeSpan GracefulShutdown { get; set; } + /// /// Indicates if automatic recovery (reconnect, re-open channels, restore QoS) should be enabled /// Defaults to true. @@ -68,6 +74,7 @@ public RawRabbitConfiguration() AutomaticRecovery = true; TopologyRecovery = true; RecoveryInterval = TimeSpan.FromSeconds(10); + GracefulShutdown = TimeSpan.FromSeconds(10); Ssl = new SslOption {Enabled = false}; Hostnames = new List(); Exchange = new GeneralExchangeConfiguration @@ -84,6 +91,7 @@ public RawRabbitConfiguration() }; } + public static RawRabbitConfiguration Local => new RawRabbitConfiguration { VirtualHost = "/", diff --git a/src/RawRabbit/Consumer/Eventing/EventingBasicConsumerFactory.cs b/src/RawRabbit/Consumer/Eventing/EventingBasicConsumerFactory.cs index a3d44f81..0e3676aa 100644 --- a/src/RawRabbit/Consumer/Eventing/EventingBasicConsumerFactory.cs +++ b/src/RawRabbit/Consumer/Eventing/EventingBasicConsumerFactory.cs @@ -5,7 +5,6 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; -using RawRabbit.Common; using RawRabbit.Configuration.Respond; using RawRabbit.Consumer.Abstraction; using RawRabbit.ErrorHandling; @@ -13,26 +12,22 @@ namespace RawRabbit.Consumer.Eventing { - public class EventingBasicConsumerFactory : IConsumerFactory, IDisposable + public class EventingBasicConsumerFactory : IConsumerFactory { private readonly IErrorHandlingStrategy _strategy; private readonly ConcurrentBag _processedButNotAcked; - private readonly ConcurrentBag _consumers; private readonly ILogger _logger = LogManager.GetLogger(); public EventingBasicConsumerFactory(IErrorHandlingStrategy strategy) { _strategy = strategy; _processedButNotAcked = new ConcurrentBag(); - _consumers = new ConcurrentBag(); } public IRawConsumer CreateConsumer(IConsumerConfiguration cfg, IModel channel) { ConfigureQos(channel, cfg.PrefetchCount); var rawConsumer = new EventingRawConsumer(channel); - - _consumers.Add(rawConsumer); rawConsumer.Received += (sender, args) => { @@ -63,10 +58,6 @@ and the message was resent. } if (cfg.NoAck || rawConsumer.NackedDeliveryTags.Contains(args.DeliveryTag)) { - /* - The consumer has stated that 'ack'-ing is not required, so - now that the message is handled, the consumer is done. - */ return; } @@ -110,13 +101,5 @@ protected void BasicAck(IModel channel, BasicDeliverEventArgs args) _processedButNotAcked.Add(args.BasicProperties.MessageId); } } - - public void Dispose() - { - foreach (var consumer in _consumers) - { - consumer?.Disconnect(); - } - } } } \ No newline at end of file diff --git a/src/RawRabbit/IBusClient.cs b/src/RawRabbit/IBusClient.cs index 69ace59b..5c295e36 100644 --- a/src/RawRabbit/IBusClient.cs +++ b/src/RawRabbit/IBusClient.cs @@ -9,7 +9,7 @@ namespace RawRabbit { - public interface IBusClient : IDisposable where TMessageContext : IMessageContext + public interface IBusClient : IDisposable, IShutdown where TMessageContext : IMessageContext { ISubscription SubscribeAsync(Func subscribeMethod, Action configuration = null); diff --git a/src/RawRabbit/Operations/Publisher.cs b/src/RawRabbit/Operations/Publisher.cs index 950d2fc3..426414c3 100644 --- a/src/RawRabbit/Operations/Publisher.cs +++ b/src/RawRabbit/Operations/Publisher.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using RawRabbit.Channel; using RawRabbit.Channel.Abstraction; using RawRabbit.Common; using RawRabbit.Configuration.Publish; diff --git a/src/RawRabbit/Operations/Requester.cs b/src/RawRabbit/Operations/Requester.cs index 5b962fd4..96893e3b 100644 --- a/src/RawRabbit/Operations/Requester.cs +++ b/src/RawRabbit/Operations/Requester.cs @@ -1,10 +1,8 @@ using System; using System.Collections.Concurrent; -using System.Runtime.Remoting.Contexts; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; -using RawRabbit.Channel; using RawRabbit.Channel.Abstraction; using RawRabbit.Common; using RawRabbit.Configuration.Request; @@ -19,7 +17,7 @@ namespace RawRabbit.Operations { - public class Requester : IDisposable, IRequester where TMessageContext : IMessageContext + public class Requester : IDisposable, IShutdown, IRequester where TMessageContext : IMessageContext { private readonly IChannelFactory _channelFactory; private readonly IConsumerFactory _consumerFactory; @@ -213,7 +211,24 @@ public bool IsCompletedAndOpen() public void Dispose() { (_channelFactory as IDisposable)?.Dispose(); - _currentConsumer?.Consumer?.Disconnect(); + } + + public async Task ShutdownAsync() + { + _logger.LogDebug("Shutting down Requester."); + foreach (var ccS in _consumerCompletionSources) + { + if (!ccS.Value.IsCompletedAndOpen()) + { + continue; + } + ccS.Value.Consumer.Disconnect(); + } + if (!_responseDictionary.IsEmpty) + { + await Task.Delay(_requestTimeout); + } + Dispose(); } } } diff --git a/src/RawRabbit/Operations/Responder.cs b/src/RawRabbit/Operations/Responder.cs index 8d27c6fd..b8f346a0 100644 --- a/src/RawRabbit/Operations/Responder.cs +++ b/src/RawRabbit/Operations/Responder.cs @@ -1,9 +1,9 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using RawRabbit.Channel; using RawRabbit.Channel.Abstraction; using RawRabbit.Common; +using RawRabbit.Configuration; using RawRabbit.Configuration.Respond; using RawRabbit.Consumer.Abstraction; using RawRabbit.Context; @@ -15,7 +15,7 @@ namespace RawRabbit.Operations { - public class Responder : IDisposable, IResponder where TMessageContext : IMessageContext + public class Responder : IDisposable, IShutdown, IResponder where TMessageContext : IMessageContext { private readonly IChannelFactory _channelFactory; private readonly ITopologyProvider _topologyProvider; @@ -24,8 +24,9 @@ public class Responder : IDisposable, IResponder _contextProvider; private readonly IContextEnhancer _contextEnhancer; private readonly IBasicPropertiesProvider _propertyProvider; - private readonly List _consumers; + private readonly RawRabbitConfiguration _config; private readonly ILogger _logger = LogManager.GetLogger>(); + private readonly List _subscriptions; public Responder( IChannelFactory channelFactory, @@ -34,7 +35,8 @@ public Responder( IMessageSerializer serializer, IMessageContextProvider contextProvider, IContextEnhancer contextEnhancer, - IBasicPropertiesProvider propertyProvider) + IBasicPropertiesProvider propertyProvider, + RawRabbitConfiguration config) { _channelFactory = channelFactory; _topologyProvider = topologyProvider; @@ -43,7 +45,8 @@ public Responder( _contextProvider = contextProvider; _contextEnhancer = contextEnhancer; _propertyProvider = propertyProvider; - _consumers = new List(); + _config = config; + _subscriptions = new List(); } public ISubscription RespondAsync(Func> onMessage, ResponderConfiguration cfg) @@ -55,7 +58,6 @@ public ISubscription RespondAsync(Func { var consumer = _consumerFactory.CreateConsumer(cfg, channelTask.Result); - _consumers.Add(consumer); consumer.OnMessageAsync = (o, args) => { var body = _serializer.Deserialize(args.Body); @@ -90,6 +92,7 @@ public ISubscription RespondAsync(Func : IDisposable, ISubscriber where TMessageContext : IMessageContext + public class Subscriber : IDisposable, IShutdown, ISubscriber where TMessageContext : IMessageContext { private readonly IChannelFactory _channelFactory; private readonly IConsumerFactory _consumerFactory; @@ -21,6 +23,8 @@ public class Subscriber : IDisposable, ISubscriber _contextProvider; private readonly IContextEnhancer _contextEnhancer; + private readonly RawRabbitConfiguration _config; + private readonly List _subscriptions; private readonly ILogger _logger = LogManager.GetLogger>(); public Subscriber( @@ -29,7 +33,8 @@ public Subscriber( ITopologyProvider topologyProvider, IMessageSerializer serializer, IMessageContextProvider contextProvider, - IContextEnhancer contextEnhancer) + IContextEnhancer contextEnhancer, + RawRabbitConfiguration config) { _channelFactory = channelFactory; _consumerFactory = consumerFactory; @@ -37,6 +42,8 @@ public Subscriber( _serializer = serializer; _contextProvider = contextProvider; _contextEnhancer = contextEnhancer; + _config = config; + _subscriptions = new List(); } public ISubscription SubscribeAsync(Func subscribeMethod, SubscriptionConfiguration config) @@ -61,6 +68,7 @@ public ISubscription SubscribeAsync(Func subscribeM return new Subscription(consumer, config.Queue.QueueName); }); Task.WaitAll(subscriberTask); + _subscriptions.Add(subscriberTask.Result); return subscriberTask.Result; } @@ -71,5 +79,16 @@ public void Dispose() (_channelFactory as IDisposable)?.Dispose(); (_topologyProvider as IDisposable)?.Dispose(); } + + public async Task ShutdownAsync() + { + _logger.LogDebug("Shutting down Subscriber."); + foreach (var subscription in _subscriptions) + { + subscription.Dispose(); + } + await Task.Delay(_config.GracefulShutdown); + Dispose(); + } } }