feat: use boto3, and stream file content without holding in memory #198

Merged 2 commits on May 7, 2024
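
This PR replaces the fsspec/s3fs storage path with direct boto3 calls and returns validation reports through a generator-backed StreamingResponse instead of a FileResponse, so report content is relayed to the client without being held in memory. Below is a minimal sketch of that streaming pattern; the bucket, key, and route names are hypothetical placeholders, not values from this PR.

```python
# Minimal sketch of the streaming pattern this PR adopts; bucket, key, and
# route are hypothetical placeholders, not values from this repository.
import boto3
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/report")
def download_report():
    s3 = boto3.client("s3")
    obj = s3.get_object(Bucket="example-bucket", Key="upload/2024/EXAMPLELEI/1_report.csv")

    def body():
        # botocore's StreamingBody yields the object in chunks, so the whole
        # file never has to be materialized in application memory.
        yield from obj["Body"].iter_chunks(chunk_size=64 * 1024)

    return StreamingResponse(
        body(),
        media_type="text/csv",
        headers={"Content-Disposition": 'attachment; filename="validation_report.csv"'},
    )
```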
531 changes: 14 additions & 517 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions pyproject.toml
@@ -21,8 +21,6 @@ python-multipart = "^0.0.7"
boto3 = "^1.33.12"
alembic = "^1.12.0"
async-lru = "^2.0.4"
fsspec = "^2024.2.0"
s3fs = "^2024.2.0"
httpx = "^0.26.0"

[tool.poetry.group.dev.dependencies]
2 changes: 0 additions & 2 deletions src/.env.local
@@ -18,6 +18,4 @@ JWT_OPTS_VERIFY_AUD="false"
JWT_OPTS_VERIFY_ISS="false"
FS_UPLOAD_CONFIG__PROTOCOL="file"
FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
EXPIRED_SUBMISSION_CHECK_SECS=120
2 changes: 0 additions & 2 deletions src/.env.template
@@ -16,7 +16,5 @@ TOKEN_URL=${KC_REALM_URL}/protocol/openid-connect/token
CERTS_URL=${KC_REALM_URL}/protocol/openid-connect/certs
FS_UPLOAD_CONFIG__PROTOCOL="file"
FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
USER_FI_API_URL=http://localhost:8881/v1/institutions/
EXPIRED_SUBMISSION_CHECK_SECS=120
10 changes: 0 additions & 10 deletions src/sbl_filing_api/config.py
@@ -24,15 +24,6 @@ class FsProtocol(StrEnum):
class FsUploadConfig(BaseModel):
protocol: str = FsProtocol.FILE.value
root: str
mkdir: bool = True


class FsDownloadConfig(BaseModel):
protocol: str = FsProtocol.FILE.value
target_protocol: str = None
cache_storage: str = None
check_files: bool = True
version_aware: bool = True


class Settings(BaseSettings):
@@ -45,7 +36,6 @@ class Settings(BaseSettings):
conn: PostgresDsn | None = None

fs_upload_config: FsUploadConfig
fs_download_config: FsDownloadConfig

submission_file_type: str = "text/csv"
submission_file_extension: str = "csv"
20 changes: 14 additions & 6 deletions src/sbl_filing_api/routers/filing.py
@@ -3,7 +3,7 @@

from concurrent.futures import ProcessPoolExecutor
from fastapi import Depends, Request, UploadFile, BackgroundTasks, status
from fastapi.responses import Response, JSONResponse, FileResponse
from fastapi.responses import Response, JSONResponse, StreamingResponse
from multiprocessing import Manager
from regtech_api_commons.api.router_wrapper import Router
from regtech_api_commons.api.exceptions import RegTechHttpException
@@ -166,7 +166,7 @@ async def upload_file(
)
submission = await repo.add_submission(request.state.db_session, filing.id, file.filename, submitter.id)
try:
await submission_processor.upload_to_storage(
submission_processor.upload_to_storage(
period_code, lei, submission.id, content, file.filename.split(".")[-1]
)

@@ -319,10 +319,14 @@ async def put_contact_info(request: Request, lei: str, period_code: str, contact
async def get_latest_submission_report(request: Request, lei: str, period_code: str):
latest_sub = await repo.get_latest_submission(request.state.db_session, lei, period_code)
if latest_sub:
file_data = await submission_processor.get_from_storage(
file_data = submission_processor.get_from_storage(
period_code, lei, str(latest_sub.id) + submission_processor.REPORT_QUALIFIER
)
return FileResponse(path=file_data, media_type="text/csv", filename=f"{latest_sub.id}_validation_report.csv")
return StreamingResponse(
content=file_data,
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="{latest_sub.id}_validation_report.csv"'},
)
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT, content=None)


@@ -334,8 +338,12 @@ async def get_latest_submission_report(request: Request, lei: str, period_code:
async def get_submission_report(request: Request, response: Response, lei: str, period_code: str, id: int):
sub = await repo.get_submission(request.state.db_session, id)
if sub:
file_data = await submission_processor.get_from_storage(
file_data = submission_processor.get_from_storage(
Contributor: Should we be decoding to UTF-8 here?

period_code, lei, str(sub.id) + submission_processor.REPORT_QUALIFIER
)
return FileResponse(path=file_data, media_type="text/csv", filename=f"{sub.id}_validation_report.csv")
return StreamingResponse(
content=file_data,
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="{sub.id}_validation_report.csv"'},
)
response.status_code = status.HTTP_404_NOT_FOUND
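
On the contributor's question above about decoding: as written, download() yields text lines from the local backend but raw bytes from the S3 body. Purely as a hypothetical illustration (not part of this PR), decoding to UTF-8 could be applied incrementally per chunk, which would preserve the streaming behaviour:

```python
# Hypothetical illustration only: incrementally decode a byte stream to str
# without buffering the whole file, using the stdlib incremental decoder.
import codecs
from typing import Generator, Iterable


def decode_utf8(chunks: Iterable[bytes]) -> Generator[str, None, None]:
    decoder = codecs.getincrementaldecoder("utf-8")()
    for chunk in chunks:
        # Correctly handles multi-byte characters split across chunk boundaries.
        yield decoder.decode(chunk)
    yield decoder.decode(b"", final=True)
```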
40 changes: 40 additions & 0 deletions src/sbl_filing_api/services/file_handler.py
@@ -0,0 +1,40 @@
import logging
from typing import Generator
import boto3
from pathlib import Path
from sbl_filing_api.config import FsProtocol, settings

log = logging.getLogger(__name__)


def upload(path: str, content: bytes) -> None:
if settings.fs_upload_config.protocol == FsProtocol.FILE:
file = Path(f"{settings.fs_upload_config.root}/{path}")
file.parent.mkdir(parents=True, exist_ok=True)
file.write_bytes(content)
else:
s3 = boto3.client("s3")
r = s3.put_object(
Bucket=settings.fs_upload_config.root,
Key=path,
Body=content,
)
log.debug(
"s3 upload response for key: %s, period: %s file: %s, response: %s",
path,
r,
)


def download(path: str) -> Generator:
if settings.fs_upload_config.protocol == FsProtocol.FILE:
with open(f"{settings.fs_upload_config.root}/{path}") as f:
yield from f
else:
s3 = boto3.client("s3")
r = s3.get_object(
Bucket=settings.fs_upload_config.root,
Key=path,
)
with r["Body"] as f:
yield from f
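
A brief usage sketch of the new module (paths made up for illustration, assuming the local "file" protocol from .env.local and a writable FS_UPLOAD_CONFIG__ROOT): upload() writes the bytes under the configured root, while download() returns a lazy generator, so callers iterate the content chunk by chunk instead of loading it all at once.

```python
# Usage sketch with made-up paths; assumes FS_UPLOAD_CONFIG__PROTOCOL="file"
# and a writable FS_UPLOAD_CONFIG__ROOT.
from sbl_filing_api.services import file_handler

file_handler.upload("upload/2024/EXAMPLELEI/1.csv", b"uid,field1\n1,abc\n")

# download() is a generator function: nothing is read until iteration begins,
# and only one chunk is held in memory at a time.
report = "".join(file_handler.download("upload/2024/EXAMPLELEI/1.csv"))
```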
39 changes: 8 additions & 31 deletions src/sbl_filing_api/services/submission_processor.py
@@ -1,4 +1,5 @@
import json
from typing import Generator
import pandas as pd
import importlib.metadata as imeta
import logging
@@ -12,10 +13,9 @@
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState
from sbl_filing_api.entities.repos.submission_repo import update_submission
from http import HTTPStatus
from fsspec import AbstractFileSystem, filesystem
from sbl_filing_api.config import FsProtocol, settings
from sbl_filing_api.config import settings
from sbl_filing_api.services import file_handler
from regtech_api_commons.api.exceptions import RegTechHttpException
import boto3

log = logging.getLogger(__name__)

@@ -41,41 +41,18 @@ def validate_file_processable(file: UploadFile) -> None:
)


async def upload_to_storage(period_code: str, lei: str, file_identifier: str, content: bytes, extension: str = "csv"):
def upload_to_storage(period_code: str, lei: str, file_identifier: str, content: bytes, extension: str = "csv") -> None:
try:
if settings.fs_upload_config.protocol == FsProtocol.FILE:
fs: AbstractFileSystem = filesystem(settings.fs_upload_config.protocol)
fs.mkdirs(f"{settings.fs_upload_config.root}/upload/{period_code}/{lei}", exist_ok=True)
with fs.open(
f"{settings.fs_upload_config.root}/upload/{period_code}/{lei}/{file_identifier}.{extension}", "wb"
) as f:
f.write(content)
else:
s3 = boto3.client("s3")
r = s3.put_object(
Bucket=settings.fs_upload_config.root,
Key=f"upload/{period_code}/{lei}/{file_identifier}.{extension}",
Body=content,
)
log.debug(
"s3 upload response for lei: %s, period: %s file: %s, response: %s",
lei,
period_code,
file_identifier,
r,
)
file_handler.upload(path=f"upload/{period_code}/{lei}/{file_identifier}.{extension}", content=content)
except Exception as e:
raise RegTechHttpException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, name="Upload Failure", detail="Failed to upload file"
) from e


async def get_from_storage(period_code: str, lei: str, file_identifier: str, extension: str = "csv"):
def get_from_storage(period_code: str, lei: str, file_identifier: str, extension: str = "csv") -> Generator:
try:
fs: AbstractFileSystem = filesystem(**settings.fs_download_config.__dict__)
file_path = f"{settings.fs_upload_config.root}/upload/{period_code}/{lei}/{file_identifier}.{extension}"
with fs.open(file_path, "r") as f:
return f.name
return file_handler.download(f"upload/{period_code}/{lei}/{file_identifier}.{extension}")
except Exception as e:
raise RegTechHttpException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, name="Download Failure", detail="Failed to read file."
@@ -109,7 +86,7 @@ async def validate_and_update_submission(

submission.validation_results = build_validation_results(result)
submission_report = df_to_download(result[1])
await upload_to_storage(
upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)

19 changes: 4 additions & 15 deletions tests/api/routers/test_filing_api.py
@@ -29,7 +29,6 @@
from sbl_filing_api.services.multithread_handler import handle_submission, check_future

from sqlalchemy.exc import IntegrityError
from tempfile import NamedTemporaryFile
from sbl_filing_api.config import regex_configs


@@ -976,12 +975,9 @@ async def test_get_latest_sub_report(self, mocker: MockerFixture, app_fixture: F
filename="file1.csv",
)

file_content = b"Test"
temp_file = NamedTemporaryFile(delete=False, suffix=".csv")
temp_file.write(file_content)
temp_file.close()
file_content = "Test"
file_mock = mocker.patch("sbl_filing_api.services.submission_processor.get_from_storage")
file_mock.return_value = temp_file.name
file_mock.return_value = [c for c in file_content]

client = TestClient(app_fixture)
res = client.get("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions/latest/report")
@@ -998,8 +994,6 @@ async def test_get_latest_sub_report(self, mocker: MockerFixture, app_fixture: F
sub_mock.assert_called_with(ANY, "1234567890ZXWVUTSR00", "2024")
assert res.status_code == 204

os.unlink(temp_file.name)

async def test_get_sub_report(self, mocker: MockerFixture, app_fixture: FastAPI, authed_user_mock: Mock):
sub_mock = mocker.patch("sbl_filing_api.entities.repos.submission_repo.get_submission")
sub_mock.return_value = SubmissionDAO(
@@ -1019,12 +1013,9 @@ async def test_get_sub_report(self, mocker: MockerFixture, app_fixture: FastAPI,
filename="file1.csv",
)

file_content = b"Test"
temp_file = NamedTemporaryFile(delete=False, suffix=".csv")
temp_file.write(file_content)
temp_file.close()
file_content = "Test"
file_mock = mocker.patch("sbl_filing_api.services.submission_processor.get_from_storage")
file_mock.return_value = temp_file.name
file_mock.return_value = [c for c in file_content]

client = TestClient(app_fixture)
res = client.get("/v1/filing/institutions/1234567890ZXWVUTSR00/filings/2024/submissions/2/report")
@@ -1041,8 +1032,6 @@ async def test_get_sub_report(self, mocker: MockerFixture, app_fixture: FastAPI,
sub_mock.assert_called_with(ANY, 1)
assert res.status_code == 404

os.unlink(temp_file.name)

def test_contact_info_invalid_email(self, mocker: MockerFixture, app_fixture: FastAPI, authed_user_mock: Mock):
client = TestClient(app_fixture)
contact_info_json = {
83 changes: 83 additions & 0 deletions tests/services/test_file_handler.py
@@ -0,0 +1,83 @@
from pytest_mock import MockerFixture
from unittest.mock import Mock
import io

from sbl_filing_api.config import FsProtocol, settings
import sbl_filing_api.services.file_handler as fh


def test_upload_local_fs(mocker: MockerFixture):
default_file_proto = settings.fs_upload_config.protocol
settings.fs_upload_config.protocol = FsProtocol.FILE

path_mock = mocker.patch("sbl_filing_api.services.file_handler.Path")
file_mock = Mock()
path_mock.return_value = file_mock

path = "test"
content = b"test"
fh.upload(path, b"test")
path_mock.assert_called_with(f"{settings.fs_upload_config.root}/{path}")
file_mock.parent.mkdir.assert_called_with(parents=True, exist_ok=True)
file_mock.write_bytes.assert_called_with(content)
settings.fs_upload_config.protocol = default_file_proto


def test_upload_s3(mocker: MockerFixture):
default_file_proto = settings.fs_upload_config.protocol
settings.fs_upload_config.protocol = FsProtocol.S3

boto3_mock = mocker.patch("sbl_filing_api.services.file_handler.boto3")
client_mock = Mock()
boto3_mock.client.return_value = client_mock

path = "test"
content = b"test"
fh.upload(path, b"test")

boto3_mock.client.assert_called_once_with("s3")
client_mock.put_object.assert_called_once_with(
Bucket=settings.fs_upload_config.root,
Key=path,
Body=content,
)

settings.fs_upload_config.protocol = default_file_proto


def test_download_local(mocker: MockerFixture):
default_file_proto = settings.fs_upload_config.protocol
settings.fs_upload_config.protocol = FsProtocol.FILE
path = "test"
content = "test content"
open_mock = mocker.patch("builtins.open")
res = ""
with mocker.mock_open(open_mock, read_data=content):
for chunk in fh.download(path):
res += chunk
open_mock.assert_called_once_with(f"{settings.fs_upload_config.root}/{path}")
assert res == content
settings.fs_upload_config.protocol = default_file_proto


def test_download_s3(mocker: MockerFixture):
default_file_proto = settings.fs_upload_config.protocol
settings.fs_upload_config.protocol = FsProtocol.S3
path = "test"
content = "test content"
boto3_mock = mocker.patch("sbl_filing_api.services.file_handler.boto3")
client_mock = Mock()
boto3_mock.client.return_value = client_mock
client_mock.get_object.return_value = {"Body": io.StringIO(content)}

res = ""
for chunk in fh.download(path):
res += chunk

boto3_mock.client.assert_called_once_with("s3")
client_mock.get_object.assert_called_once_with(
Bucket=settings.fs_upload_config.root,
Key=path,
)
assert res == content
settings.fs_upload_config.protocol = default_file_proto