diff --git a/doc/Makefile.am b/doc/Makefile.am index 0721f275b6ef..67718ef14371 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -204,12 +204,13 @@ MAN3_FILES_SECONDARY = \ man3/flux_reactor_run.3 \ man3/flux_reactor_stop.3 \ man3/flux_reactor_stop_error.3 \ - man3/flux_reactor_active_incref.3 \ - man3/flux_reactor_active_decref.3 \ man3/flux_fd_watcher_get_fd.3 \ man3/flux_watcher_stop.3 \ man3/flux_watcher_is_active.3 \ man3/flux_watcher_destroy.3 \ + man3/flux_watcher_ref.3 \ + man3/flux_watcher_unref.3 \ + man3/flux_watcher_is_referenced.3 \ man3/flux_watcher_next_wakeup.3 \ man3/flux_handle_watcher_get_flux.3 \ man3/flux_timer_watcher_reset.3 \ @@ -219,6 +220,7 @@ MAN3_FILES_SECONDARY = \ man3/flux_msg_handler_destroy.3 \ man3/flux_msg_handler_start.3 \ man3/flux_msg_handler_stop.3 \ + man3/flux_get_handle_watcher.3 \ man3/flux_msg_handler_delvec.3 \ man3/flux_child_watcher_get_rpid.3 \ man3/flux_child_watcher_get_rstatus.3 \ diff --git a/doc/man3/flux_msg_handler_create.rst b/doc/man3/flux_msg_handler_create.rst index c32e00897d8c..3853796d1d94 100644 --- a/doc/man3/flux_msg_handler_create.rst +++ b/doc/man3/flux_msg_handler_create.rst @@ -28,6 +28,8 @@ SYNOPSIS void flux_msg_handler_stop (flux_msg_handler_t *mh); + flux_watcher_t *flux_get_handle_watcher(flux_t *h) + Link with :command:`-lflux-core`. DESCRIPTION @@ -46,7 +48,8 @@ The handle :var:`h` is monitored for FLUX_POLLIN events on the :type:`flux_reactor_t` associated with the handle as described in :man3:`flux_set_reactor`. This internal "handle watcher" is started when the first message handler is started, and stopped when the last message handler -is stopped. +is stopped. The handle watcher may be directly accessed with +:func:`flux_get_handle_watcher`. Messages arriving on :var:`h` are internally read and dispatched to matching message handlers. If multiple handlers match the message, the following diff --git a/doc/man3/flux_reactor_create.rst b/doc/man3/flux_reactor_create.rst index 5ab2aafcf63b..84b3d82fa57f 100644 --- a/doc/man3/flux_reactor_create.rst +++ b/doc/man3/flux_reactor_create.rst @@ -21,10 +21,6 @@ SYNOPSIS void flux_reactor_stop_error (flux_reactor_t *r); - void flux_reactor_active_incref (flux_reactor_t *r); - - void flux_reactor_active_decref (flux_reactor_t *r); - Link with :command:`-lflux-core`. DESCRIPTION @@ -81,16 +77,6 @@ The caller should ensure that a valid error code has been assigned to :func:`flux_reactor_create` time. Freeing of the underlying resources will be deferred if there are any remaining watchers associated with the reactor. -:func:`flux_reactor_active_decref` and :func:`flux_reactor_active_incref` -manipulate the reactor's internal count of active watchers. Each active -watcher takes a reference count on the reactor, and the reactor returns -when this count reaches zero. It is useful sometimes to have a watcher that -can remain active without preventing the reactor from exiting. To achieve this, -call :func:`flux_reactor_active_decref` after the watcher is started, and -:func:`flux_reactor_active_incref` before the watcher is stopped. -Remember that destroying an active reactor internally stops it, -so be sure to stop/incref such a watcher first. - RETURN VALUE ============ diff --git a/doc/man3/flux_stat_watcher_create.rst b/doc/man3/flux_stat_watcher_create.rst index dae76688f106..9491df6ff17e 100644 --- a/doc/man3/flux_stat_watcher_create.rst +++ b/doc/man3/flux_stat_watcher_create.rst @@ -22,9 +22,9 @@ SYNOPSIS flux_watcher_f callback, void *arg); - void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev); + int flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev); Link with :command:`-lflux-core`. @@ -55,6 +55,9 @@ RETURN VALUE :func:`flux_stat_watcher_create` returns a :type:`flux_watcher_t` object on success. On error, NULL is returned, and :var:`errno` is set appropriately. +:func:`flux_stat_watcher_get_rstat` returns 0 on success. On error, -1 is +returned and :var:`errno` is set appropriately. + ERRORS ====== @@ -62,6 +65,9 @@ ERRORS ENOMEM Out of memory. +EINVAL + Invalid argument. + RESOURCES ========= diff --git a/doc/man3/flux_watcher_start.rst b/doc/man3/flux_watcher_start.rst index a902021e3625..dee40946042d 100644 --- a/doc/man3/flux_watcher_start.rst +++ b/doc/man3/flux_watcher_start.rst @@ -15,6 +15,12 @@ SYNOPSIS bool flux_watcher_is_active (flux_watcher_t *w); + void flux_watcher_unref (flux_watcher_t *w); + + void flux_watcher_ref (flux_watcher_t *w); + + bool flux_watcher_is_referenced (flux_watcher_t *w); + void flux_watcher_destroy (flux_watcher_t *w); double flux_watcher_next_wakeup (flux_watcher_t *w); @@ -36,6 +42,21 @@ callback. :func:`flux_watcher_is_active` returns a true value if the watcher is active (i.e. it has been started and not yet stopped) and false otherwise. +:func:`flux_watcher_unref` drops the watcher's reference on the reactor. +This function is idempotent. :func:`flux_reactor_run` normally runs until +there are no more active watchers with references. + +:func:`flux_watcher_ref` restores the watcher's reference on the reactor. +This function is idempotent. + +:func:`flux_watcher_is_referenced` returns true if the watcher is referenced. +All watchers are referenced unless updated with :func:`flux_watcher_unref`. + +.. note:: + All watchers in the public API support :func:`flux_watcher_unref`, but some + specialized watchers internal to flux-core do not. If in doubt about whether + the call had any effect, check with :func:`flux_watcher_is_referenced`. + :func:`flux_watcher_destroy` destroys a :type:`flux_watcher_t` object :var:`w`, after stopping it. It is not safe to destroy a watcher object within a :type:`flux_watcher_f` callback. diff --git a/doc/manpages.py b/doc/manpages.py index b5f784fb7c19..98de50741ceb 100644 --- a/doc/manpages.py +++ b/doc/manpages.py @@ -189,6 +189,7 @@ ('man3/flux_msg_handler_create', 'flux_msg_handler_start', 'manage message handlers', [author], 3), ('man3/flux_msg_handler_create', 'flux_msg_handler_stop', 'manage message handlers', [author], 3), ('man3/flux_msg_handler_create', 'flux_msg_handler_create', 'manage message handlers', [author], 3), + ('man3/flux_msg_handler_create', 'flux_get_handle_watcher', 'manage message handlers', [author], 3), ('man3/flux_open', 'flux_clone', 'open/close connection to Flux Message Broker', [author], 3), ('man3/flux_open', 'flux_close', 'open/close connection to Flux Message Broker', [author], 3), ('man3/flux_open', 'flux_open', 'open/close connection to Flux Message Broker', [author], 3), @@ -202,8 +203,6 @@ ('man3/flux_reactor_create', 'flux_reactor_run', 'create/destroy/control event reactor object', [author], 3), ('man3/flux_reactor_create', 'flux_reactor_stop', 'create/destroy/control event reactor object', [author], 3), ('man3/flux_reactor_create', 'flux_reactor_stop_error', 'create/destroy/control event reactor object', [author], 3), - ('man3/flux_reactor_create', 'flux_reactor_active_incref', 'create/destroy/control event reactor object', [author], 3), - ('man3/flux_reactor_create', 'flux_reactor_active_decref', 'create/destroy/control event reactor object', [author], 3), ('man3/flux_reactor_create', 'flux_reactor_create', 'create/destroy/control event reactor object', [author], 3), ('man3/flux_reactor_now', 'flux_reactor_now_update', 'get/update reactor time', [author], 3), ('man3/flux_reactor_now', 'flux_reactor_now', 'get/update reactor time', [author], 3), @@ -282,11 +281,14 @@ ('man3/flux_stat_watcher_create', 'flux_stat_watcher_create', 'create stat watcher', [author], 3), ('man3/flux_timer_watcher_create', 'flux_timer_watcher_reset', 'set/reset a timer', [author], 3), ('man3/flux_timer_watcher_create', 'flux_timer_watcher_create', 'set/reset a timer', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_stop', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_is_active', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_destroy', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_next_wakeup', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_start', 'start/stop/destroy/query reactor watcher', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_stop', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_is_active', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_destroy', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_next_wakeup', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_start', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_ref', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_unref', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_is_referenced', 'common reactor watcher methods', [author], 3), ('man3/flux_watcher_set_priority', 'flux_watcher_set_priority', 'set watcher priority', [author], 3), ('man3/hostlist_create', 'hostlist_create', 'Manipulate lists of hostnames', [author], 3), ('man3/hostlist_create', 'hostlist_destroy', 'Manipulate lists of hostnames', [author], 3), diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index a6163e2b6129..591c073ccbfb 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -939,3 +939,4 @@ aTGTz cPATH SATTR myprogram +unref diff --git a/src/bindings/python/flux/cli/base.py b/src/bindings/python/flux/cli/base.py index bab650ed43ae..5146534f521c 100644 --- a/src/bindings/python/flux/cli/base.py +++ b/src/bindings/python/flux/cli/base.py @@ -1536,7 +1536,7 @@ def progress_update(self, jobinfo=None, submit=False, submit_failed=False): # Don't let this timer watcher contribute to the reactor's # "active" reference count: - self.flux_handle.reactor_decref() + timer.unref() if submit: self.progress.update( diff --git a/src/bindings/python/flux/core/handle.py b/src/bindings/python/flux/core/handle.py index 85b30d7a69e8..e183d0b3e600 100644 --- a/src/bindings/python/flux/core/handle.py +++ b/src/bindings/python/flux/core/handle.py @@ -331,18 +331,14 @@ def reactor_interrupt(handle, *_args): reactor_interrupted = True handle.reactor_stop(reactor) - with self.signal_watcher_create(signal.SIGINT, reactor_interrupt): + with self.signal_watcher_create(signal.SIGINT, reactor_interrupt) as w: with self.in_reactor(): # This signal watcher should not take a reference on reactor # o/w the reactor may not exit as expected when all other # active watchers and msghandlers are complete. # - self.reactor_active_decref(reactor) + w.unref() rc = self.flux_reactor_run(reactor, flags) - # Re-establish signal watcher reference so reactor refcount - # doesn't underflow when signal watcher is destroyed - # - self.reactor_active_incref(reactor) if reactor_interrupted: raise KeyboardInterrupt Flux.raise_if_exception() @@ -361,16 +357,6 @@ def reactor_stop_error(self, reactor=None): reactor = self.get_reactor() self.flux_reactor_stop_error(reactor) - def reactor_incref(self, reactor=None): - if reactor is None: - reactor = self.get_reactor() - self.reactor_active_incref(reactor) - - def reactor_decref(self, reactor=None): - if reactor is None: - reactor = self.get_reactor() - self.reactor_active_decref(reactor) - def service_register(self, name): return Future(self.flux_service_register(name)) diff --git a/src/bindings/python/flux/core/watchers.py b/src/bindings/python/flux/core/watchers.py index 3df6f19e7729..c16bbd3575f3 100644 --- a/src/bindings/python/flux/core/watchers.py +++ b/src/bindings/python/flux/core/watchers.py @@ -46,6 +46,14 @@ def stop(self): raw.flux_watcher_stop(self.handle) return self + def ref(self): + raw.flux_watcher_ref(self.handle) + return self + + def unref(self): + raw.flux_watcher_unref(self.handle) + return self + def destroy(self): # Remove this watcher from its owning Flux handle # if the handle is still around. A try/except block diff --git a/src/bindings/python/flux/job/watcher.py b/src/bindings/python/flux/job/watcher.py index 09b065254b55..418cf3979efb 100644 --- a/src/bindings/python/flux/job/watcher.py +++ b/src/bindings/python/flux/job/watcher.py @@ -177,7 +177,7 @@ def start(self): # Don't let this timer watcher contribute to the reactor's # "active" reference count: # - self.timer.flux_handle.reactor_decref() + self.timer.unref() super().start() # Override superclass `_t0` attribute to elapsed time is computed # from this value and not the time of super().start(): diff --git a/src/cmd/flux-start.c b/src/cmd/flux-start.c index 4634bcb1c713..de5b73c2f3dc 100644 --- a/src/cmd/flux-start.c +++ b/src/cmd/flux-start.c @@ -1038,13 +1038,9 @@ void start_server_initialize (const char *rundir, bool verbose) log_err_exit ("could not created embedded flux-start server"); if (flux_msg_handler_addvec (ctx.h, htab, NULL, &ctx.handlers) < 0) log_err_exit ("could not register service methods"); - /* Service related watchers: - * - usock server listen fd - * - flux_t handle watcher (adds 2 active prep/check watchers) - */ - int ignore_watchers = 3; - while (ignore_watchers-- > 0) - flux_reactor_active_decref (ctx.reactor); + + flux_watcher_unref (flux_get_handle_watcher (ctx.h)); + flux_watcher_unref (usock_service_listen_watcher (ctx.h)); } void start_server_finalize (void) diff --git a/src/common/libflux/hwatcher.c b/src/common/libflux/hwatcher.c index 3c662a874896..b8b33ac5678a 100644 --- a/src/common/libflux/hwatcher.c +++ b/src/common/libflux/hwatcher.c @@ -46,6 +46,26 @@ static void hwatcher_stop (flux_watcher_t *w) flux_watcher_stop (hw->idle_w); } +static void hwatcher_ref (flux_watcher_t *w) +{ + struct hwatcher *hw = watcher_get_data (w); + + flux_watcher_ref (hw->fd_w); + flux_watcher_ref (hw->prepare_w); + flux_watcher_ref (hw->idle_w); + flux_watcher_ref (hw->check_w); +} + +static void hwatcher_unref (flux_watcher_t *w) +{ + struct hwatcher *hw = watcher_get_data (w); + + flux_watcher_unref (hw->fd_w); + flux_watcher_unref (hw->prepare_w); + flux_watcher_unref (hw->idle_w); + flux_watcher_unref (hw->check_w); +} + static bool hwatcher_is_active (flux_watcher_t *w) { struct hwatcher *hw = watcher_get_data (w); @@ -106,6 +126,8 @@ static void hwatcher_check_cb (flux_reactor_t *r, static struct flux_watcher_ops hwatcher_ops = { .start = hwatcher_start, .stop = hwatcher_stop, + .ref = hwatcher_ref, + .unref = hwatcher_unref, .is_active = hwatcher_is_active, .destroy = hwatcher_destroy, }; diff --git a/src/common/libflux/msg_handler.c b/src/common/libflux/msg_handler.c index fb948b772a3f..544df36c965c 100644 --- a/src/common/libflux/msg_handler.c +++ b/src/common/libflux/msg_handler.c @@ -729,6 +729,12 @@ int flux_dispatch_requeue (flux_t *h) return 0; } +flux_watcher_t *flux_get_handle_watcher (flux_t *h) +{ + struct dispatch *d = dispatch_get (h); + return d ? d->w : NULL; +} + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/common/libflux/msg_handler.h b/src/common/libflux/msg_handler.h index da71f6f9b0ae..466e8cd27754 100644 --- a/src/common/libflux/msg_handler.h +++ b/src/common/libflux/msg_handler.h @@ -62,6 +62,9 @@ void flux_msg_handler_delvec (flux_msg_handler_t *msg_handlers[]); */ int flux_dispatch_requeue (flux_t *h); +// accessor for start/stop/ref/unref +flux_watcher_t *flux_get_handle_watcher (flux_t *h); + #ifdef __cplusplus } #endif diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index 767476bcb222..a587c5c59394 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -11,7 +11,6 @@ #if HAVE_CONFIG_H #include "config.h" #endif -#include #include #include #include @@ -127,18 +126,6 @@ void flux_reactor_stop_error (flux_reactor_t *r) ev_break (r->loop, EVBREAK_ALL); } -void flux_reactor_active_incref (flux_reactor_t *r) -{ - if (r) - ev_ref (r->loop); -} - -void flux_reactor_active_decref (flux_reactor_t *r) -{ - if (r) - ev_unref (r->loop); -} - void *reactor_get_loop (flux_reactor_t *r) { return r ? r->loop : NULL; diff --git a/src/common/libflux/reactor.h b/src/common/libflux/reactor.h index bce296edd74e..609482390f1c 100644 --- a/src/common/libflux/reactor.h +++ b/src/common/libflux/reactor.h @@ -48,13 +48,6 @@ double flux_reactor_now (flux_reactor_t *r); void flux_reactor_now_update (flux_reactor_t *r); double flux_reactor_time (void); -/* Change reactor reference count. - * Each active watcher holds a reference. - * When the reference count reaches zero, the reactor loop exits. - */ -void flux_reactor_active_incref (flux_reactor_t *r); -void flux_reactor_active_decref (flux_reactor_t *r); - #ifdef __cplusplus } #endif diff --git a/src/common/libflux/test/reactor.c b/src/common/libflux/test/reactor.c index d500464cef38..f0b596f3c91f 100644 --- a/src/common/libflux/test/reactor.c +++ b/src/common/libflux/test/reactor.c @@ -25,6 +25,58 @@ #include "src/common/libtap/tap.h" #include "ccan/array_size/array_size.h" +void watcher_is (flux_watcher_t *w, + bool exp_active, + bool exp_referenced, + const char *name, + const char *what) +{ + bool is_active = flux_watcher_is_active (w); + bool is_referenced = flux_watcher_is_referenced (w); + + ok (is_active == exp_active && is_referenced == exp_referenced, + "%s %sact%sref after %s", + name, + exp_active ? "+" : "-", + exp_referenced ? "+" : "-", + what); + if (is_active != exp_active) + diag ("%s is unexpectedly %sact", name, is_active ? "+" : "-"); + if (is_referenced != exp_referenced) + diag ("%s is unexpectedly %sref", name, is_referenced ? "+" : "-"); +} + +/* Call this on newly created watcher to check start/stop/is_active and + * ref/unref/is_referenced basics. + */ +void generic_watcher_check (flux_watcher_t *w, const char *name) +{ + /* ref/unref while inactive causes ev_ref/ev_unref to run + * in the start/stop callbacks + */ + watcher_is (w, false, true, name, "init"); + flux_watcher_unref (w); + watcher_is (w, false, false, name, "unref"); + flux_watcher_start (w); + watcher_is (w, true, false, name, "start"); + flux_watcher_stop (w); + watcher_is (w, false, false, name, "stop"); + flux_watcher_ref (w); + watcher_is (w, false, true, name, "ref"); + + /* ref/unref while active causes ev_ref/ev_unref to run + * in the ref/unref callbacks + */ + flux_watcher_start (w); + watcher_is (w, true, true, name, "start"); + flux_watcher_unref (w); + watcher_is (w, true, false, name, "unref"); + flux_watcher_ref (w); + watcher_is (w, true, true, name, "ref"); + flux_watcher_stop (w); + watcher_is (w, false, true, name, "stop"); +} + static const size_t fdwriter_bufsize = 10*1024*1024; static void fdwriter (flux_reactor_t *r, @@ -117,11 +169,8 @@ static void test_fd (flux_reactor_t *reactor) w = flux_fd_watcher_create (reactor, fd[1], FLUX_POLLOUT, fdwriter, NULL); ok (r != NULL && w != NULL, "fd: reader and writer created"); - ok (!flux_watcher_is_active (r), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "fd"); flux_watcher_start (r); - ok (flux_watcher_is_active (r), - "flux_watcher_is_active() returns true after flux_watcher_start()"); flux_watcher_start (w); ok (flux_reactor_run (reactor, 0) == 0, "fd: reactor ran to completion after %lu bytes", fdwriter_bufsize); @@ -177,11 +226,8 @@ static void test_timer (flux_reactor_t *reactor) "timer: creating negative repeat fails with EINVAL"); ok ((w = flux_timer_watcher_create (reactor, 0, 0, oneshot, NULL)) != NULL, "timer: creating zero timeout oneshot works"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "timer"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); oneshot_runs = 0; t0 = flux_reactor_now (reactor); ok (flux_reactor_run (reactor, 0) == 0, @@ -292,11 +338,9 @@ static void test_periodic (flux_reactor_t *reactor) ok ((w = flux_periodic_watcher_create (reactor, 0, 0, NULL, oneshot, NULL)) != NULL, "periodic: creating zero offset/interval works"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "periodic"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); + oneshot_runs = 0; ok (flux_reactor_run (reactor, 0) == 0, "periodic: reactor ran to completion"); @@ -380,11 +424,8 @@ static void test_idle (flux_reactor_t *reactor) w = flux_idle_watcher_create (reactor, idle_cb, NULL); ok (w != NULL, "created idle watcher"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "idle"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); ok (flux_reactor_run (reactor, 0) == 0, "reactor ran successfully"); @@ -437,17 +478,17 @@ static void test_prepcheck (flux_reactor_t *reactor) ok (!flux_watcher_is_active (w), "flux_watcher_is_active() returns false"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); prep = flux_prepare_watcher_create (reactor, prepare_cb, NULL); ok (w != NULL, "created prepare watcher"); + generic_watcher_check (prep, "prep"); flux_watcher_start (prep); chk = flux_check_watcher_create (reactor, check_cb, NULL); ok (w != NULL, "created check watcher"); + generic_watcher_check (chk, "check"); flux_watcher_start (chk); ok (flux_reactor_run (reactor, 0) >= 0, @@ -495,11 +536,8 @@ static void test_signal (flux_reactor_t *reactor) w = flux_signal_watcher_create (reactor, SIGUSR1, sigusr1_cb, NULL); ok (w != NULL, "created signal watcher"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "signal"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); idle = flux_idle_watcher_create (reactor, sigidle_cb, NULL); ok (idle != NULL, @@ -549,14 +587,11 @@ static void test_child (flux_reactor_t *reactor) w = flux_child_watcher_create (r, child_pid, false, child_cb, NULL); ok (w != NULL, "created child watcher"); + generic_watcher_check (w, "signal"); ok (kill (child_pid, SIGHUP) == 0, "sent child SIGHUP"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); ok (flux_reactor_run (r, 0) == 0, "reactor ran successfully"); @@ -626,11 +661,8 @@ static void test_stat (flux_reactor_t *reactor) w = flux_stat_watcher_create (reactor, ctx.path, 0., stat_cb, &ctx); ok (w != NULL, "created stat watcher"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "stat"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); tw = flux_timer_watcher_create (reactor, 0.01, @@ -641,6 +673,12 @@ static void test_stat (flux_reactor_t *reactor) "created timer watcher"); flux_watcher_start (tw); + /* Make sure rstat accessor fails if passed the wrong watcher type. + */ + errno = 0; + ok (flux_stat_watcher_get_rstat (tw, NULL, NULL) < 0 && errno == EINVAL, + "flux_stat_watcher_get_rstat fails with EINVAL on wrong watcher type"); + ok (flux_reactor_run (reactor, 0) == 0, "reactor ran successfully"); @@ -654,7 +692,46 @@ static void test_stat (flux_reactor_t *reactor) free (ctx.path); } -static void active_idle_cb (flux_reactor_t *r, +static int handle_counter = 0; +static void handle_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + handle_counter++; + flux_watcher_unref (w); +} + +void test_handle (flux_reactor_t *r) +{ + flux_t *h; + flux_watcher_t *w; + + if (!(h = flux_open ("loop://", 0))) + BAIL_OUT ("could not create loop handle"); + w = flux_handle_watcher_create (r, h, FLUX_POLLIN, handle_cb, NULL); + ok (w != NULL, + "flux_handle_watcher_create works"); + generic_watcher_check (w, "handle"); + flux_watcher_start (w); + + flux_msg_t *msg; + if (!(msg = flux_request_encode ("foo", "bar"))) + BAIL_OUT ("could not encode message"); + if (flux_send (h, msg, 0) < 0) + BAIL_OUT ("could not send message"); + flux_msg_destroy (msg); + + ok (flux_reactor_run (r, 0) == 0, + "flux_reactor_run ran"); + ok (handle_counter == 1, + "watcher ran once"); + + flux_watcher_destroy (w); + flux_close (h); +} + +static void unref_idle_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) @@ -666,41 +743,93 @@ static void active_idle_cb (flux_reactor_t *r, flux_reactor_stop_error (r); } +static void unref_idle2_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + int *count = arg; + (*count)++; + + if (flux_watcher_is_referenced (w)) { + diag ("calling flux_watcher_unref on count=%d", *count); + flux_watcher_unref (w); // calls ev_unref() + } + else { + diag ("calling flux_watcher_ref on count=%d", *count); + flux_watcher_ref (w); // calls ev_ref() + } +} -static void test_active_ref (flux_reactor_t *r) +static void test_unref (flux_reactor_t *r) { flux_watcher_t *w; + flux_watcher_t *w2; int count; ok (flux_reactor_run (r, 0) == 0, "flux_reactor_run with no watchers returned immediately"); - if (!(w = flux_idle_watcher_create (r, active_idle_cb, &count))) + if (!(w = flux_idle_watcher_create (r, unref_idle_cb, &count))) + BAIL_OUT ("flux_idle_watcher_create failed"); + if (!(w2 = flux_idle_watcher_create (r, unref_idle2_cb, &count))) BAIL_OUT ("flux_idle_watcher_create failed"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); - flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); + /* Tests with unref_idle_cb() + * Show that ref/unref as expected with watcher inactive. + */ + flux_watcher_start (w); count = 0; ok (flux_reactor_run (r, 0) < 0 && count == 16, "flux_reactor_run with one watcher stopped after 16 iterations"); + flux_watcher_stop (w); + ok (flux_watcher_is_referenced (w), + "flux_watcher_is_referenced returns true after stop"); - flux_reactor_active_decref (r); - + flux_watcher_unref (w); + flux_watcher_start (w); // calls ev_unref() + ok (!flux_watcher_is_referenced (w), + "flux_watcher_is_referenced returns false after unref/start"); count = 0; ok (flux_reactor_run (r, 0) == 0 && count == 1, - "flux_reactor_run with one watcher+decref returned after 1 iteration"); - - flux_reactor_active_incref (r); + "flux_reactor_run with one unref watcher returned after 1 iteration"); + flux_watcher_stop (w); // calls ev_ref() + flux_watcher_ref (w); + flux_watcher_start (w); + ok (flux_watcher_is_referenced (w), + "flux_watcher_is_referenced returns true after ref/start"); count = 0; ok (flux_reactor_run (r, 0) < 0 && count == 16, - "flux_reactor_run with one watcher+incref stopped after 16 iterations"); + "flux_reactor_run with one ref watcher stopped after 16 iterations"); + flux_watcher_stop (w); + ok (flux_watcher_is_referenced (w), + "flux_watcher_is_referenced returns true after reactor run"); flux_watcher_destroy (w); + + /* Tests with unref_idle2_cb() + * Show that ref/unref works as expected from watcher callback + */ + + flux_watcher_start (w2); + + ok (flux_watcher_is_referenced (w2), + "flux_watcher_is_referenced returns true"); + count = 0; + ok (flux_reactor_run (r, 0) == 0 && count == 1, + "flux_reactor_run with one ref watcher returned after 1 iteration"); + ok (!flux_watcher_is_referenced (w2), + "flux_watcher_is_referenced returns false after reactor run"); + + count = 0; + ok (flux_reactor_run (r, 0) == 0 && count == 2, + "flux_reactor_run with one unref watcher returned after 2 iterations"); + ok (!flux_watcher_is_referenced (w2), + "flux_watcher_is_referenced returns false"); + + flux_watcher_destroy (w2); } static void reactor_destroy_early (void) @@ -826,7 +955,8 @@ int main (int argc, char *argv[]) test_signal (reactor); test_child (reactor); test_stat (reactor); - test_active_ref (reactor); + test_handle (reactor); + test_unref (reactor); test_reactor_flags (reactor); test_priority (reactor); diff --git a/src/common/libflux/watcher.c b/src/common/libflux/watcher.c index e6bc66c2454e..d24e3dd6a804 100644 --- a/src/common/libflux/watcher.c +++ b/src/common/libflux/watcher.c @@ -22,6 +22,7 @@ struct flux_watcher { flux_watcher_f fn; void *arg; struct flux_watcher_ops *ops; + bool unreferenced; void *data; }; @@ -31,6 +32,10 @@ flux_watcher_t *watcher_create (flux_reactor_t *r, flux_watcher_f fn, void *arg) { + if (!r || data_size == 0 || !ops) { + errno = EINVAL; + return NULL; + } struct flux_watcher *w = calloc (1, sizeof (*w) + data_size); if (!w) return NULL; @@ -99,6 +104,26 @@ void flux_watcher_stop (flux_watcher_t *w) } } +void flux_watcher_ref (flux_watcher_t *w) +{ + if (w && w->unreferenced) { + if (w->ops->ref) { + w->ops->ref (w); + w->unreferenced = false; + } + } +} + +void flux_watcher_unref (flux_watcher_t *w) +{ + if (w && !w->unreferenced) { + if (w->ops->unref) { + w->ops->unref(w); + w->unreferenced = true; + } + } +} + bool flux_watcher_is_active (flux_watcher_t *w) { if (w) { @@ -108,6 +133,13 @@ bool flux_watcher_is_active (flux_watcher_t *w) return false; } +bool flux_watcher_is_referenced (flux_watcher_t *w) +{ + if (w) + return !w->unreferenced; + return true; +} + void flux_watcher_destroy (flux_watcher_t *w) { if (w) { diff --git a/src/common/libflux/watcher.h b/src/common/libflux/watcher.h index 0ebe83842fe6..54959f75a96f 100644 --- a/src/common/libflux/watcher.h +++ b/src/common/libflux/watcher.h @@ -35,9 +35,12 @@ void flux_watcher_set_priority (flux_watcher_t *w, int priority); void flux_watcher_start (flux_watcher_t *w); void flux_watcher_stop (flux_watcher_t *w); +void flux_watcher_ref (flux_watcher_t *w); +void flux_watcher_unref (flux_watcher_t *w); void flux_watcher_destroy (flux_watcher_t *w); double flux_watcher_next_wakeup (flux_watcher_t *w); bool flux_watcher_is_active (flux_watcher_t *w); +bool flux_watcher_is_referenced (flux_watcher_t *w); /* flux_t handle */ @@ -133,9 +136,9 @@ flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, double interval, flux_watcher_f cb, void *arg); -void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev); +int flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev); #ifdef __cplusplus } diff --git a/src/common/libflux/watcher_private.h b/src/common/libflux/watcher_private.h index fae4c229bfda..2deb1895c1f6 100644 --- a/src/common/libflux/watcher_private.h +++ b/src/common/libflux/watcher_private.h @@ -19,6 +19,8 @@ struct flux_watcher_ops { void (*set_priority) (flux_watcher_t *w, int priority); void (*start) (flux_watcher_t *w); void (*stop) (flux_watcher_t *w); + void (*ref) (flux_watcher_t *w); + void (*unref) (flux_watcher_t *w); void (*destroy) (flux_watcher_t *w); bool (*is_active) (flux_watcher_t *w); }; diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_wrap.c index 9094038ecc97..7d38680e1dfb 100644 --- a/src/common/libflux/watcher_wrap.c +++ b/src/common/libflux/watcher_wrap.c @@ -13,7 +13,6 @@ #if HAVE_CONFIG_H #include "config.h" #endif -#include #include #include @@ -54,6 +53,51 @@ static struct ev_loop *watcher_get_ev (flux_watcher_t *w) return reactor_get_loop (watcher_get_reactor (w)); } +/* generic ops->ref() callback for libev watchers + * If the watcher is "refed" while active then we need to fudge + * the active refcount now, because the w->unreferenced flag will be cleared + * by the time ops->stop() is called. + * N.B. flux_watcher_ref() only calls this if w->unreferenced is set. + */ +static void watcher_ref_ev (flux_watcher_t *w) +{ + if (flux_watcher_is_active (w)) + ev_ref (watcher_get_ev (w)); +} + +/* generic ops->unref() callback for libev watchers + * If the watcher is "unrefed" while active then we need to fudge + * the active refcount now, since ops->start() already occurred. + * N.B. flux_watcher_unref() only calls this if w->unreferenced is clear. + */ +static void watcher_unref_ev (flux_watcher_t *w) +{ + if (flux_watcher_is_active (w)) + ev_unref (watcher_get_ev (w)); +} + +/* helper for ops->start() + * Call after ev_TYPE_start() to fudge libev reactor active refcount. + */ +static void watcher_start_post_ev (flux_watcher_t *w, bool was_active) +{ + if (!flux_watcher_is_referenced (w)) { + if (!was_active) + ev_unref (watcher_get_ev (w)); + } +} + +/* helper for ops->stop() + * Call before ev_TYPE_stop() to fudge libev reactor active refcount. + */ +static void watcher_stop_pre_ev (flux_watcher_t *w) +{ + if (!flux_watcher_is_referenced (w)) { + if (flux_watcher_is_active (w)) + ev_ref (watcher_get_ev (w)); + } +} + static void safe_stop_cb (struct ev_loop *loop, ev_prepare *pw, int revents) { flux_watcher_stop ((flux_watcher_t *)pw->data); @@ -77,42 +121,50 @@ static void watcher_stop_safe (flux_watcher_t *w) } } -/* This is_active() callback works for "native" libev watchers, where - * watcher data points to a struct ev_TYPE. - */ -static bool wrap_ev_active (flux_watcher_t *w) -{ - return ev_is_active (watcher_get_data (w)); -} - /* file descriptors */ -static void fd_start (flux_watcher_t *w) +struct fd_watcher { + ev_io evw; +}; + +static void fd_watcher_start (flux_watcher_t *w) { - ev_io *iow = watcher_get_data (w); + struct fd_watcher *fdw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); - ev_io_start (loop, iow); + bool active = ev_is_active (&fdw->evw); + ev_io_start (loop, &fdw->evw); + watcher_start_post_ev (w, active); } -static void fd_stop (flux_watcher_t *w) +static void fd_watcher_stop (flux_watcher_t *w) { - ev_io *iow = watcher_get_data (w); + struct fd_watcher *fdw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); - ev_io_stop (loop, iow); + watcher_stop_pre_ev (w); + ev_io_stop (loop, &fdw->evw); +} + +static bool fd_watcher_is_active (flux_watcher_t *w) +{ + struct fd_watcher *fdw = watcher_get_data (w); + return ev_is_active (&fdw->evw); } -static void fd_cb (struct ev_loop *loop, ev_io *iow, int revents) +static void fd_watcher_cb (struct ev_loop *loop, ev_io *iow, int revents) { struct flux_watcher *w = iow->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops fd_watcher = { - .start = fd_start, - .stop = fd_stop, + +static struct flux_watcher_ops fd_watcher_ops = { + .start = fd_watcher_start, + .stop = fd_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = fd_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, @@ -121,53 +173,81 @@ flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_io *iow; + struct fd_watcher *fdw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*iow), &fd_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*fdw), &fd_watcher_ops, cb, arg))) return NULL; - iow = watcher_get_data (w); - ev_io_init (iow, fd_cb, fd, events_to_libev (events) & ~EV_ERROR); - iow->data = w; + fdw = watcher_get_data (w); + ev_io_init (&fdw->evw, + fd_watcher_cb, + fd, + events_to_libev (events) & ~EV_ERROR); + fdw->evw.data = w; return w; } int flux_fd_watcher_get_fd (flux_watcher_t *w) { - assert (watcher_get_ops (w) == &fd_watcher); - ev_io *iow = watcher_get_data (w); - return iow->fd; + if (watcher_get_ops (w) == &fd_watcher_ops) { + struct fd_watcher *fdw = watcher_get_data (w); + return fdw->evw.fd; + } + errno = EINVAL; + return -1; } /* Timer */ -static void timer_start (flux_watcher_t *w) +struct timer_watcher { + struct ev_timer evw; + double repeat; +}; + +static void timer_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - struct ev_timer *tw = watcher_get_data (w); - ev_timer_start (loop, tw); + struct timer_watcher *tmw = watcher_get_data (w); + bool active = ev_is_active (&tmw->evw); + ev_timer_start (loop, &tmw->evw); + watcher_start_post_ev (w, active); } -static void timer_stop (flux_watcher_t *w) +static void timer_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - struct ev_timer *tw = watcher_get_data (w); - ev_timer_stop (loop, tw); + struct timer_watcher *tmw = watcher_get_data (w); + watcher_stop_pre_ev (w); + ev_timer_stop (loop, &tmw->evw); } -static void timer_cb (struct ev_loop *loop, ev_timer *tw, int revents) +static bool timer_watcher_is_active (flux_watcher_t *w) { - struct flux_watcher *w = tw->data; + struct timer_watcher *tmw = watcher_get_data (w); + return ev_is_active (&tmw->evw); +} + +static void timer_watcher_cb (struct ev_loop *loop, ev_timer *evw, int revents) +{ + struct flux_watcher *w = evw->data; + struct timer_watcher *tmw = watcher_get_data (w); + + // if repeat is zero, timer automatically stops + if (tmw->repeat == 0. && !flux_watcher_is_referenced (w)) + ev_ref (loop); + watcher_call_ev (w, revents); } -static struct flux_watcher_ops timer_watcher = { - .start = timer_start, - .stop = timer_stop, +static struct flux_watcher_ops timer_watcher_ops = { + .start = timer_watcher_start, + .stop = timer_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = timer_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, @@ -176,79 +256,104 @@ flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_timer *tw; + struct timer_watcher *tmw; flux_watcher_t *w; if (after < 0 || repeat < 0) { errno = EINVAL; return NULL; } - if (!(w = watcher_create (r, sizeof (*tw), &timer_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*tmw), &timer_watcher_ops, cb, arg))) return NULL; - tw = watcher_get_data (w); - ev_timer_init (tw, timer_cb, after, repeat); - tw->data = w; + tmw = watcher_get_data (w); + tmw->repeat = repeat; + ev_timer_init (&tmw->evw, timer_watcher_cb, after, repeat); + tmw->evw.data = w; return w; } void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) { - assert (watcher_get_ops (w) == &timer_watcher); - ev_timer *tw = watcher_get_data (w); - ev_timer_set (tw, after, repeat); + if (watcher_get_ops (w) == &timer_watcher_ops) { + struct timer_watcher *tmw = watcher_get_data (w); + tmw->repeat = repeat; + ev_timer_set (&tmw->evw, after, repeat); + } } void flux_timer_watcher_again (flux_watcher_t *w) { - assert (watcher_get_ops (w) == &timer_watcher); - struct ev_loop *loop = watcher_get_ev (w); - ev_timer *tw = watcher_get_data (w); - ev_timer_again (loop, tw); + if (watcher_get_ops (w) == &timer_watcher_ops) { + struct timer_watcher *tmw = watcher_get_data (w); + struct ev_loop *loop = watcher_get_ev (w); + bool active = ev_is_active (&tmw->evw); + + // if repeat is zero, ev_timer_again() automatically stops it + if (tmw->repeat == 0.) + watcher_stop_pre_ev (w); + ev_timer_again (loop, &tmw->evw); + // if repeat is set, ev_timer_again() automatically starts it + if (tmw->repeat > 0.) + watcher_start_post_ev (w, active); + } } /* Periodic */ -struct f_periodic { + +struct periodic_watcher { struct flux_watcher *w; - ev_periodic evp; - flux_reschedule_f reschedule_cb; + ev_periodic evw; + flux_reschedule_f reschedule_cb; + double interval; }; -static void periodic_start (flux_watcher_t *w) +static void periodic_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - struct f_periodic *fp = watcher_get_data (w); - ev_periodic_start (loop, &fp->evp); + struct periodic_watcher *pdw = watcher_get_data (w); + bool active = ev_is_active (&pdw->evw); + ev_periodic_start (loop, &pdw->evw); + watcher_start_post_ev (w, active); } -static void periodic_stop (flux_watcher_t *w) +static void periodic_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - struct f_periodic *fp = watcher_get_data (w); - ev_periodic_stop (loop, &fp->evp); + struct periodic_watcher *pdw = watcher_get_data (w); + watcher_stop_pre_ev (w); + ev_periodic_stop (loop, &pdw->evw); } -static bool periodic_is_active (flux_watcher_t *w) +static bool periodic_watcher_is_active (flux_watcher_t *w) { - struct f_periodic *fp = watcher_get_data (w); - return ev_is_active (&fp->evp); + struct periodic_watcher *pdw = watcher_get_data (w); + return ev_is_active (&pdw->evw); } -static void periodic_cb (struct ev_loop *loop, ev_periodic *pw, int revents) +static void periodic_watcher_cb (struct ev_loop *loop, + ev_periodic *pw, + int revents) { - struct f_periodic *fp = pw->data; - struct flux_watcher *w = fp->w; + struct periodic_watcher *pdw = pw->data; + struct flux_watcher *w = pdw->w; + + // if interval is zero, periodic automatically stops + if (pdw->interval == 0. && !flux_watcher_is_referenced (w)) + ev_ref (loop); + watcher_call_ev (w, revents); } -static ev_tstamp periodic_reschedule_cb (ev_periodic *pw, ev_tstamp now) +static ev_tstamp periodic_watcher_reschedule_cb (ev_periodic *pw, + ev_tstamp now) { + struct periodic_watcher *pdw = pw->data; + struct flux_watcher *w = pdw->w; ev_tstamp rc; - struct f_periodic *fp = pw->data; - assert (fp->reschedule_cb != NULL); - rc = (ev_tstamp)fp->reschedule_cb (fp->w, - (double)now, - watcher_get_arg (fp->w)); + if (pdw->reschedule_cb == NULL) + return 0; + rc = (ev_tstamp)pdw->reschedule_cb (w, (double)now, watcher_get_arg (w)); if (rc < now) { /* User reschedule cb returned time in the past. The watcher will * be stopped, but not here (changing loop is not allowed in a @@ -256,17 +361,20 @@ static ev_tstamp periodic_reschedule_cb (ev_periodic *pw, ev_tstamp now) * a prepare callback. * Return time far in the future to ensure we aren't called again. */ - watcher_stop_safe (fp->w); + watcher_stop_pre_ev (w); + watcher_stop_safe (w); return (now + 1e99); } return rc; } -static struct flux_watcher_ops periodic_watcher = { - .start = periodic_start, - .stop = periodic_stop, +static struct flux_watcher_ops periodic_watcher_ops = { + .start = periodic_watcher_start, + .stop = periodic_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = periodic_watcher_is_active, .destroy = NULL, - .is_active = periodic_is_active, }; flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, @@ -277,24 +385,25 @@ flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, void *arg) { flux_watcher_t *w; - struct f_periodic *fp; - size_t size = sizeof (*fp); + struct periodic_watcher *pdw; + size_t size = sizeof (*pdw); if (offset < 0 || interval < 0) { errno = EINVAL; return NULL; } - if (!(w = watcher_create (r, size, &periodic_watcher, cb, arg))) + if (!(w = watcher_create (r, size, &periodic_watcher_ops, cb, arg))) return NULL; - fp = watcher_get_data (w); - fp->evp.data = fp; - fp->w = w; - fp->reschedule_cb = reschedule_cb; - - ev_periodic_init (&fp->evp, - periodic_cb, + pdw = watcher_get_data (w); + pdw->interval = interval; + pdw->evw.data = pdw; + pdw->w = w; + pdw->reschedule_cb = reschedule_cb; + + ev_periodic_init (&pdw->evw, + periodic_watcher_cb, offset, interval, - reschedule_cb ? periodic_reschedule_cb : NULL); + reschedule_cb ? periodic_watcher_reschedule_cb : NULL); return w; } @@ -305,23 +414,34 @@ void flux_periodic_watcher_reset (flux_watcher_t *w, flux_reschedule_f reschedule_cb) { struct ev_loop *loop = watcher_get_ev (w); - struct f_periodic *fp = watcher_get_data (w); - assert (watcher_get_ops (w) == &periodic_watcher); - fp->reschedule_cb = reschedule_cb; - ev_periodic_set (&fp->evp, - next, - interval, - reschedule_cb ? periodic_reschedule_cb : NULL); - ev_periodic_again (loop, &fp->evp); + struct periodic_watcher *pdw = watcher_get_data (w); + if (watcher_get_ops (w) == &periodic_watcher_ops) { + pdw->interval = interval; + pdw->reschedule_cb = reschedule_cb; + ev_periodic_set (&pdw->evw, + next, + interval, + reschedule_cb ? periodic_watcher_reschedule_cb : NULL); + + bool active = ev_is_active (&pdw->evw); + + // if interval is zero, ev_periodic_again() automatically stops it + if (pdw->interval == 0.) + watcher_stop_pre_ev (w); + ev_periodic_again (loop, &pdw->evw); + // if interval is set, ev_periodic_again() automatically starts it + if (pdw->interval > 0.) + watcher_start_post_ev (w, active); + } } double flux_watcher_next_wakeup (flux_watcher_t *w) { - if (watcher_get_ops (w) == &periodic_watcher) { - struct f_periodic *fp = watcher_get_data (w); - return ((double) ev_periodic_at (&fp->evp)); + if (watcher_get_ops (w) == &periodic_watcher_ops) { + struct periodic_watcher *pdw = watcher_get_data (w); + return ((double) ev_periodic_at (&pdw->evw)); } - else if (watcher_get_ops (w) == &timer_watcher) { + else if (watcher_get_ops (w) == &timer_watcher_ops) { ev_timer *tw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); return ((double) (ev_now (loop) + ev_timer_remaining (loop, tw))); @@ -332,45 +452,63 @@ double flux_watcher_next_wakeup (flux_watcher_t *w) /* Prepare */ -static void prepare_start (flux_watcher_t *w) + +struct prepare_watcher { + ev_prepare evw; +}; + +static void prepare_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_prepare *pw = watcher_get_data (w); - ev_prepare_start (loop, pw); + struct prepare_watcher *pw = watcher_get_data (w); + bool active = ev_is_active (&pw->evw); + ev_prepare_start (loop, &pw->evw); + watcher_start_post_ev (w, active); } -static void prepare_stop (flux_watcher_t *w) +static void prepare_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_prepare *pw = watcher_get_data (w); - ev_prepare_stop (loop, pw); + struct prepare_watcher *pw = watcher_get_data (w); + watcher_stop_pre_ev (w); + ev_prepare_stop (loop, &pw->evw); +} + +static bool prepare_watcher_is_active (flux_watcher_t *w) +{ + struct prepare_watcher *pw = watcher_get_data (w); + return ev_is_active (&pw->evw); } -static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +static void prepare_watcher_cb (struct ev_loop *loop, + ev_prepare *evw, + int revents) { - struct flux_watcher *w = pw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops prepare_watcher = { - .start = prepare_start, - .stop = prepare_stop, +static struct flux_watcher_ops prepare_watcher_ops = { + .start = prepare_watcher_start, + .stop = prepare_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = prepare_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_prepare *pw; + struct prepare_watcher *pw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher_ops, cb, arg))) return NULL; pw = watcher_get_data (w); - ev_prepare_init (pw, prepare_cb); - pw->data = w; + ev_prepare_init (&pw->evw, prepare_watcher_cb); + pw->evw.data = w; return w; } @@ -378,52 +516,67 @@ flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, /* Check */ -static void check_set_priority (flux_watcher_t *w, int priority) +struct check_watcher { + ev_check evw; +}; + +static void check_watcher_set_priority (flux_watcher_t *w, int priority) { - ev_check *cw = watcher_get_data (w); - ev_set_priority (cw, priority); + struct check_watcher *cw = watcher_get_data (w); + ev_set_priority (&cw->evw, priority); } -static void check_start (flux_watcher_t *w) +static void check_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_check *cw = watcher_get_data (w); - ev_check_start (loop, cw); + struct check_watcher *cw = watcher_get_data (w); + bool active = ev_is_active (&cw->evw); + ev_check_start (loop, &cw->evw); + watcher_start_post_ev (w, active); } -static void check_stop (flux_watcher_t *w) +static void check_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_check *cw = watcher_get_data (w); - ev_check_stop (loop, cw); + struct check_watcher *cw = watcher_get_data (w); + watcher_stop_pre_ev (w); + ev_check_stop (loop, &cw->evw); +} + +static bool check_watcher_is_active (flux_watcher_t *w) +{ + struct check_watcher *cw = watcher_get_data (w); + return ev_is_active (&cw->evw); } -static void check_cb (struct ev_loop *loop, ev_check *cw, int revents) +static void check_watcher_cb (struct ev_loop *loop, ev_check *evw, int revents) { - struct flux_watcher *w = cw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops check_watcher = { - .set_priority = check_set_priority, - .start = check_start, - .stop = check_stop, +static struct flux_watcher_ops check_watcher_ops = { + .set_priority = check_watcher_set_priority, + .start = check_watcher_start, + .stop = check_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = check_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_check *cw; + struct check_watcher *cw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*cw), &check_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*cw), &check_watcher_ops, cb, arg))) return NULL; cw = watcher_get_data (w); - ev_check_init (cw, check_cb); - cw->data = w; + ev_check_init (&cw->evw, check_watcher_cb); + cw->evw.data = w; return w; } @@ -431,45 +584,60 @@ flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, /* Idle */ -static void idle_start (flux_watcher_t *w) +struct idle_watcher { + ev_idle evw; +}; + +static void idle_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_idle *iw = watcher_get_data (w); - ev_idle_start (loop, iw); + struct idle_watcher *iw = watcher_get_data (w); + bool active = ev_is_active (&iw->evw); + ev_idle_start (loop, &iw->evw); + watcher_start_post_ev (w, active); } -static void idle_stop (flux_watcher_t *w) +static void idle_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_idle *iw = watcher_get_data (w); - ev_idle_stop (loop, iw); + struct idle_watcher *iw = watcher_get_data (w); + watcher_stop_pre_ev (w); + ev_idle_stop (loop, &iw->evw); +} + +static bool idle_watcher_is_active (flux_watcher_t *w) +{ + struct idle_watcher *iw = watcher_get_data (w); + return ev_is_active (&iw->evw); } -static void idle_cb (struct ev_loop *loop, ev_idle *iw, int revents) +static void idle_watcher_cb (struct ev_loop *loop, ev_idle *evw, int revents) { - struct flux_watcher *w = iw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops idle_watcher = { - .start = idle_start, - .stop = idle_stop, +static struct flux_watcher_ops idle_watcher_ops = { + .start = idle_watcher_start, + .stop = idle_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = idle_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_idle *iw; + struct idle_watcher *iw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher_ops, cb, arg))) return NULL; iw = watcher_get_data (w); - ev_idle_init (iw, idle_cb); - iw->data = w; + ev_idle_init (&iw->evw, idle_watcher_cb); + iw->evw.data = w; return w; } @@ -477,34 +645,48 @@ flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, /* Child */ -static void child_start (flux_watcher_t *w) +struct child_watcher { + ev_child evw; +}; + +static void child_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_child *cw = watcher_get_data (w); - ev_child_start (loop, cw); + struct child_watcher *cw = watcher_get_data (w); + bool active = ev_is_active (&cw->evw); + ev_child_start (loop, &cw->evw); + watcher_start_post_ev (w, active); } -static void child_stop (flux_watcher_t *w) +static void child_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_child *cw = watcher_get_data (w); - ev_child_stop (loop, cw); + struct child_watcher *cw = watcher_get_data (w); + watcher_stop_pre_ev (w); + ev_child_stop (loop, &cw->evw); +} + +static bool child_watcher_is_active (flux_watcher_t *w) +{ + struct child_watcher *cw = watcher_get_data (w); + return ev_is_active (&cw->evw); } -static void child_cb (struct ev_loop *loop, ev_child *cw, int revents) +static void child_watcher_cb (struct ev_loop *loop, ev_child *evw, int revents) { - struct flux_watcher *w = cw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops child_watcher = { - .start = child_start, - .stop = child_stop, +static struct flux_watcher_ops child_watcher_ops = { + .start = child_watcher_start, + .stop = child_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = child_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; - flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, int pid, bool trace, @@ -512,69 +694,86 @@ flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, void *arg) { flux_watcher_t *w; - ev_child *cw; + struct child_watcher *cw; if (!ev_is_default_loop (reactor_get_loop (r))) { errno = EINVAL; return NULL; } - if (!(w = watcher_create (r, sizeof (*cw), &child_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*cw), &child_watcher_ops, cb, arg))) return NULL; cw = watcher_get_data (w); - ev_child_init (cw, child_cb, pid, trace ? 1 : 0); - cw->data = w; + ev_child_init (&cw->evw, child_watcher_cb, pid, trace ? 1 : 0); + cw->evw.data = w; return w; } int flux_child_watcher_get_rpid (flux_watcher_t *w) { - if (watcher_get_ops (w) != &child_watcher) { + if (watcher_get_ops (w) != &child_watcher_ops) { errno = EINVAL; return -1; } - ev_child *cw = watcher_get_data (w); - return cw->rpid; + struct child_watcher *cw = watcher_get_data (w); + return cw->evw.rpid; } int flux_child_watcher_get_rstatus (flux_watcher_t *w) { - if (watcher_get_ops (w) != &child_watcher) { + if (watcher_get_ops (w) != &child_watcher_ops) { errno = EINVAL; return -1; } - ev_child *cw = watcher_get_data (w); - return cw->rstatus; + struct child_watcher *cw = watcher_get_data (w); + return cw->evw.rstatus; } /* Signal */ -static void signal_start (flux_watcher_t *w) +struct signal_watcher { + ev_signal evw; +}; + +static void signal_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_signal *sw = watcher_get_data (w); - ev_signal_start (loop, sw); + struct signal_watcher *sw = watcher_get_data (w); + bool active = ev_is_active (&sw->evw); + ev_signal_start (loop, &sw->evw); + watcher_start_post_ev (w, active); } -static void signal_stop (flux_watcher_t *w) +static void signal_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_signal *sw = watcher_get_data (w); - ev_signal_stop (loop, sw); + struct signal_watcher *sw = watcher_get_data (w); + watcher_stop_pre_ev (w); + ev_signal_stop (loop, &sw->evw); } -static void signal_cb (struct ev_loop *loop, ev_signal *sw, int revents) +static bool signal_watcher_is_active (flux_watcher_t *w) { - struct flux_watcher *w = sw->data; + struct signal_watcher *sw = watcher_get_data (w); + return ev_is_active (&sw->evw); +} + +static void signal_watcher_cb (struct ev_loop *loop, + ev_signal *evw, + int revents) +{ + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops signal_watcher = { - .start = signal_start, - .stop = signal_stop, +static struct flux_watcher_ops signal_watcher_ops = { + .start = signal_watcher_start, + .stop = signal_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = signal_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, @@ -583,55 +782,70 @@ flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, void *arg) { flux_watcher_t *w; - ev_signal *sw; + struct signal_watcher *sw; - if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher_ops, cb, arg))) return NULL; sw = watcher_get_data (w); - ev_signal_init (sw, signal_cb, signum); - sw->data = w; + ev_signal_init (&sw->evw, signal_watcher_cb, signum); + sw->evw.data = w; return w; } int flux_signal_watcher_get_signum (flux_watcher_t *w) { - if (watcher_get_ops (w) != &signal_watcher) { + if (watcher_get_ops (w) != &signal_watcher_ops) { errno = EINVAL; return (-1); } - ev_signal *sw = watcher_get_data (w); - return sw->signum; + struct signal_watcher *sw = watcher_get_data (w); + return sw->evw.signum; } /* Stat */ -static void stat_start (flux_watcher_t *w) +struct stat_watcher { + ev_stat evw; +}; + +static void stat_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_stat *sw = watcher_get_data (w); - ev_stat_start (loop, sw); + struct stat_watcher *sw = watcher_get_data (w); + bool active = ev_is_active (&sw->evw); + ev_stat_start (loop, &sw->evw); + watcher_start_post_ev (w, active); } -static void stat_stop (flux_watcher_t *w) +static void stat_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_stat *sw = watcher_get_data (w); - ev_stat_stop (loop, sw); + struct stat_watcher *sw = watcher_get_data (w); + watcher_stop_pre_ev (w); + ev_stat_stop (loop, &sw->evw); } -static void stat_cb (struct ev_loop *loop, ev_stat *sw, int revents) +static bool stat_watcher_is_active (flux_watcher_t *w) { - struct flux_watcher *w = sw->data; + struct stat_watcher *sw = watcher_get_data (w); + return ev_is_active (&sw->evw); +} + +static void stat_watcher_cb (struct ev_loop *loop, ev_stat *evw, int revents) +{ + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops stat_watcher = { - .start = stat_start, - .stop = stat_stop, +static struct flux_watcher_ops stat_watcher_ops = { + .start = stat_watcher_start, + .stop = stat_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, + .is_active = stat_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, @@ -641,27 +855,31 @@ flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, void *arg) { flux_watcher_t *w; - ev_stat *sw; + struct stat_watcher *sw; - if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher_ops, cb, arg))) return NULL; sw = watcher_get_data (w); - ev_stat_init (sw, stat_cb, path, interval); - sw->data = w; + ev_stat_init (&sw->evw, stat_watcher_cb, path, interval); + sw->evw.data = w; return w; } -void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev) +int flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev) { - ev_stat *sw = watcher_get_data (w); - assert (watcher_get_ops (w) == &stat_watcher); - if (stat) - *stat = sw->attr; - if (prev) - *prev = sw->prev; + struct stat_watcher *sw = watcher_get_data (w); + if (watcher_get_ops (w) == &stat_watcher_ops) { + if (stat) + *stat = sw->evw.attr; + if (prev) + *prev = sw->evw.prev; + return 0; + } + errno = EINVAL; + return -1; } /* diff --git a/src/common/librouter/usock.c b/src/common/librouter/usock.c index 3ff1857c9923..5749baca8bb8 100644 --- a/src/common/librouter/usock.c +++ b/src/common/librouter/usock.c @@ -601,6 +601,11 @@ struct usock_server *usock_server_create (flux_reactor_t *r, return NULL; } +flux_watcher_t *usock_server_listen_watcher (struct usock_server *server) +{ + return server ? server->w : NULL; +} + static bool is_poll_error (int revents) { if ((revents & POLLERR) || (revents & POLLHUP) || (revents & POLLNVAL)) diff --git a/src/common/librouter/usock.h b/src/common/librouter/usock.h index 5fc0a7271e4d..4f26448ad5c7 100644 --- a/src/common/librouter/usock.h +++ b/src/common/librouter/usock.h @@ -59,6 +59,9 @@ void usock_server_set_acceptor (struct usock_server *server, usock_acceptor_f cb, void *arg); +// accessor for start/stop/ref/unref +flux_watcher_t *usock_server_listen_watcher (struct usock_server *server); + /* Server connection for one client */ diff --git a/src/common/librouter/usock_service.c b/src/common/librouter/usock_service.c index bc8d8a3e2c55..4d91d12f7fd6 100644 --- a/src/common/librouter/usock_service.c +++ b/src/common/librouter/usock_service.c @@ -20,6 +20,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libutil/log.h" #include "src/common/libutil/errno_safe.h" +#include "ccan/str/str.h" #include "usock.h" #include "usock_service.h" @@ -134,6 +135,27 @@ static void service_acceptor (struct usock_conn *uconn, void *arg) usock_conn_destroy (uconn); } +/* flux_handle_ops getopt signature */ +static int service_getopt (void *impl, + const char *option, + void *val, + size_t size) +{ + struct service *ss = impl; + if (streq (option, "flux::listen_watcher")) { + flux_watcher_t *w = usock_server_listen_watcher (ss->usock_srv); + if (size != sizeof (w)) + goto error; + memcpy (val, &w, size); + } + else + goto error; + return 0; +error: + errno = EINVAL; + return -1; +} + /* flux_handle_ops send signature */ static int service_handle_send (void *impl, const flux_msg_t *msg, int flags) { @@ -226,9 +248,19 @@ flux_t *usock_service_create (flux_reactor_t *r, return ss->h; } +flux_watcher_t *usock_service_listen_watcher (flux_t *h) +{ + flux_watcher_t *w; + + if (flux_opt_get (h, "flux::listen_watcher", &w, sizeof (w)) < 0) + return NULL; + return w; +} + static const struct flux_handle_ops service_handle_ops = { .send = service_handle_send, .impl_destroy = service_destroy, + .getopt = service_getopt, }; /* diff --git a/src/common/librouter/usock_service.h b/src/common/librouter/usock_service.h index 375992013458..5889c71db2ae 100644 --- a/src/common/librouter/usock_service.h +++ b/src/common/librouter/usock_service.h @@ -34,6 +34,9 @@ flux_t *usock_service_create (flux_reactor_t *r, const char *sockpath, bool verbose); +// accessor for star/stop/ref/unref +flux_watcher_t *usock_service_listen_watcher (flux_t *h); + #endif // _ROUTER_USOCK_SERVICE_H /* diff --git a/src/common/libzmqutil/test/zwatcher.c b/src/common/libzmqutil/test/zwatcher.c index 3de987b13c8d..b68b5d47f6b9 100644 --- a/src/common/libzmqutil/test/zwatcher.c +++ b/src/common/libzmqutil/test/zwatcher.c @@ -25,6 +25,25 @@ static const size_t zmqwriter_msgcount = 1024; static void *zctx; +void watcher_is (flux_watcher_t *w, + bool exp_active, + bool exp_referenced, + const char *what) +{ + bool is_active = flux_watcher_is_active (w); + bool is_referenced = flux_watcher_is_referenced (w); + + ok (is_active == exp_active && is_referenced == exp_referenced, + "%sact%sref after %s", + exp_active ? "+" : "-", + exp_referenced ? "+" : "-", + what); + if (is_active != exp_active) + diag ("unexpectedly %sact", is_active ? "+" : "-"); + if (is_referenced != exp_referenced) + diag ("unexpectedly %sref", is_referenced ? "+" : "-"); +} + static void zmqwriter (flux_reactor_t *r, flux_watcher_t *w, int revents, @@ -87,6 +106,7 @@ static void test_zmq (flux_reactor_t *reactor) void *zs[2]; flux_watcher_t *r, *w; const char *uri = "inproc://test_zmq"; + flux_watcher_t *tmp; zs[0] = zmq_socket (zctx, ZMQ_PAIR); zs[1] = zmq_socket (zctx, ZMQ_PAIR); @@ -94,21 +114,46 @@ static void test_zmq (flux_reactor_t *reactor) && zmq_bind (zs[0], uri) == 0 && zmq_connect (zs[1], uri) == 0, "zmq: connected ZMQ_PAIR sockets over inproc"); + + errno = 0; + ok (zmqutil_watcher_create (NULL, zs[0], FLUX_POLLIN, NULL, NULL) == NULL + && errno == EINVAL, + "zmqutil_watcher_create r=NULL fails with EINVAL"); + + tmp = flux_idle_watcher_create (reactor, NULL, NULL); + if (!tmp) + BAIL_OUT ("could not create idle watcher"); + errno = 0; + ok (zmqutil_watcher_get_zsock (tmp) == NULL && errno == EINVAL, + "zmqutil_watcher_get_zsock w=idle fails with EINVAL"); + flux_watcher_destroy (tmp); + r = zmqutil_watcher_create (reactor, zs[0], FLUX_POLLIN, zmqreader, NULL); w = zmqutil_watcher_create (reactor, zs[1], FLUX_POLLOUT, zmqwriter, NULL); ok (r != NULL && w != NULL, "zmq: nonblocking reader and writer created"); - ok (flux_watcher_is_active (r) == false, - "flux_watcher_is_active() returns false on create"); + + flux_watcher_start (w); + watcher_is (w, true, true, "start"); + flux_watcher_unref (w); + watcher_is (w, true, false, "unref"); + flux_watcher_ref (w); + watcher_is (w, true, true, "ref"); + flux_watcher_stop (w); + watcher_is (w, false, true, "stop"); + flux_watcher_start (r); - ok (flux_watcher_is_active (r) == true, - "flux_watcher_is_active() returns true after start"); flux_watcher_start (w); ok (flux_reactor_run (reactor, 0) == 0, "zmq: reactor ran to completion after %d messages", zmqwriter_msgcount); flux_watcher_stop (r); ok (flux_watcher_is_active (r) == false, "flux_watcher_is_active() returns false after stop"); + ok (flux_watcher_is_referenced (w), + "flux_watcher_is_referenced() returns true"); + flux_watcher_unref (w); + ok (!flux_watcher_is_referenced (w), + "flux_watcher_is_referenced() returns false after unref"); flux_watcher_stop (w); flux_watcher_destroy (r); flux_watcher_destroy (w); diff --git a/src/common/libzmqutil/zwatcher.c b/src/common/libzmqutil/zwatcher.c index c3dd62cdb481..3da00cbb9b0e 100644 --- a/src/common/libzmqutil/zwatcher.c +++ b/src/common/libzmqutil/zwatcher.c @@ -110,6 +110,26 @@ static void zwatcher_stop (flux_watcher_t *w) flux_watcher_stop (zw->idle_w); } +static void zwatcher_ref (flux_watcher_t *w) +{ + struct zwatcher *zw = watcher_get_data (w); + + flux_watcher_ref (zw->fd_w); + flux_watcher_ref (zw->prepare_w); + flux_watcher_ref (zw->idle_w); + flux_watcher_ref (zw->check_w); +} + +static void zwatcher_unref (flux_watcher_t *w) +{ + struct zwatcher *zw = watcher_get_data (w); + + flux_watcher_unref (zw->fd_w); + flux_watcher_unref (zw->prepare_w); + flux_watcher_unref (zw->idle_w); + flux_watcher_unref (zw->check_w); +} + static bool zwatcher_is_active (flux_watcher_t *w) { struct zwatcher *zw = watcher_get_data (w); @@ -186,6 +206,8 @@ static void fd_cb (flux_reactor_t *r, static struct flux_watcher_ops zwatcher_ops = { .start = zwatcher_start, .stop = zwatcher_stop, + .ref = zwatcher_ref, + .unref = zwatcher_unref, .destroy = zwatcher_destroy, .is_active = zwatcher_is_active, };