Skip to content

Commit

Permalink
Update Bulk to use the new Producer Registry
Browse files Browse the repository at this point in the history
  • Loading branch information
preardon committed Jan 6, 2022
1 parent 6cae75b commit b91be3a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,39 +62,43 @@ public async Task SendAsync(Message message)
/// <param name="cancellationToken">The Cancellation Token.</param>
/// <returns>List of Messages successfully sent.</returns>
/// <exception cref="NotImplementedException"></exception>
public async IAsyncEnumerable<Guid[]> SendAsync(IEnumerable<Message> messages, int batchSize, [EnumeratorCancellation] CancellationToken cancellationToken)
public async IAsyncEnumerable<Guid[]> SendAsync(IEnumerable<Message> messages, int batchSize,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var messagesByTopic = messages.GroupBy(m => m.Header.Topic);

foreach (var topicBatch in messagesByTopic)
var topics = messages.Select(m => m.Header.Topic).Distinct();
if (topics.Count() != 1)
{
var messagesForTopic = topicBatch.ToArray();
var batches = Enumerable.Range(0, (int)Math.Ceiling((messagesForTopic.Length / (decimal)batchSize)))
.Select(i => new List<Message>(messagesForTopic
.Skip(i * batchSize)
.Take(batchSize)
.ToArray()));
s_logger.LogError("Cannot Bulk send for Multiple Topics, {NumberOfTopics} Topics Requested", topics.Count());
throw new Exception($"Cannot Bulk send for Multiple Topics, {topics.Count()} Topics Requested");
}
var topic = topics.Single();

var batches = Enumerable.Range(0, (int)Math.Ceiling((messages.Count() / (decimal)batchSize)))
.Select(i => new List<Message>(messages
.Skip(i * batchSize)
.Take(batchSize)
.ToArray()));

var serviceBusSenderWrapper = GetSender(topicBatch.Key);
var serviceBusSenderWrapper = GetSender(topic);

try
s_logger.LogInformation("Sending Messages for {TopicName} split into {NumberOfBatches} Batches of {BatchSize}", topic, batches.Count(), batchSize);
try
{
foreach (var batch in batches)
{
foreach (var batch in batches)
{
var asbMessages = batch.Select(ConvertToServiceBusMessage).ToArray();
var asbMessages = batch.Select(ConvertToServiceBusMessage).ToArray();

s_logger.LogDebug("Publishing {NumberOfMessages} messages to topic {Topic}.",
asbMessages.Length, topicBatch.Key);
s_logger.LogDebug("Publishing {NumberOfMessages} messages to topic {Topic}.",
asbMessages.Length, topic);

await serviceBusSenderWrapper.SendAsync(asbMessages, cancellationToken);
yield return batch.Select(m => m.Id).ToArray();
}
}
finally
{
await serviceBusSenderWrapper.CloseAsync();
await serviceBusSenderWrapper.SendAsync(asbMessages, cancellationToken);
yield return batch.Select(m => m.Id).ToArray();
}
}
finally
{
await serviceBusSenderWrapper.CloseAsync();
}
}

/// <summary>
Expand Down
6 changes: 0 additions & 6 deletions src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -690,12 +690,6 @@ private void InitExtServiceBus(IPolicyRegistry<string> policyRegistry,
_bus.OutboxTimeout = outboxTimeout;
_bus.PolicyRegistry = policyRegistry;
_bus.ProducerRegistry = producerRegistry;
if (messageProducer is IAmAMessageProducerSync syncMessageProducer)
_bus.MessageProducerSync = syncMessageProducer;
if (messageProducer is IAmAMessageProducerAsync asyncMessageProducer)
_bus.AsyncMessageProducer = asyncMessageProducer;
if (messageProducer is IAmABulkMessageProducerAsync asyncBulkMessageProducer)
_bus.AsyncBulkMessageProducer = asyncBulkMessageProducer;
}
}
}
Expand Down
35 changes: 22 additions & 13 deletions src/Paramore.Brighter/ExternalBusServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ internal class ExternalBusServices : IDisposable

internal IAmAProducerRegistry ProducerRegistry { get; set; }

internal IAmAMessageProducerSync MessageProducerSync { get; set; }
internal IAmAMessageProducerAsync AsyncMessageProducer { get; set; }
internal IAmABulkMessageProducerAsync AsyncBulkMessageProducer { get; set; }

private DateTime _lastOutStandingMessageCheckAt = DateTime.UtcNow;

//Used to checking the limit on outstanding messages for an Outbox. We throw at that point. Writes to the static bool should be made thread-safe by locking the object
Expand Down Expand Up @@ -213,8 +209,6 @@ internal async Task BulkClearOutboxAsync(IEnumerable<Guid> posts, bool continueO
{
if (!HasAsyncOutbox())
throw new InvalidOperationException("No async outbox defined.");
if (AsyncBulkMessageProducer == null)
throw new InvalidOperationException("No async bulk message producer defined.");

var messages = await AsyncOutbox.GetAsync(posts, OutboxTimeout, cancellationToken);

Expand All @@ -230,19 +224,34 @@ internal async Task BulkClearOutboxAsync(IEnumerable<Message> posts, bool contin
{
if (!HasAsyncOutbox())
throw new InvalidOperationException("No async outbox defined.");
if (AsyncBulkMessageProducer == null)
throw new InvalidOperationException("No async bulk message producer defined.");

CheckOutboxOutstandingLimit();

var dispatchesMessages = AsyncBulkMessageProducer.SendAsync(posts, 10, cancellationToken);
//Chunk into Topics
var messagesByTopic = posts.GroupBy(m => m.Header.Topic);

await foreach (var successfulMessage in dispatchesMessages.WithCancellation(cancellationToken))
foreach (var topicBatch in messagesByTopic)
{
if (!(AsyncBulkMessageProducer is ISupportPublishConfirmation))
var producer = ProducerRegistry.LookupBy(topicBatch.Key);

if (producer is IAmABulkMessageProducerAsync bulkMessageProducer)
{
var messages = topicBatch.ToArray();
s_logger.LogInformation("Bulk Dispatching {NumberOfMessages} for Topic {TopicName}", messages.Length, topicBatch.Key);
var dispatchesMessages = bulkMessageProducer.SendAsync(messages, 10, cancellationToken);

await foreach (var successfulMessage in dispatchesMessages.WithCancellation(cancellationToken))
{
if (!(producer is ISupportPublishConfirmation))
{
await RetryAsync(async ct => await AsyncOutbox.MarkDispatchedAsync(successfulMessage,
DateTime.UtcNow, cancellationToken: cancellationToken), cancellationToken: cancellationToken);
}
}
}
else
{
await RetryAsync(async ct => await AsyncOutbox.MarkDispatchedAsync(successfulMessage,
DateTime.UtcNow, cancellationToken: cancellationToken), cancellationToken: cancellationToken);
throw new InvalidOperationException("No async bulk message producer defined.");
}
}

Expand Down

0 comments on commit b91be3a

Please sign in to comment.