Skip to content

Commit

Permalink
api: use inproc sockets for dq class
Browse files Browse the repository at this point in the history
This allows our deferred queue to make the reactor ready and
should address issue flux-framework#81 for api sockets.
  • Loading branch information
garlick authored and grondo committed Oct 31, 2014
1 parent 67076a0 commit 90cb176
Showing 1 changed file with 69 additions and 57 deletions.
126 changes: 69 additions & 57 deletions src/modules/api/libapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@
#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/zfd.h"
#include "src/common/libutil/zconnect.h"

/* deferred message queue */
typedef struct {
zlist_t *resp;
zlist_t *event;
void *zs_resp[2]; /* [0]=read, [1]=write */
void *zs_event[2];
} dq_t;


Expand All @@ -60,6 +61,7 @@ typedef struct {
dq_t *dq;
flux_t h;
zloop_t *zloop;
zctx_t *zctx;
bool reactor_stop;
int reactor_rc;
} cmb_t;
Expand All @@ -78,76 +80,88 @@ static int cmb_request_sendmsg (void *impl, zmsg_t **zmsg)
return zfd_send_typemask (c->fd, FLUX_MSGTYPE_REQUEST, zmsg);
}

static dq_t *dq_create (void)
static int dq_resp_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg)
{
cmb_t *c = arg;
zmsg_t *z = zmsg_recv_nowait (item->socket);
if (z) {
if (flux_handle_event_msg (c->h, FLUX_MSGTYPE_RESPONSE, &z) < 0) {
cmb_reactor_stop (c, -1);
goto done;
}
}
done:
ZLOOP_RETURN(c);
}

static int dq_event_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg)
{
cmb_t *c = arg;
zmsg_t *z = zmsg_recv_nowait (item->socket);
if (z) {
if (flux_handle_event_msg (c->h, FLUX_MSGTYPE_EVENT, &z) < 0) {
cmb_reactor_stop (c, -1);
goto done;
}
}
done:
ZLOOP_RETURN(c);
}

static dq_t *dq_create (cmb_t *c)
{
char *resp_uri = "inproc://flux-api-dq-resp";
char *event_uri = "inproc://flux-api-dq-event";
zmq_pollitem_t zp = { .events = ZMQ_POLLIN, .fd = -1 };
dq_t *dq = xzmalloc (sizeof (*dq));
if (!(dq->resp = zlist_new ()))

zbind (c->zctx, &dq->zs_resp[1], ZMQ_PAIR, resp_uri, -1);
zconnect (c->zctx, &dq->zs_resp[0], ZMQ_PAIR, resp_uri, -1, NULL);
zp.socket = dq->zs_resp[0];
if (zloop_poller (c->zloop, &zp, dq_resp_cb, c) < 0)
oom ();
if (!(dq->event = zlist_new ()))

zbind (c->zctx, &dq->zs_event[1], ZMQ_PAIR, event_uri, -1);
zconnect (c->zctx, &dq->zs_event[0], ZMQ_PAIR, event_uri, -1, NULL);
zp.socket = dq->zs_event[0];
if (zloop_poller (c->zloop, &zp, dq_event_cb, c) < 0)
oom ();

return dq;
}

static void dq_destroy (dq_t *dq)
{
zmsg_t *z;
while ((z = zlist_pop (dq->resp)))
zmsg_destroy (&z);
zlist_destroy (&dq->resp);
while ((z = zlist_pop (dq->event)))
zmsg_destroy (&z);
zlist_destroy (&dq->event);
/* N.B. zctx destroy takes care of PAIR sockets */
free (dq);
}

static int dq_put (dq_t *dq, zmsg_t **zmsg, int typemask)
{
if ((typemask & FLUX_MSGTYPE_EVENT)) {
if (zlist_append (dq->event, *zmsg) < 0)
return -1;
} else if ((typemask & FLUX_MSGTYPE_RESPONSE)) {
if (zlist_append (dq->resp, *zmsg) < 0)
return -1;
} else {
int rc = 0;
if ((typemask & FLUX_MSGTYPE_EVENT))
rc = zmsg_send (zmsg, dq->zs_event[1]);
else if ((typemask & FLUX_MSGTYPE_RESPONSE))
rc = zmsg_send (zmsg, dq->zs_resp[1]);
else
zmsg_destroy (zmsg);
}
*zmsg = NULL;
return 0;
if (rc == 0)
*zmsg = NULL;
return rc;
}

