Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport support for outstanding and dispatched messages across all topics to v9 #3183

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 144 additions & 92 deletions src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ THE SOFTWARE. */
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Amazon.DynamoDBv2;
Expand All @@ -46,6 +46,8 @@ public class DynamoDbOutbox :
private readonly DynamoDBOperationConfig _dynamoOverwriteTableConfig;
private readonly Random _random = new Random();

private readonly ConcurrentDictionary<string, byte> _topicNames;

public bool ContinueOnCapturedContext { get; set; }

/// <summary>
Expand All @@ -67,6 +69,8 @@ public DynamoDbOutbox(IAmazonDynamoDB client, DynamoDbConfiguration configuratio
{
throw new ArgumentOutOfRangeException(nameof(DynamoDbConfiguration.NumberOfShards), "Maximum number of shards is 20");
}

_topicNames = new ConcurrentDictionary<string, byte>();
}

/// <summary>
Expand All @@ -84,6 +88,8 @@ public DynamoDbOutbox(DynamoDBContext context, DynamoDbConfiguration configurati
{
throw new ArgumentOutOfRangeException(nameof(DynamoDbConfiguration.NumberOfShards), "Maximum number of shards is 20");
}

_topicNames = new ConcurrentDictionary<string, byte>();
}

/// <inheritdoc />
Expand All @@ -110,6 +116,9 @@ public async Task AddAsync(Message message, int outBoxTimeout = -1, Cancellation
var expiresAt = GetExpirationTime();
var messageToStore = new MessageItem(message, shard, expiresAt);

// Store the name of the topic as a key in a concurrent dictionary to ensure uniqueness & thread safety
_topicNames.TryAdd(message.Header.Topic, 0);

if (transactionConnectionProvider != null)
{
await AddToTransactionWrite(messageToStore, (DynamoDbUnitOfWork)transactionConnectionProvider);
Expand All @@ -120,8 +129,8 @@ public async Task AddAsync(Message message, int outBoxTimeout = -1, Cancellation
}
}

/// <summary>
/// Returns messages that have been successfully dispatched. Eventually consistent.
/// <summary>
/// Returns messages that have been successfully dispatched. Eventually consistent.
/// </summary>
/// <param name="millisecondsDispatchedSince">How long ago was the message dispatched?</param>
/// <param name="pageSize">How many messages returned at once?</param>
Expand All @@ -136,30 +145,46 @@ public IEnumerable<Message> DispatchedMessages(
int outboxTimeout = -1,
Dictionary<string, object> args = null)
{
if (args == null)
return DispatchedMessagesAsync(millisecondsDispatchedSince, pageSize, pageNumber, outboxTimeout, args).GetAwaiter().GetResult();
}

/// <summary>
/// Get the messages that have been dispatched
/// </summary>
/// <param name="hoursDispatchedSince">The number of hours since the message was dispatched</param>
/// <param name="pageSize">The amount to return</param>
/// <param name="cancellationToken">The Cancellation Token</param>
/// <returns>Messages that have already been dispatched</returns>
public async Task<IEnumerable<Message>> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100,
CancellationToken cancellationToken = default)
{
var milliseconds = TimeSpan.FromHours(hoursDispatchedSince).TotalMilliseconds;
return await DispatchedMessagesAsync(milliseconds, pageSize, cancellationToken: cancellationToken);
}

/// <summary>
/// Retrieves messages that have been sent within the window
/// </summary>
/// <param name="millisecondsDispatchedSince"></param>
/// <param name="pageSize">The number of messages to fetch.</param>
/// <param name="pageNumber">The page number.</param>
/// <param name="outboxTimeout">Timeout of sql call.</param>
/// <param name="args">Additional parameters required for search, if any</param>
/// <param name="cancellationToken">The Cancellation Token</param>
/// <returns>List of messages that need to be dispatched.</returns>
public async Task<IEnumerable<Message>> DispatchedMessagesAsync(double millisecondsDispatchedSince, int pageSize = 100, int pageNumber = 1,
int outboxTimeout = -1, Dictionary<string, object> args = null, CancellationToken cancellationToken = default)
{
if (args == null || !args.ContainsKey("Topic"))
{
throw new ArgumentException("Missing required argument", nameof(args));
return await DispatchedMessagesForAllTopicsAsync(millisecondsDispatchedSince, cancellationToken);
}

var sinceTime = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));
var topic = (string)args["Topic"];

