From 3f2c9ac253e0bba39b45dc1642c002e26eccac1d Mon Sep 17 00:00:00 2001 From: Dominic Hickie Date: Thu, 4 Jul 2024 15:23:26 +0100 Subject: [PATCH 1/2] Backport support for outstanding and dispatched messages across all topics to v9 --- .../DynamoDbOutbox.cs | 236 +++++++++++------- ...e_are_dispatched_messages_in_the_outbox.cs | 118 +++++++++ ..._are_outstanding_messages_in_the_outbox.cs | 60 +++++ 3 files changed, 322 insertions(+), 92 deletions(-) create mode 100644 tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_dispatched_messages_in_the_outbox.cs diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs index 72e81cc01f..e505c11b23 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs @@ -24,9 +24,9 @@ THE SOFTWARE. */ #endregion using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Security.Cryptography; using System.Threading; using System.Threading.Tasks; using Amazon.DynamoDBv2; @@ -46,6 +46,8 @@ public class DynamoDbOutbox : private readonly DynamoDBOperationConfig _dynamoOverwriteTableConfig; private readonly Random _random = new Random(); + private readonly ConcurrentDictionary _topicNames; + public bool ContinueOnCapturedContext { get; set; } /// @@ -67,6 +69,8 @@ public DynamoDbOutbox(IAmazonDynamoDB client, DynamoDbConfiguration configuratio { throw new ArgumentOutOfRangeException(nameof(DynamoDbConfiguration.NumberOfShards), "Maximum number of shards is 20"); } + + _topicNames = new ConcurrentDictionary(); } /// @@ -84,6 +88,8 @@ public DynamoDbOutbox(DynamoDBContext context, DynamoDbConfiguration configurati { throw new ArgumentOutOfRangeException(nameof(DynamoDbConfiguration.NumberOfShards), "Maximum number of shards is 20"); } + + _topicNames = new ConcurrentDictionary(); } /// @@ -110,6 +116,9 @@ public async Task AddAsync(Message message, int outBoxTimeout = -1, Cancellation var expiresAt = GetExpirationTime(); var messageToStore = new MessageItem(message, shard, expiresAt); + // Store the name of the topic as a key in a concurrent dictionary to ensure uniqueness & thread safety + _topicNames.TryAdd(message.Header.Topic, 0); + if (transactionConnectionProvider != null) { await AddToTransactionWrite(messageToStore, (DynamoDbUnitOfWork)transactionConnectionProvider); @@ -120,8 +129,8 @@ public async Task AddAsync(Message message, int outBoxTimeout = -1, Cancellation } } - /// - /// Returns messages that have been successfully dispatched. Eventually consistent. + /// + /// Returns messages that have been successfully dispatched. Eventually consistent. /// /// How long ago was the message dispatched? /// How many messages returned at once? @@ -136,30 +145,46 @@ public IEnumerable DispatchedMessages( int outboxTimeout = -1, Dictionary args = null) { - if (args == null) + return DispatchedMessagesAsync(millisecondsDispatchedSince, pageSize, pageNumber, outboxTimeout, args).GetAwaiter().GetResult(); + } + + /// + /// Get the messages that have been dispatched + /// + /// The number of hours since the message was dispatched + /// The amount to return + /// The Cancellation Token + /// Messages that have already been dispatched + public async Task> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100, + CancellationToken cancellationToken = default) + { + var milliseconds = TimeSpan.FromHours(hoursDispatchedSince).TotalMilliseconds; + return await DispatchedMessagesAsync(milliseconds, pageSize, cancellationToken: cancellationToken); + } + + /// + /// Retrieves messages that have been sent within the window + /// + /// + /// The number of messages to fetch. + /// The page number. + /// Timeout of sql call. + /// Additional parameters required for search, if any + /// The Cancellation Token + /// List of messages that need to be dispatched. + public async Task> DispatchedMessagesAsync(double millisecondsDispatchedSince, int pageSize = 100, int pageNumber = 1, + int outboxTimeout = -1, Dictionary args = null, CancellationToken cancellationToken = default) + { + if (args == null || !args.ContainsKey("Topic")) { - throw new ArgumentException("Missing required argument", nameof(args)); + return await DispatchedMessagesForAllTopicsAsync(millisecondsDispatchedSince, cancellationToken); } - - var sinceTime = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); - var topic = (string)args["Topic"]; - //in theory this is all values on that index that have a Delivered data (sparse index) - //we just need to filter for ones in the right date range - //As it is a GSI it can't use a consistent read - var queryConfig = new QueryOperationConfig - { - IndexName = _configuration.DeliveredIndexName, - KeyExpression = new KeyTopicDeliveredTimeExpression().Generate(topic, sinceTime), - ConsistentRead = false - }; - - //block async to make this sync - var messages = PageAllMessagesAsync(queryConfig).Result.ToList(); - return messages.Select(msg => msg.ConvertToMessage()); + var topic = (string)args["Topic"]; + return await DispatchedMessagesForTopicAsync(millisecondsDispatchedSince, topic, cancellationToken); } - /// + /// /// /// Finds a command with the specified identifier. /// @@ -257,32 +282,6 @@ public async Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatche } } - public async Task> DispatchedMessagesAsync(double millisecondsDispatchedSince, int pageSize = 100, int pageNumber = 1, - int outboxTimeout = -1, Dictionary args = null, CancellationToken cancellationToken = default) - { - if (args == null) - { - throw new ArgumentException("Missing required argument", nameof(args)); - } - - var sinceTime = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); - var topic = (string)args["Topic"]; - - //in theory this is all values on that index that have a Delivered data (sparse index) - //we just need to filter for ones in the right date range - //As it is a GSI it can't use a consistent read - var queryConfig = new QueryOperationConfig - { - IndexName = _configuration.DeliveredIndexName, - KeyExpression = new KeyTopicDeliveredTimeExpression().Generate(topic, sinceTime), - ConsistentRead = false - }; - - //block async to make this sync - var messages = await PageAllMessagesAsync(queryConfig, cancellationToken); - return messages.Select(msg => msg.ConvertToMessage()); - } - /// /// Update a message to show it is dispatched /// @@ -314,24 +313,12 @@ private static void MarkMessageDispatched(DateTime? dispatchedAt, MessageItem me /// Which page number of messages /// A list of messages that are outstanding for dispatch public IEnumerable OutstandingMessages( - double millisecondsDispatchedSince, - int pageSize = 100, - int pageNumber = 1, - Dictionary args = null) + double millisecondsDispatchedSince, + int pageSize = 100, + int pageNumber = 1, + Dictionary args = null) { - var now = DateTime.UtcNow; - - if (args == null) - { - throw new ArgumentException("Missing required argument", nameof(args)); - } - - var dispatchedTime = now.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); - var topic = (string)args["Topic"]; - - //block async to make this sync - var messages = QueryAllOutstandingShardsAsync(topic, dispatchedTime).Result.ToList(); - return messages.Select(msg => msg.ConvertToMessage()); + return OutstandingMessagesAsync(millisecondsDispatchedSince, pageSize, pageNumber, args).GetAwaiter().GetResult(); } /// @@ -349,24 +336,19 @@ public async Task> OutstandingMessagesAsync( Dictionary args = null, CancellationToken cancellationToken = default) { - var now = DateTime.UtcNow; - - if (args == null) + if (args == null || !args.ContainsKey("Topic")) { - throw new ArgumentException("Missing required argument", nameof(args)); + return await OutstandingMessagesForAllTopicsAsync(millisecondsDispatchedSince, cancellationToken); } - var minimumAge = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); - var topic = (string)args["Topic"]; - - //block async to make this sync - var messages = (await QueryAllOutstandingShardsAsync(topic, minimumAge, cancellationToken)).ToList(); - return messages.Select(msg => msg.ConvertToMessage()); + var topic = args["Topic"].ToString(); + return await OutstandingMessagesForTopicAsync(millisecondsDispatchedSince, topic, cancellationToken); } - public Task GetNumberOfOutstandingMessagesAsync(CancellationToken cancellationToken) + public async Task GetNumberOfOutstandingMessagesAsync(CancellationToken cancellationToken) { - throw new NotImplementedException(); + var messages = await OutstandingMessagesAsync(0, cancellationToken: cancellationToken); + return messages.Count(); } /// @@ -391,29 +373,99 @@ public async Task DeleteAsync(Guid[] messageIds, CancellationToken cancellationT } } - public Task> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100, - CancellationToken cancellationToken = default) + private Task AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork) { - throw new NotImplementedException(); + var tcs = new TaskCompletionSource(); + var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap(); + + var transaction = dynamoDbUnitOfWork.BeginOrGetTransaction(); + transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}}); + tcs.SetResult(transaction); + return tcs.Task; } - - private Task AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork) - { - var tcs = new TaskCompletionSource(); - var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap(); - - var transaction = dynamoDbUnitOfWork.BeginOrGetTransaction(); - transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}}); - tcs.SetResult(transaction); - return tcs.Task; - } private async Task GetMessage(Guid id, CancellationToken cancellationToken = default) { MessageItem messageItem = await _context.LoadAsync(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken); return messageItem?.ConvertToMessage() ?? new Message(); } - + + private async Task> DispatchedMessagesForAllTopicsAsync( + double millisecondsDispatchedSince, + CancellationToken cancellationToken) + { + var sinceTime = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); + + // Get the list of topic names we need to query over + var topics = _topicNames.Keys.ToList(); + + // Iterate over topics until all messages are retrieved + var messages = new List(); + foreach (var topic in topics) + { + //in theory this is all values on that index that have a Delivered data (sparse index) + //we just need to filter for ones in the right date range + //As it is a GSI it can't use a consistent read + var queryConfig = new QueryOperationConfig + { + IndexName = _configuration.DeliveredIndexName, + KeyExpression = new KeyTopicDeliveredTimeExpression().Generate(topic, sinceTime), + ConsistentRead = false + }; + + messages.AddRange(await PageAllMessagesAsync(queryConfig, cancellationToken)); + } + + return messages.Select(msg => msg.ConvertToMessage()); + } + + private async Task> DispatchedMessagesForTopicAsync( + double millisecondsDispatchedSince, + string topicName, + CancellationToken cancellationToken) + { + var sinceTime = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); + + //in theory this is all values on that index that have a Delivered data (sparse index) + //we just need to filter for ones in the right date range + //As it is a GSI it can't use a consistent read + var queryConfig = new QueryOperationConfig + { + IndexName = _configuration.DeliveredIndexName, + KeyExpression = new KeyTopicDeliveredTimeExpression().Generate(topicName, sinceTime), + ConsistentRead = false + }; + + var messages = await PageAllMessagesAsync(queryConfig, cancellationToken); + return messages.Select(msg => msg.ConvertToMessage()); + } + + private async Task> OutstandingMessagesForAllTopicsAsync(double millisecondsDispatchedSince, CancellationToken cancellationToken) + { + var olderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); + + // Get the list of topic names we need to query over + var topics = _topicNames.Keys.ToList(); + + // Iterate over topics and their associated shards until all messages are retrieved + var results = new List(); + foreach (var topic in topics) + { + results.AddRange(await QueryAllOutstandingShardsAsync(topic, olderThan, cancellationToken)); + } + + return results.Select(msg => msg.ConvertToMessage()); + } + + private async Task> OutstandingMessagesForTopicAsync(double millisecondsDispatchedSince, + string topicName, CancellationToken cancellationToken) + { + var olrderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); + + var messages = (await QueryAllOutstandingShardsAsync(topicName, olrderThan, cancellationToken)).ToList(); + return messages.Select(msg => msg.ConvertToMessage()); + } + private async Task> PageAllMessagesAsync(QueryOperationConfig queryConfig, CancellationToken cancellationToken = default) { var asyncSearch = _context.FromQueryAsync(queryConfig, _dynamoOverwriteTableConfig); diff --git a/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_dispatched_messages_in_the_outbox.cs b/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_dispatched_messages_in_the_outbox.cs new file mode 100644 index 0000000000..d7e1f5af7b --- /dev/null +++ b/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_dispatched_messages_in_the_outbox.cs @@ -0,0 +1,118 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Paramore.Brighter.Outbox.DynamoDB; +using Xunit; + +namespace Paramore.Brighter.DynamoDB.Tests.Outbox; + +[Trait("Category", "DynamoDB")] +public class DynamoDbOutboxDispatchedMessageTests : DynamoDBOutboxBaseTest +{ + private readonly Message _message; + private readonly DynamoDbOutbox _dynamoDbOutbox; + + public DynamoDbOutboxDispatchedMessageTests() + { + _message = CreateMessage("test_topic"); + _dynamoDbOutbox = new DynamoDbOutbox(Client, new DynamoDbConfiguration(OutboxTableName)); + } + + [Fact] + public async Task When_there_are_dispatched_messages_in_the_outbox_async() + { + await _dynamoDbOutbox.AddAsync(_message); + await _dynamoDbOutbox.MarkDispatchedAsync(_message.Id); + + var args = new Dictionary { { "Topic", "test_topic" } }; + + var messages = await _dynamoDbOutbox.DispatchedMessagesAsync(0, 100, 1, args: args); + + //Other tests may leave messages, so make sure that we grab ours + var message = messages.Single(m => m.Id == _message.Id); + message.Should().NotBeNull(); + message.Body.Value.Should().Be(_message.Body.Value); + } + + [Fact] + public async Task When_there_are_dispatched_messages_in_the_outbox() + { + _dynamoDbOutbox.Add(_message); + _dynamoDbOutbox.MarkDispatched(_message.Id); + + await Task.Delay(1000); + + var args = new Dictionary { { "Topic", "test_topic" } }; + + var messages = _dynamoDbOutbox.DispatchedMessages(0, 100, 1, args: args); + + //Other tests may leave messages, so make sure that we grab ours + var message = messages.Single(m => m.Id == _message.Id); + message.Should().NotBeNull(); + message.Body.Value.Should().Be(_message.Body.Value); + } + + [Fact] + public async Task When_there_are_dispatched_messages_for_multiple_topics_async() + { + var messages = new List(); + messages.Add(CreateMessage("one_topic")); + messages.Add(CreateMessage("another_topic")); + + foreach (var message in messages) + { + await _dynamoDbOutbox.AddAsync(message); + await _dynamoDbOutbox.MarkDispatchedAsync(message.Id); + } + + await Task.Delay(1000); + + var dispatchedMessages = await _dynamoDbOutbox.DispatchedMessagesAsync(0, 100, 1); + + //Other tests may leave messages, so make sure that we grab ours + foreach (var message in messages) + { + var dispatchedMessage = dispatchedMessages.Single(m => m.Id == message.Id); + dispatchedMessage.Should().NotBeNull(); + dispatchedMessage.Body.Value.Should().Be(message.Body.Value); + dispatchedMessage.Header.Topic.Should().Be(message.Header.Topic); + } + } + + [Fact] + public async Task When_there_are_dispatched_messages_for_multiple_topics() + { + var messages = new List(); + messages.Add(CreateMessage("one_topic")); + messages.Add(CreateMessage("another_topic")); + + foreach (var message in messages) + { + _dynamoDbOutbox.Add(message); + _dynamoDbOutbox.MarkDispatched(message.Id); + } + + await Task.Delay(1000); + + var dispatchedMessages = _dynamoDbOutbox.DispatchedMessages(0, 100, 1); + + //Other tests may leave messages, so make sure that we grab ours + foreach (var message in messages) + { + var dispatchedMessage = dispatchedMessages.Single(m => m.Id == message.Id); + dispatchedMessage.Should().NotBeNull(); + dispatchedMessage.Body.Value.Should().Be(message.Body.Value); + dispatchedMessage.Header.Topic.Should().Be(message.Header.Topic); + } + } + + private Message CreateMessage(string topic) + { + return new Message( + new MessageHeader(Guid.NewGuid(), topic, MessageType.MT_DOCUMENT), + new MessageBody("message body") + ); + } +} diff --git a/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs b/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs index cc1463248b..d8bc3e0c9e 100644 --- a/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs +++ b/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs @@ -54,4 +54,64 @@ public async Task When_there_are_outstanding_messages_in_the_outbox() message.Should().NotBeNull(); message.Body.Should().Be(_message.Body); } + + [Fact] + public async Task When_there_are_outstanding_messages_for_multiple_topics_async() + { + var messages = new List(); + messages.Add(CreateMessage("one_topic")); + messages.Add(CreateMessage("another_topic")); + + foreach (var message in messages) + { + await _dynamoDbOutbox.AddAsync(message); + } + + await Task.Delay(1000); + + var outstandingMessages = await _dynamoDbOutbox.OutstandingMessagesAsync(0, 100, 1); + + //Other tests may leave messages, so make sure that we grab ours + foreach (var message in messages) + { + var outstandingMessage = outstandingMessages.Single(m => m.Id == message.Id); + outstandingMessage.Should().NotBeNull(); + outstandingMessage.Body.Value.Should().Be(message.Body.Value); + outstandingMessage.Header.Topic.Should().Be(message.Header.Topic); + } + } + + [Fact] + public async Task When_there_are_outstanding_messages_for_multiple_topics() + { + var messages = new List(); + messages.Add(CreateMessage("one_topic")); + messages.Add(CreateMessage("another_topic")); + + foreach (var message in messages) + { + _dynamoDbOutbox.Add(message); + } + + await Task.Delay(1000); + + var outstandingMessages = _dynamoDbOutbox.OutstandingMessages(0, 100, 1); + + //Other tests may leave messages, so make sure that we grab ours + foreach (var message in messages) + { + var outstandingMessage = outstandingMessages.Single(m => m.Id == message.Id); + outstandingMessage.Should().NotBeNull(); + outstandingMessage.Body.Value.Should().Be(message.Body.Value); + outstandingMessage.Header.Topic.Should().Be(message.Header.Topic); + } + } + + private Message CreateMessage(string topic) + { + return new Message( + new MessageHeader(Guid.NewGuid(), topic, MessageType.MT_DOCUMENT), + new MessageBody("message body") + ); + } } From aa262d949153cb77680d3569247b5307f24c8e84 Mon Sep 17 00:00:00 2001 From: Dominic Hickie Date: Thu, 4 Jul 2024 16:08:44 +0100 Subject: [PATCH 2/2] Fix null ref exception when marking messages as dispatched --- src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs index e505c11b23..4c0c4f8af8 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs @@ -265,7 +265,7 @@ public Task> GetAsync( public async Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default) { var message = await _context.LoadAsync(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken); - MarkMessageDispatched(dispatchedAt, message); + MarkMessageDispatched(dispatchedAt ?? DateTime.UtcNow, message); await _context.SaveAsync( message, @@ -290,7 +290,7 @@ public async Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatche public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary args = null) { var message = _context.LoadAsync(id.ToString(), _dynamoOverwriteTableConfig).Result; - MarkMessageDispatched(dispatchedAt, message); + MarkMessageDispatched(dispatchedAt ?? DateTime.UtcNow, message); _context.SaveAsync( message,