diff --git a/jobrunner/config.py b/jobrunner/config.py
index 5e1c5e93..54e9ddec 100644
--- a/jobrunner/config.py
+++ b/jobrunner/config.py
@@ -46,6 +46,7 @@ def _is_valid_backend_name(name):
 
 WORKDIR = Path(os.environ.get("WORKDIR", default_work_dir)).resolve()
 DATABASE_FILE = WORKDIR / "db.sqlite"
+METRICS_FILE = WORKDIR / "metrics.sqlite"
 GIT_REPO_DIR = WORKDIR / "repos"
 
 # valid archive formats
diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py
index b1d736fc..d79cecc5 100644
--- a/jobrunner/executors/local.py
+++ b/jobrunner/executors/local.py
@@ -11,7 +11,7 @@
 
 from pipeline.legacy import get_all_output_patterns_from_project_file
 
-from jobrunner import config
+from jobrunner import config, record_stats
 from jobrunner.executors import volumes
 from jobrunner.job_executor import (
     ExecutorAPI,
@@ -241,16 +241,24 @@ def get_status(self, job_definition, timeout=15):
                 f"docker timed out after {timeout}s inspecting container {name}"
             )
 
+        metrics = record_stats.read_job_metrics(job_definition.id)
+
         if container is None:  # container doesn't exist
             if job_definition.cancelled:
                 if volumes.get_volume_api(job_definition).volume_exists(job_definition):
                     # jobs prepared but not running do not need to finalize, so we
                     # proceed directly to the FINALIZED state here
                     return JobStatus(
-                        ExecutorState.FINALIZED, "Prepared job was cancelled"
+                        ExecutorState.FINALIZED,
+                        "Prepared job was cancelled",
+                        metrics=metrics,
                     )
                 else:
-                    return JobStatus(ExecutorState.UNKNOWN, "Pending job was cancelled")
+                    return JobStatus(
+                        ExecutorState.UNKNOWN,
+                        "Pending job was cancelled",
+                        metrics=metrics,
+                    )
 
             # timestamp file presence means we have finished preparing
             timestamp_ns = volumes.get_volume_api(job_definition).read_timestamp(
@@ -261,24 +269,31 @@ def get_status(self, job_definition, timeout=15):
             # re-prepare it anyway.
             if timestamp_ns is None:
                 # we are Jon Snow
-                return JobStatus(ExecutorState.UNKNOWN)
+                return JobStatus(ExecutorState.UNKNOWN, metrics={})
             else:
                 # we've finish preparing
-                return JobStatus(ExecutorState.PREPARED, timestamp_ns=timestamp_ns)
+                return JobStatus(
+                    ExecutorState.PREPARED, timestamp_ns=timestamp_ns, metrics=metrics
+                )
 
         if container["State"]["Running"]:
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["StartedAt"])
-            return JobStatus(ExecutorState.EXECUTING, timestamp_ns=timestamp_ns)
+            return JobStatus(
+                ExecutorState.EXECUTING, timestamp_ns=timestamp_ns, metrics=metrics
+            )
         elif job_definition.id in RESULTS:
             return JobStatus(
                 ExecutorState.FINALIZED,
                 timestamp_ns=RESULTS[job_definition.id].timestamp_ns,
+                metrics=metrics,
            )
         else:
             # container present but not running, i.e. finished
             # Nb. this does not include prepared jobs, as they have a volume but not a container
             timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
-            return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)
+            return JobStatus(
+                ExecutorState.EXECUTED, timestamp_ns=timestamp_ns, metrics=metrics
+            )
 
     def get_results(self, job_definition):
         if job_definition.id not in RESULTS:
diff --git a/jobrunner/job_executor.py b/jobrunner/job_executor.py
index 2ee0039d..da135bae 100644
--- a/jobrunner/job_executor.py
+++ b/jobrunner/job_executor.py
@@ -67,6 +67,7 @@ class JobStatus:
     timestamp_ns: int = (
         None  # timestamp this JobStatus occurred, in integer nanoseconds
     )
+    metrics: dict = field(default_factory=dict)
 
 
 @dataclass
