Skip to content

Commit

Permalink
feat: Send wholesale and energy requests to Process Manager (#1418)
Browse files Browse the repository at this point in the history
* Send Wholesale/Energi requests to PM

- Add ActorId to authenticated actor
- Use ProcessManager.Client to start processes in PM

* Update EDI to start process for each transaction

* Move process manager call to incoming messages

* Default to an "empty" spy if none is registered

* Add missing health check

* Use actual values for starting process

* Add test for mapping

* Add app test for starting energy request

* Update packages

* Move actor id to variables
  • Loading branch information
ebbeknudsen authored Dec 16, 2024
1 parent 9e5fa09 commit 431e207
Show file tree
Hide file tree
Showing 47 changed files with 892 additions and 367 deletions.
3 changes: 3 additions & 0 deletions source/ArchitectureTests/RegistrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
using Energinet.DataHub.EDI.Process.Application.Transactions.AggregatedMeasureData;
using Energinet.DataHub.EDI.Process.Infrastructure.Configuration.DataAccess;
using Energinet.DataHub.EDI.Process.Infrastructure.Configuration.Options;
using Energinet.DataHub.ProcessManager.Client.Extensions.Options;
using Energinet.DataHub.Wholesale.Common.Infrastructure.Options;
using FluentAssertions;
using FluentAssertions.Execution;
Expand Down Expand Up @@ -76,6 +77,8 @@ public RegistrationTests()
Environment.SetEnvironmentVariable($"{EdiDatabricksOptions.SectionName}__{nameof(EdiDatabricksOptions.CatalogName)}", "FakeCatalogName");
Environment.SetEnvironmentVariable($"{nameof(DeltaTableOptions.DatabricksCatalogName)}", "FakeCatalogName");

Environment.SetEnvironmentVariable($"{ProcessManagerServiceBusClientOptions.SectionName}__{nameof(ProcessManagerServiceBusClientOptions.TopicName)}", "FakeTopicName");

// Dead-letter logging
Environment.SetEnvironmentVariable($"{BlobDeadLetterLoggerOptions.SectionName}__{nameof(BlobDeadLetterLoggerOptions.StorageAccountUrl)}", TestEnvironment.CreateFakeStorageUrl());
Environment.SetEnvironmentVariable($"{BlobDeadLetterLoggerOptions.SectionName}__{nameof(BlobDeadLetterLoggerOptions.ContainerName)}", "fake-container-name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ namespace Energinet.DataHub.EDI.ArchivedMessages.IntegrationTests;
[Collection(nameof(ArchivedMessagesCollection))]
public class ArchivedMessagesWithOwnedRestrictionTests : IAsyncLifetime
{
private static readonly Guid _actorId = Guid.Parse("00000000-0000-0000-0000-000000000001");
private readonly IArchivedMessagesClient _sut;
private readonly ArchivedMessagesFixture _fixture;

private readonly ActorIdentity _authenticatedActor = new(
ActorNumber.Create("1234512345888"),
Restriction.Owned,
ActorRole.EnergySupplier);
ActorRole.EnergySupplier,
_actorId);

public ArchivedMessagesWithOwnedRestrictionTests(ArchivedMessagesFixture fixture, ITestOutputHelper testOutputHelper)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ namespace Energinet.DataHub.EDI.ArchivedMessages.IntegrationTests;
[Collection(nameof(ArchivedMessagesCollection))]
public class ArchivedMessagesWithoutRestrictionTests : IAsyncLifetime
{
private static readonly Guid _actorId = Guid.Parse("00000000-0000-0000-0000-000000000001");
private readonly IArchivedMessagesClient _sut;
private readonly ArchivedMessagesFixture _fixture;

private readonly ActorIdentity _authenticatedActor = new(
ActorNumber.Create("1234512345811"),
Restriction.None,
ActorRole.MeteredDataAdministrator);
ActorRole.MeteredDataAdministrator,
_actorId);

public ArchivedMessagesWithoutRestrictionTests(ArchivedMessagesFixture fixture, ITestOutputHelper testOutputHelper)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace Energinet.DataHub.EDI.ArchivedMessages.IntegrationTests.Fixture;

public class ArchivedMessagesFixture : IDisposable, IAsyncLifetime
{
private readonly Guid _actorId = Guid.Parse("00000000-0000-0000-0000-000000000001");
private bool _disposed;

public AzuriteManager AzuriteManager { get; } = new(true);
Expand Down Expand Up @@ -112,7 +113,8 @@ public async Task InitializeAsync()
new ActorIdentity(
ActorNumber.Create("1234512345888"),
restriction: Restriction.None,
ActorRole.MeteredDataAdministrator));
ActorRole.MeteredDataAdministrator,
_actorId));

