Collect, store and send aggregate metrics, take 2 #691

Merged 5 commits on Dec 20, 2023
1 change: 1 addition & 0 deletions jobrunner/config.py
@@ -46,6 +46,7 @@ def _is_valid_backend_name(name):

WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
DATABASE_FILE = WORKDIR / "db.sqlite"
METRICS_FILE = WORKDIR / "metrics.sqlite"
GIT_REPO_DIR = WORKDIR / "repos"

# valid archive formats
29 changes: 22 additions & 7 deletions jobrunner/executors/local.py
@@ -11,7 +11,7 @@

from pipeline.legacy import get_all_output_patterns_from_project_file

from jobrunner import config
from jobrunner import config, record_stats
from jobrunner.executors import volumes
from jobrunner.job_executor import (
ExecutorAPI,
@@ -241,16 +241,24 @@ def get_status(self, job_definition, timeout=15):
f"docker timed out after {timeout}s inspecting container {name}"
)

metrics = record_stats.read_job_metrics(job_definition.id)

if container is None: # container doesn't exist
if job_definition.cancelled:
if volumes.get_volume_api(job_definition).volume_exists(job_definition):
# jobs prepared but not running do not need to finalize, so we
# proceed directly to the FINALIZED state here
return JobStatus(
ExecutorState.FINALIZED, "Prepared job was cancelled"
ExecutorState.FINALIZED,
"Prepared job was cancelled",
metrics=metrics,
)
else:
return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
return JobStatus(
ExecutorState.UNKNOWN,
"Pending job was cancelled",
metrics=metrics,
)

# timestamp file presence means we have finished preparing
timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -261,24 +269,31 @@
# re-prepare it anyway.
if timestamp_ns is None:
# we are Jon Snow
return JobStatus(ExecutorState.UNKNOWN)
return JobStatus(ExecutorState.UNKNOWN, metrics={})
else:
# we've finished preparing
return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
)

if container["State"]["Running"]:
timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
)
elif job_definition.id in RESULTS:
return JobStatus(
ExecutorState.FINALIZED,
timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
metrics=metrics,
)
else:
# container present but not running, i.e. finished
# Nb. this does not include prepared jobs, as they have a volume but not a container
timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
return JobStatus(
ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
)

def get_results(self, job_definition):
if job_definition.id not in RESULTS:
1 change: 1 addition & 0 deletions jobrunner/job_executor.py
@@ -67,6 +67,7 @@ class JobStatus:
timestamp_ns: int = (
None # timestamp this JobStatus occurred, in integer nanoseconds
)
metrics: dict = field(default_factory=dict)


@dataclass
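
Every JobStatus now carries the per-job aggregates alongside its state, defaulting to an empty dict. A minimal sketch of how a caller of get_status() might read them — not part of this PR; `api` and `job_definition` are assumed to be an ExecutorAPI implementation and a JobDefinition as used elsewhere in the diff, and "cpu_mean" is one of the keys written by record_stats.py below:

# hedged sketch only: illustrates the new JobStatus.metrics field
status = api.get_status(job_definition)
# metrics defaults to an empty dict, so .get() is always safe
cpu_mean = status.metrics.get("cpu_mean", 0.0)
print(f"job {job_definition.id}: mean CPU so far {cpu_mean:.1f}%")
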
7 changes: 7 additions & 0 deletions jobrunner/models.py
@@ -14,6 +14,7 @@
import secrets
import shlex
from enum import Enum
from functools import total_ordering

from jobrunner.lib.commands import requires_db_access
from jobrunner.lib.database import databaseclass, migration
@@ -36,6 +37,7 @@ class State(Enum):
# affordances in the web, cli and telemetry.


@total_ordering
class StatusCode(Enum):
# PENDING states
#
@@ -77,6 +79,10 @@ class StatusCode(Enum):
def is_final_code(self):
return self in StatusCode._FINAL_STATUS_CODES

def __lt__(self, other):
order = list(self.__class__)
return order.index(self) < order.index(other)


# used for tracing to know if a state is final or not
StatusCode._FINAL_STATUS_CODES = [
@@ -245,6 +251,7 @@ class Job:
# used to track the OTel trace context for this job
trace_context: dict = None

# map of file -> error
level4_excluded_files: dict = None

# used to cache the job_request json by the tracing code
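
The @total_ordering decorator combined with the __lt__ added above makes StatusCode members compare by declaration order, so callers can ask for the "furthest along" code with max(). A self-contained illustration of the same technique, using a hypothetical stand-in enum rather than the real StatusCode members (which are elided from this hunk):

from enum import Enum
from functools import total_ordering

@total_ordering
class Phase(Enum):  # hypothetical stand-in for StatusCode
    CREATED = "created"
    EXECUTING = "executing"
    FINALIZED = "finalized"

    def __lt__(self, other):
        # compare by position in definition order, as StatusCode does above
        order = list(self.__class__)
        return order.index(self) < order.index(other)

assert Phase.CREATED < Phase.EXECUTING <= Phase.FINALIZED
assert max(Phase.CREATED, Phase.FINALIZED) is Phase.FINALIZED
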
144 changes: 136 additions & 8 deletions jobrunner/record_stats.py
@@ -1,10 +1,15 @@
"""
Super crude docker/system stats logger
"""
import json
import logging
import sqlite3
import subprocess
import sys
import threading
import time
from collections import defaultdict
from pathlib import Path

from opentelemetry import trace

@@ -17,12 +22,99 @@
log = logging.getLogger(__name__)
tracer = trace.get_tracer("ticks")

# Simplest possible table. We're only storing aggregate data
DDL = """
CREATE TABLE IF NOT EXISTS jobs (
id TEXT,
metrics TEXT,
PRIMARY KEY (id)
)
"""

CONNECTION_CACHE = threading.local()


def get_connection(readonly=True):
db_file = config.METRICS_FILE

# developer check against using in-memory dbs, which cannot be used with this
# function, as we need to set the mode ourselves
assert isinstance(db_file, Path), "config.METRICS_FILE db must be file path"
assert not str(db_file).startswith(
"file:"
), "config.METRICS_FILE db must be file path, not url"

if readonly:
db = f"file:{db_file}?mode=ro"
else:
db = f"file:{db_file}?mode=rwc"

cache = CONNECTION_CACHE.__dict__
if db not in cache:
try:
conn = sqlite3.connect(db, uri=True)
except sqlite3.OperationalError as exc:
# if it's readonly, we cannot create the file, so fail gracefully.
# Caller should check for conn being None.
if readonly and "unable to open" in str(exc).lower():
return None
raise

# manual transactions
conn.isolation_level = None
# Support dict-like access to rows
conn.row_factory = sqlite3.Row

if not readonly:
conn.execute("PRAGMA journal_mode = WAL")
conn.execute(DDL)

cache[db] = conn

return cache[db]


def read_job_metrics(job_id, **metrics):
conn = get_connection(readonly=True)

raw_metrics = None

if conn is not None:
try:
raw_metrics = conn.execute(
"SELECT metrics FROM jobs WHERE id = ?",
(job_id,),
).fetchone()
except sqlite3.OperationalError as exc:
if "no such table" not in str(exc).lower():
raise

if raw_metrics is None:
metrics = {}
else:
metrics = json.loads(raw_metrics["metrics"])
return defaultdict(float, metrics)


def write_job_metrics(job_id, metrics):
raw_metrics = json.dumps(metrics)
get_connection(readonly=False).execute(
"""
INSERT INTO jobs (id, metrics) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE set metrics = ?
""",
(job_id, raw_metrics, raw_metrics),
)


def main():
last_run = None
while True:
before = time.time()
last_run = record_tick_trace(last_run)
active_jobs = database.find_where(
models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
)
last_run = record_tick_trace(last_run, active_jobs)

# record_tick_trace might have taken a while, so sleep the remaining interval
# enforce a minimum time of 3s to ensure we don't hammer honeycomb or
@@ -31,7 +123,7 @@ def main():
time.sleep(max(2, config.STATS_POLL_INTERVAL - elapsed))


def record_tick_trace(last_run):
def record_tick_trace(last_run, active_jobs):
"""Record a period tick trace of current jobs.

This will give us more realtime information than the job traces, which only
@@ -69,10 +161,7 @@ def record_tick_trace(last_run):
# every span has the same timings
start_time = last_run
end_time = now

active_jobs = database.find_where(
models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
)
duration_s = int((end_time - start_time) / 1e9)

with tracer.start_as_current_span(
"TICK", start_time=start_time, attributes=trace_attrs
@@ -82,22 +171,61 @@
root.add_event("stats_error", attributes=error_attrs, timestamp=start_time)

for job in active_jobs:
span = tracer.start_span(job.status_code.name, start_time=start_time)
# we are using seconds for our metric calculations

metrics = stats.get(job.id, {})

# set up attributes
job_span_attrs = {}
job_span_attrs.update(trace_attrs)
metrics = stats.get(job.id, {})
job_span_attrs["has_metrics"] = metrics != {}
job_span_attrs.update(metrics)

# this means the job is running
if metrics:
runtime_s = int(now / 1e9) - job.started_at
# protect against unexpected runtimes
if runtime_s > 0:
job_metrics = update_job_metrics(
job,
metrics,
duration_s,
runtime_s,
)
job_span_attrs.update(job_metrics)
else:
job_span_attrs["bad_tick_runtime"] = runtime_s

# record span
span = tracer.start_span(job.status_code.name, start_time=start_time)
tracing.set_span_metadata(span, job, **job_span_attrs)
span.end(end_time)

return end_time


def update_job_metrics(job, raw_metrics, duration_s, runtime_s):
"""Update and persist per-job aggregate stats in the metrics db"""

job_metrics = read_job_metrics(job.id)

cpu = raw_metrics["cpu_percentage"]
mem = raw_metrics["memory_used"]

job_metrics["cpu_sample"] = cpu
job_metrics["cpu_cumsum"] += duration_s * cpu
job_metrics["cpu_mean"] = job_metrics["cpu_cumsum"] / runtime_s
job_metrics["cpu_peak"] = max(job_metrics["cpu_peak"], cpu)
job_metrics["mem_sample"] = mem
job_metrics["mem_cumsum"] += duration_s * mem
job_metrics["mem_mean"] = job_metrics["mem_cumsum"] / runtime_s
job_metrics["mem_peak"] = max(job_metrics["mem_peak"], mem)

write_job_metrics(job.id, job_metrics)

return job_metrics


if __name__ == "__main__":
configure_logging()

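
Taken together, read_job_metrics(), write_job_metrics() and update_job_metrics() maintain running aggregates: each tick adds duration_s * sample to a cumulative sum and divides by total runtime to get the mean. A hedged sketch of the round trip, assuming config.METRICS_FILE points at a writable temporary sqlite file (as the test fixture below arranges); the job id and numbers are made up:

from jobrunner import record_stats

job_id = "job-abc123"  # hypothetical

# first 10s tick at 50% CPU, 10s of runtime so far
m = record_stats.read_job_metrics(job_id)   # defaultdict(float), empty on first read
m["cpu_sample"] = 50.0
m["cpu_cumsum"] += 10 * 50.0                # duration_s * cpu
m["cpu_mean"] = m["cpu_cumsum"] / 10        # / runtime_s
m["cpu_peak"] = max(m["cpu_peak"], 50.0)
record_stats.write_job_metrics(job_id, dict(m))

# second 10s tick at 70% CPU, 20s of runtime in total
m = record_stats.read_job_metrics(job_id)
m["cpu_cumsum"] += 10 * 70.0
m["cpu_mean"] = m["cpu_cumsum"] / 20        # (500 + 700) / 20 == 60.0
assert m["cpu_mean"] == 60.0

The same arithmetic is what update_job_metrics() performs for both CPU and memory before persisting the dict back to the jobs table as JSON.
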
6 changes: 4 additions & 2 deletions jobrunner/sync.py
@@ -9,7 +9,7 @@

import requests

from jobrunner import config, queries
from jobrunner import config, queries, record_stats
from jobrunner.create_or_update_jobs import create_or_update_jobs
from jobrunner.lib.database import find_where, select_values
from jobrunner.lib.log_utils import configure_logging, set_log_context
@@ -143,19 +143,21 @@ def job_to_remote_format(job):
Convert our internal representation of a Job into whatever format the
job-server expects
"""

return {
"identifier": job.id,
"job_request_id": job.job_request_id,
"action": job.action,
"run_command": job.run_command,
"status": job.state.value,
"status_code": job.status_code.value if job.status_code else "",
"status_code": job.status_code.value,
"status_message": job.status_message or "",
"created_at": job.created_at_isoformat,
"updated_at": job.updated_at_isoformat,
"started_at": job.started_at_isoformat,
"completed_at": job.completed_at_isoformat,
"trace_context": job.trace_context,
"metrics": record_stats.read_job_metrics(job.id),
}


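
With the final change above, every job synced to job-server gains a "metrics" key. Roughly, the payload now has the shape sketched below; all values are illustrative, only the keys mirror job_to_remote_format():

payload = {
    "identifier": "job-abc123",      # made-up id
    "status": "running",
    "status_message": "Executing job",
    # ...the other fields shown above, unchanged...
    "metrics": {"cpu_mean": 62.5, "cpu_peak": 91.0, "mem_peak": 2.1e9},
}
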
14 changes: 13 additions & 1 deletion tests/conftest.py
@@ -12,7 +12,7 @@
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

import jobrunner.executors.local
from jobrunner import config, tracing
from jobrunner import config, record_stats, tracing
from jobrunner.executors import volumes
from jobrunner.job_executor import Study
from jobrunner.lib import database
@@ -195,6 +195,18 @@ def db(monkeypatch, request):
del database.CONNECTION_CACHE.__dict__[database_file]


@pytest.fixture(autouse=True)
def metrics_db(monkeypatch, tmp_path, request):
"""Create a throwaway metrics db.

It must be a file, not memory, because we use readonly connections.
"""
db_path = tmp_path / "metrics.db"
monkeypatch.setattr(config, "METRICS_FILE", db_path)
yield
record_stats.CONNECTION_CACHE.__dict__.clear()


@dataclass
class SubprocessStub:
calls: deque = field(default_factory=deque)
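
Because the metrics_db fixture is autouse, any test can exercise the metrics store with no extra setup. A possible (hypothetical) test built on it:

from jobrunner import record_stats

def test_metrics_round_trip():
    # empty defaultdict on first read, since the throwaway db starts empty
    assert record_stats.read_job_metrics("job-1") == {}
    record_stats.write_job_metrics("job-1", {"cpu_peak": 99.0})
    assert record_stats.read_job_metrics("job-1")["cpu_peak"] == 99.0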