From 16384f0ba340fc50802fec4b31435fc5b0e13258 Mon Sep 17 00:00:00 2001
From: Albert Chu
Date: Mon, 9 Dec 2024 07:47:33 -0800
Subject: [PATCH] job-info: stream events when job is inactive

Problem: If a job is inactive, all data in an eventlog will be
retrieved as a single response during an eventlog watch, since we
know the data can never change once the job is inactive.  If this
data, such as job standard output, is very large, this lookup can be
very slow.  In some cases, something like `flux job attach` can
appear to hang because the standard output response takes so long to
look up and return.

Solution: When a job is inactive and the user wants to watch a job
eventlog, do not retrieve all of the data in a single lookup.
Instead, retrieve the data via an internal eventlog watch, but have
the eventlog watch use the new FLUX_KVS_STREAM flag.

Fixes #6516
---
 src/modules/job-info/guest_watch.c | 89 ++++++++++++++----------------
 src/modules/job-info/watch.c       | 33 +++++++++--
 2 files changed, 71 insertions(+), 51 deletions(-)

diff --git a/src/modules/job-info/guest_watch.c b/src/modules/job-info/guest_watch.c
index f007d47d8dbc..2238e77ca000 100644
--- a/src/modules/job-info/guest_watch.c
+++ b/src/modules/job-info/guest_watch.c
@@ -117,7 +117,9 @@ struct guest_watch_ctx {
     bool guest_released;
 
     /* data from guest namespace */
-    int offset;
+    int guest_offset;
+    /* data from main namespace */
+    int main_offset;
 };
 
 static int get_main_eventlog (struct guest_watch_ctx *gw);
@@ -194,22 +196,8 @@ static int send_eventlog_watch_cancel (struct guest_watch_ctx *gw,
         f = gw->wait_guest_namespace_f;
     else if (gw->state == GUEST_WATCH_STATE_GUEST_NAMESPACE_WATCH)
         f = gw->guest_namespace_watch_f;
-    else if (gw->state == GUEST_WATCH_STATE_MAIN_NAMESPACE_LOOKUP) {
-        /* Since this is a lookup, we don't need to perform an actual
-         * cancel to "job-info.eventlog-watch-cancel".  Just return
-         * ENODATA to the caller if necessary.
-         */
-        gw->eventlog_watch_canceled = true;
-        if (gw->cancel) {
-            if (flux_respond_error (gw->ctx->h,
-                                    gw->msg,
-                                    ENODATA,
-                                    NULL) < 0)
-                flux_log_error (gw->ctx->h, "%s: flux_respond_error",
-                                __FUNCTION__);
-        }
-        return 0;
-    }
+    else if (gw->state == GUEST_WATCH_STATE_MAIN_NAMESPACE_LOOKUP)
+        f = gw->main_namespace_lookup_f;
     else {
         /* gw->state == GUEST_WATCH_STATE_INIT, eventlog-watch
          * never started so sort of "auto-canceled" */
@@ -649,7 +637,7 @@ static void guest_namespace_watch_continuation (flux_future_t *f, void *arg)
         goto cleanup;
     }
 
-    gw->offset += strlen (event);
+    gw->guest_offset += strlen (event);
 
     flux_future_reset (f);
     return;
@@ -680,31 +668,36 @@ static int full_guest_path (struct guest_watch_ctx *gw,
 
 static int main_namespace_lookup (struct guest_watch_ctx *gw)
 {
-    const char *topic = "job-info.lookup";
+    const char *topic = "job-info.eventlog-watch";
+    int rpc_flags = FLUX_RPC_STREAMING;
     flux_msg_t *msg = NULL;
     int save_errno;
+    int flags = gw->flags;
     int rv = -1;
     char path[PATH_MAX];
 
     if (full_guest_path (gw, path, PATH_MAX) < 0)
         goto error;
 
-    /* If the eventlog has been migrated to the main KVS namespace, we
-     * know that the eventlog is complete, so no need to do a "watch",
-     * do a lookup instead */
+    /* the job has completed, so "waitcreate" has no meaning
+     * anymore, clear the flag
+     */
+    if (flags & FLUX_JOB_EVENT_WATCH_WAITCREATE)
+        flags &= ~FLUX_JOB_EVENT_WATCH_WAITCREATE;
 
     if (!(msg = cred_msg_pack (topic,
                                gw->cred,
-                               "{s:I s:[s] s:i}",
+                               "{s:I s:b s:s s:i}",
                                "id", gw->id,
-                               "keys", path,
-                               "flags", 0)))
+                               "guest_in_main", true,
+                               "path", path,
+                               "flags", flags)))
         goto error;
 
     if (!(gw->main_namespace_lookup_f = flux_rpc_message (gw->ctx->h,
                                                           msg,
                                                           FLUX_NODEID_ANY,
-                                                          0))) {
+                                                          rpc_flags))) {
         flux_log_error (gw->ctx->h, "%s: flux_rpc_message", __FUNCTION__);
         goto error;
     }
@@ -731,41 +724,43 @@ static void main_namespace_lookup_continuation (flux_future_t *f, void *arg)
 {
     struct guest_watch_ctx *gw = arg;
     struct info_ctx *ctx = gw->ctx;
-    const char *s;
-    const char *input;
-    const char *tok;
-    size_t toklen;
-    char path[PATH_MAX];
-
-    if (full_guest_path (gw, path, PATH_MAX) < 0)
-        goto error;
+    const char *event;
 
-    if (flux_rpc_get_unpack (f, "{s:s}", path, &s) < 0) {
-        if (errno != ENOENT && errno != EPERM)
-            flux_log_error (ctx->h, "%s: flux_rpc_get_unpack", __FUNCTION__);
+    if (flux_job_event_watch_get (f, &event) < 0) {
+        if (errno != ENOENT && errno != ENODATA)
+            flux_log_error (ctx->h,
+                            "%s: flux_job_event_watch_get",
+                            __FUNCTION__);
         goto error;
     }
 
     if (gw->eventlog_watch_canceled) {
-        /* already sent ENODATA via send_eventlog_watch_cancel(), so
-         * just cleanup */
+        if (gw->cancel) {
+            errno = ENODATA;
+            goto error;
+        }
         goto cleanup;
     }
 
-    input = s + gw->offset;
-    while (get_next_eventlog_entry (&input, &tok, &toklen)) {
-        if (flux_respond_pack (ctx->h, gw->msg,
-                               "{s:s#}",
-                               "event", tok, toklen) < 0) {
+    gw->main_offset += strlen (event);
+
+    if (gw->main_offset > gw->guest_offset) {
+        if (flux_respond_pack (ctx->h, gw->msg, "{s:s}", "event", event) < 0) {
            flux_log_error (ctx->h, "%s: flux_respond_pack", __FUNCTION__);
+
+            /* If we haven't sent a cancellation yet, must do so so that
+             * the future's matchtag will eventually be freed */
+            if (!gw->eventlog_watch_canceled)
+                (void) send_eventlog_watch_cancel (gw,
+                                                   gw->main_namespace_lookup_f,
+                                                   false);
             goto cleanup;
         }
     }
 
-    /* We've moved to the main KVS namespace, so we know there's no
-     * more data, return ENODATA */
-    errno = ENODATA;
+    flux_future_reset (f);
+    return;
 
 error:
     if (flux_respond_error (ctx->h, gw->msg, errno, NULL) < 0)
diff --git a/src/modules/job-info/watch.c b/src/modules/job-info/watch.c
index 36877a8a9111..bf8d4fe8bf5a 100644
--- a/src/modules/job-info/watch.c
+++ b/src/modules/job-info/watch.c
@@ -32,6 +32,7 @@ struct watch_ctx {
     const flux_msg_t *msg;
     flux_jobid_t id;
    bool guest;
+    bool guest_in_main;     /* read guest in main namespace */
     char *path;
     int flags;
     flux_future_t *check_f;
@@ -62,6 +63,7 @@ static struct watch_ctx *watch_ctx_create (struct info_ctx *ctx,
                                            const flux_msg_t *msg,
                                            flux_jobid_t id,
                                            bool guest,
+                                           bool guest_in_main,
                                            const char *path,
                                            int flags)
 {
@@ -73,6 +75,7 @@ static struct watch_ctx *watch_ctx_create (struct info_ctx *ctx,
     w->ctx = ctx;
     w->id = id;
     w->guest = guest;
+    w->guest_in_main = guest_in_main;
     if (!(w->path = strdup (path))) {
         errno = ENOMEM;
         goto error;
@@ -122,6 +125,12 @@ static int watch_key (struct watch_ctx *w)
     if (w->flags & FLUX_JOB_EVENT_WATCH_WAITCREATE)
         flags |= FLUX_KVS_WAITCREATE;
 
+    /* guest_in_main means the job is inactive, we do not want to
+     * "watch" this, just stream the data that is there right now.
+     */
+    if (w->guest_in_main)
+        flags = FLUX_KVS_STREAM;
+
     if (w->guest) {
         if (flux_job_kvs_namespace (ns, sizeof (ns), w->id) < 0) {
             flux_log_error (w->ctx->h, "%s: flux_job_kvs_namespace",
@@ -321,12 +330,19 @@ static int watch (struct info_ctx *ctx,
                   flux_jobid_t id,
                   const char *path,
                   int flags,
-                  bool guest)
+                  bool guest,
+                  bool guest_in_main)
 {
     struct watch_ctx *w = NULL;
     uint32_t rolemask;
 
-    if (!(w = watch_ctx_create (ctx, msg, id, guest, path, flags)))
+    if (!(w = watch_ctx_create (ctx,
+                                msg,
+                                id,
+                                guest,
+                                guest_in_main,
+                                path,
+                                flags)))
         goto error;
 
     /* if user requested an alternate path and that alternate path is
@@ -388,6 +404,7 @@ void watch_cb (flux_t *h,
     struct watch_ctx *w = NULL;
     flux_jobid_t id;
     int guest = 0;
+    int guest_in_main = 0;
     const char *path = NULL;
     int flags;
     int valid_flags = FLUX_JOB_EVENT_WATCH_WAITCREATE;
@@ -412,15 +429,23 @@ void watch_cb (flux_t *h,
     }
     /* guest flag indicates to read path from guest namespace */
     (void)flux_request_unpack (msg, NULL, "{s:b}", "guest", &guest);
+    /* guest_in_main flag indicates to read "guest" path from main
+     * namespace, also meaning the job is inactive
+     */
+    (void)flux_request_unpack (msg,
+                               NULL,
+                               "{s:b}",
+                               "guest_in_main",
+                               &guest_in_main);
 
     /* if watching a "guest" path, forward to guest watcher for
      * handling */
-    if (strstarts (path, "guest.")) {
+    if (strstarts (path, "guest.") && !guest_in_main) {
         if (guest_watch (ctx, msg, id, path + 6, flags) < 0)
             goto error;
     }
     else {
-        if (watch (ctx, msg, id, path, flags, guest) < 0)
+        if (watch (ctx, msg, id, path, flags, guest, guest_in_main) < 0)
             goto error;
     }
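
Note (illustrative, not part of the patch): the inactive-job path above leans on the
FLUX_KVS_STREAM lookup flag, which returns a large, no-longer-changing KVS value in
successive responses instead of one huge message.  Below is a minimal consumer-side
sketch of that pattern, assuming the stream is terminated with ENODATA the same way
the continuation code above treats it; the KVS key is a placeholder.

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <flux/core.h>

int main (void)
{
    /* placeholder key - e.g. an inactive job's guest output eventlog
     * after it has been moved into the primary KVS namespace */
    const char *key = "job.0000.0123.4567.89ab.guest.output";
    flux_t *h;
    flux_future_t *f;
    const char *chunk;

    if (!(h = flux_open (NULL, 0)))
        exit (1);
    /* one lookup RPC; with FLUX_KVS_STREAM the value arrives in chunks */
    if (!(f = flux_kvs_lookup (h, NULL, FLUX_KVS_STREAM, key)))
        exit (1);
    while (flux_kvs_lookup_get (f, &chunk) == 0) {
        fputs (chunk, stdout);      /* consume one chunk of the value */
        flux_future_reset (f);      /* arm the future for the next chunk */
    }
    if (errno != ENODATA)           /* ENODATA is assumed to mark end of stream */
        fprintf (stderr, "lookup failed: %s\n", strerror (errno));
    flux_future_destroy (f);
    flux_close (h);
    return 0;
}

In the patch itself, watch_key() selects this flag when guest_in_main is set, so an
inactive job's eventlog is streamed back to the watcher piece by piece rather than
returned as a single large lookup response.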