Skip to content

Commit

Permalink
job-manager: add builtin plugin to enable queue updates
Browse files Browse the repository at this point in the history
Problem: There is no way to move a pending job to a different queue.

Add a new .update-queue jobtap plugin from within job-manager/queue.c
which allows updates to the queue for pending jobs.

This plugin is registered from within the queue component itself to
give the update callback access to the `struct queue`, since there is
no external access to the queue states without sending an RPC back to
the job manager itself.

The plugin validates the following before allowing a queue update:

 - The queue exists

 - The queue is currently enabled

 - Current job constraints exactly match those of the job's current
   queue. This is required because the update-queue plugin must replace
   existing constraints with the configured constraints of the new
   queue, and since existing queue constraints may be arbitrarily mixed
   with other job constraints, it is easiest for now to just require no
   extra constraints for a queue update (this could be improved in the
   future).

If the above are all true then the queue update is allowed, the plugin
adds the new queue constraints to the proposed updates, and requests
that feasibility of the updated job be checked.
  • Loading branch information
grondo committed Sep 20, 2023
1 parent 9cfbfb0 commit 13d7a3f
Showing 1 changed file with 177 additions and 0 deletions.
177 changes: 177 additions & 0 deletions src/modules/job-manager/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#include "alloc.h"
#include "job-manager.h"
#include "jobtap-internal.h"
#include "jobtap.h"
#include "conf.h"
#include "restart.h"
#include "queue.h"
Expand Down Expand Up @@ -774,6 +776,171 @@ void queue_destroy (struct queue *queue)
}
}

/* Decide whether two constraint objects should be treated as equivalent.
 *
 * They are considered equivalent when either:
 *
 *  - json_object_size() reports 0 for both of them, which covers NULL
 *    and empty objects (json_object_size (NULL) == 0), or
 *
 *  - json_equal (c1, c2) reports them equal.
 *
 * Returns true if equivalent, false otherwise.
 */
static bool constraints_equal (json_t *c1, json_t *c2)
{
    bool both_empty = json_object_size (c1) == 0
                      && json_object_size (c2) == 0;

    /* json_equal() is only consulted when at least one side is non-empty */
    return both_empty || json_equal (c1, c2);
}

/* Verify that a job's constraints are exactly the constraints implied by
 * the queue it is currently in.
 *
 *  queue       - queue component context, passed through to queue_lookup()
 *  name        - name of the job's CURRENT queue (not the update target)
 *  constraints - the job's current constraints object (may be NULL)
 *  errp        - receives a human readable message on failure
 *
 * Returns 0 if the constraints match, -1 otherwise.
 */
static int constraints_match_check (struct queue *queue,
                                    const char *name,
                                    json_t *constraints,
                                    flux_error_t *errp)
{
    int rc = -1;
    json_t *expected = NULL;
    struct jobq *q;

    /* Return an error if the job's current queue doesn't exist since we
     * can't validate current constraints (This should not happen in normal
     * situations).
     */
    if (!(q = queue_lookup (queue, name, errp)))
        return -1;

    /* If current queue has constraints, then create a constraint object
     * for the equivalence test below. If it has none, expected stays NULL,
     * which constraints_equal() treats like an empty object.
     */
    if (q->requires
        && !(expected = json_pack ("{s:O}", "properties", q->requires))) {
        errprintf (errp, "failed to get constraints for current queue");
        goto out;
    }

    /* Constraints of current job and queue must match exactly or queue
     * update will be rejected. This is because the entire constraints
     * object will be overwritten on queue update, and we do not want to
     * replace any extra constraints provided on the submission commandline
     * (and these likely wouldn't make sense in the new queue anyway).
     *
     * Note: 'name' is the queue the job is being moved away from, so the
     * message reports it as the source of the update, not the target.
     */
    if (!constraints_equal (constraints, expected)) {
        errprintf (errp,
                   "job appears to have non-queue constraints, "
                   "unable to update queue from %s",
                   name);
        goto out;
    }
    rc = 0;
out:
    /* expected may still be NULL here */
    json_decref (expected);
    return rc;
}

/* jobtap handler invoked when an update of a job's queue is requested.
 *
 *  p     - plugin handle, used to report errors via flux_jobtap_error()
 *  topic - callback topic (unused)
 *  args  - plugin arguments: the requested queue name ("value"), the job
 *          state, and the jobspec are read from the IN args; on success
 *          the feasibility request and constraint update are packed into
 *          the OUT args
 *  arg   - the struct queue supplied when the handler was added
 *
 * The update is rejected if the job is in RUN or CLEANUP state, the job is
 * already in the requested queue, the requested queue cannot be looked up
 * or is disabled, or the job's constraints do not exactly match those of
 * its current queue.
 *
 * Returns 0 on success, -1 on failure after calling flux_jobtap_error().
 */
