From 61a8dbfc45dc46dd54bbbf216aa6a2ec79cc7bb3 Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Tue, 22 Aug 2023 10:51:20 -0700 Subject: [PATCH] Bind engine before attempting to drop archive tables --- airflow/utils/db_cleanup.py | 1 + tests/utils/test_db_cleanup.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index 79a804a9e501..4c82f90ab381 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -194,6 +194,7 @@ def _do_delete(*, query, orm_model, skip_archive, session): session.execute(delete) session.commit() if skip_archive: + metadata.bind = session.get_bind() target_table.drop() session.commit() print("Finished Performing Delete") diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py index 80419cb9437a..7140bcb35fa7 100644 --- a/tests/utils/test_db_cleanup.py +++ b/tests/utils/test_db_cleanup.py @@ -41,6 +41,7 @@ _cleanup_table, _confirm_drop_archives, _dump_table_to_file, + _get_archived_table_names, config_dict, drop_archived_tables, export_archived_records, @@ -266,6 +267,34 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, e else: raise Exception("unexpected") + @pytest.mark.parametrize( + "skip_archive, expected_archives", + [param(True, 0, id="skip_archive"), param(False, 1, id="do_archive")], + ) + def test__skip_archive(self, skip_archive, expected_archives): + """ + Verify that running cleanup_table with skip_archive drops the archives when requested. 
+ """ + base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC")) + num_tis = 10 + create_tis( + base_date=base_date, + num_tis=num_tis, + ) + with create_session() as session: + clean_before_date = base_date.add(days=5) + _cleanup_table( + **config_dict["dag_run"].__dict__, + clean_before_timestamp=clean_before_date, + dry_run=False, + session=session, + table_names=["dag_run"], + skip_archive=skip_archive, + ) + model = config_dict["dag_run"].orm_model + assert len(session.query(model).all()) == 5 + assert len(_get_archived_table_names(["dag_run"], session)) == expected_archives + def test_no_models_missing(self): """ 1. Verify that for all tables in `airflow.models`, we either have them enabled in db cleanup,