diff --git a/src/cubic_loader/qlik/ods_qlik.py b/src/cubic_loader/qlik/ods_qlik.py index 5e815d4..4633368 100644 --- a/src/cubic_loader/qlik/ods_qlik.py +++ b/src/cubic_loader/qlik/ods_qlik.py @@ -15,6 +15,7 @@ from cubic_loader.utils.aws import s3_object_exists from cubic_loader.utils.aws import s3_split_object_path from cubic_loader.utils.aws import s3_upload_file +from cubic_loader.utils.aws import s3_delete_object from cubic_loader.utils.remote_locations import S3_ARCHIVE from cubic_loader.utils.remote_locations import S3_ERROR from cubic_loader.utils.remote_locations import QLIK @@ -380,6 +381,19 @@ def rds_fact_table_load(self) -> None: logger.log_complete() + def snapshot_reset(self) -> None: + """ + Reset database tables and status files for new snapshot + + Those will lose all history from history table + """ + + s3_delete_object(self.status_path) + self.etl_status = self.load_etl_status() + + self.db.execute(drop_table(self.db_history_table)) + self.db.execute(drop_table(self.db_fact_table)) + def run_etl(self) -> None: """ Run table ETL Process @@ -398,9 +412,15 @@ def run_etl(self) -> None: last_cdc_ts=self.etl_status.last_cdc_ts, ) try: - assert ( - self.etl_status.current_snapshot_ts == self.last_s3_snapshot_dfm.ts - ), f"Expected LOAD SNAPSHOT has changed from {self.etl_status.current_snapshot_ts} to {self.last_s3_snapshot_dfm.ts}" + if self.etl_status.current_snapshot_ts != self.last_s3_snapshot_dfm.ts: + new_snapshot_logger = ProcessLogger( + "ods_snapshot_change", + table=self.table, + old_shapshot=self.etl_status.current_snapshot_ts, + new_shapshot=self.last_s3_snapshot_dfm.ts, + ) + self.snapshot_reset() + new_snapshot_logger.log_complete() # create tables and history table partitions # will be no-op if tables already exist diff --git a/src/cubic_loader/qlik/rds_utils.py b/src/cubic_loader/qlik/rds_utils.py index 01d882b..43cc2ec 100644 --- a/src/cubic_loader/qlik/rds_utils.py +++ b/src/cubic_loader/qlik/rds_utils.py @@ -143,7 +143,7 @@ def drop_table(table_name: str) -> str: """ DROP table from RDS """ - return f"DROP TABLE IF EXISTS {ODS_SCHEMA}.{table_name};" + return f"DROP TABLE IF EXISTS {ODS_SCHEMA}.{table_name} CASCADE;" def add_columns_to_table(new_columns: List[DFMSchemaFields], fact_table: str) -> str: