Skip to content

Commit

Permalink
Feature/pdct 1350 ingest events (#211)
Browse files Browse the repository at this point in the history
* Validate import_id and family_import_id before saving documents

* Make fmaily_document_import_id optional on ingest events

* Ingest events

* Validate import id when saving events

* Validate family_import_id when saving events

* Do not generate an import_id when savin gevents if one is already provided

* Validate event type against corpus taxonomy

* Move collection validation logic to a separate service

* Move family validation logic to a separate service

* Move document validation logic to a separate service

* Move event validation logci to a separate service

* Switch to using validation service when ingesting collections

* Switch to using validation service when ingesting families

* Use validation service when ingesting documents

* Use validation service when ingesting events

* Tidy up tests

* More tidy up of test data

* Tiny refactor

* Return better error on Exception

* Bump minor version

* Fix mocks in unit tests + tidy up

* Add missing docstrings to validation service

* Add missing doctypes and tighten return types

* Update docstrings

* Make test filenames more meaningful

* Reference correct test files after rename

* Update docstrings, again...

* Update tests/integration_tests/ingest/test_ingest.py

Co-authored-by: Katy Baulch <[email protected]>

* Update tests/integration_tests/ingest/test_ingest.py

Co-authored-by: Katy Baulch <[email protected]>

* Update tests/integration_tests/ingest/test_ingest.py

Co-authored-by: Katy Baulch <[email protected]>

* Update tests/unit_tests/routers/ingest/test_ingest.py

Co-authored-by: Katy Baulch <[email protected]>

* Update tests/unit_tests/routers/ingest/test_ingest.py

Co-authored-by: Katy Baulch <[email protected]>

* Update tests/integration_tests/ingest/test_ingest.py

Co-authored-by: Katy Baulch <[email protected]>

* Add missing imports and params in docstrings for validation

---------

Co-authored-by: Katy Baulch <[email protected]>
  • Loading branch information
annaCPR and katybaulch authored Sep 10, 2024
1 parent 7b658e3 commit 1d42fd9
Show file tree
Hide file tree
Showing 21 changed files with 747 additions and 272 deletions.
2 changes: 1 addition & 1 deletion app/api/api_v1/routers/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,5 @@ async def ingest(new_data: UploadFile, corpus_import_id: str) -> Json:
except Exception as e:
_LOGGER.error(e)
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Unexpected error"
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(e)
)
1 change: 1 addition & 0 deletions app/model/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class EventCreateDTO(BaseModel):
generated.
"""

import_id: Optional[str] = None
# From FamilyEvent
event_title: str
date: datetime
Expand Down
18 changes: 16 additions & 2 deletions app/model/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from app.model.collection import CollectionCreateDTO
from app.model.document import DocumentCreateDTO
from app.model.event import EventCreateDTO
from app.model.family import FamilyCreateDTO
from app.model.general import Json

Expand Down Expand Up @@ -64,11 +65,25 @@ class IngestEventDTO(BaseModel):

import_id: str
family_import_id: str
family_document_import_id: str
family_document_import_id: Optional[str] = None
event_title: str
date: datetime
event_type_value: str

def to_event_create_dto(self) -> EventCreateDTO:
"""
Convert IngestEventDTO to EventCreateDTO.
:return EventCreateDTO: Converted EventCreateDTO instance.
"""
return EventCreateDTO(
import_id=self.import_id,
family_import_id=self.family_import_id,
event_title=self.event_title,
date=self.date,
event_type_value=self.event_type_value,
)


class IngestDocumentDTO(BaseModel):
"""Representation of a document for ingest."""
Expand All @@ -77,7 +92,6 @@ class IngestDocumentDTO(BaseModel):
family_import_id: str
variant_name: Optional[str] = None
metadata: Json
events: list[str]
title: str
source_url: Optional[AnyHttpUrl] = None
user_language_name: Optional[str]
Expand Down
9 changes: 6 additions & 3 deletions app/repository/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def _event_to_dto(family_event_meta: FamilyEventTuple) -> EventReadDTO:

def _dto_to_event_dict(dto: EventCreateDTO) -> dict:
return {
"import_id": dto.import_id if dto.import_id else None,
"family_import_id": dto.family_import_id,
"family_document_import_id": dto.family_document_import_id,
"date": dto.date,
Expand Down Expand Up @@ -179,9 +180,11 @@ def create(db: Session, event: EventCreateDTO) -> str:
)

org_name = cast(str, org.name)
new_family_event.import_id = cast(
Column, generate_import_id(db, CountedEntity.Event, org_name)
)

if not new_family_event.import_id:
new_family_event.import_id = cast(
Column, generate_import_id(db, CountedEntity.Event, org_name)
)

db.add(new_family_event)
except Exception:
Expand Down
135 changes: 86 additions & 49 deletions app/service/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,128 +5,162 @@
import of data and other services for validation etc.
"""

