Skip to content

Commit

Permalink
api: create 'dq' class in libapi
Browse files Browse the repository at this point in the history
Create a tiny abstraction for the deferred message queue in libapi
so we can more easily replace its implementation.
  • Loading branch information
garlick authored and grondo committed Oct 31, 2014
1 parent f4e5bc2 commit 67076a0
Showing 1 changed file with 59 additions and 29 deletions.
88 changes: 59 additions & 29 deletions src/modules/api/libapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,19 @@
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/zfd.h"

/* deferred message queue */
typedef struct {
zlist_t *resp;
zlist_t *event;
} dq_t;


#define CMB_CTX_MAGIC 0xf434aaab
typedef struct {
int magic;
int fd;
int rank;
zlist_t *resp; /* deferred */
zlist_t *event; /* deferred */
dq_t *dq;
flux_t h;
zloop_t *zloop;
bool reactor_stop;
Expand All @@ -73,31 +78,67 @@ static int cmb_request_sendmsg (void *impl, zmsg_t **zmsg)
return zfd_send_typemask (c->fd, FLUX_MSGTYPE_REQUEST, zmsg);
}

static void append_deferred (cmb_t *c, zmsg_t **zmsg, int typemask)
static dq_t *dq_create (void)
{
dq_t *dq = xzmalloc (sizeof (*dq));
if (!(dq->resp = zlist_new ()))
oom ();
if (!(dq->event = zlist_new ()))
oom ();
return dq;
}

static void dq_destroy (dq_t *dq)
{
zmsg_t *z;
while ((z = zlist_pop (dq->resp)))
zmsg_destroy (&z);
zlist_destroy (&dq->resp);
while ((z = zlist_pop (dq->event)))
zmsg_destroy (&z);
zlist_destroy (&dq->event);
free (dq);
}

static int dq_put (dq_t *dq, zmsg_t **zmsg, int typemask)
{
if ((typemask & FLUX_MSGTYPE_EVENT)) {
if (zlist_append (c->event, *zmsg) < 0)
oom ();
if (zlist_append (dq->event, *zmsg) < 0)
return -1;
} else if ((typemask & FLUX_MSGTYPE_RESPONSE)) {
if (zlist_append (c->resp, *zmsg) < 0)
oom ();
if (zlist_append (dq->resp, *zmsg) < 0)
return -1;
} else {
zmsg_destroy (zmsg);
}
*zmsg = NULL;
return 0;
}

static bool dq_get (dq_t *dq, zmsg_t **zmsg, int typemask)
{
zmsg_t *z = NULL;
if ((typemask & FLUX_MSGTYPE_EVENT)) {
z = zlist_pop (dq->event);
} else if ((typemask & FLUX_MSGTYPE_RESPONSE)) {
z = zlist_pop (dq->resp);
}
if (z)
*zmsg = z;
return (z != NULL);
}

static int process_deferred (cmb_t *c)
{
zmsg_t *zmsg;

while ((zmsg = zlist_pop (c->event))) {
while ((dq_get (c->dq, &zmsg, FLUX_MSGTYPE_EVENT))) {
if (flux_handle_event_msg (c->h, FLUX_MSGTYPE_EVENT, &zmsg) < 0) {
cmb_reactor_stop (c, -1);
goto done;
}
}
while ((zmsg = zlist_pop (c->resp))) {
while ((dq_get (c->dq, &zmsg, FLUX_MSGTYPE_RESPONSE))) {
if (flux_handle_event_msg (c->h, FLUX_MSGTYPE_RESPONSE, &zmsg) < 0) {
cmb_reactor_stop (c, -1);
goto done;
Expand All @@ -114,10 +155,11 @@ static zmsg_t *cmb_response_recvmsg (void *impl, bool nonblock)
int typemask;

assert (c->magic == CMB_CTX_MAGIC);
if (!(z = zlist_pop (c->resp))) {
while ((z = zfd_recv_typemask (c->fd, &typemask, nonblock))
if (!dq_get (c->dq, &z, FLUX_MSGTYPE_RESPONSE)) {
while ((z = zfd_recv_typemask (c->fd, &typemask, nonblock))
&& !(typemask & FLUX_MSGTYPE_RESPONSE))
append_deferred (c, &z, typemask);
if (dq_put (c->dq, &z, typemask) < 0)
oom ();
}
return z;
}
Expand All @@ -126,10 +168,7 @@ static int cmb_response_putmsg (void *impl, zmsg_t **zmsg)
{
cmb_t *c = impl;
assert (c->magic == CMB_CTX_MAGIC);
if (zlist_append (c->resp, *zmsg) < 0)
return -1;
*zmsg = NULL;
return 0;
return dq_put (c->dq, zmsg, FLUX_MSGTYPE_RESPONSE);
}

static int cmb_event_subscribe (void *impl, const char *s)
Expand All @@ -153,10 +192,10 @@ static zmsg_t *cmb_event_recvmsg (void *impl, bool nonblock)
int typemask;

assert (c->magic == CMB_CTX_MAGIC);
if (!(z = zlist_pop (c->event))) {
if (!dq_get (c->dq, &z, FLUX_MSGTYPE_EVENT)) {
while ((z = zfd_recv_typemask (c->fd, &typemask, nonblock))
&& !(typemask & FLUX_MSGTYPE_EVENT))
append_deferred (c, &z, typemask);
dq_put (c->dq, &z, typemask);
}
return z;
}
Expand Down Expand Up @@ -300,19 +339,13 @@ static void cmb_reactor_tmout_remove (void *impl, int timer_id)
static void cmb_fini (void *impl)
{
cmb_t *c = impl;
zmsg_t *z;

assert (c->magic == CMB_CTX_MAGIC);
if (c->fd >= 0)
(void)close (c->fd);
if (c->zloop)
zloop_destroy (&c->zloop);
while ((z = zlist_pop (c->resp)))
zmsg_destroy (&z);
zlist_destroy (&c->resp);
while ((z = zlist_pop (c->event)))
zmsg_destroy (&z);
zlist_destroy (&c->event);
dq_destroy (c->dq);
free (c);
}

Expand Down Expand Up @@ -342,11 +375,8 @@ flux_t flux_api_openpath (const char *path, int flags)
char *pidfile = NULL;

c = xzmalloc (sizeof (*c));
if (!(c->resp = zlist_new ()))
oom ();
if (!(c->event = zlist_new ()))
oom ();
c->magic = CMB_CTX_MAGIC;
c->dq = dq_create ();
c->rank = -1;
if (!(c->zloop = zloop_new ()))
oom ();
Expand Down

0 comments on commit 67076a0

Please sign in to comment.