Skip to content

Commit

Permalink
(#69) Adding 'IShutDown' and implement it to operations
Browse files Browse the repository at this point in the history
  • Loading branch information
par.dahlman committed Mar 19, 2016
1 parent 495dd70 commit 44f9dc4
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 32 deletions.
9 changes: 9 additions & 0 deletions src/RawRabbit/Common/BaseBusClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,14 @@ public void Dispose()
(_requester as IDisposable)?.Dispose();
(_responder as IDisposable)?.Dispose();
}

public Task ShutdownAsync()
{
var subTask = (_subscriber as IShutdown)?.ShutdownAsync() ?? Task.FromResult(true);
var pubTask = (_publisher as IShutdown)?.ShutdownAsync() ?? Task.FromResult(true);
var reqTask = (_requester as IShutdown)?.ShutdownAsync() ?? Task.FromResult(true);
var respTask = (_responder as IShutdown)?.ShutdownAsync() ?? Task.FromResult(true);
return Task.WhenAll(subTask, pubTask, reqTask, respTask);
}
}
}
9 changes: 9 additions & 0 deletions src/RawRabbit/Common/IShutdown.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace RawRabbit.Common
{
public interface IShutdown
{
Task ShutdownAsync();
}
}
8 changes: 8 additions & 0 deletions src/RawRabbit/Configuration/RawRabbitConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ public class RawRabbitConfiguration
/// </summary>
public TimeSpan PublishConfirmTimeout { get; set; }

/// <summary>
/// The amount of time to wait for message handlers to process message before
/// shutting down.
/// </summary>
public TimeSpan GracefulShutdown { get; set; }

/// <summary>
/// Indicates if automatic recovery (reconnect, re-open channels, restore QoS) should be enabled
/// Defaults to true.
Expand Down Expand Up @@ -68,6 +74,7 @@ public RawRabbitConfiguration()
AutomaticRecovery = true;
TopologyRecovery = true;
RecoveryInterval = TimeSpan.FromSeconds(10);
GracefulShutdown = TimeSpan.FromSeconds(10);
Ssl = new SslOption {Enabled = false};
Hostnames = new List<string>();
Exchange = new GeneralExchangeConfiguration
Expand All @@ -84,6 +91,7 @@ public RawRabbitConfiguration()
};
}


