From be086b2c0f6da269c99ece85ca0e13343b7f9862 Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Sat, 21 Dec 2024 01:43:48 -0800 Subject: [PATCH] qmanager: unpack free ranks in hello and pack into scheduling key Problem: the sched.hello RPC now includes a `free` key whose value is an idset of previously partially released ranks. Currently Fluxion neither unpacks the key nor handles the previously freed resources. The rv1_noexec doesn't need the freed ranks idset because core only sends the R that is still allocated in R_lite. However, core doesn't support processing JGF, so rv1 with the scheduling key contains R with a scheduling key JGF representing the initial resource set. This leads to state divergence between core and sched for the rv1 or reader. Add support for unpacking the `free` key and packing it into the JSON `scheduling` payload for queue reconstruction and update allocate with JGF. --- qmanager/modules/qmanager_callbacks.cpp | 39 +++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/qmanager/modules/qmanager_callbacks.cpp b/qmanager/modules/qmanager_callbacks.cpp index 56251f835..fe858ffdc 100644 --- a/qmanager/modules/qmanager_callbacks.cpp +++ b/qmanager/modules/qmanager_callbacks.cpp @@ -150,15 +150,21 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const unsigned int prio; uint32_t uid; double ts; + const char *free_ranks = NULL; json_t *jobspec = NULL; flux_future_t *f = NULL; + json_t *R_jsontmp = NULL; + json_t *free_ranks_j = NULL; + json_t *sched = NULL; + json_error_t json_err; + const char *R_final = NULL; /* Don't expect jobspec to be set here as it is not currently defined * in RFC 27. However, add it anyway in case the hello protocol * evolves to include it. If it is not set, it must be looked up. 
*/ if (flux_msg_unpack (msg, - "{s:I s:i s:i s:f s?o}", + "{s:I s:i s:i s:f s?s s?o}", "id", &id, "priority", @@ -167,6 +173,8 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const &uid, "t_submit", &ts, + "free", + &free_ranks, "jobspec", &jobspec) < 0) { @@ -199,9 +207,34 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const queue_name.c_str ()); goto out; } + // if free ranks is populated, insert the free ranks into the scheduling key + if (free_ranks) { + if (!(R_jsontmp = json_loads (R, 0, &json_err))) { + errno = ENOMEM; + flux_log (h, LOG_ERR, "%s: json_loads", __FUNCTION__); + goto out; + } + if ((sched = json_object_get (R_jsontmp, "scheduling")) == NULL) { + R_final = R; + } else { + free_ranks_j = json_string (free_ranks); + json_object_set (sched, "free_ranks", free_ranks_j); + if (!(R_final = json_dumps (R_jsontmp, JSON_COMPACT))) { + errno = ENOMEM; + flux_log (h, LOG_ERR, "%s: json_dumps", __FUNCTION__); + goto out; + } + } + } else { + R_final = R; + } queue = ctx->queues.at (queue_name); - running_job = - std::make_shared (job_state_kind_t::RUNNING, id, uid, calc_priority (prio), ts, R); + running_job = std::make_shared (job_state_kind_t::RUNNING, + id, + uid, + calc_priority (prio), + ts, + R_final); if (queue->reconstruct (static_cast (h), running_job, R_out) < 0) { flux_log_error (h,