diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp index fb4606239..7f5a16fd0 100644 --- a/resource/modules/resource_match.cpp +++ b/resource/modules/resource_match.cpp @@ -42,17 +42,37 @@ using namespace Flux::opts_manager; ******************************************************************************/ struct match_perf_t { - double load; /* Graph load time */ - uint64_t njobs; /* Total match count */ - uint64_t njobs_reset; /* Jobs since reset match count */ + double load = 0.0; /* Graph load time */ /* Graph uptime in seconds */ - std::chrono::time_point graph_uptime; + std::chrono::time_point graph_uptime + = std::chrono::system_clock::now (); /* Time since stats were last cleared */ - std::chrono::time_point time_since_reset; - double min; /* Min match time */ - double max; /* Max match time */ - double mean; /* Mean match time */ - double M2; /* Welford's algorithm */ + std::chrono::time_point time_since_reset + = std::chrono::system_clock::now (); + struct time_stats { + void update_stats (double elapsed) { + /* Update using Welford's algorithm */ + double delta = 0.0; + double delta2 = 0.0; + njobs++; + njobs_reset++; + min = (min > elapsed)? elapsed : min; + max = (max < elapsed)? elapsed : max; + // Welford's online algorithm for variance + delta = elapsed - avg; + avg += delta / (double)njobs; + delta2 = elapsed - avg; + M2 += delta * delta2; + } + uint64_t njobs = 0; /* Total match count */ + uint64_t njobs_reset = 0; /* Jobs since reset match count */ + double min = std::numeric_limits::max (); /* Min match time */ + double max = 0.0; /* Max match time */ + double avg = 0.0; /* Average match time */ + double M2 = 0.0; /* Welford's algorithm */ + }; + time_stats succeeded; + time_stats failed; }; class msg_wrap_t { @@ -353,15 +373,6 @@ static std::shared_ptr getctx (flux_t *h) ctx->h = h; ctx->handlers = NULL; set_default_args (ctx); - ctx->perf.load = 0.0f; - ctx->perf.njobs = 0; - ctx->perf.njobs_reset = 0; - ctx->perf.min = std::numeric_limits::max (); - ctx->perf.max = 0.0f; - ctx->perf.mean = 0.0f; - ctx->perf.M2 = 0.0f; - ctx->perf.graph_uptime = std::chrono::system_clock::now (); - ctx->perf.time_since_reset = std::chrono::system_clock::now (); ctx->matcher = nullptr; /* Cannot be allocated at this point */ ctx->fgraph = nullptr; /* Cannot be allocated at this point */ ctx->writers = nullptr; /* Cannot be allocated at this point */ @@ -1514,19 +1525,14 @@ static int init_resource_graph (std::shared_ptr &ctx) ******************************************************************************/ static void update_match_perf (std::shared_ptr &ctx, - double elapse) + double elapsed, bool match_success) { double delta = 0.0f; double delta2 = 0.2f; - ctx->perf.njobs++; - ctx->perf.njobs_reset++; - ctx->perf.min = (ctx->perf.min > elapse)? elapse : ctx->perf.min; - ctx->perf.max = (ctx->perf.max < elapse)? elapse : ctx->perf.max; - // Welford's online algorithm for variance - delta = elapse - ctx->perf.mean; - ctx->perf.mean += delta / (double)ctx->perf.njobs; - delta2 = elapse - ctx->perf.mean; - ctx->perf.M2 += delta * delta2; + if (match_success) + ctx->perf.succeeded.update_stats (elapsed); + else + ctx->perf.failed.update_stats (elapsed); } static inline std::string get_status_string (int64_t now, int64_t at) @@ -1776,6 +1782,9 @@ static int run_match (std::shared_ptr &ctx, int64_t jobid, (start.time_since_epoch ()); *at = *now = epoch.count (); if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) { + elapsed = std::chrono::system_clock::now () - start; + *ov = elapsed.count (); + update_match_perf (ctx, *ov, false); goto done; } if ( (rc = ctx->writers->emit (o)) < 0) { @@ -1786,7 +1795,7 @@ static int run_match (std::shared_ptr &ctx, int64_t jobid, rsv = (*now != *at)? true : false; elapsed = std::chrono::system_clock::now () - start; *ov = elapsed.count (); - update_match_perf (ctx, *ov); + update_match_perf (ctx, *ov, true); if (cmd != std::string ("satisfiability")) { if ( (rc = track_schedule_info (ctx, jobid, @@ -1818,6 +1827,9 @@ static int run_update (std::shared_ptr &ctx, int64_t jobid, goto done; } if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) { + elapsed = std::chrono::system_clock::now () - start; + ov = elapsed.count (); + update_match_perf (ctx, ov, false); flux_log_error (ctx->h, "%s: run", __FUNCTION__); goto done; } @@ -1827,7 +1839,7 @@ static int run_update (std::shared_ptr &ctx, int64_t jobid, } elapsed = std::chrono::system_clock::now () - start; ov = elapsed.count (); - update_match_perf (ctx, ov); + update_match_perf (ctx, ov, true); if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) { flux_log_error (ctx->h, "%s: can't add job info (id=%jd)", __FUNCTION__, (intmax_t)jobid); @@ -2177,20 +2189,35 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, std::shared_ptr ctx = getctx ((flux_t *)arg); int saved_errno; json_t *o = nullptr; - double mean = 0.0f; + json_t *match_succeeded = nullptr; + json_t *match_failed = nullptr; + double avg = 0.0f; double min = 0.0f; double variance = 0.0f; + // Failed match stats + double avg_failed = 0.0f; + double min_failed = 0.0f; + double variance_failed = 0.0f; int64_t graph_uptime_s = 0; int64_t time_since_reset_s = 0; std::chrono::time_point now; - if (ctx->perf.njobs_reset > 1) { - mean = ctx->perf.mean; - min = ctx->perf.min; + if (ctx->perf.succeeded.njobs_reset > 1) { + avg = ctx->perf.succeeded.avg; + min = ctx->perf.succeeded.min; // Welford's online algorithm - variance = ctx->perf.M2 / (double)ctx->perf.njobs_reset; + variance = ctx->perf.succeeded.M2 + / (double)ctx->perf.succeeded.njobs_reset; } - if ( !(o = json_object ())) { + if (ctx->perf.failed.njobs_reset > 1) { + avg_failed = ctx->perf.failed.avg; + min_failed = ctx->perf.failed.min; + // Welford's online algorithm + variance_failed = ctx->perf.failed.M2 + / (double)ctx->perf.failed.njobs_reset; + } + if ( !(o = json_object ()) || !(match_succeeded = json_object ()) + || !(match_failed = json_object ())) { errno = ENOMEM; goto error; } @@ -2198,32 +2225,57 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: get_stat_by_rank", __FUNCTION__); goto error_free; } + + if (!(match_succeeded = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}", + "njobs", ctx->perf.succeeded.njobs, + "njobs-reset", ctx->perf.succeeded.njobs_reset, + "stats", + "min", min, + "max", ctx->perf.succeeded.max, + "avg", avg, + "variance", variance))) { + errno = ENOMEM; + goto error_free; + } + if (!(match_failed = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}", + "njobs", ctx->perf.failed.njobs, + "njobs-reset", ctx->perf.failed.njobs_reset, + "stats", + "min", min_failed, + "max", ctx->perf.failed.max, + "avg", avg_failed, + "variance", variance_failed))) { + errno = ENOMEM; + goto error_free; + } now = std::chrono::system_clock::now (); graph_uptime_s = std::chrono::duration_cast ( now - ctx->perf.graph_uptime).count (); time_since_reset_s = std::chrono::duration_cast ( now - ctx->perf.time_since_reset).count (); - if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:I s:I s:f s:f s:f s:f}", - "V", num_vertices (ctx->db->resource_graph), - "E", num_edges (ctx->db->resource_graph), - "by_rank", o, - "load-time", ctx->perf.load, - "graph-uptime", graph_uptime_s, - "time-since-reset", time_since_reset_s, - "njobs", ctx->perf.njobs, - "njobs-reset", ctx->perf.njobs_reset, - "min-match", min, - "max-match", ctx->perf.max, - "avg-match", mean, - "match-variance", variance) < 0) { + + if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:{s:O s:O}}", + "V", num_vertices (ctx->db->resource_graph), + "E", num_edges (ctx->db->resource_graph), + "by_rank", o, + "load-time", ctx->perf.load, + "graph-uptime", graph_uptime_s, + "time-since-reset", time_since_reset_s, + "match", + "succeeded", match_succeeded, + "failed", match_failed) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); } + json_decref (match_succeeded); + json_decref (match_failed); return; error_free: saved_errno = errno; json_decref (o); + json_decref (match_succeeded); + json_decref (match_failed); errno = saved_errno; error: if (flux_respond_error (h, msg, errno, NULL) < 0) @@ -2236,12 +2288,17 @@ static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w, std::shared_ptr ctx = getctx ((flux_t *)arg); // Clear the jobs-related stats and reset time - ctx->perf.njobs_reset = 0; - ctx->perf.min = std::numeric_limits::max (); - ctx->perf.max = 0.0f; - ctx->perf.mean = 0.0f; - ctx->perf.M2 = 0.0f; - ctx->perf.time_since_reset = std::chrono::system_clock::now (); + ctx->perf.succeeded.njobs_reset = 0; + ctx->perf.succeeded.min = std::numeric_limits::max (); + ctx->perf.succeeded.max = 0.0; + ctx->perf.succeeded.avg = 0.0; + ctx->perf.succeeded.M2 = 0.0; + // Failed match stats + ctx->perf.failed.njobs_reset = 0; + ctx->perf.failed.min = std::numeric_limits::max (); + ctx->perf.failed.max = 0.0; + ctx->perf.failed.avg = 0.0; + ctx->perf.failed.M2 = 0.0; if (flux_respond (h, msg, NULL) < 0) flux_log_error (h, "%s: flux_respond", __FUNCTION__);