diff --git a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs index eb201831c7..78b624b1d2 100644 --- a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs +++ b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs @@ -40,14 +40,17 @@ private void DoWork(object state) IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService(); var outBoxSweeper = new OutboxSweeper( - milliSecondsSinceSent:_options.MinimumMessageAge, - outbox:outbox, - commandProcessor:commandProcessor); + milliSecondsSinceSent: _options.MinimumMessageAge, + outbox: outbox, + commandProcessor: commandProcessor, + _options.UseBulk); + + if(_options.UseBulk) + outBoxSweeper.SweepAsync(CancellationToken.None).RunSynchronously(); + else + outBoxSweeper.Sweep(); - outBoxSweeper.Sweep(); - s_logger.LogInformation("Outbox Sweeper sleeping"); - } public Task StopAsync(CancellationToken cancellationToken) diff --git a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeperOptions.cs b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeperOptions.cs index 16b79e3161..dd61d8900a 100644 --- a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeperOptions.cs +++ b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeperOptions.cs @@ -13,5 +13,10 @@ public class TimedOutboxSweeperOptions /// The age a message to pickup by the sweeper in milliseconds. /// public int MinimumMessageAge { get; set; } = 5000; + + /// + /// Use bulk operations to dispatch messages. + /// + public bool UseBulk { get; set; } = false; } } diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs index be70ae20f5..2e3f163325 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessageProducer.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using Microsoft.Extensions.Logging; using Paramore.Brighter.Logging; @@ -14,7 +16,7 @@ namespace Paramore.Brighter.MessagingGateway.AzureServiceBus /// /// A Sync and Async Message Producer for Azure Service Bus. /// - public class AzureServiceBusMessageProducer : IAmAMessageProducerSync, IAmAMessageProducerAsync + public class AzureServiceBusMessageProducer : IAmAMessageProducerSync, IAmAMessageProducerAsync, IAmABulkMessageProducerAsync { public int MaxOutStandingMessages { get; set; } = -1; public int MaxOutStandingCheckIntervalMilliSeconds { get; set; } = 0; @@ -52,6 +54,53 @@ public async Task SendAsync(Message message) await SendWithDelayAsync(message); } + /// + /// Sends a Batch of Messages + /// + /// The messages to send. + /// The size of batches to send messages in. + /// The Cancellation Token. + /// List of Messages successfully sent. + /// + public async IAsyncEnumerable SendAsync(IEnumerable messages, int batchSize, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + var topics = messages.Select(m => m.Header.Topic).Distinct(); + if (topics.Count() != 1) + { + s_logger.LogError("Cannot Bulk send for Multiple Topics, {NumberOfTopics} Topics Requested", topics.Count()); + throw new Exception($"Cannot Bulk send for Multiple Topics, {topics.Count()} Topics Requested"); + } + var topic = topics.Single(); + + var batches = Enumerable.Range(0, (int)Math.Ceiling((messages.Count() / (decimal)batchSize))) + .Select(i => new List(messages + .Skip(i * batchSize) + .Take(batchSize) + .ToArray())); + + var serviceBusSenderWrapper = GetSender(topic); + + s_logger.LogInformation("Sending Messages for {TopicName} split into {NumberOfBatches} Batches of {BatchSize}", topic, batches.Count(), batchSize); + try + { + foreach (var batch in batches) + { + var asbMessages = batch.Select(ConvertToServiceBusMessage).ToArray(); + + s_logger.LogDebug("Publishing {NumberOfMessages} messages to topic {Topic}.", + asbMessages.Length, topic); + + await serviceBusSenderWrapper.SendAsync(asbMessages, cancellationToken); + yield return batch.Select(m => m.Id).ToArray(); + } + } + finally + { + await serviceBusSenderWrapper.CloseAsync(); + } + } + /// /// Send the specified message with specified delay /// @@ -71,29 +120,7 @@ public async Task SendWithDelayAsync(Message message, int delayMilliseconds = 0) { s_logger.LogDebug("Preparing to send message on topic {Topic}", message.Header.Topic); - EnsureTopicExists(message.Header.Topic); - - IServiceBusSenderWrapper serviceBusSenderWrapper; - - try - { - RetryPolicy policy = Policy - .Handle() - .Retry(TopicConnectionRetryCount, (exception, retryNumber) => - { - s_logger.LogError(exception, "Failed to connect to topic {Topic}, retrying...", message.Header.Topic); - - Thread.Sleep(TimeSpan.FromMilliseconds(TopicConnectionSleepBetweenRetriesInMilliseconds)); - } - ); - - serviceBusSenderWrapper = policy.Execute(() => _serviceBusSenderProvider.Get(message.Header.Topic)); - } - catch (Exception e) - { - s_logger.LogError(e, "Failed to connect to topic {Topic}, aborting.", message.Header.Topic); - throw; - } + var serviceBusSenderWrapper = GetSender(message.Header.Topic); try { @@ -101,16 +128,7 @@ public async Task SendWithDelayAsync(Message message, int delayMilliseconds = 0) "Publishing message to topic {Topic} with a delay of {Delay} and body {Request} and id {Id}.", message.Header.Topic, delayMilliseconds, message.Body.Value, message.Id); - var azureServiceBusMessage = new ServiceBusMessage(message.Body.Bytes); - azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.MessageTypeHeaderBagKey, message.Header.MessageType.ToString()); - azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.HandledCountHeaderBagKey, message.Header.HandledCount); - foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key))) - { - azureServiceBusMessage.ApplicationProperties.Add(header.Key, header.Value); - } - azureServiceBusMessage.CorrelationId = message.Header.CorrelationId.ToString(); - azureServiceBusMessage.ContentType = message.Header.ContentType; - azureServiceBusMessage.MessageId = message.Header.Id.ToString(); + var azureServiceBusMessage = ConvertToServiceBusMessage(message); if (delayMilliseconds == 0) { await serviceBusSenderWrapper.SendAsync(azureServiceBusMessage); @@ -140,6 +158,48 @@ public void Dispose() { } + private IServiceBusSenderWrapper GetSender(string topic) + { + EnsureTopicExists(topic); + + try + { + RetryPolicy policy = Policy + .Handle() + .Retry(TopicConnectionRetryCount, (exception, retryNumber) => + { + s_logger.LogError(exception, "Failed to connect to topic {Topic}, retrying...", + topic); + + Thread.Sleep(TimeSpan.FromMilliseconds(TopicConnectionSleepBetweenRetriesInMilliseconds)); + } + ); + + return policy.Execute(() => _serviceBusSenderProvider.Get(topic)); + } + catch (Exception e) + { + s_logger.LogError(e, "Failed to connect to topic {Topic}, aborting.", topic); + throw; + } + } + + private ServiceBusMessage ConvertToServiceBusMessage(Message message) + { + var azureServiceBusMessage = new ServiceBusMessage(message.Body.Bytes); + azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.MessageTypeHeaderBagKey, message.Header.MessageType.ToString()); + azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.HandledCountHeaderBagKey, message.Header.HandledCount); + foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key))) + { + azureServiceBusMessage.ApplicationProperties.Add(header.Key, header.Value); + } + azureServiceBusMessage.CorrelationId = message.Header.CorrelationId.ToString(); + azureServiceBusMessage.ContentType = message.Header.ContentType; + azureServiceBusMessage.MessageId = message.Header.Id.ToString(); + + return azureServiceBusMessage; + } + private void EnsureTopicExists(string topic) { if (_topicCreated || _makeChannel.Equals(OnMissingChannel.Assume)) diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IServiceBusSenderWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IServiceBusSenderWrapper.cs index 6336934a72..062f52e38d 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IServiceBusSenderWrapper.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IServiceBusSenderWrapper.cs @@ -35,6 +35,13 @@ public interface IServiceBusSenderWrapper /// Cancellation Token. Task SendAsync(ServiceBusMessage message, CancellationToken cancellationToken = default(CancellationToken)); + /// + /// Send Messages + /// + /// The messages to send. + /// Cancellation Token. + Task SendAsync(ServiceBusMessage[] messages, CancellationToken cancellationToken = default(CancellationToken)); + /// /// Schedule a message to be sent. /// diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ServiceBusSenderWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ServiceBusSenderWrapper.cs index 22ab868421..c4eecf0329 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ServiceBusSenderWrapper.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ServiceBusSenderWrapper.cs @@ -48,6 +48,11 @@ public async Task SendAsync(ServiceBusMessage message, } } + public Task SendAsync(ServiceBusMessage[] messages, CancellationToken cancellationToken = default(CancellationToken)) + { + return _serviceBusSender.SendMessagesAsync(messages, cancellationToken); + } + public void ScheduleMessage(ServiceBusMessage message, DateTimeOffset scheduleEnqueueTime) { _serviceBusSender.ScheduleMessageAsync(message, scheduleEnqueueTime).Wait(); diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/Paramore.Brighter.MessagingGateway.AzureServiceBus.csproj b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/Paramore.Brighter.MessagingGateway.AzureServiceBus.csproj index c5a8cbb17c..6ba8906cce 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/Paramore.Brighter.MessagingGateway.AzureServiceBus.csproj +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/Paramore.Brighter.MessagingGateway.AzureServiceBus.csproj @@ -4,6 +4,7 @@ Yiannis Triantafyllopoulos netstandard2.0 awssqs;AMQP;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability + 8.0 diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutboxSync.cs b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutboxSync.cs index 670c9022b8..77046b6f55 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutboxSync.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutboxSync.cs @@ -153,6 +153,18 @@ public Message Get(Guid messageId, int outBoxTimeout = -1) .ConfigureAwait(ContinueOnCapturedContext); } + public async Task> GetAsync(IEnumerable messageIds, int outBoxTimeout = -1, + CancellationToken cancellationToken = default(CancellationToken)) + { + var messages = new List(); + foreach (var messageId in messageIds) + { + messages.Add(await GetAsync(messageId, -1, cancellationToken)); + } + + return messages; + } + /// /// Get paginated list of Messages. /// @@ -200,7 +212,16 @@ await _context.SaveAsync( new DynamoDBOperationConfig{OverrideTableName = _configuration.TableName}, cancellationToken); } - + + public async Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, + CancellationToken cancellationToken = default(CancellationToken)) + { + foreach(var messageId in ids) + { + await MarkDispatchedAsync(messageId, dispatchedAt, args, cancellationToken); + } + } + /// /// Update a message to show it is dispatched /// diff --git a/src/Paramore.Brighter.Outbox.EventStore/EventStoreOutboxSync.cs b/src/Paramore.Brighter.Outbox.EventStore/EventStoreOutboxSync.cs index 7faf8825cf..36094db446 100644 --- a/src/Paramore.Brighter.Outbox.EventStore/EventStoreOutboxSync.cs +++ b/src/Paramore.Brighter.Outbox.EventStore/EventStoreOutboxSync.cs @@ -201,6 +201,12 @@ public Task GetAsync( throw new NotImplementedException(); } + public Task> GetAsync(IEnumerable messageIds, int outBoxTimeout = -1, + CancellationToken cancellationToken = default(CancellationToken)) + { + throw new NotImplementedException(); + } + /// /// Returns multiple events from a given stream. /// If all the events do not exist, as many as can be found will be returned. @@ -258,6 +264,12 @@ public async Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Di await _eventStore.AppendToStreamAsync(stream, nextEventNumber.Value, eventData); } + public Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, + CancellationToken cancellationToken = default(CancellationToken)) + { + throw new NotImplementedException(); + } + /// /// Update a message to show it is dispatched /// diff --git a/src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs b/src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs index 35e9c6750a..7edeab2374 100644 --- a/src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs +++ b/src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs @@ -26,6 +26,7 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; using System.Data; +using System.Linq; using Microsoft.Data.SqlClient; using System.Text.Json; using System.Threading; @@ -257,14 +258,52 @@ public Message Get(Guid messageId, int outBoxTimeout = -1) .ConfigureAwait(ContinueOnCapturedContext); } + /// + /// Returns messages specified by the Ids + /// + /// The Timeout of the outbox. + /// Cancellation Token. + /// The Ids of the messages + /// + public async Task> GetAsync(IEnumerable messageIds, int outBoxTimeout = -1, + CancellationToken cancellationToken = default(CancellationToken)) + { + var connection = await _connectionProvider.GetConnectionAsync(cancellationToken); + using (var command = connection.CreateCommand()) + { + CreateListOfMessagesCommand(command, messageIds.ToList()); + + if (connection.State != ConnectionState.Open) + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + + if (_connectionProvider.HasOpenTransaction) + command.Transaction = _connectionProvider.GetTransaction(); + var dbDataReader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + + var messages = new List(); + while (await dbDataReader.ReadAsync(cancellationToken)) + { + messages.Add(MapAMessage(dbDataReader)); + } + dbDataReader.Close(); + + if (!_connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!_connectionProvider.HasOpenTransaction) + connection.Close(); + + return messages; + } + } + /// /// Returns all messages in the store /// /// Number of messages to return in search results (default = 100) /// Page number of results to return (default = 1) /// Additional parameters required for search, if any - /// A list of messages - public IList Get(int pageSize = 100, int pageNumber = 1, Dictionary args = null) + /// A list of messages + public IList Get(int pageSize = 100, int pageNumber = 1, Dictionary args = null) { var connection = _connectionProvider.GetConnection(); using (var command = connection.CreateCommand()) @@ -327,14 +366,14 @@ public async Task> GetAsync( return messages; } } - + /// /// Update a message to show it is dispatched /// /// The id of the message to update /// When was the message dispatched, defaults to UTC now /// Allows the sender to cancel the request pipeline. Optional - + public async Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default) { var connection = await _connectionProvider.GetConnectionAsync(cancellationToken); @@ -348,7 +387,32 @@ public async Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Di if(!_connectionProvider.IsSharedConnection) connection.Dispose(); else if (!_connectionProvider.HasOpenTransaction) connection.Close(); } - + + /// + /// Update messages to show it is dispatched + /// + /// The ids of the messages to update + /// When was the message dispatched, defaults to UTC now + /// Allows the sender to cancel the request pipeline. Optional + public async Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, + CancellationToken cancellationToken = default(CancellationToken)) + { + var connection = await _connectionProvider.GetConnectionAsync(cancellationToken); + + if (connection.State != ConnectionState.Open) + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + using (var command = InitMarkDispatchedCommand(connection, ids, dispatchedAt)) + { + if (_connectionProvider.HasOpenTransaction) + command.Transaction = _connectionProvider.GetTransaction(); + await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + } + if (!_connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!_connectionProvider.HasOpenTransaction) + connection.Close(); + } + /// /// Update a message to show it is dispatched /// @@ -488,12 +552,19 @@ private void CreatePagedOutstandingCommand(SqlCommand command, double milliSecon command.Parameters.AddRange(parameters); } + private void CreateListOfMessagesCommand(SqlCommand command, List messageIds) + { + var inClause = GenerateInClauseAndAddParameters(command, messageIds.ToList()); + var sql = $"SELECT * FROM {_configuration.OutBoxTableName} WHERE MessageId IN ( {inClause} )"; + + command.CommandText = sql; + } + //Fold this code back in as there is only one choice private SqlParameter CreateSqlParameter(string parameterName, object value) { return new SqlParameter(parameterName, value ?? DBNull.Value); - } private T ExecuteCommand(Func execute, string sql, int outboxTimeout, params SqlParameter[] parameters) @@ -578,7 +649,30 @@ private SqlCommand InitMarkDispatchedCommand(SqlConnection connection, Guid mess command.Parameters.Add(CreateSqlParameter("DispatchedAt", dispatchedAt)); return command; } - + private SqlCommand InitMarkDispatchedCommand(SqlConnection connection, IEnumerable messageIds, DateTime? dispatchedAt) + { + var command = connection.CreateCommand(); + var inClause = GenerateInClauseAndAddParameters(command, messageIds.ToList()); + var sql = $"UPDATE {_configuration.OutBoxTableName} SET Dispatched = @DispatchedAt WHERE MessageId in ( {inClause} )"; + + command.CommandText = sql; + command.Parameters.Add(CreateSqlParameter("DispatchedAt", dispatchedAt)); + + return command; + } + + private string GenerateInClauseAndAddParameters(SqlCommand command, List messageIds) + { + var paramNames = messageIds.Select((s, i) => "@p" + i).ToArray(); + + for (int i = 0; i < paramNames.Count(); i++) + { + command.Parameters.Add(CreateSqlParameter(paramNames[i], messageIds[i])); + } + + return string.Join(",", paramNames); + } + private Message MapAMessage(SqlDataReader dr) { var id = GetMessageId(dr); @@ -704,5 +798,5 @@ private async Task MapFunctionAsync(SqlDataReader dr) return message ?? new Message(); } - } + } } diff --git a/src/Paramore.Brighter.Outbox.MySql/MySqlOutboxSync.cs b/src/Paramore.Brighter.Outbox.MySql/MySqlOutboxSync.cs index efe838eb93..c48449ce37 100644 --- a/src/Paramore.Brighter.Outbox.MySql/MySqlOutboxSync.cs +++ b/src/Paramore.Brighter.Outbox.MySql/MySqlOutboxSync.cs @@ -233,6 +233,31 @@ public async Task GetAsync( .ConfigureAwait(ContinueOnCapturedContext); } + public async Task> GetAsync(IEnumerable messageIds, int outBoxTimeout = -1, + CancellationToken cancellationToken = default(CancellationToken)) + { + var connection = _connectionProvider.GetConnection(); + + using (var command = connection.CreateCommand()) + { + CreateBulkReadCommand(command, messageIds); + + if (connection.State != ConnectionState.Open) + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + + var messages = new List(); + using (var dbDataReader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext)) + { + while (await dbDataReader.ReadAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext)) + { + messages.Add(MapAMessage(dbDataReader)); + } + } + + return messages; + } + } + /// /// Returns all messages in the store @@ -305,12 +330,19 @@ public async Task> GetAsync( /// The id of the message to update /// When was the message dispatched, defaults to UTC now /// Allows the sender to cancel the request pipeline. Optional - public async Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, + public Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default) + { + return MarkDispatchedAsync(new[] {id}, dispatchedAt, args, cancellationToken); + } + + public async Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, + CancellationToken cancellationToken = default(CancellationToken)) { var connection = _connectionProvider.GetConnection(); - if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - using (var command = InitMarkDispatchedCommand(connection, id, dispatchedAt)) + if (connection.State != ConnectionState.Open) + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + using (var command = InitMarkDispatchedCommand(connection, ids, dispatchedAt)) { await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); } @@ -325,7 +357,7 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary messageIds) + { + var parameters = new[] { CreateSqlParameter("@MessageIds", string.Join(",", messageIds)) }; + + var inClause = GenerateInClauseAndAddParameters(command, messageIds.ToList()); + + var sql = $"SELECT * FROM {_configuration.OutBoxTableName} WHERE `MessageID` IN ( {inClause} )"; + + command.CommandText = sql; + AddParamtersParamArrayToCollection(parameters, command); + } + private void CreatePagedOutstandingCommand(DbCommand command, double milliSecondsSinceAdded, int pageSize, int pageNumber) { var offset = (pageNumber - 1) * pageSize; @@ -512,16 +556,29 @@ private MySqlParameter[] InitAddDbParameters(Message message) }; } - private DbCommand InitMarkDispatchedCommand(DbConnection connection, Guid messageId, DateTime? dispatchedAt) + private DbCommand InitMarkDispatchedCommand(DbConnection connection, IEnumerable messageIds, DateTime? dispatchedAt) { var command = connection.CreateCommand(); - var sql = $"UPDATE {_configuration.OutBoxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @MessageId"; + var inClause = GenerateInClauseAndAddParameters(command, messageIds.ToList()); + var sql = $"UPDATE {_configuration.OutBoxTableName} SET Dispatched = @DispatchedAt WHERE MessageId IN ( {inClause} )"; + command.CommandText = sql; - command.Parameters.Add(CreateSqlParameter("@MessageId", messageId)); command.Parameters.Add(CreateSqlParameter("@DispatchedAt", dispatchedAt)); return command; } + private string GenerateInClauseAndAddParameters(DbCommand command, List messageIds) + { + var paramNames = messageIds.Select((s, i) => "@p" + i).ToArray(); + + for (int i = 0; i < paramNames.Count(); i++) + { + command.Parameters.Add(CreateSqlParameter(paramNames[i], messageIds[i])); + } + + return string.Join(",", paramNames); + } + private static bool IsExceptionUnqiueOrDuplicateIssue(MySqlException sqlException) { return sqlException.Number == MySqlDuplicateKeyError; diff --git a/src/Paramore.Brighter.Outbox.Sqlite/SqliteOutboxSync.cs b/src/Paramore.Brighter.Outbox.Sqlite/SqliteOutboxSync.cs index 9786bb0883..dc80737b3f 100644 --- a/src/Paramore.Brighter.Outbox.Sqlite/SqliteOutboxSync.cs +++ b/src/Paramore.Brighter.Outbox.Sqlite/SqliteOutboxSync.cs @@ -235,6 +235,35 @@ public Message Get(Guid messageId, int outBoxTimeout = -1) .ConfigureAwait(ContinueOnCapturedContext); } + public async Task> GetAsync(IEnumerable messageIds, int outBoxTimeout = -1, + CancellationToken cancellationToken = default(CancellationToken)) + { + var connection = _connectionProvider.GetConnection(); + using (var command = connection.CreateCommand()) + { + var inClause = GenerateInClauseAndAddParameters(command, messageIds.ToList()); + var sql = $"SELECT * FROM {_configuration.OutBoxTableName} WHERE MessageId IN ( {inClause} )"; + + command.CommandText = sql; + + if (connection.State != ConnectionState.Open) + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + + var messages = new List(); + using (var dbDataReader = await command.ExecuteReaderAsync(cancellationToken) + .ConfigureAwait(ContinueOnCapturedContext)) + { + while (await dbDataReader.ReadAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext)) + { + messages.Add(MapAMessage(dbDataReader)); + } + } + + ; + return messages; + } + } + /// /// Returns all messages in the outbox /// @@ -295,7 +324,6 @@ public async Task> GetAsync( } } - ; return messages; } } @@ -307,12 +335,20 @@ public async Task> GetAsync( /// The id of the message to update /// When was the message dispatched, defaults to UTC now /// Allows the sender to cancel the request pipeline. Optional - public async Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, + public Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default) + { + return MarkDispatchedAsync(new[] {id}, dispatchedAt, args, cancellationToken); + } + + public async Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, + Dictionary args = null, + CancellationToken cancellationToken = default(CancellationToken)) { var connection = _connectionProvider.GetConnection(); - if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - using (var command = InitMarkDispatchedCommand(connection, id, dispatchedAt)) + if (connection.State != ConnectionState.Open) + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + using (var command = InitMarkDispatchedCommand(connection, ids, dispatchedAt)) { await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); } @@ -327,7 +363,7 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary(Func execute, string sql, int outboxTimeout, + + private string GenerateInClauseAndAddParameters(SqliteCommand command, List messageIds) + { + var paramNames = messageIds.Select((s, i) => "@p" + i).ToArray(); + + for (int i = 0; i < paramNames.Count(); i++) + { + command.Parameters.Add(CreateSqlParameter(paramNames[i], messageIds[i])); + } + + return string.Join(",", paramNames); + } + + private T ExecuteCommand(Func execute, string sql, int outboxTimeout, params SqliteParameter[] parameters) { var connection = _connectionProvider.GetConnection(); @@ -513,12 +561,12 @@ private SqliteParameter[] InitAddDbParameters(Message message) }; } - private SqliteCommand InitMarkDispatchedCommand(SqliteConnection connection, Guid messageId, DateTime? dispatchedAt) + private SqliteCommand InitMarkDispatchedCommand(SqliteConnection connection, IEnumerable messageIds, DateTime? dispatchedAt) { var command = connection.CreateCommand(); - var sql = $"UPDATE {_configuration.OutBoxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @MessageId"; + var inClause = GenerateInClauseAndAddParameters(command, messageIds.ToList()); + var sql = $"UPDATE {_configuration.OutBoxTableName} SET Dispatched = @DispatchedAt WHERE MessageId IN ( {inClause} )"; command.CommandText = sql; - command.Parameters.Add(CreateSqlParameter("@MessageId", messageId.ToString())); command.Parameters.Add(CreateSqlParameter("@DispatchedAt", dispatchedAt.HasValue ? dispatchedAt.Value.ToString("s"): DateTime.UtcNow.ToString("s"))); return command; } diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index 7f5969eab3..17330700ad 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -531,6 +531,19 @@ public async Task ClearOutboxAsync( await _bus.ClearOutboxAsync(posts, continueOnCapturedContext, cancellationToken); } + /// + /// Flushes the message box message given by to the broker using bulk sending. + /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ + /// + /// The posts to flush + public async Task BulkClearOutboxAsync( + IEnumerable posts, + bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) + { + await _bus.BulkClearOutboxAsync(posts, continueOnCapturedContext, cancellationToken); + } + /// /// Uses the Request-Reply messaging approach to send a message to another server and block awaiting a reply. diff --git a/src/Paramore.Brighter/ExternalBusServices.cs b/src/Paramore.Brighter/ExternalBusServices.cs index 7ae464175e..2a6f5977a9 100644 --- a/src/Paramore.Brighter/ExternalBusServices.cs +++ b/src/Paramore.Brighter/ExternalBusServices.cs @@ -49,7 +49,6 @@ protected virtual void Dispose(bool disposing) if (_disposed) return; - if (disposing && ProducerRegistry != null) ProducerRegistry.CloseAll(); _disposed = true; @@ -205,6 +204,60 @@ await RetryAsync(async ct => await AsyncOutbox.MarkDispatchedAsync(messageId, Da } + internal async Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) + { + if (!HasAsyncOutbox()) + throw new InvalidOperationException("No async outbox defined."); + + var messages = await AsyncOutbox.GetAsync(posts, OutboxTimeout, cancellationToken); + + s_logger.LogInformation( + "{RequestedNumberOfMessages} of {RetrievedNumberOfMessages} retrieved from the Outbox.", posts.Count(), + messages.Count()); + + await BulkClearOutboxAsync(messages, continueOnCapturedContext, cancellationToken); + } + + internal async Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) + { + if (!HasAsyncOutbox()) + throw new InvalidOperationException("No async outbox defined."); + + CheckOutboxOutstandingLimit(); + + //Chunk into Topics + var messagesByTopic = posts.GroupBy(m => m.Header.Topic); + + foreach (var topicBatch in messagesByTopic) + { + var producer = ProducerRegistry.LookupBy(topicBatch.Key); + + if (producer is IAmABulkMessageProducerAsync bulkMessageProducer) + { + var messages = topicBatch.ToArray(); + s_logger.LogInformation("Bulk Dispatching {NumberOfMessages} for Topic {TopicName}", messages.Length, topicBatch.Key); + var dispatchesMessages = bulkMessageProducer.SendAsync(messages, 10, cancellationToken); + + await foreach (var successfulMessage in dispatchesMessages.WithCancellation(cancellationToken)) + { + if (!(producer is ISupportPublishConfirmation)) + { + await RetryAsync(async ct => await AsyncOutbox.MarkDispatchedAsync(successfulMessage, + DateTime.UtcNow, cancellationToken: cancellationToken), cancellationToken: cancellationToken); + } + } + } + else + { + throw new InvalidOperationException("No async bulk message producer defined."); + } + } + + CheckOutstandingMessages(); + } + internal bool ConfigureAsyncPublisherCallbackMaybe(IAmAMessageProducer producer) { if (producer is ISupportPublishConfirmation producerSync) diff --git a/src/Paramore.Brighter/IAmABulkMessageProducerAsync.cs b/src/Paramore.Brighter/IAmABulkMessageProducerAsync.cs new file mode 100644 index 0000000000..74f4f611c1 --- /dev/null +++ b/src/Paramore.Brighter/IAmABulkMessageProducerAsync.cs @@ -0,0 +1,53 @@ +#region Licence + +/* The MIT License (MIT) +Copyright © 2014 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Paramore.Brighter +{ + /// + /// Interface IAmABulkMessageProducerAsync + /// Abstracts away the Application Layer used to push messages with async/await support onto a Task Queue + /// Usually clients do not need to instantiate as access is via an derived class. + /// We provide the following default gateway applications + /// + /// AMQP + /// RESTML + /// + /// + public interface IAmABulkMessageProducerAsync : IAmAMessageProducer + { + /// + /// Sends the specified message. + /// + /// The messages. + /// The Number of Messages to dispatch at a time. + /// The Cancellation Token. + IAsyncEnumerable SendAsync(IEnumerable messages, int batchSize, CancellationToken cancellationToken); + } +} diff --git a/src/Paramore.Brighter/IAmACommandProcessor.cs b/src/Paramore.Brighter/IAmACommandProcessor.cs index 438c404198..e89c34c66b 100644 --- a/src/Paramore.Brighter/IAmACommandProcessor.cs +++ b/src/Paramore.Brighter/IAmACommandProcessor.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -126,6 +126,13 @@ public interface IAmACommandProcessor /// The posts to flush Task ClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)); + /// + /// Flushes the message box message given by to the broker using bulk sending. + /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ + /// + /// The posts to flush + Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default(CancellationToken)); + /// /// Uses the Request-Reply messaging approach to send a message to another server and block awaiting a reply. /// The message is placed into a message queue but not into the outbox. diff --git a/src/Paramore.Brighter/IAmAnOutboxAsync.cs b/src/Paramore.Brighter/IAmAnOutboxAsync.cs index dd383886f4..10a97a8e8b 100644 --- a/src/Paramore.Brighter/IAmAnOutboxAsync.cs +++ b/src/Paramore.Brighter/IAmAnOutboxAsync.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2016 Ian Cooper @@ -66,12 +66,29 @@ public interface IAmAnOutboxAsync : IAmAnOutbox where T : Message /// . Task GetAsync(Guid messageId, int outBoxTimeout = -1, CancellationToken cancellationToken = default(CancellationToken)); + /// + /// Awaitable Get the messages. + /// + /// The message identifiers. + /// The time allowed for the read in milliseconds; on a -2 default + /// Allows the sender to cancel the request pipeline. Optional + /// . + Task> GetAsync(IEnumerable messageIds, int outBoxTimeout = -1, CancellationToken cancellationToken = default(CancellationToken)); + /// /// Update a message to show it is dispatched /// /// The id of the message to update /// When was the message dispatched, defaults to UTC now /// Allows the sender to cancel the request pipeline. Optional - Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default(CancellationToken)); - } + Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default(CancellationToken)); + + /// + /// Update messages to show it is dispatched + /// + /// The ids of the messages to update + /// When was the message dispatched, defaults to UTC now + /// Allows the sender to cancel the request pipeline. Optional + Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default(CancellationToken)); + } } diff --git a/src/Paramore.Brighter/InMemoryOutbox.cs b/src/Paramore.Brighter/InMemoryOutbox.cs index 862ccc0c28..19eaddbc31 100644 --- a/src/Paramore.Brighter/InMemoryOutbox.cs +++ b/src/Paramore.Brighter/InMemoryOutbox.cs @@ -218,7 +218,20 @@ public IList Get(int pageSize = 100, int pageNumber = 1, Dictionary + public Task> GetAsync(IEnumerable messageIds, int outBoxTimeout = -1, + CancellationToken cancellationToken = default(CancellationToken)) + { + var tcs = new TaskCompletionSource>(TaskCreationOptions.RunContinuationsAsynchronously); + ClearExpiredMessages(); + + var ids = messageIds.Select(m => m.ToString()).ToList(); + + tcs.SetResult(_requests.Values.Where(oe => ids.Contains(oe.Key)).Select(oe => oe.Message).ToList()); + + return tcs.Task; + } + + /// /// Mark the message as dispatched /// /// The message to mark as dispatched @@ -233,7 +246,13 @@ public IList Get(int pageSize = 100, int pageNumber = 1, Dictionary + public Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, + CancellationToken cancellationToken = default(CancellationToken)) + { + throw new NotImplementedException(); + } + + /// /// Mark the message as dispatched /// /// The message to mark as dispatched diff --git a/src/Paramore.Brighter/OutboxSweeper.cs b/src/Paramore.Brighter/OutboxSweeper.cs index 79c7bf97ab..86e73a9438 100644 --- a/src/Paramore.Brighter/OutboxSweeper.cs +++ b/src/Paramore.Brighter/OutboxSweeper.cs @@ -11,6 +11,7 @@ public class OutboxSweeper private readonly double _milliSecondsSinceSent; private readonly IAmAnOutboxViewer _outbox; private readonly IAmACommandProcessor _commandProcessor; + private readonly bool _useBulk; /// /// This sweeper clears an outbox of any outstanding messages within the time interval @@ -18,11 +19,13 @@ public class OutboxSweeper /// How long can a message sit in the box before we attempt to resend /// What is the outbox you want to check -- should be the same one supplied to the command processor below /// Who should post the messages - public OutboxSweeper(double milliSecondsSinceSent, IAmAnOutboxViewer outbox, IAmACommandProcessor commandProcessor) + /// + public OutboxSweeper(double milliSecondsSinceSent, IAmAnOutboxViewer outbox, IAmACommandProcessor commandProcessor, bool useBulk = false) { _milliSecondsSinceSent = milliSecondsSinceSent; _outbox = outbox; _commandProcessor = commandProcessor; + _useBulk = useBulk; if (outbox is IAmAnOutboxViewerAsync outboxViewerAsync) _outboxAsync = outboxViewerAsync; } @@ -46,10 +49,11 @@ public async Task SweepAsync(CancellationToken cancellationToken = default) { if(_outboxAsync == null) throw new InvalidOperationException("No Async Outbox Viewer defined."); - await SweepAsync(_milliSecondsSinceSent, _outboxAsync, _commandProcessor, cancellationToken); + + await SweepAsync(_milliSecondsSinceSent, _outboxAsync, _commandProcessor, cancellationToken, _useBulk); } - public static async Task SweepAsync(double milliSecondsSinceSent, IAmAnOutboxViewerAsync outbox, IAmACommandProcessor commandProcessor, CancellationToken cancellationToken) + public static async Task SweepAsync(double milliSecondsSinceSent, IAmAnOutboxViewerAsync outbox, IAmACommandProcessor commandProcessor, CancellationToken cancellationToken, bool useBulk = false) { //find all the unsent messages var outstandingMessages = (await outbox.OutstandingMessagesAsync(milliSecondsSinceSent, cancellationToken: cancellationToken)).ToArray(); @@ -58,7 +62,10 @@ public static async Task SweepAsync(double milliSecondsSinceSent, IAmAnOutboxVie if (outstandingMessages.Any()) { var messages = outstandingMessages.Select(message => message.Id).ToArray(); - await commandProcessor.ClearOutboxAsync(messages, cancellationToken: cancellationToken); + if (useBulk) + await commandProcessor.BulkClearOutboxAsync(messages, cancellationToken: cancellationToken); + else + await commandProcessor.ClearOutboxAsync(messages, cancellationToken: cancellationToken); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducer.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducer.cs index 1f53219fbd..3bc08e6afd 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducer.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducer.cs @@ -24,16 +24,18 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; using System.Threading.Tasks; namespace Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles { - public class FakeMessageProducer : IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation + public class FakeMessageProducer : IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation, IAmABulkMessageProducerAsync { public event Action OnMessagePublished; public int MaxOutStandingMessages { get; set; } = -1; public int MaxOutStandingCheckIntervalMilliSeconds { get; set; } = 0; - + public List SentMessages = new List(); public bool MessageWasSent { get; set; } @@ -46,8 +48,17 @@ public Task SendAsync(Message message) tcs.SetResult(message); return tcs.Task; } + public async IAsyncEnumerable SendAsync(IEnumerable messages, int batchSize, [EnumeratorCancellation] CancellationToken cancellationToken) + { + foreach (var msg in messages) + { + yield return new[] {msg.Id}; + } + MessageWasSent = true; + SentMessages.AddRange(messages); + } - public void Send(Message message) + public void Send(Message message) { MessageWasSent = true; SentMessages.Add(message); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeOutboxSync.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeOutboxSync.cs index c417869e57..75696e4035 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeOutboxSync.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeOutboxSync.cs @@ -94,6 +94,16 @@ public IList Get( return Task.FromResult(Get(messageId, outBoxTimeout)); } + public Task> GetAsync(IEnumerable messageIds, int outBoxTimeout = -1, + CancellationToken cancellationToken = default(CancellationToken)) + { + var tcs = new TaskCompletionSource>(); + tcs.SetResult(_posts.Where(oe => messageIds.Contains(oe.Message.Id)) + .Select(outboxEntry => outboxEntry.Message).ToList()); + + return tcs.Task; + } + public Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -105,6 +115,12 @@ public Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictiona return tcs.Task; } + public Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, + CancellationToken cancellationToken = default(CancellationToken)) + { + throw new NotImplementedException(); + } + public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary args = null) { var entry = _posts.SingleOrDefault(oe => oe.Message.Id == id); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor _Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor _Async.cs new file mode 100644 index 0000000000..b850afca1f --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor _Async.cs @@ -0,0 +1,117 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2015 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using FluentAssertions; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Polly; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors +{ + [Collection("CommandProcessor")] + public class CommandProcessorPostBoxBulkClearAsyncTests : IDisposable + { + private readonly CommandProcessor _commandProcessor; + private readonly Message _message; + private readonly Message _message2; + private readonly FakeOutboxSync _fakeOutboxSync; + private readonly FakeMessageProducer _fakeMessageProducer; + + public CommandProcessorPostBoxBulkClearAsyncTests() + { + var myCommand = new MyCommand{ Value = "Hello World"}; + var myCommand2 = new MyCommand { Value = "Hello World 2" }; + + _fakeOutboxSync = new FakeOutboxSync(); + _fakeMessageProducer = new FakeMessageProducer(); + + var topic = "MyCommand"; + var topic2 = "MyCommand2"; + + _message = new Message( + new MessageHeader(myCommand.Id, topic, MessageType.MT_COMMAND), + new MessageBody(JsonSerializer.Serialize(myCommand, JsonSerialisationOptions.Options)) + ); + + _message2 = new Message( + new MessageHeader(myCommand.Id, topic2, MessageType.MT_COMMAND), + new MessageBody(JsonSerializer.Serialize(myCommand2, JsonSerialisationOptions.Options)) + ); + + var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper())); + messageMapperRegistry.Register(); + + var retryPolicy = Policy + .Handle() + .RetryAsync(); + + var circuitBreakerPolicy = Policy + .Handle() + .CircuitBreakerAsync(1, TimeSpan.FromMilliseconds(1)); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }, + messageMapperRegistry, + _fakeOutboxSync, + new ProducerRegistry(new Dictionary() { { topic, _fakeMessageProducer }, { topic2, _fakeMessageProducer } })); + } + + [Fact] + public async Task When_Clearing_The_PostBox_On_The_Command_Processor_Async() + { + await _fakeOutboxSync.AddAsync(_message); + await _fakeOutboxSync.AddAsync(_message2); + + await _commandProcessor.BulkClearOutboxAsync(new []{_message.Id, _message2.Id}); + + //_should_send_a_message_via_the_messaging_gateway + _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + + var sentMessage = _fakeMessageProducer.SentMessages[0]; + sentMessage.Should().NotBeNull(); + sentMessage.Id.Should().Be(_message.Id); + sentMessage.Header.Topic.Should().Be(_message.Header.Topic); + sentMessage.Body.Value.Should().Be(_message.Body.Value); + + var sentMessage2 = _fakeMessageProducer.SentMessages[1]; + sentMessage2.Should().NotBeNull(); + sentMessage2.Id.Should().Be(_message2.Id); + sentMessage2.Header.Topic.Should().Be(_message2.Header.Topic); + sentMessage2.Body.Value.Should().Be(_message2.Body.Value); + } + + public void Dispose() + { + CommandProcessor.ClearExtServiceBus(); + } + } +} + diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs index 92b5e20a29..d484e41669 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs @@ -144,6 +144,12 @@ public async Task ClearOutboxAsync(IEnumerable posts, bool continueOnCaptu await completionSource.Task; } + public Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) + { + return ClearOutboxAsync(posts, continueOnCapturedContext, cancellationToken); + } + public TResponse Call(T request, int timeOutInMilliseconds) where T : class, ICall where TResponse : class, IResponse { _requests.Enqueue(request); diff --git a/tests/Paramore.Brighter.Core.Tests/Paramore.Brighter.Core.Tests.csproj b/tests/Paramore.Brighter.Core.Tests/Paramore.Brighter.Core.Tests.csproj index d76962c025..37394516bc 100644 --- a/tests/Paramore.Brighter.Core.Tests/Paramore.Brighter.Core.Tests.csproj +++ b/tests/Paramore.Brighter.Core.Tests/Paramore.Brighter.Core.Tests.csproj @@ -1,6 +1,6 @@  - netcoreapp3.1 + net6.0 false @@ -15,6 +15,7 @@ + diff --git a/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs b/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs index 3c2d237a79..6aad158d92 100644 --- a/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs +++ b/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs @@ -109,6 +109,12 @@ public void ClearOutbox(params Guid[] posts) return tcs.Task; } + public Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, + CancellationToken cancellationToken = default(CancellationToken)) + { + return ClearOutboxAsync(posts, continueOnCapturedContext, cancellationToken); + } + public TResponse Call(T request, int timeOutInMilliseconds) where T : class, ICall where TResponse : class, IResponse { return null; diff --git a/tests/Paramore.Brighter.MSSQL.Tests/Outbox/When_there_are_multiple_messages_and_some_are_recievied_and_Dispatched_bulk_Async.cs b/tests/Paramore.Brighter.MSSQL.Tests/Outbox/When_there_are_multiple_messages_and_some_are_recievied_and_Dispatched_bulk_Async.cs new file mode 100644 index 0000000000..4911d70fb9 --- /dev/null +++ b/tests/Paramore.Brighter.MSSQL.Tests/Outbox/When_there_are_multiple_messages_and_some_are_recievied_and_Dispatched_bulk_Async.cs @@ -0,0 +1,72 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Paramore.Brighter.Outbox.MsSql; +using Xunit; + +namespace Paramore.Brighter.MSSQL.Tests.Outbox +{ + [Trait("Category", "MSSQL")] + public class MsSqlOutboxBulkGetAsyncTests : IDisposable + { + private readonly MsSqlTestHelper _msSqlTestHelper; + private readonly string _Topic1 = "test_topic"; + private readonly string _Topic2 = "test_topic3"; + private IEnumerable _messages; + private readonly Message _message1; + private readonly Message _message2; + private readonly Message _message3; + private readonly Message _message; + private readonly MsSqlOutbox _sqlOutbox; + + public MsSqlOutboxBulkGetAsyncTests() + { + _msSqlTestHelper = new MsSqlTestHelper(); + _msSqlTestHelper.SetupMessageDb(); + + _sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration); + _message = new Message(new MessageHeader(Guid.NewGuid(), _Topic1, MessageType.MT_COMMAND), + new MessageBody("message body")); + _message1 = new Message(new MessageHeader(Guid.NewGuid(), _Topic2, MessageType.MT_EVENT), + new MessageBody("message body2")); + _message2 = new Message(new MessageHeader(Guid.NewGuid(), _Topic1, MessageType.MT_COMMAND), + new MessageBody("message body3")); + _message3 = new Message(new MessageHeader(Guid.NewGuid(), _Topic2, MessageType.MT_EVENT), + new MessageBody("message body4")); + } + + [Fact] + public async Task When_there_are_multiple_messages_and_some_are_received_and_Dispatched_bulk_Async() + { + await _sqlOutbox.AddAsync(_message); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message1); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message2); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message3); + await Task.Delay(100); + + _messages = await _sqlOutbox.GetAsync(new[] {_message1.Id, _message2.Id}); + + //should fetch 1 message + _messages.Should().HaveCount(2); + //should fetch expected message + _messages.Should().Contain(m => m.Id == _message1.Id); + _messages.Should().Contain(m => m.Id == _message2.Id); + + await _sqlOutbox.MarkDispatchedAsync(_messages.Select(m => m.Id), DateTime.UtcNow); + + var undispatchedMessages = await _sqlOutbox.OutstandingMessagesAsync(0); + + undispatchedMessages.Count().Should().Be(2); + } + + public void Dispose() + { + _msSqlTestHelper.CleanUpDb(); + } + } +} diff --git a/tests/Paramore.Brighter.MySQL.Tests/Outbox/When_there_are_multiple_messages_and_some_are_receivied_and_Dispatched_bulk_Async.cs b/tests/Paramore.Brighter.MySQL.Tests/Outbox/When_there_are_multiple_messages_and_some_are_receivied_and_Dispatched_bulk_Async.cs new file mode 100644 index 0000000000..5b28d6ce19 --- /dev/null +++ b/tests/Paramore.Brighter.MySQL.Tests/Outbox/When_there_are_multiple_messages_and_some_are_receivied_and_Dispatched_bulk_Async.cs @@ -0,0 +1,73 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Paramore.Brighter.Outbox.MySql; +using Xunit; + +namespace Paramore.Brighter.MySQL.Tests.Outbox +{ + [Trait("Category", "MySql")] + public class MySqlOutboxBulkAsyncTests : IDisposable + { + private readonly MySqlTestHelper _mySqlTestHelper; + private readonly string _Topic1 = "test_topic"; + private readonly string _Topic2 = "test_topic3"; + private IEnumerable _messages; + private readonly Message _message1; + private readonly Message _message2; + private readonly Message _message3; + private readonly Message _message; + private readonly MySqlOutboxSync _sqlOutbox; + + public MySqlOutboxBulkAsyncTests() + { + _mySqlTestHelper = new MySqlTestHelper(); + _mySqlTestHelper.SetupMessageDb(); + + _sqlOutbox = new MySqlOutboxSync(_mySqlTestHelper.OutboxConfiguration); + _message = new Message(new MessageHeader(Guid.NewGuid(), _Topic1, MessageType.MT_COMMAND), + new MessageBody("message body")); + _message1 = new Message(new MessageHeader(Guid.NewGuid(), _Topic2, MessageType.MT_EVENT), + new MessageBody("message body2")); + _message2 = new Message(new MessageHeader(Guid.NewGuid(), _Topic1, MessageType.MT_COMMAND), + new MessageBody("message body3")); + _message3 = new Message(new MessageHeader(Guid.NewGuid(), _Topic2, MessageType.MT_EVENT), + new MessageBody("message body4")); + } + + [Fact] + public async Task When_there_are_multiple_messages_and_some_are_recievied_and_Dispatched_bulk_Async() + { + await _sqlOutbox.AddAsync(_message); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message1); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message2); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message3); + await Task.Delay(100); + + _messages = await _sqlOutbox.GetAsync(new[] { _message1.Id, _message2.Id }); + + //should fetch 1 message + _messages.Should().HaveCount(2); + //should fetch expected message + _messages.Should().Contain(m => m.Id == _message1.Id); + _messages.Should().Contain(m => m.Id == _message2.Id); + + await _sqlOutbox.MarkDispatchedAsync(_messages.Select(m => m.Id), DateTime.UtcNow); + + var undispatchedMessages = await _sqlOutbox.OutstandingMessagesAsync(0); + + undispatchedMessages.Count().Should().Be(2); + } + + public void Dispose() + { + _mySqlTestHelper.CleanUpDb(); + } + + } +} diff --git a/tests/Paramore.Brighter.Sqlite.Tests/Outbox/When_there_are_multiple_messages_and_some_are_received_and_Dispatched_bulk_Async.cs b/tests/Paramore.Brighter.Sqlite.Tests/Outbox/When_there_are_multiple_messages_and_some_are_received_and_Dispatched_bulk_Async.cs new file mode 100644 index 0000000000..866494cd74 --- /dev/null +++ b/tests/Paramore.Brighter.Sqlite.Tests/Outbox/When_there_are_multiple_messages_and_some_are_received_and_Dispatched_bulk_Async.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Paramore.Brighter.Outbox.Sqlite; +using Xunit; + +namespace Paramore.Brighter.Sqlite.Tests.Outbox +{ + public class SqliteOutboxBulkGetAsyncTests :IDisposable + { + private readonly SqliteTestHelper _sqliteTestHelper; + private readonly string _Topic1 = "test_topic"; + private readonly string _Topic2 = "test_topic3"; + private IEnumerable _messages; + private readonly Message _message1; + private readonly Message _message2; + private readonly Message _message3; + private readonly Message _message; + private readonly SqliteOutboxSync _sqlOutbox; + + public SqliteOutboxBulkGetAsyncTests() + { + _sqliteTestHelper = new SqliteTestHelper(); + _sqliteTestHelper.SetupMessageDb(); + _sqlOutbox = new SqliteOutboxSync(new SqliteConfiguration(_sqliteTestHelper.ConnectionString, _sqliteTestHelper.TableName_Messages)); + + _message = new Message(new MessageHeader(Guid.NewGuid(), _Topic1, MessageType.MT_COMMAND), + new MessageBody("message body")); + _message1 = new Message(new MessageHeader(Guid.NewGuid(), _Topic2, MessageType.MT_EVENT), + new MessageBody("message body2")); + _message2 = new Message(new MessageHeader(Guid.NewGuid(), _Topic1, MessageType.MT_COMMAND), + new MessageBody("message body3")); + _message3 = new Message(new MessageHeader(Guid.NewGuid(), _Topic2, MessageType.MT_EVENT), + new MessageBody("message body4")); + } + + [Fact] + public async Task When_there_are_multiple_messages_and_some_are_received_and_Dispatched_bulk_Async() + { + await _sqlOutbox.AddAsync(_message); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message1); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message2); + await Task.Delay(100); + await _sqlOutbox.AddAsync(_message3); + await Task.Delay(100); + + _messages = await _sqlOutbox.GetAsync(new[] { _message1.Id, _message2.Id }); + + //should fetch 1 message + _messages.Should().HaveCount(2); + //should fetch expected message + _messages.Should().Contain(m => m.Id == _message1.Id); + _messages.Should().Contain(m => m.Id == _message2.Id); + + await _sqlOutbox.MarkDispatchedAsync(_messages.Select(m => m.Id), DateTime.UtcNow); + + var undispatchedMessages = await _sqlOutbox.OutstandingMessagesAsync(0); + + undispatchedMessages.Count().Should().Be(2); + } + + public void Dispose() + { + _sqliteTestHelper.CleanUpDb(); + } + } +}