Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change circuitbreaker approach #453

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2737bba
processing spike
fgheysels Nov 26, 2024
3931db9
code cleanup
fgheysels Nov 29, 2024
a91a68b
pr-sug: use message processing result io boolean
stijnmoreels Dec 3, 2024
d9bde5f
pr-sug: promote circuit breaker state enum to class
stijnmoreels Dec 3, 2024
5eb3360
pr-fix: throw-if-null is not available in net-standard
stijnmoreels Dec 3, 2024
237bcc4
pr-fix: correct usings in az service bus message pump
stijnmoreels Dec 3, 2024
8951657
pr-sug: add message id context to the processing result
stijnmoreels Dec 3, 2024
607b141
pr-fix: correct time-out for resiliency tests
stijnmoreels Dec 3, 2024
f7ac148
pr-fix: remove useless dev-test
stijnmoreels Dec 3, 2024
34b2c0d
pr-fix: correct recieved message creation in unit tests
stijnmoreels Dec 3, 2024
2b38c9a
pr-fix: more stable post-assertion resilence
stijnmoreels Dec 3, 2024
d965a66
pr-fix: use back the message id for the message processing result
stijnmoreels Dec 3, 2024
8af116f
pr-sug: finishing touches on circuit breaker state transitioning
stijnmoreels Dec 4, 2024
5bc2ed3
pr-fix: streamline equalization in circuit breaker state
stijnmoreels Dec 4, 2024
6c9ea59
pr-fix: let router abbandon message io circuit breaker handler
stijnmoreels Dec 4, 2024
21e0b19
pr-sug: rename wait interval method + fix wait recovery period log
stijnmoreels Dec 5, 2024
aa6f411
pr-fix: complete renaming in message pump
stijnmoreels Dec 5, 2024
337000c
pr-sug: use transition method for open state
stijnmoreels Dec 9, 2024
0f1aed4
pr-sug: add half-open state boolean flag
stijnmoreels Dec 9, 2024
9e8bd54
pr-fix: limit processing of single message on queue
stijnmoreels Dec 9, 2024
387b728
Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePump…
stijnmoreels Dec 9, 2024
bba3b25
pr-sug: reframe summary and remarks wording in circuit breaker states
stijnmoreels Dec 9, 2024
0adc278
Merge branch 'frgh/feat/auto_circuitbreaker' of https://github.com/ar…
stijnmoreels Dec 9, 2024
de53954
Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePump…
stijnmoreels Dec 9, 2024
0c83e86
Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePump…
stijnmoreels Dec 9, 2024
cf072c0
Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePump…
stijnmoreels Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IAzureServiceBusMessageRouter : IMessageRouter
{
/// <summary>
/// Handle a new <paramref name="message"/> that was received by routing them through registered <see cref="IAzureServiceBusMessageHandler{TMessage}"/>s
/// and optionally through an registered <see cref="IFallbackMessageHandler"/> or <see cref="IAzureServiceBusFallbackMessageHandler"/>
/// and optionally through a registered <see cref="IFallbackMessageHandler"/> or <see cref="IAzureServiceBusFallbackMessageHandler"/>
/// if none of the message handlers were able to process the <paramref name="message"/>.
/// </summary>
/// <param name="message">The incoming message that needs to be routed through registered message handlers.</param>
Expand All @@ -29,15 +29,15 @@ public interface IAzureServiceBusMessageRouter : IMessageRouter
/// Thrown when the <paramref name="message"/>, <paramref name="messageContext"/>, or <paramref name="correlationInfo"/> is <c>null</c>.
/// </exception>
/// <exception cref="InvalidOperationException">Thrown when no message handlers or none matching message handlers are found to process the message.</exception>
Task RouteMessageAsync(
Task<MessageProcessingResult> RouteMessageAsync(
ServiceBusReceivedMessage message,
AzureServiceBusMessageContext messageContext,
MessageCorrelationInfo correlationInfo,
CancellationToken cancellationToken);

/// <summary>
/// Handle a new <paramref name="message"/> that was received by routing them through registered <see cref="IAzureServiceBusMessageHandler{TMessage}"/>s
/// and optionally through an registered <see cref="IFallbackMessageHandler"/> or <see cref="IAzureServiceBusFallbackMessageHandler"/>
/// and optionally through a registered <see cref="IFallbackMessageHandler"/> or <see cref="IAzureServiceBusFallbackMessageHandler"/>
/// if none of the message handlers were able to process the <paramref name="message"/>.
/// </summary>
/// <param name="messageReceiver">
Expand All @@ -52,7 +52,7 @@ Task RouteMessageAsync(
/// Thrown when the <paramref name="messageReceiver"/>, <paramref name="message"/>, <paramref name="messageContext"/>, or <paramref name="correlationInfo"/> is <c>null</c>.
/// </exception>
/// <exception cref="InvalidOperationException">Thrown when no message handlers or none matching message handlers are found to process the message.</exception>
Task RouteMessageAsync(
Task<MessageProcessingResult> RouteMessageAsync(
ServiceBusReceiver messageReceiver,
ServiceBusReceivedMessage message,
AzureServiceBusMessageContext messageContext,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,49 @@
using System;
using GuardNet;

namespace Arcus.Messaging.Abstractions.MessageHandling
{
/// <summary>
/// Represents all the possible errors of a <see cref="MessageProcessingResult"/>.
/// </summary>
public enum MessageProcessingError
{
/// <summary>
/// Defines an error that shows that the message processing was interrupted by some external cause,
/// unrelated to the message routing.
/// </summary>
ProcessingInterrupted,
fgheysels marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Defines an error shows that no <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation was found
/// that was able to process the received message.
/// </summary>
CannotFindMatchedHandler,

/// <summary>
/// Defines and error that shows that the matched <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation
/// was unable to process the received message.
/// </summary>
MatchedHandlerFailed,
}

/// <summary>
/// Represents an outcome of a message that was processed by an <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation.
/// </summary>
public class MessageProcessingResult
{
private MessageProcessingResult()
private MessageProcessingResult(string messageId)
{
MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId));
IsSuccessful = true;
}

private MessageProcessingResult(Exception processingException)
private MessageProcessingResult(string messageId, MessageProcessingError error, string errorMessage, Exception processingException)
{
Guard.NotNull(processingException, nameof(processingException));

IsSuccessful = false;
MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId));
Error = error;
ErrorMessage = errorMessage;
ProcessingException = processingException;
IsSuccessful = false;
}

/// <summary>
Expand All @@ -27,27 +52,57 @@ private MessageProcessingResult(Exception processingException)
public bool IsSuccessful { get; }

/// <summary>
/// Gets the exception that occurred during the message processing that represents the cause of the processing failure.
/// Gets the unique ID to identify the message for which this is a processing result.
/// </summary>
public string MessageId { get; }

/// <summary>
/// Gets the error type that shows which kind of error the message processing failed.
/// </summary>
/// <remarks>
/// Only available when this processing result represents an unsuccessful message processing result - when <see cref="IsSuccessful"/> is <c>false</c>.
/// </remarks>
public MessageProcessingError Error { get; }

/// <summary>
/// Gets the description that explains the context of the <see cref="Error"/>.
/// </summary>
/// <remarks>
/// Only available when this processing result represents an unsuccessful message processing result - when <see cref="IsSuccessful"/> is <c>false</c>.
/// </remarks>
public string ErrorMessage { get; }

/// <summary>
/// Gets the exception that occurred during the message processing that represents the cause of the processing failure.
/// </summary>
/// <remarks>
/// Only possibly available when this processing result represents an unsuccessful message processing result - when <see cref="IsSuccessful"/> is <c>false</c>.
/// </remarks>
public Exception ProcessingException { get; }

/// <summary>
/// Gets an <see cref="MessageProcessingResult"/> instance that represents a result of a message was successfully processed.
/// </summary>
public static MessageProcessingResult Success => new MessageProcessingResult();
public static MessageProcessingResult Success(string messageId) => new(messageId);

/// <summary>
/// Creates an <see cref="MessageProcessingResult"/> instance that represents a result of a message that was unsuccessfully processed.
/// </summary>
public static MessageProcessingResult Failure(string messageId, MessageProcessingError error, string errorMessage)
{
return new MessageProcessingResult(messageId, error, errorMessage, processingException: null);
}

/// <summary>
/// Creates an <see cref="MessageProcessingResult"/> instance that represents a result of a message that was unsuccessfully processed.
/// </summary>
/// <param name="processingException">The exception that occurred during the message processing that represents the cause of the processing failure.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="processingException"/> is blank.</exception>
public static MessageProcessingResult Failure(Exception processingException)
public static MessageProcessingResult Failure(string messageId, MessageProcessingError error, string errorMessage, Exception processingException)
{
Guard.NotNull(processingException, nameof(processingException));
return new MessageProcessingResult(processingException);
return new MessageProcessingResult(
messageId,
error,
errorMessage,
processingException ?? throw new ArgumentNullException(nameof(processingException)));
}
}
}
81 changes: 46 additions & 35 deletions src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions.MessageHandling;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using GuardNet;
using Microsoft.Extensions.Configuration;
Expand All @@ -11,27 +10,6 @@

namespace Arcus.Messaging.Pumps.Abstractions
{
/// <summary>
/// Represents the available states in which the <see cref="MessagePump"/> is presently in within the circuit breaker context
/// </summary>
public enum MessagePumpCircuitState
{
/// <summary>
/// The message pump is able to receive messages.
/// </summary>
Closed,

/// <summary>
/// The message pump is under inspection if it can receive messages.
/// </summary>
HalfOpen,

/// <summary>
/// The message pump is unable to receive messages.
/// </summary>
Open
}

/// <summary>
/// Represents the foundation for building message pumps.
/// </summary>
Expand Down Expand Up @@ -123,18 +101,6 @@ public override async Task StopAsync(CancellationToken cancellationToken)
await base.StopAsync(cancellationToken);
}

/// <summary>
/// Try to process a single message after the circuit was broken, a.k.a entering the half-open state.
/// </summary>
/// <returns>
/// [Success] when the related message handler can again process messages and the message pump can again start receive messages in full; [Failure] otherwise.
/// </returns>
public virtual Task<MessageProcessingResult> TryProcessProcessSingleMessageAsync(MessagePumpCircuitBreakerOptions options)
{
CircuitState = MessagePumpCircuitState.HalfOpen;
return Task.FromResult(MessageProcessingResult.Success);
}

/// <summary>
/// Start with receiving messages on this message pump.
/// </summary>
Expand All @@ -154,11 +120,56 @@ public virtual Task StartProcessingMessagesAsync(CancellationToken cancellationT
public virtual Task StopProcessingMessagesAsync(CancellationToken cancellationToken)
{
IsStarted = false;
CircuitState = MessagePumpCircuitState.Open;
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open);

return Task.CompletedTask;
}

/// <summary>
/// Waits a previously configured amount of time until the message pump is expected to be recovered (Closed to Open state).
/// </summary>
/// <param name="cancellationToken">The token to cancel the wait period.</param>
protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellationToken)
{
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message interval of '{Interval}' during '{State}' state", JobId, CircuitState.Options.MessageRecoveryPeriod.ToString("g"), CircuitState);
await Task.Delay(CircuitState.Options.MessageRecoveryPeriod, cancellationToken);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
fgheysels marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
/// Waits a previously configured amount of time until the next single message can be tried (Half-Open state).
/// </summary>
/// <param name="cancellationToken">The token to cancel the wait period.</param>
protected async Task WaitMessageIntervalPeriodAsync(CancellationToken cancellationToken)
fgheysels marked this conversation as resolved.
Show resolved Hide resolved
{
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition into an '{State}' state, retrieving messages is paused for '{Period}'", JobId, CircuitState, CircuitState.Options.MessageIntervalDuringRecovery.ToString("g"));
fgheysels marked this conversation as resolved.
Show resolved Hide resolved
await Task.Delay(CircuitState.Options.MessageIntervalDuringRecovery, cancellationToken);

CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
}

/// <summary>
/// Notifies the message pump about the new state which pauses message retrieval.
/// </summary>
/// <param name="options">The additional accompanied options that goes with the new state.</param>
internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions options)
{
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState);

