Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Add a Nappy provider DAG using ProviderDataIngester #796

Merged
merged 37 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
6cb6d7f
_-prefix methods that should not be overridden
stacimc Oct 10, 2022
a507cd7
Initial template
stacimc Oct 10, 2022
98fef3c
Add initial docs
stacimc Oct 10, 2022
277ecab
Update template, add test template file
stacimc Oct 11, 2022
6e5fd04
Add script to generate template files
stacimc Oct 11, 2022
a208913
Update docs to reference script
stacimc Oct 11, 2022
b073485
Moving more documentation into the code
stacimc Oct 11, 2022
eeaebfd
Reformat docs
stacimc Oct 13, 2022
8f1ad6d
Small tweaks
stacimc Oct 13, 2022
889d61d
Remove unused 'license_url' from nappy and comment out unused test im…
zackkrida Oct 14, 2022
471c266
Remove unused 'license_url' from nappy and comment out unused test im…
zackkrida Oct 14, 2022
8794a0a
Add width, height, and filesize
zackkrida Oct 14, 2022
ff3713c
write small helper fn for filesizes
zackkrida Oct 14, 2022
35fd929
Add UA string header
zackkrida Oct 14, 2022
9815feb
move thumbnail_url to metadata for now
zackkrida Oct 14, 2022
7e4c21d
rename thumbnail_url metadata field to thumbnail
zackkrida Oct 14, 2022
aea5a20
Merge branch 'main' into nappy-provider-dag
rwidom Dec 28, 2022
6d82d3e
add dag start date
rwidom Dec 29, 2022
5f2677d
no header in next params & add thumbnail_url
rwidom Dec 29, 2022
43810a0
add tests and test resources
rwidom Dec 29, 2022
c38d1fe
remove questionable tag from test image
rwidom Dec 29, 2022
b0685b3
update docs
rwidom Dec 30, 2022
896aac1
add popularity metrics to metadata
rwidom Dec 30, 2022
2b9ae7e
Add url to source docs
rwidom Jan 2, 2023
e4d4138
remove template comment from next query params
rwidom Jan 2, 2023
9b89294
remove template comment on optional fields
rwidom Jan 2, 2023
1f01065
remove template comment on get batch
rwidom Jan 2, 2023
09ae681
remove template comment from main
rwidom Jan 2, 2023
46c037e
remove template comment from get_record_data
rwidom Jan 2, 2023
26bff3b
pass batch_limit to the API
rwidom Jan 2, 2023
f160dda
tests for batch limit API parameter
rwidom Jan 2, 2023
751ad04
point to popularity metrics
rwidom Jan 2, 2023
5e02316
template test directory fix
rwidom Jan 2, 2023
3871691
make license info a class variable
rwidom Jan 2, 2023
c24d729
Remove outdated/duplicated template creation files
AetherUnbound Jan 2, 2023
a81d523
Update DAG documentation
AetherUnbound Jan 2, 2023
53eb8f9
fortify and test convert filesize
rwidom Jan 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions openverse_catalog/dags/common/loader/provider_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,27 @@


# Default provider names
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sorted this list alphabetically, but can revert if desired.

FLICKR_DEFAULT_PROVIDER = "flickr"
EUROPEANA_DEFAULT_PROVIDER = "europeana"
WIKIMEDIA_AUDIO_PROVIDER = "wikimedia_audio"
WIKIMEDIA_DEFAULT_PROVIDER = "wikimedia"
SMITHSONIAN_DEFAULT_PROVIDER = "smithsonian"
BROOKLYN_DEFAULT_PROVIDER = "brooklynmuseum"
CLEVELAND_DEFAULT_PROVIDER = "clevelandmuseum"
EUROPEANA_DEFAULT_PROVIDER = "europeana"
FINNISH_DEFAULT_PROVIDER = "finnishmuseums"
FLICKR_DEFAULT_PROVIDER = "flickr"
FREESOUND_DEFAULT_PROVIDER = "freesound"
INATURALIST_DEFAULT_PROVIDER = "inaturalist"
JAMENDO_DEFAULT_PROVIDER = "jamendo"
METROPOLITAN_MUSEUM_DEFAULT_PROVIDER = "met"
VICTORIA_DEFAULT_PROVIDER = "museumsvictoria"
NAPPY_DEFAULT_PROVIDER = "nappy"
NYPL_DEFAULT_PROVIDER = "nypl"
RAWPIXEL_DEFAULT_PROVIDER = "rawpixel"
SCIENCE_DEFAULT_PROVIDER = "sciencemuseum"
SMITHSONIAN_DEFAULT_PROVIDER = "smithsonian"
SMK_DEFAULT_PROVIDER = "smk"
WALTERS_DEFAULT_PROVIDER = "waltersartmuseum"
FINNISH_DEFAULT_PROVIDER = "finnishmuseums"
JAMENDO_DEFAULT_PROVIDER = "jamendo"
STOCKSNAP_DEFAULT_PROVIDER = "stocksnap"
VICTORIA_DEFAULT_PROVIDER = "museumsvictoria"
WALTERS_DEFAULT_PROVIDER = "waltersartmuseum"
WIKIMEDIA_AUDIO_PROVIDER = "wikimedia_audio"
WIKIMEDIA_DEFAULT_PROVIDER = "wikimedia"
WORDPRESS_DEFAULT_PROVIDER = "wordpress"
FREESOUND_DEFAULT_PROVIDER = "freesound"
INATURALIST_DEFAULT_PROVIDER = "inaturalist"

