From 9f2a33873d8c7ad200a1ece7a1194333b155fc30 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sat, 20 Mar 2021 09:21:13 -0700 Subject: [PATCH] job-manager: add fast path for simple owner jobs Problem: for some simple high-throughput use cases, the Python front end tools, separate ingest module with jobspec validation, and job-info KVS proxy add unnecessary latency. Add a job-manager.runjob RPC that accepts jobspec, bypassing the ingest module, generating a job ID in situ, adding the FLUX_JOB_WAITABLE flag, and kicking off the job state machine. The original request is responded to by the wait logic once the job has completed. This service is only available to the instance owner, since it bypasses the ingest jobspec validator. --- src/modules/job-manager/Makefile.am | 2 + src/modules/job-manager/job-manager.c | 12 ++ src/modules/job-manager/job-manager.h | 1 + src/modules/job-manager/runjob.c | 190 ++++++++++++++++++++++++++ src/modules/job-manager/runjob.h | 26 ++++ 5 files changed, 231 insertions(+) create mode 100644 src/modules/job-manager/runjob.c create mode 100644 src/modules/job-manager/runjob.h diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index 4bb1a8c3b7cf..f4aced8e93aa 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -58,6 +58,8 @@ libjob_manager_la_SOURCES = \ getattr.c \ prioritize.h \ prioritize.c \ + runjob.h \ + runjob.c \ jobtap-internal.h \ jobtap.h \ jobtap.c \ diff --git a/src/modules/job-manager/job-manager.c b/src/modules/job-manager/job-manager.c index e4d0c83d7123..924300a9e42a 100644 --- a/src/modules/job-manager/job-manager.c +++ b/src/modules/job-manager/job-manager.c @@ -31,6 +31,7 @@ #include "annotate.h" #include "journal.h" #include "getattr.h" +#include "runjob.h" #include "jobtap-internal.h" #include "job-manager.h" @@ -113,6 +114,12 @@ static const struct flux_msg_handler_spec htab[] = { getinfo_handle_request, FLUX_ROLE_USER }, + {
FLUX_MSGTYPE_REQUEST, + "job-manager.runjob", + runjob_handler, + FLUX_ROLE_OWNER, + }, { FLUX_MSGTYPE_REQUEST, "job-manager.jobtap", @@ -202,6 +209,10 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "restart_from_kvs"); goto done; } + if (!(ctx.runjob = runjob_ctx_create (&ctx))) { // uses max_jobid + flux_log_error (h, "error creating runjob interface"); // from restart + goto done; + } if (flux_reactor_run (r, 0) < 0) { flux_log_error (h, "flux_reactor_run"); goto done; @@ -223,6 +234,7 @@ int mod_main (flux_t *h, int argc, char **argv) alloc_ctx_destroy (ctx.alloc); submit_ctx_destroy (ctx.submit); event_ctx_destroy (ctx.event); + runjob_ctx_destroy (ctx.runjob); jobtap_destroy (ctx.jobtap); zhashx_destroy (&ctx.active_jobs); return rc; diff --git a/src/modules/job-manager/job-manager.h b/src/modules/job-manager/job-manager.h index 0a457de7e25d..d53a3a1790ad 100644 --- a/src/modules/job-manager/job-manager.h +++ b/src/modules/job-manager/job-manager.h @@ -29,6 +29,7 @@ struct job_manager { struct kill *kill; struct annotate *annotate; struct journal *journal; + struct runjob *runjob; struct jobtap *jobtap; }; diff --git a/src/modules/job-manager/runjob.c b/src/modules/job-manager/runjob.c new file mode 100644 index 000000000000..0258fc74142d --- /dev/null +++ b/src/modules/job-manager/runjob.c @@ -0,0 +1,190 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* runjob.c - fastpath for running a job in one RPC + * + * The RPC returns when the job is inactive and includes the wait status. + * + * Access is restricted to instance owner only. + * Job ID is issued here, instead of in job-ingest. 
+ * Jobspec validation is bypassed. + * Jobspec is not signed, nor is signed jobspec stored to KVS. + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include + +#include "src/common/libutil/fluid.h" +#include "src/common/libutil/jpath.h" +#include "src/common/libutil/errno_safe.h" + +#include "job.h" +#include "wait.h" +#include "event.h" +#include "runjob.h" + +struct runjob { + struct job_manager *ctx; + struct fluid_generator fluid_gen; +}; + +static void jobspec_continuation (flux_future_t *f, void *arg) +{ + struct runjob *runjob = arg; + flux_t *h = flux_future_get_flux (f); + struct job *job = flux_future_aux_get (f, "job"); + const char *errstr = NULL; + + if (flux_rpc_get (f, NULL) < 0) { + errstr = "eventlog commit failed"; + goto error; + } + if (event_job_post_pack (runjob->ctx->event, + job, + "submit", + 0, + "{s:i s:i s:i}", + "userid", job->userid, + "urgency", job->urgency, + "flags", job->flags) < 0) { + errstr = "error posting submit event"; + goto error; + } + wait_notify_active (runjob->ctx->wait, job); + job_aux_delete (job, f); + return; +error: + if (flux_respond_error (h, job->waiter, errno, errstr) < 0) + flux_log_error (h, "error responding to runjob"); + zhashx_delete (runjob->ctx->active_jobs, &job->id); +} + +static flux_future_t *commit_jobspec (flux_t *h, struct job *job) +{ + char key[64]; + flux_future_t *f = NULL; + flux_kvs_txn_t *txn; + + if (flux_job_kvs_key (key, sizeof (key), job->id, "jobspec") < 0) + return NULL; + if (!(txn = flux_kvs_txn_create ())) + return NULL; + if (flux_kvs_txn_pack (txn, 0, key, "O", job->jobspec_redacted) < 0) + goto error; + if (!(f = flux_kvs_commit (h, NULL, 0, txn))) + goto error; + flux_kvs_txn_destroy (txn); + return f; +error: + flux_future_destroy (f); + flux_kvs_txn_destroy (txn); + return NULL; +} + +void runjob_handler (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct job_manager *ctx = arg; + struct runjob 
*runjob = ctx->runjob; + json_t *jobspec; + const char *errstr = NULL; + struct job *job = NULL; + flux_future_t *f; + + if (flux_request_unpack (msg, + NULL, + "{s:o}", + "jobspec", &jobspec) < 0) { + errstr = "malformed runjob request"; + goto error; + } + if (!(job = job_create ())) + goto error; + /* The runjob request will be handled as if it were a 'wait' request + * for this job. Code in wait.c responds to the request once the job + * becomes inactive. + */ + job->flags = FLUX_JOB_WAITABLE; + job->waiter = flux_msg_incref (msg); + + if (flux_msg_get_userid (msg, &job->userid) < 0) + goto error; + if (fluid_generate (&runjob->fluid_gen, &job->id) < 0) { + errstr = "error generating job id"; + errno = EINVAL; + goto error; + } + /* The redacted jobspec is not actually redacted until it (in full) + * becomes part of the KVS transaction below. + */ + job->jobspec_redacted = json_incref (jobspec); + /* Start KVS commit of jobspec. + * If the commit is successful, its continuation posts the submit event + * which kicks the job state machine. + * N.B. 
future 'f' destruction is tied to 'job', not the other way around + */ + if (!(f = commit_jobspec (h, job)) + || flux_future_aux_set (f, "job", job, NULL) < 0 + || flux_future_then (f, -1, jobspec_continuation, runjob) < 0 + || job_aux_set (job, NULL, f, (flux_free_f)flux_future_destroy) < 0) { + flux_future_destroy (f); + errstr = "error committing jobspec to KVS"; + goto error; + } + if (jpath_del (job->jobspec_redacted, "attributes.system.environment") < 0) + goto error; + + zhashx_update (ctx->active_jobs, &job->id, job); // increfs job + job_decref (job); + return; +error: + if (flux_respond_error (h, msg, errno, errstr) < 0) + flux_log_error (h, "error responding to runjob"); + job_decref (job); +} + +void runjob_ctx_destroy (struct runjob *runjob) +{ + if (runjob) { + int saved_errno = errno; + free (runjob); + errno = saved_errno; + } +} + +struct runjob *runjob_ctx_create (struct job_manager *ctx) +{ + struct runjob *runjob; + + if (!(runjob = calloc (1, sizeof (*runjob)))) + return NULL; + runjob->ctx = ctx; + if (fluid_init (&runjob->fluid_gen, + 16383, // reserved by job-ingest + fluid_get_timestamp (ctx->max_jobid)) < 0) { + flux_log (ctx->h, LOG_ERR, "fluid_init failed"); + goto error; + } + return runjob; +error: + runjob_ctx_destroy (runjob); + return NULL; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/job-manager/runjob.h b/src/modules/job-manager/runjob.h new file mode 100644 index 000000000000..f698045aaf10 --- /dev/null +++ b/src/modules/job-manager/runjob.h @@ -0,0 +1,26 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. 
+ * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef FLUX_JOB_MANAGER_RUNJOB_H +#define FLUX_JOB_MANAGER_RUNJOB_H + +#include +#include "job-manager.h" +/* Message handler for the job-manager.runjob request (registered in + * job-manager.c with FLUX_ROLE_OWNER). 'arg' is the struct job_manager. + */ +void runjob_handler (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg); +/* Create/destroy the runjob context stored in struct job_manager. + * runjob_ctx_create() returns NULL on failure. + */ +struct runjob *runjob_ctx_create (struct job_manager *ctx); +void runjob_ctx_destroy (struct runjob *runjob); + + +#endif /* !FLUX_JOB_MANAGER_RUNJOB_H */