Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update stats output to include data for failed matches #1187

Merged
merged 3 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 112 additions & 57 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,37 @@
******************************************************************************/

struct match_perf_t {
double load; /* Graph load time */
uint64_t njobs; /* Total match count */
uint64_t njobs_reset; /* Jobs since reset match count */
double load = 0.0; /* Graph load time */
/* Graph uptime in seconds */
std::chrono::time_point<std::chrono::system_clock> graph_uptime;
std::chrono::time_point<std::chrono::system_clock> graph_uptime
= std::chrono::system_clock::now ();
/* Time since stats were last cleared */
std::chrono::time_point<std::chrono::system_clock> time_since_reset;
double min; /* Min match time */
double max; /* Max match time */
double mean; /* Mean match time */
double M2; /* Welford's algorithm */
std::chrono::time_point<std::chrono::system_clock> time_since_reset
= std::chrono::system_clock::now ();
/* Running timing statistics for one class of match outcome.
 *
 * njobs counts every sample ever recorded.  The remaining fields describe
 * only the samples seen since the statistics were last cleared: the clear
 * handler zeroes njobs_reset, avg and M2 and re-seeds min/max, but leaves
 * njobs untouched.
 */
struct time_stats {
    /* Fold one match duration into the running statistics.
     *
     * elapsed: duration of a single match attempt, in whatever unit the
     *          caller measures (the accompanying CLI prints it as seconds).
     */
    void update_stats (double elapsed)
    {
        njobs++;
        njobs_reset++;
        if (elapsed < min)
            min = elapsed;
        if (elapsed > max)
            max = elapsed;
        // Welford's online algorithm for mean and variance.  The sample
        // count must be njobs_reset, not njobs: avg and M2 are restarted
        // from zero on every clear, so dividing by the lifetime count
        // would skew the mean (and hence M2) after the first clear.
        // njobs_reset was just incremented, so it is never zero here.
        const double delta = elapsed - avg;
        avg += delta / static_cast<double> (njobs_reset);
        const double delta2 = elapsed - avg;
        M2 += delta * delta2;
    }
    uint64_t njobs = 0;       /* Total match count */
    uint64_t njobs_reset = 0; /* Jobs since match count reset */
    double min = std::numeric_limits<double>::max (); /* Min match time */
    double max = 0.0;         /* Max match time */
    double avg = 0.0;         /* Average match time since reset */
    double M2 = 0.0;          /* Welford's sum of squared deviations */
};
time_stats succeeded;
time_stats failed;
};

class msg_wrap_t {
Expand Down Expand Up @@ -353,15 +373,6 @@
ctx->h = h;
ctx->handlers = NULL;
set_default_args (ctx);
ctx->perf.load = 0.0f;
ctx->perf.njobs = 0;
ctx->perf.njobs_reset = 0;
ctx->perf.min = std::numeric_limits<double>::max ();
ctx->perf.max = 0.0f;
ctx->perf.mean = 0.0f;
ctx->perf.M2 = 0.0f;
ctx->perf.graph_uptime = std::chrono::system_clock::now ();
ctx->perf.time_since_reset = std::chrono::system_clock::now ();
ctx->matcher = nullptr; /* Cannot be allocated at this point */
ctx->fgraph = nullptr; /* Cannot be allocated at this point */
ctx->writers = nullptr; /* Cannot be allocated at this point */
Expand Down Expand Up @@ -1514,19 +1525,12 @@
******************************************************************************/

static void update_match_perf (std::shared_ptr<resource_ctx_t> &ctx,
double elapse)
double elapsed, bool match_success)
{
double delta = 0.0f;
double delta2 = 0.2f;
ctx->perf.njobs++;
ctx->perf.njobs_reset++;
ctx->perf.min = (ctx->perf.min > elapse)? elapse : ctx->perf.min;
ctx->perf.max = (ctx->perf.max < elapse)? elapse : ctx->perf.max;
// Welford's online algorithm for variance
delta = elapse - ctx->perf.mean;
ctx->perf.mean += delta / (double)ctx->perf.njobs;
delta2 = elapse - ctx->perf.mean;
ctx->perf.M2 += delta * delta2;
if (match_success)
ctx->perf.succeeded.update_stats (elapsed);
else
ctx->perf.failed.update_stats (elapsed);
}

static inline std::string get_status_string (int64_t now, int64_t at)
Expand Down Expand Up @@ -1776,6 +1780,9 @@
(start.time_since_epoch ());
*at = *now = epoch.count ();
if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov, false);
goto done;
}
if ( (rc = ctx->writers->emit (o)) < 0) {
Expand All @@ -1786,7 +1793,7 @@
rsv = (*now != *at)? true : false;
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov);
update_match_perf (ctx, *ov, true);

