Skip to content

Commit

Permalink
SOF-8 Refactor for ingest (#114)
Browse files Browse the repository at this point in the history
* begin re-factor of how taxonomies are populated

* refactor

* add type to arg

* Add a little more documentation for taxonomy

* More refactoring and add tests

* finish tests

* First re-factor to accomodate UNFCCC

* get tests working (not)

* lunch time commit

* fix datetime issue

* more re-factoring

* end of day commit

* get validation working

* start implementing the collections csv

* Validate the Collections CSV

* add more tests

* add database migration

* Update app/core/ingestion/processor.py

Co-authored-by: Joel Wright <[email protected]>

* remove law-policy from the routes

* create sub-modules

* fully validate collections

* update field names for collection csv

* Add new CSV columns

* remove collection_id as no longer used

* write test but comment out - pivoting

* remove submission type from taxonomy and add to document type

* fix failing populate_taxonomy test

* more ingest work

* more tests ready for writing ingest code

* Allow pre-existing collection if they match

* add download_url

* add beginnings of ingest tests for unfccc

* first refactor before unfccc ingest

* more stuff

* finish tests

* finish re-factor

* Add a test for the documents object sent to the pipeline

---------

Co-authored-by: Joel Wright <[email protected]>
  • Loading branch information
diversemix and Joel Wright authored May 22, 2023
1 parent 99ff502 commit a4c363c
Show file tree
Hide file tree
Showing 25 changed files with 410 additions and 663 deletions.
8 changes: 4 additions & 4 deletions app/api/api_v1/routers/cclw_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
CCLWDocumentIngestRow,
EventIngestRow,
)
from app.core.ingestion.cclw.pipeline import generate_pipeline_ingest_input
from app.core.ingestion.pipeline import generate_pipeline_ingest_input
from app.core.ingestion.processor import (
initialise_context,
get_document_ingestor,
get_cclw_document_ingestor,
get_document_validator,
get_event_ingestor,
)
from app.core.ingestion.cclw.reader import get_file_contents, read
from app.core.ingestion.reader import get_file_contents, read
from app.core.ingestion.utils import (
IngestContext,
Result,
Expand Down Expand Up @@ -69,7 +69,7 @@ def _start_ingest(
# TODO: add a way for a user to monitor progress of the ingest
try:
context = initialise_context(db, "CCLW")
document_ingestor = get_document_ingestor(db, context)
document_ingestor = get_cclw_document_ingestor(db, context)
read(documents_file_contents, context, CCLWDocumentIngestRow, document_ingestor)
event_ingestor = get_event_ingestor(db)
read(events_file_contents, context, EventIngestRow, event_ingestor)
Expand Down
13 changes: 6 additions & 7 deletions app/api/api_v1/routers/unfccc_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
UNFCCCDocumentIngestRow,
)

from app.core.ingestion.unfccc.pipeline import generate_pipeline_ingest_input
from app.core.ingestion.pipeline import generate_pipeline_ingest_input
from app.core.ingestion.processor import (
get_collection_ingestor,
initialise_context,
get_document_ingestor,
get_unfccc_document_ingestor,
get_document_validator,
)
from app.core.ingestion.unfccc.reader import get_file_contents, read
from app.core.ingestion.reader import get_file_contents, read
from app.core.ingestion.utils import (
IngestContext,
Result,
Expand All @@ -56,7 +56,7 @@
unfccc_ingest_router = r = APIRouter()


def _start_ingest(
def start_unfccc_ingest(
db: Session,
s3_client: S3Client,
s3_prefix: str,
Expand All @@ -71,8 +71,7 @@ def _start_ingest(
collection_ingestor = get_collection_ingestor(db)
read(collection_file_contents, context, CollectonIngestRow, collection_ingestor)

# FIXME: Write a unfccc ingestor
document_ingestor = get_document_ingestor(db, context)
document_ingestor = get_unfccc_document_ingestor(db, context)
read(
documents_file_contents, context, UNFCCCDocumentIngestRow, document_ingestor
)
Expand Down Expand Up @@ -334,7 +333,7 @@ def ingest_unfccc_law_policy(

# PHASE 3 - Start the ingest (kick off background task to do the actual ingest)
background_tasks.add_task(
_start_ingest,
start_unfccc_ingest,
db,
s3_client,
s3_prefix,
Expand Down
2 changes: 1 addition & 1 deletion app/core/ingestion/cclw/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
}


def add_metadata(
def add_cclw_metadata(
db: Session,
family_import_id: str,
taxonomy: Taxonomy,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from typing import Any, Optional

from sqlalchemy.orm import Session
from app.core.ingestion.cclw.ingest_row_cclw import CCLWDocumentIngestRow
from app.core.ingestion.params import IngestParameters
from app.core.ingestion.unfccc.ingest_row_unfccc import CollectonIngestRow
from app.core.ingestion.utils import create, to_dict, update_if_changed
from app.core.ingestion.utils import (
create,
to_dict,
update_if_changed,
)

from app.db.models.law_policy import Collection
from app.db.models.law_policy.collection import CollectionFamily, CollectionOrganisation
Expand Down Expand Up @@ -61,9 +65,9 @@ def create_collection(
)


def handle_collection_from_row(
def handle_collection_and_link(
db: Session,
row: CCLWDocumentIngestRow,
params: IngestParameters,
org_id: int,
family_import_id: str,
result: dict[str, Any],
Expand All @@ -81,23 +85,28 @@ def handle_collection_from_row(
:param [dict[str, Any]]: a result dict in which to record what was created.
:return [Collection | None]: A collection if one was created, otherwise None.
"""
if not row.cpr_collection_id or row.cpr_collection_id == "n/a":
if not params.cpr_collection_id or params.cpr_collection_id == "n/a":
return None

# First check for the actual collection
existing_collection = (
db.query(Collection)
.filter(Collection.import_id == row.cpr_collection_id)
.filter(Collection.import_id == params.cpr_collection_id)
.one_or_none()
)

if existing_collection is None:
if params.create_collections is False:
id = params.cpr_collection_id
msg = f"Collection {id} is not pre-exsiting so not linking"
raise ValueError(msg)

collection = create(
db,
Collection,
import_id=row.cpr_collection_id,
title=row.collection_name,
extra={"description": row.collection_summary},
import_id=params.cpr_collection_id,
title=params.collection_name,
extra={"description": params.collection_summary},
)

collection_organisation = create(
Expand All @@ -112,8 +121,8 @@ def handle_collection_from_row(
else:
collection = existing_collection
updated = {}
update_if_changed(updated, "title", row.collection_name, collection)
update_if_changed(updated, "description", row.collection_summary, collection)
update_if_changed(updated, "title", params.collection_name, collection)
update_if_changed(updated, "description", params.collection_summary, collection)
if len(updated) > 0:
result["collection"] = updated
db.add(collection)
Expand All @@ -123,8 +132,8 @@ def handle_collection_from_row(
existing_link = (
db.query(CollectionFamily)
.filter_by(
collection_import_id=row.cpr_collection_id,
family_import_id=row.cpr_family_id,
collection_import_id=params.cpr_collection_id,
family_import_id=params.cpr_family_id,
)
.one_or_none()
)
Expand Down
Loading

0 comments on commit a4c363c

Please sign in to comment.