Skip to content

Commit

Permalink
Fix Background Clearing Thread for outbox to exit from monitor (#1994)
Browse files Browse the repository at this point in the history
* Fix Background Clearing Thread for outbox to exit from monitor

* Added the outbox limit check to AddToOutbox
  • Loading branch information
preardon authored Feb 17, 2022
1 parent f020e22 commit c714674
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions src/Paramore.Brighter/ExternalBusServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ protected virtual void Dispose(bool disposing)
internal async Task AddToOutboxAsync<T>(T request, bool continueOnCapturedContext, CancellationToken cancellationToken, Message message, IAmABoxTransactionConnectionProvider overridingTransactionConnectionProvider = null)
where T : class, IRequest
{
CheckOutboxOutstandingLimit();

var written = await RetryAsync(async ct => { await AsyncOutbox.AddAsync(message, OutboxTimeout, ct, overridingTransactionConnectionProvider).ConfigureAwait(continueOnCapturedContext); },
continueOnCapturedContext, cancellationToken).ConfigureAwait(continueOnCapturedContext);

Expand All @@ -71,6 +73,8 @@ internal async Task AddToOutboxAsync<T>(T request, bool continueOnCapturedContex

internal void AddToOutbox<T>(T request, Message message, IAmABoxTransactionConnectionProvider overridingTransactionConnectionProvider = null) where T : class, IRequest
{
CheckOutboxOutstandingLimit();

var written = Retry(() => { OutBox.Add(message, OutboxTimeout, overridingTransactionConnectionProvider); });

if (!written)
Expand Down Expand Up @@ -118,8 +122,6 @@ internal void ClearOutbox(params Guid[] posts)
if (!HasOutbox())
throw new InvalidOperationException("No outbox defined.");

CheckOutboxOutstandingLimit();

// Only allow a single Clear to happen at a time
_clearSemaphoreToken.Wait();
try
Expand Down Expand Up @@ -148,8 +150,6 @@ internal async Task ClearOutboxAsync(IEnumerable<Guid> posts, bool continueOnCap
if (!HasAsyncOutbox())
throw new InvalidOperationException("No async outbox defined.");

CheckOutboxOutstandingLimit();

await _clearSemaphoreToken.WaitAsync(cancellationToken);
try
{
Expand Down Expand Up @@ -179,8 +179,6 @@ internal async Task ClearOutboxAsync(IEnumerable<Guid> posts, bool continueOnCap
/// <param name="useBulk">Use bulk sending capability of the message producer, this must be paired with useAsync.</param>
internal void ClearOutbox(int amountToClear, int minimumAge, bool useAsync, bool useBulk)
{
CheckOutboxOutstandingLimit();

if (useAsync)
{
if (!HasAsyncOutbox())
Expand All @@ -205,11 +203,16 @@ private async Task BackgroundDispatchUsingSync(int amountToClear, int minimumAge
await _clearSemaphoreToken.WaitAsync(CancellationToken.None);
try
{
Dispatch(OutBox.OutstandingMessages(minimumAge, amountToClear));
var messages = OutBox.OutstandingMessages(minimumAge, amountToClear);
s_logger.LogInformation("Found {NumberOfMessages} to clear out of amount {AmountToClear}",
messages.Count(), amountToClear);
Dispatch(messages);
s_logger.LogInformation("Messages have been cleared");
}
finally
{
_clearSemaphoreToken.Release();
Monitor.Exit(_implicitClearMessagesObject);
}

CheckOutstandingMessages();
Expand All @@ -225,15 +228,21 @@ private async Task BackgroundDispatchUsingAsync(int amountToClear, int minimumAg
{
var messages =
await AsyncOutbox.OutstandingMessagesAsync(minimumAge, amountToClear);

s_logger.LogInformation("Found {NumberOfMessages} to clear out of amount {AmountToClear}",
messages.Count(), amountToClear);

if (useBulk)
await BulkDispatchAsync(messages, CancellationToken.None);
else
await DispatchAsync(messages, false, CancellationToken.None);

s_logger.LogInformation("Messages have been cleared");
}
finally
{
_clearSemaphoreToken.Release();
Monitor.Exit(_implicitClearMessagesObject);
}

CheckOutstandingMessages();
Expand Down

0 comments on commit c714674

Please sign in to comment.