Skip to content

Commit

Permalink
Use queue for concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
FirestarJes committed Sep 23, 2024
1 parent 62ffd6c commit e24b7cd
Showing 1 changed file with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ namespace Energinet.DataHub.SettlementReport.Orchestration.SettlementReports.Fun

internal sealed class SettlementReportOrchestration
{
private const int MaxQueuedItems = 5;

[Function(nameof(OrchestrateSettlementReport))]
public async Task<string> OrchestrateSettlementReport(
[OrchestrationTrigger] TaskOrchestrationContext context,
Expand Down Expand Up @@ -55,27 +57,39 @@ public async Task<string> OrchestrateSettlementReport(

context.SetCustomStatus(new OrchestrateSettlementReportMetadata { OrchestrationProgress = 10 });

var generatedFiles = new ConcurrentBag<GeneratedSettlementReportFileDto>();
var generatedFiles = new List<GeneratedSettlementReportFileDto>();
var orderedResults = scatterResults
.OrderBy(x => x.PartialFileInfo.FileOffset)
.ThenBy(x => x.PartialFileInfo.ChunkOffset)
.ToList();

await Parallel.ForEachAsync(orderedResults, new ParallelOptions { MaxDegreeOfParallelism = 5 }, async (fileRequest, token) =>
{
var result = await context
.CallActivityAsync<GeneratedSettlementReportFileDto>(
nameof(GenerateSettlementReportFileActivity),
new GenerateSettlementReportFileInput(fileRequest, settlementReportRequest.ActorInfo),
dataSourceExceptionHandler);

generatedFiles.Add(result);
var processingQueue = new ConcurrentQueue<SettlementReportFileRequestDto>(orderedResults);
var tasks = new List<Task<GeneratedSettlementReportFileDto>>();

while (processingQueue.Count > 0)
{
while (tasks.Count < MaxQueuedItems && processingQueue.TryDequeue(out var item))
{
var fileRequestTask = context
.CallActivityAsync<GeneratedSettlementReportFileDto>(
nameof(GenerateSettlementReportFileActivity),
new GenerateSettlementReportFileInput(item, settlementReportRequest.ActorInfo),
dataSourceExceptionHandler);

tasks.Add(fileRequestTask);
}

var completedTask = await Task.WhenAny(tasks);
tasks.Remove(completedTask);
generatedFiles.Add(await completedTask);
context.SetCustomStatus(new OrchestrateSettlementReportMetadata
{
OrchestrationProgress = (80.0 * generatedFiles.Count / orderedResults.Count) + 10,
});
});
tasks.Remove(completedTask);
}

await Task.WhenAll(tasks);

var generatedSettlementReport = await context.CallActivityAsync<GeneratedSettlementReportDto>(
nameof(GatherSettlementReportFilesActivity),
Expand Down

0 comments on commit e24b7cd

Please sign in to comment.