WIP: Targeted ingestion provider workflows
sarayourfriend committed Aug 21, 2024
1 parent f944960 commit 59d2e4c
Showing 5 changed files with 223 additions and 0 deletions.
16 changes: 16 additions & 0 deletions catalog/dags/providers/provider_api_scripts/flickr.py
@@ -21,6 +21,9 @@
from common.licenses import LicenseInfo, get_license_info
from common.loader import provider_details as prov
from common.loader.provider_details import ImageCategory
from providers.provider_api_scripts.targeted_records_provider_data_ingester import (
    TargetedRecordsProviderDataIngesterMixin,
)
from providers.provider_api_scripts.time_delineated_provider_data_ingester import (
    TimeDelineatedProviderDataIngester,
)
@@ -47,6 +50,7 @@ class FlickrDataIngester(TimeDelineatedProviderDataIngester):

    providers = {"image": provider_string}
    endpoint = "https://api.flickr.com/services/rest"
    single_record_endpoint = endpoint
    batch_limit = 500
    retries = 5

@@ -347,6 +351,18 @@ def _get_category(image_data):
        return None


class TargetedFlickrDataIngester(
    TargetedRecordsProviderDataIngesterMixin, FlickrDataIngester
):
    def get_query_params(self, foreign_identifier: str) -> dict:
        return {
            # flickr.photos.getInfo retrieves a single photo by its ID;
            # flickr.photos.search does not accept a photo_id parameter.
            "method": "flickr.photos.getInfo",
            "format": "json",
            "api_key": self.api_key,
            "photo_id": foreign_identifier,
        }


def main(date):
    ingester = FlickrDataIngester(date=date)
    ingester.ingest_records()
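As a usage sketch, the targeted subclass can be driven directly with a conf dict; this assumes ProviderDataIngester accepts the conf positionally, as the mixin's super().__init__ call implies, and the photo IDs below are fabricated:

# Illustrative only: the photo IDs are made up.
conf = {
    "foreign_identifiers": ["52891234567", "52899876543"],
}
ingester = TargetedFlickrDataIngester(conf)
ingester.ingest_records()  # each identifier is fetched and processed as its own "batch"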
60 changes: 60 additions & 0 deletions catalog/dags/providers/provider_api_scripts/targeted_records_provider_data_ingester.py
@@ -0,0 +1,60 @@
import logging

from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester


logger = logging.getLogger(__name__)


class TargetedRecordsProviderDataIngesterMixin:
    """
    A mixin used with ProviderDataIngesters to create subclasses that ingest targeted
    records, identified to the `ingest_records` entrypoint by their foreign identifiers.

    This class leverages existing functionality in ProviderDataIngester implementations,
    such as the batched processing methods. It mimics batched processing by wrapping the
    response for each retrieved record in a list. In other words, each individual record
    is treated as its own "batch", which would otherwise have corresponded to a single
    page of the provider's regular ingestion workflow. As such, if the provider returns
    the same data format for individual records as for records in search results, the
    data processing methods already present in ProviderDataIngester implementations do
    not need to be overridden for individual records.

    At request time, URLs and query parameters unique to each foreign identifier should
    be created by overriding `get_endpoint` and `get_query_params`, each of which takes
    the foreign identifier as an argument. For more complex scenarios, you may override
    `get_record` instead.
    """

    def __init__(self: ProviderDataIngester, conf: dict, *args, **kwargs):
        self.foreign_identifiers = conf["foreign_identifiers"]
        super().__init__(conf, *args, **kwargs)

    def ingest_records(self: ProviderDataIngester) -> None:
        logger.info(f"Begin ingestion of single records for {self.__class__.__name__}")

        for foreign_identifier in self.foreign_identifiers:
            single_record = self.get_record(foreign_identifier)
            self.record_count += self.process_batch([single_record])
            logger.info(f"{self.record_count} records ingested so far.")

            if self.limit and self.record_count >= self.limit:
                logger.info(f"Ingestion limit of {self.limit} has been reached.")
                break

        self._commit_records()

    def get_query_params(self: ProviderDataIngester, foreign_identifier: str) -> dict:
        """Build the query params to request the data for the given foreign identifier."""
        return self.additional_query_params

    def get_endpoint(self: ProviderDataIngester, foreign_identifier: str) -> str:
        """Build the endpoint to request the data for the given foreign identifier."""
        return self.endpoint

    def get_record(self: ProviderDataIngester, foreign_identifier: str) -> dict | None:
        """
        Retrieve an individual record's data from the provider.

        Analogous to `ProviderDataIngester::get_batch`.
        """
        logger.info("Getting single record %s", foreign_identifier)
        endpoint = self.get_endpoint(foreign_identifier)
        query_params = self.get_query_params(foreign_identifier)
        return self.get_response_json(query_params, endpoint)
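
For illustration, a hypothetical provider built on this mixin might override the request hooks as follows; MuseumDataIngester and its endpoint shape are invented for the example:

# Hypothetical sketch; MuseumDataIngester is not a real ingester in this repository.
class TargetedMuseumDataIngester(
    TargetedRecordsProviderDataIngesterMixin, MuseumDataIngester
):
    def get_endpoint(self, foreign_identifier: str) -> str:
        # Providers with per-record URLs interpolate the identifier into the path.
        return f"{self.endpoint}/objects/{foreign_identifier}"

    def get_query_params(self, foreign_identifier: str) -> dict:
        # Providers that address records via query params include the identifier here.
        return {**self.additional_query_params, "id": foreign_identifier}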
88 changes: 88 additions & 0 deletions catalog/dags/providers/provider_dag_factory.py
@@ -87,6 +87,9 @@
from common.loader import loader, reporting, s3, sql
from providers.factory_utils import date_partition_for_prefix, pull_media_wrapper
from providers.provider_reingestion_workflows import ProviderReingestionWorkflow
from providers.provider_targeted_ingestion_workflows import (
    ProviderTargetedIngestionWorkflow,
)
from providers.provider_workflows import (
    ProviderWorkflow,
    TaskOverride,
@@ -702,3 +705,88 @@ def create_day_partitioned_reingestion_dag(
    _apply_configuration_overrides(dag, provider_conf.overrides)

    return dag


def create_targeted_ingestion_dag(provider_conf: ProviderTargetedIngestionWorkflow):
    """
    Instantiate a targeted ingestion DAG.
    """
    default_args = {**DAG_DEFAULT_ARGS, **(provider_conf.default_args or {})}

    dag = DAG(
        dag_id=provider_conf.dag_id,
        default_args={**default_args, "start_date": provider_conf.start_date},
        max_active_tasks=provider_conf.max_active_tasks,
        max_active_runs=provider_conf.max_active_runs,
        start_date=provider_conf.start_date,
        schedule=None,
        doc_md=provider_conf.doc_md,
        tags=[
            "provider",
            *[f"provider: {media_type}" for media_type in provider_conf.media_types],
            "ingestion: targeted",
            *provider_conf.tags,
        ],
        render_template_as_native_obj=True,
        params={
            "foreign_identifiers": Param(
                type=["array"],
                items={"type": "string"},
                description=(
                    "The list of targeted foreign identifiers to ingest from the"
                    " provider."
                ),
            ),
            "additional_query_params": Param(
                default={},
                type=["object", "null"],
                description=(
                    "Supplement the `query_params` on each request. This option is used"
                    " to run a DAG but restrict the search by specifying extra query"
                    " params."
                ),
            ),
            "skip_ingestion_errors": Param(
                default=False,
                type="boolean",
                description=(
                    "Whether to skip over all errors encountered during ingestion,"
                    " continuing to the next batch and reporting errors in aggregate"
                    " when ingestion is complete. This option should be used sparingly."
                ),
            ),
            "sql_rm_source_data_after_ingesting": Param(
                default=False,
                type="boolean",
                description=(
                    "Whether to delete source data from airflow and DB once ingestion"
                    " is complete. This is used to support data quality testing in"
                    " SQL-only DAGs like iNaturalist."
                ),
            ),
        },
    )

    with dag:
        if callable(
            getattr(provider_conf.ingester_class, "create_ingestion_workflow", None)
        ):
            (
                ingest_data,
                ingestion_metrics,
            ) = provider_conf.ingester_class.create_ingestion_workflow()
        else:
            ingest_data, ingestion_metrics = create_ingestion_workflow(provider_conf)

        report_load_completion = create_report_load_completion(
            provider_conf.dag_id,
            provider_conf.media_types,
            ingestion_metrics,
            provider_conf.dated,
        )

        ingest_data >> report_load_completion

    # Apply any overrides from the DAG configuration
    _apply_configuration_overrides(dag, provider_conf.overrides)

    return dag
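
For reference, a trigger-time conf matching these Params might look like the following sketch; all values are fabricated, and the extras key is only a hypothetical example of an additional query param:

# Sketch of a DAG-run conf matching the Params schema above; values are illustrative.
dag_run_conf = {
    "foreign_identifiers": ["52891234567", "52899876543"],
    "additional_query_params": {"extras": "license,owner"},  # hypothetical extra param
    "skip_ingestion_errors": False,
    "sql_rm_source_data_after_ingesting": False,
}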
17 changes: 17 additions & 0 deletions catalog/dags/providers/provider_targeted_ingestion_dag_factory.py
@@ -0,0 +1,17 @@
"""
# Provider Targeted Ingestion Workflow DAG Factory
This file iterates over the configurations defined in PROVIDER_TARGETED_INGESTION_WORKFLOWS
and generates a targeted ingestion workflow DAG for each.
"""

from providers.provider_dag_factory import create_targeted_ingestion_dag
from providers.provider_targeted_ingestion_workflows import (
    PROVIDER_TARGETED_INGESTION_WORKFLOWS,
)


for provider_workflow in PROVIDER_TARGETED_INGESTION_WORKFLOWS:
    globals()[provider_workflow.dag_id] = create_targeted_ingestion_dag(
        provider_workflow
    )
42 changes: 42 additions & 0 deletions catalog/dags/providers/provider_targeted_ingestion_workflows.py
@@ -0,0 +1,42 @@
from dataclasses import dataclass

from providers.provider_api_scripts.flickr import TargetedFlickrDataIngester
from providers.provider_workflows import ProviderWorkflow


@dataclass
class ProviderTargetedIngestionWorkflow(ProviderWorkflow):
    """
    Provider targeted ingestion workflow configurations.

    Extends the ProviderWorkflow with configuration options used to set up
    a targeted ingestion workflow DAG.
    """

    max_foreign_identifiers: int = 0
    """
    The maximum number of foreign identifiers to allow for a single run of the DAG.
    This should keep the duration of a DAG run below the ``pull_timeout`` setting.
    """

    schedule_string: None = None
    """
    Targeted reingestion cannot happen on a schedule and must be triggered
    by another process.
    """

    def __post_init__(self):
        if not self.dag_id:
            _, provider_name = self._get_module_info()
            self.dag_id = f"{provider_name}_targeted_ingestion_workflow"

        super().__post_init__()


PROVIDER_TARGETED_INGESTION_WORKFLOWS = [
    ProviderTargetedIngestionWorkflow(
        ingester_class=TargetedFlickrDataIngester,
        max_foreign_identifiers=2000,
    ),
]
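
The enforcement point for max_foreign_identifiers is not shown in this diff; as a purely illustrative sketch, a guard could fail fast at trigger time along these lines:

def validate_foreign_identifier_count(conf: dict, max_foreign_identifiers: int) -> None:
    """Hypothetical guard: reject runs that request more records than the cap."""
    requested = len(conf.get("foreign_identifiers", []))
    if max_foreign_identifiers and requested > max_foreign_identifiers:
        raise ValueError(
            f"{requested} foreign identifiers requested, but at most"
            f" {max_foreign_identifiers} are allowed per run."
        )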
