Skip to content

Commit

Permalink
resource module: update stats to include failed matches
Browse files Browse the repository at this point in the history
Problem: failed matches for job specifications with unsatisfiable
constraints have been observed restricting the rate at which valid
requests are scheduled. The resource module does not currently track
stats on failed matches.

Add tracking and reporting of stats on failed matches.
  • Loading branch information
milroy committed Apr 27, 2024
1 parent 143020d commit 0feeb11
Showing 1 changed file with 97 additions and 27 deletions.
124 changes: 97 additions & 27 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ struct match_perf_t {
double max; /* Max match time */
double mean; /* Mean match time */
double M2; /* Welford's algorithm */
// Track failed matches
uint64_t njobs_failed;
uint64_t njobs_reset_failed;
double min_failed;
double max_failed;
double mean_failed;
double M2_failed;
};

class msg_wrap_t {
Expand Down Expand Up @@ -360,6 +367,13 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
ctx->perf.max = 0.0f;
ctx->perf.mean = 0.0f;
ctx->perf.M2 = 0.0f;
// Track failed matches
ctx->perf.njobs_failed = 0;
ctx->perf.njobs_reset_failed = 0;
ctx->perf.min_failed = std::numeric_limits<double>::max ();
ctx->perf.max_failed = 0.0f;
ctx->perf.mean_failed = 0.0f;
ctx->perf.M2_failed = 0.0f;
ctx->perf.graph_uptime = std::chrono::system_clock::now ();
ctx->perf.time_since_reset = std::chrono::system_clock::now ();
ctx->matcher = nullptr; /* Cannot be allocated at this point */
Expand Down Expand Up @@ -1514,19 +1528,31 @@ static int init_resource_graph (std::shared_ptr<resource_ctx_t> &ctx)
******************************************************************************/

static void update_match_perf (std::shared_ptr<resource_ctx_t> &ctx,
double elapse)
double elapse, bool match_success)
{
double delta = 0.0f;
double delta2 = 0.2f;
ctx->perf.njobs++;
ctx->perf.njobs_reset++;
ctx->perf.min = (ctx->perf.min > elapse)? elapse : ctx->perf.min;
ctx->perf.max = (ctx->perf.max < elapse)? elapse : ctx->perf.max;
// Welford's online algorithm for variance
delta = elapse - ctx->perf.mean;
ctx->perf.mean += delta / (double)ctx->perf.njobs;
delta2 = elapse - ctx->perf.mean;
ctx->perf.M2 += delta * delta2;
if (match_success) {
ctx->perf.njobs++;
ctx->perf.njobs_reset++;
ctx->perf.min = (ctx->perf.min > elapse)? elapse : ctx->perf.min;
ctx->perf.max = (ctx->perf.max < elapse)? elapse : ctx->perf.max;
// Welford's online algorithm for variance
delta = elapse - ctx->perf.mean;
ctx->perf.mean += delta / (double)ctx->perf.njobs;
delta2 = elapse - ctx->perf.mean;
ctx->perf.M2 += delta * delta2;
} else {
ctx->perf.njobs_failed++;
ctx->perf.njobs_reset_failed++;
ctx->perf.min_failed = (ctx->perf.min_failed > elapse)? elapse : ctx->perf.min_failed;
ctx->perf.max_failed = (ctx->perf.max_failed < elapse)? elapse : ctx->perf.max_failed;
// Welford's online algorithm for variance
delta = elapse - ctx->perf.mean_failed;
ctx->perf.mean_failed += delta / (double)ctx->perf.njobs_failed;
delta2 = elapse - ctx->perf.mean_failed;
ctx->perf.M2_failed += delta * delta2;
}
}

static inline std::string get_status_string (int64_t now, int64_t at)
Expand Down Expand Up @@ -1776,6 +1802,9 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
(start.time_since_epoch ());
*at = *now = epoch.count ();
if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov, false);
goto done;
}
if ( (rc = ctx->writers->emit (o)) < 0) {
Expand All @@ -1786,7 +1815,7 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
rsv = (*now != *at)? true : false;
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov);
update_match_perf (ctx, *ov, true);

