Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

378 update submission processing to use polars data validator #394

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
FROM ghcr.io/cfpb/regtech/sbl/python-alpine:3.12

FROM --platform=amd64 ghcr.io/cfpb/regtech/sbl/python-ubi8:3.12
ENV UVICORN_LOG_LEVEL=info

WORKDIR /usr/app
RUN mkdir upload

RUN pip install poetry

COPY --chown=sbl:sbl poetry.lock pyproject.toml alembic.ini README.md ./
Expand All @@ -19,7 +19,8 @@ RUN chmod -R 447 /usr/app/upload
WORKDIR /usr/app/src

EXPOSE 8888
#RUN groupadd --system sbl && useradd --system --create-home sbl -s /sbin/nologin -g sbl

USER sbl
#USER sbl

CMD uvicorn sbl_filing_api.main:app --host 0.0.0.0 --port 8888 --log-config log-config.yml --log-level $UVICORN_LOG_LEVEL --timeout-keep-alive 65
3,462 changes: 2,402 additions & 1,060 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ asyncpg = "^0.29.0"
regtech-api-commons = {git = "https://github.com/cfpb/regtech-api-commons.git"}
regtech-data-validator = {git = "https://github.com/cfpb/regtech-data-validator.git"}
regtech-regex = {git = "https://github.com/cfpb/regtech-regex.git"}
boto3 = "~1.34.0"
lchen-2101 marked this conversation as resolved.
Show resolved Hide resolved
python-multipart = "^0.0.12"
boto3 = "^1.35.25"
alembic = "^1.13.3"
async-lru = "^2.0.4"
ujson = "^5.10.0"
psutil = "^6.0.0"
lchen-2101 marked this conversation as resolved.
Show resolved Hide resolved

[tool.poetry.group.dev.dependencies]
pytest = "^8.3.3"
Expand Down
22 changes: 11 additions & 11 deletions src/log-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,30 @@ handlers:
class: logging.StreamHandler
stream: ext://sys.stdout
loggers:
uvicorn:
regtech_data_validator:
level: INFO
handlers:
- default
propagate: no
uvicorn.error:
level: INFO
uvicorn.access:
propagate: false
regtech_api_commons:
level: INFO
handlers:
- access
propagate: no
regtech_data_validator:
- default
propagate: false
sbl_filing_api:
level: INFO
handlers:
- default
propagate: false
regtech_api_commons:
uvicorn:
level: INFO
handlers:
- default
propagate: false
sbl_filing_api:
uvicorn.error:
level: INFO
uvicorn.access:
level: INFO
handlers:
- default
- access
propagate: false
3 changes: 2 additions & 1 deletion src/sbl_filing_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Settings(BaseSettings):
conn: PostgresDsn | None = None

fs_upload_config: FsUploadConfig

server_config: ServerConfig = ServerConfig()

submission_file_type: str = "text/csv"
Expand All @@ -61,7 +62,7 @@ class Settings(BaseSettings):

max_validation_errors: int = 1000000
max_json_records: int = 10000
max_json_group_size: int = 0
max_json_group_size: int = 200

def __init__(self, **data):
super().__init__(**data)
Expand Down
5 changes: 5 additions & 0 deletions src/sbl_filing_api/routers/filing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import logging
import csv
import io

from concurrent.futures import ProcessPoolExecutor
from fastapi import Depends, Request, UploadFile, status
Expand Down Expand Up @@ -193,6 +195,9 @@ async def upload_file(request: Request, lei: str, period_code: str, file: Upload
)

submission.state = SubmissionState.SUBMISSION_UPLOADED
with io.BytesIO(content) as byte_stream:
reader = csv.reader(io.TextIOWrapper(byte_stream))
submission.total_records = sum(1 for row in reader) - 1
submission = await repo.update_submission(request.state.db_session, submission)
except Exception as e:
submission.state = SubmissionState.UPLOAD_FAILED
Expand Down
93 changes: 53 additions & 40 deletions src/sbl_filing_api/services/submission_processor.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
from typing import Generator
import pandas as pd
import polars as pl
import importlib.metadata as imeta
import logging

from io import BytesIO
from fastapi import UploadFile
from regtech_data_validator.create_schemas import validate_phases
from regtech_data_validator.validator import validate_batch_csv
from regtech_data_validator.data_formatters import df_to_dicts, df_to_download
from regtech_data_validator.checks import Severity
from regtech_data_validator.validation_results import ValidationResults, ValidationPhase
from regtech_data_validator.validation_results import ValidationPhase, ValidationResults
from sbl_filing_api.entities.engine.engine import SessionLocal
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState
from sbl_filing_api.entities.repos.submission_repo import update_submission
from http import HTTPStatus
from sbl_filing_api.config import settings
from sbl_filing_api.config import FsProtocol, settings
from sbl_filing_api.services import file_handler
from regtech_api_commons.api.exceptions import RegTechHttpException

Expand Down Expand Up @@ -59,6 +58,13 @@ def get_from_storage(period_code: str, lei: str, file_identifier: str, extension
) from e


def generate_file_path(period_code: str, lei: str, file_identifier: str, extension: str = "csv"):
    """Return the storage location of an uploaded submission artifact.

    The location has the form
    ``<root>/upload/<period_code>/<lei>/<file_identifier>.<extension>``, where
    ``<root>`` is read from ``settings.fs_upload_config.root``. When the
    configured protocol equals ``FsProtocol.S3.value`` the location is
    prefixed with ``s3://``; otherwise it is returned unprefixed.
    """
    upload_cfg = settings.fs_upload_config
    location = f"{upload_cfg.root}/upload/{period_code}/{lei}/{file_identifier}.{extension}"
    # Only the S3 backend gets a URL scheme in front of the path.
    if upload_cfg.protocol != FsProtocol.S3.value:
        return location
    return f"s3://{location}"


async def validate_and_update_submission(
period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check: dict
):
Expand All @@ -69,60 +75,67 @@ async def validate_and_update_submission(
submission.state = SubmissionState.VALIDATION_IN_PROGRESS
submission = await update_submission(session, submission)

df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)
submission.total_records = len(df)
file_path = generate_file_path(period_code, lei, submission.id)

final_phase = ValidationPhase.LOGICAL
all_findings = []
final_df = pl.DataFrame()

# Validate Phases
results = validate_phases(df, {"lei": lei}, max_errors=settings.max_validation_errors)
for validation_results in validate_batch_csv(
file_path, context={"lei": lei}, batch_size=50000, batch_count=1
):
final_phase = validation_results.phase
all_findings.append(validation_results)

if all_findings:
final_df = pl.concat([v.findings for v in all_findings], how="diagonal")

submission.validation_results = build_validation_results(results)
submission.validation_results = build_validation_results(final_df, all_findings, final_phase)

if results.findings.empty:
if final_df.is_empty():
submission.state = SubmissionState.VALIDATION_SUCCESSFUL
elif (
results.phase == ValidationPhase.SYNTACTICAL
final_phase == ValidationPhase.SYNTACTICAL
or submission.validation_results["logic_errors"]["total_count"] > 0
):
submission.state = SubmissionState.VALIDATION_WITH_ERRORS
else:
submission.state = SubmissionState.VALIDATION_WITH_WARNINGS

submission_report = df_to_download(
results.findings,
warning_count=results.warning_counts.total_count,
error_count=results.error_counts.total_count,
max_errors=settings.max_validation_errors,
)
upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)
report_path = generate_file_path(period_code, lei, f"{submission.id}_report")

df_to_download(final_df, report_path)
lchen-2101 marked this conversation as resolved.
Show resolved Hide resolved

# upload_to_storage(
# period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
# )
if not exec_check["continue"]:
log.warning(f"Submission {submission.id} is expired, will not be updating final state with results.")
return

await update_submission(session, submission)

except RuntimeError:
log.exception("The file is malformed.")
except RuntimeError as re:
log.exception("The file is malformed.", re)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

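This shouldn't be needed: log.exception already logs with exc_info=True, which picks up the active exception from the stack. Passing the exception as an extra positional argument also makes logging treat it as a %-format argument, and since the message has no matching placeholder, formatting the log record fails.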

submission.state = SubmissionState.SUBMISSION_UPLOAD_MALFORMED
await update_submission(session, submission)

except Exception:
log.exception("Validation for submission %d did not complete due to an unexpected error.", submission.id)
except Exception as e:
log.exception("Validation for submission %d did not complete due to an unexpected error.", submission.id, e)
submission.state = SubmissionState.VALIDATION_ERROR
await update_submission(session, submission)


def build_validation_results(results: ValidationResults):
val_json = df_to_dicts(results.findings, settings.max_json_records, settings.max_json_group_size)
if results.phase == ValidationPhase.SYNTACTICAL:
def build_validation_results(final_df: pl.DataFrame, results: list[ValidationResults], final_phase: ValidationPhase):
val_json = df_to_dicts(final_df, settings.max_json_records, settings.max_json_group_size)
if final_phase == ValidationPhase.SYNTACTICAL:
syntax_error_counts = sum([r.error_counts.single_field_count for r in results])
val_res = {
"syntax_errors": {
"single_field_count": results.error_counts.single_field_count,
"multi_field_count": results.error_counts.multi_field_count, # this will always be zero for syntax errors
"register_count": results.error_counts.register_count, # this will always be zero for syntax errors
"total_count": results.error_counts.total_count,
"single_field_count": syntax_error_counts,
"multi_field_count": 0, # this will always be zero for syntax errors
"register_count": 0, # this will always be zero for syntax errors
"total_count": syntax_error_counts,
"details": val_json,
}
}
Expand All @@ -138,17 +151,17 @@ def build_validation_results(results: ValidationResults):
"details": [],
},
"logic_errors": {
"single_field_count": results.error_counts.single_field_count,
"multi_field_count": results.error_counts.multi_field_count,
"register_count": results.error_counts.register_count,
"total_count": results.error_counts.total_count,
"single_field_count": sum([r.error_counts.single_field_count for r in results]),
"multi_field_count": sum([r.error_counts.multi_field_count for r in results]),
"register_count": sum([r.error_counts.register_count for r in results]),
"total_count": sum([r.error_counts.total_count for r in results]),
"details": errors_list,
},
"logic_warnings": {
"single_field_count": results.warning_counts.single_field_count,
"multi_field_count": results.warning_counts.multi_field_count,
"register_count": results.warning_counts.register_count,
"total_count": results.warning_counts.total_count,
"single_field_count": sum([r.warning_counts.single_field_count for r in results]),
"multi_field_count": sum([r.warning_counts.multi_field_count for r in results]),
"register_count": sum([r.warning_counts.register_count for r in results]),
"total_count": sum([r.warning_counts.total_count for r in results]),
"details": warnings_list,
},
}
Expand Down
2 changes: 1 addition & 1 deletion tests/app/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_default_maxes():
settings = Settings()
assert settings.max_validation_errors == 1000000
assert settings.max_json_records == 10000
assert settings.max_json_group_size == 0
assert settings.max_json_group_size == 200


def test_default_server_configs():
Expand Down
Loading
Loading