From 0977e7e6a1c4957138582050f206acedfdbe2d78 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 14:56:58 -0700 Subject: [PATCH 01/13] build: remove detrius from t/Makefile.am Eliminates warnings from autogen.sh by removing references to tests introduced and then removed in pr #215. --- t/Makefile.am | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/t/Makefile.am b/t/Makefile.am index f274a9655f61..bef818aad62a 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -82,18 +82,6 @@ test_cppflags = \ -I$(top_srcdir)/src/common/libtap \ $(AM_CPPFLAGS) -loop_request_t_SOURCES = loop/request.c -loop_request_t_CPPFLAGS = $(test_cppflags) -loop_request_t_LDADD = $(test_ldadd) $(LIBDL) - -loop_response_t_SOURCES = loop/response.c -loop_response_t_CPPFLAGS = $(test_cppflags) -loop_response_t_LDADD = $(test_ldadd) $(LIBDL) - -loop_event_t_SOURCES = loop/event.c -loop_event_t_CPPFLAGS = $(test_cppflags) -loop_event_t_LDADD = $(test_ldadd) $(LIBDL) - loop_rpc_t_SOURCES = loop/rpc.c loop_rpc_t_CPPFLAGS = $(test_cppflags) loop_rpc_t_LDADD = $(test_ldadd) $(LIBDL) From ac883982cf1b11ed758a674df331ea17423de1a8 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:31:24 -0700 Subject: [PATCH 02/13] libflux: flux_rpc_get sets nodeid on failure flux_rpc_get() whould set the nodeid even if the internal response decode fails. The common case of the decode failure will be that the remote returned an errnum, and the flux_rpc_multi() caller will want to know which node failed. --- src/common/libflux/rpc.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/libflux/rpc.c b/src/common/libflux/rpc.c index fa214caf5d9a..a788a45b755d 100644 --- a/src/common/libflux/rpc.c +++ b/src/common/libflux/rpc.c @@ -151,14 +151,14 @@ int flux_rpc_get (flux_rpc_t rpc, uint32_t *nodeid, const char **json_str) rpc->rx_msg_consumed = rpc->rx_msg; rpc->rx_msg = NULL; rpc->rx_count++; - if (flux_response_decode (rpc->rx_msg_consumed, NULL, json_str) < 0) - goto done; if (nodeid) { uint32_t matchtag; if (flux_msg_get_matchtag (rpc->rx_msg_consumed, &matchtag) < 0) goto done; *nodeid = lookup_nodeid (rpc, matchtag); } + if (flux_response_decode (rpc->rx_msg_consumed, NULL, json_str) < 0) + goto done; rc = 0; done: return rc; From 5744613dfbf8fc41db311d8c4cbdf2f32e780aa4 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:35:37 -0700 Subject: [PATCH 03/13] flux-module: use new flux_rpc_multi() --- src/cmd/flux-module.c | 55 ++++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/src/cmd/flux-module.c b/src/cmd/flux-module.c index ef4426579255..9398af48d80a 100644 --- a/src/cmd/flux-module.c +++ b/src/cmd/flux-module.c @@ -263,15 +263,19 @@ void mod_insmod (flux_t h, opt_t opt) char *topic = xasprintf ("%s.insmod", service); char *json_str = flux_insmod_json_encode (modpath, opt.argc, opt.argv); assert (json_str != NULL); - JSON in = Jfromstr (json_str); - assert (in != NULL); - if (flux_json_multrpc (h, opt.nodeset, opt.fanout, topic, in, - NULL, NULL) < 0) - err_exit ("%s", modname); + flux_rpc_t r = flux_rpc_multi (h, topic, json_str, opt.nodeset, 0); + if (!r) + err_exit ("%s", topic); + while (!flux_rpc_completed (r)) { + uint32_t nodeid = FLUX_NODEID_ANY; + if (flux_rpc_get (r, NULL, NULL) < 0) + err_exit ("%s[%d]", topic, + nodeid == FLUX_NODEID_ANY ? -1 : nodeid); + } + flux_rpc_destroy (r); free (topic); free (service); free (json_str); - Jput (in); } else { if (flux_modctl_load (h, opt.nodeset, modpath, opt.argc, opt.argv) < 0) err_exit ("%s", modname); @@ -294,13 +298,17 @@ void mod_rmmod (flux_t h, opt_t opt) char *topic = xasprintf ("%s.rmmod", service); char *json_str = flux_rmmod_json_encode (modname); assert (json_str != NULL); - JSON in = Jfromstr (json_str); - if (flux_json_multrpc (h, opt.nodeset, opt.fanout, topic, in, - NULL, NULL) < 0) - err_exit ("%s", modname); + flux_rpc_t r = flux_rpc_multi (h, topic, json_str, opt.nodeset, 0); + if (!r) + err_exit ("%s", topic); + while (!flux_rpc_completed (r)) { + uint32_t nodeid = FLUX_NODEID_ANY; + if (flux_rpc_get (r, &nodeid, NULL) < 0) + err ("%s[%d]", topic, nodeid == FLUX_NODEID_ANY ? -1 : nodeid); + } + flux_rpc_destroy (r); free (topic); free (service); - Jput (in); free (json_str); } else { if (flux_modctl_unload (h, opt.nodeset, modname) < 0) @@ -375,19 +383,16 @@ void lsmod_map_hash (zhash_t *mods, flux_lsmod_f cb, void *arg) zlist_destroy (&keys); } -int lsmod_hash_cb (uint32_t nodeid, uint32_t errnum, JSON out, void *arg) +int lsmod_hash_cb (uint32_t nodeid, const char *json_str, zhash_t *mods) { flux_modlist_t modlist; - zhash_t *mods = arg; mod_t *m; int i, len; const char *name, *digest; int size, idle; int rc = -1; - if (errnum) - return 0; - if (!(modlist = flux_lsmod_json_decode (Jtostr (out)))) + if (!(modlist = flux_lsmod_json_decode (json_str))) goto done; if ((len = flux_modlist_count (modlist)) == -1) goto done; @@ -424,13 +429,21 @@ void mod_lsmod (flux_t h, opt_t opt) "Module", "Size", "Digest", "Idle", "Nodeset"); if (opt.direct) { zhash_t *mods = zhash_new (); - char *topic = xasprintf ("%s.lsmod", service); - if (!mods) oom (); - if (flux_json_multrpc (h, opt.nodeset, opt.fanout, topic, NULL, - lsmod_hash_cb, mods) < 0) - err_exit ("modctl_list"); + char *topic = xasprintf ("%s.lsmod", service); + flux_rpc_t r = flux_rpc_multi (h, topic, NULL, opt.nodeset, 0); + if (!r) + err_exit ("%s", topic); + while (!flux_rpc_completed (r)) { + const char *json_str; + uint32_t nodeid; + if (flux_rpc_get (r, &nodeid, &json_str) < 0) + err_exit ("%s", topic); + if (lsmod_hash_cb (nodeid, json_str, mods) < 0) + err_exit ("%s[%u]", topic, nodeid); + } + flux_rpc_destroy (r); lsmod_map_hash (mods, lsmod_print_cb, NULL); zhash_destroy (&mods); free (topic); From fafb9f860fe43cf597c4b983b64179f8032f1887 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:36:35 -0700 Subject: [PATCH 04/13] libjsonc: drop deprecated flux_multrpc() --- src/common/libjsonc/rpc.c | 125 -------------------------------------- src/common/libjsonc/rpc.h | 11 ---- 2 files changed, 136 deletions(-) diff --git a/src/common/libjsonc/rpc.c b/src/common/libjsonc/rpc.c index b4bd98a6683e..14aa9d080e7a 100644 --- a/src/common/libjsonc/rpc.c +++ b/src/common/libjsonc/rpc.c @@ -37,131 +37,6 @@ #include "jsonc.h" -/* helper for flux_json_multrpc */ -static int multrpc_cb (zmsg_t *zmsg, uint32_t nodeid, - flux_multrpc_f cb, void *arg) -{ - int errnum = 0; - const char *json_str = NULL; - JSON out = NULL; - - if (flux_msg_get_errnum (zmsg, &errnum) < 0) { - errnum = errno; - goto done; - } - if (flux_msg_get_payload_json (zmsg, &json_str) < 0) { - errnum = errno; - goto done; - } - if (json_str && !(out = Jfromstr (json_str))) { - errnum = EPROTO; - goto done; - } -done: - if (cb && cb (nodeid, errnum, out, arg) < 0) - errnum = errno; - Jput (out); - if (errnum) { - errno = errnum; - return -1; - } - return 0; -} - -int flux_json_multrpc (flux_t h, const char *nodeset, int fanout, - const char *topic, json_object *in, - flux_multrpc_f cb, void *arg) -{ - nodeset_t ns = nodeset_new_str (nodeset); - nodeset_itr_t itr; - int errnum = 0; - uint32_t *nodeids = NULL; - zlist_t *nomatch = NULL; - int ntx, nrx, i; - flux_match_t match = { - .typemask = FLUX_MSGTYPE_RESPONSE, - .topic_glob = NULL, - }; - - if (!(nomatch = zlist_new ())) - oom (); - if (!ns || nodeset_max (ns) >= flux_size (h)) { - errnum = EINVAL; - goto done; - } - - /* Allocate block of matchtags. - */ - match.bsize = nodeset_count (ns); - match.matchtag = flux_matchtag_alloc (h, match.bsize); - if (match.matchtag == FLUX_MATCHTAG_NONE) { - errnum = EAGAIN; - goto done; - } - - /* Build map of matchtag -> nodeid - */ - nodeids = xzmalloc (match.bsize * sizeof (nodeids[0])); - if (!(itr = nodeset_itr_new (ns))) - oom (); - for (i = 0; i < match.bsize; i++) - nodeids[i] = nodeset_next (itr); - nodeset_itr_destroy (itr); - - /* Keep 'fanout' requests active concurrently - */ - ntx = nrx = 0; - while (ntx < match.bsize || nrx < match.bsize) { - while (ntx < match.bsize && ntx - nrx < fanout) { - uint32_t matchtag = match.matchtag + ntx; - uint32_t nodeid = nodeids[ntx++]; - - if (flux_json_request (h, nodeid, matchtag, topic, in) < 0) { - if (errnum < errno) - errnum = errno; - if (cb) - cb (nodeid, errno, NULL, arg); - nrx++; - } - } - while (nrx < match.bsize && (ntx - nrx == fanout || ntx == match.bsize)) { - uint32_t matchtag; - uint32_t nodeid; - zmsg_t *zmsg; - - if (!(zmsg = flux_recvmsg_match (h, match, nomatch, false))) - continue; - if (flux_msg_get_matchtag (zmsg, &matchtag) < 0) { - zmsg_destroy (&zmsg); - continue; - } - nodeid = nodeids[matchtag - match.matchtag]; - if (multrpc_cb (zmsg, nodeid, cb, arg) < 0) { - if (errnum < errno) - errnum = errno; - } - zmsg_destroy (&zmsg); - nrx++; - } - } - if (flux_putmsg_list (h, nomatch) < 0) { - if (errnum < errno) - errnum = errno; - } -done: - if (nodeids) - free (nodeids); - if (match.matchtag != FLUX_MATCHTAG_NONE) - flux_matchtag_free (h, match.matchtag, match.bsize); - if (nomatch) - zlist_destroy (&nomatch); - if (ns) - nodeset_destroy (ns); - if (errnum) - errno = errnum; - return errnum ? -1 : 0; -} - int flux_json_rpc (flux_t h, uint32_t nodeid, const char *topic, JSON in, JSON *out) { diff --git a/src/common/libjsonc/rpc.h b/src/common/libjsonc/rpc.h index 8c24a4644bda..faa1494dd88c 100644 --- a/src/common/libjsonc/rpc.h +++ b/src/common/libjsonc/rpc.h @@ -16,17 +16,6 @@ int flux_json_rpc (flux_t h, uint32_t nodeid, const char *topic, json_object *in, json_object **out); -/* Send a request to each node in 'nodeset', then collect responses, - * calling 'cb' for each one (if 'cb' is non-NULL). - * Returns 0 on success, -1 on failure with errno set. - * If there are multiple failures, their greatest errno is returned. - */ -typedef int (flux_multrpc_f)(uint32_t nodeid, uint32_t errnum, - json_object *out, void *arg); -int flux_json_multrpc (flux_t h, const char *nodeset, int fanout, - const char *topic, json_object *in, - flux_multrpc_f cb, void *arg); - #endif /* !_FLUX_JSONC_RPC_H */ /* From d3e69b42e6cce8882193313e1fca98938cd7146c Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:48:18 -0700 Subject: [PATCH 05/13] libflux: avoid using flux_putmsg_list in reactor internals --- src/common/libflux/reactor.c | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index d82c0b2a665a..a567db59a819 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -163,9 +163,24 @@ static int backlog_append (dispatch_t *d, zmsg_t **zmsg) static int backlog_flush (dispatch_t *d) { - if (d->backlog) - return flux_putmsg_list (d->h, d->backlog); - return 0; + int errnum = 0; + int rc = 0; + + if (d->backlog) { + zmsg_t *zmsg; + while ((zmsg = zlist_pop (d->backlog))) { + if (flux_putmsg (d->h, &zmsg) < 0) { + if (errnum < errno) { + errnum = errno; + rc = -1; + } + zmsg_destroy (&zmsg); + } + } + } + if (errnum > 0) + errno = errnum; + return rc; } int flux_sleep_on (flux_t h, flux_match_t match) From 7a380567e16da02404ece49aa5b391a40460aa23 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:49:24 -0700 Subject: [PATCH 06/13] libflux: drop flux_putmsg_list() Remove this function from the public API. It was only used internally and in the now gone flux_multrpc(). The zlist_t argument in flux_recvmsg_match() that goes with this is now a deprecated void *. --- src/common/libflux/handle.c | 52 ++++++++++++++++++------------------- src/common/libflux/handle.h | 10 +------ 2 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/common/libflux/handle.c b/src/common/libflux/handle.c index 4b5fc41282f1..acb797680f6c 100644 --- a/src/common/libflux/handle.c +++ b/src/common/libflux/handle.c @@ -367,11 +367,33 @@ zmsg_t *flux_recvmsg (flux_t h, bool nonblock) return zmsg; } -zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, zlist_t *nomatch, +static int putmsg_list (flux_t h, zlist_t *l) +{ + int errnum = 0; + int rc = 0; + + if (l) { + zmsg_t *zmsg; + while ((zmsg = zlist_pop (l))) { + if (flux_putmsg (h, &zmsg) < 0) { + if (errnum < errno) { + errnum = errno; + rc = -1; + } + zmsg_destroy (&zmsg); + } + } + } + if (errnum > 0) + errno = errnum; + return rc; +} + +zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, void *deprecated_arg, bool nonblock) { zmsg_t *zmsg = NULL; - zlist_t *putmsg = nomatch; + zlist_t *putmsg = NULL; if (!nonblock && flux_sleep_on (h, match) < 0) { if (errno != EINVAL) @@ -396,8 +418,8 @@ zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, zlist_t *nomatch, } } done: - if (putmsg && !nomatch) { - if (flux_putmsg_list (h, putmsg) < 0) { + if (putmsg) { + if (putmsg_list (h, putmsg) < 0) { int errnum = errno; zmsg_destroy (&zmsg); errno = errnum; @@ -407,28 +429,6 @@ zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, zlist_t *nomatch, return zmsg; } -int flux_putmsg_list (flux_t h, zlist_t *l) -{ - int errnum = 0; - int rc = 0; - - if (l) { - zmsg_t *zmsg; - while ((zmsg = zlist_pop (l))) { - if (flux_putmsg (h, &zmsg) < 0) { - if (errnum < errno) { - errnum = errno; - rc = -1; - } - zmsg_destroy (&zmsg); - } - } - } - if (errnum > 0) - errno = errnum; - return rc; -} - /* FIXME: FLUX_O_TRACE will show these messages being received again */ int flux_putmsg (flux_t h, zmsg_t **zmsg) diff --git a/src/common/libflux/handle.h b/src/common/libflux/handle.h index c38766b4b2f9..948b460e8b73 100644 --- a/src/common/libflux/handle.h +++ b/src/common/libflux/handle.h @@ -81,18 +81,10 @@ int flux_pushmsg (flux_t h, zmsg_t **zmsg); /* Receive a message matching 'match' (see message.h). * Any unmatched messages are returned to the handle with flux_putmsg(), - * unless 'nomatch' is non-NULL, in which case they are appended to the - * list pointed to by 'nomatch' for you to deal with. */ -zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, zlist_t *nomatch, +zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, void *deprecated_arg, bool nonblock); -/* Pop messages off 'list' and call flux_putmsg() on them. - * If there were any errors, -1 is returned with the greatest errno set. - * The list is always returned empty. - */ -int flux_putmsg_list (flux_t h, zlist_t *list); - /* Event subscribe/unsubscribe. */ int flux_event_subscribe (flux_t h, const char *topic); From d95bdb02abc0d7f7c38077e65a6283e59a3c5bbb Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:54:58 -0700 Subject: [PATCH 07/13] libflux: drop deprecated arg from flux_recvmsg_match --- src/common/libflux/handle.c | 3 +-- src/common/libflux/handle.h | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/common/libflux/handle.c b/src/common/libflux/handle.c index acb797680f6c..857d4bd97b7a 100644 --- a/src/common/libflux/handle.c +++ b/src/common/libflux/handle.c @@ -389,8 +389,7 @@ static int putmsg_list (flux_t h, zlist_t *l) return rc; } -zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, void *deprecated_arg, - bool nonblock) +zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, bool nonblock) { zmsg_t *zmsg = NULL; zlist_t *putmsg = NULL; diff --git a/src/common/libflux/handle.h b/src/common/libflux/handle.h index 948b460e8b73..34c2af8484ef 100644 --- a/src/common/libflux/handle.h +++ b/src/common/libflux/handle.h @@ -82,8 +82,7 @@ int flux_pushmsg (flux_t h, zmsg_t **zmsg); /* Receive a message matching 'match' (see message.h). * Any unmatched messages are returned to the handle with flux_putmsg(), */ -zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, void *deprecated_arg, - bool nonblock); +zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, bool nonblock); /* Event subscribe/unsubscribe. */ From 9e07f7a17ef35aa740e69bb9d7a9f982799616a8 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:55:46 -0700 Subject: [PATCH 08/13] libflux: update usage of flux_recvmsg_match() --- src/common/libflux/rpc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/libflux/rpc.c b/src/common/libflux/rpc.c index a788a45b755d..7826855367cb 100644 --- a/src/common/libflux/rpc.c +++ b/src/common/libflux/rpc.c @@ -98,7 +98,7 @@ static uint32_t lookup_nodeid (flux_rpc_t rpc, uint32_t matchtag) static zmsg_t *rpc_response_recv (flux_rpc_t rpc, bool nonblock) { - return flux_recvmsg_match (rpc->h, rpc->m, NULL, nonblock); + return flux_recvmsg_match (rpc->h, rpc->m, nonblock); } static int rpc_request_send (flux_rpc_t rpc, int n, const char *topic, From 58f146f40344dbaf6023d54c75352ef59b8531fe Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:56:17 -0700 Subject: [PATCH 09/13] bindings/lua: update usage of flux_recvmsg_match() --- src/bindings/lua/flux-lua.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bindings/lua/flux-lua.c b/src/bindings/lua/flux-lua.c index 6fa514f1ec36..7bd39e31ce4e 100644 --- a/src/bindings/lua/flux-lua.c +++ b/src/bindings/lua/flux-lua.c @@ -332,7 +332,7 @@ static int l_flux_recv (lua_State *L) if (lua_gettop (L) > 1) match.matchtag = lua_tointeger (L, 2); - if (!(zmsg = flux_recvmsg_match (f, match, NULL, false))) + if (!(zmsg = flux_recvmsg_match (f, match, false))) goto error; if (flux_msg_get_errnum (zmsg, &errnum) < 0) @@ -470,7 +470,7 @@ static int l_flux_recv_event (lua_State *L) }; zmsg_t *zmsg = NULL; - if (!(zmsg = flux_recvmsg_match (f, match, NULL, 0))) + if (!(zmsg = flux_recvmsg_match (f, match, 0))) return lua_pusherror (L, strerror (errno)); if (flux_msg_get_topic (zmsg, &topic) < 0 @@ -853,7 +853,7 @@ static int l_flux_recvmsg (lua_State *L) if (lua_gettop (L) > 1) match.matchtag = lua_tointeger (L, 2); - if (!(zmsg = flux_recvmsg_match (f, match, NULL, false))) + if (!(zmsg = flux_recvmsg_match (f, match, false))) return lua_pusherror (L, strerror (errno)); if (flux_msg_get_type (zmsg, &type) < 0) From 7a1e5143dc4d4e5ececa6e3c61acea8b7058d3bc Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:56:37 -0700 Subject: [PATCH 10/13] kvs: update usage of flux_recmsg_match() --- src/modules/kvs/libkvs.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/kvs/libkvs.c b/src/modules/kvs/libkvs.c index 8c92869bc986..40d4ad52e8dd 100644 --- a/src/modules/kvs/libkvs.c +++ b/src/modules/kvs/libkvs.c @@ -662,7 +662,7 @@ static int watch_rpc (flux_t h, const char *key, JSON *val, if (flux_json_request (h, FLUX_NODEID_ANY, match.matchtag, "kvs.watch", in) < 0) goto done; - if (!(zmsg = flux_recvmsg_match (h, match, NULL, false))) + if (!(zmsg = flux_recvmsg_match (h, match, false))) goto done; if (flux_json_response_decode (zmsg, &out) < 0) goto done; From 59ac32976dd85e067783e882f8873fc394ff21da Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:57:03 -0700 Subject: [PATCH 11/13] flux-event: update usage of flux_recvmsg_match() --- src/cmd/flux-event.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmd/flux-event.c b/src/cmd/flux-event.c index 469d5a9d5852..245cd166efcc 100644 --- a/src/cmd/flux-event.c +++ b/src/cmd/flux-event.c @@ -142,7 +142,7 @@ static void event_sub (flux_t h, int argc, char **argv) else if (flux_event_subscribe (h, "") < 0) err_exit ("flux_event_subscribe"); - while ((zmsg = flux_recvmsg_match (h, match, NULL, false))) { + while ((zmsg = flux_recvmsg_match (h, match, false))) { const char *topic; const char *json_str; if (flux_msg_get_topic (zmsg, &topic) < 0 From fc9e42883293b803b540650fad8bd030ff64da3f Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 15:57:30 -0700 Subject: [PATCH 12/13] libjsonc: update usage of flux_recvmsg_match() --- src/common/libjsonc/rpc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/libjsonc/rpc.c b/src/common/libjsonc/rpc.c index 14aa9d080e7a..d0df194146dc 100644 --- a/src/common/libjsonc/rpc.c +++ b/src/common/libjsonc/rpc.c @@ -58,7 +58,7 @@ int flux_json_rpc (flux_t h, uint32_t nodeid, const char *topic, } if (flux_json_request (h, nodeid, match.matchtag, topic, in) < 0) goto done; - if (!(zmsg = flux_recvmsg_match (h, match, NULL, false))) + if (!(zmsg = flux_recvmsg_match (h, match, false))) goto done; if (flux_msg_get_errnum (zmsg, &errnum) < 0) goto done; From 3972e6be789d0709343acc598c8b74145d01eb4e Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 5 Jun 2015 16:17:02 -0700 Subject: [PATCH 13/13] libflux: fatalize more key functions in handle.c Tidy up flux_recvmsg(), flux_sendmsg() use of FLUX_FATAL(). Add calls to FLUX_FATAL() to the following functions: flux_recvmsg_match() flux_putmsg() flux_pushmsg() flux_subscribe() flux_unsubscribe() flux_rank() --- src/common/libflux/handle.c | 132 ++++++++++++++++++++++-------------- 1 file changed, 80 insertions(+), 52 deletions(-) diff --git a/src/common/libflux/handle.c b/src/common/libflux/handle.c index 857d4bd97b7a..531cf42c479c 100644 --- a/src/common/libflux/handle.c +++ b/src/common/libflux/handle.c @@ -293,15 +293,13 @@ uint32_t flux_matchtag_avail (flux_t h) int flux_sendmsg (flux_t h, zmsg_t **zmsg) { int type; - int rc = -1; if (!h->ops->sendmsg) { errno = ENOSYS; - FLUX_FATAL (h); - goto done; + goto fatal; } if (flux_msg_get_type (*zmsg, &type) < 0) - goto done; + goto fatal; switch (type) { case FLUX_MSGTYPE_REQUEST: h->msgcounters.request_tx++; @@ -318,13 +316,12 @@ int flux_sendmsg (flux_t h, zmsg_t **zmsg) } if (h->flags & FLUX_O_TRACE) flux_msg_fprint (stderr, *zmsg); - if (h->ops->sendmsg (h->impl, zmsg) < 0) { - FLUX_FATAL (h); - goto done; - } - rc = 0; -done: - return rc; + if (h->ops->sendmsg (h->impl, zmsg) < 0) + goto fatal; + return 0; +fatal: + FLUX_FATAL (h); + return -1; } zmsg_t *flux_recvmsg (flux_t h, bool nonblock) @@ -334,37 +331,36 @@ zmsg_t *flux_recvmsg (flux_t h, bool nonblock) if (!h->ops->recvmsg) { errno = ENOSYS; - FLUX_FATAL (h); - goto done; + goto fatal; } if (!(zmsg = h->ops->recvmsg (h->impl, nonblock))) { - if (errno != EAGAIN && errno != EWOULDBLOCK) - FLUX_FATAL (h); - goto done; - } - if (flux_msg_get_type (zmsg, &type) < 0) { - zmsg_destroy (&zmsg); - errno = EPROTO; - goto done; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return NULL; + goto fatal; } - switch (type) { - case FLUX_MSGTYPE_REQUEST: - h->msgcounters.request_rx++; - break; - case FLUX_MSGTYPE_RESPONSE: - h->msgcounters.response_rx++; - break; - case FLUX_MSGTYPE_EVENT: - h->msgcounters.event_rx++; - break; + if (flux_msg_get_type (zmsg, &type) == 0) { + switch (type) { + case FLUX_MSGTYPE_REQUEST: + h->msgcounters.request_rx++; + break; + case FLUX_MSGTYPE_RESPONSE: + h->msgcounters.response_rx++; + break; + case FLUX_MSGTYPE_EVENT: + h->msgcounters.event_rx++; + break; case FLUX_MSGTYPE_KEEPALIVE: h->msgcounters.keepalive_rx++; break; + } } if ((h->flags & FLUX_O_TRACE)) flux_msg_fprint (stderr, zmsg); -done: return zmsg; +fatal: + zmsg_destroy (&zmsg); + FLUX_FATAL (h); + return NULL; } static int putmsg_list (flux_t h, zlist_t *l) @@ -394,9 +390,13 @@ zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, bool nonblock) zmsg_t *zmsg = NULL; zlist_t *putmsg = NULL; + if (!h->ops->recvmsg || !h->ops->putmsg) { + errno = ENOSYS; + goto fatal; + } if (!nonblock && flux_sleep_on (h, match) < 0) { if (errno != EINVAL) - goto done; + goto fatal; errno = 0; /* EINVAL: not running in a coprocess */ } while (!zmsg) { @@ -417,15 +417,17 @@ zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, bool nonblock) } } done: - if (putmsg) { - if (putmsg_list (h, putmsg) < 0) { - int errnum = errno; - zmsg_destroy (&zmsg); - errno = errnum; - } - zlist_destroy (&putmsg); + if (putmsg_list (h, putmsg) < 0) { + int errnum = errno; + zmsg_destroy (&zmsg); + errno = errnum; } + zlist_destroy (&putmsg); return zmsg; +fatal: + zmsg_destroy (&zmsg); + FLUX_FATAL (h); + return NULL; } /* FIXME: FLUX_O_TRACE will show these messages being received again @@ -434,41 +436,67 @@ int flux_putmsg (flux_t h, zmsg_t **zmsg) { if (!h->ops->putmsg) { errno = ENOSYS; - return -1; + goto fatal; } - return h->ops->putmsg (h->impl, zmsg); + if (h->ops->putmsg (h->impl, zmsg) < 0) + goto fatal; + return 0; +fatal: + FLUX_FATAL (h); + return -1; } int flux_pushmsg (flux_t h, zmsg_t **zmsg) { if (!h->ops->pushmsg) { errno = ENOSYS; - return -1; + goto fatal; } - return h->ops->pushmsg (h->impl, zmsg); + if (h->ops->pushmsg (h->impl, zmsg) < 0) + goto fatal; + return 0; +fatal: + FLUX_FATAL (h); + return -1; } int flux_event_subscribe (flux_t h, const char *topic) { - if (!h->ops->event_subscribe) - return 0; - return h->ops->event_subscribe (h->impl, topic); + if (h->ops->event_subscribe) { + if (h->ops->event_subscribe (h->impl, topic) < 0) + goto fatal; + } + return 0; +fatal: + FLUX_FATAL (h); + return -1; } int flux_event_unsubscribe (flux_t h, const char *topic) { - if (!h->ops->event_unsubscribe) - return 0; - return h->ops->event_unsubscribe (h->impl, topic); + if (h->ops->event_unsubscribe) { + if (h->ops->event_unsubscribe (h->impl, topic) < 0) + goto fatal; + } + return 0; +fatal: + FLUX_FATAL (h); + return -1; } int flux_rank (flux_t h) { + int rank; if (!h->ops->rank) { errno = ENOSYS; - return -1; + goto fatal; } - return h->ops->rank (h->impl); + if ((rank = h->ops->rank (h->impl)) < 0) + goto fatal; + return rank; +fatal: + FLUX_FATAL (h); + return -1; } zctx_t *flux_get_zctx (flux_t h)