Skip to content

Commit

Permalink
Merge pull request #3924 from grondo/manager-prolog-epilog
Browse files Browse the repository at this point in the history
job-manager: add prolog/epilog support for jobtap plugins
  • Loading branch information
mergify[bot] authored Oct 30, 2021
2 parents d591830 + bc8c87b commit 26a6123
Show file tree
Hide file tree
Showing 11 changed files with 618 additions and 5 deletions.
68 changes: 68 additions & 0 deletions doc/man7/flux-jobtap-plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,74 @@ or via configuration (See :ref:`configuration` below)
]


.. _perilogs:

PROLOG AND EPILOG ACTIONS
=========================

Plugins that need to perform asynchronous tasks for jobs after an ``alloc``
event but before the job is running, or after a ``finish`` event but before
resources are freed to the scheduler can make use of job manager prolog or
epilog actions.

Prolog and epilog actions are delineated by the following functions:

::

int flux_jobtap_prolog_start (flux_plugin_t *p,
const char *description);

int flux_jobtap_prolog_finish (flux_plugin_t *p,
flux_jobid_t id,
const char *description,
int status);

int flux_jobtap_epilog_start (flux_plugin_t *p,
const char *description);

int flux_jobtap_epilog_finish (flux_plugin_t *p,
flux_jobid_t id,
const char *description,
int status);

To initiate a prolog action, a plugin should call the function
``flux_jobtap_prolog_start()``. This will block the job from starting
even after resources have been assigned until a corresponding call to
``flux_jobtap_prolog_finish()`` has been called. While the status of the
prolog action is passed to ``flux_jobtap_prolog_finish()`` so it can be
captured in the eventlog, the action itself is responsible for raising
a job exception or taking other action on failure. That is, a non-zero
prolog finish status does not cause any automated behavior on the part of
the job manager. Similarly, the prolog ``description`` is used for
informational purposes only, so that multiple actions in an eventlog
may be differentiated.

Similarly, an epilog action is initiated with ``flux_jobtap_epilog_start()``,
and prevents resources from being released to the scheduler until a
corresponding call to ``flux_jobtap_epilog_finish()``. The same caveats
described for prolog actions regarding description and completion status
of epilog actions apply.

The ``flux_jobtap_prolog_start()`` function may be initiated anytime
before the ``start`` request is made to the execution system, though most
often from the ``job.state.run`` or ``job.event.alloc`` callbacks,
since this is the point at which a job has been allocated resources.
(Note: plugins will only receive the ``job.event.*`` callbacks for
jobs to which they have subscribed with a call to
``flux_jobtap_job_subscribe()``). A prolog action cannot be started
after a job enters the CLEANUP state.

The ``flux_jobtap_epilog_start()`` function may only be called after a
job is in the CLEANUP state, but before the ``free`` request has been
sent to the scheduler, for example from the ``job.state.cleanup``
or ``job.event.finish`` callbacks.

If ``flux_jobtap_prolog_start()``, ``flux_jobtap_prolog_finish()``,
``flux_jobtap_epilog_start()`` or ``flux_jobtap_epilog_finish()`` are
called for a job in an invaid state, these function will return -1 with
``errno`` set to ``EINVAL``.

Multiple prolog or epilog actions can be active at the same time.

.. _configuration:

