From 0bdbf8eaa443c322e91ab1b4ac712e5c0bc1b838 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 2 Oct 2024 13:43:38 -0700 Subject: [PATCH] libsubprocess: cover subprocess on_credit Problem: There is no coverage for the new libsubprocess on_credit callback. Add unit tests in libsubprocess/test/stdio.c and libsubprocess/test/iostress.c. --- src/common/libsubprocess/test/iostress.c | 91 +++++++++++- src/common/libsubprocess/test/stdio.c | 177 +++++++++++++++++++++++ 2 files changed, 267 insertions(+), 1 deletion(-) diff --git a/src/common/libsubprocess/test/iostress.c b/src/common/libsubprocess/test/iostress.c index 59525bb743c9..459a87b46ad8 100644 --- a/src/common/libsubprocess/test/iostress.c +++ b/src/common/libsubprocess/test/iostress.c @@ -30,6 +30,7 @@ enum { WRITE_API, WRITE_DIRECT, + WRITE_CREDIT, }; struct iostress_ctx { @@ -40,12 +41,15 @@ struct iostress_ctx { pid_t pid; size_t linesize; char *buf; + int buf_index; int linerecv; int batchcount; int batchlines; int batchcursor; + int batchlinescursor; int outputcount; int write_type; + int stdin_credits; const char *name; }; @@ -105,7 +109,12 @@ static void iostress_state_cb (flux_subprocess_t *p, case FLUX_SUBPROCESS_INIT: case FLUX_SUBPROCESS_RUNNING: ctx->pid = flux_subprocess_pid (p); - flux_watcher_start (ctx->source); // start sourcing data + /* if credit based write, writing will be taken care of in + * iostress_credit_cb() + */ + if (ctx->write_type == WRITE_API + || ctx->write_type == WRITE_DIRECT) + flux_watcher_start (ctx->source); // start sourcing data break; case FLUX_SUBPROCESS_STOPPED: break; @@ -197,10 +206,67 @@ static void iostress_source_cb (flux_reactor_t *r, iostress_start_doomsday (ctx, 2.); } +static void iostress_credit_cb (flux_subprocess_t *p, + const char *stream, + int bytes) +{ + struct iostress_ctx *ctx = flux_subprocess_aux_get (p, "ctx"); + + if (ctx->write_type != WRITE_CREDIT) + return; + + // diag ("%s credit cb stream=%s bytes=%d", ctx->name, stream, bytes); + + ctx->stdin_credits += bytes; + + while (ctx->batchcursor < ctx->batchcount) { + while (ctx->batchlinescursor < ctx->batchlines) { + int wlen, len; + + if ((ctx->linesize - ctx->buf_index) < ctx->stdin_credits) + wlen = ctx->linesize - ctx->buf_index; + else + wlen = ctx->stdin_credits; + len = flux_subprocess_write (ctx->p, + "stdin", + &ctx->buf[ctx->buf_index], + wlen); + if (len < 0) { + diag ("%s: source: %s", ctx->name, strerror (errno)); + goto error; + } + ctx->stdin_credits -= len; + ctx->buf_index += len; + if (ctx->buf_index == ctx->linesize) { + ctx->buf_index = 0; + ctx->batchlinescursor++; + } + if (ctx->stdin_credits == 0) + break; + } + if (ctx->batchlinescursor == ctx->batchlines) { + ctx->batchlinescursor = 0; + if (++ctx->batchcursor == ctx->batchcount) { + if (flux_subprocess_close (ctx->p, "stdin") < 0) { + diag ("%s: source: %s", ctx->name, strerror (errno)); + goto error; + } + break; + } + } + if (ctx->stdin_credits == 0) + break; + } + return; +error: + iostress_start_doomsday (ctx, 2.); +} + flux_subprocess_ops_t iostress_ops = { .on_completion = iostress_completion_cb, .on_state_change = iostress_state_cb, .on_stdout = iostress_output_cb, + .on_credit = iostress_credit_cb, }; bool iostress_run_check (flux_t *h, @@ -226,6 +292,7 @@ bool iostress_run_check (flux_t *h, ctx.linesize = linesize; ctx.name = name; ctx.write_type = write_type; + ctx.stdin_credits = 0; if (!(ctx.buf = malloc (ctx.linesize))) BAIL_OUT ("out of memory"); @@ -344,6 +411,28 @@ int main (int argc, char *argv[]) 4096), "tinystdin-direct failed as expected"); + // remote stdin buffer managed via credits should work + ok (iostress_run_check (h, + "tinystdin-credit", + WRITE_CREDIT, + 128, + 0, + 1, + 1, + 4096), + "tinystdin-credit works as expected"); + + // credits w/ more data + ok (iostress_run_check (h, + "tinystdin-credit-more", + WRITE_CREDIT, + 128, + 0, + 8, + 8, + 4096), + "tinystdin-credit-more works as expected"); + test_server_stop (h); flux_close (h); done_testing (); diff --git a/src/common/libsubprocess/test/stdio.c b/src/common/libsubprocess/test/stdio.c index a0fe79dae641..037a2fb069a4 100644 --- a/src/common/libsubprocess/test/stdio.c +++ b/src/common/libsubprocess/test/stdio.c @@ -39,6 +39,11 @@ int multiple_lines_stderr_output_cb_count; int stdin_closed_stdout_cb_count; int stdin_closed_stderr_cb_count; int timer_cb_count; +int credit_cb_count; +int stdin_credit; +char inputbuf[1024]; +int inputbuf_index; +int inputbuf_len; char outputbuf[1024]; int outputbuf_len; @@ -1414,6 +1419,174 @@ void test_long_line (flux_reactor_t *r) flux_cmd_destroy (cmd); } +void credit_output_cb (flux_subprocess_t *p, const char *stream) +{ + const char *buf = NULL; + int len; + + if (strcasecmp (stream, "stdout")) { + ok (false, "unexpected stream %s", stream); + return; + } + + len = flux_subprocess_read (p, stream, &buf); + ok (len >= 0, + "flux_subprocess_read on %s success", stream); + + if (len > 0) { + memcpy (outputbuf + outputbuf_len, buf, len); + outputbuf_len += len; + } + else { + char cmpbuf[1024]; + + ok (flux_subprocess_read_stream_closed (p, stream), + "flux_subprocess_read_stream_closed saw EOF on %s", stream); + + sprintf (cmpbuf, "abcdefghijklmnopqrstuvwxyz0123456789\n"); + ok (streq (outputbuf, cmpbuf), + "flux_subprocess_read returned correct data"); + /* 26 (ABCs) + 10 (1-10) + 1 for `\n' */ + ok (outputbuf_len == (26 + 10 + 1), + "flux_subprocess_read returned correct amount of data"); + } + stdout_output_cb_count++; +} + +void credit_cb (flux_subprocess_t *p, const char *stream, int bytes) +{ + int *credits = flux_subprocess_aux_get (p, "credits"); + int len; + int ret; + + assert (credits); + + diag ("on_credit: credit of %d bytes", bytes); + + (*credits) += bytes; + + if ((inputbuf_len - inputbuf_index) > 0) { + if ((inputbuf_len - inputbuf_index) > (*credits)) + len = (*credits); + else + len = (inputbuf_len - inputbuf_index); + + ret = flux_subprocess_write (p, "stdin", &inputbuf[inputbuf_index], len); + ok (ret == len, + "flux_subprocess_write success"); + + (*credits) -= ret; + inputbuf_index += ret; + } + else { + ok (flux_subprocess_close (p, "stdin") == 0, + "flux_subprocess_close success"); + } + credit_cb_count++; +} + +void test_on_credit (flux_reactor_t *r) +{ + char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", NULL }; + flux_cmd_t *cmd; + flux_subprocess_t *p = NULL; + int credits = 0; + int ret; + + ok ((cmd = flux_cmd_create (2, av, environ)) != NULL, "flux_cmd_create"); + ok (flux_cmd_setopt (cmd, "stdin_BUFSIZE", "8") == 0, + "set stdin buffer size to 1024 bytes"); + + flux_subprocess_ops_t ops = { + .on_completion = completion_cb, + .on_stdout = credit_output_cb, + .on_credit = credit_cb + }; + completion_cb_count = 0; + stdout_output_cb_count = 0; + credit_cb_count = 0; + sprintf (inputbuf, "abcdefghijklmnopqrstuvwxyz0123456789"); + inputbuf_index = 0; + inputbuf_len = (26 + 10); + memset (outputbuf, '\0', sizeof (outputbuf)); + outputbuf_len = 0; + p = flux_local_exec (r, 0, cmd, &ops); + ok (p != NULL, "flux_local_exec"); + ret = flux_subprocess_aux_set (p, "credits", &credits, NULL); + ok (ret == 0, "flux_subprocess_aux_set works"); + + ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, + "subprocess state == RUNNING after flux_local_exec"); + + errno = 0; + ret = flux_subprocess_write (p, "stdin", &inputbuf[inputbuf_index], 10); + ok (ret < 0 && errno == ENOSPC, + "flux_subprocess_write fails with too much data"); + + int rc = flux_reactor_run (r, 0); + ok (rc == 0, "flux_reactor_run returned zero status"); + ok (completion_cb_count == 1, "completion callback called 1 time"); + ok (stdout_output_cb_count >= 2, "stdout output callback called >= 2 times"); + ok (credit_cb_count == 6, "credit callback called 6 times"); + flux_subprocess_destroy (p); + flux_cmd_destroy (cmd); +} + +/* very similar to above test but we send initial write, and there + * should be one fewer credit callback + */ +void test_on_credit_no_initial_credits (flux_reactor_t *r) +{ + char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", NULL }; + flux_cmd_t *cmd; + flux_subprocess_t *p = NULL; + int credits = 8; /* start at 8 b/c we're using NO_INITIAL_CREDITS */ + int ret; + + ok ((cmd = flux_cmd_create (2, av, environ)) != NULL, "flux_cmd_create"); + ok (flux_cmd_setopt (cmd, "stdin_BUFSIZE", "8") == 0, + "set stdin buffer size to 1024 bytes"); + + flux_subprocess_ops_t ops = { + .on_completion = completion_cb, + .on_stdout = credit_output_cb, + .on_credit = credit_cb + }; + completion_cb_count = 0; + stdout_output_cb_count = 0; + credit_cb_count = 0; + sprintf (inputbuf, "abcdefghijklmnopqrstuvwxyz0123456789"); + inputbuf_index = 0; + inputbuf_len = (26 + 10); + memset (outputbuf, '\0', sizeof (outputbuf)); + outputbuf_len = 0; + p = flux_local_exec (r, + FLUX_SUBPROCESS_FLAGS_NO_INITIAL_CREDITS, + cmd, + &ops); + ok (p != NULL, "flux_local_exec"); + ret = flux_subprocess_aux_set (p, "credits", &credits, NULL); + ok (ret == 0, "flux_subprocess_aux_set works"); + + ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, + "subprocess state == RUNNING after flux_local_exec"); + + errno = 0; + ret = flux_subprocess_write (p, "stdin", &inputbuf[inputbuf_index], 8); + ok (ret == 8, + "flux_subprocess_write first 8 bytes"); + credits -= ret; + inputbuf_index += ret; + + int rc = flux_reactor_run (r, 0); + ok (rc == 0, "flux_reactor_run returned zero status"); + ok (completion_cb_count == 1, "completion callback called 1 time"); + ok (stdout_output_cb_count >= 2, "stdout output callback called >= 2 times"); + ok (credit_cb_count == 5, "credit callback called 5 times"); + flux_subprocess_destroy (p); + flux_cmd_destroy (cmd); +} + int main (int argc, char *argv[]) { flux_reactor_t *r; @@ -1475,6 +1648,10 @@ int main (int argc, char *argv[]) test_stream_start_stop_mid_stop (r); diag ("long_line"); test_long_line (r); + diag ("on_credit"); + test_on_credit (r); + diag ("on_credit_no_initial_credits"); + test_on_credit_no_initial_credits (r); end_fdcount = fdcount ();