diff --git a/jobrunner/config.py b/jobrunner/config.py
index c3283ead..ac6dd81b 100644
--- a/jobrunner/config.py
+++ b/jobrunner/config.py
@@ -136,6 +136,9 @@ def database_urls_from_env(env):
 MAX_WORKERS = int(os.environ.get("MAX_WORKERS") or max(cpu_count() - 1, 1))
 MAX_DB_WORKERS = int(os.environ.get("MAX_DB_WORKERS") or MAX_WORKERS)
 MAX_RETRIES = int(os.environ.get("MAX_RETRIES", 0))
+MAX_LEVEL4_FILESIZE = int(
+    os.environ.get("MAX_LEVEL4_FILESIZE", 16 * 1024 * 1024)
+)  # 16 MiB
 
 STATA_LICENSE = os.environ.get("STATA_LICENSE")
 
diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py
index 25b9aef7..5324d8f9 100644
--- a/jobrunner/executors/local.py
+++ b/jobrunner/executors/local.py
@@ -294,16 +294,20 @@ def delete_files(self, workspace, privacy, files):
         else:
             raise Exception(f"unknown privacy of {privacy}")
 
-        errors = []
-        for name in files:
-            path = root / name
-            try:
-                path.unlink(missing_ok=True)
-            except Exception:
-                log.exception(f"Could not delete {path}")
-                errors.append(name)
+        return delete_files_from_directory(root, files)
 
-        return errors
+
+def delete_files_from_directory(directory, files):
+    errors = []
+    for name in files:
+        path = directory / name
+        try:
+            path.unlink(missing_ok=True)
+        except Exception:
+            log.exception(f"Could not delete {path}")
+            errors.append(name)
+
+    return errors
 
 
 def prepare_job(job_definition):
@@ -411,9 +415,14 @@ def finalize_job(job_definition):
         write_job_logs(job_definition, job_metadata, copy_log_to_workspace=False)
     else:
         write_job_logs(job_definition, job_metadata, copy_log_to_workspace=True)
-        persist_outputs(job_definition, results.outputs, job_metadata)
+        excluded = persist_outputs(job_definition, results.outputs, job_metadata)
+        results.level4_excluded_files.update(excluded)
+
     RESULTS[job_definition.id] = results
 
+    # for ease of testing
+    return results
+
 
 def get_job_metadata(job_definition, outputs, container_metadata):
     # job_metadata is a big dict capturing everything we know about the state
@@ -457,25 +466,59 @@ def write_job_logs(job_definition, job_metadata, copy_log_to_workspace=True):
     )
 
 
+MAX_SIZE_MSG = """
+The file {filename} was {size}Mb, which is above the limit for
+moderately_sensitive files of {limit}Mb.
+
+As such, it has *not* been copied to Level 4 storage. Please double check that
+{filename} contains only aggregate information, and is an appropriate size
+for output checking.
+""" + + def persist_outputs(job_definition, outputs, job_metadata): """Copy generated outputs to persistant storage.""" # Extract outputs to workspace workspace_dir = get_high_privacy_workspace(job_definition.workspace) + excluded_files = {} + + def mb(b): + return round(b / (1024 * 1024), 2) + + sizes = {} for filename in outputs.keys(): log.info(f"Extracting output file: {filename}") - volumes.get_volume_api(job_definition).copy_from_volume( + size = volumes.get_volume_api(job_definition).copy_from_volume( job_definition, filename, workspace_dir / filename ) + sizes[filename] = size # Copy out medium privacy files medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace) if medium_privacy_dir: for filename, privacy_level in outputs.items(): if privacy_level == "moderately_sensitive": - volumes.copy_file( - workspace_dir / filename, medium_privacy_dir / filename - ) + size = sizes[filename] + message_file = medium_privacy_dir / (filename + ".txt") + + if size < job_definition.max_level4_filesize: + volumes.copy_file( + workspace_dir / filename, medium_privacy_dir / filename + ) + # if it previously had a too big notice, delete it + delete_files_from_directory(medium_privacy_dir, [message_file]) + else: + msg = f"File size of {mb(size)}Mb is larger that limit of {mb(job_definition.max_level4_filesize)}Mb." + excluded_files[filename] = msg + message_file.parent.mkdir(exist_ok=True, parents=True) + message_file.write_text( + MAX_SIZE_MSG.format( + filename=filename, + size=mb(size), + limit=mb(job_definition.max_level4_filesize), + ) + ) # this can be removed once osrelease is dead write_manifest_file( @@ -487,6 +530,8 @@ def persist_outputs(job_definition, outputs, job_metadata): }, ) + return excluded_files + def find_matching_outputs(job_definition): """ diff --git a/jobrunner/executors/volumes.py b/jobrunner/executors/volumes.py index 2dc4d6f2..952f7563 100644 --- a/jobrunner/executors/volumes.py +++ b/jobrunner/executors/volumes.py @@ -26,6 +26,8 @@ def copy_file(source, dest, follow_symlinks=True): with atomic_writer(dest) as tmp: shutil.copy(source, tmp, follow_symlinks=follow_symlinks) + return dest.stat().st_size + def docker_volume_name(job): return f"os-volume-{job.id}" @@ -50,7 +52,7 @@ def copy_to_volume(job, src, dst, timeout=None): docker.copy_to_volume(docker_volume_name(job), src, dst, timeout) def copy_from_volume(job, src, dst, timeout=None): - docker.copy_from_volume(docker_volume_name(job), src, dst, timeout) + return docker.copy_from_volume(docker_volume_name(job), src, dst, timeout) def delete_volume(job): docker.delete_volume(docker_volume_name(job)) @@ -140,7 +142,7 @@ def copy_to_volume(job, src, dst, timeout=None): def copy_from_volume(job, src, dst, timeout=None): # this is only used to copy final outputs/logs. 
    path = host_volume_path(job) / src
-    copy_file(path, dst)
+    return copy_file(path, dst)
 
 
 def delete_volume(job):
diff --git a/jobrunner/job_executor.py b/jobrunner/job_executor.py
index 94877f30..6bbdbe30 100644
--- a/jobrunner/job_executor.py
+++ b/jobrunner/job_executor.py
@@ -1,4 +1,4 @@
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from enum import Enum
 from typing import List, Mapping, Optional
 
@@ -35,6 +35,7 @@ class JobDefinition:
     database_name: str = None
     cpu_count: str = None  # number of CPUs to be allocated
     memory_limit: str = None  # memory limit to apply
+    max_level4_filesize: int = 16 * 1024 * 1024
 
     # if a job has been cancelled, the name of the canceller - either "user" or "admin"
     cancelled: str = None
@@ -78,6 +79,9 @@ class JobResults:
     # timestamp these results were finalized, in integer nanoseconds
     timestamp_ns: int = None
 
+    # files not copied to level 4 (too big or similar reason)
+    level4_excluded_files: Mapping[str, str] = field(default_factory=dict)
+
     # to be extracted from the image labels
     action_version: str = "unknown"
     action_revision: str = "unknown"
diff --git a/jobrunner/lib/docker.py b/jobrunner/lib/docker.py
index 313ebb34..54df55f3 100644
--- a/jobrunner/lib/docker.py
+++ b/jobrunner/lib/docker.py
@@ -276,6 +276,8 @@ def copy_from_volume(volume_name, source, dest, timeout=None):
         timeout=timeout,
     )
 
+    return dest.stat().st_size
+
 
 def glob_volume_files(volume_name, glob_patterns):
     """
diff --git a/jobrunner/run.py b/jobrunner/run.py
index 2b729389..e54074f4 100644
--- a/jobrunner/run.py
+++ b/jobrunner/run.py
@@ -465,6 +465,12 @@ def save_results(job, job_definition, results):
         code = StatusCode.SUCCEEDED
         message = "Completed successfully"
 
+    if results.level4_excluded_files:
+        files = "\n".join(
+            f"{f}: {msg}" for f, msg in results.level4_excluded_files.items()
+        )
+        message += f", but {len(results.level4_excluded_files)} file(s) marked as moderately_sensitive were excluded:\n{files}"
+
     set_code(job, code, message, error=error, results=results)
 
 
@@ -550,6 +556,7 @@ def job_to_job_definition(job):
         # config defaults.
        cpu_count=config.DEFAULT_JOB_CPU_COUNT,
         memory_limit=config.DEFAULT_JOB_MEMORY_LIMIT,
+        max_level4_filesize=config.MAX_LEVEL4_FILESIZE,
         cancelled=job_definition_cancelled,
     )
 
diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py
index 9dd99274..77cd5035 100644
--- a/tests/test_local_executor.py
+++ b/tests/test_local_executor.py
@@ -297,8 +297,8 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
     ]
     job_definition.inputs = ["output/input.csv"]
     job_definition.output_spec = {
-        "output/output.*": "high_privacy",
-        "output/summary.*": "medium_privacy",
+        "output/output.*": "highly_sensitive",
+        "output/summary.*": "moderately_sensitive",
     }
     populate_workspace(job_definition.workspace, "output/input.csv")
 
@@ -328,8 +328,8 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
     results = api.get_results(job_definition)
     assert results.exit_code == 0
     assert results.outputs == {
-        "output/output.csv": "high_privacy",
-        "output/summary.csv": "medium_privacy",
+        "output/output.csv": "highly_sensitive",
+        "output/summary.csv": "moderately_sensitive",
     }
     assert results.unmatched_patterns == []
 
@@ -344,8 +344,8 @@ def test_finalize_failed(docker_cleanup, job_definition, tmp_work_dir, volume_ap
     job_definition.args = ["false"]
     job_definition.output_spec = {
-        "output/output.*": "high_privacy",
-        "output/summary.*": "medium_privacy",
+        "output/output.*": "highly_sensitive",
+        "output/summary.*": "moderately_sensitive",
     }
 
     api = local.LocalDockerAPI()
@@ -378,8 +378,8 @@ def test_finalize_unmatched(docker_cleanup, job_definition, tmp_work_dir, volume
     # the sleep is needed to make sure the unmatched file is *newer* enough
     job_definition.args = ["sh", "-c", "sleep 1; touch /workspace/unmatched"]
     job_definition.output_spec = {
-        "output/output.*": "high_privacy",
-        "output/summary.*": "medium_privacy",
+        "output/output.*": "highly_sensitive",
+        "output/summary.*": "moderately_sensitive",
     }
 
     api = local.LocalDockerAPI()
@@ -473,6 +473,87 @@ def test_finalize_failed_oomkilled(docker_cleanup, job_definition, tmp_work_dir)
     assert workspace_log_file_exists(job_definition)
 
 
+@pytest.mark.needs_docker
+def test_finalize_large_level4_outputs(
+    docker_cleanup, job_definition, tmp_work_dir, volume_api
+):
+    job_definition.args = [
+        "truncate",
+        "-s",
+        str(1024 * 1024),
+        "/workspace/output/output.csv",
+    ]
+    job_definition.output_spec = {
+        "output/output.csv": "moderately_sensitive",
+    }
+    job_definition.max_level4_filesize = 512 * 1024
+
+    api = local.LocalDockerAPI()
+
+    status = api.prepare(job_definition)
+    assert status.state == ExecutorState.PREPARED
+    status = api.execute(job_definition)
+    assert status.state == ExecutorState.EXECUTING
+
+    status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)
+
+    status = api.finalize(job_definition)
+    assert status.state == ExecutorState.FINALIZED
+
+    result = api.get_results(job_definition)
+
+    assert result.exit_code == 0
+    assert result.level4_excluded_files == {
+        "output/output.csv": "File size of 1.0Mb is larger than limit of 0.5Mb.",
+    }
+
+    level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
+    message_file = level4_dir / "output/output.csv.txt"
+    txt = message_file.read_text()
+    assert "output/output.csv" in txt
+    assert "1.0Mb" in txt
+    assert "0.5Mb" in txt
+
+
+@pytest.mark.needs_docker
+def test_finalize_large_level4_outputs_cleanup(
+    docker_cleanup, job_definition, tmp_work_dir, volume_api
+):
+    job_definition.args = [
+        "truncate",
+        "-s",
+        str(256 * 1024),
+        "/workspace/output/output.csv",
+    ]
+    job_definition.output_spec = {
+        "output/output.csv": "moderately_sensitive",
+    }
+    job_definition.max_level4_filesize = 512 * 1024
+
+    level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
+    message_file = level4_dir / "output/output.csv.txt"
+    message_file.parent.mkdir(exist_ok=True, parents=True)
+    message_file.write_text("message")
+
+    api = local.LocalDockerAPI()
+
+    status = api.prepare(job_definition)
+    assert status.state == ExecutorState.PREPARED
+    status = api.execute(job_definition)
+    assert status.state == ExecutorState.EXECUTING
+
+    status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)
+
+    status = api.finalize(job_definition)
+    assert status.state == ExecutorState.FINALIZED
+
+    result = api.get_results(job_definition)
+
+    assert result.exit_code == 0
+    assert result.level4_excluded_files == {}
+    assert not message_file.exists()
+
+
 @pytest.mark.needs_docker
 def test_pending_job_terminated_not_finalized(
     docker_cleanup, job_definition, tmp_work_dir
diff --git a/tests/test_run.py b/tests/test_run.py
index e955c0e7..92294f45 100644
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -542,11 +542,11 @@ def test_handle_job_finalized_success_with_delete(db):
     job_factory(
         state=State.SUCCEEDED,
         status_code=StatusCode.SUCCEEDED,
-        outputs={"output/old.csv": "medium"},
+        outputs={"output/old.csv": "highly_sensitive"},
     )
 
     job = api.add_test_job(ExecutorState.FINALIZED, State.RUNNING, StatusCode.FINALIZED)
-    api.set_job_result(job, outputs={"output/file.csv": "medium"})
+    api.set_job_result(job, outputs={"output/file.csv": "highly_sensitive"})
 
     run.handle_job(job, api)
 
@@ -558,7 +558,7 @@ def test_handle_job_finalized_success_with_delete(db):
     # our state
     assert job.state == State.SUCCEEDED
     assert job.status_message == "Completed successfully"
-    assert job.outputs == {"output/file.csv": "medium"}
+    assert job.outputs == {"output/file.csv": "highly_sensitive"}
 
     assert api.deleted["workspace"][Privacy.MEDIUM] == ["output/old.csv"]
     assert api.deleted["workspace"][Privacy.HIGH] == ["output/old.csv"]
@@ -569,6 +569,37 @@ def test_handle_job_finalized_success_with_delete(db):
     assert spans[-1].name == "JOB"
 
 
+def test_handle_job_finalized_success_with_large_file(db):
+    api = StubExecutorAPI()
+
+    # insert previous outputs
+    job_factory(
+        state=State.SUCCEEDED,
+        status_code=StatusCode.SUCCEEDED,
+        outputs={"output/output.csv": "moderately_sensitive"},
+    )
+
+    job = api.add_test_job(ExecutorState.FINALIZED, State.RUNNING, StatusCode.FINALIZED)
+    api.set_job_result(
+        job,
+        outputs={"output/output.csv": "moderately_sensitive"},
+        level4_excluded_files={"output/output.csv": "too big"},
+    )
+
+    run.handle_job(job, api)
+
+    # executor state
+    assert job.id in api.tracker["cleanup"]
+    # it's been cleaned up and is now unknown
+    assert api.get_status(job).state == ExecutorState.UNKNOWN
+
+    # our state
+    assert job.state == State.SUCCEEDED
+    assert "Completed successfully" in job.status_message
+    assert "were excluded" in job.status_message
+    assert "output/output.csv: too big" in job.status_message
+
+
 @pytest.mark.parametrize(
     "exit_code,run_command,extra_message",
     [
@@ -607,7 +638,10 @@ def test_handle_job_finalized_failed_exit_code(
         run_command=run_command,
     )
     api.set_job_result(
-        job, outputs={"output/file.csv": "medium"}, exit_code=exit_code, message=None
+        job,
+        outputs={"output/file.csv": "highly_sensitive"},
+        exit_code=exit_code,
+        message=None,
     )
 
     run.handle_job(job, api)
 
@@ -624,7 +658,7 @@ def test_handle_job_finalized_failed_exit_code(
    if extra_message:
         expected += f": {extra_message}"
     assert job.status_message == expected
-    assert job.outputs == {"output/file.csv": "medium"}
+    assert job.outputs == {"output/file.csv": "highly_sensitive"}
 
     spans = get_trace("jobs")
     assert spans[-3].name == "FINALIZED"
@@ -644,7 +678,7 @@ def test_handle_job_finalized_failed_unmatched_patterns(db):
     job = api.add_test_job(ExecutorState.FINALIZED, State.RUNNING, StatusCode.FINALIZED)
     api.set_job_result(
         job,
-        outputs={"output/file.csv": "medium"},
+        outputs={"output/file.csv": "highly_sensitive"},
         unmatched_patterns=["badfile.csv"],
         unmatched_outputs=["otherbadfile.csv"],
     )
@@ -659,7 +693,7 @@ def test_handle_job_finalized_failed_unmatched_patterns(db):
     # our state
     assert job.state == State.FAILED
     assert job.status_message == "No outputs found matching patterns:\n - badfile.csv"
-    assert job.outputs == {"output/file.csv": "medium"}
+    assert job.outputs == {"output/file.csv": "highly_sensitive"}
     assert job.unmatched_outputs == ["otherbadfile.csv"]
 
     spans = get_trace("jobs")
@@ -925,8 +959,8 @@ def test_ignores_cancelled_jobs_when_calculating_dependencies(db):
 
 def test_get_obsolete_files_nothing_to_delete(db):
     outputs = {
-        "high.txt": "high_privacy",
-        "medium.txt": "medium_privacy",
+        "high.txt": "highly_sensitive",
+        "medium.txt": "moderately_sensitive",
     }
     job = job_factory(
         state=State.SUCCEEDED,
@@ -942,14 +976,14 @@ def test_get_obsolete_files_nothing_to_delete(db):
 
 def test_get_obsolete_files_things_to_delete(db):
     old_outputs = {
-        "old_high.txt": "high_privacy",
-        "old_medium.txt": "medium_privacy",
-        "current.txt": "high_privacy",
+        "old_high.txt": "highly_sensitive",
+        "old_medium.txt": "moderately_sensitive",
+        "current.txt": "highly_sensitive",
     }
     new_outputs = {
-        "new_high.txt": "high_privacy",
-        "new_medium.txt": "medium_privacy",
-        "current.txt": "high_privacy",
+        "new_high.txt": "highly_sensitive",
+        "new_medium.txt": "moderately_sensitive",
+        "current.txt": "highly_sensitive",
     }
     job = job_factory(
         state=State.SUCCEEDED,
@@ -964,10 +998,10 @@ def test_get_obsolete_files_things_to_delete(db):
 
 def test_get_obsolete_files_case_change(db):
     old_outputs = {
-        "high.txt": "high_privacy",
+        "high.txt": "highly_sensitive",
     }
     new_outputs = {
-        "HIGH.txt": "high_privacy",
+        "HIGH.txt": "highly_sensitive",
    }
     job = job_factory(
         state=State.SUCCEEDED,
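
Note for reviewers: below is a minimal sketch of the size gate this patch adds to `persist_outputs`, runnable on its own outside the repo. The `mb` helper and message wording mirror the patch; `check_level4_size` is a hypothetical stand-in helper invented here for illustration and is not part of the change.

```python
# Sketch only: mirrors the level-4 size check in persist_outputs above.
# MAX_LEVEL4_FILESIZE is read from the environment as in jobrunner/config.py,
# defaulting to 16 MiB. check_level4_size is a hypothetical helper, not patch code.
import os

MAX_LEVEL4_FILESIZE = int(os.environ.get("MAX_LEVEL4_FILESIZE", 16 * 1024 * 1024))


def mb(b):
    # bytes -> Mb, rounded to 2 decimal places, as in persist_outputs
    return round(b / (1024 * 1024), 2)


def check_level4_size(filename, size, limit=MAX_LEVEL4_FILESIZE):
    """Return None if the file may be copied to level 4, else the exclusion message."""
    if size < limit:
        return None
    return f"File size of {mb(size)}Mb is larger than limit of {mb(limit)}Mb."


# A 1 MiB file against the 512 KiB limit used in the tests above:
assert check_level4_size("output/output.csv", 1024 * 1024, 512 * 1024) == (
    "File size of 1.0Mb is larger than limit of 0.5Mb."
)
```

Files that pass the check are copied and any stale `<name>.txt` notice is deleted; files that fail are skipped, recorded in `level4_excluded_files`, and replaced on disk by a `<name>.txt` notice rendered from `MAX_SIZE_MSG`, which `save_results` then surfaces in the job's status message.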