Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Publisher Confirms from RMQ (#1494) #5

Merged
merged 1 commit into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions samples/RMQTaskQueueSamples/GreetingsSender/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,14 @@ static void Main(string[] args)
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")),
Exchange = new Exchange("paramore.brighter.exchange"),
};

var producer = new RmqMessageProducer(rmqConnection, new RmqPublication
{
MaxOutStandingMessages = 5,
MaxOutStandingCheckIntervalMilliSeconds = 500,
WaitForConfirmsTimeOutInMilliseconds = 1000,
MakeChannels =OnMissingChannel.Create

});

serviceCollection.AddBrighter(options =>
Expand Down
13 changes: 5 additions & 8 deletions src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
using System.Net;
using System.Text.Json;
using System.Threading;
using Amazon;
using Amazon.Runtime.Internal;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;
using Polly;
using Polly.Contrib.WaitAndRetry;
using Polly.Retry;
Expand All @@ -24,8 +22,7 @@ public class ChannelFactory : AWSMessagingGateway, IAmAChannelFactory
private SqsSubscription _subscription;
private string _queueUrl;
private string _dlqARN;
private RetryPolicy _retryPolicy;

private readonly RetryPolicy _retryPolicy;
/// <summary>
/// Initializes a new instance of the <see cref="ChannelFactory"/> class.
/// </summary>
Expand Down Expand Up @@ -292,12 +289,12 @@ private void SubscribeToTopic(AmazonSQSClient sqsClient, AmazonSimpleNotificatio
}
}

private string ToSecondsAsString(int timeountInMilliseconds)
private string ToSecondsAsString(int timeoutInMilliseconds)
{
int timeOutInSeconds = 0;
if (timeountInMilliseconds >= 1000)
timeOutInSeconds = timeountInMilliseconds / 1000;
else if (timeountInMilliseconds > 0)
if (timeoutInMilliseconds >= 1000)
timeOutInSeconds = timeoutInMilliseconds / 1000;
else if (timeoutInMilliseconds > 0)
timeOutInSeconds = 1;

return Convert.ToString(timeOutInSeconds);
Expand Down
78 changes: 73 additions & 5 deletions src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ THE SOFTWARE. */
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;
using RabbitMQ.Client.Events;

namespace Paramore.Brighter.MessagingGateway.RMQ
{
Expand All @@ -36,15 +39,19 @@ namespace Paramore.Brighter.MessagingGateway.RMQ
/// The <see cref="RmqMessageProducer"/> is used by a client to talk to a server and abstracts the infrastructure for inter-process communication away from clients.
/// It handles subscription establishment, request sending and error handling
/// </summary>
public class RmqMessageProducer : RmqMessageGateway, IAmAMessageProducer, IAmAMessageProducerAsync
public class RmqMessageProducer : RmqMessageGateway, IAmAMessageProducer, IAmAMessageProducerAsync, ISupportPublishConfirmation, IDisposable
{
public event Action<bool, Guid> OnMessagePublished;
public int MaxOutStandingMessages { get; set; } = -1;
public int MaxOutStandingCheckIntervalMilliSeconds { get; set; } = 0;
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<RmqMessageProducer>();

private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<RmqMessageProducer>();

static readonly object _lock = new object();
private readonly Publication _publication;
private readonly RmqPublication _publication;
private readonly ConcurrentDictionary<ulong, Guid> _pendingConfirmations = new ConcurrentDictionary<ulong, Guid>();
private bool _confirmsSelected = false;
private readonly int _waitForConfirmsTimeOutInMilliseconds;

/// <summary>
/// Initializes a new instance of the <see cref="RmqMessageGateway" /> class.
Expand All @@ -69,6 +76,7 @@ public RmqMessageProducer(RmqMessagingGatewayConnection connection, RmqPublicati
_publication = publication ?? new RmqPublication{MakeChannels = OnMissingChannel.Create};
MaxOutStandingMessages = _publication.MaxOutStandingMessages;
MaxOutStandingCheckIntervalMilliSeconds = _publication.MaxOutStandingCheckIntervalMilliSeconds;
_waitForConfirmsTimeOutInMilliseconds = _publication.WaitForConfirmsTimeOutInMilliseconds;
}

/// <summary>
Expand Down Expand Up @@ -98,11 +106,18 @@ public void SendWithDelay(Message message, int delayMilliseconds = 0)
var rmqMessagePublisher = new RmqMessagePublisher(Channel, Connection);

message.Persist = Connection.PersistMessages;
Channel.BasicAcks += OnPublishSucceeded;
Channel.BasicNacks += OnPublishFailed;
Channel.ConfirmSelect();
_confirmsSelected = true;


s_logger.LogDebug(
"RmqMessageProducer: Publishing message to exchange {ExchangeName} on subscription {URL} with a delay of {Delay} and topic {Topic} and persisted {Persist} and id {Id} and body: {Request}",
Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri(), delayMilliseconds,
message.Header.Topic, message.Persist, message.Id, message.Body.Value);

_pendingConfirmations.TryAdd(Channel.NextPublishSeqNo, message.Id);

if (DelaySupported)
{
Expand Down Expand Up @@ -132,9 +147,62 @@ public void SendWithDelay(Message message, int delayMilliseconds = 0)
}
}

/// <summary>
/// Sends the specified message
/// NOTE: RMQ's client has no async support, so this is not actually async and will block whilst it sends
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public Task SendAsync(Message message)
{
throw new NotImplementedException();
var tcs = new TaskCompletionSource<object>();
Send(message);
tcs.SetResult(new object());
return tcs.Task;
}


public sealed override void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
if (Channel != null && Channel.IsOpen && _confirmsSelected)
{
//In the event this fails, then consequence is not marked as sent in outbox
//As we are disposing, just let that happen
Channel.WaitForConfirms(TimeSpan.FromMilliseconds(_waitForConfirmsTimeOutInMilliseconds), out bool timedOut);
if (timedOut)
s_logger.LogWarning("Failed to await publisher confirms when shutting down!");
}
}

base.Dispose(disposing);
}

