Skip to content

Commit

Permalink
Add RequestCalculatedEnergyTimeSeriesTriggerV1
Browse files Browse the repository at this point in the history
  • Loading branch information
ebbeknudsen committed Nov 14, 2024
1 parent 39a7369 commit e4cf282
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 16 deletions.
17 changes: 17 additions & 0 deletions source/ProcessManager.Client/ProcessManager.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
<Compile Include="..\Shared\ProcessManager\Api\Model\ScheduleOrchestrationInstanceDto.cs" Link="Model\ScheduleOrchestrationInstanceDto.cs" />
<Compile Include="..\Shared\ProcessManager\Orchestrations\Processes\BRS_023_027\V1\Model\CalculationTypes.cs" Link="Processes\BRS_023_027\V1\Model\CalculationTypes.cs" />
<Compile Include="..\Shared\ProcessManager\Orchestrations\Processes\BRS_023_027\V1\Model\NotifyAggregatedMeasureDataInputV1.cs" Link="Processes\BRS_023_027\V1\Model\NotifyAggregatedMeasureDataInputV1.cs" />
<Compile Include="..\Shared\ProcessManager\Orchestrations\Processes\BRS_026\V1\Model\RequestCalculatedEnergyTimeSeriesInputV1.cs">
<Link>Processes\BRS_026_028\V1\Model\RequestCalculatedEnergyTimeSeriesInputV1.cs</Link>
</Compile>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
Expand All @@ -74,6 +77,20 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Google.Protobuf" Version="3.28.2" />
<PackageReference Include="Grpc.Tools" Version="2.62.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<Protobuf Include="../Shared/**/*.proto">
<GrpcServices>None</GrpcServices>
<Access>Public</Access>
<ProtoCompile>True</ProtoCompile>
<CompileOutputs>True</CompileOutputs>
<Generator>MSBuild:Compile</Generator>
</Protobuf>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Dynamic;
using Energinet.DataHub.ProcessManager.Client.Processes.BRS_026_028.V1.Model;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Models;

namespace Energinet.DataHub.ProcessManager.Client.Processes.BRS_026_028.V1;

Expand All @@ -24,10 +25,10 @@ public interface IRequestCalculatedDataClientV1
/// <summary>
/// Start a request for energy results
/// </summary>
public Task RequestCalculatedEnergyTimeSeriesAsync(ExpandoObject input, CancellationToken cancellationToken);
public Task RequestCalculatedEnergyTimeSeriesAsync(RequestCalculatedDataInputV1<RequestCalculatedEnergyTimeSeriesInputV1> input, CancellationToken cancellationToken);

