Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Tenant ID propagation for MassTransit #6144

Merged
merged 2 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/modules/Elsa.Common/Multitenancy/Models/TenantScope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,20 @@ namespace Elsa.Common.Multitenancy;
/// </summary>
public class TenantScope : IAsyncDisposable
{
private readonly IServiceScope _serviceScope;

public TenantScope(IServiceScope serviceScope, ITenantAccessor tenantAccessor, Tenant? tenant)
{
_serviceScope = serviceScope;
ServiceScope = serviceScope;
tenantAccessor.Tenant = tenant;
}

public IServiceProvider ServiceProvider => _serviceScope.ServiceProvider;
public IServiceScope ServiceScope { get; }
public IServiceProvider ServiceProvider => ServiceScope.ServiceProvider;

public async ValueTask DisposeAsync()
{
if (_serviceScope is IAsyncDisposable asyncDisposable)
if (ServiceScope is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync();
else
_serviceScope.Dispose();
ServiceScope.Dispose();
}
}
28 changes: 10 additions & 18 deletions src/modules/Elsa.MassTransit/Consumers/WorkflowMessageConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,34 @@
using Elsa.MassTransit.Activities;
using Elsa.Workflows.Helpers;
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Requests;
using MassTransit;

namespace Elsa.MassTransit.Consumers;

/// <summary>
/// A consumer of various dispatch message types to asynchronously execute workflows.
/// A consumer of various message types to trigger activities derived from these messages.
/// </summary>
public class WorkflowMessageConsumer<T> : IConsumer<T> where T : class
public class WorkflowMessageConsumer<T>(IStimulusSender stimulusSender) : IConsumer<T>
where T : class
{
private readonly IWorkflowDispatcher _workflowRuntime;

/// <summary>
/// Constructor.
/// </summary>
public WorkflowMessageConsumer(IWorkflowDispatcher workflowRuntime)
{
_workflowRuntime = workflowRuntime;
}

/// <inheritdoc />
public async Task Consume(ConsumeContext<T> context)
{
var cancellationToken = context.CancellationToken;
var messageType = typeof(T);
var message = context.Message;
var activityTypeName = ActivityTypeNameHelper.GenerateTypeName(messageType);
var bookmark = new MessageReceivedBookmarkPayload(messageType);
var stimulus = new MessageReceivedBookmarkPayload(messageType);
var correlationId = context.CorrelationId?.ToString();
var input = new Dictionary<string, object> { [MessageReceived.InputKey] = message };
var request = new DispatchTriggerWorkflowsRequest(activityTypeName, bookmark)
var input = new Dictionary<string, object>
{
[MessageReceived.InputKey] = message
};
var stimulusMetadata = new StimulusMetadata
{
CorrelationId = correlationId,
Input = input
};
await _workflowRuntime.DispatchAsync(request, cancellationToken: cancellationToken);
await stimulusSender.SendAsync(activityTypeName, stimulus, stimulusMetadata, cancellationToken);
}
}
18 changes: 13 additions & 5 deletions src/modules/Elsa.MassTransit/Middleware/TenantConsumeMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using Elsa.Common.Multitenancy;
using MassTransit;
using MassTransit.DependencyInjection;

namespace Elsa.MassTransit.Middleware;

public class TenantConsumeMiddleware<T>(ITenantContextInitializer tenantContextInitializer) : IFilter<ConsumeContext<T>> where T : class
public class TenantConsumeMiddleware<T>(ITenantFinder tenantFinder, ITenantScopeFactory tenantScopeFactory, Bind<IBus, ISetScopedConsumeContext> scopeSetter) : IFilter<ConsumeContext<T>> where T : class
{
public void Probe(ProbeContext context)
{
Expand All @@ -12,9 +13,16 @@ public void Probe(ProbeContext context)

public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
if (context.Headers.TryGetHeader(HeaderNames.TenantId, out var tenantId) && tenantId is string tenantIdString)
await tenantContextInitializer.InitializeAsync(tenantIdString, context.CancellationToken);

await next.Send(context);
if (context.Headers.TryGetHeader(HeaderNames.TenantId, out var tenantId) && tenantId is string tenantIdString)
{
var tenant = await tenantFinder.FindByIdAsync(tenantIdString);
await using var tenantScope = tenantScopeFactory.CreateScope(tenant);
using var scope = scopeSetter.Value.PushContext(tenantScope.ServiceScope, context);
await next.Send(context);
}
else
{
await next.Send(context);
}
}
}
Loading