static bool dq_get (dq_t *dq, zmsg_t **zmsg, int typemask)
{
zmsg_t *z = NULL;
if ((typemask & FLUX_MSGTYPE_EVENT)) {
z = zlist_pop (dq->event);
} else if ((typemask & FLUX_MSGTYPE_RESPONSE)) {
z = zlist_pop (dq->resp);
}
if ((typemask & FLUX_MSGTYPE_EVENT))
z = zmsg_recv_nowait (dq->zs_event[0]);
else if ((typemask & FLUX_MSGTYPE_RESPONSE))
z = zmsg_recv_nowait (dq->zs_resp[0]);
if (z)
*zmsg = z;
return (z != NULL);
}

static int process_deferred (cmb_t *c)
{
zmsg_t *zmsg;

while ((dq_get (c->dq, &zmsg, FLUX_MSGTYPE_EVENT))) {
if (flux_handle_event_msg (c->h, FLUX_MSGTYPE_EVENT, &zmsg) < 0) {
cmb_reactor_stop (c, -1);
goto done;
}
}
while ((dq_get (c->dq, &zmsg, FLUX_MSGTYPE_RESPONSE))) {
if (flux_handle_event_msg (c->h, FLUX_MSGTYPE_RESPONSE, &zmsg) < 0) {
cmb_reactor_stop (c, -1);
goto done;
}
}
done:
ZLOOP_RETURN(c);
}

static zmsg_t *cmb_response_recvmsg (void *impl, bool nonblock)
{
cmb_t *c = impl;
Expand Down Expand Up @@ -244,8 +258,6 @@ static int unix_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg)
cmb_reactor_stop (c, -1);
goto done;
}
if (process_deferred (c) < 0)
goto done;
done:
ZLOOP_RETURN(c);
}
Expand All @@ -255,8 +267,6 @@ static int fd_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg)
cmb_t *c = arg;
if (flux_handle_event_fd (c->h, item->fd, item->revents) < 0)
cmb_reactor_stop (c, -1);
else
process_deferred (c);
ZLOOP_RETURN(c);
}

Expand Down Expand Up @@ -289,8 +299,6 @@ static int zs_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg)
cmb_t *c = arg;
if (flux_handle_event_zs (c->h, item->socket, item->revents) < 0)
cmb_reactor_stop (c, -1);
else
process_deferred (c);
ZLOOP_RETURN(c);
}

Expand All @@ -316,8 +324,6 @@ static int tmout_cb (zloop_t *zl, int timer_id, void *arg)

if (flux_handle_event_tmout (c->h, timer_id) < 0)
cmb_reactor_stop (c, -1);
else
process_deferred (c);
ZLOOP_RETURN(c);
}

Expand All @@ -343,9 +349,11 @@ static void cmb_fini (void *impl)
assert (c->magic == CMB_CTX_MAGIC);
if (c->fd >= 0)
(void)close (c->fd);
dq_destroy (c->dq);
if (c->zctx)
zctx_destroy (&c->zctx); /* destroys all sockets created in zctx */
if (c->zloop)
zloop_destroy (&c->zloop);
dq_destroy (c->dq);
free (c);
}

Expand Down Expand Up @@ -376,10 +384,14 @@ flux_t flux_api_openpath (const char *path, int flags)

c = xzmalloc (sizeof (*c));
c->magic = CMB_CTX_MAGIC;
c->dq = dq_create ();
c->rank = -1;
if (!(c->zctx = zctx_new ()))
err_exit ("zctx_new");
zctx_set_iothreads (c->zctx, 0);

if (!(c->zloop = zloop_new ()))
oom ();
c->dq = dq_create (c);

c->fd = socket (AF_UNIX, SOCK_STREAM, 0);
if (c->fd < 0)
Expand Down

0 comments on commit 90cb176

Please sign in to comment.