Skip to content

Commit

Permalink
job-manager: add fast path for simple owner jobs
Browse files Browse the repository at this point in the history
Problem: for some simple high throughput use cases, the
python front end tools, separate ingest module with jobspec
validation, and job-info KVS proxy adds unnecessary latency.

Add a job-manager.runjob RPC that accepts jobspec, bypassing
the ingest module, generating a job ID in situ, adding the
FLUX_JOB_WAITABLE flag, and kicking off the job state machine.

The original request is responded to by the wait logic once
the job has completed.

This service is only available to the instance owner, since
it bypasses the ingest jobspec validator.
  • Loading branch information
garlick committed Jun 30, 2021
1 parent ef15557 commit 9f2a338
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/modules/job-manager/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ libjob_manager_la_SOURCES = \
getattr.c \
prioritize.h \
prioritize.c \
runjob.h \
runjob.c \
jobtap-internal.h \
jobtap.h \
jobtap.c \
Expand Down
12 changes: 12 additions & 0 deletions src/modules/job-manager/job-manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "annotate.h"
#include "journal.h"
#include "getattr.h"
#include "runjob.h"
#include "jobtap-internal.h"

#include "job-manager.h"
Expand Down Expand Up @@ -113,6 +114,12 @@ static const struct flux_msg_handler_spec htab[] = {
getinfo_handle_request,
FLUX_ROLE_USER
},
{
FLUX_MSGTYPE_REQUEST,
"job-manager.runjob",
runjob_handler,
FLUX_ROLE_OWNER,
},
{
FLUX_MSGTYPE_REQUEST,
"job-manager.jobtap",
Expand Down Expand Up @@ -202,6 +209,10 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "restart_from_kvs");
goto done;
}
if (!(ctx.runjob = runjob_ctx_create (&ctx))) { // uses max_jobid
flux_log_error (h, "error creating runjob interface"); // from restart
goto done;
}
if (flux_reactor_run (r, 0) < 0) {
flux_log_error (h, "flux_reactor_run");
goto done;
Expand All @@ -223,6 +234,7 @@ int mod_main (flux_t *h, int argc, char **argv)
alloc_ctx_destroy (ctx.alloc);
submit_ctx_destroy (ctx.submit);
event_ctx_destroy (ctx.event);
runjob_ctx_destroy (ctx.runjob);
jobtap_destroy (ctx.jobtap);
zhashx_destroy (&ctx.active_jobs);
return rc;
Expand Down
1 change: 1 addition & 0 deletions src/modules/job-manager/job-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct job_manager {
struct kill *kill;
struct annotate *annotate;
struct journal *journal;
struct runjob *runjob;
struct jobtap *jobtap;
};

Expand Down
190 changes: 190 additions & 0 deletions src/modules/job-manager/runjob.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/************************************************************\
* Copyright 2021 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* runjob.c - fastpath for running a job in one RPC
*
* The RPC returns when the job is inactive and includes the wait status.
*
* Access is restricted to instance owner only.
* Job ID is issued here, instead of in job-ingest.
* Jobspec validation is bypassed.
* Jobspec is not signed, nor is signed jobspec stored to KVS.
*/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <czmq.h>
#include <assert.h>
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libutil/fluid.h"
#include "src/common/libutil/jpath.h"
#include "src/common/libutil/errno_safe.h"

#include "job.h"
#include "wait.h"
#include "event.h"
#include "runjob.h"

struct runjob {
struct job_manager *ctx;
struct fluid_generator fluid_gen;
};

static void jobspec_continuation (flux_future_t *f, void *arg)
{
struct runjob *runjob = arg;
flux_t *h = flux_future_get_flux (f);
struct job *job = flux_future_aux_get (f, "job");
const char *errstr = NULL;

if (flux_rpc_get (f, NULL) < 0) {
errstr = "eventlog commit failed";
goto error;
}
if (event_job_post_pack (runjob->ctx->event,
job,
"submit",
0,
"{s:i s:i s:i}",
"userid", job->userid,
"urgency", job->urgency,
"flags", job->flags) < 0) {
errstr = "error posting submit event";
goto error;
}
wait_notify_active (runjob->ctx->wait, job);
job_aux_delete (job, f);
return;
error:
if (flux_respond_error (h, job->waiter, errno, errstr) < 0)
flux_log_error (h, "error responding to runjob");
zhashx_delete (runjob->ctx->active_jobs, &job->id);
}

static flux_future_t *commit_jobspec (flux_t *h, struct job *job)
{
char key[64];
flux_future_t *f = NULL;
flux_kvs_txn_t *txn;

if (flux_job_kvs_key (key, sizeof (key), job->id, "jobspec") < 0)
return NULL;
if (!(txn = flux_kvs_txn_create ()))
return NULL;
if (flux_kvs_txn_pack (txn, 0, key, "O", job->jobspec_redacted) < 0)
goto error;
if (!(f = flux_kvs_commit (h, NULL, 0, txn)))
goto error;
flux_kvs_txn_destroy (txn);
return f;
error:
flux_future_destroy (f);
flux_kvs_txn_destroy (txn);
return NULL;
}

void runjob_handler (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct job_manager *ctx = arg;
struct runjob *runjob = ctx->runjob;
json_t *jobspec;
const char *errstr = NULL;
struct job *job = NULL;
flux_future_t *f;

if (flux_request_unpack (msg,
NULL,
"{s:o}",
"jobspec", &jobspec) < 0) {
errstr = "malformed runjob request";
goto error;
}
if (!(job = job_create ()))
goto error;
/* The runjob request will be handled as if it were a 'wait' request
* for this job. Code in wait.c responds to the request once the job
* becomes inactive.
*/
job->flags = FLUX_JOB_WAITABLE;
job->waiter = flux_msg_incref (msg);

if (flux_msg_get_userid (msg, &job->userid) < 0)
goto error;
if (fluid_generate (&runjob->fluid_gen, &job->id) < 0) {
errstr = "error generating job id";
errno = EINVAL;
goto error;
}
/* The redacted jobspec is not actually redacted until it (in full)
* becomes part of the KVS transaction below.
*/
job->jobspec_redacted = json_incref (jobspec);
/* Start KVS commit of jobspec.
* If the commit is successful, its continuation posts the submit event
* which kicks the job state machine.
* N.B. future 'f' destruction is tied to 'job', not the other way around
*/
if (!(f = commit_jobspec (h, job))
|| flux_future_aux_set (f, "job", job, NULL) < 0
|| flux_future_then (f, -1, jobspec_continuation, runjob) < 0
|| job_aux_set (job, NULL, f, (flux_free_f)flux_future_destroy) < 0) {
flux_future_destroy (f);
errstr = "error committing jobspec to KVS";
goto error;
}
if (jpath_del (job->jobspec_redacted, "attributes.system.environment") < 0)
goto error;

zhashx_update (ctx->active_jobs, &job->id, job); // increfs job
job_decref (job);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to runjob");
job_decref (job);
}

void runjob_ctx_destroy (struct runjob *runjob)
{
if (runjob) {
int saved_errno = errno;
free (runjob);
errno = saved_errno;
}
}

struct runjob *runjob_ctx_create (struct job_manager *ctx)
{
struct runjob *runjob;

if (!(runjob = calloc (1, sizeof (*runjob))))
return NULL;
runjob->ctx = ctx;
if (fluid_init (&runjob->fluid_gen,
16383, // reserved by job-ingest
fluid_get_timestamp (ctx->max_jobid)) < 0) {
flux_log (ctx->h, LOG_ERR, "fluid_init failed");
goto error;
}
return runjob;
error:
runjob_ctx_destroy (runjob);
return NULL;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
26 changes: 26 additions & 0 deletions src/modules/job-manager/runjob.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/************************************************************\
* Copyright 2021 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef FLUX_JOB_MANAGER_RUNJOB_H
#define FLUX_JOB_MANAGER_RUNJOB_H

#include <flux/core.h>
#include "job-manager.h"

void runjob_handler (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg);

struct runjob *runjob_ctx_create (struct job_manager *ctx);
void runjob_ctx_destroy (struct runjob *runjob);


#endif /* !FLUX_JOB_MANAGER_RUNJOB_H */

0 comments on commit 9f2a338

Please sign in to comment.