From cada1e785f5d85974c3265a3817dc02d381f3b28 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 28 May 2024 11:36:52 -0700 Subject: [PATCH] libsubprocess: use matchtag in client write req Problem: RFC 42 now requires write requests to reference the subprocess via the exec matchtag rather than the pid. Send the matchtag in the write request instead of the pid. Also update a unit test that uses the raw protocol. --- src/common/libsubprocess/client.c | 6 +++--- src/common/libsubprocess/client.h | 2 +- src/common/libsubprocess/remote.c | 4 ++-- src/common/libsubprocess/test/iostress.c | 9 ++++++--- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/common/libsubprocess/client.c b/src/common/libsubprocess/client.c index 741cae39f2cc..b2a53b538aae 100644 --- a/src/common/libsubprocess/client.c +++ b/src/common/libsubprocess/client.c @@ -226,7 +226,7 @@ bool subprocess_rexec_is_output (flux_future_t *f, int subprocess_write (flux_t *h, const char *service_name, uint32_t rank, - pid_t pid, + uint32_t exec_matchtag, const char *stream, const char *data, int len, @@ -237,7 +237,7 @@ int subprocess_write (flux_t *h, char *topic; int rc = -1; - if (!h || pid < 0 || !stream || !service_name) { + if (!h || exec_matchtag == FLUX_MATCHTAG_NONE || !stream || !service_name) { errno = EINVAL; return -1; } @@ -249,7 +249,7 @@ int subprocess_write (flux_t *h, rank, FLUX_RPC_NORESPONSE, "{s:i s:O}", - "pid", pid, + "matchtag", exec_matchtag, "io", io))) goto out; rc = 0; diff --git a/src/common/libsubprocess/client.h b/src/common/libsubprocess/client.h index 4ebf18b9ab97..8532cb517bbe 100644 --- a/src/common/libsubprocess/client.h +++ b/src/common/libsubprocess/client.h @@ -43,7 +43,7 @@ bool subprocess_rexec_is_output (flux_future_t *f, int subprocess_write (flux_t *h, const char *service_name, uint32_t rank, - pid_t pid, + uint32_t exec_matchtag, const char *stream, const char *data, int len, diff --git a/src/common/libsubprocess/remote.c b/src/common/libsubprocess/remote.c index e7450566329c..7b9e2a7e04b2 100644 --- a/src/common/libsubprocess/remote.c +++ b/src/common/libsubprocess/remote.c @@ -211,7 +211,7 @@ static int remote_write (struct subprocess_channel *c) if (subprocess_write (c->p->h, c->p->service_name, c->p->rank, - c->p->pid, + flux_rpc_get_matchtag (c->p->f), c->name, ptr, lenp, @@ -235,7 +235,7 @@ static int remote_close (struct subprocess_channel *c) if (subprocess_write (c->p->h, c->p->service_name, c->p->rank, - c->p->pid, + flux_rpc_get_matchtag (c->p->f), c->name, NULL, 0, diff --git a/src/common/libsubprocess/test/iostress.c b/src/common/libsubprocess/test/iostress.c index 389547e461fb..51b90ee12994 100644 --- a/src/common/libsubprocess/test/iostress.c +++ b/src/common/libsubprocess/test/iostress.c @@ -120,7 +120,7 @@ static void iostress_state_cb (flux_subprocess_t *p, } } -static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len) +static int rexec_write (flux_t *h, uint32_t matchtag, const char *buf, int len) { flux_future_t *f; json_t *io; @@ -133,7 +133,7 @@ static int rexec_write (flux_t *h, pid_t pid, const char *buf, int len) 0, FLUX_RPC_NORESPONSE, "{s:i s:O}", - "pid", pid, + "matchtag", matchtag, "io", io))) { json_decref (io); return -1; @@ -150,15 +150,18 @@ static void iostress_source_cb (flux_reactor_t *r, { struct iostress_ctx *ctx = arg; char *buf; + uint32_t matchtag; if (!(buf = malloc (ctx->linesize))) BAIL_OUT ("out of memory"); memset (buf, 'F', ctx->linesize - 1); buf[ctx->linesize - 1] = '\n'; + matchtag = flux_rpc_get_matchtag (ctx->p->f); + for (int i = 0; i < ctx->batchlines; i++) { if (ctx->direct) { - if (rexec_write (ctx->h, ctx->pid, buf, ctx->linesize) < 0) + if (rexec_write (ctx->h, matchtag, buf, ctx->linesize) < 0) BAIL_OUT ("rexec_write failed"); } else {