CircuitState = MessagePumpCircuitState.Open(options);
}

/// <summary>
/// Notifies the message pump about the new state which resumes message retrieval.
/// </summary>
protected void NotifyResumeRetrievingMessages()
{
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState);

CircuitState = MessagePumpCircuitState.Closed;
}
fgheysels marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Register information about the client connected to the messaging service
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions.MessageHandling;
using GuardNet;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -40,70 +38,28 @@ public DefaultMessagePumpCircuitBreaker(IServiceProvider serviceProvider, ILogge
/// <param name="jobId">The unique identifier to distinguish the message pump in the application services.</param>
/// <param name="configureOptions">The optional user-configurable options to manipulate the workings of the message pump interaction.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public virtual async Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions)
public virtual Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions)
{
Guard.NotNullOrWhitespace(jobId, nameof(jobId));

var options = new MessagePumpCircuitBreakerOptions();
configureOptions?.Invoke(options);

MessagePump messagePump = GetRegisteredMessagePump(jobId);

if (!messagePump.IsStarted)
{
_logger.LogWarning($"Cannot pause MessagePump for JobId {jobId} because the MessagePump has not been started.");
return;
_logger.LogWarning("Cannot pause message pump '{JobId}' because the pump has not been started", jobId);
return Task.CompletedTask;
}

if (messagePump.CircuitState != MessagePumpCircuitState.Closed)
{
_logger.LogWarning($"Cannot pause MessagePump for JobId {jobId} because the MessagePump's circuitbreaker is not in a closed state.");
return;
}

_logger.LogDebug("Open circuit by pausing message processing for message pump '{JobId}'...", jobId);
await messagePump.StopProcessingMessagesAsync(CancellationToken.None);

await Task.Factory.StartNew(async () =>
{
await WaitRecoveryTimeAsync(messagePump, options);

MessageProcessingResult result;
do
{
result = await TryProcessSingleMessageAsync(messagePump, options);

if (!result.IsSuccessful)
{
await WaitMessageIntervalDuringRecoveryAsync(messagePump, options);
}

} while (!result.IsSuccessful);

await ResumeMessageProcessingAsync(jobId);
}, TaskCreationOptions.LongRunning);
}

