From 5467c362dd8f910ab9eb46ae557e79d6d9347b54 Mon Sep 17 00:00:00 2001
From: olaughter
Date: Thu, 25 Jan 2024 11:19:27 +0000
Subject: [PATCH] Reluctantly move ingest code into tests for setup

Unfortunately we rely on the ingest code for test setup. So although this
has been removed from the app, we still need it to maintain test coverage.

That being said, we are one step closer to deleting it and no longer need
to test it, so this is still progress.

The ideal solution here is probably to replace the many fragmented test
setups with a single, repurposable setup factory. But that's beyond the
scope of the current change.
---
 .../core/ingestion/legacy_setup/cclw/event.py |  68 ++++
 .../legacy_setup/cclw/ingest_row_cclw.py      | 117 ++++++
 .../ingestion/legacy_setup/cclw/metadata.py   |  75 ++++
 .../core/ingestion/legacy_setup/collection.py | 233 +++++++++++
 tests/core/ingestion/legacy_setup/family.py   | 259 ++++++++++++
 .../ingestion/legacy_setup/ingest_row_base.py |  87 ++++
 tests/core/ingestion/legacy_setup/match.py    |  86 ++++
 tests/core/ingestion/legacy_setup/metadata.py |  61 +++
 tests/core/ingestion/legacy_setup/params.py   |  31 ++
 .../legacy_setup/physical_document.py         | 111 ++++++
 tests/core/ingestion/legacy_setup/pipeline.py |  75 ++++
 .../core/ingestion/legacy_setup/processor.py  | 375 ++++++++++++++++++
 tests/core/ingestion/legacy_setup/reader.py   |  56 +++
 .../ingestion/legacy_setup/unfccc/event.py    |  46 +++
 .../legacy_setup/unfccc/ingest_row_unfccc.py  |  83 ++++
 .../ingestion/legacy_setup/unfccc/metadata.py |  74 ++++
 .../ingestion/legacy_setup/unfccc/validate.py |  83 ++++
 tests/core/ingestion/legacy_setup/utils.py    | 282 +++++++++++++
 .../core/ingestion/legacy_setup/validator.py  | 249 ++++++++++++
 tests/routes/document_helpers.py              |  36 +-
 tests/routes/test_search.py                   |   2 +-
 21 files changed, 2483 insertions(+), 6 deletions(-)
 create mode 100644 tests/core/ingestion/legacy_setup/cclw/event.py
 create mode 100644 tests/core/ingestion/legacy_setup/cclw/ingest_row_cclw.py
 create mode 100644 tests/core/ingestion/legacy_setup/cclw/metadata.py
 create mode 100644 tests/core/ingestion/legacy_setup/collection.py
 create mode 100644 tests/core/ingestion/legacy_setup/family.py
 create mode 100644 tests/core/ingestion/legacy_setup/ingest_row_base.py
 create mode 100644 tests/core/ingestion/legacy_setup/match.py
 create mode 100644 tests/core/ingestion/legacy_setup/metadata.py
 create mode 100644 tests/core/ingestion/legacy_setup/params.py
 create mode 100644 tests/core/ingestion/legacy_setup/physical_document.py
 create mode 100644 tests/core/ingestion/legacy_setup/pipeline.py
 create mode 100644 tests/core/ingestion/legacy_setup/processor.py
 create mode 100644 tests/core/ingestion/legacy_setup/reader.py
 create mode 100644 tests/core/ingestion/legacy_setup/unfccc/event.py
 create mode 100644 tests/core/ingestion/legacy_setup/unfccc/ingest_row_unfccc.py
 create mode 100644 tests/core/ingestion/legacy_setup/unfccc/metadata.py
 create mode 100644 tests/core/ingestion/legacy_setup/unfccc/validate.py
 create mode 100644 tests/core/ingestion/legacy_setup/utils.py
 create mode 100644 tests/core/ingestion/legacy_setup/validator.py

diff --git a/tests/core/ingestion/legacy_setup/cclw/event.py b/tests/core/ingestion/legacy_setup/cclw/event.py
new file mode 100644
index 00000000..1ea586a2
--- /dev/null
+++ b/tests/core/ingestion/legacy_setup/cclw/event.py
@@ -0,0 +1,68 @@
+import json
+import logging
+from typing import Any, Optional
+
+from pydantic.json import pydantic_encoder
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import Session
+from tests.core.ingestion.legacy_setup.cclw.ingest_row_cclw import EventIngestRow
+from tests.core.ingestion.legacy_setup.utils import get_or_create, to_dict
+
+from app.db.models.law_policy import FamilyEvent
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def family_event_from_row(
+    db: Session,
+    row: EventIngestRow,
+    result: dict[str, Any],
+) -> Optional[FamilyEvent]:
+    """
+    Create a FamilyEvent from the given row if one does not already exist.
+
+    :param [Session] db: connection to the database.
+    :param [EventIngestRow] row: the row built from the events CSV.
+    :param [dict[str, Any]] result: a result dict in which to track what was created
+    :return [Optional[FamilyEvent]]: The family event that was either retrieved or
+        created, or None if creation failed due to a foreign key violation.
+    """
+    # Get or create FamilyEvent
+    family_event = _maybe_create_family_event(db, row, result)
+
+    return family_event
+
+
+def _maybe_create_family_event(
+    db: Session, row: EventIngestRow, result: dict[str, Any]
+) -> Optional[FamilyEvent]:
+    try:
+        family_event = get_or_create(
+            db,
+            FamilyEvent,
+            import_id=row.cpr_event_id,
+            extra={
+                "title": row.title,
+                "date": row.date,
+                "event_type_name": row.event_type,
+                "family_import_id": row.cpr_family_id,
+                "family_document_import_id": None,  # TODO: link to documents in future
+                "status": row.event_status,
+            },
+        )
+        family_event_results = result.get("family_events", [])
+        family_event_results.append(to_dict(family_event))
+        result["family_events"] = family_event_results
+        return family_event
+    except IntegrityError:
+        row_dict = json.loads(json.dumps(row, default=pydantic_encoder))
+        _LOGGER.exception(
+            "Failed to create family event due to foreign key violation",
+            extra={"props": {"event_details": row_dict}},
+        )
+        family_event_errors = result.get("family_event_errors", [])
+        family_event_errors.append(row_dict)
+        result["family_event_errors"] = family_event_errors
+        return None
diff --git a/tests/core/ingestion/legacy_setup/cclw/ingest_row_cclw.py b/tests/core/ingestion/legacy_setup/cclw/ingest_row_cclw.py
new file mode 100644
index 00000000..fb27ab18
--- /dev/null
+++ b/tests/core/ingestion/legacy_setup/cclw/ingest_row_cclw.py
@@ -0,0 +1,117 @@
+from datetime import datetime
+from typing import ClassVar, Optional
+
+from pydantic import ConfigDict
+from pydantic.dataclasses import dataclass
+from tests.core.ingestion.legacy_setup.ingest_row_base import BaseIngestRow
+
+from app.db.models.law_policy import EventStatus, FamilyCategory
+
+
+_REQUIRED_DOCUMENT_COLUMNS = [
+    "ID",
+    "Document ID",
+    "Collection name",
+    "Collection summary",
+    "Document title",
+    "Family name",
+    "Family summary",
+    "Document role",
+    "Document variant",
+    "Geography ISO",
+    "Documents",
+    "Category",
+    "Sectors",
+    "Instruments",
+    "Frameworks",
+    "Responses",
+    "Natural Hazards",
+    "Document Type",
+    "Language",
+    "Keywords",
+    "Geography",
+    "CPR Document ID",
+    "CPR Family ID",
+    "CPR Collection ID",
+    "CPR Family Slug",
+    "CPR Document Slug",
+    "CPR Document Status",
+]
+VALID_DOCUMENT_COLUMN_NAMES = set(_REQUIRED_DOCUMENT_COLUMNS)
+
+_REQUIRED_EVENT_COLUMNS = [
+    "Id",
+    "Event type",
+    "Title",
+    "Date",
+    "CPR Event ID",
+    "CPR Family ID",
+]
+VALID_EVENT_COLUMN_NAMES = set(_REQUIRED_EVENT_COLUMNS)
+
+
+@dataclass(config=ConfigDict(frozen=True, validate_assignment=True, extra="forbid"))
+class CCLWDocumentIngestRow(BaseIngestRow):
+
"""Represents a single row of input from the documents-families-collections CSV.""" + + id: str + document_id: str + collection_name: str + collection_summary: str + document_title: str + family_name: str + family_summary: str + document_role: str + document_variant: str + geography_iso: str + documents: str + category: FamilyCategory + sectors: list[str] # METADATA + instruments: list[str] # METADATA + frameworks: list[str] # METADATA + responses: list[str] # METADATA - topics + natural_hazards: list[str] # METADATA - hazard + keywords: list[str] + document_type: str + language: list[str] + geography: str + cpr_document_id: str + cpr_family_id: str + cpr_collection_id: str + cpr_family_slug: str + cpr_document_slug: str + cpr_document_status: str + + VALID_COLUMNS: ClassVar[set[str]] = VALID_DOCUMENT_COLUMN_NAMES + + @staticmethod + def _key(key: str) -> str: + return key.lower().replace(" ", "_") + + def get_first_url(self) -> Optional[str]: + """ + Get the first URL from the 'documents' attribute. + + TODO: This could/should be written with more validation. + """ + documents = self.documents.split(";") + if len(documents) != 1: + raise ValueError(f"Expected 1 document to be parsed from: {self.documents}") + + first_url = documents[0].split("|")[0] + return first_url or None + + +@dataclass(config=ConfigDict(frozen=True, validate_assignment=True, extra="ignore")) +class EventIngestRow(BaseIngestRow): + """Represents a single row of input from the events CSV.""" + + id: str + event_type: str + title: str + date: datetime + cpr_event_id: str + cpr_family_id: str + event_status: EventStatus + + VALID_COLUMNS: ClassVar[set[str]] = VALID_EVENT_COLUMN_NAMES diff --git a/tests/core/ingestion/legacy_setup/cclw/metadata.py b/tests/core/ingestion/legacy_setup/cclw/metadata.py new file mode 100644 index 00000000..876f2e4f --- /dev/null +++ b/tests/core/ingestion/legacy_setup/cclw/metadata.py @@ -0,0 +1,75 @@ +from typing import Union + +from tests.core.ingestion.legacy_setup.cclw.ingest_row_cclw import CCLWDocumentIngestRow +from app.db.models.law_policy.metadata import FamilyMetadata +from sqlalchemy.orm import Session +from tests.core.ingestion.legacy_setup.utils import Result, ResultType +from tests.core.ingestion.legacy_setup.metadata import ( + Taxonomy, + MetadataJson, + build_metadata_field, +) + + +MAP_OF_LIST_VALUES = { + "sector": "sectors", + "instrument": "instruments", + "framework": "frameworks", + "topic": "responses", + "hazard": "natural_hazards", + "keyword": "keywords", +} + + +def add_cclw_metadata( + db: Session, + family_import_id: str, + taxonomy: Taxonomy, + taxonomy_id: int, + row: CCLWDocumentIngestRow, +) -> bool: + result, metadata = build_cclw_metadata(taxonomy, row) + if result.type == ResultType.ERROR: + return False + + db.add( + FamilyMetadata( + family_import_id=family_import_id, + taxonomy_id=taxonomy_id, + value=metadata, + ) + ) + return True + + +def build_cclw_metadata( + taxonomy: Taxonomy, row: CCLWDocumentIngestRow +) -> tuple[Result, MetadataJson]: + detail_list = [] + value: dict[str, Union[str, list[str]]] = {} + num_fails = 0 + num_resolved = 0 + + for tax_key, row_key in MAP_OF_LIST_VALUES.items(): + ingest_values = getattr(row, row_key) + result, field_value = build_metadata_field( + row.row_number, taxonomy, ingest_values, tax_key + ) + + if result.type == ResultType.OK: + value[tax_key] = field_value + elif result.type == ResultType.RESOLVED: + value[tax_key] = field_value + detail_list.append(result.details) + num_resolved += 1 + else: + 
detail_list.append(result.details) + num_fails += 1 + + row_result_type = ResultType.OK + if num_resolved: + row_result_type = ResultType.RESOLVED + if num_fails: + row_result_type = ResultType.ERROR + + return Result(type=row_result_type, details="\n".join(detail_list)), value diff --git a/tests/core/ingestion/legacy_setup/collection.py b/tests/core/ingestion/legacy_setup/collection.py new file mode 100644 index 00000000..d6990a19 --- /dev/null +++ b/tests/core/ingestion/legacy_setup/collection.py @@ -0,0 +1,233 @@ +from typing import Any, Optional, cast + +from sqlalchemy.orm import Session +from tests.core.ingestion.legacy_setup.params import IngestParameters +from tests.core.ingestion.legacy_setup.unfccc.ingest_row_unfccc import ( + CollectionIngestRow, +) +from tests.core.ingestion.legacy_setup.utils import ( + create, + to_dict, + update_if_changed, +) + +from app.db.models.law_policy import Collection +from app.db.models.law_policy.collection import CollectionFamily, CollectionOrganisation + + +def handle_cclw_collection_and_link( + db: Session, + params: IngestParameters, + org_id: int, + family_import_id: str, + result: dict[str, Any], +) -> Optional[Collection]: + collection_id = params.cpr_collection_ids[0] # Only ever one for CCLW + + collection = handle_create_collection( + db, + collection_id, + params.collection_name, + params.collection_summary, + org_id, + result, + ) + + handle_link_family_to_one_collection( + db, collection_id, cast(str, family_import_id), result + ) + return collection + + +def create_collection( + db: Session, + row: CollectionIngestRow, + org_id: int, + result: dict[str, Any], +) -> Optional[Collection]: + # First check for the actual collection + existing_collection = ( + db.query(Collection) + .filter(Collection.import_id == row.cpr_collection_id) + .one_or_none() + ) + if existing_collection is None: + collection = create( + db, + Collection, + import_id=row.cpr_collection_id, + title=row.collection_name, + extra={"description": row.collection_summary}, + ) + + collection_organisation = create( + db, + CollectionOrganisation, + collection_import_id=collection.import_id, + organisation_id=org_id, + ) + + result["collection_organisation"] = to_dict(collection_organisation) + result["collection"] = to_dict(collection) + + return collection + + if existing_collection is not None: + # Check it belongs to the same organisation + existing_collection_organisation = ( + db.query(CollectionOrganisation) + .filter( + CollectionOrganisation.collection_import_id == row.cpr_collection_id + ) + .filter(CollectionOrganisation.organisation_id == org_id) + .one_or_none() + ) + + if not existing_collection_organisation: + raise ValueError( + f"This collection {row.cpr_collection_id}" + + " belongs to another org or none." 
+            )
+
+        # Check values match
+        collection = (
+            db.query(Collection)
+            .filter(Collection.title == row.collection_name)
+            .filter(Collection.description == row.collection_summary)
+            .filter(Collection.import_id == row.cpr_collection_id)
+            .one_or_none()
+        )
+        if collection:
+            return collection
+        raise ValueError(f"Collection {row.collection_name} has incompatible values")
+
+    raise ValueError(
+        f"Collection {row.cpr_collection_id} is pre-existing and mismatched"
+    )
+
+
+def is_a_collection_id(collection_id: str) -> bool:
+    return len(collection_id) > 0 and collection_id.lower() != "n/a"
+
+
+def handle_create_collection(
+    db: Session,
+    collection_id: str,
+    collection_name: str,
+    collection_summary: str,
+    org_id: int,
+    result: dict[str, Any],
+) -> Optional[Collection]:
+    """
+    Creates or updates the collection part of the schema from the row if needed.
+
+    NOTE: This determines the operation CREATE/UPDATE independently of the
+    operation being performed on the Family/FamilyDocument structures.
+
+    :param [Session] db: connection to the database.
+    :param [str] collection_id: the collection id associated with this row.
+    :param [str] collection_name: the title of the collection.
+    :param [str] collection_summary: the description of the collection.
+    :param [int] org_id: the organisation id associated with this row.
+    :param [dict[str, Any]] result: a result dict in which to record what was created.
+    :return [Collection | None]: A collection if one was created, otherwise None.
+    """
+
+    if not is_a_collection_id(collection_id):
+        return None
+
+    # First check for the actual collection
+    existing_collection = (
+        db.query(Collection).filter(Collection.import_id == collection_id).one_or_none()
+    )
+
+    if existing_collection is None:
+        collection = create(
+            db,
+            Collection,
+            import_id=collection_id,
+            title=collection_name,
+            extra={"description": collection_summary},
+        )
+
+        collection_organisation = create(
+            db,
+            CollectionOrganisation,
+            collection_import_id=collection_id,
+            organisation_id=org_id,
+        )
+
+        result["collection_organisation"] = to_dict(collection_organisation)
+        result["collection"] = to_dict(collection)
+    else:
+        collection = existing_collection
+        updated = {}
+        update_if_changed(updated, "title", collection_name, collection)
+        update_if_changed(updated, "description", collection_summary, collection)
+        if len(updated) > 0:
+            result["collection"] = updated
+            db.add(collection)
+            db.flush()
+
+    return collection
+
+
+def handle_link_collection_to_family(
+    db: Session,
+    collection_ids: list[str],
+    family_import_id: str,
+    result: dict[str, Any],
+) -> None:
+    # TODO: PDCT-167 remove all links not to this collection_id
+    # then if we don't have a link to this collection_id then add it
+    for collection_id in collection_ids:
+        existing_link = (
+            db.query(CollectionFamily)
+            .filter_by(
+                collection_import_id=collection_id,
+                family_import_id=family_import_id,
+            )
+            .one_or_none()
+        )
+
+        if existing_link is None:
+            collection_family = create(
+                db,
+                CollectionFamily,
+                collection_import_id=collection_id,
+                family_import_id=family_import_id,
+            )
+            result["collection_family"] = to_dict(collection_family)
+
+
+def handle_link_family_to_one_collection(
+    db: Session,
+    collection_id: str,
+    family_import_id: str,
+    result: dict[str, Any],
+) -> None:
+    existing_links = (
+        db.query(CollectionFamily)
+        .filter_by(
+            family_import_id=family_import_id,
+        )
+        .all()
+    )
+
+    if len(existing_links) > 0:
+        if collection_id in [link.collection_import_id for link in existing_links]:
+            # Nothing to do as it's already part of the collection
+            return
else: + # Remove any links (enforce one collection per family) + for link in existing_links: + db.delete(link) + + # Now we need to add the link to the correct collection + if is_a_collection_id(collection_id): + collection_family = create( + db, + CollectionFamily, + collection_import_id=collection_id, + family_import_id=family_import_id, + ) + result["collection_family"] = to_dict(collection_family) diff --git a/tests/core/ingestion/legacy_setup/family.py b/tests/core/ingestion/legacy_setup/family.py new file mode 100644 index 00000000..fb914b8b --- /dev/null +++ b/tests/core/ingestion/legacy_setup/family.py @@ -0,0 +1,259 @@ +from typing import Any, Optional, cast + +from sqlalchemy.orm import Session + +from tests.core.ingestion.legacy_setup.params import IngestParameters +from app.core.organisation import get_organisation_taxonomy +from tests.core.ingestion.legacy_setup.physical_document import ( + create_physical_document_from_params, + update_physical_document_languages, +) +from tests.core.ingestion.legacy_setup.utils import ( + create, + get_or_create, + to_dict, + update_if_changed, + update_if_enum_changed, +) +from app.db.models.law_policy import ( + FamilyCategory, + Family, + FamilyDocument, + FamilyOrganisation, + Geography, + Slug, +) + + +def handle_family_from_params( + db: Session, + params: IngestParameters, + org_id: int, + result: dict[str, Any], +) -> Family: + """ + Create any Family + other entities and links from the row found in the db. + + :param [Session] db: connection to the database. + :param [int] org_id: the organisation id associated with this row. + :param [IngestRow] row: the row built from the CSV. + :param [dict[str, Any]] result: a result dict in which to track what was created + :raises [ValueError]: When there is an existing family name that only differs by + case or when the geography associated with this row cannot be found in the + database. 
+ :return [Family]: The family that was either retrieved or created + """ + family = _operate_on_family(db, params, org_id, result) + + handle_family_document_from_params(db, params, family, result) + + return family + + +def _after_create_family( + db: Session, params: IngestParameters, org_id: int, result: dict[str, Any] +): + def _create_family_links(family: Family): + family_slug = Slug( + name=params.cpr_family_slug, family_import_id=family.import_id + ) + + db.add(family_slug) + result["family_slug"] = (to_dict(family_slug),) + + family_organisation = FamilyOrganisation( + family_import_id=family.import_id, organisation_id=org_id + ) + db.add(family_organisation) + result["family_organisation"] = to_dict(family_organisation) + + tax_id, taxonomy = get_organisation_taxonomy(db, org_id) + params.add_metadata(db, cast(str, family.import_id), taxonomy, tax_id) + + return _create_family_links + + +def _operate_on_family( + db: Session, + params: IngestParameters, + org_id: int, + result: dict[str, Any], +) -> Family: + category = FamilyCategory(params.category.upper()) + + geography = _get_geography(db, params) + extra = { + "title": params.family_name, + "geography_id": geography.id, + "description": params.family_summary, + "family_category": category, + } + + family = ( + db.query(Family).filter(Family.import_id == params.cpr_family_id).one_or_none() + ) + + if family is None: + family = create( + db, + Family, + import_id=params.cpr_family_id, + extra=extra, + after_create=_after_create_family(db, params, org_id, result), + ) + result["family"] = to_dict(family) + else: + updated = {} + + update_if_changed(updated, "title", params.family_name, family) + update_if_changed(updated, "description", params.family_summary, family) + update_if_changed(updated, "family_category", category, family) + + if len(updated) > 0: + db.add(family) + db.flush() + result["family"] = updated + + return family + + +def handle_family_document_from_params( + db: Session, + params: IngestParameters, + family: Family, + result: dict[str, Any], +) -> FamilyDocument: + def none_if_empty(data: str) -> Optional[str]: + return data if data != "" else None + + # NOTE: op is determined by existence or otherwise of FamilyDocument + family_document = ( + db.query(FamilyDocument) + .filter(FamilyDocument.import_id == params.cpr_document_id) + .one_or_none() + ) + + # If the family document exists we can assume that the associated physical + # document and slug have also been created + if family_document is not None: + updated = {} + update_if_changed( + updated, + "family_import_id", + none_if_empty(params.cpr_family_id), + family_document, + ) + update_if_changed( + updated, + "document_type", + none_if_empty(params.document_type), + family_document, + ) + update_if_changed( + updated, + "document_role", + none_if_empty(params.document_role), + family_document, + ) + update_if_changed( + updated, + "variant_name", + none_if_empty(params.document_variant), + family_document, + ) + update_if_enum_changed( + updated, + "document_status", + params.cpr_document_status, + family_document, + ) + if len(updated) > 0: + db.add(family_document) + db.flush() + result["family_document"] = updated + + # Now the physical document + updated = {} + + # If source_url changed then create a new physical_document + if params.source_url != family_document.physical_document.source_url: + physical_document = create_physical_document_from_params(db, params, result) + family_document.physical_document = physical_document + else: + 
update_if_changed( + updated, + "title", + params.document_title, + family_document.physical_document, + ) + update_physical_document_languages( + db, params.language, result, family_document.physical_document + ) + + if len(updated) > 0: + db.add(family_document.physical_document) + db.flush() + result["physical_document"] = updated + + # Check if slug has changed + existing_slug = ( + db.query(Slug).filter(Slug.name == params.cpr_document_slug).one_or_none() + ) + if existing_slug is None: + _add_family_document_slug(db, params, family_document, result) + else: + physical_document = create_physical_document_from_params(db, params, result) + family_document = FamilyDocument( + family_import_id=family.import_id, + physical_document_id=physical_document.id, + import_id=params.cpr_document_id, + variant_name=none_if_empty(params.document_variant), + document_status=params.cpr_document_status, + document_type=none_if_empty(params.document_type), + document_role=none_if_empty(params.document_role), + ) + + db.add(family_document) + db.flush() + result["family_document"] = to_dict(family_document) + _add_family_document_slug(db, params, family_document, result) + + return family_document + + +def _get_geography(db: Session, params: IngestParameters) -> Geography: + geography = ( + db.query(Geography) + .filter(Geography.value == params.geography_iso) + .one_or_none() + ) + if geography is None: + raise ValueError( + f"Geography value of {params.geography_iso} does not exist in the database." + ) + return geography + + +def _add_family_document_slug( + db: Session, + params: IngestParameters, + family_document: FamilyDocument, + result: dict[str, Any], +) -> Slug: + """ + Adds the slugs for the family and family_document. + + :param [Session] db: connection to the database. + :param [IngestRow] row: the row built from the CSV. + :param [FamilyDocument] family_document: family document associated with this row. + :param [dict[str, Any]] result: a dictionary in which to record what was created. 
+ :return [Slug]: the created slug object + """ + family_document_slug = get_or_create( + db, + Slug, + name=params.cpr_document_slug, + family_document_import_id=family_document.import_id, + ) + result["family_document_slug"] = to_dict(family_document_slug) + return family_document_slug diff --git a/tests/core/ingestion/legacy_setup/ingest_row_base.py b/tests/core/ingestion/legacy_setup/ingest_row_base.py new file mode 100644 index 00000000..bc01b764 --- /dev/null +++ b/tests/core/ingestion/legacy_setup/ingest_row_base.py @@ -0,0 +1,87 @@ +import abc +from dataclasses import fields +from pydantic.dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, ClassVar, Sequence +from pydantic import ConfigDict + + +@dataclass(config=ConfigDict(frozen=True, validate_assignment=True, extra="forbid")) +class BaseIngestRow(abc.ABC): + """Represents a single row of input from a CSV.""" + + row_number: int + + VALID_COLUMNS: ClassVar[set[str]] = set() + + @classmethod + def from_row(cls, row_number: int, data: dict[str, str]): + """Parse a row from a CSV.""" + field_info = cls.field_info() + return cls( + row_number=row_number, + **{ + cls._key(k): cls._parse_str(cls._key(k), v, field_info) + for (k, v) in data.items() + if cls._key(k) in field_info.keys() + }, + ) + + @classmethod + def field_info(cls) -> dict[str, type]: + """Returns an information mapping from field name to expected type.""" + return {field.name: field.type for field in fields(cls)} + + @classmethod + def _parse_str(cls, key: str, value: str, field_info: dict[str, type]) -> Any: + if key not in field_info: + # Let pydantic deal with unexpected fields + return value + + if field_info[key] == datetime: + if "T" in value: + # this doesn't accept all valid ISO 8601 strings, only ones + # generated by isoformat. So we need some shenanigans. + value_to_use = value.replace("Z", "+00:00") + return datetime.fromisoformat(value_to_use) + else: + return datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=timezone.utc) + + if field_info[key] == list[str]: + return [e.strip() for e in value.split(";") if e.strip()] + + if field_info[key] == int: + return int(value) if value else 0 + + if field_info[key] == str: + if (na := str(value).lower()) == "n/a": + return na + else: + return value + + # Let pydantic deal with other field types (e.g. 
str-Enums) + return value + + @staticmethod + def _key(key: str) -> str: + return key.lower().replace(" ", "_") + + +def validate_csv_columns( + column_names: Sequence[str], + valid_column_names: set[str], +) -> list[str]: + """Check that the given set of column names is valid.""" + + def original_name(col: str) -> str: + for name in valid_column_names: + if name.upper() == col: + return name + raise ValueError(f"Original name called with value not in set: {col}") + + cols_to_check = set(name.upper() for name in column_names) + expected = set([name.upper() for name in valid_column_names]) + missing_insensitive = list(expected.difference(cols_to_check)) + missing = [original_name(col) for col in missing_insensitive] + missing.sort() + return missing diff --git a/tests/core/ingestion/legacy_setup/match.py b/tests/core/ingestion/legacy_setup/match.py new file mode 100644 index 00000000..bfbc1a0c --- /dev/null +++ b/tests/core/ingestion/legacy_setup/match.py @@ -0,0 +1,86 @@ +import re +from typing import Optional, Set + +REGEX_ENDS_WITH_NUMBER = re.compile(r"(\D+)(\d+)$") + + +def _match_icase(unknown_value: str, allowed_set: Set) -> Optional[str]: + def try_case(value: str): + return value.upper() == unknown_value.upper() + + match = list(filter(try_case, allowed_set)) + if len(match) > 0: + return match[0] + + return None + + +def match_unknown_value(unknown_value: str, allowed_set: Set) -> Optional[str]: + # Just try a case insensitive match + match = _match_icase(unknown_value, allowed_set) + if match: + return match + + # Try with a plural - good for EV + match = _match_icase(unknown_value + "s", allowed_set) + if match: + return match + + # Try with no "ation" good for Transportation + if unknown_value.endswith("ation"): + match = _match_icase(unknown_value[0:-5], allowed_set) + if match: + return match + + # Try hyphenating trailing numbers - good for Covid19 + ends_with_number = REGEX_ENDS_WITH_NUMBER.match(unknown_value) + + if ends_with_number: + hyphenated_number = ( + ends_with_number.groups()[0].strip() + "-" + ends_with_number.groups()[1] + ) + + match = _match_icase(hyphenated_number, allowed_set) + if match: + return match + + # Try without an "es" ending + if unknown_value.endswith("es"): + no_plural = unknown_value[0:-2] + + match = _match_icase(no_plural, allowed_set) + if match: + return match + + # Try stripping any spaces + if " " in unknown_value: + no_spaces = unknown_value.replace(" ", "") + match = _match_icase(no_spaces, allowed_set) + if match: + return match + + # Try hyphenating Co... 
+ if unknown_value.upper().startswith("CO"): + hyphenated_co = "Co-" + unknown_value[2:].strip() + match = _match_icase(hyphenated_co, allowed_set) + if match: + return match + + # Try hyphenating multi + if unknown_value.upper().startswith("MULTI "): + hyphenated_multi = "Multi-" + unknown_value[5:].strip() + match = _match_icase(hyphenated_multi, allowed_set) + if match: + return match + + # Try adding brackets to multi words + words = unknown_value.split(" ") + if len(words) > 2: + abbrev = "".join([w[0] for w in words]) + + with_abbrev = f"{unknown_value} ({abbrev})" + match = _match_icase(with_abbrev, allowed_set) + if match: + return match + + return None diff --git a/tests/core/ingestion/legacy_setup/metadata.py b/tests/core/ingestion/legacy_setup/metadata.py new file mode 100644 index 00000000..948f26f9 --- /dev/null +++ b/tests/core/ingestion/legacy_setup/metadata.py @@ -0,0 +1,61 @@ +from typing import Any, Mapping, Sequence, Union + +from app.core.ingestion.types import Taxonomy + +from tests.core.ingestion.legacy_setup.match import match_unknown_value +from tests.core.ingestion.legacy_setup.utils import Result, ResultType + + +MetadataJson = Mapping[str, Union[str, Sequence[str]]] + + +def resolve_unknown(unknown_set: set[str], allowed_set: set[str]) -> set[str]: + suggestions = set() + for unknown_value in unknown_set: + suggestion = match_unknown_value(unknown_value, allowed_set) + if suggestion: + suggestions.add(suggestion) + return suggestions + + +def build_metadata_field( + row_number: int, taxonomy: Taxonomy, ingest_values: Any, tax_key: str +) -> tuple[Result, list[str]]: + if type(ingest_values) == str: + ingest_values = [ingest_values] + row_set = set(ingest_values) + allowed_set: set[str] = set(taxonomy[tax_key].allowed_values) + allow_blanks = taxonomy[tax_key].allow_blanks + allow_any = taxonomy[tax_key].allow_any + + if len(row_set) == 0: + if not allow_blanks: + details = f"Row {row_number} is blank for {tax_key} - which is not allowed." + return Result(type=ResultType.ERROR, details=details), [] + return Result(), [] # field is blank and allowed + + if allow_any: + return Result(), ingest_values + + unknown_set = row_set.difference(allowed_set) + if not unknown_set: + return Result(), ingest_values # all is well - everything found + + resolved_set = resolve_unknown(unknown_set, allowed_set) + + if len(resolved_set) == len(unknown_set): + details = f"Row {row_number} RESOLVED: {resolved_set}" + vals = row_set.difference(unknown_set).union(resolved_set) + return Result(type=ResultType.RESOLVED, details=details), list(vals) + + # If we get here we have not managed to resolve the unknown values. 
+ + details = ( + f"Row {row_number} has value(s) for '{tax_key}' that is/are " + f"unrecognised: '{unknown_set}' " + ) + + if len(resolved_set): + details += f"able to resolve: {resolved_set}" + + return Result(type=ResultType.ERROR, details=details), [] diff --git a/tests/core/ingestion/legacy_setup/params.py b/tests/core/ingestion/legacy_setup/params.py new file mode 100644 index 00000000..9941b3d4 --- /dev/null +++ b/tests/core/ingestion/legacy_setup/params.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass +from typing import Optional, Callable + + +@dataclass +class IngestParameters: + """Agnostic parameters for any ingest.""" + + create_collections: bool + add_metadata: Callable + source_url: Optional[str] # get_first_url() + document_id: str + collection_name: str + collection_summary: str + document_title: str + family_name: str + family_summary: str + document_role: str + document_variant: str + geography_iso: str + documents: str + category: str + document_type: str + language: list[str] + geography: str + cpr_document_id: str + cpr_family_id: str + cpr_collection_ids: list[str] + cpr_family_slug: str + cpr_document_slug: str + cpr_document_status: str diff --git a/tests/core/ingestion/legacy_setup/physical_document.py b/tests/core/ingestion/legacy_setup/physical_document.py new file mode 100644 index 00000000..b23519bc --- /dev/null +++ b/tests/core/ingestion/legacy_setup/physical_document.py @@ -0,0 +1,111 @@ +from logging import getLogger +from typing import Any + +from sqlalchemy.orm import Session +from tests.core.ingestion.legacy_setup.params import IngestParameters +from tests.core.ingestion.legacy_setup.utils import to_dict + +from app.db.models.document import PhysicalDocument +from app.db.models.document.physical_document import ( + Language, + LanguageSource, + PhysicalDocumentLanguage, +) + +_LOGGER = getLogger(__name__) + + +def create_physical_document_from_params( + db: Session, + params: IngestParameters, + result: dict[str, Any], +) -> PhysicalDocument: + """ + Create the document part of the schema from the row. + + :param [Session] db: connection to the database. + :param IngestParameters params: The parameters for the ingest. + :param dict[str, Any] result: The result of the ingest + :return [dict[str, Any]]: a dictionary to describe what was created. + """ + physical_document = PhysicalDocument( + title=params.document_title, + source_url=params.source_url, + md5_sum=None, + content_type=None, + cdn_object=None, + ) + db.add(physical_document) + db.flush() + result["physical_document"] = to_dict(physical_document) + + update_physical_document_languages(db, params.language, result, physical_document) + + return physical_document + + +def update_physical_document_languages( + db: Session, + langs: list[str], + result: dict[str, Any], + physical_document: PhysicalDocument, +) -> None: + """ + Updates the physical document with the languages in param. + + :param Session db: connection to the db. + :param list[str] langs: List of languages. 
+    :param dict[str, Any] result: The result of the ingest
+    :param PhysicalDocument physical_document: The physical document to update
+    """
+    existing_language_links: dict[int, PhysicalDocumentLanguage] = {
+        pdl.language_id: pdl
+        for pdl in db.query(PhysicalDocumentLanguage)
+        .filter_by(document_id=physical_document.id)
+        .all()
+    }
+
+    result_document_languages = []
+    result_physical_document_languages = []
+    for language in langs:
+        lang = db.query(Language).filter(Language.name == language).one_or_none()
+        if lang is None:
+            _LOGGER.error(
+                f"Ingest for physical_document '{physical_document.id}' attempted to "
+                f"assign language '{language}' which does not exist in the database"
+            )
+            continue
+
+        if lang.id in existing_language_links:
+            physical_document_language = existing_language_links[lang.id]
+
+            # If the Language source is already set to USER, do not toggle visibility
+            should_update_language_link = (
+                physical_document_language.source != LanguageSource.USER
+                and not physical_document_language.visible
+            )
+            if should_update_language_link:
+                physical_document_language.source = LanguageSource.USER  # type: ignore
+                physical_document_language.visible = True  # type: ignore
+                db.flush()
+                result_physical_document_languages.append(
+                    to_dict(physical_document_language)
+                )
+        else:
+            physical_document_language = PhysicalDocumentLanguage(
+                language_id=lang.id,
+                document_id=physical_document.id,
+                source=LanguageSource.USER,
+                visible=True,
+            )
+            db.add(physical_document_language)
+            db.flush()
+            result_document_languages.append(to_dict(lang))
+            result_physical_document_languages.append(
+                to_dict(physical_document_language)
+            )
+
+    if result_document_languages:
+        result["language"] = result_document_languages
+    if result_physical_document_languages:
+        result["physical_document_language"] = result_physical_document_languages
diff --git a/tests/core/ingestion/legacy_setup/pipeline.py b/tests/core/ingestion/legacy_setup/pipeline.py
new file mode 100644
index 00000000..932c9267
--- /dev/null
+++ b/tests/core/ingestion/legacy_setup/pipeline.py
@@ -0,0 +1,75 @@
+from datetime import datetime, timezone
+from typing import Any, Sequence, Tuple, cast
+
+from sqlalchemy.orm import Session
+
+from app.api.api_v1.schemas.document import DocumentParserInput
+from app.db.models.app.users import Organisation
+from app.db.models.law_policy.family import (
+    Family,
+    FamilyDocument,
+    FamilyOrganisation,
+    Geography,
+)
+from app.db.models.law_policy.metadata import FamilyMetadata
+
+
+def generate_pipeline_ingest_input(db: Session) -> Sequence[DocumentParserInput]:
+    """Generates a complete view of the current document database as pipeline input"""
+    query = (
+        db.query(Family, FamilyDocument, FamilyMetadata, Geography, Organisation)
+        .join(Family, Family.import_id == FamilyDocument.family_import_id)
+        .join(
+            FamilyOrganisation, FamilyOrganisation.family_import_id == Family.import_id
+        )
+        .join(FamilyMetadata, Family.import_id == FamilyMetadata.family_import_id)
+        .join(Organisation, Organisation.id == FamilyOrganisation.organisation_id)
+        .join(Geography, Geography.id == Family.geography_id)
+    )
+
+    query_result = cast(
+        Sequence[
+            Tuple[Family, FamilyDocument, FamilyMetadata, Geography, Organisation]
+        ],
+        query.all(),
+    )
+    fallback_date = datetime(1900, 1, 1, tzinfo=timezone.utc)
+    documents: Sequence[DocumentParserInput] = [
+        DocumentParserInput(
+            name=cast(str, family.title),  # All documents in a family indexed by title
+            description=cast(str, family.description),
+
category=str(family.family_category), + publication_ts=family.published_date or fallback_date, + import_id=cast(str, family_document.import_id), + slug=cast(str, family_document.slugs[-1].name), + family_import_id=cast(str, family.import_id), + family_slug=cast(str, family.slugs[-1].name), + source_url=( + cast(str, family_document.physical_document.source_url) + if family_document.physical_document is not None + else None + ), + download_url=None, + type=cast(str, family_document.document_type or ""), + source=cast(str, organisation.name), + geography=cast(str, geography.value), + languages=[ + cast(str, lang.name) + for lang in ( + family_document.physical_document.languages + if family_document.physical_document is not None + else [] + ) + ], + metadata=cast(dict[str, Any], family_metadata.value), + ) + for ( + family, + family_document, + family_metadata, + geography, + organisation, + ) in query_result + ] + + return documents diff --git a/tests/core/ingestion/legacy_setup/processor.py b/tests/core/ingestion/legacy_setup/processor.py new file mode 100644 index 00000000..e4327de6 --- /dev/null +++ b/tests/core/ingestion/legacy_setup/processor.py @@ -0,0 +1,375 @@ +import logging +from typing import Any, Callable, TypeVar, cast + +from sqlalchemy.orm import Session +from tests.core.ingestion.legacy_setup.collection import ( + create_collection, + handle_cclw_collection_and_link, + handle_link_collection_to_family, +) +from tests.core.ingestion.legacy_setup.cclw.event import family_event_from_row +from tests.core.ingestion.legacy_setup.family import handle_family_from_params +from tests.core.ingestion.legacy_setup.cclw.ingest_row_cclw import ( + CCLWDocumentIngestRow, + EventIngestRow, +) +from tests.core.ingestion.legacy_setup.cclw.metadata import add_cclw_metadata +from tests.core.ingestion.legacy_setup.ingest_row_base import BaseIngestRow +from tests.core.ingestion.legacy_setup.metadata import Taxonomy +from tests.core.ingestion.legacy_setup.params import IngestParameters +from tests.core.ingestion.legacy_setup.unfccc.event import create_event_from_row +from tests.core.ingestion.legacy_setup.unfccc.ingest_row_unfccc import ( + CollectionIngestRow, + UNFCCCDocumentIngestRow, +) +from tests.core.ingestion.legacy_setup.unfccc.metadata import add_unfccc_metadata +from app.core.organisation import get_organisation_taxonomy +from tests.core.ingestion.legacy_setup.utils import ( + CCLWIngestContext, + IngestContext, + Result, + ResultType, + UNFCCCIngestContext, +) +from tests.core.ingestion.legacy_setup.validator import ( + validate_cclw_document_row, + validate_unfccc_document_row, +) +from app.db.models.app import ORGANISATION_CCLW, ORGANISATION_UNFCCC +from app.db.models.app.users import Organisation +from app.db.models.law_policy.geography import GEO_INTERNATIONAL, GEO_NONE + + +_LOGGER = logging.getLogger(__name__) + +_RowType = TypeVar("_RowType", bound=BaseIngestRow) + +ProcessFunc = Callable[[IngestContext, _RowType], None] + + +def parse_csv_geography(csv_geo: str) -> str: + if csv_geo == "": + return GEO_NONE + + if csv_geo == "INT": + return GEO_INTERNATIONAL # Support old style + + return csv_geo + + +def build_params_from_cclw(row: CCLWDocumentIngestRow) -> IngestParameters: + def add_metadata(db: Session, import_id: str, taxonomy: Taxonomy, taxonomy_id: int): + add_cclw_metadata(db, import_id, taxonomy, taxonomy_id, row) + + return IngestParameters( + create_collections=True, + add_metadata=add_metadata, + source_url=row.get_first_url(), + document_id=row.document_id, + 
collection_name=row.collection_name, + collection_summary=row.collection_summary, + document_title=row.document_title, + family_name=row.family_name, + family_summary=row.family_summary, + document_role=row.document_role, + document_variant=row.document_variant, + geography_iso=parse_csv_geography(row.geography_iso), + documents=row.documents, + category=row.category, + document_type=row.document_type, + language=row.language, + geography=row.geography, + cpr_document_id=row.cpr_document_id, + cpr_family_id=row.cpr_family_id, + cpr_collection_ids=[row.cpr_collection_id], + cpr_family_slug=row.cpr_family_slug, + cpr_document_slug=row.cpr_document_slug, + cpr_document_status=row.cpr_document_status, + ) + + +def build_params_from_unfccc(row: UNFCCCDocumentIngestRow) -> IngestParameters: + def add_metadata(db: Session, import_id: str, taxonomy: Taxonomy, taxonomy_id: int): + add_unfccc_metadata(db, import_id, taxonomy, taxonomy_id, row) + + def build_summary() -> str: + start = f"{row.family_name}, {row.submission_type}" + return f"{start} from {row.author} in {row.date.year}" + + return IngestParameters( + create_collections=False, + add_metadata=add_metadata, + source_url=row.documents, + document_id=row.cpr_document_id, + collection_name="", + collection_summary="", + document_title=row.document_title, + family_name=row.family_name, + family_summary=build_summary(), + document_role=row.document_role, + document_variant=row.document_variant, + geography_iso=parse_csv_geography(row.geography_iso), + documents=row.documents, + category=row.category, + document_type=row.submission_type, + language=row.language, + geography=row.geography, + cpr_document_id=row.cpr_document_id, + cpr_family_id=row.cpr_family_id, + cpr_collection_ids=row.cpr_collection_id, + cpr_family_slug=row.cpr_family_slug, + cpr_document_slug=row.cpr_document_slug, + cpr_document_status="PUBLISHED", + ) + + +def ingest_cclw_document_row( + db: Session, context: IngestContext, row: CCLWDocumentIngestRow +) -> dict[str, Any]: + """ + Create the constituent elements in the database that represent this row. + + :param [Session] db: the connection to the database. + :param [DocumentIngestRow] row: the IngestRow object of the current CSV row + :returns [dict[str, Any]]: a result dictionary describing what was created + """ + result = {} + import_id = row.cpr_document_id + + _LOGGER.info( + f"Ingest starting for row {row.row_number}.", + extra={ + "props": { + "row_number": row.row_number, + "import_id": import_id, + } + }, + ) + params = build_params_from_cclw(row) + family = handle_family_from_params(db, params, context.org_id, result) + + handle_cclw_collection_and_link( + db, params, context.org_id, cast(str, family.import_id), result + ) + + _LOGGER.info( + f"Ingest complete for row {row.row_number}", + extra={"props": {"result": str(result)}}, + ) + + return result + + +def ingest_unfccc_document_row( + db: Session, + context: IngestContext, + row: UNFCCCDocumentIngestRow, +) -> dict[str, Any]: + """ + Create the constituent elements in the database that represent this row. + + :param [Session] db: the connection to the database. 
+ :param [DocumentIngestRow] row: the IngestRow object of the current CSV row + :returns [dict[str, Any]]: a result dictionary describing what was created + """ + result = {} + import_id = row.cpr_document_id + + _LOGGER.info( + f"Ingest starting for row {row.row_number}.", + extra={ + "props": { + "row_number": row.row_number, + "import_id": import_id, + } + }, + ) + + params = build_params_from_unfccc(row) + family = handle_family_from_params(db, params, context.org_id, result) + handle_link_collection_to_family( + db, params.cpr_collection_ids, cast(str, family.import_id), result + ) + + # Now create a FamilyEvent to store the date + create_event_from_row(db, row) + + ctx = cast(UNFCCCIngestContext, context) + ctx.download_urls[import_id] = row.download_url + + _LOGGER.info( + f"Ingest complete for row {row.row_number}", + extra={"props": {"result": str(result)}}, + ) + + return result + + +def ingest_collection_row( + db: Session, context: IngestContext, row: CollectionIngestRow +) -> dict[str, Any]: + result = {} + create_collection(db, row, context.org_id, result) + return result + + +def ingest_event_row( + db: Session, context: IngestContext, row: EventIngestRow +) -> dict[str, Any]: + """ + Create the constituent elements in the database that represent this row. + + :param [Session] db: the connection to the database. + :param [EventIngestRow] row: the IngestRow object of the current CSV row + :returns [dict[str, Any]]: a result dictionary describing what was created + """ + result = {} + family_event_from_row(db=db, row=row, result=result) + return result + + +def initialise_context(db: Session, org_name: str) -> IngestContext: + """ + Initialise the database + + :return [IngestContext]: The organisation that will be used for the ingest. + """ + with db.begin(): + organisation = db.query(Organisation).filter_by(name=org_name).one() + if org_name == ORGANISATION_CCLW: + return CCLWIngestContext( + org_name=org_name, org_id=cast(int, organisation.id), results=[] + ) + if org_name == ORGANISATION_UNFCCC: + return UNFCCCIngestContext( + org_name=org_name, org_id=cast(int, organisation.id), results=[] + ) + raise ValueError(f"Code not in sync with data - org {org_name} unknown to code") + + +def get_event_ingestor(db: Session) -> ProcessFunc: + """ + Get the ingestion function for ingesting an event CSV row. + + :return [ProcessFunc]: The function used to ingest the CSV row. + """ + + def process(context: IngestContext, row: EventIngestRow) -> None: + """Processes the row into the db.""" + _LOGGER.info(f"Ingesting event row: {row.row_number}") + + with db.begin(): + ingest_event_row(db, context, row=row) + + return process + + +def get_collection_ingestor(db: Session) -> ProcessFunc: + """ + Get the ingestion function for ingesting a collection CSV row. + + :return [ProcessFunc]: The function used to ingest the CSV row. + """ + + def process(context: IngestContext, row: CollectionIngestRow) -> None: + """Processes the row into the db.""" + _LOGGER.info(f"Ingesting collection row: {row.row_number}") + + with db.begin(): + ingest_collection_row(db, context, row=row) + + return process + + +def get_cclw_document_ingestor(db: Session, context: IngestContext) -> ProcessFunc: + """ + Get the ingestion function for ingesting a law & policy CSV row. + + :return [ProcessFunc]: The function used to ingest the CSV row. 
+ """ + + def cclw_process(context: IngestContext, row: CCLWDocumentIngestRow) -> None: + """Processes the row into the db.""" + _LOGGER.info(f"Ingesting document row: {row.row_number}") + + with db.begin(): + try: + ingest_cclw_document_row(db, context, row=row) + except Exception as e: + error = Result( + ResultType.ERROR, f"Row {row.row_number}: Error {str(e)}" + ) + context.results.append(error) + _LOGGER.error( + "Error on ingest", + extra={"props": {"row_number": row.row_number, "error": str(e)}}, + ) + + return cclw_process + + +def get_unfccc_document_ingestor(db: Session, context: IngestContext) -> ProcessFunc: + """ + Get the ingestion function for ingesting a law & policy CSV row. + + :return [ProcessFunc]: The function used to ingest the CSV row. + """ + + def unfccc_process(context: IngestContext, row: UNFCCCDocumentIngestRow) -> None: + """Processes the row into the db.""" + _LOGGER.info(f"Ingesting document row: {row.row_number}") + + with db.begin(): + try: + ingest_unfccc_document_row(db, context, row=row) + except Exception as e: + error = Result( + ResultType.ERROR, f"Row {row.row_number}: Error {str(e)}" + ) + context.results.append(error) + _LOGGER.error( + "Error on ingest", + extra={"props": {"row_number": row.row_number, "error": str(e)}}, + ) + + return unfccc_process + + +def get_document_validator(db: Session, context: IngestContext) -> ProcessFunc: + """ + Get the validation function for ingesting a law & policy CSV. + + :param [IngestContext] context: The context of the current ingest + :return [ProcessFunc]: The function used to validate the CSV file + """ + with db.begin(): + _, taxonomy = get_organisation_taxonomy(db, context.org_id) + + def cclw_process(context: IngestContext, row: CCLWDocumentIngestRow) -> None: + """Processes the row into the db.""" + _LOGGER.info(f"Validating document row: {row.row_number}") + with db.begin(): + validate_cclw_document_row( + db=db, + context=cast(CCLWIngestContext, context), + taxonomy=taxonomy, + row=row, + ) + + def unfccc_process(context: IngestContext, row: UNFCCCDocumentIngestRow) -> None: + """Processes the row into the db.""" + _LOGGER.info(f"Validating document row: {row.row_number}") + with db.begin(): + validate_unfccc_document_row( + db=db, + context=cast(UNFCCCIngestContext, context), + taxonomy=taxonomy, + row=row, + ) + + if context.org_name == ORGANISATION_CCLW: + return cclw_process + elif context.org_name == ORGANISATION_UNFCCC: + return unfccc_process + + raise ValueError(f"Unknown org {context.org_name} for validation.") diff --git a/tests/core/ingestion/legacy_setup/reader.py b/tests/core/ingestion/legacy_setup/reader.py new file mode 100644 index 00000000..00d76197 --- /dev/null +++ b/tests/core/ingestion/legacy_setup/reader.py @@ -0,0 +1,56 @@ +import csv +from io import StringIO +from typing import Type + +from fastapi import UploadFile +from tests.core.ingestion.legacy_setup.ingest_row_base import ( + BaseIngestRow, + validate_csv_columns, +) +from tests.core.ingestion.legacy_setup.processor import ProcessFunc + +from tests.core.ingestion.legacy_setup.utils import IngestContext +from app.core.validation.types import ImportSchemaMismatchError + + +def get_file_contents(csv_upload: UploadFile) -> str: + """ + Gets the file contents from an UploadFile. + + :param [UploadFile] csv_upload: The UploadFile from an HTTP request. + :return [str]: The contents of the file. 
+ """ + return csv_upload.file.read().decode("utf8") + + +def read( + file_contents: str, + context: IngestContext, + row_type: Type[BaseIngestRow], + process: ProcessFunc, +) -> None: + """ + Read a CSV file and call process() for each row. + + :param [str] file_contents: the content of the imported CSV file. + :param [IngestContext] context: a context to use during import. + :param [Type[BaseIngestRow]] row_type: the type of row expected from the CSV. + :param [ProcessFunc] process: the function to call to process a single row. + """ + reader = csv.DictReader(StringIO(initial_value=file_contents)) + if reader.fieldnames is None: + raise ImportSchemaMismatchError("No fields in CSV!", {}) + + missing_columns = validate_csv_columns( + reader.fieldnames, + row_type.VALID_COLUMNS, + ) + if missing_columns: + raise ImportSchemaMismatchError( + "Field names in CSV did not validate", {"missing": missing_columns} + ) + row_count = 0 + + for row in reader: + row_count += 1 + process(context, row_type.from_row(row_count, row)) diff --git a/tests/core/ingestion/legacy_setup/unfccc/event.py b/tests/core/ingestion/legacy_setup/unfccc/event.py new file mode 100644 index 00000000..d4d75bde --- /dev/null +++ b/tests/core/ingestion/legacy_setup/unfccc/event.py @@ -0,0 +1,46 @@ +from sqlalchemy.orm import Session +from tests.core.ingestion.legacy_setup.unfccc.ingest_row_unfccc import ( + UNFCCCDocumentIngestRow, +) +from app.db.models.law_policy.family import EventStatus, FamilyEvent + + +TYPE_MAP = { + "ANNEX": "Updated", + "SUPPORTING DOCUMENTATION": "Updated", + "MAIN": "Passed/Approved", +} + + +def _get_type_from_role(role): + if role in TYPE_MAP: + return TYPE_MAP[role] + return "Other" + + +def _create_event_id(doc_id: str) -> str: + id_bits = doc_id.split(".") + if len(id_bits) < 4: + raise ValueError(f"Document import id has unexpected format {doc_id}") + id_bits[1] = "unfccc_event" + return ".".join(id_bits) + + +def create_event_from_row(db: Session, row: UNFCCCDocumentIngestRow): + # Don't create an event for summaries + if row.document_role.upper() == "SUMMARY": + return + + event_type = _get_type_from_role(row.document_role.upper()) + event = FamilyEvent( + import_id=_create_event_id(row.cpr_document_id), + title=event_type, + date=row.date, + event_type_name=event_type, + family_import_id=row.cpr_family_id, + family_document_import_id=row.cpr_document_id, + status=EventStatus.OK, + ) + + db.add(event) + db.flush() diff --git a/tests/core/ingestion/legacy_setup/unfccc/ingest_row_unfccc.py b/tests/core/ingestion/legacy_setup/unfccc/ingest_row_unfccc.py new file mode 100644 index 00000000..6e497048 --- /dev/null +++ b/tests/core/ingestion/legacy_setup/unfccc/ingest_row_unfccc.py @@ -0,0 +1,83 @@ +from datetime import datetime +from typing import ClassVar + +from pydantic import ConfigDict +from pydantic.dataclasses import dataclass +from tests.core.ingestion.legacy_setup.ingest_row_base import BaseIngestRow + +_REQUIRED_DOCUMENT_COLUMNS = [ + "Category", + "Submission Type", + "Family Name", + "Document Title", + "Documents", + "Author", + "Author Type", + "Geography", + "Geography ISO", + "Date", + "Document Role", + "Document Variant", + "Language", + "CPR Collection ID", + "CPR Document ID", + "CPR Family ID", + "CPR Family Slug", + "CPR Document Slug", + "CPR Document Status", + "Download URL", +] +VALID_DOCUMENT_COLUMN_NAMES = set(_REQUIRED_DOCUMENT_COLUMNS) + +_REQUIRED_COLLECTION_COLUMNS = [ + "CPR Collection ID", + "Collection name", + "Collection summary", +] 
+VALID_COLLECTION_COLUMN_NAMES = set(_REQUIRED_COLLECTION_COLUMNS) + + +@dataclass(config=ConfigDict(frozen=True, validate_assignment=True, extra="forbid")) +class UNFCCCDocumentIngestRow(BaseIngestRow): + """Represents a single row of input from the UNFCCC CSV.""" + + category: str + submission_type: str # aka Document Type for UNFCCC + family_name: str + document_title: str + documents: str + author: str # METADATA + author_type: str # METADATA + geography: str + geography_iso: str + date: datetime + document_role: str + document_variant: str + language: list[str] + + cpr_collection_id: list[str] + cpr_document_id: str + cpr_family_id: str + cpr_family_slug: str + cpr_document_slug: str + cpr_document_status: str + download_url: str + + family_summary: str = "summary" + + VALID_COLUMNS: ClassVar[set[str]] = VALID_DOCUMENT_COLUMN_NAMES + + @staticmethod + def _key(key: str) -> str: + return key.lower().replace(" ", "_") + + +@dataclass(config=ConfigDict(frozen=True, validate_assignment=True, extra="ignore")) +class CollectionIngestRow(BaseIngestRow): + """Represents a single row of input from the collection CSV.""" + + cpr_collection_id: str + collection_name: str + collection_summary: str + + VALID_COLUMNS: ClassVar[set[str]] = VALID_COLLECTION_COLUMN_NAMES diff --git a/tests/core/ingestion/legacy_setup/unfccc/metadata.py b/tests/core/ingestion/legacy_setup/unfccc/metadata.py new file mode 100644 index 00000000..d5fce6f3 --- /dev/null +++ b/tests/core/ingestion/legacy_setup/unfccc/metadata.py @@ -0,0 +1,74 @@ +from typing import Union +from sqlalchemy.orm import Session + +from tests.core.ingestion.legacy_setup.utils import Result, ResultType +from tests.core.ingestion.legacy_setup.metadata import Taxonomy +from app.db.models.law_policy.metadata import FamilyMetadata + +from tests.core.ingestion.legacy_setup.metadata import ( + MetadataJson, + build_metadata_field, +) +from tests.core.ingestion.legacy_setup.unfccc.ingest_row_unfccc import ( + UNFCCCDocumentIngestRow, +) + + +MAP_OF_LIST_VALUES = { + "author": "author", + "author_type": "author_type", +} + + +def add_unfccc_metadata( + db: Session, + family_import_id: str, + taxonomy: Taxonomy, + taxonomy_id: int, + row: UNFCCCDocumentIngestRow, +) -> bool: + result, metadata = build_unfccc_metadata(taxonomy, row) + if result.type == ResultType.ERROR: + return False + + db.add( + FamilyMetadata( + family_import_id=family_import_id, + taxonomy_id=taxonomy_id, + value=metadata, + ) + ) + return True + + +def build_unfccc_metadata( + taxonomy: Taxonomy, row: UNFCCCDocumentIngestRow +) -> tuple[Result, MetadataJson]: + detail_list = [] + value: dict[str, Union[str, list[str]]] = {} + num_fails = 0 + num_resolved = 0 + + for tax_key, row_key in MAP_OF_LIST_VALUES.items(): + ingest_values = getattr(row, row_key) + result, field_value = build_metadata_field( + row.row_number, taxonomy, ingest_values, tax_key + ) + + if result.type == ResultType.OK: + value[tax_key] = field_value + elif result.type == ResultType.RESOLVED: + value[tax_key] = field_value + detail_list.append(result.details) + num_resolved += 1 + else: + detail_list.append(result.details) + num_fails += 1 + + row_result_type = ResultType.OK + if num_resolved: + row_result_type = ResultType.RESOLVED + if num_fails: + row_result_type = ResultType.ERROR + + return Result(type=row_result_type, details="\n".join(detail_list)), value diff --git a/tests/core/ingestion/legacy_setup/unfccc/validate.py b/tests/core/ingestion/legacy_setup/unfccc/validate.py new file mode 100644 index 
index 00000000..eb9995d0
--- /dev/null
+++ b/tests/core/ingestion/legacy_setup/unfccc/validate.py
@@ -0,0 +1,83 @@
+from typing import cast
+from sqlalchemy.orm import Session
+from tests.core.ingestion.legacy_setup.processor import get_document_validator
+from tests.core.ingestion.legacy_setup.unfccc.ingest_row_unfccc import (
+    CollectionIngestRow,
+    UNFCCCDocumentIngestRow,
+)
+from tests.core.ingestion.legacy_setup.utils import (
+    IngestContext,
+    Result,
+    ResultType,
+    UNFCCCIngestContext,
+    get_result_counts,
+)
+from tests.core.ingestion.legacy_setup.reader import read
+
+
+def validate_unfccc_csv(
+    documents_file_contents: str,
+    collection_file_contents: str,
+    db: Session,
+    context: UNFCCCIngestContext,
+    all_results: list[Result],
+) -> str:
+    """
+    Validate the UNFCCC documents CSV together with its collections CSV.
+
+    :param [str] documents_file_contents: the content of the documents CSV file.
+    :param [str] collection_file_contents: the content of the collections CSV file.
+    :param [Session] db: connection to the database.
+    :param [UNFCCCIngestContext] context: the ingest context.
+    :param [list[Result]] all_results: accumulator extended with this run's results.
+    :return [str]: a summary message for the validation run.
+    """
+
+    # First read all the ids in the collection_csv
+    def collate_ids(context: IngestContext, row: CollectionIngestRow) -> None:
+        ctx = cast(UNFCCCIngestContext, context)
+        ctx.collection_ids_defined.append(row.cpr_collection_id)
+
+    read(collection_file_contents, context, CollectionIngestRow, collate_ids)
+
+    # Now do the validation of the documents
+    validator = get_document_validator(db, context)
+    read(documents_file_contents, context, UNFCCCDocumentIngestRow, validator)
+    # Capture the row count now: at this point context.results holds one
+    # Result per document row.
+    rows = len(context.results)
+
+    # Check the set of defined collections against those referenced
+    defined = set(context.collection_ids_defined)
+    referenced = set(context.collection_ids_referenced)
+
+    defined_not_referenced = defined.difference(referenced)
+
+    if len(defined_not_referenced) > 0:
+        # Empty collections are allowed, but need reporting
+        context.results.append(
+            Result(
+                ResultType.OK,
+                "The following Collection IDs were "
+                + f"defined and not referenced: {list(defined_not_referenced)}",
+            )
+        )
+
+    referenced_not_defined = referenced.difference(defined)
+    if len(referenced_not_defined) > 0:
+        context.results.append(
+            Result(
+                ResultType.ERROR,
+                "The following Collection IDs were "
+                f"referenced and not defined: {list(referenced_not_defined)}",
+            )
+        )
+
+    _, fails, resolved = get_result_counts(context.results)
+    all_results.extend(context.results)
+
+    context.results = []
+    message = (
+        f"UNFCCC validation result: {rows} Rows, {fails} Failures, "
+        f"{resolved} Resolved"
+    )
+
+    return message
diff --git a/tests/core/ingestion/legacy_setup/utils.py b/tests/core/ingestion/legacy_setup/utils.py
new file mode 100644
index 00000000..697f910d
--- /dev/null
+++ b/tests/core/ingestion/legacy_setup/utils.py
@@ -0,0 +1,282 @@
+import abc
+from dataclasses import dataclass
+import enum
+from typing import Any, Callable, Optional, TypeVar, cast
+from app.db.models.app import ORGANISATION_CCLW, ORGANISATION_UNFCCC
+from app.db.session import AnyModel
+from sqlalchemy.orm import Session
+
+
+_DbModel = TypeVar("_DbModel", bound=AnyModel)
+
+
+def create(db: Session, model: _DbModel, **kwargs) -> _DbModel:
+    """
+    Creates a row represented by model, and described by kwargs.
+
+    :param [Session] db: connection to the database.
+    :param [_DbModel] model: the model (table) you are querying.
+    :param kwargs: attributes describing the row you are interested in.
+        - if kwargs contains an `extra` key then this will be used during
+          creation.
+        - if kwargs contains an `after_create` key then the value should
+          be a callback function that is called after an object is created.
+
+    :return [_DbModel]: the object that was created.
+    """
+    extra, after_create = _vars_from_kwargs(kwargs)
+
+    return _create_instance(db, extra, after_create, model, **kwargs)
+
+
+def get_or_create(db: Session, model: _DbModel, **kwargs) -> _DbModel:
+    """
+    Get or create a row represented by model, and described by kwargs.
+
+    :param [Session] db: connection to the database.
+    :param [_DbModel] model: the model (table) you are querying.
+    :param kwargs: attributes describing the row you are interested in.
+        - if kwargs contains an `extra` key then this will be used during
+          creation.
+        - if kwargs contains an `after_create` key then the value should
+          be a callback function that is called after an object is created.
+
+    :return [_DbModel]: the object that was either retrieved or created.
+    """
+    extra, after_create = _vars_from_kwargs(kwargs)
+
+    instance = db.query(model).filter_by(**kwargs).one_or_none()
+
+    if instance is not None:
+        return instance
+
+    return _create_instance(db, extra, after_create, model, **kwargs)
+
+
+def _create_instance(
+    db: Session,
+    extra: dict,
+    after_create: Optional[Callable],
+    model: AnyModel,
+    **kwargs,
+):
+    # Add the extra args in for creation
+    for k, v in extra.items():
+        kwargs[k] = v
+    instance = model(**kwargs)
+    db.add(instance)
+    db.flush()
+    if after_create:
+        after_create(instance)
+    return instance
+
+
+def _vars_from_kwargs(kwargs: dict[str, Any]) -> tuple[dict, Optional[Callable]]:
+    extra = {}
+    after_create = None
+    if "extra" in kwargs.keys():
+        extra = kwargs["extra"]
+        del kwargs["extra"]
+    if "after_create" in kwargs.keys():
+        after_create = kwargs["after_create"]
+        del kwargs["after_create"]
+    return cast(dict, extra), after_create
+
+
+def _sanitize(value: str) -> str:
+    """
+    Sanitizes a string by parsing out the class name and truncating.
+
+    Used by `to_dict()`
+
+    :param [str] value: the string to be sanitized.
+    :return [str]: the sanitized string.
+    """
+    s = str(value)
+    if s.startswith("<"):
+        # Parse the class name out of reprs such as "<class 'app.db.models.Family'>"
+        s = s.split("'")[1] if "'" in s else s.split(" ")[0]
+    if len(s) > 80:
+        return s[:80] + "..."
+    return s
+
+
+def to_dict(base_object: AnyModel) -> dict:
+    """
+    Returns a dict of the attributes of the db Base object.
+
+    This also adds the class name.
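+
+    Example (illustrative; `family` is a hypothetical Family instance and
+    the keys depend on the model's columns):
+
+        to_dict(family)
+        # -> {"title": "Fam1", ..., "__class__": "...Family"}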
+ """ + extra = ["__class__"] + return dict( + (col, _sanitize(getattr(base_object, col))) + for col in base_object.__table__.columns.keys() + extra + ) + + +class ResultType(str, enum.Enum): + """Result type used when processing metadata values.""" + + OK = "Ok" + RESOLVED = "Resolved" + ERROR = "Error" + + +@dataclass +class Result: + """Augmented result class for reporting extra details about processed metadata.""" + + type: ResultType = ResultType.OK + details: str = "" + + +@dataclass +class ConsistentFields: + """CSV entity-fields for an entity which is defined multiple times.""" + + name: str + summary: str + # TODO@ add status: str + + +class ConsistencyValidator: + """Used by validation to ensure consistency for families and collections.""" + + families: dict[str, ConsistentFields] + collections: dict[str, ConsistentFields] + + def __init__(self) -> None: + self.families = {} + self.collections = {} + + def check_family( + self, + n_row: int, + family_id: str, + family_name: str, + family_summary: str, + errors: list[Result], + ) -> None: + """Check the consistency of this family with one previously defined.""" + self._check_( + self.families, + "Family", + n_row, + family_id, + family_name, + family_summary, + errors, + ) + + def check_collection( + self, + n_row: int, + collection_id: str, + collection_name: str, + collection_summary: str, + errors: list[Result], + ) -> None: + """Check the consistency of this collection with one previously defined.""" + self._check_( + self.families, + "Collection", + n_row, + collection_id, + collection_name, + collection_summary, + errors, + ) + + @staticmethod + def _check_( + entities: dict[str, ConsistentFields], + entity_name: str, + n_row: int, + id: str, + name: str, + summary: str, + errors: list[Result], + ) -> None: + error_start = f"{entity_name} {id} has differing" + on_row = f"on row {n_row}" + fields = entities.get(id) + if fields: + if fields.name != name: + error = Result(ResultType.ERROR, f"{error_start} name {on_row}") + errors.append(error) + if fields.summary != summary: + error = Result(ResultType.ERROR, f"{error_start} summary {on_row}") + errors.append(error) + else: + entities[id] = ConsistentFields(name, summary) + + +@dataclass +class IngestContext(abc.ABC): + """Context used when processing.""" + + org_name: str + org_id: int + results: list[Result] + + +@dataclass +class UNFCCCIngestContext(IngestContext): + """Ingest Context for UNFCCC""" + + collection_ids_defined: list[str] + collection_ids_referenced: list[str] + # Just for families: + consistency_validator: ConsistencyValidator + download_urls: dict[str, str] # import_id -> url + + def __init__(self, org_name=ORGANISATION_UNFCCC, org_id=2, results=None): + self.collection_ids_defined = [] + self.collection_ids_referenced = [] + self.consistency_validator = ConsistencyValidator() + self.download_urls = {} + self.org_name = org_name + self.org_id = org_id + self.results = [] if results is None else results + + +@dataclass +class CCLWIngestContext(IngestContext): + """Ingest Context for CCLW""" + + consistency_validator: ConsistencyValidator + + def __init__(self, org_name=ORGANISATION_CCLW, org_id=1, results=None): + self.consistency_validator = ConsistencyValidator() + self.org_name = org_name + self.org_id = org_id + self.results = [] if results is None else results + + +@dataclass +class ValidationResult: + """Returned when validating a CSV""" + + message: str + errors: list[Result] + + +def get_result_counts(results: list[Result]) -> tuple[int, int, int]: + 
+    rows = len(results)
+    fails = len([r for r in results if r.type == ResultType.ERROR])
+    resolved = len([r for r in results if r.type == ResultType.RESOLVED])
+    return rows, fails, resolved
+
+
+def update_if_changed(updated: dict, updated_key: str, source: Any, dest: Any) -> dict:
+    if getattr(dest, updated_key) != source:
+        setattr(dest, updated_key, source)
+        updated[updated_key] = source
+    return updated
+
+
+def update_if_enum_changed(updated: dict, updated_key: str, source: str, dest: Any) -> dict:
+    if getattr(dest, updated_key).value != source.title():
+        setattr(dest, updated_key, source)
+        updated[updated_key] = source
+    return updated
diff --git a/tests/core/ingestion/legacy_setup/validator.py b/tests/core/ingestion/legacy_setup/validator.py
new file mode 100644
index 00000000..4c2f4ac1
--- /dev/null
+++ b/tests/core/ingestion/legacy_setup/validator.py
@@ -0,0 +1,249 @@
+from sqlalchemy import Column
+from sqlalchemy.orm import Session
+
+from tests.core.ingestion.legacy_setup.cclw.ingest_row_cclw import (
+    CCLWDocumentIngestRow,
+    EventIngestRow,
+)
+from tests.core.ingestion.legacy_setup.metadata import Taxonomy
+from tests.core.ingestion.legacy_setup.unfccc.ingest_row_unfccc import (
+    UNFCCCDocumentIngestRow,
+)
+from tests.core.ingestion.legacy_setup.cclw.metadata import build_cclw_metadata
+from tests.core.ingestion.legacy_setup.utils import (
+    CCLWIngestContext,
+    IngestContext,
+    Result,
+    ResultType,
+    UNFCCCIngestContext,
+)
+from tests.core.ingestion.legacy_setup.unfccc.metadata import build_unfccc_metadata
+from app.core.validation import IMPORT_ID_MATCHER
+from app.db.models.law_policy.family import (
+    FamilyDocumentRole,
+    FamilyDocumentType,
+    Variant,
+    Geography,
+)
+from app.db.models.law_policy.geography import GEO_INTERNATIONAL, GEO_NONE
+from app.db.session import Base
+
+DbTable = Base
+CheckResult = Result
+
+
+def _check_value_in_db(
+    row_num: int,
+    db: Session,
+    value: str,
+    model: DbTable,
+    model_field: Column,
+) -> CheckResult:
+    if value != "":
+        val = db.query(model).filter(model_field == value).one_or_none()
+        if val is None:
+            result = Result(
+                ResultType.ERROR,
+                f"Row {row_num}: Not found in db {model.__tablename__}={value}",
+            )
+            return result
+    return Result()
+
+
+def _check_geo_in_db(row_num: int, db: Session, geo_iso: str) -> CheckResult:
+    if geo_iso == "INT":
+        geo_iso = GEO_INTERNATIONAL
+
+    if geo_iso == "":
+        return Result(
+            ResultType.ERROR,
+            f"Row {row_num}: Geography is empty.",
+        )
+    val = db.query(Geography).filter(Geography.value == geo_iso).one_or_none()
+    if val is None:
+        result = Result(
+            ResultType.ERROR,
+            f"Row {row_num}: Geography {geo_iso} not found in db",
+        )
+        return result
+    return Result()
+
+
+def validate_unfccc_document_row(
+    db: Session,
+    context: UNFCCCIngestContext,
+    row: UNFCCCDocumentIngestRow,
+    taxonomy: Taxonomy,
+) -> None:
+    """
+    Validate the constituent elements that represent this law & policy document row.
+
+    :param [UNFCCCIngestContext] context: The ingest context.
+    :param [UNFCCCDocumentIngestRow] row: the document row from the current CSV.
+    :param [Taxonomy] taxonomy: the Taxonomy against which metadata should be validated.
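+    :param [Session] db: the database session used to look up reference values.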
+ """ + + errors = [] + n = row.row_number + + # don't validate: collection_name: str + # don't validate: family_name: str + # don't validate: document_title: str + # don't validate: documents: str + # don't validate: author: str + # don't validate: geography: str + # don't validate: date: datetime + + # Validate family id + if IMPORT_ID_MATCHER.match(row.cpr_family_id) is None: + errors.append( + Result(ResultType.ERROR, f"Family ID format error {row.cpr_family_id}") + ) + + # Validate collection id (optional) + if row.cpr_collection_id: + for collection_id in row.cpr_collection_id: + if IMPORT_ID_MATCHER.match(collection_id) is None: + errors.append( + Result( + ResultType.ERROR, f"Collection ID format error {collection_id}" + ) + ) + + # Validate document id + if IMPORT_ID_MATCHER.match(row.cpr_document_id) is None: + errors.append( + Result(ResultType.ERROR, f"Document ID format error {row.cpr_document_id}") + ) + + # validate: document_role: str + result = _check_value_in_db( + n, db, row.document_role, FamilyDocumentRole, FamilyDocumentRole.name + ) + if result.type != ResultType.OK: + errors.append(result) + + # validate: document_variant: str + result = _check_value_in_db( + n, db, row.document_variant, Variant, Variant.variant_name + ) + if result.type != ResultType.OK: + errors.append(result) + + # validate: geography_iso: str + if row.geography_iso != "": + result = _check_geo_in_db(n, db, row.geography_iso) + if result.type != ResultType.OK: + errors.append(result) + else: + row.geography_iso = GEO_NONE + + # validate: Submission type as document type + result = _check_value_in_db( + n, db, row.submission_type, FamilyDocumentType, FamilyDocumentType.name + ) + if result.type != ResultType.OK: + errors.append(result) + + # validate: language: list[str] + + # Check metadata + # validate: author_type: str # METADATA + result, _ = build_unfccc_metadata(taxonomy, row) + if result.type != ResultType.OK: + errors.append(result) + + # Check family + context.consistency_validator.check_family( + row.row_number, + row.cpr_family_id, + row.family_name, + row.family_summary, + errors, + ) + + # Add to the collections that are referenced so we can validate later + context.collection_ids_referenced.extend(row.cpr_collection_id) + + if len(errors) > 0: + context.results += errors + else: + context.results.append(Result()) + + +def validate_cclw_document_row( + db: Session, + context: CCLWIngestContext, + row: CCLWDocumentIngestRow, + taxonomy: Taxonomy, +) -> None: + """ + Validate the constituent elements that represent this law & policy document row. + + :param [IngestContext] context: The ingest context. + :param [DocumentIngestRow] row: DocumentIngestRow object from the current CSV row. + :param [Taxonomy] taxonomy: the Taxonomy against which metadata should be validated. 
+ """ + + errors = [] + n = row.row_number + result = _check_value_in_db( + n, db, row.document_type, FamilyDocumentType, FamilyDocumentType.name + ) + if result.type != ResultType.OK: + errors.append(result) + + result = _check_value_in_db( + n, db, row.document_role, FamilyDocumentRole, FamilyDocumentRole.name + ) + if result.type != ResultType.OK: + errors.append(result) + + result = _check_value_in_db( + n, db, row.document_variant, Variant, Variant.variant_name + ) + if result.type != ResultType.OK: + errors.append(result) + + result = _check_geo_in_db(n, db, row.geography_iso) + if result.type != ResultType.OK: + errors.append(result) + + # Check metadata + result, _ = build_cclw_metadata(taxonomy, row) + if result.type != ResultType.OK: + errors.append(result) + + # Check family + context.consistency_validator.check_family( + row.row_number, + row.cpr_family_id, + row.family_name, + row.family_summary, + errors, + ) + + # Check collection + context.consistency_validator.check_collection( + row.row_number, + row.cpr_collection_id, + row.collection_name, + row.collection_summary, + errors, + ) + + if len(errors) > 0: + context.results += errors + else: + context.results.append(Result()) + + +def validate_event_row(context: IngestContext, row: EventIngestRow) -> None: + """ + Validate the constituent elements that represent this event row. + + :param [IngestContext] context: The ingest context. + :param [DocumentIngestRow] row: DocumentIngestRow object from the current CSV row. + """ + result = Result(ResultType.OK, f"Event: {row.cpr_event_id}, org {context.org_id}") + context.results.append(result) diff --git a/tests/routes/document_helpers.py b/tests/routes/document_helpers.py index bee6b422..e236b232 100644 --- a/tests/routes/document_helpers.py +++ b/tests/routes/document_helpers.py @@ -1,4 +1,7 @@ -from app.api.api_v1.routers.cclw_ingest import _start_ingest +from sqlalchemy.orm import Session + + +from app.db.models.app import ORGANISATION_CCLW from app.data_migrations import ( populate_document_role, populate_document_type, @@ -9,6 +12,17 @@ populate_taxonomy, ) +from tests.core.ingestion.legacy_setup.cclw.ingest_row_cclw import ( + CCLWDocumentIngestRow, + EventIngestRow, +) +from tests.core.ingestion.legacy_setup.processor import ( + initialise_context, + get_cclw_document_ingestor, + get_event_ingestor, +) +from tests.core.ingestion.legacy_setup.reader import read + TWO_UNPUBLISHED_DFC_ROW = """ID,Document ID,CCLW Description,Part of collection?,Create new family/ies?,Collection ID,Collection name,Collection summary,Document title,Family name,Family summary,Family ID,Document role,Applies to ID,Geography ISO,Documents,Category,Events,Sectors,Instruments,Frameworks,Responses,Natural Hazards,Document Type,Year,Language,Keywords,Geography,Parent Legislation,Comment,CPR Document ID,CPR Family ID,CPR Collection ID,CPR Family Slug,CPR Document Slug,Document variant,CPR Document Status 1001,0,Test1,FALSE,FALSE,N/A,Collection1,CollectionSummary1,Title1,Fam1,Summary1,,MAIN,,GBR,http://somewhere|en,executive,02/02/2014|Law passed,Energy,,,Mitigation,,Order,,,Energy Supply,Algeria,,,CCLW.executive.1.2,CCLW.family.1001.0,CPR.Collection.1,FamSlug1,DocSlug1,Translation,CREATED @@ -62,8 +76,20 @@ """ +def _start_ingest( + db: Session, + documents_file_contents: str, + events_file_contents: str, +): + context = initialise_context(db, ORGANISATION_CCLW) + document_ingestor = get_cclw_document_ingestor(db, context) + read(documents_file_contents, context, CCLWDocumentIngestRow, 
+    event_ingestor = get_event_ingestor(db)
+    read(events_file_contents, context, EventIngestRow, event_ingestor)
+
+
 def setup_with_docs(test_db, mocker):
-    mock_s3 = mocker.patch("app.core.aws.S3Client")
+    mocker.patch("app.core.aws.S3Client")
 
     populate_geography(test_db)
     populate_taxonomy(test_db)
@@ -74,7 +100,7 @@ def setup_with_docs(test_db, mocker):
     populate_language(test_db)
 
     test_db.commit()
-    _start_ingest(test_db, mock_s3, "s3_prefix", ONE_DFC_ROW, ONE_EVENT_ROW)
+    _start_ingest(test_db, ONE_DFC_ROW, ONE_EVENT_ROW)
     test_db.commit()
 
 
@@ -91,7 +117,7 @@ def setup_with_two_docs_one_family(test_db, mocker):
 
 
 def setup_with_multiple_docs(test_db, mocker, doc_data, event_data):
-    mock_s3 = mocker.patch("app.core.aws.S3Client")
+    mocker.patch("app.core.aws.S3Client")
 
     populate_geography(test_db)
    populate_taxonomy(test_db)
@@ -102,5 +128,5 @@ def setup_with_multiple_docs(test_db, mocker, doc_data, event_data):
     populate_language(test_db)
 
     test_db.commit()
-    _start_ingest(test_db, mock_s3, "s3_prefix", doc_data, event_data)
+    _start_ingest(test_db, doc_data, event_data)
     test_db.commit()
diff --git a/tests/routes/test_search.py b/tests/routes/test_search.py
index 1338ad31..a7433190 100644
--- a/tests/routes/test_search.py
+++ b/tests/routes/test_search.py
@@ -20,7 +20,7 @@
     SearchRequestBody,
 )
 from app.core.search import _FILTER_FIELD_MAP, OpenSearchQueryConfig
-from app.core.ingestion.utils import get_or_create
+from tests.core.ingestion.legacy_setup.utils import get_or_create
 from app.data_migrations.taxonomy_cclw import get_cclw_taxonomy
 from app.db.models.app import Organisation
 from app.db.models.law_policy.family import (