Add configuration options to skip ingestion errors #650
Conversation
@AetherUnbound @obulat I'm putting this into review for early feedback, but I'm especially curious for thoughts on the overall approach and my question about catching errors in processing individual records versus batches.
This is a really interesting solution to a very difficult problem, @stacimc ! I really like it.
Because errors are caught within `ingest_records`, this means batches are skipped as opposed to individual records. We could fine-tune this by only (or additionally) wrapping `get_record_data` in the try/except. What do you think?
I think using batches is the best option because a batch uses the same query parameters, so we probably won't win much by retrying individual records: we would still make a lot of requests anyway.
We could also add a different mode to the `ProviderDataIngester` for re-trying the batches that raised errors. It could accept a list of `next_query_parameters` and would not attempt to build new queries; it would only go through the URLs for the failed batches. What do you think, @stacimc?
@obulat That's a great idea! Definitely feels useful to be able to re-run just the failed batches after a bug fix, for example. I've added a `query_params_list` conf option for this.
```python
    f"The following errors were encountered during ingestion:\n{errors_str}"
)

def get_query_params(
```
I was attempting to avoid renaming the current `get_next_query_params` method, but I'm not happy with this naming. Any suggestions?
Maybe `determine_query_params`?
This is going to take a little bit of thought to rebase with #645. I'll work on that today.
Force-pushed from 00a53d2 to 410e8ab.
This is fantastic! Way more thorough than I was initially thinking with this issue, which is excellent. This gives us a lot of fine-grained control for reruns, which is exactly what we need 🤩
I have a few comments; my biggest concern is conformity with the changes to the main loop in #645.
```python
finally:
    total = self.commit_records()
    logger.info(f"Committed {total} records")
except Exception as error:
```
Based on the changes in #645, I think we'll need to be careful about bare `Exception` catches. It looks like if `skip_ingestion_errors` is `True`, then execution will continue. This breaks the case where the task is stopped by Airflow. I think we can catch that specific case by handling `AirflowException` differently and immediately re-raising it, though!
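Roughly, the re-raise ordering being suggested could look like this (a minimal sketch; the loop shape, the `get_batch`/`process_batch` usage, and the `ingestion_errors` list are assumptions based on this discussion, not the PR's actual code):

```python
from airflow.exceptions import AirflowException


def ingest_batch(ingester, query_params, skip_ingestion_errors=False):
    """Sketch of one ingestion iteration with AirflowException handled first."""
    should_continue = False
    try:
        batch, should_continue = ingester.get_batch(query_params)
        if batch:
            ingester.process_batch(batch)
    except AirflowException:
        # Airflow raises this to stop a task; re-raise immediately so
        # skip_ingestion_errors can never swallow it.
        raise
    except Exception as error:
        if not skip_ingestion_errors:
            raise
        # Record the error together with the query params it occurred under,
        # then let ingestion continue with the next batch.
        ingester.ingestion_errors.append((error, query_params))
        should_continue = True
    return should_continue
```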
Extremely good point. I think we'll also want to make sure we raise any ingestion errors that have previously been skipped before we encounter this.
What do you think about making `skip_ingestion_errors` a list of Exception types to skip, rather than a simple boolean?
I like it! Only trouble would be matching on the exceptions, since we have to go from a string to an exception type 😅 I guess we could just do `if exception.__class__.__name__ in skip_ingestion_errors`? I think as long as we're handling the `AirflowException` case (and that's the right case for Airflow stuff in general) then we could add that down the line 🙂
This is so close! I just have some thoughts about the final exception and what gets logged.
```python
errors_str = "\n".join(
    e.repr_with_traceback() for e in self.ingestion_errors
)
return AirflowException(
```
Thinking on this again, should this maybe be a new exception type, like `AggregateIngestionError`? `AirflowException` doesn't seem to be the right one to use 🤔
Since this exception will also get sent to Slack, maybe it would be ideal to log the more verbose errors and then raise a single exception at the end that says something along the lines of `{len(self.ingestion_errors)} ingestion errors occurred while running`?
I like `AggregateIngestionError` (I had considered making a custom error before and couldn't come up with a name 😅). Totally agreed on the error logging -- I was under the impression the Slack messages would truncate, but it doesn't look like that's the case, thanks for testing!
Updated to only log the verbose errors, while throwing an `AggregateIngestionError`.
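For reference, a minimal sketch of that shape (the helper name `raise_aggregate_error` is made up for illustration; `repr_with_traceback` comes from the diff above):

```python
import logging

logger = logging.getLogger(__name__)


class AggregateIngestionError(Exception):
    """Raised at the end of ingestion when one or more batch errors were skipped."""


def raise_aggregate_error(ingestion_errors):
    # Log the verbose errors (tracebacks and query_params) to the task logs...
    errors_str = "\n".join(e.repr_with_traceback() for e in ingestion_errors)
    logger.error(
        f"The following errors were encountered during ingestion:\n{errors_str}"
    )
    # ...but keep the exception (which also gets sent to Slack) short and readable.
    raise AggregateIngestionError(
        f"{len(ingestion_errors)} ingestion errors occurred while running."
    )
```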
I tweaked the aggregate exception message, hope that's okay! This looks fantastic and works great. I'm very excited for us to be able to employ this going forward 🚀 ⚡ 🔧
I love how well this worked together with #645, and the evolution of the error messages in the logs was really entertaining :)
Great improvement! Hopefully we'll have a more efficient ingestion process with fewer requests to the providers.
Fixes
Fixes WordPress/openverse#1653 by @AetherUnbound
Background
Problem
If a provider workflow encounters errors during the `pull_data` step, the task will fail and any partial data that was collected will be loaded. To resume ingestion, the DAG must be manually re-run, and because the ingestion task has no context for the previous run, it will re-process all of the data from the API from the very beginning. This can be very time-consuming!

A note about alternative solutions
I attempted several strategies for allowing the `pull_data` task to retry, picking up from the last set of query parameters. This goes against the philosophy of Airflow, which expects tasks to be idempotent. Ultimately, even if we were willing to slightly misuse Airflow in this way, I think it would be very difficult if not impossible. For instance, Airflow actually explicitly clears XComs on every task retry.

Description
This PR attempts to tackle the problem with the addition of three new DagRun configuration options, which will be supported out of the box by all provider workflows extending the `ProviderDataIngester`:

- `skip_ingestion_errors`: When set to true, any errors encountered during the fetching/processing of batch data will be caught and ingestion allowed to continue. At the end of ingestion, if there were any caught errors the task will still fail and report a summary of all encountered errors.
- `initial_query_params`: When set, this will be passed in as the first set of query_params, and ingestion will resume from there.
- `query_params_list`: A list of query_params. When set, ingestion will run for just this list of query_params. This can be used to re-run ingestion for just the problematic batches.
This PR also adds a new `IngestionError` custom exception, which extends error messages to also report the `query_params` that were being used when the error was reached. These values can then be pasted into the DagRun configuration `initial_query_params` setting to re-run starting at the failed batch.
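A minimal sketch of what such an exception could look like (the constructor and string formatting here are assumptions; only `repr_with_traceback` appears in the actual diff):

```python
import traceback


class IngestionError(Exception):
    """Wraps an ingestion error together with the query_params in use at the time."""

    def __init__(self, error, query_params):
        # Intended to be constructed inside an `except` block, so format_exc()
        # captures the traceback of the error currently being handled.
        super().__init__(str(error))
        self.error = error
        self.query_params = query_params
        self.traceback = traceback.format_exc()

    def repr_with_traceback(self):
        return f"{self.error}\nquery_params: {self.query_params}\n{self.traceback}"
```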
Notes/To Do

Because errors are caught within `ingest_records`, this means batches are skipped as opposed to individual records. We could fine-tune this by only (or additionally) wrapping `get_record_data` in the try/except. What do you think?

Testing Instructions
Setup
To test this, you will need to make a provider script throw periodic errors. I used Cleveland Museum and edited `get_batch_data` to add the following:
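A sketch of the kind of edit described, assuming the current offset is available via a `skip` query param on `self.query_params` (both assumptions about the Cleveland script):

```python
def get_batch_data(self, response_json):
    # Testing only: simulate periodic ingestion errors on a few specific batches.
    # With batch_limit=100 these offsets are hit early; with the default
    # batch_limit use ["2000", "5000", "7000"] instead (see the note below).
    if str(self.query_params.get("skip")) in ["200", "500", "700"]:
        raise Exception("Simulated error for testing skip_ingestion_errors")
    # ... the rest of the existing get_batch_data body stays unchanged ...
```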
I also updated the `batch_limit` of Cleveland Museum to `100` to make it run slightly faster (if you do not do this, you'll need to use the values `['2000', '5000', '7000']` in the code above).

I also capped the number of records processed by adding an `ingestion_limit` Airflow variable in the UI with a value of 1000.

Tests
Let's simulate how we might use these configuration variables in the real world!
Simulate a scheduled DagRun (no conf options supplied) encountering an error
Re-start the DAG from the point at which it failed, and silence errors
You can trigger a manual run with a config by right-clicking the play button and selecting the option:
Here's an example of what my configuration looks like. For the `initial_query_params`, I've just copied directly from the error message in the previous step.
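For example, a conf along these lines (the Cleveland Museum param names and values shown are placeholders; copy your own from the error message):

```json
{
    "skip_ingestion_errors": true,
    "initial_query_params": {"cc": "1", "has_image": "1", "limit": "100", "skip": "200"}
}
```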
When `skip_ingestion_errors` is enabled, the DAG should run until the `ingestion_limit` is reached, and then the `pull_data` task should fail with a single `AggregateIngestionError`. Check the logs to see that all of the individual errors with their tracebacks were logged, along with a list of the query_params that caused the issues. This is logged separately for ease of copy/pasting into the dagrun conf.

Check the logs to verify that the `initial_query_params` used were the expected ones; you should see something like the supplied params being logged as ingestion begins. You can also test using just `skip_ingestion_errors` or `initial_query_params` at a time.

Simulate 'fixing the bug' and re-run the DAG for only the failed query_parameters
Our bug is easy to fix: just remove the lines you inserted into the provider script to raise the Exception 😄 Now we want to re-run the DAG but only for the affected batches. Here's my conf (you can copy/paste the list of query_params that resulted in errors from the logs of the failed run for convenience):
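Something along these lines (again, the param values are placeholders for whatever your failed batches reported):

```json
{
    "query_params_list": [
        {"cc": "1", "has_image": "1", "limit": "100", "skip": "200"},
        {"cc": "1", "has_image": "1", "limit": "100", "skip": "500"},
        {"cc": "1", "has_image": "1", "limit": "100", "skip": "700"}
    ]
}
```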
Re-run the DAG with the `query_params_list` and check the logs to make sure that `get_batch` was only run for those params!

Finally, run a provider_workflow that uses the ProviderDataIngester and which has not been configured to throw errors (e.g. Science Museum). Make sure it still works/passes.
Checklist

- [ ] My pull request has a descriptive title (not a vague title like `Update index.md`).
- [ ] My pull request targets the default branch of the repository (`main`) or a parent feature branch.

Developer Certificate of Origin