Revert "aggregate job metrics" #692

Merged
merged 1 commit into from Dec 14, 2023
Changes from all commits
1 change: 0 additions & 1 deletion jobrunner/config.py
@@ -46,7 +46,6 @@ def _is_valid_backend_name(name):
 
 WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
 DATABASE_FILE = WORKDIR / "db.sqlite"
-METRICS_FILE = WORKDIR / "metrics.sqlite"
 GIT_REPO_DIR = WORKDIR / "repos"
 
 # valid archive formats
29 changes: 7 additions & 22 deletions jobrunner/executors/local.py
@@ -11,7 +11,7 @@
 
 from pipeline.legacy import get_all_output_patterns_from_project_file
 
-from jobrunner import config, record_stats
+from jobrunner import config
 from jobrunner.executors import volumes
 from jobrunner.job_executor import (
     ExecutorAPI,
@@ -241,24 +241,16 @@ def get_status(self, job_definition, timeout=15):
                 f"docker timed out after {timeout}s inspecting container {name}"
             )
 
-        metrics = record_stats.read_job_metrics(job_definition.id)
-
         if container is None:  # container doesn't exist
             if job_definition.cancelled:
                 if volumes.get_volume_api(job_definition).volume_exists(job_definition):
                     # jobs prepared but not running do not need to finalize, so we
                     # proceed directly to the FINALIZED state here
                     return JobStatus(
-                        ExecutorState.FINALIZED,
-                        "Prepared job was cancelled",
-                        metrics=metrics,
+                        ExecutorState.FINALIZED, "Prepared job was cancelled"
                     )
                 else:
-                    return JobStatus(
-                        ExecutorState.UNKNOWN,
-                        "Pending job was cancelled",
-                        metrics=metrics,
-                    )
+                    return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
 
             # timestamp file presence means we have finished preparing
             timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -269,31 +261,24 @@ def get_status(self, job_definition, timeout=15):
             # re-prepare it anyway.
             if timestamp_ns is None:
                 # we are Jon Snow
-                return JobStatus(ExecutorState.UNKNOWN, metrics={})
+                return JobStatus(ExecutorState.UNKNOWN)
             else:
                 # we've finish preparing
-                return JobStatus(
-                    ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
-                )
+                return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
 
         if container["State"]["Running"]:
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
-            return JobStatus(
-                ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
-            )
+            return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
         elif job_definition.id in RESULTS:
             return JobStatus(
                 ExecutorState.FINALIZED,
                 timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
-                metrics=metrics,
             )
         else:
             # container present but not running, i.e. finished
             # Nb. this does not include prepared jobs, as they have a volume but not a container
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
-            return JobStatus(
-                ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
-            )
+            return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
 
     def get_results(self, job_definition):
         if job_definition.id not in RESULTS:
1 change: 0 additions & 1 deletion jobrunner/job_executor.py
@@ -67,7 +67,6 @@ class JobStatus:
     timestamp_ns: int = (
         None  # timestamp this JobStatus occurred, in integer nanoseconds
     )
-    metrics: dict = field(default_factory=dict)
 
 
 @dataclass
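As context for the removed field: a dataclass field with a mutable default has to use dataclasses.field(default_factory=dict), otherwise one dict would be shared by every instance. A minimal sketch showing only the fields visible in this hunk (the real JobStatus defines more):

    from dataclasses import dataclass, field


    @dataclass
    class JobStatus:
        # when this status was observed, in integer nanoseconds
        timestamp_ns: int = None
        # the field this revert removes; default_factory gives each instance
        # its own dict rather than one dict shared across all JobStatus objects
        metrics: dict = field(default_factory=dict)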
7 changes: 0 additions & 7 deletions jobrunner/models.py
@@ -14,7 +14,6 @@
 import secrets
 import shlex
 from enum import Enum
-from functools import total_ordering
 
 from jobrunner.lib.commands import requires_db_access
 from jobrunner.lib.database import databaseclass, migration
@@ -37,7 +36,6 @@ class State(Enum):
 # affordances in the web, cli and telemetry.
 
 
-@total_ordering
 class StatusCode(Enum):
     # PENDING states
     #
@@ -79,10 +77,6 @@ class StatusCode(Enum):
     def is_final_code(self):
         return self in StatusCode._FINAL_STATUS_CODES
 
-    def __lt__(self, other):
-        order = list(self.__class__)
-        return order.index(self) < order.index(other)
-
 
 # used for tracing to know if a state is final or not
 StatusCode._FINAL_STATUS_CODES = [
@@ -251,7 +245,6 @@ class Job:
     # used to track the OTel trace context for this job
     trace_context: dict = None
 
-    # map of file -> error
     level4_excluded_files: dict = None
 
     # used to cache the job_request json by the tracing code
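The models.py change drops the ordering that @total_ordering plus __lt__ gave StatusCode: members compared by declaration position, which is what made checks such as the (also removed) factories.py test status_code >= StatusCode.EXECUTING work. A self-contained sketch of that pattern, using illustrative member names rather than the real enum:

    from enum import Enum
    from functools import total_ordering


    @total_ordering
    class StatusCode(Enum):
        # illustrative subset; the real enum defines many more members
        CREATED = "created"
        PREPARING = "preparing"
        EXECUTING = "executing"
        SUCCEEDED = "succeeded"

        def __lt__(self, other):
            # members sort by declaration order; total_ordering derives
            # <=, > and >= from this plus the Enum's identity-based __eq__
            order = list(self.__class__)
            return order.index(self) < order.index(other)


    assert StatusCode.SUCCEEDED > StatusCode.EXECUTING
    assert StatusCode.CREATED <= StatusCode.PREPARING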
104 changes: 8 additions & 96 deletions jobrunner/record_stats.py
@@ -1,12 +1,10 @@
 """
 Super crude docker/system stats logger
 """
-import json
 import logging
 import subprocess
 import sys
 import time
-from collections import defaultdict
 
 from opentelemetry import trace
 
@@ -19,62 +17,12 @@
 log = logging.getLogger(__name__)
 tracer = trace.get_tracer("ticks")
 
-# Simplest possible table. We're only storing aggregate data
-DDL = """
-CREATE TABLE IF NOT EXISTS jobs (
-    id TEXT,
-    metrics TEXT,
-    PRIMARY KEY (id)
-)
-"""
-
-_conn = None
-
-
-def ensure_metrics_db():
-    global _conn
-    _conn = database.get_connection(config.METRICS_FILE)
-    _conn.execute("PRAGMA journal_mode = WAL")
-    _conn.execute(DDL)
-
-
-def read_job_metrics(job_id, **metrics):
-    if _conn is None:
-        ensure_metrics_db()
-
-    raw_metrics = _conn.execute(
-        "SELECT metrics FROM jobs WHERE id = ?",
-        (job_id,),
-    ).fetchone()
-    if raw_metrics is None:
-        metrics = {}
-    else:
-        metrics = json.loads(raw_metrics["metrics"])
-    return defaultdict(float, metrics)
-
-
-def write_job_metrics(job_id, metrics):
-    if _conn is None:
-        ensure_metrics_db()
-
-    raw_metrics = json.dumps(metrics)
-    _conn.execute(
-        """
-        INSERT INTO jobs (id, metrics) VALUES (?, ?)
-        ON CONFLICT(id) DO UPDATE set metrics = ?
-        """,
-        (job_id, raw_metrics, raw_metrics),
-    )
-
-
 def main():
     last_run = None
     while True:
         before = time.time()
-        active_jobs = database.find_where(
-            models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
-        )
-        last_run = record_tick_trace(last_run, active_jobs)
+        last_run = record_tick_trace(last_run)
 
         # record_tick_trace might have take a while, so sleep the remainding interval
         # enforce a minimum time of 3s to ensure we don't hammer honeycomb or
@@ -83,7 +31,7 @@ def main():
         time.sleep(max(2, config.STATS_POLL_INTERVAL - elapsed))
 
 
-def record_tick_trace(last_run, active_jobs):
+def record_tick_trace(last_run):
     """Record a period tick trace of current jobs.
 
     This will give us more realtime information than the job traces, which only
@@ -121,7 +69,10 @@ def record_tick_trace(last_run, active_jobs):
     # every span has the same timings
     start_time = last_run
     end_time = now
-    duration_s = int((end_time - start_time) / 1e9)
+
+    active_jobs = database.find_where(
+        models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
+    )
 
     with tracer.start_as_current_span(
         "TICK", start_time=start_time, attributes=trace_attrs
@@ -131,61 +82,22 @@ def record_tick_trace(last_run, active_jobs):
             root.add_event("stats_error", attributes=error_attrs, timestamp=start_time)
 
         for job in active_jobs:
-            # we are using seconds for our metric calculations
-
-            metrics = stats.get(job.id, {})
+            span = tracer.start_span(job.status_code.name, start_time=start_time)
 
             # set up attributes
             job_span_attrs = {}
             job_span_attrs.update(trace_attrs)
+            metrics = stats.get(job.id, {})
             job_span_attrs["has_metrics"] = metrics != {}
             job_span_attrs.update(metrics)
 
-            # this means the job is running
-            if metrics:
-                runtime_s = int(now / 1e9) - job.started_at
-                # protect against unexpected runtimes
-                if runtime_s > 0:
-                    job_metrics = update_job_metrics(
-                        job,
-                        metrics,
-                        duration_s,
-                        runtime_s,
-                    )
-                    job_span_attrs.update(job_metrics)
-                else:
-                    job_span_attrs.set("bad_tick_runtime", runtime_s)
-
-            # record span
-            span = tracer.start_span(job.status_code.name, start_time=start_time)
             tracing.set_span_metadata(span, job, **job_span_attrs)
             span.end(end_time)
 
     return end_time
 
 
-def update_job_metrics(job, raw_metrics, duration_s, runtime_s):
-    """Update and persist per-job aggregate stats in the metrics db"""
-
-    job_metrics = read_job_metrics(job.id)
-
-    cpu = raw_metrics["cpu_percentage"]
-    mem = raw_metrics["memory_used"]
-
-    job_metrics["cpu_sample"] = cpu
-    job_metrics["cpu_cumsum"] += duration_s * cpu
-    job_metrics["cpu_mean"] = job_metrics["cpu_cumsum"] / runtime_s
-    job_metrics["cpu_peak"] = max(job_metrics["cpu_peak"], cpu)
-    job_metrics["mem_sample"] = mem
-    job_metrics["mem_cumsum"] += duration_s * mem
-    job_metrics["mem_mean"] = job_metrics["mem_cumsum"] / runtime_s
-    job_metrics["mem_peak"] = max(job_metrics["mem_peak"], mem)
-
-    write_job_metrics(job.id, job_metrics)
-
-    return job_metrics
-
-
 if __name__ == "__main__":
     configure_logging()
 
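For reference, the aggregation being reverted folded each tick's docker stats into running per-job values: the latest sample, a time-weighted cumulative sum, a mean over total runtime, and a peak. A self-contained sketch of that calculation, with an in-memory dict standing in for the SQLite-backed read_job_metrics/write_job_metrics pair that this revert also removes:

    from collections import defaultdict

    # stands in for the removed SQLite-backed store keyed by job id
    _store = {}


    def update_job_metrics(job_id, raw_metrics, duration_s, runtime_s):
        """Fold one tick's stats into the per-job running aggregates."""
        job_metrics = _store.setdefault(job_id, defaultdict(float))

        cpu = raw_metrics["cpu_percentage"]
        mem = raw_metrics["memory_used"]

        job_metrics["cpu_sample"] = cpu
        job_metrics["cpu_cumsum"] += duration_s * cpu  # percent-seconds so far
        job_metrics["cpu_mean"] = job_metrics["cpu_cumsum"] / runtime_s
        job_metrics["cpu_peak"] = max(job_metrics["cpu_peak"], cpu)
        job_metrics["mem_sample"] = mem
        job_metrics["mem_cumsum"] += duration_s * mem
        job_metrics["mem_mean"] = job_metrics["mem_cumsum"] / runtime_s
        job_metrics["mem_peak"] = max(job_metrics["mem_peak"], mem)

        return job_metrics


    # two 30s ticks at 50% then 100% CPU give a 75% mean and a 100% peak
    update_job_metrics("job-1", {"cpu_percentage": 50, "memory_used": 1.0}, 30, 30)
    m = update_job_metrics("job-1", {"cpu_percentage": 100, "memory_used": 2.0}, 30, 60)
    assert m["cpu_mean"] == 75.0 and m["cpu_peak"] == 100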
6 changes: 2 additions & 4 deletions jobrunner/sync.py
@@ -9,7 +9,7 @@
 
 import requests
 
-from jobrunner import config, queries, record_stats
+from jobrunner import config, queries
 from jobrunner.create_or_update_jobs import create_or_update_jobs
 from jobrunner.lib.database import find_where, select_values
 from jobrunner.lib.log_utils import configure_logging, set_log_context
@@ -143,21 +143,19 @@ def job_to_remote_format(job):
     Convert our internal representation of a Job into whatever format the
     job-server expects
     """
-
     return {
         "identifier": job.id,
         "job_request_id": job.job_request_id,
         "action": job.action,
         "run_command": job.run_command,
         "status": job.state.value,
-        "status_code": job.status_code.value,
+        "status_code": job.status_code.value if job.status_code else "",
         "status_message": job.status_message or "",
         "created_at": job.created_at_isoformat,
         "updated_at": job.updated_at_isoformat,
         "started_at": job.started_at_isoformat,
         "completed_at": job.completed_at_isoformat,
         "trace_context": job.trace_context,
-        "metrics": record_stats.read_job_metrics(job.id),
     }
 
 
13 changes: 1 addition & 12 deletions tests/conftest.py
@@ -12,7 +12,7 @@
 from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
 
 import jobrunner.executors.local
-from jobrunner import config, record_stats, tracing
+from jobrunner import config, tracing
 from jobrunner.executors import volumes
 from jobrunner.job_executor import Study
 from jobrunner.lib import database
@@ -195,17 +195,6 @@ def db(monkeypatch, request):
         del database.CONNECTION_CACHE.__dict__[database_file]
 
 
-@pytest.fixture(autouse=True)
-def metrics_db(monkeypatch, request):
-    """Create a throwaway metrics db."""
-    record_stats._conn = None
-    database_file = f"file:metrics-{request.node.name}?mode=memory&cache=shared"
-    monkeypatch.setattr(config, "METRICS_FILE", database_file)
-    yield
-    database.CONNECTION_CACHE.__dict__.pop(database_file, None)
-    record_stats._conn = None
-
-
 @dataclass
 class SubprocessStub:
     calls: deque = field(default_factory=deque)
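The deleted metrics_db fixture relied on SQLite's named in-memory databases ("file:...?mode=memory&cache=shared" URIs) to give each test a throwaway database that disappears once its last connection closes. A rough sketch of that mechanism using the standard library directly, independent of job-runner's own database helpers (the table and test name here are just placeholders):

    import sqlite3

    # a named in-memory database; it exists only while at least one
    # connection to this URI is open, so it is disposable per test
    database_file = "file:metrics-test_example?mode=memory&cache=shared"

    conn = sqlite3.connect(database_file, uri=True)
    conn.execute("CREATE TABLE IF NOT EXISTS jobs (id TEXT PRIMARY KEY, metrics TEXT)")
    conn.execute("INSERT INTO jobs VALUES (?, ?)", ("job-1", "{}"))
    conn.commit()

    # a second connection to the same URI sees the same data while conn is open
    other = sqlite3.connect(database_file, uri=True)
    assert other.execute("SELECT count(*) FROM jobs").fetchone()[0] == 1

    conn.close()
    other.close()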
17 changes: 1 addition & 16 deletions tests/factories.py
@@ -4,7 +4,7 @@
 from collections import defaultdict
 from copy import deepcopy
 
-from jobrunner import config, record_stats, tracing
+from jobrunner import config, tracing
 from jobrunner.job_executor import ExecutorState, JobResults, JobStatus
 from jobrunner.lib import docker
 from jobrunner.lib.database import insert
@@ -78,12 +78,6 @@ def job_factory(job_request=None, **kwargs):
         values["created_at"] = int(timestamp)
     if "updated_at" not in kwargs:
         values["updated_at"] = int(timestamp)
-
-    if "started_at" not in kwargs:
-        status_code = kwargs.get("status_code", values["status_code"])
-        if status_code and status_code >= StatusCode.EXECUTING:
-            values["started_at"] = int(timestamp)
-
     if "status_code_updated_at" not in kwargs:
         values["status_code_updated_at"] = int(timestamp * 1e9)
     values.update(kwargs)
@@ -109,15 +103,6 @@ def job_results_factory(timestamp_ns=None, **kwargs):
     return JobResults(timestamp_ns=timestamp_ns, **values)
 
 
-def metrics_factory(job=None, metrics=None):
-    if job is None:
-        job = job_factory()
-    if metrics is None:
-        metrics = {}
-
-    record_stats.write_job_metrics(job.id, metrics)
-
-
 class StubExecutorAPI:
     """Dummy implementation of the ExecutorAPI, for use in tests.
 