From 6a5635d162cbc30e1f17031488b09782097d476f Mon Sep 17 00:00:00 2001 From: "par.dahlman" Date: Sat, 5 Mar 2016 10:00:32 +0100 Subject: [PATCH] (#64): Migrated Subscriber to new infra And by doing so, no one uses the operation base anymore, so it's gone baby, gone. --- src/RawRabbit/Operations/OperatorBase.cs | 114 ----------------------- src/RawRabbit/Operations/Subscriber.cs | 55 +++++++---- 2 files changed, 36 insertions(+), 133 deletions(-) delete mode 100644 src/RawRabbit/Operations/OperatorBase.cs diff --git a/src/RawRabbit/Operations/OperatorBase.cs b/src/RawRabbit/Operations/OperatorBase.cs deleted file mode 100644 index a9075b98..00000000 --- a/src/RawRabbit/Operations/OperatorBase.cs +++ /dev/null @@ -1,114 +0,0 @@ -using System; -using RabbitMQ.Client; -using RawRabbit.Common; -using RawRabbit.Configuration.Exchange; -using RawRabbit.Configuration.Queue; -using RawRabbit.Logging; -using RawRabbit.Serialization; - -namespace RawRabbit.Operations -{ - public abstract class OperatorBase : IDisposable - { - protected readonly IChannelFactory ChannelFactory; - protected readonly IMessageSerializer Serializer; - private readonly ILogger _logger = LogManager.GetLogger(); - - protected OperatorBase(IChannelFactory channelFactory, IMessageSerializer serializer) - { - ChannelFactory = channelFactory; - Serializer = serializer; - } - - protected void DeclareExchange(ExchangeConfiguration config, IModel channel = null) - { - if (config.IsDefaultExchange() || config.AssumeInitialized) - { - return; - } - _logger.LogDebug($"Declaring exchange\n Name: {config.ExchangeName}\n Type: {config.ExchangeType}\n Durable: {config.Durable}\n Autodelete: {config.AutoDelete}"); - channel = channel ?? ChannelFactory.GetChannel(); - channel.ExchangeDeclare( - config.ExchangeName, - config.ExchangeType, - config.Durable, - config.AutoDelete, - config.Arguments - ); - } - - protected void DeclareQueue(QueueConfiguration queue, IModel channel = null) - { - if (queue.IsDirectReplyTo()) - { - /* - "Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to - declare this "queue" first, although the client can do so if it wants." - - https://www.rabbitmq.com/direct-reply-to.html - */ - return; - } - channel = channel ?? ChannelFactory.GetChannel(); - channel - .QueueDeclare( - queue: queue.FullQueueName, - durable: queue.Durable, - exclusive: queue.Exclusive, - autoDelete: queue.AutoDelete, - arguments: queue.Arguments - ); - _logger.LogDebug($"Declaring queue\n Name: {queue.FullQueueName}\n Exclusive: {queue.Exclusive}\n Durable: {queue.Durable}\n Autodelete: {queue.AutoDelete}"); - } - - protected void BindQueue(QueueConfiguration queue, ExchangeConfiguration exchange, string routingKey, IModel channel = null) - { - if (exchange.IsDefaultExchange()) - { - /* - "The default exchange is implicitly bound to every queue, - with a routing key equal to the queue name. It it not possible - to explicitly bind to, or unbind from the default exchange." - */ - return; - } - if (queue.IsDirectReplyTo()) - { - /* - "Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to - declare this "queue" first, although the client can do so if it wants." - - https://www.rabbitmq.com/direct-reply-to.html - */ - return; - } - _logger.LogDebug($"Binding queue {queue.QueueName} to exchange {exchange.ExchangeName} with routing key {routingKey}"); - channel = channel ?? ChannelFactory.GetChannel(); - channel - .QueueBind( - queue: queue.FullQueueName, - exchange: exchange.ExchangeName, - routingKey: routingKey - ); - } - - protected void ConfigureQos(IModel channel, ushort prefetchCount) - { - /* - QoS is per consumer on channel. If ChannelFactory is used, - we might get a new channel than the one the consumer is - we are configuring. - */ - _logger.LogDebug($"Setting QoS\n Prefetch Size: 0\n Prefetch Count: {prefetchCount}\n global: false"); - channel.BasicQos( - prefetchSize: 0, //TODO : what is this? - prefetchCount: prefetchCount, - global: false // https://www.rabbitmq.com/consumer-prefetch.html - ); - } - - public virtual void Dispose() - { - _logger.LogDebug($"Disposing operation."); - ChannelFactory?.Dispose(); - } - } -} diff --git a/src/RawRabbit/Operations/Subscriber.cs b/src/RawRabbit/Operations/Subscriber.cs index 33bfd563..2b65db2c 100644 --- a/src/RawRabbit/Operations/Subscriber.cs +++ b/src/RawRabbit/Operations/Subscriber.cs @@ -12,45 +12,62 @@ namespace RawRabbit.Operations { - public class Subscriber : OperatorBase, ISubscriber where TMessageContext : IMessageContext + public class Subscriber : ISubscriber where TMessageContext : IMessageContext { + private readonly IChannelFactory _channelFactory; private readonly IConsumerFactory _consumerFactory; + private readonly ITopologyProvider _topologyProvider; + private readonly IMessageSerializer _serializer; private readonly IMessageContextProvider _contextProvider; private readonly IContextEnhancer _contextEnhancer; private readonly ILogger _logger = LogManager.GetLogger>(); - public Subscriber(IChannelFactory channelFactory, IConsumerFactory consumerFactory, IMessageSerializer serializer, IMessageContextProvider contextProvider, IContextEnhancer contextEnhancer) - : base(channelFactory, serializer) + public Subscriber( + IChannelFactory channelFactory, + IConsumerFactory consumerFactory, + ITopologyProvider topologyProvider, + IMessageSerializer serializer, + IMessageContextProvider contextProvider, + IContextEnhancer contextEnhancer) { + _channelFactory = channelFactory; _consumerFactory = consumerFactory; + _topologyProvider = topologyProvider; + _serializer = serializer; _contextProvider = contextProvider; _contextEnhancer = contextEnhancer; } public void SubscribeAsync(Func subscribeMethod, SubscriptionConfiguration config) { - var channel = ChannelFactory.CreateChannel(); - DeclareQueue(config.Queue, channel); - DeclareExchange(config.Exchange, channel); - BindQueue(config.Queue, config.Exchange, config.RoutingKey, channel); - var consumer = _consumerFactory.CreateConsumer(config, channel); - consumer.OnMessageAsync = (o, args) => - { - var body = Serializer.Deserialize(args.Body); - var context = _contextProvider.ExtractContext(args.BasicProperties.Headers[PropertyHeaders.Context]); - _contextEnhancer.WireUpContextFeatures(context, consumer, args); - return subscribeMethod(body, context); - }; - consumer.Model.BasicConsume(config.Queue.FullQueueName, config.NoAck, consumer); + var topologyTask = _topologyProvider.BindQueueAsync(config.Queue, config.Exchange, config.RoutingKey); + var channelTask = _channelFactory.CreateChannelAsync(); - _logger.LogDebug($"Setting up a consumer on queue {config.Queue.QueueName} with NoAck set to {config.NoAck}."); + var subscriberTask = Task + .WhenAll(topologyTask, channelTask) + .ContinueWith(t => + { + var consumer = _consumerFactory.CreateConsumer(config, channelTask.Result); + consumer.OnMessageAsync = (o, args) => + { + var body = _serializer.Deserialize(args.Body); + var context = _contextProvider.ExtractContext(args.BasicProperties.Headers[PropertyHeaders.Context]); + _contextEnhancer.WireUpContextFeatures(context, consumer, args); + return subscribeMethod(body, context); + }; + consumer.Model.BasicConsume(config.Queue.FullQueueName, config.NoAck, consumer); + + _logger.LogDebug($"Setting up a consumer on channel '{channelTask.Result.ChannelNumber}' for queue {config.Queue.QueueName} with NoAck set to {config.NoAck}."); + }); + Task.WaitAll(subscriberTask); } - public override void Dispose() + public void Dispose() { _logger.LogDebug("Disposing Subscriber."); - base.Dispose(); (_consumerFactory as IDisposable)?.Dispose(); + (_channelFactory as IDisposable)?.Dispose(); + (_topologyProvider as IDisposable)?.Dispose(); } } }