Skip to content

Commit

Permalink
Merge pull request #662 from opensafely-core/level-4-limits
Browse files Browse the repository at this point in the history
level 4 limits
  • Loading branch information
bloodearnest authored Oct 11, 2023
2 parents f8c751b + 5c9cf6a commit fb86868
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 42 deletions.
3 changes: 3 additions & 0 deletions jobrunner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def database_urls_from_env(env):
MAX_WORKERS = int(os.environ.get("MAX_WORKERS") or max(cpu_count() - 1, 1))
MAX_DB_WORKERS = int(os.environ.get("MAX_DB_WORKERS") or MAX_WORKERS)
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", 0))
MAX_LEVEL4_FILESIZE = int(
os.environ.get("MAX_LEVEL4_FILESIZE", 16 * 1024 * 1024)
) # 16mb


STATA_LICENSE = os.environ.get("STATA_LICENSE")
Expand Down
73 changes: 59 additions & 14 deletions jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,20 @@ def delete_files(self, workspace, privacy, files):
else:
raise Exception(f"unknown privacy of {privacy}")

errors = []
for name in files:
path = root / name
try:
path.unlink(missing_ok=True)
except Exception:
log.exception(f"Could not delete {path}")
errors.append(name)
return delete_files_from_directory(root, files)

return errors

def delete_files_from_directory(directory, files):
errors = []
for name in files:
path = directory / name
try:
path.unlink(missing_ok=True)
except Exception:
log.exception(f"Could not delete {path}")
errors.append(name)

return errors


def prepare_job(job_definition):
Expand Down Expand Up @@ -411,9 +415,14 @@ def finalize_job(job_definition):
write_job_logs(job_definition, job_metadata, copy_log_to_workspace=False)
else:
write_job_logs(job_definition, job_metadata, copy_log_to_workspace=True)
persist_outputs(job_definition, results.outputs, job_metadata)
excluded = persist_outputs(job_definition, results.outputs, job_metadata)
results.level4_excluded_files.update(**excluded)

RESULTS[job_definition.id] = results

# for ease of testing
return results


def get_job_metadata(job_definition, outputs, container_metadata):
# job_metadata is a big dict capturing everything we know about the state
Expand Down Expand Up @@ -457,25 +466,59 @@ def write_job_logs(job_definition, job_metadata, copy_log_to_workspace=True):
)


MAX_SIZE_MSG = """
The file {filename} was {size}Mb, which is above the limit for
moderately_sensitive files of {limit}Mb.
As such, it has *not* been copied to Level 4 storage. Please double check that
{filename} contains only aggregate information, and is an appropriate size to
be able to be output checked.
"""


def persist_outputs(job_definition, outputs, job_metadata):
"""Copy generated outputs to persistant storage."""
# Extract outputs to workspace
workspace_dir = get_high_privacy_workspace(job_definition.workspace)

excluded_files = {}

def mb(b):
return round(b / (1024 * 1024), 2)

sizes = {}
for filename in outputs.keys():
log.info(f"Extracting output file: {filename}")
volumes.get_volume_api(job_definition).copy_from_volume(
size = volumes.get_volume_api(job_definition).copy_from_volume(
job_definition, filename, workspace_dir / filename
)
sizes[filename] = size

# Copy out medium privacy files
medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)
if medium_privacy_dir:
for filename, privacy_level in outputs.items():
if privacy_level == "moderately_sensitive":
volumes.copy_file(
workspace_dir / filename, medium_privacy_dir / filename
)
size = sizes[filename]
message_file = medium_privacy_dir / (filename + ".txt")

if size < job_definition.max_level4_filesize:
volumes.copy_file(
workspace_dir / filename, medium_privacy_dir / filename
)
# if it previously had a too big notice, delete it
delete_files_from_directory(medium_privacy_dir, [message_file])
else:
msg = f"File size of {mb(size)}Mb is larger that limit of {mb(job_definition.max_level4_filesize)}Mb."
excluded_files[filename] = msg
message_file.parent.mkdir(exist_ok=True, parents=True)
message_file.write_text(
MAX_SIZE_MSG.format(
filename=filename,
size=mb(size),
limit=mb(job_definition.max_level4_filesize),
)
)

