fix multi-collection unfccc ingest rows (#118)
* Get multiple-collections working

* add test for 2 collections

* deepcopy the DOC_ROW object

* make the unfccc validation testable and write tests
diversemix authored May 23, 2023
1 parent 77b0999 commit 55155af
Showing 12 changed files with 319 additions and 154 deletions.
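The commit message above mentions deep-copying a shared `DOC_ROW` object so that the new two-collection test cannot mutate state used by other tests. A minimal sketch of that pattern is below; `DOC_ROW`, its field values, and the helper are illustrative assumptions, not the repository's actual test fixtures.

```python
import copy

# Hypothetical shared fixture; the field names mirror the CSV columns touched
# by this commit, but the values are placeholders.
DOC_ROW = {
    "cpr_document_id": "UNFCCC.document.1.0",
    "cpr_family_id": "UNFCCC.family.1.0",
    "cpr_collection_id": ["UNFCCC.collection.1.0"],
}


def make_doc_row(**overrides):
    """Deep-copy the shared fixture so per-test edits never leak between tests."""
    row = copy.deepcopy(DOC_ROW)
    row.update(overrides)
    return row


def test_row_can_reference_two_collections():
    row = make_doc_row(
        cpr_collection_id=["UNFCCC.collection.1.0", "UNFCCC.collection.2.0"]
    )
    assert len(row["cpr_collection_id"]) == 2
    # The shared fixture is untouched because of the deepcopy above.
    assert DOC_ROW["cpr_collection_id"] == ["UNFCCC.collection.1.0"]
```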
95 changes: 13 additions & 82 deletions app/api/api_v1/routers/unfccc_ingest.py
@@ -29,18 +29,15 @@
get_collection_ingestor,
initialise_context,
get_unfccc_document_ingestor,
get_document_validator,
)
from app.core.ingestion.reader import get_file_contents, read
from app.core.ingestion.unfccc.validate import validate_unfccc_csv
from app.core.ingestion.utils import (
IngestContext,
Result,
ResultType,
UNFCCCIngestContext,
)
from app.core.ingestion.utils import (
ValidationResult,
get_result_counts,
)
from app.core.validation.types import ImportSchemaMismatchError
from app.core.validation.util import (
@@ -159,13 +156,16 @@ def validate_unfccc_law_policy(
all_results = []

try:
_, _, message = _validate_unfccc_csv(
unfccc_data_csv,
collection_csv,
docs = get_file_contents(unfccc_data_csv)
collections = get_file_contents(collection_csv)
message = validate_unfccc_csv(
docs,
collections,
db,
cast(UNFCCCIngestContext, context),
all_results,
)
_LOGGER.info(message)
except ImportSchemaMismatchError as e:
_LOGGER.exception(
"Provided CSV failed law & policy schema validation",
@@ -237,13 +237,16 @@ def ingest_unfccc_law_policy(

# PHASE 1 - Validate
try:
documents_file_contents, collection_file_contents, _ = _validate_unfccc_csv(
unfccc_data_csv,
collection_csv,
collection_file_contents = get_file_contents(collection_csv)
documents_file_contents = get_file_contents(unfccc_data_csv)
message = validate_unfccc_csv(
documents_file_contents,
collection_file_contents,
db,
cast(UNFCCCIngestContext, context),
all_results,
)
_LOGGER.info(message)
except ImportSchemaMismatchError as e:
_LOGGER.exception(
"Provided CSV failed law & policy schema validation",
@@ -362,75 +365,3 @@ def ingest_unfccc_law_policy(
import_s3_prefix=s3_prefix,
detail=None, # TODO: add detail?
)


def _validate_unfccc_csv(
unfccc_data_csv: UploadFile,
collection_csv: UploadFile,
db: Session,
context: UNFCCCIngestContext,
all_results: list[Result],
) -> tuple[str, str, str]:
"""
Validate the UNFCCC document and collection CSV files.
:param UploadFile unfccc_data_csv: incoming document CSV to validate
:param UploadFile collection_csv: incoming collection CSV to validate
:param Session db: connection to the database
:param UNFCCCIngestContext context: the ingest context
:param list[Result] all_results: accumulator for all validation results
:return tuple[str, str, str]: the document file contents, the collection file contents, and the summary message
"""

# First read all the ids in the collection_csv
def collate_ids(context: IngestContext, row: CollectionIngestRow) -> None:
ctx = cast(UNFCCCIngestContext, context)
ctx.collection_ids_defined.append(row.cpr_collection_id)

collection_file_contents = get_file_contents(collection_csv)
read(collection_file_contents, context, CollectionIngestRow, collate_ids)

# Now do the validation of the documents
documents_file_contents = get_file_contents(unfccc_data_csv)
validator = get_document_validator(db, context)
read(documents_file_contents, context, UNFCCCDocumentIngestRow, validator)
# Get the rows here as this is the length of results
rows = len(context.results)

# Check the set of defined collections against those referenced
defined = set(context.collection_ids_defined)
referenced = set(context.collection_ids_referenced)

defined_not_referenced = defined.difference(referenced)

if len(defined_not_referenced) > 0:
# Empty collections are allowed, but need reporting
context.results.append(
Result(
ResultType.OK,
"The following Collection IDs were "
+ f"defined and not referenced: {list(defined_not_referenced)}",
)
)

referenced_not_defined = referenced.difference(defined)
if len(referenced_not_defined) > 0:
context.results.append(
Result(
ResultType.ERROR,
"The following Collection IDs were "
f"referenced and not defined: {list(referenced_not_defined)}",
)
)

_, fails, resolved = get_result_counts(context.results)
all_results.extend(context.results)

context.results = []
message = (
f"UNFCCC validation result: {rows} Rows, {fails} Failures, "
f"{resolved} Resolved"
)

_LOGGER.info(message)

return documents_file_contents, collection_file_contents, message
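The helper removed above now lives in `app.core.ingestion.unfccc.validate.validate_unfccc_csv`, and its core cross-check compares the collection IDs defined in the collection CSV against those referenced by document rows. The snippet below is a self-contained distillation of that check for illustration only; it uses plain tuples in place of the repository's `Result`/`ResultType` types.

```python
def check_collections(
    defined_ids: list[str], referenced_ids: list[str]
) -> list[tuple[str, str]]:
    """Mirror the defined-vs-referenced check from the removed helper."""
    results: list[tuple[str, str]] = []
    defined, referenced = set(defined_ids), set(referenced_ids)

    # Empty collections are allowed, but need reporting.
    defined_not_referenced = defined - referenced
    if defined_not_referenced:
        results.append(
            ("OK", f"Defined and not referenced: {sorted(defined_not_referenced)}")
        )

    referenced_not_defined = referenced - defined
    if referenced_not_defined:
        results.append(
            ("ERROR", f"Referenced and not defined: {sorted(referenced_not_defined)}")
        )
    return results


# Two collections defined; one never referenced (OK), one unknown referenced (ERROR).
print(check_collections(
    defined_ids=["UNFCCC.collection.1.0", "UNFCCC.collection.2.0"],
    referenced_ids=["UNFCCC.collection.1.0", "UNFCCC.collection.3.0"],
))
```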
96 changes: 60 additions & 36 deletions app/core/ingestion/collection.py
@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Any, Optional, cast

from sqlalchemy.orm import Session
from app.core.ingestion.params import IngestParameters
@@ -13,6 +13,29 @@
from app.db.models.law_policy.collection import CollectionFamily, CollectionOrganisation


def handle_cclw_collection_and_link(
db: Session,
params: IngestParameters,
org_id: int,
family_import_id: str,
result: dict[str, Any],
) -> Optional[Collection]:
collection = handle_create_collection(
db,
params.cpr_collection_ids[0], # Only ever one for CCLW
params.collection_name,
params.collection_summary,
org_id,
result,
)

if collection is not None:
handle_link_collection_to_family(
db, params.cpr_collection_ids, cast(str, family_import_id), result
)
return collection


def create_collection(
db: Session,
row: CollectionIngestRow,
@@ -65,11 +88,12 @@ def create_collection(
)


def handle_collection_and_link(
def handle_create_collection(
db: Session,
params: IngestParameters,
collection_id: str,
collection_name: str,
collection_summary: str,
org_id: int,
family_import_id: str,
result: dict[str, Any],
) -> Optional[Collection]:
"""
@@ -85,34 +109,27 @@ def handle_collection_and_link(
:param dict[str, Any] result: a result dict in which to record what was created.
:return [Collection | None]: A collection if one was created, otherwise None.
"""
if not params.cpr_collection_id or params.cpr_collection_id == "n/a":
if not collection_id or collection_id == "n/a":
return None

# First check for the actual collection
existing_collection = (
db.query(Collection)
.filter(Collection.import_id == params.cpr_collection_id)
.one_or_none()
db.query(Collection).filter(Collection.import_id == collection_id).one_or_none()
)

if existing_collection is None:
if params.create_collections is False:
id = params.cpr_collection_id
msg = f"Collection {id} is not pre-exsiting so not linking"
raise ValueError(msg)

collection = create(
db,
Collection,
import_id=params.cpr_collection_id,
title=params.collection_name,
extra={"description": params.collection_summary},
import_id=collection_id,
title=collection_name,
extra={"description": collection_summary},
)

collection_organisation = create(
db,
CollectionOrganisation,
collection_import_id=collection.import_id,
collection_import_id=collection_id,
organisation_id=org_id,
)

@@ -121,30 +138,37 @@ def handle_collection_and_link(
else:
collection = existing_collection
updated = {}
update_if_changed(updated, "title", params.collection_name, collection)
update_if_changed(updated, "description", params.collection_summary, collection)
update_if_changed(updated, "title", collection_name, collection)
update_if_changed(updated, "description", collection_summary, collection)
if len(updated) > 0:
result["collection"] = updated
db.add(collection)
db.flush()

# Second check for the family - collection link
existing_link = (
db.query(CollectionFamily)
.filter_by(
collection_import_id=params.cpr_collection_id,
family_import_id=params.cpr_family_id,
)
.one_or_none()
)
return collection

if existing_link is None:
collection_family = create(
db,
CollectionFamily,
collection_import_id=collection.import_id,
family_import_id=family_import_id,

def handle_link_collection_to_family(
db: Session,
collection_ids: list[str],
family_import_id: str,
result: dict[str, Any],
) -> None:
for collection_id in collection_ids:
existing_link = (
db.query(CollectionFamily)
.filter_by(
collection_import_id=collection_id,
family_import_id=family_import_id,
)
.one_or_none()
)
result["collection_family"] = to_dict(collection_family)

return collection
if existing_link is None:
collection_family = create(
db,
CollectionFamily,
collection_import_id=collection_id,
family_import_id=family_import_id,
)
result["collection_family"] = to_dict(collection_family)
2 changes: 1 addition & 1 deletion app/core/ingestion/params.py
@@ -25,6 +25,6 @@ class IngestParameters:
geography: str
cpr_document_id: str
cpr_family_id: str
cpr_collection_id: str
cpr_collection_ids: list[str]
cpr_family_slug: str
cpr_document_slug: str
23 changes: 8 additions & 15 deletions app/core/ingestion/processor.py
@@ -4,7 +4,8 @@
from sqlalchemy.orm import Session
from app.core.ingestion.collection import (
create_collection,
handle_collection_and_link,
handle_cclw_collection_and_link,
handle_link_collection_to_family,
)
from app.core.ingestion.cclw.event import family_event_from_row
from app.core.ingestion.family import handle_family_from_params
@@ -78,7 +79,7 @@ def add_metadata(db: Session, import_id: str, taxonomy: Taxonomy, taxonomy_id: i
geography=row.geography,
cpr_document_id=row.cpr_document_id,
cpr_family_id=row.cpr_family_id,
cpr_collection_id=row.cpr_collection_id,
cpr_collection_ids=[row.cpr_collection_id],
cpr_family_slug=row.cpr_family_slug,
cpr_document_slug=row.cpr_document_slug,
)
@@ -108,7 +109,7 @@ def add_metadata(db: Session, import_id: str, taxonomy: Taxonomy, taxonomy_id: i
geography=row.geography,
cpr_document_id=row.cpr_document_id,
cpr_family_id=row.cpr_family_id,
cpr_collection_id=row.cpr_collection_id,
cpr_collection_ids=row.cpr_collection_id,
cpr_family_slug=row.cpr_family_slug,
cpr_document_slug=row.cpr_document_slug,
)
@@ -138,12 +139,8 @@ def ingest_cclw_document_row(
)
params = build_params_from_cclw(row)
family = handle_family_from_params(db, params, context.org_id, result)
handle_collection_and_link(
db,
params,
context.org_id,
cast(str, family.import_id),
result,
handle_cclw_collection_and_link(
db, params, context.org_id, cast(str, family.import_id), result
)

_LOGGER.info(
@@ -181,12 +178,8 @@ def ingest_unfccc_document_row(

params = build_params_from_unfccc(row)
family = handle_family_from_params(db, params, context.org_id, result)
handle_collection_and_link(
db,
params,
context.org_id,
cast(str, family.import_id),
result,
handle_link_collection_to_family(
db, params.cpr_collection_ids, cast(str, family.import_id), result
)

ctx = cast(UNFCCCIngestContext, context)
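The two parameter builders normalise the collection field differently: a CCLW row still carries a single `cpr_collection_id` string, which is wrapped into a one-element list, while a UNFCCC row already carries a list and is passed straight through to `cpr_collection_ids`. A tiny illustration of that normalisation, detached from the row classes:

```python
from typing import Union


def to_collection_ids(value: Union[str, list[str]]) -> list[str]:
    """Wrap a single CCLW-style ID; pass a UNFCCC-style list through unchanged."""
    return value if isinstance(value, list) else [value]


print(to_collection_ids("CCLW.collection.1.0"))
# ['CCLW.collection.1.0']
print(to_collection_ids(["UNFCCC.collection.1.0", "UNFCCC.collection.2.0"]))
# ['UNFCCC.collection.1.0', 'UNFCCC.collection.2.0']
```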
3 changes: 1 addition & 2 deletions app/core/ingestion/unfccc/ingest_row_unfccc.py
@@ -55,15 +55,14 @@ class UNFCCCDocumentIngestRow(BaseIngestRow):
document_variant: str
language: list[str]

cpr_collection_id: str
cpr_collection_id: list[str]
cpr_document_id: str
cpr_family_id: str
cpr_family_slug: str
cpr_document_slug: str
cpr_document_status: str
download_url: str

# FIXME: Where is the summary from?
family_summary: str = "summary"

VALID_COLUMNS: ClassVar[set[str]] = VALID_DOCUMENT_COLUMN_NAMES
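The row class now types `cpr_collection_id` as `list[str]`; how the base row parser splits the CSV cell into that list is not shown in this diff. One plausible, type-hint-driven shape is sketched below purely for illustration — the semicolon separator and the helper are assumptions, not the repository's parser.

```python
from dataclasses import dataclass
from typing import get_type_hints


@dataclass
class _RowSketch:
    cpr_document_id: str
    cpr_collection_id: list[str]


def parse_cell(field_name: str, raw: str) -> object:
    """Illustrative only: split list-typed cells on ';', pass plain strings through."""
    hints = get_type_hints(_RowSketch)
    if hints[field_name] == list[str]:
        return [part.strip() for part in raw.split(";") if part.strip()]
    return raw


print(parse_cell("cpr_collection_id", "UNFCCC.collection.1.0;UNFCCC.collection.2.0"))
# ['UNFCCC.collection.1.0', 'UNFCCC.collection.2.0']
print(parse_cell("cpr_document_id", "UNFCCC.document.1.0"))
# 'UNFCCC.document.1.0'
```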