diff --git a/src/modules/job-manager/queue.c b/src/modules/job-manager/queue.c index 38a6bc457d3e..dd3134203623 100644 --- a/src/modules/job-manager/queue.c +++ b/src/modules/job-manager/queue.c @@ -26,6 +26,8 @@ #include "alloc.h" #include "job-manager.h" +#include "jobtap-internal.h" +#include "jobtap.h" #include "conf.h" #include "restart.h" #include "queue.h" @@ -774,6 +776,171 @@ void queue_destroy (struct queue *queue) } } +/* Test equality of two constraint objects. + * For now, two constraints are equivalent if: + * + * - both are either NULL or empty objects (i.e. size == 0) + * (Note: json_object_size (NULL) == 0) + * + * - json_equal(a, b) returns true + */ +static bool constraints_equal (json_t *c1, json_t *c2) +{ + if ((json_object_size (c1) == 0 && json_object_size (c2) == 0) + || json_equal (c1, c2)) + return true; + return false; +} + +static int constraints_match_check (struct queue *queue, + const char *name, + json_t *constraints, + flux_error_t *errp) +{ + int rc = -1; + json_t *expected = NULL; + struct jobq *q; + + /* Return an error if the job's current queue doesn't exist since we + * can't validate current constraints (This should not happen in normal + * situations). + */ + if (!(q = queue_lookup (queue, name, errp))) + return -1; + + /* If current queue has constraints, then create a constraint object + * for equivalence test below: + */ + if (q->requires + && !(expected = json_pack ("{s:O}", "properties", q->requires))) { + errprintf (errp, "failed to get constraints for current queue"); + goto out; + } + + /* Constraints of current job and queue must match exactly or queue + * update will be rejected. This is because the entire constraints + * object will be overwritten on queue update, and we do not want to + * replace any extra constraints provided on the submission commandline + * (and these likely wouldn't make sense in the new queue anyway) + */ + if (!constraints_equal (constraints, expected)) { + errprintf (errp, + "job appears to have non-queue constraints, " + "unable to update queue to %s", + name); + goto out; + } + rc = 0; +out: + json_decref (expected); + return rc; +} + +static int queue_update_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *arg) +{ + int rc; + struct queue *queue = arg; + flux_job_state_t state; + const char *name; + const char *current_queue = NULL; + json_t *constraints = NULL; + flux_error_t error; + struct jobq *newq; + + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:s s:i s:{s:{s:{s?s s?o}}}}", + "value", &name, + "state", &state, + "jobspec", + "attributes", + "system", + "queue", ¤t_queue, + "constraints", &constraints) < 0) { + flux_jobtap_error (p, args, "plugin args unpack failed"); + return -1; + } + if (state == FLUX_JOB_STATE_RUN + || state == FLUX_JOB_STATE_CLEANUP) { + flux_jobtap_error (p, + args, + "update of queue for running job not supported"); + return -1; + } + if (current_queue && streq (current_queue, name)) { + flux_jobtap_error (p, + args, + "job queue is already set to %s", + name); + return -1; + } + if (!(newq = queue_lookup (queue, name, &error))) { + flux_jobtap_error (p, args, "%s", error.text); + return -1; + } + if (!newq->enable) { + flux_jobtap_error (p, + args, + "queue %s is currently disabled", + name); + return -1; + } + /* Constraints must match current queue exactly since they will be + * overwritten with new queue constraints after queue is updated: + */ + if (constraints_match_check (queue, current_queue, constraints, &error)) { + flux_jobtap_error (p, args, "%s", error.text); + return -1; + } + /* Request the update service do a feasibility check for this update + * and append an additional update of the job constraints. + * + * This is done via two different calls below dependent on whether the + * new queue has any constraints. + */ + if (newq->requires) { + /* Replace current constraints with those of the new queue + */ + rc = flux_plugin_arg_pack (args, + FLUX_PLUGIN_ARG_OUT, + "{s:i s:{s:{s:O}}}", + "feasibility", 1, + "updates", + "attributes.system.constraints", + "properties", newq->requires); + } + else { + /* New queue has no requirements. Set constraints to empty object. + */ + rc = flux_plugin_arg_pack (args, + FLUX_PLUGIN_ARG_OUT, + "{s:i s:{s:{}}}", + "feasibility", 1, + "updates", + "attributes.system.constraints"); + } + /* If either of the above packs failed then return an error: + */ + if (rc < 0) { + flux_jobtap_error (p, + args, + "unable to create jobtap out arguments"); + return -1; + } + return 0; +} + +static int update_queue_plugin_init (flux_plugin_t *p, void *arg) +{ + return flux_plugin_add_handler (p, + "job.update.attributes.system.queue", + queue_update_cb, + arg); +} + struct queue *queue_create (struct job_manager *ctx) { struct queue *queue; @@ -799,6 +966,16 @@ struct queue *queue_create (struct job_manager *ctx) error.text); goto error; } + if (jobtap_register_builtin (ctx->jobtap, + ".update-queue", + update_queue_plugin_init, + queue) < 0 + || !jobtap_load (ctx->jobtap, ".update-queue", NULL, NULL)) { + flux_log (ctx->h, + LOG_ERR, + "Failed to register and load update-queue plugin"); + goto error; + } return queue; error: queue_destroy (queue);