From bf5913ebdb2c405c616de5aff9056d701181a2bb Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 13:40:34 -0700 Subject: [PATCH] libflux: handle one message per _then cb Rather than looping in the rpc implementation if there are still responses to consume after the user's continuation callback runs, push messages back into the receive queue and let the reactor call the handler again based on its concept of fairnes. --- src/common/libflux/rpc.c | 92 +++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 52 deletions(-) diff --git a/src/common/libflux/rpc.c b/src/common/libflux/rpc.c index 9cf596bf1569..fa214caf5d9a 100644 --- a/src/common/libflux/rpc.c +++ b/src/common/libflux/rpc.c @@ -48,7 +48,7 @@ struct flux_rpc_struct { void *then_arg; uint32_t *nodemap; /* nodeid indexed by matchtag */ zmsg_t *rx_msg; - bool rx_msg_consumed; + zmsg_t *rx_msg_consumed; int rx_count; bool oneway; }; @@ -61,6 +61,7 @@ void flux_rpc_destroy (flux_rpc_t rpc) if (rpc->m.matchtag != FLUX_MATCHTAG_NONE) flux_matchtag_free (rpc->h, rpc->m.matchtag, rpc->m.bsize); zmsg_destroy (&rpc->rx_msg); + zmsg_destroy (&rpc->rx_msg_consumed); if (rpc->nodemap) free (rpc->nodemap); free (rpc); @@ -95,33 +96,13 @@ static uint32_t lookup_nodeid (flux_rpc_t rpc, uint32_t matchtag) return rpc->nodemap[ix]; } -static void garbage_collect (flux_rpc_t rpc) +static zmsg_t *rpc_response_recv (flux_rpc_t rpc, bool nonblock) { - if (rpc->rx_msg && rpc->rx_msg_consumed) - zmsg_destroy (&rpc->rx_msg); -} - -static void rpc_addmsg (flux_rpc_t rpc, zmsg_t **zmsg) -{ - assert (rpc->rx_msg == NULL); - rpc->rx_msg = *zmsg; - *zmsg = NULL; - rpc->rx_msg_consumed = false; - rpc->rx_count++; + return flux_recvmsg_match (rpc->h, rpc->m, NULL, nonblock); } -static int rpc_recvmsg (flux_rpc_t rpc, bool nonblock) -{ - assert (rpc->rx_msg == NULL); - if (!(rpc->rx_msg = flux_recvmsg_match (rpc->h, rpc->m, NULL, nonblock))) - return -1; - rpc->rx_msg_consumed = false; - rpc->rx_count++; - return 0; -} - -static int rpc_sendmsg (flux_rpc_t rpc, int n, const char *topic, - const char *json_str, uint32_t nodeid) +static int rpc_request_send (flux_rpc_t rpc, int n, const char *topic, + const char *json_str, uint32_t nodeid) { zmsg_t *zmsg; int flags = 0; @@ -150,11 +131,9 @@ bool flux_rpc_check (flux_rpc_t rpc) { if (rpc->oneway) return false; - garbage_collect (rpc); - if (!rpc->rx_msg && rpc_recvmsg (rpc, true) < 0) - errno = 0; - if (rpc->rx_msg) + if (rpc->rx_msg || (rpc->rx_msg = rpc_response_recv (rpc, true))) return true; + errno = 0; return false; } @@ -166,15 +145,17 @@ int flux_rpc_get (flux_rpc_t rpc, uint32_t *nodeid, const char **json_str) errno = EINVAL; goto done; } - garbage_collect (rpc); - if (!rpc->rx_msg && rpc_recvmsg (rpc, false) < 0) + if (!rpc->rx_msg && !(rpc->rx_msg = rpc_response_recv (rpc, false))) goto done; - rpc->rx_msg_consumed = true; - if (flux_response_decode (rpc->rx_msg, NULL, json_str) < 0) + zmsg_destroy (&rpc->rx_msg_consumed); /* invalidate last-got payload */ + rpc->rx_msg_consumed = rpc->rx_msg; + rpc->rx_msg = NULL; + rpc->rx_count++; + if (flux_response_decode (rpc->rx_msg_consumed, NULL, json_str) < 0) goto done; if (nodeid) { uint32_t matchtag; - if (flux_msg_get_matchtag (rpc->rx_msg, &matchtag) < 0) + if (flux_msg_get_matchtag (rpc->rx_msg_consumed, &matchtag) < 0) goto done; *nodeid = lookup_nodeid (rpc, matchtag); } @@ -183,20 +164,31 @@ int flux_rpc_get (flux_rpc_t rpc, uint32_t *nodeid, const char **json_str) return rc; } -/* N.B. if the user's 'then' callback doesn't call flux_rpc_get(), - * the callback will be called again (ad infinitum) +/* N.B. if a new message arrives with an unconsumed one in the rpc handle, + * push the new one back to to the receive queue so it will trigger another + * reactor callback and handle the cached one now. + * The reactor will repeatedly call the continuation (level-triggered) + * until all received responses are consumed. */ static int rpc_cb (flux_t h, int type, zmsg_t **zmsg, void *arg) { flux_rpc_t rpc = arg; - assert (rpc->then_cb != NULL); - do { - garbage_collect (rpc); - if (!rpc->rx_msg) - rpc_addmsg (rpc, zmsg); - rpc->then_cb (rpc, rpc->then_arg); - } while (*zmsg != NULL); + + if (rpc->rx_msg) { + if (flux_pushmsg (rpc->h, zmsg) < 0) + goto done; + } else { + rpc->rx_msg = *zmsg; + *zmsg = NULL; + } + rpc->then_cb (rpc, rpc->then_arg); + if (rpc->rx_msg) { + if (flux_pushmsg (rpc->h, &rpc->rx_msg) < 0) + goto done; + } +done: /* no good way to report flux_pushmsg() errors */ + zmsg_destroy (zmsg); return 0; } @@ -211,10 +203,8 @@ int flux_rpc_then (flux_rpc_t rpc, flux_then_f cb, void *arg) if (cb && !rpc->then_cb) { if (flux_msghandler_add_match (rpc->h, rpc->m, rpc_cb, rpc) < 0) goto done; - if (rpc->rx_msg && !rpc->rx_msg_consumed) { - if (flux_pushmsg (rpc->h, &rpc->rx_msg) < 0) - goto done; - } + if (rpc->rx_msg && flux_pushmsg (rpc->h, &rpc->rx_msg) < 0) + goto done; } else if (!cb && rpc->then_cb) { flux_msghandler_remove_match (rpc->h, rpc->m); } @@ -227,9 +217,7 @@ int flux_rpc_then (flux_rpc_t rpc, flux_then_f cb, void *arg) bool flux_rpc_completed (flux_rpc_t rpc) { - if (rpc->oneway) - return true; - if (rpc->rx_count == rpc->m.bsize && rpc->rx_msg_consumed) + if (rpc->oneway || rpc->rx_count == rpc->m.bsize) return true; return false; } @@ -239,7 +227,7 @@ flux_rpc_t flux_rpc (flux_t h, const char *topic, const char *json_str, { flux_rpc_t rpc = rpc_create (h, flags, 1); - if (rpc_sendmsg (rpc, 0, topic, json_str, nodeid) < 0) + if (rpc_request_send (rpc, 0, topic, json_str, nodeid) < 0) goto error; if (!rpc->oneway) rpc->nodemap[0] = nodeid; @@ -279,7 +267,7 @@ flux_rpc_t flux_rpc_multi (flux_t h, const char *topic, const char *json_str, for (i = 0; i < count; i++) { uint32_t nodeid = nodeset_next (itr); assert (nodeid != NODESET_EOF); - if (rpc_sendmsg (rpc, i, topic, json_str, nodeid) < 0) + if (rpc_request_send (rpc, i, topic, json_str, nodeid) < 0) goto error; if (!rpc->oneway) rpc->nodemap[i] = nodeid;