378 update submission processing to use polars data validator #394

Merged
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,9 +1,9 @@
FROM ghcr.io/cfpb/regtech/sbl/python-alpine:3.12

FROM --platform=amd64 ghcr.io/cfpb/regtech/sbl/python-ubi8:3.12
ENV UVICORN_LOG_LEVEL=info

WORKDIR /usr/app
RUN mkdir upload

RUN pip install poetry

COPY --chown=sbl:sbl poetry.lock pyproject.toml alembic.ini README.md ./
3,240 changes: 2,244 additions & 996 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -14,8 +14,8 @@ asyncpg = "^0.30.0"
regtech-api-commons = {git = "https://github.com/cfpb/regtech-api-commons.git"}
regtech-data-validator = {git = "https://github.com/cfpb/regtech-data-validator.git"}
regtech-regex = {git = "https://github.com/cfpb/regtech-regex.git"}
boto3 = "~1.34.0"
python-multipart = "^0.0.12"
boto3 = "^1.35.25"
alembic = "^1.14.0"
async-lru = "^2.0.4"
ujson = "^5.10.0"
22 changes: 11 additions & 11 deletions src/log-config.yml
@@ -20,30 +20,30 @@ handlers:
class: logging.StreamHandler
stream: ext://sys.stdout
loggers:
uvicorn:
regtech_data_validator:
level: INFO
handlers:
- default
propagate: no
uvicorn.error:
level: INFO
uvicorn.access:
propagate: false
regtech_api_commons:
level: INFO
handlers:
- access
propagate: no
regtech_data_validator:
- default
propagate: false
sbl_filing_api:
level: INFO
handlers:
- default
propagate: false
regtech_api_commons:
uvicorn:
level: INFO
handlers:
- default
propagate: false
sbl_filing_api:
uvicorn.error:
level: INFO
uvicorn.access:
level: INFO
handlers:
- default
- access
propagate: false
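
The reshuffle above adds a dedicated `regtech_data_validator` logger block and normalizes `propagate: no` to `propagate: false`, with `uvicorn.access` routed through the `access` handler and everything else through `default`. As a minimal sketch, this is how a dictConfig-style YAML like this is typically applied at startup (assuming PyYAML is installed and the file follows the standard `logging.config` schema; uvicorn can also consume such a file directly via `--log-config`):

```python
# Minimal sketch: apply a YAML logging config like src/log-config.yml
# with the stdlib. Assumes PyYAML is installed and the file follows
# the standard logging.config dictConfig schema (version: 1, etc.).
import logging
import logging.config

import yaml

with open("src/log-config.yml") as f:
    logging.config.dictConfig(yaml.safe_load(f))

# Loggers declared in the file now emit through their handlers.
logging.getLogger("sbl_filing_api").info("logging configured")
```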
3 changes: 2 additions & 1 deletion src/sbl_filing_api/config.py
Expand Up @@ -49,6 +49,7 @@ class Settings(BaseSettings):
conn: PostgresDsn | None = None

fs_upload_config: FsUploadConfig

server_config: ServerConfig = ServerConfig()

submission_file_type: str = "text/csv"
@@ -61,7 +62,7 @@ class Settings(BaseSettings):

max_validation_errors: int = 1000000
max_json_records: int = 10000
max_json_group_size: int = 0
max_json_group_size: int = 200

def __init__(self, **data):
super().__init__(**data)
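
`max_json_group_size` now defaults to 200 instead of 0, capping how many example records each finding group carries in the JSON validation results. An illustrative sketch of the affected fields (a hypothetical standalone subset, not the real `Settings` class, assuming pydantic v2's `pydantic-settings` layout):

```python
from pydantic_settings import BaseSettings  # pydantic v2 layout (assumed)


class ValidationLimits(BaseSettings):
    # Hypothetical standalone subset of Settings; names and defaults
    # mirror the diff as of this PR.
    max_validation_errors: int = 1000000
    max_json_records: int = 10000
    max_json_group_size: int = 200  # previously 0


assert ValidationLimits().max_json_group_size == 200
```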
5 changes: 5 additions & 0 deletions src/sbl_filing_api/routers/filing.py
@@ -1,5 +1,7 @@
import asyncio
import logging
import csv
import io

from concurrent.futures import ProcessPoolExecutor
from fastapi import Depends, Request, UploadFile, status
@@ -193,6 +195,9 @@ async def upload_file(request: Request, lei: str, period_code: str, file: Upload
)

submission.state = SubmissionState.SUBMISSION_UPLOADED
with io.BytesIO(content) as byte_stream:
reader = csv.reader(io.TextIOWrapper(byte_stream))
submission.total_records = sum(1 for row in reader) - 1
submission = await repo.update_submission(request.state.db_session, submission)
except Exception as e:
submission.state = SubmissionState.UPLOAD_FAILED
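
With validation now streaming the file in batches, `upload_file` computes `total_records` up front by running the raw upload bytes through `csv.reader` and subtracting the header row; because `csv.reader` counts logical records, quoted fields containing newlines are not over-counted the way a plain line count would. A standalone sketch of that logic (sample bytes are illustrative):

```python
import csv
import io


def count_records(content: bytes) -> int:
    # Mirror of the logic added above: wrap the uploaded bytes in a
    # text stream and count CSV rows, minus one for the header.
    with io.BytesIO(content) as byte_stream:
        reader = csv.reader(io.TextIOWrapper(byte_stream))
        return sum(1 for _ in reader) - 1


sample = b'uid,amount\nZZZZ1,100\nZZZZ2,"2,500"\n'
print(count_records(sample))  # 2 data rows
```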
85 changes: 52 additions & 33 deletions src/sbl_filing_api/services/submission_processor.py
@@ -1,19 +1,18 @@
from typing import Generator
import pandas as pd
import polars as pl
import importlib.metadata as imeta
import logging

from io import BytesIO
from fastapi import UploadFile
from regtech_data_validator.create_schemas import validate_phases
from regtech_data_validator.validator import validate_batch_csv
from regtech_data_validator.data_formatters import df_to_dicts, df_to_download
from regtech_data_validator.checks import Severity
from regtech_data_validator.validation_results import ValidationResults, ValidationPhase
from regtech_data_validator.validation_results import ValidationPhase, ValidationResults
from sbl_filing_api.entities.engine.engine import SessionLocal
from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState
from sbl_filing_api.entities.repos.submission_repo import update_submission
from http import HTTPStatus
from sbl_filing_api.config import settings
from sbl_filing_api.config import FsProtocol, settings
from sbl_filing_api.services import file_handler
from regtech_api_commons.api.exceptions import RegTechHttpException

@@ -59,6 +58,13 @@ def get_from_storage(period_code: str, lei: str, file_identifier: str, extension
) from e


def generate_file_path(period_code: str, lei: str, file_identifier: str, extension: str = "csv"):
file_path = f"{settings.fs_upload_config.root}/upload/{period_code}/{lei}/{file_identifier}.{extension}"
if settings.fs_upload_config.protocol == FsProtocol.S3.value:
file_path = "s3://" + file_path
return file_path


async def validate_and_update_submission(
period_code: str, lei: str, submission: SubmissionDAO, content: bytes, exec_check: dict
):
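
The new `generate_file_path` helper builds the location `validate_batch_csv` will read from, prefixing `s3://` when the upload config targets S3. A sketch of the shapes it produces, with the settings dependency lifted into parameters (assuming `FsProtocol.S3.value == "s3"`; root, period, LEI, and identifier values are illustrative):

```python
# Sketch of the paths the new helper produces; the real version reads
# root and protocol from settings.fs_upload_config instead.
def generate_file_path(
    root: str, protocol: str, period_code: str, lei: str,
    file_identifier: str, extension: str = "csv",
) -> str:
    path = f"{root}/upload/{period_code}/{lei}/{file_identifier}.{extension}"
    return "s3://" + path if protocol == "s3" else path


print(generate_file_path("my-bucket", "s3", "2024", "LEI0000000000000000001", "42"))
# -> s3://my-bucket/upload/2024/LEI0000000000000000001/42.csv
```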
@@ -69,33 +75,45 @@ async def validate_and_update_submission(
submission.state = SubmissionState.VALIDATION_IN_PROGRESS
submission = await update_submission(session, submission)

df = pd.read_csv(BytesIO(content), dtype=str, na_filter=False)
submission.total_records = len(df)
file_path = generate_file_path(period_code, lei, submission.id)

final_phase = ValidationPhase.LOGICAL
all_findings = []
final_df = pl.DataFrame()

for validation_results in validate_batch_csv(
file_path,
context={"lei": lei},
batch_size=50000,
batch_count=1,
max_errors=settings.max_validation_errors,
):
final_phase = validation_results.phase
all_findings.append(validation_results)

# Validate Phases
results = validate_phases(df, {"lei": lei}, max_errors=settings.max_validation_errors)
if all_findings:
final_df = pl.concat([v.findings for v in all_findings], how="diagonal")

submission.validation_results = build_validation_results(results)
submission.validation_results = build_validation_results(final_df, all_findings, final_phase)

if results.findings.empty:
if final_df.is_empty():
submission.state = SubmissionState.VALIDATION_SUCCESSFUL
elif (
results.phase == ValidationPhase.SYNTACTICAL
final_phase == ValidationPhase.SYNTACTICAL
or submission.validation_results["logic_errors"]["total_count"] > 0
):
submission.state = SubmissionState.VALIDATION_WITH_ERRORS
else:
submission.state = SubmissionState.VALIDATION_WITH_WARNINGS

submission_report = df_to_download(
results.findings,
warning_count=results.warning_counts.total_count,
error_count=results.error_counts.total_count,
final_df,
warning_count=sum([r.warning_counts.total_count for r in all_findings]),
error_count=sum([r.error_counts.total_count for r in all_findings]),
max_errors=settings.max_validation_errors,
)
upload_to_storage(
period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report.encode("utf-8")
)

upload_to_storage(period_code, lei, str(submission.id) + REPORT_QUALIFIER, submission_report)

if not exec_check["continue"]:
log.warning(f"Submission {submission.id} is expired, will not be updating final state with results.")
@@ -114,15 +132,16 @@
await update_submission(session, submission)


def build_validation_results(results: ValidationResults):
val_json = df_to_dicts(results.findings, settings.max_json_records, settings.max_json_group_size)
if results.phase == ValidationPhase.SYNTACTICAL:
def build_validation_results(final_df: pl.DataFrame, results: list[ValidationResults], final_phase: ValidationPhase):
val_json = df_to_dicts(final_df, settings.max_json_records, settings.max_json_group_size)
if final_phase == ValidationPhase.SYNTACTICAL:
syntax_error_counts = sum([r.error_counts.single_field_count for r in results])
val_res = {
"syntax_errors": {
"single_field_count": results.error_counts.single_field_count,
"multi_field_count": results.error_counts.multi_field_count, # this will always be zero for syntax errors
"register_count": results.error_counts.register_count, # this will always be zero for syntax errors
"total_count": results.error_counts.total_count,
"single_field_count": syntax_error_counts,
"multi_field_count": 0, # this will always be zero for syntax errors
"register_count": 0, # this will always be zero for syntax errors
"total_count": syntax_error_counts,
"details": val_json,
}
}
@@ -138,17 +157,17 @@ def build_validation_results(results: ValidationResults):
"details": [],
},
"logic_errors": {
"single_field_count": results.error_counts.single_field_count,
"multi_field_count": results.error_counts.multi_field_count,
"register_count": results.error_counts.register_count,
"total_count": results.error_counts.total_count,
"single_field_count": sum([r.error_counts.single_field_count for r in results]),
"multi_field_count": sum([r.error_counts.multi_field_count for r in results]),
"register_count": sum([r.error_counts.register_count for r in results]),
"total_count": sum([r.error_counts.total_count for r in results]),
"details": errors_list,
},
"logic_warnings": {
"single_field_count": results.warning_counts.single_field_count,
"multi_field_count": results.warning_counts.multi_field_count,
"register_count": results.warning_counts.register_count,
"total_count": results.warning_counts.total_count,
"single_field_count": sum([r.warning_counts.single_field_count for r in results]),
"multi_field_count": sum([r.warning_counts.multi_field_count for r in results]),
"register_count": sum([r.warning_counts.register_count for r in results]),
"total_count": sum([r.warning_counts.total_count for r in results]),
"details": warnings_list,
},
}
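
With results arriving per batch, each count is now a sum over every batch's `Counts` rather than a single attribute read. A sketch of the aggregation pattern with a stand-in dataclass (the real `Counts` lives in `regtech_data_validator.validation_results`):

```python
from dataclasses import dataclass


# Stand-in for the validator's Counts, just for illustration.
@dataclass
class Counts:
    single_field_count: int = 0
    multi_field_count: int = 0
    register_count: int = 0
    total_count: int = 0


batches = [Counts(single_field_count=2, total_count=3), Counts(total_count=5)]
total_errors = sum(c.total_count for c in batches)  # 8, across all batches
```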
2 changes: 1 addition & 1 deletion tests/app/test_config.py
@@ -17,7 +17,7 @@ def test_default_maxes():
settings = Settings()
assert settings.max_validation_errors == 1000000
assert settings.max_json_records == 10000
assert settings.max_json_group_size == 0
assert settings.max_json_group_size == 200


def test_default_server_configs():
96 changes: 42 additions & 54 deletions tests/services/conftest.py
@@ -1,13 +1,13 @@
import pandas as pd
import polars as pl
import pytest

from pytest_mock import MockerFixture
from textwrap import dedent
from unittest.mock import Mock

from sbl_filing_api.entities.models.dao import SubmissionDAO, SubmissionState

from regtech_data_validator.validation_results import ValidationResults, ValidationPhase, Counts
from regtech_data_validator.checks import Severity


@pytest.fixture(scope="function")
@@ -21,63 +21,63 @@ def validate_submission_mock(mocker: MockerFixture):
mock_update_submission = mocker.patch("sbl_filing_api.services.submission_processor.update_submission")
mock_update_submission.return_value = return_sub

mock_read_csv = mocker.patch("pandas.read_csv")
mock_read_csv.return_value = pd.DataFrame([["0", "1"]], columns=["Submission_Column_1", "Submission_Column_2"])

return mock_update_submission


@pytest.fixture(scope="function")
def error_submission_mock(mocker: MockerFixture, validate_submission_mock: Mock):
mock_validation = mocker.patch("sbl_filing_api.services.submission_processor.validate_phases")
mock_validation.return_value = ValidationResults(
phase=ValidationPhase.SYNTACTICAL,
error_counts=Counts(
single_field_count=1,
multi_field_count=0,
register_count=0,
total_count=1,
),
warning_counts=Counts(single_field_count=0, multi_field_count=0, register_count=0, total_count=0),
findings=pd.DataFrame([["Error"]], columns=["validation_severity"]),
is_valid=False,

mock_read_csv = mocker.patch("sbl_filing_api.services.submission_processor.validate_batch_csv")
mock_read_csv.return_value = iter(
[
ValidationResults(
error_counts=Counts(),
warning_counts=Counts(),
is_valid=False,
findings=pl.DataFrame({"validation_type": [Severity.ERROR]}),
phase=ValidationPhase.LOGICAL,
)
]
)

return validate_submission_mock


@pytest.fixture(scope="function")
def successful_submission_mock(mocker: MockerFixture, validate_submission_mock: Mock):
mock_validation = mocker.patch("sbl_filing_api.services.submission_processor.validate_phases")
mock_validation.return_value = ValidationResults(
phase=ValidationPhase.LOGICAL,
error_counts=Counts(
single_field_count=0,
multi_field_count=0,
register_count=0,
total_count=0,
),
warning_counts=Counts(single_field_count=0, multi_field_count=0, register_count=0, total_count=0),
findings=pd.DataFrame(),
is_valid=True,

mock_read_csv = mocker.patch("sbl_filing_api.services.submission_processor.validate_batch_csv")
mock_read_csv.return_value = iter(
[
ValidationResults(
error_counts=Counts(),
warning_counts=Counts(),
is_valid=True,
findings=pl.DataFrame(),
phase=ValidationPhase.LOGICAL,
)
]
)

return validate_submission_mock


@pytest.fixture(scope="function")
def warning_submission_mock(mocker: MockerFixture, validate_submission_mock: Mock):
mock_validation = mocker.patch("sbl_filing_api.services.submission_processor.validate_phases")
mock_validation.return_value = ValidationResults(
phase=ValidationPhase.LOGICAL,
error_counts=Counts(
single_field_count=0,
multi_field_count=0,
register_count=0,
total_count=0,
),
warning_counts=Counts(single_field_count=1, multi_field_count=0, register_count=0, total_count=1),
findings=pd.DataFrame([["Warning"]], columns=["validation_severity"]),
is_valid=False,

mock_read_csv = mocker.patch("sbl_filing_api.services.submission_processor.validate_batch_csv")
mock_read_csv.return_value = iter(
[
ValidationResults(
error_counts=Counts(),
warning_counts=Counts(),
is_valid=False,
findings=pl.DataFrame({"validation_type": [Severity.WARNING]}),
phase=ValidationPhase.LOGICAL,
)
]
)

return validate_submission_mock


@@ -90,17 +90,5 @@ def build_validation_results_mock(mocker: MockerFixture, validate_submissio

@pytest.fixture(scope="function")
def df_to_download_mock(mocker: MockerFixture):
expected_output = dedent(
"""
validation_type,validation_id,validation_name,row,unique_identifier,fig_link,validation_description,field_1,value_1
Warning,W0003,uid.invalid_uid_lei,1,ZZZZZZZZZZZZZZZZZZZZZ1,https://www.consumerfinance.gov/data-research/small-business-lending/filing-instructions-guide/2024-guide/#4.4.1,"* The first 20 characters of the 'unique identifier' should
match the Legal Entity Identifier (LEI) for the financial institution.
",uid,ZZZZZZZZZZZZZZZZZZZZZ1
Warning,W0003,uid.invalid_uid_lei,2,ZZZZZZZZZZZZZZZZZZZZZS,https://www.consumerfinance.gov/data-research/small-business-lending/filing-instructions-guide/2024-guide/#4.4.1,"* The first 20 characters of the 'unique identifier' should
match the Legal Entity Identifier (LEI) for the financial institution.
",uid,ZZZZZZZZZZZZZZZZZZZZZS
"""
).strip("\n")
mock_download_formatting = mocker.patch("sbl_filing_api.services.submission_processor.df_to_download")
mock_download_formatting.return_value = expected_output
return mock_download_formatting
mock_download_formatting.return_value = b"\x01"
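
Because `validate_batch_csv` is a generator, the fixtures above wrap their canned `ValidationResults` in `iter([...])` so the loop in `validate_and_update_submission` consumes the mock exactly once. A minimal sketch of the pattern (test name and batch values are illustrative):

```python
from pytest_mock import MockerFixture


def test_loops_over_mocked_batches(mocker: MockerFixture):
    # Stand-ins for ValidationResults; iter() makes the mock behave
    # like the real generator, yielding each batch exactly once.
    fake_batches = ["batch-1", "batch-2"]
    mock_validate = mocker.patch(
        "sbl_filing_api.services.submission_processor.validate_batch_csv"
    )
    mock_validate.return_value = iter(fake_batches)

    assert list(mock_validate("any.csv")) == fake_batches
```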