Skip to content

Commit

Permalink
FEAT: Reset ODS tables for new QLIK Snapshot (#23)
Browse files Browse the repository at this point in the history
The original implementation of the ODS-QLIK RDS loader assumed that new snapshots would not be created. An assert check made sure that this condition stopped any loading operation if a new snapshot was detected.

This change loads new QLIK snapshots by totally wiping out the existing history and fact tables and loading the newest snapshot that is found.

With this implementation all transaction history will be lost any time cubic restarts the QLIK replication process.
  • Loading branch information
rymarczy authored Sep 17, 2024
1 parent a8ad5b6 commit d1c4015
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
26 changes: 23 additions & 3 deletions src/cubic_loader/qlik/ods_qlik.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cubic_loader.utils.aws import s3_object_exists
from cubic_loader.utils.aws import s3_split_object_path
from cubic_loader.utils.aws import s3_upload_file
from cubic_loader.utils.aws import s3_delete_object
from cubic_loader.utils.remote_locations import S3_ARCHIVE
from cubic_loader.utils.remote_locations import S3_ERROR
from cubic_loader.utils.remote_locations import QLIK
Expand Down Expand Up @@ -380,6 +381,19 @@ def rds_fact_table_load(self) -> None:

logger.log_complete()

def snapshot_reset(self) -> None:
"""
Reset database tables and status files for new snapshot
Those will lose all history from history table
"""

s3_delete_object(self.status_path)
self.etl_status = self.load_etl_status()

self.db.execute(drop_table(self.db_history_table))
self.db.execute(drop_table(self.db_fact_table))

def run_etl(self) -> None:
"""
Run table ETL Process
Expand All @@ -398,9 +412,15 @@ def run_etl(self) -> None:
last_cdc_ts=self.etl_status.last_cdc_ts,
)
try:
assert (
self.etl_status.current_snapshot_ts == self.last_s3_snapshot_dfm.ts
), f"Expected LOAD SNAPSHOT has changed from {self.etl_status.current_snapshot_ts} to {self.last_s3_snapshot_dfm.ts}"
if self.etl_status.current_snapshot_ts != self.last_s3_snapshot_dfm.ts:
new_snapshot_logger = ProcessLogger(
"ods_snapshot_change",
table=self.table,
old_shapshot=self.etl_status.current_snapshot_ts,
new_shapshot=self.last_s3_snapshot_dfm.ts,
)
self.snapshot_reset()
new_snapshot_logger.log_complete()

# create tables and history table partitions
# will be no-op if tables already exist
Expand Down
2 changes: 1 addition & 1 deletion src/cubic_loader/qlik/rds_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def drop_table(table_name: str) -> str:
"""
DROP table from RDS
"""
return f"DROP TABLE IF EXISTS {ODS_SCHEMA}.{table_name};"
return f"DROP TABLE IF EXISTS {ODS_SCHEMA}.{table_name} CASCADE;"


def add_columns_to_table(new_columns: List[DFMSchemaFields], fact_table: str) -> str:
Expand Down

0 comments on commit d1c4015

Please sign in to comment.