-
Notifications
You must be signed in to change notification settings - Fork 54
Refactor Metropolitan Museum of Art to use ProviderDataIngester #674
Changes from 25 commits
c7e1903
eca69f5
f1988ce
62a02ff
0de8ae7
db7a861
c42c8bc
75aaf10
16d51e0
ae99654
6ee7f87
f431932
67f6673
8ac2941
b54f605
bc55683
5cca07b
bb98bdd
57ec40e
1c2e0f5
e441b79
e78b6d9
8bec026
248b1a1
90d3727
3658dc4
1dcc93d
a1670e5
3f7765e
17d0aff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -6,138 +6,167 @@ | |||||
Output: TSV file containing the image, their respective | ||||||
meta-data. | ||||||
|
||||||
Notes: https://metmuseum.github.io/ | ||||||
No rate limit specified. | ||||||
Notes: https://metmuseum.github.io/#search | ||||||
"Please limit requests to 80 requests per second." May need to | ||||||
bump up the delay (e.g. to 3 seconds), to avoid of blocking | ||||||
during local development testing. | ||||||
|
||||||
Some analysis to improve data quality was conducted using a | ||||||
separate csv file here: https://github.com/metmuseum/openaccess | ||||||
|
||||||
Get a list of object IDs: | ||||||
https://collectionapi.metmuseum.org/public/collection/v1/objects?metadataDate=2022-08-10 | ||||||
Get a specific object: | ||||||
https://collectionapi.metmuseum.org/public/collection/v1/objects/1027 | ||||||
The search functionality requires a specific query (term search) | ||||||
in addition to date and public domain. It seems like it won't | ||||||
connect with just date and license. | ||||||
https://collectionapi.metmuseum.org/public/collection/v1/search?isPublicDomain=true&metadataDate=2022-08-07 | ||||||
|
||||||
""" | ||||||
|
||||||
import argparse | ||||||
import logging | ||||||
|
||||||
from common.licenses import get_license_info | ||||||
from common.requester import DelayedRequester | ||||||
from common.storage.image import ImageStore | ||||||
|
||||||
from common.loader import provider_details as prov | ||||||
from provider_data_ingester import ProviderDataIngester | ||||||
|
||||||
DELAY = 1.0 # time delay (in seconds) | ||||||
PROVIDER = "met" | ||||||
ENDPOINT = "https://collectionapi.metmuseum.org/public/collection/v1/objects" | ||||||
DEFAULT_LICENSE_INFO = get_license_info(license_="cc0", license_version="1.0") | ||||||
|
||||||
logging.basicConfig( | ||||||
format="%(asctime)s - %(name)s - %(levelname)s: %(message)s", level=logging.INFO | ||||||
) | ||||||
logger = logging.getLogger(__name__) | ||||||
|
||||||
delayed_requester = DelayedRequester(DELAY) | ||||||
image_store = ImageStore(provider=PROVIDER) | ||||||
|
||||||
|
||||||
def main(date=None): | ||||||
""" | ||||||
This script pulls the data for a given date from the Metropolitan | ||||||
Museum of Art API, and writes it into a .TSV file to be eventually | ||||||
read into our DB. | ||||||
|
||||||
Required Arguments: | ||||||
|
||||||
date: Date String in the form YYYY-MM-DD. This is the date for | ||||||
which running the script will pull data. | ||||||
""" | ||||||
|
||||||
logger.info(f"Begin: Met Museum API requests for date: {date}") | ||||||
|
||||||
fetch_the_object_id = _get_object_ids(date) | ||||||
if fetch_the_object_id: | ||||||
logger.info(f"Total object found {fetch_the_object_id[0]}") | ||||||
_extract_the_data(fetch_the_object_id[1]) | ||||||
|
||||||
total_images = image_store.commit() | ||||||
logger.info(f"Total CC0 images received {total_images}") | ||||||
|
||||||
|
||||||
def _get_object_ids(date, endpoint=ENDPOINT): | ||||||
query_params = "" | ||||||
if date: | ||||||
query_params = {"metadataDate": date} | ||||||
|
||||||
response = _get_response_json(query_params, endpoint) | ||||||
|
||||||
if response: | ||||||
total_object_ids = response["total"] | ||||||
object_ids = response["objectIDs"] | ||||||
else: | ||||||
logger.warning("No content available") | ||||||
return None | ||||||
return [total_object_ids, object_ids] | ||||||
|
||||||
|
||||||
def _get_response_json( | ||||||
query_params, | ||||||
endpoint, | ||||||
retries=5, | ||||||
): | ||||||
response_json = delayed_requester.get_response_json( | ||||||
endpoint, query_params=query_params, retries=retries | ||||||
) | ||||||
|
||||||
return response_json | ||||||
|
||||||
|
||||||
def _extract_the_data(object_ids): | ||||||
for i in object_ids: | ||||||
_get_data_for_image(i) | ||||||
|
||||||
|
||||||
def _get_data_for_image(object_id): | ||||||
object_json = _get_and_validate_object_json(object_id) | ||||||
if not object_json: | ||||||
logger.warning(f"Could not retrieve object_json for object_id: {object_id}") | ||||||
return | ||||||
|
||||||
main_image = object_json.get("primaryImage") | ||||||
other_images = object_json.get("additionalImages", []) | ||||||
image_list = [main_image] + other_images | ||||||
|
||||||
meta_data = _create_meta_data(object_json) | ||||||
|
||||||
for img in image_list: | ||||||
foreign_id = _build_foreign_id(object_id, img) | ||||||
image_store.add_item( | ||||||
foreign_landing_url=object_json.get("objectURL"), | ||||||
image_url=img, | ||||||
license_info=DEFAULT_LICENSE_INFO, | ||||||
foreign_identifier=foreign_id, | ||||||
creator=object_json.get("artistDisplayName"), | ||||||
title=object_json.get("title"), | ||||||
meta_data=meta_data, | ||||||
class MetMuseumDataIngester(ProviderDataIngester): | ||||||
providers = {"image": prov.METROPOLITAN_MUSEUM_DEFAULT_PROVIDER} | ||||||
endpoint = "https://collectionapi.metmuseum.org/public/collection/v1/objects" | ||||||
DEFAULT_LICENSE_INFO = get_license_info(license_="cc0", license_version="1.0") | ||||||
|
||||||
# adding args for automatically generated parameters from generate_tsv_filenames | ||||||
def __init__(self, conf: dict = None, date: str = None): | ||||||
super(MetMuseumDataIngester, self).__init__(conf=conf, date=date) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW in python 3 this can just be:
Suggested change
We don't need to specify the parent class unless the Module Resolution Order needs to be specifically navigated:
Footnotes |
||||||
self.retries = 5 | ||||||
|
||||||
self.query_param = None | ||||||
if date: | ||||||
self.query_param = {"metadataDate": date} | ||||||
|
||||||
# this seems like useful information to track for context on the existing load | ||||||
# metrics, but just adding them to the log in aggregate for now rather than | ||||||
# logging each record individually or doing something fancier in airflow. | ||||||
Comment on lines
+58
to
+60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💯 |
||||||
self.object_ids_retrieved = 0 # total object IDs based on date | ||||||
self.non_cc0_objects = 0 # number checked and ignored because of licensing | ||||||
|
||||||
def get_next_query_params(self, prev_query_params=None): | ||||||
return self.query_param | ||||||
|
||||||
def get_batch_data(self, response_json): | ||||||
if response_json: | ||||||
self.object_ids_retrieved = response_json["total"] | ||||||
logger.info(f"Total objects found {self.object_ids_retrieved}") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't incremented, so this will always be the number of ids retrieved in that batch, is that right? Would it make more sense to add to the total each time, or else change the wording of the log message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The met museum returns a json with two things: a list of object ids, and the length of that list. The additional API calls are just to get the details about any given object ID, not to get another list of object IDs. So, I think it's ok to just take this from the source once for each dag run. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But this is reporting once for every batch, indicating the number of ids in that batch as opposed to the total number ingested by the run. I see it multiple times in the logs for a particular dagrun. Is that the intention, or am I wrong about what that number means? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh... Maybe I'm confused about what a batch means and how it's functioning here. My sense is that there should only be one batch per dag-run. I'll take another look. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hopefully this enforces the one batch per dagrun thing! 3f7765e |
||||||
return response_json["objectIDs"] | ||||||
else: | ||||||
logger.warning("No content available") | ||||||
return None | ||||||
|
||||||
def get_record_data(self, object_id): | ||||||
object_endpoint = f"{self.endpoint}/{object_id}" | ||||||
object_json = self.delayed_requester.get_response_json( | ||||||
object_endpoint, self.retries | ||||||
) | ||||||
|
||||||
|
||||||
def _get_and_validate_object_json(object_id, endpoint=ENDPOINT): | ||||||
object_endpoint = f"{endpoint}/{object_id}" | ||||||
object_json = _get_response_json(None, object_endpoint) | ||||||
if not object_json.get("isPublicDomain"): | ||||||
logger.warning("CC0 license not detected") | ||||||
object_json = None | ||||||
return object_json | ||||||
|
||||||
|
||||||
def _build_foreign_id(object_id, image_url): | ||||||
unique_identifier = image_url.split("/")[-1].split(".")[0] | ||||||
return f"{object_id}-{unique_identifier}" | ||||||
|
||||||
|
||||||
def _create_meta_data(object_json): | ||||||
meta_data = { | ||||||
"accession_number": object_json.get("accessionNumber"), | ||||||
"classification": object_json.get("classification"), | ||||||
"culture": object_json.get("culture"), | ||||||
"date": object_json.get("objectDate"), | ||||||
"medium": object_json.get("medium"), | ||||||
"credit_line": object_json.get("creditLine"), | ||||||
} | ||||||
meta_data = {k: v for k, v in meta_data.items() if v is not None} | ||||||
return meta_data | ||||||
if object_json.get("isPublicDomain") is False: | ||||||
self.non_cc0_objects += 1 | ||||||
if self.non_cc0_objects % self.batch_limit == 0: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very cool :) What do you think about also reporting the final number at the end of ingestion? |
||||||
logger.info(f"Retrieved {self.non_cc0_objects} non-CC0 records.") | ||||||
return None | ||||||
|
||||||
main_image = object_json.get("primaryImage") | ||||||
other_images = object_json.get("additionalImages", []) | ||||||
image_list = [main_image] + other_images | ||||||
|
||||||
meta_data = self._get_meta_data(object_json) | ||||||
raw_tags = self._get_tag_list(object_json) | ||||||
title = self._get_title(object_json) | ||||||
artist = self._get_artist_name(object_json) | ||||||
|
||||||
# We aren't currently populating creator_url. In theory we could url encode | ||||||
# f"https://collectionapi.metmuseum.org/public/collection/v1/search?artistOrCulture={artist}" | ||||||
# per API guide here: https://metmuseum.github.io/#search | ||||||
# but it seems fairly buggy (i.e. nonresponsive), at least when tested with | ||||||
# "Chelsea Porcelain Manufactory" and "Minton(s)" and "Jean Pucelle" | ||||||
# Should we use artistWikidata_URL or artistULAN_URL? They're populated approx | ||||||
# 65% of the time. | ||||||
|
||||||
return [ | ||||||
{ | ||||||
"foreign_landing_url": object_json.get("objectURL"), | ||||||
"image_url": img, | ||||||
"license_info": self.DEFAULT_LICENSE_INFO, | ||||||
"foreign_identifier": self._get_foreign_id(object_id, img), | ||||||
"creator": artist, | ||||||
"title": title, | ||||||
"meta_data": meta_data, | ||||||
"raw_tags": raw_tags, | ||||||
} | ||||||
for img in image_list | ||||||
] | ||||||
|
||||||
def _get_foreign_id(self, object_id: int, image_url: str): | ||||||
unique_identifier = image_url.split("/")[-1].split(".")[0] | ||||||
return f"{object_id}-{unique_identifier}" | ||||||
|
||||||
def _get_meta_data(self, object_json): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, but this and other methods that don't access There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Huh, what are the benefits of making that declaration? The main one I'm familiar with is being able to call them from outside a specific instance of the class. Would that make testing more efficient in this case? Are there other benefits I'm not thinking of? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's usually a good practice to make functions that you know won't need to access or modify the class's state via |
||||||
if object_json is None: | ||||||
return | ||||||
if object_json.get("accessionNumber"): | ||||||
return { | ||||||
"accession_number": object_json.get("accessionNumber"), | ||||||
} | ||||||
|
||||||
def _get_tag_list(self, object_json): | ||||||
if object_json is None: | ||||||
return | ||||||
tag_list = [ | ||||||
tag | ||||||
for tag in [ | ||||||
object_json.get("department"), | ||||||
object_json.get("medium"), | ||||||
object_json.get("culture"), | ||||||
object_json.get("objectName"), | ||||||
self._get_artist_name(object_json), | ||||||
object_json.get("classification"), | ||||||
object_json.get("objectDate"), | ||||||
object_json.get("creditLine"), | ||||||
object_json.get("period"), | ||||||
] | ||||||
if tag | ||||||
] | ||||||
if object_json.get("tags"): | ||||||
tag_list += [tag["term"] for tag in object_json.get("tags")] | ||||||
return tag_list | ||||||
stacimc marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
def _get_title(self, record): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||||||
if record is not None: | ||||||
# Use or to skip false-y (empty) titles: "" | ||||||
return record.get("title") or record.get("objectName") | ||||||
|
||||||
def _get_artist_name(self, record): | ||||||
stacimc marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
if record is None: | ||||||
return | ||||||
return record.get("artistDisplayName") | ||||||
|
||||||
def get_media_type(self, record): | ||||||
# This provider only supports Images. | ||||||
return "image" | ||||||
|
||||||
|
||||||
def main(date: str): | ||||||
logger.info("Begin: Metropolitan Museum data ingestion") | ||||||
ingester = MetMuseumDataIngester(date) | ||||||
ingester.ingest_records() | ||||||
|
||||||
|
||||||
if __name__ == "__main__": | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for always adding excellent notes ✨