from typing import Optional
from typing import Any, Optional

from db_client.models.dfce.taxonomy_entry import EntitySpecificTaxonomyKeys
from fastapi import HTTPException, status
from pydantic import ConfigDict, validate_call
from sqlalchemy.orm import Session

import app.clients.db.session as db_session
import app.repository.collection as collection_repository
import app.repository.document as document_repository
import app.repository.event as event_repository
import app.repository.family as family_repository
import app.service.category as category
import app.service.collection as collection
import app.service.corpus as corpus
import app.service.geography as geography
import app.service.metadata as metadata
import app.service.validation as validation
from app.errors import ValidationError
from app.model.ingest import IngestCollectionDTO, IngestDocumentDTO, IngestFamilyDTO
from app.service.collection import validate_import_id
from app.model.ingest import (
IngestCollectionDTO,
IngestDocumentDTO,
IngestEventDTO,
IngestFamilyDTO,
)


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def save_collections(
collection_data: list[dict], corpus_import_id: str, db: Optional[Session] = None
collection_data: list[dict[str, Any]],
corpus_import_id: str,
db: Optional[Session] = None,
) -> list[str]:
"""
Creates new collections with the values passed.
:param Session db: The database session to use for saving collections.
:param list[dict] collection_data: The data to use for creating collections.
:param list[dict[str, Any]] collection_data: The data to use for creating collections.
:param str corpus_import_id: The import_id of the corpus the collections belong to.
:return str: The new import_ids for the saved collections.
:param Optional[Session] db: The database session to use for saving collections or None.
:return list[str]: The new import_ids for the saved collections.
"""
if db is None:
db = db_session.get_db()

validation.validate_collections(collection_data)

collection_import_ids = []
org_id = corpus.get_corpus_org_id(corpus_import_id)

for coll in collection_data:
dto = IngestCollectionDTO(**coll).to_collection_create_dto()
if dto.import_id:
validate_import_id(dto.import_id)
import_id = collection_repository.create(db, dto, org_id)

collection_import_ids.append(import_id)

return collection_import_ids


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def save_families(
family_data: list[dict], corpus_import_id: str, db: Optional[Session] = None
family_data: list[dict[str, Any]],
corpus_import_id: str,
db: Optional[Session] = None,
) -> list[str]:
"""
Creates new families with the values passed.
:param Session db: The database session to use for saving families.
:param list[dict] families_data: The data to use for creating families.
:param list[dict[str, Any]] family_data: The data to use for creating families.
:param str corpus_import_id: The import_id of the corpus the families belong to.
:return str: The new import_ids for the saved families.
:param Optional[Session] db: The database session to use for saving families or None.
:return list[str]: The new import_ids for the saved families.
"""

if db is None:
db = db_session.get_db()

validation.validate_families(family_data, corpus_import_id)

family_import_ids = []
org_id = corpus.get_corpus_org_id(corpus_import_id)

for fam in family_data:
# TODO: Uncomment when implementing feature/pdct-1402-validate-collection-exists-before-creating-family
# collection.validate(collections, db)
dto = IngestFamilyDTO(
**fam, corpus_import_id=corpus_import_id
).to_family_create_dto(corpus_import_id)

if dto.import_id:
validate_import_id(dto.import_id)
corpus.validate(db, corpus_import_id)
geo_id = geography.get_id(db, dto.geography)
category.validate(dto.category)
collections = set(dto.collections)
collection.validate_multiple_ids(collections)
# TODO: Uncomment when implementing feature/pdct-1402-validate-collection-exists-before-creating-family
# collection.validate(collections, db)
metadata.validate_metadata(db, corpus_import_id, dto.metadata)