public static RawRabbitConfiguration Local => new RawRabbitConfiguration
{
VirtualHost = "/",
Expand Down
19 changes: 1 addition & 18 deletions src/RawRabbit/Consumer/Eventing/EventingBasicConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,29 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RawRabbit.Common;
using RawRabbit.Configuration.Respond;
using RawRabbit.Consumer.Abstraction;
using RawRabbit.ErrorHandling;
using RawRabbit.Logging;

namespace RawRabbit.Consumer.Eventing
{
public class EventingBasicConsumerFactory : IConsumerFactory, IDisposable
public class EventingBasicConsumerFactory : IConsumerFactory
{
private readonly IErrorHandlingStrategy _strategy;
private readonly ConcurrentBag<string> _processedButNotAcked;
private readonly ConcurrentBag<IRawConsumer> _consumers;
private readonly ILogger _logger = LogManager.GetLogger<EventingBasicConsumerFactory>();

public EventingBasicConsumerFactory(IErrorHandlingStrategy strategy)
{
_strategy = strategy;
_processedButNotAcked = new ConcurrentBag<string>();
_consumers = new ConcurrentBag<IRawConsumer>();
}

public IRawConsumer CreateConsumer(IConsumerConfiguration cfg, IModel channel)
{
ConfigureQos(channel, cfg.PrefetchCount);
var rawConsumer = new EventingRawConsumer(channel);

_consumers.Add(rawConsumer);

rawConsumer.Received += (sender, args) =>
{
Expand Down Expand Up @@ -63,10 +58,6 @@ and the message was resent.
}
if (cfg.NoAck || rawConsumer.NackedDeliveryTags.Contains(args.DeliveryTag))
{
/*
The consumer has stated that 'ack'-ing is not required, so
now that the message is handled, the consumer is done.
*/
return;
}
Expand Down Expand Up @@ -110,13 +101,5 @@ protected void BasicAck(IModel channel, BasicDeliverEventArgs args)
_processedButNotAcked.Add(args.BasicProperties.MessageId);
}
}

public void Dispose()
{
foreach (var consumer in _consumers)
{
consumer?.Disconnect();
}
}
}
}
2 changes: 1 addition & 1 deletion src/RawRabbit/IBusClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace RawRabbit
{
public interface IBusClient<out TMessageContext> : IDisposable where TMessageContext : IMessageContext
public interface IBusClient<out TMessageContext> : IDisposable, IShutdown where TMessageContext : IMessageContext
{
ISubscription SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, Action<ISubscriptionConfigurationBuilder> configuration = null);

Expand Down
1 change: 0 additions & 1 deletion src/RawRabbit/Operations/Publisher.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Threading.Tasks;
using RawRabbit.Channel;
using RawRabbit.Channel.Abstraction;
using RawRabbit.Common;
using RawRabbit.Configuration.Publish;
Expand Down
23 changes: 19 additions & 4 deletions src/RawRabbit/Operations/Requester.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.Runtime.Remoting.Contexts;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RawRabbit.Channel;
using RawRabbit.Channel.Abstraction;
using RawRabbit.Common;
using RawRabbit.Configuration.Request;
Expand All @@ -19,7 +17,7 @@

namespace RawRabbit.Operations
{
public class Requester<TMessageContext> : IDisposable, IRequester where TMessageContext : IMessageContext
public class Requester<TMessageContext> : IDisposable, IShutdown, IRequester where TMessageContext : IMessageContext
{
private readonly IChannelFactory _channelFactory;
private readonly IConsumerFactory _consumerFactory;
Expand Down Expand Up @@ -213,7 +211,24 @@ public bool IsCompletedAndOpen()
public void Dispose()
{
(_channelFactory as IDisposable)?.Dispose();
_currentConsumer?.Consumer?.Disconnect();
}

public async Task ShutdownAsync()
{
_logger.LogDebug("Shutting down Requester.");
foreach (var ccS in _consumerCompletionSources)
{
if (!ccS.Value.IsCompletedAndOpen())
{
continue;
}
ccS.Value.Consumer.Disconnect();
}
if (!_responseDictionary.IsEmpty)
{
await Task.Delay(_requestTimeout);
}
Dispose();
}
}
}
26 changes: 20 additions & 6 deletions src/RawRabbit/Operations/Responder.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RawRabbit.Channel;
using RawRabbit.Channel.Abstraction;
using RawRabbit.Common;
using RawRabbit.Configuration;
using RawRabbit.Configuration.Respond;
using RawRabbit.Consumer.Abstraction;
using RawRabbit.Context;
Expand All @@ -15,7 +15,7 @@

namespace RawRabbit.Operations
{
public class Responder<TMessageContext> : IDisposable, IResponder<TMessageContext> where TMessageContext : IMessageContext
public class Responder<TMessageContext> : IDisposable, IShutdown, IResponder<TMessageContext> where TMessageContext : IMessageContext
{
private readonly IChannelFactory _channelFactory;
private readonly ITopologyProvider _topologyProvider;
Expand All @@ -24,8 +24,9 @@ public class Responder<TMessageContext> : IDisposable, IResponder<TMessageContex
private readonly IMessageContextProvider<TMessageContext> _contextProvider;
private readonly IContextEnhancer _contextEnhancer;
private readonly IBasicPropertiesProvider _propertyProvider;
private readonly List<IRawConsumer> _consumers;
private readonly RawRabbitConfiguration _config;
private readonly ILogger _logger = LogManager.GetLogger<Responder<TMessageContext>>();
private readonly List<ISubscription> _subscriptions;

public Responder(
IChannelFactory channelFactory,
Expand All @@ -34,7 +35,8 @@ public Responder(
IMessageSerializer serializer,
IMessageContextProvider<TMessageContext> contextProvider,
IContextEnhancer contextEnhancer,
IBasicPropertiesProvider propertyProvider)
IBasicPropertiesProvider propertyProvider,
RawRabbitConfiguration config)
{
_channelFactory = channelFactory;
_topologyProvider = topologyProvider;
Expand All @@ -43,7 +45,8 @@ public Responder(
_contextProvider = contextProvider;
_contextEnhancer = contextEnhancer;
_propertyProvider = propertyProvider;
_consumers = new List<IRawConsumer>();
_config = config;
_subscriptions = new List<ISubscription>();
}

public ISubscription RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, ResponderConfiguration cfg)
Expand All @@ -55,7 +58,6 @@ public ISubscription RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageCo
.ContinueWith(t =>
{
var consumer = _consumerFactory.CreateConsumer(cfg, channelTask.Result);
_consumers.Add(consumer);
consumer.OnMessageAsync = (o, args) =>
{
var body = _serializer.Deserialize<TRequest>(args.Body);
Expand Down Expand Up @@ -90,6 +92,7 @@ public ISubscription RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageCo
return new Subscription(consumer, cfg.Queue.QueueName);
});
Task.WaitAll(respondTask);
_subscriptions.Add(respondTask.Result);
return respondTask.Result;
}

Expand All @@ -98,5 +101,16 @@ public void Dispose()
_logger.LogDebug("Disposing Responder.");
(_consumerFactory as IDisposable)?.Dispose();
}

public async Task ShutdownAsync()
{
_logger.LogDebug("Shutting down Responder.");
foreach (var subscription in _subscriptions)
{
subscription.Dispose();
}
await Task.Delay(_config.GracefulShutdown);
Dispose();
}
}
}
23 changes: 21 additions & 2 deletions src/RawRabbit/Operations/Subscriber.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RawRabbit.Channel.Abstraction;
using RawRabbit.Common;
using RawRabbit.Configuration;
using RawRabbit.Configuration.Subscribe;
using RawRabbit.Consumer.Abstraction;
using RawRabbit.Context;
Expand All @@ -13,14 +15,16 @@

