From ca346837a614a86710c3e429dc0211439762034e Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 28 May 2024 11:20:12 -0700 Subject: [PATCH 1/6] sdexec: use matchtag in server write request Problem: RFC 42 now requires write requests to reference the subprocess via the exec matchtag rather than the pid. Expect a "matchtag" key in the write request and use it along with the sender's uuid to look up the systemd unit. --- src/modules/sdexec/sdexec.c | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/modules/sdexec/sdexec.c b/src/modules/sdexec/sdexec.c index cc6eb1f557dd..5b3ae5b37fd5 100644 --- a/src/modules/sdexec/sdexec.c +++ b/src/modules/sdexec/sdexec.c @@ -139,6 +139,25 @@ static const flux_msg_t *lookup_message_byaux (struct flux_msglist *msglist, return NULL; } +/* Find an sdexec.exec message with the same sender as msg and matchtag as + * specified in the msg matchtag field. + * N.B. flux_cancel_match() happens to be helpful because RFC 42 subprocess + * write works like RFC 6 cancel. + */ +static const flux_msg_t *lookup_message_byclient (struct flux_msglist *msglist, + const flux_msg_t *msg) +{ + const flux_msg_t *m; + + m = flux_msglist_first (msglist); + while (m) { + if (flux_cancel_match (msg, m)) + return m; + m = flux_msglist_next (msglist); + } + return NULL; +} + static void exec_respond_error (struct sdproc *proc, int errnum, const char *errstr) @@ -713,7 +732,7 @@ static void write_cb (flux_t *h, void *arg) { struct sdexec_ctx *ctx = arg; - pid_t pid; + int matchtag; json_t *io; const flux_msg_t *exec_request; flux_error_t error; @@ -723,7 +742,7 @@ static void write_cb (flux_t *h, if (flux_request_unpack (msg, NULL, "{s:i s:o}", - "pid", &pid, + "matchtag", &matchtag, "io", &io) < 0) { flux_log_error (h, "error decoding write request"); return; @@ -736,22 +755,18 @@ static void write_cb (flux_t *h, flux_log_error (h, "%s", error.text); return; } - if (!(exec_request = lookup_message_bypid (ctx->requests, pid)) + if (!(exec_request = lookup_message_byclient (ctx->requests, msg)) || !(proc = flux_msg_aux_get (exec_request, "sdproc"))) { - flux_log (h, LOG_ERR, "write pid=%d: not found", pid); + flux_log (h, LOG_ERR, "sdexec.write: subprocess no longer exists"); return; } if (iodecode (io, &stream, NULL, NULL, NULL, NULL) == 0 && !streq (stream, "stdin")) { - flux_log (h, - LOG_ERR, - "write pid=%d stream=%s: invalid stream", - pid, - stream); + flux_log (h, LOG_ERR, "sdexec.write: %s is an invalid stream", stream); return; } if (sdexec_channel_write (proc->in, io) < 0) { - flux_log_error (h, "write pid=%d", pid); + flux_log_error (h, "sdexec.write %s", stream); return; } } From 5f80d28f3f5bc6236e2222067eea6bcf87fcf032 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 28 May 2024 11:29:15 -0700 Subject: [PATCH 2/6] libsubprocess: use matchtag in server write req Problem: RFC 42 now requires write requests to reference the subprocess via the exec matchtag rather than the pid. Expect a "matchtag" key in the write request and use it along with the requestor's uuid to look up the subprocess. --- src/common/libsubprocess/server.c | 38 +++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/common/libsubprocess/server.c b/src/common/libsubprocess/server.c index 139923f23da3..1010e9e12f75 100644 --- a/src/common/libsubprocess/server.c +++ b/src/common/libsubprocess/server.c @@ -107,6 +107,30 @@ static flux_subprocess_t *proc_find_bypid (subprocess_server_t *s, pid_t pid) return NULL; } +/* Find a .exec message with the same sender as msg and matchtag as + * specified in the request matchtag field. + * N.B. flux_cancel_match() happens to be helpful because RFC 42 subprocess + * write works like RFC 6 cancel. + */ +static flux_subprocess_t *proc_find_byclient (subprocess_server_t *s, + const flux_msg_t *request) +{ + flux_subprocess_t *p; + + p = zlistx_first (s->subprocesses); + while (p) { + const flux_msg_t *msg; + + if ((msg = flux_subprocess_aux_get (p, msgkey)) + && flux_cancel_match (request, msg)) + return p; + p = zlistx_next (s->subprocesses); + } + errno = ESRCH; + return NULL; +} + + static void proc_completion_cb (flux_subprocess_t *p) { subprocess_server_t *s = flux_subprocess_aux_get (p, srvkey); @@ -390,14 +414,14 @@ static void server_write_cb (flux_t *h, char *data = NULL; int len = 0; bool eof = false; - pid_t pid; + int matchtag; json_t *io = NULL; flux_error_t error; if (flux_request_unpack (msg, NULL, "{ s:i s:o }", - "pid", &pid, + "matchtag", &matchtag, "io", &io) < 0 || iodecode (io, &stream, NULL, &data, &len, &eof) < 0) { llog_error (s, @@ -415,7 +439,7 @@ static void server_write_cb (flux_t *h, * in flight, and is not necessarily an error, and can be common enough * that the log messages end up being a nuisance. */ - if (!(p = proc_find_bypid (s, pid)) + if (!(p = proc_find_byclient (s, msg)) || p->state != FLUX_SUBPROCESS_RUNNING) goto out; @@ -423,19 +447,15 @@ static void server_write_cb (flux_t *h, int rc = flux_subprocess_write (p, stream, data, len); if (rc < 0) { llog_error (s, - "Error writing %d bytes to subprocess pid %d %s", + "Error writing %d bytes to subprocess %s", len, - (int)pid, stream); goto error; } } if (eof) { if (flux_subprocess_close (p, stream) < 0) { - llog_error (s, - "Error writing EOF to subprocess pid %d %s", - (int)pid, - stream); + llog_error (s, "Error writing EOF to subprocess %s", stream); goto error; } } From dfdbb438794099196557714fa97ee044ff28eea0 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 5 Jun 2024 13:24:21 -0700 Subject: [PATCH 3/6] libsubprocess: use matchtag in client write req Problem: RFC 42 now requires write requests to reference the subprocess via the exec matchtag rather than the pid. Send the matchtag in the write request instead of the pid and alter the internal function prototype for subprocess_write() to accept the exec future in lieu of rank, service, and matchtag. Update users of that function. Also update a unit test that uses the raw protocol. --- src/common/libsubprocess/client.c | 31 +++++++++++++++--------- src/common/libsubprocess/client.h | 5 +--- src/common/libsubprocess/remote.c | 9 +------ src/common/libsubprocess/subprocess.c | 18 ++------------ src/common/libsubprocess/test/iostress.c | 9 ++++--- 5 files changed, 30 insertions(+), 42 deletions(-) diff --git a/src/common/libsubprocess/client.c b/src/common/libsubprocess/client.c index 741cae39f2cc..edb971eb972c 100644 --- a/src/common/libsubprocess/client.c +++ b/src/common/libsubprocess/client.c @@ -44,6 +44,9 @@ struct rexec_ctx { json_t *cmd; int flags; struct rexec_response response; + uint32_t matchtag; + uint32_t rank; + char *service_name; }; static void rexec_response_clear (struct rexec_response *resp) @@ -62,12 +65,16 @@ static void rexec_ctx_destroy (struct rexec_ctx *ctx) int saved_errno = errno; rexec_response_clear (&ctx->response); json_decref (ctx->cmd); + free (ctx->service_name); free (ctx); errno = saved_errno; } } -static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, int flags) +static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, + const char *service_name, + uint32_t rank, + int flags) { struct rexec_ctx *ctx; int valid_flags = SUBPROCESS_REXEC_STDOUT @@ -80,10 +87,12 @@ static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, int flags) } if (!(ctx = calloc (1, sizeof (*ctx)))) return NULL; - if (!(ctx->cmd = cmd_tojson (cmd))) + if (!(ctx->cmd = cmd_tojson (cmd)) + || !(ctx->service_name = strdup (service_name))) goto error; ctx->flags = flags; ctx->response.pid = -1; + ctx->rank = rank; return ctx; error: rexec_ctx_destroy (ctx); @@ -106,7 +115,7 @@ flux_future_t *subprocess_rexec (flux_t *h, } if (asprintf (&topic, "%s.exec", service_name) < 0) return NULL; - if (!(ctx = rexec_ctx_create (cmd, flags))) + if (!(ctx = rexec_ctx_create (cmd, service_name, rank, flags))) goto error; if (!(f = flux_rpc_pack (h, topic, @@ -122,6 +131,7 @@ flux_future_t *subprocess_rexec (flux_t *h, rexec_ctx_destroy (ctx); goto error; } + ctx->matchtag = flux_rpc_get_matchtag (f); free (topic); return f; error: @@ -223,33 +233,32 @@ bool subprocess_rexec_is_output (flux_future_t *f, return false; } -int subprocess_write (flux_t *h, - const char *service_name, - uint32_t rank, - pid_t pid, +int subprocess_write (flux_future_t *f_exec, const char *stream, const char *data, int len, bool eof) { + struct rexec_ctx *ctx = flux_future_aux_get (f_exec, "flux::rexec"); + flux_t *h = flux_future_get_flux (f_exec); flux_future_t *f = NULL; json_t *io; char *topic; int rc = -1; - if (!h || pid < 0 || !stream || !service_name) { + if (!stream || !ctx) { errno = EINVAL; return -1; } - if (asprintf (&topic, "%s.write", service_name) < 0) + if (asprintf (&topic, "%s.write", ctx->service_name) < 0) return -1; if (!(io = ioencode (stream, "0", data, len, eof)) || !(f = flux_rpc_pack (h, topic, - rank, + ctx->rank, FLUX_RPC_NORESPONSE, "{s:i s:O}", - "pid", pid, + "matchtag", ctx->matchtag, "io", io))) goto out; rc = 0; diff --git a/src/common/libsubprocess/client.h b/src/common/libsubprocess/client.h index 4ebf18b9ab97..83198d3ca380 100644 --- a/src/common/libsubprocess/client.h +++ b/src/common/libsubprocess/client.h @@ -40,10 +40,7 @@ bool subprocess_rexec_is_output (flux_future_t *f, int *len, bool *eof); -int subprocess_write (flux_t *h, - const char *service_name, - uint32_t rank, - pid_t pid, +int subprocess_write (flux_future_t *f, const char *stream, const char *data, int len, diff --git a/src/common/libsubprocess/remote.c b/src/common/libsubprocess/remote.c index fd8e5363ad20..ea75e028263b 100644 --- a/src/common/libsubprocess/remote.c +++ b/src/common/libsubprocess/remote.c @@ -487,14 +487,7 @@ static int send_channel_data (flux_subprocess_t *p) return -1; } } - if (subprocess_write (p->h, - p->service_name, - p->rank, - p->pid, - c->name, - ptr, - len, - c->closed) < 0) { + if (subprocess_write (p->f, c->name, ptr, len, c->closed) < 0) { llog_debug (p, "error sending rexec.write request: %s", strerror (errno)); diff --git a/src/common/libsubprocess/subprocess.c b/src/common/libsubprocess/subprocess.c index 701282af1a65..fbf2b357b2e4 100644 --- a/src/common/libsubprocess/subprocess.c +++ b/src/common/libsubprocess/subprocess.c @@ -729,14 +729,7 @@ int flux_subprocess_write (flux_subprocess_t *p, } } else { /* p->state == FLUX_SUBPROCESS_RUNNING */ - if (subprocess_write (p->h, - p->service_name, - p->rank, - p->pid, - c->name, - buf, - len, - false) < 0) { + if (subprocess_write (p->f, c->name, buf, len, false) < 0) { log_err ("error sending rexec.write request: %s", strerror (errno)); return -1; @@ -787,14 +780,7 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream) */ c->closed = true; if (p->state == FLUX_SUBPROCESS_RUNNING) { - if (subprocess_write (p->h, - p->service_name, - p->rank, - p->pid, - c->name, - NULL, - 0, - true) < 0) { + if (subprocess_write (p->f, c->name, NULL, 0, true) < 0) { log_err ("error sending rexec.write request: %s", strerror (errno)); return -1; diff --git a/src/common/libsubprocess/test/iostress.c b/src/common/libsubprocess/test/iostress.c index 389547e461fb..51b90ee12994 100644 --- a/src/common/libsubprocess/test/iostress.c +++ b/src/common/libsubprocess/test/iostress.c @@ -120,7 +120,7 @@ static void iostress_state_cb (flux_subprocess_t *p, } } -static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len) +static int rexec_write (flux_t *h, uint32_t matchtag, const char *buf, int len) { flux_future_t *f; json_t *io; @@ -133,7 +133,7 @@ static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len) 0, FLUX_RPC_NORESPONSE, "{s:i s:O}", - "pid", pid, + "matchtag", matchtag, "io", io))) { json_decref (io); return -1; @@ -150,15 +150,18 @@ static void iostress_source_cb (flux_reactor_t *r, { struct iostress_ctx *ctx = arg; char *buf; + uint32_t matchtag; if (!(buf = malloc (ctx->linesize))) BAIL_OUT ("out of memory"); memset (buf, 'F', ctx->linesize - 1); buf[ctx->linesize - 1] = '\n'; + matchtag = flux_rpc_get_matchtag (ctx->p->f); + for (int i = 0; i < ctx->batchlines; i++) { if (ctx->direct) { - if (rexec_write (ctx->h, ctx->pid, buf, ctx->linesize) < 0) + if (rexec_write (ctx->h, matchtag, buf, ctx->linesize) < 0) BAIL_OUT ("rexec_write failed"); } else { From 6300544e095e484ef2fa5131c140503b9ca1e65c Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 28 May 2024 16:31:57 -0700 Subject: [PATCH 4/6] sdexec: handle early sdexec.write requests Problem: if an sdexec.write request arrives before the unit stdin is ready, it will be dropped. This is possible now that sdexec.write identifies the unit by the matchtag instead of the pid. Queue early sdexec.write requests until stdin is valid, then move them back to the flux handle for processing as though they are being received for the first time. --- src/modules/sdexec/sdexec.c | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/modules/sdexec/sdexec.c b/src/modules/sdexec/sdexec.c index 5b3ae5b37fd5..f313f2778227 100644 --- a/src/modules/sdexec/sdexec.c +++ b/src/modules/sdexec/sdexec.c @@ -65,6 +65,7 @@ struct sdproc { flux_future_t *f_start; flux_future_t *f_stop; struct unit *unit; + struct flux_msglist *write_requests; struct channel *in; struct channel *out; struct channel *err; @@ -365,6 +366,21 @@ static void start_continuation (flux_future_t *f, void *arg) sdexec_channel_close_fd (proc->in); sdexec_channel_close_fd (proc->out); sdexec_channel_close_fd (proc->err); + /* Now that stdin is ready, re-queue any messages write_cb() left in + * proc->write_requests. Push these messages to the front of the flux_t + * queue so that they come before unprocessed writes, if any. + */ + if (proc->write_requests) { + const flux_msg_t *request; + while ((request = flux_msglist_pop (proc->write_requests))) { + int rc = flux_requeue (ctx->h, request, FLUX_RQ_HEAD); + flux_msg_decref (request); + if (rc < 0) { + flux_log_error (ctx->h, "error requeuing early sdexec.write"); + break; + } + } + } return; error: if (flux_respond_error (ctx->h, msg, errno, future_strerror (f, errno))) @@ -445,6 +461,7 @@ static void sdproc_destroy (struct sdproc *proc) flux_future_destroy (proc->f_stop); sdexec_unit_destroy (proc->unit); json_decref (proc->cmd); + flux_msglist_destroy (proc->write_requests); free (proc); errno = saved_errno; } @@ -760,6 +777,22 @@ static void write_cb (flux_t *h, flux_log (h, LOG_ERR, "sdexec.write: subprocess no longer exists"); return; } + /* If the systemd unit has not started yet, enqueue the write request for + * later processing in start_continuation(). We can tell that it hasn't + * started if start_continuation() has not yet handed the stdin channel + * file descriptor over to systemd by calling the close function. + */ + if (sdexec_channel_get_fd (proc->in) != -1) { // not yet claimed by systemd + if (!proc->write_requests) { + if (!(proc->write_requests = flux_msglist_create ())) { + flux_log_error (h, "sdexec.write: error creating write queue"); + return; + } + } + if (flux_msglist_push (proc->write_requests, msg) < 0) + flux_log_error (h, "sdexec.write: error enqueueing write request"); + return; + } if (iodecode (io, &stream, NULL, NULL, NULL, NULL) == 0 && !streq (stream, "stdin")) { flux_log (h, LOG_ERR, "sdexec.write: %s is an invalid stream", stream); From 2a5b7658f9a0ca893f3d6598d38cd35ed959efc6 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 6 Jun 2024 07:02:40 -0700 Subject: [PATCH 5/6] libsubprocess: allow server write before start Problem: data written to the subprocess server before the subprocess has entered RUNNING state is silently discarded, but must be handled in order to eliminate extraneous buffering on the client size. Drop stdin data only when the subprocess is in FAILED or EXITED state. --- src/common/libsubprocess/server.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/libsubprocess/server.c b/src/common/libsubprocess/server.c index 1010e9e12f75..8239aaf51e46 100644 --- a/src/common/libsubprocess/server.c +++ b/src/common/libsubprocess/server.c @@ -440,7 +440,8 @@ static void server_write_cb (flux_t *h, * that the log messages end up being a nuisance. */ if (!(p = proc_find_byclient (s, msg)) - || p->state != FLUX_SUBPROCESS_RUNNING) + || p->state == FLUX_SUBPROCESS_FAILED + || p->state == FLUX_SUBPROCESS_EXITED) goto out; if (data && len) { From b84e4ebd7601f95b986aa05d9393ec9b3332cd10 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 6 Jun 2024 06:53:10 -0700 Subject: [PATCH 6/6] libsubprocess: eliminate extra write buffer Problem: when stdin is written to a remote subprocess before the pid has been received, a buffer is created on the client side, but now that the protocol uses the matchtag instead of the pid, data can be sent early and this extra complexity can be avoided. Drop pre-running stdin buffering. --- src/common/libsubprocess/remote.c | 37 -------------- src/common/libsubprocess/subprocess.c | 51 ++++--------------- src/common/libsubprocess/subprocess_private.h | 1 - 3 files changed, 10 insertions(+), 79 deletions(-) diff --git a/src/common/libsubprocess/remote.c b/src/common/libsubprocess/remote.c index ea75e028263b..31a04c926246 100644 --- a/src/common/libsubprocess/remote.c +++ b/src/common/libsubprocess/remote.c @@ -468,41 +468,6 @@ static int remote_output_buffered (flux_subprocess_t *p, return 0; } -/* In the event channel had data / closed before process running */ -static int send_channel_data (flux_subprocess_t *p) -{ - struct subprocess_channel *c; - c = zhash_first (p->channels); - while (c) { - int bytes = fbuf_bytes (c->write_buffer); - if (c->closed || bytes > 0) { - const char *ptr = NULL; - int len = 0; - if (bytes > 0) { - if (!(ptr = fbuf_read (c->write_buffer, -1, &len))) { - llog_debug (p, - "error reading buffered data: %s", - strerror (errno)); - set_failed (p, "internal fbuf_read error"); - return -1; - } - } - if (subprocess_write (p->f, c->name, ptr, len, c->closed) < 0) { - llog_debug (p, - "error sending rexec.write request: %s", - strerror (errno)); - set_failed (p, "internal close error"); - return -1; - } - /* Don't need this buffer anymore, reclaim the space */ - fbuf_destroy (c->write_buffer); - c->write_buffer = NULL; - } - c = zhash_next (p->channels); - } - return 0; -} - static void rexec_continuation (flux_future_t *f, void *arg) { flux_subprocess_t *p = arg; @@ -536,8 +501,6 @@ static void rexec_continuation (flux_future_t *f, void *arg) if (subprocess_rexec_is_started (f, &p->pid)) { p->pid_set = true; process_new_state (p, FLUX_SUBPROCESS_RUNNING); - if (send_channel_data (p) < 0) - goto error; } else if (subprocess_rexec_is_stopped (f)) { process_new_state (p, FLUX_SUBPROCESS_STOPPED); diff --git a/src/common/libsubprocess/subprocess.c b/src/common/libsubprocess/subprocess.c index fbf2b357b2e4..71e599792df5 100644 --- a/src/common/libsubprocess/subprocess.c +++ b/src/common/libsubprocess/subprocess.c @@ -54,7 +54,6 @@ void channel_destroy (void *arg) flux_watcher_destroy (c->buffer_read_stopped_w); c->buffer_read_w_started = false; - fbuf_destroy (c->write_buffer); fbuf_destroy (c->read_buffer); flux_watcher_destroy (c->out_prep_w); flux_watcher_destroy (c->out_idle_w); @@ -707,35 +706,12 @@ int flux_subprocess_write (flux_subprocess_t *p, errno = EPIPE; return -1; } - if (p->state == FLUX_SUBPROCESS_INIT) { - if (!c->write_buffer) { - int buffer_size; - if ((buffer_size = cmd_option_bufsize (p, stream)) < 0) { - log_err ("cmd_option_bufsize: %s", strerror (errno)); - return -1; - } - if (!(c->write_buffer = fbuf_create (buffer_size))) { - log_err ("fbuf_create"); - return -1; - } - } - if (fbuf_space (c->write_buffer) < len) { - errno = ENOSPC; - return -1; - } - if ((ret = fbuf_write (c->write_buffer, buf, len)) < 0) { - log_err ("fbuf_write"); - return -1; - } - } - else { /* p->state == FLUX_SUBPROCESS_RUNNING */ - if (subprocess_write (p->f, c->name, buf, len, false) < 0) { - log_err ("error sending rexec.write request: %s", - strerror (errno)); - return -1; - } - ret = len; + if (subprocess_write (p->f, c->name, buf, len, false) < 0) { + log_err ("error sending rexec.write request: %s", + strerror (errno)); + return -1; } + ret = len; } return ret; @@ -773,19 +749,12 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream) c->closed = true; } else { - /* if process isn't running, eof plus any previously written - * data will be sent after process converts to running. See - * send_channel_data() in remote.c. If subprocess has already - * exited, this does nothing. - */ - c->closed = true; - if (p->state == FLUX_SUBPROCESS_RUNNING) { - if (subprocess_write (p->f, c->name, NULL, 0, true) < 0) { - log_err ("error sending rexec.write request: %s", - strerror (errno)); - return -1; - } + if (subprocess_write (p->f, c->name, NULL, 0, true) < 0) { + log_err ("error sending rexec.write request: %s", + strerror (errno)); + return -1; } + c->closed = true; } return 0; diff --git a/src/common/libsubprocess/subprocess_private.h b/src/common/libsubprocess/subprocess_private.h index 549ab828abc3..db9410f25c5f 100644 --- a/src/common/libsubprocess/subprocess_private.h +++ b/src/common/libsubprocess/subprocess_private.h @@ -45,7 +45,6 @@ struct subprocess_channel { bool buffer_read_w_started; /* remote */ - struct fbuf *write_buffer; /* buffer pre-running data */ struct fbuf *read_buffer; bool read_eof_received; flux_watcher_t *out_prep_w;