Skip to content

Commit

Permalink
Merge pull request #3997 from garlick/subscribe_redo
Browse files Browse the repository at this point in the history
connectors: avoid embedded synchronous RPC for subscribe/unsubscribe
  • Loading branch information
mergify[bot] authored Dec 8, 2021
2 parents 2593ee3 + 4d8fe75 commit 87c800b
Show file tree
Hide file tree
Showing 33 changed files with 492 additions and 336 deletions.
8 changes: 8 additions & 0 deletions doc/man3/flux_open.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ the value of $FLUX_URI is used, if set.
FLUX_O_TRACE
Dumps message trace to stderr.

FLUX_O_CLONE
Used internally by ``flux_clone()`` (see below).

FLUX_O_MATCHDEBUG
Prints diagnostic to stderr when matchtags are leaked, for example when
a streaming RPC is destroyed without receiving a error response as
Expand All @@ -39,6 +42,11 @@ FLUX_O_MATCHDEBUG
FLUX_O_NONBLOCK
The ``flux_send()`` and ``flux_recv()`` functions should never block.

FLUX_O_TEST_NOSUB
Make ``flux_event_subscribe()` and ``flux_event_unsubscribe()`` no-ops.
This may be useful in specialized situations with the ``loop://`` connector,
where no message handler is available to service subscription RPCs.

``flux_clone()`` creates another reference to a ``flux_t`` handle that is
identical to the original in all respects except that it does not inherit
a copy of the original handle's "aux" hash, or its reactor and message
Expand Down
123 changes: 14 additions & 109 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,8 @@ int main (int argc, char *argv[])
}

/* Arrange for the publisher to route event messages.
* handle_event - local subscribers (ctx.h)
*/
if (!(ctx.publisher = publisher_create (ctx.h,
if (!(ctx.publisher = publisher_create (&ctx,
(publisher_send_f)handle_event,
&ctx))) {
log_err ("error setting up event publishing service");
Expand Down Expand Up @@ -358,13 +357,6 @@ int main (int argc, char *argv[])
goto cleanup;
}

/* Create content cache.
*/
if (!(ctx.cache = content_cache_create (ctx.h, ctx.attrs))) {
log_err ("content_cache_create");
goto cleanup;
}

if (ctx.verbose) {
const char *parent = overlay_get_parent_uri (ctx.overlay);
const char *child = overlay_get_bind_uri (ctx.overlay);
Expand Down Expand Up @@ -429,6 +421,18 @@ int main (int argc, char *argv[])
log_err ("broker_add_services");
goto cleanup;
}
/* These two broker-resident services call flux_sync_create(), thus
* require event.subscribe to have a handler before running.
*/
if (overlay_keepalive_start (ctx.overlay) < 0) {
log_err ("error initializing overlay keepalives");
goto cleanup;
}
if (!(ctx.cache = content_cache_create (ctx.h, ctx.attrs))) {
log_err ("error initializing content cache");
goto cleanup;
}


/* Initialize module infrastructure.
*/
Expand Down Expand Up @@ -1169,60 +1173,6 @@ static void broker_disconnect_cb (flux_t *h, flux_msg_handler_t *mh,
/* no response */
}

static void broker_sub_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
broker_ctx_t *ctx = arg;
const char *uuid;
const char *topic;

if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0)
goto error;
if (!(uuid = flux_msg_route_first (msg))) {
errno = EPROTO;
goto error;
}
if (!uuid) {
errno = EPROTO;
goto error;
}
if (module_subscribe (ctx->modhash, uuid, topic) < 0)
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static void broker_unsub_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
broker_ctx_t *ctx = arg;
const char *uuid;
const char *topic;

if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0)
goto error;
if (!(uuid = flux_msg_route_first (msg))) {
errno = EPROTO;
goto error;
}
if (!uuid) {
errno = EPROTO;
goto error;
}
if (module_unsubscribe (ctx->modhash, uuid, topic) < 0)
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static int route_to_handle (const flux_msg_t *msg, void *arg)
{
broker_ctx_t *ctx = arg;
Expand Down Expand Up @@ -1350,18 +1300,6 @@ static const struct flux_msg_handler_spec htab[] = {
broker_disconnect_cb,
0
},
{
FLUX_MSGTYPE_REQUEST,
"broker.sub",
broker_sub_cb,
0
},
{
FLUX_MSGTYPE_REQUEST,
"broker.unsub",
broker_unsub_cb,
0
},
{
FLUX_MSGTYPE_REQUEST,
"service.add",
Expand All @@ -1388,7 +1326,7 @@ static struct internal_service services[] = {
{ "content", NULL },
{ "attr", NULL },
{ "heaptrace", NULL },
{ "event", "[0]" },
{ "event", NULL },
{ "service", NULL },
{ "overlay", NULL },
{ "config", NULL },
Expand Down Expand Up @@ -1855,41 +1793,8 @@ static int broker_send (void *impl, const flux_msg_t *msg, int flags)
return rc;
}

static int broker_subscribe (void *impl, const char *topic)
{
broker_ctx_t *ctx = impl;
char *cpy = NULL;

if (!(cpy = strdup (topic)))
goto nomem;
if (zlist_append (ctx->subscriptions, cpy) < 0)
goto nomem;
zlist_freefn (ctx->subscriptions, cpy, free, true);
return 0;
nomem:
free (cpy);
errno = ENOMEM;
return -1;
}

static int broker_unsubscribe (void *impl, const char *topic)
{
broker_ctx_t *ctx = impl;
char *s = zlist_first (ctx->subscriptions);
while (s) {
if (!strcmp (s, topic)) {
zlist_remove (ctx->subscriptions, s);
break;
}
s = zlist_next (ctx->subscriptions);
}
return 0;
}

static const struct flux_handle_ops broker_handle_ops = {
.send = broker_send,
.event_subscribe = broker_subscribe,
.event_unsubscribe = broker_unsubscribe,
};


Expand Down
13 changes: 10 additions & 3 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,16 @@ static void sync_cb (flux_future_t *f, void *arg)
flux_future_reset (f);
}

int overlay_keepalive_start (struct overlay *ov)
{
if (!ov->f_sync) {
if (!(ov->f_sync = flux_sync_create (ov->h, sync_min))
|| flux_future_then (ov->f_sync, sync_max, sync_cb, ov) < 0)
return -1;
}
return 0;
}

const char *overlay_get_bind_uri (struct overlay *ov)
{
return ov->bind_uri;
Expand Down Expand Up @@ -1772,9 +1782,6 @@ struct overlay *overlay_create (flux_t *h,
goto error;
if (flux_msg_handler_addvec (h, htab, ov, &ov->handlers) < 0)
goto error;
if (!(ov->f_sync = flux_sync_create (h, sync_min))
|| flux_future_then (ov->f_sync, sync_max, sync_cb, ov) < 0)
goto error;
if (!(ov->cert = zcert_new ()))
goto nomem;
if (!(ov->health_requests = flux_msglist_create ()))
Expand Down
6 changes: 6 additions & 0 deletions src/broker/overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ struct overlay *overlay_create (flux_t *h,
void *arg);
void overlay_destroy (struct overlay *ov);

/* Start sending keepalive messages to parent and monitoring peers.
* This registers a sync callback, and will fail if event.subscribe
* doesn't have a handler yet.
*/
int overlay_keepalive_start (struct overlay *ov);

/* Set the overlay network size and rank of this broker.
*/
int overlay_set_geometry (struct overlay *ov, uint32_t size, uint32_t rank);
Expand Down
Loading

0 comments on commit 87c800b

Please sign in to comment.