diff --git a/src/broker/overlay.c b/src/broker/overlay.c index cbdb47e103b4..f8f530466434 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -24,7 +24,7 @@ #include "src/common/libzmqutil/msg_zsock.h" #include "src/common/libzmqutil/sockopt.h" -#include "src/common/libzmqutil/reactor.h" +#include "src/common/libzmqutil/zwatcher.h" #include "src/common/libzmqutil/zap.h" #include "src/common/libzmqutil/cert.h" #include "src/common/libzmqutil/monitor.h" diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index 9b273a78696e..c4fa389f6d26 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -181,11 +181,13 @@ flux_job_LDADD = \ $(top_builddir)/src/common/libczmqcontainers/libczmqcontainers.la \ $(top_builddir)/src/common/libdebugged/libdebugged.la \ $(top_builddir)/src/common/libterminus/libterminus.la \ - $(fluxcmd_ldadd) + $(fluxcmd_ldadd) \ + $(top_builddir)/src/common/libflux/libflux.la flux_exec_LDADD = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ - $(fluxcmd_ldadd) + $(fluxcmd_ldadd) \ + $(top_builddir)/src/common/libflux/libflux.la flux_terminus_LDADD = \ $(top_builddir)/src/common/libterminus/libterminus.la \ diff --git a/src/common/libflux/Makefile.am b/src/common/libflux/Makefile.am index 8d173f521382..f48e7c38620e 100644 --- a/src/common/libflux/Makefile.am +++ b/src/common/libflux/Makefile.am @@ -25,6 +25,7 @@ fluxcoreinclude_HEADERS = \ handle.h \ connector.h \ reactor.h \ + watcher.h \ msg_handler.h \ message.h \ msglist.h \ @@ -62,6 +63,10 @@ libflux_la_SOURCES = \ connector_local.c \ reactor.c \ reactor_private.h \ + watcher.c \ + watcher_private.h \ + watcher_wrap.c \ + hwatcher.c \ msg_handler.c \ message.c \ message_private.h \ @@ -79,8 +84,6 @@ libflux_la_SOURCES = \ module.c \ conf_private.h \ conf.c \ - ev_flux.h \ - ev_flux.c \ control.c \ future.c \ composite_future.c \ diff --git a/src/common/libflux/ev_flux.c b/src/common/libflux/ev_flux.c deleted file mode 100644 index 4c939d7aa3e2..000000000000 --- a/src/common/libflux/ev_flux.c +++ /dev/null @@ -1,107 +0,0 @@ -/************************************************************\ - * Copyright 2014 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -#if HAVE_CONFIG_H -#include "config.h" -#endif -#include -#include -#include - -#include "src/common/libev/ev.h" - -#include "ev_flux.h" - - -/* Get flux poll events, converted to libev - */ -static int get_pollevents (flux_t *h) -{ - int e = flux_pollevents (h); - int events = 0; - if (e < 0 || (e & FLUX_POLLERR)) - events |= EV_ERROR; - if ((e & FLUX_POLLIN)) - events |= EV_READ; - if ((e & FLUX_POLLOUT)) - events |= EV_WRITE; - return events; -} - -static void prepare_cb (struct ev_loop *loop, ev_prepare *w, int revents) -{ - struct ev_flux *fw = (struct ev_flux *)((char *)w - - offsetof (struct ev_flux, prepare_w)); - int events = get_pollevents (fw->h); - - if ((events & fw->events) || (events & EV_ERROR)) - ev_idle_start (loop, &fw->idle_w); - else - ev_io_start (loop, &fw->io_w); -} - -static void check_cb (struct ev_loop *loop, ev_check *w, int revents) -{ - struct ev_flux *fw = (struct ev_flux *)((char *)w - - offsetof (struct ev_flux, check_w)); - - if (ev_is_pending (&fw->io_w) - && ev_clear_pending (loop, &fw->io_w) & EV_ERROR) { - fw->cb (loop, fw, EV_ERROR); - return; - } - int events = get_pollevents (fw->h); - - ev_io_stop (loop, &fw->io_w); - ev_idle_stop (loop, &fw->idle_w); - - if ((events & fw->events) || (events & EV_ERROR)) - fw->cb (loop, fw, events); -} - -int ev_flux_init (struct ev_flux *w, ev_flux_f cb, flux_t *h, int events) -{ - w->cb = cb; - w->h = h; - w->events = events; - if ((w->pollfd = flux_pollfd (h)) < 0) - return -1; - - ev_prepare_init (&w->prepare_w, prepare_cb); - ev_check_init (&w->check_w, check_cb); - ev_idle_init (&w->idle_w, NULL); - ev_io_init (&w->io_w, NULL, w->pollfd, EV_READ); - - return 0; -} - -void ev_flux_start (struct ev_loop *loop, struct ev_flux *w) -{ - ev_prepare_start (loop, &w->prepare_w); - ev_check_start (loop, &w->check_w); -} - -void ev_flux_stop (struct ev_loop *loop, struct ev_flux *w) -{ - ev_prepare_stop (loop, &w->prepare_w); - ev_check_stop (loop, &w->check_w); - ev_io_stop (loop, &w->io_w); - ev_idle_stop (loop, &w->idle_w); -} - -bool ev_flux_is_active (struct ev_flux *w) -{ - return ev_is_active (&w->prepare_w); -} - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ - diff --git a/src/common/libflux/ev_flux.h b/src/common/libflux/ev_flux.h deleted file mode 100644 index 2fb7a269fcd8..000000000000 --- a/src/common/libflux/ev_flux.h +++ /dev/null @@ -1,37 +0,0 @@ -/************************************************************\ - * Copyright 2014 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -#ifndef _EV_FLUX_H -#define _EV_FLUX_H - -#include "src/common/libev/ev.h" - -struct ev_flux; - -typedef void (*ev_flux_f)(struct ev_loop *loop, struct ev_flux *w, int revents); - -struct ev_flux { - ev_io io_w; - ev_prepare prepare_w; - ev_idle idle_w; - ev_check check_w; - flux_t *h; - int pollfd; - int events; - ev_flux_f cb; - void *data; -}; - -int ev_flux_init (struct ev_flux *w, ev_flux_f cb, flux_t *h, int events); -void ev_flux_start (struct ev_loop *loop, struct ev_flux *w); -void ev_flux_stop (struct ev_loop *loop, struct ev_flux *w); -bool ev_flux_is_active (struct ev_flux *w); - -#endif /* !_EV_FLUX_H */ diff --git a/src/common/libflux/flux.h b/src/common/libflux/flux.h index eb5ab5d9e8fd..acd2b1a28d55 100644 --- a/src/common/libflux/flux.h +++ b/src/common/libflux/flux.h @@ -15,6 +15,7 @@ #include "message.h" #include "handle.h" #include "reactor.h" +#include "watcher.h" #include "msg_handler.h" #include "connector.h" #include "msglist.h" diff --git a/src/common/libflux/hwatcher.c b/src/common/libflux/hwatcher.c new file mode 100644 index 000000000000..3c662a874896 --- /dev/null +++ b/src/common/libflux/hwatcher.c @@ -0,0 +1,154 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* hwatcher.c - reactor watcher for flux_t handle */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include + +#include "src/common/libutil/errno_safe.h" + +#include "watcher_private.h" + +struct hwatcher { + flux_watcher_t *fd_w; + flux_watcher_t *prepare_w; + flux_watcher_t *idle_w; + flux_watcher_t *check_w; + flux_t *h; + int events; +}; + +static void hwatcher_start (flux_watcher_t *w) +{ + struct hwatcher *hw = watcher_get_data (w); + + flux_watcher_start (hw->prepare_w); + flux_watcher_start (hw->check_w); +} + +static void hwatcher_stop (flux_watcher_t *w) +{ + struct hwatcher *hw = watcher_get_data (w); + + flux_watcher_stop (hw->prepare_w); + flux_watcher_stop (hw->check_w); + flux_watcher_stop (hw->fd_w); + flux_watcher_stop (hw->idle_w); +} + +static bool hwatcher_is_active (flux_watcher_t *w) +{ + struct hwatcher *hw = watcher_get_data (w); + + return flux_watcher_is_active (hw->prepare_w); +} + +static void hwatcher_destroy (flux_watcher_t *w) +{ + struct hwatcher *hw = watcher_get_data (w); + if (hw) { + flux_watcher_destroy (hw->prepare_w); + flux_watcher_destroy (hw->check_w); + flux_watcher_destroy (hw->fd_w); + flux_watcher_destroy (hw->idle_w); + } +} + +static void hwatcher_prepare_cb (flux_reactor_t *r, + flux_watcher_t *prepare_w, + int prepare_revents, + void *arg) +{ + flux_watcher_t *w = arg; + struct hwatcher *hw = watcher_get_data (w); + int hevents; + + if ((hevents = flux_pollevents (hw->h)) < 0) + hevents = FLUX_POLLERR; + + if ((hevents & hw->events)) + flux_watcher_start (hw->idle_w); + else + flux_watcher_start (hw->fd_w); +} + +static void hwatcher_check_cb (flux_reactor_t *r, + flux_watcher_t *check_w, + int check_revents, + void *arg) +{ + flux_watcher_t *w = arg; + struct hwatcher *hw = watcher_get_data (w); + int hevents; + int revents; + + flux_watcher_stop (hw->fd_w); + flux_watcher_stop (hw->idle_w); + + if ((hevents = flux_pollevents (hw->h)) < 0) + hevents = FLUX_POLLERR; + revents = (hevents & hw->events); + + if (revents) + watcher_call (w, revents); +} + +static struct flux_watcher_ops hwatcher_ops = { + .start = hwatcher_start, + .stop = hwatcher_stop, + .is_active = hwatcher_is_active, + .destroy = hwatcher_destroy, +}; + +flux_watcher_t *flux_handle_watcher_create (flux_reactor_t *r, + flux_t *h, + int events, + flux_watcher_f cb, + void *arg) +{ + struct hwatcher *hw; + flux_watcher_t *w; + if (!(w = watcher_create (r, sizeof (*hw), &hwatcher_ops, cb, arg))) + return NULL; + hw = watcher_get_data (w); + hw->events = events | FLUX_POLLERR; + hw->h = h; + + if (!(hw->prepare_w = flux_prepare_watcher_create (r, + hwatcher_prepare_cb, + w)) + || !(hw->check_w = flux_check_watcher_create (r, hwatcher_check_cb, w)) + || !(hw->idle_w = flux_idle_watcher_create (r, NULL, NULL))) + goto error; + + int fd; + if ((fd = flux_pollfd (h)) < 0 + || !(hw->fd_w = flux_fd_watcher_create (r, fd, FLUX_POLLIN, NULL, w))) + goto error; + return w; +error: + ERRNO_SAFE_WRAP (flux_watcher_destroy, w); + return NULL; +} + +flux_t *flux_handle_watcher_get_flux (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &hwatcher_ops) { + errno = EINVAL; + return NULL; + } + struct hwatcher *hw = watcher_get_data (w); + return hw->h; +} + +// vi:ts=4 sw=4 expandtab diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index 0c67b2110bab..29f2f11f2ebc 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -19,12 +19,15 @@ #include #include "src/common/libev/ev.h" -#include "src/common/libutil/log.h" -#include "src/common/libutil/fdutils.h" -#include "ev_flux.h" #include "reactor_private.h" +struct flux_reactor { + struct ev_loop *loop; + int usecount; + unsigned int errflag:1; +}; + static int valid_flags (int flags, int valid) { if ((flags & ~valid)) { @@ -137,691 +140,9 @@ void flux_reactor_active_decref (flux_reactor_t *r) ev_unref (r->loop); } -/** - ** Watchers - **/ - -void flux_watcher_set_priority (flux_watcher_t *w, int priority) -{ - if (w) { - if (w->ops->set_priority) - w->ops->set_priority (w, priority); - } -} - -void flux_watcher_start (flux_watcher_t *w) -{ - if (w) { - if (w->ops->start) - w->ops->start (w); - } -} - -void flux_watcher_stop (flux_watcher_t *w) -{ - if (w) { - if (w->ops->stop) - w->ops->stop (w); - } -} - -void flux_watcher_destroy (flux_watcher_t *w) -{ - if (w) { - if (w->ops->stop) - w->ops->stop (w); - if (w->ops->destroy) - w->ops->destroy (w); - flux_reactor_decref (w->r); - free (w); - } -} - -static void safe_stop_cb (struct ev_loop *loop, ev_prepare *pw, int revents) -{ - flux_watcher_stop ((flux_watcher_t *)pw->data); - ev_prepare_stop (loop, pw); - free (pw); -} - -/* Stop a watcher in the next ev_prepare callback. To be used from periodics - * reschedule callback or other ev callbacks in which it is documented - * unsafe to modify the ev_loop or any watcher. - */ -static void watcher_stop_safe (flux_watcher_t *w) -{ - if (w) { - ev_prepare *pw = calloc (1, sizeof (*pw)); - if (!pw) /* On ENOMEM, we just have to give up */ - return; - ev_prepare_init (pw, safe_stop_cb); - pw->data = w; - ev_prepare_start (w->r->loop, pw); - } -} - -/* This is_active() callback works for "native" libev watchers, where - * w->data points to a struct ev_TYPE. - */ -static bool wrap_ev_active (flux_watcher_t *w) -{ - return ev_is_active (w->data); -} - -/* flux_t handle - */ - -static void handle_start (flux_watcher_t *w) -{ - ev_flux_start (w->r->loop, (struct ev_flux *)w->data); -} - -static void handle_stop (flux_watcher_t *w) -{ - ev_flux_stop (w->r->loop, (struct ev_flux *)w->data); -} - -static bool handle_is_active (flux_watcher_t *w) -{ - return ev_flux_is_active (w->data); -} - -static void handle_cb (struct ev_loop *loop, struct ev_flux *fw, int revents) -{ - struct flux_watcher *w = fw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops handle_watcher = { - .start = handle_start, - .stop = handle_stop, - .is_active = handle_is_active, - .destroy = NULL, -}; - -flux_watcher_t *flux_handle_watcher_create (flux_reactor_t *r, - flux_t *h, - int events, - flux_watcher_f cb, - void *arg) -{ - struct ev_flux *fw; - flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*fw), &handle_watcher, cb, arg))) - return NULL; - fw = watcher_get_data (w); - ev_flux_init (fw, handle_cb, h, events_to_libev (events) & ~EV_ERROR); - fw->data = w; - - return w; -} - -flux_t *flux_handle_watcher_get_flux (flux_watcher_t *w) -{ - assert (watcher_get_ops (w) == &handle_watcher); - struct ev_flux *fw = w->data; - return fw->h; -} - -/* file descriptors - */ - -static void fd_start (flux_watcher_t *w) -{ - ev_io_start (w->r->loop, (ev_io *)w->data); -} - -static void fd_stop (flux_watcher_t *w) -{ - ev_io_stop (w->r->loop, (ev_io *)w->data); -} - -static void fd_cb (struct ev_loop *loop, ev_io *iow, int revents) -{ - struct flux_watcher *w = iow->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops fd_watcher = { - .start = fd_start, - .stop = fd_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, - int fd, - int events, - flux_watcher_f cb, - void *arg) -{ - ev_io *iow; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*iow), &fd_watcher, cb, arg))) - return NULL; - iow = watcher_get_data (w); - ev_io_init (iow, fd_cb, fd, events_to_libev (events) & ~EV_ERROR); - iow->data = w; - - return w; -} - -int flux_fd_watcher_get_fd (flux_watcher_t *w) -{ - assert (watcher_get_ops (w) == &fd_watcher); - ev_io *iow = w->data; - return iow->fd; -} - -/* Timer - */ - -static void timer_start (flux_watcher_t *w) -{ - ev_timer_start (w->r->loop, (ev_timer *)w->data); -} - -static void timer_stop (flux_watcher_t *w) -{ - ev_timer_stop (w->r->loop, (ev_timer *)w->data); -} - -static void timer_cb (struct ev_loop *loop, ev_timer *tw, int revents) -{ - struct flux_watcher *w = tw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops timer_watcher = { - .start = timer_start, - .stop = timer_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, - double after, - double repeat, - flux_watcher_f cb, - void *arg) -{ - ev_timer *tw; - flux_watcher_t *w; - if (after < 0 || repeat < 0) { - errno = EINVAL; - return NULL; - } - if (!(w = watcher_create (r, sizeof (*tw), &timer_watcher, cb, arg))) - return NULL; - tw = watcher_get_data (w); - ev_timer_init (tw, timer_cb, after, repeat); - tw->data = w; - - return w; -} - -void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) -{ - assert (watcher_get_ops (w) == &timer_watcher); - ev_timer *tw = w->data; - ev_timer_set (tw, after, repeat); -} - -void flux_timer_watcher_again (flux_watcher_t *w) -{ - assert (watcher_get_ops (w) == &timer_watcher); - ev_timer *tw = w->data; - struct ev_loop *loop = w->r->loop; - ev_timer_again (loop, tw); -} - -/* Periodic - */ -struct f_periodic { - struct flux_watcher *w; - ev_periodic evp; - flux_reschedule_f reschedule_cb; -}; - -static void periodic_start (flux_watcher_t *w) +void *reactor_get_loop (flux_reactor_t *r) { - struct f_periodic *fp = w->data; - ev_periodic_start (w->r->loop, &fp->evp); -} - -static void periodic_stop (flux_watcher_t *w) -{ - struct f_periodic *fp = w->data; - ev_periodic_stop (w->r->loop, &fp->evp); -} - -static bool periodic_is_active (flux_watcher_t *w) -{ - struct f_periodic *fp = w->data; - return ev_is_active (&fp->evp); -} - -static void periodic_cb (struct ev_loop *loop, ev_periodic *pw, int revents) -{ - struct f_periodic *fp = pw->data; - struct flux_watcher *w = fp->w; - if (w->fn) - fp->w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static ev_tstamp periodic_reschedule_cb (ev_periodic *pw, ev_tstamp now) -{ - ev_tstamp rc; - struct f_periodic *fp = pw->data; - assert (fp->reschedule_cb != NULL); - rc = (ev_tstamp) fp->reschedule_cb (fp->w, (double) now, fp->w->arg); - if (rc < now) { - /* User reschedule cb returned time in the past. The watcher will - * be stopped, but not here (changing loop is not allowed in a - * libev reschedule cb. flux_watcher_stop_safe() will stop it in - * a prepare callback. - * Return time far in the future to ensure we aren't called again. - */ - watcher_stop_safe (fp->w); - return (now + 1e99); - } - return rc; -} - -static struct flux_watcher_ops periodic_watcher = { - .start = periodic_start, - .stop = periodic_stop, - .destroy = NULL, - .is_active = periodic_is_active, -}; - -flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, - double offset, - double interval, - flux_reschedule_f reschedule_cb, - flux_watcher_f cb, - void *arg) -{ - flux_watcher_t *w; - struct f_periodic *fp; - size_t size = sizeof (*fp); - if (offset < 0 || interval < 0) { - errno = EINVAL; - return NULL; - } - if (!(w = watcher_create (r, size, &periodic_watcher, cb, arg))) - return NULL; - fp = watcher_get_data (w); - fp->evp.data = fp; - fp->w = w; - fp->reschedule_cb = reschedule_cb; - - ev_periodic_init (&fp->evp, - periodic_cb, - offset, - interval, - reschedule_cb ? periodic_reschedule_cb : NULL); - - return w; -} - -void flux_periodic_watcher_reset (flux_watcher_t *w, - double next, - double interval, - flux_reschedule_f reschedule_cb) -{ - struct f_periodic *fp = w->data; - struct ev_loop *loop = w->r->loop; - assert (watcher_get_ops (w) == &periodic_watcher); - fp->reschedule_cb = reschedule_cb; - ev_periodic_set (&fp->evp, - next, - interval, - reschedule_cb ? periodic_reschedule_cb : NULL); - ev_periodic_again (loop, &fp->evp); -} - -double flux_watcher_next_wakeup (flux_watcher_t *w) -{ - if (watcher_get_ops (w) == &periodic_watcher) { - struct f_periodic *fp = w->data; - return ((double) ev_periodic_at (&fp->evp)); - } - else if (watcher_get_ops (w) == &timer_watcher) { - ev_timer *tw = w->data; - struct ev_loop *loop = w->r->loop; - return ((double) (ev_now (loop) + ev_timer_remaining (loop, tw))); - } - errno = EINVAL; - return (-1.); -} - -/* Prepare - */ -static void prepare_start (flux_watcher_t *w) -{ - ev_prepare_start (w->r->loop, (ev_prepare *)w->data); -} - -static void prepare_stop (flux_watcher_t *w) -{ - ev_prepare_stop (w->r->loop, (ev_prepare *)w->data); -} - -static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents) -{ - struct flux_watcher *w = pw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops prepare_watcher = { - .start = prepare_start, - .stop = prepare_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg) -{ - ev_prepare *pw; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher, cb, arg))) - return NULL; - pw = watcher_get_data (w); - ev_prepare_init (pw, prepare_cb); - pw->data = w; - - return w; -} - -/* Check - */ - -static void check_set_priority (flux_watcher_t *w, int priority) -{ - ev_set_priority ((ev_check *)w->data, priority); -} - -static void check_start (flux_watcher_t *w) -{ - ev_check_start (w->r->loop, (ev_check *)w->data); -} - -static void check_stop (flux_watcher_t *w) -{ - ev_check_stop (w->r->loop, (ev_check *)w->data); -} - -static void check_cb (struct ev_loop *loop, ev_check *cw, int revents) -{ - struct flux_watcher *w = cw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops check_watcher = { - .set_priority = check_set_priority, - .start = check_start, - .stop = check_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg) -{ - ev_check *cw; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*cw), &check_watcher, cb, arg))) - return NULL; - cw = watcher_get_data (w); - ev_check_init (cw, check_cb); - cw->data = w; - - return w; -} - -/* Idle - */ - -static void idle_start (flux_watcher_t *w) -{ - ev_idle_start (w->r->loop, (ev_idle *)w->data); -} - -static void idle_stop (flux_watcher_t *w) -{ - ev_idle_stop (w->r->loop, (ev_idle *)w->data); -} - -static void idle_cb (struct ev_loop *loop, ev_idle *iw, int revents) -{ - struct flux_watcher *w = iw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops idle_watcher = { - .start = idle_start, - .stop = idle_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg) -{ - ev_idle *iw; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher, cb, arg))) - return NULL; - iw = watcher_get_data (w); - ev_idle_init (iw, idle_cb); - iw->data = w; - - return w; -} - -/* Child - */ - -static void child_start (flux_watcher_t *w) -{ - ev_child_start (w->r->loop, (ev_child *)w->data); -} - -static void child_stop (flux_watcher_t *w) -{ - ev_child_stop (w->r->loop, (ev_child *)w->data); -} - -static void child_cb (struct ev_loop *loop, ev_child *cw, int revents) -{ - struct flux_watcher *w = cw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops child_watcher = { - .start = child_start, - .stop = child_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - - -flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, - int pid, - bool trace, - flux_watcher_f cb, - void *arg) -{ - flux_watcher_t *w; - ev_child *cw; - - if (!ev_is_default_loop (r->loop)) { - errno = EINVAL; - return NULL; - } - if (!(w = watcher_create (r, sizeof (*cw), &child_watcher, cb, arg))) - return NULL; - cw = watcher_get_data (w); - ev_child_init (cw, child_cb, pid, trace ? 1 : 0); - cw->data = w; - - return w; -} - -int flux_child_watcher_get_rpid (flux_watcher_t *w) -{ - if (watcher_get_ops (w) != &child_watcher) { - errno = EINVAL; - return -1; - } - ev_child *cw = w->data; - return cw->rpid; -} - -int flux_child_watcher_get_rstatus (flux_watcher_t *w) -{ - if (watcher_get_ops (w) != &child_watcher) { - errno = EINVAL; - return -1; - } - ev_child *cw = w->data; - return cw->rstatus; -} - -/* Signal - */ - -static void signal_start (flux_watcher_t *w) -{ - ev_signal_start (w->r->loop, (ev_signal *)w->data); -} - -static void signal_stop (flux_watcher_t *w) -{ - ev_signal_stop (w->r->loop, (ev_signal *)w->data); -} - -static void signal_cb (struct ev_loop *loop, ev_signal *sw, int revents) -{ - struct flux_watcher *w = sw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops signal_watcher = { - .start = signal_start, - .stop = signal_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, - int signum, - flux_watcher_f cb, - void *arg) -{ - flux_watcher_t *w; - ev_signal *sw; - - if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher, cb, arg))) - return NULL; - sw = watcher_get_data (w); - ev_signal_init (sw, signal_cb, signum); - sw->data = w; - - return w; -} - -int flux_signal_watcher_get_signum (flux_watcher_t *w) -{ - if (watcher_get_ops (w) != &signal_watcher) { - errno = EINVAL; - return (-1); - } - ev_signal *sw = w->data; - return sw->signum; -} - -/* Stat - */ - -static void stat_start (flux_watcher_t *w) -{ - ev_stat_start (w->r->loop, (ev_stat *)w->data); -} - -static void stat_stop (flux_watcher_t *w) -{ - ev_stat_stop (w->r->loop, (ev_stat *)w->data); -} - -static void stat_cb (struct ev_loop *loop, ev_stat *sw, int revents) -{ - struct flux_watcher *w = sw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops stat_watcher = { - .start = stat_start, - .stop = stat_stop, - .destroy = NULL, - .is_active = wrap_ev_active, -}; - -flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, - const char *path, - double interval, - flux_watcher_f cb, - void *arg) -{ - flux_watcher_t *w; - ev_stat *sw; - - if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher, cb, arg))) - return NULL; - sw = watcher_get_data (w); - ev_stat_init (sw, stat_cb, path, interval); - sw->data = w; - - return w; -} - -void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev) -{ - ev_stat *sw = w->data; - assert (watcher_get_ops (w) == &stat_watcher); - if (stat) - *stat = sw->attr; - if (prev) - *prev = sw->prev; -} - -bool flux_watcher_is_active (flux_watcher_t *w) -{ - if (w) { - if (w->ops->is_active) - return w->ops->is_active (w); - } - return false; + return r ? r->loop : NULL; } /* diff --git a/src/common/libflux/reactor.h b/src/common/libflux/reactor.h index 3c8105cbfde7..bce296edd74e 100644 --- a/src/common/libflux/reactor.h +++ b/src/common/libflux/reactor.h @@ -19,9 +19,6 @@ extern "C" { #endif -/* Reactor - */ - /* Flags for flux_reactor_run() */ enum { @@ -58,128 +55,6 @@ double flux_reactor_time (void); void flux_reactor_active_incref (flux_reactor_t *r); void flux_reactor_active_decref (flux_reactor_t *r); - -/* Watchers - */ - -typedef void (*flux_watcher_f)(flux_reactor_t *r, - flux_watcher_t *w, - int revents, - void *arg); - -/* Set the watcher priority. The range is [-2:2] (default 0). - * Higher priority watchers run first. - * This is a no-op if the underlying watcher doesn't support it. - * If the priority is out of range, the max or min value is set. - * The priority should only be set when the watcher is stopped. - * Currently only the check watcher supports it. - */ -void flux_watcher_set_priority (flux_watcher_t *w, int priority); - -void flux_watcher_start (flux_watcher_t *w); -void flux_watcher_stop (flux_watcher_t *w); -void flux_watcher_destroy (flux_watcher_t *w); -double flux_watcher_next_wakeup (flux_watcher_t *w); -bool flux_watcher_is_active (flux_watcher_t *w); - -/* flux_t handle - */ - -flux_watcher_t *flux_handle_watcher_create (flux_reactor_t *r, - flux_t *h, - int events, - flux_watcher_f cb, - void *arg); -flux_t *flux_handle_watcher_get_flux (flux_watcher_t *w); - -/* file descriptor - */ - -flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, - int fd, int events, - flux_watcher_f cb, - void *arg); -int flux_fd_watcher_get_fd (flux_watcher_t *w); - -/* timer - */ - -flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, - double after, - double repeat, - flux_watcher_f cb, - void *arg); - -void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat); - -void flux_timer_watcher_again (flux_watcher_t *w); - -/* periodic - */ - -typedef double (*flux_reschedule_f) (flux_watcher_t *w, double now, void *arg); - -flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, - double offset, - double interval, - flux_reschedule_f reschedule_cb, - flux_watcher_f cb, - void *arg); - -void flux_periodic_watcher_reset (flux_watcher_t *w, - double next_wakeup, - double interval, - flux_reschedule_f reschedule_cb); - - -/* prepare/check/idle - */ - -flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg); - -flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg); - -flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, - flux_watcher_f cb, - void *arg); - -/* child - */ - -flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, - int pid, - bool trace, - flux_watcher_f cb, - void *arg); -int flux_child_watcher_get_rpid (flux_watcher_t *w); -int flux_child_watcher_get_rstatus (flux_watcher_t *w); - -/* signal - */ - -flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, - int signum, - flux_watcher_f cb, - void *arg); - -int flux_signal_watcher_get_signum (flux_watcher_t *w); - -/* stat - */ - -flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, - const char *path, - double interval, - flux_watcher_f cb, - void *arg); -void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev); - #ifdef __cplusplus } #endif diff --git a/src/common/libflux/reactor_private.h b/src/common/libflux/reactor_private.h index a4f8f40fa4de..10711f710e50 100644 --- a/src/common/libflux/reactor_private.h +++ b/src/common/libflux/reactor_private.h @@ -11,97 +11,10 @@ #ifndef _FLUX_CORE_REACTOR_PRIVATE_H #define _FLUX_CORE_REACTOR_PRIVATE_H -#include "src/common/libev/ev.h" #include "reactor.h" -struct flux_watcher_ops { - void (*set_priority) (flux_watcher_t *w, int priority); - void (*start) (flux_watcher_t *w); - void (*stop) (flux_watcher_t *w); - void (*destroy) (flux_watcher_t *w); - bool (*is_active) (flux_watcher_t *w); -}; - -struct flux_reactor { - struct ev_loop *loop; - int usecount; - unsigned int errflag:1; -}; - -struct flux_watcher { - flux_reactor_t *r; - flux_watcher_f fn; - void *arg; - struct flux_watcher_ops *ops; - void *data; -}; - -static inline int events_to_libev (int events) -{ - int e = 0; - if (events & FLUX_POLLIN) - e |= EV_READ; - if (events & FLUX_POLLOUT) - e |= EV_WRITE; - if (events & FLUX_POLLERR) - e |= EV_ERROR; - return e; -} - -static inline int libev_to_events (int events) -{ - int e = 0; - if (events & EV_READ) - e |= FLUX_POLLIN; - if (events & EV_WRITE) - e |= FLUX_POLLOUT; - if (events & EV_ERROR) - e |= FLUX_POLLERR; - return e; -} - -/* Create a custom watcher on reactor 'r' with 'data_size' bytes reserved - * for the implementor, implementation operations in 'ops' and user - * watcher callback and data 'fn' and 'arg'. - * - * Caller retrieves pointer to allocated implementation data with - * flux_watcher_data (w). - */ -static inline flux_watcher_t *watcher_create (flux_reactor_t *r, - size_t data_size, - struct flux_watcher_ops *ops, - flux_watcher_f fn, - void *arg) -{ - struct flux_watcher *w = calloc (1, sizeof (*w) + data_size); - if (!w) - return NULL; - w->r = r; - w->ops = ops; - w->data = w + 1; - w->fn = fn; - w->arg = arg; - flux_reactor_incref (r); - return w; -} - -/* Return pointer to implementation data reserved by watcher object 'w'. - */ -static inline void *watcher_get_data (flux_watcher_t *w) -{ - if (w) - return w->data; - return NULL; -} - -/* Return pointer to flux_watcher_ops structure for this watcher. - */ -static inline struct flux_watcher_ops *watcher_get_ops (flux_watcher_t *w) -{ - if (w) - return w->ops; - return NULL; -} +/* retrieve underlying loop implementation - for watcher_wrap.c only */ +void *reactor_get_loop (flux_reactor_t *r); #endif /* !_FLUX_CORE_REACTOR_PRIVATE_H */ diff --git a/src/common/libflux/watcher.c b/src/common/libflux/watcher.c new file mode 100644 index 000000000000..e6bc66c2454e --- /dev/null +++ b/src/common/libflux/watcher.c @@ -0,0 +1,123 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include + +#include "reactor_private.h" +#include "watcher_private.h" + +struct flux_watcher { + flux_reactor_t *r; + flux_watcher_f fn; + void *arg; + struct flux_watcher_ops *ops; + void *data; +}; + +flux_watcher_t *watcher_create (flux_reactor_t *r, + size_t data_size, + struct flux_watcher_ops *ops, + flux_watcher_f fn, + void *arg) +{ + struct flux_watcher *w = calloc (1, sizeof (*w) + data_size); + if (!w) + return NULL; + w->r = r; + w->ops = ops; + w->data = w + 1; + w->fn = fn; + w->arg = arg; + flux_reactor_incref (r); + return w; +} + +void *watcher_get_data (flux_watcher_t *w) +{ + if (w) + return w->data; + return NULL; +} + +struct flux_watcher_ops *watcher_get_ops (flux_watcher_t *w) +{ + if (w) + return w->ops; + return NULL; +} + +void watcher_call (flux_watcher_t *w, int revents) +{ + if (w->fn) + w->fn (w->r, w, revents, w->arg); +} + +void *watcher_get_arg (flux_watcher_t *w) +{ + if (w) + return w->arg; + return NULL; +} + +flux_reactor_t *watcher_get_reactor (flux_watcher_t *w) +{ + return w ? w->r : NULL; +} + +void flux_watcher_set_priority (flux_watcher_t *w, int priority) +{ + if (w) { + if (w->ops->set_priority) + w->ops->set_priority (w, priority); + } +} + +void flux_watcher_start (flux_watcher_t *w) +{ + if (w) { + if (w->ops->start) + w->ops->start (w); + } +} + +void flux_watcher_stop (flux_watcher_t *w) +{ + if (w) { + if (w->ops->stop) + w->ops->stop (w); + } +} + +bool flux_watcher_is_active (flux_watcher_t *w) +{ + if (w) { + if (w->ops->is_active) + return w->ops->is_active (w); + } + return false; +} + +void flux_watcher_destroy (flux_watcher_t *w) +{ + if (w) { + if (w->ops->stop) + w->ops->stop (w); + if (w->ops->destroy) + w->ops->destroy (w); + flux_reactor_decref (w->r); + free (w); + } +} + +// vi:ts=4 sw=4 expandtab diff --git a/src/common/libflux/watcher.h b/src/common/libflux/watcher.h new file mode 100644 index 000000000000..0ebe83842fe6 --- /dev/null +++ b/src/common/libflux/watcher.h @@ -0,0 +1,146 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_CORE_WATCHER_H +#define _FLUX_CORE_WATCHER_H + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*flux_watcher_f)(flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg); + +/* Set the watcher priority. The range is [-2:2] (default 0). + * Higher priority watchers run first. + * This is a no-op if the underlying watcher doesn't support it. + * If the priority is out of range, the max or min value is set. + * The priority should only be set when the watcher is stopped. + * Currently only the check watcher supports it. + */ +void flux_watcher_set_priority (flux_watcher_t *w, int priority); + +void flux_watcher_start (flux_watcher_t *w); +void flux_watcher_stop (flux_watcher_t *w); +void flux_watcher_destroy (flux_watcher_t *w); +double flux_watcher_next_wakeup (flux_watcher_t *w); +bool flux_watcher_is_active (flux_watcher_t *w); + +/* flux_t handle + */ + +flux_watcher_t *flux_handle_watcher_create (flux_reactor_t *r, + flux_t *h, + int events, + flux_watcher_f cb, + void *arg); +flux_t *flux_handle_watcher_get_flux (flux_watcher_t *w); + +/* file descriptor + */ + +flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, + int fd, int events, + flux_watcher_f cb, + void *arg); +int flux_fd_watcher_get_fd (flux_watcher_t *w); + +/* timer + */ + +flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, + double after, + double repeat, + flux_watcher_f cb, + void *arg); + +void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat); + +void flux_timer_watcher_again (flux_watcher_t *w); + +/* periodic + */ + +typedef double (*flux_reschedule_f) (flux_watcher_t *w, double now, void *arg); + +flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, + double offset, + double interval, + flux_reschedule_f reschedule_cb, + flux_watcher_f cb, + void *arg); + +void flux_periodic_watcher_reset (flux_watcher_t *w, + double next_wakeup, + double interval, + flux_reschedule_f reschedule_cb); + + +/* prepare/check/idle + */ + +flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg); + +flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg); + +flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg); + +/* child + */ + +flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, + int pid, + bool trace, + flux_watcher_f cb, + void *arg); +int flux_child_watcher_get_rpid (flux_watcher_t *w); +int flux_child_watcher_get_rstatus (flux_watcher_t *w); + +/* signal + */ + +flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, + int signum, + flux_watcher_f cb, + void *arg); + +int flux_signal_watcher_get_signum (flux_watcher_t *w); + +/* stat + */ + +flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, + const char *path, + double interval, + flux_watcher_f cb, + void *arg); +void flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev); + +#ifdef __cplusplus +} +#endif + +#endif /* !_FLUX_CORE_WATCHER_H */ + +// vi:ts=4 sw=4 expandtab diff --git a/src/common/libflux/watcher_private.h b/src/common/libflux/watcher_private.h new file mode 100644 index 000000000000..fae4c229bfda --- /dev/null +++ b/src/common/libflux/watcher_private.h @@ -0,0 +1,57 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* private interfaces for "subclassing" watcher.c */ + +#ifndef _FLUX_CORE_WATCHER_PRIVATE_H +#define _FLUX_CORE_WATCHER_PRIVATE_H + +#include "reactor.h" + +struct flux_watcher_ops { + void (*set_priority) (flux_watcher_t *w, int priority); + void (*start) (flux_watcher_t *w); + void (*stop) (flux_watcher_t *w); + void (*destroy) (flux_watcher_t *w); + bool (*is_active) (flux_watcher_t *w); +}; + +/* Create a custom watcher on reactor 'r' with 'data_size' bytes reserved + * for the implementor, implementation operations in 'ops' and user + * watcher callback and data 'fn' and 'arg'. + * + * Caller retrieves pointer to allocated implementation data with + * flux_watcher_data (w). + */ +flux_watcher_t *watcher_create (flux_reactor_t *r, + size_t data_size, + struct flux_watcher_ops *ops, + flux_watcher_f fn, + void *arg); + +/* Return pointer to implementation data reserved by watcher object 'w'. + */ +void *watcher_get_data (flux_watcher_t *w); + +/* Return pointer to flux_watcher_ops structure for this watcher. + */ +struct flux_watcher_ops *watcher_get_ops (flux_watcher_t *w); + +void watcher_call (flux_watcher_t *w, int revents); + +flux_reactor_t *watcher_get_reactor (flux_watcher_t *w); + +void *watcher_get_arg (flux_watcher_t *w); + +#endif /* !_FLUX_CORE_WATCHER_PRIVATE_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_wrap.c new file mode 100644 index 000000000000..42299c6a276f --- /dev/null +++ b/src/common/libflux/watcher_wrap.c @@ -0,0 +1,670 @@ +/************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* watcher_wrap.c - wrapped libev watchers */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include + +#include "src/common/libev/ev.h" + +#include "reactor_private.h" +#include "watcher_private.h" + +static inline int events_to_libev (int events) +{ + int e = 0; + if (events & FLUX_POLLIN) + e |= EV_READ; + if (events & FLUX_POLLOUT) + e |= EV_WRITE; + if (events & FLUX_POLLERR) + e |= EV_ERROR; + return e; +} + +static inline int libev_to_events (int events) +{ + int e = 0; + if (events & EV_READ) + e |= FLUX_POLLIN; + if (events & EV_WRITE) + e |= FLUX_POLLOUT; + if (events & EV_ERROR) + e |= FLUX_POLLERR; + return e; +} + +static void watcher_call_ev (flux_watcher_t *w, int revents) +{ + watcher_call (w, libev_to_events (revents)); +} + +static struct ev_loop *watcher_get_ev (flux_watcher_t *w) +{ + return reactor_get_loop (watcher_get_reactor (w)); +} + +static void safe_stop_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +{ + flux_watcher_stop ((flux_watcher_t *)pw->data); + ev_prepare_stop (loop, pw); + free (pw); +} + +/* Stop a watcher in the next ev_prepare callback. To be used from periodics + * reschedule callback or other ev callbacks in which it is documented + * unsafe to modify the ev_loop or any watcher. + */ +static void watcher_stop_safe (flux_watcher_t *w) +{ + if (w) { + ev_prepare *pw = calloc (1, sizeof (*pw)); + if (!pw) /* On ENOMEM, we just have to give up */ + return; + ev_prepare_init (pw, safe_stop_cb); + pw->data = w; + ev_prepare_start (watcher_get_ev (w), pw); + } +} + +/* This is_active() callback works for "native" libev watchers, where + * watcher data points to a struct ev_TYPE. + */ +static bool wrap_ev_active (flux_watcher_t *w) +{ + return ev_is_active (watcher_get_data (w)); +} + +/* file descriptors + */ + +static void fd_start (flux_watcher_t *w) +{ + ev_io *iow = watcher_get_data (w); + struct ev_loop *loop = watcher_get_ev (w); + ev_io_start (loop, iow); +} + +static void fd_stop (flux_watcher_t *w) +{ + ev_io *iow = watcher_get_data (w); + struct ev_loop *loop = watcher_get_ev (w); + ev_io_stop (loop, iow); +} + +static void fd_cb (struct ev_loop *loop, ev_io *iow, int revents) +{ + struct flux_watcher *w = iow->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops fd_watcher = { + .start = fd_start, + .stop = fd_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, + int fd, + int events, + flux_watcher_f cb, + void *arg) +{ + ev_io *iow; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*iow), &fd_watcher, cb, arg))) + return NULL; + iow = watcher_get_data (w); + ev_io_init (iow, fd_cb, fd, events_to_libev (events) & ~EV_ERROR); + iow->data = w; + + return w; +} + +int flux_fd_watcher_get_fd (flux_watcher_t *w) +{ + assert (watcher_get_ops (w) == &fd_watcher); + ev_io *iow = watcher_get_data (w); + return iow->fd; +} + +/* Timer + */ + +static void timer_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct ev_timer *tw = watcher_get_data (w); + ev_timer_start (loop, tw); +} + +static void timer_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct ev_timer *tw = watcher_get_data (w); + ev_timer_stop (loop, tw); +} + +static void timer_cb (struct ev_loop *loop, ev_timer *tw, int revents) +{ + struct flux_watcher *w = tw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops timer_watcher = { + .start = timer_start, + .stop = timer_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, + double after, + double repeat, + flux_watcher_f cb, + void *arg) +{ + ev_timer *tw; + flux_watcher_t *w; + if (after < 0 || repeat < 0) { + errno = EINVAL; + return NULL; + } + if (!(w = watcher_create (r, sizeof (*tw), &timer_watcher, cb, arg))) + return NULL; + tw = watcher_get_data (w); + ev_timer_init (tw, timer_cb, after, repeat); + tw->data = w; + + return w; +} + +void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) +{ + assert (watcher_get_ops (w) == &timer_watcher); + ev_timer *tw = watcher_get_data (w); + ev_timer_set (tw, after, repeat); +} + +void flux_timer_watcher_again (flux_watcher_t *w) +{ + assert (watcher_get_ops (w) == &timer_watcher); + struct ev_loop *loop = watcher_get_ev (w); + ev_timer *tw = watcher_get_data (w); + ev_timer_again (loop, tw); +} + +/* Periodic + */ +struct f_periodic { + struct flux_watcher *w; + ev_periodic evp; + flux_reschedule_f reschedule_cb; +}; + +static void periodic_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct f_periodic *fp = watcher_get_data (w); + ev_periodic_start (loop, &fp->evp); +} + +static void periodic_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct f_periodic *fp = watcher_get_data (w); + ev_periodic_stop (loop, &fp->evp); +} + +static bool periodic_is_active (flux_watcher_t *w) +{ + struct f_periodic *fp = watcher_get_data (w); + return ev_is_active (&fp->evp); +} + +static void periodic_cb (struct ev_loop *loop, ev_periodic *pw, int revents) +{ + struct f_periodic *fp = pw->data; + struct flux_watcher *w = fp->w; + watcher_call_ev (w, revents); +} + +static ev_tstamp periodic_reschedule_cb (ev_periodic *pw, ev_tstamp now) +{ + ev_tstamp rc; + struct f_periodic *fp = pw->data; + assert (fp->reschedule_cb != NULL); + rc = (ev_tstamp)fp->reschedule_cb (fp->w, + (double)now, + watcher_get_arg (fp->w)); + if (rc < now) { + /* User reschedule cb returned time in the past. The watcher will + * be stopped, but not here (changing loop is not allowed in a + * libev reschedule cb. flux_watcher_stop_safe() will stop it in + * a prepare callback. + * Return time far in the future to ensure we aren't called again. + */ + watcher_stop_safe (fp->w); + return (now + 1e99); + } + return rc; +} + +static struct flux_watcher_ops periodic_watcher = { + .start = periodic_start, + .stop = periodic_stop, + .destroy = NULL, + .is_active = periodic_is_active, +}; + +flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, + double offset, + double interval, + flux_reschedule_f reschedule_cb, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + struct f_periodic *fp; + size_t size = sizeof (*fp); + if (offset < 0 || interval < 0) { + errno = EINVAL; + return NULL; + } + if (!(w = watcher_create (r, size, &periodic_watcher, cb, arg))) + return NULL; + fp = watcher_get_data (w); + fp->evp.data = fp; + fp->w = w; + fp->reschedule_cb = reschedule_cb; + + ev_periodic_init (&fp->evp, + periodic_cb, + offset, + interval, + reschedule_cb ? periodic_reschedule_cb : NULL); + + return w; +} + +void flux_periodic_watcher_reset (flux_watcher_t *w, + double next, + double interval, + flux_reschedule_f reschedule_cb) +{ + struct ev_loop *loop = watcher_get_ev (w); + struct f_periodic *fp = watcher_get_data (w); + assert (watcher_get_ops (w) == &periodic_watcher); + fp->reschedule_cb = reschedule_cb; + ev_periodic_set (&fp->evp, + next, + interval, + reschedule_cb ? periodic_reschedule_cb : NULL); + ev_periodic_again (loop, &fp->evp); +} + +double flux_watcher_next_wakeup (flux_watcher_t *w) +{ + if (watcher_get_ops (w) == &periodic_watcher) { + struct f_periodic *fp = watcher_get_data (w); + return ((double) ev_periodic_at (&fp->evp)); + } + else if (watcher_get_ops (w) == &timer_watcher) { + ev_timer *tw = watcher_get_data (w); + struct ev_loop *loop = watcher_get_ev (w); + return ((double) (ev_now (loop) + ev_timer_remaining (loop, tw))); + } + errno = EINVAL; + return (-1.); +} + +/* Prepare + */ +static void prepare_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_prepare *pw = watcher_get_data (w); + ev_prepare_start (loop, pw); +} + +static void prepare_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_prepare *pw = watcher_get_data (w); + ev_prepare_stop (loop, pw); +} + +static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +{ + struct flux_watcher *w = pw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops prepare_watcher = { + .start = prepare_start, + .stop = prepare_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + ev_prepare *pw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher, cb, arg))) + return NULL; + pw = watcher_get_data (w); + ev_prepare_init (pw, prepare_cb); + pw->data = w; + + return w; +} + +/* Check + */ + +static void check_set_priority (flux_watcher_t *w, int priority) +{ + ev_check *cw = watcher_get_data (w); + ev_set_priority (cw, priority); +} + +static void check_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_check *cw = watcher_get_data (w); + ev_check_start (loop, cw); +} + +static void check_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_check *cw = watcher_get_data (w); + ev_check_stop (loop, cw); +} + +static void check_cb (struct ev_loop *loop, ev_check *cw, int revents) +{ + struct flux_watcher *w = cw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops check_watcher = { + .set_priority = check_set_priority, + .start = check_start, + .stop = check_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + ev_check *cw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*cw), &check_watcher, cb, arg))) + return NULL; + cw = watcher_get_data (w); + ev_check_init (cw, check_cb); + cw->data = w; + + return w; +} + +/* Idle + */ + +static void idle_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_idle *iw = watcher_get_data (w); + ev_idle_start (loop, iw); +} + +static void idle_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_idle *iw = watcher_get_data (w); + ev_idle_stop (loop, iw); +} + +static void idle_cb (struct ev_loop *loop, ev_idle *iw, int revents) +{ + struct flux_watcher *w = iw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops idle_watcher = { + .start = idle_start, + .stop = idle_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + ev_idle *iw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher, cb, arg))) + return NULL; + iw = watcher_get_data (w); + ev_idle_init (iw, idle_cb); + iw->data = w; + + return w; +} + +/* Child + */ + +static void child_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_child *cw = watcher_get_data (w); + ev_child_start (loop, cw); +} + +static void child_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_child *cw = watcher_get_data (w); + ev_child_stop (loop, cw); +} + +static void child_cb (struct ev_loop *loop, ev_child *cw, int revents) +{ + struct flux_watcher *w = cw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops child_watcher = { + .start = child_start, + .stop = child_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + + +flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, + int pid, + bool trace, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + ev_child *cw; + + if (!ev_is_default_loop (reactor_get_loop (r))) { + errno = EINVAL; + return NULL; + } + if (!(w = watcher_create (r, sizeof (*cw), &child_watcher, cb, arg))) + return NULL; + cw = watcher_get_data (w); + ev_child_init (cw, child_cb, pid, trace ? 1 : 0); + cw->data = w; + + return w; +} + +int flux_child_watcher_get_rpid (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &child_watcher) { + errno = EINVAL; + return -1; + } + ev_child *cw = watcher_get_data (w); + return cw->rpid; +} + +int flux_child_watcher_get_rstatus (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &child_watcher) { + errno = EINVAL; + return -1; + } + ev_child *cw = watcher_get_data (w); + return cw->rstatus; +} + +/* Signal + */ + +static void signal_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_signal *sw = watcher_get_data (w); + ev_signal_start (loop, sw); +} + +static void signal_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_signal *sw = watcher_get_data (w); + ev_signal_stop (loop, sw); +} + +static void signal_cb (struct ev_loop *loop, ev_signal *sw, int revents) +{ + struct flux_watcher *w = sw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops signal_watcher = { + .start = signal_start, + .stop = signal_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, + int signum, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + ev_signal *sw; + + if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher, cb, arg))) + return NULL; + sw = watcher_get_data (w); + ev_signal_init (sw, signal_cb, signum); + sw->data = w; + + return w; +} + +int flux_signal_watcher_get_signum (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &signal_watcher) { + errno = EINVAL; + return (-1); + } + ev_signal *sw = watcher_get_data (w); + return sw->signum; +} + +/* Stat + */ + +static void stat_start (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_stat *sw = watcher_get_data (w); + ev_stat_start (loop, sw); +} + +static void stat_stop (flux_watcher_t *w) +{ + struct ev_loop *loop = watcher_get_ev (w); + ev_stat *sw = watcher_get_data (w); + ev_stat_stop (loop, sw); +} + +static void stat_cb (struct ev_loop *loop, ev_stat *sw, int revents) +{ + struct flux_watcher *w = sw->data; + watcher_call_ev (w, revents); +} + +static struct flux_watcher_ops stat_watcher = { + .start = stat_start, + .stop = stat_stop, + .destroy = NULL, + .is_active = wrap_ev_active, +}; + +flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, + const char *path, + double interval, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + ev_stat *sw; + + if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher, cb, arg))) + return NULL; + sw = watcher_get_data (w); + ev_stat_init (sw, stat_cb, path, interval); + sw->data = w; + + return w; +} + +void flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev) +{ + ev_stat *sw = watcher_get_data (w); + assert (watcher_get_ops (w) == &stat_watcher); + if (stat) + *stat = sw->attr; + if (prev) + *prev = sw->prev; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/common/libsubprocess/Makefile.am b/src/common/libsubprocess/Makefile.am index c93a494ae8d4..1f01447cfec3 100644 --- a/src/common/libsubprocess/Makefile.am +++ b/src/common/libsubprocess/Makefile.am @@ -36,10 +36,6 @@ libsubprocess_la_SOURCES = \ subprocess_private.h \ client.h \ client.c \ - ev_fbuf_read.h \ - ev_fbuf_read.c \ - ev_fbuf_write.h \ - ev_fbuf_write.c \ fbuf.h \ fbuf.c \ fbuf_watcher.h \ @@ -84,6 +80,7 @@ test_ldadd = \ $(top_builddir)/src/common/libtap/libtap.la \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libflux/libflux.la \ $(top_builddir)/src/common/libflux-internal.la test_ldflags = \ diff --git a/src/common/libsubprocess/ev_fbuf_read.c b/src/common/libsubprocess/ev_fbuf_read.c deleted file mode 100644 index 12ad03575f68..000000000000 --- a/src/common/libsubprocess/ev_fbuf_read.c +++ /dev/null @@ -1,199 +0,0 @@ -/************************************************************\ - * Copyright 2015 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -#if HAVE_CONFIG_H -#include "config.h" -#endif -#include -#include - -#include "src/common/libev/ev.h" - -#include "ev_fbuf_read.h" - -static bool data_to_read (struct ev_fbuf_read *ebr, bool *is_eof) -{ - if (ebr->line) { - if (fbuf_has_line (ebr->fb)) - return true; - else { - /* if no line, but the buffer is full, we have to flush */ - if (!fbuf_space (ebr->fb)) - return true; - /* if eof read, no lines, but left over data non-line data, - * this data should be flushed to the user */ - if (ebr->eof_read && fbuf_bytes (ebr->fb)) - return true; - } - } - else { - if (fbuf_bytes (ebr->fb) > 0) - return true; - } - - if (ebr->eof_read && !ebr->eof_sent && !fbuf_bytes (ebr->fb)) { - if (is_eof) - (*is_eof) = true; - return true; - } - - return false; -} - -static void buffer_notify_cb (struct fbuf *fb, void *arg) -{ - struct ev_fbuf_read *ebr = arg; - - /* space is available, start ev io watcher again, assuming watcher - * is not stopped by user */ - if (ebr->start && fbuf_space (fb) > 0) - ev_io_start (ebr->loop, &(ebr->io_w)); -} - -static void prepare_cb (struct ev_loop *loop, ev_prepare *w, int revents) -{ - struct ev_fbuf_read *ebr = (struct ev_fbuf_read *)((char *)w - - offsetof (struct ev_fbuf_read, prepare_w)); - - if (data_to_read (ebr, NULL) == true) - ev_idle_start (loop, &ebr->idle_w); -} - -static void buffer_read_cb (struct ev_loop *loop, ev_io *iow, int revents) -{ - struct ev_fbuf_read *ebr = iow->data; - - if (revents & EV_READ) { - int ret, space; - - if ((space = fbuf_space (ebr->fb)) < 0) - return; - - if ((ret = fbuf_write_from_fd (ebr->fb, ebr->fd, space)) < 0) - return; - - if (!ret) { - ev_fbuf_read_decref (ebr); - (void)fbuf_readonly (ebr->fb); - ev_io_stop (ebr->loop, iow); - } - else if (ret == space) { - /* buffer full, buffer_notify_cb will be called - * to re-enable io reactor when space is available */ - ev_io_stop (ebr->loop, iow); - } - } - else { - if (ebr->cb) - ebr->cb (loop, ebr, revents); - } -} - -static void check_cb (struct ev_loop *loop, ev_check *w, int revents) -{ - struct ev_fbuf_read *ebr = (struct ev_fbuf_read *)((char *)w - - offsetof (struct ev_fbuf_read, check_w)); - bool is_eof = false; - - ev_idle_stop (loop, &ebr->idle_w); - - if (data_to_read (ebr, &is_eof) == true) { - if (ebr->cb) - ebr->cb (loop, ebr, EV_READ); - - if (is_eof) - ebr->eof_sent = true; - } -} - -int ev_fbuf_read_init (struct ev_fbuf_read *ebr, - int fd, - int size, - ev_fbuf_read_f cb, - struct ev_loop *loop) -{ - ebr->cb = cb; - ebr->fd = fd; - ebr->loop = loop; - ebr->start = false; - ebr->eof_read = false; - ebr->eof_sent = false; - ebr->refcnt = 1; - - if (!(ebr->fb = fbuf_create (size))) - goto cleanup; - fbuf_set_notify (ebr->fb, buffer_notify_cb, ebr); - - ev_prepare_init (&ebr->prepare_w, prepare_cb); - ev_check_init (&ebr->check_w, check_cb); - ev_idle_init (&ebr->idle_w, NULL); - ev_io_init (&ebr->io_w, buffer_read_cb, ebr->fd, EV_READ); - ebr->io_w.data = ebr; - - return 0; - -cleanup: - ev_fbuf_read_cleanup (ebr); - return -1; -} - -void ev_fbuf_read_cleanup (struct ev_fbuf_read *ebr) -{ - if (ebr) { - fbuf_destroy (ebr->fb); - ebr->fb = NULL; - } -} - -void ev_fbuf_read_start (struct ev_loop *loop, struct ev_fbuf_read *ebr) -{ - if (!ebr->start) { - ebr->start = true; - ev_prepare_start (loop, &ebr->prepare_w); - ev_check_start (loop, &ebr->check_w); - - if (fbuf_space (ebr->fb) > 0) - ev_io_start (ebr->loop, &(ebr->io_w)); - /* else: buffer full, buffer_notify_cb will be called - * to re-enable io reactor when space is available */ - } -} - -void ev_fbuf_read_stop (struct ev_loop *loop, struct ev_fbuf_read *ebr) -{ - if (ebr->start) { - ev_prepare_stop (loop, &ebr->prepare_w); - ev_check_stop (loop, &ebr->check_w); - ev_io_stop (loop, &ebr->io_w); - ev_idle_stop (loop, &ebr->idle_w); - ebr->start = false; - } -} - -bool ev_fbuf_read_is_active (struct ev_fbuf_read *ebr) -{ - return ev_is_active (&ebr->prepare_w); -} - -void ev_fbuf_read_incref (struct ev_fbuf_read *ebr) -{ - ebr->refcnt++; -} - -void ev_fbuf_read_decref (struct ev_fbuf_read *ebr) -{ - if (--ebr->refcnt == 0) - ebr->eof_read = true; -} - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ - diff --git a/src/common/libsubprocess/ev_fbuf_read.h b/src/common/libsubprocess/ev_fbuf_read.h deleted file mode 100644 index 4dd1b781f54f..000000000000 --- a/src/common/libsubprocess/ev_fbuf_read.h +++ /dev/null @@ -1,54 +0,0 @@ -/************************************************************\ - * Copyright 2014 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -#ifndef _LIBSUBPROCESS_EV_FBUF_READ_H -#define _LIBSUBPROCESS_EV_FBUF_READ_H - -#include "src/common/libev/ev.h" -#include "fbuf.h" - -struct ev_fbuf_read; - -typedef void (*ev_fbuf_read_f)(struct ev_loop *loop, - struct ev_fbuf_read *ebr, - int revents); - -struct ev_fbuf_read { - int refcnt; - ev_io io_w; - ev_prepare prepare_w; - ev_idle idle_w; - ev_check check_w; - int fd; - ev_fbuf_read_f cb; - struct fbuf *fb; - struct ev_loop *loop; - bool start; /* flag, if user started reactor */ - bool eof_read; /* flag, if EOF on stream seen */ - bool eof_sent; /* flag, if EOF to user sent */ - bool line; /* flag, if line buffered */ - void *data; -}; - -int ev_fbuf_read_init (struct ev_fbuf_read *ebr, - int fd, - int size, - ev_fbuf_read_f cb, - struct ev_loop *loop); -void ev_fbuf_read_cleanup (struct ev_fbuf_read *ebr); -void ev_fbuf_read_start (struct ev_loop *loop, struct ev_fbuf_read *ebr); -void ev_fbuf_read_stop (struct ev_loop *loop, struct ev_fbuf_read *ebr); -bool ev_fbuf_read_is_active (struct ev_fbuf_read *ebr); -void ev_fbuf_read_incref (struct ev_fbuf_read *ebr); -void ev_fbuf_read_decref (struct ev_fbuf_read *ebr); - -#endif /* !_LIBSUBPROCESS_EV_FBUF_READ_H */ - -// vi: ts=4 sw=4 expandtab diff --git a/src/common/libsubprocess/ev_fbuf_write.c b/src/common/libsubprocess/ev_fbuf_write.c deleted file mode 100644 index 25c28ef1925c..000000000000 --- a/src/common/libsubprocess/ev_fbuf_write.c +++ /dev/null @@ -1,152 +0,0 @@ -/************************************************************\ - * Copyright 2015 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -#if HAVE_CONFIG_H -#include "config.h" -#endif -#include -#include -#include -#include - -#include "src/common/libev/ev.h" - -#include "ev_fbuf_write.h" - -static void buffer_write_cb (struct ev_loop *loop, ev_io *iow, int revents) -{ - struct ev_fbuf_write *ebw = iow->data; - - if (revents & EV_WRITE) { - int ret; - - /* Send one time notification so user knows initial buffer - * size */ - if (!ebw->initial_space) { - ebw->initial_space = true; - if (ebw->cb) - ebw->cb (loop, ebw, revents); - } - - if ((ret = fbuf_read_to_fd (ebw->fb, ebw->fd, -1)) < 0) { - if (ebw->cb) - ebw->cb (loop, ebw, EV_ERROR); - return; - } - - if (ret) { - if (ebw->cb) - ebw->cb (loop, ebw, revents); - } - - if (!fbuf_bytes (ebw->fb) && ebw->eof) { - if (close (ebw->fd) < 0) - ebw->close_errno = errno; - ebw->fd = -1; - ebw->closed = true; - ebw->eof = false; - if (ebw->cb) - ebw->cb (loop, ebw, revents); - } - - if (!fbuf_bytes (ebw->fb) && !ebw->eof) - ev_io_stop (ebw->loop, &(ebw->io_w)); - } - else { - if (ebw->cb) - ebw->cb (loop, ebw, revents); - } -} - -/* data is available, start ev io watcher assuming user has - * started the watcher. - */ -void ev_fbuf_write_wakeup (struct ev_fbuf_write *ebw) -{ - if (ebw->start) - ev_io_start (ebw->loop, &(ebw->io_w)); -} - -static void buffer_notify_cb (struct fbuf *fb, void *arg) -{ - struct ev_fbuf_write *ebw = arg; - - if (fbuf_bytes (fb) > 0) - ev_fbuf_write_wakeup (ebw); -} - -int ev_fbuf_write_init (struct ev_fbuf_write *ebw, - int fd, - int size, - ev_fbuf_write_f cb, - struct ev_loop *loop) -{ - ebw->cb = cb; - ebw->fd = fd; - ebw->loop = loop; - ebw->start = false; - - if (!(ebw->fb = fbuf_create (size))) - goto cleanup; - - /* When any data becomes available, call buffer_notify_cb, - * which will start io reactor - */ - fbuf_set_notify (ebw->fb, buffer_notify_cb, ebw); - - ev_io_init (&ebw->io_w, buffer_write_cb, ebw->fd, EV_WRITE); - ebw->io_w.data = ebw; - - return 0; - -cleanup: - ev_fbuf_write_cleanup (ebw); - return -1; -} - -void ev_fbuf_write_cleanup (struct ev_fbuf_write *ebw) -{ - if (ebw) { - fbuf_destroy (ebw->fb); - ebw->fb = NULL; - } -} - -void ev_fbuf_write_start (struct ev_loop *loop, struct ev_fbuf_write *ebw) -{ - if (!ebw->start) { - ebw->start = true; - /* do not start watcher unless - * - we have not sent initial space - * - there is data to be written out - * - notify EOF - */ - if (!ebw->initial_space || fbuf_bytes (ebw->fb) || ebw->eof) - ev_io_start (ebw->loop, &(ebw->io_w)); - } -} - -void ev_fbuf_write_stop (struct ev_loop *loop, struct ev_fbuf_write *ebw) -{ - if (ebw->start) { - ev_io_stop (loop, &ebw->io_w); - ebw->start = false; - } -} - -bool ev_fbuf_write_is_active (struct ev_fbuf_write *ebw) -{ - return ev_is_active (&ebw->io_w); -} - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ - diff --git a/src/common/libsubprocess/ev_fbuf_write.h b/src/common/libsubprocess/ev_fbuf_write.h deleted file mode 100644 index 89245b39defe..000000000000 --- a/src/common/libsubprocess/ev_fbuf_write.h +++ /dev/null @@ -1,49 +0,0 @@ -/************************************************************\ - * Copyright 2015 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -#ifndef _LIBSUBPROCESS_EV_FBUF_WRITE_H -#define _LIBSUBPROCESS_EV_FBUF_WRITE_H - -#include "src/common/libev/ev.h" -#include "fbuf.h" - -struct ev_fbuf_write; - -typedef void (*ev_fbuf_write_f)(struct ev_loop *loop, - struct ev_fbuf_write *ebw, - int revents); - -struct ev_fbuf_write { - ev_io io_w; - int fd; - ev_fbuf_write_f cb; - struct fbuf *fb; - struct ev_loop *loop; - bool start; /* flag, if user started reactor */ - bool eof; /* flag, eof written */ - bool closed; /* flag, fd has been closed */ - int close_errno; /* errno from close */ - bool initial_space; /* flag, initial space notified */ - void *data; -}; - -int ev_fbuf_write_init (struct ev_fbuf_write *ebw, - int fd, - int size, - ev_fbuf_write_f cb, - struct ev_loop *loop); -void ev_fbuf_write_cleanup (struct ev_fbuf_write *ebw); -void ev_fbuf_write_start (struct ev_loop *loop, struct ev_fbuf_write *ebw); -void ev_fbuf_write_stop (struct ev_loop *loop, struct ev_fbuf_write *ebw); -bool ev_fbuf_write_is_active (struct ev_fbuf_write *ebw); -void ev_fbuf_write_wakeup (struct ev_fbuf_write *ebw); -#endif /* !_EV_BUFFER_WRITE_H */ - -// vi: ts=4 sw=4 expandtab diff --git a/src/common/libsubprocess/fbuf_watcher.c b/src/common/libsubprocess/fbuf_watcher.c index 987f9d1465d2..4da92f7e1bd8 100644 --- a/src/common/libsubprocess/fbuf_watcher.c +++ b/src/common/libsubprocess/fbuf_watcher.c @@ -14,55 +14,205 @@ #include #include #include +#include #include -#include "src/common/libev/ev.h" #include "src/common/libutil/log.h" #include "src/common/libutil/fdutils.h" -#include "src/common/libflux/reactor_private.h" +#include "src/common/libutil/errno_safe.h" +#include "src/common/libflux/watcher_private.h" #include "fbuf_watcher.h" -#include "ev_fbuf_read.h" -#include "ev_fbuf_write.h" #include "fbuf.h" -static void buffer_read_start (flux_watcher_t *w) +/* Read buffer watcher + */ + +struct rbwatcher { + int refcnt; + flux_watcher_t *fd_w; + flux_watcher_t *prepare_w; + flux_watcher_t *idle_w; + flux_watcher_t *check_w; + int fd; + struct fbuf *fbuf; + bool start; /* flag, if user started reactor */ + bool eof_read; /* flag, if EOF on stream seen */ + bool eof_sent; /* flag, if EOF to user sent */ + bool line; /* flag, if line buffered */ + void *data; +}; + +static int validate_fd_nonblock (int fd) +{ + int flags; + if (fd < 0) + goto inval; + if ((flags = fd_get_flags (fd)) < 0) + return -1; + if (!(flags & O_NONBLOCK)) + goto inval; + return 0; +inval: + errno = EINVAL; + return -1; +} + +static bool data_to_read (struct rbwatcher *rbw, bool *is_eof) +{ + if (rbw->line) { + if (fbuf_has_line (rbw->fbuf)) + return true; + else { + /* if no line, but the buffer is full, we have to flush */ + if (!fbuf_space (rbw->fbuf)) + return true; + /* if eof read, no lines, but left over data non-line data, + * this data should be flushed to the user */ + if (rbw->eof_read && fbuf_bytes (rbw->fbuf)) + return true; + } + } + else { + if (fbuf_bytes (rbw->fbuf) > 0) + return true; + } + + if (rbw->eof_read && !rbw->eof_sent && !fbuf_bytes (rbw->fbuf)) { + if (is_eof) + (*is_eof) = true; + return true; + } + + return false; +} + +static void rbwatcher_start (flux_watcher_t *w) +{ + struct rbwatcher *rbw = watcher_get_data (w); + + if (!rbw->start) { + flux_watcher_start (rbw->prepare_w); + flux_watcher_start (rbw->check_w); + if (fbuf_space (rbw->fbuf) > 0) + flux_watcher_start (rbw->fd_w); + /* else: buffer full, buffer_notify_cb will be called + * to re-enable io reactor when space is available */ + rbw->start = true; + } +} + +static void rbwatcher_stop (flux_watcher_t *w) +{ + struct rbwatcher *rbw = watcher_get_data (w); + + if (rbw->start) { + flux_watcher_stop (rbw->prepare_w); + flux_watcher_stop (rbw->check_w); + flux_watcher_stop (rbw->fd_w); + flux_watcher_stop (rbw->idle_w); + rbw->start = false; + } +} + +static void rbwatcher_destroy (flux_watcher_t *w) +{ + struct rbwatcher *rbw = watcher_get_data (w); + + if (rbw) { + flux_watcher_destroy (rbw->prepare_w); + flux_watcher_destroy (rbw->check_w); + flux_watcher_destroy (rbw->fd_w); + flux_watcher_destroy (rbw->idle_w); + fbuf_destroy (rbw->fbuf); + } +} + +static bool rbwatcher_is_active (flux_watcher_t *w) { - struct ev_fbuf_read *ebr = (struct ev_fbuf_read *)w->data; - ev_fbuf_read_start (w->r->loop, ebr); + struct rbwatcher *rbw = watcher_get_data (w); + + return flux_watcher_is_active (rbw->prepare_w); } -static void buffer_read_stop (flux_watcher_t *w) +static void rbwatcher_prepare_cb (flux_reactor_t *r, + flux_watcher_t *prepare_w, + int prepare_revents, + void *arg) { - struct ev_fbuf_read *ebr = (struct ev_fbuf_read *)w->data; - ev_fbuf_read_stop (w->r->loop, ebr); + flux_watcher_t *w = arg; + struct rbwatcher *rbw = watcher_get_data (w); + + if (data_to_read (rbw, NULL)) + flux_watcher_start (rbw->idle_w); } -static bool buffer_read_is_active (flux_watcher_t *w) +static void rbwatcher_check_cb (flux_reactor_t *r, + flux_watcher_t *check_w, + int check_revents, + void *arg) { - return ev_fbuf_read_is_active (w->data); + flux_watcher_t *w = arg; + struct rbwatcher *rbw = watcher_get_data (w); + bool is_eof = false; + + flux_watcher_stop (rbw->idle_w); + + if (data_to_read (rbw, &is_eof)) { + watcher_call (w, FLUX_POLLIN); + if (is_eof) + rbw->eof_sent = true; + } } -static void buffer_read_destroy (flux_watcher_t *w) +static void rbwatcher_fd_cb (flux_reactor_t *r, + flux_watcher_t *fd_w, + int fd_revents, + void *arg) { - struct ev_fbuf_read *ebr = (struct ev_fbuf_read *)w->data; - ev_fbuf_read_cleanup (ebr); + flux_watcher_t *w = arg; + struct rbwatcher *rbw = watcher_get_data (w); + + if (fd_revents & FLUX_POLLIN) { + int ret, space; + + if ((space = fbuf_space (rbw->fbuf)) < 0) + return; + + if ((ret = fbuf_write_from_fd (rbw->fbuf, rbw->fd, space)) < 0) + return; + + if (!ret) { + fbuf_read_watcher_decref (w); + (void)fbuf_readonly (rbw->fbuf); + flux_watcher_stop (fd_w); + } + else if (ret == space) { + /* buffer full, rbwatcher_notify_cb will be called + * to re-enable io reactor when space is available */ + flux_watcher_stop (fd_w); + } + } + else { + watcher_call (w, fd_revents); + } } -static void buffer_read_cb (struct ev_loop *loop, - struct ev_fbuf_read *ebr, - int revents) +static void rbwatcher_notify_cb (struct fbuf *fb, void *arg) { - struct flux_watcher *w = ebr->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); + struct rbwatcher *rbw = arg; + + /* space is available, start ev io watcher again, assuming watcher + * is not stopped by user */ + if (rbw->start && fbuf_space (fb) > 0) + flux_watcher_start (rbw->fd_w); } -static struct flux_watcher_ops buffer_read_watcher = { - .start = buffer_read_start, - .stop = buffer_read_stop, - .destroy = buffer_read_destroy, - .is_active = buffer_read_is_active, +static struct flux_watcher_ops rbwatcher_ops = { + .start = rbwatcher_start, + .stop = rbwatcher_stop, + .destroy = rbwatcher_destroy, + .is_active = rbwatcher_is_active, }; flux_watcher_t *fbuf_read_watcher_create (flux_reactor_t *r, @@ -72,128 +222,212 @@ flux_watcher_t *fbuf_read_watcher_create (flux_reactor_t *r, int flags, void *arg) { - struct ev_fbuf_read *ebr; - flux_watcher_t *w = NULL; - int fd_flags; + struct rbwatcher *rbw; + flux_watcher_t *w; - if (fd < 0) { - errno = EINVAL; - return NULL; - } - if ((fd_flags = fd_get_flags (fd)) < 0) - return NULL; - if (!(fd_flags & O_NONBLOCK)) { - errno = EINVAL; + if (validate_fd_nonblock (fd) < 0) return NULL; - } - - if (!(w = watcher_create (r, - sizeof (*ebr), - &buffer_read_watcher, - cb, - arg))) - goto cleanup; - - ebr = watcher_get_data (w); - - if (ev_fbuf_read_init (ebr, - fd, - size, - buffer_read_cb, - r->loop) < 0) - goto cleanup; - - if (flags & FBUF_WATCHER_LINE_BUFFER) - ebr->line = true; - - ebr->data = w; - + if (!(w = watcher_create (r, sizeof (*rbw), &rbwatcher_ops, cb, arg))) + goto error; + rbw = watcher_get_data (w); + rbw->fd = fd; + rbw->refcnt = 1; + if ((flags & FBUF_WATCHER_LINE_BUFFER)) + rbw->line = true; + if (!(rbw->fbuf = fbuf_create (size)) + || !(rbw->prepare_w = flux_prepare_watcher_create (r, + rbwatcher_prepare_cb, + w)) + || !(rbw->check_w = flux_check_watcher_create (r, + rbwatcher_check_cb, + w)) + || !(rbw->idle_w = flux_idle_watcher_create (r, NULL, NULL)) + || !(rbw->fd_w = flux_fd_watcher_create (r, + fd, + FLUX_POLLIN, + rbwatcher_fd_cb, + w))) + goto error; + fbuf_set_notify (rbw->fbuf, rbwatcher_notify_cb, rbw); return w; - -cleanup: - flux_watcher_destroy (w); +error: + ERRNO_SAFE_WRAP (flux_watcher_destroy, w); return NULL; } +static int validate_rbwatcher (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &rbwatcher_ops) { + errno = EINVAL; + return -1; + } + return 0; +} + struct fbuf *fbuf_read_watcher_get_buffer (flux_watcher_t *w) { - if (w) - return ((struct ev_fbuf_read *)(w->data))->fb; - return NULL; + struct rbwatcher *rbw = watcher_get_data (w); + + if (validate_rbwatcher (w) < 0) + return NULL; + return rbw->fbuf; } const char *fbuf_read_watcher_get_data (flux_watcher_t *w, int *lenp) { - if (w) { - struct ev_fbuf_read *eb = w->data; + struct rbwatcher *rbw = watcher_get_data (w); + + if (validate_rbwatcher (w) < 0) + return NULL; + if (rbw->line) { const char *data; - if (eb->line) { - if (!(data = fbuf_read_line (eb->fb, lenp))) - return NULL; - if (*lenp > 0) - return data; - /* if no space, have to flush data out */ - if (!(*lenp) && !fbuf_space (eb->fb)) - return fbuf_read (eb->fb, -1, lenp); - } - /* Not line-buffered, or reading last bit of data which does - * not contain a newline. Read any data: - */ - return fbuf_read (eb->fb, -1, lenp); - } - errno = EINVAL; - return NULL; + if (!(data = fbuf_read_line (rbw->fbuf, lenp))) + return NULL; + if (*lenp > 0) + return data; + /* if no space, have to flush data out */ + if (!(*lenp) && !fbuf_space (rbw->fbuf)) + return fbuf_read (rbw->fbuf, -1, lenp); + } + /* Not line-buffered, or reading last bit of data which does + * not contain a newline. Read any data: + */ + return fbuf_read (rbw->fbuf, -1, lenp); } void fbuf_read_watcher_incref (flux_watcher_t *w) { - if (w) - ev_fbuf_read_incref ((struct ev_fbuf_read *)w->data); + struct rbwatcher *rbw = watcher_get_data (w); + + if (validate_rbwatcher (w) < 0) + return; + rbw->refcnt++; } void fbuf_read_watcher_decref (flux_watcher_t *w) { - if (w) - ev_fbuf_read_decref ((struct ev_fbuf_read *)w->data); + struct rbwatcher *rbw = watcher_get_data (w); + + if (validate_rbwatcher (w) < 0) + return; + if (--rbw->refcnt == 0) + rbw->eof_read = true; } -static void buffer_write_start (flux_watcher_t *w) +/* Write buffer watcher + */ + +struct wbwatcher { + flux_watcher_t *fd_w; + int fd; + struct fbuf *fbuf; + bool start; /* flag, if user started reactor */ + bool eof; /* flag, eof written */ + bool closed; /* flag, fd has been closed */ + int close_errno; /* errno from close */ + bool initial_space; /* flag, initial space notified */ +}; + +static void wbwatcher_start (flux_watcher_t *w) { - struct ev_fbuf_write *ebw = (struct ev_fbuf_write *)w->data; - ev_fbuf_write_start (w->r->loop, ebw); + struct wbwatcher *wbw = watcher_get_data (w); + + if (!wbw->start) { + /* do not start fd watcher unless + * - we have not sent initial space + * - there is data to be written out + * - notify EOF + */ + if (!wbw->initial_space || fbuf_bytes (wbw->fbuf) || wbw->eof) + flux_watcher_start (wbw->fd_w); + wbw->start = true; + } } -static void buffer_write_stop (flux_watcher_t *w) +static void wbwatcher_stop (flux_watcher_t *w) { - struct ev_fbuf_write *ebw = (struct ev_fbuf_write *)w->data; - ev_fbuf_write_stop (w->r->loop, ebw); + struct wbwatcher *wbw = watcher_get_data (w); + if (wbw->start) { + flux_watcher_stop (wbw->fd_w); + wbw->start = false; + } } -static bool buffer_write_is_active (flux_watcher_t *w) +static bool wbwatcher_is_active (flux_watcher_t *w) { - return ev_fbuf_write_is_active (w->data); + struct wbwatcher *wbw = watcher_get_data (w); + + return flux_watcher_is_active (wbw->fd_w); } -static void buffer_write_destroy (flux_watcher_t *w) +static void wbwatcher_destroy (flux_watcher_t *w) { - struct ev_fbuf_write *ebw = (struct ev_fbuf_write *)w->data; - ev_fbuf_write_cleanup (ebw); + struct wbwatcher *wbw = watcher_get_data (w); + + if (wbw) { + flux_watcher_destroy (wbw->fd_w); + fbuf_destroy (wbw->fbuf); + } } -static void buffer_write_cb (struct ev_loop *loop, - struct ev_fbuf_write *ebw, - int revents) +static void wbwatcher_fd_cb (flux_reactor_t *r, + flux_watcher_t *fd_w, + int revents, + void *arg) { - struct flux_watcher *w = ebw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); + flux_watcher_t *w = arg; + struct wbwatcher *wbw = watcher_get_data (w); + + if (revents & FLUX_POLLOUT) { + int ret; + + /* Send one time notification so user knows initial buffer size */ + if (!wbw->initial_space) { + watcher_call (w, revents); + wbw->initial_space = true; + } + + if ((ret = fbuf_read_to_fd (wbw->fbuf, wbw->fd, -1)) < 0) { + watcher_call (w, FLUX_POLLERR); + return; + } + + if (ret) { + watcher_call (w, revents); + } + + if (!fbuf_bytes (wbw->fbuf) && wbw->eof) { + if (close (wbw->fd) < 0) + wbw->close_errno = errno; + wbw->fd = -1; + wbw->closed = true; + wbw->eof = false; + watcher_call (w, revents); + } + + if (!fbuf_bytes (wbw->fbuf) && !wbw->eof) + flux_watcher_stop (wbw->fd_w); + } + else { + watcher_call (w, revents); + } +} + +static void wbwatcher_notify_cb (struct fbuf *fb, void *arg) +{ + struct wbwatcher *wbw = arg; + + /* data is available, start ev io watcher assuming user has + * started the watcher. */ + if (wbw->start && fbuf_bytes (fb) > 0) + flux_watcher_start (wbw->fd_w); } -static struct flux_watcher_ops buffer_write_watcher = { - .start = buffer_write_start, - .stop = buffer_write_stop, - .destroy = buffer_write_destroy, - .is_active = buffer_write_is_active, +static struct flux_watcher_ops wbwatcher_ops = { + .start = wbwatcher_start, + .stop = wbwatcher_stop, + .destroy = wbwatcher_destroy, + .is_active = wbwatcher_is_active, }; flux_watcher_t *fbuf_write_watcher_create (flux_reactor_t *r, @@ -203,85 +437,78 @@ flux_watcher_t *fbuf_write_watcher_create (flux_reactor_t *r, int flags, void *arg) { - struct ev_fbuf_write *ebw; + struct wbwatcher *wbw; flux_watcher_t *w = NULL; - int fd_flags; - - if (fd < 0) { - errno = EINVAL; - return NULL; - } - if ((fd_flags = fd_get_flags (fd)) < 0) - return NULL; - if (!(fd_flags & O_NONBLOCK)) { - errno = EINVAL; + if (validate_fd_nonblock (fd) < 0) return NULL; - } - - if (!(w = watcher_create (r, - sizeof (*ebw), - &buffer_write_watcher, - cb, - arg))) - goto cleanup; - - ebw = watcher_get_data (w); - - if (ev_fbuf_write_init (ebw, - fd, - size, - buffer_write_cb, - r->loop) < 0) - goto cleanup; - - ebw->data = w; - + if (!(w = watcher_create (r, sizeof (*wbw), &wbwatcher_ops, cb, arg))) + goto error; + wbw = watcher_get_data (w); + wbw->fd = fd; + if (!(wbw->fbuf = fbuf_create (size)) + || !(wbw->fd_w = flux_fd_watcher_create (r, + fd, + FLUX_POLLOUT, + wbwatcher_fd_cb, + w))) + goto error; + fbuf_set_notify (wbw->fbuf, wbwatcher_notify_cb, wbw); return w; -cleanup: +error: flux_watcher_destroy (w); return NULL; } +static int validate_wbwatcher (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &wbwatcher_ops) { + errno = EINVAL; + return -1; + } + return 0; +} + struct fbuf *fbuf_write_watcher_get_buffer (flux_watcher_t *w) { - if (w) - return ((struct ev_fbuf_write *)(w->data))->fb; - return NULL; + struct wbwatcher *wbw = watcher_get_data (w); + + if (validate_wbwatcher (w) < 0) + return NULL; + return wbw->fbuf; } int fbuf_write_watcher_close (flux_watcher_t *w) { - struct ev_fbuf_write *evw; - if (!w) { - errno = EINVAL; - return (-1); - } - evw = w->data; - if (evw->eof) { + struct wbwatcher *wbw = watcher_get_data (w); + + if (validate_wbwatcher (w) < 0) + return -1; + if (wbw->eof) { errno = EINPROGRESS; - return (-1); + return -1; } - if (evw->closed) { + if (wbw->closed) { errno = EINVAL; - return (-1); + return -1; } - evw->eof = true; - fbuf_readonly (evw->fb); - ev_fbuf_write_wakeup (evw); - return (0); + wbw->eof = true; + fbuf_readonly (wbw->fbuf); + if (wbw->start) + flux_watcher_start (wbw->fd_w); + return 0; } int fbuf_write_watcher_is_closed (flux_watcher_t *w, int *errp) { - if (w) { - struct ev_fbuf_write *evw = w->data; - if (evw->closed && errp != NULL) - *errp = evw->close_errno; - return (evw->closed); - } - return (0); + struct wbwatcher *wbw = watcher_get_data (w); + + if (validate_wbwatcher (w) < 0) + return 0; + if (wbw->closed && errp != NULL) + *errp = wbw->close_errno; + return wbw->closed; } /* diff --git a/src/common/libzmqutil/Makefile.am b/src/common/libzmqutil/Makefile.am index db4ab41952d6..e4f12e3064dc 100644 --- a/src/common/libzmqutil/Makefile.am +++ b/src/common/libzmqutil/Makefile.am @@ -20,10 +20,8 @@ noinst_LTLIBRARIES = \ libzmqutil_la_SOURCES = \ msg_zsock.h \ msg_zsock.c \ - reactor.h \ - reactor.c \ - ev_zmq.h \ - ev_zmq.c \ + zwatcher.h \ + zwatcher.c \ zap.h \ zap.c \ monitor.h \ @@ -36,8 +34,7 @@ libzmqutil_la_SOURCES = \ cert.c TESTS = test_msg_zsock.t \ - test_reactor.t \ - test_ev.t \ + test_zwatcher.t \ test_zap.t \ test_monitor.t \ test_mpart.t \ @@ -72,15 +69,10 @@ test_msg_zsock_t_CPPFLAGS = $(test_cppflags) test_msg_zsock_t_LDADD = $(test_ldadd) test_msg_zsock_t_LDFLAGS = $(test_ldflags) -test_reactor_t_SOURCES = test/reactor.c -test_reactor_t_CPPFLAGS = $(test_cppflags) -test_reactor_t_LDADD = $(test_ldadd) -test_reactor_t_LDFLAGS = $(test_ldflags) - -test_ev_t_SOURCES = test/ev.c -test_ev_t_CPPFLAGS = $(test_cppflags) -test_ev_t_LDADD = $(test_ldadd) -test_ev_t_LDFLAGS = $(test_ldflags) +test_zwatcher_t_SOURCES = test/zwatcher.c +test_zwatcher_t_CPPFLAGS = $(test_cppflags) +test_zwatcher_t_LDADD = $(test_ldadd) +test_zwatcher_t_LDFLAGS = $(test_ldflags) test_zap_t_SOURCES = test/zap.c test_zap_t_CPPFLAGS = $(test_cppflags) diff --git a/src/common/libzmqutil/ev_zmq.c b/src/common/libzmqutil/ev_zmq.c deleted file mode 100644 index e8e1c768d3ca..000000000000 --- a/src/common/libzmqutil/ev_zmq.c +++ /dev/null @@ -1,124 +0,0 @@ -/************************************************************\ - * Copyright 2015 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -/* ev_zmq.c - an aggregate libev watcher for 0MQ sockets */ - -/* Thanks to Bert JW Regeer for a helpful blog on integrating 0MQ with libev: - * http://funcptr.net/2013/04/20/embedding-zeromq-in-the-libev-event-loop/ - * Also ref: libzmq zmq_poll() source, czmq zloop() source - * - * Brief summary of 0MQ integration: - * - 0MQ provides ZMQ_EVENTS getsockopt to test whether a 0MQ socket is - * writeable or readable. - * - 0MQ provides ZMQ_FD getsockopt to obtain the fd of a mailbox that - * becomes readable when ZMQ_EVENTS != 0 (edge triggered) - * - libev prepare/check callbacks are used to test ZMQ_EVENTS, make user - * callbacks, and enable/disable no-op io and idle watchers. - * - while ZMQ_EVENTS != 0, enable no-op idle watcher (no callback) - * so that the libev loop will continue looping, executing prepare/check - * - when ZMQ_EVENTS == 0, enable no-op io watcher on ZMQ_FD (no callback) - * so that the libev loop will unblock, executing prepare/check - * on the next mailbox event - */ - -#if HAVE_CONFIG_H -#include "config.h" -#endif -#include -#include - -#include "src/common/libev/ev.h" -#include "ev_zmq.h" - -static void prepare_cb (struct ev_loop *loop, ev_prepare *w, int revents) -{ - ev_zmq *zw = (ev_zmq *)((char *)w - offsetof (ev_zmq, prepare_w)); - uint32_t zevents = 0; - size_t zevents_size = sizeof (zevents); - - if (zw->zsock == NULL) - ev_idle_start (loop, &zw->idle_w); - else if (zmq_getsockopt (zw->zsock, - ZMQ_EVENTS, - &zevents, - &zevents_size) < 0) - ev_idle_start (loop, &zw->idle_w); - else if ((revents = ztoe (zevents) & zw->events)) - ev_idle_start (loop, &zw->idle_w); - else - ev_io_start (loop, &zw->io_w); -} - -static void check_cb (struct ev_loop *loop, ev_check *w, int revents) -{ - ev_zmq *zw = (ev_zmq *)((char *)w - offsetof (ev_zmq, check_w)); - uint32_t zevents = 0; - size_t zevents_size = sizeof (zevents); - - ev_io_stop (loop, &zw->io_w); - ev_idle_stop (loop, &zw->idle_w); - - if (zw->zsock == NULL) - zw->cb (loop, zw, EV_ERROR); - else if (ev_is_pending (&zw->io_w) - && ev_clear_pending (loop, &zw->io_w) & EV_ERROR) - zw->cb (loop, zw, EV_ERROR); - else if (zmq_getsockopt (zw->zsock, - ZMQ_EVENTS, - &zevents, - &zevents_size) < 0) - zw->cb (loop, zw, EV_ERROR); - else if ((revents = ztoe (zevents) & zw->events)) - zw->cb (loop, zw, revents); -} - -int ev_zmq_init (ev_zmq *w, ev_zmq_cb cb, void *zsock, int events) -{ - w->cb = cb; - w->zsock = zsock; - w->events = events; - size_t fd_size = sizeof (w->fd); - - if (zsock == NULL) - return -1; - if (zmq_getsockopt (zsock, ZMQ_FD, &w->fd, &fd_size) < 0) - return -1; - - ev_prepare_init (&w->prepare_w, prepare_cb); - ev_check_init (&w->check_w, check_cb); - ev_idle_init (&w->idle_w, NULL); - ev_io_init (&w->io_w, NULL, w->fd, EV_READ); - - return 0; -} - -void ev_zmq_start (struct ev_loop *loop, ev_zmq *w) -{ - ev_prepare_start (loop, &w->prepare_w); - ev_check_start (loop, &w->check_w); -} - -void ev_zmq_stop (struct ev_loop *loop, ev_zmq *w) -{ - ev_prepare_stop (loop, &w->prepare_w); - ev_check_stop (loop, &w->check_w); - ev_io_stop (loop, &w->io_w); - ev_idle_stop (loop, &w->idle_w); -} - -bool ev_zmq_is_active (ev_zmq *w) -{ - return ev_is_active (&w->prepare_w); -} - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ - diff --git a/src/common/libzmqutil/ev_zmq.h b/src/common/libzmqutil/ev_zmq.h deleted file mode 100644 index f5a2a7b2c7d7..000000000000 --- a/src/common/libzmqutil/ev_zmq.h +++ /dev/null @@ -1,78 +0,0 @@ -/************************************************************\ - * Copyright 2015 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -#ifndef _EV_ZMQ_H -#define _EV_ZMQ_H - -#include -#include "src/common/libev/ev.h" - -typedef struct ev_zmq_struct ev_zmq; -typedef void (*ev_zmq_cb)(struct ev_loop *loop, ev_zmq *w, int revents); - -struct ev_zmq_struct { - ev_io io_w; - ev_prepare prepare_w; - ev_idle idle_w; - ev_check check_w; - void *zsock; - int fd; - int events; - ev_zmq_cb cb; - void *data; -}; - -int ev_zmq_init (ev_zmq *w, ev_zmq_cb cb, void *zsock, int events); -void ev_zmq_start (struct ev_loop *loop, ev_zmq *w); -void ev_zmq_stop (struct ev_loop *loop, ev_zmq *w); -bool ev_zmq_is_active (ev_zmq *w); - -/* Convert zeromq poll bits to libev's, for construction of 'events' - * when registering a watcher. - */ -static __inline__ int ztoe (int z) -{ - int e = 0; - if ((z & ZMQ_POLLIN)) - e |= EV_READ; - if ((z & ZMQ_POLLOUT)) - e |= EV_WRITE; -#if 0 - /* Note: libev will assert if EV_ERROR is included in 'events'. - * If there is an error, libev will call your callback with EV_ERROR set - * whether you request it or not. Silently ignore ZMQ_POLLERR here. - */ - if ((z & ZMQ_POLLERR)) - e |= EV_ERROR; -#endif - return e; -} - -/* Convert libev poll bits to zeromq's, for interpreting 'revents' from - * a libev callback in zeromq context. - */ -static __inline__ int etoz (int e) -{ - int z = 0; - if ((e & EV_READ)) - z |= ZMQ_POLLIN; - if ((e & EV_WRITE)) - z |= ZMQ_POLLOUT; - if ((e & EV_ERROR)) - z |= ZMQ_POLLERR; - return z; -} - -#endif /* !_EV_ZMQ_H */ - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ - diff --git a/src/common/libzmqutil/monitor.c b/src/common/libzmqutil/monitor.c index 812daf1c048c..4fdd13e6cd06 100644 --- a/src/common/libzmqutil/monitor.c +++ b/src/common/libzmqutil/monitor.c @@ -17,7 +17,7 @@ #include "ccan/array_size/array_size.h" -#include "reactor.h" +#include "zwatcher.h" #include "sockopt.h" #include "monitor.h" diff --git a/src/common/libzmqutil/reactor.c b/src/common/libzmqutil/reactor.c deleted file mode 100644 index a0794ce1147a..000000000000 --- a/src/common/libzmqutil/reactor.c +++ /dev/null @@ -1,88 +0,0 @@ -/************************************************************\ - * Copyright 2021 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -#if HAVE_CONFIG_H -#include "config.h" -#endif -#include -#include -#include -#include -#include -#include - -#include "src/common/libev/ev.h" -#include "src/common/libflux/reactor_private.h" - -#include "ev_zmq.h" -#include "reactor.h" - -/* 0MQ sockets - */ - -static void zmq_start (flux_watcher_t *w) -{ - ev_zmq_start (w->r->loop, (ev_zmq *)w->data); -} - -static void zmq_stop (flux_watcher_t *w) -{ - ev_zmq_stop (w->r->loop, (ev_zmq *)w->data); -} - -static bool zmq_is_active (flux_watcher_t *w) -{ - return ev_zmq_is_active (w->data); -} - -static void zmq_cb (struct ev_loop *loop, ev_zmq *pw, int revents) -{ - struct flux_watcher *w = pw->data; - if (w->fn) - w->fn (ev_userdata (loop), w, libev_to_events (revents), w->arg); -} - -static struct flux_watcher_ops zmq_watcher = { - .start = zmq_start, - .stop = zmq_stop, - .destroy = NULL, - .is_active = zmq_is_active, -}; - -flux_watcher_t *zmqutil_watcher_create (flux_reactor_t *r, - void *zsock, int events, - flux_watcher_f cb, void *arg) -{ - ev_zmq *zw; - flux_watcher_t *w; - - if (!(w = watcher_create (r, sizeof (*zw), &zmq_watcher, cb, arg))) - return NULL; - zw = watcher_get_data (w); - ev_zmq_init (zw, zmq_cb, zsock, events_to_libev (events) & ~EV_ERROR); - zw->data = w; - - return w; -} - -void *zmqutil_watcher_get_zsock (flux_watcher_t *w) -{ - if (watcher_get_ops (w) != &zmq_watcher) { - errno = EINVAL; - return NULL; - } - ev_zmq *zw = w->data; - return zw->zsock; -} - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ - diff --git a/src/common/libzmqutil/test/ev.c b/src/common/libzmqutil/test/ev.c deleted file mode 100644 index ffac32c92748..000000000000 --- a/src/common/libzmqutil/test/ev.c +++ /dev/null @@ -1,118 +0,0 @@ -/************************************************************\ - * Copyright 2014 Lawrence Livermore National Security, LLC - * (c.f. AUTHORS, NOTICE.LLNS, COPYING) - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * SPDX-License-Identifier: LGPL-3.0 -\************************************************************/ - -/* ev.c - standalone test of libev ev_zmq watcher (no flux) */ - -#if HAVE_CONFIG_H -#include "config.h" -#endif -#include -#include -#include - -#include "src/common/libzmqutil/ev_zmq.h" -#include "src/common/libtap/tap.h" - -static void *zctx; - -void zsock_tx_cb (struct ev_loop *loop, ev_zmq *w, int revents) -{ - static int count = 50; /* send two per invocation */ - - if ((revents & EV_WRITE)) { - if (zmq_send (w->zsock, "PING", 4, 0) < 0) - fprintf (stderr, "zmq_send: %s", strerror (errno)); - if (zmq_send (w->zsock, "PING", 4, 0) < 0) - fprintf (stderr, "zmq_send: %s", strerror (errno)); - if (--count == 0) - ev_zmq_stop (loop, w); - } - if ((revents & EV_ERROR)) - ev_break (loop, EVBREAK_ALL); -} - -void zsock_rx_cb (struct ev_loop *loop, ev_zmq *w, int revents) -{ - int *iter = w->data; - char buf[128]; - static int count = 100; - - if ((revents & EV_READ)) { - (*iter)++; - if (zmq_recv (w->zsock, buf, sizeof (buf), 0) < 0) - fprintf (stderr, "zstr_recv: %s", strerror (errno)); - if (--count == 0) - ev_zmq_stop (loop, w); - } - if ((revents & EV_ERROR)) - ev_break (loop, EVBREAK_ALL); -} - - -/* send 100 messages over PAIR sockets - * sender in one event handler, receiver in another - */ -void test_ev_zmq (void) -{ - struct ev_loop *loop; - void *zctx; - void *zin, *zout; - int i; - ev_zmq win, wout; - - ok ((loop = ev_loop_new (EVFLAG_AUTO)) != NULL, - "ev_loop_new works"); - ok ((zctx = zmq_init (1)) != NULL, - "initialized zmq context"); - ok ((zout = zmq_socket (zctx, ZMQ_PAIR)) != NULL - && zmq_bind (zout, "inproc://eventloop_test") == 0, - "PAIR socket bind ok"); - ok ((zin = zmq_socket (zctx, ZMQ_PAIR)) != NULL - && zmq_connect (zin, "inproc://eventloop_test") == 0, - "PAIR socket connect ok"); - - i = 0; - ev_zmq_init (&win, zsock_rx_cb, zin, EV_READ); - win.data = &i; - ev_zmq_init (&wout, zsock_tx_cb, zout, EV_WRITE); - - ev_zmq_start (loop, &win); - ev_zmq_start (loop, &wout); - - ok (ev_run (loop, 0) == 0, - "both watchers removed themselves and ev_run exited"); - ev_zmq_stop (loop, &win); - ev_zmq_stop (loop, &wout); - cmp_ok (i, "==", 100, - "ev_zmq handler ran 100 times"); - - ev_loop_destroy (loop); - - zmq_close (zin); - zmq_close (zout); -} - -int main (int argc, char *argv[]) -{ - plan (NO_PLAN); - - if (!(zctx = zmq_ctx_new ())) - BAIL_OUT ("could not create zeromq context"); - - test_ev_zmq (); - - zmq_ctx_term (zctx); - - done_testing (); -} - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ diff --git a/src/common/libzmqutil/test/reactor.c b/src/common/libzmqutil/test/zwatcher.c similarity index 91% rename from src/common/libzmqutil/test/reactor.c rename to src/common/libzmqutil/test/zwatcher.c index bcdb9cee019d..3de987b13c8d 100644 --- a/src/common/libzmqutil/test/reactor.c +++ b/src/common/libzmqutil/test/zwatcher.c @@ -20,13 +20,15 @@ #include "src/common/libtap/tap.h" -#include "src/common/libzmqutil/reactor.h" +#include "zwatcher.h" static const size_t zmqwriter_msgcount = 1024; static void *zctx; -static void zmqwriter (flux_reactor_t *r, flux_watcher_t *w, - int revents, void *arg) +static void zmqwriter (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) { void *sock = zmqutil_watcher_get_zsock (w); static int count = 0; @@ -49,8 +51,10 @@ static void zmqwriter (flux_reactor_t *r, flux_watcher_t *w, flux_reactor_stop_error (r); } -static void zmqreader (flux_reactor_t *r, flux_watcher_t *w, - int revents, void *arg) +static void zmqreader (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) { void *sock = zmqutil_watcher_get_zsock (w); static int count = 0; diff --git a/src/common/libzmqutil/zap.c b/src/common/libzmqutil/zap.c index 69711789a50a..91876cd61ce3 100644 --- a/src/common/libzmqutil/zap.c +++ b/src/common/libzmqutil/zap.c @@ -21,7 +21,7 @@ #include "src/common/libutil/errno_safe.h" #include "src/common/libczmqcontainers/czmq_containers.h" -#include "reactor.h" +#include "zwatcher.h" #include "sockopt.h" #include "mpart.h" #include "cert.h" diff --git a/src/common/libzmqutil/zwatcher.c b/src/common/libzmqutil/zwatcher.c new file mode 100644 index 000000000000..c3dd62cdb481 --- /dev/null +++ b/src/common/libzmqutil/zwatcher.c @@ -0,0 +1,236 @@ +/************************************************************\ + * Copyright 2021 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* zwatcher.c - an aggregate libev watcher for 0MQ sockets */ + +/* Thanks to Bert JW Regeer for a helpful blog on integrating 0MQ with libev: + * http://funcptr.net/2013/04/20/embedding-zeromq-in-the-libev-event-loop/ + * Also ref: libzmq zmq_poll() source, czmq zloop() source + * + * Brief summary of 0MQ integration: + * - 0MQ provides ZMQ_EVENTS getsockopt to test whether a 0MQ socket is + * writeable or readable. + * - 0MQ provides ZMQ_FD getsockopt to obtain the fd of a mailbox that + * becomes readable when ZMQ_EVENTS != 0 (edge triggered) + * - prepare/check watchers are used to test ZMQ_EVENTS, make user + * callbacks, and enable/disable no-op io and idle watchers. + * - while ZMQ_EVENTS != 0, enable no-op idle watcher (no callback) + * so that the event loop will continue looping, executing prepare/check + * - when ZMQ_EVENTS == 0, enable no-op io watcher on ZMQ_FD (no callback) + * so that the event loop will unblock, executing prepare/check + * on the next mailbox event + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include + +#include "src/common/libflux/watcher_private.h" +#include "src/common/libutil/errno_safe.h" + +#include "zwatcher.h" + +struct zwatcher { + flux_watcher_t *fd_w; + flux_watcher_t *prepare_w; + flux_watcher_t *idle_w; + flux_watcher_t *check_w; + void *zsock; + int events; +}; + +static int get_zmq_fd (void *zsock, int *fd) +{ + int val; + size_t size = sizeof (val); + + if (!zsock) { + errno = EINVAL; + return -1; + } + if (zmq_getsockopt (zsock, ZMQ_FD, &val, &size) < 0) + return -1; + *fd = val; + return 0; +} + +static int ztof (int zevents) +{ + int fevents = 0; + if ((zevents & ZMQ_POLLIN)) + fevents |= FLUX_POLLIN; + if ((zevents & ZMQ_POLLOUT)) + fevents |= FLUX_POLLOUT; + return fevents; +} + +static int get_zmq_events (void *zsock, int *events) +{ + int val; + size_t size = sizeof (val); + + if (!zsock) { + errno = EINVAL; + return -1; + } + if (zmq_getsockopt (zsock, ZMQ_EVENTS, &val, &size) < 0) + return -1; + *events = ztof (val); + return 0; +} + +static void zwatcher_start (flux_watcher_t *w) +{ + struct zwatcher *zw = watcher_get_data (w); + + flux_watcher_start (zw->prepare_w); + flux_watcher_start (zw->check_w); +} + +static void zwatcher_stop (flux_watcher_t *w) +{ + struct zwatcher *zw = watcher_get_data (w); + + flux_watcher_stop (zw->prepare_w); + flux_watcher_stop (zw->check_w); + flux_watcher_stop (zw->fd_w); + flux_watcher_stop (zw->idle_w); +} + +static bool zwatcher_is_active (flux_watcher_t *w) +{ + struct zwatcher *zw = watcher_get_data (w); + + return flux_watcher_is_active (zw->prepare_w); +} + +static void zwatcher_destroy (flux_watcher_t *w) +{ + struct zwatcher *zw = watcher_get_data (w); + if (zw) { + flux_watcher_destroy (zw->prepare_w); + flux_watcher_destroy (zw->check_w); + flux_watcher_destroy (zw->fd_w); + flux_watcher_destroy (zw->idle_w); + } +} + +static void prepare_cb (flux_reactor_t *r, + flux_watcher_t *prepare_w, + int ignore, + void *arg) +{ + flux_watcher_t *w = arg; + struct zwatcher *zw = watcher_get_data (w); + int zevents; + + if (get_zmq_events (zw->zsock, &zevents) < 0) + zevents = FLUX_POLLERR; + + if ((zevents & zw->events)) + flux_watcher_start (zw->idle_w); + else + flux_watcher_start (zw->fd_w); +} + +static void check_cb (flux_reactor_t *r, + flux_watcher_t *check_w, + int ignore, + void *arg) +{ + flux_watcher_t *w = arg; + struct zwatcher *zw = watcher_get_data (w); + int zevents; + int revents; + + flux_watcher_stop (zw->fd_w); + flux_watcher_stop (zw->idle_w); + + if (get_zmq_events (zw->zsock, &zevents) < 0) + zevents = FLUX_POLLERR; + revents = (zevents & zw->events); + + if (revents) + watcher_call (w, revents); +} + +/* N.B. The internal fd watcher is only used for its side effect of + * unblocking the reactor when pollevents edge triggers from "no events" + * to "some events". The prep/check watchers do the heavy lifting. + * This callback exists only to handle POLLERR in case something goes wrong. + */ +static void fd_cb (flux_reactor_t *r, + flux_watcher_t *fd_w, + int revents, + void *arg) +{ + flux_watcher_t *w = arg; + + if ((revents & FLUX_POLLERR)) + watcher_call (w, FLUX_POLLERR); +} + +static struct flux_watcher_ops zwatcher_ops = { + .start = zwatcher_start, + .stop = zwatcher_stop, + .destroy = zwatcher_destroy, + .is_active = zwatcher_is_active, +}; + +flux_watcher_t *zmqutil_watcher_create (flux_reactor_t *r, + void *zsock, + int events, + flux_watcher_f cb, + void *arg) +{ + struct zwatcher *zw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*zw), &zwatcher_ops, cb, arg))) + return NULL; + zw = watcher_get_data (w); + zw->events = events | FLUX_POLLERR; + zw->zsock = zsock; + + if (!(zw->prepare_w = flux_prepare_watcher_create (r, prepare_cb, w)) + || !(zw->check_w = flux_check_watcher_create (r, check_cb, w)) + || !(zw->idle_w = flux_idle_watcher_create (r, NULL, NULL))) + goto error; + + int fd; + if (get_zmq_fd (zsock, &fd) < 0 + || !(zw->fd_w = flux_fd_watcher_create (r, fd, FLUX_POLLIN, fd_cb, w))) + goto error; + return w; +error: + ERRNO_SAFE_WRAP (flux_watcher_destroy, w); + return NULL; +} + +void *zmqutil_watcher_get_zsock (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &zwatcher_ops) { + errno = EINVAL; + return NULL; + } + struct zwatcher *zw = watcher_get_data (w); + return zw->zsock; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/common/libzmqutil/reactor.h b/src/common/libzmqutil/zwatcher.h similarity index 66% rename from src/common/libzmqutil/reactor.h rename to src/common/libzmqutil/zwatcher.h index 86f2874476cb..1d2fe0fb97dc 100644 --- a/src/common/libzmqutil/reactor.h +++ b/src/common/libzmqutil/zwatcher.h @@ -8,31 +8,22 @@ * SPDX-License-Identifier: LGPL-3.0 \************************************************************/ -#ifndef _ZMQUTIL_REACTOR_H -#define _ZMQUTIL_REACTOR_H +#ifndef _ZMQUTIL_ZWATCHER_H +#define _ZMQUTIL_ZWATCHER_H #include #include #include -#ifdef __cplusplus -extern "C" { -#endif - -/* zmq socket - */ - flux_watcher_t *zmqutil_watcher_create (flux_reactor_t *r, - void *zsock, int events, - flux_watcher_f cb, void *arg); + void *zsock, + int events, + flux_watcher_f cb, + void *arg); void *zmqutil_watcher_get_zsock (flux_watcher_t *w); -#ifdef __cplusplus -} -#endif - -#endif /* !_ZMQUTIL_REACTOR_H */ +#endif /* !_ZMQUTIL_ZWATCHER_H */ /* * vi:tabstop=4 shiftwidth=4 expandtab diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index f1a1d7bd3e20..14197ef31716 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -144,6 +144,7 @@ job_exec_la_LIBADD = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libflux/libflux.la \ $(JANSSON_LIBS) job_exec_la_LDFLAGS = $(fluxmod_ldflags) -module @@ -208,6 +209,7 @@ job_manager_la_LIBADD = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libflux/libflux.la \ $(top_builddir)/src/common/libflux-optparse.la \ $(top_builddir)/src/common/librlist/librlist.la \ $(JANSSON_LIBS) @@ -283,6 +285,7 @@ sdbus_la_SOURCES = sdbus_la_LIBADD = \ $(builddir)/sdbus/libsdbus.la \ $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libflux/libflux.la \ $(top_builddir)/src/common/libflux-internal.la \ $(JANSSON_LIBS) if HAVE_LIBSYSTEMD diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index cf7d8923ccc7..f9f6864a85fe 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -112,6 +112,8 @@ plugins_perilog_la_SOURCES = \ plugins_perilog_la_LIBADD = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libflux/libflux.la \ $(top_builddir)/src/common/librlist/librlist.la \ $(top_builddir)/src/common/libjob/libjob.la plugins_perilog_la_LDFLAGS = \ @@ -134,6 +136,7 @@ test_ldadd = \ $(top_builddir)/src/common/librlist/librlist.la \ $(top_builddir)/src/common/libjob/libjob.la \ $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libflux/libflux.la \ $(top_builddir)/src/common/libflux-internal.la \ $(LIBPTHREAD) \ $(JANSSON_LIBS) diff --git a/src/modules/sdbus/watcher.c b/src/modules/sdbus/watcher.c index 6c771833c65a..022dc35fd568 100644 --- a/src/modules/sdbus/watcher.c +++ b/src/modules/sdbus/watcher.c @@ -21,7 +21,7 @@ #include #include -#include "src/common/libflux/reactor_private.h" +#include "src/common/libflux/watcher_private.h" #include "watcher.h" @@ -117,7 +117,7 @@ static void op_stop (flux_watcher_t *w) static bool op_is_active (flux_watcher_t *w) { struct sdbus_watcher *sdw = watcher_get_data (w); - return ev_is_active (sdw->prep); + return flux_watcher_is_active (sdw->prep); } static void op_destroy (flux_watcher_t *w) diff --git a/src/shell/Makefile.am b/src/shell/Makefile.am index 57c51d26cbd6..195bc78fa299 100644 --- a/src/shell/Makefile.am +++ b/src/shell/Makefile.am @@ -116,6 +116,7 @@ flux_shell_LDADD = \ $(top_builddir)/src/common/libflux-taskmap.la \ $(top_builddir)/src/common/libflux-idset.la \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ + $(top_builddir)/src/common/libflux/libflux.la \ $(top_builddir)/src/common/libpmi/libpmi_server.la \ $(top_builddir)/src/common/libpmi/libpmi_common.la \ $(top_builddir)/src/common/libczmqcontainers/libczmqcontainers.la \ diff --git a/t/Makefile.am b/t/Makefile.am index 2c05706d82e1..9cad2e654164 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -749,14 +749,16 @@ reactor_reactorcat_CPPFLAGS = $(test_cppflags) reactor_reactorcat_LDADD = $(test_ldadd) reactor_reactorcat_LDFLAGS = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ - $(test_ldflags) + $(test_ldflags) \ + $(top_builddir)/src/common/libflux/libflux.la rexec_rexec_SOURCES = rexec/rexec.c rexec_rexec_CPPFLAGS = $(test_cppflags) rexec_rexec_LDADD = $(test_ldadd) rexec_rexec_LDFLAGS = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ - $(test_ldflags) + $(test_ldflags) \ + $(top_builddir)/src/common/libflux/libflux.la rexec_rexec_count_stdout_SOURCES = rexec/rexec_count_stdout.c rexec_rexec_count_stdout_CPPFLAGS = $(test_cppflags) @@ -768,7 +770,8 @@ rexec_rexec_getline_CPPFLAGS = $(test_cppflags) rexec_rexec_getline_LDADD = $(test_ldadd) rexec_rexec_getline_LDFLAGS = \ $(top_builddir)/src/common/libsubprocess/libsubprocess.la \ - $(test_ldflags) + $(test_ldflags) \ + $(top_builddir)/src/common/libflux/libflux.la ingest_job_manager_la_SOURCES = ingest/job-manager.c ingest_job_manager_la_CPPFLAGS = $(test_cppflags)