Merge pull request #677 from opensafely-core/local-l4-checks

local l4 checks

bloodearnest authored Nov 13, 2023
2 parents bbf52e6 + 7cb29d4 commit fb80525
Showing 15 changed files with 112 additions and 76 deletions.
8 changes: 7 additions & 1 deletion jobrunner/cli/local_run.py
@@ -426,6 +426,13 @@ def create_and_run_jobs(
print(" outputs:")
outputs = sorted(job.outputs.items()) if job.outputs else []
print(tabulate(outputs, separator=" - ", indent=5, empty="(no outputs)"))

if job.level4_excluded_files:
print(" invalid moderately_sensitive outputs:")
print(
tabulate(job.level4_excluded_files.items(), separator=" - ", indent=5)
)

# If a job exited with an error code then try to display the end of the
# log output in case that makes the problem immediately obvious
if job.status_code == StatusCode.NONZERO_EXIT:
@@ -440,7 +447,6 @@ def create_and_run_jobs(


def create_job_request_and_jobs(project_dir, actions, force_run_dependencies):

job_request = JobRequest(
id=random_id(),
repo_url=str(project_dir),
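The local_run.py hunk above prints any level 4 exclusions in the same tabular style as the outputs list. As a rough illustration of what that produces, here is a minimal stand-in for jobrunner's internal tabulate helper, inferred only from the keyword arguments visible in this diff (the real helper may behave differently):

def tabulate(rows, separator=" - ", indent=0, empty="(none)"):
    # render (key, value) rows one per line, indented by `indent` spaces
    rows = list(rows)
    if not rows:
        return " " * indent + empty
    return "\n".join(
        " " * indent + separator.join(str(cell) for cell in row)
        for row in rows
    )

excluded = {"output/dataset.csv": "File has patient_id column"}
print("  invalid moderately_sensitive outputs:")
print(tabulate(excluded.items(), separator=" - ", indent=5))
# prints:      output/dataset.csv - File has patient_id column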
45 changes: 24 additions & 21 deletions jobrunner/executors/local.py
@@ -184,7 +184,6 @@ def execute(self, job_definition):
return JobStatus(ExecutorState.EXECUTING)

def finalize(self, job_definition):

current_status = self.get_status(job_definition)
if current_status.state == ExecutorState.UNKNOWN:
# job had not started running, so do not finalize
@@ -488,13 +487,18 @@ def persist_outputs(job_definition, outputs, job_metadata):

# Copy out medium privacy files
medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)
if medium_privacy_dir:
for filename, privacy_level in outputs.items():
if privacy_level == "moderately_sensitive":
ok, job_msg, file_msg = check_l4_file(
job_definition, filename, sizes[filename], workspace_dir
)

for filename, privacy_level in outputs.items():
if privacy_level == "moderately_sensitive":
ok, job_msg, file_msg = check_l4_file(
job_definition, filename, sizes[filename], workspace_dir
)

if not ok:
excluded_files[filename] = job_msg

# local run currently does not have a level 4 directory
if medium_privacy_dir:
message_file = medium_privacy_dir / (filename + ".txt")

if ok:
Expand All @@ -504,19 +508,18 @@ def persist_outputs(job_definition, outputs, job_metadata):
# if it previously had a too big notice, delete it
delete_files_from_directory(medium_privacy_dir, [message_file])
else:
excluded_files[filename] = job_msg
message_file.parent.mkdir(exist_ok=True, parents=True)
message_file.write_text(file_msg)

# this can be removed once osrelease is dead
write_manifest_file(
medium_privacy_dir,
{
# this currently needs to exist, but is not used
"repo": None,
"workspace": job_definition.workspace,
},
)
# this can be removed once osrelease is dead
write_manifest_file(
medium_privacy_dir,
{
# this currently needs to exist, but is not used
"repo": None,
"workspace": job_definition.workspace,
},
)

return excluded_files

@@ -575,7 +578,6 @@ def mb(b):
file_msgs.append(INVALID_FILE_TYPE_MSG.format(filename=filename, suffix=suffix))

elif suffix == ".csv":

# note: this assumes the local executor can directly access the long term storage on disk
# this may need to be abstracted in future
actual_file = workspace_dir / filename
@@ -664,9 +666,10 @@ def write_log_file(job_definition, job_metadata, filename, excluded):
f.write("\noutputs:\n")
f.write(tabulate(outputs, separator=" - ", indent=2, empty="(no outputs)"))
if excluded:
f.write("\nexcluded files:\n")
for excluded_file, msg in excluded.items():
f.write(f"{excluded_file}: {msg}")
f.write("\n")
f.write("\nInvalid moderately_sensitive outputs:\n")
f.write(tabulate(excluded.items(), separator=" - ", indent=2))
f.write("\n")


# Keys of fields to log in manifest.json and log file
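For orientation: persist_outputs treats check_l4_file as returning a three-tuple (ok, job_msg, file_msg), where job_msg is collected into excluded_files (and ultimately job.level4_excluded_files) and file_msg is written next to the excluded file as <filename>.txt in the level 4 workspace. A hedged sketch of that contract follows; the rules and messages below are illustrative only, while the real checks (allowed suffixes, size limits, the patient_id column scan) live in check_l4_file in this file:

from pathlib import Path

def check_l4_file_sketch(filename, csv_headers):
    # assumption for illustration: only CSVs are permitted at level 4
    if Path(filename).suffix != ".csv":
        msg = f"{filename}: file type not permitted as moderately_sensitive"
        return False, msg, msg
    # the new test below asserts this exact job_msg for patient_id columns
    if "patient_id" in csv_headers:
        return (
            False,
            "File has patient_id column",
            f"{filename} was excluded because it contains a patient_id column",
        )
    return True, None, None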
3 changes: 0 additions & 3 deletions jobrunner/executors/volumes.py
@@ -34,7 +34,6 @@ def docker_volume_name(job):


class DockerVolumeAPI:

# don't run with UIDs for now. We may be able to support this in future.
requires_root = True
supported_platforms = ("linux", "win32", "darwin")
@@ -91,7 +90,6 @@ def host_volume_path(job, create=True):


class BindMountVolumeAPI:

# Only works running jobs with uid:gid
requires_root = False
supported_platforms = ("linux",)
@@ -145,7 +143,6 @@ def copy_from_volume(job, src, dst, timeout=None):
return copy_file(path, dst)

def delete_volume(job):

failed_files = {}

# if we logged each file error directly, it would spam the logs, so we collect them
1 change: 0 additions & 1 deletion jobrunner/lib/database.py
@@ -264,7 +264,6 @@ def create_table(conn, cls):


def migrate_db(conn, migrations=None, verbose=False):

current_version = conn.execute("PRAGMA user_version").fetchone()[0]
applied = []

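The models.py section below registers migration 2 against this machinery. For context, here is a minimal sketch of the PRAGMA user_version pattern that the hunk above hints at (the real migrate_db in jobrunner/lib/database.py may differ in detail):

import sqlite3

MIGRATIONS = {
    2: "ALTER TABLE job ADD COLUMN level4_excluded_files TEXT;",
}

def migrate_db_sketch(conn, migrations=MIGRATIONS):
    # apply, in order, any migrations newer than the recorded schema version
    current_version = conn.execute("PRAGMA user_version").fetchone()[0]
    applied = []
    for version in sorted(migrations):
        if version > current_version:
            conn.executescript(migrations[version])
            conn.execute(f"PRAGMA user_version = {version}")
            applied.append(version)
    return applied

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job (id TEXT PRIMARY KEY)")
print(migrate_db_sketch(conn))  # -> [2]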
1 change: 0 additions & 1 deletion jobrunner/lib/log_utils.py
@@ -72,7 +72,6 @@ def configure_logging(


class JobRunnerFormatter(logging.Formatter):

converter = time.gmtime # utc rather than local
default_msec_format = "%s.%03dZ" # s/,/. and append Z

11 changes: 10 additions & 1 deletion jobrunner/models.py
@@ -37,7 +37,6 @@ class State(Enum):


class StatusCode(Enum):

# PENDING states
#
# initial state of a job, not yet running
@@ -158,6 +157,7 @@ class Job:
completed_at INT,
trace_context TEXT,
status_code_updated_at INT,
level4_excluded_files TEXT,
PRIMARY KEY (id)
);
@@ -180,6 +180,13 @@
""",
)

migration(
2,
"""
ALTER TABLE job ADD COLUMN level4_excluded_files TEXT;
""",
)

id: str = None # noqa: A003
job_request_id: str = None
state: State = None
@@ -238,6 +245,8 @@ class Job:
# used to track the OTel trace context for this job
trace_context: dict = None

level4_excluded_files: dict = None

# used to cache the job_request json by the tracing code
_job_request = None

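One note on the new field: level4_excluded_files is a dict on the Job dataclass but a TEXT column in SQLite, so, like trace_context above it, it presumably round-trips through JSON somewhere in the database layer. A sketch of that mapping, stated as an assumption (the actual encoding lives in jobrunner/lib/database.py):

import json

def encode_field(value):
    # dict -> TEXT, keeping NULL for unset values (assumed behaviour)
    return json.dumps(value) if value is not None else None

def decode_field(text):
    # TEXT -> dict
    return json.loads(text) if text else None

row = encode_field({"output/dataset.csv": "File has patient_id column"})
assert decode_field(row) == {"output/dataset.csv": "File has patient_id column"}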
4 changes: 1 addition & 3 deletions jobrunner/run.py
@@ -356,7 +356,6 @@ def handle_job(job, api, mode=None, paused=None):
new_status = api.execute(job_definition)

elif initial_status.state == ExecutorState.EXECUTED:

if ExecutorState.FINALIZING in synchronous_transitions:
# finalize is synchronous, which means set our code to FINALIZING
# before calling api.finalize(), and we expect it to be FINALIZED
@@ -435,6 +434,7 @@ def save_results(job, job_definition, results):
"""Extract the results of the execution and update the job accordingly."""
# save job outputs
job.outputs = results.outputs
job.level4_excluded_files = results.level4_excluded_files

message = None
error = False
@@ -493,7 +493,6 @@ def get_obsolete_files(job_definition, outputs):


def job_to_job_definition(job):

allow_database_access = False
env = {"OPENSAFELY_BACKEND": config.BACKEND}
if job.requires_db:
@@ -599,7 +598,6 @@ def set_code(

# if status code has changed then trace it and update
if job.status_code != new_status_code:

# handle timer measurement errors
if job.status_code_updated_at > timestamp_ns:
# we somehow have a negative duration, which honeycomb does funny things with.
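Taken together, the run.py hunk above is the hand-off point in the flow this PR completes: the executor's results carry the exclusions, save_results() copies them onto the Job row (persisted via migration 2), and local_run.py reports them. A toy, self-contained sketch of that flow, where JobResults and Job are stand-ins for jobrunner's own classes and only level4_excluded_files comes from this diff:

from dataclasses import dataclass

@dataclass
class JobResults:  # stand-in for jobrunner's JobResults
    outputs: dict
    level4_excluded_files: dict

@dataclass
class Job:  # stand-in for jobrunner.models.Job
    outputs: dict = None
    level4_excluded_files: dict = None

def save_results_sketch(job, results):
    # mirrors the two assignments in save_results() above
    job.outputs = results.outputs
    job.level4_excluded_files = results.level4_excluded_files

job = Job()
results = JobResults(
    outputs={"output/dataset.csv": "moderately_sensitive"},
    level4_excluded_files={"output/dataset.csv": "File has patient_id column"},
)
save_results_sketch(job, results)
if job.level4_excluded_files:  # local_run.py then prints the summary
    print("  invalid moderately_sensitive outputs:")
    for name, msg in job.level4_excluded_files.items():
        print(f"     {name} - {msg}")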
1 change: 0 additions & 1 deletion tests/cli/test_kill_job.py
@@ -11,7 +11,6 @@

@pytest.mark.parametrize("cleanup", [False, True])
def test_kill_job(cleanup, tmp_work_dir, db, monkeypatch):

job = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)

mocker = mock.MagicMock(spec=local.docker)
31 changes: 31 additions & 0 deletions tests/cli/test_local_run.py
@@ -8,6 +8,7 @@

import pytest
from pipeline import load_pipeline
from ruyaml import YAML

from jobrunner import config
from jobrunner.actions import get_action_specification
@@ -40,6 +41,36 @@ def test_local_run_limits_applied(db, tmp_path, docker_cleanup):
assert metadata["HostConfig"]["NanoCpus"] == 1.5 * 1e9


@pytest.mark.slow_test
@pytest.mark.needs_docker
def test_local_run_level_4_checks_applied_and_logged(
db, tmp_path, docker_cleanup, capsys
):
project_dir = tmp_path / "project"
shutil.copytree(str(FIXTURE_DIR / "full_project"), project_dir)
project_yaml = project_dir / "project.yaml"
yaml = YAML()
project = yaml.load(project_yaml)
outputs = project["actions"]["generate_dataset"]["outputs"].pop("highly_sensitive")
project["actions"]["generate_dataset"]["outputs"]["moderately_sensitive"] = outputs
yaml.dump(project, project_yaml)

local_run.main(
project_dir=project_dir,
actions=["generate_dataset"],
debug=True, # preserves containers for inspection
)

job = list(database.find_all(Job))[0]
assert job.level4_excluded_files == {
"output/dataset.csv": "File has patient_id column"
}

stdout = capsys.readouterr().out
assert "invalid moderately_sensitive outputs:" in stdout
assert "output/dataset.csv - File has patient_id column" in stdout


@pytest.mark.parametrize("extraction_tool", ["cohortextractor", "databuilder"])
@pytest.mark.slow_test
@pytest.mark.needs_docker
1 change: 0 additions & 1 deletion tests/cli/test_prepare_for_reboot.py
@@ -9,7 +9,6 @@


def test_prepare_for_reboot(db, monkeypatch):

j1 = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)
j2 = job_factory(
state=State.PENDING, status_code=StatusCode.WAITING_ON_DEPENDENCIES
1 change: 0 additions & 1 deletion tests/conftest.py
@@ -197,7 +197,6 @@ def db(monkeypatch, request):

@dataclass
class SubprocessStub:

calls: deque = field(default_factory=deque)

def add_call(self, cmd, **kwargs):
1 change: 0 additions & 1 deletion tests/factories.py
@@ -124,7 +124,6 @@ class StubExecutorAPI:
synchronous_transitions = []

def __init__(self):

self.tracker = {
"prepare": set(),
"execute": set(),
1 change: 0 additions & 1 deletion tests/lib/test_database.py
@@ -179,7 +179,6 @@ def test_migrate_in_transaction(tmp_path):


def test_ensure_valid_db(tmp_path):

# db doesn't exist
with pytest.raises(MigrationNeeded) as exc:
ensure_valid_db("not_exists")