diff --git a/configure.ac b/configure.ac index bf35adc5d257..592853fbbbc6 100644 --- a/configure.ac +++ b/configure.ac @@ -502,6 +502,7 @@ AC_CONFIG_FILES( \ src/common/librlist/Makefile \ src/common/libczmqcontainers/Makefile \ src/common/libccan/Makefile \ + src/common/libzmqutil/Makefile \ src/bindings/Makefile \ src/bindings/lua/Makefile \ src/bindings/python/Makefile \ diff --git a/doc/Makefile.am b/doc/Makefile.am index c44821e667da..f7ee18197b6d 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -80,7 +80,6 @@ MAN3_FILES_PRIMARY = \ man3/flux_reactor_create.3 \ man3/flux_fd_watcher_create.3 \ man3/flux_watcher_start.3 \ - man3/flux_zmq_watcher_create.3 \ man3/flux_handle_watcher_create.3 \ man3/flux_timer_watcher_create.3 \ man3/flux_periodic_watcher_create.3 \ @@ -163,7 +162,6 @@ MAN3_FILES_SECONDARY = \ man3/flux_watcher_stop.3 \ man3/flux_watcher_destroy.3 \ man3/flux_watcher_next_wakeup.3 \ - man3/flux_zmq_watcher_get_zsock.3 \ man3/flux_handle_watcher_get_flux.3 \ man3/flux_timer_watcher_reset.3 \ man3/flux_periodic_watcher_reset.3 \ diff --git a/doc/conf.py b/doc/conf.py index 27872bf8be30..74c9772c0c64 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -332,8 +332,6 @@ def setup(app): ('man3/flux_watcher_start', 'flux_watcher_destroy', 'start/stop/destroy/query reactor watcher', [author], 3), ('man3/flux_watcher_start', 'flux_watcher_next_wakeup', 'start/stop/destroy/query reactor watcher', [author], 3), ('man3/flux_watcher_start', 'flux_watcher_start', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_zmq_watcher_create', 'flux_zmq_watcher_get_zsock', 'create ZeroMQ watcher', [author], 3), - ('man3/flux_zmq_watcher_create', 'flux_zmq_watcher_create', 'create ZeroMQ watcher', [author], 3), ('man3/idset_create', 'idset_create', 'Manipulate numerically sorted sets of non-negative integers', [author], 3), ('man3/idset_create', 'idset_destroy', 'Manipulate numerically sorted sets of non-negative integers', [author], 3), ('man3/idset_create', 'idset_set', 'Manipulate numerically sorted sets of non-negative integers', [author], 3), diff --git a/doc/man3/flux_reactor_create.rst b/doc/man3/flux_reactor_create.rst index fad6a2233e28..2ba9858c4ecc 100644 --- a/doc/man3/flux_reactor_create.rst +++ b/doc/man3/flux_reactor_create.rst @@ -116,7 +116,6 @@ SEE ALSO ======== flux_fd_watcher_create(3), flux_handle_watcher_create(3), -flux_zmq_watcher_create(3), flux_timer_watcher_create(3), -flux_watcher_start(3) +flux_timer_watcher_create(3), flux_watcher_start(3) `libev home page `__ diff --git a/doc/man3/flux_zmq_watcher_create.rst b/doc/man3/flux_zmq_watcher_create.rst deleted file mode 100644 index e4f8b245979e..000000000000 --- a/doc/man3/flux_zmq_watcher_create.rst +++ /dev/null @@ -1,87 +0,0 @@ -========================== -flux_zmq_watcher_create(3) -========================== - - -SYNOPSIS -======== - -:: - - #include - -:: - - typedef void (*flux_watcher_f)(flux_reactor_t *r, - flux_watcher_t *w, - int revents, void *arg); - -:: - - flux_watcher_t *flux_zmq_watcher_create (flux_reactor_t *r, - void *zsock, int events, - flux_watcher_f callback, - void *arg); - -:: - - void *flux_zmq_watcher_get_zsock (flux_watcher_t *w); - - -DESCRIPTION -=========== - -``flux_zmq_watcher_create()`` creates a flux_watcher_t object which -monitors for events on a ZeroMQ socket *zsock*. When events occur, -the user-supplied *callback* is invoked. - -The *events* and *revents* arguments are a bitmask containing a -logical OR of the following bits. If a bit is set in *events*, -it indicates interest in this type of event. If a bit is set in the -*revents*, it indicates that this event has occurred. - -FLUX_POLLIN - The socket is ready for reading. - -FLUX_POLLOUT - The socket is ready for writing. - -FLUX_POLLERR - The socket has encountered an error. - This bit is ignored if it is set in *events*. - -Events are processed in a level-triggered manner. That is, the -callback will continue to be invoked as long as the event has not been -fully consumed or cleared, and the watcher has not been stopped. - -``flux_zmq_watcher_get_zsock()`` is used to obtain the socket from -within the callback. - - -RETURN VALUE -============ - -``flux_zmq_watcher_create()`` returns a flux_watcher_t object on success. -On error, NULL is returned, and errno is set appropriately. - -``flux_zmq_watcher_get_zsock()`` returns the socket associated with -the watcher. - - -ERRORS -====== - -ENOMEM - Out of memory. - - -RESOURCES -========= - -Github: http://github.com/flux-framework - - -SEE ALSO -======== - -flux_watcher_start(3), flux_reactor_start(3). diff --git a/doc/man3/index.rst b/doc/man3/index.rst index 559a614392de..7c53ef3183fe 100644 --- a/doc/man3/index.rst +++ b/doc/man3/index.rst @@ -70,7 +70,6 @@ man3 flux_sync_create flux_timer_watcher_create flux_watcher_start - flux_zmq_watcher_create idset_create idset_encode flux_jobtap_get_flux diff --git a/src/broker/Makefile.am b/src/broker/Makefile.am index 1717da860121..f25cd2eaf363 100644 --- a/src/broker/Makefile.am +++ b/src/broker/Makefile.am @@ -66,6 +66,7 @@ libbroker_la_SOURCES = \ flux_broker_LDADD = \ $(builddir)/libbroker.la \ $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libzmqutil/libzmqutil.la \ $(top_builddir)/src/common/libpmi/libpmi_client.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-optparse.la \ @@ -89,6 +90,7 @@ test_ldadd = \ $(builddir)/libbroker.la \ $(top_builddir)/src/common/libtestutil/libtestutil.la \ $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libzmqutil/libzmqutil.la \ $(top_builddir)/src/common/libpmi/libpmi_client.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libtap/libtap.la \ diff --git a/src/broker/module.c b/src/broker/module.c index 61826db6028f..1cb7f426c51a 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -22,6 +22,8 @@ #include #endif +#include "src/common/libzmqutil/msg_zsock.h" +#include "src/common/libzmqutil/reactor.h" #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libutil/log.h" #include "src/common/libutil/iterators.h" @@ -253,7 +255,7 @@ flux_msg_t *module_recvmsg (module_t *p) int type; struct flux_msg_cred cred; - if (!(msg = flux_msg_recvzsock (p->sock))) + if (!(msg = zmqutil_msg_recv (p->sock))) goto error; if (flux_msg_get_type (msg, &type) < 0) goto error; @@ -314,7 +316,7 @@ int module_sendmsg (module_t *p, const flux_msg_t *msg) goto done; if (flux_msg_route_push (cpy, p->modhash->uuid_str) < 0) goto done; - if (flux_msg_sendzsock (p->sock, cpy) < 0) + if (zmqutil_msg_send (p->sock, cpy) < 0) goto done; break; } @@ -323,12 +325,12 @@ int module_sendmsg (module_t *p, const flux_msg_t *msg) goto done; if (flux_msg_route_delete_last (cpy) < 0) goto done; - if (flux_msg_sendzsock (p->sock, cpy) < 0) + if (zmqutil_msg_send (p->sock, cpy) < 0) goto done; break; } default: - if (flux_msg_sendzsock (p->sock, msg) < 0) + if (zmqutil_msg_send (p->sock, msg) < 0) goto done; break; } @@ -612,13 +614,13 @@ module_t *module_add (modhash_t *mh, const char *path) log_err ("zsock_bind inproc://%s", module_get_uuid (p)); goto cleanup; } - if (!(p->broker_w = flux_zmq_watcher_create ( + if (!(p->broker_w = zmqutil_watcher_create ( flux_get_reactor (p->modhash->broker_h), p->sock, FLUX_POLLIN, module_cb, p))) { - log_err ("flux_zmq_watcher_create"); + log_err ("zmqutil_watcher_create"); goto cleanup; } /* Set creds for connection. diff --git a/src/broker/overlay.c b/src/broker/overlay.c index 5e651d36e779..7e2864647fed 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -19,6 +19,8 @@ #include #include +#include "src/common/libzmqutil/msg_zsock.h" +#include "src/common/libzmqutil/reactor.h" #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libutil/log.h" #include "src/common/libutil/kary.h" @@ -349,7 +351,7 @@ static int overlay_sendmsg_parent (struct overlay *ov, const flux_msg_t *msg) errno = EHOSTUNREACH; goto done; } - rc = flux_msg_sendzsock (ov->parent.zsock, msg); + rc = zmqutil_msg_send (ov->parent.zsock, msg); if (rc == 0) ov->parent.lastsent = flux_reactor_now (ov->reactor); done: @@ -501,7 +503,7 @@ static int overlay_sendmsg_child (struct overlay *ov, const flux_msg_t *msg) errno = EHOSTUNREACH; goto done; } - rc = flux_msg_sendzsock_ex (ov->bind_zsock, msg, true); + rc = zmqutil_msg_send_ex (ov->bind_zsock, msg, true); done: return rc; } @@ -569,7 +571,7 @@ static void child_cb (flux_reactor_t *r, flux_watcher_t *w, struct child *child; int status; - if (!(msg = flux_msg_recvzsock (ov->bind_zsock))) + if (!(msg = zmqutil_msg_recv (ov->bind_zsock))) return; if (flux_msg_get_type (msg, &type) < 0 || !(sender = flux_msg_route_last (msg))) @@ -630,7 +632,7 @@ static void parent_cb (flux_reactor_t *r, flux_watcher_t *w, int type; const char *topic = NULL; - if (!(msg = flux_msg_recvzsock (ov->parent.zsock))) + if (!(msg = zmqutil_msg_recv (ov->parent.zsock))) return; if (flux_msg_get_type (msg, &type) < 0) { goto drop; @@ -770,11 +772,11 @@ static int overlay_zap_init (struct overlay *ov) log_err ("could not bind to %s", ZAP_ENDPOINT); return -1; } - if (!(ov->zap_w = flux_zmq_watcher_create (ov->reactor, - ov->zap, - FLUX_POLLIN, - overlay_zap_cb, - ov))) + if (!(ov->zap_w = zmqutil_watcher_create (ov->reactor, + ov->zap, + FLUX_POLLIN, + overlay_zap_cb, + ov))) return -1; flux_watcher_start (ov->zap_w); return 0; @@ -943,12 +945,12 @@ int overlay_connect (struct overlay *ov) zsock_set_identity (ov->parent.zsock, ov->uuid); if (zsock_connect (ov->parent.zsock, "%s", ov->parent.uri) < 0) goto nomem; - if (!(ov->parent.w = flux_zmq_watcher_create (ov->reactor, - ov->parent.zsock, - FLUX_POLLIN, - parent_cb, - ov))) - return -1; + if (!(ov->parent.w = zmqutil_watcher_create (ov->reactor, + ov->parent.zsock, + FLUX_POLLIN, + parent_cb, + ov))) + return -1; flux_watcher_start (ov->parent.w); if (hello_request_send (ov, ov->rank, FLUX_CORE_VERSION_HEX) < 0) return -1; @@ -982,11 +984,11 @@ int overlay_bind (struct overlay *ov, const char *uri) */ if (!(ov->bind_uri = zsock_last_endpoint (ov->bind_zsock))) return -1; - if (!(ov->bind_w = flux_zmq_watcher_create (ov->reactor, - ov->bind_zsock, - FLUX_POLLIN, - child_cb, - ov))) + if (!(ov->bind_w = zmqutil_watcher_create (ov->reactor, + ov->bind_zsock, + FLUX_POLLIN, + child_cb, + ov))) return -1; flux_watcher_start (ov->bind_w); /* Ensure that ipc files are removed when the broker exits. diff --git a/src/broker/test/overlay.c b/src/broker/test/overlay.c index f79fb7c7b75c..92051491d8ce 100644 --- a/src/broker/test/overlay.c +++ b/src/broker/test/overlay.c @@ -18,6 +18,7 @@ #include #include "src/common/libtap/tap.h" +#include "src/common/libzmqutil/msg_zsock.h" #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libtestutil/util.h" #include "src/common/libutil/stdlog.h" @@ -436,7 +437,7 @@ void trio (flux_t *h) zsock_set_identity (zsock_none, "2"); ok (zsock_connect (zsock_none, "%s", parent_uri) == 0, "none-2: zsock_connect %s (no security) works", parent_uri); - ok (flux_msg_sendzsock (zsock_none, msg) == 0, + ok (zmqutil_msg_send (zsock_none, msg) == 0, "none-2: zsock_msg_sendzsock works"); /* 2) Curve, and correct server publc key, but client public key @@ -453,8 +454,8 @@ void trio (flux_t *h) zcert_destroy (&cert); ok (zsock_connect (zsock_curve, "%s", parent_uri) == 0, "curve-2: zsock_connect %s works", parent_uri); - ok (flux_msg_sendzsock (zsock_curve, msg) == 0, - "curve-2: flux_msg_sendzsock works"); + ok (zmqutil_msg_send (zsock_curve, msg) == 0, + "curve-2: zmqutil_msg_send works"); /* Neither of the above attempts should have gotten a message through. */ diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 8bead867fada..80b892f57f70 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -23,7 +23,8 @@ SUBDIRS = \ libhostlist \ librlist \ libczmqcontainers \ - libccan + libccan \ + libzmqutil AM_CFLAGS = $(WARNING_CFLAGS) $(CODE_COVERAGE_CFLAGS) AM_LDFLAGS = $(CODE_COVERAGE_LIBS) @@ -34,6 +35,7 @@ fluxinclude_HEADERS = core.h schedutil.h noinst_LTLIBRARIES = libflux-internal.la libflux_internal_la_SOURCES = libflux_internal_la_LIBADD = \ + $(builddir)/libflux/libmessage.la \ $(builddir)/liblsd/liblsd.la \ $(builddir)/libccan/libccan.la \ $(builddir)/libutil/libutil.la \ @@ -47,7 +49,6 @@ libflux_internal_la_LIBADD = \ $(builddir)/libhostlist/libhostlist.la \ $(builddir)/libczmqcontainers/libczmqcontainers.la \ $(JANSSON_LIBS) \ - $(ZMQ_LIBS) \ $(LIBUUID_LIBS) \ $(LIBPTHREAD) \ $(LIBDL) \ diff --git a/src/common/libflux/Makefile.am b/src/common/libflux/Makefile.am index 506c240def38..f052cfca6c61 100644 --- a/src/common/libflux/Makefile.am +++ b/src/common/libflux/Makefile.am @@ -11,8 +11,7 @@ AM_CPPFLAGS = \ -I$(top_srcdir)/src/common/libccan \ -I$(top_builddir) \ -I$(top_builddir)/src/common/libflux \ - $(JANSSON_CFLAGS) \ - $(ZMQ_CFLAGS) + $(JANSSON_CFLAGS) installed_conf_cppflags = \ -DINSTALLED_MODULE_PATH=\"$(fluxmoddir)\" \ @@ -88,13 +87,15 @@ nodist_fluxcoreinclude_HEADERS = \ version.h noinst_LTLIBRARIES = \ - libflux.la + libflux.la \ + libmessage.la libflux_la_SOURCES = \ flog.c \ attr.c \ handle.c \ reactor.c \ + reactor_private.h \ msg_handler.c \ message.c \ msglist.c \ @@ -133,6 +134,19 @@ libflux_la_CPPFLAGS = \ $(AM_CPPFLAGS) libflux_la_LDFLAGS = -avoid-version -module -shared -export-dynamic +libmessage_la_SOURCES = \ + message_private.h \ + message_iovec.h \ + message_iovec.c \ + message_route.h \ + message_route.c \ + message_proto.h \ + message_proto.c + + +libmessage_la_CPPFLAGS = $(AM_CPPFLAGS) +libmessage_la_LDFLAGS = -avoid-version -module -shared -export-dynamic + TESTS = test_message.t \ test_msglist.t \ test_request.t \ @@ -164,14 +178,10 @@ TESTS = test_message.t \ test_ldadd = \ $(top_builddir)/src/common/libtestutil/libtestutil.la \ $(top_builddir)/src/common/libflux/libflux.la \ - $(top_builddir)/src/common/libutil/libutil.la \ - $(top_builddir)/src/common/libidset/libidset.la \ + $(top_builddir)/src/common/libzmqutil/libzmqutil.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libkvs/libkvs.la \ $(top_builddir)/src/common/libtap/libtap.la \ - $(top_builddir)/src/common/liblsd/liblsd.la \ - $(top_builddir)/src/common/libtomlc99/libtomlc99.la \ - $(top_builddir)/src/common/libev/libev.la \ - $(top_builddir)/src/common/libczmqcontainers/libczmqcontainers.la \ - $(top_builddir)/src/common/libccan/libccan.la \ $(ZMQ_LIBS) \ $(LIBUUID_LIBS) \ $(JANSSON_LIBS) \ diff --git a/src/common/libflux/message.c b/src/common/libflux/message.c index 3f4514cd653f..7ce996c40d51 100644 --- a/src/common/libflux/message.c +++ b/src/common/libflux/message.c @@ -39,134 +39,17 @@ #include "src/common/libutil/aux.h" #include "src/common/libutil/errno_safe.h" -/* czmq and ccan both define streq */ -#ifdef streq -#undef streq -#endif -#include "src/common/libccan/ccan/list/list.h" #include "message.h" -#define IOVECINCR 4 - -struct flux_msg { - // optional route list, if FLUX_MSGFLAG_ROUTE - struct list_head routes; - int routes_len; /* to avoid looping */ - - // optional topic frame, if FLUX_MSGFLAG_TOPIC - char *topic; - - // optional payload frame, if FLUX_MSGFLAG_PAYLOAD - void *payload; - size_t payload_size; - - // required proto frame data - uint8_t type; - uint8_t flags; - uint32_t userid; - uint32_t rolemask; - union { - uint32_t nodeid; // request - uint32_t sequence; // event - uint32_t errnum; // response, keepalive - uint32_t aux1; // common accessor - }; - union { - uint32_t matchtag; // request, response - uint32_t status; // keepalive - uint32_t aux2; // common accessor - }; - - json_t *json; - char *lasterr; - struct aux_item *aux; - int refcount; -}; - -struct route_id { - struct list_node route_id_node; - char id[0]; /* variable length id stored at end of struct */ -}; - -/* 'transport_data' is for any auxiliary transport data user may wish - * to associate with iovec, user is responsible to free/destroy the - * field - */ -struct msg_iovec { - const void *data; - size_t size; - void *transport_data; -}; - -/* PROTO consists of 4 byte prelude followed by a fixed length - * array of u32's in network byte order. - */ -#define PROTO_MAGIC 0x8e -#define PROTO_VERSION 1 +#include "message_private.h" +#include "message_iovec.h" +#include "message_route.h" +#include "message_proto.h" -#define PROTO_OFF_MAGIC 0 /* 1 byte */ -#define PROTO_OFF_VERSION 1 /* 1 byte */ -#define PROTO_OFF_TYPE 2 /* 1 byte */ -#define PROTO_OFF_FLAGS 3 /* 1 byte */ -#define PROTO_OFF_U32_ARRAY 4 - -/* aux1 - * - * request - nodeid - * response - errnum - * event - sequence - * keepalive - errnum - * - * aux2 - * - * request - matchtag - * response - matchtag - * event - not used - * keepalive - status - */ -#define PROTO_IND_USERID 0 -#define PROTO_IND_ROLEMASK 1 -#define PROTO_IND_AUX1 2 -#define PROTO_IND_AUX2 3 - -#define PROTO_U32_COUNT 4 -#define PROTO_SIZE 4 + (PROTO_U32_COUNT * 4) - -static void route_id_destroy (void *data) +static void msg_setup_type (flux_msg_t *msg) { - if (data) { - struct route_id *r = data; - free (r); - } -} - -static struct route_id *route_id_create (const char *id, unsigned int id_len) -{ - struct route_id *r; - if (!(r = calloc (1, sizeof (*r) + id_len + 1))) - return NULL; - if (id && id_len) { - memcpy (r->id, id, id_len); - list_node_init (&(r->route_id_node)); - } - return r; -} - -static flux_msg_t *flux_msg_create_common (void) -{ - flux_msg_t *msg; - - if (!(msg = calloc (1, sizeof (*msg)))) - return NULL; - list_head_init (&msg->routes); - msg->refcount = 1; - return msg; -} - -static int msg_setup_type (flux_msg_t *msg, int type) -{ - switch (type) { + switch (msg->type) { case FLUX_MSGTYPE_REQUEST: msg->nodeid = FLUX_NODEID_ANY; msg->matchtag = FLUX_MATCHTAG_NONE; @@ -183,28 +66,32 @@ static int msg_setup_type (flux_msg_t *msg, int type) msg->errnum = 0; msg->status = 0; break; - default: - errno = EINVAL; - return -1; } - msg->type = type; - return 0; } flux_msg_t *flux_msg_create (int type) { flux_msg_t *msg; - if (!(msg = flux_msg_create_common ())) + if (type != FLUX_MSGTYPE_REQUEST + && type != FLUX_MSGTYPE_RESPONSE + && type != FLUX_MSGTYPE_EVENT + && type != FLUX_MSGTYPE_KEEPALIVE + && type != FLUX_MSGTYPE_ANY) { + errno = EINVAL; + return NULL; + } + + if (!(msg = calloc (1, sizeof (*msg)))) return NULL; + list_head_init (&msg->routes); + msg->type = type; + if (msg->type != FLUX_MSGTYPE_ANY) + msg_setup_type (msg); msg->userid = FLUX_USERID_UNKNOWN; msg->rolemask = FLUX_ROLE_NONE; - if (msg_setup_type (msg, type) < 0) - goto error; + msg->refcount = 1; return msg; -error: - flux_msg_destroy (msg); - return NULL; } void flux_msg_destroy (flux_msg_t *msg) @@ -302,27 +189,6 @@ ssize_t flux_msg_encode_size (const flux_msg_t *msg) return size; } -static void proto_set_u32 (uint8_t *data, int index, uint32_t val) -{ - uint32_t x = htonl (val); - int offset = PROTO_OFF_U32_ARRAY + index * 4; - memcpy (&data[offset], &x, sizeof (x)); -} - -static void msg_proto_setup (const flux_msg_t *msg, uint8_t *data, int len) -{ - assert (len >= PROTO_SIZE); - memset (data, 0, len); - data[PROTO_OFF_MAGIC] = PROTO_MAGIC; - data[PROTO_OFF_VERSION] = PROTO_VERSION; - data[PROTO_OFF_TYPE] = msg->type; - data[PROTO_OFF_FLAGS] = msg->flags; - proto_set_u32 (data, PROTO_IND_USERID, msg->userid); - proto_set_u32 (data, PROTO_IND_ROLEMASK, msg->rolemask); - proto_set_u32 (data, PROTO_IND_AUX1, msg->aux1); - proto_set_u32 (data, PROTO_IND_AUX2, msg->aux2); -} - static ssize_t encode_frame (uint8_t *buf, size_t buf_len, void *frame, @@ -361,6 +227,11 @@ int flux_msg_encode (const flux_msg_t *msg, void *buf, size_t size) errno = EINVAL; return -1; } + /* msg never completed initial setup */ + if (msg->type == FLUX_MSGTYPE_ANY) { + errno = EPROTO; + return -1; + } if (msg->flags & FLUX_MSGFLAG_ROUTE) { struct route_id *r = NULL; list_for_each (&msg->routes, r, route_id_node) { @@ -405,116 +276,6 @@ int flux_msg_encode (const flux_msg_t *msg, void *buf, size_t size) return 0; } -static void proto_get_u32 (const uint8_t *data, int index, uint32_t *val) -{ - uint32_t x; - int offset = PROTO_OFF_U32_ARRAY + index * 4; - memcpy (&x, &data[offset], sizeof (x)); - *val = ntohl (x); -} - -static int msg_append_route (flux_msg_t *msg, - const char *id, - unsigned int id_len) -{ - struct route_id *r; - assert (msg); - assert ((msg->flags & FLUX_MSGFLAG_ROUTE)); - assert (id); - if (!(r = route_id_create (id, id_len))) - return -1; - list_add_tail (&msg->routes, &r->route_id_node); - msg->routes_len++; - return 0; -} - -static int iovec_to_msg (flux_msg_t *msg, - struct msg_iovec *iov, - int iovcnt) -{ - unsigned int index = 0; - const uint8_t *proto_data; - size_t proto_size; - - assert (msg); - assert (iov); - - if (!iovcnt) { - errno = EPROTO; - return -1; - } - - /* proto frame is last frame */ - proto_data = iov[iovcnt - 1].data; - proto_size = iov[iovcnt - 1].size; - if (proto_size < PROTO_SIZE - || proto_data[PROTO_OFF_MAGIC] != PROTO_MAGIC - || proto_data[PROTO_OFF_VERSION] != PROTO_VERSION) { - errno = EPROTO; - return -1; - } - msg->type = proto_data[PROTO_OFF_TYPE]; - if (msg->type != FLUX_MSGTYPE_REQUEST - && msg->type != FLUX_MSGTYPE_RESPONSE - && msg->type != FLUX_MSGTYPE_EVENT - && msg->type != FLUX_MSGTYPE_KEEPALIVE) { - errno = EPROTO; - return -1; - } - msg->flags = proto_data[PROTO_OFF_FLAGS]; - - if ((msg->flags & FLUX_MSGFLAG_ROUTE)) { - /* On first access index == 0 && iovcnt > 0 guaranteed - * Re-add check if code changes. */ - /* if (index == iovcnt) { */ - /* errno = EPROTO; */ - /* return -1; */ - /* } */ - while ((index < iovcnt) && iov[index].size > 0) { - if (msg_append_route (msg, - (char *)iov[index].data, - iov[index].size) < 0) - return -1; - index++; - } - if (index < iovcnt) - index++; - } - if ((msg->flags & FLUX_MSGFLAG_TOPIC)) { - if (index == iovcnt) { - errno = EPROTO; - return -1; - } - if (!(msg->topic = strndup ((char *)iov[index].data, - iov[index].size))) - return -1; - if (index < iovcnt) - index++; - } - if ((msg->flags & FLUX_MSGFLAG_PAYLOAD)) { - if (index == iovcnt) { - errno = EPROTO; - return -1; - } - msg->payload_size = iov[index].size; - if (!(msg->payload = malloc (msg->payload_size))) - return -1; - memcpy (msg->payload, iov[index].data, msg->payload_size); - if (index < iovcnt) - index++; - } - /* proto frame required */ - if (index == iovcnt) { - errno = EPROTO; - return -1; - } - proto_get_u32 (proto_data, PROTO_IND_USERID, &msg->userid); - proto_get_u32 (proto_data, PROTO_IND_ROLEMASK, &msg->rolemask); - proto_get_u32 (proto_data, PROTO_IND_AUX1, &msg->aux1); - proto_get_u32 (proto_data, PROTO_IND_AUX2, &msg->aux2); - return 0; -} - flux_msg_t *flux_msg_decode (const void *buf, size_t size) { flux_msg_t *msg; @@ -523,7 +284,7 @@ flux_msg_t *flux_msg_decode (const void *buf, size_t size) int iovlen = 0; int iovcnt = 0; - if (!(msg = flux_msg_create_common ())) + if (!(msg = flux_msg_create (FLUX_MSGTYPE_ANY))) return NULL; while (p - (uint8_t *)buf < size) { size_t n = *p++; @@ -567,8 +328,15 @@ int flux_msg_set_type (flux_msg_t *msg, int type) errno = EINVAL; return -1; } - if (msg_setup_type (msg, type) < 0) + if (type != FLUX_MSGTYPE_REQUEST + && type != FLUX_MSGTYPE_RESPONSE + && type != FLUX_MSGTYPE_EVENT + && type != FLUX_MSGTYPE_KEEPALIVE) { + errno = EINVAL; return -1; + } + msg->type = type; + msg_setup_type (msg); return 0; } @@ -959,18 +727,13 @@ void flux_msg_route_disable (flux_msg_t *msg) void flux_msg_route_clear (flux_msg_t *msg) { - struct route_id *r; if (!msg || (!(msg->flags & FLUX_MSGFLAG_ROUTE))) return; - while ((r = list_pop (&msg->routes, struct route_id, route_id_node))) - route_id_destroy (r); - list_head_init (&msg->routes); - msg->routes_len = 0; + msg_route_clear (msg); } int flux_msg_route_push (flux_msg_t *msg, const char *id) { - struct route_id *r; if (!msg || !id) { errno = EINVAL; return -1; @@ -979,16 +742,11 @@ int flux_msg_route_push (flux_msg_t *msg, const char *id) errno = EPROTO; return -1; } - if (!(r = route_id_create (id, strlen (id)))) - return -1; - list_add (&msg->routes, &r->route_id_node); - msg->routes_len++; - return 0; + return msg_route_push (msg, id, strlen (id)); } int flux_msg_route_delete_last (flux_msg_t *msg) { - struct route_id *r; if (!msg) { errno = EINVAL; return -1; @@ -997,11 +755,7 @@ int flux_msg_route_delete_last (flux_msg_t *msg) errno = EPROTO; return -1; } - if ((r = list_pop (&msg->routes, struct route_id, route_id_node))) { - route_id_destroy (r); - msg->routes_len--; - } - return 0; + return msg_route_delete_last (msg); } /* replaces flux_msg_nexthop */ @@ -1403,7 +1157,7 @@ flux_msg_t *flux_msg_copy (const flux_msg_t *msg, bool payload) return NULL; } - if (!(cpy = flux_msg_create_common ())) + if (!(cpy = flux_msg_create (FLUX_MSGTYPE_ANY))) return NULL; cpy->type = msg->type; @@ -1659,173 +1413,6 @@ void flux_msg_fprint (FILE *f, const flux_msg_t *msg) flux_msg_fprint_ts (f, msg, -1); } -static int msg_to_iovec (const flux_msg_t *msg, - uint8_t *proto, - int proto_len, - struct msg_iovec **iovp, - int *iovcntp) -{ - struct msg_iovec *iov = NULL; - int index; - int frame_count; - - if ((frame_count = flux_msg_frames (msg)) < 0) - return -1; - - assert (frame_count); - - if (!(iov = malloc (frame_count * sizeof (*iov)))) - return -1; - - index = frame_count - 1; - - assert (proto_len >= PROTO_SIZE); - msg_proto_setup (msg, proto, proto_len); - iov[index].data = proto; - iov[index].size = PROTO_SIZE; - if (msg->flags & FLUX_MSGFLAG_PAYLOAD) { - index--; - assert (index >= 0); - iov[index].data = msg->payload; - iov[index].size = msg->payload_size; - } - if (msg->flags & FLUX_MSGFLAG_TOPIC) { - index--; - assert (index >= 0); - iov[index].data = msg->topic; - iov[index].size = strlen (msg->topic); - } - if (msg->flags & FLUX_MSGFLAG_ROUTE) { - struct route_id *r = NULL; - /* delimeter */ - index--; - assert (index >= 0); - iov[index].data = NULL; - iov[index].size = 0; - list_for_each_rev (&msg->routes, r, route_id_node) { - index--; - assert (index >= 0); - iov[index].data = r->id; - iov[index].size = strlen (r->id); - } - } - (*iovp) = iov; - (*iovcntp) = frame_count; - return 0; -} - -int flux_msg_sendzsock_ex (void *sock, const flux_msg_t *msg, bool nonblock) -{ - void *handle; - int flags = ZMQ_SNDMORE; - struct msg_iovec *iov = NULL; - int iovcnt; - uint8_t proto[PROTO_SIZE]; - int count = 0; - int rc = -1; - - if (!sock || !msg) { - errno = EINVAL; - return -1; - } - - if (msg_to_iovec (msg, proto, PROTO_SIZE, &iov, &iovcnt) < 0) - goto error; - - if (nonblock) - flags |= ZMQ_DONTWAIT; - - handle = zsock_resolve (sock); - while (count < iovcnt) { - if ((count + 1) == iovcnt) - flags &= ~ZMQ_SNDMORE; - if (zmq_send (handle, - iov[count].data, - iov[count].size, - flags) < 0) - goto error; - count++; - } - rc = 0; -error: - ERRNO_SAFE_WRAP (free, iov); - return rc; -} - -int flux_msg_sendzsock (void *sock, const flux_msg_t *msg) -{ - return flux_msg_sendzsock_ex (sock, msg, false); -} - -flux_msg_t *flux_msg_recvzsock (void *sock) -{ - void *handle; - struct msg_iovec *iov = NULL; - int iovlen = 0; - int iovcnt = 0; - flux_msg_t *msg; - flux_msg_t *rv = NULL; - - if (!sock) { - errno = EINVAL; - return NULL; - } - - /* N.B. we need to store a zmq_msg_t for each iovec entry so that - * the memory is available during the call to iovec_to_msg(). We - * use the msg_iovec's "transport_data" field to store the entry - * and then clear/free it later. - */ - handle = zsock_resolve (sock); - while (true) { - zmq_msg_t *msgdata; - if (iovlen <= iovcnt) { - struct msg_iovec *tmp; - iovlen += IOVECINCR; - if (!(tmp = realloc (iov, sizeof (*iov) * iovlen))) - goto error; - iov = tmp; - } - if (!(msgdata = malloc (sizeof (zmq_msg_t)))) - goto error; - zmq_msg_init (msgdata); - if (zmq_recvmsg (handle, msgdata, 0) < 0) { - int save_errno = errno; - zmq_msg_close (msgdata); - free (msgdata); - errno = save_errno; - goto error; - } - iov[iovcnt].transport_data = msgdata; - iov[iovcnt].data = zmq_msg_data (msgdata); - iov[iovcnt].size = zmq_msg_size (msgdata); - iovcnt++; - if (!zsock_rcvmore (handle)) - break; - } - - if (!(msg = flux_msg_create_common ())) { - errno = ENOMEM; - goto error; - } - if (iovec_to_msg (msg, iov, iovcnt) < 0) - goto error; - rv = msg; -error: - if (iov) { - int save_errno = errno; - int i; - for (i = 0; i < iovcnt; i++) { - zmq_msg_t *msgdata = iov[i].transport_data; - zmq_msg_close (msgdata); - free (msgdata); - } - free (iov); - errno = save_errno; - } - return rv; -} - int flux_msg_frames (const flux_msg_t *msg) { int n = 1; /* 1 for proto frame */ diff --git a/src/common/libflux/message.h b/src/common/libflux/message.h index 85966a1d033b..603f8dda88cc 100644 --- a/src/common/libflux/message.h +++ b/src/common/libflux/message.h @@ -89,7 +89,9 @@ int flux_match_asprintf (struct flux_match *m, NULL \ ) -/* Create a new Flux message. +/* Create a new Flux message. If the type of the message is not yet + * known at creation time, FLUX_MSGTYPE_ANY can be used. + * * Returns new message or null on failure, with errno set (e.g. ENOMEM, EINVAL) * Caller must destroy message with flux_msg_destroy() or equivalent. */ @@ -128,17 +130,6 @@ int flux_msg_frames (const flux_msg_t *msg); */ flux_msg_t *flux_msg_decode (const void *buf, size_t size); -/* Send message to zeromq socket. - * Returns 0 on success, -1 on failure with errno set. - */ -int flux_msg_sendzsock (void *dest, const flux_msg_t *msg); -int flux_msg_sendzsock_ex (void *dest, const flux_msg_t *msg, bool nonblock); - -/* Receive a message from zeromq socket. - * Returns message on success, NULL on failure with errno set. - */ -flux_msg_t *flux_msg_recvzsock (void *dest); - /* Get/set message type * For FLUX_MSGTYPE_REQUEST: set_type initializes nodeid to FLUX_NODEID_ANY * For FLUX_MSGTYPE_RESPONSE: set_type initializes errnum to 0 diff --git a/src/common/libflux/message_iovec.c b/src/common/libflux/message_iovec.c new file mode 100644 index 000000000000..74e2492f8df9 --- /dev/null +++ b/src/common/libflux/message_iovec.c @@ -0,0 +1,178 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include + +#include "message.h" +#include "message_private.h" +#include "message_route.h" +#include "message_proto.h" +#include "message_iovec.h" + +int iovec_to_msg (flux_msg_t *msg, + struct msg_iovec *iov, + int iovcnt) +{ + unsigned int index = 0; + const uint8_t *proto_data; + size_t proto_size; + + assert (msg); + assert (iov); + + if (!iovcnt) { + errno = EPROTO; + return -1; + } + + /* proto frame is last frame */ + proto_data = iov[iovcnt - 1].data; + proto_size = iov[iovcnt - 1].size; + if (proto_size < PROTO_SIZE + || proto_data[PROTO_OFF_MAGIC] != PROTO_MAGIC + || proto_data[PROTO_OFF_VERSION] != PROTO_VERSION) { + errno = EPROTO; + return -1; + } + msg->type = proto_data[PROTO_OFF_TYPE]; + if (msg->type != FLUX_MSGTYPE_REQUEST + && msg->type != FLUX_MSGTYPE_RESPONSE + && msg->type != FLUX_MSGTYPE_EVENT + && msg->type != FLUX_MSGTYPE_KEEPALIVE) { + errno = EPROTO; + return -1; + } + msg->flags = proto_data[PROTO_OFF_FLAGS]; + + if ((msg->flags & FLUX_MSGFLAG_ROUTE)) { + /* On first access index == 0 && iovcnt > 0 guaranteed + * Re-add check if code changes. */ + /* if (index == iovcnt) { */ + /* errno = EPROTO; */ + /* return -1; */ + /* } */ + while ((index < iovcnt) && iov[index].size > 0) { + if (msg_route_append (msg, + (char *)iov[index].data, + iov[index].size) < 0) + return -1; + index++; + } + if (index < iovcnt) + index++; + } + if ((msg->flags & FLUX_MSGFLAG_TOPIC)) { + if (index == iovcnt) { + errno = EPROTO; + return -1; + } + if (!(msg->topic = strndup ((char *)iov[index].data, + iov[index].size))) + return -1; + if (index < iovcnt) + index++; + } + if ((msg->flags & FLUX_MSGFLAG_PAYLOAD)) { + if (index == iovcnt) { + errno = EPROTO; + return -1; + } + msg->payload_size = iov[index].size; + if (!(msg->payload = malloc (msg->payload_size))) + return -1; + memcpy (msg->payload, iov[index].data, msg->payload_size); + if (index < iovcnt) + index++; + } + /* proto frame required */ + if (index == iovcnt) { + errno = EPROTO; + return -1; + } + proto_get_u32 (proto_data, PROTO_IND_USERID, &msg->userid); + proto_get_u32 (proto_data, PROTO_IND_ROLEMASK, &msg->rolemask); + proto_get_u32 (proto_data, PROTO_IND_AUX1, &msg->aux1); + proto_get_u32 (proto_data, PROTO_IND_AUX2, &msg->aux2); + return 0; +} + +int msg_to_iovec (const flux_msg_t *msg, + uint8_t *proto, + int proto_len, + struct msg_iovec **iovp, + int *iovcntp) +{ + struct msg_iovec *iov = NULL; + int index; + int frame_count; + + /* msg never completed initial setup */ + if (msg->type == FLUX_MSGTYPE_ANY) { + errno = EPROTO; + return -1; + } + + if ((frame_count = flux_msg_frames (msg)) < 0) + return -1; + + assert (frame_count); + + if (!(iov = malloc (frame_count * sizeof (*iov)))) + return -1; + + index = frame_count - 1; + + assert (proto_len >= PROTO_SIZE); + msg_proto_setup (msg, proto, proto_len); + iov[index].data = proto; + iov[index].size = PROTO_SIZE; + if (msg->flags & FLUX_MSGFLAG_PAYLOAD) { + index--; + assert (index >= 0); + iov[index].data = msg->payload; + iov[index].size = msg->payload_size; + } + if (msg->flags & FLUX_MSGFLAG_TOPIC) { + index--; + assert (index >= 0); + iov[index].data = msg->topic; + iov[index].size = strlen (msg->topic); + } + if (msg->flags & FLUX_MSGFLAG_ROUTE) { + struct route_id *r = NULL; + /* delimeter */ + index--; + assert (index >= 0); + iov[index].data = NULL; + iov[index].size = 0; + list_for_each_rev (&msg->routes, r, route_id_node) { + index--; + assert (index >= 0); + iov[index].data = r->id; + iov[index].size = strlen (r->id); + } + } + (*iovp) = iov; + (*iovcntp) = frame_count; + return 0; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libflux/message_iovec.h b/src/common/libflux/message_iovec.h new file mode 100644 index 000000000000..2b2738a5808c --- /dev/null +++ b/src/common/libflux/message_iovec.h @@ -0,0 +1,41 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_CORE_MESSAGE_IOVEC_H +#define _FLUX_CORE_MESSAGE_IOVEC_H + +#define IOVECINCR 4 + +/* 'transport_data' is for any auxiliary transport data user may wish + * to associate with iovec, user is responsible to free/destroy the + * field + */ +struct msg_iovec { + const void *data; + size_t size; + void *transport_data; +}; + +int iovec_to_msg (flux_msg_t *msg, + struct msg_iovec *iov, + int iovcnt); + +int msg_to_iovec (const flux_msg_t *msg, + uint8_t *proto, + int proto_len, + struct msg_iovec **iovp, + int *iovcntp); + +#endif /* !_FLUX_CORE_MESSAGE_IOVEC_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libflux/message_private.h b/src/common/libflux/message_private.h new file mode 100644 index 000000000000..9012e3f31ab7 --- /dev/null +++ b/src/common/libflux/message_private.h @@ -0,0 +1,63 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_CORE_MESSAGE_PRIVATE_H +#define _FLUX_CORE_MESSAGE_PRIVATE_H + +#include +#include + +/* czmq and ccan both define streq */ +#ifdef streq +#undef streq +#endif +#include "src/common/libccan/ccan/list/list.h" + +struct flux_msg { + // optional route list, if FLUX_MSGFLAG_ROUTE + struct list_head routes; + int routes_len; /* to avoid looping */ + + // optional topic frame, if FLUX_MSGFLAG_TOPIC + char *topic; + + // optional payload frame, if FLUX_MSGFLAG_PAYLOAD + void *payload; + size_t payload_size; + + // required proto frame data + uint8_t type; + uint8_t flags; + uint32_t userid; + uint32_t rolemask; + union { + uint32_t nodeid; // request + uint32_t sequence; // event + uint32_t errnum; // response, keepalive + uint32_t aux1; // common accessor + }; + union { + uint32_t matchtag; // request, response + uint32_t status; // keepalive + uint32_t aux2; // common accessor + }; + + json_t *json; + char *lasterr; + struct aux_item *aux; + int refcount; +}; + +#endif /* !_FLUX_CORE_MESSAGE_PRIVATE_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libflux/message_proto.c b/src/common/libflux/message_proto.c new file mode 100644 index 000000000000..705dab252fb7 --- /dev/null +++ b/src/common/libflux/message_proto.c @@ -0,0 +1,58 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include + +#include "message.h" +#include "message_private.h" +#include "message_proto.h" + +static void proto_set_u32 (uint8_t *data, int index, uint32_t val) +{ + uint32_t x = htonl (val); + int offset = PROTO_OFF_U32_ARRAY + index * 4; + memcpy (&data[offset], &x, sizeof (x)); +} + +void msg_proto_setup (const flux_msg_t *msg, uint8_t *data, int len) +{ + assert (len >= PROTO_SIZE); + assert (msg->type != FLUX_MSGTYPE_ANY); + memset (data, 0, len); + data[PROTO_OFF_MAGIC] = PROTO_MAGIC; + data[PROTO_OFF_VERSION] = PROTO_VERSION; + data[PROTO_OFF_TYPE] = msg->type; + data[PROTO_OFF_FLAGS] = msg->flags; + proto_set_u32 (data, PROTO_IND_USERID, msg->userid); + proto_set_u32 (data, PROTO_IND_ROLEMASK, msg->rolemask); + proto_set_u32 (data, PROTO_IND_AUX1, msg->aux1); + proto_set_u32 (data, PROTO_IND_AUX2, msg->aux2); +} + +void proto_get_u32 (const uint8_t *data, int index, uint32_t *val) +{ + uint32_t x; + int offset = PROTO_OFF_U32_ARRAY + index * 4; + memcpy (&x, &data[offset], sizeof (x)); + *val = ntohl (x); +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libflux/message_proto.h b/src/common/libflux/message_proto.h new file mode 100644 index 000000000000..2ebc4bd71a53 --- /dev/null +++ b/src/common/libflux/message_proto.h @@ -0,0 +1,57 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_CORE_MESSAGE_PROTO_H +#define _FLUX_CORE_MESSAGE_PROTO_H + +/* PROTO consists of 4 byte prelude followed by a fixed length + * array of u32's in network byte order. + */ +#define PROTO_MAGIC 0x8e +#define PROTO_VERSION 1 + +#define PROTO_OFF_MAGIC 0 /* 1 byte */ +#define PROTO_OFF_VERSION 1 /* 1 byte */ +#define PROTO_OFF_TYPE 2 /* 1 byte */ +#define PROTO_OFF_FLAGS 3 /* 1 byte */ +#define PROTO_OFF_U32_ARRAY 4 + +/* aux1 + * + * request - nodeid + * response - errnum + * event - sequence + * keepalive - errnum + * + * aux2 + * + * request - matchtag + * response - matchtag + * event - not used + * keepalive - status + */ +#define PROTO_IND_USERID 0 +#define PROTO_IND_ROLEMASK 1 +#define PROTO_IND_AUX1 2 +#define PROTO_IND_AUX2 3 + +#define PROTO_U32_COUNT 4 +#define PROTO_SIZE 4 + (PROTO_U32_COUNT * 4) + +void msg_proto_setup (const flux_msg_t *msg, uint8_t *data, int len); + +void proto_get_u32 (const uint8_t *data, int index, uint32_t *val); + +#endif /* !_FLUX_CORE_MESSAGE_PROTO_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libflux/message_route.c b/src/common/libflux/message_route.c new file mode 100644 index 000000000000..70505a1d4641 --- /dev/null +++ b/src/common/libflux/message_route.c @@ -0,0 +1,98 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include + +#include "message.h" +#include "message_private.h" +#include "message_route.h" + +static void route_id_destroy (void *data) +{ + if (data) { + struct route_id *r = data; + free (r); + } +} + +static struct route_id *route_id_create (const char *id, unsigned int id_len) +{ + struct route_id *r; + if (!(r = calloc (1, sizeof (*r) + id_len + 1))) + return NULL; + if (id && id_len) { + memcpy (r->id, id, id_len); + list_node_init (&(r->route_id_node)); + } + return r; +} + +int msg_route_push (flux_msg_t *msg, + const char *id, + unsigned int id_len) +{ + struct route_id *r; + if (!(r = route_id_create (id, strlen (id)))) + return -1; + list_add (&msg->routes, &r->route_id_node); + msg->routes_len++; + return 0; +} + +int msg_route_append (flux_msg_t *msg, + const char *id, + unsigned int id_len) +{ + struct route_id *r; + assert (msg); + assert ((msg->flags & FLUX_MSGFLAG_ROUTE)); + assert (id); + if (!(r = route_id_create (id, id_len))) + return -1; + list_add_tail (&msg->routes, &r->route_id_node); + msg->routes_len++; + return 0; +} + +void msg_route_clear (flux_msg_t *msg) +{ + struct route_id *r; + assert (msg); + assert ((msg->flags & FLUX_MSGFLAG_ROUTE)); + while ((r = list_pop (&msg->routes, struct route_id, route_id_node))) + route_id_destroy (r); + list_head_init (&msg->routes); + msg->routes_len = 0; +} + +int msg_route_delete_last (flux_msg_t *msg) +{ + struct route_id *r; + assert (msg); + assert ((msg->flags & FLUX_MSGFLAG_ROUTE)); + if ((r = list_pop (&msg->routes, struct route_id, route_id_node))) { + route_id_destroy (r); + msg->routes_len--; + } + return 0; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libflux/message_route.h b/src/common/libflux/message_route.h new file mode 100644 index 000000000000..2844a2b6bc24 --- /dev/null +++ b/src/common/libflux/message_route.h @@ -0,0 +1,36 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_CORE_MESSAGE_ROUTE_H +#define _FLUX_CORE_MESSAGE_ROUTE_H + +struct route_id { + struct list_node route_id_node; + char id[0]; /* variable length id stored at end of struct */ +}; + +int msg_route_push (flux_msg_t *msg, + const char *id, + unsigned int id_len); + +int msg_route_append (flux_msg_t *msg, + const char *id, + unsigned int id_len); + +void msg_route_clear (flux_msg_t *msg); + +int msg_route_delete_last (flux_msg_t *msg); + +#endif /* !_FLUX_CORE_MESSAGE_ROUTE_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index 7df686a085a3..d0000b7ee266 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include "handle.h" #include "reactor.h" @@ -26,23 +26,10 @@ #include "buffer_private.h" #include "src/common/libev/ev.h" -#include "src/common/libutil/ev_zmq.h" #include "src/common/libutil/log.h" #include "src/common/libutil/fdutils.h" -struct flux_reactor { - struct ev_loop *loop; - int usecount; - unsigned int errflag:1; -}; - -struct flux_watcher { - flux_reactor_t *r; - flux_watcher_f fn; - void *arg; - struct flux_watcher_ops *ops; - void *data; -}; +#include "reactor_private.h" static void reactor_usecount_decr (flux_reactor_t *r) { @@ -166,30 +153,6 @@ void flux_reactor_active_decref (flux_reactor_t *r) ev_unref (r->loop); } -static int events_to_libev (int events) -{ - int e = 0; - if (events & FLUX_POLLIN) - e |= EV_READ; - if (events & FLUX_POLLOUT) - e |= EV_WRITE; - if (events & FLUX_POLLERR) - e |= EV_ERROR; - return e; -} - -static int libev_to_events (int events) -{ - int e = 0; - if (events & EV_READ) - e |= FLUX_POLLIN; - if (events & EV_WRITE) - e |= FLUX_POLLOUT; - if (events & EV_ERROR) - e |= FLUX_POLLERR; - return e; -} - /** ** Watchers **/ @@ -583,58 +546,6 @@ int flux_buffer_write_watcher_is_closed (flux_watcher_t *w, int *errp) return (0); } -/* 0MQ sockets - */ - -static void zmq_start (flux_watcher_t *w) -{ - ev_zmq_start (w->r->loop, (ev_zmq *)w->data); -} - -static void zmq_stop (flux_watcher_t *w) -{ - ev_zmq_stop (w->r->loop, (ev_zmq *)w->data); -} - -static void zmq_cb (struct ev_loop *loop, ev_zmq *pw, int revents) -{ - struct flux_watcher *w = pw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops zmq_watcher = { - .start = zmq_start, - .stop = zmq_stop, - .destroy = NULL, -}; - -flux_watcher_t *flux_zmq_watcher_create (flux_reactor_t *r, - void *zsock, int events, - flux_watcher_f cb, void *arg) -{ - ev_zmq *zw; - flux_watcher_t *w; - - if (!(w = flux_watcher_create (r, sizeof (*zw), &zmq_watcher, cb, arg))) - return NULL; - zw = flux_watcher_get_data (w); - ev_zmq_init (zw, zmq_cb, zsock, events_to_libev (events) & ~EV_ERROR); - zw->data = w; - - return w; -} - -void *flux_zmq_watcher_get_zsock (flux_watcher_t *w) -{ - if (flux_watcher_get_ops (w) != &zmq_watcher) { - errno = EINVAL; - return NULL; - } - ev_zmq *zw = w->data; - return zw->zsock; -} - /* Timer */ diff --git a/src/common/libflux/reactor.h b/src/common/libflux/reactor.h index cd96c08fe493..7368962f1af0 100644 --- a/src/common/libflux/reactor.h +++ b/src/common/libflux/reactor.h @@ -130,14 +130,6 @@ int flux_buffer_write_watcher_close (flux_watcher_t *w); /* Returns 1 if write watcher is closed, errnum from close in close_err */ int flux_buffer_write_watcher_is_closed (flux_watcher_t *w, int *close_err); -/* zmq socket - */ - -flux_watcher_t *flux_zmq_watcher_create (flux_reactor_t *r, - void *zsock, int events, - flux_watcher_f cb, void *arg); -void *flux_zmq_watcher_get_zsock (flux_watcher_t *w); - /* timer */ diff --git a/src/common/libflux/reactor_private.h b/src/common/libflux/reactor_private.h new file mode 100644 index 000000000000..9fa57fa6d23b --- /dev/null +++ b/src/common/libflux/reactor_private.h @@ -0,0 +1,67 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_CORE_REACTOR_PRIVATE_H +#define _FLUX_CORE_REACTOR_PRIVATE_H + +#include "src/common/libev/ev.h" +#include "reactor.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct flux_reactor { + struct ev_loop *loop; + int usecount; + unsigned int errflag:1; +}; + +struct flux_watcher { + flux_reactor_t *r; + flux_watcher_f fn; + void *arg; + struct flux_watcher_ops *ops; + void *data; +}; + +static inline int events_to_libev (int events) +{ + int e = 0; + if (events & FLUX_POLLIN) + e |= EV_READ; + if (events & FLUX_POLLOUT) + e |= EV_WRITE; + if (events & FLUX_POLLERR) + e |= EV_ERROR; + return e; +} + +static inline int libev_to_events (int events) +{ + int e = 0; + if (events & EV_READ) + e |= FLUX_POLLIN; + if (events & EV_WRITE) + e |= FLUX_POLLOUT; + if (events & EV_ERROR) + e |= FLUX_POLLERR; + return e; +} + +#ifdef __cplusplus +} +#endif + +#endif /* !_FLUX_CORE_REACTOR_PRIVATE_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/common/libflux/test/message.c b/src/common/libflux/test/message.c index 8dcc67b8f771..71b662088750 100644 --- a/src/common/libflux/test/message.c +++ b/src/common/libflux/test/message.c @@ -12,10 +12,11 @@ #include "config.h" #endif #include -#include #include #include #include +#include +#include #include "src/common/libflux/message.h" #include "src/common/libtap/tap.h" @@ -25,7 +26,7 @@ static bool verbose = false; void check_cornercase (void) { flux_msg_t *msg; - flux_msg_t *req, *rsp, *evt; + flux_msg_t *req, *rsp, *evt, *any; struct flux_msg_cred cred; uint32_t seq, nodeid; uint8_t encodebuf[64]; @@ -49,6 +50,8 @@ void check_cornercase (void) BAIL_OUT ("flux_msg_create failed"); if (!(evt = flux_msg_create (FLUX_MSGTYPE_EVENT))) BAIL_OUT ("flux_msg_create failed"); + if (!(any = flux_msg_create (FLUX_MSGTYPE_ANY))) + BAIL_OUT ("flux_msg_create failed"); lives_ok ({flux_msg_destroy (NULL);}, "flux_msg_destroy msg=NULL doesnt crash"); @@ -77,6 +80,9 @@ void check_cornercase (void) ok (flux_msg_encode (NULL, encodebuf, encodesize) < 0 && errno == EINVAL, "flux_msg_encode fails on EINVAL with msg=NULL"); errno = 0; + ok (flux_msg_encode (any, encodebuf, encodesize) < 0 && errno == EPROTO, + "flux_msg_encode fails on EPROTO with msg type ANY"); + errno = 0; ok (flux_msg_frames (NULL) < 0 && errno == EINVAL, "flux_msg_frames returns -1 errno EINVAL on msg = NULL"); @@ -697,6 +703,12 @@ void check_proto (void) int errnum; int type; + ok ((msg = flux_msg_create (FLUX_MSGTYPE_ANY)) != NULL, + "flux_msg_create works"); + ok (flux_msg_get_type (msg, &type) == 0 && type == FLUX_MSGTYPE_ANY, + "flux_msg_get_type works with type FLUX_MSGTYPE_ANY"); + flux_msg_destroy (msg); + ok ((msg = flux_msg_create (FLUX_MSGTYPE_RESPONSE)) != NULL, "flux_msg_create works"); ok (flux_msg_get_type (msg, &type) == 0 && type == FLUX_MSGTYPE_RESPONSE, @@ -938,74 +950,6 @@ void check_encode (void) flux_msg_destroy (msg2); } -void check_sendzsock (void) -{ - zsock_t *zsock[2] = { NULL, NULL }; - flux_msg_t *msg, *msg2; - const char *topic; - int type; - const char *uri = "inproc://test"; - - /* zsys boiler plate: - * appears to be needed to avoid atexit assertions when lives_ok() - * macro (which calls fork()) is used. - */ - zsys_init (); - zsys_set_logstream (stderr); - zsys_set_logident ("test_message.t"); - zsys_handler_set (NULL); - zsys_set_linger (5); // msec - - ok ((zsock[0] = zsock_new_pair (NULL)) != NULL - && zsock_bind (zsock[0], "%s", uri) == 0 - && (zsock[1] = zsock_new_pair (uri)) != NULL, - "got inproc socket pair"); - - ok ((msg = flux_msg_create (FLUX_MSGTYPE_REQUEST)) != NULL - && flux_msg_set_topic (msg, "foo.bar") == 0, - "created test message"); - - /* corner case tests */ - ok (flux_msg_sendzsock (NULL, msg) < 0 && errno == EINVAL, - "flux_msg_sendzsock returns < 0 and EINVAL on dest = NULL"); - ok (flux_msg_sendzsock_ex (NULL, msg, true) < 0 && errno == EINVAL, - "flux_msg_sendzsock_ex returns < 0 and EINVAL on dest = NULL"); - ok (flux_msg_recvzsock (NULL) == NULL && errno == EINVAL, - "flux_msg_recvzsock returns NULL and EINVAL on dest = NULL"); - - ok (flux_msg_sendzsock (zsock[1], msg) == 0, - "flux_msg_sendzsock works"); - ok ((msg2 = flux_msg_recvzsock (zsock[0])) != NULL, - "flux_msg_recvzsock works"); - ok (flux_msg_get_type (msg2, &type) == 0 && type == FLUX_MSGTYPE_REQUEST - && flux_msg_get_topic (msg2, &topic) == 0 - && !strcmp (topic, "foo.bar") - && flux_msg_has_payload (msg2) == false, - "decoded message looks like what was sent"); - flux_msg_destroy (msg2); - - /* Send it again. - */ - ok (flux_msg_sendzsock (zsock[1], msg) == 0, - "try2: flux_msg_sendzsock works"); - ok ((msg2 = flux_msg_recvzsock (zsock[0])) != NULL, - "try2: flux_msg_recvzsock works"); - ok (flux_msg_get_type (msg2, &type) == 0 && type == FLUX_MSGTYPE_REQUEST - && flux_msg_get_topic (msg2, &topic) == 0 - && !strcmp (topic, "foo.bar") - && flux_msg_has_payload (msg2) == false, - "try2: decoded message looks like what was sent"); - flux_msg_destroy (msg2); - flux_msg_destroy (msg); - - zsock_destroy (&zsock[0]); - zsock_destroy (&zsock[1]); - - /* zsys boiler plate - see note above - */ - zsys_shutdown(); -} - void *myfree_arg = NULL; void myfree (void *arg) { @@ -1302,7 +1246,6 @@ int main (int argc, char *argv[]) check_cmp (); check_encode (); - check_sendzsock (); check_refcount(); diff --git a/src/common/libflux/test/reactor.c b/src/common/libflux/test/reactor.c index e71e62344b82..577b8433a261 100644 --- a/src/common/libflux/test/reactor.c +++ b/src/common/libflux/test/reactor.c @@ -9,109 +9,17 @@ \************************************************************/ #include -#include #include #include #include +#include +#include #include "src/common/libflux/reactor.h" #include "src/common/libutil/xzmalloc.h" #include "src/common/libutil/fdutils.h" #include "src/common/libtap/tap.h" -static const size_t zmqwriter_msgcount = 1024; - -static void zmqwriter (flux_reactor_t *r, flux_watcher_t *w, - int revents, void *arg) -{ - void *sock = flux_zmq_watcher_get_zsock (w); - static int count = 0; - if (revents & FLUX_POLLERR) { - fprintf (stderr, "%s: FLUX_POLLERR is set\n", __FUNCTION__); - goto error; - } - if (revents & FLUX_POLLOUT) { - uint8_t blob[64]; - zmsg_t *zmsg = zmsg_new (); - if (!zmsg || zmsg_addmem (zmsg, blob, sizeof (blob)) < 0) { - fprintf (stderr, "%s: failed to create message: %s\n", - __FUNCTION__, strerror (errno)); - goto error; - } - if (zmsg_send (&zmsg, sock) < 0) { - fprintf (stderr, "%s: zmsg_send: %s\n", - __FUNCTION__, strerror (errno)); - goto error; - } - count++; - if (count == zmqwriter_msgcount) - flux_watcher_stop (w); - } - return; -error: - flux_reactor_stop_error (r); -} - -static void zmqreader (flux_reactor_t *r, flux_watcher_t *w, - int revents, void *arg) -{ - void *sock = flux_zmq_watcher_get_zsock (w); - static int count = 0; - if (revents & FLUX_POLLERR) { - fprintf (stderr, "%s: FLUX_POLLERR is set\n", __FUNCTION__); - goto error; - } - if (revents & FLUX_POLLIN) { - zmsg_t *zmsg = zmsg_recv (sock); - if (!zmsg) { - fprintf (stderr, "%s: zmsg_recv: %s\n", - __FUNCTION__, strerror (errno)); - goto error; - } - zmsg_destroy (&zmsg); - count++; - if (count == zmqwriter_msgcount) - flux_watcher_stop (w); - } - return; -error: - flux_reactor_stop_error (r); -} - -static void test_zmq (flux_reactor_t *reactor) -{ - zsock_t *zs[2]; - flux_watcher_t *r, *w; - const char *uri = "inproc://test_zmq"; - - zsys_set_logstream (stderr); - zsys_handler_set (NULL); - - zs[0] = zsock_new_pair (NULL); - zs[1] = zsock_new_pair (NULL); - ok (zs[0] && zs[1] - && zsock_bind (zs[0], "%s", uri) == 0 - && zsock_connect (zs[1], "%s", uri) == 0, - "zmq: connected ZMQ_PAIR sockets over inproc"); - r = flux_zmq_watcher_create (reactor, zs[0], FLUX_POLLIN, zmqreader, NULL); - w = flux_zmq_watcher_create (reactor, zs[1], FLUX_POLLOUT, zmqwriter, NULL); - ok (r != NULL && w != NULL, - "zmq: nonblocking reader and writer created"); - flux_watcher_start (r); - flux_watcher_start (w); - ok (flux_reactor_run (reactor, 0) == 0, - "zmq: reactor ran to completion after %d messages", zmqwriter_msgcount); - flux_watcher_stop (r); - flux_watcher_stop (w); - flux_watcher_destroy (r); - flux_watcher_destroy (w); - - zsock_destroy (&zs[0]); - zsock_destroy (&zs[1]); - - zsys_shutdown (); -} - static const size_t fdwriter_bufsize = 10*1024*1024; static void fdwriter (flux_reactor_t *r, flux_watcher_t *w, @@ -1461,7 +1369,6 @@ int main (int argc, char *argv[]) test_fd (reactor); test_buffer (reactor); test_buffer_corner_case (reactor); - test_zmq (reactor); test_idle (reactor); test_prepcheck (reactor); test_signal (reactor); diff --git a/src/common/libjob/Makefile.am b/src/common/libjob/Makefile.am index 305cc4b63e3f..2ce07ebb0e86 100644 --- a/src/common/libjob/Makefile.am +++ b/src/common/libjob/Makefile.am @@ -53,7 +53,8 @@ test_ldadd = \ $(top_builddir)/src/common/libtap/libtap.la \ $(JANSSON_LIBS) \ $(LIBPTHREAD) \ - $(FLUX_SECURITY_LIBS) + $(FLUX_SECURITY_LIBS) \ + $(ZMQ_LIBS) test_cppflags = \ $(AM_CPPFLAGS) \ diff --git a/src/common/libkvs/Makefile.am b/src/common/libkvs/Makefile.am index 383260e4c2b7..60eda9cfc821 100644 --- a/src/common/libkvs/Makefile.am +++ b/src/common/libkvs/Makefile.am @@ -63,7 +63,8 @@ test_ldadd = \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libtap/libtap.la \ $(JANSSON_LIBS) \ - $(LIBPTHREAD) + $(LIBPTHREAD) \ + $(ZMQ_LIBS) test_cppflags = \ $(AM_CPPFLAGS) \ diff --git a/src/common/libpmi/Makefile.am b/src/common/libpmi/Makefile.am index d87933febefb..bb0ce04601b4 100644 --- a/src/common/libpmi/Makefile.am +++ b/src/common/libpmi/Makefile.am @@ -48,14 +48,11 @@ TESTS = test_keyval.t \ test_ldadd = \ $(top_builddir)/src/common/libflux/libflux.la \ + $(top_builddir)/src/common/libzmqutil/libzmqutil.la \ $(top_builddir)/src/common/libpmi/libpmi_client.la \ $(top_builddir)/src/common/libpmi/libpmi_server.la \ - $(top_builddir)/src/common/libutil/libutil.la \ + $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libtap/libtap.la \ - $(top_builddir)/src/common/liblsd/liblsd.la \ - $(top_builddir)/src/common/libtomlc99/libtomlc99.la \ - $(top_builddir)/src/common/libev/libev.la \ - $(top_builddir)/src/common/libczmqcontainers/libczmqcontainers.la \ $(ZMQ_LIBS) \ $(JANSSON_LIBS) \ $(LIBPTHREAD) \ diff --git a/src/common/librouter/Makefile.am b/src/common/librouter/Makefile.am index a034ffd4ee8c..da14254a7dbc 100644 --- a/src/common/librouter/Makefile.am +++ b/src/common/librouter/Makefile.am @@ -62,9 +62,11 @@ test_ldadd = \ $(builddir)/libtestutil.la \ $(top_builddir)/src/common/librouter/librouter.la \ $(top_builddir)/src/common/libtestutil/libtestutil.la \ + $(top_builddir)/src/common/libzmqutil/libzmqutil.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ - $(top_builddir)/src/common/libtap/libtap.la + $(top_builddir)/src/common/libtap/libtap.la \ + $(ZMQ_LIBS) test_cppflags = \ $(AM_CPPFLAGS) \ diff --git a/src/common/libterminus/Makefile.am b/src/common/libterminus/Makefile.am index ccf2d2d5c19e..b39dc7e0efcf 100644 --- a/src/common/libterminus/Makefile.am +++ b/src/common/libterminus/Makefile.am @@ -36,9 +36,11 @@ test_ldadd = \ $(top_builddir)/src/common/libterminus/libterminus.la \ $(top_builddir)/src/common/libutil/libutil.la \ $(top_builddir)/src/common/libtestutil/libtestutil.la \ + $(top_builddir)/src/common/libzmqutil/libzmqutil.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ - $(top_builddir)/src/common/libtap/libtap.la + $(top_builddir)/src/common/libtap/libtap.la \ + $(ZMQ_LIBS) test_ldflags = \ -no-install diff --git a/src/common/libtestutil/util.c b/src/common/libtestutil/util.c index f9edd856afc1..adb35de5ae18 100644 --- a/src/common/libtestutil/util.c +++ b/src/common/libtestutil/util.c @@ -19,6 +19,7 @@ #include "util.h" +#include "src/common/libzmqutil/msg_zsock.h" #include "src/common/libtap/tap.h" #ifndef UUID_STR_LEN @@ -240,7 +241,7 @@ static int test_connector_send (void *impl, const flux_msg_t *msg, int flags) goto error; break; } - if (flux_msg_sendzsock (tcon->sock, cpy) < 0) + if (zmqutil_msg_send (tcon->sock, cpy) < 0) goto error; flux_msg_destroy (cpy); return 0; @@ -267,7 +268,7 @@ static flux_msg_t *test_connector_recv (void *impl, int flags) return NULL; } } - return flux_msg_recvzsock (tcon->sock); + return zmqutil_msg_recv (tcon->sock); } static void test_connector_fini (void *impl) diff --git a/src/common/libutil/Makefile.am b/src/common/libutil/Makefile.am index 65c96fcfe942..750fe67ae999 100644 --- a/src/common/libutil/Makefile.am +++ b/src/common/libutil/Makefile.am @@ -1,7 +1,6 @@ AM_CFLAGS = \ $(WARNING_CFLAGS) \ $(CODE_COVERAGE_CFLAGS) \ - $(ZMQ_CFLAGS) \ -Wno-strict-aliasing -Wno-error=strict-aliasing \ -Wno-parentheses -Wno-error=parentheses @@ -34,8 +33,6 @@ libutil_la_SOURCES = \ veb.h \ read_all.c \ read_all.h \ - ev_zmq.c \ - ev_zmq.h \ cleanup.c \ cleanup.h \ unlink_recursive.c \ @@ -95,8 +92,7 @@ libutil_la_SOURCES = \ EXTRA_DIST = veb_mach.c -TESTS = test_ev.t \ - test_sha1.t \ +TESTS = test_sha1.t \ test_sha256.t \ test_popen2.t \ test_kary.t \ @@ -126,11 +122,9 @@ test_ldadd = \ $(top_builddir)/src/common/libutil/libutil.la \ $(top_builddir)/src/common/libtap/libtap.la \ $(top_builddir)/src/common/liblsd/liblsd.la \ - $(top_builddir)/src/common/libev/libev.la \ $(top_builddir)/src/common/libtomlc99/libtomlc99.la \ $(top_builddir)/src/common/libczmqcontainers/libczmqcontainers.la \ $(top_builddir)/src/common/libccan/libccan.la \ - $(ZMQ_LIBS) \ $(LIBPTHREAD) \ $(LIBRT) \ $(JANSSON_LIBS) @@ -145,10 +139,6 @@ TEST_EXTENSIONS = .t T_LOG_DRIVER = env AM_TAP_AWK='$(AWK)' $(SHELL) \ $(top_srcdir)/config/tap-driver.sh -test_ev_t_SOURCES = test/ev.c -test_ev_t_CPPFLAGS = $(test_cppflags) -test_ev_t_LDADD = $(test_ldadd) - test_sha1_t_SOURCES = test/sha1.c test_sha1_t_CPPFLAGS = $(test_cppflags) test_sha1_t_LDADD = $(test_ldadd) diff --git a/src/common/libzmqutil/Makefile.am b/src/common/libzmqutil/Makefile.am new file mode 100644 index 000000000000..8155114fff0a --- /dev/null +++ b/src/common/libzmqutil/Makefile.am @@ -0,0 +1,64 @@ +AM_CFLAGS = \ + $(WARNING_CFLAGS) \ + $(CODE_COVERAGE_CFLAGS) + +AM_LDFLAGS = \ + $(CODE_COVERAGE_LDFLAGS) + +AM_CPPFLAGS = \ + -I$(top_srcdir) \ + -I$(top_srcdir)/src/include \ + -I$(top_srcdir)/src/common/libccan \ + -I$(top_builddir)/src/common/libflux \ + $(ZMQ_CFLAGS) + +noinst_LTLIBRARIES = \ + libzmqutil.la + +libzmqutil_la_SOURCES = \ + msg_zsock.h \ + msg_zsock.c \ + reactor.h \ + reactor.c \ + ev_zmq.h \ + ev_zmq.c + +TESTS = test_msg_zsock.t \ + test_reactor.t \ + test_ev.t + +check_PROGRAMS = \ + $(TESTS) + +TEST_EXTENSIONS = .t +T_LOG_DRIVER = env AM_TAP_AWK='$(AWK)' $(SHELL) \ + $(top_srcdir)/config/tap-driver.sh + +test_ldadd = \ + $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libzmqutil/libzmqutil.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libtap/libtap.la \ + $(ZMQ_LIBS) + +test_cppflags = \ + -I$(top_srcdir)/src/common/libtap \ + $(AM_CPPFLAGS) + +test_ldflags = \ + -no-install + +test_msg_zsock_t_SOURCES = test/msg_zsock.c +test_msg_zsock_t_CPPFLAGS = $(test_cppflags) +test_msg_zsock_t_LDADD = $(test_ldadd) +test_msg_zsock_t_LDFLAGS = $(test_ldflags) + +test_reactor_t_SOURCES = test/reactor.c +test_reactor_t_CPPFLAGS = $(test_cppflags) +test_reactor_t_LDADD = $(test_ldadd) +test_reactor_t_LDFLAGS = $(test_ldflags) + +test_ev_t_SOURCES = test/ev.c +test_ev_t_CPPFLAGS = $(test_cppflags) +test_ev_t_LDADD = $(test_ldadd) +test_ev_t_LDFLAGS = $(test_ldflags) diff --git a/src/common/libutil/ev_zmq.c b/src/common/libzmqutil/ev_zmq.c similarity index 98% rename from src/common/libutil/ev_zmq.c rename to src/common/libzmqutil/ev_zmq.c index c6e6f20bcdbf..89a4bbf43649 100644 --- a/src/common/libutil/ev_zmq.c +++ b/src/common/libzmqutil/ev_zmq.c @@ -36,7 +36,7 @@ #include #include "src/common/libev/ev.h" -#include "src/common/libutil/ev_zmq.h" +#include "ev_zmq.h" static void prepare_cb (struct ev_loop *loop, ev_prepare *w, int revents) { diff --git a/src/common/libutil/ev_zmq.h b/src/common/libzmqutil/ev_zmq.h similarity index 98% rename from src/common/libutil/ev_zmq.h rename to src/common/libzmqutil/ev_zmq.h index 0f288739afe1..8c628695d0ca 100644 --- a/src/common/libutil/ev_zmq.h +++ b/src/common/libzmqutil/ev_zmq.h @@ -11,6 +11,8 @@ #ifndef _EV_ZMQ_H #define _EV_ZMQ_H +#include "src/common/libev/ev.h" + typedef struct ev_zmq_struct ev_zmq; typedef void (*ev_zmq_cb)(struct ev_loop *loop, ev_zmq *w, int revents); diff --git a/src/common/libzmqutil/msg_zsock.c b/src/common/libzmqutil/msg_zsock.c new file mode 100644 index 000000000000..0e542cc1b666 --- /dev/null +++ b/src/common/libzmqutil/msg_zsock.c @@ -0,0 +1,141 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include + +#include "src/common/libflux/message.h" +#include "src/common/libflux/message_iovec.h" +#include "src/common/libflux/message_proto.h" +#include "src/common/libutil/errno_safe.h" + +#include "msg_zsock.h" + +int zmqutil_msg_send_ex (void *sock, const flux_msg_t *msg, bool nonblock) +{ + void *handle; + int flags = ZMQ_SNDMORE; + struct msg_iovec *iov = NULL; + int iovcnt; + uint8_t proto[PROTO_SIZE]; + int count = 0; + int rc = -1; + + if (!sock || !msg) { + errno = EINVAL; + return -1; + } + + if (msg_to_iovec (msg, proto, PROTO_SIZE, &iov, &iovcnt) < 0) + goto error; + + if (nonblock) + flags |= ZMQ_DONTWAIT; + + handle = zsock_resolve (sock); + while (count < iovcnt) { + if ((count + 1) == iovcnt) + flags &= ~ZMQ_SNDMORE; + if (zmq_send (handle, + iov[count].data, + iov[count].size, + flags) < 0) + goto error; + count++; + } + rc = 0; +error: + ERRNO_SAFE_WRAP (free, iov); + return rc; +} + +int zmqutil_msg_send (void *sock, const flux_msg_t *msg) +{ + return zmqutil_msg_send_ex (sock, msg, false); +} + +flux_msg_t *zmqutil_msg_recv (void *sock) +{ + void *handle; + struct msg_iovec *iov = NULL; + int iovlen = 0; + int iovcnt = 0; + flux_msg_t *msg; + flux_msg_t *rv = NULL; + + if (!sock) { + errno = EINVAL; + return NULL; + } + + /* N.B. we need to store a zmq_msg_t for each iovec entry so that + * the memory is available during the call to iovec_to_msg(). We + * use the msg_iovec's "transport_data" field to store the entry + * and then clear/free it later. + */ + handle = zsock_resolve (sock); + while (true) { + zmq_msg_t *msgdata; + if (iovlen <= iovcnt) { + struct msg_iovec *tmp; + iovlen += IOVECINCR; + if (!(tmp = realloc (iov, sizeof (*iov) * iovlen))) + goto error; + iov = tmp; + } + if (!(msgdata = malloc (sizeof (zmq_msg_t)))) + goto error; + zmq_msg_init (msgdata); + if (zmq_recvmsg (handle, msgdata, 0) < 0) { + int save_errno = errno; + zmq_msg_close (msgdata); + free (msgdata); + errno = save_errno; + goto error; + } + iov[iovcnt].transport_data = msgdata; + iov[iovcnt].data = zmq_msg_data (msgdata); + iov[iovcnt].size = zmq_msg_size (msgdata); + iovcnt++; + if (!zsock_rcvmore (handle)) + break; + } + + if (!(msg = flux_msg_create (FLUX_MSGTYPE_ANY))) + goto error; + if (iovec_to_msg (msg, iov, iovcnt) < 0) + goto error; + rv = msg; +error: + if (iov) { + int save_errno = errno; + int i; + for (i = 0; i < iovcnt; i++) { + zmq_msg_t *msgdata = iov[i].transport_data; + zmq_msg_close (msgdata); + free (msgdata); + } + free (iov); + errno = save_errno; + } + return rv; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libzmqutil/msg_zsock.h b/src/common/libzmqutil/msg_zsock.h new file mode 100644 index 000000000000..304d236affac --- /dev/null +++ b/src/common/libzmqutil/msg_zsock.h @@ -0,0 +1,43 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _ZMQUTIL_MSG_ZSOCK_H +#define _ZMQUTIL_MSG_ZSOCK_H + +#include +#include + +#include "src/common/libflux/message.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* Send message to zeromq socket. + * Returns 0 on success, -1 on failure with errno set. + */ +int zmqutil_msg_send (void *dest, const flux_msg_t *msg); +int zmqutil_msg_send_ex (void *dest, const flux_msg_t *msg, bool nonblock); + +/* Receive a message from zeromq socket. + * Returns message on success, NULL on failure with errno set. + */ +flux_msg_t *zmqutil_msg_recv (void *dest); + +#ifdef __cplusplus +} +#endif + +#endif /* !_ZMQUTIL_MSG_ZSOCK_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libzmqutil/reactor.c b/src/common/libzmqutil/reactor.c new file mode 100644 index 000000000000..64a66874a36b --- /dev/null +++ b/src/common/libzmqutil/reactor.c @@ -0,0 +1,80 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include + +#include "src/common/libev/ev.h" +#include "src/common/libflux/reactor_private.h" + +#include "ev_zmq.h" + +/* 0MQ sockets + */ + +static void zmq_start (flux_watcher_t *w) +{ + ev_zmq_start (w->r->loop, (ev_zmq *)w->data); +} + +static void zmq_stop (flux_watcher_t *w) +{ + ev_zmq_stop (w->r->loop, (ev_zmq *)w->data); +} + +static void zmq_cb (struct ev_loop *loop, ev_zmq *pw, int revents) +{ + struct flux_watcher *w = pw->data; + if (w->fn) + w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); +} + +static struct flux_watcher_ops zmq_watcher = { + .start = zmq_start, + .stop = zmq_stop, + .destroy = NULL, +}; + +flux_watcher_t *zmqutil_watcher_create (flux_reactor_t *r, + void *zsock, int events, + flux_watcher_f cb, void *arg) +{ + ev_zmq *zw; + flux_watcher_t *w; + + if (!(w = flux_watcher_create (r, sizeof (*zw), &zmq_watcher, cb, arg))) + return NULL; + zw = flux_watcher_get_data (w); + ev_zmq_init (zw, zmq_cb, zsock, events_to_libev (events) & ~EV_ERROR); + zw->data = w; + + return w; +} + +void *zmqutil_watcher_get_zsock (flux_watcher_t *w) +{ + if (flux_watcher_get_ops (w) != &zmq_watcher) { + errno = EINVAL; + return NULL; + } + ev_zmq *zw = w->data; + return zw->zsock; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libzmqutil/reactor.h b/src/common/libzmqutil/reactor.h new file mode 100644 index 000000000000..18f830a848ee --- /dev/null +++ b/src/common/libzmqutil/reactor.h @@ -0,0 +1,40 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _ZMQUTIL_REACTOR_H +#define _ZMQUTIL_REACTOR_H + +#include +#include + +#include "src/common/libflux/reactor.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* zmq socket + */ + +flux_watcher_t *zmqutil_watcher_create (flux_reactor_t *r, + void *zsock, int events, + flux_watcher_f cb, void *arg); +void *zmqutil_watcher_get_zsock (flux_watcher_t *w); + +#ifdef __cplusplus +} +#endif + +#endif /* !_ZMQUTIL_REACTOR_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libutil/test/ev.c b/src/common/libzmqutil/test/ev.c similarity index 99% rename from src/common/libutil/test/ev.c rename to src/common/libzmqutil/test/ev.c index f28da6e7ae4c..104a68accd08 100644 --- a/src/common/libutil/test/ev.c +++ b/src/common/libzmqutil/test/ev.c @@ -14,10 +14,9 @@ #include #include +#include "src/common/libzmqutil/ev_zmq.h" #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libutil/oom.h" -#include "src/common/libev/ev.h" -#include "src/common/libutil/ev_zmq.h" #include "src/common/libtap/tap.h" void timer_arg_cb (struct ev_loop *loop, ev_timer *w, int revents) diff --git a/src/common/libzmqutil/test/msg_zsock.c b/src/common/libzmqutil/test/msg_zsock.c new file mode 100644 index 000000000000..4ab8f56aaa2e --- /dev/null +++ b/src/common/libzmqutil/test/msg_zsock.c @@ -0,0 +1,111 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include + +#include "src/common/libflux/message.h" +#include "src/common/libzmqutil/msg_zsock.h" +#include "src/common/libtap/tap.h" + +void check_sendzsock (void) +{ + zsock_t *zsock[2] = { NULL, NULL }; + flux_msg_t *any, *msg, *msg2; + const char *topic; + int type; + const char *uri = "inproc://test"; + + /* zsys boiler plate: + * appears to be needed to avoid atexit assertions when lives_ok() + * macro (which calls fork()) is used. + */ + zsys_init (); + zsys_set_logstream (stderr); + zsys_set_logident ("test_message.t"); + zsys_handler_set (NULL); + zsys_set_linger (5); // msec + + ok ((zsock[0] = zsock_new_pair (NULL)) != NULL + && zsock_bind (zsock[0], "%s", uri) == 0 + && (zsock[1] = zsock_new_pair (uri)) != NULL, + "got inproc socket pair"); + + if (!(any = flux_msg_create (FLUX_MSGTYPE_ANY))) + BAIL_OUT ("flux_msg_create failed"); + + ok ((msg = flux_msg_create (FLUX_MSGTYPE_REQUEST)) != NULL + && flux_msg_set_topic (msg, "foo.bar") == 0, + "created test message"); + + /* corner case tests */ + ok (zmqutil_msg_send (NULL, msg) < 0 && errno == EINVAL, + "zmqutil_msg_send returns < 0 and EINVAL on dest = NULL"); + ok (zmqutil_msg_send (zsock[1], any) < 0 && errno == EPROTO, + "zmqutil_msg_send returns < 0 and EPROTO on msg w/ type = ANY"); + ok (zmqutil_msg_send_ex (NULL, msg, true) < 0 && errno == EINVAL, + "zmqutil_msg_send_ex returns < 0 and EINVAL on dest = NULL"); + ok (zmqutil_msg_send_ex (zsock[1], any, true) < 0 && errno == EPROTO, + "zmqutil_msg_send_ex returns < 0 and EPROTO on msg w/ type = ANY"); + ok (zmqutil_msg_recv (NULL) == NULL && errno == EINVAL, + "zmqutil_msg_recv returns NULL and EINVAL on dest = NULL"); + + ok (zmqutil_msg_send (zsock[1], msg) == 0, + "zmqutil_msg_send works"); + ok ((msg2 = zmqutil_msg_recv (zsock[0])) != NULL, + "zmqutil_msg_recv works"); + ok (flux_msg_get_type (msg2, &type) == 0 && type == FLUX_MSGTYPE_REQUEST + && flux_msg_get_topic (msg2, &topic) == 0 + && !strcmp (topic, "foo.bar") + && flux_msg_has_payload (msg2) == false, + "decoded message looks like what was sent"); + flux_msg_destroy (msg2); + + /* Send it again. + */ + ok (zmqutil_msg_send (zsock[1], msg) == 0, + "try2: zmqutil_msg_send works"); + ok ((msg2 = zmqutil_msg_recv (zsock[0])) != NULL, + "try2: zmqutil_msg_recv works"); + ok (flux_msg_get_type (msg2, &type) == 0 && type == FLUX_MSGTYPE_REQUEST + && flux_msg_get_topic (msg2, &topic) == 0 + && !strcmp (topic, "foo.bar") + && flux_msg_has_payload (msg2) == false, + "try2: decoded message looks like what was sent"); + flux_msg_destroy (msg2); + flux_msg_destroy (msg); + + zsock_destroy (&zsock[0]); + zsock_destroy (&zsock[1]); + + /* zsys boiler plate - see note above + */ + zsys_shutdown(); +} + +int main (int argc, char *argv[]) +{ + plan (NO_PLAN); + + check_sendzsock (); + + done_testing(); + return (0); +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libzmqutil/test/reactor.c b/src/common/libzmqutil/test/reactor.c new file mode 100644 index 000000000000..0a32d2216309 --- /dev/null +++ b/src/common/libzmqutil/test/reactor.c @@ -0,0 +1,137 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#include +#include +#include +#include +#include + +#include "src/common/libflux/reactor.h" +#include "src/common/libtap/tap.h" + +#include "src/common/libzmqutil/reactor.h" + +static const size_t zmqwriter_msgcount = 1024; + +static void zmqwriter (flux_reactor_t *r, flux_watcher_t *w, + int revents, void *arg) +{ + void *sock = zmqutil_watcher_get_zsock (w); + static int count = 0; + if (revents & FLUX_POLLERR) { + fprintf (stderr, "%s: FLUX_POLLERR is set\n", __FUNCTION__); + goto error; + } + if (revents & FLUX_POLLOUT) { + uint8_t blob[64]; + zmsg_t *zmsg = zmsg_new (); + if (!zmsg || zmsg_addmem (zmsg, blob, sizeof (blob)) < 0) { + fprintf (stderr, "%s: failed to create message: %s\n", + __FUNCTION__, strerror (errno)); + goto error; + } + if (zmsg_send (&zmsg, sock) < 0) { + fprintf (stderr, "%s: zmsg_send: %s\n", + __FUNCTION__, strerror (errno)); + goto error; + } + count++; + if (count == zmqwriter_msgcount) + flux_watcher_stop (w); + } + return; +error: + flux_reactor_stop_error (r); +} + +static void zmqreader (flux_reactor_t *r, flux_watcher_t *w, + int revents, void *arg) +{ + void *sock = zmqutil_watcher_get_zsock (w); + static int count = 0; + if (revents & FLUX_POLLERR) { + fprintf (stderr, "%s: FLUX_POLLERR is set\n", __FUNCTION__); + goto error; + } + if (revents & FLUX_POLLIN) { + zmsg_t *zmsg = zmsg_recv (sock); + if (!zmsg) { + fprintf (stderr, "%s: zmsg_recv: %s\n", + __FUNCTION__, strerror (errno)); + goto error; + } + zmsg_destroy (&zmsg); + count++; + if (count == zmqwriter_msgcount) + flux_watcher_stop (w); + } + return; +error: + flux_reactor_stop_error (r); +} + +static void test_zmq (flux_reactor_t *reactor) +{ + zsock_t *zs[2]; + flux_watcher_t *r, *w; + const char *uri = "inproc://test_zmq"; + + zsys_set_logstream (stderr); + zsys_handler_set (NULL); + + zs[0] = zsock_new_pair (NULL); + zs[1] = zsock_new_pair (NULL); + ok (zs[0] && zs[1] + && zsock_bind (zs[0], "%s", uri) == 0 + && zsock_connect (zs[1], "%s", uri) == 0, + "zmq: connected ZMQ_PAIR sockets over inproc"); + r = zmqutil_watcher_create (reactor, zs[0], FLUX_POLLIN, zmqreader, NULL); + w = zmqutil_watcher_create (reactor, zs[1], FLUX_POLLOUT, zmqwriter, NULL); + ok (r != NULL && w != NULL, + "zmq: nonblocking reader and writer created"); + flux_watcher_start (r); + flux_watcher_start (w); + ok (flux_reactor_run (reactor, 0) == 0, + "zmq: reactor ran to completion after %d messages", zmqwriter_msgcount); + flux_watcher_stop (r); + flux_watcher_stop (w); + flux_watcher_destroy (r); + flux_watcher_destroy (w); + + zsock_destroy (&zs[0]); + zsock_destroy (&zs[1]); + + zsys_shutdown (); +} + +int main (int argc, char *argv[]) +{ + flux_reactor_t *reactor; + + plan (NO_PLAN); + + ok ((reactor = flux_reactor_create (0)) != NULL, + "created reactor"); + if (!reactor) + BAIL_OUT ("can't continue without reactor"); + + test_zmq (reactor); + + flux_reactor_destroy (reactor); + + done_testing(); + return (0); +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/connectors/shmem/Makefile.am b/src/connectors/shmem/Makefile.am index ed1b80f0fb5e..82ade229b4d6 100644 --- a/src/connectors/shmem/Makefile.am +++ b/src/connectors/shmem/Makefile.am @@ -22,5 +22,6 @@ shmem_la_LDFLAGS = -module $(san_ld_zdef_flag) \ shmem_la_LIBADD = \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libzmqutil/libzmqutil.la \ $(ZMQ_LIBS) diff --git a/src/connectors/shmem/shmem.c b/src/connectors/shmem/shmem.c index fba1a2e38225..810e0d9aef33 100644 --- a/src/connectors/shmem/shmem.c +++ b/src/connectors/shmem/shmem.c @@ -20,6 +20,7 @@ #endif #include +#include "src/common/libzmqutil/msg_zsock.h" #include "src/common/libutil/log.h" #define MODHANDLE_MAGIC 0xfeefbe02 @@ -65,7 +66,7 @@ static int op_send (void *impl, const flux_msg_t *msg, int flags) shmem_ctx_t *ctx = impl; assert (ctx->magic == MODHANDLE_MAGIC); - return flux_msg_sendzsock (ctx->sock, msg); + return zmqutil_msg_send (ctx->sock, msg); } static flux_msg_t *op_recv (void *impl, int flags) @@ -88,7 +89,7 @@ static flux_msg_t *op_recv (void *impl, int flags) goto done; } } - msg = flux_msg_recvzsock (ctx->sock); + msg = zmqutil_msg_recv (ctx->sock); done: return msg; } diff --git a/src/modules/job-manager/wait.c b/src/modules/job-manager/wait.c index 58ea42050829..82d6c8c92c13 100644 --- a/src/modules/job-manager/wait.c +++ b/src/modules/job-manager/wait.c @@ -346,7 +346,7 @@ void wait_ctx_destroy (struct waitjob *wait) /* Iterate through active jobs, sending ENOSYS response to * any pending wait requests, indicating that the module is unloading. - * Use wait->waiters count to avoid unncessary scanning. + * Use wait->waiters count to avoid unnecessary scanning. */ job = zhashx_first (wait->ctx->active_jobs); while (job && wait->waiters > 0) { diff --git a/t/issues/t3617-flux-shell-depends-zmq.sh b/t/issues/t3617-flux-shell-depends-zmq.sh new file mode 100755 index 000000000000..68001a1801a3 --- /dev/null +++ b/t/issues/t3617-flux-shell-depends-zmq.sh @@ -0,0 +1,9 @@ +#!/bin/sh -e +# flux-shell does not link to libzmq + +FLUX_SHELL="${FLUX_BUILD_DIR}/src/shell/.libs/lt-flux-shell" +if libtool --mode=execute ldd ${FLUX_SHELL} | grep -q zmq +then + exit 1 +fi +exit 0