qmanager: unpack free ranks in hello and pack into scheduling key
Problem: the sched.hello RPC now includes a `free` key whose value is an
idset of ranks that were previously released as part of a partial
release. Fluxion currently neither unpacks the key nor handles the
previously freed resources. The rv1_noexec case doesn't need the freed
ranks idset because core only sends the R that is still allocated in
R_lite. However, core doesn't support processing JGF, so with rv1 the R
it sends carries a `scheduling` key whose JGF still represents the
initial resource set. This leads to
state divergence between core and sched for the rv1 or reader.

Add support for unpacking the `free` key and packing it into the JSON
`scheduling` payload for queue reconstruction and update allocate with
JGF.
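
To make the divergence concrete, here is a minimal sketch of the bookkeeping the `free` idset implies: the ranks still held by the job are the ranks of the initial resource set minus the ranks already released. Both `decode_idset` and `remaining_ranks` are hypothetical helpers written for illustration, not functions from flux-core or Fluxion. The sketch assumes only the simplest idset syntax (comma-separated ids and `lo-hi` ranges) and works on plain rank sets, whereas Fluxion operates on its resource graph.

```cpp
#include <cstdint>
#include <set>
#include <stdexcept>
#include <sstream>
#include <string>

// Hypothetical helper: decode a simple idset string such as "0-3,7" into a
// set of ranks. Only plain ids and "lo-hi" ranges are handled; real Flux
// code would use the idset library instead of hand-rolled parsing.
std::set<uint32_t> decode_idset (const std::string &s)
{
    std::set<uint32_t> ids;
    std::istringstream in (s);
    std::string tok;
    while (std::getline (in, tok, ',')) {
        std::size_t dash = tok.find ('-');
        // std::stoul throws std::invalid_argument on an empty or bad token
        uint32_t lo = static_cast<uint32_t> (std::stoul (tok.substr (0, dash)));
        uint32_t hi = (dash == std::string::npos)
                          ? lo
                          : static_cast<uint32_t> (std::stoul (tok.substr (dash + 1)));
        if (hi < lo)
            throw std::invalid_argument ("descending range in idset");
        for (uint32_t i = lo;; ++i) {
            ids.insert (i);
            if (i == hi)  // stop here so the counter cannot wrap around
                break;
        }
    }
    return ids;
}

// Hypothetical helper: ranks still allocated to the job, i.e. the ranks of
// the initial resource set with the already-freed ranks removed.
std::set<uint32_t> remaining_ranks (const std::string &initial, const std::string &freed)
{
    std::set<uint32_t> out = decode_idset (initial);
    for (uint32_t r : decode_idset (freed))
        out.erase (r);
    return out;
}
```

For a job that started on ranks `0-3` and has since released `1-2`, `remaining_ranks ("0-3", "1-2")` yields ranks 0 and 3. A scheduler that reconstructs the job from the initial set alone would still count ranks 1 and 2 as allocated.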
milroy authored and garlick committed Dec 21, 2024
1 parent 2b047bc commit be086b2
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions qmanager/modules/qmanager_callbacks.cpp
@@ -150,15 +150,21 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const
     unsigned int prio;
     uint32_t uid;
     double ts;
+    const char *free_ranks = NULL;
     json_t *jobspec = NULL;
     flux_future_t *f = NULL;
+    json_t *R_jsontmp = NULL;
+    json_t *free_ranks_j = NULL;
+    json_t *sched = NULL;
+    json_error_t json_err;
+    const char *R_final = NULL;
 
     /* Don't expect jobspec to be set here as it is not currently defined
      * in RFC 27. However, add it anyway in case the hello protocol
      * evolves to include it. If it is not set, it must be looked up.
      */
     if (flux_msg_unpack (msg,
-                         "{s:I s:i s:i s:f s?o}",
+                         "{s:I s:i s:i s:f s?s s?o}",
                          "id",
                          &id,
                          "priority",
@@ -167,6 +173,8 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const
                          &uid,
                          "t_submit",
                          &ts,
+                         "free",
+                         &free_ranks,
                          "jobspec",
                          &jobspec)
         < 0) {
@@ -199,9 +207,34 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const
                   queue_name.c_str ());
         goto out;
     }
+    // if free ranks is populated, insert the free ranks into the scheduling key
+    if (free_ranks) {
+        if (!(R_jsontmp = json_loads (R, 0, &json_err))) {
+            errno = ENOMEM;
+            flux_log (h, LOG_ERR, "%s: json_loads", __FUNCTION__);
+            goto out;
+        }
+        if ((sched = json_object_get (R_jsontmp, "scheduling")) == NULL) {
+            R_final = R;
+        } else {
+            free_ranks_j = json_string (free_ranks);
+            json_object_set (sched, "free_ranks", free_ranks_j);
+            if (!(R_final = json_dumps (R_jsontmp, JSON_COMPACT))) {
+                errno = ENOMEM;
+                flux_log (h, LOG_ERR, "%s: json_dumps", __FUNCTION__);
+                goto out;
+            }
+        }
+    } else {
+        R_final = R;
+    }
     queue = ctx->queues.at (queue_name);
-    running_job =
-        std::make_shared<job_t> (job_state_kind_t::RUNNING, id, uid, calc_priority (prio), ts, R);
+    running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
+                                           id,
+                                           uid,
+                                           calc_priority (prio),
+                                           ts,
+                                           R_final);
 
     if (queue->reconstruct (static_cast<void *> (h), running_job, R_out) < 0) {
         flux_log_error (h,