Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support partially allocated jobs across scheduler reload #6445

Merged
merged 6 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ libflux_idset_la_LDFLAGS = \
libflux_schedutil_la_SOURCES =
libflux_schedutil_la_LIBADD = \
$(builddir)/libschedutil/libschedutil.la \
$(builddir)/libczmqcontainers/libczmqcontainers.la \
$(builddir)/librlist/librlist.la \
libflux-internal.la \
libflux-core.la \
$(JANSSON_LIBS)
libflux_schedutil_la_LDFLAGS = \
Expand Down
48 changes: 42 additions & 6 deletions src/common/libschedutil/hello.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include <jansson.h>

#include "src/common/libjob/idf58.h"
#include "src/common/librlist/rlist.h"
#include "src/common/libutil/errno_safe.h"

#include "schedutil_private.h"
#include "init.h"
#include "hello.h"
Expand All @@ -39,15 +42,40 @@
flux_future_destroy (f);
}

/* Build a reduced copy of R_orig with the ranks listed in free_ranks removed.
 *
 * msg        - hello response message; the new R string is attached to it
 *              with flux_msg_aux_set() using free() as the destructor, so
 *              the caller must not free the returned pointer.
 * R_orig     - the job's full R, as a string parseable by rlist_from_R().
 * free_ranks - idset string (parsed with idset_decode()) naming the ranks
 *              to drop from R_orig.
 *
 * Returns the newly encoded R string on success, or NULL on failure.
 * errno is preserved across the free() on the failure path.
 */
static const char *create_partial_R (const flux_msg_t *msg,
                                     const char *R_orig,
                                     const char *free_ranks)
{
    struct idset *freed = NULL;
    struct rlist *remaining = NULL;
    char *encoded = NULL;

    if (!(freed = idset_decode (free_ranks)))
        goto done;
    if (!(remaining = rlist_from_R (R_orig)))
        goto done;
    if (rlist_remove_ranks (remaining, freed) < 0)
        goto done;
    if (!(encoded = rlist_encode (remaining)))
        goto done;
    /* Hand the string to msg; if that fails we still own it and must
     * release it ourselves without disturbing errno.
     * NOTE(review): Codecov flagged this failure cleanup as not covered
     * by tests in the originating pull request.
     */
    if (flux_msg_aux_set (msg, NULL, encoded, (flux_free_f)free) < 0) {
        ERRNO_SAFE_WRAP (free, encoded);
        encoded = NULL;
    }
done:
    rlist_destroy (remaining);
    idset_destroy (freed);
    return encoded;
}