//in theory this is all values on that index that have a Delivered data (sparse index)
//we just need to filter for ones in the right date range
//As it is a GSI it can't use a consistent read
var queryConfig = new QueryOperationConfig
{
IndexName = _configuration.DeliveredIndexName,
KeyExpression = new KeyTopicDeliveredTimeExpression().Generate(topic, sinceTime),
ConsistentRead = false
};

//block async to make this sync
var messages = PageAllMessagesAsync(queryConfig).Result.ToList();
return messages.Select(msg => msg.ConvertToMessage());
var topic = (string)args["Topic"];
return await DispatchedMessagesForTopicAsync(millisecondsDispatchedSince, topic, cancellationToken);
}

/// <inheritdoc />
/// <inheritdoc />
/// <summary>
/// Finds a command with the specified identifier.
/// </summary>
Expand Down Expand Up @@ -257,32 +282,6 @@ public async Task MarkDispatchedAsync(IEnumerable<Guid> ids, DateTime? dispatche
}
}

public async Task<IEnumerable<Message>> DispatchedMessagesAsync(double millisecondsDispatchedSince, int pageSize = 100, int pageNumber = 1,
int outboxTimeout = -1, Dictionary<string, object> args = null, CancellationToken cancellationToken = default)
{
if (args == null)
{
throw new ArgumentException("Missing required argument", nameof(args));
}

var sinceTime = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));
var topic = (string)args["Topic"];

//in theory this is all values on that index that have a Delivered data (sparse index)
//we just need to filter for ones in the right date range
//As it is a GSI it can't use a consistent read
var queryConfig = new QueryOperationConfig
{
IndexName = _configuration.DeliveredIndexName,
KeyExpression = new KeyTopicDeliveredTimeExpression().Generate(topic, sinceTime),
ConsistentRead = false
};

//block async to make this sync
var messages = await PageAllMessagesAsync(queryConfig, cancellationToken);
return messages.Select(msg => msg.ConvertToMessage());
}

/// <summary>
/// Update a message to show it is dispatched
/// </summary>
Expand Down Expand Up @@ -314,24 +313,12 @@ private static void MarkMessageDispatched(DateTime? dispatchedAt, MessageItem me
/// <param name="pageNumber">Which page number of messages</param>
/// <returns>A list of messages that are outstanding for dispatch</returns>
public IEnumerable<Message> OutstandingMessages(
double millisecondsDispatchedSince,
int pageSize = 100,
int pageNumber = 1,
Dictionary<string, object> args = null)
double millisecondsDispatchedSince,
int pageSize = 100,
int pageNumber = 1,
Dictionary<string, object> args = null)
{
var now = DateTime.UtcNow;

if (args == null)
{
throw new ArgumentException("Missing required argument", nameof(args));
}

var dispatchedTime = now.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));
var topic = (string)args["Topic"];

//block async to make this sync
var messages = QueryAllOutstandingShardsAsync(topic, dispatchedTime).Result.ToList();
return messages.Select(msg => msg.ConvertToMessage());
return OutstandingMessagesAsync(millisecondsDispatchedSince, pageSize, pageNumber, args).GetAwaiter().GetResult();
}

/// <summary>
Expand All @@ -349,24 +336,19 @@ public async Task<IEnumerable<Message>> OutstandingMessagesAsync(
Dictionary<string, object> args = null,
CancellationToken cancellationToken = default)
{
var now = DateTime.UtcNow;

if (args == null)
if (args == null || !args.ContainsKey("Topic"))
{
throw new ArgumentException("Missing required argument", nameof(args));
return await OutstandingMessagesForAllTopicsAsync(millisecondsDispatchedSince, cancellationToken);
}

var minimumAge = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));
var topic = (string)args["Topic"];

//block async to make this sync
var messages = (await QueryAllOutstandingShardsAsync(topic, minimumAge, cancellationToken)).ToList();
return messages.Select(msg => msg.ConvertToMessage());
var topic = args["Topic"].ToString();
return await OutstandingMessagesForTopicAsync(millisecondsDispatchedSince, topic, cancellationToken);
}

