From c72e7efd6f680dd04a999a1749c89cacd7676e93 Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Thu, 25 Apr 2024 19:34:50 -0700 Subject: [PATCH 1/3] resource module: update stats to include failed matches Problem: failed matches for job specifications with unsatisfiable constraints have been observed restricting the rate at which valid requests are scheduled. The resource module does not currently track stats on failed matches. Add tracking and reporting of stats on failed matches. --- resource/modules/resource_match.cpp | 169 ++++++++++++++++++---------- 1 file changed, 112 insertions(+), 57 deletions(-) diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp index fb4606239..1954ddf19 100644 --- a/resource/modules/resource_match.cpp +++ b/resource/modules/resource_match.cpp @@ -42,17 +42,37 @@ using namespace Flux::opts_manager; ******************************************************************************/ struct match_perf_t { - double load; /* Graph load time */ - uint64_t njobs; /* Total match count */ - uint64_t njobs_reset; /* Jobs since reset match count */ + double load = 0.0; /* Graph load time */ /* Graph uptime in seconds */ - std::chrono::time_point graph_uptime; + std::chrono::time_point graph_uptime + = std::chrono::system_clock::now (); /* Time since stats were last cleared */ - std::chrono::time_point time_since_reset; - double min; /* Min match time */ - double max; /* Max match time */ - double mean; /* Mean match time */ - double M2; /* Welford's algorithm */ + std::chrono::time_point time_since_reset + = std::chrono::system_clock::now (); + struct time_stats { + void update_stats (double elapsed) { + /* Welford update over the samples seen since the last reset */ + double delta = 0.0; + double delta2 = 0.0; + njobs++; + njobs_reset++; + min = (min > elapsed)? elapsed : min; + max = (max < elapsed)? 
elapsed : max; + // avg and M2 restart at every stats clear, so divide by njobs_reset + delta = elapsed - avg; + avg += delta / (double)njobs_reset; + delta2 = elapsed - avg; + M2 += delta * delta2; + } + uint64_t njobs = 0; /* Total match count */ + uint64_t njobs_reset = 0; /* Jobs since match count reset */ + double min = std::numeric_limits::max (); /* Min match time */ + double max = 0.0; /* Max match time */ + double avg = 0.0; /* Average match time */ + double M2 = 0.0; /* Welford's algorithm */ + }; + time_stats succeeded; + time_stats failed; }; class msg_wrap_t { @@ -353,15 +373,6 @@ static std::shared_ptr getctx (flux_t *h) ctx->h = h; ctx->handlers = NULL; set_default_args (ctx); - ctx->perf.load = 0.0f; - ctx->perf.njobs = 0; - ctx->perf.njobs_reset = 0; - ctx->perf.min = std::numeric_limits::max (); - ctx->perf.max = 0.0f; - ctx->perf.mean = 0.0f; - ctx->perf.M2 = 0.0f; - ctx->perf.graph_uptime = std::chrono::system_clock::now (); - ctx->perf.time_since_reset = std::chrono::system_clock::now (); ctx->matcher = nullptr; /* Cannot be allocated at this point */ ctx->fgraph = nullptr; /* Cannot be allocated at this point */ ctx->writers = nullptr; /* Cannot be allocated at this point */ @@ -1514,19 +1525,12 @@ static int init_resource_graph (std::shared_ptr &ctx) ******************************************************************************/ static void update_match_perf (std::shared_ptr &ctx, - double elapse) + double elapsed, bool match_success) { - double delta = 0.0f; - double delta2 = 0.2f; - ctx->perf.njobs++; - ctx->perf.njobs_reset++; - ctx->perf.min = (ctx->perf.min > elapse)? elapse : ctx->perf.min; - ctx->perf.max = (ctx->perf.max < elapse)? 
elapse : ctx->perf.max; - // Welford's online algorithm for variance - delta = elapse - ctx->perf.mean; - ctx->perf.mean += delta / (double)ctx->perf.njobs; - delta2 = elapse - ctx->perf.mean; - ctx->perf.M2 += delta * delta2; + if (match_success) + ctx->perf.succeeded.update_stats (elapsed); + else + ctx->perf.failed.update_stats (elapsed); } static inline std::string get_status_string (int64_t now, int64_t at) @@ -1776,6 +1780,9 @@ static int run_match (std::shared_ptr &ctx, int64_t jobid, (start.time_since_epoch ()); *at = *now = epoch.count (); if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) { + elapsed = std::chrono::system_clock::now () - start; + *ov = elapsed.count (); + update_match_perf (ctx, *ov, false); goto done; } if ( (rc = ctx->writers->emit (o)) < 0) { @@ -1786,7 +1793,7 @@ static int run_match (std::shared_ptr &ctx, int64_t jobid, rsv = (*now != *at)? true : false; elapsed = std::chrono::system_clock::now () - start; *ov = elapsed.count (); - update_match_perf (ctx, *ov); + update_match_perf (ctx, *ov, true); if (cmd != std::string ("satisfiability")) { if ( (rc = track_schedule_info (ctx, jobid, @@ -1818,6 +1825,9 @@ static int run_update (std::shared_ptr &ctx, int64_t jobid, goto done; } if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) { + elapsed = std::chrono::system_clock::now () - start; + ov = elapsed.count (); + update_match_perf (ctx, ov, false); flux_log_error (ctx->h, "%s: run", __FUNCTION__); goto done; } @@ -1827,7 +1837,7 @@ static int run_update (std::shared_ptr &ctx, int64_t jobid, } elapsed = std::chrono::system_clock::now () - start; ov = elapsed.count (); - update_match_perf (ctx, ov); + update_match_perf (ctx, ov, true); if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) { flux_log_error (ctx->h, "%s: can't add job info (id=%jd)", __FUNCTION__, (intmax_t)jobid); @@ -2177,20 +2187,35 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, std::shared_ptr ctx = getctx 
((flux_t *)arg); int saved_errno; json_t *o = nullptr; - double mean = 0.0f; + json_t *match_succeeded = nullptr; + json_t *match_failed = nullptr; + double avg = 0.0f; double min = 0.0f; double variance = 0.0f; + // Failed match stats + double avg_failed = 0.0f; + double min_failed = 0.0f; + double variance_failed = 0.0f; int64_t graph_uptime_s = 0; int64_t time_since_reset_s = 0; std::chrono::time_point now; - if (ctx->perf.njobs_reset > 1) { - mean = ctx->perf.mean; - min = ctx->perf.min; + if (ctx->perf.succeeded.njobs_reset > 1) { + avg = ctx->perf.succeeded.avg; + min = ctx->perf.succeeded.min; // Welford's online algorithm - variance = ctx->perf.M2 / (double)ctx->perf.njobs_reset; + variance = ctx->perf.succeeded.M2 + / (double)ctx->perf.succeeded.njobs_reset; } - if ( !(o = json_object ())) { + if (ctx->perf.failed.njobs_reset > 1) { + avg_failed = ctx->perf.failed.avg; + min_failed = ctx->perf.failed.min; + // Welford's online algorithm + variance_failed = ctx->perf.failed.M2 + / (double)ctx->perf.failed.njobs_reset; + } + // match_succeeded and match_failed are created by json_pack () below + if ( !(o = json_object ())) { errno = ENOMEM; goto error; } @@ -2198,32 +2223,57 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: get_stat_by_rank", __FUNCTION__); goto error_free; } + + if (!(match_succeeded = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}", + "njobs", ctx->perf.succeeded.njobs, + "njobs-reset", ctx->perf.succeeded.njobs_reset, + "stats", + "min", min, + "max", ctx->perf.succeeded.max, + "avg", avg, + "variance", variance))) { + errno = ENOMEM; + goto error_free; + } + if (!(match_failed = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}", + "njobs", ctx->perf.failed.njobs, + "njobs-reset", ctx->perf.failed.njobs_reset, + "stats", + "min", min_failed, + "max", ctx->perf.failed.max, + "avg", avg_failed, + "variance", variance_failed))) { + errno = ENOMEM; + goto error_free; + } now = std::chrono::system_clock::now 
(); graph_uptime_s = std::chrono::duration_cast ( now - ctx->perf.graph_uptime).count (); time_since_reset_s = std::chrono::duration_cast ( now - ctx->perf.time_since_reset).count (); - if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:I s:I s:f s:f s:f s:f}", - "V", num_vertices (ctx->db->resource_graph), - "E", num_edges (ctx->db->resource_graph), - "by_rank", o, - "load-time", ctx->perf.load, - "graph-uptime", graph_uptime_s, - "time-since-reset", time_since_reset_s, - "njobs", ctx->perf.njobs, - "njobs-reset", ctx->perf.njobs_reset, - "min-match", min, - "max-match", ctx->perf.max, - "avg-match", mean, - "match-variance", variance) < 0) { + + if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:{s:O s:O}}", + "V", num_vertices (ctx->db->resource_graph), + "E", num_edges (ctx->db->resource_graph), + "by_rank", o, + "load-time", ctx->perf.load, + "graph-uptime", graph_uptime_s, + "time-since-reset", time_since_reset_s, + "match", + "succeeded", match_succeeded, + "failed", match_failed) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); } + json_decref (match_succeeded); + json_decref (match_failed); return; error_free: saved_errno = errno; json_decref (o); + json_decref (match_succeeded); + json_decref (match_failed); errno = saved_errno; error: if (flux_respond_error (h, msg, errno, NULL) < 0) @@ -2236,12 +2286,17 @@ static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w, std::shared_ptr ctx = getctx ((flux_t *)arg); // Clear the jobs-related stats and reset time - ctx->perf.njobs_reset = 0; - ctx->perf.min = std::numeric_limits::max (); - ctx->perf.max = 0.0f; - ctx->perf.mean = 0.0f; - ctx->perf.M2 = 0.0f; - ctx->perf.time_since_reset = std::chrono::system_clock::now (); + ctx->perf.time_since_reset = std::chrono::system_clock::now (); + ctx->perf.succeeded.njobs_reset = 0; + ctx->perf.succeeded.min = std::numeric_limits::max (); + ctx->perf.succeeded.max = 0.0; + ctx->perf.succeeded.avg = 0.0; + ctx->perf.succeeded.M2 = 0.0; + ctx->perf.failed.njobs_reset = 0; + 
ctx->perf.failed.min = std::numeric_limits::max (); + ctx->perf.failed.max = 0.0; + ctx->perf.failed.avg = 0.0; + ctx->perf.failed.M2 = 0.0; if (flux_respond (h, msg, NULL) < 0) flux_log_error (h, "%s: flux_respond", __FUNCTION__); From c7c3220511bf6b5bb1ba1f07c32ac7359ba2129d Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Thu, 25 Apr 2024 19:35:13 -0700 Subject: [PATCH 2/3] flux-ion: update stats to include failed matches Problem: flux ion does not report the stats on failed matches collected and returned via RPC from the resource module. Add the ability to output the stats. --- src/cmd/flux-ion-resource.py | 62 ++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/src/cmd/flux-ion-resource.py b/src/cmd/flux-ion-resource.py index 4dd1ee15e..ab482e340 100755 --- a/src/cmd/flux-ion-resource.py +++ b/src/cmd/flux-ion-resource.py @@ -224,12 +224,62 @@ def stat_action(_): print("Graph Load Time: ", resp["load-time"], "Secs") print("Graph Upime: ", resp["graph-uptime"], "Secs") print("Time Since Stats Reset: ", resp["time-since-reset"], "Secs") - print("Num. of Total Jobs Matched: ", resp["njobs"]) - print("Num. of Jobs Matched Since Reset: ", resp["njobs-reset"]) - print("Min. Match Time: ", resp["min-match"], "Secs") - print("Max. Match Time: ", resp["max-match"], "Secs") - print("Avg. Match Time: ", resp["avg-match"], "Secs") - print("Match Variance: ", resp["match-variance"], "Secs^2") + print( + "Num. of Total Jobs Successfully Matched: ", + resp["match"]["succeeded"]["njobs"], + ) + print( + "Num. of Jobs Successfully Matched Since Reset: ", + resp["match"]["succeeded"]["njobs-reset"], + ) + print( + "Min. Successful Match Time: ", + resp["match"]["succeeded"]["stats"]["min"], + "Secs", + ) + print( + "Max. Successful Match Time: ", + resp["match"]["succeeded"]["stats"]["max"], + "Secs", + ) + print( + "Avg. 
Successful Match Time: ", + resp["match"]["succeeded"]["stats"]["avg"], + "Secs", + ) + print( + "Successful Match Variance: ", + resp["match"]["succeeded"]["stats"]["variance"], + "Secs^2", + ) + print( + "Num. of Jobs with Failed Matches: ", + resp["match"]["failed"]["njobs"], + ) + print( + "Num. of Jobs with Failed Matches Since Reset: ", + resp["match"]["failed"]["njobs-reset"], + ) + print( + "Min. Match Time of Failed Matches: ", + resp["match"]["failed"]["stats"]["min"], + "Secs", + ) + print( + "Max. Match Time of Failed Matches: ", + resp["match"]["failed"]["stats"]["max"], + "Secs", + ) + print( + "Avg. Match Time of Failed Matches: ", + resp["match"]["failed"]["stats"]["avg"], + "Secs", + ) + print( + "Match Variance of Failed Matches: ", + resp["match"]["failed"]["stats"]["variance"], + "Secs^2", + ) def stats_clear_action(_): From 9013583247d4a44bcaba17ee25528cc80973578a Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Wed, 1 May 2024 09:57:18 -0700 Subject: [PATCH 3/3] testsuite: update test to populate failed stats Problem: the current stats tests don't test populating failed stats. Submit two unsatisfiable jobs to generate failed job stats. --- t/t4000-match-params.t | 3 +++ 1 file changed, 3 insertions(+) diff --git a/t/t4000-match-params.t b/t/t4000-match-params.t index 6f31687e9..7d9cb9386 100755 --- a/t/t4000-match-params.t +++ b/t/t4000-match-params.t @@ -104,6 +104,9 @@ test_expect_success 'resource module stats and clear work' ' unload_resource && load_resource && load_qmanager_sync && + # Submit unsatisfiable jobs to populate failed match stats + test_must_fail flux run -N 100 -n 400 -t 10s sleep 10 && + test_must_fail flux run -N 100 -n 400 -t 10s sleep 10 && flux module stats sched-fluxion-resource && flux module stats --clear sched-fluxion-resource '