Skip to content

Commit

Permalink
Move process manager call to incoming messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ebbeknudsen committed Dec 11, 2024
1 parent 165a6b4 commit 77e82f2
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 108 deletions.
21 changes: 18 additions & 3 deletions source/B2BApi.AppTests/Fixtures/B2BApiAppFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
using Energinet.DataHub.EDI.IntegrationTests.AuditLog.Fixture;
using Energinet.DataHub.EDI.OutgoingMessages.Infrastructure.Extensions.Options;
using Energinet.DataHub.EDI.Process.Infrastructure.Configuration.Options;
using Energinet.DataHub.ProcessManager.Client.Extensions.Options;
using Energinet.DataHub.RevisionLog.Integration.Options;
using Energinet.DataHub.Wholesale.Common.Infrastructure.Options;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
Expand Down Expand Up @@ -137,7 +138,10 @@ public B2BApiAppFixture()
/// Topic resource for integration events.
/// </summary>
[NotNull]
public TopicResource? TopicResource { get; private set; }
public TopicResource? IntegrationEventsTopicResource { get; private set; }

[NotNull]
public TopicResource? ProcessManagerTopicResource { get; private set; }

public ServiceBusListenerMock ServiceBusListenerMock { get; }

Expand Down Expand Up @@ -179,15 +183,26 @@ public async Task InitializeAsync()
LogStopwatch(stopwatch, nameof(CreateAppHostSettings));

// ServiceBus entities
TopicResource = await ServiceBusResourceProvider
IntegrationEventsTopicResource = await ServiceBusResourceProvider
.BuildTopic("integration-events")
.Do(topic => appHostSettings.ProcessEnvironmentVariables
.Add($"{IntegrationEventsOptions.SectionName}__{nameof(IntegrationEventsOptions.TopicName)}", topic.Name))
.AddSubscription("subscription")
.Do(subscription => appHostSettings.ProcessEnvironmentVariables
.Add($"{IntegrationEventsOptions.SectionName}__{nameof(IntegrationEventsOptions.SubscriptionName)}", subscription.SubscriptionName))
.CreateAsync();
LogStopwatch(stopwatch, nameof(TopicResource));
LogStopwatch(stopwatch, nameof(IntegrationEventsTopicResource));

var processManagerTopic = await ServiceBusResourceProvider
.BuildTopic("process-manager")
.Do(topic => appHostSettings.ProcessEnvironmentVariables
.Add($"{ProcessManagerServiceBusClientOptions.SectionName}__{nameof(ProcessManagerServiceBusClientOptions.TopicName)}", topic.Name))
.AddSubscription("process-manager-subscription")
.CreateAsync();
LogStopwatch(stopwatch, nameof(processManagerTopic));
await ServiceBusListenerMock.AddTopicSubscriptionListenerAsync(
topicName: processManagerTopic.Name,
subscriptionName: processManagerTopic.Subscriptions.Single().SubscriptionName);