ArchivedMessagesClient = Services.GetRequiredService<IArchivedMessagesClient>();
}
Expand Down
3 changes: 3 additions & 0 deletions source/B2BApi.AppTests/B2BApi.AppTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
<None Update="TestData\CalculationResults\wholesale_calculation_results.amounts_per_charge_v1.csv">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="TestData\Messages\json\RequestAggregatedMeasureDataForEnergySupplier.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

<ItemDefinitionGroup>
Expand Down
25 changes: 21 additions & 4 deletions source/B2BApi.AppTests/Fixtures/B2BApiAppFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
using Energinet.DataHub.EDI.IntegrationTests.AuditLog.Fixture;
using Energinet.DataHub.EDI.OutgoingMessages.Infrastructure.Extensions.Options;
using Energinet.DataHub.EDI.Process.Infrastructure.Configuration.Options;
using Energinet.DataHub.ProcessManager.Client.Extensions.Options;
using Energinet.DataHub.RevisionLog.Integration.Options;
using Energinet.DataHub.Wholesale.Common.Infrastructure.Options;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
Expand Down Expand Up @@ -137,7 +138,10 @@ public B2BApiAppFixture()
/// Topic resource for integration events.
/// </summary>
[NotNull]
public TopicResource? TopicResource { get; private set; }
public TopicResource? IntegrationEventsTopicResource { get; private set; }

[NotNull]
public TopicResource? ProcessManagerTopicResource { get; private set; }

public ServiceBusListenerMock ServiceBusListenerMock { get; }

Expand Down Expand Up @@ -179,15 +183,26 @@ public async Task InitializeAsync()
LogStopwatch(stopwatch, nameof(CreateAppHostSettings));

// ServiceBus entities
TopicResource = await ServiceBusResourceProvider
IntegrationEventsTopicResource = await ServiceBusResourceProvider
.BuildTopic("integration-events")
.Do(topic => appHostSettings.ProcessEnvironmentVariables
.Add($"{IntegrationEventsOptions.SectionName}__{nameof(IntegrationEventsOptions.TopicName)}", topic.Name))
.AddSubscription("subscription")
.Do(subscription => appHostSettings.ProcessEnvironmentVariables
.Add($"{IntegrationEventsOptions.SectionName}__{nameof(IntegrationEventsOptions.SubscriptionName)}", subscription.SubscriptionName))
.CreateAsync();
LogStopwatch(stopwatch, nameof(TopicResource));
LogStopwatch(stopwatch, nameof(IntegrationEventsTopicResource));

ProcessManagerTopicResource = await ServiceBusResourceProvider
.BuildTopic("process-manager")
.Do(topic => appHostSettings.ProcessEnvironmentVariables
.Add($"{ProcessManagerServiceBusClientOptions.SectionName}__{nameof(ProcessManagerServiceBusClientOptions.TopicName)}", topic.Name))
.AddSubscription("process-manager-subscription")
.CreateAsync();
LogStopwatch(stopwatch, nameof(ProcessManagerTopicResource));
await ServiceBusListenerMock.AddTopicSubscriptionListenerAsync(
topicName: ProcessManagerTopicResource.Name,
subscriptionName: ProcessManagerTopicResource.Subscriptions.Single().SubscriptionName);

await ServiceBusResourceProvider
.BuildQueue("edi-inbox")
Expand Down Expand Up @@ -263,11 +278,13 @@ public void SetTestOutputHelper(ITestOutputHelper testOutputHelper)
}

