Skip to content

Commit

Permalink
Merge pull request #217 from garlick/api_cleanup
Browse files Browse the repository at this point in the history
drop detrius in the aftermath of pr #215, fatalize more key functions, tweak flux_rpc_get()
  • Loading branch information
grondo committed Jun 5, 2015
2 parents 947ef13 + 3972e6b commit f392a81
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 268 deletions.
6 changes: 3 additions & 3 deletions src/bindings/lua/flux-lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ static int l_flux_recv (lua_State *L)
if (lua_gettop (L) > 1)
match.matchtag = lua_tointeger (L, 2);

if (!(zmsg = flux_recvmsg_match (f, match, NULL, false)))
if (!(zmsg = flux_recvmsg_match (f, match, false)))
goto error;

if (flux_msg_get_errnum (zmsg, &errnum) < 0)
Expand Down Expand Up @@ -470,7 +470,7 @@ static int l_flux_recv_event (lua_State *L)
};
zmsg_t *zmsg = NULL;

if (!(zmsg = flux_recvmsg_match (f, match, NULL, 0)))
if (!(zmsg = flux_recvmsg_match (f, match, 0)))
return lua_pusherror (L, strerror (errno));

if (flux_msg_get_topic (zmsg, &topic) < 0
Expand Down Expand Up @@ -853,7 +853,7 @@ static int l_flux_recvmsg (lua_State *L)
if (lua_gettop (L) > 1)
match.matchtag = lua_tointeger (L, 2);

if (!(zmsg = flux_recvmsg_match (f, match, NULL, false)))
if (!(zmsg = flux_recvmsg_match (f, match, false)))
return lua_pusherror (L, strerror (errno));

if (flux_msg_get_type (zmsg, &type) < 0)
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/flux-event.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static void event_sub (flux_t h, int argc, char **argv)
else if (flux_event_subscribe (h, "") < 0)
err_exit ("flux_event_subscribe");

while ((zmsg = flux_recvmsg_match (h, match, NULL, false))) {
while ((zmsg = flux_recvmsg_match (h, match, false))) {
const char *topic;
const char *json_str;
if (flux_msg_get_topic (zmsg, &topic) < 0
Expand Down
55 changes: 34 additions & 21 deletions src/cmd/flux-module.c
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,19 @@ void mod_insmod (flux_t h, opt_t opt)
char *topic = xasprintf ("%s.insmod", service);
char *json_str = flux_insmod_json_encode (modpath, opt.argc, opt.argv);
assert (json_str != NULL);
JSON in = Jfromstr (json_str);
assert (in != NULL);
if (flux_json_multrpc (h, opt.nodeset, opt.fanout, topic, in,
NULL, NULL) < 0)
err_exit ("%s", modname);
flux_rpc_t r = flux_rpc_multi (h, topic, json_str, opt.nodeset, 0);
if (!r)
err_exit ("%s", topic);
while (!flux_rpc_completed (r)) {
uint32_t nodeid = FLUX_NODEID_ANY;
if (flux_rpc_get (r, NULL, NULL) < 0)
err_exit ("%s[%d]", topic,
nodeid == FLUX_NODEID_ANY ? -1 : nodeid);
}
flux_rpc_destroy (r);
free (topic);
free (service);
free (json_str);
Jput (in);
} else {
if (flux_modctl_load (h, opt.nodeset, modpath, opt.argc, opt.argv) < 0)
err_exit ("%s", modname);
Expand All @@ -294,13 +298,17 @@ void mod_rmmod (flux_t h, opt_t opt)
char *topic = xasprintf ("%s.rmmod", service);
char *json_str = flux_rmmod_json_encode (modname);
assert (json_str != NULL);
JSON in = Jfromstr (json_str);
if (flux_json_multrpc (h, opt.nodeset, opt.fanout, topic, in,
NULL, NULL) < 0)
err_exit ("%s", modname);
flux_rpc_t r = flux_rpc_multi (h, topic, json_str, opt.nodeset, 0);
if (!r)
err_exit ("%s", topic);
while (!flux_rpc_completed (r)) {
uint32_t nodeid = FLUX_NODEID_ANY;
if (flux_rpc_get (r, &nodeid, NULL) < 0)
err ("%s[%d]", topic, nodeid == FLUX_NODEID_ANY ? -1 : nodeid);
}
flux_rpc_destroy (r);
free (topic);
free (service);
Jput (in);
free (json_str);
} else {
if (flux_modctl_unload (h, opt.nodeset, modname) < 0)
Expand Down Expand Up @@ -375,19 +383,16 @@ void lsmod_map_hash (zhash_t *mods, flux_lsmod_f cb, void *arg)
zlist_destroy (&keys);
}

int lsmod_hash_cb (uint32_t nodeid, uint32_t errnum, JSON out, void *arg)
int lsmod_hash_cb (uint32_t nodeid, const char *json_str, zhash_t *mods)
{
flux_modlist_t modlist;
zhash_t *mods = arg;
mod_t *m;
int i, len;
const char *name, *digest;
int size, idle;
int rc = -1;

if (errnum)
return 0;
if (!(modlist = flux_lsmod_json_decode (Jtostr (out))))
if (!(modlist = flux_lsmod_json_decode (json_str)))
goto done;
if ((len = flux_modlist_count (modlist)) == -1)
goto done;
Expand Down Expand Up @@ -424,13 +429,21 @@ void mod_lsmod (flux_t h, opt_t opt)
"Module", "Size", "Digest", "Idle", "Nodeset");
if (opt.direct) {
zhash_t *mods = zhash_new ();
char *topic = xasprintf ("%s.lsmod", service);

if (!mods)
oom ();
if (flux_json_multrpc (h, opt.nodeset, opt.fanout, topic, NULL,
lsmod_hash_cb, mods) < 0)
err_exit ("modctl_list");
char *topic = xasprintf ("%s.lsmod", service);
flux_rpc_t r = flux_rpc_multi (h, topic, NULL, opt.nodeset, 0);
if (!r)
err_exit ("%s", topic);
while (!flux_rpc_completed (r)) {
const char *json_str;
uint32_t nodeid;
if (flux_rpc_get (r, &nodeid, &json_str) < 0)
err_exit ("%s", topic);
if (lsmod_hash_cb (nodeid, json_str, mods) < 0)
err_exit ("%s[%u]", topic, nodeid);
}
flux_rpc_destroy (r);
lsmod_map_hash (mods, lsmod_print_cb, NULL);
zhash_destroy (&mods);
free (topic);
Expand Down
181 changes: 104 additions & 77 deletions src/common/libflux/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,13 @@ uint32_t flux_matchtag_avail (flux_t h)
int flux_sendmsg (flux_t h, zmsg_t **zmsg)
{
int type;
int rc = -1;

if (!h->ops->sendmsg) {
errno = ENOSYS;
FLUX_FATAL (h);
goto done;
goto fatal;
}
if (flux_msg_get_type (*zmsg, &type) < 0)
goto done;
goto fatal;
switch (type) {
case FLUX_MSGTYPE_REQUEST:
h->msgcounters.request_tx++;
Expand All @@ -318,13 +316,12 @@ int flux_sendmsg (flux_t h, zmsg_t **zmsg)
}
if (h->flags & FLUX_O_TRACE)
flux_msg_fprint (stderr, *zmsg);
if (h->ops->sendmsg (h->impl, zmsg) < 0) {
FLUX_FATAL (h);
goto done;
}
rc = 0;
done:
return rc;
if (h->ops->sendmsg (h->impl, zmsg) < 0)
goto fatal;
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

zmsg_t *flux_recvmsg (flux_t h, bool nonblock)
Expand All @@ -334,48 +331,72 @@ zmsg_t *flux_recvmsg (flux_t h, bool nonblock)

if (!h->ops->recvmsg) {
errno = ENOSYS;
FLUX_FATAL (h);
goto done;
goto fatal;
}
if (!(zmsg = h->ops->recvmsg (h->impl, nonblock))) {
if (errno != EAGAIN && errno != EWOULDBLOCK)
FLUX_FATAL (h);
goto done;
}
if (flux_msg_get_type (zmsg, &type) < 0) {
zmsg_destroy (&zmsg);
errno = EPROTO;
goto done;
if (errno == EAGAIN || errno == EWOULDBLOCK)
return NULL;
goto fatal;
}
switch (type) {
case FLUX_MSGTYPE_REQUEST:
h->msgcounters.request_rx++;
break;
case FLUX_MSGTYPE_RESPONSE:
h->msgcounters.response_rx++;
break;
case FLUX_MSGTYPE_EVENT:
h->msgcounters.event_rx++;
break;
if (flux_msg_get_type (zmsg, &type) == 0) {
switch (type) {
case FLUX_MSGTYPE_REQUEST:
h->msgcounters.request_rx++;
break;
case FLUX_MSGTYPE_RESPONSE:
h->msgcounters.response_rx++;
break;
case FLUX_MSGTYPE_EVENT:
h->msgcounters.event_rx++;
break;
case FLUX_MSGTYPE_KEEPALIVE:
h->msgcounters.keepalive_rx++;
break;
}
}
if ((h->flags & FLUX_O_TRACE))
flux_msg_fprint (stderr, zmsg);
done:
return zmsg;
fatal:
zmsg_destroy (&zmsg);
FLUX_FATAL (h);
return NULL;
}

static int putmsg_list (flux_t h, zlist_t *l)
{
int errnum = 0;
int rc = 0;

if (l) {
zmsg_t *zmsg;
while ((zmsg = zlist_pop (l))) {
if (flux_putmsg (h, &zmsg) < 0) {
if (errnum < errno) {
errnum = errno;
rc = -1;
}
zmsg_destroy (&zmsg);
}
}
}
if (errnum > 0)
errno = errnum;
return rc;
}

zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, zlist_t *nomatch,
bool nonblock)
zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, bool nonblock)
{
zmsg_t *zmsg = NULL;
zlist_t *putmsg = nomatch;
zlist_t *putmsg = NULL;

if (!h->ops->recvmsg || !h->ops->putmsg) {
errno = ENOSYS;
goto fatal;
}
if (!nonblock && flux_sleep_on (h, match) < 0) {
if (errno != EINVAL)
goto done;
goto fatal;
errno = 0; /* EINVAL: not running in a coprocess */
}
while (!zmsg) {
Expand All @@ -396,37 +417,17 @@ zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, zlist_t *nomatch,
}
}
done:
if (putmsg && !nomatch) {
if (flux_putmsg_list (h, putmsg) < 0) {
int errnum = errno;
zmsg_destroy (&zmsg);
errno = errnum;
}
zlist_destroy (&putmsg);
if (putmsg_list (h, putmsg) < 0) {
int errnum = errno;
zmsg_destroy (&zmsg);
errno = errnum;
}
zlist_destroy (&putmsg);
return zmsg;
}

int flux_putmsg_list (flux_t h, zlist_t *l)
{
int errnum = 0;
int rc = 0;

if (l) {
zmsg_t *zmsg;
while ((zmsg = zlist_pop (l))) {
if (flux_putmsg (h, &zmsg) < 0) {
if (errnum < errno) {
errnum = errno;
rc = -1;
}
zmsg_destroy (&zmsg);
}
}
}
if (errnum > 0)
errno = errnum;
return rc;
fatal:
zmsg_destroy (&zmsg);
FLUX_FATAL (h);
return NULL;
}

/* FIXME: FLUX_O_TRACE will show these messages being received again
Expand All @@ -435,41 +436,67 @@ int flux_putmsg (flux_t h, zmsg_t **zmsg)
{
if (!h->ops->putmsg) {
errno = ENOSYS;
return -1;
goto fatal;
}
return h->ops->putmsg (h->impl, zmsg);
if (h->ops->putmsg (h->impl, zmsg) < 0)
goto fatal;
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

int flux_pushmsg (flux_t h, zmsg_t **zmsg)
{
if (!h->ops->pushmsg) {
errno = ENOSYS;
return -1;
goto fatal;
}
return h->ops->pushmsg (h->impl, zmsg);
if (h->ops->pushmsg (h->impl, zmsg) < 0)
goto fatal;
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

int flux_event_subscribe (flux_t h, const char *topic)
{
if (!h->ops->event_subscribe)
return 0;
return h->ops->event_subscribe (h->impl, topic);
if (h->ops->event_subscribe) {
if (h->ops->event_subscribe (h->impl, topic) < 0)
goto fatal;
}
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

int flux_event_unsubscribe (flux_t h, const char *topic)
{
if (!h->ops->event_unsubscribe)
return 0;
return h->ops->event_unsubscribe (h->impl, topic);
if (h->ops->event_unsubscribe) {
if (h->ops->event_unsubscribe (h->impl, topic) < 0)
goto fatal;
}
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

int flux_rank (flux_t h)
{
int rank;
if (!h->ops->rank) {
errno = ENOSYS;
return -1;
goto fatal;
}
return h->ops->rank (h->impl);
if ((rank = h->ops->rank (h->impl)) < 0)
goto fatal;
return rank;
fatal:
FLUX_FATAL (h);
return -1;
}

zctx_t *flux_get_zctx (flux_t h)
Expand Down
Loading

0 comments on commit f392a81

Please sign in to comment.