if (cmd != std::string ("satisfiability")) {
if ( (rc = track_schedule_info (ctx, jobid,
Expand Down Expand Up @@ -1818,6 +1825,9 @@
goto done;
}
if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) {
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov, false);
flux_log_error (ctx->h, "%s: run", __FUNCTION__);
goto done;
}
Expand All @@ -1827,7 +1837,7 @@
}
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov);
update_match_perf (ctx, ov, true);
if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) {
flux_log_error (ctx->h, "%s: can't add job info (id=%jd)",
__FUNCTION__, (intmax_t)jobid);
Expand Down Expand Up @@ -2177,53 +2187,93 @@
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
int saved_errno;
json_t *o = nullptr;
double mean = 0.0f;
json_t *match_succeeded = nullptr;
json_t *match_failed = nullptr;
double avg = 0.0f;
double min = 0.0f;
double variance = 0.0f;
// Failed match stats
double avg_failed = 0.0f;
double min_failed = 0.0f;
double variance_failed = 0.0f;
int64_t graph_uptime_s = 0;
int64_t time_since_reset_s = 0;
std::chrono::time_point<std::chrono::system_clock> now;

if (ctx->perf.njobs_reset > 1) {
mean = ctx->perf.mean;
min = ctx->perf.min;
if (ctx->perf.succeeded.njobs_reset > 1) {
avg = ctx->perf.succeeded.avg;
min = ctx->perf.succeeded.min;
// Welford's online algorithm
variance = ctx->perf.M2 / (double)ctx->perf.njobs_reset;
variance = ctx->perf.succeeded.M2
/ (double)ctx->perf.succeeded.njobs_reset;
}
if ( !(o = json_object ())) {
if (ctx->perf.failed.njobs_reset > 1) {
avg_failed = ctx->perf.failed.avg;
min_failed = ctx->perf.failed.min;
// Welford's online algorithm
variance_failed = ctx->perf.failed.M2
/ (double)ctx->perf.failed.njobs_reset;
}
if ( !(o = json_object ()) || !(match_succeeded = json_object ())
|| !(match_failed = json_object ())) {
errno = ENOMEM;
goto error;
}
if (get_stat_by_rank (ctx, o) < 0) {
flux_log_error (h, "%s: get_stat_by_rank", __FUNCTION__);
goto error_free;
}

if (!(match_succeeded = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}",
"njobs", ctx->perf.succeeded.njobs,
"njobs-reset", ctx->perf.succeeded.njobs_reset,
"stats",
"min", min,
"max", ctx->perf.succeeded.max,
"avg", avg,
"variance", variance))) {
errno = ENOMEM;
goto error_free;

Check warning on line 2236 in resource/modules/resource_match.cpp

View check run for this annotation

Codecov / codecov/patch

resource/modules/resource_match.cpp#L2235-L2236

Added lines #L2235 - L2236 were not covered by tests
}
if (!(match_failed = json_pack ("{s:I s:I s:{s:f s:f s:f s:f}}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a lot easier for me to follow, good stuff.

"njobs", ctx->perf.failed.njobs,
"njobs-reset", ctx->perf.failed.njobs_reset,
"stats",
"min", min_failed,
"max", ctx->perf.failed.max,
"avg", avg_failed,
"variance", variance_failed))) {
errno = ENOMEM;
goto error_free;

Check warning on line 2247 in resource/modules/resource_match.cpp

View check run for this annotation

Codecov / codecov/patch

resource/modules/resource_match.cpp#L2246-L2247

Added lines #L2246 - L2247 were not covered by tests
}
now = std::chrono::system_clock::now ();
graph_uptime_s = std::chrono::duration_cast<std::chrono::seconds> (
now - ctx->perf.graph_uptime).count ();
time_since_reset_s = std::chrono::duration_cast<std::chrono::seconds> (
now - ctx->perf.time_since_reset).count ();
if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:I s:I s:f s:f s:f s:f}",
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"by_rank", o,
"load-time", ctx->perf.load,
"graph-uptime", graph_uptime_s,
"time-since-reset", time_since_reset_s,
"njobs", ctx->perf.njobs,
"njobs-reset", ctx->perf.njobs_reset,
"min-match", min,
"max-match", ctx->perf.max,
"avg-match", mean,
"match-variance", variance) < 0) {

if (flux_respond_pack (h, msg, "{s:I s:I s:o s:f s:I s:I s:{s:O s:O}}",
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"by_rank", o,
"load-time", ctx->perf.load,
"graph-uptime", graph_uptime_s,
"time-since-reset", time_since_reset_s,
"match",
"succeeded", match_succeeded,
"failed", match_failed) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
}
json_decref (match_succeeded);
json_decref (match_failed);

return;

error_free:
saved_errno = errno;
json_decref (o);
json_decref (match_succeeded);
json_decref (match_failed);

Check warning on line 2276 in resource/modules/resource_match.cpp

View check run for this annotation

Codecov / codecov/patch

resource/modules/resource_match.cpp#L2275-L2276

Added lines #L2275 - L2276 were not covered by tests
errno = saved_errno;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
Expand All @@ -2236,12 +2286,17 @@
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

// Clear the jobs-related stats and reset time
ctx->perf.njobs_reset = 0;
ctx->perf.min = std::numeric_limits<double>::max ();
ctx->perf.max = 0.0f;
ctx->perf.mean = 0.0f;
ctx->perf.M2 = 0.0f;
ctx->perf.time_since_reset = std::chrono::system_clock::now ();
ctx->perf.succeeded.njobs_reset = 0;
milroy marked this conversation as resolved.
Show resolved Hide resolved
ctx->perf.succeeded.min = std::numeric_limits<double>::max ();
ctx->perf.succeeded.max = 0.0;
ctx->perf.succeeded.avg = 0.0;
ctx->perf.succeeded.M2 = 0.0;
// Failed match stats
ctx->perf.failed.njobs_reset = 0;
ctx->perf.failed.min = std::numeric_limits<double>::max ();
ctx->perf.failed.max = 0.0;
ctx->perf.failed.avg = 0.0;
ctx->perf.failed.M2 = 0.0;

if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
Expand Down
62 changes: 56 additions & 6 deletions src/cmd/flux-ion-resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,62 @@ def stat_action(_):
print("Graph Load Time: ", resp["load-time"], "Secs")
print("Graph Upime: ", resp["graph-uptime"], "Secs")
print("Time Since Stats Reset: ", resp["time-since-reset"], "Secs")
print("Num. of Total Jobs Matched: ", resp["njobs"])
print("Num. of Jobs Matched Since Reset: ", resp["njobs-reset"])
print("Min. Match Time: ", resp["min-match"], "Secs")
print("Max. Match Time: ", resp["max-match"], "Secs")
print("Avg. Match Time: ", resp["avg-match"], "Secs")
print("Match Variance: ", resp["match-variance"], "Secs^2")
print(
"Num. of Total Jobs Successfully Matched: ",
resp["match"]["succeeded"]["njobs"],
)
print(
"Num. of Jobs Successfully Matched Since Reset: ",
resp["match"]["succeeded"]["njobs-reset"],
)
print(
"Min. Successful Match Time: ",
resp["match"]["succeeded"]["stats"]["min"],
"Secs",
)
print(
"Max. Successful Match Time: ",
resp["match"]["succeeded"]["stats"]["max"],
"Secs",
)
print(
"Avg. Successful Match Time: ",
resp["match"]["succeeded"]["stats"]["avg"],
"Secs",
)
print(
"Successful Match Variance: ",
resp["match"]["succeeded"]["stats"]["variance"],
"Secs^2",
)
print(
"Num. of Jobs with Failed Matches: ",
resp["match"]["failed"]["njobs"],
)
print(
"Num. of Jobs with Failed Matches Since Reset: ",
resp["match"]["failed"]["njobs-reset"],
)
print(
"Min. Match Time of Failed Matches: ",
resp["match"]["failed"]["stats"]["min"],
"Secs",
)
print(
"Max. Match Time of Failed Matches: ",
resp["match"]["failed"]["stats"]["max"],
"Secs",
)
print(
"Avg. Match Time of Failed Matches: ",
resp["match"]["failed"]["stats"]["avg"],
"Secs",
)
print(
"Match Variance of Failed Matches: ",
resp["match"]["failed"]["stats"]["variance"],
"Secs^2",
)


def stats_clear_action(_):
Expand Down
3 changes: 3 additions & 0 deletions t/t4000-match-params.t
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ test_expect_success 'resource module stats and clear work' '
unload_resource &&
load_resource &&
load_qmanager_sync &&
# Submit unsatisfiable jobs to populate failed match stats
test_must_fail flux run -N 100 -n 400 -t 10s sleep 10 &&
test_must_fail flux run -N 100 -n 400 -t 10s sleep 10 &&
flux module stats sched-fluxion-resource &&
flux module stats --clear sched-fluxion-resource
'
Expand Down
Loading