static int schedutil_hello_job (schedutil_t *util,
const flux_msg_t *msg)
{
char key[64];
flux_future_t *f = NULL;
const char *R;
flux_jobid_t id;
const char *free_ranks = NULL;

if (flux_msg_unpack (msg, "{s:I}", "id", &id) < 0)
if (flux_msg_unpack (msg,
"{s:I s?s}",
"id", &id,
"free", &free_ranks) < 0)
goto error;
if (flux_job_kvs_key (key, sizeof (key), id, "R") < 0) {
errno = EPROTO;
Expand All @@ -57,6 +85,10 @@
goto error;
if (flux_kvs_lookup_get (f, &R) < 0)
goto error;
if (free_ranks) {
if (!(R = create_partial_R (msg, R, free_ranks)))
goto error;

Check warning on line 90 in src/common/libschedutil/hello.c

View check run for this annotation

Codecov / codecov/patch

src/common/libschedutil/hello.c#L90

Added line #L90 was not covered by tests
}
if (util->ops->hello (util->h,
msg,
R,
Expand All @@ -78,16 +110,20 @@
{
flux_future_t *f;
int rc = -1;
int partial_ok = 0;

if (!util || !util->ops->hello) {
errno = EINVAL;
return -1;
}
if (!(f = flux_rpc (util->h,
"job-manager.sched-hello",
NULL,
FLUX_NODEID_ANY,
FLUX_RPC_STREAMING)))
if ((util->flags & SCHEDUTIL_HELLO_PARTIAL_OK))
partial_ok = 1;
if (!(f = flux_rpc_pack (util->h,
"job-manager.sched-hello",
FLUX_NODEID_ANY,
FLUX_RPC_STREAMING,
"{s:b}",
"partial-ok", partial_ok)))
return -1;
while (1) {
const flux_msg_t *msg;
Expand Down
1 change: 1 addition & 0 deletions src/common/libschedutil/init.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef struct schedutil_ctx schedutil_t;

/* Flag bits for the schedutil handle; they are tested as a bitmask
 * against util->flags.
 */
enum schedutil_flags {
SCHEDUTIL_FREE_NOLOOKUP = 1, // now the default so this flag is ignored
// When set, the job-manager.sched-hello request is sent with
// "partial-ok" true; hello responses may then carry a "free" idset of
// ranks already released, which is subtracted from R before the hello
// callback is invoked.
SCHEDUTIL_HELLO_PARTIAL_OK = 2,
};

/* Create a handle for the schedutil convenience library.
Expand Down
11 changes: 8 additions & 3 deletions src/modules/job-manager/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,17 +309,22 @@
{
struct job_manager *ctx = arg;
struct job *job;
int partial_ok = 0;

/* N.B. no "state" is set in struct alloc after a hello msg, so do
* not set ctx->alloc->sched_sender in here. Do so only in the
* ready callback */
if (flux_request_decode (msg, NULL, NULL) < 0)
if (flux_request_unpack (msg, NULL, "{s?b}", "partial-ok", &partial_ok) < 0
&& flux_request_decode (msg, NULL, NULL) < 0)

Check warning on line 318 in src/modules/job-manager/alloc.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/alloc.c#L318

Added line #L318 was not covered by tests
goto error;
if (!flux_msg_is_streaming (msg)) {
errno = EPROTO;
goto error;
}
flux_log (h, LOG_DEBUG, "scheduler: hello");
flux_log (h,
LOG_DEBUG,
"scheduler: hello%s",
partial_ok ? " +partial-ok" : "");
job = zhashx_first (ctx->active_jobs);
while (job) {
if (job->has_resources && !job->alloc_bypass) {
Expand All @@ -334,7 +339,7 @@
}
job = zhashx_next (ctx->active_jobs);
}
if (housekeeping_hello_respond (ctx->housekeeping, msg) < 0)
if (housekeeping_hello_respond (ctx->housekeeping, msg, partial_ok) < 0)
goto error;
if (flux_respond_error (h, msg, ENODATA, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
Expand Down
73 changes: 53 additions & 20 deletions src/modules/job-manager/housekeeping.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
#include <flux/idset.h>
#include <unistd.h>
#include <signal.h>
#include <jansson.h>
#ifdef HAVE_ARGZ_ADD
#include <argz.h>
#else
Expand Down Expand Up @@ -110,11 +111,11 @@
flux_jobid_t id;
struct rlist *rl; // R, diminished each time a subset is released
struct idset *pending; // ranks in need of housekeeping
struct idset *free; // ranks that have been released to the scheduler
struct housekeeping *hk;
flux_watcher_t *timer;
bool timer_armed;
bool timer_expired;
int free_count; // number of releases
double t_start;
struct bulk_exec *bulk_exec;
void *list_handle;
Expand Down Expand Up @@ -142,6 +143,7 @@
int saved_errno = errno;
rlist_destroy (a->rl);
idset_destroy (a->pending);
idset_destroy (a->free);
flux_watcher_destroy (a->timer);
bulk_exec_destroy (a->bulk_exec);
free (a);
Expand Down Expand Up @@ -182,6 +184,7 @@
a->t_start = flux_reactor_now (flux_get_reactor (hk->ctx->h));
if (!(a->rl = rlist_from_json (R, NULL))
|| !(a->pending = rlist_ranks (a->rl))
|| !(a->free = idset_create (idset_universe_size (a->pending), 0))
|| !(a->timer = flux_timer_watcher_create (r,
0,
0.,
Expand Down Expand Up @@ -242,7 +245,8 @@
|| !(rl = rlist_copy_ranks (a->rl, ranks))
|| !(R = rlist_to_R (rl))
|| alloc_send_free_request (ctx->alloc, R, a->id, final) < 0
|| rlist_remove_ranks (a->rl, ranks) < 0) {
|| rlist_remove_ranks (a->rl, ranks) < 0
|| idset_add (a->free, ranks) < 0) {
char *s = idset_encode (ranks, IDSET_FLAG_RANGE);
flux_log (ctx->h,
LOG_ERR,
Expand All @@ -251,8 +255,6 @@
s ? s : "NULL");
free (s);
}
else
a->free_count++;
json_decref (R);
rlist_destroy (rl);
idset_destroy (ranks);
Expand Down Expand Up @@ -465,36 +467,61 @@
return alloc_send_free_request (hk->ctx->alloc, R, id, true);
}

/* Encode 'ids' as a range-compressed idset string and store it in the
 * JSON object 'obj' under 'key'.
 *
 * obj - JSON object to modify.
 * key - object key to set.
 * ids - idset to encode (IDSET_FLAG_RANGE form).
 *
 * Returns 0 on success, -1 on failure.  On JSON failures errno is set to
 * ENOMEM, since Jansson does not set errno and the caller reports
 * strerror (errno).
 *
 * NOTE(review): Codecov flagged the failure path as not covered by tests
 * in the originating pull request.
 */
static int set_idset_string (json_t *obj, const char *key, struct idset *ids)
{
    char *s;
    json_t *o;

    if (!(s = idset_encode (ids, IDSET_FLAG_RANGE)))
        return -1;
    /* json_string() copies its argument, so the encoded string can be
     * released right away regardless of the outcome.
     */
    o = json_string (s);
    free (s);
    if (!o) {
        errno = ENOMEM;
        return -1;
    }
    /* json_object_set_new() steals the reference to 'o' even when it
     * fails, so 'o' must NOT be decref'd here on error (doing so would
     * be a double decref of an already-freed value).
     */
    if (json_object_set_new (obj, key, o) < 0) {
        errno = ENOMEM;
        return -1;
    }
    return 0;
}

static int housekeeping_hello_respond_one (struct housekeeping *hk,
const flux_msg_t *msg,
struct allocation *a,
bool partial_ok,
flux_error_t *error)
{
struct job *job;
json_t *payload = NULL;

if (a->free_count > 0) {
errprintf (error, "partial release is not supported by RFC 27 hello");
goto error;
if (!idset_empty (a->free) && !partial_ok) {
errprintf (error,
"scheduler does not support restart with partially"
" released resources");
return -1;
}
if (!(job = zhashx_lookup (hk->ctx->inactive_jobs, &a->id))
&& !(job = zhashx_lookup (hk->ctx->active_jobs, &a->id))) {
errprintf (error, "the job could not be looked up during RFC 27 hello");
goto error;
return -1;

Check warning on line 504 in src/modules/job-manager/housekeeping.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/housekeeping.c#L504

Added line #L504 was not covered by tests
}
if (flux_respond_pack (hk->ctx->h,
msg,
"{s:I s:I s:I s:f}",
"id", job->id,
"priority", job->priority,
"userid", (json_int_t)job->userid,
"t_submit", job->t_submit) < 0) {
errprintf (error,
"the RFC 27 hello response could not be sent: %s",
strerror (errno));
if (!(payload = json_pack ("{s:I s:I s:I s:f}",
"id", job->id,
"priority", job->priority,
"userid", (json_int_t)job->userid,
"t_submit", job->t_submit)))
goto error;
if (!idset_empty (a->free)) {
if (set_idset_string (payload, "free", a->free) < 0)
goto error;

Check warning on line 514 in src/modules/job-manager/housekeeping.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/housekeeping.c#L514

Added line #L514 was not covered by tests
}
if (flux_respond_pack (hk->ctx->h, msg, "O", payload) < 0)
goto error;

Check warning on line 517 in src/modules/job-manager/housekeeping.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/housekeeping.c#L517

Added line #L517 was not covered by tests
json_decref (payload);
return 0;
error:
errprintf (error,

Check warning on line 521 in src/modules/job-manager/housekeeping.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/housekeeping.c#L521

Added line #L521 was not covered by tests
"failed to send scheduler HELLO handshake: %s",
strerror (errno));
json_decref (payload);

Check warning on line 524 in src/modules/job-manager/housekeeping.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/housekeeping.c#L523-L524

Added lines #L523 - L524 were not covered by tests
return -1;
}

Expand All @@ -513,14 +540,20 @@
* allocations. Send remaining housekeeping tasks a SIGTERM, log an error,
* and delete the allocation.
*/
int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg)
int housekeeping_hello_respond (struct housekeeping *hk,
const flux_msg_t *msg,
bool partial_ok)
{
struct allocation *a;
flux_error_t error;

a = zlistx_first (hk->allocations);
while (a) {
if (housekeeping_hello_respond_one (hk, msg, a, &error) < 0) {
if (housekeeping_hello_respond_one (hk,
msg,
a,
partial_ok,
&error) < 0) {
char *ranks;
char *hosts = NULL;
flux_future_t *f;
Expand Down
4 changes: 3 additions & 1 deletion src/modules/job-manager/housekeeping.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ int housekeeping_start (struct housekeeping *hk,
* It should inform the scheduler about resources that are still allocated,
* but no longer directly held by jobs.
*/
int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg);
int housekeeping_hello_respond (struct housekeeping *hk,
const flux_msg_t *msg,
bool partial_ok);

json_t *housekeeping_get_stats (struct housekeeping *hk);

Expand Down
19 changes: 14 additions & 5 deletions src/modules/sched-simple/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ static struct simple_sched * simple_sched_create (void)
* concurrency being excessively large.
*/
ss->alloc_limit = 8;

ss->schedutil_flags = SCHEDUTIL_HELLO_PARTIAL_OK;
return ss;
}

Expand Down Expand Up @@ -544,24 +546,28 @@ static int hello_cb (flux_t *h,
unsigned int priority;
uint32_t userid;
double t_submit;
const char *free_ranks = NULL;

if (flux_msg_unpack (msg,
"{s:I s:i s:i s:f}",
"{s:I s:i s:i s:f s?s}",
"id", &id,
"priority", &priority,
"userid", &userid,
"t_submit", &t_submit) < 0) {
"t_submit", &t_submit,
"free", &free_ranks) < 0) {
flux_log_error (h, "hello: invalid hello payload");
return -1;
}

flux_log (h,
LOG_DEBUG,
"hello: id=%s priority=%u userid=%u t_submit=%0.1f",
"hello: id=%s priority=%u userid=%u t_submit=%0.1f %s%s",
idf58 (id),
priority,
(unsigned int)userid,
t_submit);
t_submit,
free_ranks ? "free=" : "",
free_ranks ? free_ranks : "");

alloc = rlist_from_R (R);
if (!alloc) {
Expand All @@ -570,7 +576,7 @@ static int hello_cb (flux_t *h,
}
s = rlist_dumps (alloc);
if ((rc = rlist_set_allocated (ss->rlist, alloc)) < 0)
flux_log_error (h, "hello: rlist_remove (%s)", s);
flux_log_error (h, "hello: alloc %s", s);
else
flux_log (h, LOG_DEBUG, "hello: alloc %s", s);
free (s);
Expand Down Expand Up @@ -958,6 +964,9 @@ static int process_args (flux_t *h, struct simple_sched *ss,
else if (streq (argv[i], "test-free-nolookup")) {
ss->schedutil_flags |= SCHEDUTIL_FREE_NOLOOKUP;
}
else if (streq (argv[i], "test-hello-nopartial")) {
ss->schedutil_flags &= ~SCHEDUTIL_HELLO_PARTIAL_OK;
}
else {
flux_log_error (h, "Unknown module option: '%s'", argv[i]);
errno = EINVAL;
Expand Down
Loading
Loading