Skip to content

Commit

Permalink
libsubprocess: use matchtag in client write req
Browse files Browse the repository at this point in the history
Problem: RFC 42 now requires write requests to reference the
subprocess via the exec matchtag rather than the pid.

Send the matchtag in the write request instead of the pid.

Also update a unit test that uses the raw protocol.
  • Loading branch information
garlick committed May 29, 2024
1 parent 7875b6f commit cada1e7
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/common/libsubprocess/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ bool subprocess_rexec_is_output (flux_future_t *f,
int subprocess_write (flux_t *h,
const char *service_name,
uint32_t rank,
pid_t pid,
uint32_t exec_matchtag,
const char *stream,
const char *data,
int len,
Expand All @@ -237,7 +237,7 @@ int subprocess_write (flux_t *h,
char *topic;
int rc = -1;

if (!h || pid < 0 || !stream || !service_name) {
if (!h || exec_matchtag == FLUX_MATCHTAG_NONE || !stream || !service_name) {
errno = EINVAL;
return -1;
}
Expand All @@ -249,7 +249,7 @@ int subprocess_write (flux_t *h,
rank,
FLUX_RPC_NORESPONSE,
"{s:i s:O}",
"pid", pid,
"matchtag", exec_matchtag,
"io", io)))
goto out;
rc = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/common/libsubprocess/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ bool subprocess_rexec_is_output (flux_future_t *f,
int subprocess_write (flux_t *h,
const char *service_name,
uint32_t rank,
pid_t pid,
uint32_t exec_matchtag,
const char *stream,
const char *data,
int len,
Expand Down
4 changes: 2 additions & 2 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ static int remote_write (struct subprocess_channel *c)
if (subprocess_write (c->p->h,
c->p->service_name,
c->p->rank,
c->p->pid,
flux_rpc_get_matchtag (c->p->f),
c->name,
ptr,
lenp,
Expand All @@ -235,7 +235,7 @@ static int remote_close (struct subprocess_channel *c)
if (subprocess_write (c->p->h,
c->p->service_name,
c->p->rank,
c->p->pid,
flux_rpc_get_matchtag (c->p->f),
c->name,
NULL,
0,
Expand Down
9 changes: 6 additions & 3 deletions src/common/libsubprocess/test/iostress.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ static void iostress_state_cb (flux_subprocess_t *p,
}
}

static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len)
static int rexec_write (flux_t *h, uint32_t matchtag, const char *buf, int len)
{
flux_future_t *f;
json_t *io;
Expand All @@ -133,7 +133,7 @@ static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len)
0,
FLUX_RPC_NORESPONSE,
"{s:i s:O}",
"pid", pid,
"matchtag", matchtag,
"io", io))) {
json_decref (io);
return -1;
Expand All @@ -150,15 +150,18 @@ static void iostress_source_cb (flux_reactor_t *r,
{
struct iostress_ctx *ctx = arg;
char *buf;
uint32_t matchtag;

if (!(buf = malloc (ctx->linesize)))
BAIL_OUT ("out of memory");
memset (buf, 'F', ctx->linesize - 1);
buf[ctx->linesize - 1] = '\n';

matchtag = flux_rpc_get_matchtag (ctx->p->f);

for (int i = 0; i < ctx->batchlines; i++) {
if (ctx->direct) {
if (rexec_write (ctx->h, ctx->pid, buf, ctx->linesize) < 0)
if (rexec_write (ctx->h, matchtag, buf, ctx->linesize) < 0)
BAIL_OUT ("rexec_write failed");
}
else {
Expand Down

0 comments on commit cada1e7

Please sign in to comment.