Expose metrics from executor.
This modifies the local executor's get_status() to return any collected metrics.
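
For context, the sketch below (not part of this commit) shows how a caller might consume the new field. It assumes `job_definition` is an existing JobDefinition and that the stats thread has already sampled the job; the metric keys are the ones asserted in test_execute_metrics further down.

    # Sketch only: reading the metrics that get_status() now returns.
    from jobrunner.executors import local

    api = local.LocalDockerAPI()
    status = api.get_status(job_definition)  # job_definition assumed to exist

    # JobStatus.metrics defaults to an empty dict, so .get() is safe even
    # before any stats have been collected.
    peak_mem = status.metrics.get("mem_peak")
    mean_cpu = status.metrics.get("cpu_mean")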

It did mean altering the record_tick_trace function to make it more
testable, but it's an improvement anyway.
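
Concretely, record_tick_trace() no longer looks up active jobs itself; the caller passes them in. A rough sketch of the testing win, mirroring tests/test_record_stats.py below (assumes a test database is configured, as the db fixture provides in those tests):

    # Sketch: with active_jobs hoisted to a parameter, a test can inject a
    # fabricated job rather than relying on record_tick_trace() querying
    # the database itself.
    import time

    from jobrunner import record_stats
    from jobrunner.models import StatusCode
    from tests.factories import job_factory

    job = job_factory(status_code=StatusCode.EXECUTING)  # persists to the test db
    record_stats.record_tick_trace(time.time(), [job])
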
bloodearnest committed Dec 11, 2023
1 parent 571164c commit 20e0235
Showing 5 changed files with 75 additions and 28 deletions.
jobrunner/executors/local.py (21 additions, 8 deletions)

@@ -11,7 +11,7 @@

 from pipeline.legacy import get_all_output_patterns_from_project_file

-from jobrunner import config
+from jobrunner import config, models
 from jobrunner.executors import volumes
 from jobrunner.job_executor import (
     ExecutorAPI,
@@ -22,7 +22,7 @@
     JobStatus,
     Privacy,
 )
-from jobrunner.lib import datestr_to_ns_timestamp, docker
+from jobrunner.lib import database, datestr_to_ns_timestamp, docker
 from jobrunner.lib.git import checkout_commit
 from jobrunner.lib.path_utils import list_dir_with_ignore_patterns
 from jobrunner.lib.string_utils import tabulate
@@ -249,10 +249,16 @@ def get_status(self, job_definition, timeout=15):
                     # jobs prepared but not running do not need to finalize, so we
                     # proceed directly to the FINALIZED state here
                     return JobStatus(
-                        ExecutorState.FINALIZED, "Prepared job was cancelled"
+                        ExecutorState.FINALIZED,
+                        "Prepared job was cancelled",
+                        metrics=metrics,
                     )
                 else:
-                    return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
+                    return JobStatus(
+                        ExecutorState.UNKNOWN,
+                        "Pending job was cancelled",
+                        metrics=metrics,
+                    )

             # timestamp file presence means we have finished preparing
             timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -263,24 +269,31 @@
             # re-prepare it anyway.
             if timestamp_ns is None:
                 # we are Jon Snow
-                return JobStatus(ExecutorState.UNKNOWN)
+                return JobStatus(ExecutorState.UNKNOWN, metrics={})
             else:
                 # we've finish preparing
-                return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
+                return JobStatus(
+                    ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
+                )

         if container["State"]["Running"]:
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
-            return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
+            return JobStatus(
+                ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
+            )
         elif job_definition.id in RESULTS:
             return JobStatus(
                 ExecutorState.FINALIZED,
                 timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
+                metrics=metrics,
             )
         else:
             # container present but not running, i.e. finished
             # Nb. this does not include prepared jobs, as they have a volume but not a container
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
-            return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
+            return JobStatus(
+                ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
+            )

     def get_results(self, job_definition):
         if job_definition.id not in RESULTS:
jobrunner/job_executor.py (1 addition, 0 deletions)

@@ -67,6 +67,7 @@ class JobStatus:
     timestamp_ns: int = (
         None  # timestamp this JobStatus occurred, in integer nanoseconds
     )
+    metrics: dict = field(default_factory=dict)


 @dataclass
jobrunner/record_stats.py (4 additions, 5 deletions)

@@ -71,7 +71,10 @@ def main():
     last_run = None
     while True:
         before = time.time()
-        last_run = record_tick_trace(last_run)
+        active_jobs = database.find_where(
+            models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
+        )
+        last_run = record_tick_trace(last_run, active_jobs)

         # record_tick_trace might have take a while, so sleep the remainding interval
         # enforce a minimum time of 3s to ensure we don't hammer honeycomb or
@@ -120,10 +123,6 @@ def record_tick_trace(last_run, active_jobs):
     end_time = now
     duration_s = int((end_time - start_time) / 1e9)

-    active_jobs = database.find_where(
-        models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
-    )
-
     with tracer.start_as_current_span(
         "TICK", start_time=start_time, attributes=trace_attrs
     ) as root:
tests/test_local_executor.py (43 additions, 9 deletions)

@@ -4,12 +4,12 @@

 import pytest

-from jobrunner import config
+from jobrunner import config, models, record_stats
 from jobrunner.executors import local, volumes
 from jobrunner.job_executor import ExecutorState, JobDefinition, Privacy, Study
 from jobrunner.lib import datestr_to_ns_timestamp, docker
 from tests.conftest import SUPPORTED_VOLUME_APIS
-from tests.factories import ensure_docker_images_present
+from tests.factories import ensure_docker_images_present, job_factory


 # this is parametized fixture, and test using it will run multiple times, once
@@ -215,7 +215,7 @@ def test_prepare_job_no_input_file(docker_cleanup, job_definition, volume_api):


 @pytest.mark.needs_docker
-def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
+def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, db, volume_api):
     # check limits are applied
     job_definition.cpu_count = 1.5
     job_definition.memory_limit = "1G"
@@ -229,18 +229,52 @@ def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
     status = api.execute(job_definition)
     assert status.state == ExecutorState.EXECUTING

-    # could be in either state
-    assert api.get_status(job_definition).state in (
-        ExecutorState.EXECUTING,
-        ExecutorState.EXECUTED,
-    )
-
     container_data = docker.container_inspect(local.container_name(job_definition))
     assert container_data["State"]["ExitCode"] == 0
     assert container_data["HostConfig"]["NanoCpus"] == int(1.5 * 1e9)
     assert container_data["HostConfig"]["Memory"] == 2**30  # 1G
+
+
+@pytest.mark.needs_docker
+def test_execute_metrics(docker_cleanup, job_definition, tmp_work_dir, db):
+    job_definition.args = ["sleep", "10"]
+    last_run = time.time_ns()
+
+    api = local.LocalDockerAPI()
+
+    status = api.prepare(job_definition)
+    assert status.state == ExecutorState.PREPARED
+
+    # we need scheduler job state to be able to collect stats
+    job = job_factory(
+        id=job_definition.id,
+        state=models.State.RUNNING,
+        status_code=models.StatusCode.EXECUTING,
+        started_at=int(last_run / 1e9),
+    )
+
+    status = api.execute(job_definition)
+    assert status.state == ExecutorState.EXECUTING
+
+    # simulate stats thread collecting stats
+    record_stats.record_tick_trace(last_run, [job])
+
+    docker.kill(local.container_name(job_definition))
+
+    status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)
+
+    assert list(status.metrics.keys()) == [
+        "cpu_sample",
+        "cpu_cumsum",
+        "cpu_mean",
+        "cpu_peak",
+        "mem_sample",
+        "mem_cumsum",
+        "mem_mean",
+        "mem_peak",
+    ]


 @pytest.mark.skipif(
     sys.platform != "linux" and sys.platform != "darwin", reason="linux/darwin only"
 )
tests/test_record_stats.py (6 additions, 6 deletions)

@@ -28,12 +28,12 @@ def test_record_tick_trace(db, freezer, monkeypatch):
     # this should not be tick'd
     job_factory(state=State.SUCCEEDED, status_code=StatusCode.SUCCEEDED)

-    last_run1 = record_stats.record_tick_trace(None)
+    last_run1 = record_stats.record_tick_trace(None, jobs)
     assert len(get_trace("ticks")) == 0

     freezer.tick(10)

-    last_run2 = record_stats.record_tick_trace(last_run1)
+    last_run2 = record_stats.record_tick_trace(last_run1, jobs)
     assert last_run2 == last_run1 + 10 * 1e9

     spans = get_trace("ticks")
@@ -74,7 +74,7 @@


 def test_record_tick_trace_stats_timeout(db, freezer, monkeypatch):
-    job_factory(status_code=StatusCode.EXECUTING)
+    job = job_factory(status_code=StatusCode.EXECUTING)

     def timeout():
         raise subprocess.TimeoutExpired("cmd", 10)
@@ -84,7 +84,7 @@ def timeout():
     last_run = time.time()
     freezer.tick(10)

-    record_stats.record_tick_trace(last_run)
+    record_stats.record_tick_trace(last_run, [job])
     assert len(get_trace("ticks")) == 2

     spans = get_trace("ticks")
@@ -99,7 +99,7 @@


 def test_record_tick_trace_stats_error(db, freezer, monkeypatch):
-    job_factory(status_code=StatusCode.EXECUTING)
+    job = job_factory(status_code=StatusCode.EXECUTING)

     def error():
         raise subprocess.CalledProcessError(
@@ -109,7 +109,7 @@ def error():
     monkeypatch.setattr(record_stats, "get_job_stats", error)

     last_run = time.time()
-    record_stats.record_tick_trace(last_run)
+    record_stats.record_tick_trace(last_run, [job])
     assert len(get_trace("ticks")) == 2

     spans = get_trace("ticks")