Merge pull request #692 from opensafely-core/revert-686-aggregate-job-metrics

Revert "aggregate job metrics"
bloodearnest authored Dec 14, 2023
2 parents 82e69f3 + afa15e2 commit 2a67c26
Showing 11 changed files with 35 additions and 319 deletions.
jobrunner/config.py (1 change: 0 additions & 1 deletion)

```diff
@@ -46,7 +46,6 @@ def _is_valid_backend_name(name):

 WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
 DATABASE_FILE = WORKDIR / "db.sqlite"
-METRICS_FILE = WORKDIR / "metrics.sqlite"
 GIT_REPO_DIR = WORKDIR / "repos"

 # valid archive formats
```
jobrunner/executors/local.py (29 changes: 7 additions & 22 deletions)

```diff
@@ -11,7 +11,7 @@

 from pipeline.legacy import get_all_output_patterns_from_project_file

-from jobrunner import config, record_stats
+from jobrunner import config
 from jobrunner.executors import volumes
 from jobrunner.job_executor import (
     ExecutorAPI,
@@ -241,24 +241,16 @@ def get_status(self, job_definition, timeout=15):
                 f"docker timed out after {timeout}s inspecting container {name}"
             )

-        metrics = record_stats.read_job_metrics(job_definition.id)
-
         if container is None:  # container doesn't exist
             if job_definition.cancelled:
                 if volumes.get_volume_api(job_definition).volume_exists(job_definition):
                     # jobs prepared but not running do not need to finalize, so we
                     # proceed directly to the FINALIZED state here
                     return JobStatus(
-                        ExecutorState.FINALIZED,
-                        "Prepared job was cancelled",
-                        metrics=metrics,
+                        ExecutorState.FINALIZED, "Prepared job was cancelled"
                     )
                 else:
-                    return JobStatus(
-                        ExecutorState.UNKNOWN,
-                        "Pending job was cancelled",
-                        metrics=metrics,
-                    )
+                    return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")

             # timestamp file presence means we have finished preparing
             timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -269,31 +261,24 @@
             # re-prepare it anyway.
             if timestamp_ns is None:
                 # we are Jon Snow
-                return JobStatus(ExecutorState.UNKNOWN, metrics={})
+                return JobStatus(ExecutorState.UNKNOWN)
             else:
                 # we've finish preparing
-                return JobStatus(
-                    ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
-                )
+                return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)

         if container["State"]["Running"]:
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
-            return JobStatus(
-                ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
-            )
+            return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
         elif job_definition.id in RESULTS:
             return JobStatus(
                 ExecutorState.FINALIZED,
                 timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
-                metrics=metrics,
             )
         else:
             # container present but not running, i.e. finished
             # Nb. this does not include prepared jobs, as they have a volume but not a container
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
-            return JobStatus(
-                ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
-            )
+            return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)

     def get_results(self, job_definition):
         if job_definition.id not in RESULTS:
```
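For context on the surviving calls above: Docker's `inspect` output reports `State.StartedAt` and `State.FinishedAt` as RFC 3339 strings with up to nanosecond precision, which is why they are funnelled through `datestr_to_ns_timestamp`. A minimal sketch of the kind of conversion that helper is assumed to perform (a hypothetical standalone function, not jobrunner's actual implementation):

```python
import re
from datetime import datetime, timezone

def datestr_to_ns(datestr):
    """Convert e.g. '2023-12-14T10:02:03.123456789Z' to integer nanoseconds."""
    base, frac = re.fullmatch(r"(.+?)(?:\.(\d+))?Z", datestr).groups()
    dt = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    ns = int(dt.timestamp()) * 1_000_000_000
    if frac:
        # Go's RFC3339Nano emits up to 9 fractional digits; pad to nanoseconds
        ns += int(frac.ljust(9, "0")[:9])
    return ns

assert datestr_to_ns("2023-12-14T10:02:03.5Z") == 1_702_548_123_500_000_000
```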
jobrunner/job_executor.py (1 change: 0 additions & 1 deletion)

```diff
@@ -67,7 +67,6 @@ class JobStatus:
     timestamp_ns: int = (
         None  # timestamp this JobStatus occurred, in integer nanoseconds
     )
-    metrics: dict = field(default_factory=dict)


 @dataclass
```
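An aside on the deleted line: dataclasses reject mutable class-level defaults, so a dict-valued field needs `field(default_factory=dict)`. A minimal illustration of why (generic example, not jobrunner code):

```python
from dataclasses import dataclass, field

@dataclass
class Example:
    # items: dict = {}  # would raise ValueError: mutable default not allowed
    items: dict = field(default_factory=dict)  # fresh dict per instance

a, b = Example(), Example()
a.items["k"] = 1
assert b.items == {}  # instances do not share the default
```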
jobrunner/models.py (7 changes: 0 additions & 7 deletions)

```diff
@@ -14,7 +14,6 @@
 import secrets
 import shlex
 from enum import Enum
-from functools import total_ordering

 from jobrunner.lib.commands import requires_db_access
 from jobrunner.lib.database import databaseclass, migration
@@ -37,7 +36,6 @@ class State(Enum):
     # affordances in the web, cli and telemetry.


-@total_ordering
 class StatusCode(Enum):
     # PENDING states
     #
@@ -79,10 +77,6 @@ class StatusCode(Enum):
     def is_final_code(self):
        return self in StatusCode._FINAL_STATUS_CODES

-    def __lt__(self, other):
-        order = list(self.__class__)
-        return order.index(self) < order.index(other)
-

 # used for tracing to know if a state is final or not
 StatusCode._FINAL_STATUS_CODES = [
@@ -251,7 +245,6 @@ class Job:
     # used to track the OTel trace context for this job
     trace_context: dict = None

-    # map of file -> error
     level4_excluded_files: dict = None

     # used to cache the job_request json by the tracing code
```
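The `@total_ordering` / `__lt__` pair deleted here gave `StatusCode` members an ordering by declaration position, which is what made the (also deleted) `status_code >= StatusCode.EXECUTING` check in tests/factories.py below work. A self-contained sketch of the pattern, with a hypothetical enum standing in for `StatusCode`:

```python
from enum import Enum
from functools import total_ordering

@total_ordering
class Stage(Enum):  # hypothetical stand-in for StatusCode
    CREATED = "created"
    EXECUTING = "executing"
    FINALIZED = "finalized"

    def __lt__(self, other):
        order = list(self.__class__)  # members in declaration order
        return order.index(self) < order.index(other)

# total_ordering derives <=, > and >= from __lt__ plus Enum's identity __eq__
assert Stage.CREATED < Stage.EXECUTING
assert Stage.FINALIZED >= Stage.EXECUTING
```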
jobrunner/record_stats.py (104 changes: 8 additions & 96 deletions)

```diff
@@ -1,12 +1,10 @@
 """
 Super crude docker/system stats logger
 """
-import json
 import logging
 import subprocess
 import sys
 import time
-from collections import defaultdict

 from opentelemetry import trace

@@ -19,62 +17,12 @@
 log = logging.getLogger(__name__)
 tracer = trace.get_tracer("ticks")

-# Simplest possible table. We're only storing aggregate data
-DDL = """
-CREATE TABLE IF NOT EXISTS jobs (
-    id TEXT,
-    metrics TEXT,
-    PRIMARY KEY (id)
-)
-"""
-
-_conn = None
-
-
-def ensure_metrics_db():
-    global _conn
-    _conn = database.get_connection(config.METRICS_FILE)
-    _conn.execute("PRAGMA journal_mode = WAL")
-    _conn.execute(DDL)
-
-
-def read_job_metrics(job_id, **metrics):
-    if _conn is None:
-        ensure_metrics_db()
-
-    raw_metrics = _conn.execute(
-        "SELECT metrics FROM jobs WHERE id = ?",
-        (job_id,),
-    ).fetchone()
-    if raw_metrics is None:
-        metrics = {}
-    else:
-        metrics = json.loads(raw_metrics["metrics"])
-    return defaultdict(float, metrics)
-
-
-def write_job_metrics(job_id, metrics):
-    if _conn is None:
-        ensure_metrics_db()
-
-    raw_metrics = json.dumps(metrics)
-    _conn.execute(
-        """
-        INSERT INTO jobs (id, metrics) VALUES (?, ?)
-        ON CONFLICT(id) DO UPDATE set metrics = ?
-        """,
-        (job_id, raw_metrics, raw_metrics),
-    )
-
-
 def main():
     last_run = None
     while True:
         before = time.time()
-        active_jobs = database.find_where(
-            models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
-        )
-        last_run = record_tick_trace(last_run, active_jobs)
+        last_run = record_tick_trace(last_run)

         # record_tick_trace might have take a while, so sleep the remainding interval
         # enforce a minimum time of 3s to ensure we don't hammer honeycomb or
```
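The deleted `write_job_metrics` used SQLite's upsert syntax to keep one aggregate row per job. A self-contained sketch of that pattern using the stdlib `sqlite3` module in place of jobrunner's `database` helpers (table DDL copied from the deleted code; this variant uses `excluded` rather than a repeated bound parameter):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS jobs (id TEXT, metrics TEXT, PRIMARY KEY (id))")

def write_job_metrics(job_id, metrics):
    raw = json.dumps(metrics)
    # ON CONFLICT ... DO UPDATE requires SQLite >= 3.24
    conn.execute(
        "INSERT INTO jobs (id, metrics) VALUES (?, ?)"
        " ON CONFLICT(id) DO UPDATE SET metrics = excluded.metrics",
        (job_id, raw),
    )

write_job_metrics("job1", {"cpu_peak": 12.5})
write_job_metrics("job1", {"cpu_peak": 50.0})  # updates the existing row
assert conn.execute("SELECT COUNT(*) FROM jobs").fetchone() == (1,)
```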
```diff
@@ -83,7 +31,7 @@ def main():
         time.sleep(max(2, config.STATS_POLL_INTERVAL - elapsed))


-def record_tick_trace(last_run, active_jobs):
+def record_tick_trace(last_run):
     """Record a period tick trace of current jobs.

     This will give us more realtime information than the job traces, which only
@@ -121,7 +69,10 @@ def record_tick_trace(last_run, active_jobs):
     # every span has the same timings
     start_time = last_run
     end_time = now
-    duration_s = int((end_time - start_time) / 1e9)
+
+    active_jobs = database.find_where(
+        models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
+    )

     with tracer.start_as_current_span(
         "TICK", start_time=start_time, attributes=trace_attrs
@@ -131,61 +82,22 @@ def record_tick_trace(last_run, active_jobs):
         root.add_event("stats_error", attributes=error_attrs, timestamp=start_time)

         for job in active_jobs:
-            # we are using seconds for our metric calculations
-
-            metrics = stats.get(job.id, {})
+            span = tracer.start_span(job.status_code.name, start_time=start_time)

             # set up attributes
             job_span_attrs = {}
             job_span_attrs.update(trace_attrs)
+            metrics = stats.get(job.id, {})
             job_span_attrs["has_metrics"] = metrics != {}
             job_span_attrs.update(metrics)

-            # this means the job is running
-            if metrics:
-                runtime_s = int(now / 1e9) - job.started_at
-                # protect against unexpected runtimes
-                if runtime_s > 0:
-                    job_metrics = update_job_metrics(
-                        job,
-                        metrics,
-                        duration_s,
-                        runtime_s,
-                    )
-                    job_span_attrs.update(job_metrics)
-                else:
-                    job_span_attrs.set("bad_tick_runtime", runtime_s)
-
-            # record span
-            span = tracer.start_span(job.status_code.name, start_time=start_time)
             tracing.set_span_metadata(span, job, **job_span_attrs)
             span.end(end_time)

     return end_time


-def update_job_metrics(job, raw_metrics, duration_s, runtime_s):
-    """Update and persist per-job aggregate stats in the metrics db"""
-
-    job_metrics = read_job_metrics(job.id)
-
-    cpu = raw_metrics["cpu_percentage"]
-    mem = raw_metrics["memory_used"]
-
-    job_metrics["cpu_sample"] = cpu
-    job_metrics["cpu_cumsum"] += duration_s * cpu
-    job_metrics["cpu_mean"] = job_metrics["cpu_cumsum"] / runtime_s
-    job_metrics["cpu_peak"] = max(job_metrics["cpu_peak"], cpu)
-    job_metrics["mem_sample"] = mem
-    job_metrics["mem_cumsum"] += duration_s * mem
-    job_metrics["mem_mean"] = job_metrics["mem_cumsum"] / runtime_s
-    job_metrics["mem_peak"] = max(job_metrics["mem_peak"], mem)
-
-    write_job_metrics(job.id, job_metrics)
-
-    return job_metrics
-
-
 if __name__ == "__main__":
     configure_logging()
```
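The arithmetic in the deleted `update_job_metrics` maintained duration-weighted running statistics: each tick contributes `duration_s * sample` to a cumulative sum, and dividing by the total `runtime_s` yields the mean; `read_job_metrics` returning a `defaultdict(float)` is what made the first `+=` and `max()` calls safe. A standalone sketch of the CPU half of that calculation:

```python
from collections import defaultdict

def update(job_metrics, cpu, duration_s, runtime_s):
    # duration_s: seconds covered by this tick; runtime_s: total job runtime
    job_metrics["cpu_sample"] = cpu
    job_metrics["cpu_cumsum"] += duration_s * cpu
    job_metrics["cpu_mean"] = job_metrics["cpu_cumsum"] / runtime_s
    job_metrics["cpu_peak"] = max(job_metrics["cpu_peak"], cpu)

m = defaultdict(float)  # missing keys default to 0.0, as in read_job_metrics
update(m, cpu=50.0, duration_s=30, runtime_s=30)  # mean so far: 50.0
update(m, cpu=10.0, duration_s=30, runtime_s=60)  # (50*30 + 10*30) / 60
assert m["cpu_mean"] == 30.0 and m["cpu_peak"] == 50.0
```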
jobrunner/sync.py (6 changes: 2 additions & 4 deletions)

```diff
@@ -9,7 +9,7 @@

 import requests

-from jobrunner import config, queries, record_stats
+from jobrunner import config, queries
 from jobrunner.create_or_update_jobs import create_or_update_jobs
 from jobrunner.lib.database import find_where, select_values
 from jobrunner.lib.log_utils import configure_logging, set_log_context
@@ -143,21 +143,19 @@ def job_to_remote_format(job):
     Convert our internal representation of a Job into whatever format the
     job-server expects
     """
-
     return {
         "identifier": job.id,
         "job_request_id": job.job_request_id,
         "action": job.action,
         "run_command": job.run_command,
         "status": job.state.value,
-        "status_code": job.status_code.value,
+        "status_code": job.status_code.value if job.status_code else "",
         "status_message": job.status_message or "",
         "created_at": job.created_at_isoformat,
         "updated_at": job.updated_at_isoformat,
         "started_at": job.started_at_isoformat,
         "completed_at": job.completed_at_isoformat,
         "trace_context": job.trace_context,
-        "metrics": record_stats.read_job_metrics(job.id),
     }
```
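Besides dropping the `metrics` key, this diff adds a guard on `status_code`, so a job without a status code now serialises as an empty string instead of raising. A minimal illustration with a hypothetical stand-in enum:

```python
from enum import Enum

class StatusCode(Enum):  # hypothetical stand-in
    CREATED = "created"

def serialize(status_code):
    return status_code.value if status_code else ""

assert serialize(StatusCode.CREATED) == "created"
assert serialize(None) == ""  # the unguarded form would raise AttributeError
```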
tests/conftest.py (13 changes: 1 addition & 12 deletions)

```diff
@@ -12,7 +12,7 @@
 from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

 import jobrunner.executors.local
-from jobrunner import config, record_stats, tracing
+from jobrunner import config, tracing
 from jobrunner.executors import volumes
 from jobrunner.job_executor import Study
 from jobrunner.lib import database
@@ -195,17 +195,6 @@ def db(monkeypatch, request):
         del database.CONNECTION_CACHE.__dict__[database_file]


-@pytest.fixture(autouse=True)
-def metrics_db(monkeypatch, request):
-    """Create a throwaway metrics db."""
-    record_stats._conn = None
-    database_file = f"file:metrics-{request.node.name}?mode=memory&cache=shared"
-    monkeypatch.setattr(config, "METRICS_FILE", database_file)
-    yield
-    database.CONNECTION_CACHE.__dict__.pop(database_file, None)
-    record_stats._conn = None
-
-
 @dataclass
 class SubprocessStub:
     calls: deque = field(default_factory=deque)
```
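The deleted fixture leaned on SQLite's named in-memory databases: with `mode=memory&cache=shared`, every connection that opens the same URI sees one shared database that vanishes when the last connection closes, giving each test (keyed on `request.node.name`) an isolated throwaway db. A minimal sketch using the stdlib `sqlite3` module directly:

```python
import sqlite3

uri = "file:metrics-test-example?mode=memory&cache=shared"
a = sqlite3.connect(uri, uri=True)
b = sqlite3.connect(uri, uri=True)  # same URI -> same in-memory database

a.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, metrics TEXT)")
a.execute("INSERT INTO jobs VALUES ('j1', '{}')")
a.commit()

assert b.execute("SELECT id FROM jobs").fetchone() == ("j1",)
a.close()
b.close()  # the database is gone once the last connection closes
```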
tests/factories.py (17 changes: 1 addition & 16 deletions)

```diff
@@ -4,7 +4,7 @@
 from collections import defaultdict
 from copy import deepcopy

-from jobrunner import config, record_stats, tracing
+from jobrunner import config, tracing
 from jobrunner.job_executor import ExecutorState, JobResults, JobStatus
 from jobrunner.lib import docker
 from jobrunner.lib.database import insert
@@ -78,12 +78,6 @@ def job_factory(job_request=None, **kwargs):
         values["created_at"] = int(timestamp)
     if "updated_at" not in kwargs:
         values["updated_at"] = int(timestamp)
-
-    if "started_at" not in kwargs:
-        status_code = kwargs.get("status_code", values["status_code"])
-        if status_code and status_code >= StatusCode.EXECUTING:
-            values["started_at"] = int(timestamp)
-
     if "status_code_updated_at" not in kwargs:
         values["status_code_updated_at"] = int(timestamp * 1e9)
     values.update(kwargs)
@@ -109,15 +103,6 @@ def job_results_factory(timestamp_ns=None, **kwargs):
     return JobResults(timestamp_ns=timestamp_ns, **values)


-def metrics_factory(job=None, metrics=None):
-    if job is None:
-        job = job_factory()
-    if metrics is None:
-        metrics = {}
-
-    record_stats.write_job_metrics(job.id, metrics)
-
-
 class StubExecutorAPI:
     """Dummy implementation of the ExecutorAPI, for use in tests.
```
(Diffs for the remaining 3 of the 11 changed files are not shown.)
