From b7cedb76a6efe61d7a3f705145cbabc038a6825c Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Wed, 5 May 2021 15:21:50 +0100 Subject: [PATCH] Support Publisher Confirms from RMQ (#1494) * Support Publisher Confirms from RMQ * RMQ does not support a SendAsync method as there is no async version of publish. * We don't want the SendAsync path to fail for RMQ, but the RMQ client doesn't have async support. So we wrap in a task completion source Co-authored-by: Toby Henderson Co-authored-by: toby.henderson --- .../GreetingsSender/Program.cs | 5 + .../ChannelFactory.cs | 13 +-- .../RmqMessageProducer.cs | 78 +++++++++++++++- .../RmqMessagePublisher.cs | 7 +- .../RmqPublication.cs | 7 +- src/Paramore.Brighter/CommandProcessor.cs | 6 +- src/Paramore.Brighter/Publication.cs | 3 +- ...ing_a_message_via_the_messaging_gateway.cs | 91 ++++++++++++++++++ ...message_via_the_messaging_gateway_async.cs | 92 +++++++++++++++++++ 9 files changed, 280 insertions(+), 22 deletions(-) create mode 100644 tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway.cs create mode 100644 tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway_async.cs diff --git a/samples/RMQTaskQueueSamples/GreetingsSender/Program.cs b/samples/RMQTaskQueueSamples/GreetingsSender/Program.cs index 8218ec8121..ba91406244 100644 --- a/samples/RMQTaskQueueSamples/GreetingsSender/Program.cs +++ b/samples/RMQTaskQueueSamples/GreetingsSender/Program.cs @@ -52,9 +52,14 @@ static void Main(string[] args) AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")), Exchange = new Exchange("paramore.brighter.exchange"), }; + var producer = new RmqMessageProducer(rmqConnection, new RmqPublication { + MaxOutStandingMessages = 5, + MaxOutStandingCheckIntervalMilliSeconds = 500, + WaitForConfirmsTimeOutInMilliseconds = 1000, MakeChannels =OnMissingChannel.Create + }); serviceCollection.AddBrighter(options => diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs index b2159d1dfd..2f48ef0a5e 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs @@ -4,14 +4,12 @@ using System.Net; using System.Text.Json; using System.Threading; -using Amazon; using Amazon.Runtime.Internal; using Amazon.SimpleNotificationService; using Amazon.SimpleNotificationService.Model; using Amazon.SQS; using Amazon.SQS.Model; using Microsoft.Extensions.Logging; -using Paramore.Brighter.Logging; using Polly; using Polly.Contrib.WaitAndRetry; using Polly.Retry; @@ -24,8 +22,7 @@ public class ChannelFactory : AWSMessagingGateway, IAmAChannelFactory private SqsSubscription _subscription; private string _queueUrl; private string _dlqARN; - private RetryPolicy _retryPolicy; - + private readonly RetryPolicy _retryPolicy; /// /// Initializes a new instance of the class. /// @@ -292,12 +289,12 @@ private void SubscribeToTopic(AmazonSQSClient sqsClient, AmazonSimpleNotificatio } } - private string ToSecondsAsString(int timeountInMilliseconds) + private string ToSecondsAsString(int timeoutInMilliseconds) { int timeOutInSeconds = 0; - if (timeountInMilliseconds >= 1000) - timeOutInSeconds = timeountInMilliseconds / 1000; - else if (timeountInMilliseconds > 0) + if (timeoutInMilliseconds >= 1000) + timeOutInSeconds = timeoutInMilliseconds / 1000; + else if (timeoutInMilliseconds > 0) timeOutInSeconds = 1; return Convert.ToString(timeOutInSeconds); diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs index d21086fe92..8291fd7c94 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs @@ -23,11 +23,14 @@ THE SOFTWARE. */ #endregion using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.IO; using System.Text.Json; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Paramore.Brighter.Logging; +using RabbitMQ.Client.Events; namespace Paramore.Brighter.MessagingGateway.RMQ { @@ -36,15 +39,19 @@ namespace Paramore.Brighter.MessagingGateway.RMQ /// The is used by a client to talk to a server and abstracts the infrastructure for inter-process communication away from clients. /// It handles subscription establishment, request sending and error handling /// - public class RmqMessageProducer : RmqMessageGateway, IAmAMessageProducer, IAmAMessageProducerAsync + public class RmqMessageProducer : RmqMessageGateway, IAmAMessageProducer, IAmAMessageProducerAsync, ISupportPublishConfirmation, IDisposable { + public event Action OnMessagePublished; public int MaxOutStandingMessages { get; set; } = -1; public int MaxOutStandingCheckIntervalMilliSeconds { get; set; } = 0; - - private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); + + private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); static readonly object _lock = new object(); - private readonly Publication _publication; + private readonly RmqPublication _publication; + private readonly ConcurrentDictionary _pendingConfirmations = new ConcurrentDictionary(); + private bool _confirmsSelected = false; + private readonly int _waitForConfirmsTimeOutInMilliseconds; /// /// Initializes a new instance of the class. @@ -69,6 +76,7 @@ public RmqMessageProducer(RmqMessagingGatewayConnection connection, RmqPublicati _publication = publication ?? new RmqPublication{MakeChannels = OnMissingChannel.Create}; MaxOutStandingMessages = _publication.MaxOutStandingMessages; MaxOutStandingCheckIntervalMilliSeconds = _publication.MaxOutStandingCheckIntervalMilliSeconds; + _waitForConfirmsTimeOutInMilliseconds = _publication.WaitForConfirmsTimeOutInMilliseconds; } /// @@ -98,11 +106,18 @@ public void SendWithDelay(Message message, int delayMilliseconds = 0) var rmqMessagePublisher = new RmqMessagePublisher(Channel, Connection); message.Persist = Connection.PersistMessages; + Channel.BasicAcks += OnPublishSucceeded; + Channel.BasicNacks += OnPublishFailed; + Channel.ConfirmSelect(); + _confirmsSelected = true; + s_logger.LogDebug( "RmqMessageProducer: Publishing message to exchange {ExchangeName} on subscription {URL} with a delay of {Delay} and topic {Topic} and persisted {Persist} and id {Id} and body: {Request}", Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri(), delayMilliseconds, message.Header.Topic, message.Persist, message.Id, message.Body.Value); + + _pendingConfirmations.TryAdd(Channel.NextPublishSeqNo, message.Id); if (DelaySupported) { @@ -132,9 +147,62 @@ public void SendWithDelay(Message message, int delayMilliseconds = 0) } } + /// + /// Sends the specified message + /// NOTE: RMQ's client has no async support, so this is not actually async and will block whilst it sends + /// + /// + /// public Task SendAsync(Message message) { - throw new NotImplementedException(); + var tcs = new TaskCompletionSource(); + Send(message); + tcs.SetResult(new object()); + return tcs.Task; + } + + + public sealed override void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + if (Channel != null && Channel.IsOpen && _confirmsSelected) + { + //In the event this fails, then consequence is not marked as sent in outbox + //As we are disposing, just let that happen + Channel.WaitForConfirms(TimeSpan.FromMilliseconds(_waitForConfirmsTimeOutInMilliseconds), out bool timedOut); + if (timedOut) + s_logger.LogWarning("Failed to await publisher confirms when shutting down!"); + } + } + + base.Dispose(disposing); + } + + private void OnPublishFailed(object sender, BasicNackEventArgs e) + { + if (_pendingConfirmations.TryGetValue(e.DeliveryTag, out Guid messageId)) + { + OnMessagePublished?.Invoke(false, messageId); + _pendingConfirmations.TryRemove(e.DeliveryTag, out Guid msgId); + s_logger.LogDebug("Failed to publish message: {MessageId}", messageId); + } + } + + private void OnPublishSucceeded(object sender, BasicAckEventArgs e) + { + if (_pendingConfirmations.TryGetValue(e.DeliveryTag, out Guid messageId)) + { + OnMessagePublished?.Invoke(true, messageId); + _pendingConfirmations.TryRemove(e.DeliveryTag, out Guid msgId); + s_logger.LogInformation("Published message: {MessageId}", messageId); + } } } } diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessagePublisher.cs index 54360aae67..88bcff9898 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessagePublisher.cs @@ -74,11 +74,12 @@ public RmqMessagePublisher(IModel channel, RmqMessagingGatewayConnection connect throw new ArgumentNullException(nameof(connection)); } - _channel = channel; _connection = connection; - } + + _channel = channel; + } - /// + /// /// Publishes the message. /// /// The message. diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqPublication.cs b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqPublication.cs index b64d4d1818..b2dff64294 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqPublication.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqPublication.cs @@ -2,6 +2,11 @@ { public class RmqPublication : Publication { - //Placeholder + /// + /// How long should we wait on shutdown for the broker to finish confirming delivery of messages + /// If we shut down without confirmation then messages will not be marked as sent in the Outbox + /// Any sweeper will then resend. + /// + public int WaitForConfirmsTimeOutInMilliseconds { get; set; } = 500; } } diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index 34f3fef23e..8a09fdcb0e 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -245,7 +245,7 @@ public CommandProcessor( _featureSwitchRegistry = featureSwitchRegistry; _inboxConfiguration = inboxConfiguration; - ConfigureAsyncPublisherCalllbackMaybe(); + ConfigureAsyncPublisherCallbackMaybe(); } /// @@ -364,7 +364,7 @@ public CommandProcessor( _inboxConfiguration = inboxConfiguration; //Only register one, to avoid two callbacks where we support both interfaces on a producer - if (!ConfigurePublisherCallbackMaybe()) ConfigureAsyncPublisherCalllbackMaybe(); + if (!ConfigurePublisherCallbackMaybe()) ConfigureAsyncPublisherCallbackMaybe(); } /// @@ -883,7 +883,7 @@ private void CheckOutstandingMessages() _lastOutStandingMessageCheckAt = DateTime.UtcNow; } - private bool ConfigureAsyncPublisherCalllbackMaybe() + private bool ConfigureAsyncPublisherCallbackMaybe() { if (_asyncMessageProducer == null) return false; diff --git a/src/Paramore.Brighter/Publication.cs b/src/Paramore.Brighter/Publication.cs index 161a990292..65d1068bfb 100644 --- a/src/Paramore.Brighter/Publication.cs +++ b/src/Paramore.Brighter/Publication.cs @@ -50,7 +50,6 @@ public class Publication /// /// What do we do with infrastructure dependencies for the producer? /// - public OnMissingChannel MakeChannels { get; set; } - + public OnMissingChannel MakeChannels { get; set; } } } diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway.cs new file mode 100644 index 0000000000..cc1656ab3f --- /dev/null +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway.cs @@ -0,0 +1,91 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2014 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using System.Threading.Tasks; +using FluentAssertions; +using Paramore.Brighter.MessagingGateway.RMQ; +using Xunit; + +namespace Paramore.Brighter.RMQ.Tests.MessagingGateway +{ + [Collection("RMQ")] + [Trait("Category", "RMQ")] + public class RmqMessageProducerConfirmationsSendMessageTests : IDisposable + { + private readonly RmqMessageProducer _messageProducer; + private readonly Message _message; + private bool _messageWasPublished = false; + private bool _messageWasNotPublished = true; + + public RmqMessageProducerConfirmationsSendMessageTests () + { + _message = new Message( + new MessageHeader(Guid.NewGuid(), Guid.NewGuid().ToString(), MessageType.MT_COMMAND), + new MessageBody("test content")); + + var rmqConnection = new RmqMessagingGatewayConnection + { + AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")), + Exchange = new Exchange("paramore.brighter.exchange") + }; + + _messageProducer = new RmqMessageProducer(rmqConnection); + _messageProducer.OnMessagePublished += (success, guid) => + { + if (success) + { + guid.Should().Be(_message.Id); + _messageWasPublished = true; + _messageWasNotPublished = false; + } + else + { + _messageWasNotPublished = true; + } + }; + + //we need a queue to avoid a discard + new QueueFactory(rmqConnection, _message.Header.Topic).Create(3000); + } + + [Fact] + public async Task When_confirming_posting_a_message_via_the_messaging_gateway() + { + _messageProducer.Send(_message); + + await Task.Delay(500); + + //if this is true, then possible test failed because of timeout or RMQ issues + _messageWasNotPublished.Should().BeFalse(); + //did we see the message - intent to test logic here + _messageWasPublished.Should().BeTrue(); + } + + public void Dispose() + { + _messageProducer.Dispose(); + } + } +} diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway_async.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway_async.cs new file mode 100644 index 0000000000..2c33f943aa --- /dev/null +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway_async.cs @@ -0,0 +1,92 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2014 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using System.Threading.Tasks; +using FluentAssertions; +using Paramore.Brighter.MessagingGateway.RMQ; +using Xunit; + +namespace Paramore.Brighter.RMQ.Tests.MessagingGateway +{ + [Collection("RMQ")] + [Trait("Category", "RMQ")] + public class RmqMessageProducerConfirmationsSendMessageAsyncTests : IDisposable + { + private readonly RmqMessageProducer _messageProducer; + private readonly Message _message; + private bool _messageWasPublished = false; + private bool _messageWasNotPublished = true; + + public RmqMessageProducerConfirmationsSendMessageAsyncTests() + { + _message = new Message( + new MessageHeader(Guid.NewGuid(), Guid.NewGuid().ToString(), MessageType.MT_COMMAND), + new MessageBody("test content")); + + var rmqConnection = new RmqMessagingGatewayConnection + { + AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")), + Exchange = new Exchange("paramore.brighter.exchange") + }; + + _messageProducer = new RmqMessageProducer(rmqConnection); + _messageProducer.OnMessagePublished += (success, guid) => + { + if (success) + { + guid.Should().Be(_message.Id); + _messageWasPublished = true; + _messageWasNotPublished = false; + } + else + { + _messageWasNotPublished = true; + } + }; + + //we need a queue to avoid a discard + new QueueFactory(rmqConnection, _message.Header.Topic).Create(3000); + } + + [Fact] + public async Task When_confirming_posting_a_message_via_the_messaging_gateway_async() + { + //The RMQ client doesn't support async, so this is async over sync, but let's check it works all the same + await _messageProducer.SendAsync(_message); + + await Task.Delay(500); + + //if this is true, then possible test failed because of timeout or RMQ issues + _messageWasNotPublished.Should().BeFalse(); + //did we see the message - intent to test logic here + _messageWasPublished.Should().BeTrue(); + } + + public void Dispose() + { + _messageProducer.Dispose(); + } + } +}