diff --git a/src/modules/api/libapi.c b/src/modules/api/libapi.c index 1a10b6b7cf17..9176e9c59655 100644 --- a/src/modules/api/libapi.c +++ b/src/modules/api/libapi.c @@ -45,14 +45,19 @@ #include "src/common/libutil/xzmalloc.h" #include "src/common/libutil/zfd.h" +/* deferred message queue */ +typedef struct { + zlist_t *resp; + zlist_t *event; +} dq_t; + #define CMB_CTX_MAGIC 0xf434aaab typedef struct { int magic; int fd; int rank; - zlist_t *resp; /* deferred */ - zlist_t *event; /* deferred */ + dq_t *dq; flux_t h; zloop_t *zloop; bool reactor_stop; @@ -73,31 +78,67 @@ static int cmb_request_sendmsg (void *impl, zmsg_t **zmsg) return zfd_send_typemask (c->fd, FLUX_MSGTYPE_REQUEST, zmsg); } -static void append_deferred (cmb_t *c, zmsg_t **zmsg, int typemask) +static dq_t *dq_create (void) +{ + dq_t *dq = xzmalloc (sizeof (*dq)); + if (!(dq->resp = zlist_new ())) + oom (); + if (!(dq->event = zlist_new ())) + oom (); + return dq; +} + +static void dq_destroy (dq_t *dq) +{ + zmsg_t *z; + while ((z = zlist_pop (dq->resp))) + zmsg_destroy (&z); + zlist_destroy (&dq->resp); + while ((z = zlist_pop (dq->event))) + zmsg_destroy (&z); + zlist_destroy (&dq->event); + free (dq); +} + +static int dq_put (dq_t *dq, zmsg_t **zmsg, int typemask) { if ((typemask & FLUX_MSGTYPE_EVENT)) { - if (zlist_append (c->event, *zmsg) < 0) - oom (); + if (zlist_append (dq->event, *zmsg) < 0) + return -1; } else if ((typemask & FLUX_MSGTYPE_RESPONSE)) { - if (zlist_append (c->resp, *zmsg) < 0) - oom (); + if (zlist_append (dq->resp, *zmsg) < 0) + return -1; } else { zmsg_destroy (zmsg); } *zmsg = NULL; + return 0; +} + +static bool dq_get (dq_t *dq, zmsg_t **zmsg, int typemask) +{ + zmsg_t *z = NULL; + if ((typemask & FLUX_MSGTYPE_EVENT)) { + z = zlist_pop (dq->event); + } else if ((typemask & FLUX_MSGTYPE_RESPONSE)) { + z = zlist_pop (dq->resp); + } + if (z) + *zmsg = z; + return (z != NULL); } static int process_deferred (cmb_t *c) { zmsg_t *zmsg; - while ((zmsg = zlist_pop (c->event))) { + while ((dq_get (c->dq, &zmsg, FLUX_MSGTYPE_EVENT))) { if (flux_handle_event_msg (c->h, FLUX_MSGTYPE_EVENT, &zmsg) < 0) { cmb_reactor_stop (c, -1); goto done; } } - while ((zmsg = zlist_pop (c->resp))) { + while ((dq_get (c->dq, &zmsg, FLUX_MSGTYPE_RESPONSE))) { if (flux_handle_event_msg (c->h, FLUX_MSGTYPE_RESPONSE, &zmsg) < 0) { cmb_reactor_stop (c, -1); goto done; @@ -114,10 +155,11 @@ static zmsg_t *cmb_response_recvmsg (void *impl, bool nonblock) int typemask; assert (c->magic == CMB_CTX_MAGIC); - if (!(z = zlist_pop (c->resp))) { - while ((z = zfd_recv_typemask (c->fd, &typemask, nonblock)) + if (!dq_get (c->dq, &z, FLUX_MSGTYPE_RESPONSE)) { + while ((z = zfd_recv_typemask (c->fd, &typemask, nonblock)) && !(typemask & FLUX_MSGTYPE_RESPONSE)) - append_deferred (c, &z, typemask); + if (dq_put (c->dq, &z, typemask) < 0) + oom (); } return z; } @@ -126,10 +168,7 @@ static int cmb_response_putmsg (void *impl, zmsg_t **zmsg) { cmb_t *c = impl; assert (c->magic == CMB_CTX_MAGIC); - if (zlist_append (c->resp, *zmsg) < 0) - return -1; - *zmsg = NULL; - return 0; + return dq_put (c->dq, zmsg, FLUX_MSGTYPE_RESPONSE); } static int cmb_event_subscribe (void *impl, const char *s) @@ -153,10 +192,10 @@ static zmsg_t *cmb_event_recvmsg (void *impl, bool nonblock) int typemask; assert (c->magic == CMB_CTX_MAGIC); - if (!(z = zlist_pop (c->event))) { + if (!dq_get (c->dq, &z, FLUX_MSGTYPE_EVENT)) { while ((z = zfd_recv_typemask (c->fd, &typemask, nonblock)) && !(typemask & FLUX_MSGTYPE_EVENT)) - append_deferred (c, &z, typemask); + dq_put (c->dq, &z, typemask); } return z; } @@ -300,19 +339,13 @@ static void cmb_reactor_tmout_remove (void *impl, int timer_id) static void cmb_fini (void *impl) { cmb_t *c = impl; - zmsg_t *z; assert (c->magic == CMB_CTX_MAGIC); if (c->fd >= 0) (void)close (c->fd); if (c->zloop) zloop_destroy (&c->zloop); - while ((z = zlist_pop (c->resp))) - zmsg_destroy (&z); - zlist_destroy (&c->resp); - while ((z = zlist_pop (c->event))) - zmsg_destroy (&z); - zlist_destroy (&c->event); + dq_destroy (c->dq); free (c); } @@ -342,11 +375,8 @@ flux_t flux_api_openpath (const char *path, int flags) char *pidfile = NULL; c = xzmalloc (sizeof (*c)); - if (!(c->resp = zlist_new ())) - oom (); - if (!(c->event = zlist_new ())) - oom (); c->magic = CMB_CTX_MAGIC; + c->dq = dq_create (); c->rank = -1; if (!(c->zloop = zloop_new ())) oom ();