diff --git a/jobrunner/cli/local_run.py b/jobrunner/cli/local_run.py
index fd0c3f29..68d6081c 100644
--- a/jobrunner/cli/local_run.py
+++ b/jobrunner/cli/local_run.py
@@ -426,6 +426,13 @@ def create_and_run_jobs(
         print(" outputs:")
         outputs = sorted(job.outputs.items()) if job.outputs else []
         print(tabulate(outputs, separator=" - ", indent=5, empty="(no outputs)"))
+
+        if job.level4_excluded_files:
+            print(" invalid moderately_sensitive outputs:")
+            print(
+                tabulate(job.level4_excluded_files.items(), separator=" - ", indent=5)
+            )
+
         # If a job exited with an error code then try to display the end of the
         # log output in case that makes the problem immediately obvious
         if job.status_code == StatusCode.NONZERO_EXIT:
@@ -440,7 +447,6 @@ def create_and_run_jobs(
 
 
 def create_job_request_and_jobs(project_dir, actions, force_run_dependencies):
-
     job_request = JobRequest(
         id=random_id(),
         repo_url=str(project_dir),
diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py
index 63bcfcad..ae71d997 100644
--- a/jobrunner/executors/local.py
+++ b/jobrunner/executors/local.py
@@ -184,7 +184,6 @@ def execute(self, job_definition):
         return JobStatus(ExecutorState.EXECUTING)
 
     def finalize(self, job_definition):
-
         current_status = self.get_status(job_definition)
         if current_status.state == ExecutorState.UNKNOWN:
             # job had not started running, so do not finalize
@@ -488,13 +487,18 @@ def persist_outputs(job_definition, outputs, job_metadata):
     # Copy out medium privacy files
     medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)
 
-    if medium_privacy_dir:
-        for filename, privacy_level in outputs.items():
-            if privacy_level == "moderately_sensitive":
-                ok, job_msg, file_msg = check_l4_file(
-                    job_definition, filename, sizes[filename], workspace_dir
-                )
+    for filename, privacy_level in outputs.items():
+        if privacy_level == "moderately_sensitive":
+            ok, job_msg, file_msg = check_l4_file(
+                job_definition, filename, sizes[filename], workspace_dir
+            )
+
+            if not ok:
+                excluded_files[filename] = job_msg
+
+            # local run currently does not have a level 4 directory
+            if medium_privacy_dir:
                 message_file = medium_privacy_dir / (filename + ".txt")
 
                 if ok:
@@ -504,19 +508,18 @@ def persist_outputs(job_definition, outputs, job_metadata):
                     # if it previously had a too big notice, delete it
                     delete_files_from_directory(medium_privacy_dir, [message_file])
                 else:
-                    excluded_files[filename] = job_msg
                     message_file.parent.mkdir(exist_ok=True, parents=True)
                     message_file.write_text(file_msg)
 
-        # this can be removed once osrelease is dead
-        write_manifest_file(
-            medium_privacy_dir,
-            {
-                # this currently needs to exist, but is not used
-                "repo": None,
-                "workspace": job_definition.workspace,
-            },
-        )
+                # this can be removed once osrelease is dead
+                write_manifest_file(
+                    medium_privacy_dir,
+                    {
+                        # this currently needs to exist, but is not used
+                        "repo": None,
+                        "workspace": job_definition.workspace,
+                    },
+                )
 
     return excluded_files
@@ -575,7 +578,6 @@ def mb(b):
         file_msgs.append(INVALID_FILE_TYPE_MSG.format(filename=filename, suffix=suffix))
 
     elif suffix == ".csv":
-
         # note: this assumes the local executor can directly access the long term storage on disk
         # this may need to be abstracted in future
         actual_file = workspace_dir / filename
@@ -664,9 +666,10 @@ def write_log_file(job_definition, job_metadata, filename, excluded):
         f.write("\noutputs:\n")
         f.write(tabulate(outputs, separator=" - ", indent=2, empty="(no outputs)"))
         if excluded:
-            f.write("\nexcluded files:\n")
-            for excluded_file, msg in excluded.items():
-                f.write(f"{excluded_file}: {msg}")
+            f.write("\n")
+            f.write("\nInvalid moderately_sensitive outputs:\n")
+            f.write(tabulate(excluded.items(), separator=" - ", indent=2))
+            f.write("\n")
 
 
 # Keys of fields to log in manifest.json and log file
diff --git a/jobrunner/executors/volumes.py b/jobrunner/executors/volumes.py
index 952f7563..879cfdf9 100644
--- a/jobrunner/executors/volumes.py
+++ b/jobrunner/executors/volumes.py
@@ -34,7 +34,6 @@ def docker_volume_name(job):
 
 
 class DockerVolumeAPI:
-
     # don't run with UIDs for now. We maybe be able to support this in future.
     requires_root = True
     supported_platforms = ("linux", "win32", "darwin")
@@ -91,7 +90,6 @@ def host_volume_path(job, create=True):
 
 
 class BindMountVolumeAPI:
-
     # Only works running jobs with uid:gid
     requires_root = False
     supported_platforms = ("linux",)
@@ -145,7 +143,6 @@ def copy_from_volume(job, src, dst, timeout=None):
         return copy_file(path, dst)
 
     def delete_volume(job):
-
         failed_files = {}
 
         # if we logged each file error directly, it would spam the logs, so we collect them
diff --git a/jobrunner/lib/database.py b/jobrunner/lib/database.py
index ddec4e43..8703b620 100644
--- a/jobrunner/lib/database.py
+++ b/jobrunner/lib/database.py
@@ -264,7 +264,6 @@ def create_table(conn, cls):
 
 
 def migrate_db(conn, migrations=None, verbose=False):
-
     current_version = conn.execute("PRAGMA user_version").fetchone()[0]
     applied = []
diff --git a/jobrunner/lib/log_utils.py b/jobrunner/lib/log_utils.py
index eee76bab..7350e75e 100644
--- a/jobrunner/lib/log_utils.py
+++ b/jobrunner/lib/log_utils.py
@@ -72,7 +72,6 @@ def configure_logging(
 
 
 class JobRunnerFormatter(logging.Formatter):
-
     converter = time.gmtime  # utc rather than local
     default_msec_format = "%s.%03dZ"  # s/,/. and append Z
diff --git a/jobrunner/models.py b/jobrunner/models.py
index 84076b96..1d7ab4bc 100644
--- a/jobrunner/models.py
+++ b/jobrunner/models.py
@@ -37,7 +37,6 @@ class State(Enum):
 
 
 class StatusCode(Enum):
-
     # PENDING states
     #
     # initial state of a job, not yet running
@@ -158,6 +157,7 @@ class Job:
         completed_at INT,
         trace_context TEXT,
         status_code_updated_at INT,
+        level4_excluded_files TEXT,
 
         PRIMARY KEY (id)
         );
@@ -180,6 +180,13 @@ class Job:
         """,
     )
 
+    migration(
+        2,
+        """
+        ALTER TABLE job ADD COLUMN level4_excluded_files TEXT;
+        """,
+    )
+
     id: str = None  # noqa: A003
     job_request_id: str = None
     state: State = None
@@ -238,6 +245,8 @@ class Job:
     # used to track the OTel trace context for this job
     trace_context: dict = None
 
+    level4_excluded_files: dict = None
+
     # used to cache the job_request json by the tracing code
     _job_request = None
diff --git a/jobrunner/run.py b/jobrunner/run.py
index 5b3c15ae..437ea3c0 100644
--- a/jobrunner/run.py
+++ b/jobrunner/run.py
@@ -356,7 +356,6 @@ def handle_job(job, api, mode=None, paused=None):
         new_status = api.execute(job_definition)
 
     elif initial_status.state == ExecutorState.EXECUTED:
-
         if ExecutorState.FINALIZING in synchronous_transitions:
             # finalize is synchronous, which means set our code to FINALIZING
             # before calling api.finalize(), and we expect it to be FINALIZED
@@ -435,6 +434,7 @@ def save_results(job, job_definition, results):
     """Extract the results of the execution and update the job accordingly."""
     # save job outputs
     job.outputs = results.outputs
+    job.level4_excluded_files = results.level4_excluded_files
 
     message = None
     error = False
@@ -493,7 +493,6 @@ def get_obsolete_files(job_definition, outputs):
 
 
 def job_to_job_definition(job):
-
     allow_database_access = False
     env = {"OPENSAFELY_BACKEND": config.BACKEND}
     if job.requires_db:
@@ -599,7 +598,6 @@ def set_code(
 
     # if status code has changed then trace it and update
     if job.status_code != new_status_code:
-
         # handle timer measurement errors
         if job.status_code_updated_at > timestamp_ns:
             # we somehow have a negative duration, which honeycomb does funny things with.
diff --git a/tests/cli/test_kill_job.py b/tests/cli/test_kill_job.py
index acf827c2..c5091cdf 100644
--- a/tests/cli/test_kill_job.py
+++ b/tests/cli/test_kill_job.py
@@ -11,7 +11,6 @@
 
 @pytest.mark.parametrize("cleanup", [False, True])
 def test_kill_job(cleanup, tmp_work_dir, db, monkeypatch):
-
     job = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)
 
     mocker = mock.MagicMock(spec=local.docker)
diff --git a/tests/cli/test_local_run.py b/tests/cli/test_local_run.py
index 2bc46388..a1a72a15 100644
--- a/tests/cli/test_local_run.py
+++ b/tests/cli/test_local_run.py
@@ -8,6 +8,7 @@
 
 import pytest
 from pipeline import load_pipeline
+from ruyaml import YAML
 
 from jobrunner import config
 from jobrunner.actions import get_action_specification
@@ -40,6 +41,36 @@ def test_local_run_limits_applied(db, tmp_path, docker_cleanup):
     assert metadata["HostConfig"]["NanoCpus"] == 1.5 * 1e9
 
 
+@pytest.mark.slow_test
+@pytest.mark.needs_docker
+def test_local_run_level_4_checks_applied_and_logged(
+    db, tmp_path, docker_cleanup, capsys
+):
+    project_dir = tmp_path / "project"
+    shutil.copytree(str(FIXTURE_DIR / "full_project"), project_dir)
+    project_yaml = project_dir / "project.yaml"
+    yaml = YAML()
+    project = yaml.load(project_yaml)
+    outputs = project["actions"]["generate_dataset"]["outputs"].pop("highly_sensitive")
+    project["actions"]["generate_dataset"]["outputs"]["moderately_sensitive"] = outputs
+    yaml.dump(project, project_yaml)
+
+    local_run.main(
+        project_dir=project_dir,
+        actions=["generate_dataset"],
+        debug=True,  # preserves containers for inspection
+    )
+
+    job = list(database.find_all(Job))[0]
+    assert job.level4_excluded_files == {
+        "output/dataset.csv": "File has patient_id column"
+    }
+
+    stdout = capsys.readouterr().out
+    assert "invalid moderately_sensitive outputs:" in stdout
+    assert "output/dataset.csv - File has patient_id column" in stdout
+
+
 @pytest.mark.parametrize("extraction_tool", ["cohortextractor", "databuilder"])
 @pytest.mark.slow_test
 @pytest.mark.needs_docker
diff --git a/tests/cli/test_prepare_for_reboot.py b/tests/cli/test_prepare_for_reboot.py
index 5d3433c1..b4ed56c6 100644
--- a/tests/cli/test_prepare_for_reboot.py
+++ b/tests/cli/test_prepare_for_reboot.py
@@ -9,7 +9,6 @@
 
 
 def test_prepare_for_reboot(db, monkeypatch):
-
     j1 = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)
     j2 = job_factory(
         state=State.PENDING, status_code=StatusCode.WAITING_ON_DEPENDENCIES
diff --git a/tests/conftest.py b/tests/conftest.py
index baf8d677..80cb53de 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -197,7 +197,6 @@ def db(monkeypatch, request):
 
 @dataclass
 class SubprocessStub:
-
     calls: deque = field(default_factory=deque)
 
     def add_call(self, cmd, **kwargs):
diff --git a/tests/factories.py b/tests/factories.py
index 73c98ac5..7872769d 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -124,7 +124,6 @@ class StubExecutorAPI:
     synchronous_transitions = []
 
     def __init__(self):
-
        self.tracker = {
            "prepare": set(),
            "execute": set(),
diff --git a/tests/lib/test_database.py b/tests/lib/test_database.py
index 55ebe9b2..56df4edc 100644
--- a/tests/lib/test_database.py
+++ b/tests/lib/test_database.py
@@ -179,7 +179,6 @@ def test_migrate_in_transaction(tmp_path):
 
 
 def test_ensure_valid_db(tmp_path):
-
     # db doesn't exists
     with pytest.raises(MigrationNeeded) as exc:
         ensure_valid_db("not_exists")
diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py
index 2e844f18..24472a19 100644
--- a/tests/test_local_executor.py
+++ b/tests/test_local_executor.py
@@ -115,7 +115,6 @@ def workspace_log_file_exists(job_definition):
 def test_prepare_success(
     docker_cleanup, job_definition, test_repo, tmp_work_dir, volume_api, freezer
 ):
-
     job_definition.inputs = ["output/input.csv"]
     populate_workspace(job_definition.workspace, "output/input.csv")
@@ -147,7 +146,6 @@ def test_prepare_success(
 
 @pytest.mark.needs_docker
 def test_prepare_already_prepared(docker_cleanup, job_definition, volume_api):
-
     # create the volume already
     volume_api.create_volume(job_definition)
     volume_api.write_timestamp(job_definition, local.TIMESTAMP_REFERENCE_FILE)
@@ -208,7 +206,6 @@ def test_prepare_job_bad_commit(docker_cleanup, job_definition, test_repo):
 
 @pytest.mark.needs_docker
 def test_prepare_job_no_input_file(docker_cleanup, job_definition, volume_api):
-
     job_definition.inputs = ["output/input.csv"]
 
     with pytest.raises(local.LocalDockerError) as exc_info:
@@ -219,7 +216,6 @@ def test_prepare_job_no_input_file(docker_cleanup, job_definition, volume_api):
 
 @pytest.mark.needs_docker
 def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
-
     # check limits are applied
     job_definition.cpu_count = 1.5
     job_definition.memory_limit = "1G"
@@ -289,7 +285,6 @@ def test_execute_not_prepared(docker_cleanup, job_definition, tmp_work_dir, volu
 
 @pytest.mark.needs_docker
 def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
-
     job_definition.args = [
         "touch",
         "/workspace/output/output.csv",
@@ -341,7 +336,6 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
 
 @pytest.mark.needs_docker
 def test_finalize_failed(docker_cleanup, job_definition, tmp_work_dir, volume_api):
-
     job_definition.args = ["false"]
     job_definition.output_spec = {
         "output/output.*": "highly_sensitive",
@@ -374,7 +368,6 @@ def test_finalize_failed(docker_cleanup, job_definition, tmp_work_dir, volume_ap
 
 @pytest.mark.needs_docker
 def test_finalize_unmatched(docker_cleanup, job_definition, tmp_work_dir, volume_api):
-
     # the sleep is needed to make sure the unmatched file is *newer* enough
     job_definition.args = ["sh", "-c", "sleep 1; touch /workspace/unmatched"]
     job_definition.output_spec = {
@@ -409,7 +402,6 @@ def test_finalize_unmatched(docker_cleanup, job_definition, tmp_work_dir, volume
 
 @pytest.mark.needs_docker
 def test_finalize_failed_137(docker_cleanup, job_definition, tmp_work_dir, volume_api):
-
     job_definition.args = ["sleep", "101"]
 
     api = local.LocalDockerAPI()
@@ -442,7 +434,6 @@ def test_finalize_failed_137(docker_cleanup, job_definition, tmp_work_dir, volum
 
 @pytest.mark.needs_docker
 def test_finalize_failed_oomkilled(docker_cleanup, job_definition, tmp_work_dir):
-
     # Consume memory by writing to the tmpfs at /dev/shm
     # We write a lot more that our limit, to ensure the OOM killer kicks in
     # regardless of our tests host's vm.overcommit_memory settings.
@@ -473,9 +464,17 @@ def test_finalize_failed_oomkilled(docker_cleanup, job_definition, tmp_work_dir)
     assert workspace_log_file_exists(job_definition)
 
 
+@pytest.fixture(params=[True, False])
+def local_run(request, monkeypatch):
+    # local_run does not have a level 4 configured
+    if request.param:
+        monkeypatch.setattr(config, "MEDIUM_PRIVACY_WORKSPACES_DIR", None)
+    return request.param
+
+
 @pytest.mark.needs_docker
 def test_finalize_large_level4_outputs(
-    docker_cleanup, job_definition, tmp_work_dir, volume_api
+    docker_cleanup, job_definition, tmp_work_dir, volume_api, local_run
 ):
     job_definition.args = [
         "truncate",
@@ -507,16 +506,20 @@ def test_finalize_large_level4_outputs(
         "output/output.txt": "File size of 1.0Mb is larger that limit of 0.5Mb.",
     }
 
-    level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
-    message_file = level4_dir / "output/output.txt.txt"
-    txt = message_file.read_text()
-    assert "output/output.txt" in txt
-    assert "1.0Mb" in txt
-    assert "0.5Mb" in txt
-
-    log_file = level4_dir / "metadata/action.log"
+    log_file = local.get_log_dir(job_definition) / "logs.txt"
     log = log_file.read_text()
-    assert "excluded files:" in log
-    assert "output/output.txt: File size of 1.0Mb is larger that limit of 0.5Mb." in log
+    assert "Invalid moderately_sensitive outputs:" in log
+    assert (
+        "output/output.txt - File size of 1.0Mb is larger that limit of 0.5Mb." in log
+    )
+
+    if not local_run:
+        level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
+        message_file = level4_dir / "output/output.txt.txt"
+        txt = message_file.read_text()
+        assert "output/output.txt" in txt
+        assert "1.0Mb" in txt
+        assert "0.5Mb" in txt
@@ -551,14 +554,16 @@ def test_finalize_invalid_file_type(docker_cleanup, job_definition, tmp_work_dir
     txt = message_file.read_text()
     assert "output/output.rds" in txt
 
-    log_file = level4_dir / "metadata/action.log"
+    log_file = local.get_log_dir(job_definition) / "logs.txt"
     log = log_file.read_text()
-    assert "excluded files:" in log
-    assert "output/output.rds: File type of .rds is not valid level 4 file" in log
+    assert "Invalid moderately_sensitive outputs:" in log
+    assert "output/output.rds - File type of .rds is not valid level 4 file" in log
 
 
 @pytest.mark.needs_docker
-def test_finalize_patient_id_header(docker_cleanup, job_definition, tmp_work_dir):
+def test_finalize_patient_id_header(
+    docker_cleanup, job_definition, tmp_work_dir, local_run
+):
     job_definition.args = [
         "sh",
         "-c",
@@ -588,16 +593,18 @@ def test_finalize_patient_id_header(docker_cleanup, job_definition, tmp_work_dir
         "output/output.csv": "File has patient_id column",
     }
 
-    level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
-    message_file = level4_dir / "output/output.csv.txt"
-    txt = message_file.read_text()
-    assert "output/output.csv" in txt
-    assert "patient_id" in txt
-
-    log_file = level4_dir / "metadata/action.log"
+    log_file = local.get_log_dir(job_definition) / "logs.txt"
     log = log_file.read_text()
-    assert "excluded files:" in log
-    assert "output/output.csv: File has patient_id column" in log
+    assert "Invalid moderately_sensitive outputs:" in log
+    assert "output/output.csv - File has patient_id column" in log
+
+    if not local_run:
+        level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
+
+        message_file = level4_dir / "output/output.csv.txt"
+        txt = message_file.read_text()
+        assert "output/output.csv" in txt
+        assert "patient_id" in txt
@@ -727,7 +734,6 @@ def test_running_job_terminated_finalized(docker_cleanup, job_definition, tmp_wo
 
 @pytest.mark.needs_docker
 def test_cleanup_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
-
     populate_workspace(job_definition.workspace, "output/input.csv")
 
     api = local.LocalDockerAPI()
@@ -749,7 +755,6 @@ def test_cleanup_success(docker_cleanup, job_definition, tmp_work_dir, volume_ap
 
 
 def test_delete_files_success(tmp_work_dir):
-
     high = populate_workspace("test", "file.txt")
     medium = populate_workspace("test", "file.txt", privacy="medium")
@@ -774,7 +779,6 @@ def test_delete_files_success(tmp_work_dir):
 
 
 def test_delete_files_error(tmp_work_dir):
-
     # use the fact that unlink() on a director raises an error
     populate_workspace("test", "bad/_")
@@ -812,7 +816,6 @@ def inspect(*args, **kwargs):
 def test_write_read_timestamps(
     docker_cleanup, job_definition, tmp_work_dir, volume_api
 ):
-
     assert volume_api.read_timestamp(job_definition, "test") is None
 
     volume_api.create_volume(job_definition)
@@ -826,7 +829,6 @@ def test_write_read_timestamps(
 
 @pytest.mark.needs_docker
 def test_read_timestamp_stat_fallback(docker_cleanup, job_definition, tmp_work_dir):
-
     volumes.DockerVolumeAPI.create_volume(job_definition)
 
     volume_name = volumes.DockerVolumeAPI.volume_name(job_definition)
diff --git a/tests/test_run.py b/tests/test_run.py
index 28990f22..fc7117b8 100644
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -957,7 +957,6 @@ def test_ignores_cancelled_jobs_when_calculating_dependencies(db):
 
 
 def test_get_obsolete_files_nothing_to_delete(db):
-
     outputs = {
         "high.txt": "highly_sensitive",
         "medium.txt": "moderately_sensitive",
@@ -974,7 +973,6 @@ def test_get_obsolete_files_nothing_to_delete(db):
 
 
 def test_get_obsolete_files_things_to_delete(db):
-
     old_outputs = {
         "old_high.txt": "highly_sensitive",
         "old_medium.txt": "moderately_sensitive",
@@ -996,7 +994,6 @@ def test_get_obsolete_files_things_to_delete(db):
 
 
 def test_get_obsolete_files_case_change(db):
-
     old_outputs = {
         "high.txt": "highly_sensitive",
     }
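
Note on the level 4 checks surfaced by this patch: the behaviour the tests above assert is that a moderately_sensitive output is excluded when its file type is not an allowed level 4 type, when it exceeds the size limit (0.5Mb in the tests), or when a CSV contains a patient_id column, and that the reasons are recorded in level4_excluded_files. The sketch below is illustrative only — the real rules, limits and message constants live in check_l4_file() in jobrunner/executors/local.py and in jobrunner.config; the allowed-suffix set and helper names here are assumptions, not the actual API.

    # Illustrative sketch of the level 4 checks described by the tests above.
    # The allowed suffixes and the 0.5Mb limit are assumed values for the example;
    # the message strings mirror the wording asserted in the tests.
    import csv
    from pathlib import Path

    ALLOWED_LEVEL4_TYPES = {".csv", ".txt", ".log", ".json"}  # assumed, not the real config value
    MAX_LEVEL4_SIZE = int(0.5 * 1024 * 1024)  # 0.5Mb, matching the limit asserted in the tests


    def mb(b):
        return round(b / (1024 * 1024), 2)


    def check_level4_file(path: Path):
        """Return a list of exclusion reasons; an empty list means the file may go to level 4."""
        msgs = []
        if path.suffix not in ALLOWED_LEVEL4_TYPES:
            msgs.append(f"File type of {path.suffix} is not valid level 4 file")
        elif path.suffix == ".csv":
            # only the header row needs inspecting to spot a patient_id column
            with path.open(newline="") as f:
                headers = next(csv.reader(f), [])
            if "patient_id" in headers:
                msgs.append("File has patient_id column")
        size = path.stat().st_size
        if size > MAX_LEVEL4_SIZE:
            msgs.append(
                f"File size of {mb(size)}Mb is larger that limit of {mb(MAX_LEVEL4_SIZE)}Mb."
            )
        return msgs

In the patch itself the failing reasons flow from check_l4_file() into excluded_files, come back as results.level4_excluded_files, are saved on the job row via the new level4_excluded_files column, and are then reported both in the job's logs.txt and in local_run's console summary.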