Skip to content

Commit

Permalink
Revert Skip/Take for time series.
Browse files Browse the repository at this point in the history
  • Loading branch information
defectiveAi committed Sep 19, 2024
1 parent 572a95f commit 31ba38d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ public async Task WriteAsync(
await using (csvHelper.ConfigureAwait(false))
{
csvHelper.Context.TypeConverterOptionsCache.AddOptions<decimal>(
new TypeConverterOptions
{
Formats = ["0.000"],
});
new TypeConverterOptions { Formats = ["0.000"], });

if (fileInfo is { FileOffset: 0, ChunkOffset: 0 })
{
Expand All @@ -80,47 +77,37 @@ public async Task WriteAsync(
await csvHelper.NextRecordAsync().ConfigureAwait(false);
}

var rowsCount = 0;
var loopCount = 0;
do
await foreach (var record in _dataSource.GetAsync(filter, maximumCalculationVersion, _resolution, fileInfo.ChunkOffset * ChunkSize, ChunkSize).ConfigureAwait(false))
{
rowsCount = 0;
await foreach (var record in _dataSource.GetAsync(filter, maximumCalculationVersion, _resolution, loopCount * ChunkSize, ChunkSize).ConfigureAwait(false))
csvHelper.WriteField(record.MeteringPointId, shouldQuote: true);
csvHelper.WriteField(record.MeteringPointType switch
{
csvHelper.WriteField(record.MeteringPointId, shouldQuote: true);
csvHelper.WriteField(record.MeteringPointType switch
{
MeteringPointType.Consumption => "E17",
MeteringPointType.Production => "E18",
MeteringPointType.Exchange => "E20",
MeteringPointType.VeProduction => "D01",
MeteringPointType.NetProduction => "D05",
MeteringPointType.SupplyToGrid => "D06",
MeteringPointType.ConsumptionFromGrid => "D07",
MeteringPointType.WholesaleServicesInformation => "D08",
MeteringPointType.OwnProduction => "D09",
MeteringPointType.NetFromGrid => "D10",
MeteringPointType.NetToGrid => "D11",
MeteringPointType.TotalConsumption => "D12",
MeteringPointType.ElectricalHeating => "D14",
MeteringPointType.NetConsumption => "D15",
MeteringPointType.EffectSettlement => "D19",
_ => throw new ArgumentOutOfRangeException(nameof(record.MeteringPointType)),
});
csvHelper.WriteField(record.StartDateTime);

for (var i = 0; i < expectedQuantities; ++i)
{
csvHelper.WriteField<decimal?>(record.Quantities.Count > i ? record.Quantities[i].Quantity : null);
}
MeteringPointType.Consumption => "E17",
MeteringPointType.Production => "E18",
MeteringPointType.Exchange => "E20",
MeteringPointType.VeProduction => "D01",
MeteringPointType.NetProduction => "D05",
MeteringPointType.SupplyToGrid => "D06",
MeteringPointType.ConsumptionFromGrid => "D07",
MeteringPointType.WholesaleServicesInformation => "D08",
MeteringPointType.OwnProduction => "D09",
MeteringPointType.NetFromGrid => "D10",
MeteringPointType.NetToGrid => "D11",
MeteringPointType.TotalConsumption => "D12",
MeteringPointType.ElectricalHeating => "D14",
MeteringPointType.NetConsumption => "D15",
MeteringPointType.EffectSettlement => "D19",
_ => throw new ArgumentOutOfRangeException(nameof(record.MeteringPointType)),
});
csvHelper.WriteField(record.StartDateTime);

await csvHelper.NextRecordAsync().ConfigureAwait(false);
rowsCount++;
for (var i = 0; i < expectedQuantities; ++i)
{
csvHelper.WriteField<decimal?>(record.Quantities.Count > i ? record.Quantities[i].Quantity : null);
}

loopCount++;
await csvHelper.NextRecordAsync().ConfigureAwait(false);
}
while (rowsCount >= ChunkSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,18 @@ public SettlementReportMeteringPointTimeSeriesResultRepository(ISettlementReport

public Task<int> CountAsync(SettlementReportRequestFilterDto filter, long maximumCalculationVersion, Resolution resolution)
{
return Task.FromResult(1);
if (filter.CalculationType == CalculationType.BalanceFixing)
{
return CountLatestAsync(filter, maximumCalculationVersion, resolution);
}

var (_, calculationId) = filter.GridAreas.Single();

return ApplyFilter(_settlementReportDatabricksContext.MeteringPointTimeSeriesView, filter, resolution)
.Where(row => row.CalculationId == calculationId!.Id)
.Select(row => row.MeteringPointId)
.Distinct()
.DatabricksSqlCountAsync();
}

public async IAsyncEnumerable<SettlementReportMeteringPointTimeSeriesResultRow> GetAsync(SettlementReportRequestFilterDto filter, long maximumCalculationVersion, Resolution resolution, int skip, int take)
Expand Down Expand Up @@ -153,6 +164,37 @@ into meteringPointGroup
return query.AsAsyncEnumerable();
}

private Task<int> CountLatestAsync(SettlementReportRequestFilterDto filter, long maximumCalculationVersion, Resolution resolution)
{
var view = ApplyFilter(_settlementReportDatabricksContext.MeteringPointTimeSeriesView, filter, resolution);

var dailyCalculationVersion = view
.Where(row => row.CalculationVersion <= maximumCalculationVersion)
.GroupBy(row => DbFunctions.ToStartOfDayInTimeZone(row.Time, "Europe/Copenhagen"))
.Select(group => new
{
start_of_day = group.Key,
max_calc_version = group.Max(row => row.CalculationVersion),
});

var dailyMeteringPoints =
from row in view
join calculationVersion in dailyCalculationVersion on
new { start_of_day = DbFunctions.ToStartOfDayInTimeZone(row.Time, "Europe/Copenhagen"), max_calc_version = row.CalculationVersion }
equals
new { calculationVersion.start_of_day, calculationVersion.max_calc_version }
select new
{
calculationVersion.start_of_day,
row.CalculationId,
row.MeteringPointId,
};

return dailyMeteringPoints
.Distinct()
.DatabricksSqlCountAsync();
}

private static IQueryable<SettlementReportMeteringPointTimeSeriesEntity> ApplyFilter(
IQueryable<SettlementReportMeteringPointTimeSeriesEntity> source,
SettlementReportRequestFilterDto filter,
Expand Down

0 comments on commit 31ba38d

Please sign in to comment.