From 8f709bdb24ba536c2efba8a08573b7561f243047 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Aug 2022 16:04:00 -0700 Subject: [PATCH 01/24] Pass dag_run conf to ProviderDataIngester --- openverse_catalog/dags/providers/factory_utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/openverse_catalog/dags/providers/factory_utils.py b/openverse_catalog/dags/providers/factory_utils.py index 89cbab88d..a2fac4e62 100644 --- a/openverse_catalog/dags/providers/factory_utils.py +++ b/openverse_catalog/dags/providers/factory_utils.py @@ -4,7 +4,7 @@ from types import FunctionType from typing import Callable, Sequence -from airflow.models import TaskInstance +from airflow.models import DagRun, TaskInstance from airflow.utils.dates import cron_presets from common.constants import MediaType from common.storage.media import MediaStore @@ -45,6 +45,7 @@ def generate_tsv_filenames( ingestion_callable: Callable, media_types: list[MediaType], ti: TaskInstance, + dag_run: DagRun, args: Sequence = None, ) -> None: """ @@ -71,7 +72,7 @@ def generate_tsv_filenames( f"Initializing ProviderIngester {ingestion_callable.__name__} in" f"order to generate store filenames." ) - ingester = ingestion_callable(*args) + ingester = ingestion_callable(dag_run.conf, *args) stores = ingester.media_stores # Push the media store output paths to XComs. @@ -87,6 +88,7 @@ def pull_media_wrapper( media_types: list[MediaType], tsv_filenames: list[str], ti: TaskInstance, + dag_run: DagRun, args: Sequence = None, ): """ @@ -116,7 +118,7 @@ def pull_media_wrapper( # A ProviderDataIngester class was passed instead. First we initialize the # class, which will initialize the media stores and DelayedRequester. logger.info(f"Initializing ProviderIngester {ingestion_callable.__name__}") - ingester = ingestion_callable(*args) + ingester = ingestion_callable(dag_run.conf, *args) stores = ingester.media_stores run_func = ingester.ingest_records # args have already been passed into the ingester, we don't need them passed From 20df90371298cb81542e13eac3743849404e38e4 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Aug 2022 16:05:01 -0700 Subject: [PATCH 02/24] Update ProviderDataIngester to conditionally skip errors --- .../provider_data_ingester.py | 77 ++++++++++++++++--- 1 file changed, 65 insertions(+), 12 deletions(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index cf1531455..721970f00 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -1,7 +1,9 @@ +import json import logging from abc import ABC, abstractmethod from typing import Dict, List, Optional, Tuple +from airflow.exceptions import AirflowException from airflow.models import Variable from common.requester import DelayedRequester, RetriesExceeded from common.storage.media import MediaStore @@ -12,6 +14,24 @@ logger = logging.getLogger(__name__) +class IngestionError(Exception): + """ + Custom exception which includes information about the query_params that + were being used when the error was encountered. 
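+    (Capturing the failing params makes it possible to resume ingestion from
+    that batch, e.g. via the `initial_query_params` DagRun conf option.)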
+ """ + + def __init__(self, error, query_params, next_query_params): + self.error = error + self.query_params = query_params + self.next_query_params = next_query_params + + def __str__(self): + # Append query_param info to error message + return f"""{self.error} + query_params: {json.dumps(self.query_params)} + next_query_params: {json.dumps(self.next_query_params)}""" + + class ProviderDataIngester(ABC): """ An abstract base class that initializes media stores and ingests records @@ -51,9 +71,10 @@ def endpoint(self): """ pass - def __init__(self, date: str = None): + def __init__(self, conf: dict = None, date: str = None): """ Optional Arguments: + conf: The configuration dict for the running DagRun date: Date String in the form YYYY-MM-DD. This is the date for which running the script will pull data """ @@ -75,6 +96,10 @@ def __init__(self, date: str = None): self.media_stores = self.init_media_stores() self.date = date + # Keep track of ingestion errors + self.conf = conf or {} + self.ingestion_errors = [] + def init_media_stores(self) -> dict[str, MediaStore]: """ Initialize a media store for each media type supported by this @@ -97,14 +122,16 @@ def ingest_records(self, **kwargs) -> None: """ should_continue = True record_count = 0 - query_params = None - logger.info(f"Begin ingestion for {self.__class__.__name__}") + # Get initial query_params + query_params = self.conf.get("initial_query_params") + if not query_params: + query_params = self.get_next_query_params(None, **kwargs) - try: - while should_continue: - query_params = self.get_next_query_params(query_params, **kwargs) + logger.info(f"Begin ingestion for {self.__class__.__name__}") + while should_continue: + try: batch, should_continue = self.get_batch(query_params) if batch and len(batch) > 0: @@ -114,12 +141,37 @@ def ingest_records(self, **kwargs) -> None: logger.info("Batch complete.") should_continue = False - if self.limit and record_count >= self.limit: - logger.info(f"Ingestion limit of {self.limit} has been reached.") - should_continue = False - finally: - total = self.commit_records() - logger.info(f"Committed {total} records") + except Exception as e: + next_query_params = self.get_next_query_params(query_params, **kwargs) + ingestion_error = IngestionError(e, query_params, next_query_params) + + if self.conf.get("skip_ingestion_errors", False): + # Add this to the errors list but continue processing + self.ingestion_errors.append(ingestion_error) + query_params = next_query_params + continue + + # Commit whatever records we were able to process, and rethrow the + # exception so the taskrun fails. 
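+                # (Committing first ensures the records processed before the
+                # failure are persisted rather than lost when the task fails.)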
+ self.commit_records() + raise ingestion_error + + if self.limit and record_count >= self.limit: + logger.info(f"Ingestion limit of {self.limit} has been reached.") + should_continue = False + + # Update query_params before iterating + query_params = self.get_next_query_params(query_params, **kwargs) + + # Commit whatever records we were able to process + self.commit_records() + + # If errors were caught during processing, raise them now + if self.ingestion_errors: + errors_str = ("\n").join(str(e) for e in self.ingestion_errors) + raise AirflowException( + f"The following errors were encountered during ingestion:\n{errors_str}" + ) @abstractmethod def get_next_query_params( @@ -260,4 +312,5 @@ def commit_records(self) -> int: total = 0 for store in self.media_stores.values(): total += store.commit() + logger.info(f"Committed {total} records") return total From da52cc211001c1c7a425a435500f59efe122f5ba Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Aug 2022 16:07:38 -0700 Subject: [PATCH 03/24] Add tests --- .../test_provider_data_ingester.py | 416 ++++++++++-------- tests/dags/providers/test_factory_utils.py | 17 +- 2 files changed, 253 insertions(+), 180 deletions(-) diff --git a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py index a3963c9e9..3948fd3a8 100644 --- a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py +++ b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py @@ -1,11 +1,12 @@ import json import os -import unittest from unittest.mock import call, patch import pytest +from airflow.exceptions import AirflowException from common.storage.audio import AudioStore, MockAudioStore from common.storage.image import ImageStore, MockImageStore +from providers.provider_api_scripts.provider_data_ingester import IngestionError from tests.dags.providers.provider_api_scripts.resources.provider_data_ingester.mock_provider_data_ingester import ( AUDIO_PROVIDER, @@ -20,223 +21,286 @@ os.path.abspath(os.path.dirname(__file__)), "resources/provider_data_ingester" ) +ingester = MockProviderDataIngester() +audio_store = MockAudioStore(AUDIO_PROVIDER) +image_store = MockImageStore(IMAGE_PROVIDER) +ingester.media_stores = {"audio": audio_store, "image": image_store} -class TestProviderDataIngester(unittest.TestCase): - def setUp(self): - self.ingester = MockProviderDataIngester() - # Use mock media stores - self.audio_store = MockAudioStore(AUDIO_PROVIDER) - self.image_store = MockImageStore(IMAGE_PROVIDER) - self.ingester.media_stores = { - "audio": self.audio_store, - "image": self.image_store, - } +def _get_resource_json(json_name): + with open(os.path.join(RESOURCES, json_name)) as f: + resource_json = json.load(f) + return resource_json - def _get_resource_json(self, json_name): - with open(os.path.join(RESOURCES, json_name)) as f: - resource_json = json.load(f) - return resource_json - def test_init_media_stores(self): - ingester = MockProviderDataIngester() +def test_init_media_stores(): + ingester = MockProviderDataIngester() + + # We should have two media stores, with the correct types + assert len(ingester.media_stores) == 2 + assert isinstance(ingester.media_stores["audio"], AudioStore) + assert isinstance(ingester.media_stores["image"], ImageStore) + + +def test_init_with_date(): + ingester = MockProviderDataIngester(date="2020-06-27") + assert ingester.date == "2020-06-27" - # We should have two media stores, with the correct types - 
assert len(ingester.media_stores) == 2 - assert isinstance(ingester.media_stores["audio"], AudioStore) - assert isinstance(ingester.media_stores["image"], ImageStore) - def test_init_with_date(self): - ingester = MockProviderDataIngester(date="2020-06-27") - assert ingester.date == "2020-06-27" +def test_init_without_date(): + ingester = MockProviderDataIngester() + assert ingester.date is None + + +def test_batch_limit_is_capped_to_ingestion_limit(): + with patch( + "providers.provider_api_scripts.provider_data_ingester.Variable" + ) as MockVariable: + MockVariable.get.side_effect = [20] - def test_init_without_date(self): ingester = MockProviderDataIngester() - assert ingester.date is None + assert ingester.batch_limit == 20 + assert ingester.limit == 20 - def test_batch_limit_is_capped_to_ingestion_limit(self): - with patch( - "providers.provider_api_scripts.provider_data_ingester.Variable" - ) as MockVariable: - MockVariable.get.side_effect = [20] - ingester = MockProviderDataIngester() - assert ingester.batch_limit == 20 - assert ingester.limit == 20 +def test_get_batch_data(): + response_json = _get_resource_json("complete_response.json") + batch = ingester.get_batch_data(response_json) - def test_get_batch_data(self): - response_json = self._get_resource_json("complete_response.json") - batch = self.ingester.get_batch_data(response_json) + assert batch == EXPECTED_BATCH_DATA - assert batch == EXPECTED_BATCH_DATA - def test_process_batch_adds_items_to_correct_media_stores(self): - with ( - patch.object(self.audio_store, "add_item") as audio_store_mock, - patch.object(self.image_store, "add_item") as image_store_mock, - ): - record_count = self.ingester.process_batch(EXPECTED_BATCH_DATA) +def test_process_batch_adds_items_to_correct_media_stores(): + with ( + patch.object(audio_store, "add_item") as audio_store_mock, + patch.object(image_store, "add_item") as image_store_mock, + ): + record_count = ingester.process_batch(EXPECTED_BATCH_DATA) - assert record_count == 3 - assert audio_store_mock.call_count == 1 - assert image_store_mock.call_count == 2 + assert record_count == 3 + assert audio_store_mock.call_count == 1 + assert image_store_mock.call_count == 2 - def test_process_batch_handles_list_of_records(self): - with ( - patch.object(self.audio_store, "add_item") as audio_store_mock, - patch.object(self.image_store, "add_item") as image_store_mock, - patch.object(self.ingester, "get_record_data") as get_record_data_mock, - ): - # Mock `get_record_data` to return a list of records - get_record_data_mock.return_value = MOCK_RECORD_DATA_LIST - record_count = self.ingester.process_batch(EXPECTED_BATCH_DATA[:1]) +def test_process_batch_handles_list_of_records(): + with ( + patch.object(audio_store, "add_item") as audio_store_mock, + patch.object(image_store, "add_item") as image_store_mock, + patch.object(ingester, "get_record_data") as get_record_data_mock, + ): + # Mock `get_record_data` to return a list of records + get_record_data_mock.return_value = MOCK_RECORD_DATA_LIST - # Both records are added, and to the appropriate stores - assert record_count == 2 - assert audio_store_mock.call_count == 1 - assert image_store_mock.call_count == 1 + record_count = ingester.process_batch(EXPECTED_BATCH_DATA[:1]) - def test_ingest_records(self): - with ( - patch.object(self.ingester, "get_batch") as get_batch_mock, - patch.object( - self.ingester, "process_batch", return_value=3 - ) as process_batch_mock, - patch.object(self.ingester, "commit_records") as commit_mock, - ): - 
get_batch_mock.side_effect = [ - (EXPECTED_BATCH_DATA, True), # First batch - (EXPECTED_BATCH_DATA, True), # Second batch - (None, True), # Final batch - ] + # Both records are added, and to the appropriate stores + assert record_count == 2 + assert audio_store_mock.call_count == 1 + assert image_store_mock.call_count == 1 - self.ingester.ingest_records() - # get_batch is not called again after getting None - assert get_batch_mock.call_count == 3 +def test_ingest_records(): + with ( + patch.object(ingester, "get_batch") as get_batch_mock, + patch.object(ingester, "process_batch", return_value=3) as process_batch_mock, + patch.object(ingester, "commit_records") as commit_mock, + ): + get_batch_mock.side_effect = [ + (EXPECTED_BATCH_DATA, True), # First batch + (EXPECTED_BATCH_DATA, True), # Second batch + (None, True), # Final batch + ] - # process_batch is called for each batch - process_batch_mock.assert_has_calls( - [ - call(EXPECTED_BATCH_DATA), - call(EXPECTED_BATCH_DATA), - ] - ) - # process_batch is not called for a third time with None - assert process_batch_mock.call_count == 2 + ingester.ingest_records() - assert commit_mock.called + # get_batch is not called again after getting None + assert get_batch_mock.call_count == 3 - def test_ingest_records_halts_ingestion_when_should_continue_is_false(self): - with ( - patch.object(self.ingester, "get_batch") as get_batch_mock, - patch.object( - self.ingester, "process_batch", return_value=3 - ) as process_batch_mock, - ): - get_batch_mock.side_effect = [ - (EXPECTED_BATCH_DATA, False), # First batch, should_continue is False + # process_batch is called for each batch + process_batch_mock.assert_has_calls( + [ + call(EXPECTED_BATCH_DATA), + call(EXPECTED_BATCH_DATA), ] + ) + # process_batch is not called for a third time with None + assert process_batch_mock.call_count == 2 - self.ingester.ingest_records() + assert commit_mock.called - # get_batch is not called a second time - assert get_batch_mock.call_count == 1 - assert process_batch_mock.call_count == 1 - process_batch_mock.assert_has_calls( - [ - call(EXPECTED_BATCH_DATA), - ] - ) +def test_ingest_records_halts_ingestion_when_should_continue_is_false(): + with ( + patch.object(ingester, "get_batch") as get_batch_mock, + patch.object(ingester, "process_batch", return_value=3) as process_batch_mock, + ): + get_batch_mock.side_effect = [ + (EXPECTED_BATCH_DATA, False), # First batch, should_continue is False + ] - def test_ingest_records_does_not_process_empty_batch(self): - with ( - patch.object(self.ingester, "get_batch") as get_batch_mock, - patch.object( - self.ingester, "process_batch", return_value=3 - ) as process_batch_mock, - ): - get_batch_mock.side_effect = [ - ([], True), # Empty batch + ingester.ingest_records() + + # get_batch is not called a second time + assert get_batch_mock.call_count == 1 + + assert process_batch_mock.call_count == 1 + process_batch_mock.assert_has_calls( + [ + call(EXPECTED_BATCH_DATA), ] + ) - self.ingester.ingest_records() - # get_batch is not called a second time - assert get_batch_mock.call_count == 1 - # process_batch is not called with an empty batch - assert not process_batch_mock.called - - def test_ingest_records_stops_after_reaching_limit(self): - # Set the ingestion limit for the test to one batch - with patch( - "providers.provider_api_scripts.provider_data_ingester.Variable" - ) as MockVariable: - # Mock the calls to Variable.get, in order - MockVariable.get.side_effect = [3] - - ingester = MockProviderDataIngester() - - with ( - 
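            # (Stub out the request and processing logic; only call counts
            # matter for this test.)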
patch.object(ingester, "get_batch") as get_batch_mock, - patch.object( - ingester, "process_batch", return_value=3 - ) as process_batch_mock, - ): - get_batch_mock.side_effect = [ - (EXPECTED_BATCH_DATA, True), # First batch - (EXPECTED_BATCH_DATA, True), # Second batch - (None, True), # Final batch - ] - - ingester.ingest_records() - - # get_batch is not called again after the first batch - assert get_batch_mock.call_count == 1 - assert process_batch_mock.call_count == 1 - - def test_ingest_records_commits_on_exception(self): +def test_ingest_records_does_not_process_empty_batch(): + with ( + patch.object(ingester, "get_batch") as get_batch_mock, + patch.object(ingester, "process_batch", return_value=3) as process_batch_mock, + ): + get_batch_mock.side_effect = [ + ([], True), # Empty batch + ] + + ingester.ingest_records() + + # get_batch is not called a second time + assert get_batch_mock.call_count == 1 + # process_batch is not called with an empty batch + assert not process_batch_mock.called + + +def test_ingest_records_stops_after_reaching_limit(): + # Set the ingestion limit for the test to one batch + with patch( + "providers.provider_api_scripts.provider_data_ingester.Variable" + ) as MockVariable: + # Mock the calls to Variable.get, in order + MockVariable.get.side_effect = [3] + + ingester = MockProviderDataIngester() + with ( - patch.object(self.ingester, "get_batch") as get_batch_mock, + patch.object(ingester, "get_batch") as get_batch_mock, patch.object( - self.ingester, "process_batch", return_value=3 + ingester, "process_batch", return_value=3 ) as process_batch_mock, - patch.object(self.ingester, "commit_records") as commit_mock, ): get_batch_mock.side_effect = [ (EXPECTED_BATCH_DATA, True), # First batch (EXPECTED_BATCH_DATA, True), # Second batch - ValueError("Whoops :C"), # Problem batch - (EXPECTED_BATCH_DATA, True), # Fourth batch, should not be reached + (None, True), # Final batch ] - with pytest.raises(ValueError, match="Whoops :C"): - self.ingester.ingest_records() + ingester.ingest_records() - # Check that get batch was only called thrice - assert get_batch_mock.call_count == 3 + # get_batch is not called again after the first batch + assert get_batch_mock.call_count == 1 + assert process_batch_mock.call_count == 1 - # process_batch is called for each successful batch - process_batch_mock.assert_has_calls( - [ - call(EXPECTED_BATCH_DATA), - call(EXPECTED_BATCH_DATA), - ] - ) - # process_batch is not called for a third time with exception - assert process_batch_mock.call_count == 2 - # Even with the exception, records were still saved - assert commit_mock.called +def test_ingest_records_commits_on_exception(self): + with ( + patch.object(self.ingester, "get_batch") as get_batch_mock, + patch.object( + self.ingester, "process_batch", return_value=3 + ) as process_batch_mock, + patch.object(self.ingester, "commit_records") as commit_mock, + ): + get_batch_mock.side_effect = [ + (EXPECTED_BATCH_DATA, True), # First batch + (EXPECTED_BATCH_DATA, True), # Second batch + ValueError("Whoops :C"), # Problem batch + (EXPECTED_BATCH_DATA, True), # Fourth batch, should not be reached + ] + + with pytest.raises(ValueError, match="Whoops :C"): + self.ingester.ingest_records() - def test_commit_commits_all_stores(self): - with ( - patch.object(self.audio_store, "commit") as audio_store_mock, - patch.object(self.image_store, "commit") as image_store_mock, - ): - self.ingester.commit_records() + # Check that get batch was only called thrice + assert get_batch_mock.call_count == 3 
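        # (The ValueError raised on the third call stops the loop before a
        # fourth call can be made.)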
- assert audio_store_mock.called - assert image_store_mock.called + # process_batch is called for each successful batch + process_batch_mock.assert_has_calls( + [ + call(EXPECTED_BATCH_DATA), + call(EXPECTED_BATCH_DATA), + ] + ) + # process_batch is not called for a third time with exception + assert process_batch_mock.call_count == 2 + + # Even with the exception, records were still saved + assert commit_mock.called + + +def test_ingest_records_uses_initial_query_params_from_dagrun_conf(): + # Initialize the ingester with a conf + ingester = MockProviderDataIngester( + {"initial_query_params": {"has_image": 1, "page": 5}} + ) + + with ( + patch.object(ingester, "get_batch", return_value=([], False)) as get_batch_mock, + ): + ingester.ingest_records() + + # get_batch is called with the query_params supplied in the conf + get_batch_mock.assert_called_with({"has_image": 1, "page": 5}) + + +def test_ingest_records_raises_IngestionError(): + with (patch.object(ingester, "get_batch") as get_batch_mock,): + get_batch_mock.side_effect = [ + Exception("Mock exception message"), + (EXPECTED_BATCH_DATA, True), # Second batch should not be reached + ] + + with pytest.raises(IngestionError) as error: + ingester.ingest_records() + + # By default, `skip_ingestion_errors` is False and get_batch_data + # is no longer called after encountering an error + assert get_batch_mock.call_count == 1 + + assert str(error.value) == ( + "Mock exception message\n" + ' query_params: {"has_image": 1, "page": 1}\n' + ' next_query_params: {"has_image": 1, "page": 1}' + ) + + +def test_ingest_records_with_skip_ingestion_errors(): + ingester = MockProviderDataIngester({"skip_ingestion_errors": True}) + + with ( + patch.object(ingester, "get_batch") as get_batch_mock, + patch.object(ingester, "process_batch", return_value=10), + ): + get_batch_mock.side_effect = [ + Exception("Mock exception 1"), # First batch + (EXPECTED_BATCH_DATA, True), # Second batch + Exception("Mock exception 2"), # Third batch + (EXPECTED_BATCH_DATA, False), # Final batch + ] + + # ingest_records ultimately raises an exception + with pytest.raises(AirflowException) as error: + ingester.ingest_records() + + # get_batch was called four times before the exception was thrown, + # despite errors being raised + assert get_batch_mock.call_count == 4 + + # All errors are summarized in the exception thrown at the end + assert "Mock exception 1" in str(error) + assert "Mock exception 2" in str(error) + + +def test_commit_commits_all_stores(): + with ( + patch.object(audio_store, "commit") as audio_store_mock, + patch.object(image_store, "commit") as image_store_mock, + ): + ingester.commit_records() + + assert audio_store_mock.called + assert image_store_mock.called diff --git a/tests/dags/providers/test_factory_utils.py b/tests/dags/providers/test_factory_utils.py index ab33c08c4..d44b05f9a 100644 --- a/tests/dags/providers/test_factory_utils.py +++ b/tests/dags/providers/test_factory_utils.py @@ -2,7 +2,7 @@ import pytest import requests -from airflow.models import TaskInstance +from airflow.models import DagRun, TaskInstance from pendulum import datetime from providers import factory_utils @@ -17,6 +17,11 @@ def ti_mock() -> TaskInstance: return mock.MagicMock(spec=TaskInstance) +@pytest.fixture +def dagrun_mock() -> DagRun: + return mock.MagicMock(spec=DagRun) + + @pytest.fixture def internal_func_mock(): """ @@ -29,7 +34,7 @@ def internal_func_mock(): fdi = FakeDataIngester() -def _set_up_ingester(mock_func, value): +def _set_up_ingester(mock_func, conf, 
value): """ Set up ingest records as a proxy for calling the mock function, then return the instance. This is necessary because the args are only handed in during @@ -111,12 +116,15 @@ def test_load_provider_script(func, media_types, stores): (FakeDataIngesterClass, 2, list(fdi.media_stores.values())), ], ) -def test_generate_tsv_filenames(func, media_types, stores, ti_mock, internal_func_mock): +def test_generate_tsv_filenames( + func, media_types, stores, ti_mock, dagrun_mock, internal_func_mock +): value = 42 factory_utils.generate_tsv_filenames( func, media_types, ti_mock, + dagrun_mock, args=[internal_func_mock, value], ) # There should be one call to xcom_push for each provided store @@ -189,7 +197,7 @@ def test_generate_tsv_filenames(func, media_types, stores, ti_mock, internal_fun ], ) def test_pull_media_wrapper( - func, media_types, tsv_filenames, stores, ti_mock, internal_func_mock + func, media_types, tsv_filenames, stores, ti_mock, dagrun_mock, internal_func_mock ): value = 42 factory_utils.pull_media_wrapper( @@ -197,6 +205,7 @@ def test_pull_media_wrapper( media_types, tsv_filenames, ti_mock, + dagrun_mock, args=[internal_func_mock, value], ) # We should have one XCom push for duration From 452e67be1ae5a5464b0b0bbe987cd600030c3d3a Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Aug 2022 16:08:02 -0700 Subject: [PATCH 04/24] Document configuration options --- .../dags/providers/provider_dag_factory.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/openverse_catalog/dags/providers/provider_dag_factory.py b/openverse_catalog/dags/providers/provider_dag_factory.py index 3aeb8c59d..28cafc4ef 100644 --- a/openverse_catalog/dags/providers/provider_dag_factory.py +++ b/openverse_catalog/dags/providers/provider_dag_factory.py @@ -39,6 +39,17 @@ previously downloaded data, and update any data that needs updating (eg. popularity metrics). +Provider workflows which extend the ProviderDataIngester class support a few DagRun +configuration variables: + +* `skip_ingestion_errors`: When set to true, errors encountered during ingestion will +be caught to allow ingestion to continue. The `pull_data` task will still fail when +ingestion is complete, and report a summary of all encountered errors. By default +`skip_ingestion_errors` is False. +* `initial_query_params`: An optional dict of query parameters with which to begin +ingestion. This allows a user to manually force ingestion to resume from a particular +batch, for example when retrying after an error. + You can find more background information on the loading process in the following issues and related PRs: From 8f86fbf4f362861a60382cc308c6ac84ccdbb361 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Aug 2022 16:16:29 -0700 Subject: [PATCH 05/24] Improve error formatting --- .../providers/provider_api_scripts/provider_data_ingester.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 721970f00..2d07b0289 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -154,7 +154,7 @@ def ingest_records(self, **kwargs) -> None: # Commit whatever records we were able to process, and rethrow the # exception so the taskrun fails. 
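                # (The `from` clause below chains the IngestionError so the
                # query_params context is preserved with the original exception.)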
self.commit_records() - raise ingestion_error + raise e from ingestion_error if self.limit and record_count >= self.limit: logger.info(f"Ingestion limit of {self.limit} has been reached.") From 50a85d559158df0dee95b183317f3b13e85b5216 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Aug 2022 16:28:05 -0700 Subject: [PATCH 06/24] Log initial query params for ease of testing --- .../providers/provider_api_scripts/provider_data_ingester.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 2d07b0289..df293815e 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -128,7 +128,10 @@ def ingest_records(self, **kwargs) -> None: if not query_params: query_params = self.get_next_query_params(None, **kwargs) - logger.info(f"Begin ingestion for {self.__class__.__name__}") + logger.info( + f"Begin ingestion for {self.__class__.__name__}" + f" using initial query params: {query_params}" + ) while should_continue: try: From f13eff0fa36b7ca9ae4df4107c6e8a68c64c0e01 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Aug 2022 16:28:48 -0700 Subject: [PATCH 07/24] Don't catch errors in get_batch --- .../provider_data_ingester.py | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index df293815e..c20aa5018 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -209,26 +209,16 @@ def get_batch(self, query_params: Dict) -> Tuple[Optional[List], bool]: batch = None should_continue = True - try: - # Get the API response - response_json = self.get_response_json(query_params) - - # Build a list of records from the response - batch = self.get_batch_data(response_json) - - # Optionally, apply some logic to the response to determine whether - # ingestion should continue or if should be short-circuited. By default - # this will return True and ingestion continues. - should_continue = self.get_should_continue(response_json) - - except ( - RequestException, - RetriesExceeded, - JSONDecodeError, - ValueError, - TypeError, - ) as e: - logger.error(f"Error getting next query parameters due to {e}") + # Get the API response + response_json = self.get_response_json(query_params) + + # Build a list of records from the response + batch = self.get_batch_data(response_json) + + # Optionally, apply some logic to the response to determine whether + # ingestion should continue or if should be short-circuited. By default + # this will return True and ingestion continues. 
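+        # (Exceptions raised here are no longer swallowed; they propagate up
+        # to ingest_records, which handles them per `skip_ingestion_errors`.)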
+ should_continue = self.get_should_continue(response_json) return batch, should_continue From 4edaa73324eb023db1e59d56129da69302ba02e7 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Aug 2022 16:46:20 -0700 Subject: [PATCH 08/24] Fix test --- .../provider_api_scripts/test_provider_data_ingester.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py index 3948fd3a8..824980787 100644 --- a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py +++ b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py @@ -6,7 +6,6 @@ from airflow.exceptions import AirflowException from common.storage.audio import AudioStore, MockAudioStore from common.storage.image import ImageStore, MockImageStore -from providers.provider_api_scripts.provider_data_ingester import IngestionError from tests.dags.providers.provider_api_scripts.resources.provider_data_ingester.mock_provider_data_ingester import ( AUDIO_PROVIDER, @@ -254,18 +253,14 @@ def test_ingest_records_raises_IngestionError(): (EXPECTED_BATCH_DATA, True), # Second batch should not be reached ] - with pytest.raises(IngestionError) as error: + with pytest.raises(Exception) as error: ingester.ingest_records() # By default, `skip_ingestion_errors` is False and get_batch_data # is no longer called after encountering an error assert get_batch_mock.call_count == 1 - assert str(error.value) == ( - "Mock exception message\n" - ' query_params: {"has_image": 1, "page": 1}\n' - ' next_query_params: {"has_image": 1, "page": 1}' - ) + assert str(error.value) == "Mock exception message" def test_ingest_records_with_skip_ingestion_errors(): From fa0df8b718299edd1b7e437b7e5a2d52252ac1d3 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 9 Aug 2022 11:47:47 -0700 Subject: [PATCH 09/24] Fix test --- tests/dags/providers/test_factory_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dags/providers/test_factory_utils.py b/tests/dags/providers/test_factory_utils.py index d44b05f9a..7a2521fba 100644 --- a/tests/dags/providers/test_factory_utils.py +++ b/tests/dags/providers/test_factory_utils.py @@ -34,7 +34,7 @@ def internal_func_mock(): fdi = FakeDataIngester() -def _set_up_ingester(mock_func, conf, value): +def _set_up_ingester(mock_conf, mock_func, value): """ Set up ingest records as a proxy for calling the mock function, then return the instance. 
This is necessary because the args are only handed in during From f8de955e2ee464be1005a16b2a61163194fbc336 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 9 Aug 2022 14:49:08 -0700 Subject: [PATCH 10/24] Do not swallow errors from get_batch and get_should_continue --- .../provider_api_scripts/provider_data_ingester.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index c20aa5018..953051b7a 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -210,7 +210,11 @@ def get_batch(self, query_params: Dict) -> Tuple[Optional[List], bool]: should_continue = True # Get the API response - response_json = self.get_response_json(query_params) + try: + response_json = self.get_response_json(query_params) + except Exception as e: + logger.error(f"Error getting response due to {e}") + response_json = None # Build a list of records from the response batch = self.get_batch_data(response_json) From 638b1d4ded70dc1712830539a91af086f9da69a7 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 9 Aug 2022 14:55:50 -0700 Subject: [PATCH 11/24] Log errors when they are skipped --- .../providers/provider_api_scripts/provider_data_ingester.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 953051b7a..b630cb9a4 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -151,6 +151,8 @@ def ingest_records(self, **kwargs) -> None: if self.conf.get("skip_ingestion_errors", False): # Add this to the errors list but continue processing self.ingestion_errors.append(ingestion_error) + logger.info(f"Skipping ingestion error: {e}") + query_params = next_query_params continue From dda3d78aba0d2d654d58e54e64d643838708d659 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 9 Aug 2022 16:19:23 -0700 Subject: [PATCH 12/24] Print tracebacks in error sumamry --- .../provider_data_ingester.py | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index b630cb9a4..4ee848ef1 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -1,5 +1,6 @@ import json import logging +import traceback from abc import ABC, abstractmethod from typing import Dict, List, Optional, Tuple @@ -20,8 +21,9 @@ class IngestionError(Exception): were being used when the error was encountered. 
""" - def __init__(self, error, query_params, next_query_params): + def __init__(self, error, traceback, query_params, next_query_params): self.error = error + self.traceback = traceback self.query_params = query_params self.next_query_params = next_query_params @@ -31,6 +33,11 @@ def __str__(self): query_params: {json.dumps(self.query_params)} next_query_params: {json.dumps(self.next_query_params)}""" + def print_with_traceback(self): + # Append traceback + return f"""{str(self)} + {self.traceback}""" + class ProviderDataIngester(ABC): """ @@ -144,14 +151,17 @@ def ingest_records(self, **kwargs) -> None: logger.info("Batch complete.") should_continue = False - except Exception as e: + except Exception as error: next_query_params = self.get_next_query_params(query_params, **kwargs) - ingestion_error = IngestionError(e, query_params, next_query_params) + + ingestion_error = IngestionError( + error, traceback.format_exc(), query_params, next_query_params + ) if self.conf.get("skip_ingestion_errors", False): # Add this to the errors list but continue processing self.ingestion_errors.append(ingestion_error) - logger.info(f"Skipping ingestion error: {e}") + logger.error(f"Skipping ingestion error: {error}") query_params = next_query_params continue @@ -159,7 +169,7 @@ def ingest_records(self, **kwargs) -> None: # Commit whatever records we were able to process, and rethrow the # exception so the taskrun fails. self.commit_records() - raise e from ingestion_error + raise error from ingestion_error if self.limit and record_count >= self.limit: logger.info(f"Ingestion limit of {self.limit} has been reached.") @@ -173,7 +183,9 @@ def ingest_records(self, **kwargs) -> None: # If errors were caught during processing, raise them now if self.ingestion_errors: - errors_str = ("\n").join(str(e) for e in self.ingestion_errors) + errors_str = ("\n").join( + e.print_with_traceback() for e in self.ingestion_errors + ) raise AirflowException( f"The following errors were encountered during ingestion:\n{errors_str}" ) From 0b3ffc6d49c052052a0fe8b8eba8817bb7fba1e0 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Wed, 10 Aug 2022 16:27:36 -0700 Subject: [PATCH 13/24] Add conf option to provide a list of query_params to use --- .../provider_api_scripts/cleveland_museum.py | 2 +- .../provider_data_ingester.py | 64 ++++++++++++++----- 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/cleveland_museum.py b/openverse_catalog/dags/providers/provider_api_scripts/cleveland_museum.py index 32f640f21..d53831ed7 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/cleveland_museum.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/cleveland_museum.py @@ -14,7 +14,7 @@ class ClevelandDataIngester(ProviderDataIngester): providers = {"image": prov.CLEVELAND_DEFAULT_PROVIDER} endpoint = "http://openaccess-api.clevelandart.org/api/artworks/" - batch_limit = 1000 + batch_limit = 100 delay = 5 def get_next_query_params(self, prev_query_params, **kwargs): diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 4ee848ef1..e7612022d 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -103,9 +103,22 @@ def __init__(self, conf: dict = None, date: str = None): self.media_stores = 
self.init_media_stores() self.date = date - # Keep track of ingestion errors - self.conf = conf or {} - self.ingestion_errors = [] + # dag_run configuration options + conf = conf or {} + + # Used to skip over errors and continue ingestion. When enabled, errors + # are not reported until ingestion has completed. + self.skip_ingestion_errors = conf.get("skip_ingestion_errors", False) + self.ingestion_errors: List[IngestionError] = [] # Keep track of errors + + # An optional set of initial query params from which to begin ingestion. + self.initial_query_params = conf.get("initial_query_params") + + # An optional list of `query_params`. When provided, ingestion will be run for + # just these sets of params. + self.override_query_params = None + if query_params_list := conf.get("query_params_list"): + self.override_query_params = (qp for qp in query_params_list) def init_media_stores(self) -> dict[str, MediaStore]: """ @@ -131,17 +144,20 @@ def ingest_records(self, **kwargs) -> None: record_count = 0 # Get initial query_params - query_params = self.conf.get("initial_query_params") - if not query_params: - query_params = self.get_next_query_params(None, **kwargs) + if query_params := self.initial_query_params: + logger.info(f"Using initial_query_params from dag_run conf: {query_params}") + else: + query_params = self.get_query_params(None, **kwargs) - logger.info( - f"Begin ingestion for {self.__class__.__name__}" - f" using initial query params: {query_params}" - ) + logger.info(f"Begin ingestion for {self.__class__.__name__}") while should_continue: try: + # Break out of ingestion if no query_params are supplied. This can + # happen when the final `manual_query_params` is processed. + if query_params is None: + break + batch, should_continue = self.get_batch(query_params) if batch and len(batch) > 0: @@ -152,13 +168,13 @@ def ingest_records(self, **kwargs) -> None: should_continue = False except Exception as error: - next_query_params = self.get_next_query_params(query_params, **kwargs) + next_query_params = self.get_query_params(query_params, **kwargs) ingestion_error = IngestionError( error, traceback.format_exc(), query_params, next_query_params ) - if self.conf.get("skip_ingestion_errors", False): + if self.skip_ingestion_errors: # Add this to the errors list but continue processing self.ingestion_errors.append(ingestion_error) logger.error(f"Skipping ingestion error: {error}") @@ -171,13 +187,13 @@ def ingest_records(self, **kwargs) -> None: self.commit_records() raise error from ingestion_error + # Update query params before iterating + query_params = self.get_query_params(query_params, **kwargs) + if self.limit and record_count >= self.limit: logger.info(f"Ingestion limit of {self.limit} has been reached.") should_continue = False - # Update query_params before iterating - query_params = self.get_next_query_params(query_params, **kwargs) - # Commit whatever records we were able to process self.commit_records() @@ -190,6 +206,24 @@ def ingest_records(self, **kwargs) -> None: f"The following errors were encountered during ingestion:\n{errors_str}" ) + def get_query_params( + self, prev_query_params: Optional[Dict], **kwargs + ) -> Optional[Dict]: + """ + Returns the next set of query_params for the next request. If a + `query_params_list` has been provided in the dag_run conf, it will fetch + the next set of query_params from that list. If not, it just passes through + to the class implementation of `get_next_query_params`. 
+ """ + if self.override_query_params: + next_params = next(self.override_query_params, None) + logger.info(f"Using query params from dag_run conf: {next_params}") + return next_params + + # Default behavior when no conf options are provided; build the next + # set of query params, given the previous. + return self.get_next_query_params(prev_query_params, **kwargs) + @abstractmethod def get_next_query_params( self, prev_query_params: Optional[Dict], **kwargs From 116016a3701b3d7542072f67b6bf30b269dd00a1 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Wed, 10 Aug 2022 16:44:19 -0700 Subject: [PATCH 14/24] Revert accidental change to Cleveland script --- .../dags/providers/provider_api_scripts/cleveland_museum.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/cleveland_museum.py b/openverse_catalog/dags/providers/provider_api_scripts/cleveland_museum.py index d53831ed7..32f640f21 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/cleveland_museum.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/cleveland_museum.py @@ -14,7 +14,7 @@ class ClevelandDataIngester(ProviderDataIngester): providers = {"image": prov.CLEVELAND_DEFAULT_PROVIDER} endpoint = "http://openaccess-api.clevelandart.org/api/artworks/" - batch_limit = 100 + batch_limit = 1000 delay = 5 def get_next_query_params(self, prev_query_params, **kwargs): From 85ecedf99860bfe3f27e082d5af4ee1d6a4e2e8d Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Wed, 10 Aug 2022 17:10:08 -0700 Subject: [PATCH 15/24] Test --- .../test_provider_data_ingester.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py index 824980787..7eee67079 100644 --- a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py +++ b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py @@ -237,6 +237,7 @@ def test_ingest_records_uses_initial_query_params_from_dagrun_conf(): {"initial_query_params": {"has_image": 1, "page": 5}} ) + # Mock get_batch to halt ingestion after a single batch with ( patch.object(ingester, "get_batch", return_value=([], False)) as get_batch_mock, ): @@ -246,6 +247,38 @@ def test_ingest_records_uses_initial_query_params_from_dagrun_conf(): get_batch_mock.assert_called_with({"has_image": 1, "page": 5}) +def test_ingest_records_uses_query_params_list_from_dagrun_conf(): + # Initialize the ingester with a conf + ingester = MockProviderDataIngester( + { + "query_params_list": [ + {"has_image": 1, "page": 5}, + {"has_image": 1, "page": 12}, + {"has_image": 1, "page": 142}, + ] + } + ) + + with ( + patch.object( + ingester, "get_batch", return_value=(EXPECTED_BATCH_DATA, True) + ) as get_batch_mock, + patch.object(ingester, "process_batch", return_value=3), + ): + ingester.ingest_records() + + # get_batch is called only three times, for each set of query_params + # in the list, even though `should_continue` is still True + assert get_batch_mock.call_count == 3 + get_batch_mock.assert_has_calls( + [ + call({"has_image": 1, "page": 5}), + call({"has_image": 1, "page": 12}), + call({"has_image": 1, "page": 142}), + ] + ) + + def test_ingest_records_raises_IngestionError(): with (patch.object(ingester, "get_batch") as get_batch_mock,): get_batch_mock.side_effect = [ From 7719b97fa93e288d1bba5c9d08d084c080ff63c9 Mon Sep 17 00:00:00 2001 From: Staci Cooper 
Date: Wed, 10 Aug 2022 17:12:50 -0700 Subject: [PATCH 16/24] Remove unnecessary printing of next_query_params --- .../provider_data_ingester.py | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index e7612022d..912161db8 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -21,17 +21,15 @@ class IngestionError(Exception): were being used when the error was encountered. """ - def __init__(self, error, traceback, query_params, next_query_params): + def __init__(self, error, traceback, query_params): self.error = error self.traceback = traceback self.query_params = query_params - self.next_query_params = next_query_params def __str__(self): # Append query_param info to error message return f"""{self.error} - query_params: {json.dumps(self.query_params)} - next_query_params: {json.dumps(self.next_query_params)}""" + query_params: {json.dumps(self.query_params)}""" def print_with_traceback(self): # Append traceback @@ -142,22 +140,18 @@ def ingest_records(self, **kwargs) -> None: """ should_continue = True record_count = 0 - - # Get initial query_params - if query_params := self.initial_query_params: - logger.info(f"Using initial_query_params from dag_run conf: {query_params}") - else: - query_params = self.get_query_params(None, **kwargs) + query_params = None logger.info(f"Begin ingestion for {self.__class__.__name__}") while should_continue: - try: + query_params = self.get_query_params(query_params, **kwargs) + if query_params is None: # Break out of ingestion if no query_params are supplied. This can # happen when the final `manual_query_params` is processed. - if query_params is None: - break + break + try: batch, should_continue = self.get_batch(query_params) if batch and len(batch) > 0: @@ -168,18 +162,14 @@ def ingest_records(self, **kwargs) -> None: should_continue = False except Exception as error: - next_query_params = self.get_query_params(query_params, **kwargs) - ingestion_error = IngestionError( - error, traceback.format_exc(), query_params, next_query_params + error, traceback.format_exc(), query_params ) if self.skip_ingestion_errors: # Add this to the errors list but continue processing self.ingestion_errors.append(ingestion_error) - logger.error(f"Skipping ingestion error: {error}") - - query_params = next_query_params + logger.error(f"Skipping batch due to ingestion error: {error}") continue # Commit whatever records we were able to process, and rethrow the @@ -187,9 +177,6 @@ def ingest_records(self, **kwargs) -> None: self.commit_records() raise error from ingestion_error - # Update query params before iterating - query_params = self.get_query_params(query_params, **kwargs) - if self.limit and record_count >= self.limit: logger.info(f"Ingestion limit of {self.limit} has been reached.") should_continue = False @@ -210,14 +197,25 @@ def get_query_params( self, prev_query_params: Optional[Dict], **kwargs ) -> Optional[Dict]: """ - Returns the next set of query_params for the next request. If a - `query_params_list` has been provided in the dag_run conf, it will fetch - the next set of query_params from that list. If not, it just passes through - to the class implementation of `get_next_query_params`. 
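+        Returns None once an override list supplied via the conf is exhausted.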
+ Returns the next set of query_params for the next request, handling + optional overrides via the dag_run conf. """ + # If we are getting query_params for the first batch and initial_query_params + # have been set, return them. + if prev_query_params is None and self.initial_query_params: + logger.info( + "Using initial_query_params from dag_runconf:" + f" {self.initial_query_params}" + ) + return self.initial_query_params + + # If a list of query_params was provided, return the next value. if self.override_query_params: next_params = next(self.override_query_params, None) - logger.info(f"Using query params from dag_run conf: {next_params}") + logger.info( + "Using query params from `query_params_list` set in dag_run conf:" + f" {next_params}" + ) return next_params # Default behavior when no conf options are provided; build the next From 0809ffb6a1b16931faacd3e4ff222dd27054412a Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Wed, 10 Aug 2022 17:33:19 -0700 Subject: [PATCH 17/24] Log list of bad query params, for convenience of copying --- .../provider_data_ingester.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 912161db8..38eb2d029 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -24,12 +24,12 @@ class IngestionError(Exception): def __init__(self, error, traceback, query_params): self.error = error self.traceback = traceback - self.query_params = query_params + self.query_params = json.dumps(query_params) def __str__(self): # Append query_param info to error message return f"""{self.error} - query_params: {json.dumps(self.query_params)}""" + query_params: {self.query_params}""" def print_with_traceback(self): # Append traceback @@ -186,7 +186,15 @@ def ingest_records(self, **kwargs) -> None: # If errors were caught during processing, raise them now if self.ingestion_errors: - errors_str = ("\n").join( + # Log the affected query_params + bad_query_params = ", \n".join( + [f"{e.query_params}" for e in self.ingestion_errors] + ) + logger.info( + "The following query_params resulted in errors: \n" + f"{bad_query_params}" + ) + errors_str = "\n".join( e.print_with_traceback() for e in self.ingestion_errors ) raise AirflowException( @@ -204,7 +212,7 @@ def get_query_params( # have been set, return them. 
if prev_query_params is None and self.initial_query_params: logger.info( - "Using initial_query_params from dag_runconf:" + "Using initial_query_params from dag_run conf:" f" {self.initial_query_params}" ) return self.initial_query_params From 410e8ab98474e8dbc4bfe75bf732ede581beacd1 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 12 Aug 2022 13:49:14 -0700 Subject: [PATCH 18/24] Add exception handling back in after rebase --- .../provider_api_scripts/provider_data_ingester.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 38eb2d029..798b2c7be 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -266,7 +266,13 @@ def get_batch(self, query_params: Dict) -> Tuple[Optional[List], bool]: # Get the API response try: response_json = self.get_response_json(query_params) - except Exception as e: + except ( + RequestException, + RetriesExceeded, + JSONDecodeError, + ValueError, + TypeError, + ) as e: logger.error(f"Error getting response due to {e}") response_json = None From 5cb47ef5ab3bd9871302d4d4ce2329ee555f56fe Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 12 Aug 2022 16:35:19 -0700 Subject: [PATCH 19/24] Update method name, fix comments --- .../provider_api_scripts/provider_data_ingester.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 798b2c7be..af79579a1 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -28,13 +28,11 @@ def __init__(self, error, traceback, query_params): def __str__(self): # Append query_param info to error message - return f"""{self.error} - query_params: {self.query_params}""" + return f"{self.error}\nquery_params: {self.query_params}" - def print_with_traceback(self): + def repr_with_traceback(self): # Append traceback - return f"""{str(self)} - {self.traceback}""" + return f"{str(self)}\n{self.traceback}" class ProviderDataIngester(ABC): @@ -116,6 +114,7 @@ def __init__(self, conf: dict = None, date: str = None): # just these sets of params. self.override_query_params = None if query_params_list := conf.get("query_params_list"): + # Create a generator to facilitate fetching the next set of query_params. self.override_query_params = (qp for qp in query_params_list) def init_media_stores(self) -> dict[str, MediaStore]: @@ -148,7 +147,7 @@ def ingest_records(self, **kwargs) -> None: query_params = self.get_query_params(query_params, **kwargs) if query_params is None: # Break out of ingestion if no query_params are supplied. This can - # happen when the final `manual_query_params` is processed. + # happen when the final `override_query_params` is processed. 
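+                # (get_query_params returns None once that generator is exhausted.)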
break try: @@ -195,7 +194,7 @@ def ingest_records(self, **kwargs) -> None: f"{bad_query_params}" ) errors_str = "\n".join( - e.print_with_traceback() for e in self.ingestion_errors + e.repr_with_traceback() for e in self.ingestion_errors ) raise AirflowException( f"The following errors were encountered during ingestion:\n{errors_str}" From 73a7583c36d1b0c8a476ddea1fcd23f4dbe34621 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 12 Aug 2022 16:52:07 -0700 Subject: [PATCH 20/24] Update tests --- .../test_provider_data_ingester.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py index 7eee67079..326f47501 100644 --- a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py +++ b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py @@ -196,13 +196,11 @@ def test_ingest_records_stops_after_reaching_limit(): assert process_batch_mock.call_count == 1 -def test_ingest_records_commits_on_exception(self): +def test_ingest_records_commits_on_exception(): with ( - patch.object(self.ingester, "get_batch") as get_batch_mock, - patch.object( - self.ingester, "process_batch", return_value=3 - ) as process_batch_mock, - patch.object(self.ingester, "commit_records") as commit_mock, + patch.object(ingester, "get_batch") as get_batch_mock, + patch.object(ingester, "process_batch", return_value=3) as process_batch_mock, + patch.object(ingester, "commit_records") as commit_mock, ): get_batch_mock.side_effect = [ (EXPECTED_BATCH_DATA, True), # First batch @@ -212,7 +210,7 @@ def test_ingest_records_commits_on_exception(self): ] with pytest.raises(ValueError, match="Whoops :C"): - self.ingester.ingest_records() + ingester.ingest_records() # Check that get batch was only called thrice assert get_batch_mock.call_count == 3 @@ -286,15 +284,13 @@ def test_ingest_records_raises_IngestionError(): (EXPECTED_BATCH_DATA, True), # Second batch should not be reached ] - with pytest.raises(Exception) as error: + with pytest.raises(Exception, match="Mock exception message"): ingester.ingest_records() # By default, `skip_ingestion_errors` is False and get_batch_data # is no longer called after encountering an error assert get_batch_mock.call_count == 1 - assert str(error.value) == "Mock exception message" - def test_ingest_records_with_skip_ingestion_errors(): ingester = MockProviderDataIngester({"skip_ingestion_errors": True}) From ec082218d27b72950c785f0863e80995134c0a73 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 12 Aug 2022 18:16:15 -0700 Subject: [PATCH 21/24] Never skip AirflowExceptions --- .../provider_data_ingester.py | 23 +++++++- .../test_provider_data_ingester.py | 53 +++++++++++++++---- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index af79579a1..e9dac6012 100644 --- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -160,6 +160,16 @@ def ingest_records(self, **kwargs) -> None: logger.info("Batch complete.") should_continue = False + except AirflowException as error: + # AirflowExceptions should not be caught, as execution should not + # continue when the task is being stopped by 
From 73a7583c36d1b0c8a476ddea1fcd23f4dbe34621 Mon Sep 17 00:00:00 2001
From: Staci Cooper
Date: Fri, 12 Aug 2022 16:52:07 -0700
Subject: [PATCH 20/24] Update tests

---
 .../test_provider_data_ingester.py | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
index 7eee67079..326f47501 100644
--- a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
+++ b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
@@ -196,13 +196,11 @@ def test_ingest_records_stops_after_reaching_limit():
     assert process_batch_mock.call_count == 1
 
 
-def test_ingest_records_commits_on_exception(self):
+def test_ingest_records_commits_on_exception():
     with (
-        patch.object(self.ingester, "get_batch") as get_batch_mock,
-        patch.object(
-            self.ingester, "process_batch", return_value=3
-        ) as process_batch_mock,
-        patch.object(self.ingester, "commit_records") as commit_mock,
+        patch.object(ingester, "get_batch") as get_batch_mock,
+        patch.object(ingester, "process_batch", return_value=3) as process_batch_mock,
+        patch.object(ingester, "commit_records") as commit_mock,
     ):
         get_batch_mock.side_effect = [
             (EXPECTED_BATCH_DATA, True),  # First batch
@@ -212,7 +210,7 @@ def test_ingest_records_commits_on_exception():
         ]
 
         with pytest.raises(ValueError, match="Whoops :C"):
-            self.ingester.ingest_records()
+            ingester.ingest_records()
 
         # Check that get batch was only called thrice
         assert get_batch_mock.call_count == 3
@@ -286,15 +284,13 @@ def test_ingest_records_raises_IngestionError():
         (EXPECTED_BATCH_DATA, True),  # Second batch should not be reached
     ]
 
-    with pytest.raises(Exception) as error:
+    with pytest.raises(Exception, match="Mock exception message"):
         ingester.ingest_records()
 
     # By default, `skip_ingestion_errors` is False and get_batch_data
     # is no longer called after encountering an error
     assert get_batch_mock.call_count == 1
 
-    assert str(error.value) == "Mock exception message"
-
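The updated tests lean on pytest.raises(..., match=...), which applies the pattern with re.search against the string form of the raised exception, so the separate equality assert on error.value becomes redundant. A tiny standalone illustration (not part of the suite):

    import pytest

    def test_match_is_a_regex_search():
        # match only needs to find the pattern somewhere in the message
        with pytest.raises(ValueError, match="Mock exception message"):
            raise ValueError("Mock exception message (batch 3)")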
From ec082218d27b72950c785f0863e80995134c0a73 Mon Sep 17 00:00:00 2001
From: Staci Cooper
Date: Fri, 12 Aug 2022 18:16:15 -0700
Subject: [PATCH 21/24] Never skip AirflowExceptions

---
 .../provider_data_ingester.py      | 23 +++++++++++++++----
 .../test_provider_data_ingester.py | 53 +++++++++++++++----
 2 files changed, 65 insertions(+), 11 deletions(-)

diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
index af79579a1..e9dac6012 100644
--- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
+++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
@@ -160,6 +160,16 @@ def ingest_records(self, **kwargs) -> None:
                     logger.info("Batch complete.")
                     should_continue = False
 
+            except AirflowException as error:
+                # AirflowExceptions should not be caught, as execution should not
+                # continue when the task is being stopped by Airflow.
+
+                # If errors have already been caught during processing, raise them
+                # as well.
+                if error_summary := self.get_ingestion_errors():
+                    raise error_summary from error
+                raise
+
             except Exception as error:
                 ingestion_error = IngestionError(
                     error, traceback.format_exc(), query_params
@@ -184,6 +194,16 @@ def ingest_records(self, **kwargs) -> None:
         self.commit_records()
 
         # If errors were caught during processing, raise them now
+        if error_summary := self.get_ingestion_errors():
+            raise error_summary
+
+    def get_ingestion_errors(self) -> AirflowException | None:
+        """ "
+        If any errors were skipped during ingestion, returns an AirflowException
+        with a summary of all errors.
+
+        If there are no errors to report, returns None.
+        """
         if self.ingestion_errors:
             # Log the affected query_params
             bad_query_params = ", \n".join(
diff --git a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
index 326f47501..696fc066a 100644
--- a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
+++ b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
@@ -292,19 +292,52 @@ def test_ingest_records_raises_IngestionError():
     assert get_batch_mock.call_count == 1
 
 
-def test_ingest_records_with_skip_ingestion_errors():
+@pytest.mark.parametrize(
+    "batches, expected_call_count, error_messages",
+    [
+        # Multiple errors are skipped
+        (
+            [
+                Exception("Mock exception 1"),  # First error
+                (EXPECTED_BATCH_DATA, True),
+                Exception("Mock exception 2"),  # Second error
+                (EXPECTED_BATCH_DATA, False),  # Final batch, `should_continue` False
+            ],
+            4,  # get_batch is called until `should_continue` is False, ignoring errors
+            ["Mock exception 1", "Mock exception 2"],
+        ),
+        # An AirflowException should not be skipped
+        (
+            [
+                (EXPECTED_BATCH_DATA, True),
+                AirflowException("An Airflow exception"),  # Second batch, should raise
+                (EXPECTED_BATCH_DATA, True),  # This batch should not be reached
+            ],
+            2,  # The final batch should not be reached
+            ["An Airflow exception"],
+        ),
+        # An AirflowException is raised, but there were already other ingestion errors
+        (
+            [
+                Exception("Some other exception"),  # First batch, should be skipped
+                AirflowException("An Airflow exception"),  # Second batch, should raise
+                (EXPECTED_BATCH_DATA, True),  # This batch should not be reached
+            ],
+            2,  # The final batch should not be reached
+            ["Some other exception"],  # Ingestion errors reported
+        ),
+    ],
+)
+def test_ingest_records_with_skip_ingestion_errors(
+    batches, expected_call_count, error_messages
+):
     ingester = MockProviderDataIngester({"skip_ingestion_errors": True})
 
     with (
         patch.object(ingester, "get_batch") as get_batch_mock,
         patch.object(ingester, "process_batch", return_value=10),
     ):
-        get_batch_mock.side_effect = [
-            Exception("Mock exception 1"),  # First batch
-            (EXPECTED_BATCH_DATA, True),  # Second batch
-            Exception("Mock exception 2"),  # Third batch
-            (EXPECTED_BATCH_DATA, False),  # Final batch
-        ]
+        get_batch_mock.side_effect = batches
 
         # ingest_records ultimately raises an exception
         with pytest.raises(AirflowException) as error:
             ingester.ingest_records()
 
-        # get_batch was called four times before the exception was thrown,
-        # despite errors being raised
-        assert get_batch_mock.call_count == 4
+        # get_batch was called the expected number of times, despite any
+        # errors being raised
+        assert get_batch_mock.call_count == expected_call_count
 
         # All errors are summarized in the exception thrown at the end
-        assert "Mock exception 1" in str(error)
-        assert "Mock exception 2" in str(error)
+        for error_message in error_messages:
+            assert error_message in str(error)
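In operation this flag arrives through the trigger conf (roughly airflow dags trigger <dag_id> --conf '{"skip_ingestion_errors": true}', where the dag id is whichever provider workflow is being rerun); the tests model the same path by passing the dict straight to the ingester. A minimal sketch using the mock ingester from the test resources:

    # conf as the ingester receives it from the DagRun (values illustrative):
    ingester = MockProviderDataIngester({"skip_ingestion_errors": True})

    # Ordinary exceptions from a batch are appended to ingester.ingestion_errors
    # and ingestion continues; AirflowExceptions (e.g. the task being stopped)
    # propagate immediately, carrying any already-collected errors with them.
    ingester.ingest_records()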
+            return AggregateIngestionError(
+                f"{len(self.ingestion_errors)} errors were skipped during ingestion "
+                " using the `skip_ingestion_errors` flag. See the log for more details."
+            )
         return None
diff --git a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
index 696fc066a..d59b36078 100644
--- a/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
+++ b/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
@@ -6,6 +6,9 @@
 from airflow.exceptions import AirflowException
 from common.storage.audio import AudioStore, MockAudioStore
 from common.storage.image import ImageStore, MockImageStore
+from providers.provider_api_scripts.provider_data_ingester import (
+    AggregateIngestionError,
+)
 
 from tests.dags.providers.provider_api_scripts.resources.provider_data_ingester.mock_provider_data_ingester import (
     AUDIO_PROVIDER,
@@ -293,7 +296,7 @@ def test_ingest_records_raises_IngestionError():
 
 
 @pytest.mark.parametrize(
-    "batches, expected_call_count, error_messages",
+    "batches, expected_call_count, expected_error",
     [
         # Multiple errors are skipped
         (
@@ -304,7 +307,7 @@ def test_ingest_records_raises_IngestionError():
                 (EXPECTED_BATCH_DATA, False),  # Final batch, `should_continue` False
             ],
             4,  # get_batch is called until `should_continue` is False, ignoring errors
-            ["Mock exception 1", "Mock exception 2"],
+            AggregateIngestionError,
         ),
         # An AirflowException should not be skipped
         (
@@ -314,7 +317,7 @@ def test_ingest_records_raises_IngestionError():
                 (EXPECTED_BATCH_DATA, True),  # This batch should not be reached
             ],
             2,  # The final batch should not be reached
-            ["An Airflow exception"],
+            AirflowException,
         ),
         # An AirflowException is raised, but there were already other ingestion errors
         (
@@ -324,12 +327,12 @@ def test_ingest_records_raises_IngestionError():
                 (EXPECTED_BATCH_DATA, True),  # This batch should not be reached
             ],
             2,  # The final batch should not be reached
-            ["Some other exception"],  # Ingestion errors reported
+            AggregateIngestionError,  # Ingestion errors reported
         ),
     ],
 )
 def test_ingest_records_with_skip_ingestion_errors(
-    batches, expected_call_count, error_messages
+    batches, expected_call_count, expected_error
 ):
     ingester = MockProviderDataIngester({"skip_ingestion_errors": True})
 
@@ -340,17 +343,13 @@ def test_ingest_records_with_skip_ingestion_errors(
         get_batch_mock.side_effect = batches
 
         # ingest_records ultimately raises an exception
-        with pytest.raises(AirflowException) as error:
+        with pytest.raises(expected_error):
             ingester.ingest_records()
 
         # get_batch was called the expected number of times, despite any
         # errors being raised
         assert get_batch_mock.call_count == expected_call_count
 
-        # All errors are summarized in the exception thrown at the end
-        for error_message in error_messages:
-            assert error_message in str(error)
-
 
 def test_commit_commits_all_stores():
     with (

From 84367f04cdfa1c11b83d71784543ef178bb168fd Mon Sep 17 00:00:00 2001
From: Staci Cooper
Date: Mon, 15 Aug 2022 16:03:42 -0700
Subject: [PATCH 23/24] Make it easier to copy-paste query_params from logs

---
 .../providers/provider_api_scripts/provider_data_ingester.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
index b81a75ff4..bcf7bef86 100644
--- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
+++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
@@ -246,7 +246,7 @@ def get_query_params(
         if prev_query_params is None and self.initial_query_params:
             logger.info(
                 "Using initial_query_params from dag_run conf:"
-                f" {self.initial_query_params}"
+                f" {json.dumps(self.initial_query_params)}"
             )
             return self.initial_query_params
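A quick illustration of why json.dumps matters for copy-pasting: the default dict repr uses single quotes and Python literals, which a JSON conf field rejects, while the dumped form can be pasted back into --conf or the trigger UI as-is (values illustrative):

    import json

    params = {"page": 2, "full_page": True, "cursor": None}
    print(params)              # {'page': 2, 'full_page': True, 'cursor': None}
    print(json.dumps(params))  # {"page": 2, "full_page": true, "cursor": null}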
From b44f2b22a0e6a53fe908b5a26e9f1dced13ab6a6 Mon Sep 17 00:00:00 2001
From: Madison Swain-Bowden
Date: Mon, 15 Aug 2022 17:03:33 -0700
Subject: [PATCH 24/24] Tweak aggregate error message wording

---
 .../providers/provider_api_scripts/provider_data_ingester.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
index bcf7bef86..773b38d6d 100644
--- a/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
+++ b/openverse_catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
@@ -229,8 +229,9 @@ def get_ingestion_errors(self) -> AggregateIngestionError | None:
                 f"The following errors were encountered during ingestion:\n{errors_str}"
             )
             return AggregateIngestionError(
-                f"{len(self.ingestion_errors)} errors were skipped during ingestion "
-                " using the `skip_ingestion_errors` flag. See the log for more details."
+                f"{len(self.ingestion_errors)} query batches were skipped due to "
+                "errors during ingestion using the `skip_ingestion_errors` flag. "
+                "See the log for more details."
             )
         return None
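Taken together, the series leaves ProviderDataIngester understanding three DagRun conf keys; a sketch of the shapes involved (keys from this series, values purely illustrative):

    # Resume from a known spot instead of computing the first set of params:
    conf = {"initial_query_params": {"page": 100, "per_page": 50}}

    # ...or replay an exact sequence of param sets, stopping when exhausted:
    conf = {"query_params_list": [{"page": 3}, {"page": 7}, {"page": 12}]}

    # ...and/or collect per-batch failures, raising one AggregateIngestionError
    # at the end instead of failing on the first bad batch:
    conf = {"skip_ingestion_errors": True}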