private void OnPublishFailed(object sender, BasicNackEventArgs e)
{
if (_pendingConfirmations.TryGetValue(e.DeliveryTag, out Guid messageId))
{
OnMessagePublished?.Invoke(false, messageId);
_pendingConfirmations.TryRemove(e.DeliveryTag, out Guid msgId);
s_logger.LogDebug("Failed to publish message: {MessageId}", messageId);
}
}

private void OnPublishSucceeded(object sender, BasicAckEventArgs e)
{
if (_pendingConfirmations.TryGetValue(e.DeliveryTag, out Guid messageId))
{
OnMessagePublished?.Invoke(true, messageId);
_pendingConfirmations.TryRemove(e.DeliveryTag, out Guid msgId);
s_logger.LogInformation("Published message: {MessageId}", messageId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ public RmqMessagePublisher(IModel channel, RmqMessagingGatewayConnection connect
throw new ArgumentNullException(nameof(connection));
}

_channel = channel;
_connection = connection;
}

_channel = channel;
}

/// <summary>
/// <summary>
/// Publishes the message.
/// </summary>
/// <param name="message">The message.</param>
Expand Down
7 changes: 6 additions & 1 deletion src/Paramore.Brighter.MessagingGateway.RMQ/RmqPublication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
{
public class RmqPublication : Publication
{
//Placeholder
/// <summary>
/// How long should we wait on shutdown for the broker to finish confirming delivery of messages
/// If we shut down without confirmation then messages will not be marked as sent in the Outbox
/// Any sweeper will then resend.
/// </summary>
public int WaitForConfirmsTimeOutInMilliseconds { get; set; } = 500;
}
}
6 changes: 3 additions & 3 deletions src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public CommandProcessor(
_featureSwitchRegistry = featureSwitchRegistry;
_inboxConfiguration = inboxConfiguration;

ConfigureAsyncPublisherCalllbackMaybe();
ConfigureAsyncPublisherCallbackMaybe();
}

/// <summary>
Expand Down Expand Up @@ -364,7 +364,7 @@ public CommandProcessor(
_inboxConfiguration = inboxConfiguration;

//Only register one, to avoid two callbacks where we support both interfaces on a producer
if (!ConfigurePublisherCallbackMaybe()) ConfigureAsyncPublisherCalllbackMaybe();
if (!ConfigurePublisherCallbackMaybe()) ConfigureAsyncPublisherCallbackMaybe();
}

/// <summary>
Expand Down Expand Up @@ -883,7 +883,7 @@ private void CheckOutstandingMessages()
_lastOutStandingMessageCheckAt = DateTime.UtcNow;
}

private bool ConfigureAsyncPublisherCalllbackMaybe()
private bool ConfigureAsyncPublisherCallbackMaybe()
{
if (_asyncMessageProducer == null)
return false;
Expand Down
3 changes: 1 addition & 2 deletions src/Paramore.Brighter/Publication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class Publication
/// <summary>
/// What do we do with infrastructure dependencies for the producer?
/// </summary>
public OnMissingChannel MakeChannels { get; set; }

public OnMissingChannel MakeChannels { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#region Licence
/* The MIT License (MIT)
Copyright © 2014 Ian Cooper <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. */

#endregion

using System;
using System.Threading.Tasks;
using FluentAssertions;
using Paramore.Brighter.MessagingGateway.RMQ;
using Xunit;

namespace Paramore.Brighter.RMQ.Tests.MessagingGateway
{
[Collection("RMQ")]
[Trait("Category", "RMQ")]
public class RmqMessageProducerConfirmationsSendMessageTests : IDisposable
{
private readonly RmqMessageProducer _messageProducer;
private readonly Message _message;
private bool _messageWasPublished = false;
private bool _messageWasNotPublished = true;

public RmqMessageProducerConfirmationsSendMessageTests ()
{
_message = new Message(
new MessageHeader(Guid.NewGuid(), Guid.NewGuid().ToString(), MessageType.MT_COMMAND),
new MessageBody("test content"));

var rmqConnection = new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")),
Exchange = new Exchange("paramore.brighter.exchange")
};

_messageProducer = new RmqMessageProducer(rmqConnection);
_messageProducer.OnMessagePublished += (success, guid) =>
{
if (success)
{
guid.Should().Be(_message.Id);
_messageWasPublished = true;
_messageWasNotPublished = false;
}
else
{
_messageWasNotPublished = true;
}
};

//we need a queue to avoid a discard
new QueueFactory(rmqConnection, _message.Header.Topic).Create(3000);
}

[Fact]
public async Task When_confirming_posting_a_message_via_the_messaging_gateway()
{
_messageProducer.Send(_message);

await Task.Delay(500);

//if this is true, then possible test failed because of timeout or RMQ issues
_messageWasNotPublished.Should().BeFalse();
//did we see the message - intent to test logic here
_messageWasPublished.Should().BeTrue();
}

public void Dispose()
{
_messageProducer.Dispose();
}
}
}
Loading