From 41adca77d66f6d32610670d52a42337c0f65f38a Mon Sep 17 00:00:00 2001 From: Ebbe Knudsen Date: Mon, 16 Dec 2024 15:35:00 +0100 Subject: [PATCH 01/11] Update BRS-026 with activities --- .../Contracts/StartOrchestrationDto.proto | 1 + .../ProcessManagerMessageClient.cs | 14 +- .../OrchestrationInstance.cs | 25 +++ .../V1/Model/RequestCalculatedDataInputV1.cs | 20 --- .../ProcessManager.Orchestrations.csproj | 1 - .../EnqueueMessagesActivity_Brs_026_V1.cs | 66 ++++++++ ...EnqueueRejectMessageActivity_Brs_026_V1.cs | 66 ++++++++ ...rformAsyncValidationActivity_Brs_026_V1.cs | 70 ++++++++ .../StartOrchestrationActivity_Brs_026_V1.cs | 52 ++++++ ...rminateOrchestrationActivity_Brs_026_V1.cs | 70 ++++++++ .../TerminateStepActivity_Brs_026_V1.cs | 61 +++++++ ...on_RequestCalculatedEnergyTimeSeries_V1.cs | 160 ++++++++++++++++++ ...estCalculatedEnergyTimeSeriesHandlerV1.cs} | 14 +- ...lculatedEnergyTimeSeriesOrchestrationV1.cs | 48 ------ ...uestCalculatedEnergyTimeSeriesTriggerV1.cs | 75 -------- ...er_RequestCalculatedEnergyTimeSeries_V1.cs | 47 +++++ .../ProcessManager.Orchestrations/Program.cs | 5 +- ...tartOrchestrationFromMessageHandlerBase.cs | 79 +++++++++ 18 files changed, 712 insertions(+), 162 deletions(-) delete mode 100644 source/ProcessManager.Orchestrations.Abstractions/Processes/BRS_026/V1/Model/RequestCalculatedDataInputV1.cs create mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs create mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs create mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs create mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs create mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs create mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs create mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs rename source/ProcessManager.Orchestrations/Processes/BRS_026/V1/{RequestCalculatedEnergyTimeSeriesHandler.cs => RequestCalculatedEnergyTimeSeriesHandlerV1.cs} (74%) delete mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesOrchestrationV1.cs delete mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesTriggerV1.cs create mode 100644 source/ProcessManager.Orchestrations/Processes/BRS_026/V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs create mode 100644 source/ProcessManager.Orchestrations/Shared/StartOrchestrationFromMessageHandlerBase.cs diff --git a/source/ProcessManager.Abstractions/Contracts/StartOrchestrationDto.proto b/source/ProcessManager.Abstractions/Contracts/StartOrchestrationDto.proto index f7d87faf..7f17dd92 100644 --- a/source/ProcessManager.Abstractions/Contracts/StartOrchestrationDto.proto +++ b/source/ProcessManager.Abstractions/Contracts/StartOrchestrationDto.proto @@ -20,5 +20,6 @@ option csharp_namespace = "Energinet.DataHub.ProcessManager.Abstractions.Contrac message StartOrchestrationDto { string orchestration_name = 1; // The name of the orchestration to start. int32 orchestration_version = 2; // The version of the orchestration to start. + string started_by_actor_id = 4; // The actor id who wants to start the orchestration string json_input = 3; // The input to the orchestration serialized as a JSON } diff --git a/source/ProcessManager.Client/ProcessManagerMessageClient.cs b/source/ProcessManager.Client/ProcessManagerMessageClient.cs index 570f1246..8d4071d4 100644 --- a/source/ProcessManager.Client/ProcessManagerMessageClient.cs +++ b/source/ProcessManager.Client/ProcessManagerMessageClient.cs @@ -34,10 +34,7 @@ public Task StartNewOrchestrationInstanceAsync( CancellationToken cancellationToken) where TInputParameterDto : IInputParameterDto { - var serviceBusMessage = CreateServiceBusMessage( - command.OrchestrationDescriptionUniqueName.Name, - command.OrchestrationDescriptionUniqueName.Version, - command); + var serviceBusMessage = CreateServiceBusMessage(command); return SendServiceBusMessage( serviceBusMessage, @@ -45,21 +42,20 @@ public Task StartNewOrchestrationInstanceAsync( } private ServiceBusMessage CreateServiceBusMessage( - string orchestrationName, - int orchestrationVersion, MessageCommand command) where TInputParameterDto : IInputParameterDto { var message = new StartOrchestrationDto { - OrchestrationName = orchestrationName, - OrchestrationVersion = orchestrationVersion, + OrchestrationName = command.OrchestrationDescriptionUniqueName.Name, + OrchestrationVersion = command.OrchestrationDescriptionUniqueName.Version, + StartedByActorId = command.OperatingIdentity.ActorId.ToString(), JsonInput = JsonSerializer.Serialize(command.InputParameter), }; ServiceBusMessage serviceBusMessage = new(JsonFormatter.Default.Format(message)) { - Subject = orchestrationName, + Subject = command.OrchestrationDescriptionUniqueName.Name, MessageId = command.MessageId, ContentType = "application/json", }; diff --git a/source/ProcessManager.Core/Domain/OrchestrationInstance/OrchestrationInstance.cs b/source/ProcessManager.Core/Domain/OrchestrationInstance/OrchestrationInstance.cs index 60a01384..e78c070c 100644 --- a/source/ProcessManager.Core/Domain/OrchestrationInstance/OrchestrationInstance.cs +++ b/source/ProcessManager.Core/Domain/OrchestrationInstance/OrchestrationInstance.cs @@ -82,6 +82,31 @@ private OrchestrationInstance() /// internal OrchestrationDescriptionId OrchestrationDescriptionId { get; } + /// + /// Transition a step's lifecycle to running + /// + /// + /// + public void TransitionStepToRunning(int sequence, IClock clock) + { + var step = Steps.SingleOrDefault(s => s.Sequence == sequence); + + if (step == null) + throw new ArgumentOutOfRangeException(nameof(sequence), sequence, "A step with the given sequence does not exist"); + + step.Lifecycle.TransitionToRunning(clock); + } + + public void TransitionStepToTerminated(int sequence, OrchestrationStepTerminationStates terminationState, IClock clock) + { + var step = Steps.SingleOrDefault(s => s.Sequence == sequence); + + if (step == null) + throw new ArgumentOutOfRangeException(nameof(sequence), sequence, "A step with the given sequence does not exist"); + + step.Lifecycle.TransitionToTerminated(clock, terminationState); + } + /// /// Factory method that ensures domain rules are obeyed when creating a new /// orchestration instance. diff --git a/source/ProcessManager.Orchestrations.Abstractions/Processes/BRS_026/V1/Model/RequestCalculatedDataInputV1.cs b/source/ProcessManager.Orchestrations.Abstractions/Processes/BRS_026/V1/Model/RequestCalculatedDataInputV1.cs deleted file mode 100644 index 56d4f8e0..00000000 --- a/source/ProcessManager.Orchestrations.Abstractions/Processes/BRS_026/V1/Model/RequestCalculatedDataInputV1.cs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2020 Energinet DataHub A/S -// -// Licensed under the Apache License, Version 2.0 (the "License2"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -namespace Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; - -public record RequestCalculatedDataInputV1( - string MessageId, - TInput Input) - where TInput : class; diff --git a/source/ProcessManager.Orchestrations/ProcessManager.Orchestrations.csproj b/source/ProcessManager.Orchestrations/ProcessManager.Orchestrations.csproj index c718d7cd..e7bb367d 100644 --- a/source/ProcessManager.Orchestrations/ProcessManager.Orchestrations.csproj +++ b/source/ProcessManager.Orchestrations/ProcessManager.Orchestrations.csproj @@ -32,7 +32,6 @@ - diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs new file mode 100644 index 00000000..f6bcb9a4 --- /dev/null +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs @@ -0,0 +1,66 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; +using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using NodaTime; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; + +/// +/// Enqueue messages in EDI (and set step to running) +/// +internal class EnqueueMessagesActivity_Brs_026_V1( + IClock clock, + IOrchestrationInstanceProgressRepository progressRepository) +{ + private readonly IClock _clock = clock; + private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; + + public static Task RunActivity(TaskOrchestrationContext context, EnqueueMessagesActivityInput activityInput, TaskOptions options) + { + return context.CallActivityAsync( + nameof(EnqueueMessagesActivity_Brs_026_V1), + activityInput, + options); + } + + [Function(nameof(EnqueueMessagesActivity_Brs_026_V1))] + public async Task Run( + [ActivityTrigger] EnqueueMessagesActivityInput input) + { + var orchestrationInstance = await _progressRepository + .GetAsync(input.InstanceId) + .ConfigureAwait(false); + + orchestrationInstance.TransitionStepToRunning( + Orchestration_RequestCalculatedEnergyTimeSeries_V1.EnqueueMessagesStepSequence, + _clock); + await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); + await EnqueueMessagesAsync(input).ConfigureAwait(false); + } + + private async Task EnqueueMessagesAsync(EnqueueMessagesActivityInput input) + { + // TODO: Enqueue message in EDI instead of delay + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); + } + + public record EnqueueMessagesActivityInput( + OrchestrationInstanceId InstanceId, + RequestCalculatedEnergyTimeSeriesInputV1 RequestInput); +} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs new file mode 100644 index 00000000..f17dafe2 --- /dev/null +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs @@ -0,0 +1,66 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using NodaTime; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; + +/// +/// Enqueue reject message in EDI (and set step to running) +/// +internal class EnqueueRejectMessageActivity_Brs_026_V1( + IClock clock, + IOrchestrationInstanceProgressRepository progressRepository) +{ + private readonly IClock _clock = clock; + private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; + + public static Task RunActivity(TaskOrchestrationContext context, EnqueueRejectMessageActivityInput activityInput, TaskOptions options) + { + return context.CallActivityAsync( + nameof(EnqueueRejectMessageActivity_Brs_026_V1), + activityInput, + options); + } + + [Function(nameof(EnqueueRejectMessageActivity_Brs_026_V1))] + public async Task Run( + [ActivityTrigger] EnqueueRejectMessageActivityInput input) + { + var orchestrationInstance = await _progressRepository + .GetAsync(input.InstanceId) + .ConfigureAwait(false); + + orchestrationInstance.TransitionStepToRunning( + Orchestration_RequestCalculatedEnergyTimeSeries_V1.EnqueueMessagesStepSequence, + _clock); + await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); + + await EnqueueRejectMessageAsync(input).ConfigureAwait(false); + } + + private async Task EnqueueRejectMessageAsync(EnqueueRejectMessageActivityInput input) + { + // TODO: Enqueue message in EDI instead of delay + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); + } + + public record EnqueueRejectMessageActivityInput( + OrchestrationInstanceId InstanceId, + string ValidationError); +} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs new file mode 100644 index 00000000..ca6d6f3b --- /dev/null +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs @@ -0,0 +1,70 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; +using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using NodaTime; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; + +/// +/// Perform async validation (and set step to running) +/// +internal class PerformAsyncValidationActivity_Brs_026_V1( + IClock clock, + IOrchestrationInstanceProgressRepository progressRepository) +{ + private readonly IClock _clock = clock; + private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; + + public static Task RunActivity(TaskOrchestrationContext context, AsyncValidationActivityInput activityInput, TaskOptions options) + { + return context.CallActivityAsync( + nameof(PerformAsyncValidationActivity_Brs_026_V1), + activityInput, + options); + } + + [Function(nameof(PerformAsyncValidationActivity_Brs_026_V1))] + public async Task Run( + [ActivityTrigger] AsyncValidationActivityInput input) + { + var orchestrationInstance = await _progressRepository + .GetAsync(input.InstanceId) + .ConfigureAwait(false); + + orchestrationInstance.TransitionStepToRunning( + Orchestration_RequestCalculatedEnergyTimeSeries_V1.AsyncValidationStepSequence, + _clock); + await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); + + var isValid = await PerformAsyncValidationAsync(input.RequestInput).ConfigureAwait(false); + + return isValid; + } + + private async Task PerformAsyncValidationAsync(RequestCalculatedEnergyTimeSeriesInputV1 requestInput) + { + // TODO: Perform async validation instead of delay + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); + return true; + } + + public record AsyncValidationActivityInput( + OrchestrationInstanceId InstanceId, + RequestCalculatedEnergyTimeSeriesInputV1 RequestInput); +} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs new file mode 100644 index 00000000..ba380eec --- /dev/null +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs @@ -0,0 +1,52 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using NodaTime; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; + +/// +/// Set the orchestration instance lifecycle to running +/// +internal class StartOrchestrationActivity_Brs_026_V1( + IClock clock, + IOrchestrationInstanceProgressRepository progressRepository) +{ + private readonly IClock _clock = clock; + private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; + + public static Task RunActivity(TaskOrchestrationContext context, OrchestrationInstanceId orchestrationId, TaskOptions options) + { + return context.CallActivityAsync( + nameof(StartOrchestrationActivity_Brs_026_V1), + orchestrationId, + options); + } + + [Function(nameof(StartOrchestrationActivity_Brs_026_V1))] + public async Task Run( + [ActivityTrigger] OrchestrationInstanceId orchestrationInstanceId) + { + var orchestrationInstance = await _progressRepository + .GetAsync(orchestrationInstanceId) + .ConfigureAwait(false); + + orchestrationInstance.Lifecycle.TransitionToRunning(_clock); + await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); + } +} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs new file mode 100644 index 00000000..9de99584 --- /dev/null +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs @@ -0,0 +1,70 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using NodaTime; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; + +/// +/// Set the orchestration instance lifecycle to terminated +/// +internal class TerminateOrchestrationActivity_Brs_026_V1( + IClock clock, + IOrchestrationInstanceProgressRepository progressRepository) +{ + private readonly IClock _clock = clock; + private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; + + public static Task RunActivity(TaskOrchestrationContext context, TerminateOrchestrationActivityInput input, TaskOptions options) + { + return context.CallActivityAsync( + nameof(TerminateOrchestrationActivity_Brs_026_V1), + input, + options); + } + + [Function(nameof(TerminateOrchestrationActivity_Brs_026_V1))] + public async Task Run( + [ActivityTrigger] TerminateOrchestrationActivityInput input) + { + var orchestrationInstance = await _progressRepository + .GetAsync(input.InstanceId) + .ConfigureAwait(false); + + switch (input.TerminationState) + { + case OrchestrationInstanceTerminationStates.Succeeded: + orchestrationInstance.Lifecycle.TransitionToSucceeded(_clock); + break; + + case OrchestrationInstanceTerminationStates.Failed: + orchestrationInstance.Lifecycle.TransitionToFailed(_clock); + break; + + case OrchestrationInstanceTerminationStates.UserCanceled: + default: + throw new ArgumentOutOfRangeException(nameof(input.TerminationState), input.TerminationState, "Invalid termination state"); + } + + await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); + } + + public record TerminateOrchestrationActivityInput( + OrchestrationInstanceId InstanceId, + OrchestrationInstanceTerminationStates TerminationState); +} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs new file mode 100644 index 00000000..a87dc13e --- /dev/null +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs @@ -0,0 +1,61 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using NodaTime; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; + +/// +/// Set the orchestration instance step lifecycle to terminated +/// +internal class TerminateStepActivity_Brs_026_V1( + IClock clock, + IOrchestrationInstanceProgressRepository progressRepository) +{ + private readonly IClock _clock = clock; + private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; + + public static Task RunActivity(TaskOrchestrationContext context, TerminateStepActivityInput input, TaskOptions options) + { + return context.CallActivityAsync( + nameof(TerminateStepActivity_Brs_026_V1), + input, + options); + } + + [Function(nameof(TerminateStepActivity_Brs_026_V1))] + public async Task Run( + [ActivityTrigger] TerminateStepActivityInput input) + { + var orchestrationInstance = await _progressRepository + .GetAsync(input.InstanceId) + .ConfigureAwait(false); + + orchestrationInstance.TransitionStepToTerminated( + input.StepSequence, + input.TerminationState, + _clock); + + await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); + } + + public record TerminateStepActivityInput( + OrchestrationInstanceId InstanceId, + int StepSequence, + OrchestrationStepTerminationStates TerminationState); +} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs new file mode 100644 index 00000000..c29339ee --- /dev/null +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs @@ -0,0 +1,160 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; +using Energinet.DataHub.ProcessManagement.Core.Infrastructure.Extensions.DurableTask; +using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; +using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using Microsoft.Extensions.Logging; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1; + +// TODO: Implement according to guidelines: https://energinet.atlassian.net/wiki/spaces/D3/pages/824803345/Durable+Functions+Development+Guidelines +internal class Orchestration_RequestCalculatedEnergyTimeSeries_V1 +{ + public const int AsyncValidationStepSequence = 1; + public const int EnqueueMessagesStepSequence = 2; + + [Function(nameof(Orchestration_RequestCalculatedEnergyTimeSeries_V1))] + public async Task Run( + [OrchestrationTrigger] TaskOrchestrationContext context) + { + /* + * Orchestration: + * 1. Deserialize input + * 2. Async validation + * 3. Enqueue Messages in EDI + * 4. Wait for notify from EDI + * 5. Complete process in database + */ + + var input = context.GetOrchestrationParameterValue(); + + if (input == null) + return "Error: No input specified."; + + var instanceId = new OrchestrationInstanceId(Guid.Parse(context.InstanceId)); + var defaultRetryOptions = CreateDefaultRetryOptions(); + + // Set orchestration lifecycle to running + await StartOrchestrationActivity_Brs_026_V1.RunActivity(context, instanceId, defaultRetryOptions); + + var isValid = await PerformAsyncValidation(context, instanceId, input, defaultRetryOptions); + + if (isValid) + { + await EnqueueMessagesActivity_Brs_026_V1.RunActivity( + context, + new EnqueueMessagesActivity_Brs_026_V1.EnqueueMessagesActivityInput( + instanceId, + input), + defaultRetryOptions); + } + else + { + await EnqueueRejectMessageActivity_Brs_026_V1.RunActivity( + context, + new EnqueueRejectMessageActivity_Brs_026_V1.EnqueueRejectMessageActivityInput( + instanceId, + "Validation error"), + defaultRetryOptions); + } + + var messagesEnqueued = await WaitForEnqueueMessagesResponse(context, instanceId); + + var enqueueMessagesTerminationState = messagesEnqueued + ? OrchestrationStepTerminationStates.Succeeded + : OrchestrationStepTerminationStates.Failed; + await TerminateStepActivity_Brs_026_V1.RunActivity( + context, + new TerminateStepActivity_Brs_026_V1.TerminateStepActivityInput( + instanceId, + EnqueueMessagesStepSequence, + enqueueMessagesTerminationState), + defaultRetryOptions); + + if (!messagesEnqueued) + { + var logger = context.CreateReplaySafeLogger(); + logger.Log( + LogLevel.Warning, + "Timeout while waiting for enqueue messages to complete (InstanceId={OrchestrationInstanceId}).", + instanceId.Value); + + await TerminateOrchestrationActivity_Brs_026_V1.RunActivity( + context, + new TerminateOrchestrationActivity_Brs_026_V1.TerminateOrchestrationActivityInput( + instanceId, + OrchestrationInstanceTerminationStates.Failed), + defaultRetryOptions); + + return "Error: Timeout while waiting for enqueue messages"; + } + + await TerminateOrchestrationActivity_Brs_026_V1.RunActivity( + context, + new TerminateOrchestrationActivity_Brs_026_V1.TerminateOrchestrationActivityInput( + instanceId, + OrchestrationInstanceTerminationStates.Succeeded), + defaultRetryOptions); + + return $"Success (BusinessReason={input.BusinessReason})"; + } + + private async Task WaitForEnqueueMessagesResponse(TaskOrchestrationContext context, OrchestrationInstanceId instanceId) + { + // TODO: Use monitor pattern to wait for "notify" from EDI + var waitForMessagesEnqueued = context.CreateTimer(TimeSpan.FromSeconds(1), CancellationToken.None); + await waitForMessagesEnqueued; + + return true; + } + + private async Task PerformAsyncValidation( + TaskOrchestrationContext context, + OrchestrationInstanceId instanceId, + RequestCalculatedEnergyTimeSeriesInputV1 input, + TaskOptions retryOptions) + { + var isValid = await PerformAsyncValidationActivity_Brs_026_V1.RunActivity( + context, + new PerformAsyncValidationActivity_Brs_026_V1.AsyncValidationActivityInput( + instanceId, + input), + retryOptions); + + var asyncValidationTerminationState = isValid + ? OrchestrationStepTerminationStates.Succeeded + : OrchestrationStepTerminationStates.Failed; + await TerminateStepActivity_Brs_026_V1.RunActivity( + context, + new TerminateStepActivity_Brs_026_V1.TerminateStepActivityInput( + instanceId, + AsyncValidationStepSequence, + asyncValidationTerminationState), + retryOptions); + + return isValid; + } + + private TaskOptions CreateDefaultRetryOptions() + { + return TaskOptions.FromRetryPolicy(new RetryPolicy( + maxNumberOfAttempts: 5, + firstRetryInterval: TimeSpan.FromSeconds(30), + backoffCoefficient: 2.0)); + } +} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesHandler.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesHandlerV1.cs similarity index 74% rename from source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesHandler.cs rename to source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesHandlerV1.cs index ab3eae80..a3ae6a5d 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesHandler.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesHandlerV1.cs @@ -12,27 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Azure.Messaging.ServiceBus; using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationDescription; using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; +using Energinet.DataHub.ProcessManager.Orchestrations.Shared; +using Microsoft.Extensions.Logging; namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1; -public class RequestCalculatedEnergyTimeSeriesHandler( +public class RequestCalculatedEnergyTimeSeriesHandlerV1( + ILogger logger, IStartOrchestrationInstanceCommands commands) + : StartOrchestrationFromMessageHandlerBase(logger) { private readonly IStartOrchestrationInstanceCommands _commands = commands; - /// - /// Start a request for calculated energy time series. - /// - public async Task StartRequestCalculatedEnergyTimeSeriesAsync(RequestCalculatedEnergyTimeSeriesInputV1 input) + protected override async Task StartOrchestration(ActorIdentity actorIdentity, RequestCalculatedEnergyTimeSeriesInputV1 input) { var orchestrationDescriptionUniqueName = new Brs_026_V1(); await _commands.StartNewOrchestrationInstanceAsync( - identity: new ActorIdentity(new ActorId(Guid.NewGuid())), // TODO: Any call to commands must include identity information; see 'ScheduleOrchestrationInstanceCommand' and 'CancelScheduledOrchestrationInstanceCommand' + identity: actorIdentity, uniqueName: new OrchestrationDescriptionUniqueName( orchestrationDescriptionUniqueName.Name, orchestrationDescriptionUniqueName.Version), diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesOrchestrationV1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesOrchestrationV1.cs deleted file mode 100644 index e4bd3a13..00000000 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesOrchestrationV1.cs +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2020 Energinet DataHub A/S -// -// Licensed under the Apache License, Version 2.0 (the "License2"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using Energinet.DataHub.ProcessManagement.Core.Infrastructure.Extensions.DurableTask; -using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; -using Microsoft.Azure.Functions.Worker; -using Microsoft.DurableTask; - -namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1; - -// TODO: Implement according to guidelines: https://energinet.atlassian.net/wiki/spaces/D3/pages/824803345/Durable+Functions+Development+Guidelines -internal class RequestCalculatedEnergyTimeSeriesOrchestrationV1 -{ - [Function(nameof(RequestCalculatedEnergyTimeSeriesOrchestrationV1))] - public async Task Run( - [OrchestrationTrigger] TaskOrchestrationContext context) - { - var input = context.GetOrchestrationParameterValue(); - - if (input == null) - return "Error: No input specified."; - - await Task.CompletedTask; - - /* - * Steps: - * 1. Deserialize input - * 2. Async validation - * 3. Query databricks and upload to storage account - * 4. Enqueue Messages in EDI - * 5. Wait for notify from EDI - * 6. Complete process in database - */ - - return $"Success (BusinessReason={input.BusinessReason})"; - } -} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesTriggerV1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesTriggerV1.cs deleted file mode 100644 index c8786810..00000000 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/RequestCalculatedEnergyTimeSeriesTriggerV1.cs +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2020 Energinet DataHub A/S -// -// Licensed under the Apache License, Version 2.0 (the "License2"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System.Text.Json; -using Azure.Messaging.ServiceBus; -using Energinet.DataHub.Core.Messaging.Communication.Extensions.Options; -using Energinet.DataHub.ProcessManager.Abstractions.Contracts; -using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; -using Energinet.DataHub.ProcessManager.Orchestrations.Extensions.Options; -using Microsoft.Azure.Functions.Worker; -using Microsoft.Extensions.Logging; - -namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1; - -public class RequestCalculatedEnergyTimeSeriesTriggerV1( - ILogger logger, - RequestCalculatedEnergyTimeSeriesHandler handler) -{ - private readonly ILogger _logger = logger; - private readonly RequestCalculatedEnergyTimeSeriesHandler _handler = handler; - - /// - /// Start a BRS-026 request. - /// - [Function(nameof(RequestCalculatedEnergyTimeSeriesTriggerV1))] - public async Task Run( - [ServiceBusTrigger( - $"%{ProcessManagerTopicOptions.SectionName}:{nameof(ProcessManagerTopicOptions.TopicName)}%", - $"%{ProcessManagerTopicOptions.SectionName}:{nameof(ProcessManagerTopicOptions.Brs026SubscriptionName)}%", - Connection = ServiceBusNamespaceOptions.SectionName)] - ServiceBusReceivedMessage message) - { - using var serviceBusMessageLoggerScope = _logger.BeginScope(new - { - ServiceBusMessage = new - { - message.MessageId, - message.CorrelationId, - message.Subject, - }, - }); - - var jsonMessage = message.Body.ToString(); - var startOrchestrationDto = StartOrchestrationDto.Parser.ParseJson(jsonMessage); - using var startOrchestrationLoggerScope = _logger.BeginScope(new - { - StartOrchestration = new - { - startOrchestrationDto.OrchestrationName, - startOrchestrationDto.OrchestrationVersion, - }, - }); - - var requestCalculatedEnergyTimeSeriesDto = JsonSerializer.Deserialize(startOrchestrationDto.JsonInput); - if (requestCalculatedEnergyTimeSeriesDto is null) - { - _logger.LogWarning($"Unable to deserialize {nameof(startOrchestrationDto.JsonInput)} to {nameof(RequestCalculatedEnergyTimeSeriesInputV1)} type:{Environment.NewLine}{0}", startOrchestrationDto.JsonInput); - throw new ArgumentException($"Unable to deserialize {nameof(startOrchestrationDto.JsonInput)} to {nameof(RequestCalculatedEnergyTimeSeriesInputV1)} type"); - } - - await _handler.StartRequestCalculatedEnergyTimeSeriesAsync(requestCalculatedEnergyTimeSeriesDto) - .ConfigureAwait(false); - } -} diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs new file mode 100644 index 00000000..43fff4e9 --- /dev/null +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs @@ -0,0 +1,47 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Text.Json; +using Azure.Messaging.ServiceBus; +using Energinet.DataHub.Core.Messaging.Communication.Extensions.Options; +using Energinet.DataHub.ProcessManager.Abstractions.Contracts; +using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; +using Energinet.DataHub.ProcessManager.Orchestrations.Extensions.Options; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Logging; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1; + +public class StartTrigger_RequestCalculatedEnergyTimeSeries_V1( + ILogger logger, + RequestCalculatedEnergyTimeSeriesHandlerV1 handler) +{ + private readonly ILogger _logger = logger; + private readonly RequestCalculatedEnergyTimeSeriesHandlerV1 _handler = handler; + + /// + /// Start a BRS-026 request. + /// + [Function(nameof(StartTrigger_RequestCalculatedEnergyTimeSeries_V1))] + public async Task Run( + [ServiceBusTrigger( + $"%{ProcessManagerTopicOptions.SectionName}:{nameof(ProcessManagerTopicOptions.TopicName)}%", + $"%{ProcessManagerTopicOptions.SectionName}:{nameof(ProcessManagerTopicOptions.Brs026SubscriptionName)}%", + Connection = ServiceBusNamespaceOptions.SectionName)] + ServiceBusReceivedMessage message) + { + await _handler.StartOrchestration(message) + .ConfigureAwait(false); + } +} diff --git a/source/ProcessManager.Orchestrations/Program.cs b/source/ProcessManager.Orchestrations/Program.cs index 95e28eb8..ba0b6d72 100644 --- a/source/ProcessManager.Orchestrations/Program.cs +++ b/source/ProcessManager.Orchestrations/Program.cs @@ -69,7 +69,7 @@ // => Handlers services.AddScoped(); services.AddScoped(); - services.AddScoped(); + services.AddScoped(); services.AddScoped(); }) .ConfigureLogging((hostingContext, logging) => @@ -154,12 +154,11 @@ OrchestrationDescription CreateDescription_Brs_026_V1() orchestrationDescriptionUniqueName.Name, orchestrationDescriptionUniqueName.Version), canBeScheduled: false, - functionName: nameof(RequestCalculatedEnergyTimeSeriesOrchestrationV1)); + functionName: nameof(Orchestration_RequestCalculatedEnergyTimeSeries_V1)); description.ParameterDefinition.SetFromType(); description.AppendStepDescription("Asynkron validering"); - description.AppendStepDescription("Hent anmodningsdata"); description.AppendStepDescription("Udsend beskeder"); return description; diff --git a/source/ProcessManager.Orchestrations/Shared/StartOrchestrationFromMessageHandlerBase.cs b/source/ProcessManager.Orchestrations/Shared/StartOrchestrationFromMessageHandlerBase.cs new file mode 100644 index 00000000..8a3b6c9f --- /dev/null +++ b/source/ProcessManager.Orchestrations/Shared/StartOrchestrationFromMessageHandlerBase.cs @@ -0,0 +1,79 @@ +// Copyright 2020 Energinet DataHub A/S +// +// Licensed under the Apache License, Version 2.0 (the "License2"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Text.Json; +using Azure.Messaging.ServiceBus; +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; +using Energinet.DataHub.ProcessManager.Abstractions.Api.Model; +using Energinet.DataHub.ProcessManager.Abstractions.Contracts; +using Microsoft.Extensions.Logging; + +namespace Energinet.DataHub.ProcessManager.Orchestrations.Shared; + +public abstract class StartOrchestrationFromMessageHandlerBase( + ILogger logger) + where TInputParameterDto : IInputParameterDto +{ + private readonly ILogger _logger = logger; + + public Task StartOrchestration(ServiceBusReceivedMessage message) + { + using var serviceBusMessageLoggerScope = _logger.BeginScope(new + { + ServiceBusMessage = new + { + message.MessageId, + message.CorrelationId, + message.Subject, + }, + }); + + var jsonMessage = message.Body.ToString(); + var startOrchestrationDto = StartOrchestrationDto.Parser.ParseJson(jsonMessage); + using var startOrchestrationLoggerScope = _logger.BeginScope(new + { + StartOrchestration = new + { + startOrchestrationDto.OrchestrationName, + startOrchestrationDto.OrchestrationVersion, + OperatingIdentity = new + { + ActorId = startOrchestrationDto.StartedByActorId, + }, + }, + }); + + var inputParameterDto = JsonSerializer.Deserialize(startOrchestrationDto.JsonInput); + if (inputParameterDto is null) + { + var inputTypeName = typeof(TInputParameterDto).Name; + _logger.LogWarning($"Unable to deserialize {nameof(startOrchestrationDto.JsonInput)} to {inputTypeName} type:{Environment.NewLine}{0}", startOrchestrationDto.JsonInput); + throw new ArgumentException($"Unable to deserialize {nameof(startOrchestrationDto.JsonInput)} to {inputTypeName} type"); + } + + if (!Guid.TryParse(startOrchestrationDto.StartedByActorId, out var actorId)) + { + throw new ArgumentOutOfRangeException( + paramName: nameof(StartOrchestrationDto.StartedByActorId), + actualValue: startOrchestrationDto.StartedByActorId, + message: $"Unable to parse {nameof(startOrchestrationDto.StartedByActorId)} to guid"); + } + + return StartOrchestration( + new ActorIdentity(new ActorId(actorId)), + inputParameterDto); + } + + protected abstract Task StartOrchestration(ActorIdentity actorIdentity, TInputParameterDto input); +} From 8d841105f769b1a120a1952abc182896e3c927c0 Mon Sep 17 00:00:00 2001 From: Ebbe Knudsen Date: Mon, 16 Dec 2024 15:42:59 +0100 Subject: [PATCH 02/11] Add subject filters to subscriptions in test --- .../Fixtures/ProcessManagerClientFixture.cs | 2 ++ .../V1/RequestCalculatedEnergyTimeSeriesTests.cs | 2 +- .../Fixtures/OrchestrationsAppManager.cs | 6 ++++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/source/ProcessManager.Client.Tests/Fixtures/ProcessManagerClientFixture.cs b/source/ProcessManager.Client.Tests/Fixtures/ProcessManagerClientFixture.cs index 7d7021e4..a698c8d7 100644 --- a/source/ProcessManager.Client.Tests/Fixtures/ProcessManagerClientFixture.cs +++ b/source/ProcessManager.Client.Tests/Fixtures/ProcessManagerClientFixture.cs @@ -97,7 +97,9 @@ public async Task InitializeAsync() ProcessManagerTopic = await ServiceBusResourceProvider.BuildTopic("pm-topic") .AddSubscription(brs026SubscriptionName) + .AddSubjectFilter("Brs_026") .AddSubscription(brs021ForwardMeteredDataSubscriptionName) + .AddSubjectFilter("Brs_021_ForwardMeteredData") .CreateAsync(); var brs026Subscription = ProcessManagerTopic.Subscriptions.Single(x => x.SubscriptionName.Equals(brs026SubscriptionName)); var brs021ForwardMeteredDataSubscription = ProcessManagerTopic.Subscriptions.Single(x => x.SubscriptionName.Equals(brs021ForwardMeteredDataSubscriptionName)); diff --git a/source/ProcessManager.Client.Tests/Integration/BRS_026_028/V1/RequestCalculatedEnergyTimeSeriesTests.cs b/source/ProcessManager.Client.Tests/Integration/BRS_026_028/V1/RequestCalculatedEnergyTimeSeriesTests.cs index 09020965..de5c54a2 100644 --- a/source/ProcessManager.Client.Tests/Integration/BRS_026_028/V1/RequestCalculatedEnergyTimeSeriesTests.cs +++ b/source/ProcessManager.Client.Tests/Integration/BRS_026_028/V1/RequestCalculatedEnergyTimeSeriesTests.cs @@ -106,7 +106,7 @@ public async Task RequestCalculatedEnergyTimeSeries_WhenStarted_OrchestrationCom // Assert var orchestration = await _fixture.DurableClient.WaitForOrchestationStartedAsync( createdTimeFrom: orchestrationCreatedAfter, - name: "RequestCalculatedEnergyTimeSeriesOrchestrationV1"); + name: "Orchestration_RequestCalculatedEnergyTimeSeries_V1"); orchestration.Input.ToString().Should().Contain(businessReason); var completedOrchestration = await _fixture.DurableClient.WaitForOrchestrationCompletedAsync( diff --git a/source/ProcessManager.Orchestrations.Tests/Fixtures/OrchestrationsAppManager.cs b/source/ProcessManager.Orchestrations.Tests/Fixtures/OrchestrationsAppManager.cs index d2da40cf..4204e76f 100644 --- a/source/ProcessManager.Orchestrations.Tests/Fixtures/OrchestrationsAppManager.cs +++ b/source/ProcessManager.Orchestrations.Tests/Fixtures/OrchestrationsAppManager.cs @@ -126,12 +126,14 @@ public async Task StartAsync( if (brs026Subscription is null) { - topicResourceBuilder.AddSubscription(brs026SubscriptionName); + topicResourceBuilder.AddSubscription(brs026SubscriptionName) + .AddSubjectFilter("Brs_026"); } if (brs021ForwardMeteredDataSubscription is null) { - topicResourceBuilder.AddSubscription(brs021ForwardMeteredDataSubscriptionName); + topicResourceBuilder.AddSubscription(brs021ForwardMeteredDataSubscriptionName) + .AddSubjectFilter("Brs_021_ForwardMeteredData"); } var topicResource = await topicResourceBuilder.CreateAsync(); From f5fe6544ed36894b18d19756ae7e65bb987471f1 Mon Sep 17 00:00:00 2001 From: Ebbe Knudsen Date: Mon, 16 Dec 2024 15:52:02 +0100 Subject: [PATCH 03/11] Update docs --- .../OrchestrationInstance/OrchestrationInstance.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/source/ProcessManager.Core/Domain/OrchestrationInstance/OrchestrationInstance.cs b/source/ProcessManager.Core/Domain/OrchestrationInstance/OrchestrationInstance.cs index e78c070c..6c2a52d8 100644 --- a/source/ProcessManager.Core/Domain/OrchestrationInstance/OrchestrationInstance.cs +++ b/source/ProcessManager.Core/Domain/OrchestrationInstance/OrchestrationInstance.cs @@ -85,8 +85,9 @@ private OrchestrationInstance() /// /// Transition a step's lifecycle to running /// - /// + /// The sequence number of the step to transition /// + /// Thrown if a step with the given isn't found. public void TransitionStepToRunning(int sequence, IClock clock) { var step = Steps.SingleOrDefault(s => s.Sequence == sequence); @@ -97,6 +98,13 @@ public void TransitionStepToRunning(int sequence, IClock clock) step.Lifecycle.TransitionToRunning(clock); } + /// + /// Transition a step's lifecycle to terminated, with the given + /// + /// The sequence number of the step to transition + /// The state of the termination step (Succeeded, failed etc.) + /// + /// Thrown if a step with the given isn't found. public void TransitionStepToTerminated(int sequence, OrchestrationStepTerminationStates terminationState, IClock clock) { var step = Steps.SingleOrDefault(s => s.Sequence == sequence); From 3a6ac5bc234fde62a008b0c0f11e7e6f3f70d915 Mon Sep 17 00:00:00 2001 From: Ebbe Knudsen Date: Mon, 16 Dec 2024 15:54:35 +0100 Subject: [PATCH 04/11] Remove unused variable --- .../V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs index 43fff4e9..9f9a23ed 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/StartTrigger_RequestCalculatedEnergyTimeSeries_V1.cs @@ -24,10 +24,8 @@ namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1; public class StartTrigger_RequestCalculatedEnergyTimeSeries_V1( - ILogger logger, RequestCalculatedEnergyTimeSeriesHandlerV1 handler) { - private readonly ILogger _logger = logger; private readonly RequestCalculatedEnergyTimeSeriesHandlerV1 _handler = handler; /// From 078dab0f0f83c6ec9860b35845ec1c0fd93e98df Mon Sep 17 00:00:00 2001 From: Ebbe Knudsen Date: Mon, 16 Dec 2024 15:58:44 +0100 Subject: [PATCH 05/11] Bump version --- docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md | 4 ++++ .../ProcessManager.Abstractions.csproj | 2 +- source/ProcessManager.Client/ProcessManager.Client.csproj | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md b/docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md index 6266c470..604aedc2 100644 --- a/docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md +++ b/docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md @@ -1,5 +1,9 @@ # ProcessManager.Client Release Notes +## Version 0.14.4 + +- Add actor id to start orchestration message + ## Version 0.14.3 - No functional changes diff --git a/source/ProcessManager.Abstractions/ProcessManager.Abstractions.csproj b/source/ProcessManager.Abstractions/ProcessManager.Abstractions.csproj index a7569279..e8b4b5d1 100644 --- a/source/ProcessManager.Abstractions/ProcessManager.Abstractions.csproj +++ b/source/ProcessManager.Abstractions/ProcessManager.Abstractions.csproj @@ -7,7 +7,7 @@ Energinet.DataHub.ProcessManager.Abstractions - 0.14.3$(VersionSuffix) + 0.14.4$(VersionSuffix) DH3 Process Manager Abstractions library Energinet-DataHub Energinet-DataHub diff --git a/source/ProcessManager.Client/ProcessManager.Client.csproj b/source/ProcessManager.Client/ProcessManager.Client.csproj index d97c100c..0cabd9ff 100644 --- a/source/ProcessManager.Client/ProcessManager.Client.csproj +++ b/source/ProcessManager.Client/ProcessManager.Client.csproj @@ -7,7 +7,7 @@ Energinet.DataHub.ProcessManager.Client - 0.14.3$(VersionSuffix) + 0.14.4$(VersionSuffix) DH3 Process Manager Client library Energinet-DataHub Energinet-DataHub From 05b218d6b433267f1c90701d9581d55c08aa82a9 Mon Sep 17 00:00:00 2001 From: Ebbe Knudsen Date: Mon, 16 Dec 2024 16:03:25 +0100 Subject: [PATCH 06/11] Bump version --- docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md | 8 ++++---- .../ReleaseNotes/ReleaseNotes.md | 4 ++++ .../ProcessManager.Orchestrations.Abstractions.csproj | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md b/docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md index 604aedc2..2682f621 100644 --- a/docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md +++ b/docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md @@ -2,19 +2,19 @@ ## Version 0.14.4 -- Add actor id to start orchestration message +- Add actor id to start orchestration messages. ## Version 0.14.3 -- No functional changes +- No functional changes. ## Version 0.14.2 -- No functional changes +- No functional changes. ## Version 0.14.1 -- Rename service bus client options +- Rename service bus client options. ## Version 0.14.0 diff --git a/docs/ProcessManager.Orchestrations.Abstractions/ReleaseNotes/ReleaseNotes.md b/docs/ProcessManager.Orchestrations.Abstractions/ReleaseNotes/ReleaseNotes.md index cfb57435..0a5a99c1 100644 --- a/docs/ProcessManager.Orchestrations.Abstractions/ReleaseNotes/ReleaseNotes.md +++ b/docs/ProcessManager.Orchestrations.Abstractions/ReleaseNotes/ReleaseNotes.md @@ -1,5 +1,9 @@ # ProcessManager.Orchestrations.Abstractions Release Notes +## Version 0.2.5 + +- Add actor id to start orchestration messages. + ## Version 0.2.4 - Update (implement) correct input for starting BRS-026 and BRS-028 orchestrations. diff --git a/source/ProcessManager.Orchestrations.Abstractions/ProcessManager.Orchestrations.Abstractions.csproj b/source/ProcessManager.Orchestrations.Abstractions/ProcessManager.Orchestrations.Abstractions.csproj index 20c36629..396f166a 100644 --- a/source/ProcessManager.Orchestrations.Abstractions/ProcessManager.Orchestrations.Abstractions.csproj +++ b/source/ProcessManager.Orchestrations.Abstractions/ProcessManager.Orchestrations.Abstractions.csproj @@ -7,7 +7,7 @@ Energinet.DataHub.ProcessManager.Orchestrations.Abstractions - 0.2.4$(VersionSuffix) + 0.2.5$(VersionSuffix) DH3 Process Manager Orchestrations Abstractions library Energinet-DataHub Energinet-DataHub From d97949d6f05bc0fd07c6e137de5b04155329dd27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Stenr=C3=B8jl?= Date: Tue, 17 Dec 2024 10:26:18 +0100 Subject: [PATCH 07/11] Named record for ActivityInput --- .../EnqueueMessagesActivity_Brs_026_V1.cs | 8 ++++---- ...EnqueueRejectMessageActivity_Brs_026_V1.cs | 8 ++++---- ...rformAsyncValidationActivity_Brs_026_V1.cs | 6 +++--- .../StartOrchestrationActivity_Brs_026_V1.cs | 11 ++++++---- ...rminateOrchestrationActivity_Brs_026_V1.cs | 6 +++--- .../TerminateStepActivity_Brs_026_V1.cs | 6 +++--- ...on_RequestCalculatedEnergyTimeSeries_V1.cs | 20 +++++++++++-------- 7 files changed, 36 insertions(+), 29 deletions(-) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs index f6bcb9a4..a758bac1 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs @@ -31,7 +31,7 @@ internal class EnqueueMessagesActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, EnqueueMessagesActivityInput activityInput, TaskOptions options) + public static Task RunActivity(TaskOrchestrationContext context, ActivityInput activityInput, TaskOptions options) { return context.CallActivityAsync( nameof(EnqueueMessagesActivity_Brs_026_V1), @@ -41,7 +41,7 @@ public static Task RunActivity(TaskOrchestrationContext context, EnqueueMessages [Function(nameof(EnqueueMessagesActivity_Brs_026_V1))] public async Task Run( - [ActivityTrigger] EnqueueMessagesActivityInput input) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await _progressRepository .GetAsync(input.InstanceId) @@ -54,13 +54,13 @@ public async Task Run( await EnqueueMessagesAsync(input).ConfigureAwait(false); } - private async Task EnqueueMessagesAsync(EnqueueMessagesActivityInput input) + private async Task EnqueueMessagesAsync(ActivityInput input) { // TODO: Enqueue message in EDI instead of delay await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); } - public record EnqueueMessagesActivityInput( + public record ActivityInput( OrchestrationInstanceId InstanceId, RequestCalculatedEnergyTimeSeriesInputV1 RequestInput); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs index f17dafe2..39b12ba2 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs @@ -30,7 +30,7 @@ internal class EnqueueRejectMessageActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, EnqueueRejectMessageActivityInput activityInput, TaskOptions options) + public static Task RunActivity(TaskOrchestrationContext context, ActivityInput activityInput, TaskOptions options) { return context.CallActivityAsync( nameof(EnqueueRejectMessageActivity_Brs_026_V1), @@ -40,7 +40,7 @@ public static Task RunActivity(TaskOrchestrationContext context, EnqueueRejectMe [Function(nameof(EnqueueRejectMessageActivity_Brs_026_V1))] public async Task Run( - [ActivityTrigger] EnqueueRejectMessageActivityInput input) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await _progressRepository .GetAsync(input.InstanceId) @@ -54,13 +54,13 @@ public async Task Run( await EnqueueRejectMessageAsync(input).ConfigureAwait(false); } - private async Task EnqueueRejectMessageAsync(EnqueueRejectMessageActivityInput input) + private async Task EnqueueRejectMessageAsync(ActivityInput input) { // TODO: Enqueue message in EDI instead of delay await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); } - public record EnqueueRejectMessageActivityInput( + public record ActivityInput( OrchestrationInstanceId InstanceId, string ValidationError); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs index ca6d6f3b..ae2eff08 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs @@ -31,7 +31,7 @@ internal class PerformAsyncValidationActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, AsyncValidationActivityInput activityInput, TaskOptions options) + public static Task RunActivity(TaskOrchestrationContext context, ActivityInput activityInput, TaskOptions options) { return context.CallActivityAsync( nameof(PerformAsyncValidationActivity_Brs_026_V1), @@ -41,7 +41,7 @@ public static Task RunActivity(TaskOrchestrationContext context, AsyncVali [Function(nameof(PerformAsyncValidationActivity_Brs_026_V1))] public async Task Run( - [ActivityTrigger] AsyncValidationActivityInput input) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await _progressRepository .GetAsync(input.InstanceId) @@ -64,7 +64,7 @@ private async Task PerformAsyncValidationAsync(RequestCalculatedEnergyTime return true; } - public record AsyncValidationActivityInput( + public record ActivityInput( OrchestrationInstanceId InstanceId, RequestCalculatedEnergyTimeSeriesInputV1 RequestInput); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs index ba380eec..0b8a65db 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs @@ -30,23 +30,26 @@ internal class StartOrchestrationActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, OrchestrationInstanceId orchestrationId, TaskOptions options) + public static Task RunActivity(TaskOrchestrationContext context, ActivityInput input, TaskOptions options) { return context.CallActivityAsync( nameof(StartOrchestrationActivity_Brs_026_V1), - orchestrationId, + input, options); } [Function(nameof(StartOrchestrationActivity_Brs_026_V1))] public async Task Run( - [ActivityTrigger] OrchestrationInstanceId orchestrationInstanceId) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await _progressRepository - .GetAsync(orchestrationInstanceId) + .GetAsync(input.InstanceId) .ConfigureAwait(false); orchestrationInstance.Lifecycle.TransitionToRunning(_clock); await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); } + + public record ActivityInput( + OrchestrationInstanceId InstanceId); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs index 9de99584..a4fe4bc4 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs @@ -30,7 +30,7 @@ internal class TerminateOrchestrationActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, TerminateOrchestrationActivityInput input, TaskOptions options) + public static Task RunActivity(TaskOrchestrationContext context, ActivityInput input, TaskOptions options) { return context.CallActivityAsync( nameof(TerminateOrchestrationActivity_Brs_026_V1), @@ -40,7 +40,7 @@ public static Task RunActivity(TaskOrchestrationContext context, TerminateOrches [Function(nameof(TerminateOrchestrationActivity_Brs_026_V1))] public async Task Run( - [ActivityTrigger] TerminateOrchestrationActivityInput input) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await _progressRepository .GetAsync(input.InstanceId) @@ -64,7 +64,7 @@ public async Task Run( await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); } - public record TerminateOrchestrationActivityInput( + public record ActivityInput( OrchestrationInstanceId InstanceId, OrchestrationInstanceTerminationStates TerminationState); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs index a87dc13e..d2e5d0b9 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs @@ -30,7 +30,7 @@ internal class TerminateStepActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, TerminateStepActivityInput input, TaskOptions options) + public static Task RunActivity(TaskOrchestrationContext context, ActivityInput input, TaskOptions options) { return context.CallActivityAsync( nameof(TerminateStepActivity_Brs_026_V1), @@ -40,7 +40,7 @@ public static Task RunActivity(TaskOrchestrationContext context, TerminateStepAc [Function(nameof(TerminateStepActivity_Brs_026_V1))] public async Task Run( - [ActivityTrigger] TerminateStepActivityInput input) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await _progressRepository .GetAsync(input.InstanceId) @@ -54,7 +54,7 @@ public async Task Run( await _progressRepository.UnitOfWork.CommitAsync().ConfigureAwait(false); } - public record TerminateStepActivityInput( + public record ActivityInput( OrchestrationInstanceId InstanceId, int StepSequence, OrchestrationStepTerminationStates TerminationState); diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs index c29339ee..7f275d0e 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs @@ -50,7 +50,11 @@ public async Task Run( var defaultRetryOptions = CreateDefaultRetryOptions(); // Set orchestration lifecycle to running - await StartOrchestrationActivity_Brs_026_V1.RunActivity(context, instanceId, defaultRetryOptions); + await StartOrchestrationActivity_Brs_026_V1.RunActivity( + context, + new StartOrchestrationActivity_Brs_026_V1.ActivityInput( + instanceId), + defaultRetryOptions); var isValid = await PerformAsyncValidation(context, instanceId, input, defaultRetryOptions); @@ -58,7 +62,7 @@ public async Task Run( { await EnqueueMessagesActivity_Brs_026_V1.RunActivity( context, - new EnqueueMessagesActivity_Brs_026_V1.EnqueueMessagesActivityInput( + new EnqueueMessagesActivity_Brs_026_V1.ActivityInput( instanceId, input), defaultRetryOptions); @@ -67,7 +71,7 @@ await EnqueueMessagesActivity_Brs_026_V1.RunActivity( { await EnqueueRejectMessageActivity_Brs_026_V1.RunActivity( context, - new EnqueueRejectMessageActivity_Brs_026_V1.EnqueueRejectMessageActivityInput( + new EnqueueRejectMessageActivity_Brs_026_V1.ActivityInput( instanceId, "Validation error"), defaultRetryOptions); @@ -80,7 +84,7 @@ await EnqueueRejectMessageActivity_Brs_026_V1.RunActivity( : OrchestrationStepTerminationStates.Failed; await TerminateStepActivity_Brs_026_V1.RunActivity( context, - new TerminateStepActivity_Brs_026_V1.TerminateStepActivityInput( + new TerminateStepActivity_Brs_026_V1.ActivityInput( instanceId, EnqueueMessagesStepSequence, enqueueMessagesTerminationState), @@ -96,7 +100,7 @@ await TerminateStepActivity_Brs_026_V1.RunActivity( await TerminateOrchestrationActivity_Brs_026_V1.RunActivity( context, - new TerminateOrchestrationActivity_Brs_026_V1.TerminateOrchestrationActivityInput( + new TerminateOrchestrationActivity_Brs_026_V1.ActivityInput( instanceId, OrchestrationInstanceTerminationStates.Failed), defaultRetryOptions); @@ -106,7 +110,7 @@ await TerminateOrchestrationActivity_Brs_026_V1.RunActivity( await TerminateOrchestrationActivity_Brs_026_V1.RunActivity( context, - new TerminateOrchestrationActivity_Brs_026_V1.TerminateOrchestrationActivityInput( + new TerminateOrchestrationActivity_Brs_026_V1.ActivityInput( instanceId, OrchestrationInstanceTerminationStates.Succeeded), defaultRetryOptions); @@ -131,7 +135,7 @@ private async Task PerformAsyncValidation( { var isValid = await PerformAsyncValidationActivity_Brs_026_V1.RunActivity( context, - new PerformAsyncValidationActivity_Brs_026_V1.AsyncValidationActivityInput( + new PerformAsyncValidationActivity_Brs_026_V1.ActivityInput( instanceId, input), retryOptions); @@ -141,7 +145,7 @@ private async Task PerformAsyncValidation( : OrchestrationStepTerminationStates.Failed; await TerminateStepActivity_Brs_026_V1.RunActivity( context, - new TerminateStepActivity_Brs_026_V1.TerminateStepActivityInput( + new TerminateStepActivity_Brs_026_V1.ActivityInput( instanceId, AsyncValidationStepSequence, asyncValidationTerminationState), From 93d5f3c3df2e047c6cac478f820f1ee3bb8ccd5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Stenr=C3=B8jl?= Date: Tue, 17 Dec 2024 10:27:28 +0100 Subject: [PATCH 08/11] wasMessagesEnqueued --- .../Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs index 7f275d0e..5f1de66a 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs @@ -77,9 +77,9 @@ await EnqueueRejectMessageActivity_Brs_026_V1.RunActivity( defaultRetryOptions); } - var messagesEnqueued = await WaitForEnqueueMessagesResponse(context, instanceId); + var wasMessagesEnqueued = await WaitForEnqueueMessagesResponse(context, instanceId); - var enqueueMessagesTerminationState = messagesEnqueued + var enqueueMessagesTerminationState = wasMessagesEnqueued ? OrchestrationStepTerminationStates.Succeeded : OrchestrationStepTerminationStates.Failed; await TerminateStepActivity_Brs_026_V1.RunActivity( @@ -90,7 +90,7 @@ await TerminateStepActivity_Brs_026_V1.RunActivity( enqueueMessagesTerminationState), defaultRetryOptions); - if (!messagesEnqueued) + if (!wasMessagesEnqueued) { var logger = context.CreateReplaySafeLogger(); logger.Log( From 21705f4d77b00c76c39b22d608f5193686941b32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Stenr=C3=B8jl?= Date: Tue, 17 Dec 2024 11:08:41 +0100 Subject: [PATCH 09/11] Refactor --- .../Activities/EnqueueMessagesActivity_Brs_026_V1.cs | 9 --------- .../EnqueueRejectMessageActivity_Brs_026_V1.cs | 8 -------- .../StartOrchestrationActivity_Brs_026_V1.cs | 9 --------- ...estration_RequestCalculatedEnergyTimeSeries_V1.cs | 12 ++++++------ 4 files changed, 6 insertions(+), 32 deletions(-) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs index a758bac1..250bf81e 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueMessagesActivity_Brs_026_V1.cs @@ -16,7 +16,6 @@ using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; using Microsoft.Azure.Functions.Worker; -using Microsoft.DurableTask; using NodaTime; namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; @@ -31,14 +30,6 @@ internal class EnqueueMessagesActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, ActivityInput activityInput, TaskOptions options) - { - return context.CallActivityAsync( - nameof(EnqueueMessagesActivity_Brs_026_V1), - activityInput, - options); - } - [Function(nameof(EnqueueMessagesActivity_Brs_026_V1))] public async Task Run( [ActivityTrigger] ActivityInput input) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs index 39b12ba2..d2e71ece 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs @@ -30,14 +30,6 @@ internal class EnqueueRejectMessageActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, ActivityInput activityInput, TaskOptions options) - { - return context.CallActivityAsync( - nameof(EnqueueRejectMessageActivity_Brs_026_V1), - activityInput, - options); - } - [Function(nameof(EnqueueRejectMessageActivity_Brs_026_V1))] public async Task Run( [ActivityTrigger] ActivityInput input) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs index 0b8a65db..29110e38 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/StartOrchestrationActivity_Brs_026_V1.cs @@ -15,7 +15,6 @@ using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; using Microsoft.Azure.Functions.Worker; -using Microsoft.DurableTask; using NodaTime; namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; @@ -30,14 +29,6 @@ internal class StartOrchestrationActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, ActivityInput input, TaskOptions options) - { - return context.CallActivityAsync( - nameof(StartOrchestrationActivity_Brs_026_V1), - input, - options); - } - [Function(nameof(StartOrchestrationActivity_Brs_026_V1))] public async Task Run( [ActivityTrigger] ActivityInput input) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs index 5f1de66a..26cf0530 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs @@ -50,8 +50,8 @@ public async Task Run( var defaultRetryOptions = CreateDefaultRetryOptions(); // Set orchestration lifecycle to running - await StartOrchestrationActivity_Brs_026_V1.RunActivity( - context, + await context.CallActivityAsync( + nameof(StartOrchestrationActivity_Brs_026_V1), new StartOrchestrationActivity_Brs_026_V1.ActivityInput( instanceId), defaultRetryOptions); @@ -60,8 +60,8 @@ await StartOrchestrationActivity_Brs_026_V1.RunActivity( if (isValid) { - await EnqueueMessagesActivity_Brs_026_V1.RunActivity( - context, + await context.CallActivityAsync( + nameof(EnqueueMessagesActivity_Brs_026_V1), new EnqueueMessagesActivity_Brs_026_V1.ActivityInput( instanceId, input), @@ -69,8 +69,8 @@ await EnqueueMessagesActivity_Brs_026_V1.RunActivity( } else { - await EnqueueRejectMessageActivity_Brs_026_V1.RunActivity( - context, + await context.CallActivityAsync( + nameof(EnqueueRejectMessageActivity_Brs_026_V1), new EnqueueRejectMessageActivity_Brs_026_V1.ActivityInput( instanceId, "Validation error"), From 995b4612284741eb292c4a4ff6bd84700155e24a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Stenr=C3=B8jl?= Date: Tue, 17 Dec 2024 11:31:26 +0100 Subject: [PATCH 10/11] Refactor --- ...ulationStepStartActivity_Brs_023_027_V1.cs | 7 +- ...ionStepTerminateActivity_Brs_023_027_V1.cs | 7 +- ...essagesStepStartActivity_Brs_023_027_V1.cs | 7 +- ...gesStepTerminateActivity_Brs_023_027_V1.cs | 7 +- ...rationInitializeActivity_Brs_023_027_V1.cs | 7 +- ...trationTerminateActivity_Brs_023_027_V1.cs | 7 +- .../V1/Orchestration_Brs_023_027_V1.cs | 40 +++++++---- ...EnqueueRejectMessageActivity_Brs_026_V1.cs | 1 - ...rformAsyncValidationActivity_Brs_026_V1.cs | 9 --- ...rminateOrchestrationActivity_Brs_026_V1.cs | 9 --- .../TerminateStepActivity_Brs_026_V1.cs | 9 --- ...on_RequestCalculatedEnergyTimeSeries_V1.cs | 66 ++++++++++--------- 12 files changed, 92 insertions(+), 84 deletions(-) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/CalculationStepStartActivity_Brs_023_027_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/CalculationStepStartActivity_Brs_023_027_V1.cs index 7f6fec2f..594de4d9 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/CalculationStepStartActivity_Brs_023_027_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/CalculationStepStartActivity_Brs_023_027_V1.cs @@ -28,10 +28,10 @@ internal class CalculationStepStartActivity_Brs_023_027_V1( { [Function(nameof(CalculationStepStartActivity_Brs_023_027_V1))] public async Task Run( - [ActivityTrigger] Guid orchestrationInstanceId) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await ProgressRepository - .GetAsync(new OrchestrationInstanceId(orchestrationInstanceId)) + .GetAsync(input.InstanceId) .ConfigureAwait(false); var step = orchestrationInstance.Steps.Single(x => x.Sequence == Orchestration_Brs_023_027_V1.CalculationStepSequence); @@ -41,4 +41,7 @@ public async Task Run( // TODO: For demo purposes; remove when done await Task.Delay(TimeSpan.FromSeconds(3)).ConfigureAwait(false); } + + public record ActivityInput( + OrchestrationInstanceId InstanceId); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/CalculationStepTerminateActivity_Brs_023_027_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/CalculationStepTerminateActivity_Brs_023_027_V1.cs index a9176e78..f499d61e 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/CalculationStepTerminateActivity_Brs_023_027_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/CalculationStepTerminateActivity_Brs_023_027_V1.cs @@ -28,10 +28,10 @@ internal class CalculationStepTerminateActivity_Brs_023_027_V1( { [Function(nameof(CalculationStepTerminateActivity_Brs_023_027_V1))] public async Task Run( - [ActivityTrigger] Guid orchestrationInstanceId) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await ProgressRepository - .GetAsync(new OrchestrationInstanceId(orchestrationInstanceId)) + .GetAsync(input.InstanceId) .ConfigureAwait(false); var step = orchestrationInstance.Steps.Single(x => x.Sequence == Orchestration_Brs_023_027_V1.CalculationStepSequence); @@ -41,4 +41,7 @@ public async Task Run( // TODO: For demo purposes; remove when done await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); } + + public record ActivityInput( + OrchestrationInstanceId InstanceId); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/EnqueueMessagesStepStartActivity_Brs_023_027_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/EnqueueMessagesStepStartActivity_Brs_023_027_V1.cs index 8c9c4674..107a945e 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/EnqueueMessagesStepStartActivity_Brs_023_027_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/EnqueueMessagesStepStartActivity_Brs_023_027_V1.cs @@ -28,10 +28,10 @@ internal class EnqueueMessagesStepStartActivity_Brs_023_027_V1( { [Function(nameof(EnqueueMessagesStepStartActivity_Brs_023_027_V1))] public async Task Run( - [ActivityTrigger] Guid orchestrationInstanceId) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await ProgressRepository - .GetAsync(new OrchestrationInstanceId(orchestrationInstanceId)) + .GetAsync(input.InstanceId) .ConfigureAwait(false); var step = orchestrationInstance.Steps.Single(x => x.Sequence == Orchestration_Brs_023_027_V1.EnqueueMessagesStepSequence); @@ -44,4 +44,7 @@ public async Task Run( await Task.Delay(TimeSpan.FromSeconds(3)).ConfigureAwait(false); } } + + public record ActivityInput( + OrchestrationInstanceId InstanceId); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/EnqueueMessagesStepTerminateActivity_Brs_023_027_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/EnqueueMessagesStepTerminateActivity_Brs_023_027_V1.cs index 8854efc1..3f0ec45a 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/EnqueueMessagesStepTerminateActivity_Brs_023_027_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/EnqueueMessagesStepTerminateActivity_Brs_023_027_V1.cs @@ -28,10 +28,10 @@ internal class EnqueueMessagesStepTerminateActivity_Brs_023_027_V1( { [Function(nameof(EnqueueMessagesStepTerminateActivity_Brs_023_027_V1))] public async Task Run( - [ActivityTrigger] Guid orchestrationInstanceId) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await ProgressRepository - .GetAsync(new OrchestrationInstanceId(orchestrationInstanceId)) + .GetAsync(input.InstanceId) .ConfigureAwait(false); var step = orchestrationInstance.Steps.Single(x => x.Sequence == Orchestration_Brs_023_027_V1.EnqueueMessagesStepSequence); @@ -44,4 +44,7 @@ public async Task Run( await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); } } + + public record ActivityInput( + OrchestrationInstanceId InstanceId); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/OrchestrationInitializeActivity_Brs_023_027_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/OrchestrationInitializeActivity_Brs_023_027_V1.cs index d14aa1bd..694e116a 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/OrchestrationInitializeActivity_Brs_023_027_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/OrchestrationInitializeActivity_Brs_023_027_V1.cs @@ -32,10 +32,10 @@ internal class OrchestrationInitializeActivity_Brs_023_027_V1( { [Function(nameof(OrchestrationInitializeActivity_Brs_023_027_V1))] public async Task Run( - [ActivityTrigger] Guid orchestrationInstanceId) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await ProgressRepository - .GetAsync(new OrchestrationInstanceId(orchestrationInstanceId)) + .GetAsync(input.InstanceId) .ConfigureAwait(false); orchestrationInstance.Lifecycle.TransitionToRunning(Clock); @@ -44,4 +44,7 @@ public async Task Run( // TODO: For demo purposes; remove when done await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); } + + public record ActivityInput( + OrchestrationInstanceId InstanceId); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/OrchestrationTerminateActivity_Brs_023_027_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/OrchestrationTerminateActivity_Brs_023_027_V1.cs index 1c9714fe..3d0fb175 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/OrchestrationTerminateActivity_Brs_023_027_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Activities/OrchestrationTerminateActivity_Brs_023_027_V1.cs @@ -32,10 +32,10 @@ internal class OrchestrationTerminateActivity_Brs_023_027_V1( { [Function(nameof(OrchestrationTerminateActivity_Brs_023_027_V1))] public async Task Run( - [ActivityTrigger] Guid orchestrationInstanceId) + [ActivityTrigger] ActivityInput input) { var orchestrationInstance = await ProgressRepository - .GetAsync(new OrchestrationInstanceId(orchestrationInstanceId)) + .GetAsync(input.InstanceId) .ConfigureAwait(false); orchestrationInstance.Lifecycle.TransitionToSucceeded(Clock); @@ -44,4 +44,7 @@ public async Task Run( // TODO: For demo purposes; remove when done await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); } + + public record ActivityInput( + OrchestrationInstanceId InstanceId); } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Orchestration_Brs_023_027_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Orchestration_Brs_023_027_V1.cs index 8dd4c5a5..2333f2bf 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Orchestration_Brs_023_027_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_023_027/V1/Orchestration_Brs_023_027_V1.cs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; using Energinet.DataHub.ProcessManagement.Core.Infrastructure.Extensions.DurableTask; using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_023_027.V1.Model; using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_023_027.V1.Activities; @@ -26,6 +27,13 @@ internal class Orchestration_Brs_023_027_V1 internal const int CalculationStepSequence = 1; internal const int EnqueueMessagesStepSequence = 2; + private readonly TaskOptions _defaultRetryOptions; + + public Orchestration_Brs_023_027_V1() + { + _defaultRetryOptions = CreateDefaultRetryOptions(); + } + [Function(nameof(Orchestration_Brs_023_027_V1))] public async Task Run( [OrchestrationTrigger] TaskOrchestrationContext context) @@ -39,39 +47,45 @@ public async Task Run( if (input == null) return "Error: No input specified."; - var defaultRetryOptions = CreateDefaultRetryOptions(); + var instanceId = new OrchestrationInstanceId(Guid.Parse(context.InstanceId)); // Initialize await context.CallActivityAsync( nameof(OrchestrationInitializeActivity_Brs_023_027_V1), - context.InstanceId, - defaultRetryOptions); + new OrchestrationInitializeActivity_Brs_023_027_V1.ActivityInput( + instanceId), + _defaultRetryOptions); // Step: Calculation await context.CallActivityAsync( nameof(CalculationStepStartActivity_Brs_023_027_V1), - context.InstanceId, - defaultRetryOptions); + new CalculationStepStartActivity_Brs_023_027_V1.ActivityInput( + instanceId), + _defaultRetryOptions); await context.CallActivityAsync( nameof(CalculationStepTerminateActivity_Brs_023_027_V1), - context.InstanceId, - defaultRetryOptions); + new CalculationStepTerminateActivity_Brs_023_027_V1.ActivityInput( + instanceId), + _defaultRetryOptions); // Step: Enqueue messages await context.CallActivityAsync( nameof(EnqueueMessagesStepStartActivity_Brs_023_027_V1), - context.InstanceId, - defaultRetryOptions); + new EnqueueMessagesStepStartActivity_Brs_023_027_V1.ActivityInput( + instanceId), + _defaultRetryOptions); await context.CallActivityAsync( nameof(EnqueueMessagesStepTerminateActivity_Brs_023_027_V1), - context.InstanceId, - defaultRetryOptions); + new EnqueueMessagesStepTerminateActivity_Brs_023_027_V1.ActivityInput( + instanceId), + _defaultRetryOptions); // Terminate await context.CallActivityAsync( nameof(OrchestrationTerminateActivity_Brs_023_027_V1), - context.InstanceId, - defaultRetryOptions); + new OrchestrationTerminateActivity_Brs_023_027_V1.ActivityInput( + instanceId), + _defaultRetryOptions); return "Success"; } diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs index d2e71ece..dc7bc89d 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/EnqueueRejectMessageActivity_Brs_026_V1.cs @@ -15,7 +15,6 @@ using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; using Microsoft.Azure.Functions.Worker; -using Microsoft.DurableTask; using NodaTime; namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs index ae2eff08..1ea40f03 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/PerformAsyncValidationActivity_Brs_026_V1.cs @@ -16,7 +16,6 @@ using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model; using Microsoft.Azure.Functions.Worker; -using Microsoft.DurableTask; using NodaTime; namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; @@ -31,14 +30,6 @@ internal class PerformAsyncValidationActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, ActivityInput activityInput, TaskOptions options) - { - return context.CallActivityAsync( - nameof(PerformAsyncValidationActivity_Brs_026_V1), - activityInput, - options); - } - [Function(nameof(PerformAsyncValidationActivity_Brs_026_V1))] public async Task Run( [ActivityTrigger] ActivityInput input) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs index a4fe4bc4..818d98e8 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateOrchestrationActivity_Brs_026_V1.cs @@ -15,7 +15,6 @@ using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; using Microsoft.Azure.Functions.Worker; -using Microsoft.DurableTask; using NodaTime; namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; @@ -30,14 +29,6 @@ internal class TerminateOrchestrationActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, ActivityInput input, TaskOptions options) - { - return context.CallActivityAsync( - nameof(TerminateOrchestrationActivity_Brs_026_V1), - input, - options); - } - [Function(nameof(TerminateOrchestrationActivity_Brs_026_V1))] public async Task Run( [ActivityTrigger] ActivityInput input) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs index d2e5d0b9..acaabf29 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Activities/TerminateStepActivity_Brs_026_V1.cs @@ -15,7 +15,6 @@ using Energinet.DataHub.ProcessManagement.Core.Application.Orchestration; using Energinet.DataHub.ProcessManagement.Core.Domain.OrchestrationInstance; using Microsoft.Azure.Functions.Worker; -using Microsoft.DurableTask; using NodaTime; namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Activities; @@ -30,14 +29,6 @@ internal class TerminateStepActivity_Brs_026_V1( private readonly IClock _clock = clock; private readonly IOrchestrationInstanceProgressRepository _progressRepository = progressRepository; - public static Task RunActivity(TaskOrchestrationContext context, ActivityInput input, TaskOptions options) - { - return context.CallActivityAsync( - nameof(TerminateStepActivity_Brs_026_V1), - input, - options); - } - [Function(nameof(TerminateStepActivity_Brs_026_V1))] public async Task Run( [ActivityTrigger] ActivityInput input) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs index 26cf0530..fc808837 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs @@ -28,6 +28,13 @@ internal class Orchestration_RequestCalculatedEnergyTimeSeries_V1 public const int AsyncValidationStepSequence = 1; public const int EnqueueMessagesStepSequence = 2; + private readonly TaskOptions _defaultRetryOptions; + + public Orchestration_RequestCalculatedEnergyTimeSeries_V1() + { + _defaultRetryOptions = CreateDefaultRetryOptions(); + } + [Function(nameof(Orchestration_RequestCalculatedEnergyTimeSeries_V1))] public async Task Run( [OrchestrationTrigger] TaskOrchestrationContext context) @@ -42,21 +49,19 @@ public async Task Run( */ var input = context.GetOrchestrationParameterValue(); - if (input == null) return "Error: No input specified."; var instanceId = new OrchestrationInstanceId(Guid.Parse(context.InstanceId)); - var defaultRetryOptions = CreateDefaultRetryOptions(); // Set orchestration lifecycle to running await context.CallActivityAsync( nameof(StartOrchestrationActivity_Brs_026_V1), new StartOrchestrationActivity_Brs_026_V1.ActivityInput( instanceId), - defaultRetryOptions); + _defaultRetryOptions); - var isValid = await PerformAsyncValidation(context, instanceId, input, defaultRetryOptions); + var isValid = await PerformAsyncValidation(context, instanceId, input); if (isValid) { @@ -65,7 +70,7 @@ await context.CallActivityAsync( new EnqueueMessagesActivity_Brs_026_V1.ActivityInput( instanceId, input), - defaultRetryOptions); + _defaultRetryOptions); } else { @@ -74,7 +79,7 @@ await context.CallActivityAsync( new EnqueueRejectMessageActivity_Brs_026_V1.ActivityInput( instanceId, "Validation error"), - defaultRetryOptions); + _defaultRetryOptions); } var wasMessagesEnqueued = await WaitForEnqueueMessagesResponse(context, instanceId); @@ -82,13 +87,13 @@ await context.CallActivityAsync( var enqueueMessagesTerminationState = wasMessagesEnqueued ? OrchestrationStepTerminationStates.Succeeded : OrchestrationStepTerminationStates.Failed; - await TerminateStepActivity_Brs_026_V1.RunActivity( - context, + await context.CallActivityAsync( + nameof(TerminateStepActivity_Brs_026_V1), new TerminateStepActivity_Brs_026_V1.ActivityInput( instanceId, EnqueueMessagesStepSequence, enqueueMessagesTerminationState), - defaultRetryOptions); + _defaultRetryOptions); if (!wasMessagesEnqueued) { @@ -98,26 +103,34 @@ await TerminateStepActivity_Brs_026_V1.RunActivity( "Timeout while waiting for enqueue messages to complete (InstanceId={OrchestrationInstanceId}).", instanceId.Value); - await TerminateOrchestrationActivity_Brs_026_V1.RunActivity( - context, + await context.CallActivityAsync( + nameof(TerminateOrchestrationActivity_Brs_026_V1), new TerminateOrchestrationActivity_Brs_026_V1.ActivityInput( instanceId, OrchestrationInstanceTerminationStates.Failed), - defaultRetryOptions); + _defaultRetryOptions); return "Error: Timeout while waiting for enqueue messages"; } - await TerminateOrchestrationActivity_Brs_026_V1.RunActivity( - context, + await context.CallActivityAsync( + nameof(TerminateOrchestrationActivity_Brs_026_V1), new TerminateOrchestrationActivity_Brs_026_V1.ActivityInput( instanceId, OrchestrationInstanceTerminationStates.Succeeded), - defaultRetryOptions); + _defaultRetryOptions); return $"Success (BusinessReason={input.BusinessReason})"; } + private static TaskOptions CreateDefaultRetryOptions() + { + return TaskOptions.FromRetryPolicy(new RetryPolicy( + maxNumberOfAttempts: 5, + firstRetryInterval: TimeSpan.FromSeconds(30), + backoffCoefficient: 2.0)); + } + private async Task WaitForEnqueueMessagesResponse(TaskOrchestrationContext context, OrchestrationInstanceId instanceId) { // TODO: Use monitor pattern to wait for "notify" from EDI @@ -130,35 +143,26 @@ private async Task WaitForEnqueueMessagesResponse(TaskOrchestrationContext private async Task PerformAsyncValidation( TaskOrchestrationContext context, OrchestrationInstanceId instanceId, - RequestCalculatedEnergyTimeSeriesInputV1 input, - TaskOptions retryOptions) + RequestCalculatedEnergyTimeSeriesInputV1 input) { - var isValid = await PerformAsyncValidationActivity_Brs_026_V1.RunActivity( - context, + var isValid = await context.CallActivityAsync( + nameof(PerformAsyncValidationActivity_Brs_026_V1), new PerformAsyncValidationActivity_Brs_026_V1.ActivityInput( instanceId, input), - retryOptions); + _defaultRetryOptions); var asyncValidationTerminationState = isValid ? OrchestrationStepTerminationStates.Succeeded : OrchestrationStepTerminationStates.Failed; - await TerminateStepActivity_Brs_026_V1.RunActivity( - context, + await context.CallActivityAsync( + nameof(TerminateStepActivity_Brs_026_V1), new TerminateStepActivity_Brs_026_V1.ActivityInput( instanceId, AsyncValidationStepSequence, asyncValidationTerminationState), - retryOptions); + _defaultRetryOptions); return isValid; } - - private TaskOptions CreateDefaultRetryOptions() - { - return TaskOptions.FromRetryPolicy(new RetryPolicy( - maxNumberOfAttempts: 5, - firstRetryInterval: TimeSpan.FromSeconds(30), - backoffCoefficient: 2.0)); - } } From ebca5bb7bce643013d50f434bacca42846e8e4f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Stenr=C3=B8jl?= Date: Tue, 17 Dec 2024 12:20:58 +0100 Subject: [PATCH 11/11] Refactor --- ...on_RequestCalculatedEnergyTimeSeries_V1.cs | 145 ++++++++++-------- 1 file changed, 81 insertions(+), 64 deletions(-) diff --git a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs index fc808837..535935d2 100644 --- a/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs +++ b/source/ProcessManager.Orchestrations/Processes/BRS_026/V1/Orchestration_RequestCalculatedEnergyTimeSeries_V1.cs @@ -39,31 +39,73 @@ public Orchestration_RequestCalculatedEnergyTimeSeries_V1() public async Task Run( [OrchestrationTrigger] TaskOrchestrationContext context) { - /* - * Orchestration: - * 1. Deserialize input - * 2. Async validation - * 3. Enqueue Messages in EDI - * 4. Wait for notify from EDI - * 5. Complete process in database - */ - var input = context.GetOrchestrationParameterValue(); if (input == null) return "Error: No input specified."; + var instanceId = await InitializeOrchestrationAsync(context); + + var isValidAsynchronousValidation = await PerformAsynchronousValidationAsync(context, instanceId, input); + await EnqueueMessagesInEdiAsync(context, instanceId, input, isValidAsynchronousValidation); + + var wasMessagesEnqueued = await WaitForEnqueueMessagesResponseFromEdiAsync(context, instanceId); + return await TerminateOrchestrationAsync(context, instanceId, input, wasMessagesEnqueued); + } + + private static TaskOptions CreateDefaultRetryOptions() + { + return TaskOptions.FromRetryPolicy(new RetryPolicy( + maxNumberOfAttempts: 5, + firstRetryInterval: TimeSpan.FromSeconds(30), + backoffCoefficient: 2.0)); + } + + private async Task InitializeOrchestrationAsync(TaskOrchestrationContext context) + { var instanceId = new OrchestrationInstanceId(Guid.Parse(context.InstanceId)); - // Set orchestration lifecycle to running await context.CallActivityAsync( nameof(StartOrchestrationActivity_Brs_026_V1), new StartOrchestrationActivity_Brs_026_V1.ActivityInput( instanceId), _defaultRetryOptions); - var isValid = await PerformAsyncValidation(context, instanceId, input); + return instanceId; + } + + private async Task PerformAsynchronousValidationAsync( + TaskOrchestrationContext context, + OrchestrationInstanceId instanceId, + RequestCalculatedEnergyTimeSeriesInputV1 input) + { + var isValid = await context.CallActivityAsync( + nameof(PerformAsyncValidationActivity_Brs_026_V1), + new PerformAsyncValidationActivity_Brs_026_V1.ActivityInput( + instanceId, + input), + _defaultRetryOptions); + + var asyncValidationTerminationState = isValid + ? OrchestrationStepTerminationStates.Succeeded + : OrchestrationStepTerminationStates.Failed; + await context.CallActivityAsync( + nameof(TerminateStepActivity_Brs_026_V1), + new TerminateStepActivity_Brs_026_V1.ActivityInput( + instanceId, + AsyncValidationStepSequence, + asyncValidationTerminationState), + _defaultRetryOptions); - if (isValid) + return isValid; + } + + private async Task EnqueueMessagesInEdiAsync( + TaskOrchestrationContext context, + OrchestrationInstanceId instanceId, + RequestCalculatedEnergyTimeSeriesInputV1 input, + bool isValidAsyncValidation) + { + if (isValidAsyncValidation) { await context.CallActivityAsync( nameof(EnqueueMessagesActivity_Brs_026_V1), @@ -81,9 +123,25 @@ await context.CallActivityAsync( "Validation error"), _defaultRetryOptions); } + } - var wasMessagesEnqueued = await WaitForEnqueueMessagesResponse(context, instanceId); + private async Task WaitForEnqueueMessagesResponseFromEdiAsync( + TaskOrchestrationContext context, + OrchestrationInstanceId instanceId) + { + // TODO: Use monitor pattern to wait for "notify" from EDI + var waitForMessagesEnqueued = context.CreateTimer(TimeSpan.FromSeconds(1), CancellationToken.None); + await waitForMessagesEnqueued; + + return true; + } + private async Task TerminateOrchestrationAsync( + TaskOrchestrationContext context, + OrchestrationInstanceId instanceId, + RequestCalculatedEnergyTimeSeriesInputV1 input, + bool wasMessagesEnqueued) + { var enqueueMessagesTerminationState = wasMessagesEnqueued ? OrchestrationStepTerminationStates.Succeeded : OrchestrationStepTerminationStates.Failed; @@ -112,57 +170,16 @@ await context.CallActivityAsync( return "Error: Timeout while waiting for enqueue messages"; } + else + { + await context.CallActivityAsync( + nameof(TerminateOrchestrationActivity_Brs_026_V1), + new TerminateOrchestrationActivity_Brs_026_V1.ActivityInput( + instanceId, + OrchestrationInstanceTerminationStates.Succeeded), + _defaultRetryOptions); - await context.CallActivityAsync( - nameof(TerminateOrchestrationActivity_Brs_026_V1), - new TerminateOrchestrationActivity_Brs_026_V1.ActivityInput( - instanceId, - OrchestrationInstanceTerminationStates.Succeeded), - _defaultRetryOptions); - - return $"Success (BusinessReason={input.BusinessReason})"; - } - - private static TaskOptions CreateDefaultRetryOptions() - { - return TaskOptions.FromRetryPolicy(new RetryPolicy( - maxNumberOfAttempts: 5, - firstRetryInterval: TimeSpan.FromSeconds(30), - backoffCoefficient: 2.0)); - } - - private async Task WaitForEnqueueMessagesResponse(TaskOrchestrationContext context, OrchestrationInstanceId instanceId) - { - // TODO: Use monitor pattern to wait for "notify" from EDI - var waitForMessagesEnqueued = context.CreateTimer(TimeSpan.FromSeconds(1), CancellationToken.None); - await waitForMessagesEnqueued; - - return true; - } - - private async Task PerformAsyncValidation( - TaskOrchestrationContext context, - OrchestrationInstanceId instanceId, - RequestCalculatedEnergyTimeSeriesInputV1 input) - { - var isValid = await context.CallActivityAsync( - nameof(PerformAsyncValidationActivity_Brs_026_V1), - new PerformAsyncValidationActivity_Brs_026_V1.ActivityInput( - instanceId, - input), - _defaultRetryOptions); - - var asyncValidationTerminationState = isValid - ? OrchestrationStepTerminationStates.Succeeded - : OrchestrationStepTerminationStates.Failed; - await context.CallActivityAsync( - nameof(TerminateStepActivity_Brs_026_V1), - new TerminateStepActivity_Brs_026_V1.ActivityInput( - instanceId, - AsyncValidationStepSequence, - asyncValidationTerminationState), - _defaultRetryOptions); - - return isValid; + return $"Success (BusinessReason={input.BusinessReason})"; + } } }