Skip to content

Commit

Permalink
Merge pull request #85 from garlick/api-handle-dq
Browse files Browse the repository at this point in the history
handle deferrred messages with an inproc socket.
Fixes #81.
  • Loading branch information
grondo committed Oct 24, 2014
2 parents 1aa8265 + 7cb68f3 commit d69a94f
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 104 deletions.
73 changes: 49 additions & 24 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ typedef struct {
int event_rx;
} plugin_stats_t;

/* deferred message queue */
typedef struct {
void *zs_resp[2]; /* [0]=read, [1]=write */
} dq_t;

#define PLUGIN_MAGIC 0xfeefbe01
struct plugin_ctx_struct {
int magic;
Expand All @@ -75,7 +80,7 @@ struct plugin_ctx_struct {
mod_main_f *main;
plugin_stats_t stats;
zloop_t *zloop;
zlist_t *deferred_responses;
dq_t *dq;
void *zctx;
flux_t h;
const char *name;
Expand All @@ -99,6 +104,46 @@ static void plugin_reactor_stop (void *impl, int rc);

static const struct flux_handle_ops plugin_handle_ops;

static int dq_resp_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg)
{
plugin_ctx_t p = arg;
zmsg_t *z = zmsg_recv_nowait (item->socket);
if (z) {
if (flux_handle_event_msg (p->h, FLUX_MSGTYPE_RESPONSE, &z) < 0) {
plugin_reactor_stop (p, -1);
goto done;
}
}
done:
ZLOOP_RETURN(p);
}

static dq_t *dq_create (plugin_ctx_t p)
{
char *resp_uri = xasprintf ("inproc://dq-resp-%p", p);
zmq_pollitem_t zp = { .events = ZMQ_POLLIN, .fd = -1 };
dq_t *dq = xzmalloc (sizeof (*dq));

zbind (p->zctx, &dq->zs_resp[1], ZMQ_PAIR, resp_uri, -1);
zconnect (p->zctx, &dq->zs_resp[0], ZMQ_PAIR, resp_uri, -1, NULL);
zp.socket = dq->zs_resp[0];
if (zloop_poller (p->zloop, &zp, dq_resp_cb, p) < 0)
oom ();

free (resp_uri);
return dq;
}

static void dq_destroy (dq_t *dq)
{
/* N.B. zctx destroy takes care of PAIR sockets */
free (dq);
}

static int dq_put (dq_t *dq, zmsg_t **zmsg)
{
return zmsg_send (zmsg, dq->zs_resp[1]);
}

/**
** flux_t implementation
Expand Down Expand Up @@ -152,7 +197,7 @@ static int plugin_response_putmsg (void *impl, zmsg_t **zmsg)
{
plugin_ctx_t p = impl;
assert (p->magic == PLUGIN_MAGIC);
if (zlist_append (p->deferred_responses, *zmsg) < 0)
if (dq_put (p->dq, zmsg) < 0)
oom ();
*zmsg = NULL;
return 0;
Expand Down Expand Up @@ -415,26 +460,13 @@ static void plugin_handle_response (plugin_ctx_t p, zmsg_t *zmsg)
zmsg_destroy (&zmsg);
}

/* Process any responses received during synchronous request-reply handling.
* Call this after every plugin callback that may have invoked one of the
* synchronous request-reply functions.
*/
static void plugin_handle_deferred_responses (plugin_ctx_t p)
{
zmsg_t *zmsg;

while ((zmsg = zlist_pop (p->deferred_responses)))
plugin_handle_response (p, zmsg);
}

/* Handle a response.
*/
static int request_cb (zloop_t *zl, zmq_pollitem_t *item, plugin_ctx_t p)
{
zmsg_t *zmsg = zmsg_recv (p->zs_request);

plugin_handle_response (p, zmsg);
plugin_handle_deferred_responses (p);
ZLOOP_RETURN(p);
}

Expand Down Expand Up @@ -462,7 +494,6 @@ static int svc_cb (zloop_t *zl, zmq_pollitem_t *item, plugin_ctx_t p)
goto done;
}
}
plugin_handle_deferred_responses (p);
done:
if (zmsg)
zmsg_destroy (&zmsg);
Expand All @@ -481,7 +512,6 @@ static int event_cb (zloop_t *zl, zmq_pollitem_t *item, plugin_ctx_t p)
goto done;
}
}
plugin_handle_deferred_responses (p);
done:
if (zmsg)
zmsg_destroy (&zmsg);
Expand Down Expand Up @@ -557,6 +587,7 @@ static void *plugin_thread (void *arg)
p->zloop = plugin_zloop_create (p);
if (p->zloop == NULL)
err_exit ("%s: plugin_zloop_create", p->name);
p->dq = dq_create (p);

/* Register callbacks for "internal" methods.
* These can be overridden in p->ops->main() if desired.
Expand All @@ -571,6 +602,7 @@ static void *plugin_thread (void *arg)
goto done;
}
done:
dq_destroy (p->dq);
zloop_destroy (&p->zloop);
zstr_send (p->zs_svc[0], ""); /* EOF */

Expand Down Expand Up @@ -605,7 +637,6 @@ int plugin_size (plugin_ctx_t p)
void plugin_destroy (plugin_ctx_t p)
{
int errnum;
zmsg_t *zmsg;

if (p->t) {
errnum = pthread_join (p->t, NULL);
Expand All @@ -620,10 +651,6 @@ void plugin_destroy (plugin_ctx_t p)
zsocket_destroy (p->zctx, p->zs_svc[1]);
zsocket_destroy (p->zctx, p->zs_request);

while ((zmsg = zlist_pop (p->deferred_responses)))
zmsg_destroy (&zmsg);
zlist_destroy (&p->deferred_responses);

dlclose (p->dso);
zuuid_destroy (&p->uuid);
free (p->svc_uri);
Expand Down Expand Up @@ -684,8 +711,6 @@ plugin_ctx_t plugin_create (flux_t h, const char *path, zhash_t *args)
if (!(p->uuid = zuuid_new ()))
oom ();
p->rank = flux_rank (h);
if (!(p->deferred_responses = zlist_new ()))
oom ();

p->h = flux_handle_create (p, &plugin_handle_ops, 0);
flux_log_set_facility (p->h, p->name);
Expand Down
Loading

0 comments on commit d69a94f

Please sign in to comment.