Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix advanced provider options for dags that override ingest records #4214

Conversation

stacimc
Copy link
Collaborator

@stacimc stacimc commented Apr 26, 2024

Fixes

Fixes #4207 by @stacimc

This PR aimed to fix a problem with Science Museum, but it turned out the source of the problem was a larger failure in the way many of our providers override aspects of the ProviderDataIngester base class. Consequently this PR actually ends up fixing quite a bit more!

When this is merged, we will be able to do a full DagRun of Science Museum with skip_ingestion_errors enabled, and actually process all possible records. This is blocking in order to enable Science Museum in production (see #4013 for context).

This is a difficult one to explain, so please ping me if anything needs further explanation.

Description

Our provider workflows have several advanced options in DagRun confs -- notably, initial_query_params (which overrides the starting point of the DAG, allowing us to resume a DAG from a point of failure) and skip_ingestion_errors (which skips batches with errors rather than halting the DAG, and then reports them in aggregate at the end).

We also have a number of DAGs that override the ingest_records method in order to run ingestion multiple times for a set of fixed query params. (For example, Science Museum runs ingest_records multiple times for different date ranges. Within each date range, the page number and other params advance as normal). This breaks the advanced conf options for all of these DAGs. For example:

  • If you supply initial_query_params to a DAG that overrides ingest_records, it will start the first batch on those query params. It will also do so every time ingest_records is called. This completely breaks the fixed params.
  • If you enable skip_ingestion_errors, ingestion errors will be skipped and then reported in aggregate -- but at the end of the current call to ingest_records. So for Science Museum for example, enabling skipped_ingestion_errors will only allow the DAG to get through the first date range where it encounters an error. In production, this was in the 1500-1750 date range. So we did not ingest any records after 1750, even though we had skip_ingestion_errors enabled!

This PR updates the ProviderDataIngester base class to add support for fixed_query_params in a way that does not break the advanced conf options, and allows all the DAGs that were previously overriding ingest_records to no longer do so. It does this by adding a get_fixed_query_params method to the class which can optionally be overridden to provide a set of fixed query params, for each of which ingest_records will be called. It also updates the TimeDelineatedProviderDataIngester to work with this new feature. Finally it updates all of the DAGs to use this.

Unfortunately it was not really possible to do this in smaller pieces. The good news is that this PR fixes initial_query_params and skip_ingestion_errors for a ton of DAGs that they would not have previously reliably worked for!

ℹ️ Tip for reviewing: look at the changes to ProviderDataIngester first, then TimeDelineatedProviderDataIngester

Testing Instructions

To test this more easily, I applied the diff below to ProviderDataIngester to make it run more quickly (only processing the first two batches of each fixed param), and to log the query_params being used so you can see that they're advancing as expected.

diff --git a/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
index 3624062b1..bc002e2ef 100644
--- a/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
+++ b/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
@@ -241,14 +241,17 @@ class ProviderDataIngester(ABC):
             return
 
         logger.info(f"Begin ingestion for {self.__class__.__name__}")
+        count = 0
 
-        while should_continue:
+        while should_continue and count < 2:
+            count += 1
             if query_params is None:
                 # Break out of ingestion if no query_params are supplied. This can
                 # happen when the final `override_query_params` is processed.
                 break
 
             try:
+                logger.info(query_params)
                 batch, should_continue = self.get_batch(query_params)
 
                 if batch and len(batch) > 0:

Test some provider DAGs that were not changed/should not be affected

For example I ran Cleveland and ensured that it passed.

Test DAGs that previously overrode ingest_records

Run all the DAGs that previously overrode ingest_records and ensure they do not fail:

  • Finnish Museums (Try setting the date conf option to 2024-04-24 to ensure you get a day with data)
  • Flickr
  • Museum Victoria
  • Science Museum
  • Smithsonian

You should see in the logs that the DAGs run ingestion multiple times for each of their fixed params. Science Museum for example will run _ingest_records for each of its date ranges. You can see in the logs something like this (I've omitted logs that are not relevant for brevity):

[2024-04-27, 00:52:56 UTC] {provider_data_ingester.py:359} INFO - ==Starting ingestion with fixed params: {'date[from]': 0, 'date[to]': 200}==
[2024-04-27, 00:52:56 UTC] {provider_data_ingester.py:254} INFO - {'has_image': 1, 'image_license': 'CC', 'page[size]': 100, 'page[number]': 0, 'date[from]': 0, 'date[to]': 200}
[2024-04-27, 00:53:01 UTC] {provider_data_ingester.py:254} INFO - {'has_image': 1, 'image_license': 'CC', 'page[size]': 100, 'page[number]': 1, 'date[from]': 0, 'date[to]': 200}
[2024-04-27, 00:53:08 UTC] {provider_data_ingester.py:359} INFO - ==Starting ingestion with fixed params: {'date[from]': 200, 'date[to]': 1500}==
[2024-04-27, 00:53:08 UTC] {provider_data_ingester.py:254} INFO - {'has_image': 1, 'image_license': 'CC', 'page[size]': 100, 'page[number]': 0, 'date[from]': 200, 'date[to]': 1500}
[2024-04-27, 00:53:13 UTC] {provider_data_ingester.py:254} INFO - {'has_image': 1, 'image_license': 'CC', 'page[size]': 100, 'page[number]': 1, 'date[from]': 200, 'date[to]': 1500}
[2024-04-27, 00:53:23 UTC] {provider_data_ingester.py:359} INFO - ==Starting ingestion with fixed params: {'date[from]': 1500, 'date[to]': 1750}==
...

Test initial_query_params

Try running Science Museum with initial_query_params set to {"has_image": 1, "image_license": "CC", "page[size]": 100, "page[number]": 4, "date[from]": 200, "date[to]": 1500}. Note that I've started at the second date range (a fixed param) and I've also started at a later page (a non fixed param).

Verify in the logs that it starts ingestion on the correct date range and page number. The page number advances appropriately instead of resetting back to 1. When it advances to the next date range, it advances appropriately (200-1500 to 1500-1750, instead of resetting back to 0-200):

[2024-04-27, 01:06:32 UTC] {provider_data_ingester.py:342} INFO - ==Starting ingestion with fixed params: {'date[from]': 200, 'date[to]': 1500}==
[2024-04-27, 01:06:32 UTC] {provider_data_ingester.py:232} INFO - Using initial_query_params from dag_run conf: {"has_image": 1, "image_license": "CC", "page[size]": 100, "page[number]": 4, "date[from]": 200, "date[to]": 1500}
[2024-04-27, 01:06:32 UTC] {provider_data_ingester.py:254} INFO - {'has_image': 1, 'image_license': 'CC', 'page[size]': 100, 'page[number]': 4, 'date[from]': 200, 'date[to]': 1500}
[2024-04-27, 01:06:35 UTC] {provider_data_ingester.py:254} INFO - {'has_image': 1, 'image_license': 'CC', 'page[size]': 100, 'page[number]': 5, 'date[from]': 200, 'date[to]': 1500}
[2024-04-27, 01:06:44 UTC] {provider_data_ingester.py:355} INFO - ==Starting ingestion with fixed params: {'date[from]': 1500, 'date[to]': 1750}==
[2024-04-27, 01:06:44 UTC] {provider_data_ingester.py:254} INFO - {'has_image': 1, 'image_license': 'CC', 'page[size]': 100, 'page[number]': 0, 'date[from]': 1500, 'date[to]': 1750}
[2024-04-27, 01:06:49 UTC] {provider_data_ingester.py:254} INFO - {'has_image': 1, 'image_license': 'CC', 'page[size]': 100, 'page[number]': 1, 'date[from]': 1500, 'date[to]': 1750}

Test skip_ingestion_errors

Update science_museum.py to raise a ValueError in get_batch_data. Keep your other patch applied so that it will only try to pull 2 batches per date range, and then run the DAG again. This time enable the skip_ingestion_errors option and verify that not only does it skip errors for the first date range, but it continues to the next date range as well.

[2024-04-27, 01:12:42 UTC] {provider_data_ingester.py:342} INFO - ==Starting ingestion with fixed params: {'date[from]': 0, 'date[to]': 200}==
[2024-04-27, 01:12:45 UTC] {provider_data_ingester.py:287} ERROR - Skipping batch due to ingestion error: foo
[2024-04-27, 01:12:49 UTC] {provider_data_ingester.py:287} ERROR - Skipping batch due to ingestion error: foo
[2024-04-27, 01:12:54 UTC] {provider_data_ingester.py:355} INFO - ==Starting ingestion with fixed params: {'date[from]': 200, 'date[to]': 1500}==
[2024-04-27, 01:12:59 UTC] {provider_data_ingester.py:287} ERROR - Skipping batch due to ingestion error: foo
...

Test both options at the same time, with a TimeDelineated provider

The TimeDelineatedProviderDataIngester should be tested separately. You can do this for Finnish or Flickr. Finnish is particularly finicky because it uses fixed params (iterating over buildings) and timestamp pairs.

Update Finnish museums to raise an error during get_batch_data. Then run the DAG with date set to 2024-04-24, skip_ingestion_errors enabled, and initial_query_params:

{
    "field[]":[
        "authors","buildings","id","imageRights","images","subjects","title"
    ],
    "filter[]":[
        "format:'0/Image/'",
        "building:'0/Museovirasto/'",
        "last_indexed:'[2024-04-24T20:09:00Z TO 2024-04-24T20:12:00Z]'"
    ],
    "limit":100,
    "page":650
}

You will have to wait for it to generate all the timestamp pairs. Once it begins ingestion, you should see in the logs that it begins at the Museovirasto building for timestamps [2024-04-24T20:09:00Z TO 2024-04-24T20:12:00Z] and page 650. It should get an error which it will skip and advance to page 651. It should then proceed to the next chronological timestamp for the same building, starting at page 1.

Checklist

  • My pull request has a descriptive title (not a vague title likeUpdate index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (if applicable).

Developer Certificate of Origin

Developer Certificate of Origin
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@stacimc stacimc self-assigned this Apr 26, 2024
@github-actions github-actions bot added the 🧱 stack: catalog Related to the catalog and Airflow DAGs label Apr 26, 2024
@openverse-bot openverse-bot added the 🚦 status: awaiting triage Has not been triaged & therefore, not ready for work label Apr 26, 2024
@stacimc stacimc added 🟧 priority: high Stalls work on the project or its dependents 🛠 goal: fix Bug fix 💻 aspect: code Concerns the software code in the repository and removed 🚦 status: awaiting triage Has not been triaged & therefore, not ready for work labels Apr 27, 2024
@stacimc stacimc marked this pull request as ready for review April 27, 2024 02:11
@stacimc stacimc requested review from a team as code owners April 27, 2024 02:11
@stacimc stacimc requested review from fcoveram, krysal and obulat and removed request for fcoveram April 27, 2024 02:11
Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, this is definitely a complex set of circumstances! The ability to pull the fixed parameters out of the initial parameters is really great. I tested most of the cases locally and all worked as expected!

Comment on lines +330 to +337
initial_fixed_params = next(
(
qp
for qp in fixed_query_params
if qp.items() <= self.initial_query_params.items()
),
fixed_query_params[0],
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is...really wild that it works, honestly!

@openverse-bot
Copy link
Collaborator

Based on the high urgency of this PR, the following reviewers are being gently reminded to review this PR:

@krysal
@obulat
This reminder is being automatically generated due to the urgency configuration.

Excluding weekend1 days, this PR was ready for review 2 day(s) ago. PRs labelled with high urgency are expected to be reviewed within 2 weekday(s)2.

@stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.

Footnotes

  1. Specifically, Saturday and Sunday.

  2. For the purpose of these reminders we treat Monday - Friday as weekdays. Please note that the operation that generates these reminders runs at midnight UTC on Monday - Friday. This means that depending on your timezone, you may be pinged outside of the expected range.

Copy link
Member

@krysal krysal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Things are getting hairy around the DAGs configuration—wow! But everything seems under control, which is impressive! 👏 I followed the instructions and tested some cases and params increment correctly. Nothing to add on my part.

Co-authored-by: Krystle Salazar <[email protected]>
@stacimc
Copy link
Collaborator Author

stacimc commented May 1, 2024

Things are getting hairy around the DAGs configuration—wow! But everything seems under control, which is impressive!

Definitely -- we have a lot of use cases to cover 😅 I'm pleased to get more of that complexity into the base class, at least, so that the provider ingesters don't need to know about any of that!

@stacimc stacimc merged commit 7f4fb7c into main May 1, 2024
39 checks passed
@stacimc stacimc deleted the fix/advanced-provider-options-for-dags-that-override-ingest-records branch May 1, 2024 16:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
💻 aspect: code Concerns the software code in the repository 🛠 goal: fix Bug fix 🟧 priority: high Stalls work on the project or its dependents 🧱 stack: catalog Related to the catalog and Airflow DAGs
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Science Museum halts early despite skipping ingestion errors
4 participants