Skip to content

Commit

Permalink
resource: expand resource.status RPC response
Browse files Browse the repository at this point in the history
Problem: 'flux resource status' is very slow when the scheduler
is busy.

Expand the resource.status RPC response that is provided by the
resource module to include the same info that is returned by the
sched.resource-status RPC.

The resource module now sends a request to the new job-manager.resource-status RPC.
  • Loading branch information
garlick committed Mar 21, 2024
1 parent 48258d4 commit fa2b011
Show file tree
Hide file tree
Showing 2 changed files with 262 additions and 11 deletions.
272 changes: 261 additions & 11 deletions src/modules/resource/status.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,277 @@

#include "src/common/libutil/errprintf.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/librlist/rlist.h"

struct status {
struct resource_ctx *ctx;
flux_msg_handler_t **handlers;
struct flux_msglist *requests;
};

static json_t *prepare_response (struct status *status)
/* Flag every rank in 'ids' as DOWN within the resource set 'rl'.
 * A NULL 'ids' means there is nothing to mark and is not an error.
 * Returns 0 on success, -1 on failure.  errno is set to EINVAL when
 * rlist_mark_down() rejects the encoded rank string.
 */
static int mark_down (struct rlist *rl, const struct idset *ids)
{
    char *ranks;
    int rc;

    if (!ids)
        return 0;
    if (!(ranks = idset_encode (ids, IDSET_FLAG_RANGE)))
        return -1;
    rc = rlist_mark_down (rl, ranks);
    free (ranks); // the encoded string is only needed for the call above
    if (rc < 0) {
        errno = EINVAL;
        return -1;
    }
    return 0;
}

/* Flag the resources described by 'o' (an Rv1 resource object) as
 * ALLOCATED within the resource set 'rl'.
 * A NULL pointer or a JSON null for 'o' means there is nothing to mark.
 * Returns 0 on success, -1 with errno set to EINVAL on failure.
 */
static int mark_allocated (struct rlist *rl, json_t *o)
{
    struct rlist *alloc;
    int rc;

    if (!o || json_is_null (o))
        return 0;
    if (!(alloc = rlist_from_json (o, NULL))) {
        errno = EINVAL;
        return -1;
    }
    rc = rlist_set_allocated (rl, alloc);
    rlist_destroy (alloc); // temporary set is no longer needed either way
    if (rc < 0) {
        errno = EINVAL;
        return -1;
    }
    return 0;
}

/* Build an Rv1 resource object covering every resource in 'rl'.
 * This works on an empty copy of 'rl' in which all ranks are marked up,
 * so 'rl' itself is not modified.
 * Returns the new object, or NULL on failure.
 */
static json_t *get_all (struct rlist *rl)
{
    json_t *R = NULL;
    struct rlist *all = rlist_copy_empty (rl);

    if (all && rlist_mark_up (all, "all") >= 0)
        R = rlist_to_R (all);
    rlist_destroy (all);
    return R;
}

/* Build an Rv1 resource object covering only the DOWN resources of 'rl'.
 * Returns the new object, or NULL on failure.
 */
static json_t *get_down (struct rlist *rl)
{
    json_t *R = NULL;
    struct rlist *down = rlist_copy_down (rl);

    if (down)
        R = rlist_to_R (down);
    rlist_destroy (down);
    return R;
}

/* Build an Rv1 resource object covering only the ALLOCATED resources
 * of 'rl'.
 * Returns the new object, or NULL on failure.
 */
static json_t *get_allocated (struct rlist *rl)
{
    json_t *R = NULL;
    struct rlist *alloc = rlist_copy_allocated (rl);

    if (alloc)
        R = rlist_to_R (alloc);
    rlist_destroy (alloc);
    return R;
}

/* Given the "all" resource set 'rl', set the "all", "down", and
 * "allocated" keys in 'obj' to Rv1 resource objects.
 * Returns 0 on success, -1 with errno set to ENOMEM on failure.
 */
static int set_resource_status (json_t *obj, struct rlist *rl)
{
    json_t *o;

    /* N.B. json_object_set_new() steals the reference to 'o' whether it
     * succeeds or fails, so 'o' must not be released again on the error
     * path.  If the getter itself fails, 'o' is NULL and there is nothing
     * to release.
     */
    if (!(o = get_all (rl))
        || json_object_set_new (obj, "all", o) < 0)
        goto error;
    if (!(o = get_down (rl))
        || json_object_set_new (obj, "down", o) < 0)
        goto error;
    if (!(o = get_allocated (rl))
        || json_object_set_new (obj, "allocated", o) < 0)
        goto error;
    return 0;
error:
    errno = ENOMEM;
    return -1;
}

