Skip to content

Commit

Permalink
libflux: move zmq reactor calls into libzmqutil
Browse files Browse the repository at this point in the history
Problem: We woud like to remove zmq dependent functions out of libflux so
that libraries that are dependent on libzmq will not be loaded and slow
down flux-core.

Solution: Move flux_zmq_watcher_create() and flux_zmq_watcher_get_zsock()
into libzmqutil.  Rename functions to zmqutil_wacher_create() and
zmqutil_watcher_get_zsock().  Update callers accordingly.

As a result of this change, libflux is no longer dependent on libzmq
and the library dependency can be removed.  Add dependency to any libraries/binaries
previously dependent on libflux's dependency on libzmq.

Fixes #3617
  • Loading branch information
chu11 committed Jul 25, 2021
1 parent fa01ced commit c9096f7
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 184 deletions.
5 changes: 3 additions & 2 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#endif

#include "src/common/libzmqutil/msg_zsock.h"
#include "src/common/libzmqutil/reactor.h"
#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/iterators.h"
Expand Down Expand Up @@ -613,13 +614,13 @@ module_t *module_add (modhash_t *mh, const char *path)
log_err ("zsock_bind inproc://%s", module_get_uuid (p));
goto cleanup;
}
if (!(p->broker_w = flux_zmq_watcher_create (
if (!(p->broker_w = zmqutil_watcher_create (
flux_get_reactor (p->modhash->broker_h),
p->sock,
FLUX_POLLIN,
module_cb,
p))) {
log_err ("flux_zmq_watcher_create");
log_err ("zmqutil_watcher_create");
goto cleanup;
}
/* Set creds for connection.
Expand Down
31 changes: 16 additions & 15 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <uuid.h>

#include "src/common/libzmqutil/msg_zsock.h"
#include "src/common/libzmqutil/reactor.h"
#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/kary.h"
Expand Down Expand Up @@ -767,11 +768,11 @@ static int overlay_zap_init (struct overlay *ov)
log_err ("could not bind to %s", ZAP_ENDPOINT);
return -1;
}
if (!(ov->zap_w = flux_zmq_watcher_create (ov->reactor,
ov->zap,
FLUX_POLLIN,
overlay_zap_cb,
ov)))
if (!(ov->zap_w = zmqutil_watcher_create (ov->reactor,
ov->zap,
FLUX_POLLIN,
overlay_zap_cb,
ov)))
return -1;
flux_watcher_start (ov->zap_w);
return 0;
Expand Down Expand Up @@ -940,11 +941,11 @@ int overlay_connect (struct overlay *ov)
zsock_set_identity (ov->parent.zsock, ov->uuid);
if (zsock_connect (ov->parent.zsock, "%s", ov->parent.uri) < 0)
goto nomem;
if (!(ov->parent.w = flux_zmq_watcher_create (ov->reactor,
ov->parent.zsock,
FLUX_POLLIN,
parent_cb,
ov)))
if (!(ov->parent.w = zmqutil_watcher_create (ov->reactor,
ov->parent.zsock,
FLUX_POLLIN,
parent_cb,
ov)))
return -1;
flux_watcher_start (ov->parent.w);
if (hello_request_send (ov, ov->rank, FLUX_CORE_VERSION_HEX) < 0)
Expand Down Expand Up @@ -979,11 +980,11 @@ int overlay_bind (struct overlay *ov, const char *uri)
*/
if (!(ov->bind_uri = zsock_last_endpoint (ov->bind_zsock)))
return -1;
if (!(ov->bind_w = flux_zmq_watcher_create (ov->reactor,
ov->bind_zsock,
FLUX_POLLIN,
child_cb,
ov)))
if (!(ov->bind_w = zmqutil_watcher_create (ov->reactor,
ov->bind_zsock,
FLUX_POLLIN,
child_cb,
ov)))
return -1;
flux_watcher_start (ov->bind_w);
/* Ensure that ipc files are removed when the broker exits.
Expand Down
4 changes: 1 addition & 3 deletions src/common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ libflux_core_la_LIBADD = \
$(builddir)/libkvs/libkvs.la \
$(builddir)/libjob/libjob.la \
$(builddir)/libsubprocess/libsubprocess.la \
$(builddir)/libzmqutil/libzmqutil.la \
libflux-internal.la \
$(ZMQ_LIBS)
libflux-internal.la
libflux_core_la_LDFLAGS = \
-Wl,--version-script=$(srcdir)/libflux-core.map \
-version-info @LIBFLUX_CORE_VERSION_INFO@ \
Expand Down
3 changes: 1 addition & 2 deletions src/common/libflux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ AM_CPPFLAGS = \
-I$(top_srcdir)/src/common/libccan \
-I$(top_builddir) \
-I$(top_builddir)/src/common/libflux \
$(JANSSON_CFLAGS) \
$(ZMQ_CFLAGS)
$(JANSSON_CFLAGS)

installed_conf_cppflags = \
-DINSTALLED_MODULE_PATH=\"$(fluxmoddir)\" \
Expand Down
55 changes: 1 addition & 54 deletions src/common/libflux/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <string.h>
#include <errno.h>
#include <stdbool.h>
#include <czmq.h>
#include <fcntl.h>

#include "handle.h"
#include "reactor.h"
Expand All @@ -26,7 +26,6 @@
#include "buffer_private.h"

#include "src/common/libev/ev.h"
#include "src/common/libzmqutil/ev_zmq.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/fdutils.h"

Expand Down Expand Up @@ -547,58 +546,6 @@ int flux_buffer_write_watcher_is_closed (flux_watcher_t *w, int *errp)
return (0);
}

/* 0MQ sockets
*/

static void zmq_start (flux_watcher_t *w)
{
ev_zmq_start (w->r->loop, (ev_zmq *)w->data);
}

static void zmq_stop (flux_watcher_t *w)
{
ev_zmq_stop (w->r->loop, (ev_zmq *)w->data);
}

static void zmq_cb (struct ev_loop *loop, ev_zmq *pw, int revents)
{
struct flux_watcher *w = pw->data;
if (w->fn)
w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg);
}

static struct flux_watcher_ops zmq_watcher = {
.start = zmq_start,
.stop = zmq_stop,
.destroy = NULL,
};

flux_watcher_t *flux_zmq_watcher_create (flux_reactor_t *r,
void *zsock, int events,
flux_watcher_f cb, void *arg)
{
ev_zmq *zw;
flux_watcher_t *w;

if (!(w = flux_watcher_create (r, sizeof (*zw), &zmq_watcher, cb, arg)))
return NULL;
zw = flux_watcher_get_data (w);
ev_zmq_init (zw, zmq_cb, zsock, events_to_libev (events) & ~EV_ERROR);
zw->data = w;

return w;
}

void *flux_zmq_watcher_get_zsock (flux_watcher_t *w)
{
if (flux_watcher_get_ops (w) != &zmq_watcher) {
errno = EINVAL;
return NULL;
}
ev_zmq *zw = w->data;
return zw->zsock;
}

/* Timer
*/

Expand Down
8 changes: 0 additions & 8 deletions src/common/libflux/reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,6 @@ int flux_buffer_write_watcher_close (flux_watcher_t *w);
/* Returns 1 if write watcher is closed, errnum from close in close_err */
int flux_buffer_write_watcher_is_closed (flux_watcher_t *w, int *close_err);

/* zmq socket
*/

flux_watcher_t *flux_zmq_watcher_create (flux_reactor_t *r,
void *zsock, int events,
flux_watcher_f cb, void *arg);
void *flux_zmq_watcher_get_zsock (flux_watcher_t *w);

/* timer
*/

Expand Down
97 changes: 2 additions & 95 deletions src/common/libflux/test/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,109 +9,17 @@
\************************************************************/

#include <errno.h>
#include <czmq.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>

#include "src/common/libflux/reactor.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/fdutils.h"
#include "src/common/libtap/tap.h"

static const size_t zmqwriter_msgcount = 1024;

static void zmqwriter (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
void *sock = flux_zmq_watcher_get_zsock (w);
static int count = 0;
if (revents & FLUX_POLLERR) {
fprintf (stderr, "%s: FLUX_POLLERR is set\n", __FUNCTION__);
goto error;
}
if (revents & FLUX_POLLOUT) {
uint8_t blob[64];
zmsg_t *zmsg = zmsg_new ();
if (!zmsg || zmsg_addmem (zmsg, blob, sizeof (blob)) < 0) {
fprintf (stderr, "%s: failed to create message: %s\n",
__FUNCTION__, strerror (errno));
goto error;
}
if (zmsg_send (&zmsg, sock) < 0) {
fprintf (stderr, "%s: zmsg_send: %s\n",
__FUNCTION__, strerror (errno));
goto error;
}
count++;
if (count == zmqwriter_msgcount)
flux_watcher_stop (w);
}
return;
error:
flux_reactor_stop_error (r);
}

static void zmqreader (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
void *sock = flux_zmq_watcher_get_zsock (w);
static int count = 0;
if (revents & FLUX_POLLERR) {
fprintf (stderr, "%s: FLUX_POLLERR is set\n", __FUNCTION__);
goto error;
}
if (revents & FLUX_POLLIN) {
zmsg_t *zmsg = zmsg_recv (sock);
if (!zmsg) {
fprintf (stderr, "%s: zmsg_recv: %s\n",
__FUNCTION__, strerror (errno));
goto error;
}
zmsg_destroy (&zmsg);
count++;
if (count == zmqwriter_msgcount)
flux_watcher_stop (w);
}
return;
error:
flux_reactor_stop_error (r);
}

static void test_zmq (flux_reactor_t *reactor)
{
zsock_t *zs[2];
flux_watcher_t *r, *w;
const char *uri = "inproc://test_zmq";

zsys_set_logstream (stderr);
zsys_handler_set (NULL);

zs[0] = zsock_new_pair (NULL);
zs[1] = zsock_new_pair (NULL);
ok (zs[0] && zs[1]
&& zsock_bind (zs[0], "%s", uri) == 0
&& zsock_connect (zs[1], "%s", uri) == 0,
"zmq: connected ZMQ_PAIR sockets over inproc");
r = flux_zmq_watcher_create (reactor, zs[0], FLUX_POLLIN, zmqreader, NULL);
w = flux_zmq_watcher_create (reactor, zs[1], FLUX_POLLOUT, zmqwriter, NULL);
ok (r != NULL && w != NULL,
"zmq: nonblocking reader and writer created");
flux_watcher_start (r);
flux_watcher_start (w);
ok (flux_reactor_run (reactor, 0) == 0,
"zmq: reactor ran to completion after %d messages", zmqwriter_msgcount);
flux_watcher_stop (r);
flux_watcher_stop (w);
flux_watcher_destroy (r);
flux_watcher_destroy (w);

zsock_destroy (&zs[0]);
zsock_destroy (&zs[1]);

zsys_shutdown ();
}

static const size_t fdwriter_bufsize = 10*1024*1024;

static void fdwriter (flux_reactor_t *r, flux_watcher_t *w,
Expand Down Expand Up @@ -1461,7 +1369,6 @@ int main (int argc, char *argv[])
test_fd (reactor);
test_buffer (reactor);
test_buffer_corner_case (reactor);
test_zmq (reactor);
test_idle (reactor);
test_prepcheck (reactor);
test_signal (reactor);
Expand Down
1 change: 0 additions & 1 deletion src/common/libjob/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ T_LOG_DRIVER = env AM_TAP_AWK='$(AWK)' $(SHELL) \
test_ldadd = \
$(top_builddir)/src/common/libjob/libjob.la \
$(top_builddir)/src/common/libflux/libflux.la \
$(top_builddir)/src/common/libzmqutil/libzmqutil.la \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libtap/libtap.la \
$(JANSSON_LIBS) \
Expand Down
1 change: 0 additions & 1 deletion src/common/libkvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ T_LOG_DRIVER = env AM_TAP_AWK='$(AWK)' $(SHELL) \
test_ldadd = \
$(top_builddir)/src/common/libkvs/libkvs.la \
$(top_builddir)/src/common/libflux/libflux.la \
$(top_builddir)/src/common/libzmqutil/libzmqutil.la \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libtap/libtap.la \
$(JANSSON_LIBS) \
Expand Down
5 changes: 3 additions & 2 deletions src/common/librouter/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ test_ldadd = \
$(builddir)/libtestutil.la \
$(top_builddir)/src/common/librouter/librouter.la \
$(top_builddir)/src/common/libtestutil/libtestutil.la \
$(top_builddir)/src/common/libzmqutil/libzmqutil.la \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la \
$(top_builddir)/src/common/libtap/libtap.la
$(top_builddir)/src/common/libtap/libtap.la \
$(top_builddir)/src/common/libzmqutil/libzmqutil.la \
$(ZMQ_LIBS)

test_cppflags = \
$(AM_CPPFLAGS) \
Expand Down
3 changes: 2 additions & 1 deletion src/common/libterminus/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ test_ldadd = \
$(top_builddir)/src/common/libzmqutil/libzmqutil.la \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la \
$(top_builddir)/src/common/libtap/libtap.la
$(top_builddir)/src/common/libtap/libtap.la \
$(ZMQ_LIBS)

test_ldflags = \
-no-install
Expand Down
8 changes: 8 additions & 0 deletions src/common/libzmqutil/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ noinst_LTLIBRARIES = \
libzmqutil_la_SOURCES = \
msg_zsock.h \
msg_zsock.c \
reactor.h \
reactor.c \
ev_zmq.h \
ev_zmq.c

TESTS = test_msg_zsock.t \
test_reactor.t \
test_ev.t

check_PROGRAMS = \
Expand Down Expand Up @@ -50,6 +53,11 @@ test_msg_zsock_t_CPPFLAGS = $(test_cppflags)
test_msg_zsock_t_LDADD = $(test_ldadd)
test_msg_zsock_t_LDFLAGS = $(test_ldflags)

test_reactor_t_SOURCES = test/reactor.c
test_reactor_t_CPPFLAGS = $(test_cppflags)
test_reactor_t_LDADD = $(test_ldadd)
test_reactor_t_LDFLAGS = $(test_ldflags)

test_ev_t_SOURCES = test/ev.c
test_ev_t_CPPFLAGS = $(test_cppflags)
test_ev_t_LDADD = $(test_ldadd)
Expand Down
Loading

0 comments on commit c9096f7

Please sign in to comment.