diff --git a/src/modules/resource/status.c b/src/modules/resource/status.c index cbcbb7997eb4..ad68a109259f 100644 --- a/src/modules/resource/status.c +++ b/src/modules/resource/status.c @@ -24,23 +24,189 @@ #include "src/common/libutil/errprintf.h" #include "src/common/libutil/errno_safe.h" +#include "src/common/librlist/rlist.h" struct status { struct resource_ctx *ctx; flux_msg_handler_t **handlers; + struct flux_msglist *requests; }; -static json_t *prepare_response (struct status *status) +/* Mark the ranks in 'ids' DOWN in the resource set 'rl'. + */ +static int mark_down (struct rlist *rl, const struct idset *ids) +{ + if (ids) { + char *s; + + if (!(s = idset_encode (ids, IDSET_FLAG_RANGE))) + return -1; + if (rlist_mark_down (rl, s) < 0) { + free (s); + errno = EINVAL; + return -1; + } + free (s); + } + return 0; +} + +/* Mark the resources in 'o' (an Rv1 resource object) ALLOCATED + * in the resource set 'rl'. + */ +static int mark_allocated (struct rlist *rl, json_t *o) +{ + if (o && !json_is_null (o)) { + struct rlist *r; + + if (!(r = rlist_from_json (o, NULL))) { + errno = EINVAL; + return -1; + } + if (rlist_set_allocated (rl, r) < 0) { + rlist_destroy (r); + errno = EINVAL; + return -1; + } + rlist_destroy (r); + } + return 0; +} + +/* Get an Rv1 resource object that includes all resources. + */ +static json_t *get_all (struct rlist *rl) +{ + json_t *o; + struct rlist *r; + + if (!(r = rlist_copy_empty (rl)) + || rlist_mark_up (r, "all") < 0 + || !(o = rlist_to_R (r))) + goto error; + rlist_destroy (r); + return o; +error: + rlist_destroy (r); + return NULL; +} + +/* Get an Rv1 resource object that includes only DOWN resources. + */ +static json_t *get_down (struct rlist *rl) +{ + json_t *o; + struct rlist *r; + + if (!(r = rlist_copy_down (rl)) + || !(o = rlist_to_R (r))) + goto error; + rlist_destroy (r); + return o; +error: + rlist_destroy (r); + return NULL; +} + +/* Get an Rv1 resource object that includes only ALLOCATED resources. + */ +static json_t *get_allocated (struct rlist *rl) +{ + json_t *o; + struct rlist *r; + + if (!(r = rlist_copy_allocated (rl)) + || !(o = rlist_to_R (r))) + goto error; + rlist_destroy (r); + return o; +error: + rlist_destroy (r); + return NULL; +} + +/* Given the "all" resource set 'rl', set the "all", "down", and + * "allocated" keys in 'obj' to Rv1 resource objects. + */ +static int set_resource_status (json_t *obj, struct rlist *rl) +{ + json_t *o; + + if (!(o = get_all (rl)) + || json_object_set_new (obj, "all", o) < 0) { + json_decref (o); + goto error; + } + if (!(o = get_down (rl)) + || json_object_set_new (obj, "down", o) < 0) { + json_decref (o); + goto error; + } + if (!(o = get_allocated (rl)) + || json_object_set_new (obj, "allocated", o) < 0) { + json_decref (o); + goto error; + } + return 0; +error: + errno = ENOMEM; + return -1; +} + +/* Create an rlist object from R. Omit the scheduling key. Then: + * - exclude the ranks in 'exclude' (if non-NULL) + * - mark down the ranks in 'down' and/or 'drain' (if non-NULL) + * - mark allocated the resources in 'allocated' (if non-NULL and not json NULL) + */ +static struct rlist *get_resource (const json_t *R, + const struct idset *exclude, + const struct idset *down, + struct idset *drain, + json_t *allocated) +{ + json_t *cpy; + struct rlist *rl; + + if (!(cpy = json_copy ((json_t *)R))) { // thin copy - to del top level key + errno = ENOMEM; + return NULL; + } + (void)json_object_del (cpy, "scheduling"); + + if (!(rl = rlist_from_json (cpy, NULL))) + goto error; + + if (exclude) { + if (rlist_remove_ranks (rl, (struct idset *)exclude) < 0) + goto error; + } + if (mark_down (rl, down) < 0 + || mark_down (rl, drain) < 0 + || mark_allocated (rl, allocated) < 0) + goto error; + json_decref (cpy); + return rl; +error: + json_decref (cpy); + rlist_destroy (rl); + errno = EINVAL; + return NULL; +} + +static json_t *prepare_response (struct status *status, json_t *allocated) { struct resource_ctx *ctx = status->ctx; - const struct idset *down = monitor_get_down (ctx->monitor); const struct idset *exclude = exclude_get (ctx->exclude); + const struct idset *down = monitor_get_down (ctx->monitor); + struct idset *drain = drain_get (ctx->drain); const json_t *R; json_t *o = NULL; json_t *drain_info = NULL; + struct rlist *rl = NULL; if (!(R = inventory_get (ctx->inventory)) - || !(drain_info = drain_get_info (ctx->drain))) + || !(drain_info = drain_get_info (ctx->drain)) + || !(rl = get_resource (R, exclude, down, drain, allocated))) goto error; if (!(o = json_pack ("{s:O s:O}", "R", R, "drain", drain_info))) { errno = ENOMEM; @@ -48,23 +214,87 @@ static json_t *prepare_response (struct status *status) } if (rutil_set_json_idset (o, "online", monitor_get_up (ctx->monitor)) < 0 || rutil_set_json_idset (o, "offline", down) < 0 - || rutil_set_json_idset (o, "exclude", exclude) < 0) + || rutil_set_json_idset (o, "exclude", exclude) < 0 + || set_resource_status (o, rl) < 0) goto error; + json_decref (drain_info); + idset_destroy (drain); + rlist_destroy (rl); return o; error: ERRNO_SAFE_WRAP (json_decref, o); ERRNO_SAFE_WRAP (json_decref, drain_info); + idset_destroy (drain); + rlist_destroy (rl); return NULL; } +static void remove_request (struct flux_msglist *ml, const flux_msg_t *msg) +{ + const flux_msg_t *m; + + m = flux_msglist_first (ml); + while (m) { + if (m == msg) { + flux_msglist_delete (ml); // delete @cursor + break; + } + m = flux_msglist_next (ml); + } +} + +/* The job-manager.resource-status RPC has completed. Finish handling + * the resource.status request. + * N.B. treat ENOSYS from job-manager.resource-status as the empty set. + * This could happen IRL because the resource module loads before job-manager. + * Also note that both the future and the message are unreferenced/destroyed + * when msg is removed from the status->requests list. + */ +static void job_manager_continuation (flux_future_t *f, void *arg) +{ + const flux_msg_t *msg = flux_future_aux_get (f, "flux::request"); + struct status *status = arg; + flux_t *h = status->ctx->h; + flux_error_t error; + json_t *allocated = NULL; + json_t *o = NULL; + + if (flux_rpc_get_unpack (f, "{s:o}", "allocated", &allocated) < 0 + && errno != ENOSYS) { + errprintf (&error, + "job-manager.resource-status request failed: %s", + future_strerror (f, errno)); + goto error; + } + if (!(o = prepare_response (status, allocated))) { + errprintf (&error, "error preparing response: %s", strerror (errno)); + goto error; + } + if (flux_respond_pack (h, msg, "O", o) < 0) + flux_log_error (h, "error responding to resource.status request"); + json_decref (o); + remove_request (status->requests, msg); + return; +error: + if (flux_respond_error (h, msg, errno, error.text) < 0) + flux_log_error (h, "error responding to resource.status request"); + json_decref (o); + remove_request (status->requests, msg); +} + +/* To answer this query, an RPC must be sent to the job manager to get + * the set of allocated resources. Get that started, then place the request + * on status->requests and continue answering in the RPC continuation. + * The rest of the information required is local. + */ static void status_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { struct status *status = arg; - json_t *o = NULL; + flux_future_t *f; flux_error_t error; if (flux_request_decode (msg, NULL, NULL) < 0) { @@ -76,18 +306,35 @@ static void status_cb (flux_t *h, errno = EPROTO; goto error; } - if (!(o = prepare_response (status))) { - errprintf (&error, "error preparing response: %s", strerror (errno)); + if (!(f = flux_rpc (h, "job-manager.resource-status", NULL, 0, 0)) + || flux_future_then (f, -1, job_manager_continuation, status) < 0 + || flux_future_aux_set (f, "flux::request", (void *)msg, NULL) < 0 + || flux_msg_aux_set (msg, + NULL, + f, + (flux_free_f)flux_future_destroy) < 0) { + errprintf (&error, + "error sending job-manager.resource-status request: %s", + strerror (errno)); + flux_future_destroy (f); + goto error; + } + if (flux_msglist_append (status->requests, msg) < 0) { + errprintf (&error, "error saving request mesg: %s", strerror (errno)); goto error; } - if (flux_respond_pack (h, msg, "O", o) < 0) - flux_log_error (h, "error responding to resource.status request"); - json_decref (o); return; error: if (flux_respond_error (h, msg, errno, error.text) < 0) flux_log_error (h, "error responding to resource.status request"); - json_decref (o); +} + +/* Disconnect hook called from resource module's main disconnect + * message handler. + */ +void status_disconnect (struct status *status, const flux_msg_t *msg) +{ + (void)flux_msglist_disconnect (status->requests, msg); } static const struct flux_msg_handler_spec htab[] = { @@ -105,6 +352,7 @@ void status_destroy (struct status *status) if (status) { int saved_errno = errno; flux_msg_handler_delvec (status->handlers); + flux_msglist_destroy (status->requests); free (status); errno = saved_errno; } @@ -117,6 +365,8 @@ struct status *status_create (struct resource_ctx *ctx) if (!(status = calloc (1, sizeof (*status)))) return NULL; status->ctx = ctx; + if (!(status->requests = flux_msglist_create ())) + goto error; if (flux_msg_handler_addvec (ctx->h, htab, status, &status->handlers) < 0) goto error; return status; diff --git a/src/modules/resource/status.h b/src/modules/resource/status.h index a2d3e957d5b8..b1aecd6427e5 100644 --- a/src/modules/resource/status.h +++ b/src/modules/resource/status.h @@ -13,6 +13,7 @@ struct status *status_create (struct resource_ctx *ctx); void status_destroy (struct status *status); +void status_disconnect (struct status *status, const flux_msg_t *msg); #endif /* ! _RESOURCE_STATUS_H */