Expand Down
2 changes: 1 addition & 1 deletion src/modules/job-exec/job-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ void jobinfo_log_output (struct jobinfo *job,
"rank", buf,
"data", data, len) < 0)
flux_log_error (job->h,
"evenlog_append failed: %ju: message=%s",
"eventlog_append failed: %ju: message=%s",
(uintmax_t) job->id,
data);
}
Expand Down
45 changes: 44 additions & 1 deletion src/modules/job-manager/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,12 @@ int event_job_action (struct event *event, struct job *job)
return -1;
break;
case FLUX_JOB_STATE_RUN:
if (start_send_request (ctx->start, job) < 0)
/*
* If job->request_refcount is nonzero then a prolog action
* is still in progress so do not send start request.
*/
if (!job->perilog_active
&& start_send_request (ctx->start, job) < 0)
return -1;
break;
case FLUX_JOB_STATE_CLEANUP:
Expand All @@ -364,6 +369,7 @@ int event_job_action (struct event *event, struct job *job)
* it is safe to release all resources to the scheduler.
*/
if (job->has_resources
&& !job->perilog_active
&& !job->alloc_bypass
&& !job->start_pending
&& !job->free_pending) {
Expand Down Expand Up @@ -494,6 +500,31 @@ static int event_handle_set_flags (struct job *job,
return 0;
}

/* Handle an prolog-* or epilog-* event
*/
static int event_handle_perilog (struct job *job,
const char *cmd,
json_t *context)
{
if (strcmp (cmd, "start") == 0) {
if (job->perilog_active == UINT8_MAX) {
errno = EOVERFLOW;
return -1;
}
job->perilog_active++;
}
else if (strcmp (cmd, "finish") == 0) {
if (job->perilog_active > 0)
job->perilog_active--;
}
else {
errno = EPROTO;
return -1;
}
return 0;
}


/* Return a callback topic string for the current job state
*
* NOTE: 'job.state.new' and 'job.state.depend' are not currently used
Expand Down Expand Up @@ -626,6 +657,18 @@ int event_job_update (struct job *job, json_t *event)
goto inval;
job->state = FLUX_JOB_STATE_INACTIVE;
}
else if (!strncmp (name, "prolog-", 7)) {
if (job->start_pending)
goto inval;
if (event_handle_perilog (job, name+7, context) < 0)
goto error;
}
else if (!strncmp (name, "epilog-", 7)) {
if (job->state != FLUX_JOB_STATE_CLEANUP)
goto inval;
if (event_handle_perilog (job, name+7, context) < 0)
goto error;
}
else if (!strcmp (name, "flux-restart")) {
/* The flux-restart event is currently only posted to jobs in
* SCHED state since that is the only state transition defined
Expand Down
2 changes: 2 additions & 0 deletions src/modules/job-manager/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ struct job {
uint8_t has_resources:1;
uint8_t start_pending:1;// start request sent to job-exec

uint8_t perilog_active; // if nonzero, prolog/epilog active

json_t *annotations;

struct grudgeset *dependencies;
Expand Down
130 changes: 128 additions & 2 deletions src/modules/job-manager/jobtap.c
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,17 @@ static int jobtap_stack_call (struct jobtap *jobtap,
flux_plugin_arg_t *args)
{
int retcode = 0;
flux_plugin_t *p = zlistx_first (plugins);
flux_plugin_t *p = NULL;

/* Duplicate list to make jobtap_stack_call reentrant */
zlistx_t *l = zlistx_dup (plugins);
if (!l)
return -1;
zlistx_set_destructor (l, NULL);

if (current_job_push (jobtap, job) < 0)
return -1;
p = zlistx_first (l);
while (p) {
int rc = flux_plugin_call (p, topic, args);
if (rc < 0) {
Expand All @@ -561,8 +568,9 @@ static int jobtap_stack_call (struct jobtap *jobtap,
break;
}
retcode += rc;
p = zlistx_next (plugins);
p = zlistx_next (l);
}
zlistx_destroy (&l);
if (current_job_pop (jobtap) < 0)
return -1;
return retcode;
Expand Down Expand Up @@ -1890,6 +1898,124 @@ int flux_jobtap_job_event_posted (flux_plugin_t *p,
return 0;
}

static int jobtap_emit_perilog_event (struct jobtap *jobtap,
struct job *job,
bool prolog,
bool start,
const char *description,
int status)
{
int flags = 0;
const char *event = prolog ? start ? "prolog-start" : "prolog-finish" :
start ? "epilog-start" : "epilog-finish";

if (!description) {
errno = EINVAL;
return -1;
}

/* prolog events cannot be emitted after a start request is pending.
*
* epilog events cannot be emitted outside of CLEANUP state
* and must be emitted before free request is pending.
*/
if ((prolog && job->start_pending)
|| (prolog && job->state == FLUX_JOB_STATE_CLEANUP)
|| (!prolog && job->state != FLUX_JOB_STATE_CLEANUP)
|| (!prolog && job->free_pending)) {
errno = EINVAL;
return -1;
}
if (start)
return event_job_post_pack (jobtap->ctx->event,
job,
event,
flags,
"{s:s}",
"description", description);
else
return event_job_post_pack (jobtap->ctx->event,
job,
event,
flags,
"{s:s s:i}",
"description", description,
"status", status);
}

int flux_jobtap_prolog_start (flux_plugin_t *p, const char *description)
{
struct job * job;
struct jobtap *jobtap;

if (!p
|| !(jobtap = flux_plugin_aux_get (p, "flux::jobtap"))
|| !(job = current_job (jobtap))) {
errno = EINVAL;
return -1;
}
return jobtap_emit_perilog_event (jobtap, job, true, true, description, 0);
}

int flux_jobtap_prolog_finish (flux_plugin_t *p,
flux_jobid_t id,
const char *description,
int status)
{
struct job * job;
struct jobtap *jobtap;

if (!p || !(jobtap = flux_plugin_aux_get (p, "flux::jobtap"))) {
errno = EINVAL;
return -1;
}
if (!(job = jobtap_lookup_jobid (p, id)))
return -1;
return jobtap_emit_perilog_event (jobtap,
job,
true,
false,
description,
status);
}

int flux_jobtap_epilog_start (flux_plugin_t *p, const char *description)
{
struct job * job;
struct jobtap *jobtap;

if (!p
|| !(jobtap = flux_plugin_aux_get (p, "flux::jobtap"))
|| !(job = current_job (jobtap))) {
errno = EINVAL;
return -1;
}
return jobtap_emit_perilog_event (jobtap, job, false, true, description, 0);
}

int flux_jobtap_epilog_finish (flux_plugin_t *p,
flux_jobid_t id,
const char *description,
int status)
{
struct job * job;
struct jobtap *jobtap;

if (!p || !(jobtap = flux_plugin_aux_get (p, "flux::jobtap"))) {
errno = EINVAL;
return -1;
}
if (!(job = jobtap_lookup_jobid (p, id)))
return -1;
return jobtap_emit_perilog_event (jobtap,
job,
false,
false,
description,
status);
}


/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
Expand Down
32 changes: 32 additions & 0 deletions src/modules/job-manager/jobtap.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,38 @@ int flux_jobtap_job_subscribe (flux_plugin_t *p, flux_jobid_t id);
*/
void flux_jobtap_job_unsubscribe (flux_plugin_t *p, flux_jobid_t id);


/* Post an event to the current job eventlog indicating that a prolog
* action has started. This will block the start request to the
* execution system until `flux_jobtap_prolog_finish()` is called.
*/
int flux_jobtap_prolog_start (flux_plugin_t *p, const char *description);

/* Post an event to the eventlog for job id indicating that a prolog
* action has finished. The description should match the description
* of an outstanding prolog start event. `status` is informational
* and should be 0 to indicate success, non-zero for failure.
*/
int flux_jobtap_prolog_finish (flux_plugin_t *p,
flux_jobid_t id,
const char *description,
int status);

/* Post an event to the current job eventlog indicating that an epilog
* action has started. This will block the free request to the
* scheduler until `flux_jobtap_epilog_finish()` is called.
*/
int flux_jobtap_epilog_start (flux_plugin_t *p, const char *description);

/* Post an event to the eventlog for job id indicating that an epilog
* action has finished. The description should match the description
* of an outstanding epilog start event. `status` is informational
* and should be 0 to indicate success, non-zero for failure.
*/
int flux_jobtap_epilog_finish (flux_plugin_t *p,
flux_jobid_t id,
const char *description,
int status);
#ifdef __cplusplus
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/shell/evlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ static int evlog_shell_exit (flux_plugin_t *p,
return 0;
}

/* Start the evenlog-based logger during shell.connect, just after the
/* Start the eventlog-based logger during shell.connect, just after the
* shell has obtained a flux_t handle. This allows more early log
* messages to make it into the eventlog, but some data (such as
* the current shell_rank) is not available at this time.
Expand Down
10 changes: 10 additions & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ check_LTLIBRARIES = \
job-manager/plugins/dependency-test.la \
job-manager/plugins/subscribe.la \
job-manager/plugins/cleanup-event.la \
job-manager/plugins/perilog-test.la \
stats/stats-basic.la \
stats/stats-immediate.la

Expand Down Expand Up @@ -789,6 +790,15 @@ job_manager_plugins_cleanup_event_la_LDFLAGS = \
job_manager_plugins_cleanup_event_la_LIBADD = \
$(top_builddir)/src/common/libflux-core.la

job_manager_plugins_perilog_test_la_SOURCES = \
job-manager/plugins/perilog-test.c
job_manager_plugins_perilog_test_la_CPPFLAGS = \
$(test_cppflags)
job_manager_plugins_perilog_test_la_LDFLAGS = \
$(fluxplugin_ldflags) -module -rpath /nowhere
job_manager_plugins_perilog_test_la_LIBADD = \
$(top_builddir)/src/common/libflux-core.la

hwloc_hwloc_convert_SOURCES = hwloc/hwloc-convert.c
hwloc_hwloc_convert_CPPFLAGS = $(HWLOC_CFLAGS) $(test_cppflags)
hwloc_hwloc_convert_LDADD = $(HWLOC_LIBS) \
Expand Down
Loading

0 comments on commit 26a6123

Please sign in to comment.