Skip to content

Commit

Permalink
resource module: update stats to include failed matches
Browse files Browse the repository at this point in the history
Problem: failed matches for job specifications with unsatisfiable
constraints have been observed to restrict the rate at which valid
requests are scheduled. The resource module does not currently track
stats on failed matches.

Add tracking and reporting of stats on failed matches.
  • Loading branch information
milroy committed May 1, 2024
1 parent bb686df commit c72e7ef
Showing 1 changed file with 112 additions and 57 deletions.
169 changes: 112 additions & 57 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,37 @@ using namespace Flux::opts_manager;
******************************************************************************/

struct match_perf_t {
double load; /* Graph load time */
uint64_t njobs; /* Total match count */
uint64_t njobs_reset; /* Jobs since reset match count */
double load = 0.0; /* Graph load time */
/* Graph uptime in seconds */
std::chrono::time_point<std::chrono::system_clock> graph_uptime;
std::chrono::time_point<std::chrono::system_clock> graph_uptime
= std::chrono::system_clock::now ();
/* Time since stats were last cleared */
std::chrono::time_point<std::chrono::system_clock> time_since_reset;
double min; /* Min match time */
double max; /* Max match time */
double mean; /* Mean match time */
double M2; /* Welford's algorithm */
std::chrono::time_point<std::chrono::system_clock> time_since_reset
= std::chrono::system_clock::now ();
/* Running statistics over match times for one outcome class.
 * njobs counts every sample ever recorded. All other fields describe only
 * the samples recorded since those fields were last cleared, which is the
 * population counted by njobs_reset.
 */
struct time_stats {
    /* Fold one match time into the running statistics.
     * elapsed: duration of the match attempt as measured by the caller.
     */
    void update_stats (double elapsed) {
        njobs++;
        njobs_reset++;
        min = (min > elapsed)? elapsed : min;
        max = (max < elapsed)? elapsed : max;
        // Welford's online algorithm for mean and variance. The divisor
        // has to be the number of samples currently folded into avg/M2,
        // i.e. the count since the last reset. Dividing by the lifetime
        // total (njobs) skews avg and M2 as soon as the stats have been
        // cleared once, because avg/M2 restart from zero while njobs
        // keeps growing.
        const double delta = elapsed - avg;
        avg += delta / static_cast<double> (njobs_reset);
        const double delta2 = elapsed - avg;
        M2 += delta * delta2;
    }
    uint64_t njobs = 0; /* Total match count */
    uint64_t njobs_reset = 0; /* Jobs since match count reset */
    double min = std::numeric_limits<double>::max (); /* Min match time */
    double max = 0.0; /* Max match time */
    double avg = 0.0; /* Average match time */
    double M2 = 0.0; /* Welford's algorithm: sum of squared deviations */
};
time_stats succeeded;
time_stats failed;
};

class msg_wrap_t {
Expand Down Expand Up @@ -353,15 +373,6 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
ctx->h = h;
ctx->handlers = NULL;
set_default_args (ctx);
ctx->perf.load = 0.0f;
ctx->perf.njobs = 0;
ctx->perf.njobs_reset = 0;
ctx->perf.min = std::numeric_limits<double>::max ();
ctx->perf.max = 0.0f;
ctx->perf.mean = 0.0f;
ctx->perf.M2 = 0.0f;
ctx->perf.graph_uptime = std::chrono::system_clock::now ();
ctx->perf.time_since_reset = std::chrono::system_clock::now ();
ctx->matcher = nullptr; /* Cannot be allocated at this point */
ctx->fgraph = nullptr; /* Cannot be allocated at this point */
ctx->writers = nullptr; /* Cannot be allocated at this point */
Expand Down Expand Up @@ -1514,19 +1525,12 @@ static int init_resource_graph (std::shared_ptr<resource_ctx_t> &ctx)
******************************************************************************/

/*
 * Fold one match attempt's elapsed time into the performance stats on ctx.
 *
 * This hunk interleaves the removed lines with their replacement. The
 * removed body updated a single set of counters on ctx->perf in place
 * (count, min/max, and a Welford mean/M2 step). The replacement takes an
 * extra match_success flag and forwards the elapsed time to update_stats ()
 * on either ctx->perf.succeeded or ctx->perf.failed.
 *
 * ctx:           resource context whose perf member is updated.
 * elapsed:       duration of the attempt as measured by the caller.
 * match_success: true to account the sample as a successful match, false
 *                to account it as a failed one.
 */
static void update_match_perf (std::shared_ptr<resource_ctx_t> &ctx,
double elapse)
double elapsed, bool match_success)
{
double delta = 0.0f;
double delta2 = 0.2f;
ctx->perf.njobs++;
ctx->perf.njobs_reset++;
ctx->perf.min = (ctx->perf.min > elapse)? elapse : ctx->perf.min;
ctx->perf.max = (ctx->perf.max < elapse)? elapse : ctx->perf.max;
// Welford's online algorithm for variance
delta = elapse - ctx->perf.mean;
ctx->perf.mean += delta / (double)ctx->perf.njobs;
delta2 = elapse - ctx->perf.mean;
ctx->perf.M2 += delta * delta2;
// Successful and failed attempts are accumulated separately
if (match_success)
ctx->perf.succeeded.update_stats (elapsed);
else
ctx->perf.failed.update_stats (elapsed);
}

