From 8b293884abb5b73bd589628fc0fe9e98115b078e Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Wed, 13 Dec 2023 23:37:41 +0100 Subject: [PATCH] Add tests for implicit joins and fix duplicate activity scheduling This commit includes new integration tests for implicit join scenarios. The tests ensure that join and parallel join activities execute correctly and complete as expected. Additionally, a change has been made to the workflow execution context to prevent duplicate activity scheduling. Now, aside from checking if the activity is already scheduled, it also checks if the activity is scheduled for the specified owner. Fixes #4703 --- .../Flowchart/Activities/FlowJoin.cs | 16 +- .../WorkflowExecutionContextExtensions.cs | 3 +- .../Elsa.IntegrationTests.csproj | 6 + .../ImplicitJoins/JoinRunsOnceTests.cs | 52 ++++ .../ParallelJoinCompletesTests.cs | 52 ++++ .../ImplicitJoins/Workflows/join.json | 215 ++++++++++++++++ .../Workflows/parallel-join.json | 241 ++++++++++++++++++ 7 files changed, 576 insertions(+), 9 deletions(-) create mode 100644 test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/JoinRunsOnceTests.cs create mode 100644 test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/ParallelJoinCompletesTests.cs create mode 100644 test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/Workflows/join.json create mode 100644 test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/Workflows/parallel-join.json diff --git a/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/FlowJoin.cs b/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/FlowJoin.cs index c982f49a23..cd874d358f 100644 --- a/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/FlowJoin.cs +++ b/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/FlowJoin.cs @@ -33,10 +33,10 @@ public FlowJoin([CallerFilePath] string? source = default, [CallerLineNumber] in /// protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) { - var flowchartExecutionContext = context.ParentActivityExecutionContext!; - var flowchart = (Flowchart)flowchartExecutionContext.Activity; + var flowchartContext = context.ParentActivityExecutionContext!; + var flowchart = (Flowchart)flowchartContext.Activity; var inboundActivities = flowchart.Connections.LeftInboundActivities(this).ToList(); - var flowScope = flowchartExecutionContext.GetProperty(Flowchart.ScopeProperty)!; + var flowScope = flowchartContext.GetProperty(Flowchart.ScopeProperty)!; var executionCount = flowScope.GetExecutionCount(this); var mode = context.Get(Mode); @@ -49,7 +49,7 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context if (haveAllInboundActivitiesExecuted) { - await ClearBookmarksAsync(flowchart, context); + await CancelActivitiesInInboundPathAsync(flowchart, flowchartContext, context); await context.CompleteActivityAsync(); } @@ -57,20 +57,20 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context } case FlowJoinMode.WaitAny: { - await ClearBookmarksAsync(flowchart, context); + await CancelActivitiesInInboundPathAsync(flowchart, flowchartContext, context); await context.CompleteActivityAsync(); break; } } } - private async Task ClearBookmarksAsync(Flowchart flowchart, ActivityExecutionContext context) + private async Task CancelActivitiesInInboundPathAsync(Flowchart flowchart, ActivityExecutionContext flowchartContext, ActivityExecutionContext joinContext) { // Cancel all activities between this join activity and its most recent fork. var connections = flowchart.Connections; - var workflowExecutionContext = context.WorkflowExecutionContext; + var workflowExecutionContext = joinContext.WorkflowExecutionContext; var inboundActivities = connections.LeftAncestorActivities(this).Select(x => workflowExecutionContext.FindNodeByActivity(x)).Select(x => x!.Activity).ToList(); - var inboundActivityExecutionContexts = workflowExecutionContext.ActivityExecutionContexts.Where(x => inboundActivities.Contains(x.Activity)).ToList(); + var inboundActivityExecutionContexts = workflowExecutionContext.ActivityExecutionContexts.Where(x => inboundActivities.Contains(x.Activity) && x.ParentActivityExecutionContext == flowchartContext).ToList(); // Cancel each inbound activity. foreach (var activityExecutionContext in inboundActivityExecutionContexts) diff --git a/src/modules/Elsa.Workflows.Core/Extensions/WorkflowExecutionContextExtensions.cs b/src/modules/Elsa.Workflows.Core/Extensions/WorkflowExecutionContextExtensions.cs index 3f91e92f7e..ee24f57cde 100644 --- a/src/modules/Elsa.Workflows.Core/Extensions/WorkflowExecutionContextExtensions.cs +++ b/src/modules/Elsa.Workflows.Core/Extensions/WorkflowExecutionContextExtensions.cs @@ -98,7 +98,8 @@ public static ActivityWorkItem Schedule( if (options?.PreventDuplicateScheduling == true) { - var existingWorkItem = scheduler.Find(x => x.Activity.NodeId == activityNode.NodeId); + // Check if the activity is already scheduled for the specified owner. + var existingWorkItem = scheduler.Find(x => x.Activity.NodeId == activityNode.NodeId && x.Owner == owner); if (existingWorkItem != null) return existingWorkItem; diff --git a/test/integration/Elsa.IntegrationTests/Elsa.IntegrationTests.csproj b/test/integration/Elsa.IntegrationTests/Elsa.IntegrationTests.csproj index e171adab39..29f7310edf 100644 --- a/test/integration/Elsa.IntegrationTests/Elsa.IntegrationTests.csproj +++ b/test/integration/Elsa.IntegrationTests/Elsa.IntegrationTests.csproj @@ -153,5 +153,11 @@ Always + + Always + + + Always + diff --git a/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/JoinRunsOnceTests.cs b/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/JoinRunsOnceTests.cs new file mode 100644 index 0000000000..d1e9c5a84c --- /dev/null +++ b/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/JoinRunsOnceTests.cs @@ -0,0 +1,52 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Elsa.Common.Models; +using Elsa.IntegrationTests.Scenarios.ImplicitJoins.Workflows; +using Elsa.Testing.Shared; +using Elsa.Workflows.Core; +using Elsa.Workflows.Core.Contracts; +using Elsa.Workflows.Runtime.Contracts; +using Elsa.Workflows.Runtime.Filters; +using Microsoft.Extensions.DependencyInjection; +using Open.Linq.AsyncExtensions; +using Xunit; +using Xunit.Abstractions; + +namespace Elsa.IntegrationTests.Scenarios.ImplicitJoins; + +public class JoinRunsOnceTests +{ + private readonly IWorkflowRunner _workflowRunner; + private readonly CapturingTextWriter _capturingTextWriter = new(); + private readonly IServiceProvider _services; + + public JoinRunsOnceTests(ITestOutputHelper testOutputHelper) + { + _services = new TestApplicationBuilder(testOutputHelper).WithCapturingTextWriter(_capturingTextWriter).Build(); + _workflowRunner = _services.GetRequiredService(); + } + + [Fact(DisplayName = "The Join activity executes only once, not twice")] + public async Task Test1() + { + // Populate registries. + await _services.PopulateRegistriesAsync(); + + // Import workflow. + var workflowDefinition = await _services.ImportWorkflowDefinitionAsync($"Scenarios/ImplicitJoins/Workflows/join.json"); + + // Execute. + var state = await _services.RunWorkflowUntilEndAsync(workflowDefinition.DefinitionId); + + // Assert. + var journal = await _services.GetRequiredService().FindManyAsync(new WorkflowExecutionLogRecordFilter + { + WorkflowInstanceId = state.Id, + ActivityId = "802725996be1b582", + EventName = "Completed" + }, PageArgs.All); + + Assert.Single(journal.Items); + } +} \ No newline at end of file diff --git a/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/ParallelJoinCompletesTests.cs b/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/ParallelJoinCompletesTests.cs new file mode 100644 index 0000000000..3941705cb1 --- /dev/null +++ b/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/ParallelJoinCompletesTests.cs @@ -0,0 +1,52 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Elsa.Common.Models; +using Elsa.IntegrationTests.Scenarios.ImplicitJoins.Workflows; +using Elsa.Testing.Shared; +using Elsa.Workflows.Core; +using Elsa.Workflows.Core.Contracts; +using Elsa.Workflows.Runtime.Contracts; +using Elsa.Workflows.Runtime.Filters; +using Microsoft.Extensions.DependencyInjection; +using Open.Linq.AsyncExtensions; +using Xunit; +using Xunit.Abstractions; + +namespace Elsa.IntegrationTests.Scenarios.ImplicitJoins; + +public class ParallelJoinCompletesTests +{ + private readonly IWorkflowRunner _workflowRunner; + private readonly CapturingTextWriter _capturingTextWriter = new(); + private readonly IServiceProvider _services; + + public ParallelJoinCompletesTests(ITestOutputHelper testOutputHelper) + { + _services = new TestApplicationBuilder(testOutputHelper).WithCapturingTextWriter(_capturingTextWriter).Build(); + _workflowRunner = _services.GetRequiredService(); + } + + [Fact(DisplayName = "The ParallelForEach activity completes when its Body contains a Join activity")] + public async Task Test1() + { + // Populate registries. + await _services.PopulateRegistriesAsync(); + + // Import workflow. + var workflowDefinition = await _services.ImportWorkflowDefinitionAsync($"Scenarios/ImplicitJoins/Workflows/parallel-join.json"); + + // Execute. + var state = await _services.RunWorkflowUntilEndAsync(workflowDefinition.DefinitionId); + + // Assert. + var journal = await _services.GetRequiredService().FindManyAsync(new WorkflowExecutionLogRecordFilter + { + WorkflowInstanceId = state.Id, + ActivityId = "70fc1183cd5800f2", + EventName = "Completed" + }, PageArgs.All); + + Assert.Single(journal.Items); + } +} \ No newline at end of file diff --git a/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/Workflows/join.json b/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/Workflows/join.json new file mode 100644 index 0000000000..dc3fad71e0 --- /dev/null +++ b/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/Workflows/join.json @@ -0,0 +1,215 @@ +{ + "id": "825e9d347e2f3094", + "definitionId": "f64e42304661f407", + "name": "Joins", + "createdAt": "2023-12-13T21:41:53.628415+00:00", + "version": 3, + "toolVersion": "3.0.0.0", + "variables": [], + "inputs": [], + "outputs": [], + "outcomes": [], + "customProperties": { + "Elsa:WorkflowContextProviderTypes": [] + }, + "isReadonly": false, + "isLatest": true, + "isPublished": false, + "options": { + "autoUpdateConsumingWorkflows": false + }, + "root": { + "type": "Elsa.Flowchart", + "version": 1, + "id": "8a03bf79d15fd48a", + "nodeId": "Workflow1:8a03bf79d15fd48a", + "metadata": {}, + "customProperties": { + "source": "FlowchartJsonConverter.cs:45", + "notFoundConnections": [], + "canStartWorkflow": false, + "runAsynchronously": false + }, + "activities": [ + { + "id": "c7624645ff77c7d0", + "nodeId": "Workflow1:8a03bf79d15fd48a:c7624645ff77c7d0", + "name": "Start1", + "type": "Elsa.Start", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": -400.5, + "y": 212 + }, + "size": { + "width": 102.21875, + "height": 50 + } + } + } + }, + { + "text": null, + "id": "281e9ea6d278b231", + "nodeId": "Workflow1:8a03bf79d15fd48a:281e9ea6d278b231", + "name": "WriteLine2", + "type": "Elsa.WriteLine", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": -160.5, + "y": 305 + }, + "size": { + "width": 139.296875, + "height": 50 + } + } + } + }, + { + "mode": { + "typeName": "Elsa.Workflows.Core.Activities.Flowchart.Models.FlowJoinMode, Elsa.Workflows.Core", + "expression": { + "type": "Literal", + "value": "WaitAny" + }, + "memoryReference": { + "id": "802725996be1b582:input-0" + } + }, + "id": "802725996be1b582", + "nodeId": "Workflow1:8a03bf79d15fd48a:802725996be1b582", + "name": "FlowJoin1", + "type": "Elsa.FlowJoin", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": 80, + "y": 200 + }, + "size": { + "width": 98.265625, + "height": 50 + } + } + } + }, + { + "text": null, + "id": "bfc88f064cf394ed", + "nodeId": "Workflow1:8a03bf79d15fd48a:bfc88f064cf394ed", + "name": "WriteLine1", + "type": "Elsa.WriteLine", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": -160, + "y": 120 + }, + "size": { + "width": 139.296875, + "height": 50 + } + } + } + }, + { + "id": "5467123c7973d011", + "nodeId": "Workflow1:8a03bf79d15fd48a:5467123c7973d011", + "name": "End1", + "type": "Elsa.End", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": 274.5, + "y": 200 + }, + "size": { + "width": 94.40625, + "height": 50 + } + } + } + } + ], + "connections": [ + { + "source": { + "activity": "c7624645ff77c7d0", + "port": "Done" + }, + "target": { + "activity": "bfc88f064cf394ed", + "port": "In" + } + }, + { + "source": { + "activity": "bfc88f064cf394ed", + "port": "Done" + }, + "target": { + "activity": "802725996be1b582", + "port": "In" + } + }, + { + "source": { + "activity": "c7624645ff77c7d0", + "port": "Done" + }, + "target": { + "activity": "281e9ea6d278b231", + "port": "In" + } + }, + { + "source": { + "activity": "281e9ea6d278b231", + "port": "Done" + }, + "target": { + "activity": "802725996be1b582", + "port": "In" + } + }, + { + "source": { + "activity": "802725996be1b582", + "port": "Done" + }, + "target": { + "activity": "5467123c7973d011", + "port": "In" + } + } + ] + } +} \ No newline at end of file diff --git a/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/Workflows/parallel-join.json b/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/Workflows/parallel-join.json new file mode 100644 index 0000000000..2672924d17 --- /dev/null +++ b/test/integration/Elsa.IntegrationTests/Scenarios/ImplicitJoins/Workflows/parallel-join.json @@ -0,0 +1,241 @@ +{ + "id": "2effd8f5f4205a5b", + "definitionId": "2da8d4cb0fcfbacd", + "name": "Parallel \u002B Join", + "createdAt": "2023-12-13T21:42:12.634042+00:00", + "version": 2, + "toolVersion": "3.0.0.0", + "variables": [], + "inputs": [], + "outputs": [], + "outcomes": [], + "customProperties": { + "Elsa:WorkflowContextProviderTypes": [] + }, + "isReadonly": false, + "isLatest": true, + "isPublished": false, + "options": { + "autoUpdateConsumingWorkflows": false + }, + "root": { + "type": "Elsa.Flowchart", + "version": 1, + "id": "d3350290ed96e720", + "nodeId": "Workflow1:d3350290ed96e720", + "metadata": {}, + "customProperties": { + "source": "FlowchartJsonConverter.cs:45", + "notFoundConnections": [], + "canStartWorkflow": false, + "runAsynchronously": false + }, + "activities": [ + { + "items": { + "typeName": "Object[]", + "expression": { + "type": "JavaScript", + "value": "[1,2,3]" + }, + "memoryReference": { + "id": "70fc1183cd5800f2:input-0" + } + }, + "body": { + "type": "Elsa.Flowchart", + "version": 1, + "id": "b07170f2d68980cd", + "nodeId": "Workflow1:d3350290ed96e720:70fc1183cd5800f2:b07170f2d68980cd", + "metadata": {}, + "customProperties": { + "source": "FlowchartJsonConverter.cs:45", + "notFoundConnections": [], + "canStartWorkflow": false, + "runAsynchronously": false + }, + "activities": [ + { + "condition": { + "typeName": "Boolean", + "expression": { + "type": "JavaScript", + "value": "false" + }, + "memoryReference": { + "id": "33665f061d96b957:input-0" + } + }, + "id": "33665f061d96b957", + "nodeId": "Workflow1:d3350290ed96e720:70fc1183cd5800f2:b07170f2d68980cd:33665f061d96b957", + "name": "FlowDecision1", + "type": "Elsa.FlowDecision", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": -329.140625, + "y": -78.5 + }, + "size": { + "width": 129.515625, + "height": 50 + } + } + } + }, + { + "text": null, + "id": "e27177eb5965bd6b", + "nodeId": "Workflow1:d3350290ed96e720:70fc1183cd5800f2:b07170f2d68980cd:e27177eb5965bd6b", + "name": "WriteLine1", + "type": "Elsa.WriteLine", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": -40, + "y": -160 + }, + "size": { + "width": 139.296875, + "height": 50 + } + } + } + }, + { + "text": null, + "id": "343d233a75d8d8e9", + "nodeId": "Workflow1:d3350290ed96e720:70fc1183cd5800f2:b07170f2d68980cd:343d233a75d8d8e9", + "name": "WriteLine2", + "type": "Elsa.WriteLine", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": 371.859375, + "y": -78.5 + }, + "size": { + "width": 139.296875, + "height": 50 + } + } + } + }, + { + "mode": { + "typeName": "Elsa.Workflows.Core.Activities.Flowchart.Models.FlowJoinMode, Elsa.Workflows.Core", + "expression": { + "type": "Literal", + "value": "WaitAny" + }, + "memoryReference": { + "id": "1f8625e278b74aa3:input-0" + } + }, + "id": "1f8625e278b74aa3", + "nodeId": "Workflow1:d3350290ed96e720:70fc1183cd5800f2:b07170f2d68980cd:1f8625e278b74aa3", + "name": "FlowJoin1", + "type": "Elsa.FlowJoin", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": 188.859375, + "y": -78.5 + }, + "size": { + "width": 98.265625, + "height": 50 + } + } + } + } + ], + "connections": [ + { + "source": { + "activity": "33665f061d96b957", + "port": "True" + }, + "target": { + "activity": "e27177eb5965bd6b", + "port": "In" + } + }, + { + "source": { + "activity": "e27177eb5965bd6b", + "port": "Done" + }, + "target": { + "activity": "1f8625e278b74aa3", + "port": "In" + } + }, + { + "source": { + "activity": "1f8625e278b74aa3", + "port": "Done" + }, + "target": { + "activity": "343d233a75d8d8e9", + "port": "In" + } + }, + { + "source": { + "activity": "33665f061d96b957", + "port": "False" + }, + "target": { + "activity": "1f8625e278b74aa3", + "port": "In" + } + } + ] + }, + "id": "70fc1183cd5800f2", + "nodeId": "Workflow1:d3350290ed96e720:70fc1183cd5800f2", + "name": "ParallelForEach1", + "type": "Elsa.ParallelForEach", + "version": 1, + "customProperties": { + "canStartWorkflow": false, + "runAsynchronously": false + }, + "metadata": { + "designer": { + "position": { + "x": -260, + "y": 140 + }, + "size": { + "width": 185.203125, + "height": 116.015625 + } + } + } + } + ], + "connections": [] + } +} \ No newline at end of file