Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Add configuration options to skip ingestion errors #650

Merged
merged 24 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8f709bd
Pass dag_run conf to ProviderDataIngester
stacimc Aug 5, 2022
20df903
Update ProviderDataIngester to conditionally skip errors
stacimc Aug 5, 2022
da52cc2
Add tests
stacimc Aug 5, 2022
452e67b
Document configuration options
stacimc Aug 5, 2022
8f86fbf
Improve error formatting
stacimc Aug 5, 2022
50a85d5
Log initial query params for ease of testing
stacimc Aug 5, 2022
f13eff0
Don't catch errors in get_batch
stacimc Aug 5, 2022
4edaa73
Fix test
stacimc Aug 5, 2022
fa0df8b
Fix test
stacimc Aug 9, 2022
f8de955
Do not swallow errors from get_batch and get_should_continue
stacimc Aug 9, 2022
638b1d4
Log errors when they are skipped
stacimc Aug 9, 2022
dda3d78
Print tracebacks in error summary
stacimc Aug 9, 2022
0b3ffc6
Add conf option to provide a list of query_params to use
stacimc Aug 10, 2022
116016a
Revert accidental change to Cleveland script
stacimc Aug 10, 2022
85ecedf
Test
stacimc Aug 11, 2022
7719b97
Remove unnecessary printing of next_query_params
stacimc Aug 11, 2022
0809ffb
Log list of bad query params, for convenience of copying
stacimc Aug 11, 2022
410e8ab
Add exception handling back in after rebase
stacimc Aug 12, 2022
5cb47ef
Update method name, fix comments
stacimc Aug 12, 2022
73a7583
Update tests
stacimc Aug 12, 2022
ec08221
Never skip AirflowExceptions
stacimc Aug 13, 2022
1914578
Raise AggregateIngestionError; only log verbose error messages
stacimc Aug 15, 2022
84367f0
Make it easier to copy-paste query_params from logs
stacimc Aug 15, 2022
b44f2b2
Tweak aggregate error message wording
AetherUnbound Aug 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@ def __init__(self, error, traceback, query_params):

def __str__(self):
# Append query_param info to error message
return f"""{self.error}
query_params: {self.query_params}"""
return f"{self.error}\nquery_params: {self.query_params}"

def print_with_traceback(self):
def repr_with_traceback(self):
# Append traceback
return f"""{str(self)}
{self.traceback}"""
return f"{str(self)}\n{self.traceback}"


class ProviderDataIngester(ABC):
Expand Down Expand Up @@ -116,6 +114,7 @@ def __init__(self, conf: dict = None, date: str = None):
# just these sets of params.
self.override_query_params = None
if query_params_list := conf.get("query_params_list"):
# Create a generator to facilitate fetching the next set of query_params.
self.override_query_params = (qp for qp in query_params_list)
AetherUnbound marked this conversation as resolved.
Show resolved Hide resolved

def init_media_stores(self) -> dict[str, MediaStore]:
Expand Down Expand Up @@ -148,7 +147,7 @@ def ingest_records(self, **kwargs) -> None:
query_params = self.get_query_params(query_params, **kwargs)
if query_params is None:
# Break out of ingestion if no query_params are supplied. This can
# happen when the final `manual_query_params` is processed.
# happen when the final `override_query_params` is processed.
break

try:
Expand All @@ -161,6 +160,16 @@ def ingest_records(self, **kwargs) -> None:
logger.info("Batch complete.")
should_continue = False

except AirflowException as error:
# AirflowExceptions should not be caught, as execution should not
# continue when the task is being stopped by Airflow.

# If errors have already been caught during processing, raise them
# as well.
if error_summary := self.get_ingestion_errors():
raise error_summary from error
zackkrida marked this conversation as resolved.
Show resolved Hide resolved
raise

except Exception as error:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the changes in #645, I think we'll need to be careful about bare Exception catches. It looks like if skip_ingestion_errors is True, then execution will continue. This breaks the case where the task is stopped by Airflow. I think we can catch that specific case by handling AirflowException differently and immediately re-raising that though!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extremely good point. I think we'll also want to make sure we raise any ingestion errors that have previously been skipped before we encounter this.

What do you think about making skip_ingestion_errors a list of Exception types to skip, rather than a simple boolean?

Copy link
Contributor

@AetherUnbound AetherUnbound Aug 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it! Only trouble would be matching on the exceptions since we have to go from a string to an exception type 😅 I guess we could just do if exception.__class__.__name__ in skip_ingestion_errors? I think as long as we're handling the AirflowException case (and that's the right case for Airflow stuff in general) then we could add that down the line 🙂