public void EnsureAppHostUsesFeatureFlagValue(
bool useRequestWholesaleServicesOrchestration = false)
bool useRequestWholesaleServicesOrchestration = false,
bool useRequestAggregatedMeasureDataOrchestration = false)
{
AppHostManager.RestartHostIfChanges(new Dictionary<string, string>
{
{ $"FeatureManagement__{FeatureFlagName.UseRequestWholesaleServicesProcessOrchestration.ToString()}", useRequestWholesaleServicesOrchestration.ToString().ToLower() },
{ $"FeatureManagement__{FeatureFlagName.UseRequestAggregatedMeasureDataProcessOrchestration.ToString()}", useRequestAggregatedMeasureDataOrchestration.ToString().ToLower() },
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public static class HttpRequestExtensions
{
public static Task<HttpRequestMessage> CreateRequestWholesaleServicesHttpRequestAsync(
this B2BApiAppFixture fixture,
Actor actor)
Actor actor,
string? transactionId = null)
{
var documentPath = actor.ActorRole.Name switch
{
Expand All @@ -38,7 +39,28 @@ public static Task<HttpRequestMessage> CreateRequestWholesaleServicesHttpRequest
documentPath,
IncomingDocumentType.RequestWholesaleSettlement.Name,
"application/xml",
actor);
actor,
transactionId);
}

public static Task<HttpRequestMessage> CreateRequestAggregatedMeasureDataHttpRequestAsync(
this B2BApiAppFixture fixture,
Actor actor,
string? transactionId = null)
{
var documentPath = actor.ActorRole.Name switch
{
DataHubNames.ActorRole.EnergySupplier => "TestData/Messages/json/RequestAggregatedMeasureDataForEnergySupplier.json",
_ => throw new ArgumentOutOfRangeException(actor.ActorRole.Name),
};

return CreateIncomingMessageHttpRequestAsync(
fixture,
documentPath,
IncomingDocumentType.RequestAggregatedMeasureData.Name,
"application/json",
actor,
transactionId);
}

public static Task<HttpRequestMessage> CreatePeekHttpRequestAsync(
Expand Down Expand Up @@ -70,7 +92,8 @@ private static async Task<HttpRequestMessage> CreateIncomingMessageHttpRequestAs
string filePath,
string documentType,
string contentType,
Actor actor)
Actor actor,
string? transactionId)
{
HttpRequestMessage? request = null;
try
Expand All @@ -80,7 +103,7 @@ private static async Task<HttpRequestMessage> CreateIncomingMessageHttpRequestAs
.Replace("{ActorNumber}", actor.ActorNumber.Value)
.Replace("{ActorRole}", actor.ActorRole.Code)
.Replace("{MessageId}", Guid.NewGuid().ToString())
.Replace("{TransactionId}", Guid.NewGuid().ToString());
.Replace("{TransactionId}", transactionId ?? Guid.NewGuid().ToString());

request = await CreateHttpRequestAsync(
fixture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public async Task Given_CalculationOrchestrationId_When_CalculationCompletedEven

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);

// Assert
// => Verify expected behaviour by searching the orchestration history
Expand Down Expand Up @@ -202,7 +202,7 @@ await ClearAndAddDatabricksData(

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);

// Assert
// => Verify expected behaviour by searching the orchestration history
Expand Down Expand Up @@ -297,7 +297,7 @@ public async Task Given_DatabricksHasNoData_When_CalculationCompletedEventIsHand

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);

// Assert
// => Verify expected behaviour by searching the orchestration history
Expand Down Expand Up @@ -358,7 +358,7 @@ await ClearAndAddInvalidDatabricksData(

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(wholesaleCalculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(wholesaleCalculationCompletedEventMessage);
var actualWholesaleOrchestrationStatus = await Fixture.DurableClient.WaitForOrchestationStartedAsync(createdTimeFrom: beforeOrchestrationCreated);

// Assert
Expand Down Expand Up @@ -422,7 +422,7 @@ await ClearAndAddInvalidDatabricksData(

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(energyCalculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(energyCalculationCompletedEventMessage);
var actualEnergyOrchestrationStatus = await Fixture.DurableClient.WaitForOrchestationStartedAsync(createdTimeFrom: beforeOrchestrationCreated);

// Assert
Expand Down
Loading

0 comments on commit 431e207

Please sign in to comment.