await ServiceBusResourceProvider
.BuildQueue("edi-inbox")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public static class HttpRequestExtensions
{
public static Task<HttpRequestMessage> CreateRequestWholesaleServicesHttpRequestAsync(
this B2BApiAppFixture fixture,
Actor actor)
Actor actor,
string? transactionId = null)
{
var documentPath = actor.ActorRole.Name switch
{
Expand Down Expand Up @@ -70,7 +71,8 @@ private static async Task<HttpRequestMessage> CreateIncomingMessageHttpRequestAs
string filePath,
string documentType,
string contentType,
Actor actor)
Actor actor,
string? transactionId = null)
{
HttpRequestMessage? request = null;
try
Expand All @@ -80,7 +82,7 @@ private static async Task<HttpRequestMessage> CreateIncomingMessageHttpRequestAs
.Replace("{ActorNumber}", actor.ActorNumber.Value)
.Replace("{ActorRole}", actor.ActorRole.Code)
.Replace("{MessageId}", Guid.NewGuid().ToString())
.Replace("{TransactionId}", Guid.NewGuid().ToString());
.Replace("{TransactionId}", transactionId ?? Guid.NewGuid().ToString());

request = await CreateHttpRequestAsync(
fixture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public async Task Given_CalculationOrchestrationId_When_CalculationCompletedEven

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);

// Assert
// => Verify expected behaviour by searching the orchestration history
Expand Down Expand Up @@ -202,7 +202,7 @@ await ClearAndAddDatabricksData(

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);

// Assert
// => Verify expected behaviour by searching the orchestration history
Expand Down Expand Up @@ -297,7 +297,7 @@ public async Task Given_DatabricksHasNoData_When_CalculationCompletedEventIsHand

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(calculationCompletedEventMessage);

// Assert
// => Verify expected behaviour by searching the orchestration history
Expand Down Expand Up @@ -358,7 +358,7 @@ await ClearAndAddInvalidDatabricksData(

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(wholesaleCalculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(wholesaleCalculationCompletedEventMessage);
var actualWholesaleOrchestrationStatus = await Fixture.DurableClient.WaitForOrchestationStartedAsync(createdTimeFrom: beforeOrchestrationCreated);

// Assert
Expand Down Expand Up @@ -422,7 +422,7 @@ await ClearAndAddInvalidDatabricksData(

// Act
var beforeOrchestrationCreated = DateTime.UtcNow;
await Fixture.TopicResource.SenderClient.SendMessageAsync(energyCalculationCompletedEventMessage);
await Fixture.IntegrationEventsTopicResource.SenderClient.SendMessageAsync(energyCalculationCompletedEventMessage);
var actualEnergyOrchestrationStatus = await Fixture.DurableClient.WaitForOrchestationStartedAsync(createdTimeFrom: beforeOrchestrationCreated);

// Assert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Net;
using Energinet.DataHub.Core.DurableFunctionApp.TestCommon.DurableTask;
using Energinet.DataHub.Core.FunctionApp.TestCommon.Databricks;
using Energinet.DataHub.Core.FunctionApp.TestCommon.ServiceBus.ListenerMock;
using Energinet.DataHub.EDI.B2BApi.AppTests.Fixtures;
using Energinet.DataHub.EDI.B2BApi.AppTests.Fixtures.Extensions;
using Energinet.DataHub.EDI.B2BApi.Functions.RequestWholesaleServices;
Expand Down Expand Up @@ -51,6 +52,7 @@ public RequestWholesaleServicesOrchestrationTests(
public async Task InitializeAsync()
{
Fixture.AppHostManager.ClearHostLog();
Fixture.ServiceBusListenerMock.ResetMessageHandlersAndReceivedMessages();
await Task.CompletedTask;
}

Expand All @@ -68,7 +70,7 @@ public async Task DisposeAsync()
/// - The peeked messages all have the correct document type.
/// </summary>
[Fact]
public async Task Given_RequestWholesaleServices_When_RequestWholesaleServicesOrchestrationIsCompleted_Then_EnqueuedMessagesCanBePeeked()
public async Task Given_RequestWholesaleServices_When_RequestIsReceived_Then_ServiceBusMessageIsSentToProcessManagerTopic()
{
// Arrange
EnableRequestWholesaleServicesOrchestrationFeature();
Expand All @@ -77,46 +79,28 @@ public async Task Given_RequestWholesaleServices_When_RequestWholesaleServicesOr
ActorNumber.Create("5790000701278"),
ActorRole.EnergySupplier);

var amountPerChargeDescription = new WholesaleResultForAmountPerChargeDescription();
await ClearAndAddDatabricksData(amountPerChargeDescription);
var transactionId = Guid.NewGuid().ToString();

// Test steps:
// => HTTP POST: RequestWholesaleServices
var beforeOrchestrationCreated = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromSeconds(30));
using var httpRequest = await Fixture.CreateRequestWholesaleServicesHttpRequestAsync(energySupplier);
using var httpRequest = await Fixture.CreateRequestWholesaleServicesHttpRequestAsync(energySupplier, transactionId);
using var httpResponse = await Fixture.AppHostManager.HttpClient.SendAsync(httpRequest);
await httpResponse.EnsureSuccessStatusCodeWithLogAsync(Fixture.TestLogger);

// => Wait for orchestration to start
var startedOrchestrationStatus = await Fixture.DurableClient.WaitForOrchestationStartedAsync(
createdTimeFrom: beforeOrchestrationCreated.ToDateTimeUtc(),
name: nameof(RequestWholesaleServicesOrchestration));
startedOrchestrationStatus.Should().NotBeNull();

// => Wait for orchestration to complete
var completedOrchestrationStatus = await Fixture.DurableClient.WaitForOrchestrationCompletedAsync(
startedOrchestrationStatus.InstanceId,
TimeSpan.FromMinutes(5));
completedOrchestrationStatus.Should().NotBeNull();

// Assert activities
// => Assert enqueued messages in orchestrator
using (new AssertionScope())
{
completedOrchestrationStatus.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);
completedOrchestrationStatus.Output.ToString()
.Should().Contain("AcceptedMessagesCount=33")
.And.Contain("RejectedMessagesCount=0");
}
// => Assert service bus message is sent to Process Manager topic
var verifyServiceBusMessage = await Fixture.ServiceBusListenerMock
.When(
msg =>
{
var messageIdMatch = msg.MessageId == transactionId;
var subjectMatch = msg.Subject == "Brs_028";

// => HTTP GET: Peek messages
var peekedDocuments = await PeekAllMessages(energySupplier);
return messageIdMatch && subjectMatch;
})
.VerifyOnceAsync();

using var assertionScope = new AssertionScope();
peekedDocuments.Should()
.HaveCount(33)
.And.AllSatisfy(
peekedDocument => peekedDocument.Should().Contain("NotifyWholesaleServices_MarketDocument"));
var messageReceived = verifyServiceBusMessage.Wait(TimeSpan.FromSeconds(30));
messageReceived.Should().BeTrue("because a Brs_028 message should be sent to the Process Manager topic");
}

private async Task<IReadOnlyCollection<string>> PeekAllMessages(Actor actor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.Configuration.DataAccess;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.Configuration.Options;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.ProcessManager;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.Repositories.MessageId;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.Repositories.TransactionId;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.Response;
using Energinet.DataHub.EDI.IncomingMessages.Interfaces;
using Energinet.DataHub.ProcessManager.Client.Extensions.DependencyInjection;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -138,6 +140,12 @@ public static IServiceCollection AddIncomingMessagesModule(this IServiceCollecti
services.AddTransient<IMessageParser, AggregatedMeasureDataJsonMessageParser>();
services.AddTransient<IMessageParser, AggregatedMeasureDataB2CJsonMessageParser>();

/*
* Process Manager
*/
services.AddTransient<IRequestProcessOrchestrationStarter, RequestProcessOrchestrationStarter>();
services.AddProcessManagerMessageClient();

return services;
}
}
60 changes: 45 additions & 15 deletions source/IncomingMessages.Infrastructure/IncomingMessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

using Azure.Messaging.ServiceBus;
using Energinet.DataHub.EDI.BuildingBlocks.Domain.Authentication;
using Energinet.DataHub.EDI.BuildingBlocks.Infrastructure.FeatureFlag;
using Energinet.DataHub.EDI.BuildingBlocks.Interfaces;
using Energinet.DataHub.EDI.IncomingMessages.Domain;
using Energinet.DataHub.EDI.IncomingMessages.Domain.Abstractions;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.Configuration.Options;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.Factories;
using Energinet.DataHub.EDI.IncomingMessages.Infrastructure.ProcessManager;
using Energinet.DataHub.EDI.Process.Interfaces;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Options;
Expand All @@ -29,18 +31,24 @@ public class IncomingMessagePublisher
{
private readonly AuthenticatedActor _authenticatedActor;
private readonly ISerializer _serializer;
private readonly IFeatureFlagManager _featureFlagManager;
private readonly IRequestProcessOrchestrationStarter _requestProcessOrchestrationStarter;
private readonly ServiceBusSender _sender;

public IncomingMessagePublisher(
AuthenticatedActor authenticatedActor,
IOptions<IncomingMessagesQueueOptions> options,
IAzureClientFactory<ServiceBusSender> senderFactory,
ISerializer serializer)
ISerializer serializer,
IFeatureFlagManager featureFlagManager,
IRequestProcessOrchestrationStarter requestProcessOrchestrationStarter)
{
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(senderFactory);
_authenticatedActor = authenticatedActor;
_serializer = serializer;
_featureFlagManager = featureFlagManager;
_requestProcessOrchestrationStarter = requestProcessOrchestrationStarter;

_sender = senderFactory.CreateClient(options.Value.QueueName);
}
Expand Down Expand Up @@ -70,28 +78,50 @@ private async Task SendInitializeAggregatedMeasureDataProcessAsync(InitializeAgg
{
ArgumentNullException.ThrowIfNull(initializeAggregatedMeasureDataProcessDto);

var serviceBusMessage =
new ServiceBusMessage(
new BinaryData(_serializer.Serialize(initializeAggregatedMeasureDataProcessDto)))
{
Subject = nameof(InitializeAggregatedMeasureDataProcessDto),
};
if (await _featureFlagManager.UseRequestAggregatedMeasureDataProcessOrchestrationAsync()
.ConfigureAwait(false))
{
await _requestProcessOrchestrationStarter.StartRequestAggregatedMeasureDataOrchestrationAsync(
initializeAggregatedMeasureDataProcessDto,
cancellationToken)
.ConfigureAwait(false);
}
else
{
var serviceBusMessage =
new ServiceBusMessage(
new BinaryData(_serializer.Serialize(initializeAggregatedMeasureDataProcessDto)))
{
Subject = nameof(InitializeAggregatedMeasureDataProcessDto),
};

await _sender.SendMessageAsync(serviceBusMessage, cancellationToken).ConfigureAwait(false);
await _sender.SendMessageAsync(serviceBusMessage, cancellationToken).ConfigureAwait(false);
}
}

private async Task SendInitializeWholesaleServicesProcessAsync(InitializeWholesaleServicesProcessDto initializeWholesaleServicesProcessDto, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(initializeWholesaleServicesProcessDto);

var serviceBusMessage =
new ServiceBusMessage(
_serializer.Serialize(initializeWholesaleServicesProcessDto))
{
Subject = nameof(InitializeWholesaleServicesProcessDto),
};
if (await _featureFlagManager.UseRequestWholesaleServicesProcessOrchestrationAsync()
.ConfigureAwait(false))
{
await _requestProcessOrchestrationStarter.StartRequestWholesaleServicesOrchestrationAsync(
initializeWholesaleServicesProcessDto,
cancellationToken)
.ConfigureAwait(false);
}
else
{
var serviceBusMessage =
new ServiceBusMessage(
_serializer.Serialize(initializeWholesaleServicesProcessDto))
{
Subject = nameof(InitializeWholesaleServicesProcessDto),
};

await _sender.SendMessageAsync(serviceBusMessage, cancellationToken).ConfigureAwait(false);
await _sender.SendMessageAsync(serviceBusMessage, cancellationToken).ConfigureAwait(false);
}
}

private async Task SendInitializeMeteredDataForMeasurementPointMessageProcessAsync(InitializeMeteredDataForMeasurementPointMessageProcessDto initializeMeteredDataForMeasurementPointMessageProcessDto, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Energinet.DataHub.ProcessManager.Client" Version="0.14.1" />
<PackageReference Include="Energinet.DataHub.ProcessManager.Orchestrations.Abstractions" Version="0.2.0" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.9.0" />
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="8.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="17.12.19">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

using Energinet.DataHub.EDI.Process.Interfaces;

namespace Energinet.DataHub.EDI.Process.Domain.Transactions;
namespace Energinet.DataHub.EDI.IncomingMessages.Infrastructure.ProcessManager;

public interface IRequestProcessOrchestrationStarter
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
// limitations under the License.

using Energinet.DataHub.EDI.BuildingBlocks.Domain.Authentication;
using Energinet.DataHub.EDI.Process.Domain.Transactions;
using Energinet.DataHub.EDI.Process.Interfaces;
using Energinet.DataHub.ProcessManager.Abstractions.Api.Model.OrchestrationInstance;
using Energinet.DataHub.ProcessManager.Client;
using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_026.V1.Model;
using Energinet.DataHub.ProcessManager.Orchestrations.Abstractions.Processes.BRS_028.V1.Model;

namespace Energinet.DataHub.EDI.Process.Infrastructure.Transactions;
namespace Energinet.DataHub.EDI.IncomingMessages.Infrastructure.ProcessManager;

public class RequestProcessOrchestrationStarter(
IProcessManagerMessageClient processManagerMessageClient,
Expand Down
Loading

0 comments on commit 77e82f2

Please sign in to comment.