Skip to content

Commit

Permalink
Merge pull request #6502 from chu11/issue6498_job_info_failed_rpc
Browse files Browse the repository at this point in the history
job-info: avoid error response on failed rpc
  • Loading branch information
mergify[bot] authored Dec 13, 2024
2 parents 8ee6a97 + 15aad44 commit fd8afd5
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 42 deletions.
28 changes: 12 additions & 16 deletions src/modules/job-info/guest_watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -639,24 +639,20 @@ static void guest_namespace_watch_continuation (flux_future_t *f, void *arg)
if (flux_respond_pack (ctx->h, gw->msg, "{s:s}", "event", event) < 0) {
flux_log_error (ctx->h, "%s: flux_respond_pack",
__FUNCTION__);
goto error_cancel;

/* If we haven't sent a cancellation yet, must do so so that
* the future's matchtag will eventually be freed */
if (!gw->eventlog_watch_canceled)
(void) send_eventlog_watch_cancel (gw,
gw->guest_namespace_watch_f,
false);
goto cleanup;
}

gw->offset += strlen (event);
flux_future_reset (f);
return;

error_cancel:
/* If we haven't sent a cancellation yet, must do so so that
* the future's matchtag will eventually be freed */
if (!gw->eventlog_watch_canceled) {
int save_errno = errno;
(void) send_eventlog_watch_cancel (gw,
gw->guest_namespace_watch_f,
false);
errno = save_errno;
}

error:
if (flux_respond_error (ctx->h, gw->msg, errno, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);
Expand Down Expand Up @@ -706,9 +702,9 @@ static int main_namespace_lookup (struct guest_watch_ctx *gw)
goto error;

if (!(gw->main_namespace_lookup_f = flux_rpc_message (gw->ctx->h,
msg,
FLUX_NODEID_ANY,
0))) {
msg,
FLUX_NODEID_ANY,
0))) {
flux_log_error (gw->ctx->h, "%s: flux_rpc_message", __FUNCTION__);
goto error;
}
Expand Down Expand Up @@ -763,7 +759,7 @@ static void main_namespace_lookup_continuation (flux_future_t *f, void *arg)
"event", tok, toklen) < 0) {
flux_log_error (ctx->h, "%s: flux_respond_pack",
__FUNCTION__);
goto error;
goto cleanup;
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/modules/job-info/update.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,20 @@ static void eventlog_continuation (flux_future_t *f, void *arg)

if (flux_job_event_watch_get (f, &s) < 0) {
flux_log_error (ctx->h, "%s: flux_job_event_watch_get", __FUNCTION__);
goto error_cancel;
eventlog_watch_cancel (uc);
goto cleanup;
}

if (!(event = eventlog_entry_decode (s))) {
flux_log_error (uc->ctx->h, "%s: eventlog_entry_decode", __FUNCTION__);
goto error_cancel;
eventlog_watch_cancel (uc);
goto cleanup;
}

if (eventlog_entry_parse (event, NULL, &name, &context) < 0) {
flux_log_error (uc->ctx->h, "%s: eventlog_entry_decode", __FUNCTION__);
goto error_cancel;
eventlog_watch_cancel (uc);
goto cleanup;
}

if (context && streq (name, uc->update_name)) {
Expand Down Expand Up @@ -196,7 +199,8 @@ static void eventlog_continuation (flux_future_t *f, void *arg)
"{s:O}",
uc->key, uc->update_object) < 0) {
flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__);
goto error_cancel;
eventlog_watch_cancel (uc);
goto cleanup;
}
msg = flux_msglist_next (uc->msglist);
}
Expand All @@ -207,11 +211,6 @@ static void eventlog_continuation (flux_future_t *f, void *arg)
json_decref (event);
return;

error_cancel:
/* Must do so so that the future's matchtag will eventually be
* freed */
eventlog_watch_cancel (uc);

error:
msg = flux_msglist_first (uc->msglist);
while (msg) {
Expand Down Expand Up @@ -360,7 +359,7 @@ static void lookup_continuation (flux_future_t *f, void *arg)
if (flux_respond_pack (uc->ctx->h, msg, "{s:O}",
uc->key, uc->update_object) < 0) {
flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__);
goto error;
goto cleanup;
}

next:
Expand Down Expand Up @@ -520,7 +519,7 @@ void update_watch_cb (flux_t *h,
"{s:O}",
uc->key, uc->update_object) < 0) {
flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__);
goto error;
goto cleanup;
}
}
/* if uc->update_object has not been set, the initial lookup
Expand All @@ -536,6 +535,7 @@ void update_watch_cb (flux_t *h,
error:
if (flux_respond_error (h, msg, errno, errmsg) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
cleanup:
free (index_key);
}

Expand Down
31 changes: 16 additions & 15 deletions src/modules/job-info/watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,15 @@ static void watch_continuation (flux_future_t *f, void *arg)
}

if (!w->allow) {
if (eventlog_allow (ctx, w->msg, w->id, s) < 0)
goto error_cancel;
if (eventlog_allow (ctx, w->msg, w->id, s) < 0) {
if (!w->kvs_watch_canceled) {
if (flux_kvs_lookup_cancel (w->watch_f) < 0)
flux_log_error (ctx->h,
"%s: flux_kvs_lookup_cancel",
__FUNCTION__);
}
goto cleanup;
}
w->allow = true;
}

Expand All @@ -266,7 +273,13 @@ static void watch_continuation (flux_future_t *f, void *arg)
flux_log_error (ctx->h,
"%s: flux_respond_pack",
__FUNCTION__);
goto error_cancel;
if (!w->kvs_watch_canceled) {
if (flux_kvs_lookup_cancel (w->watch_f) < 0)
flux_log_error (ctx->h,
"%s: flux_kvs_lookup_cancel",
__FUNCTION__);
}
goto cleanup;
}

/* When watching the main job eventlog, we return ENODATA back
Expand Down Expand Up @@ -294,18 +307,6 @@ static void watch_continuation (flux_future_t *f, void *arg)
flux_future_reset (f);
return;

error_cancel:
/* If we haven't sent a cancellation yet, must do so so that
* the future's matchtag will eventually be freed */
if (!w->kvs_watch_canceled) {
int save_errno = errno;
if (flux_kvs_lookup_cancel (w->watch_f) < 0)
flux_log_error (ctx->h,
"%s: flux_kvs_lookup_cancel",
__FUNCTION__);
errno = save_errno;
}

error:
if (flux_respond_error (ctx->h, w->msg, errno, errmsg) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);
Expand Down

0 comments on commit fd8afd5

Please sign in to comment.