static int queue_update_cb (flux_plugin_t *p,
                            const char *topic,
                            flux_plugin_arg_t *args,
                            void *arg)
{
    int rc;
    struct queue *queue = arg;
    /* The "i" unpack format stores its result through an int pointer, so
     * receive the state in a plain int rather than the enum type.
     */
    int state;
    const char *name;
    const char *current_queue = NULL;
    json_t *constraints = NULL;
    /* NOTE(review): assumes queue_lookup() fills in error.text whenever it
     * fails - confirm against its definition.
     */
    flux_error_t error;
    struct jobq *newq;

    /* queue and constraints are optional in the jobspec ("s?"), so both
     * may remain NULL after a successful unpack.
     */
    if (flux_plugin_arg_unpack (args,
                                FLUX_PLUGIN_ARG_IN,
                                "{s:s s:i s:{s:{s:{s?s s?o}}}}",
                                "value", &name,
                                "state", &state,
                                "jobspec",
                                 "attributes",
                                  "system",
                                   "queue", &current_queue,
                                   "constraints", &constraints) < 0) {
        flux_jobtap_error (p, args, "plugin args unpack failed");
        return -1;
    }
    if (state == FLUX_JOB_STATE_RUN
        || state == FLUX_JOB_STATE_CLEANUP) {
        flux_jobtap_error (p,
                           args,
                           "update of queue for running job not supported");
        return -1;
    }
    if (current_queue && streq (current_queue, name)) {
        flux_jobtap_error (p,
                           args,
                           "job queue is already set to %s",
                           name);
        return -1;
    }
    if (!(newq = queue_lookup (queue, name, &error))) {
        flux_jobtap_error (p, args, "%s", error.text);
        return -1;
    }
    if (!newq->enable) {
        flux_jobtap_error (p,
                           args,
                           "queue %s is currently disabled",
                           name);
        return -1;
    }
    /* Constraints must match current queue exactly since they will be
     * overwritten with new queue constraints after queue is updated:
     */
    if (constraints_match_check (queue,
                                 current_queue,
                                 constraints,
                                 &error) < 0) {
        flux_jobtap_error (p, args, "%s", error.text);
        return -1;
    }
    /* Request the update service do a feasibility check for this update
     * and append an additional update of the job constraints.
     *
     * This is done via two different calls below dependent on whether the
     * new queue has any constraints.
     */
    if (newq->requires) {
        /* Replace current constraints with those of the new queue
         */
        rc = flux_plugin_arg_pack (args,
                                   FLUX_PLUGIN_ARG_OUT,
                                   "{s:i s:{s:{s:O}}}",
                                   "feasibility", 1,
                                   "updates",
                                    "attributes.system.constraints",
                                     "properties", newq->requires);
    }
    else {
        /* New queue has no requirements. Set constraints to empty object.
         */
        rc = flux_plugin_arg_pack (args,
                                   FLUX_PLUGIN_ARG_OUT,
                                   "{s:i s:{s:{}}}",
                                   "feasibility", 1,
                                   "updates",
                                    "attributes.system.constraints");
    }
    /* If either of the above packs failed then return an error:
     */
    if (rc < 0) {
        flux_jobtap_error (p,
                           args,
                           "unable to create jobtap out arguments");
        return -1;
    }
    return 0;
}

/* Init function for the update-queue plugin: installs queue_update_cb as
 * the handler for the job.update.attributes.system.queue topic, passing
 * 'arg' through unchanged as the handler's argument.
 *
 * Returns whatever flux_plugin_add_handler() returns.
 */
static int update_queue_plugin_init (flux_plugin_t *p, void *arg)
{
    const char *update_topic = "job.update.attributes.system.queue";
    int result = flux_plugin_add_handler (p,
                                          update_topic,
                                          queue_update_cb,
                                          arg);

    return result;
}

struct queue *queue_create (struct job_manager *ctx)
{
struct queue *queue;
Expand All @@ -799,6 +966,16 @@ struct queue *queue_create (struct job_manager *ctx)
error.text);
goto error;
}
if (jobtap_register_builtin (ctx->jobtap,
".update-queue",
update_queue_plugin_init,
queue) < 0
|| !jobtap_load (ctx->jobtap, ".update-queue", NULL, NULL)) {
flux_log (ctx->h,
LOG_ERR,
"Failed to register and load update-queue plugin");
goto error;
}
return queue;
error:
queue_destroy (queue);
Expand Down

0 comments on commit 13d7a3f

Please sign in to comment.