From 564c4588f89f968bdc7bf41fd7171531d2a37884 Mon Sep 17 00:00:00 2001 From: Paul Reardon Date: Thu, 18 Mar 2021 20:19:10 +0000 Subject: [PATCH] Plumbed OnMissingChannel through to ASB #1444 --- .../AzureServiceBusConsumer.cs | 11 +++++++++-- .../AzureServiceBusMessageProducer.cs | 11 +++++++++-- .../AzureServiceBusConsumerTests.cs | 14 +++++++++++++- .../AzureServiceBusMessageProducerTests.cs | 12 +++++++++++- 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs index f9fe69b0ba..dd41e4f02f 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs @@ -18,9 +18,10 @@ public class AzureServiceBusConsumer : IAmAMessageConsumer private readonly string _subscriptionName; private bool _subscriptionCreated; private static readonly Lazy _logger = new Lazy(LogProvider.For); + private readonly OnMissingChannel _makeChannel; public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMessageProducer messageProducer, IManagementClientWrapper managementClientWrapper, - IMessageReceiverProvider messageReceiverProvider, int batchSize = 10) + IMessageReceiverProvider messageReceiverProvider, int batchSize = 10, OnMissingChannel makeChannels = OnMissingChannel.Create) { _subscriptionName = subscriptionName; _topicName = topicName; @@ -28,6 +29,7 @@ public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMe _managementClientWrapper = managementClientWrapper; _messageReceiverProvider = messageReceiverProvider; _batchSize = batchSize; + _makeChannel = makeChannels; GetMessageReceiverProvider(); } @@ -114,7 +116,7 @@ private void EnsureSubscription() { const int maxDeliveryCount = 2000; - if (_subscriptionCreated) + if (_subscriptionCreated || _makeChannel.Equals(OnMissingChannel.Assume)) return; try @@ -125,6 +127,11 @@ private void EnsureSubscription() return; } + if (_makeChannel.Equals(OnMissingChannel.Validate)) + { + throw new ChannelFailureException($"Subscription {_subscriptionName} does not exist on topic {_topicName} and missing channel mode set to Validate."); + } + _managementClientWrapper.CreateSubscription(_topicName, _subscriptionName, maxDeliveryCount); _subscriptionCreated = true; } diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs index f45dd01d85..dc613cb50a 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs @@ -15,11 +15,13 @@ public class AzureServiceBusMessageProducer : IAmAMessageProducer private static readonly Lazy _logger = new Lazy(LogProvider.For); private const int TopicConnectionSleepBetweenRetriesInMilliseconds = 100; private const int TopicConnectionRetryCount = 5; + private readonly OnMissingChannel _makeChannel; - public AzureServiceBusMessageProducer(IManagementClientWrapper managementClientWrapper, ITopicClientProvider topicClientProvider) + public AzureServiceBusMessageProducer(IManagementClientWrapper managementClientWrapper, ITopicClientProvider topicClientProvider, OnMissingChannel makeChannel = OnMissingChannel.Create) { _managementClientWrapper = managementClientWrapper; _topicClientProvider = topicClientProvider; + _makeChannel = makeChannel; } public void Send(Message message) @@ -88,7 +90,7 @@ public void SendWithDelay(Message message, int delayMilliseconds = 0) private void EnsureTopicExists(string topic) { - if (_topicCreated) + if (_topicCreated || _makeChannel.Equals(OnMissingChannel.Assume)) return; try @@ -99,6 +101,11 @@ private void EnsureTopicExists(string topic) return; } + if (_makeChannel.Equals(OnMissingChannel.Validate)) + { + throw new ChannelFailureException($"Topic {topic} does not exist and missing channel mode set to Validate."); + } + _managementClientWrapper.CreateTopic(topic); _topicCreated = true; } diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs index 834d40cfa9..108b083c9f 100644 --- a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Reflection; using System.Text; using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; @@ -30,7 +31,7 @@ public AzureServiceBusConsumerTests() _mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ReceiveMode.ReceiveAndDelete)).Returns(_messageReceiver.Object); _azureServiceBusConsumer = new AzureServiceBusConsumer("topic", "subscription", _mockMessageProducer.Object, - _nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object); + _nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object, makeChannels: OnMissingChannel.Create); } [Fact] @@ -370,5 +371,16 @@ public void When_receiving_messages_and_the_receiver_is_closing_a_MT_QUIT_messag Assert.Equal(MessageType.MT_QUIT, result[0].Header.MessageType); } + + [Fact] + public void When_a_subscription_does_not_exist_and_Missing_is_set_to_Validate_a_Channel_Failure_is_Raised() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false); + + var azureServiceBusConsumerValidate = new AzureServiceBusConsumer("topic", "subscription", _mockMessageProducer.Object, + _nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object, makeChannels: OnMissingChannel.Validate); + + Assert.Throws(() => azureServiceBusConsumerValidate.Receive(400)); + } } } diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusMessageProducerTests.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusMessageProducerTests.cs index c9d635bb38..31e82e15a8 100644 --- a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusMessageProducerTests.cs +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusMessageProducerTests.cs @@ -20,7 +20,7 @@ public AzureServiceBusMessageProducerTests() _topicClientProvider = new Mock(); _topicClient = new Mock(); - _producer = new AzureServiceBusMessageProducer(_nameSpaceManagerWrapper.Object, _topicClientProvider.Object); + _producer = new AzureServiceBusMessageProducer(_nameSpaceManagerWrapper.Object, _topicClientProvider.Object, OnMissingChannel.Create); } [Fact] @@ -193,5 +193,15 @@ public void When_there_is_an_error_getting_a_topic_client_the_connection_for_top _topicClient.Verify(topicClient => topicClient.Send(It.IsAny()), Times.Once); } + + [Fact] + public void When_the_topic_does_not_exist_and_Missing_is_set_to_Validate_an_exception_is_raised() + { + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + var producerValidate = new AzureServiceBusMessageProducer(_nameSpaceManagerWrapper.Object, _topicClientProvider.Object, OnMissingChannel.Validate); + + Assert.Throws(() => producerValidate.Send(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody(messageBody, "JSON")))); + } } }