feat: (Step 1) Add new calculation version cols to calculations #3061

Merged
merged 7 commits into from Dec 9, 2024
@@ -34,6 +34,8 @@ class TableColumnNames:
    calculation_period_end = "calculation_period_end"
    calculation_type = "calculation_type"
    calculation_version = "calculation_version"
    calculation_version_dh2 = "calculation_version_dh2"
    calculation_version_dh3 = "calculation_version_dh3_temp"
    is_internal_calculation = "is_internal_calculation"
    """True if the calculation is an internal calculation, False otherwise."""
    created_by_user_id = "created_by_user_id"
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from dependency_injector.wiring import inject, Provide
from pyspark.sql import DataFrame, SparkSession
from delta.exceptions import MetadataChangedException
@@ -48,23 +49,26 @@ def write_calculation(
    calculation_execution_time_start = args.calculation_execution_time_start.strftime(
        timestamp_format
    )[:-3]

    # We had to use an SQL statement to insert the data because the DataFrame.write.insertInto() method does not support IDENTITY columns.
    # Also, since an IDENTITY column requires an exclusive lock on the table, we allow up to METADATA_CHANGED_RETRIES retries of the transaction.
    for attempt in range(METADATA_CHANGED_RETRIES):
        try:
            spark.sql(
                f"INSERT INTO {infrastructure_settings.catalog_name}.{WholesaleInternalDatabase.DATABASE_NAME}.{WholesaleInternalDatabase.CALCULATIONS_TABLE_NAME}"
                f" ({TableColumnNames.calculation_id}, {TableColumnNames.calculation_type}, {TableColumnNames.calculation_period_start}, {TableColumnNames.calculation_period_end}, {TableColumnNames.calculation_execution_time_start}, {TableColumnNames.calculation_succeeded_time}, {TableColumnNames.is_internal_calculation})"
                f" VALUES ('{args.calculation_id}', '{args.calculation_type.value}', '{calculation_period_start_datetime}', '{calculation_period_end_datetime}', '{calculation_execution_time_start}', NULL, '{args.is_internal_calculation}');"
            )
            break
        except MetadataChangedException as e:
            if attempt == METADATA_CHANGED_RETRIES:
                raise e
            else:
                spark.catalog.uncacheTable(
                    f"{infrastructure_settings.catalog_name}.{WholesaleInternalDatabase.DATABASE_NAME}.{WholesaleInternalDatabase.CALCULATIONS_TABLE_NAME}"
                )
    table_targeted_by_query = f"{infrastructure_settings.catalog_name}.{WholesaleInternalDatabase.DATABASE_NAME}.{WholesaleInternalDatabase.CALCULATIONS_TABLE_NAME}"
    execute_spark_sql_in_retry_loop(
        spark,
        METADATA_CHANGED_RETRIES,
        f"INSERT INTO {infrastructure_settings.catalog_name}.{WholesaleInternalDatabase.DATABASE_NAME}.{WholesaleInternalDatabase.CALCULATIONS_TABLE_NAME}"
        f" ({TableColumnNames.calculation_id}, {TableColumnNames.calculation_type}, {TableColumnNames.calculation_period_start}, {TableColumnNames.calculation_period_end}, {TableColumnNames.calculation_execution_time_start}, {TableColumnNames.calculation_succeeded_time}, {TableColumnNames.is_internal_calculation}, {TableColumnNames.calculation_version_dh2}, {TableColumnNames.calculation_version_dh3})"
        f" VALUES ('{args.calculation_id}', '{args.calculation_type.value}', '{calculation_period_start_datetime}', '{calculation_period_end_datetime}', '{calculation_execution_time_start}', NULL, '{args.is_internal_calculation}', NULL, NULL);",
        table_targeted_by_query,
    )

    # Since combining with DH2 calculations requires the identity column to determine the calculation_version,
    # we have to perform a separate update after the insert to finalize the calculation_version.
    spark.sql(
        f"UPDATE {infrastructure_settings.catalog_name}.{WholesaleInternalDatabase.DATABASE_NAME}.{WholesaleInternalDatabase.CALCULATIONS_TABLE_NAME}"
        f" SET {TableColumnNames.calculation_version_dh3} = {TableColumnNames.calculation_version}"
        f" WHERE {TableColumnNames.calculation_id} = '{args.calculation_id}'"
    )


@use_span("calculation.write-calculation-grid-areas")
@@ -100,3 +104,20 @@ def write_calculation_succeeded_time(
        WHERE {TableColumnNames.calculation_id} = '{calculation_id}'
        """
    )


def execute_spark_sql_in_retry_loop(
    spark: SparkSession,
    num_retries: int,
    query: str,
    table_to_uncache_on_failure: Optional[str],
) -> None:
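    """Execute a Spark SQL statement, retrying on MetadataChangedException.

    The statement is attempted up to num_retries times. If an attempt fails and
    table_to_uncache_on_failure is provided, that table is uncached before the
    next attempt so the retry sees fresh metadata; the final failure is re-raised.
    """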
    for attempt in range(num_retries):
        try:
            spark.sql(query)
            break
        except MetadataChangedException as e:
            # Re-raise on the last attempt; otherwise uncache the table (if given) and retry.
            if attempt == num_retries - 1:
                raise e
            elif table_to_uncache_on_failure is not None:
                spark.catalog.uncacheTable(table_to_uncache_on_failure)
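
A brief usage sketch (illustrative only, not from the PR): because table_to_uncache_on_failure is Optional, a caller whose statement does not target a cached table can pass None and the helper will simply retry the statement. The DELETE statement below is a hypothetical placeholder.

# Illustrative only: retry a statement without uncaching any table between attempts.
execute_spark_sql_in_retry_loop(
    spark,
    METADATA_CHANGED_RETRIES,
    "DELETE FROM some_catalog.some_schema.some_table WHERE calculation_id = 'abc'",  # hypothetical
    None,
)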
@@ -0,0 +1,9 @@
-- Adds new columns for DH2 calculation versions, to be filled out by a manual migration script.
-- Requires a follow-up script too.
ALTER TABLE {CATALOG_NAME}.{WHOLESALE_INTERNAL_DATABASE_NAME}.calculations SET TBLPROPERTIES (
'delta.columnMapping.mode' = 'name'
)
GO

ALTER TABLE {CATALOG_NAME}.{WHOLESALE_INTERNAL_DATABASE_NAME}.calculations
ADD COLUMNS (calculation_version_dh2 BIGINT, calculation_version_dh3_temp BIGINT)
@@ -16,6 +16,7 @@ def get_substitutions(catalog_name: str, is_testing: bool) -> dict[str, str]:
"{WHOLESALE_RESULTS_DATABASE_NAME}": paths.WholesaleResultsDatabase.DATABASE_NAME,
"{WHOLESALE_SETTLEMENT_REPORTS_DATABASE_NAME}": paths.WholesaleSettlementReportsDatabase.DATABASE_NAME,
"{WHOLESALE_SAP_DATABASE_NAME}": paths.WholesaleSapDatabase.DATABASE_NAME,
"{SHARED_WHOLESALE_INPUT}": paths.MigrationsWholesaleDatabase.DATABASE_NAME,
# Flags
"{DATABRICKS-ONLY}": (
"--" if is_testing else ""
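
For context, a minimal sketch of how the placeholder mechanism above might be applied: the migration script uses {CATALOG_NAME}-style placeholders and GO as a statement separator, while get_substitutions supplies the replacement values. The run_migration_script helper below is illustrative only and is not part of this PR; the repository's actual migration runner may work differently.

# Illustrative sketch (not part of this PR): substitute placeholders in a
# migration script and execute each GO-separated statement with Spark SQL.
from pyspark.sql import SparkSession


def run_migration_script(spark: SparkSession, script: str, substitutions: dict[str, str]) -> None:
    # Replace every {PLACEHOLDER} occurrence with its configured value.
    for placeholder, value in substitutions.items():
        script = script.replace(placeholder, value)
    # Split on GO separator lines and run each non-empty statement on its own.
    for statement in script.split("\nGO\n"):
        if statement.strip():
            spark.sql(statement)

With this sketch, the new {SHARED_WHOLESALE_INPUT} placeholder would resolve to paths.MigrationsWholesaleDatabase.DATABASE_NAME via get_substitutions(catalog_name, is_testing).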