From 20e0235d0c89510b3d6cfccd12afa281f4099865 Mon Sep 17 00:00:00 2001
From: bloodearnest
Date: Mon, 4 Dec 2023 11:44:33 +0000
Subject: [PATCH] Expose metrics from executor.

This modifies the local executor's get_status() to return any collected
metrics. It did mean altering the record_tick_trace function to make it more
testable, but it's an improvement anyway.
---
 jobrunner/executors/local.py | 29 ++++++++++++++------
 jobrunner/job_executor.py    |  1 +
 jobrunner/record_stats.py    |  9 +++----
 tests/test_local_executor.py | 52 +++++++++++++++++++++++++++++-------
 tests/test_record_stats.py   | 12 ++++-----
 5 files changed, 75 insertions(+), 28 deletions(-)

diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py
index 01fa9ba3..21507a23 100644
--- a/jobrunner/executors/local.py
+++ b/jobrunner/executors/local.py
@@ -11,7 +11,7 @@
 
 from pipeline.legacy import get_all_output_patterns_from_project_file
 
-from jobrunner import config
+from jobrunner import config, models
 from jobrunner.executors import volumes
 from jobrunner.job_executor import (
     ExecutorAPI,
@@ -22,7 +22,7 @@
     JobStatus,
     Privacy,
 )
-from jobrunner.lib import datestr_to_ns_timestamp, docker
+from jobrunner.lib import database, datestr_to_ns_timestamp, docker
 from jobrunner.lib.git import checkout_commit
 from jobrunner.lib.path_utils import list_dir_with_ignore_patterns
 from jobrunner.lib.string_utils import tabulate
@@ -249,10 +249,16 @@ def get_status(self, job_definition, timeout=15):
                     # jobs prepared but not running do not need to finalize, so we
                     # proceed directly to the FINALIZED state here
                     return JobStatus(
-                        ExecutorState.FINALIZED, "Prepared job was cancelled"
+                        ExecutorState.FINALIZED,
+                        "Prepared job was cancelled",
+                        metrics=metrics,
                     )
                 else:
-                    return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
+                    return JobStatus(
+                        ExecutorState.UNKNOWN,
+                        "Pending job was cancelled",
+                        metrics=metrics,
+                    )
 
             # timestamp file presence means we have finished preparing
             timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -263,24 +269,31 @@ def get_status(self, job_definition, timeout=15):
             # re-prepare it anyway.
             if timestamp_ns is None:
                 # we are Jon Snow
-                return JobStatus(ExecutorState.UNKNOWN)
+                return JobStatus(ExecutorState.UNKNOWN, metrics={})
             else:
                 # we've finish preparing
-                return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
+                return JobStatus(
+                    ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
+                )
 
         if container["State"]["Running"]:
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
-            return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
+            return JobStatus(
+                ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
+            )
         elif job_definition.id in RESULTS:
             return JobStatus(
                 ExecutorState.FINALIZED,
                 timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
+                metrics=metrics,
             )
         else:
             # container present but not running, i.e. finished
             # Nb. this does not include prepared jobs, as they have a volume but not a container
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
-            return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
+            return JobStatus(
+                ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
+            )
 
     def get_results(self, job_definition):
         if job_definition.id not in RESULTS:
diff --git a/jobrunner/job_executor.py b/jobrunner/job_executor.py
index 2ee0039d..da135bae 100644
--- a/jobrunner/job_executor.py
+++ b/jobrunner/job_executor.py
@@ -67,6 +67,7 @@ class JobStatus:
     timestamp_ns: int = (
         None  # timestamp this JobStatus occurred, in integer nanoseconds
     )
+    metrics: dict = field(default_factory=dict)
 
 
 @dataclass
diff --git a/jobrunner/record_stats.py b/jobrunner/record_stats.py
index 557da9a8..01cdef75 100644
--- a/jobrunner/record_stats.py
+++ b/jobrunner/record_stats.py
@@ -71,7 +71,10 @@ def main():
     last_run = None
     while True:
         before = time.time()
-        last_run = record_tick_trace(last_run)
+        active_jobs = database.find_where(
+            models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
+        )
+        last_run = record_tick_trace(last_run, active_jobs)
 
         # record_tick_trace might have take a while, so sleep the remainding interval
         # enforce a minimum time of 3s to ensure we don't hammer honeycomb or
@@ -120,10 +123,6 @@ def record_tick_trace(last_run, active_jobs):
     end_time = now
     duration_s = int((end_time - start_time) / 1e9)
 
-    active_jobs = database.find_where(
-        models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
-    )
-
     with tracer.start_as_current_span(
         "TICK", start_time=start_time, attributes=trace_attrs
     ) as root:
diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py
index 24472a19..9694bf11 100644
--- a/tests/test_local_executor.py
+++ b/tests/test_local_executor.py
@@ -4,12 +4,12 @@
 
 import pytest
 
-from jobrunner import config
+from jobrunner import config, models, record_stats
 from jobrunner.executors import local, volumes
 from jobrunner.job_executor import ExecutorState, JobDefinition, Privacy, Study
 from jobrunner.lib import datestr_to_ns_timestamp, docker
 from tests.conftest import SUPPORTED_VOLUME_APIS
-from tests.factories import ensure_docker_images_present
+from tests.factories import ensure_docker_images_present, job_factory
 
 
 # this is parametized fixture, and test using it will run multiple times, once
@@ -215,7 +215,7 @@ def test_prepare_job_no_input_file(docker_cleanup, job_definition, volume_api):
 
 
 @pytest.mark.needs_docker
-def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
+def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, db, volume_api):
     # check limits are applied
     job_definition.cpu_count = 1.5
     job_definition.memory_limit = "1G"
@@ -229,18 +229,52 @@ def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, volume_ap
     status = api.execute(job_definition)
     assert status.state == ExecutorState.EXECUTING
 
-    # could be in either state
-    assert api.get_status(job_definition).state in (
-        ExecutorState.EXECUTING,
-        ExecutorState.EXECUTED,
-    )
-
     container_data = docker.container_inspect(local.container_name(job_definition))
     assert container_data["State"]["ExitCode"] == 0
     assert container_data["HostConfig"]["NanoCpus"] == int(1.5 * 1e9)
     assert container_data["HostConfig"]["Memory"] == 2**30  # 1G
+
+
+@pytest.mark.needs_docker
+def test_execute_metrics(docker_cleanup, job_definition, tmp_work_dir, db):
+    job_definition.args = ["sleep", "10"]
+    last_run = time.time_ns()
+
+    api = local.LocalDockerAPI()
+
+    status = api.prepare(job_definition)
+    assert status.state == ExecutorState.PREPARED
+
+    # we need scheduler job state to be able to collect stats
+    job = job_factory(
+        id=job_definition.id,
+        state=models.State.RUNNING,
+        status_code=models.StatusCode.EXECUTING,
+        started_at=int(last_run / 1e9),
+    )
+
+    status = api.execute(job_definition)
+    assert status.state == ExecutorState.EXECUTING
+
+    # simulate stats thread collecting stats
+    record_stats.record_tick_trace(last_run, [job])
+
+    docker.kill(local.container_name(job_definition))
+
+    status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)
+
+    assert list(status.metrics.keys()) == [
+        "cpu_sample",
+        "cpu_cumsum",
+        "cpu_mean",
+        "cpu_peak",
+        "mem_sample",
+        "mem_cumsum",
+        "mem_mean",
+        "mem_peak",
+    ]
+
 
 
 @pytest.mark.skipif(
     sys.platform != "linux" and sys.platform != "darwin", reason="linux/darwin only"
 )
diff --git a/tests/test_record_stats.py b/tests/test_record_stats.py
index 9ff99e07..98491985 100644
--- a/tests/test_record_stats.py
+++ b/tests/test_record_stats.py
@@ -28,12 +28,12 @@ def test_record_tick_trace(db, freezer, monkeypatch):
     # this should not be tick'd
     job_factory(state=State.SUCCEEDED, status_code=StatusCode.SUCCEEDED)
 
-    last_run1 = record_stats.record_tick_trace(None)
+    last_run1 = record_stats.record_tick_trace(None, jobs)
 
     assert len(get_trace("ticks")) == 0
 
     freezer.tick(10)
-    last_run2 = record_stats.record_tick_trace(last_run1)
+    last_run2 = record_stats.record_tick_trace(last_run1, jobs)
     assert last_run2 == last_run1 + 10 * 1e9
 
     spans = get_trace("ticks")
@@ -74,7 +74,7 @@
 
 
 def test_record_tick_trace_stats_timeout(db, freezer, monkeypatch):
-    job_factory(status_code=StatusCode.EXECUTING)
+    job = job_factory(status_code=StatusCode.EXECUTING)
 
     def timeout():
         raise subprocess.TimeoutExpired("cmd", 10)
@@ -84,7 +84,7 @@ def timeout():
     monkeypatch.setattr(record_stats, "get_job_stats", timeout)
     last_run = time.time()
     freezer.tick(10)
-    record_stats.record_tick_trace(last_run)
+    record_stats.record_tick_trace(last_run, [job])
 
     assert len(get_trace("ticks")) == 2
     spans = get_trace("ticks")
@@ -99,7 +99,7 @@ def timeout():
 
 
 def test_record_tick_trace_stats_error(db, freezer, monkeypatch):
-    job_factory(status_code=StatusCode.EXECUTING)
+    job = job_factory(status_code=StatusCode.EXECUTING)
 
     def error():
         raise subprocess.CalledProcessError(
@@ -109,7 +109,7 @@ def error():
     monkeypatch.setattr(record_stats, "get_job_stats", error)
 
     last_run = time.time()
-    record_stats.record_tick_trace(last_run)
+    record_stats.record_tick_trace(last_run, [job])
 
     assert len(get_trace("ticks")) == 2
     spans = get_trace("ticks")
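
Reviewer note, not part of the patch: a minimal sketch of how a caller of the executor API might consume the new JobStatus.metrics field, for example to attach executor-collected resource metrics to an OpenTelemetry span. Only JobStatus.metrics and LocalDockerAPI.get_status() come from this change; the record_job_span_metrics helper and the span argument are hypothetical names used for illustration.

```python
# Illustrative sketch only; assumes an OpenTelemetry span object is passed in.
# record_job_span_metrics is a hypothetical helper, not part of this patch.
from jobrunner.executors import local


def record_job_span_metrics(job_definition, span):
    """Copy executor-collected resource metrics onto a tracing span."""
    api = local.LocalDockerAPI()
    status = api.get_status(job_definition)

    # JobStatus.metrics defaults to an empty dict, so this is safe even
    # before the stats thread has sampled the job.
    for name, value in status.metrics.items():
        span.set_attribute(f"metrics.{name}", value)
```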