diff --git a/src/modules/Elsa.ProtoActor.Core/Features/ProtoActorFeature.cs b/src/modules/Elsa.ProtoActor.Core/Features/ProtoActorFeature.cs index e8af6529d1..a4231af8d8 100644 --- a/src/modules/Elsa.ProtoActor.Core/Features/ProtoActorFeature.cs +++ b/src/modules/Elsa.ProtoActor.Core/Features/ProtoActorFeature.cs @@ -1,7 +1,7 @@ -using Elsa.Common.Multitenancy; using Elsa.Features.Abstractions; using Elsa.Features.Services; using Elsa.ProtoActor.HostedServices; +using Elsa.ProtoActor.Middleware; using Elsa.Workflows.Runtime.ProtoActor.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -22,7 +22,6 @@ namespace Elsa.ProtoActor.Features; /// public class ProtoActorFeature(IModule module) : FeatureBase(module) { - private const string TenantHeaderName = "TenantId"; private LogLevel _diagnosticsLogLevel = LogLevel.Information; private bool _enableMetrics; private bool _enableTracing; @@ -170,27 +169,7 @@ private ClusterConfig AddVirtualActors(IServiceProvider sp, ActorSystem system, if (_enableTracing) kind = kind.WithProps(props => props.WithTracing()); - kind = kind.WithProps(props => - { - props = props.WithReceiverMiddleware(next => async (context, envelope) => - { - var tenantId = envelope.Header.GetValueOrDefault(TenantHeaderName); - if (tenantId != null) - { - var tenantContextInitializer = sp.GetRequiredService(); - await tenantContextInitializer.InitializeAsync(tenantId); - } - - await next(context, envelope); - }).WithSenderMiddleware(next => async (context, target, envelope) => - { - var tenantAccessor = sp.GetRequiredService(); - if (tenantAccessor.Tenant != null) envelope.WithHeader(TenantHeaderName, tenantAccessor.Tenant.Id); - await next(context, target, envelope); - }); - - return props; - }); + kind = kind.WithProps(props => props.WithMultitenancy(sp)); clusterConfig = clusterConfig.WithClusterKind(kind); } diff --git a/src/modules/Elsa.ProtoActor.Core/HeaderNames.cs b/src/modules/Elsa.ProtoActor.Core/HeaderNames.cs new file mode 100644 index 0000000000..d719abaf34 --- /dev/null +++ b/src/modules/Elsa.ProtoActor.Core/HeaderNames.cs @@ -0,0 +1,6 @@ +namespace Elsa.ProtoActor; + +public static class HeaderNames +{ + public const string TenantId = "TenantId"; +} \ No newline at end of file diff --git a/src/modules/Elsa.ProtoActor.Core/Middleware/TenantScopeMiddleware.cs b/src/modules/Elsa.ProtoActor.Core/Middleware/TenantScopeMiddleware.cs new file mode 100644 index 0000000000..1870472121 --- /dev/null +++ b/src/modules/Elsa.ProtoActor.Core/Middleware/TenantScopeMiddleware.cs @@ -0,0 +1,53 @@ +using Elsa.Common.Multitenancy; +using Microsoft.Extensions.DependencyInjection; +using Proto; +using Proto.DependencyInjection; + +namespace Elsa.ProtoActor.Middleware; + +public static class TenantScopeMiddleware +{ + public static Props WithMultitenancy(this Props props, IServiceProvider sp) + { + props = props.WithReceiverMiddleware(next => ReadTenant(next, sp)); + props = props.WithSenderMiddleware(next => PropagateTenant(next, sp)); + return props; + } + + public static Receiver ReadTenant(this Receiver next, IServiceProvider sp) + { + async Task Receiver(IReceiverContext context, MessageEnvelope envelope) + { + var tenantId = envelope.Header.GetValueOrDefault(HeaderNames.TenantId); + if (tenantId != null) + { + var tenantFinder = sp.GetRequiredService(); + var tenant = await tenantFinder.FindByIdAsync(tenantId); + var tenantScopeFactory = sp.GetRequiredService(); + await using var tenantScope = tenantScopeFactory.CreateScope(tenant); + var originalServiceProvider = sp; + context.System.WithServiceProvider(tenantScope.ServiceProvider); + await next(context, envelope); + context.System.WithServiceProvider(originalServiceProvider); + } + else + { + await next(context, envelope); + } + } + + return Receiver; + } + + public static Sender PropagateTenant(this Sender next, IServiceProvider sp) + { + async Task Sender(ISenderContext context, PID target, MessageEnvelope envelope) + { + var tenantAccessor = sp.GetRequiredService(); + if (tenantAccessor.Tenant != null) envelope.WithHeader(HeaderNames.TenantId, tenantAccessor.Tenant.Id); + await next(context, target, envelope); + } + + return Sender; + } +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime.ProtoActor/Services/ProtoActorWorkflowClient.cs b/src/modules/Elsa.Workflows.Runtime.ProtoActor/Services/ProtoActorWorkflowClient.cs index 8b71896e76..7576d136f9 100644 --- a/src/modules/Elsa.Workflows.Runtime.ProtoActor/Services/ProtoActorWorkflowClient.cs +++ b/src/modules/Elsa.Workflows.Runtime.ProtoActor/Services/ProtoActorWorkflowClient.cs @@ -1,5 +1,6 @@ using Elsa.Common.Multitenancy; using Elsa.Extensions; +using Elsa.ProtoActor; using Elsa.Workflows.Runtime.ProtoActor.ProtoBuf; using Elsa.Workflows.State; using JetBrains.Annotations; @@ -87,7 +88,7 @@ public async Task ImportStateAsync(WorkflowState workflowState, CancellationToke private IDictionary CreateHeaders() { var headers = new Dictionary(); - if (_tenantAccessor.Tenant != null) headers["TenantId"] = _tenantAccessor.Tenant.Id; + if (_tenantAccessor.Tenant != null) headers[HeaderNames.TenantId] = _tenantAccessor.Tenant.Id; return headers; } } \ No newline at end of file