Skip to content

Commit

Permalink
Allow normal and deadletter consumer for the same event in the same a…
Browse files Browse the repository at this point in the history
…pp (#528)

* Allow two consumers of the same event with different deadletter
  • Loading branch information
mburumaxwell authored Jul 6, 2023
1 parent 60a842b commit 6872499
Show file tree
Hide file tree
Showing 23 changed files with 134 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public override void PostConfigure(string? name, AmazonKinesisTransportOptions o

// Consumer names become Queue names and they should not be longer than 128 characters
// See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
if (ecr.ConsumerName!.Length > 128)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public override void PostConfigure(string? name, AmazonSqsTransportOptions optio

// Consumer names become Queue names and they should not be longer than 80 characters
// See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
if (ecr.ConsumerName!.Length > 80)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
var queueUrl = await GetQueueUrlAsync(reg: reg,
ecr: ecr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public override void PostConfigure(string? name, AzureEventHubsTransportOptions
}

// Consumer names become Consumer Group names and they should not be longer than 256 characters
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
if (ecr.ConsumerName!.Length > 256)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected override Task StartCoreAsync(CancellationToken cancellationToken)
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
var t = ReceiveAsync(reg: reg, ecr: ecr, cancellationToken: stoppingCts.Token);
receiverTasks.Add(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public override void PostConfigure(string? name, AzureServiceBusTransportOptions
// When not using Queues, ConsumerName -> SubscriptionName does not happen
if (reg.EntityKind == EntityKind.Broadcast)
{
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
if (ecr.ConsumerName!.Length > 50)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken
var registrations = GetRegistrations();
foreach (var reg in registrations)
{
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public override void PostConfigure(string? name, InMemoryTransportOptions option
options.EnsureAllowedEntityKind(reg, EntityKind.Broadcast, EntityKind.Queue);

// This does not support dead-letter yet
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
if (ecr.Deadletter)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public override void PostConfigure(string? name, KafkaTransportOptions options)
+ "Kafka does not allow more than 255 characters for Topic names.");
}

foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
// Consumer names become Consumer Group IDs and they should not be longer than 255 characters
if (ecr.ConsumerName!.Length > 255)
Expand Down
2 changes: 1 addition & 1 deletion src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private async Task ProcessAsync(CancellationToken cancellationToken)
var reg = GetRegistrations().Single(r => r.EventName == topic);

// form the generic method
var ecr = reg.Consumers.Values.Single(); // only one consumer per event
var ecr = reg.Consumers.Single(); // only one consumer per event
var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType);
await ((Task)method.Invoke(this, new object[] { reg, ecr, result, cancellationToken, })!).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public override void PostConfigure(string? name, RabbitMqTransportOptions option
+ "RabbitMQ does not allow more than 255 characters for Exchange names.");
}

foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
// Consumer names become Queue names and they should not be longer than 255 characters
if (ecr.ConsumerName!.Length > 255)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private async Task ConnectConsumersAsync(CancellationToken cancellationToken)
foreach (var reg in registrations)
{
var exchangeName = reg.EventName!;
foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
// queue names must be unique so add the exchange name so that we can tell to whom the queue belongs
var queueName = BusOptions.Naming.Join(ecr.ConsumerName!, exchangeName);
Expand Down
4 changes: 2 additions & 2 deletions src/Tingle.EventBus/Configuration/DefaultEventConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void Configure(EventRegistration registration, EventBusOptions options)
// bind from IConfiguration
var configuration = configurationProvider.Configuration.GetSection($"Events:{registration.EventType.FullName}");
configuration.Bind(registration);
foreach (var ecr in registration.Consumers.Values)
foreach (var ecr in registration.Consumers)
{
configuration.GetSection($"Consumers:{ecr.ConsumerType.FullName}").Bind(ecr);
}
Expand Down Expand Up @@ -117,7 +117,7 @@ internal void ConfigureConsumerNames(EventRegistration reg, EventBusNamingOption
// prefix is either the one provided or the application name
var prefix = options.ConsumerNamePrefix ?? environment.ApplicationName;

foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
// set the consumer name, if not set
if (string.IsNullOrWhiteSpace(ecr.ConsumerName))
Expand Down
33 changes: 18 additions & 15 deletions src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
/// <summary>
/// Represents a registration for a consumer of an event.
/// </summary>
public class EventConsumerRegistration : IEquatable<EventConsumerRegistration>
public class EventConsumerRegistration : IEquatable<EventConsumerRegistration?>
{
/// <summary>
/// Creates an instance of <see cref="EventConsumerRegistration"/>.
/// </summary>
/// <param name="consumerType">The type of consumer handling the event.</param>
public EventConsumerRegistration(Type consumerType)
/// <param name="deadletter">Whether the consumer should be connected to the dead-letter entity.</param>
public EventConsumerRegistration(Type consumerType, bool deadletter)
{
ConsumerType = consumerType ?? throw new ArgumentNullException(nameof(consumerType));
Deadletter = deadletter;
}

/// <summary>
Expand All @@ -20,17 +22,17 @@ public EventConsumerRegistration(Type consumerType)
public Type ConsumerType { get; }

/// <summary>
/// The name generated for the consumer.
/// Gets or sets a value indicating if the consumer should be connected to the dead-letter entity.
/// For transports that do not support dead-letter entities, a separate queue is created.
/// When set to <see langword="true"/>, you must use <see cref="IDeadLetteredEventConsumer{T}"/>
/// to consume events.
/// </summary>
public string? ConsumerName { get; set; }
public bool Deadletter { get; }

/// <summary>
/// Gets or sets a value indicating if the consumer should be connected to the dead-letter sub-queue.
/// For transports that do not support dead-letter sub-queues, a separate queue is created.
/// When set to <see langword="true"/>, you must use <see cref="IDeadLetteredEventConsumer{T}"/>
/// to consume events.
/// The name generated for the consumer.
/// </summary>
public bool Deadletter { get; internal set; }
public string? ConsumerName { get; set; }

/// <summary>
/// The behaviour for unhandled errors when consuming events via the
Expand Down Expand Up @@ -83,21 +85,22 @@ public EventConsumerRegistration OnError(UnhandledConsumerErrorBehaviour? behavi
/// <inheritdoc/>
public bool Equals(EventConsumerRegistration? other)
{
return other is not null && EqualityComparer<Type>.Default.Equals(ConsumerType, other.ConsumerType);
return other is not null &&
EqualityComparer<Type>.Default.Equals(ConsumerType, other.ConsumerType) &&
Deadletter == other.Deadletter;
}

/// <inheritdoc/>
public override int GetHashCode() => ConsumerType.GetHashCode();
public override int GetHashCode() => HashCode.Combine(ConsumerType, Deadletter);

///
public static bool operator ==(EventConsumerRegistration left, EventConsumerRegistration right)
public static bool operator ==(EventConsumerRegistration? left, EventConsumerRegistration? right)
{
return EqualityComparer<EventConsumerRegistration>.Default.Equals(left, right);
return EqualityComparer<EventConsumerRegistration?>.Default.Equals(left, right);
}

///
public static bool operator !=(EventConsumerRegistration left, EventConsumerRegistration right) => !(left == right);
public static bool operator !=(EventConsumerRegistration? left, EventConsumerRegistration? right) => !(left == right);

#endregion

}
13 changes: 7 additions & 6 deletions src/Tingle.EventBus/Configuration/EventRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public EventRegistration(Type eventType)
/// <remarks>
/// This is backed by a <see cref="HashSet{T}"/> to ensure no duplicates.
/// </remarks>
public Dictionary<Type, EventConsumerRegistration> Consumers { get; } = new();
public HashSet<EventConsumerRegistration> Consumers { get; } = new();

/// <summary>
/// Gets a key/value collection that can be used to organize and share data across components
Expand All @@ -103,20 +103,21 @@ public EventRegistration(Type eventType)
/// <inheritdoc/>
public bool Equals(EventRegistration? other)
{
return other is not null && EqualityComparer<Type>.Default.Equals(EventType, other.EventType);
return other is not null &&
EqualityComparer<Type>.Default.Equals(EventType, other.EventType);
}

/// <inheritdoc/>
public override int GetHashCode() => EventType.GetHashCode();
public override int GetHashCode() => HashCode.Combine(EventType);

///
public static bool operator ==(EventRegistration left, EventRegistration right)
public static bool operator ==(EventRegistration? left, EventRegistration? right)
{
return EqualityComparer<EventRegistration>.Default.Equals(left, right);
return EqualityComparer<EventRegistration?>.Default.Equals(left, right);
}

///
public static bool operator !=(EventRegistration left, EventRegistration right) => !(left == right);
public static bool operator !=(EventRegistration? left, EventRegistration? right) => !(left == right);

#endregion
}
11 changes: 4 additions & 7 deletions src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,9 @@ public EventBusBuilder AddConsumer<TConsumer>(Action<EventRegistration, EventCon
// get or create a simple EventRegistration
var reg = options.Registrations.GetOrAdd(et, t => new EventRegistration(t));
// get or create a simple ConsumerRegistration
if (!reg.Consumers.TryGetValue(consumerType, out var ecr))
{
ecr = new EventConsumerRegistration(consumerType) { Deadletter = deadletter, };
reg.Consumers.Add(consumerType, ecr);
}
// create a simple ConsumerRegistration (HashSet removes duplicates)
var ecr = new EventConsumerRegistration(consumerType, deadletter);
reg.Consumers.Add(ecr);
// call the configuration function
configure?.Invoke(reg, ecr);
Expand Down Expand Up @@ -224,7 +221,7 @@ public EventBusBuilder RemoveConsumer<TConsumer>() where TConsumer : class, IEve
var ct = typeof(TConsumer);
foreach (var registration in options.Registrations.Values)
{
registration.Consumers.Remove(ct);
registration.Consumers.RemoveWhere(creg => creg.ConsumerType == ct);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ public ValidateOptionsResult Validate(string? name, EventBusOptions options)
// Ensure there are no consumers with the same name per event
foreach (var evr in registrations)
{
var conflict = evr.Consumers.Values.GroupBy(ecr => ecr.ConsumerName).FirstOrDefault(kvp => kvp.Count() > 1);
var conflict = evr.Consumers.GroupBy(ecr => (ecr.ConsumerName, ecr.Deadletter)).FirstOrDefault(kvp => kvp.Count() > 1);
if (conflict != null)
{
var names = conflict.Select(r => r.ConsumerType.FullName);
return ValidateOptionsResult.Fail($"The consumer name '{conflict.Key}' cannot be used more than once on '{evr.EventType.Name}'."
var id = conflict.Key.ConsumerName;
if (conflict.Key.Deadletter) id += " [dead-letter]";
return ValidateOptionsResult.Fail($"The consumer name '({id})' cannot be used more than once on '{evr.EventType.Name}'."
+ $" Types:\r\n- {string.Join("\r\n- ", names)}");
}
}
Expand Down
35 changes: 20 additions & 15 deletions src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,42 +134,47 @@ public ICollection<EventRegistration> GetRegistrations(string transportName)
}

/// <summary>
/// Get the consumer registration in a given event type.
/// Get the consumer registrations in a given event type.
/// </summary>
/// <typeparam name="TEvent">The event type from which to retrieve a <see cref="EventConsumerRegistration"/> for.</typeparam>
/// <typeparam name="TConsumer">The consumer to configure.</typeparam>
/// <param name="reg">
/// When this method returns, contains the event registration associated with the specified event type,
/// When this method returns, contains the event registrations associated with the specified event type,
/// if the event type is found; otherwise, <see langword="null"/> is returned.
/// This parameter is passed uninitialized.
/// </param>
/// <param name="ecr">
/// When this method returns, contains the consumer registration associated with the specified event type,
/// <param name="ecrs">
/// When this method returns, contains the consumer registrations associated with the specified event type,
/// if the event type is found; otherwise, <see langword="null"/> is returned.
/// This parameter is passed uninitialized.
/// </param>
/// <returns><see langword="true" /> if there's a consumer registered for the given event type; otherwise, false.</returns>
internal bool TryGetConsumerRegistration<TEvent, TConsumer>([NotNullWhen(true)] out EventRegistration? reg,
[NotNullWhen(true)] out EventConsumerRegistration? ecr)
internal bool TryGetConsumerRegistrations<TEvent, TConsumer>([NotNullWhen(true)] out EventRegistration? reg,
[NotNullWhen(true)] out List<EventConsumerRegistration>? ecrs)
{
ecr = default;
return Registrations.TryGetValue(typeof(TEvent), out reg) && reg.Consumers.TryGetValue(typeof(TConsumer), out ecr);
ecrs = default;
if (Registrations.TryGetValue(typeof(TEvent), out reg))
{
ecrs = reg.Consumers.Where(r => r.ConsumerType == typeof(TConsumer)).ToList();
return false;
}
return false;
}

/// <summary>
/// Get the consumer registration in a given event type.
/// Get the consumer registrations in a given event type.
/// </summary>
/// <typeparam name="TEvent">The event type from which to retrieve a <see cref="EventConsumerRegistration"/> for.</typeparam>
/// <typeparam name="TConsumer">The consumer to configure.</typeparam>
/// <param name="registration">
/// When this method returns, contains the consumer registration associated with the specified event type,
/// <param name="registrations">
/// When this method returns, contains the consumer registrations associated with the specified event type,
/// if the event type is found; otherwise, <see langword="null"/> is returned.
/// This parameter is passed uninitialized.
/// </param>
/// <returns><see langword="true" /> if there's a consumer registered for the given event type; otherwise, false.</returns>
public bool TryGetConsumerRegistration<TEvent, TConsumer>([NotNullWhen(true)] out EventConsumerRegistration? registration)
public bool TryGetConsumerRegistrations<TEvent, TConsumer>([NotNullWhen(true)] out List<EventConsumerRegistration>? registrations)
{
return TryGetConsumerRegistration<TEvent, TConsumer>(out _, out registration);
return TryGetConsumerRegistrations<TEvent, TConsumer>(out _, out registrations);
}

/// <summary>
Expand Down Expand Up @@ -202,9 +207,9 @@ public EventBusOptions ConfigureConsumer<TEvent, TConsumer>(Action<EventRegistra
{
if (configure is null) throw new ArgumentNullException(nameof(configure));

if (TryGetConsumerRegistration<TEvent, TConsumer>(out var reg, out var ecr) && ecr is not null)
if (TryGetConsumerRegistrations<TEvent, TConsumer>(out var reg, out var ecrs) && ecrs is not null)
{
configure(reg, ecr);
foreach (var ecr in ecrs) configure(reg, ecr);
}

return this;
Expand Down
2 changes: 1 addition & 1 deletion src/Tingle.EventBus/Transports/EventBusTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
// Combine the retry policies
PollyHelper.CombineIfNeeded(BusOptions, Options, reg);

foreach (var ecr in reg.Consumers.Values)
foreach (var ecr in reg.Consumers)
{
// Set unhandled error behaviour
ecr.UnhandledErrorBehaviour ??= Options.DefaultUnhandledConsumerErrorBehaviour;
Expand Down
Loading

0 comments on commit 6872499

Please sign in to comment.