local l4 checks #677

Merged (4 commits) on Nov 13, 2023
8 changes: 7 additions & 1 deletion jobrunner/cli/local_run.py
@@ -426,6 +426,13 @@ def create_and_run_jobs(
print(" outputs:")
outputs = sorted(job.outputs.items()) if job.outputs else []
print(tabulate(outputs, separator=" - ", indent=5, empty="(no outputs)"))

if job.level4_excluded_files:
print(" invalid moderately_sensitive outputs:")
print(
tabulate(job.level4_excluded_files.items(), separator=" - ", indent=5)
)

# If a job exited with an error code then try to display the end of the
# log output in case that makes the problem immediately obvious
if job.status_code == StatusCode.NONZERO_EXIT:
@@ -440,7 +447,6 @@ def create_and_run_jobs(


def create_job_request_and_jobs(project_dir, actions, force_run_dependencies):

job_request = JobRequest(
id=random_id(),
repo_url=str(project_dir),
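For reference, a minimal sketch of what the new print block produces. The tabulate used here is jobrunner's own helper rather than the PyPI package; the stand-in below assumes it simply joins each (name, message) pair with the given separator and left-pads every row by the given indent, which matches the output asserted in the new test further down.

    # Minimal stand-in for jobrunner's tabulate helper (assumption: it joins
    # each (key, value) pair with the separator, indents rows by the given
    # number of spaces, and falls back to the empty text for an empty list).
    def tabulate(rows, separator=" - ", indent=0, empty=""):
        rows = list(rows)
        if not rows:
            return " " * indent + empty
        return "\n".join(" " * indent + f"{k}{separator}{v}" for k, v in rows)

    excluded = {"output/dataset.csv": "File has patient_id column"}
    print("  invalid moderately_sensitive outputs:")
    print(tabulate(excluded.items(), separator=" - ", indent=5))
    # Output:
    #   invalid moderately_sensitive outputs:
    #        output/dataset.csv - File has patient_id column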
45 changes: 24 additions & 21 deletions jobrunner/executors/local.py
@@ -184,7 +184,6 @@ def execute(self, job_definition):
return JobStatus(ExecutorState.EXECUTING)

def finalize(self, job_definition):

current_status = self.get_status(job_definition)
if current_status.state == ExecutorState.UNKNOWN:
# job had not started running, so do not finalize
@@ -488,13 +487,18 @@ def persist_outputs(job_definition, outputs, job_metadata):

# Copy out medium privacy files
medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)
if medium_privacy_dir:
for filename, privacy_level in outputs.items():
if privacy_level == "moderately_sensitive":
ok, job_msg, file_msg = check_l4_file(
job_definition, filename, sizes[filename], workspace_dir
)

for filename, privacy_level in outputs.items():
if privacy_level == "moderately_sensitive":
ok, job_msg, file_msg = check_l4_file(
job_definition, filename, sizes[filename], workspace_dir
)

if not ok:
excluded_files[filename] = job_msg

# local run currently does not have a level 4 directory
if medium_privacy_dir:
message_file = medium_privacy_dir / (filename + ".txt")

if ok:
Expand All @@ -504,19 +508,18 @@ def persist_outputs(job_definition, outputs, job_metadata):
# if it previously had a too big notice, delete it
delete_files_from_directory(medium_privacy_dir, [message_file])
else:
excluded_files[filename] = job_msg
message_file.parent.mkdir(exist_ok=True, parents=True)
message_file.write_text(file_msg)

# this can be removed once osrelease is dead
write_manifest_file(
medium_privacy_dir,
{
# this currently needs to exist, but is not used
"repo": None,
"workspace": job_definition.workspace,
},
)
# this can be removed once osrelease is dead
write_manifest_file(
medium_privacy_dir,
{
# this currently needs to exist, but is not used
"repo": None,
"workspace": job_definition.workspace,
},
)

return excluded_files

Expand Down Expand Up @@ -575,7 +578,6 @@ def mb(b):
file_msgs.append(INVALID_FILE_TYPE_MSG.format(filename=filename, suffix=suffix))

elif suffix == ".csv":

# note: this assumes the local executor can directly access the long term storage on disk
# this may need to be abstracted in future
actual_file = workspace_dir / filename
Expand Down Expand Up @@ -664,9 +666,10 @@ def write_log_file(job_definition, job_metadata, filename, excluded):
f.write("\noutputs:\n")
f.write(tabulate(outputs, separator=" - ", indent=2, empty="(no outputs)"))
if excluded:
f.write("\nexcluded files:\n")
for excluded_file, msg in excluded.items():
f.write(f"{excluded_file}: {msg}")
f.write("\n")
f.write("\nInvalid moderately_sensitive outputs:\n")
f.write(tabulate(excluded.items(), separator=" - ", indent=2))
f.write("\n")


# Keys of fields to log in manifest.json and log file
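The key behavioural change in this file is that check_l4_file now runs for every moderately_sensitive output even when there is no level 4 directory (the local-run case), so exclusions are always recorded in excluded_files; only the writing of the .txt message file remains behind the medium_privacy_dir guard. As a rough illustration of the kind of check involved, here is a hypothetical sketch of the CSV patient_id check suggested by the .csv branch above and by the message asserted in the tests; the real check_l4_file also enforces file-type and size limits and returns separate job-level and file-level messages.

    import csv

    # Hypothetical sketch only: flag a CSV that still contains a patient_id
    # column by inspecting just its header row.
    def csv_has_patient_id(path):
        with open(path, newline="") as f:
            headers = next(csv.reader(f), [])
        return "patient_id" in headers

    # e.g. csv_has_patient_id(workspace_dir / "output/dataset.csv") == True
    # would correspond to the job message "File has patient_id column".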
3 changes: 0 additions & 3 deletions jobrunner/executors/volumes.py
@@ -34,7 +34,6 @@ def docker_volume_name(job):


class DockerVolumeAPI:

# don't run with UIDs for now. We may be able to support this in future.
requires_root = True
supported_platforms = ("linux", "win32", "darwin")
@@ -91,7 +90,6 @@ def host_volume_path(job, create=True):


class BindMountVolumeAPI:

# Only works running jobs with uid:gid
requires_root = False
supported_platforms = ("linux",)
@@ -145,7 +143,6 @@ def copy_from_volume(job, src, dst, timeout=None):
return copy_file(path, dst)

def delete_volume(job):

failed_files = {}

# if we logged each file error directly, it would spam the logs, so we collect them
1 change: 0 additions & 1 deletion jobrunner/lib/database.py
@@ -264,7 +264,6 @@ def create_table(conn, cls):


def migrate_db(conn, migrations=None, verbose=False):

current_version = conn.execute("PRAGMA user_version").fetchone()[0]
applied = []

1 change: 0 additions & 1 deletion jobrunner/lib/log_utils.py
@@ -72,7 +72,6 @@ def configure_logging(


class JobRunnerFormatter(logging.Formatter):

converter = time.gmtime # utc rather than local
default_msec_format = "%s.%03dZ" # s/,/. and append Z

11 changes: 10 additions & 1 deletion jobrunner/models.py
@@ -37,7 +37,6 @@ class State(Enum):


class StatusCode(Enum):

# PENDING states
#
# initial state of a job, not yet running
@@ -158,6 +157,7 @@ class Job:
completed_at INT,
trace_context TEXT,
status_code_updated_at INT,
level4_excluded_files TEXT,

PRIMARY KEY (id)
);
@@ -180,6 +180,13 @@ class Job:
""",
)

migration(
2,
"""
ALTER TABLE job ADD COLUMN level4_excluded_files TEXT;
""",
)

id: str = None # noqa: A003
job_request_id: str = None
state: State = None
@@ -238,6 +245,8 @@ class Job:
# used to track the OTel trace context for this job
trace_context: dict = None

level4_excluded_files: dict = None

# used to cache the job_request json by the tracing code
_job_request = None

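The new column is declared as TEXT while the corresponding Python field is a dict, which implies the database layer serializes dict fields for storage. A minimal round-trip sketch, assuming JSON serialization (the SQL below is illustrative, not jobrunner's actual database layer):

    import json
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE job (id TEXT PRIMARY KEY, level4_excluded_files TEXT)"
    )
    excluded = {"output/dataset.csv": "File has patient_id column"}
    # Store the dict as JSON text, then read it back.
    conn.execute("INSERT INTO job VALUES (?, ?)", ("job-1", json.dumps(excluded)))
    row = conn.execute("SELECT level4_excluded_files FROM job").fetchone()
    assert json.loads(row[0]) == excluded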
4 changes: 1 addition & 3 deletions jobrunner/run.py
@@ -356,7 +356,6 @@ def handle_job(job, api, mode=None, paused=None):
new_status = api.execute(job_definition)

elif initial_status.state == ExecutorState.EXECUTED:

if ExecutorState.FINALIZING in synchronous_transitions:
# finalize is synchronous, which means set our code to FINALIZING
# before calling api.finalize(), and we expect it to be FINALIZED
@@ -435,6 +434,7 @@ def save_results(job, job_definition, results):
"""Extract the results of the execution and update the job accordingly."""
# save job outputs
job.outputs = results.outputs
job.level4_excluded_files = results.level4_excluded_files

message = None
error = False
@@ -493,7 +493,6 @@ def get_obsolete_files(job_definition, outputs):


def job_to_job_definition(job):

allow_database_access = False
env = {"OPENSAFELY_BACKEND": config.BACKEND}
if job.requires_db:
@@ -599,7 +598,6 @@ def set_code(

# if status code has changed then trace it and update
if job.status_code != new_status_code:

# handle timer measurement errors
if job.status_code_updated_at > timestamp_ns:
# we somehow have a negative duration, which honeycomb does funny things with.
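For orientation, a self-contained sketch of the handoff that the one-line change in save_results completes: the finalize step's results carry the excluded files, and save_results copies them onto the Job row, from which the CLI output shown earlier is printed. The dataclasses below are illustrative stand-ins, not jobrunner's real JobResults and Job classes.

    from dataclasses import dataclass, field

    @dataclass
    class JobResults:  # stand-in for the executor's results object
        outputs: dict = field(default_factory=dict)
        level4_excluded_files: dict = field(default_factory=dict)

    @dataclass
    class Job:  # stand-in for jobrunner.models.Job
        outputs: dict = None
        level4_excluded_files: dict = None

    results = JobResults(
        outputs={"output/dataset.csv": "moderately_sensitive"},
        level4_excluded_files={"output/dataset.csv": "File has patient_id column"},
    )
    job = Job()
    # mirrors the two assignments in save_results()
    job.outputs = results.outputs
    job.level4_excluded_files = results.level4_excluded_files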
1 change: 0 additions & 1 deletion tests/cli/test_kill_job.py
@@ -11,7 +11,6 @@

@pytest.mark.parametrize("cleanup", [False, True])
def test_kill_job(cleanup, tmp_work_dir, db, monkeypatch):

job = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)

mocker = mock.MagicMock(spec=local.docker)
31 changes: 31 additions & 0 deletions tests/cli/test_local_run.py
@@ -8,6 +8,7 @@

import pytest
from pipeline import load_pipeline
from ruyaml import YAML

from jobrunner import config
from jobrunner.actions import get_action_specification
@@ -40,6 +41,36 @@ def test_local_run_limits_applied(db, tmp_path, docker_cleanup):
assert metadata["HostConfig"]["NanoCpus"] == 1.5 * 1e9


@pytest.mark.slow_test
@pytest.mark.needs_docker
def test_local_run_level_4_checks_applied_and_logged(
db, tmp_path, docker_cleanup, capsys
):
project_dir = tmp_path / "project"
shutil.copytree(str(FIXTURE_DIR / "full_project"), project_dir)
project_yaml = project_dir / "project.yaml"
yaml = YAML()
project = yaml.load(project_yaml)
outputs = project["actions"]["generate_dataset"]["outputs"].pop("highly_sensitive")
project["actions"]["generate_dataset"]["outputs"]["moderately_sensitive"] = outputs
yaml.dump(project, project_yaml)

local_run.main(
project_dir=project_dir,
actions=["generate_dataset"],
debug=True, # preserves containers for inspection
)

job = list(database.find_all(Job))[0]
assert job.level4_excluded_files == {
"output/dataset.csv": "File has patient_id column"
}

stdout = capsys.readouterr().out
assert "invalid moderately_sensitive outputs:" in stdout
assert "output/dataset.csv - File has patient_id column" in stdout


@pytest.mark.parametrize("extraction_tool", ["cohortextractor", "databuilder"])
@pytest.mark.slow_test
@pytest.mark.needs_docker
1 change: 0 additions & 1 deletion tests/cli/test_prepare_for_reboot.py
@@ -9,7 +9,6 @@


def test_prepare_for_reboot(db, monkeypatch):

j1 = job_factory(state=State.RUNNING, status_code=StatusCode.EXECUTING)
j2 = job_factory(
state=State.PENDING, status_code=StatusCode.WAITING_ON_DEPENDENCIES
1 change: 0 additions & 1 deletion tests/conftest.py
@@ -197,7 +197,6 @@ def db(monkeypatch, request):

@dataclass
class SubprocessStub:

calls: deque = field(default_factory=deque)

def add_call(self, cmd, **kwargs):
1 change: 0 additions & 1 deletion tests/factories.py
@@ -124,7 +124,6 @@ class StubExecutorAPI:
synchronous_transitions = []

def __init__(self):

self.tracker = {
"prepare": set(),
"execute": set(),
1 change: 0 additions & 1 deletion tests/lib/test_database.py
@@ -179,7 +179,6 @@ def test_migrate_in_transaction(tmp_path):


def test_ensure_valid_db(tmp_path):

# db doesn't exist
with pytest.raises(MigrationNeeded) as exc:
ensure_valid_db("not_exists")