Skip to content

Commit

Permalink
Fix sharing of processors for Azure transports that messes up deadlet…
Browse files Browse the repository at this point in the history
…ters (#529)
  • Loading branch information
mburumaxwell authored Jul 6, 2023
1 parent 6872499 commit 025d144
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ Task<EventHubProducerClient> creator((Type, bool) _, CancellationToken ct)
private Task<EventProcessorClient> GetProcessorAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken)
{
var name = reg.EventName;
if (ecr.Deadletter) name += Options.DeadLetterSuffix;
var deadletter = ecr.Deadletter;
if (deadletter) name += Options.DeadLetterSuffix;

// For events configured as sourced from IoT Hub,
// 1. The event hub name is in the metadata
Expand Down Expand Up @@ -360,7 +361,7 @@ async Task<EventProcessorClient> creator(string key, CancellationToken ct)
return processor;
}

var key = $"{eventHubName}/{consumerGroup}";
var key = $"{eventHubName}/{consumerGroup}/{deadletter}";
return processorsCache.GetOrAddAsync(key, creator, cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ private Task<ServiceBusProcessor> GetProcessorAsync(EventRegistration reg, Event
{
var topicName = reg.EventName!;
var subscriptionName = ecr.ConsumerName!;
var deadletter = ecr.Deadletter;

async Task<ServiceBusProcessor> creator(string key, CancellationToken ct)
{
Expand All @@ -343,7 +344,7 @@ async Task<ServiceBusProcessor> creator(string key, CancellationToken ct)
PrefetchCount = Options.DefaultPrefetchCount,

// Set the sub-queue to be used
SubQueue = ecr.Deadletter ? SubQueue.DeadLetter : SubQueue.None,
SubQueue = deadletter ? SubQueue.DeadLetter : SubQueue.None,
};

// Allow for the defaults to be overridden
Expand Down Expand Up @@ -376,7 +377,7 @@ await CreateSubscriptionIfNotExistsAsync(ecr: ecr,
}
}

var key = $"{topicName}/{subscriptionName}";
var key = $"{topicName}/{subscriptionName}/{deadletter}";
return processorsCache.GetOrAddAsync(key, creator, cancellationToken);
}

Expand Down
7 changes: 4 additions & 3 deletions src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,9 @@ private Task<InMemoryProcessor> GetProcessorAsync(EventRegistration reg, EventCo
{
var topicName = reg.EventName!;
var subscriptionName = ecr.ConsumerName!;
var deadletter = ecr.Deadletter;

var key = $"{topicName}/{subscriptionName}";
var key = $"{topicName}/{subscriptionName}/{deadletter}";
Task<InMemoryProcessor> creator(string _, CancellationToken ct)
{
// Create the processor options
Expand All @@ -307,13 +308,13 @@ Task<InMemoryProcessor> creator(string _, CancellationToken ct)
{
// Create the processor for the Queue
Logger.CreatingQueueProcessor(queueName: topicName);
processor = inMemoryClient.CreateProcessor(queueName: topicName, options: inpo);
processor = inMemoryClient.CreateProcessor(queueName: topicName, options: inpo); // TODO: support deadletter
}
else
{
// Create the processor for the Subscription
Logger.CreatingSubscriptionProcessor(topicName: topicName, subscriptionName: subscriptionName);
processor = inMemoryClient.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo);
processor = inMemoryClient.CreateProcessor(topicName: topicName, subscriptionName: subscriptionName, options: inpo); // TODO: support deadletter
}

return Task.FromResult(processor);
Expand Down

0 comments on commit 025d144

Please sign in to comment.