namespace RawRabbit.Operations
{
public class Subscriber<TMessageContext> : IDisposable, ISubscriber<TMessageContext> where TMessageContext : IMessageContext
public class Subscriber<TMessageContext> : IDisposable, IShutdown, ISubscriber<TMessageContext> where TMessageContext : IMessageContext
{
private readonly IChannelFactory _channelFactory;
private readonly IConsumerFactory _consumerFactory;
private readonly ITopologyProvider _topologyProvider;
private readonly IMessageSerializer _serializer;
private readonly IMessageContextProvider<TMessageContext> _contextProvider;
private readonly IContextEnhancer _contextEnhancer;
private readonly RawRabbitConfiguration _config;
private readonly List<ISubscription> _subscriptions;
private readonly ILogger _logger = LogManager.GetLogger<Subscriber<TMessageContext>>();

public Subscriber(
Expand All @@ -29,14 +33,17 @@ public Subscriber(
ITopologyProvider topologyProvider,
IMessageSerializer serializer,
IMessageContextProvider<TMessageContext> contextProvider,
IContextEnhancer contextEnhancer)
IContextEnhancer contextEnhancer,
RawRabbitConfiguration config)
{
_channelFactory = channelFactory;
_consumerFactory = consumerFactory;
_topologyProvider = topologyProvider;
_serializer = serializer;
_contextProvider = contextProvider;
_contextEnhancer = contextEnhancer;
_config = config;
_subscriptions = new List<ISubscription>();
}

public ISubscription SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, SubscriptionConfiguration config)
Expand All @@ -61,6 +68,7 @@ public ISubscription SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeM
return new Subscription(consumer, config.Queue.QueueName);
});
Task.WaitAll(subscriberTask);
_subscriptions.Add(subscriberTask.Result);
return subscriberTask.Result;
}

Expand All @@ -71,5 +79,16 @@ public void Dispose()
(_channelFactory as IDisposable)?.Dispose();
(_topologyProvider as IDisposable)?.Dispose();
}

public async Task ShutdownAsync()
{
_logger.LogDebug("Shutting down Subscriber.");
foreach (var subscription in _subscriptions)
{
subscription.Dispose();
}
await Task.Delay(_config.GracefulShutdown);
Dispose();
}
}
}

0 comments on commit 44f9dc4

Please sign in to comment.