level 4 limits #662

Merged: 2 commits, Oct 11, 2023

3 changes: 3 additions & 0 deletions jobrunner/config.py
@@ -136,6 +136,9 @@ def database_urls_from_env(env):
MAX_WORKERS = int(os.environ.get("MAX_WORKERS") or max(cpu_count() - 1, 1))
MAX_DB_WORKERS = int(os.environ.get("MAX_DB_WORKERS") or MAX_WORKERS)
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", 0))
MAX_LEVEL4_FILESIZE = int(
os.environ.get("MAX_LEVEL4_FILESIZE", 16 * 1024 * 1024)
)  # 16MB


STATA_LICENSE = os.environ.get("STATA_LICENSE")
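
For context, the new setting follows the same pattern as the existing env vars above: it is read once, at import time, and defaults to 16MB. A minimal sketch of overriding it (the 32MB value is just an example):

    import os

    # Must be set before jobrunner.config is imported, because the
    # module reads the environment at import time.
    os.environ["MAX_LEVEL4_FILESIZE"] = str(32 * 1024 * 1024)

    from jobrunner import config

    assert config.MAX_LEVEL4_FILESIZE == 32 * 1024 * 1024
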
73 changes: 59 additions & 14 deletions jobrunner/executors/local.py
@@ -294,16 +294,20 @@ def delete_files(self, workspace, privacy, files):
else:
raise Exception(f"unknown privacy of {privacy}")

errors = []
for name in files:
path = root / name
try:
path.unlink(missing_ok=True)
except Exception:
log.exception(f"Could not delete {path}")
errors.append(name)
return delete_files_from_directory(root, files)

return errors

def delete_files_from_directory(directory, files):
errors = []
for name in files:
path = directory / name
try:
path.unlink(missing_ok=True)
except Exception:
log.exception(f"Could not delete {path}")
errors.append(name)

return errors


def prepare_job(job_definition):
@@ -411,9 +415,14 @@ def finalize_job(job_definition):
write_job_logs(job_definition, job_metadata, copy_log_to_workspace=False)
else:
write_job_logs(job_definition, job_metadata, copy_log_to_workspace=True)
persist_outputs(job_definition, results.outputs, job_metadata)
excluded = persist_outputs(job_definition, results.outputs, job_metadata)
results.level4_excluded_files.update(**excluded)

RESULTS[job_definition.id] = results

# for ease of testing
return results


def get_job_metadata(job_definition, outputs, container_metadata):
# job_metadata is a big dict capturing everything we know about the state
@@ -457,25 +466,59 @@ def write_job_logs(job_definition, job_metadata, copy_log_to_workspace=True):
)


MAX_SIZE_MSG = """
The file {filename} was {size}Mb, which is above the {limit}Mb limit for
moderately_sensitive files.

As such, it has *not* been copied to Level 4 storage. Please double-check that
{filename} contains only aggregate information and is an appropriate size for
output checking.
"""


def persist_outputs(job_definition, outputs, job_metadata):
"""Copy generated outputs to persistant storage."""
# Extract outputs to workspace
workspace_dir = get_high_privacy_workspace(job_definition.workspace)

excluded_files = {}

def mb(b):
return round(b / (1024 * 1024), 2)

sizes = {}
for filename in outputs.keys():
log.info(f"Extracting output file: {filename}")
volumes.get_volume_api(job_definition).copy_from_volume(
size = volumes.get_volume_api(job_definition).copy_from_volume(
job_definition, filename, workspace_dir / filename
)
sizes[filename] = size

# Copy out medium privacy files
medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)
if medium_privacy_dir:
for filename, privacy_level in outputs.items():
if privacy_level == "moderately_sensitive":
volumes.copy_file(
workspace_dir / filename, medium_privacy_dir / filename
)
size = sizes[filename]
message_file = medium_privacy_dir / (filename + ".txt")

if size < job_definition.max_level4_filesize:
volumes.copy_file(
workspace_dir / filename, medium_privacy_dir / filename
)
# if a previous run left a too-big notice, delete it
delete_files_from_directory(medium_privacy_dir, [message_file])
else:
msg = f"File size of {mb(size)}Mb is larger than the limit of {mb(job_definition.max_level4_filesize)}Mb."
excluded_files[filename] = msg
message_file.parent.mkdir(exist_ok=True, parents=True)
message_file.write_text(
MAX_SIZE_MSG.format(
filename=filename,
size=mb(size),
limit=mb(job_definition.max_level4_filesize),
)
)

# this can be removed once osrelease is dead
write_manifest_file(
@@ -487,6 +530,8 @@ def persist_outputs(job_definition, outputs, job_metadata):
},
)

return excluded_files


def find_matching_outputs(job_definition):
"""
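
The core of this change is the size gate in persist_outputs: a moderately_sensitive output is copied to the level 4 (medium privacy) workspace only if it is under max_level4_filesize; otherwise a .txt notice is written in its place and the exclusion is reported back to the caller. A standalone sketch of that decision, with hypothetical sizes:

    # Standalone sketch of the gate implemented in persist_outputs above;
    # the limit and sizes here are hypothetical (0.5MB, as in the tests below).
    MAX_LEVEL4_FILESIZE = 512 * 1024

    def mb(b):
        return round(b / (1024 * 1024), 2)

    def level4_decision(size):
        """Return ("copy", None) or ("exclude", reason) for one output."""
        if size < MAX_LEVEL4_FILESIZE:
            return ("copy", None)
        reason = (
            f"File size of {mb(size)}Mb is larger than the limit "
            f"of {mb(MAX_LEVEL4_FILESIZE)}Mb."
        )
        return ("exclude", reason)

    assert level4_decision(256 * 1024) == ("copy", None)
    assert level4_decision(1024 * 1024) == (
        "exclude",
        "File size of 1.0Mb is larger than the limit of 0.5Mb.",
    )
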
6 changes: 4 additions & 2 deletions jobrunner/executors/volumes.py
Expand Up @@ -26,6 +26,8 @@ def copy_file(source, dest, follow_symlinks=True):
with atomic_writer(dest) as tmp:
shutil.copy(source, tmp, follow_symlinks=follow_symlinks)

return dest.stat().st_size


def docker_volume_name(job):
return f"os-volume-{job.id}"
@@ -50,7 +52,7 @@ def copy_to_volume(job, src, dst, timeout=None):
docker.copy_to_volume(docker_volume_name(job), src, dst, timeout)

def copy_from_volume(job, src, dst, timeout=None):
docker.copy_from_volume(docker_volume_name(job), src, dst, timeout)
return docker.copy_from_volume(docker_volume_name(job), src, dst, timeout)

def delete_volume(job):
docker.delete_volume(docker_volume_name(job))
@@ -140,7 +142,7 @@ def copy_to_volume(job, src, dst, timeout=None):
def copy_from_volume(job, src, dst, timeout=None):
# this is only used to copy final outputs/logs.
path = host_volume_path(job) / src
copy_file(path, dst)
return copy_file(path, dst)

def delete_volume(job):

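
Both volume implementations now report the copied file's size in bytes (via dest.stat().st_size), which is what persist_outputs uses to apply the limit without a second stat call. A self-contained sketch of the copy_file contract, assuming jobrunner is importable:

    import tempfile
    from pathlib import Path

    from jobrunner.executors import volumes

    with tempfile.TemporaryDirectory() as d:
        src = Path(d) / "src.csv"
        src.write_text("a,b\n1,2\n")
        # copy_file now returns the size of the copied file in bytes
        size = volumes.copy_file(src, Path(d) / "dest.csv")
        assert size == src.stat().st_size
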
6 changes: 5 additions & 1 deletion jobrunner/job_executor.py
@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Mapping, Optional

@@ -35,6 +35,7 @@ class JobDefinition:
database_name: str = None
cpu_count: str = None # number of CPUs to be allocated
memory_limit: str = None # memory limit to apply
max_level4_filesize: int = 16 * 1024 * 1024
# if a job has been cancelled, the name of the canceller - either "user" or "admin"
cancelled: str = None

@@ -78,6 +79,9 @@ class JobResults:
# timestamp these results were finalized, in integer nanoseconds
timestamp_ns: int = None

# files not copied to level 4, mapped to the reason they were excluded (e.g. too big)
level4_excluded_files: Mapping[str, str] = field(default_factory=dict)

# to be extracted from the image labels
action_version: str = "unknown"
action_revision: str = "unknown"
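
The new level4_excluded_files field maps each excluded filename to a human-readable reason; field(default_factory=dict) gives every JobResults instance its own dict rather than a shared mutable default. run.py (below) folds these entries into the job's status message; an equivalent minimal loop, with the results and log names assumed from the surrounding code:

    # Sketch: surfacing the new field to operators/users.
    for filename, reason in results.level4_excluded_files.items():
        log.warning("not copied to level 4: %s (%s)", filename, reason)
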
2 changes: 2 additions & 0 deletions jobrunner/lib/docker.py
@@ -276,6 +276,8 @@ def copy_from_volume(volume_name, source, dest, timeout=None):
timeout=timeout,
)

return dest.stat().st_size


def glob_volume_files(volume_name, glob_patterns):
"""
7 changes: 7 additions & 0 deletions jobrunner/run.py
@@ -465,6 +465,12 @@ def save_results(job, job_definition, results):
code = StatusCode.SUCCEEDED
message = "Completed successfully"

if results.level4_excluded_files:
files = "\n".join(
f"{f}: {msg}" for f, msg in results.level4_excluded_files.items()
)
message += f", but {len(results.level4_excluded_files)} file(s) marked as moderately_sensitive were excluded:\n{files}"

set_code(job, code, message, error=error, results=results)


@@ -550,6 +556,7 @@ def job_to_job_definition(job):
# config defaults.
cpu_count=config.DEFAULT_JOB_CPU_COUNT,
memory_limit=config.DEFAULT_JOB_MEMORY_LIMIT,
max_level4_filesize=config.MAX_LEVEL4_FILESIZE,
cancelled=job_definition_cancelled,
)

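
For a single oversized file, the resulting status message reads like this (values taken from the test below):

    Completed successfully, but 1 file(s) marked as moderately_sensitive were excluded:
    output/output.csv: File size of 1.0Mb is larger than the limit of 0.5Mb.
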
97 changes: 89 additions & 8 deletions tests/test_local_executor.py
@@ -297,8 +297,8 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
]
job_definition.inputs = ["output/input.csv"]
job_definition.output_spec = {
"output/output.*": "high_privacy",
"output/summary.*": "medium_privacy",
"output/output.*": "highly_sensitive",
"output/summary.*": "moderately_sensitive",
}
populate_workspace(job_definition.workspace, "output/input.csv")

@@ -328,8 +328,8 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
results = api.get_results(job_definition)
assert results.exit_code == 0
assert results.outputs == {
"output/output.csv": "high_privacy",
"output/summary.csv": "medium_privacy",
"output/output.csv": "highly_sensitive",
"output/summary.csv": "moderately_sensitive",
}
assert results.unmatched_patterns == []

@@ -344,8 +344,8 @@ def test_finalize_failed(docker_cleanup, job_definition, tmp_work_dir, volume_api):

job_definition.args = ["false"]
job_definition.output_spec = {
"output/output.*": "high_privacy",
"output/summary.*": "medium_privacy",
"output/output.*": "highly_sensitive",
"output/summary.*": "moderately_sensitive",
}

api = local.LocalDockerAPI()
@@ -378,8 +378,8 @@ def test_finalize_unmatched(docker_cleanup, job_definition, tmp_work_dir, volume_api):
# the sleep is needed to make sure the unmatched file is sufficiently *newer*
job_definition.args = ["sh", "-c", "sleep 1; touch /workspace/unmatched"]
job_definition.output_spec = {
"output/output.*": "high_privacy",
"output/summary.*": "medium_privacy",
"output/output.*": "highly_sensitive",
"output/summary.*": "moderately_sensitive",
}

api = local.LocalDockerAPI()
@@ -473,6 +473,87 @@ def test_finalize_failed_oomkilled(docker_cleanup, job_definition, tmp_work_dir):
assert workspace_log_file_exists(job_definition)


@pytest.mark.needs_docker
def test_finalize_large_level4_outputs(
docker_cleanup, job_definition, tmp_work_dir, volume_api
):
job_definition.args = [
"truncate",
"-s",
str(1024 * 1024),
"/workspace/output/output.csv",
]
job_definition.output_spec = {
"output/output.csv": "moderately_sensitive",
}
job_definition.max_level4_filesize = 512 * 1024

api = local.LocalDockerAPI()

status = api.prepare(job_definition)
assert status.state == ExecutorState.PREPARED
status = api.execute(job_definition)
assert status.state == ExecutorState.EXECUTING

status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)

status = api.finalize(job_definition)
assert status.state == ExecutorState.FINALIZED

result = api.get_results(job_definition)

assert result.exit_code == 0
assert result.level4_excluded_files == {
"output/output.csv": "File size of 1.0Mb is larger that limit of 0.5Mb.",
}

level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
message_file = level4_dir / "output/output.csv.txt"
txt = message_file.read_text()
assert "output/output.csv" in txt
assert "1.0Mb" in txt
assert "0.5Mb" in txt


@pytest.mark.needs_docker
def test_finalize_large_level4_outputs_cleanup(
docker_cleanup, job_definition, tmp_work_dir, volume_api
):
job_definition.args = [
"truncate",
"-s",
str(256 * 1024),
"/workspace/output/output.csv",
]
job_definition.output_spec = {
"output/output.csv": "moderately_sensitive",
}
job_definition.max_level4_filesize = 512 * 1024

level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
message_file = level4_dir / "output/output.csv.txt"
message_file.parent.mkdir(exist_ok=True, parents=True)
message_file.write_text("message")

api = local.LocalDockerAPI()

status = api.prepare(job_definition)
assert status.state == ExecutorState.PREPARED
status = api.execute(job_definition)
assert status.state == ExecutorState.EXECUTING

status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)

status = api.finalize(job_definition)
assert status.state == ExecutorState.FINALIZED

result = api.get_results(job_definition)

assert result.exit_code == 0
assert result.level4_excluded_files == {}
assert not message_file.exists()


@pytest.mark.needs_docker
def test_pending_job_terminated_not_finalized(
docker_cleanup, job_definition, tmp_work_dir