From a2f76b8ebae82f43b89248cb65aff36f6fe54b7a Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Tue, 26 Nov 2024 15:59:25 +0100 Subject: [PATCH] Add output serialization and deserialization support (#6152) Introduced `Output` message and related data serialization/deserialization methods in `ProtoOutputExtensions`. Updated `WorkflowExecutionResult` and impacted methods to include `Output` handling. Enhanced workflow runtime logic to manage and integrate output data. --- .../Extensions/ProtoOutputExtensions.cs | 15 +++++++++++++++ .../Mappers/WorkflowExecutionResultMapper.cs | 4 +++- .../Elsa.ProtoActor/Proto/Shared.proto | 4 ++++ .../Proto/WorkflowInstance.Messages.proto | 5 +++-- .../Activities/ExecuteWorkflow.cs | 19 ++++++++++--------- .../Results/WorkflowExecutionResult.cs | 9 ++++++++- .../Services/DefaultWorkflowRuntime.cs | 15 +++++++++------ 7 files changed, 52 insertions(+), 19 deletions(-) create mode 100644 src/modules/Elsa.ProtoActor/Extensions/ProtoOutputExtensions.cs diff --git a/src/modules/Elsa.ProtoActor/Extensions/ProtoOutputExtensions.cs b/src/modules/Elsa.ProtoActor/Extensions/ProtoOutputExtensions.cs new file mode 100644 index 0000000000..eb0775cd32 --- /dev/null +++ b/src/modules/Elsa.ProtoActor/Extensions/ProtoOutputExtensions.cs @@ -0,0 +1,15 @@ +using Elsa.ProtoActor.ProtoBuf; + +namespace Elsa.ProtoActor.Extensions; + +internal static class ProtoOutputExtensions +{ + public static IDictionary Deserialize(this Output output) => output.Data.Deserialize(); + + public static Output SerializeOutput(this IDictionary output) + { + var result = new Output(); + output.Serialize(result.Data); + return result; + } +} \ No newline at end of file diff --git a/src/modules/Elsa.ProtoActor/Mappers/WorkflowExecutionResultMapper.cs b/src/modules/Elsa.ProtoActor/Mappers/WorkflowExecutionResultMapper.cs index a2d6423cc1..569d9938dd 100644 --- a/src/modules/Elsa.ProtoActor/Mappers/WorkflowExecutionResultMapper.cs +++ b/src/modules/Elsa.ProtoActor/Mappers/WorkflowExecutionResultMapper.cs @@ -42,7 +42,8 @@ public WorkflowExecutionResult Map(ProtoWorkflowExecutionResponse source) _workflowSubStatusMapper.Map(source.SubStatus), _bookmarkMapper.Map(source.Bookmarks).ToList(), _activityIncidentStateMapper.Map(source.Incidents).ToList(), - source.TriggeredActivityId.NullIfEmpty() + source.TriggeredActivityId.NullIfEmpty(), + source.Output.Deserialize() ); } @@ -61,6 +62,7 @@ public ProtoWorkflowExecutionResponse Map(WorkflowExecutionResult source) Bookmarks = { _bookmarkMapper.Map(source.Bookmarks) }, Incidents = { _activityIncidentStateMapper.Map(source.Incidents).ToList() }, TriggeredActivityId = source.TriggeredActivityId, + Output = source.Output.SerializeOutput() }; } } \ No newline at end of file diff --git a/src/modules/Elsa.ProtoActor/Proto/Shared.proto b/src/modules/Elsa.ProtoActor/Proto/Shared.proto index 9d7d5d6d9c..ef4d1a64f2 100644 --- a/src/modules/Elsa.ProtoActor/Proto/Shared.proto +++ b/src/modules/Elsa.ProtoActor/Proto/Shared.proto @@ -14,6 +14,10 @@ message Input { map Data = 1; } +message Output { + map Data = 1; +} + message Properties { map Data = 1; } \ No newline at end of file diff --git a/src/modules/Elsa.ProtoActor/Proto/WorkflowInstance.Messages.proto b/src/modules/Elsa.ProtoActor/Proto/WorkflowInstance.Messages.proto index bbfc099202..177594a4ed 100644 --- a/src/modules/Elsa.ProtoActor/Proto/WorkflowInstance.Messages.proto +++ b/src/modules/Elsa.ProtoActor/Proto/WorkflowInstance.Messages.proto @@ -15,8 +15,8 @@ message StartWorkflowRequest { string InstanceId = 2; string VersionOptions = 3; optional string CorrelationId = 4; - optional Input input = 5; - optional Properties properties = 6; + optional Input Input = 5; + optional Properties Properties = 6; optional string TriggerActivityId = 7; optional bool IsExistingInstance = 8; } @@ -29,6 +29,7 @@ message WorkflowExecutionResponse { repeated Bookmark Bookmarks = 5; repeated ActivityIncident Incidents = 6; optional string TriggeredActivityId = 7; + optional Output Output = 8; } message ActivityIncident { diff --git a/src/modules/Elsa.Workflows.Runtime/Activities/ExecuteWorkflow.cs b/src/modules/Elsa.Workflows.Runtime/Activities/ExecuteWorkflow.cs index 263cc1e646..661a64e1d0 100644 --- a/src/modules/Elsa.Workflows.Runtime/Activities/ExecuteWorkflow.cs +++ b/src/modules/Elsa.Workflows.Runtime/Activities/ExecuteWorkflow.cs @@ -5,7 +5,8 @@ using Elsa.Workflows.Contracts; using Elsa.Workflows.Management; using Elsa.Workflows.Models; -using Elsa.Workflows.Options; +using Elsa.Workflows.Runtime.Contracts; +using Elsa.Workflows.Runtime.Parameters; using Elsa.Workflows.UIHints; using JetBrains.Annotations; @@ -61,7 +62,7 @@ private async ValueTask ExecuteWorkflowAsync(ActivityExec var workflowDefinitionId = WorkflowDefinitionId.Get(context); var input = Input.GetOrDefault(context) ?? new Dictionary(); var correlationId = CorrelationId.GetOrDefault(context); - var workflowInvoker = context.GetRequiredService(); + var workflowRuntime = context.GetRequiredService(); var identityGenerator = context.GetRequiredService(); var workflowDefinitionService = context.GetRequiredService(); var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, VersionOptions.Published, context.CancellationToken); @@ -69,21 +70,21 @@ private async ValueTask ExecuteWorkflowAsync(ActivityExec if (workflowGraph == null) throw new Exception($"No published version of workflow definition with ID {workflowDefinitionId} found."); - var options = new RunWorkflowOptions + var options = new StartWorkflowRuntimeParams { ParentWorkflowInstanceId = context.WorkflowExecutionContext.Id, Input = input, CorrelationId = correlationId, - WorkflowInstanceId = identityGenerator.GenerateId() + InstanceId = identityGenerator.GenerateId() }; - var workflowResult = await workflowInvoker.RunAsync(workflowGraph, options, context.CancellationToken); + var workflowResult = await workflowRuntime.StartWorkflowAsync(workflowDefinitionId, options); var info = new ExecuteWorkflowResult { - WorkflowInstanceId = options.WorkflowInstanceId, - Status = workflowResult.WorkflowState.Status, - SubStatus = workflowResult.WorkflowState.SubStatus, - Output = workflowResult.WorkflowState.Output + WorkflowInstanceId = workflowResult.WorkflowInstanceId, + Status = workflowResult.Status, + SubStatus = workflowResult.SubStatus, + Output = workflowResult.Output }; return info; diff --git a/src/modules/Elsa.Workflows.Runtime/Results/WorkflowExecutionResult.cs b/src/modules/Elsa.Workflows.Runtime/Results/WorkflowExecutionResult.cs index 298c7357bb..d4c5832e12 100644 --- a/src/modules/Elsa.Workflows.Runtime/Results/WorkflowExecutionResult.cs +++ b/src/modules/Elsa.Workflows.Runtime/Results/WorkflowExecutionResult.cs @@ -2,4 +2,11 @@ namespace Elsa.Workflows.Runtime.Results; -public record WorkflowExecutionResult(string WorkflowInstanceId, WorkflowStatus Status, WorkflowSubStatus SubStatus, ICollection Bookmarks, ICollection Incidents, string? TriggeredActivityId = null); \ No newline at end of file +public record WorkflowExecutionResult( + string WorkflowInstanceId, + WorkflowStatus Status, + WorkflowSubStatus SubStatus, + ICollection Bookmarks, + ICollection Incidents, + string? TriggeredActivityId, + IDictionary Output); \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs index 2f47861387..2dd43dd0d3 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs @@ -168,7 +168,7 @@ public async Task> StartWorkflowsAsync(stri await workflowHost.ResumeWorkflowAsync(resumeWorkflowOptions, applicationCancellationToken); await workflowHost.PersistStateAsync(systemCancellationToken); workflowState = workflowHost.WorkflowState; - return new WorkflowExecutionResult(workflowState.Id, workflowState.Status, workflowState.SubStatus, workflowState.Bookmarks, workflowState.Incidents); + return new WorkflowExecutionResult(workflowState.Id, workflowState.Status, workflowState.SubStatus, workflowState.Bookmarks, workflowState.Incidents, null, workflowState.Output); } } @@ -349,10 +349,10 @@ private async Task StartWorkflowAsync(IWorkflowHost wor { var workflowInstanceId = string.IsNullOrEmpty(options?.InstanceId) ? identityGenerator.GenerateId() - : options?.InstanceId; + : options.InstanceId; var cancellationTokens = options?.CancellationTokens ?? default; - await using (await AcquireLockAsync(workflowInstanceId!, cancellationTokens.SystemCancellationToken)) + await using (await AcquireLockAsync(workflowInstanceId, cancellationTokens.SystemCancellationToken)) { var input = options?.Input; var correlationId = options?.CorrelationId; @@ -377,7 +377,8 @@ private async Task StartWorkflowAsync(IWorkflowHost wor workflowState.SubStatus, workflowState.Bookmarks, workflowState.Incidents, - default); + default, + workflowState.Output); } } @@ -426,8 +427,10 @@ private async Task> ResumeWorkflowsAsync(IE }); if (resumeResult != null) - resumedWorkflows.Add(new WorkflowExecutionResult(workflowInstanceId, resumeResult.Status, - resumeResult.SubStatus, resumeResult.Bookmarks, resumeResult.Incidents)); + resumedWorkflows.Add(resumeResult with + { + WorkflowInstanceId = workflowInstanceId + }); } return resumedWorkflows;