Skip to content

Commit

Permalink
Fix Tenant ID propagation for Proto.Actor runtime (#6142)
Browse files Browse the repository at this point in the history
* Fix Tenant ID propagation for Proto.Actor runtime

* Rename Constants to HeaderNames
  • Loading branch information
sfmskywalker authored Nov 24, 2024
1 parent 95a2a33 commit b4bbf25
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 24 deletions.
25 changes: 2 additions & 23 deletions src/modules/Elsa.ProtoActor.Core/Features/ProtoActorFeature.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Elsa.Common.Multitenancy;
using Elsa.Features.Abstractions;
using Elsa.Features.Services;
using Elsa.ProtoActor.HostedServices;
using Elsa.ProtoActor.Middleware;
using Elsa.Workflows.Runtime.ProtoActor.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand All @@ -22,7 +22,6 @@ namespace Elsa.ProtoActor.Features;
/// </summary>
public class ProtoActorFeature(IModule module) : FeatureBase(module)
{
private const string TenantHeaderName = "TenantId";
private LogLevel _diagnosticsLogLevel = LogLevel.Information;
private bool _enableMetrics;
private bool _enableTracing;
Expand Down Expand Up @@ -170,27 +169,7 @@ private ClusterConfig AddVirtualActors(IServiceProvider sp, ActorSystem system,
if (_enableTracing)
kind = kind.WithProps(props => props.WithTracing());

kind = kind.WithProps(props =>
{
props = props.WithReceiverMiddleware(next => async (context, envelope) =>
{
var tenantId = envelope.Header.GetValueOrDefault(TenantHeaderName);
if (tenantId != null)
{
var tenantContextInitializer = sp.GetRequiredService<ITenantContextInitializer>();
await tenantContextInitializer.InitializeAsync(tenantId);
}
await next(context, envelope);
}).WithSenderMiddleware(next => async (context, target, envelope) =>
{
var tenantAccessor = sp.GetRequiredService<ITenantAccessor>();
if (tenantAccessor.Tenant != null) envelope.WithHeader(TenantHeaderName, tenantAccessor.Tenant.Id);
await next(context, target, envelope);
});
return props;
});
kind = kind.WithProps(props => props.WithMultitenancy(sp));
clusterConfig = clusterConfig.WithClusterKind(kind);
}

Expand Down
6 changes: 6 additions & 0 deletions src/modules/Elsa.ProtoActor.Core/HeaderNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Elsa.ProtoActor;

public static class HeaderNames
{
public const string TenantId = "TenantId";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Elsa.Common.Multitenancy;
using Microsoft.Extensions.DependencyInjection;
using Proto;
using Proto.DependencyInjection;

namespace Elsa.ProtoActor.Middleware;

public static class TenantScopeMiddleware
{
public static Props WithMultitenancy(this Props props, IServiceProvider sp)
{
props = props.WithReceiverMiddleware(next => ReadTenant(next, sp));
props = props.WithSenderMiddleware(next => PropagateTenant(next, sp));
return props;
}

public static Receiver ReadTenant(this Receiver next, IServiceProvider sp)
{
async Task Receiver(IReceiverContext context, MessageEnvelope envelope)
{
var tenantId = envelope.Header.GetValueOrDefault(HeaderNames.TenantId);
if (tenantId != null)
{
var tenantFinder = sp.GetRequiredService<ITenantFinder>();
var tenant = await tenantFinder.FindByIdAsync(tenantId);
var tenantScopeFactory = sp.GetRequiredService<ITenantScopeFactory>();
await using var tenantScope = tenantScopeFactory.CreateScope(tenant);
var originalServiceProvider = sp;
context.System.WithServiceProvider(tenantScope.ServiceProvider);
await next(context, envelope);
context.System.WithServiceProvider(originalServiceProvider);
}
else
{
await next(context, envelope);
}
}

return Receiver;
}

public static Sender PropagateTenant(this Sender next, IServiceProvider sp)
{
async Task Sender(ISenderContext context, PID target, MessageEnvelope envelope)
{
var tenantAccessor = sp.GetRequiredService<ITenantAccessor>();
if (tenantAccessor.Tenant != null) envelope.WithHeader(HeaderNames.TenantId, tenantAccessor.Tenant.Id);
await next(context, target, envelope);
}

return Sender;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Elsa.Common.Multitenancy;
using Elsa.Extensions;
using Elsa.ProtoActor;
using Elsa.Workflows.Runtime.ProtoActor.ProtoBuf;
using Elsa.Workflows.State;
using JetBrains.Annotations;
Expand Down Expand Up @@ -87,7 +88,7 @@ public async Task ImportStateAsync(WorkflowState workflowState, CancellationToke
private IDictionary<string, string> CreateHeaders()
{
var headers = new Dictionary<string, string>();
if (_tenantAccessor.Tenant != null) headers["TenantId"] = _tenantAccessor.Tenant.Id;
if (_tenantAccessor.Tenant != null) headers[HeaderNames.TenantId] = _tenantAccessor.Tenant.Id;
return headers;
}
}

0 comments on commit b4bbf25

Please sign in to comment.