Skip to content

Commit

Permalink
Merge pull request #1187 from milroy/stats-update
Browse files Browse the repository at this point in the history
Update stats output to include data for failed matches
  • Loading branch information
mergify[bot] authored May 1, 2024
2 parents bb686df + 9013583 commit 7a45c25
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 63 deletions.
169 changes: 112 additions & 57 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,37 @@ using namespace Flux::opts_manager;
******************************************************************************/

struct match_perf_t {
double load; /* Graph load time */
uint64_t njobs; /* Total match count */
uint64_t njobs_reset; /* Jobs since reset match count */
double load = 0.0; /* Graph load time */
/* Graph uptime in seconds */
std::chrono::time_point<std::chrono::system_clock> graph_uptime;
std::chrono::time_point<std::chrono::system_clock> graph_uptime
= std::chrono::system_clock::now ();
/* Time since stats were last cleared */
std::chrono::time_point<std::chrono::system_clock> time_since_reset;
double min; /* Min match time */
double max; /* Max match time */
double mean; /* Mean match time */
double M2; /* Welford's algorithm */
std::chrono::time_point<std::chrono::system_clock> time_since_reset
= std::chrono::system_clock::now ();
struct time_stats {
void update_stats (double elapsed) {
/* Update using Welford's algorithm */
double delta = 0.0;
double delta2 = 0.0;
njobs++;
njobs_reset++;
min = (min > elapsed)? elapsed : min;
max = (max < elapsed)? elapsed : max;
// Welford's online algorithm for variance
delta = elapsed - avg;
avg += delta / (double)njobs;
delta2 = elapsed - avg;
M2 += delta * delta2;
}
uint64_t njobs = 0; /* Total match count */
uint64_t njobs_reset = 0; /* Jobs since match count reset */
double min = std::numeric_limits<double>::max (); /* Min match time */
double max = 0.0; /* Max match time */
double avg = 0.0; /* Average match time */
double M2 = 0.0; /* Welford's algorithm */
};
time_stats succeeded;
time_stats failed;
};

class msg_wrap_t {
Expand Down Expand Up @@ -353,15 +373,6 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
ctx->h = h;
ctx->handlers = NULL;
set_default_args (ctx);
ctx->perf.load = 0.0f;
ctx->perf.njobs = 0;
ctx->perf.njobs_reset = 0;
ctx->perf.min = std::numeric_limits<double>::max ();
ctx->perf.max = 0.0f;
ctx->perf.mean = 0.0f;
ctx->perf.M2 = 0.0f;
ctx->perf.graph_uptime = std::chrono::system_clock::now ();
ctx->perf.time_since_reset = std::chrono::system_clock::now ();
ctx->matcher = nullptr; /* Cannot be allocated at this point */
ctx->fgraph = nullptr; /* Cannot be allocated at this point */
ctx->writers = nullptr; /* Cannot be allocated at this point */
Expand Down Expand Up @@ -1514,19 +1525,12 @@ static int init_resource_graph (std::shared_ptr<resource_ctx_t> &ctx)
******************************************************************************/

static void update_match_perf (std::shared_ptr<resource_ctx_t> &ctx,
double elapse)
double elapsed, bool match_success)
{
double delta = 0.0f;
double delta2 = 0.2f;
ctx->perf.njobs++;
ctx->perf.njobs_reset++;
ctx->perf.min = (ctx->perf.min > elapse)? elapse : ctx->perf.min;
ctx->perf.max = (ctx->perf.max < elapse)? elapse : ctx->perf.max;
// Welford's online algorithm for variance
delta = elapse - ctx->perf.mean;
ctx->perf.mean += delta / (double)ctx->perf.njobs;
delta2 = elapse - ctx->perf.mean;
ctx->perf.M2 += delta * delta2;
if (match_success)
ctx->perf.succeeded.update_stats (elapsed);
else
ctx->perf.failed.update_stats (elapsed);
}

static inline std::string get_status_string (int64_t now, int64_t at)
Expand Down Expand Up @@ -1776,6 +1780,9 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
(start.time_since_epoch ());
*at = *now = epoch.count ();
if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov, false);
goto done;
}
if ( (rc = ctx->writers->emit (o)) < 0) {
Expand All @@ -1786,7 +1793,7 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
rsv = (*now != *at)? true : false;
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov);
update_match_perf (ctx, *ov, true);

