Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Add configuration options to skip ingestion errors #650

Merged
merged 24 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8f709bd
Pass dag_run conf to ProviderDataIngester
stacimc Aug 5, 2022
20df903
Update ProviderDataIngester to conditionally skip errors
stacimc Aug 5, 2022
da52cc2
Add tests
stacimc Aug 5, 2022
452e67b
Document configuration options
stacimc Aug 5, 2022
8f86fbf
Improve error formatting
stacimc Aug 5, 2022
50a85d5
Log initial query params for ease of testing
stacimc Aug 5, 2022
f13eff0
Don't catch errors in get_batch
stacimc Aug 5, 2022
4edaa73
Fix test
stacimc Aug 5, 2022
fa0df8b
Fix test
stacimc Aug 9, 2022
f8de955
Do not swallow errors from get_batch and get_should_continue
stacimc Aug 9, 2022
638b1d4
Log errors when they are skipped
stacimc Aug 9, 2022
dda3d78
Print tracebacks in error sumamry
stacimc Aug 9, 2022
0b3ffc6
Add conf option to provide a list of query_params to use
stacimc Aug 10, 2022
116016a
Revert accidental change to Cleveland script
stacimc Aug 10, 2022
85ecedf
Test
stacimc Aug 11, 2022
7719b97
Remove unnecessary printing of next_query_params
stacimc Aug 11, 2022
0809ffb
Log list of bad query params, for convenience of copying
stacimc Aug 11, 2022
410e8ab
Add exception handling back in after rebase
stacimc Aug 12, 2022
5cb47ef
Update method name, fix comments
stacimc Aug 12, 2022
73a7583
Update tests
stacimc Aug 12, 2022
ec08221
Never skip AirflowExceptions
stacimc Aug 13, 2022
1914578
Raise AggregateIngestionError; only log verbose error messages
stacimc Aug 15, 2022
84367f0
Make it easier to copy-paste query_params from logs
stacimc Aug 15, 2022
b44f2b2
Tweak aggregate error message wording
AetherUnbound Aug 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions openverse_catalog/dags/providers/factory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from types import FunctionType
from typing import Callable, Sequence

