Skip to content

Commit

Permalink
Merge pull request flux-framework#1321 from garlick/partial_ok
Browse files Browse the repository at this point in the history
qmanager: send partial-ok with sched.hello
  • Loading branch information
mergify[bot] authored Jan 9, 2025
2 parents de1648c + b3709b7 commit 4b9cbec
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 72 deletions.
32 changes: 15 additions & 17 deletions qmanager/modules/qmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,19 @@ static int handshake_jobmanager (std::shared_ptr<qmanager_ctx_t> &ctx)

static void status_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg)
{
size_t len = 0;
const char *payload;
flux_future_t *f = NULL;

if (!(f = flux_rpc (h, "sched-fluxion-resource.status", NULL, FLUX_NODEID_ANY, 0))) {
flux_log_error (h, "%s: flux_rpc (sched-fluxion-resource.status)", __FUNCTION__);
goto out;
}
if (flux_rpc_get_raw (f, (const void **)&payload, &len) < 0) {
flux_log_error (h, "%s: flux_rpc_get_raw", __FUNCTION__);
if (flux_rpc_get (f, &payload) < 0) {
flux_log_error (h, "%s: flux_rpc_get", __FUNCTION__);
goto out;
}
if (flux_respond_raw (h, msg, (const void *)payload, len) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
if (flux_respond (h, msg, payload) < 0) {
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
goto out;
}
flux_future_destroy (f);
Expand All @@ -310,25 +309,19 @@ static void feasibility_request_cb (flux_t *h,
const flux_msg_t *msg,
void *arg)
{
size_t size = 0;
flux_future_t *f = nullptr;
const char *data = nullptr;

if (flux_request_decode_raw (msg, nullptr, (const void **)&data, &size) < 0)
if (flux_request_decode (msg, nullptr, &data) < 0)
goto error;
if (!(f = flux_rpc_raw (h,
"sched-fluxion-resource.satisfiability",
data,
size,
FLUX_NODEID_ANY,
0))) {
if (!(f = flux_rpc (h, "sched-fluxion-resource.satisfiability", data, FLUX_NODEID_ANY, 0))) {
flux_log_error (h, "%s: flux_rpc (sched-fluxion-resource.satisfiability)", __FUNCTION__);
goto error;
}
if (flux_rpc_get_raw (f, (const void **)&data, &size) < 0)
if (flux_rpc_get (f, &data) < 0)
goto error;
if (flux_respond_raw (h, msg, (const void *)data, size) < 0) {
flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
if (flux_respond (h, msg, data) < 0) {
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
goto error;
}
flux_log (h, LOG_DEBUG, "%s: feasibility succeeded", __FUNCTION__);
Expand Down Expand Up @@ -558,9 +551,14 @@ static std::shared_ptr<qmanager_ctx_t> qmanager_new (flux_t *h)
ctx = nullptr;
goto done;
}
int schedutil_flags = 0;
#ifdef SCHEDUTIL_HELLO_PARTIAL_OK
// flag was added in flux-core 0.70.0
schedutil_flags |= SCHEDUTIL_HELLO_PARTIAL_OK;
#endif
if (!(ctx->schedutil =
schedutil_create (ctx->h,
SCHEDUTIL_FREE_NOLOOKUP,
schedutil_flags,
&ops,
std::static_pointer_cast<qmanager_cb_ctx_t> (ctx).get ()))) {
flux_log_error (ctx->h, "%s: schedutil_create", __FUNCTION__);
Expand Down
39 changes: 36 additions & 3 deletions qmanager/modules/qmanager_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,21 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const
unsigned int prio;
uint32_t uid;
double ts;
const char *free_ranks = NULL;
json_t *jobspec = NULL;
flux_future_t *f = NULL;
json_t *R_jsontmp = NULL;
json_t *free_ranks_j = NULL;
json_t *sched = NULL;
json_error_t json_err;
const char *R_final = NULL;

/* Don't expect jobspec to be set here as it is not currently defined
* in RFC 27. However, add it anyway in case the hello protocol
* evolves to include it. If it is not set, it must be looked up.
*/
if (flux_msg_unpack (msg,
"{s:I s:i s:i s:f s?o}",
"{s:I s:i s:i s:f s?s s?o}",
"id",
&id,
"priority",
Expand All @@ -167,6 +173,8 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const
&uid,
"t_submit",
&ts,
"free",
&free_ranks,
"jobspec",
&jobspec)
< 0) {
Expand Down Expand Up @@ -199,9 +207,34 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const
queue_name.c_str ());
goto out;
}
// if free ranks is populated, insert the free ranks into the scheduling key
if (free_ranks) {
if (!(R_jsontmp = json_loads (R, 0, &json_err))) {
errno = ENOMEM;
flux_log (h, LOG_ERR, "%s: json_loads", __FUNCTION__);
goto out;
}
if ((sched = json_object_get (R_jsontmp, "scheduling")) == NULL) {
R_final = R;
} else {
free_ranks_j = json_string (free_ranks);
json_object_set (sched, "free_ranks", free_ranks_j);
if (!(R_final = json_dumps (R_jsontmp, JSON_COMPACT))) {
errno = ENOMEM;
flux_log (h, LOG_ERR, "%s: json_dumps", __FUNCTION__);
goto out;
}
}
} else {
R_final = R;
}
queue = ctx->queues.at (queue_name);
running_job =
std::make_shared<job_t> (job_state_kind_t::RUNNING, id, uid, calc_priority (prio), ts, R);
running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
id,
uid,
calc_priority (prio),
ts,
R_final);

if (queue->reconstruct (static_cast<void *> (h), running_job, R_out) < 0) {
flux_log_error (h,
Expand Down
41 changes: 22 additions & 19 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1528,8 +1528,8 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx,
int rc = 0;
int version = 0;
int saved_errno;
uint64_t st = 0;
uint64_t et = 0;
double tstart = 0;
double expiration = 0;
json_t *o = NULL;
json_t *graph = NULL;
json_error_t error;
Expand All @@ -1541,32 +1541,35 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx,
errno = EINVAL;
goto out;
}
if ((rc = json_unpack (o,
"{s:i s:{s:I s:I} s?:o}",
"version",
&version,
"execution",
"starttime",
&st,
"expiration",
&et,
"scheduling",
&graph))
if ((rc = json_unpack_ex (o,
&error,
0,
"{s:i s:{s:F s:F} s?:o}",
"version",
&version,
"execution",
"starttime",
&tstart,
"expiration",
&expiration,
"scheduling",
&graph))
< 0) {
errno = EINVAL;
flux_log (ctx->h, LOG_ERR, "%s: json_unpack", __FUNCTION__);
flux_log (ctx->h, LOG_ERR, "%s: json_unpack: %s", __FUNCTION__, error.text);
goto freemem_out;
}
if (version != 1 || st < 0 || et < st) {
if (version != 1 || tstart < 0 || expiration < tstart
|| expiration > static_cast<double> (std::numeric_limits<int64_t>::max ())) {
rc = -1;
errno = EPROTO;
flux_log (ctx->h,
LOG_ERR,
"%s: version=%d, starttime=%jd, expiration=%jd",
__FUNCTION__,
version,
static_cast<intmax_t> (st),
static_cast<intmax_t> (et));
static_cast<intmax_t> (tstart),
static_cast<intmax_t> (expiration));
goto freemem_out;
}
if (graph != NULL) {
Expand All @@ -1585,8 +1588,8 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx,
format = "rv1exec";
}

starttime = static_cast<int64_t> (st);
duration = et - st;
starttime = static_cast<int64_t> (tstart);
duration = static_cast<uint64_t> (expiration - tstart);

freemem_out:
saved_errno = errno;
Expand Down
81 changes: 67 additions & 14 deletions resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ extern "C" {
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <flux/idset.h>
}

#include <map>
Expand Down Expand Up @@ -212,10 +213,16 @@ std::string diff (const resource_pool_t &r, const fetch_helper_t &f)
int resource_reader_jgf_t::fetch_jgf (const std::string &str,
json_t **jgf_p,
json_t **nodes_p,
json_t **edges_p)
json_t **edges_p,
jgf_updater_data &update_data)
{
int rc = -1;
int64_t rank;
json_t *graph = NULL;
json_t *free_ranks = NULL;
struct idset *r_ids = nullptr;
const char *ranks = nullptr;
std::string ranks_stripped;
json_error_t json_err;

if ((*jgf_p = json_loads (str.c_str (), 0, &json_err)) == NULL) {
Expand All @@ -234,6 +241,30 @@ int resource_reader_jgf_t::fetch_jgf (const std::string &str,
m_err_msg += ": JGF does not contain a required key (graph).\n";
goto done;
}
if ((free_ranks = json_object_get (*jgf_p, "free_ranks")) != NULL) {
update_data.isect_ranks = true;
if (!(ranks = json_dumps (free_ranks, JSON_ENCODE_ANY | JSON_COMPACT))) {
errno = ENOMEM;
m_err_msg += __FUNCTION__;
m_err_msg += ": json_dumps failed.\n";
goto done;
}
// Need to strip double quotes inserted by json_dumps above
ranks_stripped = std::string (ranks);
ranks_stripped.erase (std::remove (ranks_stripped.begin (), ranks_stripped.end (), '"'),
ranks_stripped.end ());
if ((r_ids = idset_decode (ranks_stripped.c_str ())) == NULL) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": failed to decode ranks.\n";
goto done;
}
rank = idset_first (r_ids);
while (rank != IDSET_INVALID_ID) {
update_data.ranks.insert (rank);
rank = idset_next (r_ids, rank);
}
}
if ((*nodes_p = json_object_get (graph, "nodes")) == NULL) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
Expand Down Expand Up @@ -903,6 +934,13 @@ int resource_reader_jgf_t::update_vtx (resource_graph_t &g,
goto done;
if ((rc = check_root (v, g, root_checks)) != 0)
goto done;
// Check if skipping due to previous partial free
if (update_data.isect_ranks && !update_data.ranks.empty ()) {
if (update_data.ranks.find (fetcher.rank) != update_data.ranks.end ()) {
rc = 0;
goto done;
}
}
if ((rc = update_vmap (vmap, v, root_checks, fetcher)) != 0)
goto done;
if (update_data.update) {
Expand Down Expand Up @@ -1025,7 +1063,8 @@ int resource_reader_jgf_t::unpack_edge (json_t *element,
std::map<std::string, vmap_val_t> &vmap,
std::string &source,
std::string &target,
std::string &subsystem)
std::string &subsystem,
jgf_updater_data &update_data)
{
int rc = -1;
json_t *metadata = NULL;
Expand All @@ -1042,11 +1081,17 @@ int resource_reader_jgf_t::unpack_edge (json_t *element,
source = src;
target = tgt;
if (vmap.find (source) == vmap.end () || vmap.find (target) == vmap.end ()) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": source and/or target vertex not found";
m_err_msg += source + std::string (" -> ") + target + ".\n";
goto done;
if (update_data.isect_ranks) {
update_data.skipped = true;
rc = 0;
goto done;
} else {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": source and/or target vertex not found";
m_err_msg += source + std::string (" -> ") + target + ".\n";
goto done;
}
}
if ((json_unpack (element, "{ s?{ s?s } }", "metadata", "subsystem", &subsys)) < 0) {
errno = EINVAL;
Expand Down Expand Up @@ -1077,10 +1122,11 @@ int resource_reader_jgf_t::unpack_edges (resource_graph_t &g,
std::string source{};
std::string target{};
std::string subsystem{};
jgf_updater_data update_data;

for (i = 0; i < json_array_size (edges); i++) {
element = json_array_get (edges, i);
if ((unpack_edge (element, vmap, source, target, subsystem)) != 0)
if ((unpack_edge (element, vmap, source, target, subsystem, update_data)) != 0)
goto done;
// We only add the edge when it connects at least one newly added vertex
if ((added_vtcs.count (source) == 1) || (added_vtcs.count (target) == 1)) {
Expand Down Expand Up @@ -1184,7 +1230,8 @@ int resource_reader_jgf_t::update_edges (resource_graph_t &g,
resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
json_t *edges,
uint64_t token)
uint64_t token,
jgf_updater_data &update_data)
{
edg_t e;
int rc = -1;
Expand All @@ -1197,8 +1244,13 @@ int resource_reader_jgf_t::update_edges (resource_graph_t &g,
for (i = 0; i < json_array_size (edges); i++) {
element = json_array_get (edges, i);
// We only check protocol errors in JGF edges in the following...
if ((rc = unpack_edge (element, vmap, source, target, subsystem)) != 0)
update_data.skipped = false;
if ((rc = unpack_edge (element, vmap, source, target, subsystem, update_data)) != 0)
goto done;
if (update_data.skipped) {
update_data.skipped = false;
continue;
}
if ((rc = update_src_edge (g, m, vmap, source, token)) != 0)
goto done;
if ((rc = update_tgt_edge (g, m, vmap, source, target, token)) != 0)
Expand Down Expand Up @@ -1273,14 +1325,15 @@ int resource_reader_jgf_t::unpack (resource_graph_t &g,
json_t *edges = NULL;
std::map<std::string, vmap_val_t> vmap;
std::unordered_set<std::string> added_vtcs;
jgf_updater_data update_data;

if (rank != -1) {
errno = ENOTSUP;
m_err_msg += __FUNCTION__;
m_err_msg += "rank != -1 unsupported for JGF unpack.\n";
goto done;
}
if ((rc = fetch_jgf (str, &jgf, &nodes, &edges)) != 0)
if ((rc = fetch_jgf (str, &jgf, &nodes, &edges, update_data)) != 0)
goto done;
if ((rc = unpack_vertices (g, m, vmap, nodes, added_vtcs)) != 0)
goto done;
Expand Down Expand Up @@ -1340,13 +1393,13 @@ int resource_reader_jgf_t::update (resource_graph_t &g,
update_data.reserved = rsv;
update_data.update = true;

if ((rc = fetch_jgf (str, &jgf, &nodes, &edges)) != 0)
if ((rc = fetch_jgf (str, &jgf, &nodes, &edges, update_data)) != 0)
goto done;
if ((rc = update_vertices (g, m, vmap, nodes, update_data)) != 0) {
undo_vertices (g, vmap, update_data);
goto done;
}
if ((rc = update_edges (g, m, vmap, edges, token)) != 0)
if ((rc = update_edges (g, m, vmap, edges, token, update_data)) != 0)
goto done;

done:
Expand Down Expand Up @@ -1414,7 +1467,7 @@ int resource_reader_jgf_t::partial_cancel (resource_graph_t &g,
p_cancel_data.jobid = jobid;
p_cancel_data.update = false;

if ((rc = fetch_jgf (R, &jgf, &nodes, &edges)) != 0)
if ((rc = fetch_jgf (R, &jgf, &nodes, &edges, p_cancel_data)) != 0)
goto done;
if ((rc = update_vertices (g, m, vmap, nodes, p_cancel_data)) != 0)
goto done;
Expand Down
Loading

0 comments on commit 4b9cbec

Please sign in to comment.