diff --git a/jobrunner/models.py b/jobrunner/models.py
index 1d7ab4bc..0b53450f 100644
--- a/jobrunner/models.py
+++ b/jobrunner/models.py
@@ -14,6 +14,7 @@
 import secrets
 import shlex
 from enum import Enum
+from functools import total_ordering
 
 from jobrunner.lib.commands import requires_db_access
 from jobrunner.lib.database import databaseclass, migration
@@ -36,6 +37,7 @@ class State(Enum):
 
 
 # affordances in the web, cli and telemetry.
+@total_ordering
 class StatusCode(Enum):
     # PENDING states
     #
@@ -77,6 +79,10 @@ class StatusCode(Enum):
     def is_final_code(self):
         return self in StatusCode._FINAL_STATUS_CODES
 
+    def __lt__(self, other):
+        order = list(self.__class__)
+        return order.index(self) < order.index(other)
+
 
 # used for tracing to know if a state is final or not
 StatusCode._FINAL_STATUS_CODES = [
@@ -245,6 +251,7 @@ class Job:
 
     # used to track the OTel trace context for this job
     trace_context: dict = None
+    # map of file -> error
     level4_excluded_files: dict = None
 
     # used to cache the job_request json by the tracing code
diff --git a/jobrunner/record_stats.py b/jobrunner/record_stats.py
index 67fd96ba..8e7f4bca 100644
--- a/jobrunner/record_stats.py
+++ b/jobrunner/record_stats.py
@@ -1,10 +1,15 @@
 """
 Super crude docker/system stats logger
 """
+import json
 import logging
+import sqlite3
 import subprocess
 import sys
+import threading
 import time
+from collections import defaultdict
+from pathlib import Path
 
 from opentelemetry import trace
 
@@ -17,12 +22,99 @@
 log = logging.getLogger(__name__)
 tracer = trace.get_tracer("ticks")
 
+# Simplest possible table. We're only storing aggregate data
+DDL = """
+CREATE TABLE IF NOT EXISTS jobs (
+    id TEXT,
+    metrics TEXT,
+    PRIMARY KEY (id)
+)
+"""
+
+CONNECTION_CACHE = threading.local()
+
+
+def get_connection(readonly=True):
+    db_file = config.METRICS_FILE
+
+    # developer check against using memory dbs, which cannot be used with this
+    # function, as we need to set mode ourselves
+    assert isinstance(db_file, Path), "config.METRICS_FILE db must be file path"
+    assert not str(db_file).startswith(
+        "file:"
+    ), "config.METRICS_FILE db must be file path, not url"
+
+    if readonly:
+        db = f"file:{db_file}?mode=ro"
+    else:
+        db = f"file:{db_file}?mode=rwc"
+
+    cache = CONNECTION_CACHE.__dict__
+    if db not in cache:
+        try:
+            conn = sqlite3.connect(db, uri=True)
+        except sqlite3.OperationalError as exc:
+            # if it's readonly, we cannot create the file, so fail gracefully.
+            # Caller should check for conn being None.
+            if readonly and "unable to open" in str(exc).lower():
+                return None
+            raise
+
+        # manual transactions
+        conn.isolation_level = None
+        # Support dict-like access to rows
+        conn.row_factory = sqlite3.Row
+
+        if not readonly:
+            conn.execute("PRAGMA journal_mode = WAL")
+            conn.execute(DDL)
+
+        cache[db] = conn
+
+    return cache[db]
+
+
+def read_job_metrics(job_id, **metrics):
+    conn = get_connection(readonly=True)
+
+    raw_metrics = None
+
+    if conn is not None:
+        try:
+            raw_metrics = conn.execute(
+                "SELECT metrics FROM jobs WHERE id = ?",
+                (job_id,),
+            ).fetchone()
+        except sqlite3.OperationalError as exc:
+            if "no such table" not in str(exc).lower():
+                raise
+
+    if raw_metrics is None:
+        metrics = {}
+    else:
+        metrics = json.loads(raw_metrics["metrics"])
+
+    return defaultdict(float, metrics)
+
+
+def write_job_metrics(job_id, metrics):
+    raw_metrics = json.dumps(metrics)
+    get_connection(readonly=False).execute(
+        """
+        INSERT INTO jobs (id, metrics) VALUES (?, ?)
+        ON CONFLICT(id) DO UPDATE set metrics = ?
+        """,
+        (job_id, raw_metrics, raw_metrics),
+    )
+
 
 def main():
     last_run = None
     while True:
         before = time.time()
-        last_run = record_tick_trace(last_run)
+        active_jobs = database.find_where(
+            models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
+        )
+        last_run = record_tick_trace(last_run, active_jobs)
 
         # record_tick_trace might have take a while, so sleep the remainding interval
         # enforce a minimum time of 3s to ensure we don't hammer honeycomb or
@@ -31,7 +123,7 @@
         time.sleep(max(2, config.STATS_POLL_INTERVAL - elapsed))
 
 
-def record_tick_trace(last_run):
+def record_tick_trace(last_run, active_jobs):
     """Record a period tick trace of current jobs.
 
     This will give us more realtime information than the job traces, which only
@@ -69,10 +161,7 @@ def record_tick_trace(last_run):
     # every span has the same timings
     start_time = last_run
     end_time = now
-
-    active_jobs = database.find_where(
-        models.Job, state__in=[models.State.PENDING, models.State.RUNNING]
-    )
+    duration_s = int((end_time - start_time) / 1e9)
 
     with tracer.start_as_current_span(
         "TICK", start_time=start_time, attributes=trace_attrs
@@ -82,22 +171,61 @@ def record_tick_trace(last_run):
             root.add_event("stats_error", attributes=error_attrs, timestamp=start_time)
 
         for job in active_jobs:
-            span = tracer.start_span(job.status_code.name, start_time=start_time)
+            # we are using seconds for our metric calculations
+
+            metrics = stats.get(job.id, {})
 
             # set up attributes
             job_span_attrs = {}
             job_span_attrs.update(trace_attrs)
-            metrics = stats.get(job.id, {})
             job_span_attrs["has_metrics"] = metrics != {}
             job_span_attrs.update(metrics)
 
+            # this means the job is running
+            if metrics:
+                runtime_s = int(now / 1e9) - job.started_at
+                # protect against unexpected runtimes
+                if runtime_s > 0:
+                    job_metrics = update_job_metrics(
+                        job,
+                        metrics,
+                        duration_s,
+                        runtime_s,
+                    )
+                    job_span_attrs.update(job_metrics)
+                else:
+                    job_span_attrs["bad_tick_runtime"] = runtime_s
+
+            # record span
+            span = tracer.start_span(job.status_code.name, start_time=start_time)
             tracing.set_span_metadata(span, job, **job_span_attrs)
             span.end(end_time)
 
     return end_time
 
 
+def update_job_metrics(job, raw_metrics, duration_s, runtime_s):
+    """Update and persist per-job aggregate stats in the metrics db"""
+
+    job_metrics = read_job_metrics(job.id)
+
+    cpu = raw_metrics["cpu_percentage"]
+    mem = raw_metrics["memory_used"]
+
+    job_metrics["cpu_sample"] = cpu
+    job_metrics["cpu_cumsum"] += duration_s * cpu
+    job_metrics["cpu_mean"] = job_metrics["cpu_cumsum"] / runtime_s
job_metrics["cpu_cumsum"] / runtime_s + job_metrics["cpu_peak"] = max(job_metrics["cpu_peak"], cpu) + job_metrics["mem_sample"] = mem + job_metrics["mem_cumsum"] += duration_s * mem + job_metrics["mem_mean"] = job_metrics["mem_cumsum"] / runtime_s + job_metrics["mem_peak"] = max(job_metrics["mem_peak"], mem) + + write_job_metrics(job.id, job_metrics) + + return job_metrics + + if __name__ == "__main__": configure_logging() diff --git a/jobrunner/sync.py b/jobrunner/sync.py index f0a89439..d0086c38 100644 --- a/jobrunner/sync.py +++ b/jobrunner/sync.py @@ -9,7 +9,7 @@ import requests -from jobrunner import config, queries +from jobrunner import config, queries, record_stats from jobrunner.create_or_update_jobs import create_or_update_jobs from jobrunner.lib.database import find_where, select_values from jobrunner.lib.log_utils import configure_logging, set_log_context @@ -143,19 +143,21 @@ def job_to_remote_format(job): Convert our internal representation of a Job into whatever format the job-server expects """ + return { "identifier": job.id, "job_request_id": job.job_request_id, "action": job.action, "run_command": job.run_command, "status": job.state.value, - "status_code": job.status_code.value if job.status_code else "", + "status_code": job.status_code.value, "status_message": job.status_message or "", "created_at": job.created_at_isoformat, "updated_at": job.updated_at_isoformat, "started_at": job.started_at_isoformat, "completed_at": job.completed_at_isoformat, "trace_context": job.trace_context, + "metrics": record_stats.read_job_metrics(job.id), } diff --git a/tests/conftest.py b/tests/conftest.py index 80cb53de..4d86e31d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,7 @@ from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter import jobrunner.executors.local -from jobrunner import config, tracing +from jobrunner import config, record_stats, tracing from jobrunner.executors import volumes from jobrunner.job_executor import Study from jobrunner.lib import database @@ -195,6 +195,18 @@ def db(monkeypatch, request): del database.CONNECTION_CACHE.__dict__[database_file] +@pytest.fixture(autouse=True) +def metrics_db(monkeypatch, tmp_path, request): + """Create a throwaway metrics db. + + It must be a file, not memory, because we use readonly connections. 
+ """ + db_path = tmp_path / "metrics.db" + monkeypatch.setattr(config, "METRICS_FILE", db_path) + yield + record_stats.CONNECTION_CACHE.__dict__.clear() + + @dataclass class SubprocessStub: calls: deque = field(default_factory=deque) diff --git a/tests/factories.py b/tests/factories.py index 7872769d..968eb440 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -4,7 +4,7 @@ from collections import defaultdict from copy import deepcopy -from jobrunner import config, tracing +from jobrunner import config, record_stats, tracing from jobrunner.job_executor import ExecutorState, JobResults, JobStatus from jobrunner.lib import docker from jobrunner.lib.database import insert @@ -78,6 +78,12 @@ def job_factory(job_request=None, **kwargs): values["created_at"] = int(timestamp) if "updated_at" not in kwargs: values["updated_at"] = int(timestamp) + + if "started_at" not in kwargs: + status_code = kwargs.get("status_code", values["status_code"]) + if status_code and status_code >= StatusCode.EXECUTING: + values["started_at"] = int(timestamp) + if "status_code_updated_at" not in kwargs: values["status_code_updated_at"] = int(timestamp * 1e9) values.update(kwargs) @@ -103,6 +109,15 @@ def job_results_factory(timestamp_ns=None, **kwargs): return JobResults(timestamp_ns=timestamp_ns, **values) +def metrics_factory(job=None, metrics=None): + if job is None: + job = job_factory() + if metrics is None: + metrics = {} + + record_stats.write_job_metrics(job.id, metrics) + + class StubExecutorAPI: """Dummy implementation of the ExecutorAPI, for use in tests. diff --git a/tests/lib/test_database.py b/tests/lib/test_database.py index 56df4edc..5620aa5c 100644 --- a/tests/lib/test_database.py +++ b/tests/lib/test_database.py @@ -9,6 +9,7 @@ ensure_db, ensure_valid_db, find_one, + get_connection, insert, migrate_db, select_values, @@ -17,6 +18,12 @@ from jobrunner.models import Job, State +def test_get_connection(): + db = "file:test_get_connection?mode=memory&cache=shared" + conn = get_connection(db) + assert conn is get_connection(db) + + def test_basic_roundtrip(tmp_work_dir): job = Job( id="foo123", diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 24472a19..9694bf11 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -4,12 +4,12 @@ import pytest -from jobrunner import config +from jobrunner import config, models, record_stats from jobrunner.executors import local, volumes from jobrunner.job_executor import ExecutorState, JobDefinition, Privacy, Study from jobrunner.lib import datestr_to_ns_timestamp, docker from tests.conftest import SUPPORTED_VOLUME_APIS -from tests.factories import ensure_docker_images_present +from tests.factories import ensure_docker_images_present, job_factory # this is parametized fixture, and test using it will run multiple times, once @@ -215,7 +215,7 @@ def test_prepare_job_no_input_file(docker_cleanup, job_definition, volume_api): @pytest.mark.needs_docker -def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, volume_api): +def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, db, volume_api): # check limits are applied job_definition.cpu_count = 1.5 job_definition.memory_limit = "1G" @@ -229,18 +229,52 @@ def test_execute_success(docker_cleanup, job_definition, tmp_work_dir, volume_ap status = api.execute(job_definition) assert status.state == ExecutorState.EXECUTING - # could be in either state - assert api.get_status(job_definition).state in ( - ExecutorState.EXECUTING, - 
-        ExecutorState.EXECUTED,
-    )
-
     container_data = docker.container_inspect(local.container_name(job_definition))
     assert container_data["State"]["ExitCode"] == 0
     assert container_data["HostConfig"]["NanoCpus"] == int(1.5 * 1e9)
     assert container_data["HostConfig"]["Memory"] == 2**30  # 1G
 
 
+@pytest.mark.needs_docker
+def test_execute_metrics(docker_cleanup, job_definition, tmp_work_dir, db):
+    job_definition.args = ["sleep", "10"]
+    last_run = time.time_ns()
+
+    api = local.LocalDockerAPI()
+
+    status = api.prepare(job_definition)
+    assert status.state == ExecutorState.PREPARED
+
+    # we need scheduler job state to be able to collect stats
+    job = job_factory(
+        id=job_definition.id,
+        state=models.State.RUNNING,
+        status_code=models.StatusCode.EXECUTING,
+        started_at=int(last_run / 1e9),
+    )
+
+    status = api.execute(job_definition)
+    assert status.state == ExecutorState.EXECUTING
+
+    # simulate stats thread collecting stats
+    record_stats.record_tick_trace(last_run, [job])
+
+    docker.kill(local.container_name(job_definition))
+
+    status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)
+
+    assert list(status.metrics.keys()) == [
+        "cpu_sample",
+        "cpu_cumsum",
+        "cpu_mean",
+        "cpu_peak",
+        "mem_sample",
+        "mem_cumsum",
+        "mem_mean",
+        "mem_peak",
+    ]
+
+
 @pytest.mark.skipif(
     sys.platform != "linux" and sys.platform != "darwin", reason="linux/darwin only"
 )
diff --git a/tests/test_record_stats.py b/tests/test_record_stats.py
index a6e389c8..533f2cfe 100644
--- a/tests/test_record_stats.py
+++ b/tests/test_record_stats.py
@@ -1,10 +1,56 @@
+import sqlite3
 import subprocess
 import time
 
-from jobrunner import record_stats
+from jobrunner import config, record_stats
 from jobrunner.models import State, StatusCode
 from tests.conftest import get_trace
-from tests.factories import job_factory
+from tests.factories import job_factory, metrics_factory
+
+
+def test_get_connection_readonly():
+    conn = record_stats.get_connection(readonly=True)
+    assert conn is None
+
+    conn = record_stats.get_connection(readonly=False)
+    assert conn is record_stats.get_connection(readonly=False)  # cached
+    assert conn.isolation_level is None
+    assert conn.row_factory is sqlite3.Row
+    assert conn.execute("PRAGMA journal_mode").fetchone()["journal_mode"] == "wal"
+    assert (
+        conn.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name=?", ("jobs",)
+        ).fetchone()["name"]
+        == "jobs"
+    )
+
+    ro_conn = record_stats.get_connection(readonly=True)
+    assert ro_conn is record_stats.get_connection(readonly=True)  # cached
+    assert ro_conn is not conn
+    assert conn.isolation_level is None
+    assert conn.row_factory is sqlite3.Row
+    assert conn.execute("PRAGMA journal_mode").fetchone()["journal_mode"] == "wal"
+    assert (
+        conn.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name=?", ("jobs",)
+        ).fetchone()["name"]
+        == "jobs"
+    )
+
+
+def test_read_write_job_metrics():
+
+    assert record_stats.read_job_metrics("id") == {}
+
+    # create db file
+    sqlite3.connect(config.METRICS_FILE)
+
+    # possible race condition, no table yet, should still report no metrics
+    assert record_stats.read_job_metrics("id") == {}
+
+    record_stats.write_job_metrics("id", {"test": 1.0})
+
+    assert record_stats.read_job_metrics("id") == {"test": 1.0}
 
 
 def test_record_tick_trace(db, freezer, monkeypatch):
@@ -28,12 +74,12 @@ def test_record_tick_trace(db, freezer, monkeypatch):
     # this should not be tick'd
     job_factory(state=State.SUCCEEDED, status_code=StatusCode.SUCCEEDED)
 
-    last_run1 = record_stats.record_tick_trace(None)
+    last_run1 = record_stats.record_tick_trace(None, jobs)
 
     assert len(get_trace("ticks")) == 0
 
     freezer.tick(10)
 
-    last_run2 = record_stats.record_tick_trace(last_run1)
+    last_run2 = record_stats.record_tick_trace(last_run1, jobs)
     assert last_run2 == last_run1 + 10 * 1e9
 
     spans = get_trace("ticks")
@@ -56,7 +102,17 @@ def test_record_tick_trace(db, freezer, monkeypatch):
         if job is running_job:
             assert span.attributes["has_metrics"] is True
             assert span.attributes["cpu_percentage"] == 50.0
+            assert span.attributes["cpu_sample"] == 50.0
+            assert span.attributes["cpu_sample"] == 50.0
+            assert span.attributes["cpu_peak"] == 50.0
+            assert span.attributes["cpu_cumsum"] == 500.0  # 50% * 10s
+            assert span.attributes["cpu_mean"] == 50.0
+
             assert span.attributes["memory_used"] == 1000
+            assert span.attributes["mem_sample"] == 1000
+            assert span.attributes["mem_peak"] == 1000
+            assert span.attributes["mem_cumsum"] == 10000  # 1000 * 10s
+            assert span.attributes["mem_mean"] == 1000
         else:
             assert span.attributes["has_metrics"] is False
 
@@ -64,7 +120,7 @@
 
 
 def test_record_tick_trace_stats_timeout(db, freezer, monkeypatch):
-    job_factory(status_code=StatusCode.EXECUTING)
+    job = job_factory(status_code=StatusCode.EXECUTING)
 
     def timeout():
         raise subprocess.TimeoutExpired("cmd", 10)
@@ -74,7 +130,7 @@ def timeout():
     last_run = time.time()
     freezer.tick(10)
 
-    record_stats.record_tick_trace(last_run)
+    record_stats.record_tick_trace(last_run, [job])
 
     assert len(get_trace("ticks")) == 2
     spans = get_trace("ticks")
@@ -82,13 +138,14 @@ def timeout():
 
         assert "cpu_percentage" not in span.attributes
         assert "memory_used" not in span.attributes
+        assert "mem_peak" not in span.attributes
         assert span.attributes["has_metrics"] is False
         assert span.attributes["stats_timeout"] is True
         assert span.attributes["stats_error"] is False
 
 
 def test_record_tick_trace_stats_error(db, freezer, monkeypatch):
-    job_factory(status_code=StatusCode.EXECUTING)
+    job = job_factory(status_code=StatusCode.EXECUTING)
 
     def error():
         raise subprocess.CalledProcessError(
@@ -98,7 +155,7 @@ def error():
     monkeypatch.setattr(record_stats, "get_job_stats", error)
 
     last_run = time.time()
-    record_stats.record_tick_trace(last_run)
+    record_stats.record_tick_trace(last_run, [job])
 
     assert len(get_trace("ticks")) == 2
     spans = get_trace("ticks")
@@ -106,6 +163,7 @@ def error():
 
         assert "cpu_percentage" not in span.attributes
         assert "memory_used" not in span.attributes
+        assert "mem_peak" not in span.attributes
         assert span.attributes["has_metrics"] is False
         assert span.attributes["stats_timeout"] is False
         assert span.attributes["stats_error"] is True
@@ -117,3 +175,73 @@ def error():
     assert root.events[0].attributes["cmd"] == "test cmd"
     assert root.events[0].attributes["output"] == "stderr\n\nstdout"
     assert root.events[0].name == "stats_error"
+
+
+def test_update_job_metrics(db):
+
+    job = job_factory(status_code=StatusCode.EXECUTING)
+    metrics_factory(job)
+
+    metrics = record_stats.read_job_metrics(job.id)
+
+    assert metrics == {}
+
+    # 50%/100m for 1s
+    record_stats.update_job_metrics(
+        job,
+        {"cpu_percentage": 50, "memory_used": 100},
+        duration_s=1.0,
+        runtime_s=1.0,
+    )
+
+    metrics = record_stats.read_job_metrics(job.id)
+    assert metrics == {
+        "cpu_cumsum": 50.0,
+        "cpu_mean": 50.0,
+        "cpu_peak": 50,
+        "cpu_sample": 50,
+        "mem_cumsum": 100.0,
+        "mem_mean": 100.0,
+        "mem_peak": 100,
+        "mem_sample": 100,
+    }
+
+    # 100%/1000m for 1s
+    record_stats.update_job_metrics(
+        job,
+        {"cpu_percentage": 100, "memory_used": 1000},
+        duration_s=1.0,
+        runtime_s=2.0,
+    )
+
+    metrics = record_stats.read_job_metrics(job.id)
+    assert metrics == {
+        "cpu_cumsum": 150.0,
+        "cpu_mean": 75.0,
+        "cpu_peak": 100,
+        "cpu_sample": 100,
+        "mem_cumsum": 1100.0,
+        "mem_mean": 550.0,
+        "mem_peak": 1000,
+        "mem_sample": 1000,
+    }
+
+    # 100%/1000m for 8s
+    record_stats.update_job_metrics(
+        job,
+        {"cpu_percentage": 100, "memory_used": 1000},
+        duration_s=8.0,
+        runtime_s=10.0,
+    )
+
+    metrics = record_stats.read_job_metrics(job.id)
+    assert metrics == {
+        "cpu_cumsum": 950.0,
+        "cpu_mean": 95.0,
+        "cpu_peak": 100,
+        "cpu_sample": 100,
+        "mem_cumsum": 9100.0,
+        "mem_mean": 910.0,
+        "mem_peak": 1000,
+        "mem_sample": 1000,
+    }
diff --git a/tests/test_sync.py b/tests/test_sync.py
index a4a15de4..0a63be14 100644
--- a/tests/test_sync.py
+++ b/tests/test_sync.py
@@ -4,6 +4,7 @@
 
 from jobrunner import config, queries, sync
 from jobrunner.models import JobRequest
+from tests.factories import job_factory, metrics_factory
 
 
 def test_job_request_from_remote_format():
@@ -76,6 +77,33 @@ def test_job_request_from_remote_format_database_name_fallback():
     assert job_request == expected
 
 
+def test_job_to_remote_format_default(db):
+    job = job_factory()
+
+    json = sync.job_to_remote_format(job)
+
+    assert json["action"] == "action_name"
+    assert json["run_command"] == "python myscript.py"
+    assert json["status"] == "pending"
+    assert json["status_code"] == "created"
+    assert json["metrics"] == {}
+
+
+def test_job_to_remote_format_null_status_message(db):
+    job = job_factory(status_message=None)
+    json = sync.job_to_remote_format(job)
+    assert json["status_message"] == ""
+
+
+def test_job_to_remote_format_metrics(db):
+    job = job_factory()
+    metrics_factory(job, metrics={"test": 0.0})
+
+    json = sync.job_to_remote_format(job)
+
+    assert json["metrics"] == {"test": 0.0}
+
+
 def test_session_request_no_flags(db, responses):
     responses.add(
         method="GET",