Skip to content

Commit

Permalink
job-info: stream events when job is inactive
Browse files Browse the repository at this point in the history
Problem: If a job is inactive, all data in an eventlog will be retrieved
from the KVS in a single lookup.  This is because we know the data should never
change after the job is inactive.

If this data is very large, this lookup can be slow.  In some cases, the
use of something like `flux job attach` can have the appearance of a hang
because the standard output response is taking so long to lookup and return.

Solution:

When a job is inactive and the user wants to watch a job eventlog,
do not retrieve all of the data from the KVS in a single lookup.  Instead,
use the FLUX_KVS_STREAM flag to retrieve the data in smaller chunks.  This data
will be internally read and parsed no differently than when the job is active.

Fixes flux-framework#6516
  • Loading branch information
chu11 committed Dec 20, 2024
1 parent e10d676 commit 0ef12b2
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 51 deletions.
89 changes: 42 additions & 47 deletions src/modules/job-info/guest_watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ struct guest_watch_ctx {
bool guest_released;

/* data from guest namespace */
int offset;
int guest_offset;
/* data from main namespace */
int main_offset;
};

static int get_main_eventlog (struct guest_watch_ctx *gw);
Expand Down Expand Up @@ -194,22 +196,8 @@ static int send_eventlog_watch_cancel (struct guest_watch_ctx *gw,
f = gw->wait_guest_namespace_f;
else if (gw->state == GUEST_WATCH_STATE_GUEST_NAMESPACE_WATCH)
f = gw->guest_namespace_watch_f;
else if (gw->state == GUEST_WATCH_STATE_MAIN_NAMESPACE_LOOKUP) {
/* Since this is a lookup, we don't need to perform an actual
* cancel to "job-info.eventlog-watch-cancel". Just return
* ENODATA to the caller if necessary.
*/
gw->eventlog_watch_canceled = true;
if (gw->cancel) {
if (flux_respond_error (gw->ctx->h,
gw->msg,
ENODATA,
NULL) < 0)
flux_log_error (gw->ctx->h, "%s: flux_respond_error",
__FUNCTION__);
}
return 0;
}
else if (gw->state == GUEST_WATCH_STATE_MAIN_NAMESPACE_LOOKUP)
f = gw->main_namespace_lookup_f;
else {
/* gw->state == GUEST_WATCH_STATE_INIT, eventlog-watch
* never started so sort of "auto-canceled" */
Expand Down Expand Up @@ -649,7 +637,7 @@ static void guest_namespace_watch_continuation (flux_future_t *f, void *arg)
goto cleanup;
}

gw->offset += strlen (event);
gw->guest_offset += strlen (event);
flux_future_reset (f);
return;

Expand Down Expand Up @@ -680,31 +668,36 @@ static int full_guest_path (struct guest_watch_ctx *gw,

static int main_namespace_lookup (struct guest_watch_ctx *gw)
{
const char *topic = "job-info.lookup";
const char *topic = "job-info.eventlog-watch";
int rpc_flags = FLUX_RPC_STREAMING;
flux_msg_t *msg = NULL;
int save_errno;
int flags = gw->flags;
int rv = -1;
char path[PATH_MAX];

if (full_guest_path (gw, path, PATH_MAX) < 0)
goto error;

/* If the eventlog has been migrated to the main KVS namespace, we
* know that the eventlog is complete, so no need to do a "watch",
* do a lookup instead */
/* the job has completed, so "waitcreate" has no meaning
* anymore, clear the flag
*/
if (flags & FLUX_JOB_EVENT_WATCH_WAITCREATE)
flags &= ~FLUX_JOB_EVENT_WATCH_WAITCREATE;

if (!(msg = cred_msg_pack (topic,
gw->cred,
"{s:I s:[s] s:i}",
"{s:I s:b s:s s:i}",
"id", gw->id,
"keys", path,
"flags", 0)))
"guest_in_main", true,
"path", path,
"flags", flags)))
goto error;

