Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsubprocess: use matchtag instead of pid for flux_subprocess_write() #6013

Merged
merged 6 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions src/common/libsubprocess/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ struct rexec_ctx {
json_t *cmd;
int flags;
struct rexec_response response;
uint32_t matchtag;
uint32_t rank;
char *service_name;
};

static void rexec_response_clear (struct rexec_response *resp)
Expand All @@ -62,12 +65,16 @@ static void rexec_ctx_destroy (struct rexec_ctx *ctx)
int saved_errno = errno;
rexec_response_clear (&ctx->response);
json_decref (ctx->cmd);
free (ctx->service_name);
free (ctx);
errno = saved_errno;
}
}

static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, int flags)
static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd,
const char *service_name,
uint32_t rank,
int flags)
{
struct rexec_ctx *ctx;
int valid_flags = SUBPROCESS_REXEC_STDOUT
Expand All @@ -80,10 +87,12 @@ static struct rexec_ctx *rexec_ctx_create (flux_cmd_t *cmd, int flags)
}
if (!(ctx = calloc (1, sizeof (*ctx))))
return NULL;
if (!(ctx->cmd = cmd_tojson (cmd)))
if (!(ctx->cmd = cmd_tojson (cmd))
|| !(ctx->service_name = strdup (service_name)))
goto error;
ctx->flags = flags;
ctx->response.pid = -1;
ctx->rank = rank;
return ctx;
error:
rexec_ctx_destroy (ctx);
Expand All @@ -106,7 +115,7 @@ flux_future_t *subprocess_rexec (flux_t *h,
}
if (asprintf (&topic, "%s.exec", service_name) < 0)
return NULL;
if (!(ctx = rexec_ctx_create (cmd, flags)))
if (!(ctx = rexec_ctx_create (cmd, service_name, rank, flags)))
goto error;
if (!(f = flux_rpc_pack (h,
topic,
Expand All @@ -122,6 +131,7 @@ flux_future_t *subprocess_rexec (flux_t *h,
rexec_ctx_destroy (ctx);
goto error;
}
ctx->matchtag = flux_rpc_get_matchtag (f);
free (topic);
return f;
error:
Expand Down Expand Up @@ -223,33 +233,32 @@ bool subprocess_rexec_is_output (flux_future_t *f,
return false;
}

int subprocess_write (flux_t *h,
const char *service_name,
uint32_t rank,
pid_t pid,
int subprocess_write (flux_future_t *f_exec,
const char *stream,
const char *data,
int len,
bool eof)
{
struct rexec_ctx *ctx = flux_future_aux_get (f_exec, "flux::rexec");
flux_t *h = flux_future_get_flux (f_exec);
flux_future_t *f = NULL;
json_t *io;
char *topic;
int rc = -1;

if (!h || pid < 0 || !stream || !service_name) {
if (!stream || !ctx) {
errno = EINVAL;
return -1;
}
if (asprintf (&topic, "%s.write", service_name) < 0)
if (asprintf (&topic, "%s.write", ctx->service_name) < 0)
return -1;
if (!(io = ioencode (stream, "0", data, len, eof))
|| !(f = flux_rpc_pack (h,
topic,
rank,
ctx->rank,
FLUX_RPC_NORESPONSE,
"{s:i s:O}",
"pid", pid,
"matchtag", ctx->matchtag,
"io", io)))
goto out;
rc = 0;
Expand Down
5 changes: 1 addition & 4 deletions src/common/libsubprocess/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ bool subprocess_rexec_is_output (flux_future_t *f,
int *len,
bool *eof);

int subprocess_write (flux_t *h,
const char *service_name,
uint32_t rank,
pid_t pid,
int subprocess_write (flux_future_t *f,
const char *stream,
const char *data,
int len,
Expand Down
44 changes: 0 additions & 44 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -468,48 +468,6 @@ static int remote_output_buffered (flux_subprocess_t *p,
return 0;
}

/* In the event channel had data / closed before process running */
static int send_channel_data (flux_subprocess_t *p)
{
struct subprocess_channel *c;
c = zhash_first (p->channels);
while (c) {
int bytes = fbuf_bytes (c->write_buffer);
if (c->closed || bytes > 0) {
const char *ptr = NULL;
int len = 0;
if (bytes > 0) {
if (!(ptr = fbuf_read (c->write_buffer, -1, &len))) {
llog_debug (p,
"error reading buffered data: %s",
strerror (errno));
set_failed (p, "internal fbuf_read error");
return -1;
}
}
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
ptr,
len,
c->closed) < 0) {
llog_debug (p,
"error sending rexec.write request: %s",
strerror (errno));
set_failed (p, "internal close error");
return -1;
}
/* Don't need this buffer anymore, reclaim the space */
fbuf_destroy (c->write_buffer);
c->write_buffer = NULL;
}
c = zhash_next (p->channels);
}
return 0;
}

static void rexec_continuation (flux_future_t *f, void *arg)
{
flux_subprocess_t *p = arg;
Expand Down Expand Up @@ -543,8 +501,6 @@ static void rexec_continuation (flux_future_t *f, void *arg)
if (subprocess_rexec_is_started (f, &p->pid)) {
p->pid_set = true;
process_new_state (p, FLUX_SUBPROCESS_RUNNING);
if (send_channel_data (p) < 0)
goto error;
}
else if (subprocess_rexec_is_stopped (f)) {
process_new_state (p, FLUX_SUBPROCESS_STOPPED);
Expand Down
41 changes: 31 additions & 10 deletions src/common/libsubprocess/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,30 @@ static flux_subprocess_t *proc_find_bypid (subprocess_server_t *s, pid_t pid)
return NULL;
}

/* Find a <service>.exec message with the same sender as msg and matchtag as
* specified in the request matchtag field.
* N.B. flux_cancel_match() happens to be helpful because RFC 42 subprocess
* write works like RFC 6 cancel.
*/
static flux_subprocess_t *proc_find_byclient (subprocess_server_t *s,
const flux_msg_t *request)
{
flux_subprocess_t *p;

p = zlistx_first (s->subprocesses);
while (p) {
const flux_msg_t *msg;

if ((msg = flux_subprocess_aux_get (p, msgkey))
&& flux_cancel_match (request, msg))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to prior comment

return p;
p = zlistx_next (s->subprocesses);
}
errno = ESRCH;
return NULL;
}


static void proc_completion_cb (flux_subprocess_t *p)
{
subprocess_server_t *s = flux_subprocess_aux_get (p, srvkey);
Expand Down Expand Up @@ -390,14 +414,14 @@ static void server_write_cb (flux_t *h,
char *data = NULL;
int len = 0;
bool eof = false;
pid_t pid;
int matchtag;
json_t *io = NULL;
flux_error_t error;

if (flux_request_unpack (msg,
NULL,
"{ s:i s:o }",
"pid", &pid,
"matchtag", &matchtag,
"io", &io) < 0
|| iodecode (io, &stream, NULL, &data, &len, &eof) < 0) {
llog_error (s,
Expand All @@ -415,27 +439,24 @@ static void server_write_cb (flux_t *h,
* in flight, and is not necessarily an error, and can be common enough
* that the log messages end up being a nuisance.
*/
if (!(p = proc_find_bypid (s, pid))
|| p->state != FLUX_SUBPROCESS_RUNNING)
if (!(p = proc_find_byclient (s, msg))
|| p->state == FLUX_SUBPROCESS_FAILED
|| p->state == FLUX_SUBPROCESS_EXITED)
goto out;

if (data && len) {
int rc = flux_subprocess_write (p, stream, data, len);
if (rc < 0) {
llog_error (s,
"Error writing %d bytes to subprocess pid %d %s",
"Error writing %d bytes to subprocess %s",
len,
(int)pid,
stream);
goto error;
}
}
if (eof) {
if (flux_subprocess_close (p, stream) < 0) {
llog_error (s,
"Error writing EOF to subprocess pid %d %s",
(int)pid,
stream);
llog_error (s, "Error writing EOF to subprocess %s", stream);
goto error;
}
}
Expand Down
65 changes: 10 additions & 55 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ void channel_destroy (void *arg)
flux_watcher_destroy (c->buffer_read_stopped_w);
c->buffer_read_w_started = false;

fbuf_destroy (c->write_buffer);
fbuf_destroy (c->read_buffer);
flux_watcher_destroy (c->out_prep_w);
flux_watcher_destroy (c->out_idle_w);
Expand Down Expand Up @@ -707,42 +706,12 @@ int flux_subprocess_write (flux_subprocess_t *p,
errno = EPIPE;
return -1;
}
if (p->state == FLUX_SUBPROCESS_INIT) {
if (!c->write_buffer) {
int buffer_size;
if ((buffer_size = cmd_option_bufsize (p, stream)) < 0) {
log_err ("cmd_option_bufsize: %s", strerror (errno));
return -1;
}
if (!(c->write_buffer = fbuf_create (buffer_size))) {
log_err ("fbuf_create");
return -1;
}
}
if (fbuf_space (c->write_buffer) < len) {
errno = ENOSPC;
return -1;
}
if ((ret = fbuf_write (c->write_buffer, buf, len)) < 0) {
log_err ("fbuf_write");
return -1;
}
}
else { /* p->state == FLUX_SUBPROCESS_RUNNING */
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
buf,
len,
false) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
ret = len;
if (subprocess_write (p->f, c->name, buf, len, false) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
ret = len;
}

return ret;
Expand Down Expand Up @@ -780,26 +749,12 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream)
c->closed = true;
}
else {
/* if process isn't running, eof plus any previously written
* data will be sent after process converts to running. See
* send_channel_data() in remote.c. If subprocess has already
* exited, this does nothing.
*/
c->closed = true;
if (p->state == FLUX_SUBPROCESS_RUNNING) {
if (subprocess_write (p->h,
p->service_name,
p->rank,
p->pid,
c->name,
NULL,
0,
true) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
if (subprocess_write (p->f, c->name, NULL, 0, true) < 0) {
log_err ("error sending rexec.write request: %s",
strerror (errno));
return -1;
}
c->closed = true;
}

return 0;
Expand Down
1 change: 0 additions & 1 deletion src/common/libsubprocess/subprocess_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ struct subprocess_channel {
bool buffer_read_w_started;

/* remote */
struct fbuf *write_buffer; /* buffer pre-running data */
struct fbuf *read_buffer;
bool read_eof_received;
flux_watcher_t *out_prep_w;
Expand Down
9 changes: 6 additions & 3 deletions src/common/libsubprocess/test/iostress.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ static void iostress_state_cb (flux_subprocess_t *p,
}
}

static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len)
static int rexec_write (flux_t *h, uint32_t matchtag, const char *buf, int len)
{
flux_future_t *f;
json_t *io;
Expand All @@ -133,7 +133,7 @@ static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len)
0,
FLUX_RPC_NORESPONSE,
"{s:i s:O}",
"pid", pid,
"matchtag", matchtag,
"io", io))) {
json_decref (io);
return -1;
Expand All @@ -150,15 +150,18 @@ static void iostress_source_cb (flux_reactor_t *r,
{
struct iostress_ctx *ctx = arg;
char *buf;
uint32_t matchtag;

if (!(buf = malloc (ctx->linesize)))
BAIL_OUT ("out of memory");
memset (buf, 'F', ctx->linesize - 1);
buf[ctx->linesize - 1] = '\n';

matchtag = flux_rpc_get_matchtag (ctx->p->f);

for (int i = 0; i < ctx->batchlines; i++) {
if (ctx->direct) {
if (rexec_write (ctx->h, ctx->pid, buf, ctx->linesize) < 0)
if (rexec_write (ctx->h, matchtag, buf, ctx->linesize) < 0)
BAIL_OUT ("rexec_write failed");
}
else {
Expand Down
Loading
Loading