Skip to content

Commit

Permalink
(#64): Migrated Subscriber to new infra
Browse files Browse the repository at this point in the history
And by doing so, no one uses the operation base anymore, so it's gone baby,
gone.
  • Loading branch information
par.dahlman committed Mar 5, 2016
1 parent 534b174 commit 6a5635d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 133 deletions.
114 changes: 0 additions & 114 deletions src/RawRabbit/Operations/OperatorBase.cs

This file was deleted.

55 changes: 36 additions & 19 deletions src/RawRabbit/Operations/Subscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,62 @@

namespace RawRabbit.Operations
{
public class Subscriber<TMessageContext> : OperatorBase, ISubscriber<TMessageContext> where TMessageContext : IMessageContext
public class Subscriber<TMessageContext> : ISubscriber<TMessageContext> where TMessageContext : IMessageContext
{
private readonly IChannelFactory _channelFactory;
private readonly IConsumerFactory _consumerFactory;
private readonly ITopologyProvider _topologyProvider;
private readonly IMessageSerializer _serializer;
private readonly IMessageContextProvider<TMessageContext> _contextProvider;
private readonly IContextEnhancer _contextEnhancer;
private readonly ILogger _logger = LogManager.GetLogger<Subscriber<TMessageContext>>();

public Subscriber(IChannelFactory channelFactory, IConsumerFactory consumerFactory, IMessageSerializer serializer, IMessageContextProvider<TMessageContext> contextProvider, IContextEnhancer contextEnhancer)
: base(channelFactory, serializer)
public Subscriber(
IChannelFactory channelFactory,
IConsumerFactory consumerFactory,
ITopologyProvider topologyProvider,
IMessageSerializer serializer,
IMessageContextProvider<TMessageContext> contextProvider,
IContextEnhancer contextEnhancer)
{
_channelFactory = channelFactory;
_consumerFactory = consumerFactory;
_topologyProvider = topologyProvider;
_serializer = serializer;
_contextProvider = contextProvider;
_contextEnhancer = contextEnhancer;
}

public void SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, SubscriptionConfiguration config)
{
var channel = ChannelFactory.CreateChannel();
DeclareQueue(config.Queue, channel);
DeclareExchange(config.Exchange, channel);
BindQueue(config.Queue, config.Exchange, config.RoutingKey, channel);
var consumer = _consumerFactory.CreateConsumer(config, channel);
consumer.OnMessageAsync = (o, args) =>
{
var body = Serializer.Deserialize<T>(args.Body);
var context = _contextProvider.ExtractContext(args.BasicProperties.Headers[PropertyHeaders.Context]);
_contextEnhancer.WireUpContextFeatures(context, consumer, args);
return subscribeMethod(body, context);
};
consumer.Model.BasicConsume(config.Queue.FullQueueName, config.NoAck, consumer);
var topologyTask = _topologyProvider.BindQueueAsync(config.Queue, config.Exchange, config.RoutingKey);
var channelTask = _channelFactory.CreateChannelAsync();

_logger.LogDebug($"Setting up a consumer on queue {config.Queue.QueueName} with NoAck set to {config.NoAck}.");
var subscriberTask = Task
.WhenAll(topologyTask, channelTask)
.ContinueWith(t =>
{
var consumer = _consumerFactory.CreateConsumer(config, channelTask.Result);
consumer.OnMessageAsync = (o, args) =>
{
var body = _serializer.Deserialize<T>(args.Body);
var context = _contextProvider.ExtractContext(args.BasicProperties.Headers[PropertyHeaders.Context]);
_contextEnhancer.WireUpContextFeatures(context, consumer, args);
return subscribeMethod(body, context);
};
consumer.Model.BasicConsume(config.Queue.FullQueueName, config.NoAck, consumer);
_logger.LogDebug($"Setting up a consumer on channel '{channelTask.Result.ChannelNumber}' for queue {config.Queue.QueueName} with NoAck set to {config.NoAck}.");
});
Task.WaitAll(subscriberTask);
}

public override void Dispose()
public void Dispose()
{
_logger.LogDebug("Disposing Subscriber.");
base.Dispose();
(_consumerFactory as IDisposable)?.Dispose();
(_channelFactory as IDisposable)?.Dispose();
(_topologyProvider as IDisposable)?.Dispose();
}
}
}

0 comments on commit 6a5635d

Please sign in to comment.