Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add elasticsearch concurrency tags for Airflow #3921

Merged
merged 11 commits into from
Mar 26, 2024

Conversation

stacimc
Copy link
Collaborator

@stacimc stacimc commented Mar 14, 2024

Fixes

Fixes #3891 by @stacimc

Description

We now have quite a few DAGs that all operate on Elasticsearch or the underlying databases, and therefore must not run concurrently. We've been using the prevent_concurrency_with_dags and wait_for_external_dags utils to accomplish this in each DAG, by passing in a hard-coded list of the external DAGs we want to either wait on or otherwise prevent concurrency with. Since this list is hard-coded for each DAG, it is extremely easy to miss dependencies especially as DAGs are added and changed. A few were missed in the implementation of the Search Relevancy project so far.

This PR introduces two three new Tags that are used to identify DAGs as being part of a "concurrency group": production_elasticsearch_concurrency, and staging_elasticsearch_concurrency for DAGs which affect the elasticsearch cluster, and staging_api_db_concurrency for DAGs which affect the API DB. Each DAG in these groups have been given the appropriate tag (the groups are listed below). The concurrency tasks have been updated to accept the name of the tag you want to select on, rather than a hard coded list, ie:

prevent_concurrency = prevent_concurrency_with_dags_with_tag(
    tag=STAGING_ES_CONCURRENCY_TAG,
)

^ This will select all the DAGs that have the given tag, except for the running DAG/the DAG that called the task, and fail immediately if any of them are running.

Screenshot 2024-03-13 at 10 35 10 AM

So now if you add a new ES dag, rather than having to manually update a bunch of lists of DAG dependencies on every single other ES dag, you just have to remember to add the appropriate tag(s) to your new DAG. The concurrency util will raise an error if you forget, as well.

Additional thoughts/alternatives

This comment on the issue explains why it wasn't possible to use the existing feature of Airflow pools to do this. Tags also have the benefit of being very visible in the UI and easy to filter by.

Testing Instructions

These are the intended concurrency groups:

  • staging_api_db_concurrency
    • recreate_full_staging_index
    • staging_database_restore
  • staging_elasticsearch_concurrency
    • recreate_full_staging_index
    • point_staging_es_alias
    • create_new_staging_es_index
    • create_proportional_by_source_staging_index
  • production_elasticsearch_concurrency
    • both data refreshes
    • both filtered index creation dags
    • create_new_production_es_index
    • point_production_es_index

