diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py
index 10291bbc0..610d7534a 100644
--- a/cli/src/pixl_cli/main.py
+++ b/cli/src/pixl_cli/main.py
@@ -118,14 +118,14 @@ def extract_radiology_reports(parquet_dir: Path) -> None:
     # Call the EHR API
     api_config = api_config_for_queue("ehr")
     response = requests.post(
-        url=f"{api_config.base_url}/export-radiology-as-parquet",
+        url=f"{api_config.base_url}/export-patient-data",
         json={"project_name": project_name, "extract_datetime": omop_es_datetime.isoformat()},
         timeout=10,
     )
 
     success_code = 200
     if response.status_code != success_code:
-        msg = f"Failed to run extract-radiology-as-parquet due to: {response.text}"
+        msg = f"Failed to run export-patient-data due to: {response.text}"
         raise RuntimeError(msg)
 
diff --git a/pixl_core/src/core/upload.py b/pixl_core/src/core/upload.py
index ab1d6bf3b..fdcda10ac 100644
--- a/pixl_core/src/core/upload.py
+++ b/pixl_core/src/core/upload.py
@@ -26,6 +26,9 @@
 if TYPE_CHECKING:
     from socket import socket
 
+    from core.exports import ParquetExport
+
+
 from core.db.queries import get_project_slug_from_db, update_exported_at
 
 logger = logging.getLogger(__name__)
@@ -82,6 +85,34 @@ def upload_dicom_image(zip_content: BinaryIO, pseudo_anon_id: str) -> None:
     update_exported_at(pseudo_anon_id, datetime.now(tz=timezone.utc))
 
 
+def upload_parquet_files(parquet_export: ParquetExport) -> None:
+    """Upload parquet files to FTPS under <project-slug>/<extract-datetime>/parquet."""
+    current_extract = parquet_export.public_output.parents[1]
+    # Create the remote directory if it doesn't exist
+    ftp = _connect_to_ftp()
+    _create_and_set_as_cwd(ftp, parquet_export.project_slug)
+    _create_and_set_as_cwd(ftp, parquet_export.extract_time_slug)
+    _create_and_set_as_cwd(ftp, "parquet")
+
+    # Throw an exception if the extract directory contains no parquet files
+    export_files = [x for x in current_extract.rglob("*.parquet") if x.is_file()]
+    if not export_files:
+        msg = f"No files found in {current_extract}"
+        raise FileNotFoundError(msg)
+
+    for path in export_files:
+        with path.open("rb") as handle:
+            command = f"STOR {path.stem}.parquet"
+            logger.debug("Running %s", command)
+
+            # Store the file using a binary handler
+            ftp.storbinary(command, handle)
+
+    # Close the FTP connection
+    ftp.quit()
+    logger.debug("Finished uploading!")
+
+
 def _connect_to_ftp() -> FTP_TLS:
     # Set your FTP server details
     ftp_host = os.environ["FTP_HOST"]
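A note on the helpers used above: `_create_and_set_as_cwd` is called but not shown in this diff, and `_connect_to_ftp` is only partially visible. A minimal sketch of what such helpers could look like with the standard-library `ftplib`, assuming FTP_PORT, FTP_USER_NAME and FTP_PASSWORD environment variables alongside the FTP_HOST shown above (only FTP_HOST is confirmed by the diff):

    import os
    from ftplib import FTP_TLS, error_perm

    def _connect_to_ftp() -> FTP_TLS:
        ftp = FTP_TLS()
        ftp.connect(os.environ["FTP_HOST"], int(os.environ["FTP_PORT"]))
        ftp.login(os.environ["FTP_USER_NAME"], os.environ["FTP_PASSWORD"])
        ftp.prot_p()  # switch the data channel to TLS as well as the control channel
        return ftp

    def _create_and_set_as_cwd(ftp: FTP_TLS, directory: str) -> None:
        try:
            ftp.cwd(directory)  # enter the directory if it already exists
        except error_perm:
            ftp.mkd(directory)  # otherwise create it first
            ftp.cwd(directory)

Because each `_create_and_set_as_cwd` call changes into the directory it ensures exists, the three consecutive calls in `upload_parquet_files` build the nested `<project-slug>/<extract-datetime>/parquet` remote path one level at a time.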
diff --git a/pixl_core/tests/conftest.py b/pixl_core/tests/conftest.py
index 5d96492c4..41bf6d75b 100644
--- a/pixl_core/tests/conftest.py
+++ b/pixl_core/tests/conftest.py
@@ -15,13 +15,14 @@
 import datetime
 import os
-import shutil
+import pathlib
 import subprocess
 from pathlib import Path
 from typing import BinaryIO
 
 import pytest
 from core.db.models import Base, Extract, Image
+from core.exports import ParquetExport
 from sqlalchemy import Engine, create_engine
 from sqlalchemy.orm import Session, sessionmaker
 
@@ -69,16 +70,21 @@ def test_zip_content() -> BinaryIO:
 
 
 @pytest.fixture()
-def mounted_data() -> Path:
+def mounted_data(run_containers) -> Path:
     """
     The mounted data directory for the ftp server.
     This will contain the data after successful upload.
+    Teardown is done through docker.
     """
     yield MOUNTED_DATA_DIR
-    sub_dirs = [f.path for f in os.scandir(MOUNTED_DATA_DIR) if f.is_dir()]
     # Tear down the directory after tests
-    for sub_dir in sub_dirs:
-        shutil.rmtree(sub_dir, ignore_errors=True)
+    subprocess.run(
+        b"docker compose exec ftp-server sh -c 'rm -r /home/pixl/*'",
+        check=True,
+        cwd=TEST_DIR,
+        shell=True,  # noqa: S602
+        timeout=60,
+    ).check_returncode()
 
 
 @pytest.fixture(scope="module")
@@ -170,3 +176,19 @@ def not_yet_exported_dicom_image(rows_in_session) -> Image:
 def already_exported_dicom_image(rows_in_session) -> Image:
     """Return a DICOM image from the database."""
     return rows_in_session.query(Image).filter(Image.hashed_identifier == "already_exported").one()
+
+
+@pytest.fixture(autouse=True)
+def export_dir(tmp_path_factory: pytest.TempPathFactory) -> pathlib.Path:
+    """Tmp dir for tests to extract to."""
+    return tmp_path_factory.mktemp("export_base") / "exports"
+
+
+@pytest.fixture()
+def parquet_export(export_dir) -> ParquetExport:
+    """Return a ParquetExport object."""
+    return ParquetExport(
+        project_name="i-am-a-project",
+        extract_datetime=datetime.datetime.now(tz=datetime.timezone.utc),
+        export_dir=export_dir,
+    )
diff --git a/pixl_core/tests/test_upload.py b/pixl_core/tests/test_upload.py
index 2c7050bc6..5e05d73b7 100644
--- a/pixl_core/tests/test_upload.py
+++ b/pixl_core/tests/test_upload.py
@@ -14,12 +14,13 @@
 """Test functionality to upload files to an endpoint."""
 
+import pathlib
 from datetime import datetime, timezone
 
 import pytest
 from core.db.models import Image
 from core.db.queries import get_project_slug_from_db, update_exported_at
-from core.upload import upload_dicom_image
+from core.upload import upload_dicom_image, upload_parquet_files
 
 
 @pytest.mark.usefixtures("run_containers")
@@ -65,3 +66,33 @@ def test_update_exported_and_save(rows_in_session) -> None:
 
     # ASSERT
     assert actual_export_time == expected_export_time
+
+
+@pytest.mark.usefixtures("run_containers")
+def test_upload_parquet(parquet_export, mounted_data) -> None:
+    """Tests that parquet files are uploaded to the correct location."""
+    # ARRANGE
+
+    parquet_export.copy_to_exports(
+        pathlib.Path(__file__).parents[2] / "test" / "resources" / "omop"
+    )
+    with (parquet_export.public_output.parent / "radiology.parquet").open("w") as handle:
+        handle.writelines(["dummy data"])
+
+    # ACT
+    upload_parquet_files(parquet_export)
+    # ASSERT
+    expected_public_parquet_dir = (
+        mounted_data / parquet_export.project_slug / parquet_export.extract_time_slug / "parquet"
+    )
+    assert expected_public_parquet_dir.exists()
+    assert (expected_public_parquet_dir / "PROCEDURE_OCCURRENCE.parquet").exists()
+    assert (expected_public_parquet_dir / "radiology.parquet").exists()
+
+
+@pytest.mark.usefixtures("run_containers")
+def test_no_export_to_upload(parquet_export, mounted_data) -> None:
+    """If there is nothing in the export directory, an exception is thrown."""
+    parquet_export.public_output.mkdir(parents=True, exist_ok=True)
+    with pytest.raises(FileNotFoundError):
+        upload_parquet_files(parquet_export)
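The tests above depend on how `ParquetExport` lays out its directories, which this diff does not show. A rough illustration of the path arithmetic in `upload_parquet_files`, assuming a layout like `<export-root>/<project-slug>/all_extracts/<extract-time-slug>/omop/public` for `public_output` (the exact layout is defined in `core.exports`, not confirmed here; the slug values are placeholders):

    from pathlib import Path

    # Hypothetical public_output for the fixture's project and extract time
    public_output = Path("exports/i-am-a-project/all_extracts/2023-12-07t14-08-58/omop/public")

    current_extract = public_output.parents[1]  # .../all_extracts/2023-12-07t14-08-58

    # rglob("*.parquet") under current_extract picks up both the OMOP public
    # tables and the radiology.parquet written to public_output.parent
    parquet_files = [p for p in current_extract.rglob("*.parquet") if p.is_file()]

Under that assumption, this is why `test_upload_parquet` can write `radiology.parquet` to `public_output.parent` and still expect it to be uploaded: it sits inside `current_extract`, the directory that `upload_parquet_files` globs.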
diff --git a/pixl_ehr/src/pixl_ehr/main.py b/pixl_ehr/src/pixl_ehr/main.py
index 2e1dfa38f..fa406c227 100644
--- a/pixl_ehr/src/pixl_ehr/main.py
+++ b/pixl_ehr/src/pixl_ehr/main.py
@@ -28,6 +28,7 @@
 from core.exports import ParquetExport
 from core.patient_queue import PixlConsumer
 from core.rest_api.router import router, state
+from core.upload import upload_parquet_files
 from decouple import config
 from fastapi import FastAPI
 from fastapi.responses import JSONResponse
@@ -78,18 +79,37 @@ class ExportRadiologyData(BaseModel):
 
 
 @app.post(
-    "/export-radiology-as-parquet",
-    summary="Copy all matching radiology reports in the PIXL DB to a parquet file",
+    "/export-patient-data",
+    summary="Copy all matching radiology reports in the PIXL DB to a parquet file \
+    and send all ParquetExports via FTPS",
 )
-def export_radiology_as_parquet(export_params: ExportRadiologyData) -> None:
+def export_patient_data(export_params: ExportRadiologyData) -> None:
     """
     Batch export of all matching radiology reports in PIXL DB to a parquet file.
     NOTE: we can't check that all reports in the queue have been processed,
     so we are relying on the user waiting until processing has finished before running this.
     """
+    export_radiology_as_parquet(export_params)
+
+    # Upload Parquet files to the appropriate endpoint
+    upload_parquet_files(
+        ParquetExport(
+            export_params.project_name, export_params.extract_datetime, export_params.output_dir
+        )
+    )
+
+
+def export_radiology_as_parquet(export_params: ExportRadiologyData) -> None:
+    """
+    Export radiology reports as a parquet file to
+    `{EHR_EXPORT_ROOT_DIR}/<project-slug>/all_extracts/radiology/radiology.parquet`.
+    :param export_params: the project name, extract datetime and output directory defined as an
+        ExportRadiologyData object.
+    """
     pe = ParquetExport(
         export_params.project_name, export_params.extract_datetime, export_params.output_dir
     )
+
     anon_data = PIXLDatabase().get_radiology_reports(
         pe.project_slug, export_params.extract_datetime
     )
diff --git a/pixl_ehr/tests/test_processing.py b/pixl_ehr/tests/test_processing.py
index b065e1dd7..602c7e824 100644
--- a/pixl_ehr/tests/test_processing.py
+++ b/pixl_ehr/tests/test_processing.py
@@ -302,7 +302,7 @@ async def test_message_processing(example_messages) -> None:
 async def test_radiology_export(example_messages, tmp_path) -> None:
     """
     GIVEN a message processed by the EHR API
-    WHEN export_radiology_as_parquet is called
+    WHEN export_patient_data is called
     THEN the radiology reports are exported to a parquet file and symlinked to the latest
     export directory
     """
@@ -338,7 +338,7 @@ async def test_radiology_export_multiple_projects(example_messages, tmp_path) -> None:
     """
     GIVEN EHR API has processed four messages, each from a different project+extract combination
     (p1e1, p1e2, p2e1, p2e2 to ensure both fields must match)
-    WHEN export_radiology_as_parquet is called for 1 given project+extract
+    WHEN export_patient_data is called for 1 given project+extract
     THEN only the radiology reports for that project+extract are exported
     """
     # ARRANGE
@@ -349,7 +349,6 @@ async def test_radiology_export_multiple_projects(example_messages, tmp_path) -> None:
     await process_message(mess)
 
     # ACT
-
     export_radiology_as_parquet(
         ExportRadiologyData(
             project_name=project_name, extract_datetime=extract_datetime, output_dir=tmp_path
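For completeness, the renamed endpoint can be exercised the same way the CLI does it. A hypothetical smoke test using FastAPI's TestClient; the field names match the `ExportRadiologyData` model used above, but the values and the assumption that the backing services are available are placeholders:

    from fastapi.testclient import TestClient
    from pixl_ehr.main import app

    client = TestClient(app)
    response = client.post(
        "/export-patient-data",
        json={
            "project_name": "i-am-a-project",
            "extract_datetime": "2023-12-07T14:08:58+00:00",
            "output_dir": "/run/exports",
        },
    )
    assert response.status_code == 200  # the same success_code the CLI checks for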
diff --git a/test/scripts/check_ftps_upload.py b/test/scripts/check_ftps_upload.py
index e81a5182f..ec0bedd23 100755
--- a/test/scripts/check_ftps_upload.py
+++ b/test/scripts/check_ftps_upload.py
@@ -12,38 +12,40 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import subprocess
+
 from pathlib import Path
 from shutil import rmtree
 from time import sleep
 
-PARQUET_PATH = Path(__file__).parents[1] / "resources" / "omop"
-print(f"parquet path: {PARQUET_PATH}")
+OMOP_INPUT_PATH = Path(__file__).parents[1] / "resources" / "omop"
+print(f"parquet path: {OMOP_INPUT_PATH}")
 
 MOUNTED_DATA_DIR = Path(__file__).parents[1] / "dummy-services" / "ftp-server" / "mounts" / "data"
 print(f"mounted data dir: {MOUNTED_DATA_DIR}")
 
-project_name = "test-extract-uclh-omop-cdm"
-print(f"project name: {project_name}")
-expected_output_dir = MOUNTED_DATA_DIR / project_name
+project_slug = "test-extract-uclh-omop-cdm"
+extract_time_slug = "2023-12-07t14-08-58"
+
+expected_output_dir = MOUNTED_DATA_DIR / project_slug
+expected_public_parquet_dir = expected_output_dir / extract_time_slug / "parquet"
 print(f"expected output dir: {expected_output_dir}")
+print(f"expected parquet files dir: {expected_public_parquet_dir}")
 
 SECONDS_WAIT = 5
 
-glob_list = []
+zip_files = []
 for seconds in range(0, 121, SECONDS_WAIT):
     # Test whether DICOM images have been uploaded
-    glob_list = list(expected_output_dir.glob("*.zip"))
-    print(f"Waited for {seconds} seconds. glob_list: {glob_list}")
-    if len(glob_list) == 2:
+    zip_files = list(expected_output_dir.glob("*.zip"))
+    print(f"Waited for {seconds} seconds. zip_files: {zip_files}")
+    if len(zip_files) == 2:
         break
     sleep(SECONDS_WAIT)
 
-# Check for expected number of uploaded files and clean up, even if the assertion fails
-try:
-    # We expect 2 DICOM image studies to be uploaded
-    assert len(glob_list) == 2
-    # TODO: check parquet files upload before deleting
-finally:
-    # To we want to always remove the files if its failed, may help debugging not to?
-    rmtree(expected_output_dir, ignore_errors=True)
+# We expect 2 DICOM image studies to be uploaded
+assert len(zip_files) == 2
+assert expected_public_parquet_dir.exists()
+assert (expected_public_parquet_dir / "PROCEDURE_OCCURRENCE.parquet").exists()
+assert (expected_public_parquet_dir / "radiology.parquet").exists()
+# Clean up; only happens if the assertions pass
+rmtree(expected_output_dir, ignore_errors=True)
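When the end-to-end check fails, it can help to inspect the server's view of the upload rather than the mounted directory. A sketch of a manual check against the dummy ftp-server with `ftplib`; the host, port and credentials are placeholders and should be read from the same configuration the tests use:

    from ftplib import FTP_TLS

    ftp = FTP_TLS()
    ftp.connect("localhost", 20021)  # placeholder host and port
    ftp.login("pixl_user", "pixl_password")  # placeholder credentials
    ftp.prot_p()
    ftp.cwd("test-extract-uclh-omop-cdm/2023-12-07t14-08-58/parquet")
    print(ftp.nlst())  # expect PROCEDURE_OCCURRENCE.parquet and radiology.parquet
    ftp.quit()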