From afa15e2b97a7e53ba6da50f7e68033390ae92920 Mon Sep 17 00:00:00 2001
From: Simon Davy
Date: Thu, 14 Dec 2023 09:26:48 +0000
Subject: [PATCH] Revert "aggregate job metrics"

---
 jobrunner/config.py          |   1 -
 jobrunner/executors/local.py |  29 +++-------
 jobrunner/job_executor.py    |   1 -
 jobrunner/models.py          |   7 ---
 jobrunner/record_stats.py    | 104 +++-------------------------------
 jobrunner/sync.py            |   6 +-
 tests/conftest.py            |  13 +----
 tests/factories.py           |  17 +-----
 tests/test_local_executor.py |  52 +++---------------
 tests/test_record_stats.py   |  96 +++----------------------------
 tests/test_sync.py           |  28 ----------
 11 files changed, 35 insertions(+), 319 deletions(-)

diff --git a/jobrunner/config.py b/jobrunner/config.py
index c86f8a79..39f039ef 100644
--- a/jobrunner/config.py
+++ b/jobrunner/config.py
@@ -46,7 +46,6 @@ def _is_valid_backend_name(name):
 
 WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
 DATABASE_FILE = WORKDIR / "db.sqlite"
-METRICS_FILE = WORKDIR / "metrics.sqlite"
 GIT_REPO_DIR = WORKDIR / "repos"
 
 # valid archive formats
diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py
index d79cecc5..b1d736fc 100644
--- a/jobrunner/executors/local.py
+++ b/jobrunner/executors/local.py
@@ -11,7 +11,7 @@
 
 from pipeline.legacy import get_all_output_patterns_from_project_file
 
-from jobrunner import config, record_stats
+from jobrunner import config
 from jobrunner.executors import volumes
 from jobrunner.job_executor import (
     ExecutorAPI,
@@ -241,24 +241,16 @@ def get_status(self, job_definition, timeout=15):
                 f"docker timed out after {timeout}s inspecting container {name}"
             )
 
-        metrics = record_stats.read_job_metrics(job_definition.id)
-
         if container is None:  # container doesn't exist
             if job_definition.cancelled:
                 if volumes.get_volume_api(job_definition).volume_exists(job_definition):
                     # jobs prepared but not running do not need to finalize, so we
                     # proceed directly to the FINALIZED state here
                     return JobStatus(
-                        ExecutorState.FINALIZED,
-                        "Prepared job was cancelled",
-                        metrics=metrics,
+                        ExecutorState.FINALIZED, "Prepared job was cancelled"
                     )
                 else:
-                    return JobStatus(
-                        ExecutorState.UNKNOWN,
-                        "Pending job was cancelled",
-                        metrics=metrics,
-                    )
+                    return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
 
             # timestamp file presence means we have finished preparing
             timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -269,31 +261,24 @@ def get_status(self, job_definition, timeout=15):
             # re-prepare it anyway.
             if timestamp_ns is None:
                 # we are Jon Snow
-                return JobStatus(ExecutorState.UNKNOWN, metrics={})
+                return JobStatus(ExecutorState.UNKNOWN)
             else:
                 # we've finished preparing
-                return JobStatus(
-                    ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
-                )
+                return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
 
         if container["State"]["Running"]:
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
-            return JobStatus(
-                ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
-            )
+            return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
         elif job_definition.id in RESULTS:
             return JobStatus(
                 ExecutorState.FINALIZED,
                 timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
-                metrics=metrics,
             )
         else:
             # container present but not running, i.e. finished
             # Nb. this does not include prepared jobs, as they have a volume but not a container
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
-            return JobStatus(
-                ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
-            )
+            return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
 
     def get_results(self, job_definition):
         if job_definition.id not in RESULTS:
diff --git a/jobrunner/job_executor.py b/jobrunner/job_executor.py
index da135bae..2ee0039d 100644
--- a/jobrunner/job_executor.py
+++ b/jobrunner/job_executor.py
@@ -67,7 +67,6 @@ class JobStatus:
     timestamp_ns: int = (
         None  # timestamp this JobStatus occurred, in integer nanoseconds
     )
-    metrics: dict = field(default_factory=dict)
 
 
 @dataclass