Note that staging_database_restore is not part of the staging_elasticsearch_concurrency group, because it only affects the API database and therefore DAGs that only operate on the staging ES cluster (like create_new_staging_es_index don't need to block on it. Also note that recreate_full_staging_index is part of both the staging_elasticsearch_concurrency and staging_api_db_concurrency group, as it pulls records from the api db for reindexing.

For each DAG, run the DAG and check that the appropriate DAGs were waited on/prevented_concurrency_with. You can do this by looking at the logs for the get_dags_in_<tag>_group task, and then checking that the correct number of mapped tasks were made. (Example: create_proportional_by_source_staging_index is in the staging group which has 4 dags total. It has a prevent_concurrency_with_dag task generated for each of the 3 DAGs in the group excluding itself). Check the code to see special cases where some dependencies have been excluded for the data refresh.

Airflow 2.9 is supposed to include the ability to prove meaningful names for dynamically mapped tasks (instead of just a numerical index), which will make this much easier to read at-a-glance once we upgrade!

Screenshot 2024-03-13 at 10 37 03 AM

Test the actual concurrency utils still works

We should also test the actual concurrency for at least a few options. The easiest way to do this is to trigger them in sequence through the Airflow CLI. Make sure the relevant DAGs are turned on in Airflow, then run just catalog/shell and:

# For example, this would trigger the recreate full staging index DAG and then immediately trigger staging db restore
# We would expect staging_database_restore to poke a few times while awaiting the `recreate_full_staging_index`
# DAG to complete, then resume once that DAG completes
> airflow dags trigger recreate_full_staging_index && airflow dags trigger staging_database_restore

A general tip for remembering how the concurrency checks should work: scheduled DAGs (data refreshes, staging database restore) will wait if a conflicting DAG is running, and resume once it completes. DAGs that are only run manually (create_new_es_index, create_filtered_index, recreate_full_staging_index) fail immediately if a conflicting DAG is running.

Checklist

  • My pull request has a descriptive title (not a vague title likeUpdate index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (if applicable).

Developer Certificate of Origin

Developer Certificate of Origin
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@openverse-bot openverse-bot added 🟨 priority: medium Not blocking but should be addressed soon ✨ goal: improvement Improvement to an existing user-facing feature 💻 aspect: code Concerns the software code in the repository 🚦 status: awaiting triage Has not been triaged & therefore, not ready for work labels Mar 14, 2024
@github-actions github-actions bot added 🧱 stack: catalog Related to the catalog and Airflow DAGs 🧱 stack: documentation Related to Sphinx documentation labels Mar 14, 2024
@stacimc stacimc self-assigned this Mar 14, 2024
@stacimc stacimc removed the 🚦 status: awaiting triage Has not been triaged & therefore, not ready for work label Mar 14, 2024
Copy link

Full-stack documentation: https://docs.openverse.org/_preview/3921

Please note that GitHub pages takes a little time to deploy newly pushed code, if the links above don't work or you see old versions, wait 5 minutes and try again.

You can check the GitHub pages deployment action list to see the current status of the deployments.

Changed files 🔄:

@stacimc stacimc force-pushed the add/elasticsearch-concurrency-groups branch from cffaa7a to 5173842 Compare March 14, 2024 20:13
@stacimc stacimc marked this pull request as ready for review March 14, 2024 20:23
@stacimc stacimc requested review from a team as code owners March 14, 2024 20:23
@stacimc stacimc requested review from fcoveram, dhruvkb and krysal and removed request for fcoveram March 14, 2024 20:23
@stacimc
Copy link
Collaborator Author

stacimc commented Mar 14, 2024

Pinging @AetherUnbound as well since she asked if something like this was possible on a previous PR

Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fantastic! What a cool way to use tags 😄 🚀 - I tested everything locally and it worked as expected, even failed an ES index creation run when the data refresh was running. Only a few small things, otherwise this is good to go!

catalog/dags/common/sensors/utils.py Outdated Show resolved Hide resolved
catalog/dags/common/sensors/utils.py Show resolved Hide resolved
catalog/dags/common/sensors/utils.py Outdated Show resolved Hide resolved
@krysal krysal removed the request for review from dhruvkb March 18, 2024 19:11
Copy link
Member

@krysal krysal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a creative solution to potential concurrency conflicts between DAGs! Curiously, it also resembles the concept of pools.

I have some questions. First, what is the expected result of the test instructions showed? In my case, the staging_database_restore passes the concurrency test, while the create_new_staging_es_index fails it. The logic you mentioned suggests it should be the other way around, but it may be because the staging_database_restore DAG starts with the DAG being turned on.

Second, from what I understand, with the current code, all DAGs from the staging_es_concurrency tag wait for staging_database_restore, but this one only waits for recreate_full_staging_index, which is the only one with whom it really has no conflict. This sounds fine to me if we don't want to complicate things too much with many exceptions. An alternative I can think of is to create another staging_db_concurrency tag for DAGs working at the DB level. Both DAGs can be in that bag while recreate_full_staging_index retains the staging_es_concurrency tag. This keeps dependencies straight. What do you think of this approach?

@stacimc
Copy link
Collaborator Author

stacimc commented Mar 18, 2024

An alternative I can think of is to create another staging_db_concurrency tag for DAGs working at the DB level

It's funny you bring this up -- I just mentioned to @AetherUnbound earlier today that I was thinking of making an issue to do exactly that in the future 😄

The thinking was that for now, it is not actually a problem for any of these DAGs to be slightly over cautious in what DAGs they block on, and we're not introducing dependencies that weren't already hard-coded. Since you also brought it up, though, I'll just go ahead and implement it in this PR; it is the better long-term solution.

@stacimc stacimc marked this pull request as draft March 18, 2024 21:52
@stacimc
Copy link
Collaborator Author

stacimc commented Mar 19, 2024

All requested changes have been made, the biggest of which is simply splitting the staging concurrency groups out into separate DB and ES tags. The PR description has been updated, including the testing instructions -- as @krysal pointed out I had forgotten to update the testing instructions after changing the behavior of the staging_database_restore task, but it should now be accurate!

@stacimc stacimc marked this pull request as ready for review March 19, 2024 22:41
@stacimc stacimc requested a review from krysal March 19, 2024 22:41
@openverse-bot
Copy link
Collaborator

Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR:

@krysal
This reminder is being automatically generated due to the urgency configuration.

Excluding weekend1 days, this PR was ready for review 4 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekday(s)2.

@stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.

Footnotes

  1. Specifically, Saturday and Sunday.

  2. For the purpose of these reminders we treat Monday - Friday as weekdays. Please note that the operation that generates these reminders runs at midnight UTC on Monday - Friday. This means that depending on your timezone, you may be pinged outside of the expected range.

@zackkrida
Copy link
Member

@krysal will you have an opportunity to review this this week? If not, please go ahead and assign another reviewer. Thank you!

Copy link
Member

@krysal krysal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. This looks great! 💯

@stacimc stacimc merged commit ed267d8 into main Mar 26, 2024
48 checks passed
@stacimc stacimc deleted the add/elasticsearch-concurrency-groups branch March 26, 2024 21:57
@stacimc stacimc mentioned this pull request Mar 26, 2024
16 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
💻 aspect: code Concerns the software code in the repository ✨ goal: improvement Improvement to an existing user-facing feature 🟨 priority: medium Not blocking but should be addressed soon 🧱 stack: catalog Related to the catalog and Airflow DAGs 🧱 stack: documentation Related to Sphinx documentation
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Improve support for "concurrency pools" in Elasticsearch DAGs
5 participants