static inline std::string get_status_string (int64_t now, int64_t at)
Expand Down Expand Up @@ -1776,6 +1780,9 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
(start.time_since_epoch ());
*at = *now = epoch.count ();
if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov, false);
goto done;
}
if ( (rc = ctx->writers->emit (o)) < 0) {
Expand All @@ -1786,7 +1793,7 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
rsv = (*now != *at)? true : false;
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov);
update_match_perf (ctx, *ov, true);

if (cmd != std::string ("satisfiability")) {
if ( (rc = track_schedule_info (ctx, jobid,
Expand Down Expand Up @@ -1818,6 +1825,9 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
goto done;
}
if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov, false);
flux_log_error (ctx->h, "%s: run", __FUNCTION__);
goto done;
}
Expand All @@ -1827,7 +1837,7 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
}
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov);
update_match_perf (ctx, ov, true);
if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) {
flux_log_error (ctx->h, "%s: can't add job info (id=%jd)",
__FUNCTION__, (intmax_t)jobid);
Expand Down Expand Up @@ -2177,53 +2187,93 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
int saved_errno;
json_t *o = nullptr;
double mean = 0.0f;
json_t *match_succeeded = nullptr;
json_t *match_failed = nullptr;
double avg = 0.0f;
double min = 0.0f;
double variance = 0.0f;
// Failed match stats
double avg_failed = 0.0f;
double min_failed = 0.0f;
double variance_failed = 0.0f;
int64_t graph_uptime_s = 0;
int64_t time_since_reset_s = 0;
std::chrono::time_point<std::chrono::system_clock> now;

if (ctx->perf.njobs_reset > 1) {
mean = ctx->perf.mean;
min = ctx->perf.min;
if (ctx->perf.succeeded.njobs_reset > 1) {
avg = ctx->perf.succeeded.avg;
min = ctx->perf.succeeded.min;
// Welford's online algorithm
variance = ctx->perf.M2 / (double)ctx->perf.njobs_reset;
variance = ctx->perf.succeeded.M2
/ (double)ctx->perf.succeeded.njobs_reset;
}
if ( !(o = json_object ())) {
if (ctx->perf.failed.njobs_reset > 1) {
avg_failed = ctx->perf.failed.avg;
min_failed = ctx->perf.failed.min;
// Welford's online algorithm
variance_failed = ctx->perf.failed.M2
/ (double)ctx->perf.failed.njobs_reset;
}
if ( !(o = json_object ()) || !(match_succeeded = json_object ())
|| !(match_failed = json_object ())) {
errno = ENOMEM;
goto error;
}
if (get_stat_by_rank (ctx, o) < 0) {
flux_log_error (h, "%s: get_stat_by_rank", __FUNCTION__);
goto error_free;
}

if (!(match_succeeded = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}",
"njobs", ctx->perf.succeeded.njobs,
"njobs-reset", ctx->perf.succeeded.njobs_reset,
"stats",
"min", min,
"max", ctx->perf.succeeded.max,
"avg", avg,
"variance", variance))) {
errno = ENOMEM;
goto error_free;
}
if (!(match_failed = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}",
"njobs", ctx->perf.failed.njobs,
"njobs-reset", ctx->perf.failed.njobs_reset,
"stats",
"min", min_failed,
"max", ctx->perf.failed.max,
"avg", avg_failed,
"variance", variance_failed))) {
errno = ENOMEM;
goto error_free;
}
now = std::chrono::system_clock::now ();
graph_uptime_s = std::chrono::duration_cast<std::chrono::seconds> (
now - ctx->perf.graph_uptime).count ();
time_since_reset_s = std::chrono::duration_cast<std::chrono::seconds> (
now - ctx->perf.time_since_reset).count ();
if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:I s:I s:f s:f s:f s:f}",
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"by_rank", o,
"load-time", ctx->perf.load,
"graph-uptime", graph_uptime_s,
"time-since-reset", time_since_reset_s,
"njobs", ctx->perf.njobs,
"njobs-reset", ctx->perf.njobs_reset,
"min-match", min,
"max-match", ctx->perf.max,
"avg-match", mean,
"match-variance", variance) < 0) {

if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:{s:O s:O}}",
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"by_rank", o,
"load-time", ctx->perf.load,
"graph-uptime", graph_uptime_s,
"time-since-reset", time_since_reset_s,
"match",
"succeeded", match_succeeded,
"failed", match_failed) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
}
json_decref (match_succeeded);
json_decref (match_failed);

return;

error_free:
saved_errno = errno;
json_decref (o);
json_decref (match_succeeded);
json_decref (match_failed);
errno = saved_errno;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
Expand All @@ -2236,12 +2286,17 @@ static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w,
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

// Clear the jobs-related stats and reset time
ctx->perf.njobs_reset = 0;
ctx->perf.min = std::numeric_limits<double>::max ();
ctx->perf.max = 0.0f;
ctx->perf.mean = 0.0f;
ctx->perf.M2 = 0.0f;
ctx->perf.time_since_reset = std::chrono::system_clock::now ();
ctx->perf.succeeded.njobs_reset = 0;
ctx->perf.succeeded.min = std::numeric_limits<double>::max ();
ctx->perf.succeeded.max = 0.0;
ctx->perf.succeeded.avg = 0.0;
ctx->perf.succeeded.M2 = 0.0;
// Failed match stats
ctx->perf.failed.njobs_reset = 0;
ctx->perf.failed.min = std::numeric_limits<double>::max ();
ctx->perf.failed.max = 0.0;
ctx->perf.failed.avg = 0.0;
ctx->perf.failed.M2 = 0.0;

if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
Expand Down

0 comments on commit c72e7ef

Please sign in to comment.