/// <summary>
/// Continue the process of receiving messages in the message pump after a successful message handling.
/// </summary>
/// <param name="jobId">The unique identifier to distinguish the message pump in the application services.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public virtual async Task ResumeMessageProcessingAsync(string jobId)
{
Guard.NotNullOrWhitespace(jobId, nameof(jobId));

MessagePump messagePump = GetRegisteredMessagePump(jobId);

if (messagePump.IsStarted)
if (!messagePump.CircuitState.IsClosed)
{
_logger.LogWarning("Resume called on Message pump '{JobId}' but Message pump is already started. CircuitState = {CircuitState}", jobId, messagePump.CircuitState);
return;
return Task.CompletedTask;
}

_logger.LogInformation("Message pump '{JobId}' successfully handled a single message, resume message processing (circuit breaker: closed)", messagePump.JobId);
await messagePump.StartProcessingMessagesAsync(CancellationToken.None);
var options = new MessagePumpCircuitBreakerOptions();
configureOptions?.Invoke(options);

messagePump.NotifyPauseReceiveMessages(options);
return Task.CompletedTask;
stijnmoreels marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand Down Expand Up @@ -148,23 +104,5 @@ protected MessagePump GetRegisteredMessagePump(string jobId)

return messagePumps[0];
}

private async Task<MessageProcessingResult> TryProcessSingleMessageAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options)
{
_logger.LogDebug("Try to process single message in message pump '{JobId}' (state: half-open)", messagePump.JobId);
return await messagePump.TryProcessProcessSingleMessageAsync(options);
}

private async Task WaitMessageIntervalDuringRecoveryAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options)
{
_logger.LogDebug("Wait configured interval period ({IntervalPeriod}) since message pump '{JobId}' failed to handle a single message (circuit breaker: open)", options.MessageIntervalDuringRecovery, messagePump.JobId);
await Task.Delay(options.MessageIntervalDuringRecovery);
}

private async Task WaitRecoveryTimeAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options)
{
_logger.LogDebug("Wait configured recovery period ({RecoveryPeriod}) since message pump '{JobId}' failed to process messages (circuit breaker: open)", options.MessageRecoveryPeriod, messagePump.JobId);
await Task.Delay(options.MessageRecoveryPeriod);
}
}
}
Loading