diff --git a/jobrunner/models.py b/jobrunner/models.py
index 0b53450f..1d7ab4bc 100644
--- a/jobrunner/models.py
+++ b/jobrunner/models.py
@@ -14,7 +14,6 @@
 import secrets
 import shlex
 from enum import Enum
-from functools import total_ordering
 
 from jobrunner.lib.commands import requires_db_access
 from jobrunner.lib.database import databaseclass, migration
@@ -37,7 +36,6 @@ class State(Enum):
 # affordances in the web, cli and telemetry.
 
 
-@total_ordering
 class StatusCode(Enum):
     # PENDING states
     #
@@ -79,10 +77,6 @@ class StatusCode(Enum):
     def is_final_code(self):
         return self in StatusCode._FINAL_STATUS_CODES
 
-    def __lt__(self, other):
-        order = list(self.__class__)
-        return order.index(self) < order.index(other)
-
 
 # used for tracing to know if a state is final or not
 StatusCode._FINAL_STATUS_CODES = [
@@ -251,7 +245,6 @@ class Job:
     # used to track the OTel trace context for this job
     trace_context: dict = None
 
-    # map of file -> error
     level4_excluded_files: dict = None
 
     # used to cache the job_request json by the tracing code
diff --git a/jobrunner/record_stats.py b/jobrunner/record_stats.py
index 01cdef75..67fd96ba 100644
--- a/jobrunner/record_stats.py
+++ b/jobrunner/record_stats.py
@@ -1,12 +1,10 @@
 """
 Super crude docker/system stats logger
 """
-import json
 import logging
 import subprocess
 import sys
 import time
-from collections import defaultdict
 
 from opentelemetry import trace
 
@@ -19,62 +17,12 @@
 log = logging.getLogger(__name__)
 tracer = trace.get_tracer("ticks")
 
-# Simplest possible table. We're only storing aggregate data
-DDL = """
-CREATE TABLE IF NOT EXISTS jobs (
-    id TEXT,
-    metrics TEXT,
-    PRIMARY KEY (id)
-)
-"""
-
-_conn = None
-
-
-def ensure_metrics_db():
-    global _conn
-    _conn = database.get_connection(config.METRICS_FILE)
-    _conn.execute("PRAGMA journal_mode = WAL")
-    _conn.execute(DDL)
-
-
-def read_job_metrics(job_id, **metrics):
-    if _conn is None:
-        ensure_metrics_db()
-
-    raw_metrics = _conn.execute(
-        "SELECT metrics FROM jobs WHERE id = ?",
-        (job_id,),
-    ).fetchone()
-    if raw_metrics is None:
-        metrics = {}
-    else:
-        metrics = json.loads(raw_metrics["metrics"])
-    return defaultdict(float, metrics)
-
-
-def write_job_metrics(job_id, metrics):
-    if _conn is None:
-        ensure_metrics_db()
-
-    raw_metrics = json.dumps(metrics)
-    _conn.execute(
-        """
-        INSERT INTO jobs (id, metrics) VALUES (?, ?)
-        ON CONFLICT(id) DO UPDATE set metrics = ?
- """, - (job_id, raw_metrics, raw_metrics), - ) - def main(): last_run = None while True: before = time.time() - active_jobs = database.find_where( - models.Job, state__in=[models.State.PENDING, models.State.RUNNING] - ) - last_run = record_tick_trace(last_run, active_jobs) + last_run = record_tick_trace(last_run) # record_tick_trace might have take a while, so sleep the remainding interval # enforce a minimum time of 3s to ensure we don't hammer honeycomb or @@ -83,7 +31,7 @@ def main(): time.sleep(max(2, config.STATS_POLL_INTERVAL - elapsed)) -def record_tick_trace(last_run, active_jobs): +def record_tick_trace(last_run): """Record a period tick trace of current jobs. This will give us more realtime information than the job traces, which only @@ -121,7 +69,10 @@ def record_tick_trace(last_run, active_jobs): # every span has the same timings start_time = last_run end_time = now - duration_s = int((end_time - start_time) / 1e9) + + active_jobs = database.find_where( + models.Job, state__in=[models.State.PENDING, models.State.RUNNING] + ) with tracer.start_as_current_span( "TICK", start_time=start_time, attributes=trace_attrs @@ -131,61 +82,22 @@ def record_tick_trace(last_run, active_jobs): root.add_event("stats_error", attributes=error_attrs, timestamp=start_time) for job in active_jobs: - # we are using seconds for our metric calculations - - metrics = stats.get(job.id, {}) + span = tracer.start_span(job.status_code.name, start_time=start_time) # set up attributes job_span_attrs = {} job_span_attrs.update(trace_attrs) + metrics = stats.get(job.id, {}) job_span_attrs["has_metrics"] = metrics != {} job_span_attrs.update(metrics) - # this means the job is running - if metrics: - runtime_s = int(now / 1e9) - job.started_at - # protect against unexpected runtimes - if runtime_s > 0: - job_metrics = update_job_metrics( - job, - metrics, - duration_s, - runtime_s, - ) - job_span_attrs.update(job_metrics) - else: - job_span_attrs.set("bad_tick_runtime", runtime_s) - # record span - span = tracer.start_span(job.status_code.name, start_time=start_time) tracing.set_span_metadata(span, job, **job_span_attrs) span.end(end_time) return end_time -def update_job_metrics(job, raw_metrics, duration_s, runtime_s): - """Update and persist per-job aggregate stats in the metrics db""" - - job_metrics = read_job_metrics(job.id) - - cpu = raw_metrics["cpu_percentage"] - mem = raw_metrics["memory_used"] - - job_metrics["cpu_sample"] = cpu - job_metrics["cpu_cumsum"] += duration_s * cpu - job_metrics["cpu_mean"] = job_metrics["cpu_cumsum"] / runtime_s - job_metrics["cpu_peak"] = max(job_metrics["cpu_peak"], cpu) - job_metrics["mem_sample"] = mem - job_metrics["mem_cumsum"] += duration_s * mem - job_metrics["mem_mean"] = job_metrics["mem_cumsum"] / runtime_s - job_metrics["mem_peak"] = max(job_metrics["mem_peak"], mem) - - write_job_metrics(job.id, job_metrics) - - return job_metrics - - if __name__ == "__main__": configure_logging() diff --git a/jobrunner/sync.py b/jobrunner/sync.py index d0086c38..f0a89439 100644 --- a/jobrunner/sync.py +++ b/jobrunner/sync.py @@ -9,7 +9,7 @@ import requests -from jobrunner import config, queries, record_stats +from jobrunner import config, queries from jobrunner.create_or_update_jobs import create_or_update_jobs from jobrunner.lib.database import find_where, select_values from jobrunner.lib.log_utils import configure_logging, set_log_context @@ -143,21 +143,19 @@ def job_to_remote_format(job): Convert our internal representation of a Job into whatever format the job-server 
     expects
     """
-
     return {
         "identifier": job.id,
         "job_request_id": job.job_request_id,
         "action": job.action,
         "run_command": job.run_command,
         "status": job.state.value,
-        "status_code": job.status_code.value,
+        "status_code": job.status_code.value if job.status_code else "",
        "status_message": job.status_message or "",
         "created_at": job.created_at_isoformat,
         "updated_at": job.updated_at_isoformat,
         "started_at": job.started_at_isoformat,
         "completed_at": job.completed_at_isoformat,
         "trace_context": job.trace_context,
-        "metrics": record_stats.read_job_metrics(job.id),
     }
diff --git a/tests/conftest.py b/tests/conftest.py
index fdf9fb7b..80cb53de 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -12,7 +12,7 @@
 from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
 
 import jobrunner.executors.local
-from jobrunner import config, record_stats, tracing
+from jobrunner import config, tracing
 from jobrunner.executors import volumes
 from jobrunner.job_executor import Study
 from jobrunner.lib import database
@@ -195,17 +195,6 @@ def db(monkeypatch, request):
         del database.CONNECTION_CACHE.__dict__[database_file]
 
 
-@pytest.fixture(autouse=True)
-def metrics_db(monkeypatch, request):
-    """Create a throwaway metrics db."""
-    record_stats._conn = None
-    database_file = f"file:metrics-{request.node.name}?mode=memory&cache=shared"
-    monkeypatch.setattr(config, "METRICS_FILE", database_file)
-    yield
-    database.CONNECTION_CACHE.__dict__.pop(database_file, None)
-    record_stats._conn = None
-
-
 @dataclass
 class SubprocessStub:
     calls: deque = field(default_factory=deque)
diff --git a/tests/factories.py b/tests/factories.py
index 968eb440..7872769d 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -4,7 +4,7 @@
 from collections import defaultdict
 from copy import deepcopy
 
-from jobrunner import config, record_stats, tracing
+from jobrunner import config, tracing
 from jobrunner.job_executor import ExecutorState, JobResults, JobStatus
 from jobrunner.lib import docker
 from jobrunner.lib.database import insert
@@ -78,12 +78,6 @@ def job_factory(job_request=None, **kwargs):
         values["created_at"] = int(timestamp)
     if "updated_at" not in kwargs:
         values["updated_at"] = int(timestamp)
-
-    if "started_at" not in kwargs:
-        status_code = kwargs.get("status_code", values["status_code"])
-        if status_code and status_code >= StatusCode.EXECUTING:
-            values["started_at"] = int(timestamp)
-
     if "status_code_updated_at" not in kwargs:
         values["status_code_updated_at"] = int(timestamp * 1e9)
     values.update(kwargs)
@@ -109,15 +103,6 @@ def job_results_factory(timestamp_ns=None, **kwargs):
     return JobResults(timestamp_ns=timestamp_ns, **values)
 
 
-def metrics_factory(job=None, metrics=None):
-    if job is None:
-        job = job_factory()
-    if metrics is None:
-        metrics = {}
-
-    record_stats.write_job_metrics(job.id, metrics)
-
-
 class StubExecutorAPI:
     """Dummy implementation of the ExecutorAPI, for use in tests.
 
diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py
index 9694bf11..24472a19 100644
--- a/tests/test_local_executor.py
+++ b/tests/test_local_executor.py
@@ -4,12 +4,12 @@
 
 import pytest
 
-from jobrunner import config, models, record_stats
+from jobrunner import config
 from jobrunner.executors import local, volumes
 from jobrunner.job_executor import ExecutorState, JobDefinition, Privacy, Study
 from jobrunner.lib import datestr_to_ns_timestamp, docker
 from tests.conftest import SUPPORTED_VOLUME_APIS
-from tests.factories import ensure_docker_images_present, job_factory
+from tests.factories import ensure_docker_images_present
 
 
 # this is a parametrized fixture, and tests using it will run multiple times, once
@@ -215,7 +215,7 @@ def test_prepare_job_no_input_file(docker_cleanup, job_definition, volume_api):
 
 
 @pytest.mark.needs_docker
-def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, db, volume_api):
+def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, volume_api):
     # check limits are applied
     job_definition.cpu_count = 1.5
     job_definition.memory_limit = "1G"
