From 89e583b286ce46ca2a3b9ed54737a539a8727866 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 23 Dec 2024 07:41:22 -0800 Subject: [PATCH 01/18] reactor: clean up watcher internal names/structure Problem: libev watcher wrapper code is hard to read due to poor internal variable/function name choices and non-uniform structure. Give implementations a unique internal name of the form: "NAME_watcher". Give ops callbacks a "NAME_watcher_" prefix. Name the ops struct "NAME_watcher_ops". Always wrap the native ev_watcher in "struct NAME_watcher". This commit is strictly cleanup and changes no watcher behavior. --- src/common/libflux/watcher_wrap.c | 503 +++++++++++++++++------------- 1 file changed, 293 insertions(+), 210 deletions(-) diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_wrap.c index 9094038ecc97..5baab220df77 100644 --- a/src/common/libflux/watcher_wrap.c +++ b/src/common/libflux/watcher_wrap.c @@ -77,42 +77,45 @@ static void watcher_stop_safe (flux_watcher_t *w) } } -/* This is_active() callback works for "native" libev watchers, where - * watcher data points to a struct ev_TYPE. - */ -static bool wrap_ev_active (flux_watcher_t *w) -{ - return ev_is_active (watcher_get_data (w)); -} - /* file descriptors */ -static void fd_start (flux_watcher_t *w) +struct fd_watcher { + ev_io evw; +}; + +static void fd_watcher_start (flux_watcher_t *w) { - ev_io *iow = watcher_get_data (w); + struct fd_watcher *fdw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); - ev_io_start (loop, iow); + ev_io_start (loop, &fdw->evw); } -static void fd_stop (flux_watcher_t *w) +static void fd_watcher_stop (flux_watcher_t *w) { - ev_io *iow = watcher_get_data (w); + struct fd_watcher *fdw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); - ev_io_stop (loop, iow); + ev_io_stop (loop, &fdw->evw); } -static void fd_cb (struct ev_loop *loop, ev_io *iow, int revents) +static bool fd_watcher_is_active (flux_watcher_t *w) +{ + struct fd_watcher *fdw = watcher_get_data (w); + return ev_is_active (&fdw->evw); +} + +static void fd_watcher_cb (struct ev_loop *loop, ev_io *iow, int revents) { struct flux_watcher *w = iow->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops fd_watcher = { - .start = fd_start, - .stop = fd_stop, + +static struct flux_watcher_ops fd_watcher_ops = { + .start = fd_watcher_start, + .stop = fd_watcher_stop, + .is_active = fd_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, @@ -121,53 +124,66 @@ flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_io *iow; + struct fd_watcher *fdw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*iow), &fd_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*fdw), &fd_watcher_ops, cb, arg))) return NULL; - iow = watcher_get_data (w); - ev_io_init (iow, fd_cb, fd, events_to_libev (events) & ~EV_ERROR); - iow->data = w; + fdw = watcher_get_data (w); + ev_io_init (&fdw->evw, + fd_watcher_cb, + fd, + events_to_libev (events) & ~EV_ERROR); + fdw->evw.data = w; return w; } int flux_fd_watcher_get_fd (flux_watcher_t *w) { - assert (watcher_get_ops (w) == &fd_watcher); - ev_io *iow = watcher_get_data (w); - return iow->fd; + assert (watcher_get_ops (w) == &fd_watcher_ops); + struct fd_watcher *fdw = watcher_get_data (w); + return fdw->evw.fd; } /* Timer */ -static void timer_start (flux_watcher_t *w) +struct timer_watcher { + struct ev_timer evw; +}; + +static void timer_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - struct ev_timer *tw = watcher_get_data (w); - ev_timer_start (loop, tw); + struct timer_watcher *tmw = watcher_get_data (w); + ev_timer_start (loop, &tmw->evw); } -static void timer_stop (flux_watcher_t *w) +static void timer_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - struct ev_timer *tw = watcher_get_data (w); - ev_timer_stop (loop, tw); + struct timer_watcher *tmw = watcher_get_data (w); + ev_timer_stop (loop, &tmw->evw); } -static void timer_cb (struct ev_loop *loop, ev_timer *tw, int revents) +static void timer_watcher_cb (struct ev_loop *loop, ev_timer *evw, int revents) { - struct flux_watcher *w = tw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops timer_watcher = { - .start = timer_start, - .stop = timer_stop, +static bool timer_watcher_is_active (flux_watcher_t *w) +{ + struct timer_watcher *tmw = watcher_get_data (w); + return ev_is_active (&tmw->evw); +} + +static struct flux_watcher_ops timer_watcher_ops = { + .start = timer_watcher_start, + .stop = timer_watcher_stop, + .is_active = timer_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, @@ -176,79 +192,82 @@ flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_timer *tw; + struct timer_watcher *tmw; flux_watcher_t *w; if (after < 0 || repeat < 0) { errno = EINVAL; return NULL; } - if (!(w = watcher_create (r, sizeof (*tw), &timer_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*tmw), &timer_watcher_ops, cb, arg))) return NULL; - tw = watcher_get_data (w); - ev_timer_init (tw, timer_cb, after, repeat); - tw->data = w; + tmw = watcher_get_data (w); + ev_timer_init (&tmw->evw, timer_watcher_cb, after, repeat); + tmw->evw.data = w; return w; } void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) { - assert (watcher_get_ops (w) == &timer_watcher); - ev_timer *tw = watcher_get_data (w); - ev_timer_set (tw, after, repeat); + assert (watcher_get_ops (w) == &timer_watcher_ops); + struct timer_watcher *tmw = watcher_get_data (w); + ev_timer_set (&tmw->evw, after, repeat); } void flux_timer_watcher_again (flux_watcher_t *w) { - assert (watcher_get_ops (w) == &timer_watcher); + assert (watcher_get_ops (w) == &timer_watcher_ops); + struct timer_watcher *tmw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); - ev_timer *tw = watcher_get_data (w); - ev_timer_again (loop, tw); + ev_timer_again (loop, &tmw->evw); } /* Periodic */ -struct f_periodic { + +struct periodic_watcher { struct flux_watcher *w; - ev_periodic evp; - flux_reschedule_f reschedule_cb; + ev_periodic evw; + flux_reschedule_f reschedule_cb; }; -static void periodic_start (flux_watcher_t *w) +static void periodic_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - struct f_periodic *fp = watcher_get_data (w); - ev_periodic_start (loop, &fp->evp); + struct periodic_watcher *pdw = watcher_get_data (w); + ev_periodic_start (loop, &pdw->evw); } -static void periodic_stop (flux_watcher_t *w) +static void periodic_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - struct f_periodic *fp = watcher_get_data (w); - ev_periodic_stop (loop, &fp->evp); + struct periodic_watcher *pdw = watcher_get_data (w); + ev_periodic_stop (loop, &pdw->evw); } -static bool periodic_is_active (flux_watcher_t *w) +static bool periodic_watcher_is_active (flux_watcher_t *w) { - struct f_periodic *fp = watcher_get_data (w); - return ev_is_active (&fp->evp); + struct periodic_watcher *pdw = watcher_get_data (w); + return ev_is_active (&pdw->evw); } -static void periodic_cb (struct ev_loop *loop, ev_periodic *pw, int revents) +static void periodic_watcher_cb (struct ev_loop *loop, + ev_periodic *pw, + int revents) { - struct f_periodic *fp = pw->data; - struct flux_watcher *w = fp->w; + struct periodic_watcher *pdw = pw->data; + struct flux_watcher *w = pdw->w; watcher_call_ev (w, revents); } -static ev_tstamp periodic_reschedule_cb (ev_periodic *pw, ev_tstamp now) +static ev_tstamp periodic_watcher_reschedule_cb (ev_periodic *pw, + ev_tstamp now) { + struct periodic_watcher *pdw = pw->data; + struct flux_watcher *w = pdw->w; ev_tstamp rc; - struct f_periodic *fp = pw->data; - assert (fp->reschedule_cb != NULL); - rc = (ev_tstamp)fp->reschedule_cb (fp->w, - (double)now, - watcher_get_arg (fp->w)); + assert (pdw->reschedule_cb != NULL); + rc = (ev_tstamp)pdw->reschedule_cb (w, (double)now, watcher_get_arg (w)); if (rc < now) { /* User reschedule cb returned time in the past. The watcher will * be stopped, but not here (changing loop is not allowed in a @@ -256,17 +275,17 @@ static ev_tstamp periodic_reschedule_cb (ev_periodic *pw, ev_tstamp now) * a prepare callback. * Return time far in the future to ensure we aren't called again. */ - watcher_stop_safe (fp->w); + watcher_stop_safe (w); return (now + 1e99); } return rc; } -static struct flux_watcher_ops periodic_watcher = { - .start = periodic_start, - .stop = periodic_stop, +static struct flux_watcher_ops periodic_watcher_ops = { + .start = periodic_watcher_start, + .stop = periodic_watcher_stop, + .is_active = periodic_watcher_is_active, .destroy = NULL, - .is_active = periodic_is_active, }; flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, @@ -277,24 +296,24 @@ flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, void *arg) { flux_watcher_t *w; - struct f_periodic *fp; - size_t size = sizeof (*fp); + struct periodic_watcher *pdw; + size_t size = sizeof (*pdw); if (offset < 0 || interval < 0) { errno = EINVAL; return NULL; } - if (!(w = watcher_create (r, size, &periodic_watcher, cb, arg))) + if (!(w = watcher_create (r, size, &periodic_watcher_ops, cb, arg))) return NULL; - fp = watcher_get_data (w); - fp->evp.data = fp; - fp->w = w; - fp->reschedule_cb = reschedule_cb; + pdw = watcher_get_data (w); + pdw->evw.data = pdw; + pdw->w = w; + pdw->reschedule_cb = reschedule_cb; - ev_periodic_init (&fp->evp, - periodic_cb, + ev_periodic_init (&pdw->evw, + periodic_watcher_cb, offset, interval, - reschedule_cb ? periodic_reschedule_cb : NULL); + reschedule_cb ? periodic_watcher_reschedule_cb : NULL); return w; } @@ -305,23 +324,23 @@ void flux_periodic_watcher_reset (flux_watcher_t *w, flux_reschedule_f reschedule_cb) { struct ev_loop *loop = watcher_get_ev (w); - struct f_periodic *fp = watcher_get_data (w); - assert (watcher_get_ops (w) == &periodic_watcher); - fp->reschedule_cb = reschedule_cb; - ev_periodic_set (&fp->evp, + struct periodic_watcher *pdw = watcher_get_data (w); + assert (watcher_get_ops (w) == &periodic_watcher_ops); + pdw->reschedule_cb = reschedule_cb; + ev_periodic_set (&pdw->evw, next, interval, - reschedule_cb ? periodic_reschedule_cb : NULL); - ev_periodic_again (loop, &fp->evp); + reschedule_cb ? periodic_watcher_reschedule_cb : NULL); + ev_periodic_again (loop, &pdw->evw); } double flux_watcher_next_wakeup (flux_watcher_t *w) { - if (watcher_get_ops (w) == &periodic_watcher) { - struct f_periodic *fp = watcher_get_data (w); - return ((double) ev_periodic_at (&fp->evp)); + if (watcher_get_ops (w) == &periodic_watcher_ops) { + struct periodic_watcher *pdw = watcher_get_data (w); + return ((double) ev_periodic_at (&pdw->evw)); } - else if (watcher_get_ops (w) == &timer_watcher) { + else if (watcher_get_ops (w) == &timer_watcher_ops) { ev_timer *tw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); return ((double) (ev_now (loop) + ev_timer_remaining (loop, tw))); @@ -332,45 +351,58 @@ double flux_watcher_next_wakeup (flux_watcher_t *w) /* Prepare */ -static void prepare_start (flux_watcher_t *w) + +struct prepare_watcher { + ev_prepare evw; +}; + +static void prepare_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_prepare *pw = watcher_get_data (w); - ev_prepare_start (loop, pw); + struct prepare_watcher *pw = watcher_get_data (w); + ev_prepare_start (loop, &pw->evw); } -static void prepare_stop (flux_watcher_t *w) +static void prepare_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_prepare *pw = watcher_get_data (w); - ev_prepare_stop (loop, pw); + struct prepare_watcher *pw = watcher_get_data (w); + ev_prepare_stop (loop, &pw->evw); +} + +static bool prepare_watcher_is_active (flux_watcher_t *w) +{ + struct prepare_watcher *pw = watcher_get_data (w); + return ev_is_active (&pw->evw); } -static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +static void prepare_watcher_cb (struct ev_loop *loop, + ev_prepare *evw, + int revents) { - struct flux_watcher *w = pw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops prepare_watcher = { - .start = prepare_start, - .stop = prepare_stop, +static struct flux_watcher_ops prepare_watcher_ops = { + .start = prepare_watcher_start, + .stop = prepare_watcher_stop, + .is_active = prepare_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_prepare *pw; + struct prepare_watcher *pw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher_ops, cb, arg))) return NULL; pw = watcher_get_data (w); - ev_prepare_init (pw, prepare_cb); - pw->data = w; + ev_prepare_init (&pw->evw, prepare_watcher_cb); + pw->evw.data = w; return w; } @@ -378,52 +410,62 @@ flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, /* Check */ -static void check_set_priority (flux_watcher_t *w, int priority) +struct check_watcher { + ev_check evw; +}; + +static void check_watcher_set_priority (flux_watcher_t *w, int priority) { - ev_check *cw = watcher_get_data (w); - ev_set_priority (cw, priority); + struct check_watcher *cw = watcher_get_data (w); + ev_set_priority (&cw->evw, priority); } -static void check_start (flux_watcher_t *w) +static void check_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_check *cw = watcher_get_data (w); - ev_check_start (loop, cw); + struct check_watcher *cw = watcher_get_data (w); + ev_check_start (loop, &cw->evw); } -static void check_stop (flux_watcher_t *w) +static void check_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_check *cw = watcher_get_data (w); - ev_check_stop (loop, cw); + struct check_watcher *cw = watcher_get_data (w); + ev_check_stop (loop, &cw->evw); } -static void check_cb (struct ev_loop *loop, ev_check *cw, int revents) +static bool check_watcher_is_active (flux_watcher_t *w) { - struct flux_watcher *w = cw->data; + struct check_watcher *cw = watcher_get_data (w); + return ev_is_active (&cw->evw); +} + +static void check_watcher_cb (struct ev_loop *loop, ev_check *evw, int revents) +{ + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops check_watcher = { - .set_priority = check_set_priority, - .start = check_start, - .stop = check_stop, +static struct flux_watcher_ops check_watcher_ops = { + .set_priority = check_watcher_set_priority, + .start = check_watcher_start, + .stop = check_watcher_stop, + .is_active = check_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_check *cw; + struct check_watcher *cw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*cw), &check_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*cw), &check_watcher_ops, cb, arg))) return NULL; cw = watcher_get_data (w); - ev_check_init (cw, check_cb); - cw->data = w; + ev_check_init (&cw->evw, check_watcher_cb); + cw->evw.data = w; return w; } @@ -431,45 +473,55 @@ flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, /* Idle */ -static void idle_start (flux_watcher_t *w) +struct idle_watcher { + ev_idle evw; +}; + +static void idle_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_idle *iw = watcher_get_data (w); - ev_idle_start (loop, iw); + struct idle_watcher *iw = watcher_get_data (w); + ev_idle_start (loop, &iw->evw); } -static void idle_stop (flux_watcher_t *w) +static void idle_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_idle *iw = watcher_get_data (w); - ev_idle_stop (loop, iw); + struct idle_watcher *iw = watcher_get_data (w); + ev_idle_stop (loop, &iw->evw); +} + +static bool idle_watcher_is_active (flux_watcher_t *w) +{ + struct idle_watcher *iw = watcher_get_data (w); + return ev_is_active (&iw->evw); } -static void idle_cb (struct ev_loop *loop, ev_idle *iw, int revents) +static void idle_watcher_cb (struct ev_loop *loop, ev_idle *evw, int revents) { - struct flux_watcher *w = iw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops idle_watcher = { - .start = idle_start, - .stop = idle_stop, +static struct flux_watcher_ops idle_watcher_ops = { + .start = idle_watcher_start, + .stop = idle_watcher_stop, + .is_active = idle_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_idle *iw; + struct idle_watcher *iw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher_ops, cb, arg))) return NULL; iw = watcher_get_data (w); - ev_idle_init (iw, idle_cb); - iw->data = w; + ev_idle_init (&iw->evw, idle_watcher_cb); + iw->evw.data = w; return w; } @@ -477,34 +529,43 @@ flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, /* Child */ -static void child_start (flux_watcher_t *w) +struct child_watcher { + ev_child evw; +}; + +static void child_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_child *cw = watcher_get_data (w); - ev_child_start (loop, cw); + struct child_watcher *cw = watcher_get_data (w); + ev_child_start (loop, &cw->evw); } -static void child_stop (flux_watcher_t *w) +static void child_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_child *cw = watcher_get_data (w); - ev_child_stop (loop, cw); + struct child_watcher *cw = watcher_get_data (w); + ev_child_stop (loop, &cw->evw); +} + +static bool child_watcher_is_active (flux_watcher_t *w) +{ + struct child_watcher *cw = watcher_get_data (w); + return ev_is_active (&cw->evw); } -static void child_cb (struct ev_loop *loop, ev_child *cw, int revents) +static void child_watcher_cb (struct ev_loop *loop, ev_child *evw, int revents) { - struct flux_watcher *w = cw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops child_watcher = { - .start = child_start, - .stop = child_stop, +static struct flux_watcher_ops child_watcher_ops = { + .start = child_watcher_start, + .stop = child_watcher_stop, + .is_active = child_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; - flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, int pid, bool trace, @@ -512,69 +573,81 @@ flux_watcher_t *flux_child_watcher_create (flux_reactor_t *r, void *arg) { flux_watcher_t *w; - ev_child *cw; + struct child_watcher *cw; if (!ev_is_default_loop (reactor_get_loop (r))) { errno = EINVAL; return NULL; } - if (!(w = watcher_create (r, sizeof (*cw), &child_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*cw), &child_watcher_ops, cb, arg))) return NULL; cw = watcher_get_data (w); - ev_child_init (cw, child_cb, pid, trace ? 1 : 0); - cw->data = w; + ev_child_init (&cw->evw, child_watcher_cb, pid, trace ? 1 : 0); + cw->evw.data = w; return w; } int flux_child_watcher_get_rpid (flux_watcher_t *w) { - if (watcher_get_ops (w) != &child_watcher) { + if (watcher_get_ops (w) != &child_watcher_ops) { errno = EINVAL; return -1; } - ev_child *cw = watcher_get_data (w); - return cw->rpid; + struct child_watcher *cw = watcher_get_data (w); + return cw->evw.rpid; } int flux_child_watcher_get_rstatus (flux_watcher_t *w) { - if (watcher_get_ops (w) != &child_watcher) { + if (watcher_get_ops (w) != &child_watcher_ops) { errno = EINVAL; return -1; } - ev_child *cw = watcher_get_data (w); - return cw->rstatus; + struct child_watcher *cw = watcher_get_data (w); + return cw->evw.rstatus; } /* Signal */ -static void signal_start (flux_watcher_t *w) +struct signal_watcher { + ev_signal evw; +}; + +static void signal_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_signal *sw = watcher_get_data (w); - ev_signal_start (loop, sw); + struct signal_watcher *sw = watcher_get_data (w); + ev_signal_start (loop, &sw->evw); } -static void signal_stop (flux_watcher_t *w) +static void signal_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_signal *sw = watcher_get_data (w); - ev_signal_stop (loop, sw); + struct signal_watcher *sw = watcher_get_data (w); + ev_signal_stop (loop, &sw->evw); +} + +static bool signal_watcher_is_active (flux_watcher_t *w) +{ + struct signal_watcher *sw = watcher_get_data (w); + return ev_is_active (&sw->evw); } -static void signal_cb (struct ev_loop *loop, ev_signal *sw, int revents) +static void signal_watcher_cb (struct ev_loop *loop, + ev_signal *evw, + int revents) { - struct flux_watcher *w = sw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops signal_watcher = { - .start = signal_start, - .stop = signal_stop, +static struct flux_watcher_ops signal_watcher_ops = { + .start = signal_watcher_start, + .stop = signal_watcher_stop, + .is_active = signal_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, @@ -583,55 +656,65 @@ flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, void *arg) { flux_watcher_t *w; - ev_signal *sw; + struct signal_watcher *sw; - if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher_ops, cb, arg))) return NULL; sw = watcher_get_data (w); - ev_signal_init (sw, signal_cb, signum); - sw->data = w; + ev_signal_init (&sw->evw, signal_watcher_cb, signum); + sw->evw.data = w; return w; } int flux_signal_watcher_get_signum (flux_watcher_t *w) { - if (watcher_get_ops (w) != &signal_watcher) { + if (watcher_get_ops (w) != &signal_watcher_ops) { errno = EINVAL; return (-1); } - ev_signal *sw = watcher_get_data (w); - return sw->signum; + struct signal_watcher *sw = watcher_get_data (w); + return sw->evw.signum; } /* Stat */ -static void stat_start (flux_watcher_t *w) +struct stat_watcher { + ev_stat evw; +}; + +static void stat_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_stat *sw = watcher_get_data (w); - ev_stat_start (loop, sw); + struct stat_watcher *sw = watcher_get_data (w); + ev_stat_start (loop, &sw->evw); } -static void stat_stop (flux_watcher_t *w) +static void stat_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); - ev_stat *sw = watcher_get_data (w); - ev_stat_stop (loop, sw); + struct stat_watcher *sw = watcher_get_data (w); + ev_stat_stop (loop, &sw->evw); +} + +static bool stat_watcher_is_active (flux_watcher_t *w) +{ + struct stat_watcher *sw = watcher_get_data (w); + return ev_is_active (&sw->evw); } -static void stat_cb (struct ev_loop *loop, ev_stat *sw, int revents) +static void stat_watcher_cb (struct ev_loop *loop, ev_stat *evw, int revents) { - struct flux_watcher *w = sw->data; + struct flux_watcher *w = evw->data; watcher_call_ev (w, revents); } -static struct flux_watcher_ops stat_watcher = { - .start = stat_start, - .stop = stat_stop, +static struct flux_watcher_ops stat_watcher_ops = { + .start = stat_watcher_start, + .stop = stat_watcher_stop, + .is_active = stat_watcher_is_active, .destroy = NULL, - .is_active = wrap_ev_active, }; flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, @@ -641,13 +724,13 @@ flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, void *arg) { flux_watcher_t *w; - ev_stat *sw; + struct stat_watcher *sw; - if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher_ops, cb, arg))) return NULL; sw = watcher_get_data (w); - ev_stat_init (sw, stat_cb, path, interval); - sw->data = w; + ev_stat_init (&sw->evw, stat_watcher_cb, path, interval); + sw->evw.data = w; return w; } @@ -656,12 +739,12 @@ void flux_stat_watcher_get_rstat (flux_watcher_t *w, struct stat *stat, struct stat *prev) { - ev_stat *sw = watcher_get_data (w); - assert (watcher_get_ops (w) == &stat_watcher); + struct stat_watcher *sw = watcher_get_data (w); + assert (watcher_get_ops (w) == &stat_watcher_ops); if (stat) - *stat = sw->attr; + *stat = sw->evw.attr; if (prev) - *prev = sw->prev; + *prev = sw->evw.prev; } /* From 02db974c4cfb7cee78036c5e0e2819414caf9a6d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 08:13:18 -0800 Subject: [PATCH 02/18] reactor: don't call assert(3) internally Problem: passing incorrect arguments to reactor watcher functions can trigger an assertion failure. For functions that return success/failure, fail with errno=EINVAL if the arguments are incorrect. For void functions, do nothing if the arguments are incorrect. --- src/common/libflux/reactor.c | 1 - src/common/libflux/watcher_wrap.c | 55 +++++++++++++++++-------------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index 767476bcb222..5dc3dbd195e9 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -11,7 +11,6 @@ #if HAVE_CONFIG_H #include "config.h" #endif -#include #include #include #include diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_wrap.c index 5baab220df77..1c6bd8f079a4 100644 --- a/src/common/libflux/watcher_wrap.c +++ b/src/common/libflux/watcher_wrap.c @@ -13,7 +13,6 @@ #if HAVE_CONFIG_H #include "config.h" #endif -#include #include #include @@ -141,9 +140,12 @@ flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, int flux_fd_watcher_get_fd (flux_watcher_t *w) { - assert (watcher_get_ops (w) == &fd_watcher_ops); - struct fd_watcher *fdw = watcher_get_data (w); - return fdw->evw.fd; + if (watcher_get_ops (w) == &fd_watcher_ops) { + struct fd_watcher *fdw = watcher_get_data (w); + return fdw->evw.fd; + } + errno = EINVAL; + return -1; } /* Timer @@ -209,17 +211,19 @@ flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) { - assert (watcher_get_ops (w) == &timer_watcher_ops); - struct timer_watcher *tmw = watcher_get_data (w); - ev_timer_set (&tmw->evw, after, repeat); + if (watcher_get_ops (w) == &timer_watcher_ops) { + struct timer_watcher *tmw = watcher_get_data (w); + ev_timer_set (&tmw->evw, after, repeat); + } } void flux_timer_watcher_again (flux_watcher_t *w) { - assert (watcher_get_ops (w) == &timer_watcher_ops); - struct timer_watcher *tmw = watcher_get_data (w); - struct ev_loop *loop = watcher_get_ev (w); - ev_timer_again (loop, &tmw->evw); + if (watcher_get_ops (w) == &timer_watcher_ops) { + struct timer_watcher *tmw = watcher_get_data (w); + struct ev_loop *loop = watcher_get_ev (w); + ev_timer_again (loop, &tmw->evw); + } } /* Periodic @@ -266,7 +270,8 @@ static ev_tstamp periodic_watcher_reschedule_cb (ev_periodic *pw, struct periodic_watcher *pdw = pw->data; struct flux_watcher *w = pdw->w; ev_tstamp rc; - assert (pdw->reschedule_cb != NULL); + if (pdw->reschedule_cb == NULL) + return 0; rc = (ev_tstamp)pdw->reschedule_cb (w, (double)now, watcher_get_arg (w)); if (rc < now) { /* User reschedule cb returned time in the past. The watcher will @@ -325,13 +330,14 @@ void flux_periodic_watcher_reset (flux_watcher_t *w, { struct ev_loop *loop = watcher_get_ev (w); struct periodic_watcher *pdw = watcher_get_data (w); - assert (watcher_get_ops (w) == &periodic_watcher_ops); - pdw->reschedule_cb = reschedule_cb; - ev_periodic_set (&pdw->evw, - next, - interval, - reschedule_cb ? periodic_watcher_reschedule_cb : NULL); - ev_periodic_again (loop, &pdw->evw); + if (watcher_get_ops (w) == &periodic_watcher_ops) { + pdw->reschedule_cb = reschedule_cb; + ev_periodic_set (&pdw->evw, + next, + interval, + reschedule_cb ? periodic_watcher_reschedule_cb : NULL); + ev_periodic_again (loop, &pdw->evw); + } } double flux_watcher_next_wakeup (flux_watcher_t *w) @@ -740,11 +746,12 @@ void flux_stat_watcher_get_rstat (flux_watcher_t *w, struct stat *prev) { struct stat_watcher *sw = watcher_get_data (w); - assert (watcher_get_ops (w) == &stat_watcher_ops); - if (stat) - *stat = sw->evw.attr; - if (prev) - *prev = sw->evw.prev; + if (watcher_get_ops (w) == &stat_watcher_ops) { + if (stat) + *stat = sw->evw.attr; + if (prev) + *prev = sw->evw.prev; + } } /* From ffd8dd747350a3eaa34afbbd757b66ce22959e0d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 08:28:24 -0800 Subject: [PATCH 03/18] reactor: change flux_stat_watcher_get_rstat() Problem: now that flux_stat_watcher_get_rstat() no longer asserts on error, callers are unable to tell if the pointer arguments have been assigned. Change the function to return an int instead of void. Update man page. Add a unit test. --- doc/man3/flux_stat_watcher_create.rst | 12 +++++++++--- src/common/libflux/test/reactor.c | 6 ++++++ src/common/libflux/watcher.h | 6 +++--- src/common/libflux/watcher_wrap.c | 9 ++++++--- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/doc/man3/flux_stat_watcher_create.rst b/doc/man3/flux_stat_watcher_create.rst index dae76688f106..9491df6ff17e 100644 --- a/doc/man3/flux_stat_watcher_create.rst +++ b/doc/man3/flux_stat_watcher_create.rst @@ -22,9 +22,9 @@ SYNOPSIS flux_watcher_f callback, void *arg); - void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev); + int flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev); Link with :command:`-lflux-core`. @@ -55,6 +55,9 @@ RETURN VALUE :func:`flux_stat_watcher_create` returns a :type:`flux_watcher_t` object on success. On error, NULL is returned, and :var:`errno` is set appropriately. +:func:`flux_stat_watcher_get_rstat` returns 0 on success. On error, -1 is +returned and :var:`errno` is set appropriately. + ERRORS ====== @@ -62,6 +65,9 @@ ERRORS ENOMEM Out of memory. +EINVAL + Invalid argument. + RESOURCES ========= diff --git a/src/common/libflux/test/reactor.c b/src/common/libflux/test/reactor.c index d500464cef38..acfdf0457a87 100644 --- a/src/common/libflux/test/reactor.c +++ b/src/common/libflux/test/reactor.c @@ -641,6 +641,12 @@ static void test_stat (flux_reactor_t *reactor) "created timer watcher"); flux_watcher_start (tw); + /* Make sure rstat accessor fails if passed the wrong watcher type. + */ + errno = 0; + ok (flux_stat_watcher_get_rstat (tw, NULL, NULL) < 0 && errno == EINVAL, + "flux_stat_watcher_get_rstat fails with EINVAL on wrong watcher type"); + ok (flux_reactor_run (reactor, 0) == 0, "reactor ran successfully"); diff --git a/src/common/libflux/watcher.h b/src/common/libflux/watcher.h index 0ebe83842fe6..1488dc7d4929 100644 --- a/src/common/libflux/watcher.h +++ b/src/common/libflux/watcher.h @@ -133,9 +133,9 @@ flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, double interval, flux_watcher_f cb, void *arg); -void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev); +int flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev); #ifdef __cplusplus } diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_wrap.c index 1c6bd8f079a4..e457fd9d6e68 100644 --- a/src/common/libflux/watcher_wrap.c +++ b/src/common/libflux/watcher_wrap.c @@ -741,9 +741,9 @@ flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, return w; } -void flux_stat_watcher_get_rstat (flux_watcher_t *w, - struct stat *stat, - struct stat *prev) +int flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev) { struct stat_watcher *sw = watcher_get_data (w); if (watcher_get_ops (w) == &stat_watcher_ops) { @@ -751,7 +751,10 @@ void flux_stat_watcher_get_rstat (flux_watcher_t *w, *stat = sw->evw.attr; if (prev) *prev = sw->evw.prev; + return 0; } + errno = EINVAL; + return -1; } /* From 40090df1262b1841a94054521c18db18ba0036e9 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 05:51:35 -0800 Subject: [PATCH 04/18] reactor: add watcher references Problem: flux_reactor_active_incref() and _decref() are easy to use incorrectly, resulting in a reactor hang. libuv improved the interface by adding uv_ref(), uv_unref(), uv_is_referenced() for its handles. These accomplish the same thing but are safer because they are associated with a handle (watcher) rather than the reactor and they are idempotent. See also: https://docs.libuv.org/en/v1.x/handle.html#refcount Add the equivalent interface for the Flux reactor: flux_watcher_ref() flux_watcher_unref() flux_watcher_is_referenced() If a watcher doesn't implement the ref/unref callbacks then flux_watcher_ref() and flux_watcher_unref() will be no-ops and flux_watcher_is_referenced() will always return true. Implement callbacks for the native libev watchers. --- src/common/libflux/watcher.c | 28 ++++++ src/common/libflux/watcher.h | 3 + src/common/libflux/watcher_private.h | 2 + src/common/libflux/watcher_wrap.c | 135 ++++++++++++++++++++++++++- 4 files changed, 163 insertions(+), 5 deletions(-) diff --git a/src/common/libflux/watcher.c b/src/common/libflux/watcher.c index e6bc66c2454e..1cf924c9242c 100644 --- a/src/common/libflux/watcher.c +++ b/src/common/libflux/watcher.c @@ -22,6 +22,7 @@ struct flux_watcher { flux_watcher_f fn; void *arg; struct flux_watcher_ops *ops; + bool unreferenced; void *data; }; @@ -99,6 +100,26 @@ void flux_watcher_stop (flux_watcher_t *w) } } +void flux_watcher_ref (flux_watcher_t *w) +{ + if (w && w->unreferenced) { + if (w->ops->ref) { + w->ops->ref (w); + w->unreferenced = false; + } + } +} + +void flux_watcher_unref (flux_watcher_t *w) +{ + if (w && !w->unreferenced) { + if (w->ops->unref) { + w->ops->unref(w); + w->unreferenced = true; + } + } +} + bool flux_watcher_is_active (flux_watcher_t *w) { if (w) { @@ -108,6 +129,13 @@ bool flux_watcher_is_active (flux_watcher_t *w) return false; } +bool flux_watcher_is_referenced (flux_watcher_t *w) +{ + if (w) + return !w->unreferenced; + return true; +} + void flux_watcher_destroy (flux_watcher_t *w) { if (w) { diff --git a/src/common/libflux/watcher.h b/src/common/libflux/watcher.h index 1488dc7d4929..54959f75a96f 100644 --- a/src/common/libflux/watcher.h +++ b/src/common/libflux/watcher.h @@ -35,9 +35,12 @@ void flux_watcher_set_priority (flux_watcher_t *w, int priority); void flux_watcher_start (flux_watcher_t *w); void flux_watcher_stop (flux_watcher_t *w); +void flux_watcher_ref (flux_watcher_t *w); +void flux_watcher_unref (flux_watcher_t *w); void flux_watcher_destroy (flux_watcher_t *w); double flux_watcher_next_wakeup (flux_watcher_t *w); bool flux_watcher_is_active (flux_watcher_t *w); +bool flux_watcher_is_referenced (flux_watcher_t *w); /* flux_t handle */ diff --git a/src/common/libflux/watcher_private.h b/src/common/libflux/watcher_private.h index fae4c229bfda..2deb1895c1f6 100644 --- a/src/common/libflux/watcher_private.h +++ b/src/common/libflux/watcher_private.h @@ -19,6 +19,8 @@ struct flux_watcher_ops { void (*set_priority) (flux_watcher_t *w, int priority); void (*start) (flux_watcher_t *w); void (*stop) (flux_watcher_t *w); + void (*ref) (flux_watcher_t *w); + void (*unref) (flux_watcher_t *w); void (*destroy) (flux_watcher_t *w); bool (*is_active) (flux_watcher_t *w); }; diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_wrap.c index e457fd9d6e68..7d38680e1dfb 100644 --- a/src/common/libflux/watcher_wrap.c +++ b/src/common/libflux/watcher_wrap.c @@ -53,6 +53,51 @@ static struct ev_loop *watcher_get_ev (flux_watcher_t *w) return reactor_get_loop (watcher_get_reactor (w)); } +/* generic ops->ref() callback for libev watchers + * If the watcher is "refed" while active then we need to fudge + * the active refcount now, because the w->unreferenced flag will be cleared + * by the time ops->stop() is called. + * N.B. flux_watcher_ref() only calls this if w->unreferenced is set. + */ +static void watcher_ref_ev (flux_watcher_t *w) +{ + if (flux_watcher_is_active (w)) + ev_ref (watcher_get_ev (w)); +} + +/* generic ops->unref() callback for libev watchers + * If the watcher is "unrefed" while active then we need to fudge + * the active refcount now, since ops->start() already occurred. + * N.B. flux_watcher_unref() only calls this if w->unreferenced is clear. + */ +static void watcher_unref_ev (flux_watcher_t *w) +{ + if (flux_watcher_is_active (w)) + ev_unref (watcher_get_ev (w)); +} + +/* helper for ops->start() + * Call after ev_TYPE_start() to fudge libev reactor active refcount. + */ +static void watcher_start_post_ev (flux_watcher_t *w, bool was_active) +{ + if (!flux_watcher_is_referenced (w)) { + if (!was_active) + ev_unref (watcher_get_ev (w)); + } +} + +/* helper for ops->stop() + * Call before ev_TYPE_stop() to fudge libev reactor active refcount. + */ +static void watcher_stop_pre_ev (flux_watcher_t *w) +{ + if (!flux_watcher_is_referenced (w)) { + if (flux_watcher_is_active (w)) + ev_ref (watcher_get_ev (w)); + } +} + static void safe_stop_cb (struct ev_loop *loop, ev_prepare *pw, int revents) { flux_watcher_stop ((flux_watcher_t *)pw->data); @@ -87,13 +132,16 @@ static void fd_watcher_start (flux_watcher_t *w) { struct fd_watcher *fdw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); + bool active = ev_is_active (&fdw->evw); ev_io_start (loop, &fdw->evw); + watcher_start_post_ev (w, active); } static void fd_watcher_stop (flux_watcher_t *w) { struct fd_watcher *fdw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); + watcher_stop_pre_ev (w); ev_io_stop (loop, &fdw->evw); } @@ -113,6 +161,8 @@ static void fd_watcher_cb (struct ev_loop *loop, ev_io *iow, int revents) static struct flux_watcher_ops fd_watcher_ops = { .start = fd_watcher_start, .stop = fd_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = fd_watcher_is_active, .destroy = NULL, }; @@ -153,37 +203,49 @@ int flux_fd_watcher_get_fd (flux_watcher_t *w) struct timer_watcher { struct ev_timer evw; + double repeat; }; static void timer_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct timer_watcher *tmw = watcher_get_data (w); + bool active = ev_is_active (&tmw->evw); ev_timer_start (loop, &tmw->evw); + watcher_start_post_ev (w, active); } static void timer_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct timer_watcher *tmw = watcher_get_data (w); + watcher_stop_pre_ev (w); ev_timer_stop (loop, &tmw->evw); } -static void timer_watcher_cb (struct ev_loop *loop, ev_timer *evw, int revents) +static bool timer_watcher_is_active (flux_watcher_t *w) { - struct flux_watcher *w = evw->data; - watcher_call_ev (w, revents); + struct timer_watcher *tmw = watcher_get_data (w); + return ev_is_active (&tmw->evw); } -static bool timer_watcher_is_active (flux_watcher_t *w) +static void timer_watcher_cb (struct ev_loop *loop, ev_timer *evw, int revents) { + struct flux_watcher *w = evw->data; struct timer_watcher *tmw = watcher_get_data (w); - return ev_is_active (&tmw->evw); + + // if repeat is zero, timer automatically stops + if (tmw->repeat == 0. && !flux_watcher_is_referenced (w)) + ev_ref (loop); + + watcher_call_ev (w, revents); } static struct flux_watcher_ops timer_watcher_ops = { .start = timer_watcher_start, .stop = timer_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = timer_watcher_is_active, .destroy = NULL, }; @@ -203,6 +265,7 @@ flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, if (!(w = watcher_create (r, sizeof (*tmw), &timer_watcher_ops, cb, arg))) return NULL; tmw = watcher_get_data (w); + tmw->repeat = repeat; ev_timer_init (&tmw->evw, timer_watcher_cb, after, repeat); tmw->evw.data = w; @@ -213,6 +276,7 @@ void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) { if (watcher_get_ops (w) == &timer_watcher_ops) { struct timer_watcher *tmw = watcher_get_data (w); + tmw->repeat = repeat; ev_timer_set (&tmw->evw, after, repeat); } } @@ -222,7 +286,15 @@ void flux_timer_watcher_again (flux_watcher_t *w) if (watcher_get_ops (w) == &timer_watcher_ops) { struct timer_watcher *tmw = watcher_get_data (w); struct ev_loop *loop = watcher_get_ev (w); + bool active = ev_is_active (&tmw->evw); + + // if repeat is zero, ev_timer_again() automatically stops it + if (tmw->repeat == 0.) + watcher_stop_pre_ev (w); ev_timer_again (loop, &tmw->evw); + // if repeat is set, ev_timer_again() automatically starts it + if (tmw->repeat > 0.) + watcher_start_post_ev (w, active); } } @@ -233,19 +305,23 @@ struct periodic_watcher { struct flux_watcher *w; ev_periodic evw; flux_reschedule_f reschedule_cb; + double interval; }; static void periodic_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct periodic_watcher *pdw = watcher_get_data (w); + bool active = ev_is_active (&pdw->evw); ev_periodic_start (loop, &pdw->evw); + watcher_start_post_ev (w, active); } static void periodic_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct periodic_watcher *pdw = watcher_get_data (w); + watcher_stop_pre_ev (w); ev_periodic_stop (loop, &pdw->evw); } @@ -261,6 +337,11 @@ static void periodic_watcher_cb (struct ev_loop *loop, { struct periodic_watcher *pdw = pw->data; struct flux_watcher *w = pdw->w; + + // if interval is zero, periodic automatically stops + if (pdw->interval == 0. && !flux_watcher_is_referenced (w)) + ev_ref (loop); + watcher_call_ev (w, revents); } @@ -280,6 +361,7 @@ static ev_tstamp periodic_watcher_reschedule_cb (ev_periodic *pw, * a prepare callback. * Return time far in the future to ensure we aren't called again. */ + watcher_stop_pre_ev (w); watcher_stop_safe (w); return (now + 1e99); } @@ -289,6 +371,8 @@ static ev_tstamp periodic_watcher_reschedule_cb (ev_periodic *pw, static struct flux_watcher_ops periodic_watcher_ops = { .start = periodic_watcher_start, .stop = periodic_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = periodic_watcher_is_active, .destroy = NULL, }; @@ -310,6 +394,7 @@ flux_watcher_t *flux_periodic_watcher_create (flux_reactor_t *r, if (!(w = watcher_create (r, size, &periodic_watcher_ops, cb, arg))) return NULL; pdw = watcher_get_data (w); + pdw->interval = interval; pdw->evw.data = pdw; pdw->w = w; pdw->reschedule_cb = reschedule_cb; @@ -331,12 +416,22 @@ void flux_periodic_watcher_reset (flux_watcher_t *w, struct ev_loop *loop = watcher_get_ev (w); struct periodic_watcher *pdw = watcher_get_data (w); if (watcher_get_ops (w) == &periodic_watcher_ops) { + pdw->interval = interval; pdw->reschedule_cb = reschedule_cb; ev_periodic_set (&pdw->evw, next, interval, reschedule_cb ? periodic_watcher_reschedule_cb : NULL); + + bool active = ev_is_active (&pdw->evw); + + // if interval is zero, ev_periodic_again() automatically stops it + if (pdw->interval == 0.) + watcher_stop_pre_ev (w); ev_periodic_again (loop, &pdw->evw); + // if interval is set, ev_periodic_again() automatically starts it + if (pdw->interval > 0.) + watcher_start_post_ev (w, active); } } @@ -366,13 +461,16 @@ static void prepare_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct prepare_watcher *pw = watcher_get_data (w); + bool active = ev_is_active (&pw->evw); ev_prepare_start (loop, &pw->evw); + watcher_start_post_ev (w, active); } static void prepare_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct prepare_watcher *pw = watcher_get_data (w); + watcher_stop_pre_ev (w); ev_prepare_stop (loop, &pw->evw); } @@ -393,6 +491,8 @@ static void prepare_watcher_cb (struct ev_loop *loop, static struct flux_watcher_ops prepare_watcher_ops = { .start = prepare_watcher_start, .stop = prepare_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = prepare_watcher_is_active, .destroy = NULL, }; @@ -430,13 +530,16 @@ static void check_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct check_watcher *cw = watcher_get_data (w); + bool active = ev_is_active (&cw->evw); ev_check_start (loop, &cw->evw); + watcher_start_post_ev (w, active); } static void check_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct check_watcher *cw = watcher_get_data (w); + watcher_stop_pre_ev (w); ev_check_stop (loop, &cw->evw); } @@ -456,6 +559,8 @@ static struct flux_watcher_ops check_watcher_ops = { .set_priority = check_watcher_set_priority, .start = check_watcher_start, .stop = check_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = check_watcher_is_active, .destroy = NULL, }; @@ -487,13 +592,16 @@ static void idle_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct idle_watcher *iw = watcher_get_data (w); + bool active = ev_is_active (&iw->evw); ev_idle_start (loop, &iw->evw); + watcher_start_post_ev (w, active); } static void idle_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct idle_watcher *iw = watcher_get_data (w); + watcher_stop_pre_ev (w); ev_idle_stop (loop, &iw->evw); } @@ -512,6 +620,8 @@ static void idle_watcher_cb (struct ev_loop *loop, ev_idle *evw, int revents) static struct flux_watcher_ops idle_watcher_ops = { .start = idle_watcher_start, .stop = idle_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = idle_watcher_is_active, .destroy = NULL, }; @@ -543,13 +653,16 @@ static void child_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct child_watcher *cw = watcher_get_data (w); + bool active = ev_is_active (&cw->evw); ev_child_start (loop, &cw->evw); + watcher_start_post_ev (w, active); } static void child_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct child_watcher *cw = watcher_get_data (w); + watcher_stop_pre_ev (w); ev_child_stop (loop, &cw->evw); } @@ -568,6 +681,8 @@ static void child_watcher_cb (struct ev_loop *loop, ev_child *evw, int revents) static struct flux_watcher_ops child_watcher_ops = { .start = child_watcher_start, .stop = child_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = child_watcher_is_active, .destroy = NULL, }; @@ -625,13 +740,16 @@ static void signal_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct signal_watcher *sw = watcher_get_data (w); + bool active = ev_is_active (&sw->evw); ev_signal_start (loop, &sw->evw); + watcher_start_post_ev (w, active); } static void signal_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct signal_watcher *sw = watcher_get_data (w); + watcher_stop_pre_ev (w); ev_signal_stop (loop, &sw->evw); } @@ -652,6 +770,8 @@ static void signal_watcher_cb (struct ev_loop *loop, static struct flux_watcher_ops signal_watcher_ops = { .start = signal_watcher_start, .stop = signal_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = signal_watcher_is_active, .destroy = NULL, }; @@ -694,13 +814,16 @@ static void stat_watcher_start (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct stat_watcher *sw = watcher_get_data (w); + bool active = ev_is_active (&sw->evw); ev_stat_start (loop, &sw->evw); + watcher_start_post_ev (w, active); } static void stat_watcher_stop (flux_watcher_t *w) { struct ev_loop *loop = watcher_get_ev (w); struct stat_watcher *sw = watcher_get_data (w); + watcher_stop_pre_ev (w); ev_stat_stop (loop, &sw->evw); } @@ -719,6 +842,8 @@ static void stat_watcher_cb (struct ev_loop *loop, ev_stat *evw, int revents) static struct flux_watcher_ops stat_watcher_ops = { .start = stat_watcher_start, .stop = stat_watcher_stop, + .ref = watcher_ref_ev, + .unref = watcher_unref_ev, .is_active = stat_watcher_is_active, .destroy = NULL, }; From c97c22ce5bac267dc8cf00bba20fc2c40638b004 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 14:13:18 -0800 Subject: [PATCH 05/18] libflux: support ref/unref in handle watcher Problem: flux_watcher_ref() and _unref() don't work on handle watchers. Add the necessary internal callbacks for this to work. --- src/common/libflux/hwatcher.c | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/common/libflux/hwatcher.c b/src/common/libflux/hwatcher.c index 3c662a874896..b8b33ac5678a 100644 --- a/src/common/libflux/hwatcher.c +++ b/src/common/libflux/hwatcher.c @@ -46,6 +46,26 @@ static void hwatcher_stop (flux_watcher_t *w) flux_watcher_stop (hw->idle_w); } +static void hwatcher_ref (flux_watcher_t *w) +{ + struct hwatcher *hw = watcher_get_data (w); + + flux_watcher_ref (hw->fd_w); + flux_watcher_ref (hw->prepare_w); + flux_watcher_ref (hw->idle_w); + flux_watcher_ref (hw->check_w); +} + +static void hwatcher_unref (flux_watcher_t *w) +{ + struct hwatcher *hw = watcher_get_data (w); + + flux_watcher_unref (hw->fd_w); + flux_watcher_unref (hw->prepare_w); + flux_watcher_unref (hw->idle_w); + flux_watcher_unref (hw->check_w); +} + static bool hwatcher_is_active (flux_watcher_t *w) { struct hwatcher *hw = watcher_get_data (w); @@ -106,6 +126,8 @@ static void hwatcher_check_cb (flux_reactor_t *r, static struct flux_watcher_ops hwatcher_ops = { .start = hwatcher_start, .stop = hwatcher_stop, + .ref = hwatcher_ref, + .unref = hwatcher_unref, .is_active = hwatcher_is_active, .destroy = hwatcher_destroy, }; From 3b82b216e23862065d36fa44939832327a4b8947 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 14:47:28 -0800 Subject: [PATCH 06/18] librouter: add usock_service_listen_watcher() Problem: the usock service provides no way to access the internal watcher that accepts new connections for start/stop/ref/unref. Add accessors: usock_service_listen_watcher() usock_server_listen_watcher() - needed by the above --- src/common/librouter/usock.c | 5 +++++ src/common/librouter/usock.h | 3 +++ src/common/librouter/usock_service.c | 32 ++++++++++++++++++++++++++++ src/common/librouter/usock_service.h | 3 +++ 4 files changed, 43 insertions(+) diff --git a/src/common/librouter/usock.c b/src/common/librouter/usock.c index 3ff1857c9923..5749baca8bb8 100644 --- a/src/common/librouter/usock.c +++ b/src/common/librouter/usock.c @@ -601,6 +601,11 @@ struct usock_server *usock_server_create (flux_reactor_t *r, return NULL; } +flux_watcher_t *usock_server_listen_watcher (struct usock_server *server) +{ + return server ? server->w : NULL; +} + static bool is_poll_error (int revents) { if ((revents & POLLERR) || (revents & POLLHUP) || (revents & POLLNVAL)) diff --git a/src/common/librouter/usock.h b/src/common/librouter/usock.h index 5fc0a7271e4d..4f26448ad5c7 100644 --- a/src/common/librouter/usock.h +++ b/src/common/librouter/usock.h @@ -59,6 +59,9 @@ void usock_server_set_acceptor (struct usock_server *server, usock_acceptor_f cb, void *arg); +// accessor for start/stop/ref/unref +flux_watcher_t *usock_server_listen_watcher (struct usock_server *server); + /* Server connection for one client */ diff --git a/src/common/librouter/usock_service.c b/src/common/librouter/usock_service.c index bc8d8a3e2c55..4d91d12f7fd6 100644 --- a/src/common/librouter/usock_service.c +++ b/src/common/librouter/usock_service.c @@ -20,6 +20,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" #include "src/common/libutil/log.h" #include "src/common/libutil/errno_safe.h" +#include "ccan/str/str.h" #include "usock.h" #include "usock_service.h" @@ -134,6 +135,27 @@ static void service_acceptor (struct usock_conn *uconn, void *arg) usock_conn_destroy (uconn); } +/* flux_handle_ops getopt signature */ +static int service_getopt (void *impl, + const char *option, + void *val, + size_t size) +{ + struct service *ss = impl; + if (streq (option, "flux::listen_watcher")) { + flux_watcher_t *w = usock_server_listen_watcher (ss->usock_srv); + if (size != sizeof (w)) + goto error; + memcpy (val, &w, size); + } + else + goto error; + return 0; +error: + errno = EINVAL; + return -1; +} + /* flux_handle_ops send signature */ static int service_handle_send (void *impl, const flux_msg_t *msg, int flags) { @@ -226,9 +248,19 @@ flux_t *usock_service_create (flux_reactor_t *r, return ss->h; } +flux_watcher_t *usock_service_listen_watcher (flux_t *h) +{ + flux_watcher_t *w; + + if (flux_opt_get (h, "flux::listen_watcher", &w, sizeof (w)) < 0) + return NULL; + return w; +} + static const struct flux_handle_ops service_handle_ops = { .send = service_handle_send, .impl_destroy = service_destroy, + .getopt = service_getopt, }; /* diff --git a/src/common/librouter/usock_service.h b/src/common/librouter/usock_service.h index 375992013458..5889c71db2ae 100644 --- a/src/common/librouter/usock_service.h +++ b/src/common/librouter/usock_service.h @@ -34,6 +34,9 @@ flux_t *usock_service_create (flux_reactor_t *r, const char *sockpath, bool verbose); +// accessor for star/stop/ref/unref +flux_watcher_t *usock_service_listen_watcher (flux_t *h); + #endif // _ROUTER_USOCK_SERVICE_H /* From 4633a3ddcefd9bee49fd737e60f1eab8de568935 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 15:01:40 -0800 Subject: [PATCH 07/18] libflux: add flux_get_handle_watcher() Problem: there is no way to access the internal handle watcher that is created for message dispatch. Add an accessor: flux_get_handle_watcher() --- src/common/libflux/msg_handler.c | 6 ++++++ src/common/libflux/msg_handler.h | 3 +++ 2 files changed, 9 insertions(+) diff --git a/src/common/libflux/msg_handler.c b/src/common/libflux/msg_handler.c index fb948b772a3f..544df36c965c 100644 --- a/src/common/libflux/msg_handler.c +++ b/src/common/libflux/msg_handler.c @@ -729,6 +729,12 @@ int flux_dispatch_requeue (flux_t *h) return 0; } +flux_watcher_t *flux_get_handle_watcher (flux_t *h) +{ + struct dispatch *d = dispatch_get (h); + return d ? d->w : NULL; +} + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/common/libflux/msg_handler.h b/src/common/libflux/msg_handler.h index da71f6f9b0ae..466e8cd27754 100644 --- a/src/common/libflux/msg_handler.h +++ b/src/common/libflux/msg_handler.h @@ -62,6 +62,9 @@ void flux_msg_handler_delvec (flux_msg_handler_t *msg_handlers[]); */ int flux_dispatch_requeue (flux_t *h); +// accessor for start/stop/ref/unref +flux_watcher_t *flux_get_handle_watcher (flux_t *h); + #ifdef __cplusplus } #endif From 4b602745c6d164aa2c716f2f2e824345ece732b4 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 15:37:21 -0800 Subject: [PATCH 08/18] flux-start: use flux_watcher_unref() Problem: flux start uses the reactor active use count to ensure that the reactor exits once the last client disconnects, but we now have better interfaces for that. Use flux_watcher_unref() instead. --- src/cmd/flux-start.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/cmd/flux-start.c b/src/cmd/flux-start.c index 4634bcb1c713..de5b73c2f3dc 100644 --- a/src/cmd/flux-start.c +++ b/src/cmd/flux-start.c @@ -1038,13 +1038,9 @@ void start_server_initialize (const char *rundir, bool verbose) log_err_exit ("could not created embedded flux-start server"); if (flux_msg_handler_addvec (ctx.h, htab, NULL, &ctx.handlers) < 0) log_err_exit ("could not register service methods"); - /* Service related watchers: - * - usock server listen fd - * - flux_t handle watcher (adds 2 active prep/check watchers) - */ - int ignore_watchers = 3; - while (ignore_watchers-- > 0) - flux_reactor_active_decref (ctx.reactor); + + flux_watcher_unref (flux_get_handle_watcher (ctx.h)); + flux_watcher_unref (usock_service_listen_watcher (ctx.h)); } void start_server_finalize (void) From 78a3e5a52c34f56b328fb65591379cac882279fb Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 22 Dec 2024 17:04:39 -0800 Subject: [PATCH 09/18] libflux: handle invalid watcher_create arguments Problem: some invalid arguments not detected at watcher creation. Add code to fail with EINVAL if certain arguments are not set. --- src/common/libflux/watcher.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/common/libflux/watcher.c b/src/common/libflux/watcher.c index 1cf924c9242c..d24e3dd6a804 100644 --- a/src/common/libflux/watcher.c +++ b/src/common/libflux/watcher.c @@ -32,6 +32,10 @@ flux_watcher_t *watcher_create (flux_reactor_t *r, flux_watcher_f fn, void *arg) { + if (!r || data_size == 0 || !ops) { + errno = EINVAL; + return NULL; + } struct flux_watcher *w = calloc (1, sizeof (*w) + data_size); if (!w) return NULL; From 91f4d8163c2bbeb8d81a58dd58bf135b68266d5e Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 15:49:30 -0800 Subject: [PATCH 10/18] libzmqutil: support ref/unref in zmq watcher Problem: flux_watcher_ref() and _unref() don't work on zmq watchers. Add the necessary internal callbacks for this to work. Update unit test. --- src/common/libzmqutil/test/zwatcher.c | 38 ++++++++++++++++++++++++--- src/common/libzmqutil/zwatcher.c | 22 ++++++++++++++++ 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/src/common/libzmqutil/test/zwatcher.c b/src/common/libzmqutil/test/zwatcher.c index 3de987b13c8d..43ac1e11bd32 100644 --- a/src/common/libzmqutil/test/zwatcher.c +++ b/src/common/libzmqutil/test/zwatcher.c @@ -25,6 +25,25 @@ static const size_t zmqwriter_msgcount = 1024; static void *zctx; +void watcher_is (flux_watcher_t *w, + bool exp_active, + bool exp_referenced, + const char *what) +{ + bool is_active = flux_watcher_is_active (w); + bool is_referenced = flux_watcher_is_referenced (w); + + ok (is_active == exp_active && is_referenced == exp_referenced, + "%sact%sref after %s", + exp_active ? "+" : "-", + exp_referenced ? "+" : "-", + what); + if (is_active != exp_active) + diag ("unexpectedly %sact", is_active ? "+" : "-"); + if (is_referenced != exp_referenced) + diag ("unexpectedly %sref", is_referenced ? "+" : "-"); +} + static void zmqwriter (flux_reactor_t *r, flux_watcher_t *w, int revents, @@ -98,17 +117,28 @@ static void test_zmq (flux_reactor_t *reactor) w = zmqutil_watcher_create (reactor, zs[1], FLUX_POLLOUT, zmqwriter, NULL); ok (r != NULL && w != NULL, "zmq: nonblocking reader and writer created"); - ok (flux_watcher_is_active (r) == false, - "flux_watcher_is_active() returns false on create"); + + flux_watcher_start (w); + watcher_is (w, true, true, "start"); + flux_watcher_unref (w); + watcher_is (w, true, false, "unref"); + flux_watcher_ref (w); + watcher_is (w, true, true, "ref"); + flux_watcher_stop (w); + watcher_is (w, false, true, "stop"); + flux_watcher_start (r); - ok (flux_watcher_is_active (r) == true, - "flux_watcher_is_active() returns true after start"); flux_watcher_start (w); ok (flux_reactor_run (reactor, 0) == 0, "zmq: reactor ran to completion after %d messages", zmqwriter_msgcount); flux_watcher_stop (r); ok (flux_watcher_is_active (r) == false, "flux_watcher_is_active() returns false after stop"); + ok (flux_watcher_is_referenced (w), + "flux_watcher_is_referenced() returns true"); + flux_watcher_unref (w); + ok (!flux_watcher_is_referenced (w), + "flux_watcher_is_referenced() returns false after unref"); flux_watcher_stop (w); flux_watcher_destroy (r); flux_watcher_destroy (w); diff --git a/src/common/libzmqutil/zwatcher.c b/src/common/libzmqutil/zwatcher.c index c3dd62cdb481..3da00cbb9b0e 100644 --- a/src/common/libzmqutil/zwatcher.c +++ b/src/common/libzmqutil/zwatcher.c @@ -110,6 +110,26 @@ static void zwatcher_stop (flux_watcher_t *w) flux_watcher_stop (zw->idle_w); } +static void zwatcher_ref (flux_watcher_t *w) +{ + struct zwatcher *zw = watcher_get_data (w); + + flux_watcher_ref (zw->fd_w); + flux_watcher_ref (zw->prepare_w); + flux_watcher_ref (zw->idle_w); + flux_watcher_ref (zw->check_w); +} + +static void zwatcher_unref (flux_watcher_t *w) +{ + struct zwatcher *zw = watcher_get_data (w); + + flux_watcher_unref (zw->fd_w); + flux_watcher_unref (zw->prepare_w); + flux_watcher_unref (zw->idle_w); + flux_watcher_unref (zw->check_w); +} + static bool zwatcher_is_active (flux_watcher_t *w) { struct zwatcher *zw = watcher_get_data (w); @@ -186,6 +206,8 @@ static void fd_cb (flux_reactor_t *r, static struct flux_watcher_ops zwatcher_ops = { .start = zwatcher_start, .stop = zwatcher_stop, + .ref = zwatcher_ref, + .unref = zwatcher_unref, .destroy = zwatcher_destroy, .is_active = zwatcher_is_active, }; From 91b140e62729dce447019cd845fe1d2f2c882741 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 22 Dec 2024 17:04:25 -0800 Subject: [PATCH 11/18] testsuite: cover zmq watcher invalid arguments Problem: there is no test coverage for incorrect zmqutil_watcher_create() arguments. Add a test. --- src/common/libzmqutil/test/zwatcher.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/common/libzmqutil/test/zwatcher.c b/src/common/libzmqutil/test/zwatcher.c index 43ac1e11bd32..b68b5d47f6b9 100644 --- a/src/common/libzmqutil/test/zwatcher.c +++ b/src/common/libzmqutil/test/zwatcher.c @@ -106,6 +106,7 @@ static void test_zmq (flux_reactor_t *reactor) void *zs[2]; flux_watcher_t *r, *w; const char *uri = "inproc://test_zmq"; + flux_watcher_t *tmp; zs[0] = zmq_socket (zctx, ZMQ_PAIR); zs[1] = zmq_socket (zctx, ZMQ_PAIR); @@ -113,6 +114,20 @@ static void test_zmq (flux_reactor_t *reactor) && zmq_bind (zs[0], uri) == 0 && zmq_connect (zs[1], uri) == 0, "zmq: connected ZMQ_PAIR sockets over inproc"); + + errno = 0; + ok (zmqutil_watcher_create (NULL, zs[0], FLUX_POLLIN, NULL, NULL) == NULL + && errno == EINVAL, + "zmqutil_watcher_create r=NULL fails with EINVAL"); + + tmp = flux_idle_watcher_create (reactor, NULL, NULL); + if (!tmp) + BAIL_OUT ("could not create idle watcher"); + errno = 0; + ok (zmqutil_watcher_get_zsock (tmp) == NULL && errno == EINVAL, + "zmqutil_watcher_get_zsock w=idle fails with EINVAL"); + flux_watcher_destroy (tmp); + r = zmqutil_watcher_create (reactor, zs[0], FLUX_POLLIN, zmqreader, NULL); w = zmqutil_watcher_create (reactor, zs[1], FLUX_POLLOUT, zmqwriter, NULL); ok (r != NULL && w != NULL, From 877f7ad2c3571b2fe82f153d9794d48fd212315d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 16:48:36 -0800 Subject: [PATCH 12/18] python: convert to watcher ref/unref methods Problem: the python bindings are using the deprecated reactor active_incref/decref interfaces Add ref() and unref() to the watcher base class. Convert users to ref() and unref(). Drop the reactor incref() and decref() methods. --- src/bindings/python/flux/cli/base.py | 2 +- src/bindings/python/flux/core/handle.py | 18 ++---------------- src/bindings/python/flux/core/watchers.py | 8 ++++++++ src/bindings/python/flux/job/watcher.py | 2 +- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/bindings/python/flux/cli/base.py b/src/bindings/python/flux/cli/base.py index bab650ed43ae..5146534f521c 100644 --- a/src/bindings/python/flux/cli/base.py +++ b/src/bindings/python/flux/cli/base.py @@ -1536,7 +1536,7 @@ def progress_update(self, jobinfo=None, submit=False, submit_failed=False): # Don't let this timer watcher contribute to the reactor's # "active" reference count: - self.flux_handle.reactor_decref() + timer.unref() if submit: self.progress.update( diff --git a/src/bindings/python/flux/core/handle.py b/src/bindings/python/flux/core/handle.py index 85b30d7a69e8..e183d0b3e600 100644 --- a/src/bindings/python/flux/core/handle.py +++ b/src/bindings/python/flux/core/handle.py @@ -331,18 +331,14 @@ def reactor_interrupt(handle, *_args): reactor_interrupted = True handle.reactor_stop(reactor) - with self.signal_watcher_create(signal.SIGINT, reactor_interrupt): + with self.signal_watcher_create(signal.SIGINT, reactor_interrupt) as w: with self.in_reactor(): # This signal watcher should not take a reference on reactor # o/w the reactor may not exit as expected when all other # active watchers and msghandlers are complete. # - self.reactor_active_decref(reactor) + w.unref() rc = self.flux_reactor_run(reactor, flags) - # Re-establish signal watcher reference so reactor refcount - # doesn't underflow when signal watcher is destroyed - # - self.reactor_active_incref(reactor) if reactor_interrupted: raise KeyboardInterrupt Flux.raise_if_exception() @@ -361,16 +357,6 @@ def reactor_stop_error(self, reactor=None): reactor = self.get_reactor() self.flux_reactor_stop_error(reactor) - def reactor_incref(self, reactor=None): - if reactor is None: - reactor = self.get_reactor() - self.reactor_active_incref(reactor) - - def reactor_decref(self, reactor=None): - if reactor is None: - reactor = self.get_reactor() - self.reactor_active_decref(reactor) - def service_register(self, name): return Future(self.flux_service_register(name)) diff --git a/src/bindings/python/flux/core/watchers.py b/src/bindings/python/flux/core/watchers.py index 3df6f19e7729..c16bbd3575f3 100644 --- a/src/bindings/python/flux/core/watchers.py +++ b/src/bindings/python/flux/core/watchers.py @@ -46,6 +46,14 @@ def stop(self): raw.flux_watcher_stop(self.handle) return self + def ref(self): + raw.flux_watcher_ref(self.handle) + return self + + def unref(self): + raw.flux_watcher_unref(self.handle) + return self + def destroy(self): # Remove this watcher from its owning Flux handle # if the handle is still around. A try/except block diff --git a/src/bindings/python/flux/job/watcher.py b/src/bindings/python/flux/job/watcher.py index 09b065254b55..418cf3979efb 100644 --- a/src/bindings/python/flux/job/watcher.py +++ b/src/bindings/python/flux/job/watcher.py @@ -177,7 +177,7 @@ def start(self): # Don't let this timer watcher contribute to the reactor's # "active" reference count: # - self.timer.flux_handle.reactor_decref() + self.timer.unref() super().start() # Override superclass `_t0` attribute to elapsed time is computed # from this value and not the time of super().start(): From 177932af747406c899392ecc4ea5b5bd44a50d52 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 16:58:07 -0800 Subject: [PATCH 13/18] libflux: drop flux_reactor_active_incref/decref() Problem: flux_reactor_active_incref/decref() is an inferior interface compared to flux_watcher_ref/unref(). Drop these reactor methods. Drop unit test. Fixes #6510 --- src/common/libflux/reactor.c | 12 ---------- src/common/libflux/reactor.h | 7 ------ src/common/libflux/test/reactor.c | 38 ------------------------------- 3 files changed, 57 deletions(-) diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index 5dc3dbd195e9..a587c5c59394 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -126,18 +126,6 @@ void flux_reactor_stop_error (flux_reactor_t *r) ev_break (r->loop, EVBREAK_ALL); } -void flux_reactor_active_incref (flux_reactor_t *r) -{ - if (r) - ev_ref (r->loop); -} - -void flux_reactor_active_decref (flux_reactor_t *r) -{ - if (r) - ev_unref (r->loop); -} - void *reactor_get_loop (flux_reactor_t *r) { return r ? r->loop : NULL; diff --git a/src/common/libflux/reactor.h b/src/common/libflux/reactor.h index bce296edd74e..609482390f1c 100644 --- a/src/common/libflux/reactor.h +++ b/src/common/libflux/reactor.h @@ -48,13 +48,6 @@ double flux_reactor_now (flux_reactor_t *r); void flux_reactor_now_update (flux_reactor_t *r); double flux_reactor_time (void); -/* Change reactor reference count. - * Each active watcher holds a reference. - * When the reference count reaches zero, the reactor loop exits. - */ -void flux_reactor_active_incref (flux_reactor_t *r); -void flux_reactor_active_decref (flux_reactor_t *r); - #ifdef __cplusplus } #endif diff --git a/src/common/libflux/test/reactor.c b/src/common/libflux/test/reactor.c index acfdf0457a87..ebee9874493a 100644 --- a/src/common/libflux/test/reactor.c +++ b/src/common/libflux/test/reactor.c @@ -672,43 +672,6 @@ static void active_idle_cb (flux_reactor_t *r, flux_reactor_stop_error (r); } - -static void test_active_ref (flux_reactor_t *r) -{ - flux_watcher_t *w; - int count; - - ok (flux_reactor_run (r, 0) == 0, - "flux_reactor_run with no watchers returned immediately"); - - if (!(w = flux_idle_watcher_create (r, active_idle_cb, &count))) - BAIL_OUT ("flux_idle_watcher_create failed"); - - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); - flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); - - count = 0; - ok (flux_reactor_run (r, 0) < 0 && count == 16, - "flux_reactor_run with one watcher stopped after 16 iterations"); - - flux_reactor_active_decref (r); - - count = 0; - ok (flux_reactor_run (r, 0) == 0 && count == 1, - "flux_reactor_run with one watcher+decref returned after 1 iteration"); - - flux_reactor_active_incref (r); - - count = 0; - ok (flux_reactor_run (r, 0) < 0 && count == 16, - "flux_reactor_run with one watcher+incref stopped after 16 iterations"); - - flux_watcher_destroy (w); -} - static void reactor_destroy_early (void) { flux_reactor_t *r; @@ -832,7 +795,6 @@ int main (int argc, char *argv[]) test_signal (reactor); test_child (reactor); test_stat (reactor); - test_active_ref (reactor); test_reactor_flags (reactor); test_priority (reactor); From f7c749a552d1dea3b5ad007fc94d4e145dcd0aa0 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 22 Dec 2024 08:18:46 -0800 Subject: [PATCH 14/18] testsuite: cover watcher ref/unref/is_ref Problem: there is no test coverage for the new flux_watcher_ref(), flux_watcher_unref(), and flux_watcher_is_referenced() interfaces. Create a helper that tests both ref/unref/is_referenced and start/stop/is_active. Replace the repeated is_active code in each watcher test with a call to the helper --- src/common/libflux/test/reactor.c | 184 +++++++++++++++++++++++++----- 1 file changed, 153 insertions(+), 31 deletions(-) diff --git a/src/common/libflux/test/reactor.c b/src/common/libflux/test/reactor.c index ebee9874493a..d8f964957b59 100644 --- a/src/common/libflux/test/reactor.c +++ b/src/common/libflux/test/reactor.c @@ -25,6 +25,58 @@ #include "src/common/libtap/tap.h" #include "ccan/array_size/array_size.h" +void watcher_is (flux_watcher_t *w, + bool exp_active, + bool exp_referenced, + const char *name, + const char *what) +{ + bool is_active = flux_watcher_is_active (w); + bool is_referenced = flux_watcher_is_referenced (w); + + ok (is_active == exp_active && is_referenced == exp_referenced, + "%s %sact%sref after %s", + name, + exp_active ? "+" : "-", + exp_referenced ? "+" : "-", + what); + if (is_active != exp_active) + diag ("%s is unexpectedly %sact", name, is_active ? "+" : "-"); + if (is_referenced != exp_referenced) + diag ("%s is unexpectedly %sref", name, is_referenced ? "+" : "-"); +} + +/* Call this on newly created watcher to check start/stop/is_active and + * ref/unref/is_referenced basics. + */ +void generic_watcher_check (flux_watcher_t *w, const char *name) +{ + /* ref/unref while inactive causes ev_ref/ev_unref to run + * in the start/stop callbacks + */ + watcher_is (w, false, true, name, "init"); + flux_watcher_unref (w); + watcher_is (w, false, false, name, "unref"); + flux_watcher_start (w); + watcher_is (w, true, false, name, "start"); + flux_watcher_stop (w); + watcher_is (w, false, false, name, "stop"); + flux_watcher_ref (w); + watcher_is (w, false, true, name, "ref"); + + /* ref/unref while active causes ev_ref/ev_unref to run + * in the ref/unref callbacks + */ + flux_watcher_start (w); + watcher_is (w, true, true, name, "start"); + flux_watcher_unref (w); + watcher_is (w, true, false, name, "unref"); + flux_watcher_ref (w); + watcher_is (w, true, true, name, "ref"); + flux_watcher_stop (w); + watcher_is (w, false, true, name, "stop"); +} + static const size_t fdwriter_bufsize = 10*1024*1024; static void fdwriter (flux_reactor_t *r, @@ -117,11 +169,8 @@ static void test_fd (flux_reactor_t *reactor) w = flux_fd_watcher_create (reactor, fd[1], FLUX_POLLOUT, fdwriter, NULL); ok (r != NULL && w != NULL, "fd: reader and writer created"); - ok (!flux_watcher_is_active (r), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "fd"); flux_watcher_start (r); - ok (flux_watcher_is_active (r), - "flux_watcher_is_active() returns true after flux_watcher_start()"); flux_watcher_start (w); ok (flux_reactor_run (reactor, 0) == 0, "fd: reactor ran to completion after %lu bytes", fdwriter_bufsize); @@ -177,11 +226,8 @@ static void test_timer (flux_reactor_t *reactor) "timer: creating negative repeat fails with EINVAL"); ok ((w = flux_timer_watcher_create (reactor, 0, 0, oneshot, NULL)) != NULL, "timer: creating zero timeout oneshot works"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "timer"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); oneshot_runs = 0; t0 = flux_reactor_now (reactor); ok (flux_reactor_run (reactor, 0) == 0, @@ -292,11 +338,9 @@ static void test_periodic (flux_reactor_t *reactor) ok ((w = flux_periodic_watcher_create (reactor, 0, 0, NULL, oneshot, NULL)) != NULL, "periodic: creating zero offset/interval works"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "periodic"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); + oneshot_runs = 0; ok (flux_reactor_run (reactor, 0) == 0, "periodic: reactor ran to completion"); @@ -380,11 +424,8 @@ static void test_idle (flux_reactor_t *reactor) w = flux_idle_watcher_create (reactor, idle_cb, NULL); ok (w != NULL, "created idle watcher"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "idle"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); ok (flux_reactor_run (reactor, 0) == 0, "reactor ran successfully"); @@ -437,17 +478,17 @@ static void test_prepcheck (flux_reactor_t *reactor) ok (!flux_watcher_is_active (w), "flux_watcher_is_active() returns false"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); prep = flux_prepare_watcher_create (reactor, prepare_cb, NULL); ok (w != NULL, "created prepare watcher"); + generic_watcher_check (prep, "prep"); flux_watcher_start (prep); chk = flux_check_watcher_create (reactor, check_cb, NULL); ok (w != NULL, "created check watcher"); + generic_watcher_check (chk, "check"); flux_watcher_start (chk); ok (flux_reactor_run (reactor, 0) >= 0, @@ -495,11 +536,8 @@ static void test_signal (flux_reactor_t *reactor) w = flux_signal_watcher_create (reactor, SIGUSR1, sigusr1_cb, NULL); ok (w != NULL, "created signal watcher"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "signal"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); idle = flux_idle_watcher_create (reactor, sigidle_cb, NULL); ok (idle != NULL, @@ -549,14 +587,11 @@ static void test_child (flux_reactor_t *reactor) w = flux_child_watcher_create (r, child_pid, false, child_cb, NULL); ok (w != NULL, "created child watcher"); + generic_watcher_check (w, "signal"); ok (kill (child_pid, SIGHUP) == 0, "sent child SIGHUP"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); ok (flux_reactor_run (r, 0) == 0, "reactor ran successfully"); @@ -626,11 +661,8 @@ static void test_stat (flux_reactor_t *reactor) w = flux_stat_watcher_create (reactor, ctx.path, 0., stat_cb, &ctx); ok (w != NULL, "created stat watcher"); - ok (!flux_watcher_is_active (w), - "flux_watcher_is_active() returns false"); + generic_watcher_check (w, "stat"); flux_watcher_start (w); - ok (flux_watcher_is_active (w), - "flux_watcher_is_active() returns true after flux_watcher_start()"); tw = flux_timer_watcher_create (reactor, 0.01, @@ -660,7 +692,7 @@ static void test_stat (flux_reactor_t *reactor) free (ctx.path); } -static void active_idle_cb (flux_reactor_t *r, +static void unref_idle_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) @@ -672,6 +704,95 @@ static void active_idle_cb (flux_reactor_t *r, flux_reactor_stop_error (r); } +static void unref_idle2_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + int *count = arg; + (*count)++; + + if (flux_watcher_is_referenced (w)) { + diag ("calling flux_watcher_unref on count=%d", *count); + flux_watcher_unref (w); // calls ev_unref() + } + else { + diag ("calling flux_watcher_ref on count=%d", *count); + flux_watcher_ref (w); // calls ev_ref() + } +} + +static void test_unref (flux_reactor_t *r) +{ + flux_watcher_t *w; + flux_watcher_t *w2; + int count; + + ok (flux_reactor_run (r, 0) == 0, + "flux_reactor_run with no watchers returned immediately"); + + if (!(w = flux_idle_watcher_create (r, unref_idle_cb, &count))) + BAIL_OUT ("flux_idle_watcher_create failed"); + if (!(w2 = flux_idle_watcher_create (r, unref_idle2_cb, &count))) + BAIL_OUT ("flux_idle_watcher_create failed"); + + /* Tests with unref_idle_cb() + * Show that ref/unref as expected with watcher inactive. + */ + + flux_watcher_start (w); + count = 0; + ok (flux_reactor_run (r, 0) < 0 && count == 16, + "flux_reactor_run with one watcher stopped after 16 iterations"); + flux_watcher_stop (w); + ok (flux_watcher_is_referenced (w), + "flux_watcher_is_referenced returns true after stop"); + + flux_watcher_unref (w); + flux_watcher_start (w); // calls ev_unref() + ok (!flux_watcher_is_referenced (w), + "flux_watcher_is_referenced returns false after unref/start"); + count = 0; + ok (flux_reactor_run (r, 0) == 0 && count == 1, + "flux_reactor_run with one unref watcher returned after 1 iteration"); + flux_watcher_stop (w); // calls ev_ref() + + flux_watcher_ref (w); + flux_watcher_start (w); + ok (flux_watcher_is_referenced (w), + "flux_watcher_is_referenced returns true after ref/start"); + count = 0; + ok (flux_reactor_run (r, 0) < 0 && count == 16, + "flux_reactor_run with one ref watcher stopped after 16 iterations"); + flux_watcher_stop (w); + ok (flux_watcher_is_referenced (w), + "flux_watcher_is_referenced returns true after reactor run"); + + flux_watcher_destroy (w); + + /* Tests with unref_idle2_cb() + * Show that ref/unref works as expected from watcher callback + */ + + flux_watcher_start (w2); + + ok (flux_watcher_is_referenced (w2), + "flux_watcher_is_referenced returns true"); + count = 0; + ok (flux_reactor_run (r, 0) == 0 && count == 1, + "flux_reactor_run with one ref watcher returned after 1 iteration"); + ok (!flux_watcher_is_referenced (w2), + "flux_watcher_is_referenced returns false after reactor run"); + + count = 0; + ok (flux_reactor_run (r, 0) == 0 && count == 2, + "flux_reactor_run with one unref watcher returned after 2 iterations"); + ok (!flux_watcher_is_referenced (w2), + "flux_watcher_is_referenced returns false"); + + flux_watcher_destroy (w2); +} + static void reactor_destroy_early (void) { flux_reactor_t *r; @@ -795,6 +916,7 @@ int main (int argc, char *argv[]) test_signal (reactor); test_child (reactor); test_stat (reactor); + test_unref (reactor); test_reactor_flags (reactor); test_priority (reactor); From 451ea7c57c95d5a1ab7e3b05164acea0241d2f15 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 22 Dec 2024 08:20:28 -0800 Subject: [PATCH 15/18] testsuite: cover handle watcher ref/unref/is_ref Problem: handle watcher references have no test coverage. The handle watcher doesn't have a standalone unit test, so add one to the reactor unit test, including simple coverage for watcher references. --- src/common/libflux/test/reactor.c | 40 +++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/common/libflux/test/reactor.c b/src/common/libflux/test/reactor.c index d8f964957b59..f0b596f3c91f 100644 --- a/src/common/libflux/test/reactor.c +++ b/src/common/libflux/test/reactor.c @@ -692,6 +692,45 @@ static void test_stat (flux_reactor_t *reactor) free (ctx.path); } +static int handle_counter = 0; +static void handle_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + handle_counter++; + flux_watcher_unref (w); +} + +void test_handle (flux_reactor_t *r) +{ + flux_t *h; + flux_watcher_t *w; + + if (!(h = flux_open ("loop://", 0))) + BAIL_OUT ("could not create loop handle"); + w = flux_handle_watcher_create (r, h, FLUX_POLLIN, handle_cb, NULL); + ok (w != NULL, + "flux_handle_watcher_create works"); + generic_watcher_check (w, "handle"); + flux_watcher_start (w); + + flux_msg_t *msg; + if (!(msg = flux_request_encode ("foo", "bar"))) + BAIL_OUT ("could not encode message"); + if (flux_send (h, msg, 0) < 0) + BAIL_OUT ("could not send message"); + flux_msg_destroy (msg); + + ok (flux_reactor_run (r, 0) == 0, + "flux_reactor_run ran"); + ok (handle_counter == 1, + "watcher ran once"); + + flux_watcher_destroy (w); + flux_close (h); +} + static void unref_idle_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, @@ -916,6 +955,7 @@ int main (int argc, char *argv[]) test_signal (reactor); test_child (reactor); test_stat (reactor); + test_handle (reactor); test_unref (reactor); test_reactor_flags (reactor); test_priority (reactor); From 2fbd742ac5db64e65749366c70aa27fd974aa379 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 17:51:25 -0800 Subject: [PATCH 16/18] flux_reactor_create(3): drop active_incref/decref Problem: flux_reactor_active_incref() and flux_reactor_active_decref() appear in man pages but are no longer available. Remove them. --- doc/Makefile.am | 2 -- doc/man3/flux_reactor_create.rst | 14 -------------- doc/manpages.py | 2 -- 3 files changed, 18 deletions(-) diff --git a/doc/Makefile.am b/doc/Makefile.am index 0721f275b6ef..cf7d6701b303 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -204,8 +204,6 @@ MAN3_FILES_SECONDARY = \ man3/flux_reactor_run.3 \ man3/flux_reactor_stop.3 \ man3/flux_reactor_stop_error.3 \ - man3/flux_reactor_active_incref.3 \ - man3/flux_reactor_active_decref.3 \ man3/flux_fd_watcher_get_fd.3 \ man3/flux_watcher_stop.3 \ man3/flux_watcher_is_active.3 \ diff --git a/doc/man3/flux_reactor_create.rst b/doc/man3/flux_reactor_create.rst index 5ab2aafcf63b..84b3d82fa57f 100644 --- a/doc/man3/flux_reactor_create.rst +++ b/doc/man3/flux_reactor_create.rst @@ -21,10 +21,6 @@ SYNOPSIS void flux_reactor_stop_error (flux_reactor_t *r); - void flux_reactor_active_incref (flux_reactor_t *r); - - void flux_reactor_active_decref (flux_reactor_t *r); - Link with :command:`-lflux-core`. DESCRIPTION @@ -81,16 +77,6 @@ The caller should ensure that a valid error code has been assigned to :func:`flux_reactor_create` time. Freeing of the underlying resources will be deferred if there are any remaining watchers associated with the reactor. -:func:`flux_reactor_active_decref` and :func:`flux_reactor_active_incref` -manipulate the reactor's internal count of active watchers. Each active -watcher takes a reference count on the reactor, and the reactor returns -when this count reaches zero. It is useful sometimes to have a watcher that -can remain active without preventing the reactor from exiting. To achieve this, -call :func:`flux_reactor_active_decref` after the watcher is started, and -:func:`flux_reactor_active_incref` before the watcher is stopped. -Remember that destroying an active reactor internally stops it, -so be sure to stop/incref such a watcher first. - RETURN VALUE ============ diff --git a/doc/manpages.py b/doc/manpages.py index b5f784fb7c19..e4c9a8a79dea 100644 --- a/doc/manpages.py +++ b/doc/manpages.py @@ -202,8 +202,6 @@ ('man3/flux_reactor_create', 'flux_reactor_run', 'create/destroy/control event reactor object', [author], 3), ('man3/flux_reactor_create', 'flux_reactor_stop', 'create/destroy/control event reactor object', [author], 3), ('man3/flux_reactor_create', 'flux_reactor_stop_error', 'create/destroy/control event reactor object', [author], 3), - ('man3/flux_reactor_create', 'flux_reactor_active_incref', 'create/destroy/control event reactor object', [author], 3), - ('man3/flux_reactor_create', 'flux_reactor_active_decref', 'create/destroy/control event reactor object', [author], 3), ('man3/flux_reactor_create', 'flux_reactor_create', 'create/destroy/control event reactor object', [author], 3), ('man3/flux_reactor_now', 'flux_reactor_now_update', 'get/update reactor time', [author], 3), ('man3/flux_reactor_now', 'flux_reactor_now', 'get/update reactor time', [author], 3), From a12af4ea7538545aec3b208996708187f1dd25ea Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 19 Dec 2024 18:02:55 -0800 Subject: [PATCH 17/18] flux_watcher_start(3): add ref/unref/is_ref Problem: flux_watcher_ref(), flux_watcher_unref(), and flux_watcher_is_referenced() are undocumented. Update man pages. --- doc/Makefile.am | 3 +++ doc/man3/flux_watcher_start.rst | 21 +++++++++++++++++++++ doc/manpages.py | 13 ++++++++----- doc/test/spell.en.pws | 1 + 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/doc/Makefile.am b/doc/Makefile.am index cf7d6701b303..50f42bfa0db4 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -208,6 +208,9 @@ MAN3_FILES_SECONDARY = \ man3/flux_watcher_stop.3 \ man3/flux_watcher_is_active.3 \ man3/flux_watcher_destroy.3 \ + man3/flux_watcher_ref.3 \ + man3/flux_watcher_unref.3 \ + man3/flux_watcher_is_referenced.3 \ man3/flux_watcher_next_wakeup.3 \ man3/flux_handle_watcher_get_flux.3 \ man3/flux_timer_watcher_reset.3 \ diff --git a/doc/man3/flux_watcher_start.rst b/doc/man3/flux_watcher_start.rst index a902021e3625..dee40946042d 100644 --- a/doc/man3/flux_watcher_start.rst +++ b/doc/man3/flux_watcher_start.rst @@ -15,6 +15,12 @@ SYNOPSIS bool flux_watcher_is_active (flux_watcher_t *w); + void flux_watcher_unref (flux_watcher_t *w); + + void flux_watcher_ref (flux_watcher_t *w); + + bool flux_watcher_is_referenced (flux_watcher_t *w); + void flux_watcher_destroy (flux_watcher_t *w); double flux_watcher_next_wakeup (flux_watcher_t *w); @@ -36,6 +42,21 @@ callback. :func:`flux_watcher_is_active` returns a true value if the watcher is active (i.e. it has been started and not yet stopped) and false otherwise. +:func:`flux_watcher_unref` drops the watcher's reference on the reactor. +This function is idempotent. :func:`flux_reactor_run` normally runs until +there are no more active watchers with references. + +:func:`flux_watcher_ref` restores the watcher's reference on the reactor. +This function is idempotent. + +:func:`flux_watcher_is_referenced` returns true if the watcher is referenced. +All watchers are referenced unless updated with :func:`flux_watcher_unref`. + +.. note:: + All watchers in the public API support :func:`flux_watcher_unref`, but some + specialized watchers internal to flux-core do not. If in doubt about whether + the call had any effect, check with :func:`flux_watcher_is_referenced`. + :func:`flux_watcher_destroy` destroys a :type:`flux_watcher_t` object :var:`w`, after stopping it. It is not safe to destroy a watcher object within a :type:`flux_watcher_f` callback. diff --git a/doc/manpages.py b/doc/manpages.py index e4c9a8a79dea..afabfd784b88 100644 --- a/doc/manpages.py +++ b/doc/manpages.py @@ -280,11 +280,14 @@ ('man3/flux_stat_watcher_create', 'flux_stat_watcher_create', 'create stat watcher', [author], 3), ('man3/flux_timer_watcher_create', 'flux_timer_watcher_reset', 'set/reset a timer', [author], 3), ('man3/flux_timer_watcher_create', 'flux_timer_watcher_create', 'set/reset a timer', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_stop', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_is_active', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_destroy', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_next_wakeup', 'start/stop/destroy/query reactor watcher', [author], 3), - ('man3/flux_watcher_start', 'flux_watcher_start', 'start/stop/destroy/query reactor watcher', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_stop', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_is_active', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_destroy', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_next_wakeup', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_start', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_ref', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_unref', 'common reactor watcher methods', [author], 3), + ('man3/flux_watcher_start', 'flux_watcher_is_referenced', 'common reactor watcher methods', [author], 3), ('man3/flux_watcher_set_priority', 'flux_watcher_set_priority', 'set watcher priority', [author], 3), ('man3/hostlist_create', 'hostlist_create', 'Manipulate lists of hostnames', [author], 3), ('man3/hostlist_create', 'hostlist_destroy', 'Manipulate lists of hostnames', [author], 3), diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index a6163e2b6129..591c073ccbfb 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -939,3 +939,4 @@ aTGTz cPATH SATTR myprogram +unref From 2821d37a2a2c9230a2a73fb99c84f8e46279ff87 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Sun, 22 Dec 2024 07:25:12 -0800 Subject: [PATCH 18/18] flux_msg_handler_create(3): add get_handle_watcher Problem: flux_get_handle_watcher() is not documented. Add it to the message handler man page. --- doc/Makefile.am | 1 + doc/man3/flux_msg_handler_create.rst | 5 ++++- doc/manpages.py | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/doc/Makefile.am b/doc/Makefile.am index 50f42bfa0db4..67718ef14371 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -220,6 +220,7 @@ MAN3_FILES_SECONDARY = \ man3/flux_msg_handler_destroy.3 \ man3/flux_msg_handler_start.3 \ man3/flux_msg_handler_stop.3 \ + man3/flux_get_handle_watcher.3 \ man3/flux_msg_handler_delvec.3 \ man3/flux_child_watcher_get_rpid.3 \ man3/flux_child_watcher_get_rstatus.3 \ diff --git a/doc/man3/flux_msg_handler_create.rst b/doc/man3/flux_msg_handler_create.rst index c32e00897d8c..3853796d1d94 100644 --- a/doc/man3/flux_msg_handler_create.rst +++ b/doc/man3/flux_msg_handler_create.rst @@ -28,6 +28,8 @@ SYNOPSIS void flux_msg_handler_stop (flux_msg_handler_t *mh); + flux_watcher_t *flux_get_handle_watcher(flux_t *h) + Link with :command:`-lflux-core`. DESCRIPTION @@ -46,7 +48,8 @@ The handle :var:`h` is monitored for FLUX_POLLIN events on the :type:`flux_reactor_t` associated with the handle as described in :man3:`flux_set_reactor`. This internal "handle watcher" is started when the first message handler is started, and stopped when the last message handler -is stopped. +is stopped. The handle watcher may be directly accessed with +:func:`flux_get_handle_watcher`. Messages arriving on :var:`h` are internally read and dispatched to matching message handlers. If multiple handlers match the message, the following diff --git a/doc/manpages.py b/doc/manpages.py index afabfd784b88..98de50741ceb 100644 --- a/doc/manpages.py +++ b/doc/manpages.py @@ -189,6 +189,7 @@ ('man3/flux_msg_handler_create', 'flux_msg_handler_start', 'manage message handlers', [author], 3), ('man3/flux_msg_handler_create', 'flux_msg_handler_stop', 'manage message handlers', [author], 3), ('man3/flux_msg_handler_create', 'flux_msg_handler_create', 'manage message handlers', [author], 3), + ('man3/flux_msg_handler_create', 'flux_get_handle_watcher', 'manage message handlers', [author], 3), ('man3/flux_open', 'flux_clone', 'open/close connection to Flux Message Broker', [author], 3), ('man3/flux_open', 'flux_close', 'open/close connection to Flux Message Broker', [author], 3), ('man3/flux_open', 'flux_open', 'open/close connection to Flux Message Broker', [author], 3),