if (cmd != std::string ("satisfiability")) {
if ( (rc = track_schedule_info (ctx, jobid,
Expand Down Expand Up @@ -1818,6 +1825,9 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
goto done;
}
if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov, false);
flux_log_error (ctx->h, "%s: run", __FUNCTION__);
goto done;
}
Expand All @@ -1827,7 +1837,7 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
}
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov);
update_match_perf (ctx, ov, true);
if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) {
flux_log_error (ctx->h, "%s: can't add job info (id=%jd)",
__FUNCTION__, (intmax_t)jobid);
Expand Down Expand Up @@ -2177,53 +2187,93 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
int saved_errno;
json_t *o = nullptr;
double mean = 0.0f;
json_t *match_succeeded = nullptr;
json_t *match_failed = nullptr;
double avg = 0.0f;
double min = 0.0f;
double variance = 0.0f;
// Failed match stats
double avg_failed = 0.0f;
double min_failed = 0.0f;
double variance_failed = 0.0f;
int64_t graph_uptime_s = 0;
int64_t time_since_reset_s = 0;
std::chrono::time_point<std::chrono::system_clock> now;

if (ctx->perf.njobs_reset > 1) {
mean = ctx->perf.mean;
min = ctx->perf.min;
if (ctx->perf.succeeded.njobs_reset > 1) {
avg = ctx->perf.succeeded.avg;
min = ctx->perf.succeeded.min;
// Welford's online algorithm
variance = ctx->perf.M2 / (double)ctx->perf.njobs_reset;
variance = ctx->perf.succeeded.M2
/ (double)ctx->perf.succeeded.njobs_reset;
}
if ( !(o = json_object ())) {
if (ctx->perf.failed.njobs_reset > 1) {
avg_failed = ctx->perf.failed.avg;
min_failed = ctx->perf.failed.min;
// Welford's online algorithm
variance_failed = ctx->perf.failed.M2
/ (double)ctx->perf.failed.njobs_reset;
}
if ( !(o = json_object ()) || !(match_succeeded = json_object ())
|| !(match_failed = json_object ())) {
errno = ENOMEM;
goto error;
}
if (get_stat_by_rank (ctx, o) < 0) {
flux_log_error (h, "%s: get_stat_by_rank", __FUNCTION__);
goto error_free;
}

if (!(match_succeeded = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}",
"njobs", ctx->perf.succeeded.njobs,
"njobs-reset", ctx->perf.succeeded.njobs_reset,
"stats",
"min", min,
"max", ctx->perf.succeeded.max,
"avg", avg,
"variance", variance))) {
errno = ENOMEM;
goto error_free;
}
if (!(match_failed = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}",
"njobs", ctx->perf.failed.njobs,
"njobs-reset", ctx->perf.failed.njobs_reset,
"stats",
"min", min_failed,
"max", ctx->perf.failed.max,
"avg", avg_failed,
"variance", variance_failed))) {
errno = ENOMEM;
goto error_free;
}
now = std::chrono::system_clock::now ();
graph_uptime_s = std::chrono::duration_cast<std::chrono::seconds> (
now - ctx->perf.graph_uptime).count ();
time_since_reset_s = std::chrono::duration_cast<std::chrono::seconds> (
now - ctx->perf.time_since_reset).count ();
if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:I s:I s:f s:f s:f s:f}",
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"by_rank", o,
"load-time", ctx->perf.load,
"graph-uptime", graph_uptime_s,
"time-since-reset", time_since_reset_s,
"njobs", ctx->perf.njobs,
"njobs-reset", ctx->perf.njobs_reset,
"min-match", min,
"max-match", ctx->perf.max,
"avg-match", mean,
"match-variance", variance) < 0) {

if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:{s:O s:O}}",
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"by_rank", o,
"load-time", ctx->perf.load,
"graph-uptime", graph_uptime_s,
"time-since-reset", time_since_reset_s,
"match",
"succeeded", match_succeeded,
"failed", match_failed) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
}
json_decref (match_succeeded);
json_decref (match_failed);