/* Build an rlist from the resource object 'R', ignoring its "scheduling"
 * key, then apply the supplied state to it:
 * - ranks in 'exclude' are removed (skipped when NULL)
 * - ranks in 'down' and in 'drain' are marked down (each skipped when NULL)
 * - resources in 'allocated' are marked allocated (skipped when NULL or
 *   a JSON null)
 * Returns the new rlist, or NULL on failure with errno set (ENOMEM if the
 * copy of 'R' could not be made, EINVAL otherwise).
 */
static struct rlist *get_resource (const json_t *R,
                                   const struct idset *exclude,
                                   const struct idset *down,
                                   struct idset *drain,
                                   json_t *allocated)
{
    struct rlist *rl = NULL;
    json_t *trimmed;
    int failed;

    /* A shallow copy is sufficient since only a top level key is deleted.
     */
    if (!(trimmed = json_copy ((json_t *)R))) {
        errno = ENOMEM;
        return NULL;
    }
    (void)json_object_del (trimmed, "scheduling");

    failed = (!(rl = rlist_from_json (trimmed, NULL))
              || (exclude
                  && rlist_remove_ranks (rl, (struct idset *)exclude) < 0)
              || mark_down (rl, down) < 0
              || mark_down (rl, drain) < 0
              || mark_allocated (rl, allocated) < 0);

    json_decref (trimmed);
    if (failed) {
        rlist_destroy (rl);
        errno = EINVAL;
        return NULL;
    }
    return rl;
}

/* Build the payload for a resource.status response.
 * 'allocated' is the Rv1 object of allocated resources; it may be NULL
 * or a JSON null, in which case no resources are marked allocated.
 *
 * The returned object contains:
 * - "R": the resource inventory
 * - "drain": drain information from drain_get_info()
 * - "online", "offline", "exclude": idsets of broker ranks
 * - "all", "down", "allocated": Rv1 resource objects derived from the
 *   inventory with excluded ranks removed
 *
 * Returns a new object that the caller must release, or NULL on failure
 * with errno set.
 */
static json_t *prepare_response (struct status *status, json_t *allocated)
{
    struct resource_ctx *ctx = status->ctx;
    const struct idset *exclude = exclude_get (ctx->exclude);
    const struct idset *down = monitor_get_down (ctx->monitor);
    struct idset *drain = drain_get (ctx->drain); // destroyed below
    const json_t *R;
    json_t *o = NULL;
    json_t *drain_info = NULL;
    struct rlist *rl = NULL;

    if (!(R = inventory_get (ctx->inventory))
        || !(drain_info = drain_get_info (ctx->drain))
        || !(rl = get_resource (R, exclude, down, drain, allocated)))
        goto error;
    /* "O" takes a new reference on each value, so drain_info is still
     * released separately below.
     */
    if (!(o = json_pack ("{s:O s:O}", "R", R, "drain", drain_info))) {
        errno = ENOMEM;
        goto error;
    }
    if (rutil_set_json_idset (o, "online", monitor_get_up (ctx->monitor)) < 0
        || rutil_set_json_idset (o, "offline", down) < 0
        || rutil_set_json_idset (o, "exclude", exclude) < 0
        || set_resource_status (o, rl) < 0)
        goto error;

    json_decref (drain_info);
    idset_destroy (drain);
    rlist_destroy (rl);
    return o;
error:
    /* Preserve the errno that describes the failure across cleanup. */
    ERRNO_SAFE_WRAP (json_decref, o);
    ERRNO_SAFE_WRAP (json_decref, drain_info);
    idset_destroy (drain);
    rlist_destroy (rl);
    return NULL;
}

/* Remove 'msg' from the message list 'ml' if it is present.
 * Entries are compared by pointer.  At most one entry is removed.
 */
static void remove_request (struct flux_msglist *ml, const flux_msg_t *msg)
{
    const flux_msg_t *cur;

    for (cur = flux_msglist_first (ml); cur; cur = flux_msglist_next (ml)) {
        if (cur == msg) {
            flux_msglist_delete (ml); // deletes the entry at the cursor
            return;
        }
    }
}

