Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumbed OnMissingChannel through to ASB #1444 #1446

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ public class AzureServiceBusConsumer : IAmAMessageConsumer
private readonly string _subscriptionName;
private bool _subscriptionCreated;
private static readonly Lazy<ILog> _logger = new Lazy<ILog>(LogProvider.For<AzureServiceBusConsumer>);
private readonly OnMissingChannel _makeChannel;

public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMessageProducer messageProducer, IManagementClientWrapper managementClientWrapper,
IMessageReceiverProvider messageReceiverProvider, int batchSize = 10)
IMessageReceiverProvider messageReceiverProvider, int batchSize = 10, OnMissingChannel makeChannels = OnMissingChannel.Create)
{
_subscriptionName = subscriptionName;
_topicName = topicName;
_messageProducer = messageProducer;
_managementClientWrapper = managementClientWrapper;
_messageReceiverProvider = messageReceiverProvider;
_batchSize = batchSize;
_makeChannel = makeChannels;

GetMessageReceiverProvider();
}
Expand Down Expand Up @@ -114,7 +116,7 @@ private void EnsureSubscription()
{
const int maxDeliveryCount = 2000;

if (_subscriptionCreated)
if (_subscriptionCreated || _makeChannel.Equals(OnMissingChannel.Assume))
return;

try
Expand All @@ -125,6 +127,11 @@ private void EnsureSubscription()
return;
}

if (_makeChannel.Equals(OnMissingChannel.Validate))
{
throw new ChannelFailureException($"Subscription {_subscriptionName} does not exist on topic {_topicName} and missing channel mode set to Validate.");
}

_managementClientWrapper.CreateSubscription(_topicName, _subscriptionName, maxDeliveryCount);
_subscriptionCreated = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ public class AzureServiceBusMessageProducer : IAmAMessageProducer
private static readonly Lazy<ILog> _logger = new Lazy<ILog>(LogProvider.For<AzureServiceBusMessageProducer>);
private const int TopicConnectionSleepBetweenRetriesInMilliseconds = 100;
private const int TopicConnectionRetryCount = 5;
private readonly OnMissingChannel _makeChannel;

public AzureServiceBusMessageProducer(IManagementClientWrapper managementClientWrapper, ITopicClientProvider topicClientProvider)
public AzureServiceBusMessageProducer(IManagementClientWrapper managementClientWrapper, ITopicClientProvider topicClientProvider, OnMissingChannel makeChannel = OnMissingChannel.Create)
{
_managementClientWrapper = managementClientWrapper;
_topicClientProvider = topicClientProvider;
_makeChannel = makeChannel;
}

public void Send(Message message)
Expand Down Expand Up @@ -88,7 +90,7 @@ public void SendWithDelay(Message message, int delayMilliseconds = 0)

private void EnsureTopicExists(string topic)
{
if (_topicCreated)
if (_topicCreated || _makeChannel.Equals(OnMissingChannel.Assume))
return;

try
Expand All @@ -99,6 +101,11 @@ private void EnsureTopicExists(string topic)
return;
}

if (_makeChannel.Equals(OnMissingChannel.Validate))
{
throw new ChannelFailureException($"Topic {topic} does not exist and missing channel mode set to Validate.");
}

_managementClientWrapper.CreateTopic(topic);
_topicCreated = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
Expand Down Expand Up @@ -30,7 +31,7 @@ public AzureServiceBusConsumerTests()
_mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ReceiveMode.ReceiveAndDelete)).Returns(_messageReceiver.Object);

_azureServiceBusConsumer = new AzureServiceBusConsumer("topic", "subscription", _mockMessageProducer.Object,
_nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object);
_nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object, makeChannels: OnMissingChannel.Create);
}

[Fact]
Expand Down Expand Up @@ -370,5 +371,16 @@ public void When_receiving_messages_and_the_receiver_is_closing_a_MT_QUIT_messag
Assert.Equal(MessageType.MT_QUIT, result[0].Header.MessageType);

}

[Fact]
public void When_a_subscription_does_not_exist_and_Missing_is_set_to_Validate_a_Channel_Failure_is_Raised()
{
_nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false);

var azureServiceBusConsumerValidate = new AzureServiceBusConsumer("topic", "subscription", _mockMessageProducer.Object,
_nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object, makeChannels: OnMissingChannel.Validate);

Assert.Throws<ChannelFailureException>(() => azureServiceBusConsumerValidate.Receive(400));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public AzureServiceBusMessageProducerTests()
_topicClientProvider = new Mock<ITopicClientProvider>();
_topicClient = new Mock<ITopicClient>();

_producer = new AzureServiceBusMessageProducer(_nameSpaceManagerWrapper.Object, _topicClientProvider.Object);
_producer = new AzureServiceBusMessageProducer(_nameSpaceManagerWrapper.Object, _topicClientProvider.Object, OnMissingChannel.Create);
}

[Fact]
Expand Down Expand Up @@ -193,5 +193,15 @@ public void When_there_is_an_error_getting_a_topic_client_the_connection_for_top

_topicClient.Verify(topicClient => topicClient.Send(It.IsAny<Microsoft.Azure.ServiceBus.Message>()), Times.Once);
}

[Fact]
public void When_the_topic_does_not_exist_and_Missing_is_set_to_Validate_an_exception_is_raised()
{
var messageBody = Encoding.UTF8.GetBytes("A message body");

var producerValidate = new AzureServiceBusMessageProducer(_nameSpaceManagerWrapper.Object, _topicClientProvider.Object, OnMissingChannel.Validate);

Assert.Throws<ChannelFailureException>(() => producerValidate.Send(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody(messageBody, "JSON"))));
}
}
}