@@ -229,52 +229,18 @@ def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, db, volum
     status = api.execute(job_definition)
     assert status.state == ExecutorState.EXECUTING
 
+    # could be in either state
+    assert api.get_status(job_definition).state in (
+        ExecutorState.EXECUTING,
+        ExecutorState.EXECUTED,
+    )
+
     container_data = docker.container_inspect(local.container_name(job_definition))
     assert container_data["State"]["ExitCode"] == 0
     assert container_data["HostConfig"]["NanoCpus"] == int(1.5 * 1e9)
     assert container_data["HostConfig"]["Memory"] == 2**30  # 1G
 
 
-@pytest.mark.needs_docker
-def test_execute_metrics(docker_cleanup, job_definition, tmp_work_dir, db):
-    job_definition.args = ["sleep", "10"]
-    last_run = time.time_ns()
-
-    api = local.LocalDockerAPI()
-
-    status = api.prepare(job_definition)
-    assert status.state == ExecutorState.PREPARED
-
-    # we need scheduler job state to be able to collect stats
-    job = job_factory(
-        id=job_definition.id,
-        state=models.State.RUNNING,
-        status_code=models.StatusCode.EXECUTING,
-        started_at=int(last_run / 1e9),
-    )
-
-    status = api.execute(job_definition)
-    assert status.state == ExecutorState.EXECUTING
-
-    # simulate stats thread collecting stats
-    record_stats.record_tick_trace(last_run, [job])
-
-    docker.kill(local.container_name(job_definition))
-
-    status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)
-
-    assert list(status.metrics.keys()) == [
-        "cpu_sample",
-        "cpu_cumsum",
-        "cpu_mean",
-        "cpu_peak",
-        "mem_sample",
-        "mem_cumsum",
-        "mem_mean",
-        "mem_peak",
-    ]
-
-
 @pytest.mark.skipif(
     sys.platform != "linux" and sys.platform != "darwin", reason="linux/darwin only"
 )
