Skip to content

Commit

Permalink
libflux: handle one message per _then cb
Browse files Browse the repository at this point in the history
Rather than looping in the rpc implementation if there are still
responses to consume after the user's continuation callback runs,
push messages back into the receive queue and let the reactor call
the handler again based on its concept of fairnes.
  • Loading branch information
garlick committed Jun 5, 2015
1 parent 2fe6044 commit bf5913e
Showing 1 changed file with 40 additions and 52 deletions.
92 changes: 40 additions & 52 deletions src/common/libflux/rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct flux_rpc_struct {
void *then_arg;
uint32_t *nodemap; /* nodeid indexed by matchtag */
zmsg_t *rx_msg;
bool rx_msg_consumed;
zmsg_t *rx_msg_consumed;
int rx_count;
bool oneway;
};
Expand All @@ -61,6 +61,7 @@ void flux_rpc_destroy (flux_rpc_t rpc)
if (rpc->m.matchtag != FLUX_MATCHTAG_NONE)
flux_matchtag_free (rpc->h, rpc->m.matchtag, rpc->m.bsize);
zmsg_destroy (&rpc->rx_msg);
zmsg_destroy (&rpc->rx_msg_consumed);
if (rpc->nodemap)
free (rpc->nodemap);
free (rpc);
Expand Down Expand Up @@ -95,33 +96,13 @@ static uint32_t lookup_nodeid (flux_rpc_t rpc, uint32_t matchtag)
return rpc->nodemap[ix];
}

static void garbage_collect (flux_rpc_t rpc)
static zmsg_t *rpc_response_recv (flux_rpc_t rpc, bool nonblock)
{
if (rpc->rx_msg && rpc->rx_msg_consumed)
zmsg_destroy (&rpc->rx_msg);
}

static void rpc_addmsg (flux_rpc_t rpc, zmsg_t **zmsg)
{
assert (rpc->rx_msg == NULL);
rpc->rx_msg = *zmsg;
*zmsg = NULL;
rpc->rx_msg_consumed = false;
rpc->rx_count++;
return flux_recvmsg_match (rpc->h, rpc->m, NULL, nonblock);
}

static int rpc_recvmsg (flux_rpc_t rpc, bool nonblock)
{
assert (rpc->rx_msg == NULL);
if (!(rpc->rx_msg = flux_recvmsg_match (rpc->h, rpc->m, NULL, nonblock)))
return -1;
rpc->rx_msg_consumed = false;
rpc->rx_count++;
return 0;
}

static int rpc_sendmsg (flux_rpc_t rpc, int n, const char *topic,
const char *json_str, uint32_t nodeid)
static int rpc_request_send (flux_rpc_t rpc, int n, const char *topic,
const char *json_str, uint32_t nodeid)
{
zmsg_t *zmsg;
int flags = 0;
Expand Down Expand Up @@ -150,11 +131,9 @@ bool flux_rpc_check (flux_rpc_t rpc)
{
if (rpc->oneway)
return false;
garbage_collect (rpc);
if (!rpc->rx_msg && rpc_recvmsg (rpc, true) < 0)
errno = 0;
if (rpc->rx_msg)
if (rpc->rx_msg || (rpc->rx_msg = rpc_response_recv (rpc, true)))
return true;
errno = 0;
return false;
}

Expand All @@ -166,15 +145,17 @@ int flux_rpc_get (flux_rpc_t rpc, uint32_t *nodeid, const char **json_str)
errno = EINVAL;
goto done;
}
garbage_collect (rpc);
if (!rpc->rx_msg && rpc_recvmsg (rpc, false) < 0)
if (!rpc->rx_msg && !(rpc->rx_msg = rpc_response_recv (rpc, false)))
goto done;
rpc->rx_msg_consumed = true;
if (flux_response_decode (rpc->rx_msg, NULL, json_str) < 0)
zmsg_destroy (&rpc->rx_msg_consumed); /* invalidate last-got payload */
rpc->rx_msg_consumed = rpc->rx_msg;
rpc->rx_msg = NULL;
rpc->rx_count++;
if (flux_response_decode (rpc->rx_msg_consumed, NULL, json_str) < 0)
goto done;
if (nodeid) {
uint32_t matchtag;
if (flux_msg_get_matchtag (rpc->rx_msg, &matchtag) < 0)
if (flux_msg_get_matchtag (rpc->rx_msg_consumed, &matchtag) < 0)
goto done;
*nodeid = lookup_nodeid (rpc, matchtag);
}
Expand All @@ -183,20 +164,31 @@ int flux_rpc_get (flux_rpc_t rpc, uint32_t *nodeid, const char **json_str)
return rc;
}

