Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement BRS-026 walking skeleton #47

Merged
merged 11 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions docs/ProcessManager.Client/ReleaseNotes/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
# ProcessManager.Client Release Notes

## Version 0.14.4

- Add actor id to start orchestration messages.

## Version 0.14.3

- No functional changes
- No functional changes.

## Version 0.14.2

- No functional changes
- No functional changes.

## Version 0.14.1

- Rename service bus client options
- Rename service bus client options.

## Version 0.14.0

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# ProcessManager.Orchestrations.Abstractions Release Notes

## Version 0.2.5

- Add actor id to start orchestration messages.

## Version 0.2.4

- Update (implement) correct input for starting BRS-026 and BRS-028 orchestrations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ option csharp_namespace = "Energinet.DataHub.ProcessManager.Abstractions.Contrac
message StartOrchestrationDto {
string orchestration_name = 1; // The name of the orchestration to start.
int32 orchestration_version = 2; // The version of the orchestration to start.
string started_by_actor_id = 4; // The actor id who wants to start the orchestration
string json_input = 3; // The input to the orchestration serialized as a JSON
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<PropertyGroup>
<PackageId>Energinet.DataHub.ProcessManager.Abstractions</PackageId>
<PackageVersion>0.14.3$(VersionSuffix)</PackageVersion>
<PackageVersion>0.14.4$(VersionSuffix)</PackageVersion>
<Title>DH3 Process Manager Abstractions library</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ public async Task InitializeAsync()

ProcessManagerTopic = await ServiceBusResourceProvider.BuildTopic("pm-topic")
.AddSubscription(brs026SubscriptionName)
.AddSubjectFilter("Brs_026")
.AddSubscription(brs021ForwardMeteredDataSubscriptionName)
.AddSubjectFilter("Brs_021_ForwardMeteredData")
.CreateAsync();
var brs026Subscription = ProcessManagerTopic.Subscriptions.Single(x => x.SubscriptionName.Equals(brs026SubscriptionName));
var brs021ForwardMeteredDataSubscription = ProcessManagerTopic.Subscriptions.Single(x => x.SubscriptionName.Equals(brs021ForwardMeteredDataSubscriptionName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public async Task RequestCalculatedEnergyTimeSeries_WhenStarted_OrchestrationCom
// Assert
var orchestration = await _fixture.DurableClient.WaitForOrchestationStartedAsync(
createdTimeFrom: orchestrationCreatedAfter,
name: "RequestCalculatedEnergyTimeSeriesOrchestrationV1");
name: "Orchestration_RequestCalculatedEnergyTimeSeries_V1");
orchestration.Input.ToString().Should().Contain(businessReason);

var completedOrchestration = await _fixture.DurableClient.WaitForOrchestrationCompletedAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<PropertyGroup>
<PackageId>Energinet.DataHub.ProcessManager.Client</PackageId>
<PackageVersion>0.14.3$(VersionSuffix)</PackageVersion>
<PackageVersion>0.14.4$(VersionSuffix)</PackageVersion>
<Title>DH3 Process Manager Client library</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
14 changes: 5 additions & 9 deletions source/ProcessManager.Client/ProcessManagerMessageClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,28 @@ public Task StartNewOrchestrationInstanceAsync<TInputParameterDto>(
CancellationToken cancellationToken)
where TInputParameterDto : IInputParameterDto
{
var serviceBusMessage = CreateServiceBusMessage(
command.OrchestrationDescriptionUniqueName.Name,
command.OrchestrationDescriptionUniqueName.Version,
command);
var serviceBusMessage = CreateServiceBusMessage(command);

return SendServiceBusMessage(
serviceBusMessage,
cancellationToken);
}

private ServiceBusMessage CreateServiceBusMessage<TInputParameterDto>(
string orchestrationName,
int orchestrationVersion,
MessageCommand<TInputParameterDto> command)
where TInputParameterDto : IInputParameterDto
{
var message = new StartOrchestrationDto
{
OrchestrationName = orchestrationName,
OrchestrationVersion = orchestrationVersion,
OrchestrationName = command.OrchestrationDescriptionUniqueName.Name,
OrchestrationVersion = command.OrchestrationDescriptionUniqueName.Version,
StartedByActorId = command.OperatingIdentity.ActorId.ToString(),
JsonInput = JsonSerializer.Serialize(command.InputParameter),
};

ServiceBusMessage serviceBusMessage = new(JsonFormatter.Default.Format(message))
{
Subject = orchestrationName,
Subject = command.OrchestrationDescriptionUniqueName.Name,
MessageId = command.MessageId,
ContentType = "application/json",
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,39 @@ private OrchestrationInstance()
/// </summary>
internal OrchestrationDescriptionId OrchestrationDescriptionId { get; }

/// <summary>
/// Transition a step's lifecycle to running
/// </summary>
/// <param name="sequence">The sequence number of the step to transition</param>
/// <param name="clock"></param>
/// <exception cref="ArgumentOutOfRangeException">Thrown if a step with the given <paramref name="sequence"/> isn't found.</exception>
public void TransitionStepToRunning(int sequence, IClock clock)
{
var step = Steps.SingleOrDefault(s => s.Sequence == sequence);

if (step == null)
throw new ArgumentOutOfRangeException(nameof(sequence), sequence, "A step with the given sequence does not exist");

step.Lifecycle.TransitionToRunning(clock);
}

/// <summary>
/// Transition a step's lifecycle to terminated, with the given <paramref name="terminationState"/>
/// </summary>
/// <param name="sequence">The sequence number of the step to transition</param>
/// <param name="terminationState">The state of the termination step (Succeeded, failed etc.)</param>
/// <param name="clock"></param>
/// <exception cref="ArgumentOutOfRangeException">Thrown if a step with the given <paramref name="sequence"/> isn't found.</exception>
public void TransitionStepToTerminated(int sequence, OrchestrationStepTerminationStates terminationState, IClock clock)
{
var step = Steps.SingleOrDefault(s => s.Sequence == sequence);

if (step == null)
throw new ArgumentOutOfRangeException(nameof(sequence), sequence, "A step with the given sequence does not exist");

step.Lifecycle.TransitionToTerminated(clock, terminationState);
}

/// <summary>
/// Factory method that ensures domain rules are obeyed when creating a new
/// orchestration instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<PropertyGroup>
<PackageId>Energinet.DataHub.ProcessManager.Orchestrations.Abstractions</PackageId>
<PackageVersion>0.2.4$(VersionSuffix)</PackageVersion>
<PackageVersion>0.2.5$(VersionSuffix)</PackageVersion>
<Title>DH3 Process Manager Orchestrations Abstractions library</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,14 @@ public async Task StartAsync(

if (brs026Subscription is null)
{
topicResourceBuilder.AddSubscription(brs026SubscriptionName);
topicResourceBuilder.AddSubscription(brs026SubscriptionName)
.AddSubjectFilter("Brs_026");
}

if (brs021ForwardMeteredDataSubscription is null)
{
topicResourceBuilder.AddSubscription(brs021ForwardMeteredDataSubscriptionName);
topicResourceBuilder.AddSubscription(brs021ForwardMeteredDataSubscriptionName)
.AddSubjectFilter("Brs_021_ForwardMeteredData");
}

var topicResource = await topicResourceBuilder.CreateAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
<ItemGroup>
<Folder Include="Api\Mappers\" />
<Folder Include="Processes\BRS_021\ForwardMeteredData\" />
<Folder Include="Processes\BRS_026\V1\Activities\" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Shared\ProcessManager\Api\Mappers\OrchestrationInstanceMapperExtensions.cs" Link="Api\Mappers\OrchestrationInstanceMapperExtensions.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ internal class CalculationStepStartActivity_Brs_023_027_V1(
{
[Function(nameof(CalculationStepStartActivity_Brs_023_027_V1))]
public async Task Run(
[ActivityTrigger] Guid orchestrationInstanceId)
[ActivityTrigger] ActivityInput input)
{
var orchestrationInstance = await ProgressRepository
.GetAsync(new OrchestrationInstanceId(orchestrationInstanceId))
.GetAsync(input.InstanceId)
.ConfigureAwait(false);

var step = orchestrationInstance.Steps.Single(x => x.Sequence == Orchestration_Brs_023_027_V1.CalculationStepSequence);
Expand All @@ -41,4 +41,7 @@ public async Task Run(
// TODO: For demo purposes; remove when done
await Task.Delay(TimeSpan.FromSeconds(3)).ConfigureAwait(false);
}

public record ActivityInput(
OrchestrationInstanceId InstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ internal class CalculationStepTerminateActivity_Brs_023_027_V1(
{
[Function(nameof(CalculationStepTerminateActivity_Brs_023_027_V1))]
public async Task Run(
[ActivityTrigger] Guid orchestrationInstanceId)
[ActivityTrigger] ActivityInput input)
{
var orchestrationInstance = await ProgressRepository
.GetAsync(new OrchestrationInstanceId(orchestrationInstanceId))
.GetAsync(input.InstanceId)
.ConfigureAwait(false);

var step = orchestrationInstance.Steps.Single(x => x.Sequence == Orchestration_Brs_023_027_V1.CalculationStepSequence);
Expand All @@ -41,4 +41,7 @@ public async Task Run(
// TODO: For demo purposes; remove when done
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
}

public record ActivityInput(
OrchestrationInstanceId InstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ internal class EnqueueMessagesStepStartActivity_Brs_023_027_V1(
{
[Function(nameof(EnqueueMessagesStepStartActivity_Brs_023_027_V1))]
public async Task Run(
[ActivityTrigger] Guid orchestrationInstanceId)
[ActivityTrigger] ActivityInput input)
{
var orchestrationInstance = await ProgressRepository
.GetAsync(new OrchestrationInstanceId(orchestrationInstanceId))
.GetAsync(input.InstanceId)
.ConfigureAwait(false);

var step = orchestrationInstance.Steps.Single(x => x.Sequence == Orchestration_Brs_023_027_V1.EnqueueMessagesStepSequence);
Expand All @@ -44,4 +44,7 @@ public async Task Run(
await Task.Delay(TimeSpan.FromSeconds(3)).ConfigureAwait(false);
}
}

public record ActivityInput(
OrchestrationInstanceId InstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ internal class EnqueueMessagesStepTerminateActivity_Brs_023_027_V1(
{
[Function(nameof(EnqueueMessagesStepTerminateActivity_Brs_023_027_V1))]
public async Task Run(
[ActivityTrigger] Guid orchestrationInstanceId)
[ActivityTrigger] ActivityInput input)
{
var orchestrationInstance = await ProgressRepository
.GetAsync(new OrchestrationInstanceId(orchestrationInstanceId))
.GetAsync(input.InstanceId)
.ConfigureAwait(false);

var step = orchestrationInstance.Steps.Single(x => x.Sequence == Orchestration_Brs_023_027_V1.EnqueueMessagesStepSequence);
Expand All @@ -44,4 +44,7 @@ public async Task Run(
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
}
}

public record ActivityInput(
OrchestrationInstanceId InstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ internal class OrchestrationInitializeActivity_Brs_023_027_V1(
{
[Function(nameof(OrchestrationInitializeActivity_Brs_023_027_V1))]
public async Task Run(
[ActivityTrigger] Guid orchestrationInstanceId)
[ActivityTrigger] ActivityInput input)
{
var orchestrationInstance = await ProgressRepository
.GetAsync(new OrchestrationInstanceId(orchestrationInstanceId))
.GetAsync(input.InstanceId)
.ConfigureAwait(false);

orchestrationInstance.Lifecycle.TransitionToRunning(Clock);
Expand All @@ -44,4 +44,7 @@ public async Task Run(
// TODO: For demo purposes; remove when done
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
}

public record ActivityInput(
OrchestrationInstanceId InstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ internal class OrchestrationTerminateActivity_Brs_023_027_V1(
{
[Function(nameof(OrchestrationTerminateActivity_Brs_023_027_V1))]
public async Task Run(
[ActivityTrigger] Guid orchestrationInstanceId)
[ActivityTrigger] ActivityInput input)
{
var orchestrationInstance = await ProgressRepository
.GetAsync(new OrchestrationInstanceId(orchestrationInstanceId))
.GetAsync(input.InstanceId)
.ConfigureAwait(false);

orchestrationInstance.Lifecycle.TransitionToSucceeded(Clock);
Expand All @@ -44,4 +44,7 @@ public async Task Run(
// TODO: For demo purposes; remove when done
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
}

public record ActivityInput(
OrchestrationInstanceId InstanceId);
}
Loading
Loading