Skip to content

Commit

Permalink
Add output serialization and deserialization support (#6152)
Browse files Browse the repository at this point in the history
Introduced `Output` message and related data serialization/deserialization methods in `ProtoOutputExtensions`. Updated `WorkflowExecutionResult` and impacted methods to include `Output` handling. Enhanced workflow runtime logic to manage and integrate output data.
  • Loading branch information
sfmskywalker authored Nov 26, 2024
1 parent 4cf7972 commit a2f76b8
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 19 deletions.
15 changes: 15 additions & 0 deletions src/modules/Elsa.ProtoActor/Extensions/ProtoOutputExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Elsa.ProtoActor.ProtoBuf;

namespace Elsa.ProtoActor.Extensions;

internal static class ProtoOutputExtensions
{
public static IDictionary<string, object> Deserialize(this Output output) => output.Data.Deserialize();

public static Output SerializeOutput(this IDictionary<string, object> output)
{
var result = new Output();
output.Serialize(result.Data);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public WorkflowExecutionResult Map(ProtoWorkflowExecutionResponse source)
_workflowSubStatusMapper.Map(source.SubStatus),
_bookmarkMapper.Map(source.Bookmarks).ToList(),
_activityIncidentStateMapper.Map(source.Incidents).ToList(),
source.TriggeredActivityId.NullIfEmpty()
source.TriggeredActivityId.NullIfEmpty(),
source.Output.Deserialize()
);
}

Expand All @@ -61,6 +62,7 @@ public ProtoWorkflowExecutionResponse Map(WorkflowExecutionResult source)
Bookmarks = { _bookmarkMapper.Map(source.Bookmarks) },
Incidents = { _activityIncidentStateMapper.Map(source.Incidents).ToList() },
TriggeredActivityId = source.TriggeredActivityId,
Output = source.Output.SerializeOutput()
};
}
}
4 changes: 4 additions & 0 deletions src/modules/Elsa.ProtoActor/Proto/Shared.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ message Input {
map<string, Json> Data = 1;
}

message Output {
map<string, Json> Data = 1;
}

