Unify indexing concurrency checks in Airflow DAGs #2480
Labels
💻 aspect: code
Concerns the software code in the repository
🧰 goal: internal improvement
Improvement that benefits maintainers, not users
🟨 priority: medium
Not blocking but should be addressed soon
🧱 stack: catalog
Related to the catalog and Airflow DAGs
🔧 tech: airflow
Involves Apache Airflow
🐍 tech: python
Involves Python
Problem
See the discussion in #2420 for the full background on this issue.
To briefly summarise the problem: it's important that we correctly prevent certain DAGs from running concurrently. Airflow pools are not sufficient for this (see the discussion above). The current approach, which works, is to query the list of currently running DAGs to see whether another relevant DAG is running. Both data refresh and filtered index creation do this. However, the check would need to be manually copied to each new DAG that needs it (like the index settings DAG), and each copy requires manually configuring the list of relevant DAGs.
Different DAGs also need different behaviour when other DAGs are running. Data refresh takes priority over every other manually triggered DAG, so it is configured to wait for the other indexing DAGs to finish. The other DAGs, however, should simply exit with an appropriate AirflowSkipException if they are triggered during a data refresh. This is the current behaviour of the filtered index creation and data refresh DAGs, but it is not easily generalisable right now, even though we are adding new DAGs that must follow similar rules.
Description
@AetherUnbound and I discussed ways to generalise this approach and landed on using a new tag assigned to the relevant DAGs. All DAGs that cause indexing should be tagged with `indexes_{environment}`. The task in the filtered index creation DAG that checks for a running data refresh should instead check for any running DAG with the appropriate tag for the environment (other than itself).
Note: The environment distinction is important because we are also adding DAGs that cause indexing in staging for the first time as part of #2358. The existing DAGs only index production, but because existing or future DAGs may be generalised to work in multiple environments, we need a way to distinguish this. A DAG factory function that sets the appropriate tag for the environment would make it easy to share DAGs between environments in the future without causing concurrency issues.
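As a rough illustration of the tag-based check, the sketch below is pure Python and does not call Airflow. `RunningDag`, `indexing_tag`, `find_conflicting_dags`, and the DAG ids in the usage note are all hypothetical names chosen for this example; in the real DAGs the list of running DAGs would come from the same query the existing checks already use.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class RunningDag:
    """Hypothetical stand-in for a DAG that currently has an active run."""

    dag_id: str
    tags: frozenset[str] = field(default_factory=frozenset)


def indexing_tag(environment: str) -> str:
    """Build the shared tag for an environment, e.g. ``indexes_staging``."""
    return f"indexes_{environment}"


def find_conflicting_dags(
    running: list[RunningDag], environment: str, current_dag_id: str
) -> list[str]:
    """Return the ids of other running DAGs that index the same environment."""
    tag = indexing_tag(environment)
    return sorted(
        dag.dag_id
        for dag in running
        # Same environment tag, but never count the DAG doing the check.
        if tag in dag.tags and dag.dag_id != current_dag_id
    )
```

For example, with `data_refresh` and `create_filtered_index` both tagged `indexes_production` and both running, `find_conflicting_dags(running, "production", "create_filtered_index")` would report only `data_refresh`, and a DAG tagged `indexes_staging` would not be affected at all.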
For data refresh, if other DAGs with the tag are running, it should wait until those DAGs finish, using a sensor that runs the same query. All other DAGs should raise an AirflowSkipException with a message explaining that they cannot run concurrently with other indexing DAGs, along with a list of the DAGs that were detected as running.
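The two behaviours could be sketched roughly as follows. This is only an illustration under assumptions: `skip_if_indexing_in_progress` and `indexing_is_clear` are hypothetical helper names, the list of conflicting DAG ids is assumed to come from the tag query described above, and the fallback exception class exists only so the sketch runs without Airflow installed.

```python
from __future__ import annotations

try:
    from airflow.exceptions import AirflowSkipException
except ImportError:  # Fallback so the sketch runs without Airflow installed.

    class AirflowSkipException(Exception):
        """Stand-in for airflow.exceptions.AirflowSkipException."""


def skip_if_indexing_in_progress(conflicting_dag_ids: list[str]) -> None:
    """Behaviour for every DAG except data refresh: skip if anything conflicts."""
    if conflicting_dag_ids:
        raise AirflowSkipException(
            "Cannot run concurrently with other indexing DAGs. "
            f"Currently running: {', '.join(sorted(conflicting_dag_ids))}"
        )


def indexing_is_clear(conflicting_dag_ids: list[str]) -> bool:
    """Behaviour for data refresh: a poke-style check that is True once
    no other indexing DAG is running, so a sensor can keep waiting until then."""
    return not conflicting_dag_ids
```

In a DAG, the first helper would run as an early task so that the rest of the run is skipped, while the second could serve as the callable of a sensor (for instance a `PythonSensor`) that re-runs the query on each poke.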
Alternatives
Additional context