Skip to content

Commit

Permalink
Update usage of flux_rpc_get()
Browse files Browse the repository at this point in the history
Per recent function behavior change to flux_response_decode(),
adjust usage of flux_rpc_get() appropriately.  Most commonly,
check that the payload pointer is non-NULL before continuing.
Or if a payload is not used, do not pass in a pointer.
  • Loading branch information
chu11 committed Mar 17, 2017
1 parent e829c12 commit 997cd05
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 30 deletions.
4 changes: 3 additions & 1 deletion doc/man3/trpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ void get_rank (flux_rpc_t *rpc)

if (flux_rpc_get (rpc, &json_str) < 0)
log_err_exit ("flux_rpc_get");
if (!(o = Jfromstr (json_str)) || !Jget_str (o, "value", &rank))
if (!json_str
|| !(o = Jfromstr (json_str))
|| !Jget_str (o, "value", &rank))
log_msg_exit ("response protocol error");
printf ("rank is %s\n", rank);
Jput (o);
Expand Down
4 changes: 3 additions & 1 deletion doc/man3/trpc_then.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ void get_rank (flux_rpc_t *rpc, void *arg)

if (flux_rpc_get (rpc, &json_str) < 0)
log_err_exit ("flux_rpc_get");
if (!(o = Jfromstr (json_str)) || !Jget_str (o, "value", &rank))
if (!json_str
|| !(o = Jfromstr (json_str))
|| !Jget_str (o, "value", &rank))
log_msg_exit ("response protocol error");
printf ("rank is %s\n", rank);
Jput (o);
Expand Down
4 changes: 3 additions & 1 deletion doc/man3/trpc_then_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ void get_rank (flux_rpc_t *rpc, void *arg)
log_err_exit ("flux_rpc_get_nodeid");
if (flux_rpc_get (rpc, &json_str) < 0)
log_err_exit ("flux_rpc_get");
if (!(o = Jfromstr (json_str)) || !Jget_str (o, "value", &rank))
if (!json_str
|| !(o = Jfromstr (json_str))
|| !Jget_str (o, "value", &rank))
log_msg_exit ("response protocol error");
printf ("[%" PRIu32 "] rank is %s\n", nodeid, rank);
Jput (o);
Expand Down
10 changes: 9 additions & 1 deletion src/cmd/flux-module.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,18 @@ void lsmod_map_hash (zhash_t *mods, flux_lsmod_f cb, void *arg)