# this can be removed once osrelease is dead
write_manifest_file(
Expand All @@ -487,6 +530,8 @@ def persist_outputs(job_definition, outputs, job_metadata):
},
)

return excluded_files


def find_matching_outputs(job_definition):
"""
Expand Down
6 changes: 4 additions & 2 deletions jobrunner/executors/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def copy_file(source, dest, follow_symlinks=True):
with atomic_writer(dest) as tmp:
shutil.copy(source, tmp, follow_symlinks=follow_symlinks)

return dest.stat().st_size


def docker_volume_name(job):
return f"os-volume-{job.id}"
Expand All @@ -50,7 +52,7 @@ def copy_to_volume(job, src, dst, timeout=None):
docker.copy_to_volume(docker_volume_name(job), src, dst, timeout)

def copy_from_volume(job, src, dst, timeout=None):
docker.copy_from_volume(docker_volume_name(job), src, dst, timeout)
return docker.copy_from_volume(docker_volume_name(job), src, dst, timeout)

def delete_volume(job):
docker.delete_volume(docker_volume_name(job))
Expand Down Expand Up @@ -140,7 +142,7 @@ def copy_to_volume(job, src, dst, timeout=None):
def copy_from_volume(job, src, dst, timeout=None):
# this is only used to copy final outputs/logs.
path = host_volume_path(job) / src
copy_file(path, dst)
return copy_file(path, dst)

def delete_volume(job):

Expand Down
6 changes: 5 additions & 1 deletion jobrunner/job_executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Mapping, Optional

Expand Down Expand Up @@ -35,6 +35,7 @@ class JobDefinition:
database_name: str = None
cpu_count: str = None # number of CPUs to be allocated
memory_limit: str = None # memory limit to apply
max_level4_filesize: int = 16 * 1024 * 1024
# if a job has been cancelled, the name of the canceller - either "user" or "admin"
cancelled: str = None

Expand Down Expand Up @@ -78,6 +79,9 @@ class JobResults:
# timestamp these results were finalized, in integer nanoseconds
timestamp_ns: int = None

# files not copied to level 4 (too big or similar reason)
level4_excluded_files: Mapping[str, str] = field(default_factory=dict)

# to be extracted from the image labels
action_version: str = "unknown"
action_revision: str = "unknown"
Expand Down
2 changes: 2 additions & 0 deletions jobrunner/lib/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ def copy_from_volume(volume_name, source, dest, timeout=None):
timeout=timeout,
)

return dest.stat().st_size


def glob_volume_files(volume_name, glob_patterns):
"""
Expand Down
7 changes: 7 additions & 0 deletions jobrunner/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,12 @@ def save_results(job, job_definition, results):
code = StatusCode.SUCCEEDED
message = "Completed successfully"

if results.level4_excluded_files:
files = "\n".join(
f"{f}: {msg}" for f, msg in results.level4_excluded_files.items()
)
message += f", but {len(results.level4_excluded_files)} file(s) marked as moderately_sensitive were excluded:\n{files}"

set_code(job, code, message, error=error, results=results)


Expand Down Expand Up @@ -550,6 +556,7 @@ def job_to_job_definition(job):
# config defaults.
cpu_count=config.DEFAULT_JOB_CPU_COUNT,
memory_limit=config.DEFAULT_JOB_MEMORY_LIMIT,
max_level4_filesize=config.MAX_LEVEL4_FILESIZE,
cancelled=job_definition_cancelled,
)

Expand Down
97 changes: 89 additions & 8 deletions tests/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
]
job_definition.inputs = ["output/input.csv"]
job_definition.output_spec = {
"output/output.*": "high_privacy",
"output/summary.*": "medium_privacy",
"output/output.*": "highly_sensitive",
"output/summary.*": "moderately_sensitive",
}
populate_workspace(job_definition.workspace, "output/input.csv")

