Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
Add iNaturalist.org metadata (#549)
Browse files Browse the repository at this point in the history
* templated changes from existing script/readme

* first stab at transformation using pandas

* add capability to load .gz files from s3

* load data from S3 and store transformation logic

* clean up old file structure and files

* add test source files

* cleanup file structure

* get sample data into minio container

* add sql transformation steps

* hide minio testing side effects from git

* add python wrappers for airflow and testing

* fyi on the source of the sample data

* Minor tweaks to get this working locally

* Update .gitignore

Use exclude so only need to list the one provider subfolder

Co-authored-by: Madison Swain-Bowden <[email protected]>

* Linting

* add separate s3 load step to dev docker setup

* airflow loads s3 & calls data ingestor to save tsv

* make bucket loading piece readable

* remove unnecessary gitignore change

* put back basic minio volume

* fix count typo in testing comment

* linting sql files from sql fluff

* fixing linting capitalization error

* strengthen file references

Co-authored-by: Madison Swain-Bowden <[email protected]>

* move pg hook to init to reduce overhead

* Aggregate dependencies into reusable just recipe, add load_to_s3

* Add check file exists on S3 to DAG

* fix ingester class initialization override

* stop overwriting pg temp files

* fix typo in order of callables

* dag runs with python pagination, though tests fail

* sql for python pagination

* create and manage larger test data files

* performant SQL to export JSON

* landing url not langing url

* temp file name with timestamp for postgres

* inaturalist provider ingester + testing

* removed likely to error command

* committing workflow for review, not complete

* use batch_limit

* complete the workflow cycle

* make preingestion factory logic more concise

* check s3 file update date to run dag daily

* testing with resource files instead of minio

* more concise batch data testing for none

* monthly run, checking if files have been updated

* linting

* pass tags as a list

* list reformat linting

* more readable bash command

Co-authored-by: Madison Swain-Bowden <[email protected]>

* inaturalist --> iNaturalist in class name

* old_query_params --> prev_query_params

* improved description for sample data file manager

Co-authored-by: Madison Swain-Bowden <[email protected]>

* parametrize tests for missing data

Co-authored-by: Madison Swain-Bowden <[email protected]>

* more concise / non-redundant get usage

Co-authored-by: Olga Bulat <[email protected]>

* streamline checking for new data

* remove unnecessary logging and imports

* iNaturalistDataIngester -> INaturalistDataIngester

* don't need to make foreign ID a string up front

* clarify comment on observer FK

Co-authored-by: sarayourfriend <[email protected]>

* more readable use of sql template

* streamline transformations in postgres

* add test case with no ancestry info / tags

* linting test db response

* remove env check because file won't be in prod

* Add SQL tests

* remove universal tags

* adjusted db response without "life" tag

* lint db response

* remove life from hard coded test results

* drop source tables and schema after ingestion

* nicer feedback on initial load test with pytest

* try adding default image category

* Print logs on failure

* Revert "Print logs on failure"

This reverts commit 9eed1f9.

* post-ingestion task comment

* clarify and simplify test

* simplify sql names and remove dedent

* clarify duplicate photo id status

* lint fix

* documentation for pre- and post-ingestion task params

* Update DAGs.md

* add issue references to comments

* fix comment typo

Co-authored-by: Olga Bulat <[email protected]>

Co-authored-by: Madison Swain-Bowden <[email protected]>
Co-authored-by: Olga Bulat <[email protected]>
Co-authored-by: sarayourfriend <[email protected]>
  • Loading branch information
4 people authored Aug 19, 2022
1 parent 2d41485 commit ee474f2
Show file tree
Hide file tree
Showing 22 changed files with 854 additions and 6 deletions.
26 changes: 26 additions & 0 deletions DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The following are DAGs grouped by their primary tag:
| `finnish_museums_workflow` | `@monthly` | `False` | image |
| [`flickr_workflow`](#flickr_workflow) | `@daily` | `True` | image |
| [`freesound_workflow`](#freesound_workflow) | `@monthly` | `False` | audio |
| [`inaturalist_workflow`](#inaturalist_workflow) | `@monthly` | `False` | image |
| [`jamendo_workflow`](#jamendo_workflow) | `@monthly` | `False` | audio |
| [`metropolitan_museum_workflow`](#metropolitan_museum_workflow) | `@daily` | `True` | image |
| `museum_victoria_workflow` | `@monthly` | `False` | image |
Expand Down Expand Up @@ -117,6 +118,7 @@ The following is documentation associated with each DAG (where available):
1. [`flickr_workflow`](#flickr_workflow)
1. [`freesound_workflow`](#freesound_workflow)
1. [`image_data_refresh`](#image_data_refresh)
1. [`inaturalist_workflow`](#inaturalist_workflow)
1. [`jamendo_workflow`](#jamendo_workflow)
1. [`metropolitan_museum_workflow`](#metropolitan_museum_workflow)
1. [`oauth2_authorization`](#oauth2_authorization)
Expand Down Expand Up @@ -292,6 +294,30 @@ https://github.com/WordPress/openverse-catalog/issues/353)
https://github.com/WordPress/openverse-catalog/issues/453)


## `inaturalist_workflow`


Provider: iNaturalist

Output: TSV file containing the media metadata.

Notes: [The iNaturalist API is not intended for data scraping.]
(https://api.inaturalist.org/v1/docs/)

[But there is a full dump intended for sharing on S3.]
(https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata)

Because these are very large normalized tables, as opposed to more document
oriented API responses, we found that bringing the data into postgres first
was the most effective approach. [More detail in slack here.]
(https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N)

We use the table structure defined [here,]
(https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql)
except for adding ancestry tags to the taxa table.



## `jamendo_workflow`


Expand Down
32 changes: 31 additions & 1 deletion docker-compose.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ services:
MINIO_ROOT_PASSWORD: ${AWS_SECRET_KEY}
# Comma separated list of buckets to create on startup
BUCKETS_TO_CREATE: ${OPENVERSE_BUCKET},openverse-airflow-logs,commonsmapper-v2,commonsmapper
# Create the buckets on every container startup
# Create empty buckets on every container startup
# Note: $0 is included in the exec because "/bin/bash -c" swallows the first
# argument, so it must be re-added at the beginning of the exec call
entrypoint: >-
Expand All @@ -46,6 +46,36 @@ services:
timeout: 20s
retries: 3

load_to_s3:
image: minio/mc:latest
env_file:
- .env
depends_on:
- s3
volumes:
# Buckets for testing provider data imported from s3 are subdirectories under
# /tests/s3-data/
- ./tests/s3-data:/data:rw
# Loop through subdirectories mounted to the volume and load them to s3/minio.
# This takes care of filesystem delays on some local dev environments that may make
# minio miss files included directly in the minio volume.
# More info here: https://stackoverflow.com/questions/72867045
# This does *not* allow for testing permissions issues that may come up in real AWS.
# And, if you remove files from /tests/s3-data, you will need to use `just down -v`
# and `just up` or `just recreate` to see the minio bucket without those files.
entrypoint: >
/bin/sh -c "
/usr/bin/mc config host add s3 http://s3:5000 ${AWS_ACCESS_KEY} ${AWS_SECRET_KEY};
cd /data;
for b in */ ; do
echo \"Loading bucket $$b\"
/usr/bin/mc mb --ignore-existing s3/$$b
/usr/bin/mc cp --r $$b s3/$$b
/usr/bin/mc ls s3/$$b;
done ;
exit 0;
"
# Dev changes for the webserver container
webserver:
depends_on:
Expand Down
9 changes: 7 additions & 2 deletions docker/local_postgres/0002_aws_s3_mock.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,27 @@ LANGUAGE plpython3u
AS $$
import os
import boto3
from datetime import datetime as dt
s3_obj = boto3.resource(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY', 'test_key'),
aws_secret_access_key=os.getenv('AWS_SECRET_KEY', 'test_secret'),
region_name=region,
endpoint_url=os.getenv('S3_LOCAL_ENDPOINT', 'http://s3:5000')
).Object(bucket, file_path)
temp_location = '/tmp/postgres_loading.tsv'
temp_location = f"/tmp/pg_load_{dt.now().timestamp()}_{file_path.split('/')[-1]}"
s3_obj.download_file(temp_location)
if file_path[-3:]=='.gz':
copy_from = f"PROGRAM 'gzip -dc {temp_location}'"
else:
copy_from = plpy.quote_literal(temp_location)
with open(temp_location) as f:
columns = '({})'.format(column_list) if column_list else ''
res = plpy.execute(
'COPY {} {} FROM {} {};'.format(
table_name,
columns,
plpy.quote_literal(temp_location),
copy_from,
options
)
)
Expand Down
10 changes: 7 additions & 3 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ deploy:
lint:
pre-commit run --all-files

# Load any dependencies necessary for actions on the stack without running the webserver
_deps:
@just up "postgres s3 load_to_s3"

# Mount the tests directory and run a particular command
@_mount-tests command: (up "postgres s3")
@_mount-tests command: _deps
# The test directory is mounted into the container only during testing
docker-compose {{ DOCKER_FILES }} run \
-v {{ justfile_directory() }}/tests:/usr/local/airflow/tests/ \
Expand All @@ -90,7 +94,7 @@ shell: up
docker-compose {{ DOCKER_FILES }} exec {{ SERVICE }} /bin/bash

# Launch an IPython REPL using the webserver image
ipython: (up "postgres s3")
ipython: _deps
docker-compose {{ DOCKER_FILES }} run \
--rm \
-w /usr/local/airflow/openverse_catalog/dags \
Expand All @@ -99,7 +103,7 @@ ipython: (up "postgres s3")
/usr/local/airflow/.local/bin/ipython

# Run a given command using the webserver image
run *args: (up "postgres s3")
run *args: _deps
docker-compose {{ DOCKER_FILES }} run --rm {{ SERVICE }} {{ args }}

# Launch a pgcli shell on the postgres container (defaults to openledger) use "airflow" for airflow metastore
Expand Down
2 changes: 2 additions & 0 deletions openverse_catalog/dags/common/loader/provider_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
STOCKSNAP_DEFAULT_PROVIDER = "stocksnap"
WORDPRESS_DEFAULT_PROVIDER = "wordpress"
FREESOUND_DEFAULT_PROVIDER = "freesound"
INATURALIST_DEFAULT_PROVIDER = "inaturalist"

# Finnish parameters
FINNISH_SUB_PROVIDERS = {
Expand Down Expand Up @@ -130,6 +131,7 @@ class ImageCategory(Enum):
"deviantart": ImageCategory.DIGITIZED_ARTWORK.value,
"digitaltmuseum": ImageCategory.DIGITIZED_ARTWORK.value,
"floraon": ImageCategory.PHOTOGRAPH.value,
"inaturalist": ImageCategory.PHOTOGRAPH.value,
"mccordmuseum": ImageCategory.DIGITIZED_ARTWORK.value,
"met": ImageCategory.DIGITIZED_ARTWORK.value,
"museumsvictoria": ImageCategory.DIGITIZED_ARTWORK.value,
Expand Down
166 changes: 166 additions & 0 deletions openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""
Provider: iNaturalist
Output: TSV file containing the media metadata.
Notes: [The iNaturalist API is not intended for data scraping.]
(https://api.inaturalist.org/v1/docs/)
[But there is a full dump intended for sharing on S3.]
(https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata)
Because these are very large normalized tables, as opposed to more document
oriented API responses, we found that bringing the data into postgres first
was the most effective approach. [More detail in slack here.]
(https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N)
We use the table structure defined [here,]
(https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql)
except for adding ancestry tags to the taxa table.
"""

import os
from pathlib import Path
from typing import Dict

import pendulum
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup
from common.constants import POSTGRES_CONN_ID
from common.licenses import LicenseInfo, get_license_info
from common.loader import provider_details as prov
from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester


AWS_CONN_ID = os.getenv("AWS_CONN_ID", "test_conn_id")
PROVIDER = prov.INATURALIST_DEFAULT_PROVIDER
SCRIPT_DIR = Path(__file__).parents[1] / "provider_csv_load_scripts/inaturalist"
SOURCE_FILE_NAMES = ["photos", "observations", "taxa", "observers"]


class INaturalistDataIngester(ProviderDataIngester):

providers = {"image": prov.INATURALIST_DEFAULT_PROVIDER}

def __init__(self, *kwargs):
super(INaturalistDataIngester, self).__init__()
self.pg = PostgresHook(POSTGRES_CONN_ID)

# adjustments to buffer limits. TO DO: try to integrate this with the dev
# environment logic in the base class, rather than just over-writing it.
# See https://github.com/WordPress/openverse-catalog/issues/682
self.media_stores["image"].buffer_length = 10_000
self.batch_limit = 10_000
self.sql_template = (SCRIPT_DIR / "export_to_json.template.sql").read_text()

def get_next_query_params(self, prev_query_params=None, **kwargs):
if prev_query_params is None:
return {"offset_num": 0}
else:
next_offset = prev_query_params["offset_num"] + self.batch_limit
return {"offset_num": next_offset}

def get_response_json(self, query_params: Dict):
"""
Call the SQL to pull json from Postgres, where the raw data has been loaded.
"""
sql_string = self.sql_template.format(
batch_limit=self.batch_limit, offset_num=query_params["offset_num"]
)
sql_result = self.pg.get_records(sql_string)
# Postgres returns a list of tuples, even if it's one tuple with one item.
return sql_result[0][0]

def get_batch_data(self, response_json):
if response_json:
return response_json
return None

def get_record_data(self, data):
if data.get("foreign_identifier") is None:
# TO DO: maybe raise an error here or after a certain number of these?
# more info in https://github.com/WordPress/openverse-catalog/issues/684
return None
license_url = data.get("license_url")
license_info = get_license_info(license_url=license_url)
if license_info == LicenseInfo(None, None, None, None):
return None
record_data = {k: data[k] for k in data.keys() if k != "license_url"}
record_data["license_info"] = license_info
return record_data

def get_media_type(self, record):
# This provider only supports Images via S3, though they have some audio files
# on site and in the API.
return "image"

def endpoint(self):
raise NotImplementedError("Normalized TSV files from AWS S3 means no endpoint.")

@staticmethod
def compare_update_dates(
last_success: pendulum.DateTime | None, s3_keys: list, aws_conn_id=AWS_CONN_ID
):
# if it was never run, assume the data is new
if last_success is None:
return
s3 = S3Hook(aws_conn_id=aws_conn_id)
s3_client = s3.get_client_type()
for key in s3_keys:
# this will error out if the files don't exist, and bubble up as an
# informative failure
last_modified = s3_client.head_object(
Bucket="inaturalist-open-data", Key=key
)["LastModified"]
# if any file has been updated, let's pull them all
if last_success < last_modified:
return
# If no files have been updated, skip the DAG
raise AirflowSkipException("Nothing new to ingest")

@staticmethod
def create_preingestion_tasks():

with TaskGroup(group_id="preingestion_tasks") as preingestion_tasks:

check_for_file_updates = PythonOperator(
task_id="check_for_file_updates",
python_callable=INaturalistDataIngester.compare_update_dates,
op_kwargs={
"last_success": "{{ prev_start_date_success }}",
"s3_keys": [
f"{file_name}.csv.gz" for file_name in SOURCE_FILE_NAMES
],
},
)

create_inaturalist_schema = PostgresOperator(
task_id="create_inaturalist_schema",
postgres_conn_id=POSTGRES_CONN_ID,
sql=(SCRIPT_DIR / "create_schema.sql").read_text(),
)

with TaskGroup(group_id="load_source_files") as load_source_files:
for source_name in SOURCE_FILE_NAMES:
PostgresOperator(
task_id=f"load_{source_name}",
postgres_conn_id=POSTGRES_CONN_ID,
sql=(SCRIPT_DIR / f"{source_name}.sql").read_text(),
),

(check_for_file_updates >> create_inaturalist_schema >> load_source_files)

return preingestion_tasks

@staticmethod
def create_postingestion_tasks():
drop_inaturalist_schema = PostgresOperator(
task_id="drop_inaturalist_schema",
postgres_conn_id=POSTGRES_CONN_ID,
sql="DROP SCHEMA IF EXISTS inaturalist CASCADE",
)
return drop_inaturalist_schema
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE SCHEMA IF NOT EXISTS inaturalist;
COMMIT;
SELECT schema_name
FROM information_schema.schemata WHERE schema_name = 'inaturalist';
Loading

0 comments on commit ee474f2

Please sign in to comment.