Skip to content

Commit

Permalink
Feature/PDCT-1499: add a limit to how many documents can be ingested in one go (#215)
Browse files Browse the repository at this point in the history

* Remove comments

* Correct typo and move tests around

* Do not save documents over ingest limit + update repo mocks

* Add idempotency when saving documents and families

* Refactor

* Refactor

* Add idempotency for saving events

* Add idempotency when saving collections

* Refactor to remove duplication

* Bump minor version

* Update check-auto-tagging-will-work job in git workflows

* Update python-precommit-validator in git workflows

* Update python-precommit-validator to latest

* Fix tests

* Add db assertions to the integration test
  • Loading branch information
annaCPR authored Sep 26, 2024
1 parent 993a404 commit b721f56
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 91 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
if: |
github.event_name == 'pull_request' &&
(! startsWith(github.ref, 'refs/tags') && ! startsWith(github.ref, 'refs/heads/main'))
uses: climatepolicyradar/reusable-workflows/.github/workflows/check-auto-tagging-will-work.yml@v3
uses: climatepolicyradar/reusable-workflows/.github/workflows/check-auto-tagging-will-work.yml@v10

code-quality:
if: |
Expand All @@ -36,7 +36,7 @@ jobs:
checks: write
# For repo checkout
contents: read
uses: climatepolicyradar/reusable-workflows/.github/workflows/python-precommit-validator.yml@v3
uses: climatepolicyradar/reusable-workflows/.github/workflows/python-precommit-validator.yml@v16

test:
if: |
Expand Down
93 changes: 68 additions & 25 deletions app/service/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
"""

from enum import Enum
from typing import Any, Optional
from typing import Any, Optional, Type, TypeVar

from db_client.models.dfce.collection import Collection
from db_client.models.dfce.family import Family, FamilyDocument, FamilyEvent
from fastapi import HTTPException, status
from pydantic import ConfigDict, validate_call
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.orm import Session

import app.clients.db.session as db_session
Expand All @@ -28,6 +31,8 @@
IngestFamilyDTO,
)

DOCUMENT_INGEST_LIMIT = 1000


class IngestEntityList(str, Enum):
"""Name of the list of entities that can be ingested."""
Expand All @@ -38,6 +43,26 @@ class IngestEntityList(str, Enum):
Events = "events"


class BaseModel(DeclarativeMeta):
    """Typing stub for any ingestable model that exposes an ``import_id`` column."""

    # NOTE(review): this subclasses DeclarativeMeta (SQLAlchemy's metaclass)
    # purely so static type checkers accept `entity.import_id` access in
    # _exists_in_db; it is never instantiated — confirm this typing hack is
    # intentional.
    # Unique identifier column shared by all ingestable entities.
    import_id: str


# Any ingestable SQLAlchemy model type (Collection, Family, FamilyDocument,
# FamilyEvent) — constrains _exists_in_db to models carrying import_id.
T = TypeVar("T", bound=BaseModel)


def _exists_in_db(entity: Type[T], import_id: str, db: Session) -> bool:
    """
    Check whether an entity with the given import_id is already saved.

    :param Type[T] entity: The model class of the entity to look up.
    :param str import_id: The import_id identifying the entity.
    :param Session db: The database session to query against.
    :return bool: True when a matching row exists, False otherwise.
    """
    lookup = db.query(entity).filter(entity.import_id == import_id)
    return lookup.one_or_none() is not None


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def save_collections(
collection_data: list[dict[str, Any]],
Expand All @@ -61,9 +86,10 @@ def save_collections(
org_id = corpus.get_corpus_org_id(corpus_import_id)

for coll in collection_data:
dto = IngestCollectionDTO(**coll).to_collection_create_dto()
import_id = collection_repository.create(db, dto, org_id)
collection_import_ids.append(import_id)
if not _exists_in_db(Collection, coll["import_id"], db):
dto = IngestCollectionDTO(**coll).to_collection_create_dto()
import_id = collection_repository.create(db, dto, org_id)
collection_import_ids.append(import_id)

return collection_import_ids

Expand Down Expand Up @@ -92,16 +118,15 @@ def save_families(
org_id = corpus.get_corpus_org_id(corpus_import_id)

for fam in family_data:
# TODO: Uncomment when implementing feature/pdct-1402-validate-collection-exists-before-creating-family
# collection.validate(collections, db)
dto = IngestFamilyDTO(
**fam, corpus_import_id=corpus_import_id
).to_family_create_dto(corpus_import_id)
geo_ids = []
for geo in dto.geography:
geo_ids.append(geography.get_id(db, geo))
import_id = family_repository.create(db, dto, geo_ids, org_id)
family_import_ids.append(import_id)
if not _exists_in_db(Family, fam["import_id"], db):
dto = IngestFamilyDTO(
**fam, corpus_import_id=corpus_import_id
).to_family_create_dto(corpus_import_id)
geo_ids = []
for geo in dto.geography:
geo_ids.append(geography.get_id(db, geo))
import_id = family_repository.create(db, dto, geo_ids, org_id)
family_import_ids.append(import_id)

return family_import_ids

Expand All @@ -126,11 +151,17 @@ def save_documents(
validation.validate_documents(document_data, corpus_import_id)

document_import_ids = []
saved_documents_counter = 0

for doc in document_data:
dto = IngestDocumentDTO(**doc).to_document_create_dto()
import_id = document_repository.create(db, dto)
document_import_ids.append(import_id)
if (
not _exists_in_db(FamilyDocument, doc["import_id"], db)
and saved_documents_counter < DOCUMENT_INGEST_LIMIT
):
dto = IngestDocumentDTO(**doc).to_document_create_dto()
import_id = document_repository.create(db, dto)
document_import_ids.append(import_id)
saved_documents_counter += 1

return document_import_ids

Expand All @@ -157,9 +188,10 @@ def save_events(
event_import_ids = []

for event in event_data:
dto = IngestEventDTO(**event).to_event_create_dto()
import_id = event_repository.create(db, dto)
event_import_ids.append(import_id)
if not _exists_in_db(FamilyEvent, event["import_id"], db):
dto = IngestEventDTO(**event).to_event_create_dto()
import_id = event_repository.create(db, dto)
event_import_ids.append(import_id)

return event_import_ids

Expand Down Expand Up @@ -250,6 +282,20 @@ def validate_entity_relationships(data: dict[str, Any]) -> None:
_validate_families_exist_for_events_and_documents(data)


def _validate_ingest_data(data: dict[str, Any]) -> None:
    """
    Validate the data object before any entities are saved.

    :param dict[str, Any] data: The data object to be validated.
    :raises HTTPException: raised if data is empty or None.
    """
    # Guard clause: a non-empty payload only needs its entity relationships
    # checked; an empty/None payload is rejected as "nothing to ingest".
    if data:
        validate_entity_relationships(data)
        return

    raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def import_data(data: dict[str, Any], corpus_import_id: str) -> dict[str, str]:
"""
Expand All @@ -261,21 +307,18 @@ def import_data(data: dict[str, Any], corpus_import_id: str) -> dict[str, str]:
:raises ValidationError: raised should the data be invalid.
:return dict[str, str]: Import ids of the saved entities.
"""
_validate_ingest_data(data)

db = db_session.get_db()

collection_data = data["collections"] if "collections" in data else None
family_data = data["families"] if "families" in data else None
document_data = data["documents"] if "documents" in data else None
event_data = data["events"] if "events" in data else None

if not data:
raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)

response = {}

try:
validate_entity_relationships(data)

if collection_data:
response["collections"] = save_collections(
collection_data, corpus_import_id, db
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "admin_backend"
version = "2.15.3"
version = "2.16.0"
description = ""
authors = ["CPR-dev-team <[email protected]>"]
packages = [{ include = "app" }, { include = "tests" }]
Expand Down
97 changes: 91 additions & 6 deletions tests/integration_tests/ingest/test_ingest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import io
import json
import os

from db_client.models.dfce import FamilyEvent
Expand Down Expand Up @@ -58,14 +60,14 @@ def test_ingest_when_ok(data_db: Session, client: TestClient, user_header_token)
for fam in saved_families:
assert fam.import_id in expected_family_import_ids

saved_events = (
saved_documents = (
data_db.query(FamilyDocument)
.filter(FamilyDocument.import_id.in_(expected_document_import_ids))
.all()
)

assert len(saved_events) == 2
for doc in saved_events:
assert len(saved_documents) == 2
for doc in saved_documents:
assert doc.import_id in expected_document_import_ids
assert doc.family_import_id in expected_family_import_ids

Expand All @@ -76,9 +78,9 @@ def test_ingest_when_ok(data_db: Session, client: TestClient, user_header_token)
)

assert len(saved_events) == 2
for doc in saved_events:
assert doc.import_id in expected_event_import_ids
assert doc.family_import_id in expected_family_import_ids
for ev in saved_events:
assert ev.import_id in expected_event_import_ids
assert ev.family_import_id in expected_family_import_ids


def test_ingest_rollback(
Expand Down Expand Up @@ -108,6 +110,89 @@ def test_ingest_rollback(
assert actual_collection is None


def test_ingest_idempotency(data_db: Session, client: TestClient, user_header_token):
    """
    Posting the same payload twice must not duplicate entities.

    The first request saves each entity once and defers any documents over
    the 1000-document ingest limit; the second, identical request skips the
    already-saved entities and picks up only the remaining document.
    """
    family_import_id = "test.new.family.0"
    event_import_id = "test.new.event.0"
    collection_import_id = "test.new.collection.0"
    test_data = {
        "collections": [
            {
                "import_id": collection_import_id,
                "title": "Test title",
                "description": "Test description",
            },
        ],
        "families": [
            {
                "import_id": family_import_id,
                "title": "Test",
                "summary": "Test",
                "geographies": ["South Asia"],
                "category": "UNFCCC",
                "metadata": {"author_type": ["Non-Party"], "author": ["Test"]},
                "collections": [collection_import_id],
            }
        ],
        "documents": [
            {
                "import_id": f"test.new.document.{i}",
                "family_import_id": family_import_id,
                "metadata": {"role": ["MAIN"], "type": ["Law"]},
                "variant_name": "Original Language",
                "title": f"Document{i}",
                "user_language_name": "",
            }
            # 1001 documents: one more than the ingest limit.
            for i in range(1001)
        ],
        "events": [
            {
                "import_id": event_import_id,
                "family_import_id": family_import_id,
                "event_title": "Test",
                "date": "2024-01-01",
                "event_type_value": "Amended",
            }
        ],
    }

    test_json = json.dumps(test_data).encode("utf-8")
    test_data_file = io.BytesIO(test_json)

    response = client.post(
        "/api/v1/ingest/UNFCCC.corpus.i00000001.n0000",
        files={"new_data": test_data_file},
        headers=user_header_token,
    )

    # Document 1000 is over the limit, so it must not be saved on the first run.
    assert (
        not data_db.query(FamilyDocument)
        .filter(FamilyDocument.import_id == "test.new.document.1000")
        .one_or_none()
    )
    assert [family_import_id] == response.json()["families"]
    assert [event_import_id] == response.json()["events"]
    assert [collection_import_id] == response.json()["collections"]
    assert "test.new.document.1000" not in response.json()["documents"]

    # BUG FIX: rewind the in-memory file before re-posting. The first request
    # consumed the stream (the HTTP client reads it to EOF and does not seek
    # back), so without this the second POST would upload an empty body
    # instead of the same payload.
    test_data_file.seek(0)

    response = client.post(
        "/api/v1/ingest/UNFCCC.corpus.i00000001.n0000",
        files={"new_data": test_data_file},
        headers=user_header_token,
    )

    # The deferred document is saved by the second, idempotent run...
    assert (
        "test.new.document.1000"
        == data_db.query(FamilyDocument)
        .filter(FamilyDocument.import_id == "test.new.document.1000")
        .one_or_none()
        .import_id
    )
    # ...while every already-saved entity is skipped, not duplicated.
    assert not response.json()["families"]
    assert not response.json()["events"]
    assert not response.json()["collections"]
    assert ["test.new.document.1000"] == response.json()["documents"]


def test_ingest_when_corpus_import_id_invalid(
data_db: Session, client: TestClient, user_header_token
):
Expand Down
2 changes: 1 addition & 1 deletion tests/mocks/repos/collection_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def mock_create(_, data: CollectionReadDTO, __) -> str:
maybe_throw()
if collection_repo.return_empty:
raise exc.NoResultFound()
return "test.new.collection.0"
return data.import_id if data.import_id else "test.new.collection.0"

def mock_delete(_, import_id: str) -> bool:
maybe_throw()
Expand Down
2 changes: 1 addition & 1 deletion tests/mocks/repos/document_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def mock_create(_, data: DocumentCreateDTO) -> str:
maybe_throw()
if document_repo.return_empty:
raise exc.NoResultFound()
return "test.new.doc.0"
return data.import_id if data.import_id else "test.new.doc.0"

def mock_delete(_, import_id: str) -> bool:
maybe_throw()
Expand Down
4 changes: 3 additions & 1 deletion tests/mocks/repos/family_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def update(db: Session, import_id: str, family: FamilyWriteDTO, geo_id: int) ->

def create(db: Session, family: FamilyCreateDTO, geo_id: int, org_id: int) -> str:
    """Mock create: echo the DTO's import_id (falling back to 'created'), or '' when configured empty."""
    _maybe_throw()
    return "" if family_repo.return_empty else (family.import_id or "created")


def delete(db: Session, import_id: str) -> bool:
Expand Down
14 changes: 10 additions & 4 deletions tests/unit_tests/routers/ingest/test_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io
import json
import os
from unittest.mock import Mock, patch

from fastapi import status
from fastapi.testclient import TestClient
Expand All @@ -19,6 +20,7 @@ def test_ingest_when_not_authenticated(client: TestClient):
assert response.status_code == status.HTTP_401_UNAUTHORIZED


@patch("app.service.ingest._exists_in_db", Mock(return_value=False))
def test_ingest_data_when_ok(
client: TestClient,
user_header_token,
Expand All @@ -45,9 +47,9 @@ def test_ingest_data_when_ok(

assert response.status_code == status.HTTP_201_CREATED
assert response.json() == {
"collections": ["test.new.collection.0", "test.new.collection.0"],
"families": ["created", "created"],
"documents": ["test.new.doc.0", "test.new.doc.0"],
"collections": ["test.new.collection.0", "test.new.collection.1"],
"families": ["test.new.family.0", "test.new.family.1"],
"documents": ["test.new.document.0", "test.new.document.1"],
}


Expand Down Expand Up @@ -95,8 +97,12 @@ def test_ingest_when_no_data(
assert response.status_code == status.HTTP_204_NO_CONTENT


@patch("app.service.ingest._exists_in_db", Mock(return_value=False))
def test_ingest_data_when_db_error(
client: TestClient, user_header_token, corpus_repo_mock, collection_repo_mock
client: TestClient,
user_header_token,
corpus_repo_mock,
collection_repo_mock,
):
collection_repo_mock.throw_repository_error = True

Expand Down
Loading

0 comments on commit b721f56

Please sign in to comment.