Skip to content

Commit

Permalink
(#35) Return 'ISubscription' for Subscribe and Respond
Browse files Browse the repository at this point in the history
  • Loading branch information
par.dahlman committed Mar 19, 2016
1 parent 6c73ed5 commit 495dd70
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 13 deletions.
8 changes: 4 additions & 4 deletions src/RawRabbit/Common/BaseBusClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public BaseBusClient(
_logger.LogInformation("BusClient initialized.");
}

public void SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, Action<ISubscriptionConfigurationBuilder> configuration = null)
public ISubscription SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, Action<ISubscriptionConfigurationBuilder> configuration = null)
{
var config = _configEval.GetConfiguration<T>(configuration);
_logger.LogInformation($"Subscribing to message '{typeof(T).Name}' on exchange '{config.Exchange.ExchangeName}' with routing key {config.RoutingKey}.");
_subscriber.SubscribeAsync(subscribeMethod, config);
return _subscriber.SubscribeAsync(subscribeMethod, config);
}

public Task PublishAsync<T>(T message = default(T), Guid globalMessageId = new Guid(), Action<IPublishConfigurationBuilder> configuration = null)
Expand All @@ -48,11 +48,11 @@ public void SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, Ac
return _publisher.PublishAsync(message, globalMessageId, config);
}

public void RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, Action<IResponderConfigurationBuilder> configuration = null)
public ISubscription RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, Action<IResponderConfigurationBuilder> configuration = null)
{
var config = _configEval.GetConfiguration<TRequest, TResponse>(configuration);
_logger.LogInformation($"Responding to to requests '{typeof(TRequest).Name}' with '{typeof(TResponse).Name}'.");
_responder.RespondAsync(onMessage, config);
return _responder.RespondAsync(onMessage, config);
}

public Task<TResponse> RequestAsync<TRequest, TResponse>(TRequest message = default(TRequest), Guid globalMessageId = new Guid(), Action<IRequestConfigurationBuilder> configuration = null)
Expand Down
37 changes: 37 additions & 0 deletions src/RawRabbit/Common/Subscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using RabbitMQ.Client;
using RawRabbit.Consumer.Abstraction;

namespace RawRabbit.Common
{
public interface ISubscription : IDisposable
{
string QueueName { get; }
string ConsumerTag { get; }
}

public class Subscription : ISubscription
{
public string QueueName { get; }
public string ConsumerTag { get; }

private readonly IRawConsumer _consumer;

public Subscription(IRawConsumer consumer, string queueName)
{
_consumer = consumer;
var basicConsumer = consumer as DefaultBasicConsumer;
if (basicConsumer == null)
{
return;
}
QueueName = queueName;
ConsumerTag = basicConsumer.ConsumerTag;
}

public void Dispose()
{
_consumer.Disconnect();
}
}
}
5 changes: 3 additions & 2 deletions src/RawRabbit/IBusClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using RawRabbit.Common;
using RawRabbit.Configuration.Publish;
using RawRabbit.Configuration.Request;
using RawRabbit.Configuration.Respond;
Expand All @@ -10,11 +11,11 @@ namespace RawRabbit
{
public interface IBusClient<out TMessageContext> : IDisposable where TMessageContext : IMessageContext
{
void SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, Action<ISubscriptionConfigurationBuilder> configuration = null);
ISubscription SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, Action<ISubscriptionConfigurationBuilder> configuration = null);

Task PublishAsync<T>(T message = default(T), Guid globalMessageId = new Guid(), Action<IPublishConfigurationBuilder> configuration = null);

void RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, Action<IResponderConfigurationBuilder> configuration = null);
ISubscription RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, Action<IResponderConfigurationBuilder> configuration = null);

Task<TResponse> RequestAsync<TRequest, TResponse>(TRequest message = default(TRequest), Guid globalMessageId = new Guid(), Action<IRequestConfigurationBuilder> configuration = null);
}
Expand Down
3 changes: 2 additions & 1 deletion src/RawRabbit/Operations/Abstraction/IResponder.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System;
using System.Threading.Tasks;
using RawRabbit.Common;
using RawRabbit.Configuration.Respond;
using RawRabbit.Context;