return;

error_free:
saved_errno = errno;
json_decref (o);
json_decref (match_succeeded);
json_decref (match_failed);
errno = saved_errno;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
Expand All @@ -2236,12 +2286,17 @@ static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w,
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

// Clear the jobs-related stats and reset time
ctx->perf.njobs_reset = 0;
ctx->perf.min = std::numeric_limits<double>::max ();
ctx->perf.max = 0.0f;
ctx->perf.mean = 0.0f;
ctx->perf.M2 = 0.0f;
ctx->perf.time_since_reset = std::chrono::system_clock::now ();
ctx->perf.succeeded.njobs_reset = 0;
ctx->perf.succeeded.min = std::numeric_limits<double>::max ();
ctx->perf.succeeded.max = 0.0;
ctx->perf.succeeded.avg = 0.0;
ctx->perf.succeeded.M2 = 0.0;
// Failed match stats
ctx->perf.failed.njobs_reset = 0;
ctx->perf.failed.min = std::numeric_limits<double>::max ();
ctx->perf.failed.max = 0.0;
ctx->perf.failed.avg = 0.0;
ctx->perf.failed.M2 = 0.0;

if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
Expand Down
62 changes: 56 additions & 6 deletions src/cmd/flux-ion-resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,62 @@ def stat_action(_):
print("Graph Load Time: ", resp["load-time"], "Secs")
print("Graph Upime: ", resp["graph-uptime"], "Secs")
print("Time Since Stats Reset: ", resp["time-since-reset"], "Secs")
print("Num. of Total Jobs Matched: ", resp["njobs"])
print("Num. of Jobs Matched Since Reset: ", resp["njobs-reset"])
print("Min. Match Time: ", resp["min-match"], "Secs")
print("Max. Match Time: ", resp["max-match"], "Secs")
print("Avg. Match Time: ", resp["avg-match"], "Secs")
print("Match Variance: ", resp["match-variance"], "Secs^2")
print(
"Num. of Total Jobs Successfully Matched: ",
resp["match"]["succeeded"]["njobs"],
)
print(
"Num. of Jobs Successfully Matched Since Reset: ",
resp["match"]["succeeded"]["njobs-reset"],
)
print(
"Min. Successful Match Time: ",
resp["match"]["succeeded"]["stats"]["min"],
"Secs",
)
print(
"Max. Successful Match Time: ",
resp["match"]["succeeded"]["stats"]["max"],
"Secs",
)
print(
"Avg. Successful Match Time: ",
resp["match"]["succeeded"]["stats"]["avg"],
"Secs",
)
print(
"Successful Match Variance: ",
resp["match"]["succeeded"]["stats"]["variance"],
"Secs^2",
)
print(
"Num. of Jobs with Failed Matches: ",
resp["match"]["failed"]["njobs"],
)
print(
"Num. of Jobs with Failed Matches Since Reset: ",
resp["match"]["failed"]["njobs-reset"],
)
print(
"Min. Match Time of Failed Matches: ",
resp["match"]["failed"]["stats"]["min"],
"Secs",
)
print(
"Max. Match Time of Failed Matches: ",
resp["match"]["failed"]["stats"]["max"],
"Secs",
)
print(
"Avg. Match Time of Failed Matches: ",
resp["match"]["failed"]["stats"]["avg"],
"Secs",
)
print(
"Match Variance of Failed Matches: ",
resp["match"]["failed"]["stats"]["variance"],
"Secs^2",
)


def stats_clear_action(_):
Expand Down
3 changes: 3 additions & 0 deletions t/t4000-match-params.t
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ test_expect_success 'resource module stats and clear work' '
unload_resource &&
load_resource &&
load_qmanager_sync &&
# Submit unsatisfiable jobs to populate failed match stats
test_must_fail flux run -N 100 -n 400 -t 10s sleep 10 &&
test_must_fail flux run -N 100 -n 400 -t 10s sleep 10 &&
flux module stats sched-fluxion-resource &&
flux module stats --clear sched-fluxion-resource
'
Expand Down

0 comments on commit 7a45c25

Please sign in to comment.