From c72e7efd6f680dd04a999a1749c89cacd7676e93 Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Thu, 25 Apr 2024 19:34:50 -0700 Subject: [PATCH] resource module: update stats to include failed matches Problem: failed matches for job specifications with unsatisfiable constraints have been observed restricting the rate at which valid requests are scheduled. The resource module does not currently track stats on failed matches. Add tracking and reporting of stats on failed matches. --- resource/modules/resource_match.cpp | 169 ++++++++++++++++++---------- 1 file changed, 112 insertions(+), 57 deletions(-) diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp index fb4606239..1954ddf19 100644 --- a/resource/modules/resource_match.cpp +++ b/resource/modules/resource_match.cpp @@ -42,17 +42,37 @@ using namespace Flux::opts_manager; ******************************************************************************/ struct match_perf_t { - double load; /* Graph load time */ - uint64_t njobs; /* Total match count */ - uint64_t njobs_reset; /* Jobs since reset match count */ + double load = 0.0; /* Graph load time */ /* Graph uptime in seconds */ - std::chrono::time_point graph_uptime; + std::chrono::time_point graph_uptime + = std::chrono::system_clock::now (); /* Time since stats were last cleared */ - std::chrono::time_point time_since_reset; - double min; /* Min match time */ - double max; /* Max match time */ - double mean; /* Mean match time */ - double M2; /* Welford's algorithm */ + std::chrono::time_point time_since_reset + = std::chrono::system_clock::now (); + struct time_stats { + void update_stats (double elapsed) { + /* Update using Welford's algorithm */ + double delta = 0.0; + double delta2 = 0.0; + njobs++; + njobs_reset++; + min = (min > elapsed)? elapsed : min; + max = (max < elapsed)? 
elapsed : max; + // Welford's online algorithm over the samples since the last reset + delta = elapsed - avg; + avg += delta / (double)njobs_reset; + delta2 = elapsed - avg; + M2 += delta * delta2; + } + uint64_t njobs = 0; /* Total match count */ + uint64_t njobs_reset = 0; /* Jobs since match count reset */ + double min = std::numeric_limits::max (); /* Min match time */ + double max = 0.0; /* Max match time */ + double avg = 0.0; /* Average match time */ + double M2 = 0.0; /* Welford's algorithm */ + }; + time_stats succeeded; + time_stats failed; }; class msg_wrap_t { @@ -353,15 +373,6 @@ static std::shared_ptr getctx (flux_t *h) ctx->h = h; ctx->handlers = NULL; set_default_args (ctx); - ctx->perf.load = 0.0f; - ctx->perf.njobs = 0; - ctx->perf.njobs_reset = 0; - ctx->perf.min = std::numeric_limits::max (); - ctx->perf.max = 0.0f; - ctx->perf.mean = 0.0f; - ctx->perf.M2 = 0.0f; - ctx->perf.graph_uptime = std::chrono::system_clock::now (); - ctx->perf.time_since_reset = std::chrono::system_clock::now (); ctx->matcher = nullptr; /* Cannot be allocated at this point */ ctx->fgraph = nullptr; /* Cannot be allocated at this point */ ctx->writers = nullptr; /* Cannot be allocated at this point */ @@ -1514,19 +1525,12 @@ static int init_resource_graph (std::shared_ptr &ctx) ******************************************************************************/ static void update_match_perf (std::shared_ptr &ctx, - double elapse) + double elapsed, bool match_success) { - double delta = 0.0f; - double delta2 = 0.2f; - ctx->perf.njobs++; - ctx->perf.njobs_reset++; - ctx->perf.min = (ctx->perf.min > elapse)? elapse : ctx->perf.min; - ctx->perf.max = (ctx->perf.max < elapse)? 
elapse : ctx->perf.max; - // Welford's online algorithm for variance - delta = elapse - ctx->perf.mean; - ctx->perf.mean += delta / (double)ctx->perf.njobs; - delta2 = elapse - ctx->perf.mean; - ctx->perf.M2 += delta * delta2; + if (match_success) + ctx->perf.succeeded.update_stats (elapsed); + else + ctx->perf.failed.update_stats (elapsed); } static inline std::string get_status_string (int64_t now, int64_t at) @@ -1776,6 +1780,9 @@ static int run_match (std::shared_ptr &ctx, int64_t jobid, (start.time_since_epoch ()); *at = *now = epoch.count (); if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) { + elapsed = std::chrono::system_clock::now () - start; + *ov = elapsed.count (); + update_match_perf (ctx, *ov, false); goto done; } if ( (rc = ctx->writers->emit (o)) < 0) { @@ -1786,7 +1793,7 @@ static int run_match (std::shared_ptr &ctx, int64_t jobid, rsv = (*now != *at)? true : false; elapsed = std::chrono::system_clock::now () - start; *ov = elapsed.count (); - update_match_perf (ctx, *ov); + update_match_perf (ctx, *ov, true); if (cmd != std::string ("satisfiability")) { if ( (rc = track_schedule_info (ctx, jobid, @@ -1818,6 +1825,9 @@ static int run_update (std::shared_ptr &ctx, int64_t jobid, goto done; } if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) { + elapsed = std::chrono::system_clock::now () - start; + ov = elapsed.count (); + update_match_perf (ctx, ov, false); flux_log_error (ctx->h, "%s: run", __FUNCTION__); goto done; } @@ -1827,7 +1837,7 @@ static int run_update (std::shared_ptr &ctx, int64_t jobid, } elapsed = std::chrono::system_clock::now () - start; ov = elapsed.count (); - update_match_perf (ctx, ov); + update_match_perf (ctx, ov, true); if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) { flux_log_error (ctx->h, "%s: can't add job info (id=%jd)", __FUNCTION__, (intmax_t)jobid); @@ -2177,20 +2187,35 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, std::shared_ptr ctx = getctx 
((flux_t *)arg); int saved_errno; json_t *o = nullptr; - double mean = 0.0f; + json_t *match_succeeded = nullptr; + json_t *match_failed = nullptr; + double avg = 0.0f; double min = 0.0f; double variance = 0.0f; + // Failed match stats + double avg_failed = 0.0f; + double min_failed = 0.0f; + double variance_failed = 0.0f; int64_t graph_uptime_s = 0; int64_t time_since_reset_s = 0; std::chrono::time_point now; - if (ctx->perf.njobs_reset > 1) { - mean = ctx->perf.mean; - min = ctx->perf.min; + if (ctx->perf.succeeded.njobs_reset > 1) { + avg = ctx->perf.succeeded.avg; + min = ctx->perf.succeeded.min; // Welford's online algorithm - variance = ctx->perf.M2 / (double)ctx->perf.njobs_reset; + variance = ctx->perf.succeeded.M2 + / (double)ctx->perf.succeeded.njobs_reset; } - if ( !(o = json_object ())) { + if (ctx->perf.failed.njobs_reset > 1) { + avg_failed = ctx->perf.failed.avg; + min_failed = ctx->perf.failed.min; + // Welford's online algorithm + variance_failed = ctx->perf.failed.M2 + / (double)ctx->perf.failed.njobs_reset; + } + /* match_succeeded and match_failed are created by json_pack () below */ + if ( !(o = json_object ())) { errno = ENOMEM; goto error; } @@ -2198,32 +2223,57 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: get_stat_by_rank", __FUNCTION__); goto error_free; } + + if (!(match_succeeded = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}", + "njobs", ctx->perf.succeeded.njobs, + "njobs-reset", ctx->perf.succeeded.njobs_reset, + "stats", + "min", min, + "max", ctx->perf.succeeded.max, + "avg", avg, + "variance", variance))) { + errno = ENOMEM; + goto error_free; + } + if (!(match_failed = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}", + "njobs", ctx->perf.failed.njobs, + "njobs-reset", ctx->perf.failed.njobs_reset, + "stats", + "min", min_failed, + "max", ctx->perf.failed.max, + "avg", avg_failed, + "variance", variance_failed))) { + errno = ENOMEM; + goto error_free; + } now = std::chrono::system_clock::now 
(); graph_uptime_s = std::chrono::duration_cast ( now - ctx->perf.graph_uptime).count (); time_since_reset_s = std::chrono::duration_cast ( now - ctx->perf.time_since_reset).count (); - if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:I s:I s:f s:f s:f s:f}", - "V", num_vertices (ctx->db->resource_graph), - "E", num_edges (ctx->db->resource_graph), - "by_rank", o, - "load-time", ctx->perf.load, - "graph-uptime", graph_uptime_s, - "time-since-reset", time_since_reset_s, - "njobs", ctx->perf.njobs, - "njobs-reset", ctx->perf.njobs_reset, - "min-match", min, - "max-match", ctx->perf.max, - "avg-match", mean, - "match-variance", variance) < 0) { + + if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:{s:O s:O}}", + "V", num_vertices (ctx->db->resource_graph), + "E", num_edges (ctx->db->resource_graph), + "by_rank", o, + "load-time", ctx->perf.load, + "graph-uptime", graph_uptime_s, + "time-since-reset", time_since_reset_s, + "match", + "succeeded", match_succeeded, + "failed", match_failed) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); } + json_decref (match_succeeded); + json_decref (match_failed); return; error_free: saved_errno = errno; json_decref (o); + json_decref (match_succeeded); + json_decref (match_failed); errno = saved_errno; error: if (flux_respond_error (h, msg, errno, NULL) < 0) @@ -2236,12 +2286,18 @@ static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w, std::shared_ptr ctx = getctx ((flux_t *)arg); // Clear the jobs-related stats and reset time - ctx->perf.njobs_reset = 0; - ctx->perf.min = std::numeric_limits::max (); - ctx->perf.max = 0.0f; - ctx->perf.mean = 0.0f; - ctx->perf.M2 = 0.0f; - ctx->perf.time_since_reset = std::chrono::system_clock::now (); + ctx->perf.time_since_reset = std::chrono::system_clock::now (); + ctx->perf.succeeded.njobs_reset = 0; + ctx->perf.succeeded.min = std::numeric_limits::max (); + ctx->perf.succeeded.max = 0.0; + ctx->perf.succeeded.avg = 0.0; + ctx->perf.succeeded.M2 = 0.0; + // Failed match stats + ctx->perf.failed.njobs_reset = 0; + 
ctx->perf.failed.min = std::numeric_limits::max (); + ctx->perf.failed.max = 0.0; + ctx->perf.failed.avg = 0.0; + ctx->perf.failed.M2 = 0.0; if (flux_respond (h, msg, NULL) < 0) flux_log_error (h, "%s: flux_respond", __FUNCTION__);