namespace RawRabbit.Operations.Abstraction
{
public interface IResponder<out TMessageContext> where TMessageContext : IMessageContext
{
void RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, ResponderConfiguration cfg);
ISubscription RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, ResponderConfiguration cfg);
}
}
3 changes: 2 additions & 1 deletion src/RawRabbit/Operations/Abstraction/ISubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System;
using System.Threading.Tasks;
using RawRabbit.Common;
using RawRabbit.Configuration.Subscribe;
using RawRabbit.Context;

namespace RawRabbit.Operations.Abstraction
{
public interface ISubscriber<out TMessageContext> where TMessageContext : IMessageContext
{
void SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, SubscriptionConfiguration config);
ISubscription SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, SubscriptionConfiguration config);
}
}
5 changes: 3 additions & 2 deletions src/RawRabbit/Operations/Responder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Responder(
_consumers = new List<IRawConsumer>();
}

public void RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, ResponderConfiguration cfg)
public ISubscription RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Task<TResponse>> onMessage, ResponderConfiguration cfg)
{
var topologyTask = _topologyProvider.BindQueueAsync(cfg.Queue, cfg.Exchange, cfg.RoutingKey);
var channelTask = _channelFactory.CreateChannelAsync();
Expand Down Expand Up @@ -87,9 +87,10 @@ public void RespondAsync<TRequest, TResponse>(Func<TRequest, TMessageContext, Ta
});
};
consumer.Model.BasicConsume(cfg.Queue.QueueName, cfg.NoAck, consumer);
return new Subscription(consumer, cfg.Queue.QueueName);
});

Task.WaitAll(respondTask);
return respondTask.Result;
}

public void Dispose()
Expand Down
6 changes: 3 additions & 3 deletions src/RawRabbit/Operations/Subscriber.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Threading.Tasks;
using RawRabbit.Channel;
using RawRabbit.Channel.Abstraction;
using RawRabbit.Common;
using RawRabbit.Configuration.Subscribe;
Expand Down Expand Up @@ -40,7 +39,7 @@ public Subscriber(
_contextEnhancer = contextEnhancer;
}

public void SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, SubscriptionConfiguration config)
public ISubscription SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, SubscriptionConfiguration config)
{
var topologyTask = _topologyProvider.BindQueueAsync(config.Queue, config.Exchange, config.RoutingKey);
var channelTask = _channelFactory.CreateChannelAsync();
Expand All @@ -58,10 +57,11 @@ public void SubscribeAsync<T>(Func<T, TMessageContext, Task> subscribeMethod, Su
return subscribeMethod(body, context);
};
consumer.Model.BasicConsume(config.Queue.FullQueueName, config.NoAck, consumer);
_logger.LogDebug($"Setting up a consumer on channel '{channelTask.Result.ChannelNumber}' for queue {config.Queue.QueueName} with NoAck set to {config.NoAck}.");
return new Subscription(consumer, config.Queue.QueueName);
});
Task.WaitAll(subscriberTask);
return subscriberTask.Result;
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,42 @@ await subscriber.PublishAsync(new BasicMessage
/* Asset */
Assert.Equal(expected: "I am important!", actual: priorityTcs.Task.Result.Prop);
}

[Fact]
public async Task Should_Stop_Subscribe_When_Subscription_Is_Disposed()
{
/**/
var publisher = BusClientFactory.CreateDefault();
var subscriber = BusClientFactory.CreateDefault();
var firstMessage = new BasicMessage {Prop = "Value"};
var secondMessage = new BasicMessage {Prop = "AnotherValue"};
var firstRecievedTcs = new TaskCompletionSource<BasicMessage>();
var secondRecievedTcs = new TaskCompletionSource<BasicMessage>();
var recievedCount = 0;

var subscription = subscriber.SubscribeAsync<BasicMessage>((message, context) =>
{
recievedCount++;
firstRecievedTcs.SetResult(message);
return Task.FromResult(true);
});

await publisher.PublishAsync(firstMessage);
await firstRecievedTcs.Task;
subscription.Dispose();
await publisher.PublishAsync(secondMessage);
await Task.Delay(20);
publisher.SubscribeAsync<BasicMessage>((message, context) =>
{
secondRecievedTcs.SetResult(message);
return Task.FromResult(true);
});
await secondRecievedTcs.Task;

/* Assert */
Assert.Equal(1, recievedCount);
Assert.Equal(firstRecievedTcs.Task.Result.Prop, firstMessage.Prop);
Assert.Equal(secondRecievedTcs.Task.Result.Prop, secondMessage.Prop);
}
}
}

0 comments on commit 495dd70

Please sign in to comment.