ingestion_error = IngestionError(
error, traceback.format_exc(), query_params
Expand All @@ -185,6 +194,16 @@ def ingest_records(self, **kwargs) -> None:
self.commit_records()

# If errors were caught during processing, raise them now
if error_summary := self.get_ingestion_errors():
raise error_summary

def get_ingestion_errors(self) -> AirflowException | None:
""" "
If any errors were skipped during ingestion, returns an AirflowException
with a summary of all errors.

It there are no errors to report, returns None.
"""
if self.ingestion_errors:
# Log the affected query_params
bad_query_params = ", \n".join(
Expand All @@ -195,11 +214,12 @@ def ingest_records(self, **kwargs) -> None:
f"{bad_query_params}"
)
errors_str = "\n".join(
e.print_with_traceback() for e in self.ingestion_errors
e.repr_with_traceback() for e in self.ingestion_errors
)
raise AirflowException(
return AirflowException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking on this again, should this maybe be a new exception type, like AggregateIngestionError? AirflowException doesn't seem to be the right one to use 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this exception will also get sent to Slack, maybe it would be ideal to log the more verbose errors and then raise a single exception at the end that says something along the lines of {len(self.ingestion_errors)} ingestion errors occurred while running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like AggregateIngestionError (I had considered making a custom error before and couldn't come up with a name 😅 ). Totally agreed on the error logging -- I was under the impression the Slack messages would truncate but it doesn't look like that's the case, thanks for testing!

Updated to only log the verbose errors, while throwing an AggregateIngestionError.

f"The following errors were encountered during ingestion:\n{errors_str}"
)
return None

def get_query_params(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was attempting to avoid renaming the current get_next_query_params method, but I'm not happy with this naming. Any suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe determine_query_params?

self, prev_query_params: Optional[Dict], **kwargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,11 @@ def test_ingest_records_stops_after_reaching_limit():
assert process_batch_mock.call_count == 1


def test_ingest_records_commits_on_exception(self):
def test_ingest_records_commits_on_exception():
with (
patch.object(self.ingester, "get_batch") as get_batch_mock,
patch.object(
self.ingester, "process_batch", return_value=3
) as process_batch_mock,
patch.object(self.ingester, "commit_records") as commit_mock,
patch.object(ingester, "get_batch") as get_batch_mock,
patch.object(ingester, "process_batch", return_value=3) as process_batch_mock,
patch.object(ingester, "commit_records") as commit_mock,
):
get_batch_mock.side_effect = [
(EXPECTED_BATCH_DATA, True), # First batch
Expand All @@ -212,7 +210,7 @@ def test_ingest_records_commits_on_exception(self):
]

with pytest.raises(ValueError, match="Whoops :C"):
self.ingester.ingest_records()
ingester.ingest_records()

# Check that get batch was only called thrice
assert get_batch_mock.call_count == 3
Expand Down Expand Up @@ -286,41 +284,72 @@ def test_ingest_records_raises_IngestionError():
(EXPECTED_BATCH_DATA, True), # Second batch should not be reached
]

with pytest.raises(Exception) as error:
with pytest.raises(Exception, match="Mock exception message"):
ingester.ingest_records()

# By default, `skip_ingestion_errors` is False and get_batch_data
# is no longer called after encountering an error
assert get_batch_mock.call_count == 1

assert str(error.value) == "Mock exception message"


def test_ingest_records_with_skip_ingestion_errors():
@pytest.mark.parametrize(
"batches, expected_call_count, error_messages",
[
# Multiple errors are skipped
(
[
Exception("Mock exception 1"),
(EXPECTED_BATCH_DATA, True), # First error
Exception("Mock exception 2"),
(EXPECTED_BATCH_DATA, False), # Second error, `should_continue` False
],
4, # get_batch is called until `should_continue` is False, ignoring errors
["Mock exception 1", "Mock exception 2"],
),
# An AirflowException should not be skipped
(
[
(EXPECTED_BATCH_DATA, True),
AirflowException("An Airflow exception"), # Second batch, should raise
(EXPECTED_BATCH_DATA, True), # This batch should not be reached
],
2, # The final batch should not be reached
["An Airflow exception"],
),
# An AirflowException is raised, but there were already other ingestion errors
(
[
Exception("Some other exception"), # First batch, should be skipped
AirflowException("An Airflow exception"), # Second batch, should raise
(EXPECTED_BATCH_DATA, True), # This batch should not be reached
],
2, # The final batch should not be reached
["Some other exception"], # Ingestion errors reported
),
],
)
def test_ingest_records_with_skip_ingestion_errors(
batches, expected_call_count, error_messages
):
ingester = MockProviderDataIngester({"skip_ingestion_errors": True})

with (
patch.object(ingester, "get_batch") as get_batch_mock,
patch.object(ingester, "process_batch", return_value=10),
):
get_batch_mock.side_effect = [
Exception("Mock exception 1"), # First batch
(EXPECTED_BATCH_DATA, True), # Second batch
Exception("Mock exception 2"), # Third batch
(EXPECTED_BATCH_DATA, False), # Final batch
]
get_batch_mock.side_effect = batches

# ingest_records ultimately raises an exception
with pytest.raises(AirflowException) as error:
ingester.ingest_records()

# get_batch was called four times before the exception was thrown,
# despite errors being raised
assert get_batch_mock.call_count == 4
assert get_batch_mock.call_count == expected_call_count

# All errors are summarized in the exception thrown at the end
assert "Mock exception 1" in str(error)
assert "Mock exception 2" in str(error)
for error_message in error_messages:
assert error_message in str(error)


def test_commit_commits_all_stores():
Expand Down