From 997c0bafeacf3e88b3c6dbaf7ae031a9edba6f66 Mon Sep 17 00:00:00 2001
From: Staci Mullins <63313398+stacimc@users.noreply.github.com>
Date: Thu, 10 Oct 2024 10:23:04 -0700
Subject: [PATCH] Add tests for data refresh task dependencies (#5027)

---
Reviewer notes (not part of the commit; text between the "---" line and the
first "diff --git" line is commentary):

  * Line breaks and indentation of this patch were restored from a
    whitespace-collapsed copy. The added payload is 97 lines, matching the
    hunk header and the diffstat. Indentation uses 4 spaces; where Python's
    syntax leaves the exact width open this is an assumption -- confirm
    against the blob named on the "index" line.
  * `_assert_dependencies` constructs a new `DagBag()` on every call, and
    `test_staging_schedule` constructs another one.
  * The last two tests are spelled `test_dependenceis_*`. They are left as
    committed here, because renaming them would change the payload.

 .../data_refresh/test_task_dependencies.py    | 97 +++++++++++++++++++
 1 file changed, 97 insertions(+)
 create mode 100644 catalog/tests/dags/data_refresh/test_task_dependencies.py

diff --git a/catalog/tests/dags/data_refresh/test_task_dependencies.py b/catalog/tests/dags/data_refresh/test_task_dependencies.py
new file mode 100644
index 00000000000..950fc51b913
--- /dev/null
+++ b/catalog/tests/dags/data_refresh/test_task_dependencies.py
@@ -0,0 +1,97 @@
+"""
+Tests vital dependencies for the data refresh: for example, ensuring that the indices have been
+created and refreshed before index promotion runs. These dependencies can be broken in
+unexpected ways so it is important to verify that the most critical ones are preserved.
+"""
+
+import logging
+
+from airflow.models import DagBag
+
+
+logger = logging.getLogger(__name__)
+
+
+def test_staging_schedule():
+    dagbag = DagBag()
+
+    # Assert that all of the staging data refresh DAGs
+    # have a schedule of None
+    for dag in [
+        dag for dag in dagbag.dags.values() if "staging_data_refresh" in dag.tags
+    ]:
+        assert dag.schedule_interval is None, (
+            "Staging data refresh DAGs must be on a `None` schedule"
+            " or risk breaking the load_sample_data script."
+        )
+
+
+def _assert_dependencies(downstream_task_id: str, upstream_task_ids: list[str]):
+    dag = DagBag().get_dag("staging_image_data_refresh")
+
+    downstream_task = dag.get_task(downstream_task_id)
+    # Get all upstream tasks for this task, not just the direct upstream tasks
+    upstream_tasks = downstream_task.get_flat_relative_ids(upstream=True)
+
+    for upstream_task_id in upstream_task_ids:
+        assert upstream_task_id in upstream_tasks
+
+
+def test_dependencies_alter_data_task():
+    _assert_dependencies(
+        downstream_task_id="alter_table_data.alter_data_batch",
+        upstream_task_ids=["copy_upstream_tables.copy_upstream_table.copy_data"],
+    )
+
+
+def test_dependencies_reindex_task():
+    _assert_dependencies(
+        downstream_task_id="run_distributed_reindex.reindex.trigger_reindexing_task",
+        upstream_task_ids=[
+            "copy_upstream_tables.copy_upstream_table.copy_data",
+            "alter_table_data.alter_data_batch",
+            "create_index",
+        ],
+    )
+
+
+def test_dependencies_create_and_populate_filtered_index_task():
+    _assert_dependencies(
+        downstream_task_id="create_and_populate_filtered_index.trigger_and_wait_for_reindex.trigger_reindex",
+        upstream_task_ids=[
+            "run_distributed_reindex.reindex.assert_reindexing_success",
+            "run_distributed_reindex.refresh_index",
+        ],
+    )
+
+
+def test_dependencies_promote_table_task():
+    _assert_dependencies(
+        downstream_task_id="promote_tables.promote_table.promote",
+        upstream_task_ids=[
+            "run_distributed_reindex.reindex.assert_reindexing_success",
+            "create_and_populate_filtered_index.refresh_index",
+            "promote_tables.promote_table.remap_table_indices_to_table.create_table_indices",
+            "promote_tables.promote_table.remap_table_constraints_to_table.apply_constraints_to_table",
+        ],
+    )
+
+
+def test_dependenceis_promote_index_task():
+    _assert_dependencies(
+        downstream_task_id="promote_index.point_new_alias",
+        upstream_task_ids=[
+            "run_distributed_reindex.refresh_index",
+            "promote_tables.promote_table.promote",
+        ],
+    )
+
+
+def test_dependenceis_promote_filtered_index_task():
+    _assert_dependencies(
+        downstream_task_id="promote_filtered_index.point_new_alias",
+        upstream_task_ids=[
+            "create_and_populate_filtered_index.refresh_index",
+            "promote_tables.promote_table.promote",
+        ],
+    )