Feature/pdct 1345 save ingest file to s3 (#227)
* WIP

* Add basic function to upload a json file to S3

* Add exception handling + refactor

* Save json file to s3 on ingest + test refactor

* Extract s3 bucket name to env variable

* Fix unit tests

* Fix integration test

* Update s3 client fixture in unit test conftest

* Remove duplicate tests

* Add ingest action uuid to the name of the file when saving to s3

* WIP

* Save ingest result to s3

* Consolidate duplicated tests and change ingest endpoint success response code to 202

* Rename

* Bump minor version

* Bump db-client to latest

* Update poetry lock file

* Remove unused logger

* Add note to readme re integration tests using mocks and rename variable

* Update idempotency test to include checking the document status

* Save ingest request and result files to project root rather than s3 when running locally

* Fix test

* Try setting env variable in test
annaCPR authored Oct 10, 2024
1 parent ee27859 commit 8f410be
Showing 19 changed files with 873 additions and 467 deletions.
3 changes: 3 additions & 0 deletions .env.example
@@ -6,3 +6,6 @@ ADMIN_POSTGRES_HOST=admin_backend_db
# API
SECRET_KEY=secret_test_key
API_HOST=http://navigator-admin-backend:8888

# AWS
INGEST_JSON_BUCKET=test_bucket
6 changes: 6 additions & 0 deletions README.md
@@ -21,3 +21,9 @@ See [the linear project](https://linear.app/climate-policy-radar/project/admin-i

If you are new to the repository, please ensure you read the [getting started guide](GETTING_STARTED.md)
and the [design guide](DESIGN.md).

## Testing

Currently the integration tests around the ingest endpoint run against a mocked AWS S3 client,
as we do not yet have our own S3 implementation (e.g. a local S3 Docker image) for integration tests.
Ticket to fix that: [PDCT-1570](https://linear.app/climate-policy-radar/issue/PDCT-1570/add-aws-s3-docker-image-for-integration-tests)
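
A minimal sketch of what that mocked S3 setup might look like in a test fixture, assuming the moto library (5.x) is used; the fixture name, region and bucket value are illustrative rather than the repository's actual conftest:

import os

import boto3
import pytest
from moto import mock_aws


@pytest.fixture
def mock_s3_client():
    # Illustrative only: point the app at a fake bucket and create it in the
    # mocked S3 backend so ingest uploads succeed during tests.
    bucket_name = "test_bucket"
    os.environ["INGEST_JSON_BUCKET"] = bucket_name
    with mock_aws():
        client = boto3.client("s3", region_name="us-east-1")
        client.create_bucket(Bucket=bucket_name)
        yield client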
2 changes: 1 addition & 1 deletion app/api/api_v1/routers/ingest.py
@@ -240,7 +240,7 @@ def _validate_ingest_data(data: dict[str, Any]) -> None:
@r.post(
"/ingest/{corpus_import_id}",
response_model=Json,
status_code=status.HTTP_201_CREATED,
status_code=status.HTTP_202_ACCEPTED,
)
async def ingest(
new_data: UploadFile, corpus_import_id: str, background_tasks: BackgroundTasks
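
A hedged sketch of how a test might exercise the new 202 response, assuming FastAPI's TestClient behind a pre-authenticated client fixture; the corpus import id, payload shape and absence of a route prefix are assumptions, not taken from the actual test suite:

import io
import json


def test_ingest_returns_202(client):
    # Illustrative only: an empty payload for each supported entity type.
    payload = {"collections": [], "families": [], "documents": [], "events": []}
    response = client.post(
        "/ingest/CCLW.corpus.i00000001.n0000",
        files={
            "new_data": (
                "ingest.json",
                io.BytesIO(json.dumps(payload).encode("utf-8")),
                "application/json",
            )
        },
    )
    assert response.status_code == 202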
68 changes: 67 additions & 1 deletion app/clients/aws/s3bucket.py
@@ -1,11 +1,24 @@
import json
import logging
import os
import re
from datetime import datetime
from typing import Any
from urllib.parse import quote_plus, urlsplit

from botocore.exceptions import ClientError
from pydantic import BaseModel

from app.clients.aws.client import AWSClient
from app.clients.aws.client import AWSClient, get_s3_client
from app.errors import RepositoryError

_LOGGER = logging.getLogger(__name__)


class S3UploadContext(BaseModel):
bucket_name: str
object_name: str


def _encode_characters_in_path(s3_path: str) -> str:
"""
@@ -65,4 +78,57 @@ def get_s3_url(region: str, bucket: str, key: str) -> str:
return f"https://{bucket}.s3.{region}.amazonaws.com/{key}"


def upload_json_to_s3(
s3_client: AWSClient, context: S3UploadContext, json_data: dict[str, Any]
) -> None:
"""
Upload a JSON file to S3
:param S3UploadContext context: The context of the upload.
:param dict[str, Any] json_data: The json data to be uploaded to S3.
:raises Exception: on any error when uploading the file to S3.
"""
try:
s3_client.put_object(
Bucket=context.bucket_name,
Key=context.object_name,
Body=json.dumps(json_data),
ContentType="application/json",
)
_LOGGER.info(
f"🎉 Successfully uploaded JSON to S3: {context.bucket_name}/{context.object_name}"
)
except Exception as e:
_LOGGER.error(f"💥 Failed to upload JSON to S3:{e}]")
raise


def upload_ingest_json_to_s3(
ingest_id: str, corpus_import_id: str, data: dict[str, Any]
) -> None:
"""
Upload an ingest JSON file to S3
:param str ingest_id: The uuid of the ingest action.
:param str corpus_import_id: The import_id of the corpus the ingest data belongs to.
:param dict[str, Any] json_data: The ingest json data to be uploaded to S3.
"""
ingest_upload_bucket = os.environ["INGEST_JSON_BUCKET"]
current_timestamp = datetime.now().strftime("%m-%d-%YT%H:%M:%S")
filename = f"{ingest_id}-{corpus_import_id}-{current_timestamp}.json"

if ingest_upload_bucket == "skip":
with open(filename, "w") as f:
json.dump(data, f, indent=4)
return

s3_client = get_s3_client()

context = S3UploadContext(
bucket_name=ingest_upload_bucket,
object_name=filename,
)
upload_json_to_s3(s3_client, context, data)


# TODO: add more s3 functions like listing and reading files here
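
For reference, a minimal usage sketch of the new helpers above; the bucket and object names are examples only, and get_s3_client is assumed to read AWS credentials from the environment:

from app.clients.aws.client import get_s3_client
from app.clients.aws.s3bucket import S3UploadContext, upload_json_to_s3

# Illustrative only: upload an arbitrary dict as a JSON object.
s3_client = get_s3_client()
context = S3UploadContext(bucket_name="test_bucket", object_name="example.json")
upload_json_to_s3(s3_client, context, {"hello": "world"})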
19 changes: 12 additions & 7 deletions app/service/ingest.py
@@ -8,6 +8,7 @@
import logging
from enum import Enum
from typing import Any, Optional, Type, TypeVar
from uuid import uuid4

from db_client.models.dfce.collection import Collection
from db_client.models.dfce.family import Family, FamilyDocument, FamilyEvent
@@ -23,6 +24,7 @@
import app.service.corpus as corpus
import app.service.geography as geography
import app.service.validation as validation
from app.clients.aws.s3bucket import upload_ingest_json_to_s3
from app.model.ingest import (
IngestCollectionDTO,
IngestDocumentDTO,
@@ -206,7 +208,7 @@ def save_events(
dto = IngestEventDTO(**event).to_event_create_dto()
import_id = event_repository.create(db, dto)
event_import_ids.append(import_id)
total_events_saved += 0
total_events_saved += 1

_LOGGER.info(f"Saved {total_events_saved} events")
return event_import_ids
@@ -222,6 +224,8 @@ def import_data(data: dict[str, Any], corpus_import_id: str) -> None:
:raises RepositoryError: raised on a database error.
:raises ValidationError: raised should the data be invalid.
"""
ingest_uuid = uuid4()
upload_ingest_json_to_s3(f"{ingest_uuid}-request", corpus_import_id, data)

_LOGGER.info("Getting DB session")

@@ -232,29 +236,30 @@ def import_data(data: dict[str, Any], corpus_import_id: str) -> None:
document_data = data["documents"] if "documents" in data else None
event_data = data["events"] if "events" in data else None

response = {}
result = {}

try:
if collection_data:
_LOGGER.info("Saving collections")
response["collections"] = save_collections(
result["collections"] = save_collections(
collection_data, corpus_import_id, db
)
if family_data:
_LOGGER.info("Saving families")
response["families"] = save_families(family_data, corpus_import_id, db)
result["families"] = save_families(family_data, corpus_import_id, db)
if document_data:
_LOGGER.info("Saving documents")
response["documents"] = save_documents(document_data, corpus_import_id, db)
result["documents"] = save_documents(document_data, corpus_import_id, db)
if event_data:
_LOGGER.info("Saving events")
response["events"] = save_events(event_data, corpus_import_id, db)
result["events"] = save_events(event_data, corpus_import_id, db)

_LOGGER.info(
f"Bulk import for corpus: {corpus_import_id} successfully completed"
)

# save response to S3 as part of PDCT-1345
upload_ingest_json_to_s3(f"{ingest_uuid}-result", corpus_import_id, result)

except Exception as e:
_LOGGER.error(
f"Rolling back transaction due to the following error: {e}", exc_info=True
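
To illustrate the effect of the two upload calls above: each ingest run writes a request file and a result file to the ingest bucket, named as in upload_ingest_json_to_s3. A small sketch with example values (the corpus import id is illustrative, and in practice each file gets its own timestamp):

from datetime import datetime
from uuid import uuid4

ingest_uuid = uuid4()
corpus_import_id = "CCLW.corpus.i00000001.n0000"  # example value
timestamp = datetime.now().strftime("%m-%d-%YT%H:%M:%S")

request_key = f"{ingest_uuid}-request-{corpus_import_id}-{timestamp}.json"
result_key = f"{ingest_uuid}-result-{corpus_import_id}-{timestamp}.json"
# e.g. 6f1c...-request-CCLW.corpus.i00000001.n0000-10-10-2024T12:00:00.json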
(Diffs for the remaining changed files were not loaded in this view.)
