From afdabfd66b437c09a7b5192da25dbfc0dad25642 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sun, 24 Nov 2024 23:20:30 +0100 Subject: [PATCH 1/2] Refactor tenant middleware and consumer implementations Update `TenantConsumeMiddleware` to utilize `ITenantFinder` and `ITenantScopeFactory` for tenant context management. Refactor `WorkflowMessageConsumer` to use constructor parameters directly for improved dependency injection. Additionally, replace private scope variable in `TenantScope` with a public property to enhance code clarity. --- .../Multitenancy/Models/TenantScope.cs | 11 +++++----- .../Consumers/WorkflowMessageConsumer.cs | 21 +++++++------------ .../Middleware/TenantConsumeMiddleware.cs | 18 +++++++++++----- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/modules/Elsa.Common/Multitenancy/Models/TenantScope.cs b/src/modules/Elsa.Common/Multitenancy/Models/TenantScope.cs index 89baef16d5..9c937f7341 100644 --- a/src/modules/Elsa.Common/Multitenancy/Models/TenantScope.cs +++ b/src/modules/Elsa.Common/Multitenancy/Models/TenantScope.cs @@ -8,21 +8,20 @@ namespace Elsa.Common.Multitenancy; /// public class TenantScope : IAsyncDisposable { - private readonly IServiceScope _serviceScope; - public TenantScope(IServiceScope serviceScope, ITenantAccessor tenantAccessor, Tenant? tenant) { - _serviceScope = serviceScope; + ServiceScope = serviceScope; tenantAccessor.Tenant = tenant; } - public IServiceProvider ServiceProvider => _serviceScope.ServiceProvider; + public IServiceScope ServiceScope { get; } + public IServiceProvider ServiceProvider => ServiceScope.ServiceProvider; public async ValueTask DisposeAsync() { - if (_serviceScope is IAsyncDisposable asyncDisposable) + if (ServiceScope is IAsyncDisposable asyncDisposable) await asyncDisposable.DisposeAsync(); else - _serviceScope.Dispose(); + ServiceScope.Dispose(); } } \ No newline at end of file diff --git a/src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs b/src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs index b3d9ad08eb..65cf978a6c 100644 --- a/src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs +++ b/src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs @@ -1,3 +1,4 @@ +using Elsa.Common.Multitenancy; using Elsa.MassTransit.Activities; using Elsa.Workflows.Helpers; using Elsa.Workflows.Runtime; @@ -10,18 +11,9 @@ namespace Elsa.MassTransit.Consumers; /// /// A consumer of various dispatch message types to asynchronously execute workflows. /// -public class WorkflowMessageConsumer : IConsumer where T : class +public class WorkflowMessageConsumer(IWorkflowDispatcher workflowRuntime) : IConsumer + where T : class { - private readonly IWorkflowDispatcher _workflowRuntime; - - /// - /// Constructor. - /// - public WorkflowMessageConsumer(IWorkflowDispatcher workflowRuntime) - { - _workflowRuntime = workflowRuntime; - } - /// public async Task Consume(ConsumeContext context) { @@ -31,12 +23,15 @@ public async Task Consume(ConsumeContext context) var activityTypeName = ActivityTypeNameHelper.GenerateTypeName(messageType); var bookmark = new MessageReceivedBookmarkPayload(messageType); var correlationId = context.CorrelationId?.ToString(); - var input = new Dictionary { [MessageReceived.InputKey] = message }; + var input = new Dictionary + { + [MessageReceived.InputKey] = message + }; var request = new DispatchTriggerWorkflowsRequest(activityTypeName, bookmark) { CorrelationId = correlationId, Input = input }; - await _workflowRuntime.DispatchAsync(request, cancellationToken: cancellationToken); + await workflowRuntime.DispatchAsync(request, cancellationToken: cancellationToken); } } \ No newline at end of file diff --git a/src/modules/Elsa.MassTransit/Middleware/TenantConsumeMiddleware.cs b/src/modules/Elsa.MassTransit/Middleware/TenantConsumeMiddleware.cs index e9a33b5c46..f99e021057 100644 --- a/src/modules/Elsa.MassTransit/Middleware/TenantConsumeMiddleware.cs +++ b/src/modules/Elsa.MassTransit/Middleware/TenantConsumeMiddleware.cs @@ -1,9 +1,10 @@ using Elsa.Common.Multitenancy; using MassTransit; +using MassTransit.DependencyInjection; namespace Elsa.MassTransit.Middleware; -public class TenantConsumeMiddleware(ITenantContextInitializer tenantContextInitializer) : IFilter> where T : class +public class TenantConsumeMiddleware(ITenantFinder tenantFinder, ITenantScopeFactory tenantScopeFactory, Bind scopeSetter) : IFilter> where T : class { public void Probe(ProbeContext context) { @@ -12,9 +13,16 @@ public void Probe(ProbeContext context) public async Task Send(ConsumeContext context, IPipe> next) { - if (context.Headers.TryGetHeader(HeaderNames.TenantId, out var tenantId) && tenantId is string tenantIdString) - await tenantContextInitializer.InitializeAsync(tenantIdString, context.CancellationToken); - - await next.Send(context); + if (context.Headers.TryGetHeader(HeaderNames.TenantId, out var tenantId) && tenantId is string tenantIdString) + { + var tenant = await tenantFinder.FindByIdAsync(tenantIdString); + await using var tenantScope = tenantScopeFactory.CreateScope(tenant); + using var scope = scopeSetter.Value.PushContext(tenantScope.ServiceScope, context); + await next.Send(context); + } + else + { + await next.Send(context); + } } } \ No newline at end of file From bbf9398475ecc6f15f70b23638b99f0502248235 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sun, 24 Nov 2024 23:26:08 +0100 Subject: [PATCH 2/2] Refactor to replace IWorkflowDispatcher with IStimulusSender Updated WorkflowMessageConsumer to utilize IStimulusSender for handling message-triggered activities. Simplified dependencies and method calls to align with the new interface, improving the code's clarity and maintainability. --- .../Consumers/WorkflowMessageConsumer.cs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs b/src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs index 65cf978a6c..2f41ff4274 100644 --- a/src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs +++ b/src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs @@ -1,17 +1,14 @@ -using Elsa.Common.Multitenancy; using Elsa.MassTransit.Activities; using Elsa.Workflows.Helpers; using Elsa.Workflows.Runtime; -using Elsa.Workflows.Runtime.Contracts; -using Elsa.Workflows.Runtime.Requests; using MassTransit; namespace Elsa.MassTransit.Consumers; /// -/// A consumer of various dispatch message types to asynchronously execute workflows. +/// A consumer of various message types to trigger activities derived from these messages. /// -public class WorkflowMessageConsumer(IWorkflowDispatcher workflowRuntime) : IConsumer +public class WorkflowMessageConsumer(IStimulusSender stimulusSender) : IConsumer where T : class { /// @@ -21,17 +18,17 @@ public async Task Consume(ConsumeContext context) var messageType = typeof(T); var message = context.Message; var activityTypeName = ActivityTypeNameHelper.GenerateTypeName(messageType); - var bookmark = new MessageReceivedBookmarkPayload(messageType); + var stimulus = new MessageReceivedBookmarkPayload(messageType); var correlationId = context.CorrelationId?.ToString(); var input = new Dictionary { [MessageReceived.InputKey] = message }; - var request = new DispatchTriggerWorkflowsRequest(activityTypeName, bookmark) + var stimulusMetadata = new StimulusMetadata { CorrelationId = correlationId, Input = input }; - await workflowRuntime.DispatchAsync(request, cancellationToken: cancellationToken); + await stimulusSender.SendAsync(activityTypeName, stimulus, stimulusMetadata, cancellationToken); } } \ No newline at end of file