Skip to content

Commit

Permalink
Merge pull request #6013 from garlick/rexec_matchtag
Browse files Browse the repository at this point in the history
libsubprocess: use matchtag instead of pid for flux_subprocess_write()
  • Loading branch information
mergify[bot] authored Jun 10, 2024
2 parents 0ddc3d9 + b84e4eb commit 67fc412
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 138 deletions.
31 changes: 20 additions & 11 deletions src/common/libsubprocess/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ struct rexec_ctx {
json_t *cmd;
int flags;
struct rexec_response response;
uint32_t matchtag;
uint32_t rank;
char *service_name;
};

static void rexec_response_clear (struct rexec_response *resp)
Expand All @@ -62,12 +65,16 @@ static void rexec_ctx_destroy (struct rexec_ctx *ctx)
int saved_errno = errno;
rexec_response_clear (&ctx->response);
json_decref (ctx->cmd);
free (ctx->service_name);
free (ctx);
errno = saved_errno;
}
}

static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, int flags)
static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd,
const char *service_name,
uint32_t rank,
int flags)
{
struct rexec_ctx *ctx;
int valid_flags = SUBPROCESS_REXEC_STDOUT
Expand All @@ -80,10 +87,12 @@ static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, int flags)
}
if (!(ctx = calloc (1, sizeof (*ctx))))
return NULL;
if (!(ctx->cmd = cmd_tojson (cmd)))
if (!(ctx->cmd = cmd_tojson (cmd))
|| !(ctx->service_name = strdup (service_name)))
goto error;
ctx->flags = flags;
ctx->response.pid = -1;
ctx->rank = rank;
return ctx;
error:
rexec_ctx_destroy (ctx);
Expand All @@ -106,7 +115,7 @@ flux_future_t *subprocess_rexec (flux_t *h,
}
if (asprintf (&topic, "%s.exec", service_name) < 0)
return NULL;
if (!(ctx = rexec_ctx_create (cmd, flags)))
if (!(ctx = rexec_ctx_create (cmd, service_name, rank, flags)))
goto error;
if (!(f = flux_rpc_pack (h,
topic,
Expand All @@ -122,6 +131,7 @@ flux_future_t *subprocess_rexec (flux_t *h,
rexec_ctx_destroy (ctx);
goto error;
}
ctx->matchtag = flux_rpc_get_matchtag (f);
free (topic);
return f;
error:
Expand Down Expand Up @@ -223,33 +233,32 @@ bool subprocess_rexec_is_output (flux_future_t *f,
return false;
}

int subprocess_write (flux_t *h,
const char *service_name,
uint32_t rank,
pid_t pid,
int subprocess_write (flux_future_t *f_exec,
const char *stream,
const char *data,
int len,
bool eof)
{
struct rexec_ctx *ctx = flux_future_aux_get (f_exec, "flux::rexec");
flux_t *h = flux_future_get_flux (f_exec);
flux_future_t *f = NULL;
json_t *io;
char *topic;
int rc = -1;

if (!h || pid < 0 || !stream || !service_name) {
if (!stream || !ctx) {
errno = EINVAL;
return -1;
}
if (asprintf (&topic, "%s.write", service_name) < 0)
if (asprintf (&topic, "%s.write", ctx->service_name) < 0)
return -1;
if (!(io = ioencode (stream, "0", data, len, eof))
|| !(f = flux_rpc_pack (h,
topic,
rank,
ctx->rank,
FLUX_RPC_NORESPONSE,
"{s:i s:O}",
"pid", pid,
"matchtag", ctx->matchtag,
"io", io)))
goto out;
rc = 0;
Expand Down
5 changes: 1 addition & 4 deletions src/common/libsubprocess/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ bool subprocess_rexec_is_output (flux_future_t *f,
int *len,
bool *eof);

int subprocess_write (flux_t *h,
const char *service_name,
uint32_t rank,
pid_t pid,
int subprocess_write (flux_future_t *f,
const char *stream,
const char *data,
int len,
Expand Down
44 changes: 0 additions & 44 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -468,48 +468,6 @@ static int remote_output_buffered (flux_subprocess_t *p,
return 0;
}

/* In the event channel had data / closed before process running */
static int send_channel_data (flux_subprocess_t *p)
{
struct subprocess_channel *c;
c = zhash_first (p->channels);
while (c) {
int bytes = fbuf_bytes (c->write_buffer);
if (c->closed || bytes > 0) {
const char *ptr = NULL;
int len = 0;
if (bytes > 0) {
if (!(ptr = fbuf_read (c->write_buffer, -1, &len))) {
llog_debug (p,
"error reading buffered data: %s",
strerror (errno));
set_failed (p, "internal fbuf_read error");
return -1;
}
}
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
ptr,
len,
c->closed) < 0) {
llog_debug (p,
"error sending rexec.write request: %s",
strerror (errno));
set_failed (p, "internal close error");
return -1;
}
/* Don't need this buffer anymore, reclaim the space */
fbuf_destroy (c->write_buffer);
c->write_buffer = NULL;
}
c = zhash_next (p->channels);
}
return 0;
}

static void rexec_continuation (flux_future_t *f, void *arg)
{
flux_subprocess_t *p = arg;
Expand Down Expand Up @@ -543,8 +501,6 @@ static void rexec_continuation (flux_future_t *f, void *arg)
if (subprocess_rexec_is_started (f, &p->pid)) {
p->pid_set = true;
process_new_state (p, FLUX_SUBPROCESS_RUNNING);
if (send_channel_data (p) < 0)
goto error;
}
else if (subprocess_rexec_is_stopped (f)) {
process_new_state (p, FLUX_SUBPROCESS_STOPPED);
Expand Down
41 changes: 31 additions & 10 deletions src/common/libsubprocess/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,30 @@ static flux_subprocess_t *proc_find_bypid (subprocess_server_t *s, pid_t pid)
return NULL;
}

/* Find a <service>.exec message with the same sender as msg and matchtag as
* specified in the request matchtag field.
* N.B. flux_cancel_match() happens to be helpful because RFC 42 subprocess
* write works like RFC 6 cancel.
*/
static flux_subprocess_t *proc_find_byclient (subprocess_server_t *s,
const flux_msg_t *request)
{
flux_subprocess_t *p;

p = zlistx_first (s->subprocesses);
while (p) {
const flux_msg_t *msg;

if ((msg = flux_subprocess_aux_get (p, msgkey))
&& flux_cancel_match (request, msg))
return p;
p = zlistx_next (s->subprocesses);
}
errno = ESRCH;
return NULL;
}


static void proc_completion_cb (flux_subprocess_t *p)
{
subprocess_server_t *s = flux_subprocess_aux_get (p, srvkey);
Expand Down Expand Up @@ -390,14 +414,14 @@ static void server_write_cb (flux_t *h,
char *data = NULL;
int len = 0;
bool eof = false;
pid_t pid;
int matchtag;
json_t *io = NULL;
flux_error_t error;

if (flux_request_unpack (msg,
NULL,
"{ s:i s:o }",
"pid", &pid,
"matchtag", &matchtag,
"io", &io) < 0
|| iodecode (io, &stream, NULL, &data, &len, &eof) < 0) {
llog_error (s,
Expand All @@ -415,27 +439,24 @@ static void server_write_cb (flux_t *h,
* in flight, and is not necessarily an error, and can be common enough
* that the log messages end up being a nuisance.
*/
if (!(p = proc_find_bypid (s, pid))
|| p->state != FLUX_SUBPROCESS_RUNNING)
if (!(p = proc_find_byclient (s, msg))
|| p->state == FLUX_SUBPROCESS_FAILED
|| p->state == FLUX_SUBPROCESS_EXITED)
goto out;

if (data && len) {
int rc = flux_subprocess_write (p, stream, data, len);
if (rc < 0) {
llog_error (s,
"Error writing %d bytes to subprocess pid %d %s",
"Error writing %d bytes to subprocess %s",
len,
(int)pid,
stream);
goto error;
}
}
if (eof) {
if (flux_subprocess_close (p, stream) < 0) {
llog_error (s,
"Error writing EOF to subprocess pid %d %s",
(int)pid,
stream);
llog_error (s, "Error writing EOF to subprocess %s", stream);
goto error;
}
}
Expand Down
65 changes: 10 additions & 55 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ void channel_destroy (void *arg)
flux_watcher_destroy (c->buffer_read_stopped_w);
c->buffer_read_w_started = false;

fbuf_destroy (c->write_buffer);
fbuf_destroy (c->read_buffer);
flux_watcher_destroy (c->out_prep_w);
flux_watcher_destroy (c->out_idle_w);
Expand Down Expand Up @@ -707,42 +706,12 @@ int flux_subprocess_write (flux_subprocess_t *p,
errno = EPIPE;
return -1;
}
if (p->state == FLUX_SUBPROCESS_INIT) {
if (!c->write_buffer) {
int buffer_size;
if ((buffer_size = cmd_option_bufsize (p, stream)) < 0) {
log_err ("cmd_option_bufsize: %s", strerror (errno));
return -1;
}
if (!(c->write_buffer = fbuf_create (buffer_size))) {
log_err ("fbuf_create");
return -1;
}
}
if (fbuf_space (c->write_buffer) < len) {
errno = ENOSPC;
return -1;
}
if ((ret = fbuf_write (c->write_buffer, buf, len)) < 0) {
log_err ("fbuf_write");
return -1;
}
}
else { /* p->state == FLUX_SUBPROCESS_RUNNING */
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
buf,
len,
false) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
ret = len;
if (subprocess_write (p->f, c->name, buf, len, false) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
ret = len;
}

return ret;
Expand Down Expand Up @@ -780,26 +749,12 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream)
c->closed = true;
}
else {
/* if process isn't running, eof plus any previously written
* data will be sent after process converts to running. See
* send_channel_data() in remote.c. If subprocess has already
* exited, this does nothing.
*/
c->closed = true;
if (p->state == FLUX_SUBPROCESS_RUNNING) {
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
NULL,
0,
true) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
if (subprocess_write (p->f, c->name, NULL, 0, true) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
c->closed = true;
}

return 0;
Expand Down
1 change: 0 additions & 1 deletion src/common/libsubprocess/subprocess_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ struct subprocess_channel {
bool buffer_read_w_started;

/* remote */
struct fbuf *write_buffer; /* buffer pre-running data */
struct fbuf *read_buffer;
bool read_eof_received;
flux_watcher_t *out_prep_w;
Expand Down
9 changes: 6 additions & 3 deletions src/common/libsubprocess/test/iostress.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ static void iostress_state_cb (flux_subprocess_t *p,
}
}

static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len)
static int rexec_write (flux_t *h, uint32_t matchtag, const char *buf, int len)
{
flux_future_t *f;
json_t *io;
Expand All @@ -133,7 +133,7 @@ static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len)
0,
FLUX_RPC_NORESPONSE,
"{s:i s:O}",
"pid", pid,
"matchtag", matchtag,
"io", io))) {
json_decref (io);
return -1;
Expand All @@ -150,15 +150,18 @@ static void iostress_source_cb (flux_reactor_t *r,
{
struct iostress_ctx *ctx = arg;
char *buf;
uint32_t matchtag;

if (!(buf = malloc (ctx->linesize)))
BAIL_OUT ("out of memory");
memset (buf, 'F', ctx->linesize - 1);
buf[ctx->linesize - 1] = '\n';

matchtag = flux_rpc_get_matchtag (ctx->p->f);

for (int i = 0; i < ctx->batchlines; i++) {
if (ctx->direct) {
if (rexec_write (ctx->h, ctx->pid, buf, ctx->linesize) < 0)
if (rexec_write (ctx->h, matchtag, buf, ctx->linesize) < 0)
BAIL_OUT ("rexec_write failed");
}
else {
Expand Down
Loading

0 comments on commit 67fc412

Please sign in to comment.