diff --git a/tests/test_record_stats.py b/tests/test_record_stats.py
index 98491985..a6e389c8 100644
--- a/tests/test_record_stats.py
+++ b/tests/test_record_stats.py
@@ -4,7 +4,7 @@
 from jobrunner import record_stats
 from jobrunner.models import State, StatusCode
 from tests.conftest import get_trace
-from tests.factories import job_factory, metrics_factory
+from tests.factories import job_factory
 
 
 def test_record_tick_trace(db, freezer, monkeypatch):
@@ -28,12 +28,12 @@ def test_record_tick_trace(db, freezer, monkeypatch):
     # this should not be tick'd
     job_factory(state=State.SUCCEEDED, status_code=StatusCode.SUCCEEDED)
 
-    last_run1 = record_stats.record_tick_trace(None, jobs)
+    last_run1 = record_stats.record_tick_trace(None)
     assert len(get_trace("ticks")) == 0
 
     freezer.tick(10)
 
-    last_run2 = record_stats.record_tick_trace(last_run1, jobs)
+    last_run2 = record_stats.record_tick_trace(last_run1)
     assert last_run2 == last_run1 + 10 * 1e9
 
     spans = get_trace("ticks")
@@ -56,17 +56,7 @@ def test_record_tick_trace(db, freezer, monkeypatch):
         if job is running_job:
             assert span.attributes["has_metrics"] is True
             assert span.attributes["cpu_percentage"] == 50.0
