Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export current extract parquet files over FTPS #264

Merged
merged 32 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
99bbc0b
Rough sketch of parquet FTP upload testing
milanmlft Jan 24, 2024
e929d11
Replace SendViaStow with SendViaFTPS
ruaridhg Jan 25, 2024
5ce99a6
pair prog switchover
ruaridhg Jan 25, 2024
b794e1b
pair prog switchover
ruaridhg Jan 25, 2024
e238b26
Basic upload tested
stefpiatek Jan 25, 2024
97f700f
Ensure exception thrown if no files in export
stefpiatek Jan 25, 2024
afdbb4e
tidy up
ruaridhg Jan 25, 2024
94f5572
deleting mounted data in tests through docker
ruaridhg Jan 25, 2024
4a67a38
tidy up
ruaridhg Jan 25, 2024
325fecc
Adapt fixtures to new ParquetExport
ruaridhg Jan 29, 2024
e1f8157
pair prog
ruaridhg Jan 30, 2024
48d2dc1
fix[core]: tests leaving files
peshence Jan 30, 2024
23f6253
Fix ruff and remove comments
ruaridhg Jan 30, 2024
d9a769a
Suggestions from PR review
ruaridhg Jan 30, 2024
226e18d
Update ref to pixl_ehr function
ruaridhg Jan 30, 2024
42675b6
send_via_ftps added to single API endpoint
ruaridhg Jan 30, 2024
0fd1029
fix Path type
ruaridhg Jan 30, 2024
943f127
Add missing environment variables in test fixture
milanmlft Jan 30, 2024
c89cf94
Add ftp-server to `pixl_ehr` tests
milanmlft Jan 30, 2024
2043805
Moved export radiology into separate function
ruaridhg Jan 31, 2024
fd1c968
Updated ehr api name
ruaridhg Jan 31, 2024
ee457b6
Merge branch 'main' into milanmlft/radiology-upload-ftps
milanmlft Jan 31, 2024
d60db3e
Merge branch 'main' into milanmlft/radiology-upload-ftps
milanmlft Feb 2, 2024
bbaac38
Remove ftp-server setup in `pixl_ehr`
milanmlft Feb 2, 2024
2946946
updated error message
ruaridhg Feb 2, 2024
0930771
Oly clean up exported files after successful test
milanmlft Feb 5, 2024
fd14efe
Better variable names
milanmlft Feb 5, 2024
bfa9998
Remove unused import
milanmlft Feb 5, 2024
906b4df
Check successful upload of parquet files to FTPS
milanmlft Feb 5, 2024
14d3c67
Reduce complexity, improve docstrings
milanmlft Feb 5, 2024
4de546e
Bug fix: set `current_extract` to the correct directory
milanmlft Feb 5, 2024
010cf9a
Merge branch 'main' into milanmlft/radiology-upload-ftps
milanmlft Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/src/pixl_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def extract_radiology_reports(parquet_dir: Path) -> None:
# Call the EHR API
api_config = api_config_for_queue("ehr")
response = requests.post(
url=f"{api_config.base_url}/export-radiology-as-parquet",
url=f"{api_config.base_url}/export-patient-data",
json={"project_name": project_name, "extract_datetime": omop_es_datetime.isoformat()},
timeout=10,
)
Expand Down
4 changes: 2 additions & 2 deletions orthanc/orthanc-anon/plugin/pixl.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def OnChange(changeType, level, resource) -> None: # noqa: ARG001
"""
Three ChangeTypes included in this function:
- If a study if stable and if ShouldAutoRoute returns true
then SendViaStow is called
then SendViaFTPS is called
- If orthanc has started then message added to Orthanc LogWarning
and AzureDICOMTokenRefresh called
- If orthanc has stopped and TIMER is not none then message added
Expand All @@ -202,7 +202,7 @@ def OnChange(changeType, level, resource) -> None: # noqa: ARG001

if changeType == orthanc.ChangeType.STABLE_STUDY and ShouldAutoRoute():
print("Stable study: %s" % resource) # noqa: T201
SendViaStow(resource)
milanmlft marked this conversation as resolved.
Show resolved Hide resolved
SendViaFTPS(resource)

if changeType == orthanc.ChangeType.ORTHANC_STARTED:
orthanc.LogWarning("Starting the scheduler")
Expand Down
31 changes: 31 additions & 0 deletions pixl_core/src/core/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
if TYPE_CHECKING:
from socket import socket

from core.exports import ParquetExport


from core.db.queries import get_project_slug_from_db, update_exported_at

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -77,6 +80,34 @@ def upload_dicom_image(zip_content: BinaryIO, pseudo_anon_id: str) -> None:
update_exported_at(pseudo_anon_id, datetime.now(tz=timezone.utc))


def upload_parquet_files(parquet_export: ParquetExport) -> None:
"""Upload parquet to FTPS under <project name>/<extract datetime>/parquet."""
current_extract = parquet_export.public_output.parent
# Create the remote directory if it doesn't exist
ftp = _connect_to_ftp()
_create_and_set_as_cwd(ftp, parquet_export.project_slug)
_create_and_set_as_cwd(ftp, parquet_export.extract_time_slug)
_create_and_set_as_cwd(ftp, "parquet")

export_files = [x for x in current_extract.rglob("*.parquet") if x.is_file()]
if not export_files:
msg = f"No files found in {current_extract}"
raise FileNotFoundError(msg)

# throw exception if empty dir
for path in export_files:
with path.open("rb") as handle:
command = f"STOR {path.stem}.parquet"
logger.debug("Running %s", command)

# Store the file using a binary handler
ftp.storbinary(command, handle)

# Close the FTP connection
ftp.quit()
logger.debug("Finished uploading!")


def _connect_to_ftp() -> FTP_TLS:
# Set your FTP server details
ftp_host = os.environ["FTP_HOST"]
Expand Down
34 changes: 27 additions & 7 deletions pixl_core/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@

import datetime
import os
import shutil
import pathlib
import subprocess
from pathlib import Path
from typing import BinaryIO

import pytest
from core.db.models import Base, Extract, Image
from core.exports import ParquetExport
from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker

Expand Down Expand Up @@ -66,18 +67,21 @@ def test_zip_content() -> BinaryIO:


@pytest.fixture()
def mounted_data() -> Path:
def mounted_data(run_containers) -> Path:
"""
The mounted data directory for the ftp server.
This will contain the data after successful upload.
Tear down through docker
"""
yield TEST_DIR / "ftp-server" / "mounts" / "data"
sub_dirs = [
f.path for f in os.scandir(TEST_DIR / "ftp-server" / "mounts" / "data") if f.is_dir()
]
# Tear down the directory after tests
for sub_dir in sub_dirs:
shutil.rmtree(sub_dir, ignore_errors=True)
subprocess.run(
b"docker compose exec ftp-server sh -c 'rm -r /home/pixl/*'",
check=True,
cwd=TEST_DIR,
shell=True, # noqa: S602
timeout=60,
).check_returncode()


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -169,3 +173,19 @@ def not_yet_exported_dicom_image(rows_in_session) -> Image:
def already_exported_dicom_image(rows_in_session) -> Image:
"""Return a DICOM image from the database."""
return rows_in_session.query(Image).filter(Image.hashed_identifier == "already_exported").one()


@pytest.fixture(autouse=True)
def export_dir(tmp_path_factory: pytest.TempPathFactory) -> pathlib.Path:
"""Tmp dir to for tests to extract to."""
return tmp_path_factory.mktemp("export_base") / "exports"


@pytest.fixture()
def parquet_export(export_dir) -> ParquetExport:
"""Return a ParquetExport object."""
return ParquetExport(
project_name="i-am-a-project",
extract_datetime=datetime.datetime.now(tz=datetime.timezone.utc),
export_dir=export_dir,
)
33 changes: 32 additions & 1 deletion pixl_core/tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
"""Test functionality to upload files to an endpoint."""


import pathlib
from datetime import datetime, timezone

import pytest
from core.db.models import Image
from core.db.queries import get_project_slug_from_db, update_exported_at
from core.upload import upload_dicom_image
from core.upload import upload_dicom_image, upload_parquet_files


@pytest.mark.usefixtures("run_containers")
Expand Down Expand Up @@ -65,3 +66,33 @@ def test_update_exported_and_save(rows_in_session) -> None:

# ASSERT
assert actual_export_time == expected_export_time


@pytest.mark.usefixtures("run_containers")
def test_upload_parquet(parquet_export, mounted_data) -> None:
"""Tests that parquet files are uploaded to the correct location"""
# ARRANGE

parquet_export.copy_to_exports(
pathlib.Path(__file__).parents[2] / "test" / "resources" / "omop"
)
with (parquet_export.public_output.parent / "radiology.parquet").open("w") as handle:
handle.writelines(["dummy data"])

# ACT
upload_parquet_files(parquet_export)
# ASSERT
expected_public_parquet_dir = (
mounted_data / parquet_export.project_slug / parquet_export.extract_time_slug / "parquet"
)
assert expected_public_parquet_dir.exists()
assert (expected_public_parquet_dir / "PROCEDURE_OCCURRENCE.parquet").exists()
assert (expected_public_parquet_dir / "radiology.parquet").exists()


@pytest.mark.usefixtures("run_containers")
def test_no_export_to_upload(parquet_export, mounted_data) -> None:
"""If there is nothing in the export directly, an exception is thrown"""
parquet_export.public_output.mkdir(parents=True, exist_ok=True)
with pytest.raises(FileNotFoundError):
upload_parquet_files(parquet_export)
26 changes: 23 additions & 3 deletions pixl_ehr/src/pixl_ehr/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from core.exports import ParquetExport
from core.patient_queue import PixlConsumer
from core.rest_api.router import router, state
from core.upload import upload_parquet_files
from decouple import config
from fastapi import FastAPI
from fastapi.responses import JSONResponse
Expand Down Expand Up @@ -78,24 +79,43 @@ class ExportRadiologyData(BaseModel):


@app.post(
"/export-radiology-as-parquet",
summary="Copy all matching radiology reports in the PIXL DB to a parquet file",
"/export-patient-data",
summary="Copy all matching radiology reports in the PIXL DB to a parquet file \
and send all ParquetExports via FTPS",
)
def export_radiology_as_parquet(export_params: ExportRadiologyData) -> None:
def export_patient_data(export_params: ExportRadiologyData) -> None:
"""
Batch export of all matching radiology reports in PIXL DB to a parquet file.
NOTE: we can't check that all reports in the queue have been processed, so
we are relying on the user waiting until processing has finished before running this.
"""
export_radiology_as_parquet(export_params)

send_via_ftps(
export_params.project_name, export_params.extract_datetime, export_params.output_dir
)


def export_radiology_as_parquet(export_params: ExportRadiologyData) -> None:
"""export-radiology-as-parquet"""
pe = ParquetExport(
export_params.project_name, export_params.extract_datetime, export_params.output_dir
)

anon_data = PIXLDatabase().get_radiology_reports(
pe.project_slug, export_params.extract_datetime
)
pe.export_radiology(anon_data)


def send_via_ftps(
project_name: str, extract_datetime: datetime, export_dir: Optional[Path]
) -> None:
"""Send parquet files via FTPS"""
pe = ParquetExport(project_name, extract_datetime, export_dir)
upload_parquet_files(pe)


@app.get(
"/az-copy-current",
summary="Copy the current state of the PIXL anon EHR schema to azure",
Expand Down
4 changes: 4 additions & 0 deletions pixl_ehr/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@
os.environ["EMAP_UDS_PASSWORD"] = "postgres" # noqa: S105
os.environ["EMAP_UDS_SCHEMA_NAME"] = "star"
os.environ["COGSTACK_REDACT_URL"] = "test"
os.environ["FTP_HOST"] = "localhost"
os.environ["FTP_USER_NAME"] = "pixl"
os.environ["FTP_USER_PASS"] = "longpassword" # noqa: S105 Hardcoding password
os.environ["FTP_PORT"] = "20021"
milanmlft marked this conversation as resolved.
Show resolved Hide resolved
25 changes: 25 additions & 0 deletions pixl_ehr/tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,28 @@ services:
interval: 10s
timeout: 30s
retries: 5

ftp-server:
container_name: test-ftp-server
build:
# TODO: update after PR #268 has been merged
context: ../../pixl_core/tests/ftp-server/
ports:
- "20021:21"
- "21000-21010:21000-21010"
volumes:
# TODO: update after PR #268 has been merged
# Mount for uploaded data
- "../../pixl_core/tests/ftp-server/mounts/data/:/home/pixl/"
# Mount SSL keys for TLS
- "../../pixl_core/tests/ftp-server/mounts/ssl/:/etc/ssl/private/"
environment:
ADDRESS: "localhost"
USERS: pixl|longpassword|/home/pixl
TLS_KEY: /etc/ssl/private/localhost.key
TLS_CERT: /etc/ssl/private/localhost.crt
healthcheck:
test: ping localhost:21 -w 2
interval: 10s
timeout: 5s
retries: 5
milanmlft marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 2 additions & 3 deletions pixl_ehr/tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ async def test_message_processing(example_messages) -> None:
async def test_radiology_export(example_messages, tmp_path) -> None:
"""
GIVEN a message processed by the EHR API
WHEN export_radiology_as_parquet is called
WHEN export_patient_data is called
THEN the radiology reports are exported to a parquet file and symlinked to the latest export
directory
"""
Expand Down Expand Up @@ -335,7 +335,7 @@ async def test_radiology_export_multiple_projects(example_messages, tmp_path) ->
"""
GIVEN EHR API has processed four messages, each from a different project+extract combination
(p1e1, p1e2, p2e1, p2e2 to ensure both fields must match)
WHEN export_radiology_as_parquet is called for 1 given project+extract
WHEN export_patient_data is called for 1 given project+extract
THEN only the radiology reports for that project+extract are exported
"""
# ARRANGE
Expand All @@ -346,7 +346,6 @@ async def test_radiology_export_multiple_projects(example_messages, tmp_path) ->
await process_message(mess)

# ACT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test is still calling export_radiology_as_parquet below and not export_patient_data

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌 second option is to create a separte function export_reports_to_parquet and test that instead of testing the REST API call. That feels like less faffing around but I'm easy

I assumed this is what you meant by having export_radiology_as_parquet and send_via_ftps as separate functions called in a single API endpoint i.e. export_patient_data but testing just export_radiology_as_parquet in test_processing since send via ftps is tested elsewhere


export_radiology_as_parquet(
ExportRadiologyData(
project_name=project_name, extract_datetime=extract_datetime, output_dir=tmp_path
Expand Down
Loading