Expand Down Expand Up @@ -328,8 +328,8 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
results = api.get_results(job_definition)
assert results.exit_code == 0
assert results.outputs == {
"output/output.csv": "high_privacy",
"output/summary.csv": "medium_privacy",
"output/output.csv": "highly_sensitive",
"output/summary.csv": "moderately_sensitive",
}
assert results.unmatched_patterns == []

Expand All @@ -344,8 +344,8 @@ def test_finalize_failed(docker_cleanup, job_definition, tmp_work_dir, volume_ap

job_definition.args = ["false"]
job_definition.output_spec = {
"output/output.*": "high_privacy",
"output/summary.*": "medium_privacy",
"output/output.*": "highly_sensitive",
"output/summary.*": "moderately_sensitive",
}

api = local.LocalDockerAPI()
Expand Down Expand Up @@ -378,8 +378,8 @@ def test_finalize_unmatched(docker_cleanup, job_definition, tmp_work_dir, volume
# the sleep is needed to make sure the unmatched file is *newer* enough
job_definition.args = ["sh", "-c", "sleep 1; touch /workspace/unmatched"]
job_definition.output_spec = {
"output/output.*": "high_privacy",
"output/summary.*": "medium_privacy",
"output/output.*": "highly_sensitive",
"output/summary.*": "moderately_sensitive",
}

api = local.LocalDockerAPI()
Expand Down Expand Up @@ -473,6 +473,87 @@ def test_finalize_failed_oomkilled(docker_cleanup, job_definition, tmp_work_dir)
assert workspace_log_file_exists(job_definition)


@pytest.mark.needs_docker
def test_finalize_large_level4_outputs(
docker_cleanup, job_definition, tmp_work_dir, volume_api
):
job_definition.args = [
"truncate",
"-s",
str(1024 * 1024),
"/workspace/output/output.csv",
]
job_definition.output_spec = {
"output/output.csv": "moderately_sensitive",
}
job_definition.max_level4_filesize = 512 * 1024

api = local.LocalDockerAPI()

status = api.prepare(job_definition)
assert status.state == ExecutorState.PREPARED
status = api.execute(job_definition)
assert status.state == ExecutorState.EXECUTING

status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)

status = api.finalize(job_definition)
assert status.state == ExecutorState.FINALIZED

result = api.get_results(job_definition)

assert result.exit_code == 0
assert result.level4_excluded_files == {
"output/output.csv": "File size of 1.0Mb is larger that limit of 0.5Mb.",
}

level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
message_file = level4_dir / "output/output.csv.txt"
txt = message_file.read_text()
assert "output/output.csv" in txt
assert "1.0Mb" in txt
assert "0.5Mb" in txt


@pytest.mark.needs_docker
def test_finalize_large_level4_outputs_cleanup(
docker_cleanup, job_definition, tmp_work_dir, volume_api
):
job_definition.args = [
"truncate",
"-s",
str(256 * 1024),
"/workspace/output/output.csv",
]
job_definition.output_spec = {
"output/output.csv": "moderately_sensitive",
}
job_definition.max_level4_filesize = 512 * 1024

level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
message_file = level4_dir / "output/output.csv.txt"
message_file.parent.mkdir(exist_ok=True, parents=True)
message_file.write_text("message")

api = local.LocalDockerAPI()

status = api.prepare(job_definition)
assert status.state == ExecutorState.PREPARED
status = api.execute(job_definition)
assert status.state == ExecutorState.EXECUTING

status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)

status = api.finalize(job_definition)
assert status.state == ExecutorState.FINALIZED

result = api.get_results(job_definition)

assert result.exit_code == 0
assert result.level4_excluded_files == {}
assert not message_file.exists()


@pytest.mark.needs_docker
def test_pending_job_terminated_not_finalized(
docker_cleanup, job_definition, tmp_work_dir
Expand Down
Loading

0 comments on commit fb86868

Please sign in to comment.