-            assert span.attributes["cpu_sample"] == 50.0
-            assert span.attributes["cpu_sample"] == 50.0
-            assert span.attributes["cpu_peak"] == 50.0
-            assert span.attributes["cpu_cumsum"] == 500.0  # 50% * 10s
-            assert span.attributes["cpu_mean"] == 50.0
-
             assert span.attributes["memory_used"] == 1000
-            assert span.attributes["mem_sample"] == 1000
-            assert span.attributes["mem_peak"] == 1000
-            assert span.attributes["mem_cumsum"] == 10000  # 1000 * 10s
-            assert span.attributes["mem_mean"] == 1000
         else:
             assert span.attributes["has_metrics"] is False
 
 
@@ -74,7 +64,7 @@ def test_record_tick_trace(db, freezer, monkeypatch):
 
 
 def test_record_tick_trace_stats_timeout(db, freezer, monkeypatch):
-    job = job_factory(status_code=StatusCode.EXECUTING)
+    job_factory(status_code=StatusCode.EXECUTING)
 
     def timeout():
         raise subprocess.TimeoutExpired("cmd", 10)
@@ -84,7 +74,7 @@ def timeout():
     monkeypatch.setattr(record_stats, "get_job_stats", timeout)
 
     last_run = time.time()
     freezer.tick(10)
-    record_stats.record_tick_trace(last_run, [job])
+    record_stats.record_tick_trace(last_run)
 
     assert len(get_trace("ticks")) == 2
     spans = get_trace("ticks")
@@ -92,14 +82,13 @@ def timeout():
 
     assert "cpu_percentage" not in span.attributes
     assert "memory_used" not in span.attributes
-    assert "mem_peak" not in span.attributes
     assert span.attributes["has_metrics"] is False
     assert span.attributes["stats_timeout"] is True
     assert span.attributes["stats_error"] is False
 
 
 def test_record_tick_trace_stats_error(db, freezer, monkeypatch):