if (!(gw->main_namespace_lookup_f = flux_rpc_message (gw->ctx->h,
msg,
FLUX_NODEID_ANY,
0))) {
rpc_flags))) {
flux_log_error (gw->ctx->h, "%s: flux_rpc_message", __FUNCTION__);
goto error;
}
Expand All @@ -731,41 +724,43 @@ static void main_namespace_lookup_continuation (flux_future_t *f, void *arg)
{
struct guest_watch_ctx *gw = arg;
struct info_ctx *ctx = gw->ctx;
const char *s;
const char *input;
const char *tok;
size_t toklen;
char path[PATH_MAX];

if (full_guest_path (gw, path, PATH_MAX) < 0)
goto error;
const char *event;

if (flux_rpc_get_unpack (f, "{s:s}", path, &s) < 0) {
if (errno != ENOENT && errno != EPERM)
flux_log_error (ctx->h, "%s: flux_rpc_get_unpack", __FUNCTION__);
if (flux_job_event_watch_get (f, &event) < 0) {
if (errno != ENOENT && errno != ENODATA)
flux_log_error (ctx->h,
"%s: flux_job_event_watch_get",
__FUNCTION__);
goto error;
}

if (gw->eventlog_watch_canceled) {
/* already sent ENODATA via send_eventlog_watch_cancel(), so
* just cleanup */
if (gw->cancel) {
errno = ENODATA;
goto error;
}
goto cleanup;
}

input = s + gw->offset;
while (get_next_eventlog_entry (&input, &tok, &toklen)) {
if (flux_respond_pack (ctx->h, gw->msg,
"{s:s#}",
"event", tok, toklen) < 0) {
gw->main_offset += strlen (event);

if (gw->main_offset > gw->guest_offset) {
if (flux_respond_pack (ctx->h, gw->msg, "{s:s}", "event", event) < 0) {
flux_log_error (ctx->h, "%s: flux_respond_pack",
__FUNCTION__);

/* If we haven't sent a cancellation yet, must do so so that
* the future's matchtag will eventually be freed */
if (!gw->eventlog_watch_canceled)
(void) send_eventlog_watch_cancel (gw,
gw->main_namespace_lookup_f,
false);
goto cleanup;
}
}

/* We've moved to the main KVS namespace, so we know there's no
* more data, return ENODATA */
errno = ENODATA;
flux_future_reset (f);
return;

error:
if (flux_respond_error (ctx->h, gw->msg, errno, NULL) < 0)
Expand Down
33 changes: 29 additions & 4 deletions src/modules/job-info/watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct watch_ctx {
const flux_msg_t *msg;
flux_jobid_t id;
bool guest;
bool guest_in_main; /* read guest in main namespace */
char *path;
int flags;
flux_future_t *check_f;
Expand Down Expand Up @@ -62,6 +63,7 @@ static struct watch_ctx *watch_ctx_create (struct info_ctx *ctx,
const flux_msg_t *msg,
flux_jobid_t id,
bool guest,
bool guest_in_main,
const char *path,
int flags)
{
Expand All @@ -73,6 +75,7 @@ static struct watch_ctx *watch_ctx_create (struct info_ctx *ctx,
w->ctx = ctx;
w->id = id;
w->guest = guest;
w->guest_in_main = guest_in_main;
if (!(w->path = strdup (path))) {
errno = ENOMEM;
goto error;
Expand Down Expand Up @@ -122,6 +125,12 @@ static int watch_key (struct watch_ctx *w)
if (w->flags & FLUX_JOB_EVENT_WATCH_WAITCREATE)
flags |= FLUX_KVS_WAITCREATE;

/* guest_in_main means the job is inactive, we do not want to
* "watch" this, just stream the data that is there right now.
*/
if (w->guest_in_main)
flags = FLUX_KVS_STREAM;

if (w->guest) {
if (flux_job_kvs_namespace (ns, sizeof (ns), w->id) < 0) {
flux_log_error (w->ctx->h, "%s: flux_job_kvs_namespace",
Expand Down Expand Up @@ -321,12 +330,19 @@ static int watch (struct info_ctx *ctx,
flux_jobid_t id,
const char *path,
int flags,
bool guest)
bool guest,
bool guest_in_main)
{
struct watch_ctx *w = NULL;
uint32_t rolemask;

if (!(w = watch_ctx_create (ctx, msg, id, guest, path, flags)))
if (!(w = watch_ctx_create (ctx,
msg,
id,
guest,
guest_in_main,
path,
flags)))
goto error;

/* if user requested an alternate path and that alternate path is
Expand Down Expand Up @@ -388,6 +404,7 @@ void watch_cb (flux_t *h,
struct watch_ctx *w = NULL;
flux_jobid_t id;
int guest = 0;
int guest_in_main = 0;
const char *path = NULL;
int flags;
int valid_flags = FLUX_JOB_EVENT_WATCH_WAITCREATE;
Expand All @@ -412,15 +429,23 @@ void watch_cb (flux_t *h,
}
/* guest flag indicates to read path from guest namespace */
(void)flux_request_unpack (msg, NULL, "{s:b}", "guest", &guest);
/* guest_in_main flag indicates to read "guest" path from main
* namespace, also meaning the job is inactive
*/
(void)flux_request_unpack (msg,
NULL,
"{s:b}",
"guest_in_main",
&guest_in_main);

/* if watching a "guest" path, forward to guest watcher for
* handling */
if (strstarts (path, "guest.")) {
if (strstarts (path, "guest.") && !guest_in_main) {
if (guest_watch (ctx, msg, id, path + 6, flags) < 0)
goto error;
}
else {
if (watch (ctx, msg, id, path, flags, guest) < 0)
if (watch (ctx, msg, id, path, flags, guest, guest_in_main) < 0)
goto error;
}

Expand Down

0 comments on commit 0ef12b2

Please sign in to comment.