Skip to content

Commit

Permalink
libflux: fatalize more key functions in handle.c
Browse files Browse the repository at this point in the history
Tidy up flux_recvmsg(), flux_sendmsg() use of FLUX_FATAL().

Add calls to FLUX_FATAL() to the following functions:
  flux_recvmsg_match()
  flux_putmsg()
  flux_pushmsg()
  flux_subscribe()
  flux_unsubscribe()
  flux_rank()
  • Loading branch information
garlick committed Jun 5, 2015
1 parent fc9e428 commit 3972e6b
Showing 1 changed file with 80 additions and 52 deletions.
132 changes: 80 additions & 52 deletions src/common/libflux/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,13 @@ uint32_t flux_matchtag_avail (flux_t h)
int flux_sendmsg (flux_t h, zmsg_t **zmsg)
{
int type;
int rc = -1;

if (!h->ops->sendmsg) {
errno = ENOSYS;
FLUX_FATAL (h);
goto done;
goto fatal;
}
if (flux_msg_get_type (*zmsg, &type) < 0)
goto done;
goto fatal;
switch (type) {
case FLUX_MSGTYPE_REQUEST:
h->msgcounters.request_tx++;
Expand All @@ -318,13 +316,12 @@ int flux_sendmsg (flux_t h, zmsg_t **zmsg)
}
if (h->flags & FLUX_O_TRACE)
flux_msg_fprint (stderr, *zmsg);
if (h->ops->sendmsg (h->impl, zmsg) < 0) {
FLUX_FATAL (h);
goto done;
}
rc = 0;
done:
return rc;
if (h->ops->sendmsg (h->impl, zmsg) < 0)
goto fatal;
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

zmsg_t *flux_recvmsg (flux_t h, bool nonblock)
Expand All @@ -334,37 +331,36 @@ zmsg_t *flux_recvmsg (flux_t h, bool nonblock)

if (!h->ops->recvmsg) {
errno = ENOSYS;
FLUX_FATAL (h);
goto done;
goto fatal;
}
if (!(zmsg = h->ops->recvmsg (h->impl, nonblock))) {
if (errno != EAGAIN && errno != EWOULDBLOCK)
FLUX_FATAL (h);
goto done;
}
if (flux_msg_get_type (zmsg, &type) < 0) {
zmsg_destroy (&zmsg);
errno = EPROTO;
goto done;
if (errno == EAGAIN || errno == EWOULDBLOCK)
return NULL;
goto fatal;
}
switch (type) {
case FLUX_MSGTYPE_REQUEST:
h->msgcounters.request_rx++;
break;
case FLUX_MSGTYPE_RESPONSE:
h->msgcounters.response_rx++;
break;
case FLUX_MSGTYPE_EVENT:
h->msgcounters.event_rx++;
break;
if (flux_msg_get_type (zmsg, &type) == 0) {
switch (type) {
case FLUX_MSGTYPE_REQUEST:
h->msgcounters.request_rx++;
break;
case FLUX_MSGTYPE_RESPONSE:
h->msgcounters.response_rx++;
break;
case FLUX_MSGTYPE_EVENT:
h->msgcounters.event_rx++;
break;
case FLUX_MSGTYPE_KEEPALIVE:
h->msgcounters.keepalive_rx++;
break;
}
}
if ((h->flags & FLUX_O_TRACE))
flux_msg_fprint (stderr, zmsg);
done:
return zmsg;
fatal:
zmsg_destroy (&zmsg);
FLUX_FATAL (h);
return NULL;
}

static int putmsg_list (flux_t h, zlist_t *l)
Expand Down Expand Up @@ -394,9 +390,13 @@ zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, bool nonblock)
zmsg_t *zmsg = NULL;
zlist_t *putmsg = NULL;

if (!h->ops->recvmsg || !h->ops->putmsg) {
errno = ENOSYS;
goto fatal;
}
if (!nonblock && flux_sleep_on (h, match) < 0) {
if (errno != EINVAL)
goto done;
goto fatal;
errno = 0; /* EINVAL: not running in a coprocess */
}
while (!zmsg) {
Expand All @@ -417,15 +417,17 @@ zmsg_t *flux_recvmsg_match (flux_t h, flux_match_t match, bool nonblock)
}
}
done:
if (putmsg) {
if (putmsg_list (h, putmsg) < 0) {
int errnum = errno;
zmsg_destroy (&zmsg);
errno = errnum;
}
zlist_destroy (&putmsg);
if (putmsg_list (h, putmsg) < 0) {
int errnum = errno;
zmsg_destroy (&zmsg);
errno = errnum;
}
zlist_destroy (&putmsg);
return zmsg;
fatal:
zmsg_destroy (&zmsg);
FLUX_FATAL (h);
return NULL;
}

/* FIXME: FLUX_O_TRACE will show these messages being received again
Expand All @@ -434,41 +436,67 @@ int flux_putmsg (flux_t h, zmsg_t **zmsg)
{
if (!h->ops->putmsg) {
errno = ENOSYS;
return -1;
goto fatal;
}
return h->ops->putmsg (h->impl, zmsg);
if (h->ops->putmsg (h->impl, zmsg) < 0)
goto fatal;
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

int flux_pushmsg (flux_t h, zmsg_t **zmsg)
{
if (!h->ops->pushmsg) {
errno = ENOSYS;
return -1;
goto fatal;
}
return h->ops->pushmsg (h->impl, zmsg);
if (h->ops->pushmsg (h->impl, zmsg) < 0)
goto fatal;
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

int flux_event_subscribe (flux_t h, const char *topic)
{
if (!h->ops->event_subscribe)
return 0;
return h->ops->event_subscribe (h->impl, topic);
if (h->ops->event_subscribe) {
if (h->ops->event_subscribe (h->impl, topic) < 0)
goto fatal;
}
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

int flux_event_unsubscribe (flux_t h, const char *topic)
{
if (!h->ops->event_unsubscribe)
return 0;
return h->ops->event_unsubscribe (h->impl, topic);
if (h->ops->event_unsubscribe) {
if (h->ops->event_unsubscribe (h->impl, topic) < 0)
goto fatal;
}
return 0;
fatal:
FLUX_FATAL (h);
return -1;
}

int flux_rank (flux_t h)
{
int rank;
if (!h->ops->rank) {
errno = ENOSYS;
return -1;
goto fatal;
}
return h->ops->rank (h->impl);
if ((rank = h->ops->rank (h->impl)) < 0)
goto fatal;
return rank;
fatal:
FLUX_FATAL (h);
return -1;
}

zctx_t *flux_get_zctx (flux_t h)
Expand Down

0 comments on commit 3972e6b

Please sign in to comment.