import_id = family_repository.create(db, dto, geo_id, org_id)
import_id = family_repository.create(
db, dto, geography.get_id(db, dto.geography), org_id
)
family_import_ids.append(import_id)

return family_import_ids


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def save_documents(
document_data: list[dict],
document_data: list[dict[str, Any]],
corpus_import_id: str,
db: Optional[Session] = None,
) -> list[str]:
"""
Creates new documents with the values passed.
:param Session db: The database session to use for saving documents.
:param list[dict] document_data: The data to use for creating documents.
:param list[dict[str, Any]] document_data: The data to use for creating documents.
:param str corpus_import_id: The import_id of the corpus the documents belong to.
:return str: The new import_ids for the saved documents.
:param Optional[Session] db: The database session to use for saving documents or None.
:return list[str]: The new import_ids for the saved documents.
"""
if db is None:
db = db_session.get_db()

validation.validate_documents(document_data, corpus_import_id)

document_import_ids = []

for doc in document_data:
dto = IngestDocumentDTO(**doc).to_document_create_dto()

if dto.variant_name == "":
raise ValidationError("Variant name is empty")
metadata.validate_metadata(
db,
corpus_import_id,
dto.metadata,
EntitySpecificTaxonomyKeys.DOCUMENT.value,
)
import_id = document_repository.create(db, dto)
document_import_ids.append(import_id)

return document_import_ids


def validate_entity_relationships(data: dict) -> None:
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def save_events(
event_data: list[dict[str, Any]],
corpus_import_id: str,
db: Optional[Session] = None,
) -> list[str]:
"""
Creates new events with the values passed.
:param list[dict[str, Any]] event_data: The data to use for creating events.
:param str corpus_import_id: The import_id of the corpus the events belong to.
:param Optional[Session] db: The database session to use for saving events or None.
:return list[str]: The new import_ids for the saved events.
"""
if db is None:
db = db_session.get_db()

validation.validate_events(event_data, corpus_import_id)

event_import_ids = []

for event in event_data:
dto = IngestEventDTO(**event).to_event_create_dto()
import_id = event_repository.create(db, dto)
event_import_ids.append(import_id)

return event_import_ids


def validate_entity_relationships(data: dict[str, Any]) -> None:
"""
Validates relationships between entities contained in data based on import_ids.
For documents, it validates that the family the document is linked to exists.
:param dict[str, Any] data: The data object containing entities to be validated.
:raises ValidationError: raised should there be any unmatched relationships.
"""
families = []
if "families" in data:
for fam in data["families"]:
Expand All @@ -144,21 +178,22 @@ def validate_entity_relationships(data: dict) -> None:


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def import_data(data: dict, corpus_import_id: str) -> dict:
def import_data(data: dict[str, Any], corpus_import_id: str) -> dict[str, str]:
"""
Imports data for a given corpus_import_id.
:param dict data: The data to be imported.
:param dict[str, Any] data: The data to be imported.
:param str corpus_import_id: The import_id of the corpus the data should be imported into.
:raises RepositoryError: raised on a database error.
:raises ValidationError: raised should the import_id be invalid.
:return dict: Import ids of the saved entities.
:raises ValidationError: raised should the data be invalid.
:return dict[str, str]: Import ids of the saved entities.
"""
db = db_session.get_db()

collection_data = data["collections"] if "collections" in data else None
family_data = data["families"] if "families" in data else None
document_data = data["documents"] if "documents" in data else None
event_data = data["events"] if "events" in data else None

if not data:
raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)
Expand All @@ -176,6 +211,8 @@ def import_data(data: dict, corpus_import_id: str) -> dict:
response["families"] = save_families(family_data, corpus_import_id, db)
if document_data:
response["documents"] = save_documents(document_data, corpus_import_id, db)
if event_data:
response["events"] = save_events(event_data, corpus_import_id, db)

return response
except Exception as e:
Expand Down
Loading

0 comments on commit 1d42fd9

Please sign in to comment.