Skip to content

Commit

Permalink
rpc/test: cover multiple responses
Browse files Browse the repository at this point in the history
Add tests to cover RPCs with multiple responses.

An RPC issues a requested number of responses followed
by an error response with errnum == ENODATA (like EOF).

1) Call flux_rpc_get() in a loop, with flux_future_reset()
after each successful call.  Verify that the requetested
number of responses was received.

2) Set up a continuation that calls flux_future_reset()
after a successful flux_rpc_get().  On failure, verify
that the requested number of responses was received.

3) Like #2 except handle the first response in one
continuation, and subsequent responses in another.
It covers calling flux_future_then() a second time on
a future.
  • Loading branch information
garlick committed May 3, 2018
1 parent e91b6fc commit 279b11b
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions t/rpc/rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,33 @@ void rpcftest_hello_cb (flux_t *h, flux_msg_handler_t *mh,
(void)flux_respond_pack (h, msg, "{}");
}

/* Send back the requested number of responses followed an ENODATA error.
*/
void rpctest_multi_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
int i, count;

if (flux_request_unpack (msg, NULL, "{s:i}", "count", &count) < 0)
goto error;
for (i = 0; i < count; i++) {
if (flux_respond_pack (h, msg, "{s:i}", "seq", i) < 0)
diag ("%s: flux_respond: %s", __FUNCTION__, flux_strerror (errno));
}
errno = ENODATA; // EOF of sorts
error:
if (flux_respond (h, msg, errno, NULL) < 0)
diag ("%s: flux_respond: %s", __FUNCTION__, flux_strerror (errno));
}

static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "rpctest.incr", rpctest_incr_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "rpctest.hello", rpctest_hello_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "rpcftest.hello", rpcftest_hello_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "rpctest.echo", rpctest_echo_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "rpctest.rawecho", rpctest_rawecho_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "rpctest.nodeid", rpctest_nodeid_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "rpctest.multi", rpctest_multi_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand Down Expand Up @@ -367,6 +387,113 @@ void test_then (flux_t *h)
diag ("completed test of continuations");
}

void test_multi_response (flux_t *h)
{
flux_future_t *f;
int seq = -1;
int count = 0;

f = flux_rpc_pack (h, "rpctest.multi", FLUX_NODEID_ANY, 0,
"{s:i}", "count", 3);
if (!f)
BAIL_OUT ("flux_rpc_pack failed");
errno = 0;
while (flux_rpc_get_unpack (f, "{s:i}", "seq", &seq) == 0) {
count++;
flux_future_reset (f);
}
ok (errno == ENODATA,
"multi-now: got ENODATA as EOF");
ok (count == 3,
"multi-now: received 3 valid responses");
flux_future_destroy (f);
}

static void multi_then_cb (flux_future_t *f, void *arg)
{
int seq = 0;
static int count = 0;

errno = 0;
if (flux_rpc_get_unpack (f, "{s:i}", "seq", &seq) == 0) {
flux_future_reset (f);
count++;
return;
}
ok (errno == ENODATA,
"multi-then: got ENODATA as EOF in continuation");
ok (count == 3,
"multi-then: received 3 valid responses");
flux_reactor_stop (flux_future_get_reactor (f));
flux_future_destroy (f);
}

void test_multi_response_then (flux_t *h)
{
flux_future_t *f;

f = flux_rpc_pack (h, "rpctest.multi", FLUX_NODEID_ANY, 0,
"{s:i}", "count", 3);
if (!f)
BAIL_OUT ("flux_rpc_pack failed");
ok (flux_future_then (f, -1., multi_then_cb, NULL) == 0,
"multi-then: flux_future_then works");
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
BAIL_OUT ("flux_reactor_run failed");
}

static void multi_then_next_cb (flux_future_t *f, void *arg)
{
int seq = 0;
static int count = 0;

errno = 0;
if (flux_rpc_get_unpack (f, "{s:i}", "seq", &seq) == 0) {
flux_future_reset (f);
count++;
return;
}
ok (errno == ENODATA,
"multi-then-chain: got ENODATA as EOF in continuation");
ok (count == 2,
"multi-then-chain: received 2 valid responses after first");
flux_reactor_stop (flux_future_get_reactor (f));
flux_future_destroy (f);
}

static void multi_then_first_cb (flux_future_t *f, void *arg)
{
int seq = 0;
int rc;

rc = flux_rpc_get_unpack (f, "{s:i}", "seq", &seq);
ok (rc == 0,
"multi-then-chain: received first response");
if (rc == 0) {
flux_future_reset (f);
ok (flux_future_then (f, -1., multi_then_next_cb, NULL) == 0,
"multi-then-chain: flux_future_then works");
}
else {
flux_reactor_stop_error (flux_future_get_reactor (f));
flux_future_destroy (f);
}
}

void test_multi_response_then_chain (flux_t *h)
{
flux_future_t *f;

f = flux_rpc_pack (h, "rpctest.multi", FLUX_NODEID_ANY, 0,
"{s:i}", "count", 3);
if (!f)
BAIL_OUT ("flux_rpc_pack failed");
ok (flux_future_then (f, -1., multi_then_first_cb, NULL) == 0,
"multi-then: flux_future_then works");
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
BAIL_OUT ("flux_reactor_run failed");
}

/* Bit of code to test the test framework.
*/
static int fake_server (flux_t *h, void *arg)
Expand Down Expand Up @@ -436,6 +563,9 @@ int main (int argc, char *argv[])
test_basic (h);
test_encoding (h);
test_then (h);
test_multi_response (h);
test_multi_response_then (h);
test_multi_response_then_chain (h);

ok (test_server_stop (h) == 0,
"stopped test server thread");
Expand Down

0 comments on commit 279b11b

Please sign in to comment.