/* N.B. if the user's 'then' callback doesn't call flux_rpc_get(),
* the callback will be called again (ad infinitum)
/* N.B. if a new message arrives with an unconsumed one in the rpc handle,
* push the new one back to to the receive queue so it will trigger another
* reactor callback and handle the cached one now.
* The reactor will repeatedly call the continuation (level-triggered)
* until all received responses are consumed.
*/
static int rpc_cb (flux_t h, int type, zmsg_t **zmsg, void *arg)
{
flux_rpc_t rpc = arg;

assert (rpc->then_cb != NULL);
do {
garbage_collect (rpc);
if (!rpc->rx_msg)
rpc_addmsg (rpc, zmsg);
rpc->then_cb (rpc, rpc->then_arg);
} while (*zmsg != NULL);

if (rpc->rx_msg) {
if (flux_pushmsg (rpc->h, zmsg) < 0)
goto done;
} else {
rpc->rx_msg = *zmsg;
*zmsg = NULL;
}
rpc->then_cb (rpc, rpc->then_arg);
if (rpc->rx_msg) {
if (flux_pushmsg (rpc->h, &rpc->rx_msg) < 0)
goto done;
}
done: /* no good way to report flux_pushmsg() errors */
zmsg_destroy (zmsg);
return 0;
}

Expand All @@ -211,10 +203,8 @@ int flux_rpc_then (flux_rpc_t rpc, flux_then_f cb, void *arg)
if (cb && !rpc->then_cb) {
if (flux_msghandler_add_match (rpc->h, rpc->m, rpc_cb, rpc) < 0)
goto done;
if (rpc->rx_msg && !rpc->rx_msg_consumed) {
if (flux_pushmsg (rpc->h, &rpc->rx_msg) < 0)
goto done;
}
if (rpc->rx_msg && flux_pushmsg (rpc->h, &rpc->rx_msg) < 0)
goto done;
} else if (!cb && rpc->then_cb) {
flux_msghandler_remove_match (rpc->h, rpc->m);
}
Expand All @@ -227,9 +217,7 @@ int flux_rpc_then (flux_rpc_t rpc, flux_then_f cb, void *arg)

bool flux_rpc_completed (flux_rpc_t rpc)
{
if (rpc->oneway)
return true;
if (rpc->rx_count == rpc->m.bsize && rpc->rx_msg_consumed)
if (rpc->oneway || rpc->rx_count == rpc->m.bsize)
return true;
return false;
}
Expand All @@ -239,7 +227,7 @@ flux_rpc_t flux_rpc (flux_t h, const char *topic, const char *json_str,
{
flux_rpc_t rpc = rpc_create (h, flags, 1);

if (rpc_sendmsg (rpc, 0, topic, json_str, nodeid) < 0)
if (rpc_request_send (rpc, 0, topic, json_str, nodeid) < 0)
goto error;
if (!rpc->oneway)
rpc->nodemap[0] = nodeid;
Expand Down Expand Up @@ -279,7 +267,7 @@ flux_rpc_t flux_rpc_multi (flux_t h, const char *topic, const char *json_str,
for (i = 0; i < count; i++) {
uint32_t nodeid = nodeset_next (itr);
assert (nodeid != NODESET_EOF);
if (rpc_sendmsg (rpc, i, topic, json_str, nodeid) < 0)
if (rpc_request_send (rpc, i, topic, json_str, nodeid) < 0)
goto error;
if (!rpc->oneway)
rpc->nodemap[i] = nodeid;
Expand Down

0 comments on commit bf5913e

Please sign in to comment.