Skip to content

Commit

Permalink
Merge branch 'main' into xabpe/Add-Durable-TestCommon
Browse files Browse the repository at this point in the history
  • Loading branch information
dstenroejl authored Dec 9, 2024
2 parents 870ed08 + 05eff44 commit 183dfc0
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
t.StructField("calculation_type", t.StringType(), not nullable),
#
# UTC time
t.StructField("calculation_period_start", t.TimestampType(), False),
t.StructField("calculation_period_start", t.TimestampType(), not nullable),
#
# UTC time. The time is exclusive.
t.StructField("calculation_period_end", t.TimestampType(), False),
t.StructField("calculation_period_end", t.TimestampType(), not nullable),
#
# Number series per calculation type. Starts from number 1.
t.StructField("calculation_version", t.LongType(), False),
t.StructField("calculation_version", t.LongType(), not nullable),
]
)
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TableColumnNames:
calculation_type = "calculation_type"
calculation_version = "calculation_version"
calculation_version_dh2 = "calculation_version_dh2"
calculation_version_dh3 = "calculation_version_dh3_temp"
calculation_version_dh3 = "calculation_version_dh3"
is_internal_calculation = "is_internal_calculation"
"""True if the calculation is an internal calculation, False otherwise."""
created_by_user_id = "created_by_user_id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def write_calculation(
spark,
METADATA_CHANGED_RETRIES,
f"INSERT INTO {infrastructure_settings.catalog_name}.{WholesaleInternalDatabase.DATABASE_NAME}.{WholesaleInternalDatabase.CALCULATIONS_TABLE_NAME}"
f" ({TableColumnNames.calculation_id}, {TableColumnNames.calculation_type}, {TableColumnNames.calculation_period_start}, {TableColumnNames.calculation_period_end}, {TableColumnNames.calculation_execution_time_start}, {TableColumnNames.calculation_succeeded_time}, {TableColumnNames.is_internal_calculation}, {TableColumnNames.calculation_version_dh2}, {TableColumnNames.calculation_version_dh3})"
f" ({TableColumnNames.calculation_id}, {TableColumnNames.calculation_type}, {TableColumnNames.calculation_period_start}, {TableColumnNames.calculation_period_end}, {TableColumnNames.calculation_execution_time_start}, {TableColumnNames.calculation_succeeded_time}, {TableColumnNames.is_internal_calculation}, {TableColumnNames.calculation_version_dh2}, {TableColumnNames.calculation_version})"
f" VALUES ('{args.calculation_id}', '{args.calculation_type.value}', '{calculation_period_start_datetime}', '{calculation_period_end_datetime}', '{calculation_execution_time_start}', NULL, '{args.is_internal_calculation}', NULL, NULL);",
table_targeted_by_query,
)
Expand All @@ -66,7 +66,7 @@ def write_calculation(
# we have to perform a separate update after the insert to finalize the calculation_version.
spark.sql(
f"UPDATE {infrastructure_settings.catalog_name}.{WholesaleInternalDatabase.DATABASE_NAME}.{WholesaleInternalDatabase.CALCULATIONS_TABLE_NAME}"
f" SET {TableColumnNames.calculation_version_dh3} = {TableColumnNames.calculation_version}"
f" SET {TableColumnNames.calculation_version} = {TableColumnNames.calculation_version_dh3}"
f" WHERE {TableColumnNames.calculation_id} = '{args.calculation_id}'"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
-- Reusable migration script for DH2 calculations in Wholesale.
-- It works in three general steps:
-- * 1: Delete all rows from all tables with calculation version = 0 in calculations.
-- * 2: Remove the calculations from our main table
-- * 3: Re-insert the new calculations from DH2 into the main table with version = 0
-- * 3: Re-migrate everything from the DH2 calculations input.
--
-- Currently implemented tables:
-- * wholesale_results_internal
-- * energy_per_brp
-- * energy_per_es
-- * energy
-- * wholesale_internal
-- * calculation_grid_areas
--

-- STEP 0:
-- Manually declare your environment variables.
DECLARE OR REPLACE VARIABLE CATALOG_NAME STRING = "ctl_shres_t_we_001";
DECLARE OR REPLACE VARIABLE WHOLESALE_RESULTS_INTERNAL_DATABASE_NAME STRING = "wholesale_results_internal";
DECLARE OR REPLACE VARIABLE WHOLESALE_INTERNAL_DATABASE_NAME STRING = "wholesale_internal";
DECLARE OR REPLACE VARIABLE SHARED_WHOLESALE_INPUT STRING = "shared_wholesale_input";


-- STEP 1: Delete existing rows across Wholesale's domain
MERGE INTO CATALOG_NAME.WHOLESALE_RESULTS_INTERNAL_DATABASE_NAME.energy e1
USING CATALOG_NAME.WHOLESALE_INTERNAL_DATABASE_NAME.calculations c
ON c.calculation_id <=> e1.calculation_id and c.calculation_version_dh2 is not null
WHEN MATCHED THEN DELETE;

MERGE INTO CATALOG_NAME.WHOLESALE_RESULTS_INTERNAL_DATABASE_NAME.energy_per_brp e2
USING CATALOG_NAME.WHOLESALE_INTERNAL_DATABASE_NAME.calculations c
ON c.calculation_id <=> e2.calculation_id and c.calculation_version_dh2 is not null
WHEN MATCHED THEN DELETE;

MERGE INTO CATALOG_NAME.WHOLESALE_RESULTS_INTERNAL_DATABASE_NAME.energy_per_es e3
USING CATALOG_NAME.WHOLESALE_INTERNAL_DATABASE_NAME.calculations c
ON c.calculation_id <=> e3.calculation_id and c.calculation_version_dh2 is not null
WHEN MATCHED THEN DELETE;

MERGE INTO CATALOG_NAME.WHOLESALE_INTERNAL_DATABASE_NAME.calculation_grid_areas g1
USING CATALOG_NAME.WHOLESALE_INTERNAL_DATABASE_NAME.calculations c
ON c.calculation_id <=> g1.calculation_id and c.calculation_version_dh2 is not null
WHEN MATCHED THEN DELETE;

-- STEP 2: Remove the DH2 calculations from the main table
DELETE FROM CATALOG_NAME.WHOLESALE_INTERNAL_DATABASE_NAME.calculations
WHERE calculation_version_dh2 is not null;

-- STEP 3: Re-migrate each of the tables with calculations from DH2.
-- TODO: Replace "0" with whatever version is given by VOLT later.
INSERT INTO CATALOG_NAME.WHOLESALE_INTERNAL_DATABASE_NAME.calculations
(calculation_id, calculation_type, calculation_period_start, calculation_period_end, calculation_execution_time_start, calculation_succeeded_time, is_internal_calculation, calculation_version_dh2, calculation_version)
SELECT (calculation_id, calculation_type, calculation_period_start, calculation_period_end, calculation_execution_time_start, calculation_succeeded_time, False, 0, 0) FROM CATALOG_NAME.SHARED_WHOLESALE_INPUT.calculations_view_v1;

-- Result ID for the energy-tables should be unique per:
-- calculation_id, grid_area_code, balance_responsible_party_id, energy_supplier_id, time_series_type
-- [energy_supplier_id's last 8 digits]-[grid_area_code]-[BRP's last 4 digits]-[time_series_type abbreviated]-[calculation_id's final 12 characters]
INSERT INTO CATALOG_NAME.WHOLESALE_RESULTS_INTERNAL_DATABASE_NAME.energy
SELECT
calculation_id,
CONCAT(
SUBSTRING(energy_supplier_id, -8), '-',
grid_area_code, '-',
SUBSTRING(balance_responsible_party_id, -4), '-',
CASE WHEN time_series_type = 'non_profiled_consumption' THEN 'nonp'
WHEN time_series_type = 'production' THEN 'prod'
WHEN time_series_type = 'flex_consumption' THEN 'flex'
ELSE SUBSTRING(time_series_type, 1, 4) END,
'-',
SUBSTRING(calculation_id, -12)
) as result_id,
grid_area_code,
time_series_type,
resolution,
time,
quantity,
quantity_qualities
FROM CATALOG_NAME.SHARED_WHOLESALE_INPUT.calculation_results;

INSERT INTO CATALOG_NAME.WHOLESALE_RESULTS_INTERNAL_DATABASE_NAME.energy_per_brp
SELECT
calculation_id,
CONCAT(
SUBSTRING(energy_supplier_id, -8), '-',
grid_area_code, '-',
SUBSTRING(balance_responsible_party_id, -4), '-',
CASE WHEN time_series_type = 'non_profiled_consumption' THEN 'nonp'
WHEN time_series_type = 'production' THEN 'prod'
WHEN time_series_type = 'flex_consumption' THEN 'flex'
ELSE SUBSTRING(time_series_type, 1, 4) END,
'-',
SUBSTRING(calculation_id, -12)
) as result_id,
grid_area_code,
balance_responsible_party,
time_series_type,
resolution,
time,
quantity,
quantity_qualities
FROM CATALOG_NAME.SHARED_WHOLESALE_INPUT.calculation_results_energy_per_brp_view_v1;

INSERT INTO CATALOG_NAME.WHOLESALE_RESULTS_INTERNAL_DATABASE_NAME.energy_per_es
SELECT
calculation_id,
CONCAT(
SUBSTRING(energy_supplier_id, -8), '-',
grid_area_code, '-',
SUBSTRING(balance_responsible_party_id, -4), '-',
CASE WHEN time_series_type = 'non_profiled_consumption' THEN 'nonp'
WHEN time_series_type = 'production' THEN 'prod'
WHEN time_series_type = 'flex_consumption' THEN 'flex'
ELSE SUBSTRING(time_series_type, 1, 4) END,
'-',
SUBSTRING(calculation_id, -12)
) as result_id,
grid_area_code,
energy_supplier_id,
balance_responsible_party,
time_series_type,
resolution,
time,
quantity,
quantity_qualities
FROM CATALOG_NAME.SHARED_WHOLESALE_INPUT.calculation_results_energy_per_es_view_v1;

INSERT INTO CATALOG_NAME.WHOLESALE_INTERNAL_DATABASE_NAME.calculation_grid_areas
SELECT calculation_id, grid_area_code FROM CATALOG_NAME.SHARED_WHOLESALE_INPUT.calculation_grid_areas_view_v1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Swaps the column namings to make the calculation_version the "combined" column rather than just for DH3.
ALTER TABLE {CATALOG_NAME}.{WHOLESALE_INTERNAL_DATABASE_NAME}.calculations
RENAME COLUMN calculation_version TO calculation_version_dh3
GO

ALTER TABLE {CATALOG_NAME}.{WHOLESALE_INTERNAL_DATABASE_NAME}.calculations
RENAME COLUMN calculation_version_dh3_temp TO calculation_version

0 comments on commit 183dfc0

Please sign in to comment.