From 4e76d833678f8b4068b926c02fa5bc8dbecde84f Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 6 Dec 2021 07:04:26 -0800 Subject: [PATCH 01/13] libflux: avoid double close on fd in flux_close() Problem: flux_close() calls flux_handle_destroy(), which closes the file descriptor it obtained from connector->pollfd(), but that file descriptor is owned by the implementation, and should not be closed by users. N.B. most likely this exists because in the shmem connector, the fd is obtained by calling zsock_fd() on the zsock_t object, and zsock(3) says: Caller owns return value and must destroy it when done However, zsock_fd() is just a wrapper for zmq_getsockopt (ZMQ_FD), and zmq_getsockopt(3) says: CAUTION: The returned file descriptor is intended for use with a 'poll' or similar system call only. Applications must never attempt to read or write data to it directly, neither should they try to close it. Most likely the zsock(3) documentation is incorrect. In the other connector implementations, it is more obvious that the fd belongs to the implementation and is closed by connnector->impl_destroy(). Remove extra close from flux_handle_destroy(). --- src/common/libflux/handle.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/common/libflux/handle.c b/src/common/libflux/handle.c index 94852fd66ac8..d113d30c213d 100644 --- a/src/common/libflux/handle.c +++ b/src/common/libflux/handle.c @@ -386,8 +386,6 @@ void flux_handle_destroy (flux_t *h) dlclose (h->dso); #endif flux_msglist_destroy (h->queue); - if (h->pollfd >= 0) - (void)close (h->pollfd); } free (h); errno = saved_errno; From 79ea92fb42f606f72e3c7139abfce551229929d1 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 7 Dec 2021 09:53:42 -0800 Subject: [PATCH 02/13] testsuite: fix typo Problem: router unit test diag was cut and pasted from the previous test and shows the wrong topic string. Show correct topic string. --- src/common/librouter/test/router.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/librouter/test/router.c b/src/common/librouter/test/router.c index eeaf8de1f274..1a1fb4692055 100644 --- a/src/common/librouter/test/router.c +++ b/src/common/librouter/test/router.c @@ -228,7 +228,7 @@ void test_basic (flux_t *h) "{\"service\":\"testfu\"}"))) BAIL_OUT ("flux_request_encode failed"); router_entry_recv (entry, request); // router receives message from abcd - diag ("basic: sent local.sub request"); + diag ("basic: sent service.add request"); flux_msg_destroy (request); ok (flux_reactor_run (r, 0) >= 0, "basic: reactor processed one message"); From fdd16a3e857d79f78da62863f846bdddf8c218da Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 7 Dec 2021 09:56:48 -0800 Subject: [PATCH 03/13] libtestuitil: add flags arg to test_server_create() Problem: loopback_create() accepts a flags argument, but test_server_create() does not. Add a flags argument to test_server_create() which will be useful later for setting test-only handle flags. Update tests that use this function. --- src/common/libflux/test/attr.c | 2 +- src/common/libflux/test/log.c | 2 +- src/common/libflux/test/rpc.c | 6 +++--- src/common/libflux/test/rpc_chained.c | 2 +- src/common/librouter/test/router.c | 2 +- src/common/librouter/test/servhash.c | 2 +- src/common/librouter/test/usock_echo.c | 2 +- src/common/librouter/test/usock_emfile.c | 2 +- src/common/librouter/test/usock_epipe.c | 2 +- src/common/libterminus/test/pty.c | 4 ++-- src/common/libterminus/test/terminus.c | 4 ++-- src/common/libtestutil/util.c | 3 +-- src/common/libtestutil/util.h | 2 +- 13 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/common/libflux/test/attr.c b/src/common/libflux/test/attr.c index 003996a2143b..1fb0e192b24d 100644 --- a/src/common/libflux/test/attr.c +++ b/src/common/libflux/test/attr.c @@ -179,7 +179,7 @@ int main (int argc, char *argv[]) test_server_environment_init ("attr-test"); - if (!(h = test_server_create (test_server, NULL))) + if (!(h = test_server_create (0, test_server, NULL))) BAIL_OUT ("test_server_create failed"); /* get ENOENT */ diff --git a/src/common/libflux/test/log.c b/src/common/libflux/test/log.c index f03c661c78e9..eb1d67d3f671 100644 --- a/src/common/libflux/test/log.c +++ b/src/common/libflux/test/log.c @@ -25,7 +25,7 @@ int main (int argc, char *argv[]) test_server_environment_init ("log-test"); - if (!(h = test_server_create (NULL, NULL))) + if (!(h = test_server_create (0, NULL, NULL))) BAIL_OUT ("could not create test server"); if (flux_attr_set_cacheonly (h, "rank", "0") < 0) BAIL_OUT ("flux_attr_set_cacheonly failed"); diff --git a/src/common/libflux/test/rpc.c b/src/common/libflux/test/rpc.c index cdede8b649d0..0e23e836f389 100644 --- a/src/common/libflux/test/rpc.c +++ b/src/common/libflux/test/rpc.c @@ -917,14 +917,14 @@ void test_fake_server (void) { flux_t *h; - ok ((h = test_server_create (fake_server, NULL)) != NULL, + ok ((h = test_server_create (0, fake_server, NULL)) != NULL, "test_server_create (recv loop)"); ok (test_server_stop (h) == 0, "test_server_stop worked"); flux_close (h); diag ("completed test with server recv loop"); - ok ((h = test_server_create (fake_server_reactor, NULL)) != NULL, + ok ((h = test_server_create (0, fake_server_reactor, NULL)) != NULL, "test_server_create (reactor)"); ok ((test_server_stop (h)) == 0, "test_server_stop worked"); @@ -965,7 +965,7 @@ int main (int argc, char *argv[]) test_fake_server (); - h = test_server_create (test_server, NULL); + h = test_server_create (0, test_server, NULL); ok (h != NULL, "created test server thread"); if (!h) diff --git a/src/common/libflux/test/rpc_chained.c b/src/common/libflux/test/rpc_chained.c index d0949d252e4c..501ea2db0290 100644 --- a/src/common/libflux/test/rpc_chained.c +++ b/src/common/libflux/test/rpc_chained.c @@ -348,7 +348,7 @@ int main (int argc, char *argv[]) test_server_environment_init ("rpc-chained-test"); - h = test_server_create (test_server, NULL); + h = test_server_create (0, test_server, NULL); ok (h != NULL, "created test server thread"); if (!h) diff --git a/src/common/librouter/test/router.c b/src/common/librouter/test/router.c index 1a1fb4692055..c6929623abec 100644 --- a/src/common/librouter/test/router.c +++ b/src/common/librouter/test/router.c @@ -266,7 +266,7 @@ int main (int argc, char *argv[]) diag ("starting test server"); test_server_environment_init ("test_router"); - if (!(h = test_server_create (server_cb, NULL))) + if (!(h = test_server_create (0, server_cb, NULL))) BAIL_OUT ("test_server_create failed"); test_basic (h); diff --git a/src/common/librouter/test/servhash.c b/src/common/librouter/test/servhash.c index fcea1135e5e0..ffccb795a496 100644 --- a/src/common/librouter/test/servhash.c +++ b/src/common/librouter/test/servhash.c @@ -248,7 +248,7 @@ int main (int argc, char *argv[]) diag ("starting test server"); test_server_environment_init ("test_router"); - if (!(h = test_server_create (server_cb, NULL))) + if (!(h = test_server_create (0, server_cb, NULL))) BAIL_OUT ("test_server_create failed"); test_basic (h); diff --git a/src/common/librouter/test/usock_echo.c b/src/common/librouter/test/usock_echo.c index 1a7ae94a90ac..f172df451eb0 100644 --- a/src/common/librouter/test/usock_echo.c +++ b/src/common/librouter/test/usock_echo.c @@ -300,7 +300,7 @@ int main (int argc, char *argv[]) diag ("starting test server"); test_server_environment_init ("usock_server"); - if (!(h = test_server_create (server_cb, NULL))) + if (!(h = test_server_create (0, server_cb, NULL))) BAIL_OUT ("test_server_create failed"); test_early_disconnect (h); diff --git a/src/common/librouter/test/usock_emfile.c b/src/common/librouter/test/usock_emfile.c index e567dccea366..5514251cb282 100644 --- a/src/common/librouter/test/usock_emfile.c +++ b/src/common/librouter/test/usock_emfile.c @@ -273,7 +273,7 @@ int main (int argc, char *argv[]) diag ("starting test server"); test_server_environment_init ("usock_server"); - if (!(h = test_server_create (server_cb, &tp))) + if (!(h = test_server_create (0, server_cb, &tp))) BAIL_OUT ("test_server_create failed"); wait_for_server (); diff --git a/src/common/librouter/test/usock_epipe.c b/src/common/librouter/test/usock_epipe.c index d41e64b930c2..9872218e11b5 100644 --- a/src/common/librouter/test/usock_epipe.c +++ b/src/common/librouter/test/usock_epipe.c @@ -226,7 +226,7 @@ int main (int argc, char *argv[]) diag ("starting test server"); test_server_environment_init ("usock_server"); - if (!(h = test_server_create (server_cb, &tp))) + if (!(h = test_server_create (0, server_cb, &tp))) BAIL_OUT ("test_server_create failed"); test_send_and_exit (h, 1); diff --git a/src/common/libterminus/test/pty.c b/src/common/libterminus/test/pty.c index 2b7b859c6a87..5b2b9d2fde9d 100644 --- a/src/common/libterminus/test/pty.c +++ b/src/common/libterminus/test/pty.c @@ -166,7 +166,7 @@ static int pty_server (flux_t *h, void *arg) static void test_basic_protocol (void) { - flux_t *h = test_server_create (pty_server, NULL); + flux_t *h = test_server_create (0, pty_server, NULL); flux_future_t *f = NULL; flux_future_t *f_attach = NULL; @@ -385,7 +385,7 @@ static void pty_exit_cb (struct flux_pty_client *c, void *arg) static void test_client() { - flux_t *h = test_server_create (pty_server, NULL); + flux_t *h = test_server_create (0, pty_server, NULL); flux_future_t *f = NULL; int rc; int flags = FLUX_PTY_CLIENT_ATTACH_SYNC diff --git a/src/common/libterminus/test/terminus.c b/src/common/libterminus/test/terminus.c index 5a48bd4939dd..29d7846faf8a 100644 --- a/src/common/libterminus/test/terminus.c +++ b/src/common/libterminus/test/terminus.c @@ -90,7 +90,7 @@ static void test_kill_server_empty (void) { int rc; flux_future_t *f = NULL; - flux_t *h = test_server_create (terminus_server, NULL); + flux_t *h = test_server_create (0, terminus_server, NULL); /* kill-server */ @@ -121,7 +121,7 @@ static void test_protocol (void) int rc; json_t *o = NULL; flux_future_t *f = NULL; - flux_t *h = test_server_create (terminus_server, NULL); + flux_t *h = test_server_create (0, terminus_server, NULL); const char *service = NULL; const char *name = NULL; diff --git a/src/common/libtestutil/util.c b/src/common/libtestutil/util.c index adb35de5ae18..2c7589b7230c 100644 --- a/src/common/libtestutil/util.c +++ b/src/common/libtestutil/util.c @@ -122,11 +122,10 @@ static void test_server_destroy (struct test_server *a) } } -flux_t *test_server_create (test_server_f cb, void *arg) +flux_t *test_server_create (int cflags, test_server_f cb, void *arg) { int e; struct test_server *a; - int cflags = 0; // client connector flags int sflags = 0; // server connector flags if (!(a = calloc (1, sizeof (*a)))) diff --git a/src/common/libtestutil/util.h b/src/common/libtestutil/util.h index 94953d0908ba..26a313ff391e 100644 --- a/src/common/libtestutil/util.h +++ b/src/common/libtestutil/util.h @@ -30,7 +30,7 @@ */ typedef int (*test_server_f)(flux_t *h, void *arg); -flux_t *test_server_create (test_server_f cb, void *arg); +flux_t *test_server_create (int flags, test_server_f cb, void *arg); int test_server_stop (flux_t *c); From b1faf61bed1615a4b1364e088c7a31564a336577 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 6 Dec 2021 18:26:37 -0800 Subject: [PATCH 04/13] librouter: rename local.sub to event.subscribe Problem: the "routers" (e.g. local connector) use local.sub, local.unsub as the RPC topic strings for message subscription, while the broker uses something else. Standardize on one topic string so the same code works whether talking to the broker directly or through a router. Update the local and ssh connectors. --- src/common/librouter/router.c | 8 ++++---- src/common/librouter/test/router.c | 10 +++++----- src/connectors/local/local.c | 4 ++-- src/connectors/ssh/ssh.c | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/common/librouter/router.c b/src/common/librouter/router.c index b0ee6615bd16..1a51e91824e7 100644 --- a/src/common/librouter/router.c +++ b/src/common/librouter/router.c @@ -92,7 +92,7 @@ static void router_entry_respond_byuuid (const flux_msg_t *msg, router_entry_respond (entry, msg, errnum); } -/* Handle internal local.sub request. +/* Handle internal local subscribe request. */ static void local_sub_request (struct router_entry *entry, flux_msg_t *msg) { @@ -108,7 +108,7 @@ static void local_sub_request (struct router_entry *entry, flux_msg_t *msg) router_entry_respond (entry, msg, errno); } -/* Handle internal local.unsub request. +/* Handle internal local unsubscribe request. */ static void local_unsub_request (struct router_entry *entry, flux_msg_t *msg) { @@ -172,11 +172,11 @@ void router_entry_recv (struct router_entry *entry, flux_msg_t *msg) return; switch (type) { case FLUX_MSGTYPE_REQUEST: - if (!strcmp (topic, "local.sub")) { + if (!strcmp (topic, "event.subscribe")) { local_sub_request (entry, msg); break; } - if (!strcmp (topic, "local.unsub")) { + if (!strcmp (topic, "event.unsubscribe")) { local_unsub_request (entry, msg); break; } diff --git a/src/common/librouter/test/router.c b/src/common/librouter/test/router.c index c6929623abec..7db73b4c6f67 100644 --- a/src/common/librouter/test/router.c +++ b/src/common/librouter/test/router.c @@ -135,7 +135,7 @@ int basic_recv (const flux_msg_t *msg, void *arg) switch (type) { case FLUX_MSGTYPE_RESPONSE: - like (topic, "local.sub|local.unsub|service.add|service.remove|rtest.hello", + like (topic, "event.subscribe|event.unsubscribe|service.add|service.remove|rtest.hello", "router-entry: response is %s", topic); break; case FLUX_MSGTYPE_EVENT: @@ -192,11 +192,11 @@ void test_basic (flux_t *h) * - basic_recv() is called in the context of router_entry_recv() * in this case so don't start the reactor. */ - if (!(request = flux_request_encode ("local.sub", + if (!(request = flux_request_encode ("event.subscribe", "{\"topic\":\"rtest\"}"))) BAIL_OUT ("flux_request_encode failed"); router_entry_recv (entry, request); // router recives message from abcd - diag ("basic: sent local.sub request"); + diag ("basic: sent event.subscribe request"); flux_msg_destroy (request); /* Send an rtest.pub request from client. @@ -212,11 +212,11 @@ void test_basic (flux_t *h) /* Now unsubscribe to rtest events. */ - if (!(request = flux_request_encode ("local.unsub", + if (!(request = flux_request_encode ("event.unsubscribe", "{\"topic\":\"rtest\"}"))) BAIL_OUT ("flux_request_encode failed"); router_entry_recv (entry, request); // router recives message from abcd - diag ("basic: sent local.unsub request"); + diag ("basic: sent event.unsubscribe request"); flux_msg_destroy (request); /* Register testfu service. diff --git a/src/connectors/local/local.c b/src/connectors/local/local.c index 5f7243fd59cc..9c95c067207c 100644 --- a/src/connectors/local/local.c +++ b/src/connectors/local/local.c @@ -102,7 +102,7 @@ static int op_event_subscribe (void *impl, const char *topic) flux_future_t *f; if (!(f = flux_rpc_pack (ctx->h, - "local.sub", + "event.subscribe", FLUX_NODEID_ANY, 0, "{s:s}", @@ -122,7 +122,7 @@ static int op_event_unsubscribe (void *impl, const char *topic) flux_future_t *f; if (!(f = flux_rpc_pack (ctx->h, - "local.unsub", + "event.unsubscribe", FLUX_NODEID_ANY, 0, "{s:s}", diff --git a/src/connectors/ssh/ssh.c b/src/connectors/ssh/ssh.c index 0dd52fe9b9bb..75ef69597486 100644 --- a/src/connectors/ssh/ssh.c +++ b/src/connectors/ssh/ssh.c @@ -72,7 +72,7 @@ static int op_event_subscribe (void *impl, const char *topic) flux_future_t *f; int rc = 0; - if (!(f = flux_rpc_pack (ctx->h, "local.sub", FLUX_NODEID_ANY, 0, + if (!(f = flux_rpc_pack (ctx->h, "event.subscribe", FLUX_NODEID_ANY, 0, "{ s:s }", "topic", topic))) goto done; if (flux_future_get (f, NULL) < 0) @@ -89,7 +89,7 @@ static int op_event_unsubscribe (void *impl, const char *topic) flux_future_t *f; int rc = 0; - if (!(f = flux_rpc_pack (ctx->h, "local.unsub", FLUX_NODEID_ANY, 0, + if (!(f = flux_rpc_pack (ctx->h, "event.subscribe", FLUX_NODEID_ANY, 0, "{ s:s }", "topic", topic))) goto done; if (flux_future_get (f, NULL) < 0) From 6f4d090bf0891706a2d0ba48d338389bcf5e9671 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 6 Dec 2021 16:07:47 -0800 Subject: [PATCH 05/13] broker: register event service on all ranks Problem: the broker "event" service is only registered on rank 0 with only the "pub" method, but we should consolidate broker publishing and subscribing under one service and subscription will need to be on all ranks. Register the service on all ranks, and change client to always send the RPC to rank 0 rather than let the broker route it upstream as an unmatched service. Fail event.pub if received on a rank > 0. This is not likely as it is only used internally. The prototype for publisher_create() was modified to pass in the broker context, for access to the rank. This was not strictly necessary but a commit following this one will take advantage of that to move the subscribe code into this source module, and then it will be required. --- src/broker/broker.c | 5 ++--- src/broker/publisher.c | 20 ++++++++++++++------ src/broker/publisher.h | 6 +++++- src/common/libflux/event.c | 4 ++-- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 1467c43c17fe..312f7290ba0e 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -289,9 +289,8 @@ int main (int argc, char *argv[]) } /* Arrange for the publisher to route event messages. - * handle_event - local subscribers (ctx.h) */ - if (!(ctx.publisher = publisher_create (ctx.h, + if (!(ctx.publisher = publisher_create (&ctx, (publisher_send_f)handle_event, &ctx))) { log_err ("error setting up event publishing service"); @@ -1388,7 +1387,7 @@ static struct internal_service services[] = { { "content", NULL }, { "attr", NULL }, { "heaptrace", NULL }, - { "event", "[0]" }, + { "event", NULL }, { "service", NULL }, { "overlay", NULL }, { "config", NULL }, diff --git a/src/broker/publisher.c b/src/broker/publisher.c index c268bb8306aa..e69cc6de7cba 100644 --- a/src/broker/publisher.c +++ b/src/broker/publisher.c @@ -23,7 +23,7 @@ struct publisher { - flux_t *h; + struct broker *ctx; flux_msg_handler_t **handlers; int seq; zlist_t *senders; @@ -84,7 +84,7 @@ static flux_msg_t *encode_event (const char *topic, int flags, static void send_event (struct publisher *pub, const flux_msg_t *msg) { if (pub->send (pub->arg, msg) < 0) - flux_log_error (pub->h, "error publishing event message"); + flux_log_error (pub->ctx->h, "error publishing event message"); } void pub_cb (flux_t *h, flux_msg_handler_t *mh, @@ -96,12 +96,18 @@ void pub_cb (flux_t *h, flux_msg_handler_t *mh, int flags; struct flux_msg_cred cred; flux_msg_t *event = NULL; + const char *errmsg = NULL; if (flux_request_unpack (msg, NULL, "{s:s s:i s?:s}", "topic", &topic, "flags", &flags, "payload", &payload) < 0) goto error; + if (pub->ctx->rank > 0) { + errno = EPROTO; + errmsg = "this service is only available on rank 0"; + goto error; + } if ((flags & ~(FLUX_MSGFLAG_PRIVATE)) != 0) { errno = EPROTO; goto error; @@ -118,7 +124,7 @@ void pub_cb (flux_t *h, flux_msg_handler_t *mh, error_restore_seq: pub->seq--; error: - if (flux_respond_error (h, msg, errno, NULL) < 0) + if (flux_respond_error (h, msg, errno, errmsg) < 0) flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); flux_msg_destroy (event); } @@ -156,16 +162,18 @@ void publisher_destroy (struct publisher *pub) } } -struct publisher *publisher_create (flux_t *h, publisher_send_f cb, void *arg) +struct publisher *publisher_create (struct broker *ctx, + publisher_send_f cb, + void *arg) { struct publisher *pub; if (!(pub = calloc (1, sizeof (*pub)))) return NULL; - pub->h = h; + pub->ctx = ctx; pub->send = cb; pub->arg = arg; - if (flux_msg_handler_addvec (h, htab, pub, &pub->handlers) < 0) { + if (flux_msg_handler_addvec (ctx->h, htab, pub, &pub->handlers) < 0) { publisher_destroy (pub); return NULL; } diff --git a/src/broker/publisher.h b/src/broker/publisher.h index e9e04513b060..690d367b6a84 100644 --- a/src/broker/publisher.h +++ b/src/broker/publisher.h @@ -11,9 +11,13 @@ #ifndef _BROKER_PUBLISHER_H #define _BROKER_PUBLISHER_H +#include "broker.h" + typedef int (*publisher_send_f)(void *arg, const flux_msg_t *msg); -struct publisher *publisher_create (flux_t *h, publisher_send_f cb, void *arg); +struct publisher *publisher_create (struct broker *ctx, + publisher_send_f cb, + void *arg); void publisher_destroy (struct publisher *pub); /* Publish an encoded event message, assigning sequence number. diff --git a/src/common/libflux/event.c b/src/common/libflux/event.c index ebe3e9a8a005..f1b45b53d751 100644 --- a/src/common/libflux/event.c +++ b/src/common/libflux/event.c @@ -211,7 +211,7 @@ static flux_future_t *wrap_event_rpc (flux_t *h, errno = EPROTO; return NULL; } - if (!(f = flux_rpc_pack (h, "event.pub", FLUX_NODEID_ANY, 0, + if (!(f = flux_rpc_pack (h, "event.pub", 0, 0, "{s:s s:i s:s}", "topic", topic, "flags", flags, "payload", dst))) { @@ -223,7 +223,7 @@ static flux_future_t *wrap_event_rpc (flux_t *h, free (dst); } else { - if (!(f = flux_rpc_pack (h, "event.pub", FLUX_NODEID_ANY, 0, + if (!(f = flux_rpc_pack (h, "event.pub", 0, 0, "{s:s s:i}", "topic", topic, "flags", flags))) { return NULL; From 9d56e821d2f5888d1262a8a3e957e4e35996c318 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 6 Dec 2021 16:12:16 -0800 Subject: [PATCH 06/13] broker: rename broker.sub to event.subscribe Problem: broker has a unique service name for subscribe/unsubscribe, but we want to unify event subscription RPCs so they work the same in the broker and in "routers". Rename broker.sub to event.subscribe Rename broker.unsub to event.unsubscribe Update the shmem connector to use the new name. --- src/broker/broker.c | 4 ++-- src/connectors/shmem/shmem.c | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 312f7290ba0e..f4ed31550855 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1351,13 +1351,13 @@ static const struct flux_msg_handler_spec htab[] = { }, { FLUX_MSGTYPE_REQUEST, - "broker.sub", + "event.subscribe", broker_sub_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "broker.unsub", + "event.unsubscribe", broker_unsub_cb, 0 }, diff --git a/src/connectors/shmem/shmem.c b/src/connectors/shmem/shmem.c index b6cfafb03b0e..63d547543b4b 100644 --- a/src/connectors/shmem/shmem.c +++ b/src/connectors/shmem/shmem.c @@ -109,7 +109,7 @@ static int op_event_subscribe (void *impl, const char *topic) flux_future_t *f; int rc = -1; - if (!(f = flux_rpc_pack (ctx->h, "broker.sub", FLUX_NODEID_ANY, 0, + if (!(f = flux_rpc_pack (ctx->h, "event.subscribe", FLUX_NODEID_ANY, 0, "{ s:s }", "topic", topic))) goto done; if (flux_future_get (f, NULL) < 0) @@ -127,7 +127,7 @@ static int op_event_unsubscribe (void *impl, const char *topic) flux_future_t *f = NULL; int rc = -1; - if (!(f = flux_rpc_pack (ctx->h, "broker.unsub", FLUX_NODEID_ANY, 0, + if (!(f = flux_rpc_pack (ctx->h, "event.unsubscribe", FLUX_NODEID_ANY, 0, "{ s:s }", "topic", topic))) goto done; if (flux_future_get (f, NULL) < 0) From 0584f868eed6cd528505adab0ac8a14ae8ced802 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 6 Dec 2021 17:14:14 -0800 Subject: [PATCH 07/13] broker: rename event.pub to event.publish Problem: event.pub unnecessarily abbreviates "publish". Let's just use the full descriptive name event.publish. Update some tests that hardwired the name. --- src/broker/publisher.c | 8 +++++--- src/common/libflux/event.c | 4 ++-- t/lua/t0003-events.t | 26 +++++++++++++------------- t/t0004-event.t | 5 +++-- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/broker/publisher.c b/src/broker/publisher.c index e69cc6de7cba..e6e35da12d42 100644 --- a/src/broker/publisher.c +++ b/src/broker/publisher.c @@ -87,8 +87,10 @@ static void send_event (struct publisher *pub, const flux_msg_t *msg) flux_log_error (pub->ctx->h, "error publishing event message"); } -void pub_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) +static void publish_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct publisher *pub = arg; const char *topic; @@ -148,7 +150,7 @@ int publisher_send (struct publisher *pub, const flux_msg_t *msg) } static const struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "event.pub", pub_cb, FLUX_ROLE_USER }, + { FLUX_MSGTYPE_REQUEST, "event.publish", publish_cb, FLUX_ROLE_USER }, FLUX_MSGHANDLER_TABLE_END, }; diff --git a/src/common/libflux/event.c b/src/common/libflux/event.c index f1b45b53d751..108e668c907a 100644 --- a/src/common/libflux/event.c +++ b/src/common/libflux/event.c @@ -211,7 +211,7 @@ static flux_future_t *wrap_event_rpc (flux_t *h, errno = EPROTO; return NULL; } - if (!(f = flux_rpc_pack (h, "event.pub", 0, 0, + if (!(f = flux_rpc_pack (h, "event.publish", 0, 0, "{s:s s:i s:s}", "topic", topic, "flags", flags, "payload", dst))) { @@ -223,7 +223,7 @@ static flux_future_t *wrap_event_rpc (flux_t *h, free (dst); } else { - if (!(f = flux_rpc_pack (h, "event.pub", 0, 0, + if (!(f = flux_rpc_pack (h, "event.publish", 0, 0, "{s:s s:i}", "topic", topic, "flags", flags))) { return NULL; diff --git a/t/lua/t0003-events.t b/t/lua/t0003-events.t index c672c189b457..b83190037c6a 100755 --- a/t/lua/t0003-events.t +++ b/t/lua/t0003-events.t @@ -55,36 +55,36 @@ type_ok (msg, 'table', "recv_event: got msg as a table") is_deeply (msg, {}, "recv_event: got empty payload as expected") --- poke at event.pub service +-- poke at event.publish service -- good request, no payload local request = { topic = "foo", flags = 0 } -local response, err = f:rpc ("event.pub", request); -is (err, nil, "event.pub: works without payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works without payload") -- good request, with raw payload local request = { topic = "foo", flags = 0, payload = "aGVsbG8gd29ybGQ=" } -local response, err = f:rpc ("event.pub", request); -is (err, nil, "event.pub: works with payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works with payload") -- good request, with JSON "{}\0" local request = { topic = "foo", flags = 0, payload = "e30A" } -local response, err = f:rpc ("event.pub", request); -is (err, nil, "event.pub: works with json payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works with json payload") -- flags missing from request local request = { topic = "foo" } -local response, err = f:rpc ("event.pub", request); -is (err, "Protocol error", "event.pub: no flags, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: no flags, fails with EPROTO") -- mangled base64 payload local request = { topic = "foo", flags = 0, payload = "aGVsbG8gd29ybGQ%" } -local response, err = f:rpc ("event.pub", request); -is (err, "Protocol error", "event.pub: bad base64, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: bad base64, fails with EPROTO") -- good request, mangled JSON payload "{\0" local request = { topic = "foo", flags = 4, payload = "ewA=" } -local response, err = f:rpc ("event.pub", request); -is (err, "Protocol error", "event.pub: bad json payload, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: bad json payload, fails with EPROTO") done_testing () diff --git a/t/t0004-event.t b/t/t0004-event.t index 7522a111382d..bd438ff0d9d4 100755 --- a/t/t0004-event.t +++ b/t/t0004-event.t @@ -87,8 +87,9 @@ test_expect_success 'publish private event with no payload (synchronous,loopback run_timeout 5 flux event pub -p -s -l foo.bar ' -test_expect_success 'event.pub request with empty payload fails with EPROTO(71)' ' - ${RPC} event.pub 71 Date: Mon, 6 Dec 2021 17:58:55 -0800 Subject: [PATCH 08/13] broker: move subscribe to publisher.c Problem: publish and subscribe are spread across broker.c and publisher.c. Relocate message handlers to publisher.c, with improvements: - drop redundant EPROTO checks - don't respond if request message has NORESPONSE flag - log a more informative error if responding fails --- src/broker/broker.c | 66 ------------------------------------------ src/broker/publisher.c | 53 +++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 66 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index f4ed31550855..070198d5bc05 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1168,60 +1168,6 @@ static void broker_disconnect_cb (flux_t *h, flux_msg_handler_t *mh, /* no response */ } -static void broker_sub_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) -{ - broker_ctx_t *ctx = arg; - const char *uuid; - const char *topic; - - if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0) - goto error; - if (!(uuid = flux_msg_route_first (msg))) { - errno = EPROTO; - goto error; - } - if (!uuid) { - errno = EPROTO; - goto error; - } - if (module_subscribe (ctx->modhash, uuid, topic) < 0) - goto error; - if (flux_respond (h, msg, NULL) < 0) - flux_log_error (h, "%s: flux_respond", __FUNCTION__); - return; -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void broker_unsub_cb (flux_t *h, flux_msg_handler_t *mh, - const flux_msg_t *msg, void *arg) -{ - broker_ctx_t *ctx = arg; - const char *uuid; - const char *topic; - - if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0) - goto error; - if (!(uuid = flux_msg_route_first (msg))) { - errno = EPROTO; - goto error; - } - if (!uuid) { - errno = EPROTO; - goto error; - } - if (module_unsubscribe (ctx->modhash, uuid, topic) < 0) - goto error; - if (flux_respond (h, msg, NULL) < 0) - flux_log_error (h, "%s: flux_respond", __FUNCTION__); - return; -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - static int route_to_handle (const flux_msg_t *msg, void *arg) { broker_ctx_t *ctx = arg; @@ -1349,18 +1295,6 @@ static const struct flux_msg_handler_spec htab[] = { broker_disconnect_cb, 0 }, - { - FLUX_MSGTYPE_REQUEST, - "event.subscribe", - broker_sub_cb, - 0 - }, - { - FLUX_MSGTYPE_REQUEST, - "event.unsubscribe", - broker_unsub_cb, - 0 - }, { FLUX_MSGTYPE_REQUEST, "service.add", diff --git a/src/broker/publisher.c b/src/broker/publisher.c index e6e35da12d42..65b799a8877e 100644 --- a/src/broker/publisher.c +++ b/src/broker/publisher.c @@ -19,6 +19,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libccan/ccan/base64/base64.h" +#include "module.h" #include "publisher.h" @@ -149,8 +150,60 @@ int publisher_send (struct publisher *pub, const flux_msg_t *msg) return -1; } +static void subscribe_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + struct publisher *pub = arg; + const char *uuid; + const char *topic; + + if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0) + goto error; + if (!(uuid = flux_msg_route_first (msg))) { + errno = EPROTO; + goto error; + } + if (module_subscribe (pub->ctx->modhash, uuid, topic) < 0) + goto error; + if (!flux_msg_is_noresponse (msg) + && flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to subscribe request"); + return; +error: + if (!flux_msg_is_noresponse (msg) + && flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "error responding to subscribe request"); +} + +static void unsubscribe_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + struct publisher *pub = arg; + const char *uuid; + const char *topic; + + if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0) + goto error; + if (!(uuid = flux_msg_route_first (msg))) { + errno = EPROTO; + goto error; + } + if (module_unsubscribe (pub->ctx->modhash, uuid, topic) < 0) + goto error; + if (!flux_msg_is_noresponse (msg) + && flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to unsubscribe request"); + return; +error: + if (!flux_msg_is_noresponse (msg) + && flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "error responding to unsubscribe request"); +} + static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, "event.publish", publish_cb, FLUX_ROLE_USER }, + { FLUX_MSGTYPE_REQUEST, "event.subscribe", subscribe_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "event.unsubscribe", unsubscribe_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; From 17604cb9c29af552698549e5512de7ee9c19bfaf Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 6 Dec 2021 18:39:36 -0800 Subject: [PATCH 09/13] libflux: use RPC not connector->event_subscribe() Problem: connectors have event subscribe/unsubscribe "operations" from early times when 0MQ was the primary transport, but even in the one extant 0MQ-based connector (shmem), event subscriptions are handled with hidden synchronous RPCs. In addition to being unnecessary, the embedded RPCs are inflexible. If handled like other RPCs, then: - subscriptions can be made asynchronously, where appropriate - subscriptions can be made with FLUX_RPC_NORESPONSE, where appropriate - In the future, RPC tracking for automatic reconnect when a broker restarts could track subscription RPCs like any other Drop the connector event_subscribe() and event_unsubscribe() operations and reimplement flux_event_subscribe() and flux_event_unsubscribe() as "normal" RPCS instead. Add flux_event_subscribe_ex() and flux_event_unsubscribe_ex() that return a future, and can accept RPC flags (like FLUX_RPC_NORESPONSE). Convert flux_sync_create() to use flux_event_subscribe_ex () with FLUX_RPC_NORESPONSE to avoid deadlock in the broker when the synchronous subscribe RPC will be handled in the same thread as the requestor. Reorder some broker initialization so that the event.subscribe handler is registered before flux_sync_create() is called. Augment broker event.(un)subscribe RPC handlers to deal with messages sent from the broker to itself, now that the handle operations are no longer available. Add FLUX_O_TEST_NOSUB flag for turning flux_event_subscribe() and flux_event_unsubscribe() into no-ops, useful for loop:// testing where RPC handlers are not available. Set FLUX_O_TEST_NOSUB flag on test server in librouter/router test. Set FLUX_O_TEST_NOSUB flag on loop:// in flux-shell (standalone mode). --- src/broker/broker.c | 52 ++++++------------------- src/broker/overlay.c | 13 +++++-- src/broker/overlay.h | 6 +++ src/broker/publisher.c | 51 +++++++++++++++++++----- src/common/libflux/connector.h | 3 -- src/common/libflux/event.c | 62 ++++++++++++++++++++++++++++++ src/common/libflux/event.h | 11 ++++++ src/common/libflux/handle.c | 26 ------------- src/common/libflux/handle.h | 14 +++---- src/common/libflux/sync.c | 12 +++++- src/common/librouter/test/router.c | 2 +- src/common/libtestutil/util.c | 4 -- src/connectors/local/local.c | 42 -------------------- src/connectors/shmem/shmem.c | 39 ------------------- src/connectors/ssh/ssh.c | 36 ----------------- src/shell/shell.c | 6 ++- 16 files changed, 163 insertions(+), 216 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 070198d5bc05..d0572215a222 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -357,13 +357,6 @@ int main (int argc, char *argv[]) goto cleanup; } - /* Create content cache. - */ - if (!(ctx.cache = content_cache_create (ctx.h, ctx.attrs))) { - log_err ("content_cache_create"); - goto cleanup; - } - if (ctx.verbose) { const char *parent = overlay_get_parent_uri (ctx.overlay); const char *child = overlay_get_bind_uri (ctx.overlay); @@ -428,6 +421,18 @@ int main (int argc, char *argv[]) log_err ("broker_add_services"); goto cleanup; } + /* These two broker-resident services call flux_sync_create(), thus + * require event.subscribe to have a handler before running. + */ + if (overlay_keepalive_start (ctx.overlay) < 0) { + log_err ("error initializing overlay keepalives"); + goto cleanup; + } + if (!(ctx.cache = content_cache_create (ctx.h, ctx.attrs))) { + log_err ("error initializing content cache"); + goto cleanup; + } + /* Initialize module infrastructure. */ @@ -1788,41 +1793,8 @@ static int broker_send (void *impl, const flux_msg_t *msg, int flags) return rc; } -static int broker_subscribe (void *impl, const char *topic) -{ - broker_ctx_t *ctx = impl; - char *cpy = NULL; - - if (!(cpy = strdup (topic))) - goto nomem; - if (zlist_append (ctx->subscriptions, cpy) < 0) - goto nomem; - zlist_freefn (ctx->subscriptions, cpy, free, true); - return 0; -nomem: - free (cpy); - errno = ENOMEM; - return -1; -} - -static int broker_unsubscribe (void *impl, const char *topic) -{ - broker_ctx_t *ctx = impl; - char *s = zlist_first (ctx->subscriptions); - while (s) { - if (!strcmp (s, topic)) { - zlist_remove (ctx->subscriptions, s); - break; - } - s = zlist_next (ctx->subscriptions); - } - return 0; -} - static const struct flux_handle_ops broker_handle_ops = { .send = broker_send, - .event_subscribe = broker_subscribe, - .event_unsubscribe = broker_unsubscribe, }; diff --git a/src/broker/overlay.c b/src/broker/overlay.c index 0ffe1b94abed..e04f20a306ac 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -662,6 +662,16 @@ static void sync_cb (flux_future_t *f, void *arg) flux_future_reset (f); } +int overlay_keepalive_start (struct overlay *ov) +{ + if (!ov->f_sync) { + if (!(ov->f_sync = flux_sync_create (ov->h, sync_min)) + || flux_future_then (ov->f_sync, sync_max, sync_cb, ov) < 0) + return -1; + } + return 0; +} + const char *overlay_get_bind_uri (struct overlay *ov) { return ov->bind_uri; @@ -1772,9 +1782,6 @@ struct overlay *overlay_create (flux_t *h, goto error; if (flux_msg_handler_addvec (h, htab, ov, &ov->handlers) < 0) goto error; - if (!(ov->f_sync = flux_sync_create (h, sync_min)) - || flux_future_then (ov->f_sync, sync_max, sync_cb, ov) < 0) - goto error; if (!(ov->cert = zcert_new ())) goto nomem; if (!(ov->health_requests = flux_msglist_create ())) diff --git a/src/broker/overlay.h b/src/broker/overlay.h index 285fa6708718..aada3d333321 100644 --- a/src/broker/overlay.h +++ b/src/broker/overlay.h @@ -43,6 +43,12 @@ struct overlay *overlay_create (flux_t *h, void *arg); void overlay_destroy (struct overlay *ov); +/* Start sending keepalive messages to parent and monitoring peers. + * This registers a sync callback, and will fail if event.subscribe + * doesn't have a handler yet. + */ +int overlay_keepalive_start (struct overlay *ov); + /* Set the overlay network size and rank of this broker. */ int overlay_set_geometry (struct overlay *ov, uint32_t size, uint32_t rank); diff --git a/src/broker/publisher.c b/src/broker/publisher.c index 65b799a8877e..da4794f7a14f 100644 --- a/src/broker/publisher.c +++ b/src/broker/publisher.c @@ -150,6 +150,35 @@ int publisher_send (struct publisher *pub, const flux_msg_t *msg) return -1; } +static int broker_subscribe (struct broker *ctx, const char *topic) +{ + char *cpy; + + if (!(cpy = strdup (topic))) + return -1; + if (zlist_append (ctx->subscriptions, cpy) < 0) + goto nomem; + zlist_freefn (ctx->subscriptions, cpy, free, true); + return 0; +nomem: + free (cpy); + errno = ENOMEM; + return -1; +} + +static void broker_unsubscribe (struct broker *ctx, const char *topic) +{ + char *s = zlist_first (ctx->subscriptions); + while (s) { + if (!strcmp (s, topic)) { + zlist_remove (ctx->subscriptions, s); + break; + } + s = zlist_next (ctx->subscriptions); + } +} + + static void subscribe_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { @@ -159,12 +188,14 @@ static void subscribe_cb (flux_t *h, flux_msg_handler_t *mh, if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0) goto error; - if (!(uuid = flux_msg_route_first (msg))) { - errno = EPROTO; - goto error; + if ((uuid = flux_msg_route_first (msg))) { + if (module_subscribe (pub->ctx->modhash, uuid, topic) < 0) + goto error; + } + else { + if (broker_subscribe (pub->ctx, topic) < 0) + goto error; } - if (module_subscribe (pub->ctx->modhash, uuid, topic) < 0) - goto error; if (!flux_msg_is_noresponse (msg) && flux_respond (h, msg, NULL) < 0) flux_log_error (h, "error responding to subscribe request"); @@ -184,12 +215,12 @@ static void unsubscribe_cb (flux_t *h, flux_msg_handler_t *mh, if (flux_request_unpack (msg, NULL, "{ s:s }", "topic", &topic) < 0) goto error; - if (!(uuid = flux_msg_route_first (msg))) { - errno = EPROTO; - goto error; + if ((uuid = flux_msg_route_first (msg))) { + if (module_unsubscribe (pub->ctx->modhash, uuid, topic) < 0) + goto error; } - if (module_unsubscribe (pub->ctx->modhash, uuid, topic) < 0) - goto error; + else + broker_unsubscribe (pub->ctx, topic); if (!flux_msg_is_noresponse (msg) && flux_respond (h, msg, NULL) < 0) flux_log_error (h, "error responding to unsubscribe request"); diff --git a/src/common/libflux/connector.h b/src/common/libflux/connector.h index b72b5d43b295..09d8efadf68a 100644 --- a/src/common/libflux/connector.h +++ b/src/common/libflux/connector.h @@ -31,9 +31,6 @@ struct flux_handle_ops { int (*send)(void *impl, const flux_msg_t *msg, int flags); flux_msg_t* (*recv)(void *impl, int flags); - int (*event_subscribe)(void *impl, const char *topic); - int (*event_unsubscribe)(void *impl, const char *topic); - void (*impl_destroy)(void *impl); }; diff --git a/src/common/libflux/event.c b/src/common/libflux/event.c index 108e668c907a..c04fa158b7af 100644 --- a/src/common/libflux/event.c +++ b/src/common/libflux/event.c @@ -22,6 +22,68 @@ #include "rpc.h" #include "message.h" +flux_future_t *flux_event_subscribe_ex (flux_t *h, + const char *topic, + int flags) +{ + if (!topic) { + errno = EINVAL; + return NULL; + } + return flux_rpc_pack (h, + "event.subscribe", + FLUX_NODEID_ANY, + flags, + "{s:s}", + "topic", topic); +} + +int flux_event_subscribe (flux_t *h, const char *topic) +{ + flux_future_t *f; + + if (h && (flux_flags_get (h) & FLUX_O_TEST_NOSUB)) + return 0; + if (!(f = flux_event_subscribe_ex (h, topic, 0)) + || flux_future_get (f, NULL) < 0) { + flux_future_destroy (f); + return -1; + } + flux_future_destroy (f); + return 0; +} + +flux_future_t *flux_event_unsubscribe_ex (flux_t *h, + const char *topic, + int flags) +{ + if (!topic) { + errno = EINVAL; + return NULL; + } + return flux_rpc_pack (h, + "event.unsubscribe", + FLUX_NODEID_ANY, + flags, + "{s:s}", + "topic", topic); +} + +int flux_event_unsubscribe (flux_t *h, const char *topic) +{ + flux_future_t *f; + + if (h && (flux_flags_get (h) & FLUX_O_TEST_NOSUB)) + return 0; + if (!(f = flux_event_unsubscribe_ex (h, topic, 0)) + || flux_future_get (f, NULL) < 0) { + flux_future_destroy (f); + return -1; + } + flux_future_destroy (f); + return 0; +} + static int event_decode (const flux_msg_t *msg, const char **topic) { int type; diff --git a/src/common/libflux/event.h b/src/common/libflux/event.h index 57072496e92d..613fce31fc49 100644 --- a/src/common/libflux/event.h +++ b/src/common/libflux/event.h @@ -22,6 +22,17 @@ enum event_flags { FLUX_EVENT_PRIVATE = 1, }; +/* Event subscribe/unsubscribe. + */ +int flux_event_subscribe (flux_t *h, const char *topic); +int flux_event_unsubscribe (flux_t *h, const char *topic); +flux_future_t *flux_event_subscribe_ex (flux_t *h, + const char *topic, + int flags); +flux_future_t *flux_event_unsubscribe_ex (flux_t *h, + const char *topic, + int flags); + /* Decode an event message with optional string payload. * If topic is non-NULL, assign the event topic string. * If s is non-NULL, assign string payload or set to NULL if none diff --git a/src/common/libflux/handle.c b/src/common/libflux/handle.c index d113d30c213d..f43003e8478c 100644 --- a/src/common/libflux/handle.c +++ b/src/common/libflux/handle.c @@ -741,32 +741,6 @@ int flux_requeue (flux_t *h, const flux_msg_t *msg, int flags) return -1; } -int flux_event_subscribe (flux_t *h, const char *topic) -{ - h = lookup_clone_ancestor (h); - if (h->ops->event_subscribe) { - if (h->ops->event_subscribe (h->impl, topic) < 0) - goto fatal; - } - return 0; -fatal: - FLUX_FATAL (h); - return -1; -} - -int flux_event_unsubscribe (flux_t *h, const char *topic) -{ - h = lookup_clone_ancestor (h); - if (h->ops->event_unsubscribe) { - if (h->ops->event_unsubscribe (h->impl, topic) < 0) - goto fatal; - } - return 0; -fatal: - FLUX_FATAL (h); - return -1; -} - int flux_pollfd (flux_t *h) { h = lookup_clone_ancestor (h); diff --git a/src/common/libflux/handle.h b/src/common/libflux/handle.h index 3485a3ac0ac6..98ff02a5a3f3 100644 --- a/src/common/libflux/handle.h +++ b/src/common/libflux/handle.h @@ -40,10 +40,11 @@ typedef void (*flux_fatal_f)(const char *msg, void *arg); /* Flags for handle creation and flux_flags_set()/flux_flags_unset. */ enum { - FLUX_O_TRACE = 1, /* send message trace to stderr */ - FLUX_O_CLONE = 2, /* handle was created with flux_clone() */ - FLUX_O_NONBLOCK = 4,/* handle should not block on send/recv */ - FLUX_O_MATCHDEBUG = 8,/* enable matchtag debugging */ + FLUX_O_TRACE = 1, /* send message trace to stderr */ + FLUX_O_CLONE = 2, /* handle was created with flux_clone() */ + FLUX_O_NONBLOCK = 4, /* handle should not block on send/recv */ + FLUX_O_MATCHDEBUG = 8, /* enable matchtag debugging */ + FLUX_O_TEST_NOSUB = 16, /* for testing: make (un)subscribe a no-op */ }; /* Flags for flux_requeue(). @@ -174,11 +175,6 @@ int flux_pollevents (flux_t *h); */ int flux_pollfd (flux_t *h); -/* Event subscribe/unsubscribe. - */ -int flux_event_subscribe (flux_t *h, const char *topic); -int flux_event_unsubscribe (flux_t *h, const char *topic); - /* Get/clear handle message counters. */ void flux_get_msgcounters (flux_t *h, flux_msgcounters_t *mcs); diff --git a/src/common/libflux/sync.c b/src/common/libflux/sync.c index 2c58e6b32173..3958d123ec6c 100644 --- a/src/common/libflux/sync.c +++ b/src/common/libflux/sync.c @@ -31,7 +31,11 @@ static void sync_destroy (struct flux_sync *sync) { if (sync) { int saved_errno = errno; - (void)flux_event_unsubscribe (sync->h, "heartbeat.pulse"); + flux_future_t *f; + f = flux_event_unsubscribe_ex (sync->h, + "heartbeat.pulse", + FLUX_RPC_NORESPONSE); + flux_future_destroy (f); free (sync); errno = saved_errno; } @@ -40,13 +44,17 @@ static void sync_destroy (struct flux_sync *sync) static struct flux_sync *sync_create (flux_t *h, double minimum) { struct flux_sync *sync; + flux_future_t *f; if (!(sync = calloc (1, sizeof (*sync)))) return NULL; sync->h = h; sync->minimum = minimum; - if (flux_event_subscribe (h, "heartbeat.pulse") < 0) + if (!(f = flux_event_subscribe_ex (sync->h, + "heartbeat.pulse", + FLUX_RPC_NORESPONSE))) goto error; + flux_future_destroy (f); return sync; error: sync_destroy (sync); diff --git a/src/common/librouter/test/router.c b/src/common/librouter/test/router.c index 7db73b4c6f67..7a954f3169b5 100644 --- a/src/common/librouter/test/router.c +++ b/src/common/librouter/test/router.c @@ -266,7 +266,7 @@ int main (int argc, char *argv[]) diag ("starting test server"); test_server_environment_init ("test_router"); - if (!(h = test_server_create (0, server_cb, NULL))) + if (!(h = test_server_create (FLUX_O_TEST_NOSUB, server_cb, NULL))) BAIL_OUT ("test_server_create failed"); test_basic (h); diff --git a/src/common/libtestutil/util.c b/src/common/libtestutil/util.c index 2c7589b7230c..5915476920a7 100644 --- a/src/common/libtestutil/util.c +++ b/src/common/libtestutil/util.c @@ -285,8 +285,6 @@ static const struct flux_handle_ops handle_ops = { .recv = test_connector_recv, .getopt = NULL, .setopt = NULL, - .event_subscribe = NULL, - .event_unsubscribe = NULL, .impl_destroy = test_connector_fini, }; @@ -464,8 +462,6 @@ static const struct flux_handle_ops loopback_ops = { .recv = loopback_connector_recv, .getopt = loopback_connector_getopt, .setopt = loopback_connector_setopt, - .event_subscribe = NULL, - .event_unsubscribe = NULL, .impl_destroy = loopback_connector_fini, }; diff --git a/src/connectors/local/local.c b/src/connectors/local/local.c index 9c95c067207c..bd26329a4623 100644 --- a/src/connectors/local/local.c +++ b/src/connectors/local/local.c @@ -96,46 +96,6 @@ static flux_msg_t *op_recv (void *impl, int flags) return usock_client_recv (ctx->uclient, flags); } -static int op_event_subscribe (void *impl, const char *topic) -{ - struct local_connector *ctx = impl; - flux_future_t *f; - - if (!(f = flux_rpc_pack (ctx->h, - "event.subscribe", - FLUX_NODEID_ANY, - 0, - "{s:s}", - "topic", topic))) - return -1; - if (flux_future_get (f, NULL) < 0) { - flux_future_destroy (f); - return -1; - } - flux_future_destroy (f); - return 0; -} - -static int op_event_unsubscribe (void *impl, const char *topic) -{ - struct local_connector *ctx = impl; - flux_future_t *f; - - if (!(f = flux_rpc_pack (ctx->h, - "event.unsubscribe", - FLUX_NODEID_ANY, - 0, - "{s:s}", - "topic", topic))) - return -1; - if (flux_future_get (f, NULL) < 0) { - flux_future_destroy (f); - return -1; - } - flux_future_destroy (f); - return 0; -} - static int op_setopt (void *impl, const char *option, const void *val, size_t size) { @@ -262,8 +222,6 @@ static const struct flux_handle_ops handle_ops = { .pollevents = op_pollevents, .send = op_send, .recv = op_recv, - .event_subscribe = op_event_subscribe, - .event_unsubscribe = op_event_unsubscribe, .setopt = op_setopt, .getopt = op_getopt, .impl_destroy = op_fini, diff --git a/src/connectors/shmem/shmem.c b/src/connectors/shmem/shmem.c index 63d547543b4b..f9fa92419a38 100644 --- a/src/connectors/shmem/shmem.c +++ b/src/connectors/shmem/shmem.c @@ -102,43 +102,6 @@ static flux_msg_t *op_recv (void *impl, int flags) return msg; } -static int op_event_subscribe (void *impl, const char *topic) -{ - shmem_ctx_t *ctx = impl; - assert (ctx->magic == MODHANDLE_MAGIC); - flux_future_t *f; - int rc = -1; - - if (!(f = flux_rpc_pack (ctx->h, "event.subscribe", FLUX_NODEID_ANY, 0, - "{ s:s }", "topic", topic))) - goto done; - if (flux_future_get (f, NULL) < 0) - goto done; - rc = 0; -done: - flux_future_destroy (f); - return rc; -} - -static int op_event_unsubscribe (void *impl, const char *topic) -{ - shmem_ctx_t *ctx = impl; - assert (ctx->magic == MODHANDLE_MAGIC); - flux_future_t *f = NULL; - int rc = -1; - - if (!(f = flux_rpc_pack (ctx->h, "event.unsubscribe", FLUX_NODEID_ANY, 0, - "{ s:s }", "topic", topic))) - goto done; - if (flux_future_get (f, NULL) < 0) - goto done; - rc = 0; -done: - flux_future_destroy (f); - return rc; -} - - static void op_fini (void *impl) { shmem_ctx_t *ctx = impl; @@ -224,8 +187,6 @@ static const struct flux_handle_ops handle_ops = { .recv = op_recv, .getopt = NULL, .setopt = NULL, - .event_subscribe = op_event_subscribe, - .event_unsubscribe = op_event_unsubscribe, .impl_destroy = op_fini, }; diff --git a/src/connectors/ssh/ssh.c b/src/connectors/ssh/ssh.c index 75ef69597486..4c96f9bf3ddc 100644 --- a/src/connectors/ssh/ssh.c +++ b/src/connectors/ssh/ssh.c @@ -66,40 +66,6 @@ static flux_msg_t *op_recv (void *impl, int flags) return usock_client_recv (ctx->uclient, flags); } -static int op_event_subscribe (void *impl, const char *topic) -{ - struct ssh_connector *ctx = impl; - flux_future_t *f; - int rc = 0; - - if (!(f = flux_rpc_pack (ctx->h, "event.subscribe", FLUX_NODEID_ANY, 0, - "{ s:s }", "topic", topic))) - goto done; - if (flux_future_get (f, NULL) < 0) - goto done; - rc = 0; -done: - flux_future_destroy (f); - return rc; -} - -static int op_event_unsubscribe (void *impl, const char *topic) -{ - struct ssh_connector *ctx = impl; - flux_future_t *f; - int rc = 0; - - if (!(f = flux_rpc_pack (ctx->h, "event.subscribe", FLUX_NODEID_ANY, 0, - "{ s:s }", "topic", topic))) - goto done; - if (flux_future_get (f, NULL) < 0) - goto done; - rc = 0; -done: - flux_future_destroy (f); - return rc; -} - static void op_fini (void *impl) { struct ssh_connector *ctx = impl; @@ -314,8 +280,6 @@ static const struct flux_handle_ops handle_ops = { .pollevents = op_pollevents, .send = op_send, .recv = op_recv, - .event_subscribe = op_event_subscribe, - .event_unsubscribe = op_event_unsubscribe, .impl_destroy = op_fini, }; diff --git a/src/shell/shell.c b/src/shell/shell.c index 730a8264bcb6..95a2668867a7 100644 --- a/src/shell/shell.c +++ b/src/shell/shell.c @@ -222,7 +222,11 @@ static void shell_parse_cmdline (flux_shell_t *shell, int argc, char *argv[]) static void shell_connect_flux (flux_shell_t *shell) { - if (!(shell->h = flux_open (shell->standalone ? "loop://" : NULL, 0))) + if (shell->standalone) + shell->h = flux_open ("loop://", FLUX_O_TEST_NOSUB); + else + shell->h = flux_open (NULL, 0); + if (!shell->h) shell_die_errno (1, "flux_open"); /* Set reactor for flux handle to our custom created reactor. From bb3ae24742faf67e2657126efc48e1da73a51132 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 7 Dec 2021 11:03:52 -0800 Subject: [PATCH 10/13] flux_open(3): add FLUX_O_CLONE flag Problem: flux_open(3) lists the valid flags, but omits FLUX_O_CLONE. It is intended for internal use but should be included to make the list complete. --- doc/man3/flux_open.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/man3/flux_open.rst b/doc/man3/flux_open.rst index b6482a9e7122..76a451fa3ebb 100644 --- a/doc/man3/flux_open.rst +++ b/doc/man3/flux_open.rst @@ -31,6 +31,9 @@ the value of $FLUX_URI is used, if set. FLUX_O_TRACE Dumps message trace to stderr. +FLUX_O_CLONE + Used internally by ``flux_clone()`` (see below). + FLUX_O_MATCHDEBUG Prints diagnostic to stderr when matchtags are leaked, for example when a streaming RPC is destroyed without receiving a error response as From c69f7cae79759334788faa5229ad7a157ec3bb15 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 7 Dec 2021 11:12:31 -0800 Subject: [PATCH 11/13] flux_open(3): add FLUX_O_TEST_NOSUB flag Problem: FLUX_O_TEST_NOSUB flag is not documented. Add a brief description of this new flag. --- doc/man3/flux_open.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/doc/man3/flux_open.rst b/doc/man3/flux_open.rst index 76a451fa3ebb..ad58692da936 100644 --- a/doc/man3/flux_open.rst +++ b/doc/man3/flux_open.rst @@ -42,6 +42,11 @@ FLUX_O_MATCHDEBUG FLUX_O_NONBLOCK The ``flux_send()`` and ``flux_recv()`` functions should never block. +FLUX_O_TEST_NOSUB + Make ``flux_event_subscribe()` and ``flux_event_unsubscribe()`` no-ops. + This may be useful in specialized situations with the ``loop://`` connector, + where no message handler is available to service subscription RPCs. + ``flux_clone()`` creates another reference to a ``flux_t`` handle that is identical to the original in all respects except that it does not inherit a copy of the original handle's "aux" hash, or its reactor and message From 87ede8a2a852c08b2454e29bb18c31a58e33bc43 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 7 Dec 2021 14:19:21 -0800 Subject: [PATCH 12/13] testsuite: refactor event unit test Problem: event unit test just has a main() function. Put codec testing (all we have now) in its own function, making it easier to add more functions to enhance testing. --- src/common/libflux/test/event.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/common/libflux/test/event.c b/src/common/libflux/test/event.c index 6e5d1ee5e32c..2ef7d0b0f6f9 100644 --- a/src/common/libflux/test/event.c +++ b/src/common/libflux/test/event.c @@ -15,7 +15,7 @@ #include "event.h" #include "src/common/libtap/tap.h" -int main (int argc, char *argv[]) +void test_codec (void) { flux_msg_t *msg; const char *topic, *s; @@ -26,8 +26,6 @@ int main (int argc, char *argv[]) int l; int i; - plan (NO_PLAN); - /* no topic is an error */ errno = 0; ok ((msg = flux_event_encode (NULL, json_str)) == NULL && errno == EINVAL, @@ -96,6 +94,13 @@ int main (int argc, char *argv[]) ok (flux_event_decode_raw (msg, NULL, &d, NULL) < 0 && errno == EINVAL, "flux_event_decode_raw len=NULL fails with EINVAL"); flux_msg_destroy (msg); +} + +int main (int argc, char *argv[]) +{ + plan (NO_PLAN); + + test_codec (); done_testing(); return (0); From 4d8fe7511525c587234f9f025549af7297cf176e Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 7 Dec 2021 15:25:00 -0800 Subject: [PATCH 13/13] testsuite: add test coverage for (un)subscribe Problem: there is no test coverage of event subscribe/unsubscribe functions and RPCs. Add unit tests for these functions. --- src/common/libflux/test/event.c | 202 +++++++++++++++++++++++++++++++- 1 file changed, 200 insertions(+), 2 deletions(-) diff --git a/src/common/libflux/test/event.c b/src/common/libflux/test/event.c index 2ef7d0b0f6f9..389aac57cde8 100644 --- a/src/common/libflux/test/event.c +++ b/src/common/libflux/test/event.c @@ -11,9 +11,9 @@ #if HAVE_CONFIG_H #include "config.h" #endif -#include "message.h" -#include "event.h" +#include #include "src/common/libtap/tap.h" +#include "src/common/libtestutil/util.h" void test_codec (void) { @@ -96,11 +96,209 @@ void test_codec (void) flux_msg_destroy (msg); } +void test_subscribe_badparams (void) +{ + flux_t *h; + + if (!(h = loopback_create (0))) + BAIL_OUT ("loopback_create failed"); + + errno = 0; + ok (flux_event_subscribe_ex (NULL, "foo", 0) == NULL && errno == EINVAL, + "flux_event_subscribe_ex h=NULL fails with EINVAL"); + errno = 0; + ok (flux_event_subscribe_ex (h, NULL, 0) == NULL && errno == EINVAL, + "flux_event_subscribe_ex topic=NULL fails with EINVAL"); + errno = 0; + ok (flux_event_subscribe_ex (h, "foo", -1) == NULL && errno == EINVAL, + "flux_event_subscribe_ex flags=-1 fails with EINVAL"); + + errno = 0; + ok (flux_event_unsubscribe_ex (NULL, "foo", 0) == NULL && errno == EINVAL, + "flux_event_unsubscribe_ex h=NULL fails with EINVAL"); + errno = 0; + ok (flux_event_unsubscribe_ex (h, NULL, 0) == NULL && errno == EINVAL, + "flux_event_unsubscribe_ex topic=NULL fails with EINVAL"); + errno = 0; + ok (flux_event_unsubscribe_ex (h, "foo", -1) == NULL && errno == EINVAL, + "flux_event_unsubscribe_ex flags=-1 fails with EINVAL"); + + errno = 0; + ok (flux_event_subscribe (NULL, "foo") < 0 && errno == EINVAL, + "flux_event_subscribe h=NULL fails with EINVAL"); + errno = 0; + ok (flux_event_subscribe (h, NULL) < 0 && errno == EINVAL, + "flux_event_subscribe topic=NULL fails with EINVAL"); + + errno = 0; + ok (flux_event_unsubscribe (NULL, "foo") < 0 && errno == EINVAL, + "flux_event_unsubscribe h=NULL fails with EINVAL"); + errno = 0; + ok (flux_event_unsubscribe (h, NULL) < 0 && errno == EINVAL, + "flux_event_unsubscribe topic=NULL fails with EINVAL"); + + flux_close (h); +} + +bool fake_failure; + +void subscribe_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + const char *topic = NULL; + + if (flux_request_unpack (msg, NULL, "{s:s}", "topic", &topic) < 0) + goto error; + diag ("subscribe %s", topic); + if (fake_failure) { + errno = EIO; + fake_failure = false; + goto error; + } + if (!flux_msg_is_noresponse (msg) + && flux_respond (h, msg, NULL) < 0) + diag ("error responding to subscribe request"); + return; +error: + if (!flux_msg_is_noresponse (msg) + && flux_respond_error (h, msg, errno, NULL) < 0) + diag ("error responding to subscribe request: %s", strerror (errno)); +} + +void unsubscribe_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + const char *topic = NULL; + + if (flux_request_unpack (msg, NULL, "{s:s}", "topic", &topic) < 0) + goto error; + diag ("unsubscribe %s", topic); + if (fake_failure) { + errno = EIO; + fake_failure = false; + goto error; + } + if (!flux_msg_is_noresponse (msg) + && flux_respond (h, msg, NULL) < 0) + diag ("error responding to unsubscribe request"); + return; +error: + if (!flux_msg_is_noresponse (msg) + && flux_respond_error (h, msg, errno, NULL) < 0) + diag ("error responding to unsubscribe request: %s", strerror (errno)); +} + +const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "event.subscribe", subscribe_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "event.unsubscribe", unsubscribe_cb, 0 }, + FLUX_MSGHANDLER_TABLE_END, +}; + +int test_server (flux_t *h, void *arg) +{ + flux_msg_handler_t **handlers = NULL; + int rc = -1; + + if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0) { + diag ("flux_msg_handler_addvec failed"); + return -1; + } + if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { + diag ("flux_reactor_run failed"); + goto done; + } + rc = 0; +done: + flux_msg_handler_delvec (handlers); + return rc; +} + +void test_subscribe_rpc (void) +{ + flux_t *h; + flux_future_t *f; + + if (!(h = test_server_create (0, test_server, NULL))) + BAIL_OUT ("test_server_create: %s", strerror (errno)); + + ok (flux_event_subscribe (h, "fubar") == 0, + "flux_event_subscribe topic=FUBAR works"); + + ok (flux_event_unsubscribe (h, "fubar") == 0, + "flux_event_unsubscribe topic=FUBAR works"); + + fake_failure = true; + errno = 0; + ok (flux_event_subscribe (h, "fubar") < 0 && errno == EIO, + "flux_event_subscribe failure works"); + + fake_failure = true; + errno = 0; + ok (flux_event_unsubscribe (h, "fubar") < 0 && errno == EIO, + "flux_event_unsubscribe failure works"); + + f = flux_event_subscribe_ex (h, "fubar", FLUX_RPC_NORESPONSE); + ok (f != NULL, + "flux_event_subscribe_ex flags=FLUX_RPC_NORESPONSE works"); + flux_future_destroy (f); + + f = flux_event_unsubscribe_ex (h, "fubar", FLUX_RPC_NORESPONSE); + ok (f != NULL, + "flux_event_unsubscribe_ex flags=FLUX_RPC_NORESPONSE works"); + flux_future_destroy (f); + + f = flux_event_subscribe_ex (h, "fubar", 0); + ok (f && flux_future_get (f, NULL) == 0, + "flux_event_subscribe_ex works"); + flux_future_destroy (f); + + f = flux_event_unsubscribe_ex (h, "fubar", 0); + ok (f && flux_future_get (f, NULL) == 0, + "flux_event_unsubscribe_ex works"); + flux_future_destroy (f); + + fake_failure = true; + errno = 0; + f = flux_event_subscribe_ex (h, "fubar", 0); + ok (f && flux_future_get (f, NULL) < 0 && errno == EIO, + "flux_event_subscribe_ex failure works"); + flux_future_destroy (f); + + fake_failure = true; + errno = 0; + f = flux_event_unsubscribe_ex (h, "fubar", 0); + ok (f && flux_future_get (f, NULL) < 0 && errno == EIO, + "flux_event_unsubscribe_ex failure works"); + flux_future_destroy (f); + + if (test_server_stop (h) < 0) + BAIL_OUT ("error stopping test server: %s", strerror (errno)); + flux_close (h); +} + +void test_subscribe_nosub (void) +{ + flux_t *h; + + if (!(h = loopback_create (FLUX_O_TEST_NOSUB))) + BAIL_OUT ("loopback_create failed"); + + ok (flux_event_subscribe (h, "foo") == 0, + "flux_event_subscribe succeeds in loopback with TEST_NOSUB flag"); + ok (flux_event_unsubscribe (h, "foo") == 0, + "flux_event_unsubscribe succeeds in loopback with TEST_NOSUB flag"); + + flux_close (h); +} + int main (int argc, char *argv[]) { plan (NO_PLAN); test_codec (); + test_subscribe_badparams (); + test_subscribe_rpc (); + test_subscribe_nosub (); done_testing(); return (0);