message Properties {
map<string, Json> Data = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ message StartWorkflowRequest {
string InstanceId = 2;
string VersionOptions = 3;
optional string CorrelationId = 4;
optional Input input = 5;
optional Properties properties = 6;
optional Input Input = 5;
optional Properties Properties = 6;
optional string TriggerActivityId = 7;
optional bool IsExistingInstance = 8;
}
Expand All @@ -29,6 +29,7 @@ message WorkflowExecutionResponse {
repeated Bookmark Bookmarks = 5;
repeated ActivityIncident Incidents = 6;
optional string TriggeredActivityId = 7;
optional Output Output = 8;
}

message ActivityIncident {
Expand Down
19 changes: 10 additions & 9 deletions src/modules/Elsa.Workflows.Runtime/Activities/ExecuteWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Management;
using Elsa.Workflows.Models;
using Elsa.Workflows.Options;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Parameters;
using Elsa.Workflows.UIHints;
using JetBrains.Annotations;

Expand Down Expand Up @@ -61,29 +62,29 @@ private async ValueTask<ExecuteWorkflowResult> ExecuteWorkflowAsync(ActivityExec
var workflowDefinitionId = WorkflowDefinitionId.Get(context);
var input = Input.GetOrDefault(context) ?? new Dictionary<string, object>();
var correlationId = CorrelationId.GetOrDefault(context);
var workflowInvoker = context.GetRequiredService<IWorkflowRunner>();
var workflowRuntime = context.GetRequiredService<IWorkflowRuntime>();
var identityGenerator = context.GetRequiredService<IIdentityGenerator>();
var workflowDefinitionService = context.GetRequiredService<IWorkflowDefinitionService>();
var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, VersionOptions.Published, context.CancellationToken);

if (workflowGraph == null)
throw new Exception($"No published version of workflow definition with ID {workflowDefinitionId} found.");

var options = new RunWorkflowOptions
var options = new StartWorkflowRuntimeParams
{
ParentWorkflowInstanceId = context.WorkflowExecutionContext.Id,
Input = input,
CorrelationId = correlationId,
WorkflowInstanceId = identityGenerator.GenerateId()
InstanceId = identityGenerator.GenerateId()
};

var workflowResult = await workflowInvoker.RunAsync(workflowGraph, options, context.CancellationToken);
var workflowResult = await workflowRuntime.StartWorkflowAsync(workflowDefinitionId, options);
var info = new ExecuteWorkflowResult
{
WorkflowInstanceId = options.WorkflowInstanceId,
Status = workflowResult.WorkflowState.Status,
SubStatus = workflowResult.WorkflowState.SubStatus,
Output = workflowResult.WorkflowState.Output
WorkflowInstanceId = workflowResult.WorkflowInstanceId,
Status = workflowResult.Status,
SubStatus = workflowResult.SubStatus,
Output = workflowResult.Output
};

return info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,11 @@

namespace Elsa.Workflows.Runtime.Results;

public record WorkflowExecutionResult(string WorkflowInstanceId, WorkflowStatus Status, WorkflowSubStatus SubStatus, ICollection<Bookmark> Bookmarks, ICollection<ActivityIncident> Incidents, string? TriggeredActivityId = null);
public record WorkflowExecutionResult(
string WorkflowInstanceId,
WorkflowStatus Status,
WorkflowSubStatus SubStatus,
ICollection<Bookmark> Bookmarks,
ICollection<ActivityIncident> Incidents,
string? TriggeredActivityId,
IDictionary<string, object> Output);
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public async Task<ICollection<WorkflowExecutionResult>> StartWorkflowsAsync(stri
await workflowHost.ResumeWorkflowAsync(resumeWorkflowOptions, applicationCancellationToken);
await workflowHost.PersistStateAsync(systemCancellationToken);
workflowState = workflowHost.WorkflowState;
return new WorkflowExecutionResult(workflowState.Id, workflowState.Status, workflowState.SubStatus, workflowState.Bookmarks, workflowState.Incidents);
return new WorkflowExecutionResult(workflowState.Id, workflowState.Status, workflowState.SubStatus, workflowState.Bookmarks, workflowState.Incidents, null, workflowState.Output);
}
}

Expand Down Expand Up @@ -349,10 +349,10 @@ private async Task<WorkflowExecutionResult> StartWorkflowAsync(IWorkflowHost wor
{
var workflowInstanceId = string.IsNullOrEmpty(options?.InstanceId)
? identityGenerator.GenerateId()
: options?.InstanceId;
: options.InstanceId;
var cancellationTokens = options?.CancellationTokens ?? default;

await using (await AcquireLockAsync(workflowInstanceId!, cancellationTokens.SystemCancellationToken))
await using (await AcquireLockAsync(workflowInstanceId, cancellationTokens.SystemCancellationToken))
{
var input = options?.Input;
var correlationId = options?.CorrelationId;
Expand All @@ -377,7 +377,8 @@ private async Task<WorkflowExecutionResult> StartWorkflowAsync(IWorkflowHost wor
workflowState.SubStatus,
workflowState.Bookmarks,
workflowState.Incidents,
default);
default,
workflowState.Output);
}
}

Expand Down Expand Up @@ -426,8 +427,10 @@ private async Task<ICollection<WorkflowExecutionResult>> ResumeWorkflowsAsync(IE
});

if (resumeResult != null)
resumedWorkflows.Add(new WorkflowExecutionResult(workflowInstanceId, resumeResult.Status,
resumeResult.SubStatus, resumeResult.Bookmarks, resumeResult.Incidents));
resumedWorkflows.Add(resumeResult with
{
WorkflowInstanceId = workflowInstanceId
});
}

return resumedWorkflows;
Expand Down

0 comments on commit a2f76b8

Please sign in to comment.