Skip to content

Commit

Permalink
Implemented helper and start job
Browse files Browse the repository at this point in the history
  • Loading branch information
FirestarJes committed Sep 7, 2024
1 parent 6f22de1 commit 768e983
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,10 @@ public Task InitializeAsync(
var settlementReport = new SettlementReport(SystemClock.Instance, userId, actorId, hideReport, requestId, request);
return _repository.AddOrUpdateAsync(settlementReport);
}

public Task InitializeFromJobAsync(Guid userId, Guid actorId, bool hideReport, long jobId, SettlementReportRequestDto request)
{
var settlementReport = new SettlementReport(SystemClock.Instance, userId, actorId, hideReport, new SettlementReportRequestId(jobId.ToString()), request);
return _repository.AddOrUpdateAsync(settlementReport);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Energinet.DataHub.SettlementReport.Infrastructure.Commands;
using Energinet.DataHub.SettlementReport.Interfaces.SettlementReports_v2.Models;

namespace Energinet.DataHub.SettlementReport.Infrastructure.Handlers;

Expand All @@ -22,6 +22,9 @@ public interface IRequestSettlemenReportJobHandler
/// Request a settlement report job
/// </summary>
/// <param name="command"></param>
/// <param name="userId"></param>
/// <param name="actorId"></param>
/// <param name="isFas"></param>
/// <returns>A long value representing the job id of the requested settlement report.</returns>
Task<long> HandleAsync(RequestSettlementReportJobCommand command);
Task<long> HandleAsync(SettlementReportRequestDto command, Guid userId, Guid actorId, bool isFas);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Energinet.DataHub.Core.Databricks.Jobs.Abstractions;
using Energinet.DataHub.SettlementReport.Infrastructure.Commands;
using Energinet.DataHub.SettlementReport.Infrastructure.SqlStatements.Mappers;
using Energinet.DataHub.SettlementReport.Interfaces.Models;
using Energinet.DataHub.SettlementReport.Infrastructure.Helpers;
using Energinet.DataHub.SettlementReport.Interfaces.SettlementReports_v2;
using Energinet.DataHub.SettlementReport.Interfaces.SettlementReports_v2.Models;
using Microsoft.Azure.Databricks.Client.Models;

namespace Energinet.DataHub.SettlementReport.Infrastructure.Handlers;

public sealed class RequestSettlementReportHandler : IRequestSettlemenReportJobHandler
{
private readonly IJobsApiClient _jobsApiClient;
private readonly IDatabricksJobsHelper _jobHelper;
private readonly ISettlementReportInitializeHandler _settlementReportInitializeHandler;

public RequestSettlementReportHandler(IJobsApiClient jobsApiClient)
public RequestSettlementReportHandler(
IDatabricksJobsHelper jobHelper,
ISettlementReportInitializeHandler settlementReportInitializeHandler)
{
_jobsApiClient = jobsApiClient;
_jobHelper = jobHelper;
_settlementReportInitializeHandler = settlementReportInitializeHandler;
}

public async Task<long> HandleAsync(RequestSettlementReportJobCommand command)
public async Task<long> HandleAsync(SettlementReportRequestDto request, Guid userId, Guid actorId, bool isFas)
{
var job = await GetSettlementReportsJobAsync(command.Request.Filter.CalculationType == CalculationType.BalanceFixing
? DatabricksJobNames.BalanceFixing
: DatabricksJobNames.Wholesale)
var jobId = await _jobHelper.RunSettlementReportsJobAsync(request).ConfigureAwait(false);
await _settlementReportInitializeHandler
.InitializeFromJobAsync(
userId,
actorId,
isFas,
jobId,
request)
.ConfigureAwait(false);

var parameters = CreateParameters(command.Request);
return await _jobsApiClient.Jobs.RunNow(job.JobId, parameters).ConfigureAwait(false);
}

private RunParameters CreateParameters(SettlementReportRequestDto request)
{
var gridAreas = string.Join(", ", request.Filter.GridAreas.Select(c => c.Key));

var jobParameters = new List<string>
{
$"--grid-areas=[{gridAreas}]",
$"--period-start-datetime={request.Filter.PeriodStart}",
$"--period-end-datetime={request.Filter.PeriodEnd}",
$"--calculation-type={CalculationTypeMapper.ToDeltaTableValue(request.Filter.CalculationType)}",
};

return RunParameters.CreatePythonParams(jobParameters);
}

private async Task<Job> GetSettlementReportsJobAsync(string jobName)
{
var calculatorJob = await _jobsApiClient.Jobs
.ListPageable(name: jobName)
.SingleAsync()
.ConfigureAwait(false);

return await _jobsApiClient.Jobs.Get(calculatorJob.JobId).ConfigureAwait(false);
return jobId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Energinet.DataHub.Core.Databricks.Jobs.Abstractions;
using Energinet.DataHub.SettlementReport.Infrastructure.Handlers;
using Energinet.DataHub.SettlementReport.Infrastructure.SqlStatements.Mappers;
using Energinet.DataHub.SettlementReport.Interfaces.Models;
using Energinet.DataHub.SettlementReport.Interfaces.SettlementReports_v2.Models;
using Microsoft.Azure.Databricks.Client.Models;

namespace Energinet.DataHub.SettlementReport.Infrastructure.Helpers;

public class DatabricksJobsHelper : IDatabricksJobsHelper
{
private readonly IJobsApiClient _jobsApiClient;

public DatabricksJobsHelper(IJobsApiClient jobsApiClient)
{
_jobsApiClient = jobsApiClient;
}

public async Task<long> RunSettlementReportsJobAsync(SettlementReportRequestDto request)
{
var job = await GetSettlementReportsJobAsync(GetJobName(request.Filter.CalculationType)).ConfigureAwait(false);
return await _jobsApiClient.Jobs.RunNow(job.JobId, CreateParameters(request)).ConfigureAwait(false);
}

private string GetJobName(CalculationType calculationType)
{
return calculationType switch
{
CalculationType.BalanceFixing => DatabricksJobNames.BalanceFixing,
CalculationType.WholesaleFixing => DatabricksJobNames.Wholesale,
CalculationType.FirstCorrectionSettlement => DatabricksJobNames.Wholesale,
CalculationType.SecondCorrectionSettlement => DatabricksJobNames.Wholesale,
CalculationType.ThirdCorrectionSettlement => DatabricksJobNames.Wholesale,
CalculationType.Aggregation => DatabricksJobNames.Wholesale,
_ => throw new ArgumentOutOfRangeException(nameof(calculationType), calculationType, null),
};
}

private async Task<Job> GetSettlementReportsJobAsync(string jobName)
{
var calculatorJob = await _jobsApiClient.Jobs
.ListPageable(name: jobName)
.SingleAsync()
.ConfigureAwait(false);

return await _jobsApiClient.Jobs.Get(calculatorJob.JobId).ConfigureAwait(false);
}

private RunParameters CreateParameters(SettlementReportRequestDto request)
{
var gridAreas = string.Join(", ", request.Filter.GridAreas.Select(c => c.Key));

var jobParameters = new List<string>
{
$"--grid-areas=[{gridAreas}]",
$"--period-start-datetime={request.Filter.PeriodStart}",
$"--period-end-datetime={request.Filter.PeriodEnd}",
$"--calculation-type={CalculationTypeMapper.ToDeltaTableValue(request.Filter.CalculationType)}",
};

return RunParameters.CreatePythonParams(jobParameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

using Energinet.DataHub.SettlementReport.Interfaces.SettlementReports_v2.Models;

namespace Energinet.DataHub.SettlementReport.Infrastructure.Commands;
namespace Energinet.DataHub.SettlementReport.Infrastructure.Helpers;

public record RequestSettlementReportJobCommand(SettlementReportRequestDto Request);
public interface IDatabricksJobsHelper
{
Task<long> RunSettlementReportsJobAsync(SettlementReportRequestDto request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ Task InitializeAsync(
bool hideReport,
SettlementReportRequestId requestId,
SettlementReportRequestDto request);

Task InitializeFromJobAsync(
Guid userId,
Guid actorId,
bool hideReport,
long jobId,
SettlementReportRequestDto request);
}

0 comments on commit 768e983

Please sign in to comment.