from airflow.models import TaskInstance
from airflow.models import DagRun, TaskInstance
from airflow.utils.dates import cron_presets
from common.constants import MediaType
from common.storage.media import MediaStore
Expand Down Expand Up @@ -45,6 +45,7 @@ def generate_tsv_filenames(
ingestion_callable: Callable,
media_types: list[MediaType],
ti: TaskInstance,
dag_run: DagRun,
args: Sequence = None,
) -> None:
"""
Expand All @@ -71,7 +72,7 @@ def generate_tsv_filenames(
f"Initializing ProviderIngester {ingestion_callable.__name__} in"
f"order to generate store filenames."
)
ingester = ingestion_callable(*args)
ingester = ingestion_callable(dag_run.conf, *args)
stores = ingester.media_stores

# Push the media store output paths to XComs.
Expand All @@ -87,6 +88,7 @@ def pull_media_wrapper(
media_types: list[MediaType],
tsv_filenames: list[str],
ti: TaskInstance,
dag_run: DagRun,
args: Sequence = None,
):
"""
Expand Down Expand Up @@ -116,7 +118,7 @@ def pull_media_wrapper(
# A ProviderDataIngester class was passed instead. First we initialize the
# class, which will initialize the media stores and DelayedRequester.
logger.info(f"Initializing ProviderIngester {ingestion_callable.__name__}")
ingester = ingestion_callable(*args)
ingester = ingestion_callable(dag_run.conf, *args)
stores = ingester.media_stores
run_func = ingester.ingest_records
# args have already been passed into the ingester, we don't need them passed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import json
import logging
import traceback
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple

from airflow.exceptions import AirflowException
from airflow.models import Variable
from common.requester import DelayedRequester, RetriesExceeded
from common.storage.media import MediaStore
Expand All @@ -12,6 +15,35 @@
logger = logging.getLogger(__name__)


class AggregateIngestionError(Exception):
"""
Custom exception when multiple ingestion errors are skipped and then
raised in aggregate at the end of ingestion.
"""

pass


class IngestionError(Exception):
zackkrida marked this conversation as resolved.
Show resolved Hide resolved
"""
Custom exception which includes information about the query_params that
were being used when the error was encountered.
"""

def __init__(self, error, traceback, query_params):
self.error = error
self.traceback = traceback
self.query_params = json.dumps(query_params)

def __str__(self):
# Append query_param info to error message
return f"{self.error}\nquery_params: {self.query_params}"

def repr_with_traceback(self):
# Append traceback
return f"{str(self)}\n{self.traceback}"


class ProviderDataIngester(ABC):
"""
An abstract base class that initializes media stores and ingests records
Expand Down Expand Up @@ -51,9 +83,10 @@ def endpoint(self):
"""
pass

def __init__(self, date: str = None):
def __init__(self, conf: dict = None, date: str = None):
"""
Optional Arguments:
conf: The configuration dict for the running DagRun
date: Date String in the form YYYY-MM-DD. This is the date for
which running the script will pull data
"""
Expand All @@ -75,6 +108,24 @@ def __init__(self, date: str = None):
self.media_stores = self.init_media_stores()
self.date = date

# dag_run configuration options
conf = conf or {}

# Used to skip over errors and continue ingestion. When enabled, errors
# are not reported until ingestion has completed.
self.skip_ingestion_errors = conf.get("skip_ingestion_errors", False)
self.ingestion_errors: List[IngestionError] = [] # Keep track of errors

# An optional set of initial query params from which to begin ingestion.
self.initial_query_params = conf.get("initial_query_params")

# An optional list of `query_params`. When provided, ingestion will be run for
# just these sets of params.
self.override_query_params = None
if query_params_list := conf.get("query_params_list"):
# Create a generator to facilitate fetching the next set of query_params.
self.override_query_params = (qp for qp in query_params_list)
AetherUnbound marked this conversation as resolved.
Show resolved Hide resolved

def init_media_stores(self) -> dict[str, MediaStore]:
"""
Initialize a media store for each media type supported by this
Expand All @@ -101,10 +152,14 @@ def ingest_records(self, **kwargs) -> None:

logger.info(f"Begin ingestion for {self.__class__.__name__}")

try:
while should_continue:
query_params = self.get_next_query_params(query_params, **kwargs)
while should_continue:
query_params = self.get_query_params(query_params, **kwargs)
if query_params is None:
# Break out of ingestion if no query_params are supplied. This can
# happen when the final `override_query_params` is processed.
break

try:
batch, should_continue = self.get_batch(query_params)

if batch and len(batch) > 0:
Expand All @@ -114,12 +169,100 @@ def ingest_records(self, **kwargs) -> None:
logger.info("Batch complete.")
should_continue = False

if self.limit and record_count >= self.limit:
logger.info(f"Ingestion limit of {self.limit} has been reached.")
should_continue = False
finally:
total = self.commit_records()
logger.info(f"Committed {total} records")
except AirflowException as error:
# AirflowExceptions should not be caught, as execution should not
# continue when the task is being stopped by Airflow.

# If errors have already been caught during processing, raise them
# as well.
if error_summary := self.get_ingestion_errors():
raise error_summary from error
zackkrida marked this conversation as resolved.
Show resolved Hide resolved
raise

except Exception as error:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the changes in #645, I think we'll need to be careful about bare Exception catches. It looks like if skip_ingestion_errors is True, then execution will continue. This breaks the case where the task is stopped by Airflow. I think we can catch that specific case by handling AirflowException differently and immediately re-raising that though!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extremely good point. I think we'll also want to make sure we raise any ingestion errors that have previously been skipped before we encounter this.

What do you think about making skip_ingestion_errors a list of Exception types to skip, rather than a simple boolean?

Copy link
Contributor

@AetherUnbound AetherUnbound Aug 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it! Only trouble would be matching on the exceptions since we have to go from a string to an exception type 😅 I guess we could just do if exception.__type__.__name__ in skip_ingestion_errors? I think as long as we're handling the AirflowException case (and that's the right case for Airflow stuff in general) then we could add that down the line 🙂

ingestion_error = IngestionError(
error, traceback.format_exc(), query_params
)

if self.skip_ingestion_errors:
# Add this to the errors list but continue processing
self.ingestion_errors.append(ingestion_error)
logger.error(f"Skipping batch due to ingestion error: {error}")
continue

# Commit whatever records we were able to process, and rethrow the
# exception so the taskrun fails.
self.commit_records()
raise error from ingestion_error

if self.limit and record_count >= self.limit:
logger.info(f"Ingestion limit of {self.limit} has been reached.")
should_continue = False

# Commit whatever records we were able to process
self.commit_records()

# If errors were caught during processing, raise them now
if error_summary := self.get_ingestion_errors():
raise error_summary

def get_ingestion_errors(self) -> AggregateIngestionError | None:
"""
If any errors were skipped during ingestion, log them as well as the
associated query parameters. Then return an AggregateIngestionError.

It there are no errors to report, returns None.
"""
if self.ingestion_errors:
# Log the affected query_params
bad_query_params = ", \n".join(
[f"{e.query_params}" for e in self.ingestion_errors]
)
logger.info(
"The following query_params resulted in errors: \n"
f"{bad_query_params}"
)
errors_str = "\n".join(
e.repr_with_traceback() for e in self.ingestion_errors
)
logger.error(
f"The following errors were encountered during ingestion:\n{errors_str}"
)
return AggregateIngestionError(
f"{len(self.ingestion_errors)} query batches were skipped due to "
"errors during ingestion using the `skip_ingestion_errors` flag. "
zackkrida marked this conversation as resolved.
Show resolved Hide resolved
"See the log for more details."
)
return None

def get_query_params(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was attempting to avoid renaming the current get_next_query_params method, but I'm not happy with this naming. Any suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe determine_query_params?

self, prev_query_params: Optional[Dict], **kwargs
) -> Optional[Dict]:
"""
Returns the next set of query_params for the next request, handling
optional overrides via the dag_run conf.
"""
# If we are getting query_params for the first batch and initial_query_params
# have been set, return them.
if prev_query_params is None and self.initial_query_params:
logger.info(
"Using initial_query_params from dag_run conf:"
f" {json.dumps(self.initial_query_params)}"
)
return self.initial_query_params

# If a list of query_params was provided, return the next value.
if self.override_query_params:
next_params = next(self.override_query_params, None)
logger.info(
"Using query params from `query_params_list` set in dag_run conf:"
f" {next_params}"
)
return next_params

# Default behavior when no conf options are provided; build the next
# set of query params, given the previous.
return self.get_next_query_params(prev_query_params, **kwargs)

@abstractmethod
def get_next_query_params(
Expand Down Expand Up @@ -154,26 +297,26 @@ def get_batch(self, query_params: Dict) -> Tuple[Optional[List], bool]:
batch = None
should_continue = True

# Get the API response
try:
# Get the API response
response_json = self.get_response_json(query_params)

# Build a list of records from the response
batch = self.get_batch_data(response_json)

# Optionally, apply some logic to the response to determine whether
# ingestion should continue or if should be short-circuited. By default
# this will return True and ingestion continues.
should_continue = self.get_should_continue(response_json)

except (
RequestException,
RetriesExceeded,
JSONDecodeError,
ValueError,
TypeError,
) as e:
logger.error(f"Error getting next query parameters due to {e}")
logger.error(f"Error getting response due to {e}")
response_json = None

# Build a list of records from the response
batch = self.get_batch_data(response_json)

# Optionally, apply some logic to the response to determine whether
# ingestion should continue or if should be short-circuited. By default
# this will return True and ingestion continues.
should_continue = self.get_should_continue(response_json)

return batch, should_continue

Expand Down Expand Up @@ -260,4 +403,5 @@ def commit_records(self) -> int:
total = 0
for store in self.media_stores.values():
total += store.commit()
logger.info(f"Committed {total} records")
return total
11 changes: 11 additions & 0 deletions openverse_catalog/dags/providers/provider_dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@
previously downloaded data, and update any data that needs updating
(eg. popularity metrics).

Provider workflows which extend the ProviderDataIngester class support a few DagRun
configuration variables:

* `skip_ingestion_errors`: When set to true, errors encountered during ingestion will
be caught to allow ingestion to continue. The `pull_data` task will still fail when
ingestion is complete, and report a summary of all encountered errors. By default
`skip_ingestion_errors` is False.
* `initial_query_params`: An optional dict of query parameters with which to begin
ingestion. This allows a user to manually force ingestion to resume from a particular
batch, for example when retrying after an error.

You can find more background information on the loading process in the following
issues and related PRs:

Expand Down
Loading