# Finnish parameters
FINNISH_SUB_PROVIDERS = {
Expand Down Expand Up @@ -136,6 +137,7 @@ class ImageCategory(Enum):
"mccordmuseum": ImageCategory.DIGITIZED_ARTWORK.value,
"met": ImageCategory.DIGITIZED_ARTWORK.value,
"museumsvictoria": ImageCategory.DIGITIZED_ARTWORK.value,
"nappy": ImageCategory.PHOTOGRAPH.value,
"phylopic": ImageCategory.ILLUSTRATION.value,
"rijksmuseum": ImageCategory.DIGITIZED_ARTWORK.value,
"sciencemuseum": ImageCategory.PHOTOGRAPH.value,
Expand Down
111 changes: 111 additions & 0 deletions openverse_catalog/dags/providers/provider_api_scripts/nappy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
Content Provider: Nappy

ETL Process: Use the API to identify all CC0-licensed images.

Output: TSV file containing the image meta-data.

Notes: This api was written specially for Openverse.
There are no known limits or restrictions.
rwidom marked this conversation as resolved.
Show resolved Hide resolved

"""
import logging

from common import constants
from common.licenses import get_license_info
from common.loader import provider_details as prov
from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester


logger = logging.getLogger(__name__)


class NappyDataIngester(ProviderDataIngester):
providers = {"image": prov.NAPPY_DEFAULT_PROVIDER}
endpoint = "https://api.nappy.co/v1/openverse/images"
# TODO The following are set to their default values. Remove them if the defaults
# are acceptible, or override them.
delay = 1
retries = 3
headers = {"Accept": "application/json"}
zackkrida marked this conversation as resolved.
Show resolved Hide resolved

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
# On the first request, `prev_query_params` will be `None`. We can detect this
# and return our default params.
rwidom marked this conversation as resolved.
Show resolved Hide resolved
if not prev_query_params:
return {
"page": 1,
}
else:
return {
**prev_query_params,
"page": prev_query_params["page"] + 1,
}

def get_batch_data(self, response_json):
# Takes the raw API response from calling `get` on the endpoint, and returns
# the list of records to process.
rwidom marked this conversation as resolved.
Show resolved Hide resolved
if response_json:
return response_json.get("images")
return None

def get_should_continue(self, response_json):
return bool(response_json.get("next_page"))

def get_media_type(self, record: dict):
return constants.IMAGE

def get_record_data(self, data: dict) -> dict | list[dict] | None:
# Parse out the necessary info from the record data into a dictionary.

rwidom marked this conversation as resolved.
Show resolved Hide resolved
if (foreign_landing_url := data.get("foreign_landing_url")) is None:
return None

if (image_url := data.get("url")) is None:
return None

# Hardoded to CC0, the only license Nappy.co uses
license_info = get_license_info(
"https://creativecommons.org/publicdomain/zero/1.0/"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If all results are CC0, we should set this as a value on the class or instance and use it there rather than calling this function for every record!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops! Totally, yes.

if license_info is None:
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since license_info will never be None, we can remove this:

Suggested change
if license_info is None:
return None


# OPTIONAL FIELDS
# Obtain as many optional fields as possible.
rwidom marked this conversation as resolved.
Show resolved Hide resolved
foreign_identifier = data.get("foreign_identifier")
thumbnail_url = data.get("url") + "?auto=format&w=600&q=75"
filesize = data.get("filesize")
filetype = data.get("filetype")
creator = data.get("creator")
creator_url = data.get("creator_url")
title = data.get("title")
meta_data = data.get("meta_data")
raw_tags = data.get("tags").split(",")

return {
"foreign_landing_url": foreign_landing_url,
"image_url": image_url,
"license_info": license_info,
"foreign_identifier": foreign_identifier,
"thumbnail_url": thumbnail_url,
"filesize": filesize,
"filetype": filetype,
"creator": creator,
"creator_url": creator_url,
"title": title,
"meta_data": meta_data,
"raw_tags": raw_tags,
}


def main():
# Allows running ingestion from the CLI without Airflow running for debugging
# purposes.
rwidom marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Begin: Nappy data ingestion")
ingester = NappyDataIngester()
ingester.ingest_records()


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,20 @@ class ProviderDataIngester(ABC):
@abstractmethod
def providers(self) -> dict[str, str]:
"""
A dictionary whose keys are the supported `media_types`, and values are
the `provider` string in the `media` table of the DB for that type.
A dictionary mapping each supported media type to its corresponding
`provider` string (the string that will populate the `provider` field
in the Catalog DB). These strings should be defined as constants in
common.loader.provider_details.py

By convention, when a provider supports multiple media types we set
separate provider strings for each type. For example:

```
providers = {
"image": provider_details.MYPROVIDER_IMAGE_PROVIDER,
"audio": provider_details.MYPROVIDER_AUDIO_PROVIDER,
}
```
"""
pass

Expand Down Expand Up @@ -105,7 +117,7 @@ def __init__(self, conf: dict = None, date: str = None):
self.delayed_requester = DelayedRequester(
delay=self.delay, headers=self.headers
)
self.media_stores = self.init_media_stores()
self.media_stores = self._init_media_stores()
self.date = date

# dag_run configuration options
Expand All @@ -126,7 +138,7 @@ def __init__(self, conf: dict = None, date: str = None):
# Create a generator to facilitate fetching the next set of query_params.
self.override_query_params = (qp for qp in query_params_list)

def init_media_stores(self) -> dict[str, MediaStore]:
def _init_media_stores(self) -> dict[str, MediaStore]:
"""
Initialize a media store for each media type supported by this
provider.
Expand All @@ -153,7 +165,7 @@ def ingest_records(self, **kwargs) -> None:
logger.info(f"Begin ingestion for {self.__class__.__name__}")

while should_continue:
query_params = self.get_query_params(query_params, **kwargs)
query_params = self._get_query_params(query_params, **kwargs)
if query_params is None:
# Break out of ingestion if no query_params are supplied. This can
# happen when the final `override_query_params` is processed.
Expand All @@ -175,7 +187,7 @@ def ingest_records(self, **kwargs) -> None:

# If errors have already been caught during processing, raise them
# as well.
if error_summary := self.get_ingestion_errors():
if error_summary := self._get_ingestion_errors():
raise error_summary from error
raise

Expand All @@ -192,21 +204,21 @@ def ingest_records(self, **kwargs) -> None:

# Commit whatever records we were able to process, and rethrow the
# exception so the taskrun fails.
self.commit_records()
self._commit_records()
raise error from ingestion_error

if self.limit and record_count >= self.limit:
logger.info(f"Ingestion limit of {self.limit} has been reached.")
should_continue = False

# Commit whatever records we were able to process
self.commit_records()
self._commit_records()

# If errors were caught during processing, raise them now
if error_summary := self.get_ingestion_errors():
if error_summary := self._get_ingestion_errors():
raise error_summary

def get_ingestion_errors(self) -> AggregateIngestionError | None:
def _get_ingestion_errors(self) -> AggregateIngestionError | None:
"""
If any errors were skipped during ingestion, log them as well as the
associated query parameters. Then return an AggregateIngestionError.
Expand Down Expand Up @@ -235,10 +247,13 @@ def get_ingestion_errors(self) -> AggregateIngestionError | None:
)
return None

def get_query_params(self, prev_query_params: dict | None, **kwargs) -> dict | None:
def _get_query_params(
self, prev_query_params: dict | None, **kwargs
) -> dict | None:
"""
Returns the next set of query_params for the next request, handling
optional overrides via the dag_run conf.
optional overrides via the dag_run conf. This method should not be overridden;
instead override get_next_query_params.
"""
# If we are getting query_params for the first batch and initial_query_params
# have been set, return them.
Expand Down Expand Up @@ -391,7 +406,7 @@ def get_record_data(self, data: dict) -> dict | list[dict] | None:
"""
pass

def commit_records(self) -> int:
def _commit_records(self) -> int:
total = 0
for store in self.media_stores.values():
total += store.commit()
Expand Down
5 changes: 5 additions & 0 deletions openverse_catalog/dags/providers/provider_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from providers.provider_api_scripts.inaturalist import INaturalistDataIngester
from providers.provider_api_scripts.metropolitan_museum import MetMuseumDataIngester
from providers.provider_api_scripts.museum_victoria import VictoriaDataIngester
from providers.provider_api_scripts.nappy import NappyDataIngester
from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester
from providers.provider_api_scripts.science_museum import ScienceMuseumDataIngester
from providers.provider_api_scripts.smk import SmkDataIngester
Expand Down Expand Up @@ -168,6 +169,10 @@ def __post_init__(self):
ingestion_callable=VictoriaDataIngester,
start_date=datetime(2020, 1, 1),
),
ProviderWorkflow(
provider_script="nappy",
ingestion_callable=NappyDataIngester,
),
ProviderWorkflow(
provider_script="nypl",
start_date=datetime(2020, 1, 1),
Expand Down
Loading