-    job = job_factory(status_code=StatusCode.EXECUTING)
+    job_factory(status_code=StatusCode.EXECUTING)
 
     def error():
         raise subprocess.CalledProcessError(
@@ -109,7 +98,7 @@ def error():
     monkeypatch.setattr(record_stats, "get_job_stats", error)
 
     last_run = time.time()
-    record_stats.record_tick_trace(last_run, [job])
+    record_stats.record_tick_trace(last_run)
 
     assert len(get_trace("ticks")) == 2
     spans = get_trace("ticks")
@@ -117,7 +106,6 @@ def error():
 
     assert "cpu_percentage" not in span.attributes
     assert "memory_used" not in span.attributes
-    assert "mem_peak" not in span.attributes
     assert span.attributes["has_metrics"] is False
     assert span.attributes["stats_timeout"] is False
     assert span.attributes["stats_error"] is True
@@ -129,73 +117,3 @@ def error():
     assert root.events[0].attributes["cmd"] == "test cmd"
     assert root.events[0].attributes["output"] == "stderr\n\nstdout"
     assert root.events[0].name == "stats_error"
-
-
-def test_update_job_metrics(db):
-
-    job = job_factory(status_code=StatusCode.EXECUTING)
-    metrics_factory(job)
-
-    metrics = record_stats.read_job_metrics(job.id)
-
-    assert metrics == {}
-
-    # 50%/100m for 1s
-    record_stats.update_job_metrics(
-        job,
-        {"cpu_percentage": 50, "memory_used": 100},
-        duration_s=1.0,
-        runtime_s=1.0,
-    )
-
-    metrics = record_stats.read_job_metrics(job.id)
-    assert metrics == {
-        "cpu_cumsum": 50.0,
-        "cpu_mean": 50.0,
-        "cpu_peak": 50,
-        "cpu_sample": 50,
-        "mem_cumsum": 100.0,
-        "mem_mean": 100.0,
-        "mem_peak": 100,
-        "mem_sample": 100,
-    }
-
-    # 100%/1000m for 1s
-    record_stats.update_job_metrics(
-        job,
-        {"cpu_percentage": 100, "memory_used": 1000},
-        duration_s=1.0,
-        runtime_s=2.0,
-    )
-
-    metrics = record_stats.read_job_metrics(job.id)
-    assert metrics == {
-        "cpu_cumsum": 150.0,
-        "cpu_mean": 75.0,
-        "cpu_peak": 100,
-        "cpu_sample": 100,
-        "mem_cumsum": 1100.0,
-        "mem_mean": 550.0,
-        "mem_peak": 1000,
-        "mem_sample": 1000,
-    }
-
-    # 100%/1000m for 8s
-    record_stats.update_job_metrics(
-        job,
-        {"cpu_percentage": 100, "memory_used": 1000},
-        duration_s=8.0,
-        runtime_s=10.0,
-    )
-
-    metrics = record_stats.read_job_metrics(job.id)
-    assert metrics == {
-        "cpu_cumsum": 950.0,
-        "cpu_mean": 95.0,
-        "cpu_peak": 100,
-        "cpu_sample": 100,
-        "mem_cumsum": 9100.0,
-        "mem_mean": 910.0,
-        "mem_peak": 1000,
-        "mem_sample": 1000,
-    }
diff --git a/tests/test_sync.py b/tests/test_sync.py
index 0a63be14..a4a15de4 100644
--- a/tests/test_sync.py
+++ b/tests/test_sync.py
@@ -4,7 +4,6 @@
 
 from jobrunner import config, queries, sync
 from jobrunner.models import JobRequest
-from tests.factories import job_factory, metrics_factory
 
 
 def test_job_request_from_remote_format():
@@ -77,33 +76,6 @@ def test_job_request_from_remote_format_database_name_fallback():
     assert job_request == expected
 
 
-def test_job_to_remote_format_default(db):
-    job = job_factory()
-
-    json = sync.job_to_remote_format(job)
-
-    assert json["action"] == "action_name"
-    assert json["run_command"] == "python myscript.py"
-    assert json["status"] == "pending"
-    assert json["status_code"] == "created"
-    assert json["metrics"] == {}
-
-
-def test_job_to_remote_format_null_status_message(db):
-    job = job_factory(status_message=None)
-    json = sync.job_to_remote_format(job)
-    assert json["status_message"] == ""
-
-
-def test_job_to_remote_format_metrics(db):
-    job = job_factory()
-    metrics_factory(job, metrics={"test": 0.0})
-
-    json = sync.job_to_remote_format(job)
-
-    assert json["metrics"] == {"test": 0.0}
-
-
 def test_session_request_no_flags(db, responses):
     responses.add(
         method="GET",