int lsmod_merge_result (uint32_t nodeid, const char *json_str, zhash_t *mods)
{
flux_modlist_t *modlist;
flux_modlist_t *modlist = NULL;
mod_t *m;
int i, len;
const char *name, *digest;
int size, idle;
int status;
int rc = -1;

if (!json_str) {
errno = EPROTO;
goto done;
}
if (!(modlist = flux_lsmod_json_decode (json_str)))
goto done;
if ((len = flux_modlist_count (modlist)) == -1)
Expand Down Expand Up @@ -655,13 +659,17 @@ int cmd_stats (optparse_t *p, int argc, char **argv)
log_err_exit ("%s", topic);
if (flux_rpc_get (r, &json_str) < 0)
log_err_exit ("%s", topic);
if (!json_str)
log_errn_exit (EPROTO, "%s", topic);
parse_json (p, json_str);
} else {
topic = xasprintf ("%s.stats.get", service);
if (!(r = flux_rpc (h, topic, NULL, nodeid, 0)))
log_err_exit ("%s", topic);
if (flux_rpc_get (r, &json_str) < 0)
log_err_exit ("%s", topic);
if (!json_str)
log_errn_exit (EPROTO, "%s", topic);
parse_json (p, json_str);
}
free (topic);
Expand Down
4 changes: 2 additions & 2 deletions src/common/libcompat/rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ int flux_json_rpc (flux_t *h, uint32_t nodeid, const char *topic,

if (!(rpc = flux_rpc (h, topic, Jtostr (in), nodeid, 0)))
goto done;
if (flux_rpc_get (rpc, out ? &json_str : NULL) < 0)
if (flux_rpc_get (rpc, &json_str) < 0)
goto done;
if (out) {
if (!(o = Jfromstr (json_str))) {
if (!json_str || !(o = Jfromstr (json_str))) {
errno = EPROTO;
goto done;
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/libflux/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ int flux_lsmod (flux_t *h, uint32_t nodeid, const char *service,
goto done;
if (flux_rpc_get (r, &json_str) < 0)
goto done;
if (!json_str) {
errno = EPROTO;
goto done;
}
if (!(mods = flux_lsmod_json_decode (json_str)))
goto done;
if ((len = flux_modlist_count (mods)) == -1)
Expand Down
4 changes: 4 additions & 0 deletions src/common/libflux/reparent.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ char *flux_lspeer (flux_t *h, int rank)
goto done;
if (flux_rpc_get (r, &json_str) < 0)
goto done;
if (!json_str) {
errno = EPROTO;
goto done;
}
ret = xstrdup (json_str);
done:
flux_rpc_destroy (r);
Expand Down
9 changes: 4 additions & 5 deletions src/modules/kvs/libkvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ static int getobj (flux_t *h, json_object *rootdir, const char *key,
goto done;
if (flux_rpc_get (rpc, &json_str) < 0)
goto done;
if (!(out = Jfromstr (json_str))) {
if (!json_str || !(out = Jfromstr (json_str))) {
errno = EPROTO;
goto done;
}
Expand Down Expand Up @@ -1357,16 +1357,15 @@ int kvs_get_version (flux_t *h, int *versionp)
int kvs_wait_version (flux_t *h, int version)
{
flux_rpc_t *rpc;
const char *json_str;
int ret = -1;

if (!(rpc = flux_rpcf (h, "kvs.sync", FLUX_NODEID_ANY, 0, "{ s:i }",
"rootseq", version)))
goto done;
if (flux_rpc_get (rpc, &json_str) < 0)
goto done;
/* N.B. response contains (rootseq, rootdir) but we don't use it.
/* N.B. response contains (rootseq, rootdir) but we don't need it.
*/
if (flux_rpc_get (rpc, NULL) < 0)
goto done;
ret = 0;
done:
flux_rpc_destroy (rpc);
Expand Down
4 changes: 4 additions & 0 deletions src/modules/libjsc/jstatctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ static int lwj_kvs_path (flux_t *h, int64_t id, char **pathp)
}
Jput (o);
o = NULL;
if (!json_str) {
flux_log (h, LOG_ERR, "flux_rpc (job.kvspath): empty payload");
goto out;
}
if (!(o = Jfromstr (json_str))) {
flux_log_error (h, "flux_rpc (job.kvspath): failed to parse json");
goto out;
Expand Down
3 changes: 1 addition & 2 deletions src/modules/wreck/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,12 @@ static int add_jobinfo (flux_t *h, const char *kvspath, json_object *req)
static bool ping_sched (flux_t *h)
{
bool retval = false;
const char *s;
flux_rpc_t *rpc;
if (!(rpc = flux_rpcf (h, "sched.ping", 0, 0, "{s:i}", "seq", 0))) {
flux_log_error (h, "ping_sched");
goto out;
}
if (flux_rpc_get (rpc, &s) >= 0)
if (flux_rpc_get (rpc, NULL) >= 0)
retval = true;
out:
flux_rpc_destroy (rpc);
Expand Down
7 changes: 4 additions & 3 deletions t/kvs/watch_disconnect.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ void send_watch_requests (flux_t *h, const char *key)
{
json_object *in;
flux_rpc_t *r;
const char *json_str;

if (!(in = kp_twatch_enc (key, NULL, KVS_PROTO_FIRST)))
log_err_exit ("kp_twatch_enc");
if (!(r = flux_rpc_multi (h, "kvs.watch", Jtostr (in), "all", 0)))
log_err_exit ("flux_rpc_multi kvs.watch");
do {
if (flux_rpc_get (r, &json_str) < 0)
if (flux_rpc_get (r, NULL) < 0)
log_err_exit ("kvs.watch");
} while (flux_rpc_next (r) == 0);
flux_rpc_destroy (r);
Expand All @@ -46,7 +45,9 @@ int count_watchers (flux_t *h)
do {
if (flux_rpc_get (r, &json_str) < 0)
log_err_exit ("kvs.stats.get");
if (!(out = Jfromstr (json_str)) || !Jget_int (out, "#watchers", &n))
if (!json_str
|| !(out = Jfromstr (json_str))
|| !Jget_int (out, "#watchers", &n))
log_msg_exit ("error decoding stats payload");
count += n;
Jput (out);
Expand Down
2 changes: 1 addition & 1 deletion t/loop/multrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ void rpctest_begin_cb (flux_t *h, flux_msg_handler_t *w,
int fail_errno_last = 0;
do {
if (flux_rpc_get_nodeid (r, &nodeid) < 0
|| flux_rpc_get (r, &json_str) < 0) {
|| flux_rpc_get (r, NULL) < 0) {
fail_errno_last = errno;
fail_nodeid_last = nodeid;
fail_count++;
Expand Down
15 changes: 7 additions & 8 deletions t/loop/rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,18 @@ void rpctest_begin_cb (flux_t *h, flux_msg_handler_t *w,
"flux_rpc_get fails with EPROTO");
flux_rpc_destroy (r);

/* cause local EPROTO (user incorrectly expects payload) */
/* receive NULL payload on empty response */
ok ((r = flux_rpc (h, "rpctest.hello", NULL, FLUX_NODEID_ANY, 0)) != NULL,
"flux_rpc with empty payload works");
ok (flux_rpc_check (r) == false,
"flux_rpc_check says get would block");
errno = 0;
ok (flux_rpc_get (r, &json_str) < 0
&& errno == EPROTO,
"flux_rpc_get fails with EPROTO");
ok (flux_rpc_get (r, &json_str) == 0
&& json_str == NULL,
"flux_rpc_get gets NULL payload on empty response");
flux_rpc_destroy (r);

/* cause local EPROTO (user incorrectly expects empty payload) */
/* flux_rpc_get is ok if user doesn't desire response payload */
errno = 0;
o = Jnew ();
Jadd_int (o, "foo", 42);
Expand All @@ -164,9 +164,8 @@ void rpctest_begin_cb (flux_t *h, flux_msg_handler_t *w,
ok (flux_rpc_check (r) == false,
"flux_rpc_check says get would block");
errno = 0;
ok (flux_rpc_get (r, NULL) < 0
&& errno == EPROTO,
"flux_rpc_get fails with EPROTO");
ok (flux_rpc_get (r, NULL) == 0,
"flux_rpc_get is ok if user doesn't desire response payload");
flux_rpc_destroy (r);
Jput (o);

Expand Down
19 changes: 14 additions & 5 deletions t/request/treq.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@ void test_echo (flux_t *h, uint32_t nodeid)
if (!(rpc = flux_rpc (h, "req.echo", Jtostr (in), nodeid, 0))
|| flux_rpc_get (rpc, &json_str) < 0)
log_err_exit ("%s", __FUNCTION__);
if (!(out = Jfromstr (json_str)) || !Jget_str (out, "mumble", &s)
|| strcmp (s, "burble") != 0)
if (!json_str
|| !(out = Jfromstr (json_str))
|| !Jget_str (out, "mumble", &s)
|| strcmp (s, "burble") != 0)
log_msg_exit ("%s: returned payload wasn't an echo", __FUNCTION__);
Jput (in);
Jput (out);
Expand Down Expand Up @@ -188,7 +190,10 @@ void test_src (flux_t *h, uint32_t nodeid)
if (!(rpc = flux_rpc (h, "req.src", NULL, nodeid, 0))
|| flux_rpc_get (rpc, &json_str) < 0)
log_err_exit ("%s", __FUNCTION__);
if (!(out = Jfromstr (json_str)) || !Jget_int (out, "wormz", &i) || i != 42)
if (!json_str
|| !(out = Jfromstr (json_str))
|| !Jget_int (out, "wormz", &i)
|| i != 42)
log_msg_exit ("%s: didn't get expected payload", __FUNCTION__);
Jput (out);
flux_rpc_destroy (rpc);
Expand Down Expand Up @@ -325,7 +330,9 @@ static void xping (flux_t *h, uint32_t nodeid, uint32_t xnodeid, const char *svc
if (!(rpc = flux_rpc (h, "req.xping", Jtostr (in), nodeid, 0))
|| flux_rpc_get (rpc, &json_str) < 0)
log_err_exit ("req.xping");
if (!(out = Jfromstr (json_str)) || !Jget_str (out, "route", &route))
if (!json_str
|| !(out = Jfromstr (json_str))
|| !Jget_str (out, "route", &route))
log_errn_exit (EPROTO, "req.xping");
printf ("hops=%d\n", count_hops (route));
Jput (out);
Expand Down Expand Up @@ -403,7 +410,9 @@ int req_count (flux_t *h, uint32_t nodeid)
if (!(rpc = flux_rpc (h, "req.count", NULL, nodeid, 0))
|| flux_rpc_get (rpc, &json_str) < 0)
goto done;
if (!(out = Jfromstr (json_str)) || !Jget_int (out, "count", &count)) {
if (!json_str
|| !(out = Jfromstr (json_str))
|| !Jget_int (out, "count", &count)) {
errno = EPROTO;
goto done;
}
Expand Down

0 comments on commit 997cd05

Please sign in to comment.