Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

131 look into setting time limit on background validation task #159

Merged
47 changes: 47 additions & 0 deletions db_revisions/versions/0040045eae14_add_validation_expired_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Add VALIDATION_EXPIRED state

Revision ID: 0040045eae14
Revises: fb46d55283d6
Create Date: 2024-04-11 13:08:20.850470

"""
from typing import Sequence, Union

from alembic import op, context


# revision identifiers, used by Alembic.
revision: str = "0040045eae14"
down_revision: Union[str, None] = "ccc50ec18a7e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# fmt: off
old_options = (
'SUBMISSION_ACCEPTED',
'SUBMISSION_STARTED',
'SUBMISSION_UPLOADED',
'SUBMISSION_UPLOAD_MALFORMED',
'VALIDATION_IN_PROGRESS',
'VALIDATION_WITH_ERRORS',
'VALIDATION_WITH_WARNINGS',
'VALIDATION_SUCCESSFUL',
)
new_options = tuple(sorted(old_options + ('VALIDATION_EXPIRED','UPLOAD_FAILED')))
# fmt: on


def upgrade() -> None:
if "sqlite" not in context.get_context().dialect.name:
op.execute("ALTER TYPE submissionstate RENAME TO submissionstate_old")
op.execute(f"CREATE TYPE submissionstate AS ENUM{new_options}")
op.execute("ALTER TABLE submission ALTER COLUMN state TYPE submissionstate USING state::text::submissionstate")
op.execute("DROP TYPE submissionstate_old")


def downgrade() -> None:
if "sqlite" not in context.get_context().dialect.name:
op.execute("ALTER TYPE submissionstate RENAME TO submissionstate_old")
op.execute(f"CREATE TYPE submissionstate AS ENUM{old_options}")
op.execute("ALTER TABLE submission ALTER COLUMN state TYPE submissionstate USING state::text::submissionstate")
op.execute("DROP TYPE submissionstate_old")
425 changes: 238 additions & 187 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async-lru = "^2.0.4"
fsspec = "^2024.2.0"
s3fs = "^2024.2.0"
httpx = "^0.26.0"
fastapi-utilities = "^0.2.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, one more, 😂

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and probably need to update the lock file as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curses! removed


[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
Expand Down
4 changes: 3 additions & 1 deletion src/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ JWT_OPTS_VERIFY_ISS="false"
FS_UPLOAD_CONFIG__PROTOCOL="file"
FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one isn't needed anymore, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope thank you, removing

EXPIRED_SUBMISSION_CHECK_SECS=60
EXPIRED_SUBMISSION_DIFF_SECS=300
4 changes: 3 additions & 1 deletion src/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ FS_UPLOAD_CONFIG__PROTOCOL="file"
FS_UPLOAD_CONFIG__ROOT="../upload"
FS_UPLOAD_CONFIG__MKDIR=true
FS_DOWNLOAD_CONFIG__PROTOCOL="file"
USER_FI_API_URL=http://localhost:8881/v1/institutions/
USER_FI_API_URL=http://localhost:8881/v1/institutions/
EXPIRED_SUBMISSION_CHECK_SECS=60
EXPIRED_SUBMISSION_DIFF_SECS=300
2 changes: 2 additions & 0 deletions src/sbl_filing_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class Settings(BaseSettings):
submission_file_extension: str = "csv"
submission_file_size: int = 2 * (1024**3)

expired_submission_check_secs: int = 300

user_fi_api_url: str = "http://sbl-project-user_fi-1:8888/v1/institutions/"

def __init__(self, **data):
Expand Down
10 changes: 6 additions & 4 deletions src/sbl_filing_api/entities/models/model_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@


class SubmissionState(str, Enum):
SUBMISSION_ACCEPTED = "SUBMISSION_ACCEPTED"
SUBMISSION_STARTED = "SUBMISSION_STARTED"
SUBMISSION_UPLOAD_MALFORMED = "SUBMISSION_UPLOAD_MALFORMED"
SUBMISSION_UPLOADED = "SUBMISSION_UPLOADED"
UPLOAD_FAILED = "UPLOAD_FAILED"
VALIDATION_EXPIRED = "VALIDATION_EXPIRED"
VALIDATION_IN_PROGRESS = "VALIDATION_IN_PROGRESS"
VALIDATION_SUCCESSFUL = "VALIDATION_SUCCESSFUL"
VALIDATION_WITH_ERRORS = "VALIDATION_WITH_ERRORS"
VALIDATION_WITH_WARNINGS = "VALIDATION_WITH_WARNINGS"
VALIDATION_SUCCESSFUL = "VALIDATION_SUCCESSFUL"
SUBMISSION_ACCEPTED = "SUBMISSION_ACCEPTED"
SUBMISSION_STARTED = "SUBMISSION_STARTED"
SUBMISSION_UPLOAD_MALFORMED = "SUBMISSION_UPLOAD_MALFORMED"


class FilingTaskState(str, Enum):
Expand Down
2 changes: 1 addition & 1 deletion src/sbl_filing_api/entities/repos/submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from sbl_filing_api.entities.models.dao import (
SubmissionDAO,
SubmissionState,
FilingPeriodDAO,
FilingDAO,
FilingTaskDAO,
Expand All @@ -25,6 +24,7 @@
SubmitterDAO,
)
from sbl_filing_api.entities.models.dto import FilingPeriodDTO, FilingDTO, ContactInfoDTO
from sbl_filing_api.entities.models.model_enums import SubmissionState

logger = logging.getLogger(__name__)

Expand Down
45 changes: 29 additions & 16 deletions src/sbl_filing_api/routers/filing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

from fastapi import Depends, Request, UploadFile, BackgroundTasks, status, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from regtech_api_commons.api.router_wrapper import Router
Expand Down Expand Up @@ -30,6 +32,7 @@ async def set_db(request: Request, session: Annotated[AsyncSession, Depends(get_


router = Router(dependencies=[Depends(set_db), Depends(verify_user_lei_relation)])
logger = logging.getLogger(__name__)


@router.get("/periods", response_model=List[FilingPeriodDTO])
Expand Down Expand Up @@ -109,22 +112,32 @@ async def upload_file(
)

submission = await repo.add_submission(request.state.db_session, filing.id, file.filename)
submitter = await repo.add_submitter(
request.state.db_session,
submission_id=submission.id,
submitter=request.user.id,
submitter_name=request.user.name,
submitter_email=request.user.email,
)
submission.submitter = submitter
submission = await repo.update_submission(submission)
await submission_processor.upload_to_storage(period_code, lei, submission.id, content, file.filename.split(".")[-1])

submission.state = SubmissionState.SUBMISSION_UPLOADED
submission = await repo.update_submission(submission)
background_tasks.add_task(
submission_processor.validate_and_update_submission, period_code, lei, submission, content
)
try:
submitter = await repo.add_submitter(
request.state.db_session,
submission_id=submission.id,
submitter=request.user.id,
submitter_name=request.user.name,
submitter_email=request.user.email,
)
submission.submitter = submitter
submission = await repo.update_submission(submission)
await submission_processor.upload_to_storage(
period_code, lei, submission.id, content, file.filename.split(".")[-1]
)

submission.state = SubmissionState.SUBMISSION_UPLOADED
submission = await repo.update_submission(submission)
except Exception as e:
logger.error(f"Error while trying to process Submission {submission.id}", e, exec_info=True, stack_info=True)
submission.state = SubmissionState.UPLOAD_FAILED
submission = await repo.update_submission(submission)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=f"{e}",
)

background_tasks.add_task(submission_processor.validation_monitor, period_code, lei, submission, content)

return submission

Expand Down
19 changes: 18 additions & 1 deletion src/sbl_filing_api/services/submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import asyncio

from io import BytesIO
from fastapi import UploadFile
Expand All @@ -20,6 +21,23 @@
REPORT_QUALIFIER = "_report"


async def validation_monitor(period_code: str, lei: str, submission: SubmissionDAO, content: bytes):
try:
await asyncio.wait_for(
validate_and_update_submission(period_code, lei, submission, content),
timeout=settings.expired_submission_check_secs,
)
except asyncio.TimeoutError as te:
log.warn(
f"Validation for submission {submission.id} did not complete within the expected timeframe, will be set to VALIDATION_EXPIRED.",
te,
exc_info=True,
stack_info=True,
)
submission.state = SubmissionState.VALIDATION_EXPIRED
update_submission(submission)


def validate_file_processable(file: UploadFile) -> None:
extension = file.filename.split(".")[-1].lower()
if file.content_type != settings.submission_file_type or extension != settings.submission_file_extension:
Expand Down Expand Up @@ -94,4 +112,3 @@ async def validate_and_update_submission(period_code: str, lei: str, submission:
log.error("The file is malformed", re, exc_info=True, stack_info=True)
submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
await update_submission(submission)
raise HTTPException(status_code=HTTPStatus.UNPROCESSABLE_ENTITY, detail=re)
71 changes: 68 additions & 3 deletions tests/api/routers/test_filing_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,7 @@ def test_authed_upload_file(
mock_validate_file.return_value = None
mock_upload = mocker.patch("sbl_filing_api.services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch(
"sbl_filing_api.services.submission_processor.validate_and_update_submission"
)
mock_validate_submission = mocker.patch("sbl_filing_api.services.submission_processor.validation_monitor")
mock_validate_submission.return_value = None
async_mock = AsyncMock(return_value=return_sub)
mock_add_submission = mocker.patch(
Expand All @@ -240,6 +238,7 @@ def test_authed_upload_file(

res = client.post("/v1/filing/institutions/1234567890/filings/2024/submissions", files=files)
mock_add_submission.assert_called_with(ANY, 1, "submission.csv")
mock_validate_submission.assert_called_with("2024", "1234567890", return_sub, open(submission_csv, "rb").read())
assert mock_update_submission.call_args.args[0].state == SubmissionState.SUBMISSION_UPLOADED
assert res.status_code == 200
assert res.json()["id"] == 1
Expand Down Expand Up @@ -280,6 +279,72 @@ def test_upload_file_invalid_size(
res = client.post("/v1/filing/institutions/1234567890/filings/2024/submissions", files=files)
assert res.status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE

def test_submission_update_fail(
self,
mocker: MockerFixture,
app_fixture: FastAPI,
authed_user_mock: Mock,
submission_csv: str,
get_filing_mock: Mock,
):
return_sub = SubmissionDAO(
id=1,
filing=1,
state=SubmissionState.SUBMISSION_UPLOADED,
filename="submission.csv",
)

mock_logger = mocker.patch("sbl_filing_api.routers.filing.logger")

mock_validate_file = mocker.patch("sbl_filing_api.services.submission_processor.validate_file_processable")
mock_validate_file.return_value = None

async_mock = AsyncMock(return_value=return_sub)
mocker.patch("sbl_filing_api.entities.repos.submission_repo.add_submission", side_effect=async_mock)

mock_upload = mocker.patch("sbl_filing_api.services.submission_processor.upload_to_storage")
mock_upload.return_value = None
mock_validate_submission = mocker.patch("sbl_filing_api.services.submission_processor.validation_monitor")
mock_validate_submission.return_value = None

mock_update_submission = mocker.patch(
"sbl_filing_api.entities.repos.submission_repo.update_submission", side_effect=async_mock
)

mock_add_submitter = mocker.patch("sbl_filing_api.entities.repos.submission_repo.add_submitter")
mock_add_submitter.side_effect = RuntimeError("Failed to add submitter.")

file = {"file": ("submission.csv", open(submission_csv, "rb"))}
client = TestClient(app_fixture)

res = client.post("/v1/filing/institutions/1234567890/filings/2024/submissions", files=file)
mock_logger.mock_calls[0].error.assert_called_with(
"Error while trying to process Submission 1", RuntimeError, exec_info=True, stack_info=True
)
assert mock_update_submission.call_args.args[0].state == SubmissionState.UPLOAD_FAILED
assert res.status_code == 500
assert res.json() == "Failed to add submitter."

mock_add_submitter.side_effect = None
mock_add_submitter.return_value = SubmitterDAO(
id=1,
submission=1,
submitter="123456-7890-ABCDEF-GHIJ",
submitter_name="test",
submitter_email="[email protected]",
)

mock_upload.side_effect = HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Failed to upload file"
)
res = client.post("/v1/filing/institutions/1234567890/filings/2024/submissions", files=file)
mock_logger.mock_calls[0].error.assert_called_with(
"Error while trying to process Submission 1", RuntimeError, exec_info=True, stack_info=True
)
assert mock_update_submission.call_args.args[0].state == SubmissionState.UPLOAD_FAILED
assert res.status_code == 500
assert res.json() == "500: Failed to upload file"

async def test_unauthed_patch_filing(self, app_fixture: FastAPI):
client = TestClient(app_fixture)

Expand Down
25 changes: 17 additions & 8 deletions tests/entities/repos/test_submission_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def setup(
filing=2,
state=SubmissionState.SUBMISSION_UPLOADED,
validation_ruleset_version="v1",
submission_time=(dt.now() - datetime.timedelta(seconds=1000)),
submission_time=(dt.now() - datetime.timedelta(seconds=200)),
filename="file2.csv",
)
submission3 = SubmissionDAO(
Expand All @@ -106,10 +106,19 @@ async def setup(
submission_time=dt.now(),
filename="file3.csv",
)
submission4 = SubmissionDAO(
id=4,
filing=1,
state=SubmissionState.SUBMISSION_UPLOADED,
validation_ruleset_version="v1",
submission_time=(dt.now() - datetime.timedelta(seconds=400)),
filename="file4.csv",
)

transaction_session.add(submission1)
transaction_session.add(submission2)
transaction_session.add(submission3)
transaction_session.add(submission4)

contact_info1 = ContactInfoDAO(
id=1,
Expand Down Expand Up @@ -330,8 +339,8 @@ async def test_get_submission(self, query_session: AsyncSession):

async def test_get_submissions(self, query_session: AsyncSession):
res = await repo.get_submissions(query_session)
assert len(res) == 3
assert {1, 2, 3} == set([s.id for s in res])
assert len(res) == 4
assert {1, 2, 3, 4} == set([s.id for s in res])
assert res[1].filing == 2
assert res[2].state == SubmissionState.SUBMISSION_UPLOADED

Expand All @@ -351,7 +360,7 @@ async def test_add_submission(self, transaction_session: AsyncSession):
filing_id=1,
filename="file1.csv",
)
assert res.id == 4
assert res.id == 5
assert res.filing == 1
assert res.state == SubmissionState.SUBMISSION_STARTED

Expand All @@ -368,9 +377,9 @@ async def test_update_submission(self, session_generator: async_scoped_session):

async def query_updated_dao():
async with session_generator() as search_session:
stmt = select(SubmissionDAO).filter(SubmissionDAO.id == 4)
stmt = select(SubmissionDAO).filter(SubmissionDAO.id == 5)
new_res1 = await search_session.scalar(stmt)
assert new_res1.id == 4
assert new_res1.id == 5
assert new_res1.filing == 1
assert new_res1.state == SubmissionState.VALIDATION_IN_PROGRESS

Expand All @@ -385,9 +394,9 @@ async def query_updated_dao():

async def query_updated_dao():
async with session_generator() as search_session:
stmt = select(SubmissionDAO).filter(SubmissionDAO.id == 4)
stmt = select(SubmissionDAO).filter(SubmissionDAO.id == 5)
new_res2 = await search_session.scalar(stmt)
assert new_res2.id == 4
assert new_res2.id == 5
assert new_res2.filing == 1
assert new_res2.state == SubmissionState.VALIDATION_WITH_ERRORS
assert new_res2.validation_json == validation_json
Expand Down
4 changes: 4 additions & 0 deletions tests/migrations/test_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,7 @@ def test_migration_to_2e81179924b5(alembic_runner: MigrationContext, alembic_eng

def test_migration_to_ccc50ec18a7e(alembic_runner: MigrationContext, alembic_engine: Engine):
alembic_runner.migrate_up_to("ccc50ec18a7e")


def test_migration_to_0040045eae14(alembic_runner: MigrationContext, alembic_engine: Engine):
alembic_runner.migrate_up_to("0040045eae14")
Loading
Loading