public Task<int> GetNumberOfOutstandingMessagesAsync(CancellationToken cancellationToken)
public async Task<int> GetNumberOfOutstandingMessagesAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
var messages = await OutstandingMessagesAsync(0, cancellationToken: cancellationToken);
return messages.Count();
}

/// <summary>
Expand All @@ -391,29 +373,99 @@ public async Task DeleteAsync(Guid[] messageIds, CancellationToken cancellationT
}
}

public Task<IEnumerable<Message>> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100,
CancellationToken cancellationToken = default)
private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork)
{
throw new NotImplementedException();
var tcs = new TaskCompletionSource<TransactWriteItemsRequest>();
var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap();

var transaction = dynamoDbUnitOfWork.BeginOrGetTransaction();
transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}});
tcs.SetResult(transaction);
return tcs.Task;
}

private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork)
{
var tcs = new TaskCompletionSource<TransactWriteItemsRequest>();
var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap();

var transaction = dynamoDbUnitOfWork.BeginOrGetTransaction();
transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}});
tcs.SetResult(transaction);
return tcs.Task;
}

private async Task<Message> GetMessage(Guid id, CancellationToken cancellationToken = default)
{
MessageItem messageItem = await _context.LoadAsync<MessageItem>(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken);
return messageItem?.ConvertToMessage() ?? new Message();
}


private async Task<IEnumerable<Message>> DispatchedMessagesForAllTopicsAsync(
double millisecondsDispatchedSince,
CancellationToken cancellationToken)
{
var sinceTime = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));

// Get the list of topic names we need to query over
var topics = _topicNames.Keys.ToList();

// Iterate over topics until all messages are retrieved
var messages = new List<MessageItem>();
foreach (var topic in topics)
{
//in theory this is all values on that index that have a Delivered data (sparse index)
//we just need to filter for ones in the right date range
//As it is a GSI it can't use a consistent read
var queryConfig = new QueryOperationConfig
{
IndexName = _configuration.DeliveredIndexName,
KeyExpression = new KeyTopicDeliveredTimeExpression().Generate(topic, sinceTime),
ConsistentRead = false
};

messages.AddRange(await PageAllMessagesAsync(queryConfig, cancellationToken));
}

return messages.Select(msg => msg.ConvertToMessage());
}

private async Task<IEnumerable<Message>> DispatchedMessagesForTopicAsync(
double millisecondsDispatchedSince,
string topicName,
CancellationToken cancellationToken)
{
var sinceTime = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));

//in theory this is all values on that index that have a Delivered data (sparse index)
//we just need to filter for ones in the right date range
//As it is a GSI it can't use a consistent read
var queryConfig = new QueryOperationConfig
{
IndexName = _configuration.DeliveredIndexName,
KeyExpression = new KeyTopicDeliveredTimeExpression().Generate(topicName, sinceTime),
ConsistentRead = false
};

var messages = await PageAllMessagesAsync(queryConfig, cancellationToken);
return messages.Select(msg => msg.ConvertToMessage());
}

private async Task<IEnumerable<Message>> OutstandingMessagesForAllTopicsAsync(double millisecondsDispatchedSince, CancellationToken cancellationToken)
{
var olderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));

// Get the list of topic names we need to query over
var topics = _topicNames.Keys.ToList();

// Iterate over topics and their associated shards until all messages are retrieved
var results = new List<MessageItem>();
foreach (var topic in topics)
{
results.AddRange(await QueryAllOutstandingShardsAsync(topic, olderThan, cancellationToken));
}

return results.Select(msg => msg.ConvertToMessage());
}

private async Task<IEnumerable<Message>> OutstandingMessagesForTopicAsync(double millisecondsDispatchedSince,
string topicName, CancellationToken cancellationToken)
{
var olrderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));

var messages = (await QueryAllOutstandingShardsAsync(topicName, olrderThan, cancellationToken)).ToList();
return messages.Select(msg => msg.ConvertToMessage());
}

private async Task<IEnumerable<MessageItem>> PageAllMessagesAsync(QueryOperationConfig queryConfig, CancellationToken cancellationToken = default)
{
var asyncSearch = _context.FromQueryAsync<MessageItem>(queryConfig, _dynamoOverwriteTableConfig);
Expand Down
Loading
Loading