/// <summary>
/// Start a request for wholesale results
/// </summary>
public Task RequestCalculatedWholesaleServicesAsync(ExpandoObject input, CancellationToken cancellationToken);
public Task RequestCalculatedWholesaleServicesAsync(RequestCalculatedDataInputV1<object> input, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Energinet.DataHub.ProcessManager.Client.Processes.BRS_026_028.V1.Model;

public record RequestCalculatedDataInputV1<TInput>(
string MessageId,
TInput Input)
where TInput : class;
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,70 @@
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Energinet.DataHub.ProcessManager.Client.Extensions.Options;
using Energinet.DataHub.ProcessManager.Client.Processes.BRS_026_028.V1.Model;
using Energinet.DataHub.ProcessManager.Orchestrations.Contracts;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Models;
using Google.Protobuf;
using Microsoft.Extensions.Azure;

namespace Energinet.DataHub.ProcessManager.Client.Processes.BRS_026_028.V1;

public class RequestCalculatedDataClientV1(
IAzureClientFactory<ServiceBusSender> serviceBusFactory) : IRequestCalculatedDataClientV1
{
private readonly ServiceBusSender _serviceBusSender = serviceBusFactory.CreateClient(nameof(ProcessManagerClientOptions.ProcessManagerTopic));
private readonly ServiceBusSender _serviceBusSender = serviceBusFactory.CreateClient(nameof(ProcessManagerServiceBusOptions.ProcessManagerTopic));

public async Task RequestCalculatedEnergyTimeSeriesAsync(ExpandoObject input, CancellationToken cancellationToken)
public async Task RequestCalculatedEnergyTimeSeriesAsync(RequestCalculatedDataInputV1<RequestCalculatedEnergyTimeSeriesInputV1> input, CancellationToken cancellationToken)
{
// TODO: Should input be generic or specific to the process?
// TODO: Create and send "Start orchestration DTO" protobuf messages
var jsonMessage = JsonSerializer.Serialize(input);
ServiceBusMessage serviceBusMessage = new(jsonMessage);
await _serviceBusSender.SendMessageAsync(serviceBusMessage, cancellationToken)
var serviceBusMessage = CreateServiceBusMessage(
"BRS_026",
1,
input);

await SendServiceBusMessage(
serviceBusMessage,
cancellationToken)
.ConfigureAwait(false);
}

public async Task RequestCalculatedWholesaleServicesAsync(RequestCalculatedDataInputV1<object> input, CancellationToken cancellationToken)
{
var serviceBusMessage = CreateServiceBusMessage(
"BRS_028",
1,
input);

await SendServiceBusMessage(serviceBusMessage, cancellationToken)
.ConfigureAwait(false);
}

public async Task RequestCalculatedWholesaleServicesAsync(ExpandoObject input, CancellationToken cancellationToken)
private ServiceBusMessage CreateServiceBusMessage<TInput>(
string orchestrationName,
int orchestrationVersion,
RequestCalculatedDataInputV1<TInput> input)
where TInput : class
{
var message = new StartOrchestrationDto
{
OrchestrationName = orchestrationName,
OrchestrationVersion = orchestrationVersion,
JsonInput = JsonSerializer.Serialize(input.Input),
};

ServiceBusMessage serviceBusMessage = new(JsonFormatter.Default.Format(message))
{
Subject = orchestrationName,
MessageId = input.MessageId,
ContentType = "application/json",
};

return serviceBusMessage;
}

private async Task SendServiceBusMessage(
ServiceBusMessage serviceBusMessage,
CancellationToken cancellationToken)
{
// TODO: Should input be generic or specific to the process?
// TODO: Create and send "Start orchestration DTO" protobuf messages
var jsonMessage = JsonSerializer.Serialize(input);
ServiceBusMessage serviceBusMessage = new(jsonMessage);
await _serviceBusSender.SendMessageAsync(serviceBusMessage, cancellationToken)
.ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
</PropertyGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.ServiceBus" Version="5.22.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.17.4" />
<PackageReference Include="Energinet.DataHub.Core.App.FunctionApp" Version="13.2.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.1.5" />
<PackageReference Include="Google.Protobuf" Version="3.28.2" />
<PackageReference Include="Grpc.Tools" Version="2.62.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\ProcessManager.Core\ProcessManager.Core.csproj" />
Expand All @@ -29,11 +35,24 @@
<ItemGroup>
<Folder Include="Api\Model\" />
<Folder Include="Processes\BRS_021\" />
<Folder Include="Processes\BRS_026_028\" />
<Folder Include="Processes\BRS_026\V1\Activities\" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Shared\ProcessManager\Api\Model\ScheduleOrchestrationInstanceDto.cs" Link="Api\Model\ScheduleOrchestrationInstanceDto.cs" />
<Compile Include="..\Shared\ProcessManager\Orchestrations\Processes\BRS_023_027\V1\Model\CalculationTypes.cs" Link="Processes\BRS_023_027\V1\Model\CalculationTypes.cs" />
<Compile Include="..\Shared\ProcessManager\Orchestrations\Processes\BRS_023_027\V1\Model\NotifyAggregatedMeasureDataInputV1.cs" Link="Processes\BRS_023_027\V1\Model\NotifyAggregatedMeasureDataInputV1.cs" />
<Compile Include="..\Shared\ProcessManager\Orchestrations\Processes\BRS_026\V1\Model\RequestCalculatedEnergyTimeSeriesInputV1.cs">
<Link>Processes\BRS_026\V1\Models\RequestCalculatedEnergyTimeSeriesInputV1.cs</Link>
</Compile>
</ItemGroup>

<ItemGroup>
<Protobuf Include="../Shared/**/*.proto">
<GrpcServices>None</GrpcServices>
<Access>Public</Access>
<ProtoCompile>True</ProtoCompile>
<CompileOutputs>True</CompileOutputs>
<Generator>MSBuild:Compile</Generator>
</Protobuf>
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Energinet.DataHub.ProcessManagement.Core.Application;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Models;

namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1;

public class RequestCalculatedEnergyTimeSeriesHandler(
IOrchestrationInstanceManager manager)
{
private readonly IOrchestrationInstanceManager _manager = manager;

public async Task StartRequestCalculatedEnergyTimeSeriesAsync(RequestCalculatedEnergyTimeSeriesInputV1 input)
{
await _manager.StartNewOrchestrationInstanceAsync(
"BRS_026",
1,
input,
[])
.ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Energinet.DataHub.ProcessManagement.Core.Infrastructure.Extensions.DurableTask;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;

namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1;

// TODO: Implement according to guidelines: https://energinet.atlassian.net/wiki/spaces/D3/pages/824803345/Durable+Functions+Development+Guidelines
internal class RequestCalculatedEnergyTimeSeriesOrchestrationV1
{
[Function(nameof(RequestCalculatedEnergyTimeSeriesOrchestrationV1))]
public async Task<string> Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var input = context.GetOrchestrationParameterValue<RequestCalculatedEnergyTimeSeriesInputV1>();

if (input == null)
return "Error: No input specified.";

await Task.CompletedTask;

/*
* Steps:
* 1. Deserialize input
* 2. Async validation
* 3. Query databricks and upload to storage account
* 4. Enqueue Messages in EDI
* 5. Wait for notify from EDI
* 6. Complete process in database
*/

return $"Success (BusinessReason={input.BusinessReason})";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Energinet.DataHub.ProcessManager.Orchestrations.Contracts;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1;

public class RequestCalculatedEnergyTimeSeriesTriggerV1(
ILogger<RequestCalculatedEnergyTimeSeriesTriggerV1> logger,
RequestCalculatedEnergyTimeSeriesHandler handler)
{
private readonly ILogger<RequestCalculatedEnergyTimeSeriesTriggerV1> _logger = logger;
private readonly RequestCalculatedEnergyTimeSeriesHandler _handler = handler;

/// <summary>
/// Schedule a BRS-023 or BRS-027 calculation and return its id.
/// </summary>
[Function(nameof(RequestCalculatedEnergyTimeSeriesTriggerV1))]
public async Task Run(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
using var serviceBusMessageLoggerScope = _logger.BeginScope(new
{
ServiceBusMessage = new
{
message.MessageId,
message.CorrelationId,
message.Subject,
},
});

var jsonMessage = message.Body.ToString();
var startOrchestrationDto = StartOrchestrationDto.Parser.ParseJson(jsonMessage);
using var startOrchestrationLoggerScope = _logger.BeginScope(new
{
StartOrchestration = new
{
startOrchestrationDto.OrchestrationName,
startOrchestrationDto.OrchestrationVersion,
},
});

var requestCalculatedEnergyTimeSeriesDto = JsonSerializer.Deserialize<RequestCalculatedEnergyTimeSeriesInputV1>(startOrchestrationDto.JsonInput);
if (requestCalculatedEnergyTimeSeriesDto is null)
{
_logger.LogWarning($"Unable to deserialize {nameof(startOrchestrationDto.JsonInput)} to {nameof(RequestCalculatedEnergyTimeSeriesInputV1)} type:{Environment.NewLine}{0}", startOrchestrationDto.JsonInput);
throw new ArgumentException($"Unable to deserialize {nameof(startOrchestrationDto.JsonInput)} to {nameof(RequestCalculatedEnergyTimeSeriesInputV1)} type");
}

await _handler.StartRequestCalculatedEnergyTimeSeriesAsync(requestCalculatedEnergyTimeSeriesDto)
.ConfigureAwait(false);
}
}
12 changes: 12 additions & 0 deletions source/ProcessManager.Orchestrations/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
using Energinet.DataHub.ProcessManagement.Core.Infrastructure.Telemetry;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_023_027.V1;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_023_027.V1.Model;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1;
using Energinet.DataHub.ProcessManager.Orchestrations.Processes.BRS_026.V1.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

Expand Down Expand Up @@ -55,6 +57,16 @@
canBeSkipped: true,
skipReason: "Do not perform this step for an internal calculation.");

var brs_026_v1 = new OrchestrationDescription(
name: "BRS_026",
version: 1,
canBeScheduled: false,
functionName: nameof(RequestCalculatedEnergyTimeSeriesOrchestrationV1));
brs_026_v1.ParameterDefinition.SetFromType<RequestCalculatedEnergyTimeSeriesInputV1>();
brs_026_v1.AppendStepDescription("Asynkron validering");
brs_026_v1.AppendStepDescription("Hent anmodningsdata");
brs_026_v1.AppendStepDescription("Udsend beskeder");

return [brs_023_027_v1];
});
// => Handlers
Expand Down
Loading

0 comments on commit e4cf282

Please sign in to comment.