/* Continuation for the job-manager.resource-status RPC: build and send
 * the response to the resource.status request that triggered it.
 * N.B. an ENOSYS failure from job-manager.resource-status is not treated
 * as an error; the allocated set is simply left empty.  This can happen
 * in practice because the resource module loads before job-manager.
 * Removing msg from status->requests is the last step because that is
 * what unreferences/destroys both the message and the future.
 */
static void job_manager_continuation (flux_future_t *f, void *arg)
{
    struct status *status = arg;
    flux_t *h = status->ctx->h;
    const flux_msg_t *msg = flux_future_aux_get (f, "flux::request");
    json_t *allocated = NULL;
    json_t *response = NULL;
    flux_error_t error;
    int rc;

    if (flux_rpc_get_unpack (f, "{s:o}", "allocated", &allocated) < 0
        && errno != ENOSYS) {
        errprintf (&error,
                   "job-manager.resource-status request failed: %s",
                   future_strerror (f, errno));
        rc = flux_respond_error (h, msg, errno, error.text);
    }
    else if (!(response = prepare_response (status, allocated))) {
        errprintf (&error, "error preparing response: %s", strerror (errno));
        rc = flux_respond_error (h, msg, errno, error.text);
    }
    else
        rc = flux_respond_pack (h, msg, "O", response);

    if (rc < 0)
        flux_log_error (h, "error responding to resource.status request");
    json_decref (response);
    remove_request (status->requests, msg);
}

/* To answer this query, an RPC must be sent to the job manager to get
* the set of allocated resources. Get that started, then place the request
* on status->requests and continue answering in the RPC continuation.
* The rest of the information required is local.
*/
static void status_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct status *status = arg;
json_t *o = NULL;
flux_future_t *f;
flux_error_t error;

if (flux_request_decode (msg, NULL, NULL) < 0) {
Expand All @@ -76,18 +306,35 @@ static void status_cb (flux_t *h,
errno = EPROTO;
goto error;
}
if (!(o = prepare_response (status))) {
errprintf (&error, "error preparing response: %s", strerror (errno));
if (!(f = flux_rpc (h, "job-manager.resource-status", NULL, 0, 0))
|| flux_future_then (f, -1, job_manager_continuation, status) < 0
|| flux_future_aux_set (f, "flux::request", (void *)msg, NULL) < 0
|| flux_msg_aux_set (msg,
NULL,
f,
(flux_free_f)flux_future_destroy) < 0) {
errprintf (&error,
"error sending job-manager.resource-status request: %s",
strerror (errno));
flux_future_destroy (f);
goto error;
}
if (flux_msglist_append (status->requests, msg) < 0) {
errprintf (&error, "error saving request mesg: %s", strerror (errno));
goto error;
}
if (flux_respond_pack (h, msg, "O", o) < 0)
flux_log_error (h, "error responding to resource.status request");
json_decref (o);
return;
error:
if (flux_respond_error (h, msg, errno, error.text) < 0)
flux_log_error (h, "error responding to resource.status request");
json_decref (o);
}

/* Disconnect hook, invoked by the resource module's main disconnect
 * message handler.  Passes the disconnect message 'msg' to
 * flux_msglist_disconnect() for the list of pending resource.status
 * requests.  The result of that call is deliberately ignored.
 */
void status_disconnect (struct status *status, const flux_msg_t *msg)
{
    struct flux_msglist *pending = status->requests;

    (void)flux_msglist_disconnect (pending, msg);
}

static const struct flux_msg_handler_spec htab[] = {
Expand All @@ -105,6 +352,7 @@ void status_destroy (struct status *status)
if (status) {
int saved_errno = errno;
flux_msg_handler_delvec (status->handlers);
flux_msglist_destroy (status->requests);
free (status);
errno = saved_errno;
}
Expand All @@ -117,6 +365,8 @@ struct status *status_create (struct resource_ctx *ctx)
if (!(status = calloc (1, sizeof (*status))))
return NULL;
status->ctx = ctx;
if (!(status->requests = flux_msglist_create ()))
goto error;
if (flux_msg_handler_addvec (ctx->h, htab, status, &status->handlers) < 0)
goto error;
return status;
Expand Down
1 change: 1 addition & 0 deletions src/modules/resource/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

struct status *status_create (struct resource_ctx *ctx);
void status_destroy (struct status *status);
void status_disconnect (struct status *status, const flux_msg_t *msg);

#endif /* ! _RESOURCE_STATUS_H */

Expand Down

0 comments on commit fa2b011

Please sign in to comment.