Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add elasticsearch concurrency tags for Airflow #3910

Closed
wants to merge 16 commits into from

Conversation

stacimc
Copy link
Collaborator

@stacimc stacimc commented Mar 13, 2024

Fixes

Fixes #3891 by @stacimc

Blocked, this needs to be rebased on #3890 when merged (missing latest changes from that PR)
TODO should also add an exception for relationship between staging_db_restore and the proportional index DAG.

Description

We now have quite a few DAGs that all operate on Elasticsearch or the underlying databases, and therefore must not run concurrently. We've been using the prevent_concurrency_with_dags and wait_for_external_dags utils to accomplish this in each DAG, by passing in a hard-coded list of the external DAGs we want to either wait on or otherwise prevent concurrency with. Since this list is hard-coded for each DAG, it is extremely easy to miss dependencies especially as DAGs are added and changed. A few were missed in the implementation of the Search Relevancy project so far.

This PR introduces two new Tags that are used to identify DAGs as being part of a "concurrency group": production_es_concurrency and staging_es_concurrency. Each DAG in these groups have been given the appropriate tag (so for example, create_new_staging_es_index and staging_database_restore both have the staging_es_concurrency tag). The concurrency tasks have been updated to accept the name of the tag you want to select on, rather than a hard coded list, ie:

prevent_concurrency = prevent_concurrency_with_dags_with_tag(
    tag=STAGING_ES_CONCURRENCY_TAG,
)

^ This will select all the DAGs that have the given tag, except for the running DAG/the DAG that called the task, and fail immediately if any of them are running.

Screenshot 2024-03-13 at 10 35 10 AM

So now if you add a new ES dag, rather than having to manually update a bunch of lists of DAG dependencies on every single other ES dag, you just have to remember to add the appropriate tag to your new DAG. The concurrency util will raise an error if you forget, as well.

Additional thoughts/alternatives

This comment on the issue explains why it wasn't possible to use the existing feature of Airflow pools to do this. Tags also have the benefit of being very visible in the UI and easy to filter by.

Testing Instructions

These are the intended concurrency groups:

  • staging_es_concurrency
    • staging_database_restore
    • recreate_full_staging_index
    • point_staging_es_alias
    • create_new_staging_es_index
    • create_proportional_by_source_staging_index
  • production_es_concurrency
    • both data refreshes
    • both filtered index creation dags
    • create_new_production_es_index
    • point_production_es_index

For each DAG, run the DAG and check that the appropriate DAGs were waited on/prevented_concurrency_with. You can do this by looking at the logs for the get_dags_in_<tag>_group task, and then checking that the correct number of mapped tasks were made. (Example: create_proportional_by_source_staging_index is in the staging group which has 6 dags. It has a prevent_concurrency_with_dag task generated for every other DAG in the group except itself).

Airflow 2.9 is supposed to include the ability to prove meaningful names for dynamically mapped tasks (instead of just a numerical index), which will make this much easier to read at-a-glance once we upgrade!

Screenshot 2024-03-13 at 10 37 03 AM

Note: The data refreshes are a little special. They use the SingleRunExternalDagSensor to wait on each other, so in their wait_for_es_dags task they should only wait on the other production ES Dags (5 dags total -- the two filtered index, create_new_production_es_index, and point_production_es_index).

Test the actual concurrency utils still works

We should also test the actual concurrency for at least a few options. The easiest way to do this is to trigger them in sequence through the Airflow CLI. Make sure the relevant DAGs are turned on in Airflow, then run just catalog/shell and:

# For example, this would trigger the new staging index DAG and then immediately trigger staging db restore
# We would expect staging_database_restore to pass all of its other `wait` tasks, but poke a few times on the task that
# waits for `create_new_staging_es_index` and only continue once that DAG completes.
> airflow dags trigger create_new_staging_es_index && airflow dags trigger staging_database_restore

A general tip for remembering how the concurrency checks should work: scheduled DAGs (data refreshes, staging database restore) will wait if a conflicting DAG is running, and resume once it completes. DAGs that are only run manually (create_new_es_index, create_filtered_index, recreate_full_staging_index) fail immediately if a conflicting DAG is running.

Checklist

  • My pull request has a descriptive title (not a vague title likeUpdate index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (if applicable).

Developer Certificate of Origin

Developer Certificate of Origin
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

stacimc added 16 commits March 8, 2024 14:44
* `point_alias` needs to have trigger_rule=TriggerRule.NONE_FAILED, because the previous step to remove existing alias my be skipped
* however, this is a problem for DAGs that import the entire point_alias taskGroup and try to skip it using a branching operator. Because `point_alias` runs when NONE_FAILED, it will try to run even though the entire taskgroup has been skipped.
* easiest solution is to have all the individual tasks in the point_alias group individually handle skipping if the appropriate params haven't been passed in
@stacimc stacimc added 🟨 priority: medium Not blocking but should be addressed soon 🛠 goal: fix Bug fix 💻 aspect: code Concerns the software code in the repository 🧰 goal: internal improvement Improvement that benefits maintainers, not users 🧱 stack: catalog Related to the catalog and Airflow DAGs labels Mar 13, 2024
@stacimc stacimc self-assigned this Mar 13, 2024
@openverse-bot openverse-bot added the 🏷 status: label work required Needs proper labelling before it can be worked on label Mar 13, 2024
Copy link

Full-stack documentation: https://docs.openverse.org/_preview/3910

Please note that GitHub pages takes a little time to deploy newly pushed code, if the links above don't work or you see old versions, wait 5 minutes and try again.

You can check the GitHub pages deployment action list to see the current status of the deployments.

Changed files 🔄:

@stacimc stacimc removed the 🛠 goal: fix Bug fix label Mar 13, 2024
@stacimc
Copy link
Collaborator Author

stacimc commented Mar 14, 2024

Closing in favor of #3921 to disentangle merge issues

@stacimc stacimc closed this Mar 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
💻 aspect: code Concerns the software code in the repository 🧰 goal: internal improvement Improvement that benefits maintainers, not users 🟨 priority: medium Not blocking but should be addressed soon 🧱 stack: catalog Related to the catalog and Airflow DAGs 🏷 status: label work required Needs proper labelling before it can be worked on
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Improve support for "concurrency pools" in Elasticsearch DAGs
2 participants