if (cmd != std::string ("satisfiability")) {
if ( (rc = track_schedule_info (ctx, jobid,
Expand Down Expand Up @@ -1818,6 +1847,9 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
goto done;
}
if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov, false);
flux_log_error (ctx->h, "%s: run", __FUNCTION__);
goto done;
}
Expand All @@ -1827,7 +1859,7 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
}
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov);
update_match_perf (ctx, ov, true);
if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) {
flux_log_error (ctx->h, "%s: can't add job info (id=%jd)",
__FUNCTION__, (intmax_t)jobid);
Expand Down Expand Up @@ -2177,9 +2209,15 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
int saved_errno;
json_t *o = nullptr;
json_t *match_succeeded = nullptr;
json_t *match_failed = nullptr;
double mean = 0.0f;
double min = 0.0f;
double variance = 0.0f;
// Failed match stats
double mean_failed = 0.0f;
double min_failed = 0.0f;
double variance_failed = 0.0f;
int64_t graph_uptime_s = 0;
int64_t time_since_reset_s = 0;
std::chrono::time_point<std::chrono::system_clock> now;
Expand All @@ -2190,7 +2228,15 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
// Welford's online algorithm
variance = ctx->perf.M2 / (double)ctx->perf.njobs_reset;
}
if ( !(o = json_object ())) {
if (ctx->perf.njobs_reset_failed > 1) {
mean_failed = ctx->perf.mean_failed;
min_failed = ctx->perf.min_failed;
// Welford's online algorithm
variance_failed = ctx->perf.M2_failed
/ (double)ctx->perf.njobs_reset_failed;
}
if ( !(o = json_object ()) || !(match_succeeded = json_object ())
|| !(match_failed = json_object ())) {
errno = ENOMEM;
goto error;
}
Expand All @@ -2203,19 +2249,38 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
now - ctx->perf.graph_uptime).count ();
time_since_reset_s = std::chrono::duration_cast<std::chrono::seconds> (
now - ctx->perf.time_since_reset).count ();
if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:I s:I s:f s:f s:f s:f}",
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"by_rank", o,
"load-time", ctx->perf.load,
"graph-uptime", graph_uptime_s,
"time-since-reset", time_since_reset_s,
"njobs", ctx->perf.njobs,
"njobs-reset", ctx->perf.njobs_reset,
"min-match", min,
"max-match", ctx->perf.max,
"avg-match", mean,
"match-variance", variance) < 0) {

if (!(match_succeeded = json_pack ("{s:I s:I s:f s:f s:f s:f}",
"njobs", ctx->perf.njobs,
"njobs-reset", ctx->perf.njobs_reset,
"min-match", min,
"max-match", ctx->perf.max,
"avg-match", mean,
"match-variance", variance))) {
errno = ENOMEM;
goto error;
}
if (!(match_failed = json_pack ("{s:I s:I s:f s:f s:f s:f}",
"njobs-failed", ctx->perf.njobs_failed,
"njobs-reset-failed", ctx->perf.njobs_reset_failed,
"min-match-failed", min_failed,
"max-match-failed", ctx->perf.max_failed,
"avg-match-failed", mean_failed,
"match-variance-failed", variance_failed))) {
errno = ENOMEM;
goto error;
}

if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:{s:o s:o}}",
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"by_rank", o,
"load-time", ctx->perf.load,
"graph-uptime", graph_uptime_s,
"time-since-reset", time_since_reset_s,
"match",
"succeeded", match_succeeded,
"failed", match_failed) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
}

Expand All @@ -2241,7 +2306,12 @@ static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w,
ctx->perf.max = 0.0f;
ctx->perf.mean = 0.0f;
ctx->perf.M2 = 0.0f;
ctx->perf.time_since_reset = std::chrono::system_clock::now ();
// Failed match stats
ctx->perf.njobs_reset_failed = 0;
ctx->perf.min_failed = std::numeric_limits<double>::max ();
ctx->perf.max_failed = 0.0f;
ctx->perf.mean_failed = 0.0f;
ctx->perf.M2_failed = 0.0f;

if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
Expand Down

0 comments on commit 0feeb11

Please sign in to comment.