Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix multi-collection unfccc ingest rows #118

Merged
merged 4 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 13 additions & 82 deletions app/api/api_v1/routers/unfccc_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,15 @@
get_collection_ingestor,
initialise_context,
get_unfccc_document_ingestor,
get_document_validator,
)
from app.core.ingestion.reader import get_file_contents, read
from app.core.ingestion.unfccc.validate import validate_unfccc_csv
from app.core.ingestion.utils import (
IngestContext,
Result,
ResultType,
UNFCCCIngestContext,
)
from app.core.ingestion.utils import (
ValidationResult,
get_result_counts,
)
from app.core.validation.types import ImportSchemaMismatchError
from app.core.validation.util import (
Expand Down Expand Up @@ -159,13 +156,16 @@ def validate_unfccc_law_policy(
all_results = []

try:
_, _, message = _validate_unfccc_csv(
unfccc_data_csv,
collection_csv,
docs = get_file_contents(unfccc_data_csv)
collections = get_file_contents(collection_csv)
message = validate_unfccc_csv(
docs,
collections,
db,
cast(UNFCCCIngestContext, context),
all_results,
)
_LOGGER.info(message)
except ImportSchemaMismatchError as e:
_LOGGER.exception(
"Provided CSV failed law & policy schema validation",
Expand Down Expand Up @@ -237,13 +237,16 @@ def ingest_unfccc_law_policy(

# PHASE 1 - Validate
try:
documents_file_contents, collection_file_contents, _ = _validate_unfccc_csv(
unfccc_data_csv,
collection_csv,
collection_file_contents = get_file_contents(collection_csv)
documents_file_contents = get_file_contents(unfccc_data_csv)
message = validate_unfccc_csv(
documents_file_contents,
collection_file_contents,
db,
cast(UNFCCCIngestContext, context),
all_results,
)
_LOGGER.info(message)
except ImportSchemaMismatchError as e:
_LOGGER.exception(
"Provided CSV failed law & policy schema validation",
Expand Down Expand Up @@ -362,75 +365,3 @@ def ingest_unfccc_law_policy(
import_s3_prefix=s3_prefix,
detail=None, # TODO: add detail?
)


def _validate_unfccc_csv(
    unfccc_data_csv: UploadFile,
    collection_csv: UploadFile,
    db: Session,
    context: UNFCCCIngestContext,
    all_results: list[Result],
) -> tuple[str, str, str]:
    """
    Validate the UNFCCC documents CSV against the collections CSV.

    Collection ids defined in collection_csv are gathered first, then every
    document row is run through the document validator. Collections that are
    defined but never referenced are reported as OK results; collections that
    are referenced but never defined are reported as errors.

    :param UploadFile unfccc_data_csv: incoming documents CSV to validate
    :param UploadFile collection_csv: incoming collections CSV
    :param Session db: connection to the database
    :param UNFCCCIngestContext context: the ingest context; its results list is
        drained into all_results and reset before returning
    :param list[Result] all_results: accumulator extended with this run's results
    :return tuple[str, str, str]: the documents file contents, the collection
        file contents, and the summary message
    """

    # First read all the ids in the collection_csv
    def collate_ids(context: IngestContext, row: CollectionIngestRow) -> None:
        ctx = cast(UNFCCCIngestContext, context)
        ctx.collection_ids_defined.append(row.cpr_collection_id)

    collection_file_contents = get_file_contents(collection_csv)
    read(collection_file_contents, context, CollectionIngestRow, collate_ids)

    # Now do the validation of the documents
    documents_file_contents = get_file_contents(unfccc_data_csv)
    validator = get_document_validator(db, context)
    read(documents_file_contents, context, UNFCCCDocumentIngestRow, validator)
    # Get the rows here as this is the length of results
    rows = len(context.results)

    # Check the set of defined collections against those referenced
    defined = set(context.collection_ids_defined)
    referenced = set(context.collection_ids_referenced)

    defined_not_referenced = defined.difference(referenced)

    if len(defined_not_referenced) > 0:
        # Empty collections are allowed, but need reporting
        context.results.append(
            Result(
                ResultType.OK,
                "The following Collection IDs were "
                + f"defined and not referenced: {list(defined_not_referenced)}",
            )
        )

    referenced_not_defined = referenced.difference(defined)
    if len(referenced_not_defined) > 0:
        context.results.append(
            Result(
                ResultType.ERROR,
                "The following Collection IDs were "
                f"referenced and not defined: {list(referenced_not_defined)}",
            )
        )

    _, fails, resolved = get_result_counts(context.results)
    all_results.extend(context.results)

    # Results are handed off to all_results; reset so the context can be reused.
    context.results = []
    message = (
        f"UNFCCC validation result: {rows} Rows, {fails} Failures, "
        f"{resolved} Resolved"
    )

    _LOGGER.info(message)

    return documents_file_contents, collection_file_contents, message
96 changes: 60 additions & 36 deletions app/core/ingestion/collection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Any, Optional, cast

from sqlalchemy.orm import Session
from app.core.ingestion.params import IngestParameters
Expand All @@ -13,6 +13,29 @@
from app.db.models.law_policy.collection import CollectionFamily, CollectionOrganisation


def handle_cclw_collection_and_link(
    db: Session,
    params: IngestParameters,
    org_id: int,
    family_import_id: str,
    result: dict[str, Any],
) -> Optional[Collection]:
    """
    Create/find the single CCLW collection for this row and link it to a family.

    Delegates creation to handle_create_collection using the first (and, for
    CCLW rows, only) collection id in params, then links all of the row's
    collection ids to the family when a collection was returned.

    :param Session db: connection to the database
    :param IngestParameters params: ingest parameters for this row
    :param int org_id: the organisation that owns the collection
    :param str family_import_id: import id of the family to link to
    :param dict[str, Any] result: a result dict in which to record what was created
    :return Optional[Collection]: the collection, or None if none was applicable
    """
    collection = handle_create_collection(
        db,
        params.cpr_collection_ids[0],  # Only ever one for CCLW
        params.collection_name,
        params.collection_summary,
        org_id,
        result,
    )

    if collection is not None:
        handle_link_collection_to_family(
            db, params.cpr_collection_ids, cast(str, family_import_id), result
        )
    return collection


def create_collection(
db: Session,
row: CollectionIngestRow,
Expand Down Expand Up @@ -65,11 +88,12 @@ def create_collection(
)


def handle_collection_and_link(
def handle_create_collection(
db: Session,
params: IngestParameters,
collection_id: str,
collection_name: str,
collection_summary: str,
org_id: int,
family_import_id: str,
result: dict[str, Any],
) -> Optional[Collection]:
"""
Expand All @@ -85,34 +109,27 @@ def handle_collection_and_link(
:param [dict[str, Any]]: a result dict in which to record what was created.
:return [Collection | None]: A collection if one was created, otherwise None.
"""
if not params.cpr_collection_id or params.cpr_collection_id == "n/a":
if not collection_id or collection_id == "n/a":
return None

# First check for the actual collection
existing_collection = (
db.query(Collection)
.filter(Collection.import_id == params.cpr_collection_id)
.one_or_none()
db.query(Collection).filter(Collection.import_id == collection_id).one_or_none()
)

if existing_collection is None:
if params.create_collections is False:
id = params.cpr_collection_id
msg = f"Collection {id} is not pre-existing so not linking"
raise ValueError(msg)

collection = create(
db,
Collection,
import_id=params.cpr_collection_id,
title=params.collection_name,
extra={"description": params.collection_summary},
import_id=collection_id,
title=collection_name,
extra={"description": collection_summary},
)

collection_organisation = create(
db,
CollectionOrganisation,
collection_import_id=collection.import_id,
collection_import_id=collection_id,
organisation_id=org_id,
)

Expand All @@ -121,30 +138,37 @@ def handle_collection_and_link(
else:
collection = existing_collection
updated = {}
update_if_changed(updated, "title", params.collection_name, collection)
update_if_changed(updated, "description", params.collection_summary, collection)
update_if_changed(updated, "title", collection_name, collection)
update_if_changed(updated, "description", collection_summary, collection)
if len(updated) > 0:
result["collection"] = updated
db.add(collection)
db.flush()

# Second check for the family - collection link
existing_link = (
db.query(CollectionFamily)
.filter_by(
collection_import_id=params.cpr_collection_id,
family_import_id=params.cpr_family_id,
)
.one_or_none()
)
return collection

if existing_link is None:
collection_family = create(
db,
CollectionFamily,
collection_import_id=collection.import_id,
family_import_id=family_import_id,

def handle_link_collection_to_family(
    db: Session,
    collection_ids: list[str],
    family_import_id: str,
    result: dict[str, Any],
) -> None:
    """
    Ensure a CollectionFamily link exists for each given collection id.

    For every id in collection_ids, an existing link to family_import_id is
    looked up; when none is found, one is created and recorded in the result
    dict under "collection_family".

    :param Session db: connection to the database
    :param list[str] collection_ids: collection import ids to link
    :param str family_import_id: import id of the family being linked
    :param dict[str, Any] result: a result dict in which to record what was created
    """
    for import_id in collection_ids:
        link = (
            db.query(CollectionFamily)
            .filter_by(
                collection_import_id=import_id,
                family_import_id=family_import_id,
            )
            .one_or_none()
        )
        if link is not None:
            # Link already present — nothing to create for this id.
            continue
        new_link = create(
            db,
            CollectionFamily,
            collection_import_id=import_id,
            family_import_id=family_import_id,
        )
        result["collection_family"] = to_dict(new_link)
2 changes: 1 addition & 1 deletion app/core/ingestion/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ class IngestParameters:
geography: str
cpr_document_id: str
cpr_family_id: str
cpr_collection_id: str
cpr_collection_ids: list[str]
cpr_family_slug: str
cpr_document_slug: str
23 changes: 8 additions & 15 deletions app/core/ingestion/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from sqlalchemy.orm import Session
from app.core.ingestion.collection import (
create_collection,
handle_collection_and_link,
handle_cclw_collection_and_link,
handle_link_collection_to_family,
)
from app.core.ingestion.cclw.event import family_event_from_row
from app.core.ingestion.family import handle_family_from_params
Expand Down Expand Up @@ -78,7 +79,7 @@ def add_metadata(db: Session, import_id: str, taxonomy: Taxonomy, taxonomy_id: i
geography=row.geography,
cpr_document_id=row.cpr_document_id,
cpr_family_id=row.cpr_family_id,
cpr_collection_id=row.cpr_collection_id,
cpr_collection_ids=[row.cpr_collection_id],
cpr_family_slug=row.cpr_family_slug,
cpr_document_slug=row.cpr_document_slug,
)
Expand Down Expand Up @@ -108,7 +109,7 @@ def add_metadata(db: Session, import_id: str, taxonomy: Taxonomy, taxonomy_id: i
geography=row.geography,
cpr_document_id=row.cpr_document_id,
cpr_family_id=row.cpr_family_id,
cpr_collection_id=row.cpr_collection_id,
cpr_collection_ids=row.cpr_collection_id,
cpr_family_slug=row.cpr_family_slug,
cpr_document_slug=row.cpr_document_slug,
)
Expand Down Expand Up @@ -138,12 +139,8 @@ def ingest_cclw_document_row(
)
params = build_params_from_cclw(row)
family = handle_family_from_params(db, params, context.org_id, result)
handle_collection_and_link(
db,
params,
context.org_id,
cast(str, family.import_id),
result,
handle_cclw_collection_and_link(
db, params, context.org_id, cast(str, family.import_id), result
)

_LOGGER.info(
Expand Down Expand Up @@ -181,12 +178,8 @@ def ingest_unfccc_document_row(

params = build_params_from_unfccc(row)
family = handle_family_from_params(db, params, context.org_id, result)
handle_collection_and_link(
db,
params,
context.org_id,
cast(str, family.import_id),
result,
handle_link_collection_to_family(
db, params.cpr_collection_ids, cast(str, family.import_id), result
)

ctx = cast(UNFCCCIngestContext, context)
Expand Down
3 changes: 1 addition & 2 deletions app/core/ingestion/unfccc/ingest_row_unfccc.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ class UNFCCCDocumentIngestRow(BaseIngestRow):
document_variant: str
language: list[str]

cpr_collection_id: str
cpr_collection_id: list[str]
cpr_document_id: str
cpr_family_id: str
cpr_family_slug: str
cpr_document_slug: str
cpr_document_status: str
download_url: str

# FIXME: Where is the summary from?
family_summary: str = "summary"

VALID_COLUMNS: ClassVar[set[str]] = VALID_DOCUMENT_COLUMN_NAMES
Expand Down
Loading