Skip to content

Commit

Permalink
Refactor background activities scheduling
Browse files Browse the repository at this point in the history
Background activities scheduling was moved to its own middleware where the actual scheduling happens instead of in the invoking middleware. Removed ScheduleBackgroundActivities.cs handler and added ScheduleBackgroundActivitiesMiddleware.cs instead. As a part of this change, restructured BackgroundActivityInvokerMiddleware to BackgroundActivityCollectorMiddleware, and updated all relevant references.

Minor modifications to a few other files include changes in their package dependencies and activity kind settings.
  • Loading branch information
sfmskywalker committed Dec 17, 2023
1 parent 19522e4 commit fc878a4
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 34 deletions.
30 changes: 30 additions & 0 deletions src/bundles/Elsa.WorkflowServer.Web/DataSourceActivity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Bogus;
using Elsa.Extensions;
using Elsa.Workflows.Core;
using Elsa.Workflows.Core.Attributes;
using Elsa.Workflows.Core.Models;

namespace Elsa.WorkflowServer.Web;

[Activity("Demo", "Data Source", "Generates a collection of random strings.")]
public class DataSourceActivity : CodeActivity<ICollection<string>>
{
[Input(Description = "The number of items to generate.")]
public Input<int> NumberOfItems { get; set; } = default!;


protected override void Execute(ActivityExecutionContext context)
{
var randomizer = new Randomizer();
var randomStrings = new List<string>();
var numberOfItems = context.Get(NumberOfItems);

for (var i = 0; i < numberOfItems; i++)
{
var randomString = randomizer.String2(10);
randomStrings.Add(randomString);
}

Result.Set(context, randomStrings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.10.4"/>
<PackageReference Include="Bogus" Version="35.0.1" />
<PackageReference Include="FluentStorage.Azure.Blobs" Version="5.2.2"/>
<PackageReference Include="Proto.Persistence.Sqlite" Version="1.4.0"/>
<PackageReference Include="Proto.Persistence.SqlServer" Version="1.4.0"/>
<PackageReference Include="Proto.Persistence.Sqlite" Version="1.5.0"/>
<PackageReference Include="Proto.Persistence.SqlServer" Version="1.5.0"/>
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async ValueTask InvokeAsync(ActivityExecutionContext context)
}

// Check if this is a background execution.
var isBackgroundExecution = context.TransientProperties.GetValueOrDefault<object, bool>(BackgroundActivityInvokerMiddleware.IsBackgroundExecution);
var isBackgroundExecution = context.TransientProperties.GetValueOrDefault<object, bool>(BackgroundActivityCollectorMiddleware.IsBackgroundExecution);

// Is the activity configured to load the context?
foreach (var providerType in providerTypes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Elsa.Expressions.Contracts;
using Elsa.Expressions.Helpers;
using Elsa.Expressions.Models;
using Elsa.Expressions.Options;
using Elsa.Extensions;
using Elsa.Workflows.Core;
using Elsa.Workflows.Core.Activities.Flowchart.Attributes;
Expand All @@ -23,7 +22,7 @@ namespace Elsa.Workflows.Runtime.Activities;
/// <summary>
/// Creates new workflow instances of the specified workflow for each item in the data source and dispatches them for execution.
/// </summary>
[Activity("Elsa", "Composition", "Create new workflow instances for each item in the data source and dispatch them for execution.", Kind = ActivityKind.Task)]
[Activity("Elsa", "Composition", "Create new workflow instances for each item in the data source and dispatch them for execution.", Kind = ActivityKind.Job)]
[FlowNode("Finished", "Canceled", "Done")]
[UsedImplicitly]
public class BulkDispatchWorkflows : Activity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Elsa.Extensions;
public static class ActivityExecutionPipelineBuilderExtensions
{
/// <summary>
/// Installs the <see cref="BackgroundActivityInvokerMiddleware"/>.
/// Installs the <see cref="BackgroundActivityCollectorMiddleware"/>.
/// </summary>
public static IActivityExecutionPipelineBuilder UseBackgroundActivityInvoker(this IActivityExecutionPipelineBuilder pipelineBuilder) => pipelineBuilder.UseMiddleware<BackgroundActivityInvokerMiddleware>();
public static IActivityExecutionPipelineBuilder UseBackgroundActivityInvoker(this IActivityExecutionPipelineBuilder pipelineBuilder) => pipelineBuilder.UseMiddleware<BackgroundActivityCollectorMiddleware>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ public static class WorkflowExecutionPipelineBuilderExtensions
public static IWorkflowExecutionPipelineBuilder UseDefaultPipeline(this IWorkflowExecutionPipelineBuilder pipelineBuilder) =>
pipelineBuilder
.Reset()
.UseBackgroundActivities()
.UsePersistentVariables()
.UseBookmarkPersistence()
.UseActivityExecutionLogPersistence()
.UseWorkflowExecutionLogPersistence()
.UseExceptionHandling()
.UseDefaultActivityScheduler();

/// <summary>
/// Installs middleware that persists the workflow instance before and after workflow execution.
/// </summary>
public static IWorkflowExecutionPipelineBuilder UseBackgroundActivities(this IWorkflowExecutionPipelineBuilder pipelineBuilder) => pipelineBuilder.UseMiddleware<ScheduleBackgroundActivitiesMiddleware>();

/// <summary>
/// Installs middleware that persists the workflow instance before and after workflow execution.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ public override void Apply()
.AddNotificationHandler<ResumeDispatchWorkflowActivity>()
.AddNotificationHandler<ResumeBulkDispatchWorkflowActivity>()
.AddNotificationHandler<IndexWorkflowTriggersHandler>()
.AddNotificationHandler<ScheduleBackgroundActivities>()
.AddNotificationHandler<CancelBackgroundActivities>()
.AddNotificationHandler<DeleteBookmarks>()
.AddNotificationHandler<DeleteTriggers>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public CancelBackgroundActivities(IBackgroundActivityScheduler backgroundActivit
/// <inheritdoc />
public async Task HandleAsync(WorkflowBookmarksIndexed notification, CancellationToken cancellationToken)
{
var removedBookmarks = notification.IndexedWorkflowBookmarks.RemovedBookmarks.Where(x => x.Name == BackgroundActivityInvokerMiddleware.BackgroundActivityBookmarkName);
var removedBookmarks = notification.IndexedWorkflowBookmarks.RemovedBookmarks.Where(x => x.Name == BackgroundActivityCollectorMiddleware.BackgroundActivityBookmarkName);

foreach (var removedBookmark in removedBookmarks)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Core.Pipelines.ActivityExecution;
using Elsa.Workflows.Runtime.Bookmarks;
using Elsa.Workflows.Runtime.Middleware.Workflows;
using Elsa.Workflows.Runtime.Models;

namespace Elsa.Workflows.Runtime.Middleware.Activities;

/// <summary>
/// Executes the current activity from a background job if the activity is of kind <see cref="ActivityKind.Job"/> or <see cref="Task"/>.
/// Collects the current activity for scheduling for execution from a background job if the activity is of kind <see cref="ActivityKind.Job"/> or <see cref="Task"/>.
/// The actual scheduling of the activity happens in <see cref="ScheduleBackgroundActivitiesMiddleware"/>.
/// </summary>
public class BackgroundActivityInvokerMiddleware : DefaultActivityInvokerMiddleware
public class BackgroundActivityCollectorMiddleware : DefaultActivityInvokerMiddleware
{
/// <summary>
/// A key into the activity execution context's transient properties that indicates whether the current activity is being executed in the background.
Expand All @@ -23,7 +25,7 @@ public class BackgroundActivityInvokerMiddleware : DefaultActivityInvokerMiddlew
internal const string BackgroundActivityBookmarkName = "BackgroundActivity";

/// <inheritdoc />
public BackgroundActivityInvokerMiddleware(ActivityMiddlewareDelegate next) : base(next)
public BackgroundActivityCollectorMiddleware(ActivityMiddlewareDelegate next) : base(next)
{
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,54 +1,50 @@
using Elsa.Extensions;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Core;
using Elsa.Workflows.Core.Contracts;
using Elsa.Workflows.Core.Pipelines.WorkflowExecution;
using Elsa.Workflows.Runtime.Bookmarks;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Middleware.Activities;
using Elsa.Workflows.Runtime.Models;
using Elsa.Workflows.Runtime.Notifications;
using JetBrains.Annotations;

namespace Elsa.Workflows.Runtime.Handlers;
namespace Elsa.Workflows.Runtime.Middleware.Workflows;

/// <summary>
/// A handler that schedules background activities.
/// Schedule background activities.
/// </summary>
[PublicAPI]
internal class ScheduleBackgroundActivities : INotificationHandler<WorkflowBookmarksIndexed>
public class ScheduleBackgroundActivitiesMiddleware : WorkflowExecutionMiddleware
{
private readonly IBackgroundActivityScheduler _backgroundActivityScheduler;
private readonly IBookmarkPayloadSerializer _bookmarkPayloadSerializer;
private readonly IBookmarkHasher _bookmarkHasher;
private readonly IWorkflowRuntime _workflowRuntime;
private IWorkflowStateExtractor _workflowStateExtractor;

/// <summary>
/// Initializes a new instance of the <see cref="ScheduledBackgroundActivity"/> class.
/// </summary>
public ScheduleBackgroundActivities(
private readonly IWorkflowStateExtractor _workflowStateExtractor;

/// <inheritdoc />
public ScheduleBackgroundActivitiesMiddleware(
WorkflowMiddlewareDelegate next,
IBackgroundActivityScheduler backgroundActivityScheduler,
IBookmarkPayloadSerializer bookmarkPayloadSerializer,
IBookmarkHasher bookmarkHasher,
IWorkflowRuntime workflowRuntime,
IWorkflowStateExtractor workflowStateExtractor)
IWorkflowStateExtractor workflowStateExtractor) : base(next)
{
_backgroundActivityScheduler = backgroundActivityScheduler;
_bookmarkPayloadSerializer = bookmarkPayloadSerializer;
_bookmarkHasher = bookmarkHasher;
_workflowRuntime = workflowRuntime;
_workflowStateExtractor = workflowStateExtractor;
}

/// <inheritdoc />
public async Task HandleAsync(WorkflowBookmarksIndexed notification, CancellationToken cancellationToken)
public override async ValueTask InvokeAsync(WorkflowExecutionContext context)
{
var workflowExecutionContext = notification.WorkflowExecutionContext;
await Next(context);

var workflowExecutionContext = context;
var cancellationToken = context.CancellationTokens.SystemCancellationToken;

var scheduledBackgroundActivities = workflowExecutionContext
.TransientProperties
.GetOrAdd(BackgroundActivityInvokerMiddleware.BackgroundActivitySchedulesKey, () => new List<ScheduledBackgroundActivity>());
.GetOrAdd(BackgroundActivityCollectorMiddleware.BackgroundActivitySchedulesKey, () => new List<ScheduledBackgroundActivity>());

if (scheduledBackgroundActivities.Any())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc
await _variablePersistenceManager.LoadVariablesAsync(workflowExecutionContext);

// Mark the activity as being invoked from a background worker.
activityExecutionContext.TransientProperties[BackgroundActivityInvokerMiddleware.IsBackgroundExecution] = true;
activityExecutionContext.TransientProperties[BackgroundActivityCollectorMiddleware.IsBackgroundExecution] = true;

// Invoke the activity.
await _activityInvoker.InvokeAsync(activityExecutionContext);
Expand Down Expand Up @@ -110,7 +110,7 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc
// Resume the workflow, passing along the activity output.
// TODO: This approach will fail if the output is non-serializable. We need to find a way to pass the output to the workflow without serializing it.
var bookmarkId = scheduledBackgroundActivity.BookmarkId;
var inputKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityOutputKey(activityNodeId);
var inputKey = BackgroundActivityCollectorMiddleware.GetBackgroundActivityOutputKey(activityNodeId);

var dispatchRequest = new DispatchWorkflowInstanceRequest
{
Expand Down

0 comments on commit fc878a4

Please sign in to comment.