Skip to content

Commit

Permalink
modules: use inproc sockets for deferred req
Browse files Browse the repository at this point in the history
    This allows deferred messages to make the reactor ready and
    should address issue flux-framework#81 for module handles.
  • Loading branch information
garlick committed Oct 23, 2014
1 parent 632ab96 commit 28cabc8
Showing 1 changed file with 49 additions and 24 deletions.
73 changes: 49 additions & 24 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ typedef struct {
int event_rx;
} plugin_stats_t;

/* deferred message queue */
typedef struct {
void *zs_resp[2]; /* [0]=read, [1]=write */
} dq_t;

#define PLUGIN_MAGIC 0xfeefbe01
struct plugin_ctx_struct {
int magic;
Expand All @@ -75,7 +80,7 @@ struct plugin_ctx_struct {
mod_main_f *main;
plugin_stats_t stats;
zloop_t *zloop;
zlist_t *deferred_responses;
dq_t *dq;
void *zctx;
flux_t h;
const char *name;
Expand All @@ -99,6 +104,46 @@ static void plugin_reactor_stop (void *impl, int rc);

static const struct flux_handle_ops plugin_handle_ops;

static int dq_resp_cb (zloop_t *zl, zmq_pollitem_t *item, void *arg)
{
plugin_ctx_t p = arg;
zmsg_t *z = zmsg_recv_nowait (item->socket);
if (z) {
if (flux_handle_event_msg (p->h, FLUX_MSGTYPE_RESPONSE, &z) < 0) {
plugin_reactor_stop (p, -1);
goto done;
}
}
done:
ZLOOP_RETURN(p);
}

static dq_t *dq_create (plugin_ctx_t p)
{
char *resp_uri = xasprintf ("inproc://dq-resp-%p", p);
zmq_pollitem_t zp = { .events = ZMQ_POLLIN, .fd = -1 };
dq_t *dq = xzmalloc (sizeof (*dq));

zbind (p->zctx, &dq->zs_resp[1], ZMQ_PAIR, resp_uri, -1);
zconnect (p->zctx, &dq->zs_resp[0], ZMQ_PAIR, resp_uri, -1, NULL);
zp.socket = dq->zs_resp[0];
if (zloop_poller (p->zloop, &zp, dq_resp_cb, p) < 0)
oom ();

free (resp_uri);
return dq;
}

static void dq_destroy (dq_t *dq)
{
/* N.B. zctx destroy takes care of PAIR sockets */
free (dq);
}

static int dq_put (dq_t *dq, zmsg_t **zmsg)
{
return zmsg_send (zmsg, dq->zs_resp[1]);
}

/**
** flux_t implementation
Expand Down Expand Up @@ -152,7 +197,7 @@ static int plugin_response_putmsg (void *impl, zmsg_t **zmsg)
{
plugin_ctx_t p = impl;
assert (p->magic == PLUGIN_MAGIC);
if (zlist_append (p->deferred_responses, *zmsg) < 0)
if (dq_put (p->dq, zmsg) < 0)
oom ();
*zmsg = NULL;
return 0;
Expand Down Expand Up @@ -415,26 +460,13 @@ static void plugin_handle_response (plugin_ctx_t p, zmsg_t *zmsg)
zmsg_destroy (&zmsg);
}

/* Process any responses received during synchronous request-reply handling.
* Call this after every plugin callback that may have invoked one of the
* synchronous request-reply functions.
*/
static void plugin_handle_deferred_responses (plugin_ctx_t p)
{
zmsg_t *zmsg;

while ((zmsg = zlist_pop (p->deferred_responses)))
plugin_handle_response (p, zmsg);
}

/* Handle a response.
*/
static int request_cb (zloop_t *zl, zmq_pollitem_t *item, plugin_ctx_t p)
{
zmsg_t *zmsg = zmsg_recv (p->zs_request);

plugin_handle_response (p, zmsg);
plugin_handle_deferred_responses (p);
ZLOOP_RETURN(p);
}

Expand Down Expand Up @@ -462,7 +494,6 @@ static int svc_cb (zloop_t *zl, zmq_pollitem_t *item, plugin_ctx_t p)
goto done;
}
}
plugin_handle_deferred_responses (p);
done:
if (zmsg)
zmsg_destroy (&zmsg);
Expand All @@ -481,7 +512,6 @@ static int event_cb (zloop_t *zl, zmq_pollitem_t *item, plugin_ctx_t p)
goto done;
}
}
plugin_handle_deferred_responses (p);
done:
if (zmsg)
zmsg_destroy (&zmsg);
Expand Down Expand Up @@ -557,6 +587,7 @@ static void *plugin_thread (void *arg)
p->zloop = plugin_zloop_create (p);
if (p->zloop == NULL)
err_exit ("%s: plugin_zloop_create", p->name);
p->dq = dq_create (p);

/* Register callbacks for "internal" methods.
* These can be overridden in p->ops->main() if desired.
Expand All @@ -571,6 +602,7 @@ static void *plugin_thread (void *arg)
goto done;
}
done:
dq_destroy (p->dq);
zloop_destroy (&p->zloop);
zstr_send (p->zs_svc[0], ""); /* EOF */

Expand Down Expand Up @@ -605,7 +637,6 @@ int plugin_size (plugin_ctx_t p)
void plugin_destroy (plugin_ctx_t p)
{
int errnum;
zmsg_t *zmsg;

if (p->t) {
errnum = pthread_join (p->t, NULL);
Expand All @@ -620,10 +651,6 @@ void plugin_destroy (plugin_ctx_t p)
zsocket_destroy (p->zctx, p->zs_svc[1]);
zsocket_destroy (p->zctx, p->zs_request);

while ((zmsg = zlist_pop (p->deferred_responses)))
zmsg_destroy (&zmsg);
zlist_destroy (&p->deferred_responses);

dlclose (p->dso);
zuuid_destroy (&p->uuid);
free (p->svc_uri);
Expand Down Expand Up @@ -684,8 +711,6 @@ plugin_ctx_t plugin_create (flux_t h, const char *path, zhash_t *args)
if (!(p->uuid = zuuid_new ()))
oom ();
p->rank = flux_rank (h);
if (!(p->deferred_responses = zlist_new ()))
oom ();

p->h = flux_handle_create (p, &plugin_handle_ops, 0);
flux_log_set_facility (p->h, p->name);
Expand Down

0 comments on commit 28cabc8

Please sign in to comment.