diff --git a/Brighter.sln b/Brighter.sln index 07ea55e672..bfc14e3234 100644 --- a/Brighter.sln +++ b/Brighter.sln @@ -187,6 +187,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paramore.Brighter.Extension EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paramore.Brighter.Inbox.Postgres", "src\Paramore.Brighter.Inbox.Postgres\Paramore.Brighter.Inbox.Postgres.csproj", "{DE191819-35B1-40C8-9118-E169633C5041}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.AzureServiceBus.Tests", "tests\Paramore.Brighter.AzureServiceBus.Tests\Paramore.Brighter.AzureServiceBus.Tests.csproj", "{48F584DF-0BA1-4485-A612-14FD4F6A4CF7}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -965,6 +967,18 @@ Global {DE191819-35B1-40C8-9118-E169633C5041}.Release|Mixed Platforms.Build.0 = Release|Any CPU {DE191819-35B1-40C8-9118-E169633C5041}.Release|x86.ActiveCfg = Release|Any CPU {DE191819-35B1-40C8-9118-E169633C5041}.Release|x86.Build.0 = Release|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Debug|x86.ActiveCfg = Debug|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Debug|x86.Build.0 = Debug|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Release|Any CPU.Build.0 = Release|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Release|x86.ActiveCfg = Release|Any CPU + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1021,6 +1035,7 @@ Global {B853A06F-5DFD-4093-979B-B08D1BBDD2D6} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD} {C2355F42-792D-4964-B068-79186F887C6D} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD} {D2B52F80-5106-4981-9031-A02C5EB479B4} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD} + {48F584DF-0BA1-4485-A612-14FD4F6A4CF7} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8B7C7E31-2E32-4E0D-9426-BC9AF22E9F4C} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusChannelFactory.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusChannelFactory.cs new file mode 100644 index 0000000000..b3f04f40f1 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusChannelFactory.cs @@ -0,0 +1,35 @@ +using System; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus +{ + public class AzureServiceBusChannelFactory : IAmAChannelFactory + { + private readonly AzureServiceBusConsumerFactory _azureServiceBusConsumerFactory; + + public AzureServiceBusChannelFactory(AzureServiceBusConsumerFactory azureServiceBusConsumerFactory) + { + _azureServiceBusConsumerFactory = azureServiceBusConsumerFactory; + } + + public IAmAChannel CreateChannel(Subscription subscription) + { + if (!(subscription is AzureServiceBusSubscription azureServiceBusSubscription)) + { + throw new ConfigurationException("We expect an AzureServiceBusSubscription or AzureServiceBusSubscription as a parameter"); + } + + if (subscription.TimeoutInMiliseconds < 400) + { + throw new ArgumentException("The minimum allowed timeout is 400 milliseconds"); + } + + IAmAMessageConsumer messageConsumer = _azureServiceBusConsumerFactory.Create(azureServiceBusSubscription); + + return new Channel( + channelName: subscription.ChannelName, + messageConsumer: messageConsumer, + maxQueueLength: subscription.BufferSize + ); + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConfiguration.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConfiguration.cs new file mode 100644 index 0000000000..137bcc1c95 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConfiguration.cs @@ -0,0 +1,12 @@ +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus +{ + public class AzureServiceBusConfiguration + { + public AzureServiceBusConfiguration(string connectionString) + { + ConnectionString = connectionString; + } + + public string ConnectionString { get; } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs new file mode 100644 index 0000000000..f9fe69b0ba --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs @@ -0,0 +1,193 @@ +using System; +using System.Collections.Generic; +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Management; +using Paramore.Brighter.Logging; +using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus +{ + public class AzureServiceBusConsumer : IAmAMessageConsumer + { + private readonly string _topicName; + private readonly IAmAMessageProducer _messageProducer; + private readonly IManagementClientWrapper _managementClientWrapper; + private readonly IMessageReceiverProvider _messageReceiverProvider; + private readonly int _batchSize; + private IMessageReceiverWrapper _messageReceiver; + private readonly string _subscriptionName; + private bool _subscriptionCreated; + private static readonly Lazy _logger = new Lazy(LogProvider.For); + + public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMessageProducer messageProducer, IManagementClientWrapper managementClientWrapper, + IMessageReceiverProvider messageReceiverProvider, int batchSize = 10) + { + _subscriptionName = subscriptionName; + _topicName = topicName; + _messageProducer = messageProducer; + _managementClientWrapper = managementClientWrapper; + _messageReceiverProvider = messageReceiverProvider; + _batchSize = batchSize; + + GetMessageReceiverProvider(); + } + + private void GetMessageReceiverProvider() + { + _logger.Value.Info($"Getting message receiver provider for topic {_topicName} and subscription {_subscriptionName}..."); + try + { + _messageReceiver = _messageReceiverProvider.Get(_topicName, _subscriptionName, ReceiveMode.ReceiveAndDelete); + } + catch (Exception e) + { + _logger.Value.ErrorException($"Failed to get message receiver provider for topic {_topicName} and subscription {_subscriptionName}.", e); + } + } + + public Message[] Receive(int timeoutInMilliseconds) + { + _logger.Value.Debug($"Preparing to retrieve next message(s) from topic {_topicName} via subscription {_subscriptionName} with timeout {timeoutInMilliseconds} and batch size {_batchSize}."); + + IEnumerable messages; + EnsureSubscription(); + + var messagesToReturn = new List(); + + try + { + messages = _messageReceiver.Receive(_batchSize, TimeSpan.FromMilliseconds(timeoutInMilliseconds)).GetAwaiter().GetResult(); + } + catch (Exception e) + { + if (_messageReceiver.IsClosedOrClosing) + { + _logger.Value.Debug("Message Receiver is closing..."); + var message = new Message(new MessageHeader(Guid.NewGuid(), _topicName, MessageType.MT_QUIT), new MessageBody(string.Empty)); + messagesToReturn.Add(message); + return messagesToReturn.ToArray(); + } + + _logger.Value.ErrorException("Failing to receive messages.", e); + + //The connection to Azure Service bus may have failed so we re-establish the connection. + GetMessageReceiverProvider(); + + throw new ChannelFailureException("Failing to receive messages.", e); + } + + foreach (IBrokeredMessageWrapper azureServiceBusMessage in messages) + { + Message message = MapToBrighterMessage(azureServiceBusMessage); + messagesToReturn.Add(message); + } + + return messagesToReturn.ToArray(); + } + + private Message MapToBrighterMessage(IBrokeredMessageWrapper azureServiceBusMessage) + { + if (azureServiceBusMessage.MessageBodyValue == null) + { + _logger.Value.Warn($"Null message body received from topic {_topicName} via subscription {_subscriptionName}."); + } + + var messageBody = System.Text.Encoding.Default.GetString(azureServiceBusMessage.MessageBodyValue ?? Array.Empty()); + _logger.Value.Debug($"Received message from topic {_topicName} via subscription {_subscriptionName} with body {messageBody}."); + MessageType messageType = GetMessageType(azureServiceBusMessage); + var message = new Message(new MessageHeader(Guid.NewGuid(), _topicName, messageType), new MessageBody(messageBody)); + return message; + } + + private static MessageType GetMessageType(IBrokeredMessageWrapper azureServiceBusMessage) + { + if (azureServiceBusMessage.UserProperties == null ||!azureServiceBusMessage.UserProperties.ContainsKey("MessageType")) + return MessageType.MT_EVENT; + + if (Enum.TryParse(azureServiceBusMessage.UserProperties["MessageType"].ToString(), true, out MessageType messageType)) + return messageType; + + return MessageType.MT_EVENT; + } + + private void EnsureSubscription() + { + const int maxDeliveryCount = 2000; + + if (_subscriptionCreated) + return; + + try + { + if (_managementClientWrapper.SubscriptionExists(_topicName, _subscriptionName)) + { + _subscriptionCreated = true; + return; + } + + _managementClientWrapper.CreateSubscription(_topicName, _subscriptionName, maxDeliveryCount); + _subscriptionCreated = true; + } + catch (MessagingEntityAlreadyExistsException) + { + _logger.Value.Warn($"Message entity already exists with topic {_topicName} and subscription {_subscriptionName}."); + _subscriptionCreated = true; + } + catch (Exception e) + { + _logger.Value.ErrorException("Failing to check or create subscription.", e); + + //The connection to Azure Service bus may have failed so we re-establish the connection. + _managementClientWrapper.Reset(); + + throw new ChannelFailureException("Failing to check or create subscription", e); + } + } + + public void Requeue(Message message, int delayMilliseconds) + { + var topic = message.Header.Topic; + + _logger.Value.Info($"Requeuing message with topic {topic} and id {message.Id}."); + + if (delayMilliseconds > 0) + { + _messageProducer.SendWithDelay(message, delayMilliseconds); + } + else + { + _messageProducer.Send(message); + } + } + + public void Acknowledge(Message message) + { + //Not implemented as we use ReceiveMode.ReceiveAndDelete (Brighter will call this method anyway) + } + + public void Reject(Message message) + { + _logger.Value.Warn("Reject method NOT IMPLEMENTED."); + } + + public void Reject(Message message, bool requeue) + { + if (requeue) + { + Requeue(message, 0); + } + } + + public void Purge() + { + _logger.Value.Warn("Purge method NOT IMPLEMENTED."); + } + + public void Dispose() + { + _logger.Value.Info("Disposing the consumer..."); + _messageReceiver.Close(); + _logger.Value.Info("Consumer disposed."); + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumerFactory.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumerFactory.cs new file mode 100644 index 0000000000..5210c9b523 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumerFactory.cs @@ -0,0 +1,24 @@ +using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus +{ + public class AzureServiceBusConsumerFactory : IAmAMessageConsumerFactory + { + private readonly AzureServiceBusConfiguration _configuration; + + public AzureServiceBusConsumerFactory(AzureServiceBusConfiguration configuration) + { + _configuration = configuration; + } + + public IAmAMessageConsumer Create(Subscription subscription) + { + var nameSpaceManagerWrapper = new ManagementClientWrapper(_configuration); + + return new AzureServiceBusConsumer(subscription.RoutingKey, subscription.ChannelName, + new AzureServiceBusMessageProducer(nameSpaceManagerWrapper, + new TopicClientProvider(_configuration)), nameSpaceManagerWrapper, + new MessageReceiverProvider(_configuration)); + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs index 5ae75be305..f45dd01d85 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs @@ -1,83 +1,118 @@ -#region Licence -/* The MIT License (MIT) -Copyright © 2015 Yiannis Triantafyllopoulos - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the “Software”), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. */ -#endregion - -using System; -using Newtonsoft.Json; +using System; +using System.Threading; using Paramore.Brighter.Logging; +using Polly; +using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers; +using Polly.Retry; namespace Paramore.Brighter.MessagingGateway.AzureServiceBus { - public class AzureServiceBusMessageProducer : MessageGateway, IAmAMessageProducer + public class AzureServiceBusMessageProducer : IAmAMessageProducer { + private readonly IManagementClientWrapper _managementClientWrapper; + private readonly ITopicClientProvider _topicClientProvider; + private bool _topicCreated; private static readonly Lazy _logger = new Lazy(LogProvider.For); + private const int TopicConnectionSleepBetweenRetriesInMilliseconds = 100; + private const int TopicConnectionRetryCount = 5; - private readonly MessageSenderPool _pool; - - public AzureServiceBusMessageProducer(AzureServiceBusMessagingGatewayConfiguration configuration) - : base(configuration) + public AzureServiceBusMessageProducer(IManagementClientWrapper managementClientWrapper, ITopicClientProvider topicClientProvider) { - _pool = new MessageSenderPool(); + _managementClientWrapper = managementClientWrapper; + _topicClientProvider = topicClientProvider; } - /// - /// Sends the specified message. - /// - /// The message. - /// Task. - public void Send(Message message) + public void Send(Message message) { - _logger.Value.DebugFormat("AzureServiceBusMessageProducer: Publishing message to topic {0}", message.Header.Topic); - var messageSender = _pool.GetMessageSender(message.Header.Topic); - EnsureTopicExists(message.Header.Topic); - messageSender.Send(new Amqp.Message(message.Body.Value)); - _logger.Value.DebugFormat("AzureServiceBusMessageProducer: Published message with id {0} to topic '{1}' and content: {2}", message.Id, message.Header.Topic, JsonConvert.SerializeObject(message)); + SendWithDelay(message); } - /// - /// Sends the specified message. - /// - /// The message. - /// The sending delay - /// Task. - public void SendWithDelay(Message message, int delayMilliseconds = 0) + public void SendWithDelay(Message message, int delayMilliseconds = 0) { - Send(message); - } + _logger.Value.Debug($"Preparing to send message on topic {message.Header.Topic}"); - public bool DelaySupported => false; + EnsureTopicExists(message.Header.Topic); - ~AzureServiceBusMessageProducer() - { - Dispose(false); + ITopicClient topicClient; + + try + { + RetryPolicy policy = Policy + .Handle() + .Retry(TopicConnectionRetryCount, (exception, retryNumber) => + { + _logger.Value.ErrorException( + $"Failed to connect to topic {message.Header.Topic}, retrying...", exception); + + Thread.Sleep(TimeSpan.FromMilliseconds(TopicConnectionSleepBetweenRetriesInMilliseconds)); + } + ); + + topicClient = policy.Execute(() => _topicClientProvider.Get(message.Header.Topic)); + } + catch (Exception e) + { + _logger.Value.ErrorException($"Failed to connect to topic {message.Header.Topic}, aborting.", e); + throw; + } + + try + { + _logger.Value.Debug( + $"Publishing message to topic {message.Header.Topic} with a delay of {delayMilliseconds} and body {message.Body.Value} and id {message.Id}."); + + var azureServiceBusMessage = new Microsoft.Azure.ServiceBus.Message(message.Body.Bytes); + azureServiceBusMessage.UserProperties.Add("MessageType", message.Header.MessageType.ToString()); + if (delayMilliseconds == 0) + { + topicClient.Send(azureServiceBusMessage); + } + else + { + var dateTimeOffset = new DateTimeOffset(DateTime.UtcNow.AddMilliseconds(delayMilliseconds)); + topicClient.ScheduleMessage(azureServiceBusMessage, dateTimeOffset); + } + + _logger.Value.Debug( + $"Published message to topic {message.Header.Topic} with a delay of {delayMilliseconds} and body {message.Body.Value} and id {message.Id}"); + } + catch (Exception e) + { + _logger.Value.ErrorException($"Failed to publish message to topic {message.Header.Topic} with id {message.Id}, message will not be retried.", e); + } + finally + { + topicClient.Close(); + } } - protected override void Dispose(bool disposing) + private void EnsureTopicExists(string topic) { - if (disposing) + if (_topicCreated) + return; + + try + { + if (_managementClientWrapper.TopicExists(topic)) + { + _topicCreated = true; + return; + } + + _managementClientWrapper.CreateTopic(topic); + _topicCreated = true; + } + catch (Exception e) { - _pool?.Dispose(); + //The connection to Azure Service bus may have failed so we re-establish the connection. + _managementClientWrapper.Reset(); + _logger.Value.ErrorException("Failing to check or create topic.", e); + throw; } - base.Dispose(disposing); + } + + public void Dispose() + { } } } diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducerFactory.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducerFactory.cs new file mode 100644 index 0000000000..afd23bc717 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducerFactory.cs @@ -0,0 +1,15 @@ +using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus +{ + public static class AzureServiceBusMessageProducerFactory + { + public static AzureServiceBusMessageProducer Get(AzureServiceBusConfiguration configuration) + { + var nameSpaceManagerWrapper = new ManagementClientWrapper(configuration); + var topicClientProvider = new TopicClientProvider(configuration); + + return new AzureServiceBusMessageProducer(nameSpaceManagerWrapper, topicClientProvider); + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagingGatewayConfiguration.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagingGatewayConfiguration.cs deleted file mode 100644 index f7effe3b79..0000000000 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagingGatewayConfiguration.cs +++ /dev/null @@ -1,51 +0,0 @@ -#region Licence -/* The MIT License (MIT) -Copyright © 2015 Yiannis Triantafyllopoulos - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the “Software”), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. */ -#endregion - -using System; - -namespace Paramore.Brighter.MessagingGateway.AzureServiceBus -{ - public class AzureServiceBusMessagingGatewayConfiguration - { - public Namespace Namespace { get; set; } - - public SharedAccessPolicy SharedAccessPolicy { get; set; } - } - - public class Namespace - { - /// - /// The base address of the URI of the service bus - we use this to access resources via REST - /// - public Uri BaseAddress { get; set; } - - public string Name { get; set; } - } - - public class SharedAccessPolicy - { - public string Name { get; set; } - - public string Key { get; set; } - } -} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs new file mode 100644 index 0000000000..047e370de6 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs @@ -0,0 +1,50 @@ +using System; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus +{ + public class AzureServiceBusSubscription : Subscription + { + public AzureServiceBusSubscription( + Type dataType, + SubscriptionName name = null, + ChannelName channelName = null, + RoutingKey routingKey = null, + int bufferSize = 1, + int noOfPerformers = 1, + int timeoutInMilliseconds = 300, + int requeueCount = -1, + int requeueDelayInMilliseconds = 0, + int unacceptableMessageLimit = 0, + bool isAsync = false, + IAmAChannelFactory channelFactory = null, + OnMissingChannel makeChannels = OnMissingChannel.Create) + : base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, + requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, isAsync, channelFactory, + makeChannels) + { + } + } + + public class AzureServiceBusSubscription : AzureServiceBusSubscription where T : IRequest + { + public AzureServiceBusSubscription( + Type dataType, + SubscriptionName name = null, + ChannelName channelName = null, + RoutingKey routingKey = null, + int bufferSize = 1, + int noOfPerformers = 1, + int timeoutInMilliseconds = 300, + int requeueCount = -1, + int requeueDelayInMilliseconds = 0, + int unacceptableMessageLimit = 0, + bool isAsync = false, + IAmAChannelFactory channelFactory = null, + OnMissingChannel makeChannels = OnMissingChannel.Create) + : base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, + requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, isAsync, channelFactory, + makeChannels) + { + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/BrokeredMessageWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/BrokeredMessageWrapper.cs new file mode 100644 index 0000000000..b4070fa468 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/BrokeredMessageWrapper.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public class BrokeredMessageWrapper : IBrokeredMessageWrapper + { + private readonly Microsoft.Azure.ServiceBus.Message _brokeredMessage; + + public BrokeredMessageWrapper(Microsoft.Azure.ServiceBus.Message brokeredMessage) + { + _brokeredMessage = brokeredMessage; + } + + public byte[] MessageBodyValue => _brokeredMessage.Body; + + public IDictionary UserProperties => _brokeredMessage.UserProperties; + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IBrokeredMessageWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IBrokeredMessageWrapper.cs new file mode 100644 index 0000000000..9c52d135bc --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IBrokeredMessageWrapper.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public interface IBrokeredMessageWrapper + { + byte[] MessageBodyValue { get; } + + IDictionary UserProperties { get; } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IManagementClientWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IManagementClientWrapper.cs new file mode 100644 index 0000000000..2506bbb44c --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IManagementClientWrapper.cs @@ -0,0 +1,15 @@ +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public interface IManagementClientWrapper + { + bool TopicExists(string topic); + + void CreateTopic(string topic); + + bool SubscriptionExists(string topicName, string subscriptionName); + + void CreateSubscription(string topicName, string subscriptionName, int maxDeliveryCount); + + void Reset(); + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IMessageReceiverProvider.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IMessageReceiverProvider.cs new file mode 100644 index 0000000000..66ba3e5762 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IMessageReceiverProvider.cs @@ -0,0 +1,9 @@ +using Microsoft.Azure.ServiceBus; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public interface IMessageReceiverProvider + { + IMessageReceiverWrapper Get(string topicName, string subscriptionName, ReceiveMode receiveMode); + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IMessageReceiverWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IMessageReceiverWrapper.cs new file mode 100644 index 0000000000..29a6997e5a --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IMessageReceiverWrapper.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public interface IMessageReceiverWrapper + { + Task> Receive(int batchSize, TimeSpan serverWaitTime); + + void Close(); + + bool IsClosedOrClosing { get; } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ITopicClient.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ITopicClient.cs new file mode 100644 index 0000000000..dc5b2d7403 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ITopicClient.cs @@ -0,0 +1,11 @@ +using System; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public interface ITopicClient + { + void Send(Microsoft.Azure.ServiceBus.Message message); + void ScheduleMessage(Microsoft.Azure.ServiceBus.Message message, DateTimeOffset scheduleEnqueueTime); + void Close(); + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ITopicClientProvider.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ITopicClientProvider.cs new file mode 100644 index 0000000000..9b69c7bedb --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ITopicClientProvider.cs @@ -0,0 +1,7 @@ +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public interface ITopicClientProvider + { + ITopicClient Get(string topic); + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ManagementClientWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ManagementClientWrapper.cs new file mode 100644 index 0000000000..a632e30127 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ManagementClientWrapper.cs @@ -0,0 +1,142 @@ +using System; +using Microsoft.Azure.ServiceBus.Management; +using Paramore.Brighter.Logging; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public class ManagementClientWrapper : IManagementClientWrapper + { + private ManagementClient _managementClient; + private static readonly Lazy Logger = new Lazy(LogProvider.For); + private readonly AzureServiceBusConfiguration _configuration; + + public ManagementClientWrapper(AzureServiceBusConfiguration configuration) + { + _configuration = configuration; + Initialise(); + } + + private void Initialise() + { + Logger.Value.Debug("Initialising new management client wrapper..."); + + try + { + if (_configuration == null) + { + throw new ArgumentNullException(nameof(_configuration), "Configuration is null, ensure this is set in the constructor."); + } + + _managementClient = new ManagementClient(_configuration.ConnectionString); + } + catch (Exception e) + { + Logger.Value.ErrorException("Failed to initialise new management client wrapper.", e); + throw; + } + + Logger.Value.Debug("New management client wrapper initialised."); + } + + public void Reset() + { + Logger.Value.Warn("Resetting management client wrapper..."); + Initialise(); + } + + public bool TopicExists(string topic) + { + Logger.Value.Debug($"Checking if topic {topic} exists..."); + + bool result; + + try + { + result = _managementClient.TopicExistsAsync(topic).GetAwaiter().GetResult(); + } + catch (Exception e) + { + Logger.Value.ErrorException($"Failed to check if topic {topic} exists.", e); + throw; + } + + if (result) + { + Logger.Value.Debug($"Topic {topic} exists."); + } + else + { + Logger.Value.Warn($"Topic {topic} does not exist."); + } + + return result; + } + + public void CreateTopic(string topic) + { + Logger.Value.Info($"Creating topic {topic}..."); + + try + { + _managementClient.CreateTopicAsync(topic).GetAwaiter().GetResult(); + } + catch (Exception e) + { + Logger.Value.ErrorException($"Failed to create topic {topic}.", e); + throw; + } + + Logger.Value.Info($"Topic {topic} created."); + } + + public bool SubscriptionExists(string topicName, string subscriptionName) + { + Logger.Value.Debug($"Checking if subscription {subscriptionName} for topic {topicName} exists..."); + + bool result; + + try + { + result =_managementClient.SubscriptionExistsAsync(topicName, subscriptionName).Result; + } + catch (Exception e) + { + Logger.Value.ErrorException($"Failed to check if subscription {subscriptionName} for topic {topicName} exists.", e); + throw; + } + + if (result) + { + Logger.Value.Debug($"Subscription {subscriptionName} for topic {topicName} exists."); + } + else + { + Logger.Value.Warn($"Subscription {subscriptionName} for topic {topicName} does not exist."); + } + + return result; + } + + public void CreateSubscription(string topicName, string subscriptionName, int maxDeliveryCount = 2000) + { + Logger.Value.Info($"Creating subscription {subscriptionName} for topic {topicName}..."); + + var subscriptionDescription = new SubscriptionDescription(topicName, subscriptionName) + { + MaxDeliveryCount = maxDeliveryCount + }; + + try + { + _managementClient.CreateSubscriptionAsync(subscriptionDescription).Wait(); + } + catch (Exception e) + { + Logger.Value.ErrorException($"Failed to create subscription {subscriptionName} for topic {topicName}.", e); + throw; + } + + Logger.Value.Info($"Subscription {subscriptionName} for topic {topicName} created."); + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/MessageReceiverProvider.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/MessageReceiverProvider.cs new file mode 100644 index 0000000000..baa4f0ef94 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/MessageReceiverProvider.cs @@ -0,0 +1,22 @@ +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public class MessageReceiverProvider : IMessageReceiverProvider + { + private readonly string _connectionString; + + public MessageReceiverProvider(AzureServiceBusConfiguration configuration) + { + _connectionString = configuration.ConnectionString; + } + + public IMessageReceiverWrapper Get(string topicName, string subscriptionName, ReceiveMode receiveMode) + { + var subscriptionPath = EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName); + var messageReceiver = new MessageReceiver(_connectionString, subscriptionPath, receiveMode, RetryPolicy.Default); + return new MessageReceiverWrapper(messageReceiver); + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/MessageReceiverWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/MessageReceiverWrapper.cs new file mode 100644 index 0000000000..8c28668b1d --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/MessageReceiverWrapper.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.ServiceBus.Core; +using Paramore.Brighter.Logging; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public class MessageReceiverWrapper : IMessageReceiverWrapper + { + private readonly IMessageReceiver _messageReceiver; + private static readonly Lazy Logger = new Lazy(LogProvider.For); + + public MessageReceiverWrapper(IMessageReceiver messageReceiver) + { + _messageReceiver = messageReceiver; + } + + public async Task> Receive(int batchSize, TimeSpan serverWaitTime) + { + var messages = await _messageReceiver.ReceiveAsync(batchSize, serverWaitTime).ConfigureAwait(false); + + if (messages == null) + { + return new List(); + } + return messages.Select(x => new BrokeredMessageWrapper(x)); + } + + public void Close() + { + Logger.Value.Warn("Closing the MessageReceiver connection"); + _messageReceiver.CloseAsync().GetAwaiter().GetResult(); + Logger.Value.Warn("MessageReceiver connection stopped"); + } + + public bool IsClosedOrClosing => _messageReceiver.IsClosedOrClosing; + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/TopicClientProvider.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/TopicClientProvider.cs new file mode 100644 index 0000000000..f1887fa3e7 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/TopicClientProvider.cs @@ -0,0 +1,20 @@ +using Microsoft.Azure.ServiceBus; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public class TopicClientProvider : ITopicClientProvider + { + private readonly string _connectionString; + + public TopicClientProvider(AzureServiceBusConfiguration configuration) + { + _connectionString = configuration.ConnectionString; + } + + public ITopicClient Get(string topic) + { + var topicClient = new TopicClient(_connectionString, topic); + return new TopicClientWrapper(topicClient); + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/TopicClientWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/TopicClientWrapper.cs new file mode 100644 index 0000000000..2cd993df6c --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/TopicClientWrapper.cs @@ -0,0 +1,42 @@ +using System; +using System.Transactions; +using Microsoft.Azure.ServiceBus; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers +{ + public class TopicClientWrapper : ITopicClient + { + private readonly TopicClient _topicClient; + + public TopicClientWrapper(TopicClient topicClient) + { + _topicClient = topicClient; + } + + public void Send(Microsoft.Azure.ServiceBus.Message message) + { + //Azure Service Bus only supports IsolationLevel.Serializable if in a transaction + if (Transaction.Current != null && Transaction.Current.IsolationLevel != IsolationLevel.Serializable) + { + using (new TransactionScope(TransactionScopeOption.Suppress)) + { + _topicClient.SendAsync(message).Wait(); + } + } + else + { + _topicClient.SendAsync(message).Wait(); + } + } + + public void ScheduleMessage(Microsoft.Azure.ServiceBus.Message message, DateTimeOffset scheduleEnqueueTime) + { + _topicClient.ScheduleMessageAsync(message, scheduleEnqueueTime).Wait(); + } + + public void Close() + { + _topicClient.CloseAsync().Wait(); + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/MessageGateway.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/MessageGateway.cs deleted file mode 100644 index 527c4a55a0..0000000000 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/MessageGateway.cs +++ /dev/null @@ -1,126 +0,0 @@ -#region Licence -/* The MIT License (MIT) -Copyright © 2015 Yiannis Triantafyllopoulos - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the “Software”), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. */ -#endregion - -using System; -using System.Globalization; -using System.Net; -using System.Net.Http; -using System.Net.Http.Headers; -using System.Security.Cryptography; -using System.Text; -using Paramore.Brighter.Logging; - -namespace Paramore.Brighter.MessagingGateway.AzureServiceBus -{ - /// - /// Provides access to Azure Service Bus, ensuring that the required topics exist. - /// Uses the REST API for Azure Service Bus as the AMQP .NET Lite library only supports AMQP 1.0 - /// and thus has no understanding of notions like queues, subscriptions, and topics. - /// - public class MessageGateway : IDisposable - { - private static readonly Lazy _logger = new Lazy(LogProvider.For); - - private readonly AzureServiceBusMessagingGatewayConfiguration _configuration; - private readonly HttpClient _client = new HttpClient(); - - public MessageGateway(AzureServiceBusMessagingGatewayConfiguration configuration) - { - _configuration = configuration; - _client.BaseAddress = _configuration.Namespace.BaseAddress; - } - - protected void EnsureTopicExists(string topicName) - { - if (!TopicExists(topicName)) - { - PutTopic(topicName); - } - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - ~MessageGateway() - { - Dispose(false); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - _client.Dispose(); - } - } - - private string GetSASToken(string SASKeyName, string SASKeyValue) - { - var fromEpochStart = DateTime.UtcNow - new DateTime(1970, 1, 1); - var expiry = Convert.ToString((int)fromEpochStart.TotalSeconds + 3600); - var stringToSign = WebUtility.UrlEncode(_client.BaseAddress.AbsoluteUri) + "\n" + expiry; - var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(SASKeyValue)); - - var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign))); - var sasToken = String.Format(CultureInfo.InvariantCulture, "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}", - WebUtility.UrlEncode(_client.BaseAddress.AbsoluteUri), WebUtility.UrlEncode(signature), expiry, SASKeyName); - return sasToken; - } - - - private void PutTopic(string topicName) - { - _client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( - GetSASToken(_configuration.SharedAccessPolicy.Name, _configuration.SharedAccessPolicy.Key) - ); - - _logger.Value.DebugFormat("\nCreating topic {0}", topicName); - - // Prepare the body of the create queue request - var requestBody = @" " + topicName + @" "; - - var response = _client.PutAsync(topicName, new StringContent(requestBody)).Result; - - if (!response.IsSuccessStatusCode) - { - throw new ConfigurationException($"Error creating topic on Azure Service BUS. Management API returned a {response.StatusCode} with {response.Content.ReadAsStringAsync().Result}"); - } - } - - private bool TopicExists(string topicName) - { - _client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( - GetSASToken(_configuration.SharedAccessPolicy.Name, _configuration.SharedAccessPolicy.Key) - ); - - _logger.Value.DebugFormat("\nCreating topic {0}", topicName); - - var response = _client.GetAsync(topicName).Result; - - return !response.IsSuccessStatusCode; - } - } -} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/MessageSenderPool.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/MessageSenderPool.cs deleted file mode 100644 index 905395d11a..0000000000 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/MessageSenderPool.cs +++ /dev/null @@ -1,93 +0,0 @@ -#region Licence -/* The MIT License (MIT) -Copyright © 2015 Yiannis Triantafyllopoulos - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the “Software”), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. */ -#endregion - -using System; -using System.Collections.Concurrent; -using System.Linq; -using Amqp; -using Paramore.Brighter.Logging; - -namespace Paramore.Brighter.MessagingGateway.AzureServiceBus -{ - public class MessageSenderPool : IDisposable - { - private static readonly Lazy _logger = new Lazy(LogProvider.For); - - private readonly ConcurrentDictionary _senders = new ConcurrentDictionary(); - private bool _closing; - - public SenderLink GetMessageSender(string topic) - { - if (_senders.ContainsKey(topic)) - return _senders[topic]; - - Address address = new Address("amqp://guest:guest@localhost:5672"); - Amqp.Connection connection = Amqp.Connection.Factory.CreateAsync(address).Result; - Session session = new Session(connection); - SenderLink sender = new SenderLink(session, "sender", topic); - - _senders.TryAdd(topic, sender); - sender.Closed += Sender_Closed; - return sender; - } - - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - ~MessageSenderPool() - { - Dispose(false); - } - - protected virtual void Dispose(bool disposing) - { - _closing = true; - if (disposing) - { - foreach (var sender in _senders.Values) - { - sender.Close(); - } - } - } - - private void Sender_Closed(IAmqpObject sender, Amqp.Framing.Error error) - { - if (!_closing) - { - var senderLink = (SenderLink) sender; - var key = - _senders.ToArray().Where(pair => pair.Value == senderLink).Select(pair => pair.Key).FirstOrDefault(); - if (key != null) - { - SenderLink returnedLink; - _senders.TryRemove(key, out returnedLink); - } - } - } - } -} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/Paramore.Brighter.MessagingGateway.AzureServiceBus.csproj b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/Paramore.Brighter.MessagingGateway.AzureServiceBus.csproj index 6bacdfee87..98e71ace27 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/Paramore.Brighter.MessagingGateway.AzureServiceBus.csproj +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/Paramore.Brighter.MessagingGateway.AzureServiceBus.csproj @@ -6,9 +6,10 @@ awssqs;AMQP;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability - + + \ No newline at end of file diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusChannelFactoryTests.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusChannelFactoryTests.cs new file mode 100644 index 0000000000..7190799c53 --- /dev/null +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusChannelFactoryTests.cs @@ -0,0 +1,23 @@ +using System; +using Xunit; +using Paramore.Brighter.MessagingGateway.AzureServiceBus; + +namespace Paramore.Brighter.AzureServiceBus.Tests +{ + + public class AzureServiceBusChannelFactoryTests + { + [Fact] + public void When_the_timeout_is_below_400_ms_it_should_throw_an_exception() + { + var factory = new AzureServiceBusChannelFactory(new AzureServiceBusConsumerFactory(new AzureServiceBusConfiguration("someString"))); + + var subscription = new AzureServiceBusSubscription(typeof(object), new SubscriptionName("name"), new ChannelName("name"), new RoutingKey("name"), + 1, 1, 399); + + ArgumentException exception = Assert.Throws(() => factory.CreateChannel(subscription)); + + Assert.Equal("The minimum allowed timeout is 400 milliseconds", exception.Message); + } + } +} diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs new file mode 100644 index 0000000000..834d40cfa9 --- /dev/null +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs @@ -0,0 +1,374 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Management; +using Moq; +using Paramore.Brighter.MessagingGateway.AzureServiceBus; +using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers; +using Xunit; + +namespace Paramore.Brighter.AzureServiceBus.Tests +{ + public class AzureServiceBusConsumerTests + { + private readonly Mock _nameSpaceManagerWrapper; + private readonly AzureServiceBusConsumer _azureServiceBusConsumer; + private readonly Mock _messageReceiver; + private readonly Mock _mockMessageProducer; + private readonly Mock _mockMessageReceiver; + + public AzureServiceBusConsumerTests() + { + _nameSpaceManagerWrapper = new Mock(); + _mockMessageProducer = new Mock(); + _mockMessageReceiver = new Mock(); + + _messageReceiver = new Mock(); + + _mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ReceiveMode.ReceiveAndDelete)).Returns(_messageReceiver.Object); + + _azureServiceBusConsumer = new AzureServiceBusConsumer("topic", "subscription", _mockMessageProducer.Object, + _nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object); + } + + [Fact] + public void When_a_subscription_exists_and_messages_are_in_the_queue_the_messages_are_returned() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(true); + + var brokeredMessageList = new List(); + var message1 = new Mock(); + + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "MT_EVENT" } }); + var message2 = new Mock(); + + message2.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody2")); + message2.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "MT_DOCUMENT" } }); + brokeredMessageList.Add(message1.Object); + brokeredMessageList.Add(message2.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + Assert.Equal("somebody", result[0].Body.Value); + Assert.Equal("topic", result[0].Header.Topic); + Assert.Equal(MessageType.MT_EVENT, result[0].Header.MessageType); + + Assert.Equal("somebody2", result[1].Body.Value); + Assert.Equal("topic", result[1].Header.Topic); + Assert.Equal(MessageType.MT_DOCUMENT, result[1].Header.MessageType); + } + + [Fact] + public void When_a_subscription_does_not_exist_and_messages_are_in_the_queue_then_the_subscription_is_created_and_messages_are_returned() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false); + var brokeredMessageList = new List(); + var message1 = new Mock(); + + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "MT_EVENT" } }); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", 2000)); + Assert.Equal("somebody", result[0].Body.Value); + } + + [Fact] + public void When_a_message_is_a_command_type_then_the_message_type_is_set_correctly() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(true); + + var brokeredMessageList = new List(); + var message1 = new Mock(); + + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "MT_COMMAND" } }); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + Assert.Equal("somebody", result[0].Body.Value); + Assert.Equal("topic", result[0].Header.Topic); + Assert.Equal(MessageType.MT_COMMAND, result[0].Header.MessageType); + } + + [Fact] + public void When_a_message_is_a_command_type_and_it_is_specified_in_funny_casing_then_the_message_type_is_set_correctly() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(true); + + var brokeredMessageList = new List(); + var message1 = new Mock(); + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "Mt_COmmAND" } }); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + Assert.Equal("somebody", result[0].Body.Value); + Assert.Equal("topic", result[0].Header.Topic); + Assert.Equal(MessageType.MT_COMMAND, result[0].Header.MessageType); + } + + [Fact] + public void When_the_specified_message_type_is_unknown_then_it_should_default_to_MT_EVENT() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(true); + + var brokeredMessageList = new List(); + var message1 = new Mock(); + + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "wrong_message_type" } }); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + Assert.Equal(MessageType.MT_EVENT, result[0].Header.MessageType); + } + + [Fact] + public void When_the_message_type_is_not_specified_it_should_default_to_MT_EVENT() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(true); + + var brokeredMessageList = new List(); + var message1 = new Mock(); + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary()); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + Assert.Equal("somebody", result[0].Body.Value); + Assert.Equal("topic", result[0].Header.Topic); + Assert.Equal(MessageType.MT_EVENT, result[0].Header.MessageType); + } + + [Fact] + public void When_the_user_properties_on_the_azure_sb_message_is_null_it_should_default_to_message_type_to_MT_EVENT() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(true); + + + var brokeredMessageList = new List(); + var message1 = new Mock(); + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(null as IDictionary); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + Assert.Equal("somebody", result[0].Body.Value); + Assert.Equal("topic", result[0].Header.Topic); + Assert.Equal(MessageType.MT_EVENT, result[0].Header.MessageType); + } + + [Fact] + public void When_there_are_no_messages_then_it_returns_an_empty_array() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(true); + var brokeredMessageList = new List(); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + Assert.Empty(result); + } + + [Fact] + public void When_trying_to_create_a_subscription_which_was_already_created_by_another_thread_it_should_ignore_the_error() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false); + _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", 2000)).Throws(new MessagingEntityAlreadyExistsException("whatever")); + + var brokeredMessageList = new List(); + var message1 = new Mock(); + + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "MT_EVENT" } }); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", 2000)); + Assert.Equal("somebody", result[0].Body.Value); + } + + [Fact] + public void When_reject_is_called_with_requeue_the_message_requeued() + { + var messageHeader = new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_EVENT); + + var message = new Message(messageHeader, new MessageBody("body")); + + _azureServiceBusConsumer.Reject(message, true); + + _mockMessageProducer.Verify(x => x.Send(message), Times.Once); + } + + [Fact] + public void When_dispose_is_called_the_close_method_is_called() + { + _azureServiceBusConsumer.Dispose(); + + _messageReceiver.Verify(x => x.Close(), Times.Once); + } + + [Fact] + public void When_requeue_is_called_and_the_delay_is_zero_the_send_method_is_called() + { + var messageLockTokenOne = Guid.NewGuid(); + var messageHeader = new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_EVENT); + var message = new Message(messageHeader, new MessageBody("body")); + message.Header.Bag.Add("LockToken", messageLockTokenOne); + + _azureServiceBusConsumer.Requeue(message, 0); + + _mockMessageProducer.Verify(x => x.Send(message), Times.Once); + } + + [Fact] + public void When_requeue_is_called_and_the_delay_is_more_than_zero_the_sendWithDelay_method_is_called() + { + var messageLockTokenOne = Guid.NewGuid(); + var messageHeader = new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_EVENT); + var message = new Message(messageHeader, new MessageBody("body")); + message.Header.Bag.Add("LockToken", messageLockTokenOne); + + _azureServiceBusConsumer.Requeue(message, 100); + + _mockMessageProducer.Verify(x => x.SendWithDelay(message, 100), Times.Once); + } + + [Fact] + public void + When_there_is_an_error_talking_to_servicebus_when_checking_if_subscription_exist_then_a_ChannelFailureException_is_raised() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Throws(new Exception()); + + Assert.Throws(() => _azureServiceBusConsumer.Receive(400)); + } + + [Fact] + public void When_there_is_an_error_talking_to_servicebus_when_creating_the_subscription_then_a_ChannelFailureException_is_raised_and_ManagementClientWrapper_is_reinitilised() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false); + _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", 2000)).Throws(new Exception()); + + Assert.Throws(() => _azureServiceBusConsumer.Receive(400)); + _nameSpaceManagerWrapper.Verify(managementClientWrapper => managementClientWrapper.Reset(), Times.Once); + } + + [Fact] + public void When_there_is_an_error_talking_to_servicebus_when_receiving_then_a_ChannelFailureException_is_raised_and_the_messageReceiver_is_recreated() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(true); + + _messageReceiver.Setup(f => f.Receive(It.IsAny(), It.IsAny())).Throws(new Exception()); + + Assert.Throws(() => _azureServiceBusConsumer.Receive(400)); + _mockMessageReceiver.Verify(x => x.Get(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void Once_the_subscription_is_created_or_exits_it_does_not_check_if_it_exists_every_time(bool subscriptionExists) + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(subscriptionExists); + var brokeredMessageList = new List(); + var message1 = new Mock(); + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "MT_EVENT" } }); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + _azureServiceBusConsumer.Receive(400); + _azureServiceBusConsumer.Receive(400); + + if (subscriptionExists == false) + { + _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", 2000), Times.Once); + } + + _nameSpaceManagerWrapper.Verify(f => f.SubscriptionExists("topic", "subscription"), Times.Once); + } + + [Fact] + public void When_MessagingEntityAlreadyExistsException_does_not_check_if_subscription_exists() + { + _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false); + _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", 2000)).Throws(new MessagingEntityAlreadyExistsException("whatever")); + + var brokeredMessageList = new List(); + var message1 = new Mock(); + + message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "MT_EVENT" } }); + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + _azureServiceBusConsumer.Receive(400); + + _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", 2000)); + Assert.Equal("somebody", result[0].Body.Value); + + _nameSpaceManagerWrapper.Verify(f => f.SubscriptionExists("topic", "subscription"), Times.Once); + } + + [Fact] + public void When_a_message_contains_a_null_body_message_is_still_processed() + { + var brokeredMessageList = new List(); + var message1 = new Mock(); + + message1.Setup(x => x.MessageBodyValue).Returns((byte[])null); + message1.Setup(m => m.UserProperties).Returns(new Dictionary() { { "MessageType", "MT_EVENT" } }); + + brokeredMessageList.Add(message1.Object); + + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + Assert.Equal(string.Empty, result[0].Body.Value); + } + + [Fact] + public void When_receiving_messages_and_the_receiver_is_closing_a_MT_QUIT_message_is_sent() + { + _messageReceiver.Setup(x => x.IsClosedOrClosing).Returns(true); + _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Throws(new Exception("Closing")); + + Message[] result = _azureServiceBusConsumer.Receive(400); + + Assert.Equal(MessageType.MT_QUIT, result[0].Header.MessageType); + + } + } +} diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusMessageProducerTests.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusMessageProducerTests.cs new file mode 100644 index 0000000000..c9d635bb38 --- /dev/null +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusMessageProducerTests.cs @@ -0,0 +1,197 @@ +using System; +using System.Text; +using Moq; +using Paramore.Brighter.MessagingGateway.AzureServiceBus; +using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers; +using Xunit; + +namespace Paramore.Brighter.AzureServiceBus.Tests +{ + public class AzureServiceBusMessageProducerTests + { + private readonly Mock _nameSpaceManagerWrapper; + private readonly Mock _topicClientProvider; + private readonly Mock _topicClient; + private readonly AzureServiceBusMessageProducer _producer; + + public AzureServiceBusMessageProducerTests() + { + _nameSpaceManagerWrapper = new Mock(); + _topicClientProvider = new Mock(); + _topicClient = new Mock(); + + _producer = new AzureServiceBusMessageProducer(_nameSpaceManagerWrapper.Object, _topicClientProvider.Object); + } + + [Fact] + public void When_the_topic_exists_and_sending_a_message_with_no_delay_it_should_send_the_message_to_the_correct_topicclient() + { + Microsoft.Azure.ServiceBus.Message sentMessage = null; + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(true); + _topicClientProvider.Setup(f => f.Get("topic")).Returns(_topicClient.Object); + _topicClient.Setup(f => f.Send(It.IsAny())).Callback((Microsoft.Azure.ServiceBus.Message g) => sentMessage = g); + + _producer.Send(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_EVENT), new MessageBody(messageBody, "JSON"))); + + Assert.Equal(messageBody, sentMessage.Body); + Assert.Equal("MT_EVENT", sentMessage.UserProperties["MessageType"]); + _topicClient.Verify(x => x.Close(), Times.Once); + } + + [Fact] + public void When_sending_a_command_message_type_message_with_no_delay_it_should_set_the_correct_messagetype_property() + { + Microsoft.Azure.ServiceBus.Message sentMessage = null; + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(true); + _topicClientProvider.Setup(f => f.Get("topic")).Returns(_topicClient.Object); + _topicClient.Setup(f => f.Send(It.IsAny())).Callback((Microsoft.Azure.ServiceBus.Message g) => sentMessage = g); + + _producer.Send(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_COMMAND), new MessageBody(messageBody, "JSON"))); + + Assert.Equal(messageBody, sentMessage.Body); + Assert.Equal("MT_COMMAND", sentMessage.UserProperties["MessageType"]); + _topicClient.Verify(x => x.Close(), Times.Once); + } + + [Fact] + public void When_the_topic_does_not_exist_it_should_be_created_and_the_message_is_sent_to_the_correct_topicclient() + { + Microsoft.Azure.ServiceBus.Message sentMessage = null; + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(false); + _topicClientProvider.Setup(f => f.Get("topic")).Returns(_topicClient.Object); + _topicClient.Setup(f => f.Send(It.IsAny())).Callback((Microsoft.Azure.ServiceBus.Message g) => sentMessage = g); + + _producer.Send(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody(messageBody, "JSON"))); + + _nameSpaceManagerWrapper.Verify(x => x.CreateTopic("topic"), Times.Once); + Assert.Equal(messageBody, sentMessage.Body); + } + + [Fact] + public void When_a_message_is_send_and_an_exception_occurs_close_is_still_called() + { + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(true); + _topicClientProvider.Setup(f => f.Get("topic")).Returns(_topicClient.Object); + + _topicClient.Setup(x => x.Send(It.IsAny())).Throws(new Exception("Failed")); + + try + { + _producer.Send(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody("Message", "JSON"))); + } + catch (Exception) + { + // ignored + } + + _topicClient.Verify(x => x.Close(), Times.Once); + } + + [Fact] + public void When_the_topic_exists_and_sending_a_message_with_a_delay_it_should_send_the_message_to_the_correct_topicclient() + { + Microsoft.Azure.ServiceBus.Message sentMessage = null; + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(true); + _topicClientProvider.Setup(f => f.Get("topic")).Returns(_topicClient.Object); + _topicClient.Setup(f => f.ScheduleMessage(It.IsAny(), It.IsAny())).Callback((Microsoft.Azure.ServiceBus.Message g, DateTimeOffset d) => sentMessage = g); + + _producer.SendWithDelay(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_EVENT), new MessageBody(messageBody, "JSON")), 1); + + Assert.Equal(messageBody, sentMessage.Body); + Assert.Equal("MT_EVENT", sentMessage.UserProperties["MessageType"]); + _topicClient.Verify(x => x.Close(), Times.Once); + } + + [Fact] + public void When_sending_a_command_message_type_message_with_delay_it_should_set_the_correct_messagetype_property() + { + Microsoft.Azure.ServiceBus.Message sentMessage = null; + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(true); + _topicClientProvider.Setup(f => f.Get("topic")).Returns(_topicClient.Object); + _topicClient.Setup(f => f.ScheduleMessage(It.IsAny(), It.IsAny())).Callback((Microsoft.Azure.ServiceBus.Message g, DateTimeOffset d) => sentMessage = g); + + _producer.SendWithDelay(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_COMMAND), new MessageBody(messageBody, "JSON")), 1); + + Assert.Equal(messageBody, sentMessage.Body); + Assert.Equal("MT_COMMAND", sentMessage.UserProperties["MessageType"]); + _topicClient.Verify(x => x.Close(), Times.Once); + } + + [Fact] + public void When_the_topic_does_not_exist_and_sending_a_message_with_a_delay_it_should_send_the_message_to_the_correct_topicclient() + { + Microsoft.Azure.ServiceBus.Message sentMessage = null; + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(false); + _topicClientProvider.Setup(f => f.Get("topic")).Returns(_topicClient.Object); + _topicClient.Setup(f => f.ScheduleMessage(It.IsAny(), It.IsAny())).Callback((Microsoft.Azure.ServiceBus.Message g, DateTimeOffset d) => sentMessage = g); + + _producer.SendWithDelay(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody(messageBody, "JSON")), 1); + + _nameSpaceManagerWrapper.Verify(x => x.CreateTopic("topic"), Times.Once); + Assert.Equal(messageBody, sentMessage.Body); + _topicClient.Verify(x => x.Close(), Times.Once); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void Once_the_topic_is_created_it_then_does_not_check_if_it_exists_every_time(bool topicExists) + { + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(topicExists); + _topicClientProvider.Setup(f => f.Get("topic")).Returns(_topicClient.Object); + _topicClient.Setup(f => f.ScheduleMessage(It.IsAny(), It.IsAny())).Callback((Microsoft.Azure.ServiceBus.Message g, DateTimeOffset d) => { }); + + _producer.SendWithDelay(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody(messageBody, "JSON")), 1); + _producer.SendWithDelay(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody(messageBody, "JSON")), 1); + + if (topicExists == false) + { + _nameSpaceManagerWrapper.Verify(x => x.CreateTopic("topic"), Times.Once); + } + + _nameSpaceManagerWrapper.Verify(x => x.TopicExists("topic"), Times.Once); + } + + [Fact] + public void When_there_is_an_error_talking_to_servicebus_when_creating_the_topic_the_ManagementClientWrapper_is_reinitilised() + { + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Throws(new Exception()); + + Assert.Throws(() => _producer.SendWithDelay(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody(messageBody, "JSON")), 1)); + _nameSpaceManagerWrapper.Verify(managementClientWrapper => managementClientWrapper.Reset(), Times.Once); + } + + + [Fact] + public void When_there_is_an_error_getting_a_topic_client_the_connection_for_topic_client_is_retried() + { + var messageBody = Encoding.UTF8.GetBytes("A message body"); + + _nameSpaceManagerWrapper.Setup(t => t.TopicExists("topic")).Returns(true); + + _topicClientProvider.SetupSequence(f => f.Get("topic")) + .Throws(new Exception()) + .Returns(_topicClient.Object); + + _producer.SendWithDelay(new Message(new MessageHeader(Guid.NewGuid(), "topic", MessageType.MT_NONE), new MessageBody(messageBody, "JSON"))); + + _topicClient.Verify(topicClient => topicClient.Send(It.IsAny()), Times.Once); + } + } +} diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/Paramore.Brighter.AzureServiceBus.Tests.csproj b/tests/Paramore.Brighter.AzureServiceBus.Tests/Paramore.Brighter.AzureServiceBus.Tests.csproj new file mode 100644 index 0000000000..a4028698ad --- /dev/null +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/Paramore.Brighter.AzureServiceBus.Tests.csproj @@ -0,0 +1,27 @@ + + + + netcoreapp3.1 + + false + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + +