Skip to content

Commit

Permalink
Merge pull request #6534 from garlick/watcher_ref
Browse files Browse the repository at this point in the history
reactor: improve reactor referencing API
  • Loading branch information
mergify[bot] authored Dec 23, 2024
2 parents d58f245 + 2821d37 commit 59cbf5e
Show file tree
Hide file tree
Showing 28 changed files with 861 additions and 344 deletions.
6 changes: 4 additions & 2 deletions doc/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,13 @@ MAN3_FILES_SECONDARY = \
man3/flux_reactor_run.3 \
man3/flux_reactor_stop.3 \
man3/flux_reactor_stop_error.3 \
man3/flux_reactor_active_incref.3 \
man3/flux_reactor_active_decref.3 \
man3/flux_fd_watcher_get_fd.3 \
man3/flux_watcher_stop.3 \
man3/flux_watcher_is_active.3 \
man3/flux_watcher_destroy.3 \
man3/flux_watcher_ref.3 \
man3/flux_watcher_unref.3 \
man3/flux_watcher_is_referenced.3 \
man3/flux_watcher_next_wakeup.3 \
man3/flux_handle_watcher_get_flux.3 \
man3/flux_timer_watcher_reset.3 \
Expand All @@ -219,6 +220,7 @@ MAN3_FILES_SECONDARY = \
man3/flux_msg_handler_destroy.3 \
man3/flux_msg_handler_start.3 \
man3/flux_msg_handler_stop.3 \
man3/flux_get_handle_watcher.3 \
man3/flux_msg_handler_delvec.3 \
man3/flux_child_watcher_get_rpid.3 \
man3/flux_child_watcher_get_rstatus.3 \
Expand Down
5 changes: 4 additions & 1 deletion doc/man3/flux_msg_handler_create.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ SYNOPSIS
void flux_msg_handler_stop (flux_msg_handler_t *mh);
flux_watcher_t *flux_get_handle_watcher(flux_t *h)
Link with :command:`-lflux-core`.

DESCRIPTION
Expand All @@ -46,7 +48,8 @@ The handle :var:`h` is monitored for FLUX_POLLIN events on the
:type:`flux_reactor_t` associated with the handle as described in
:man3:`flux_set_reactor`. This internal "handle watcher" is started when the
first message handler is started, and stopped when the last message handler
is stopped.
is stopped. The handle watcher may be directly accessed with
:func:`flux_get_handle_watcher`.

Messages arriving on :var:`h` are internally read and dispatched to matching
message handlers. If multiple handlers match the message, the following
Expand Down
14 changes: 0 additions & 14 deletions doc/man3/flux_reactor_create.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ SYNOPSIS
void flux_reactor_stop_error (flux_reactor_t *r);
void flux_reactor_active_incref (flux_reactor_t *r);
void flux_reactor_active_decref (flux_reactor_t *r);
Link with :command:`-lflux-core`.

DESCRIPTION
Expand Down Expand Up @@ -81,16 +77,6 @@ The caller should ensure that a valid error code has been assigned to
:func:`flux_reactor_create` time. Freeing of the underlying resources will
be deferred if there are any remaining watchers associated with the reactor.

:func:`flux_reactor_active_decref` and :func:`flux_reactor_active_incref`
manipulate the reactor's internal count of active watchers. Each active
watcher takes a reference count on the reactor, and the reactor returns
when this count reaches zero. It is useful sometimes to have a watcher that
can remain active without preventing the reactor from exiting. To achieve this,
call :func:`flux_reactor_active_decref` after the watcher is started, and
:func:`flux_reactor_active_incref` before the watcher is stopped.
Remember that destroying an active reactor internally stops it,
so be sure to stop/incref such a watcher first.


RETURN VALUE
============
Expand Down
12 changes: 9 additions & 3 deletions doc/man3/flux_stat_watcher_create.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ SYNOPSIS
flux_watcher_f callback,
void *arg);
void flux_stat_watcher_get_rstat (flux_watcher_t *w,
struct stat *stat,
struct stat *prev);
int flux_stat_watcher_get_rstat (flux_watcher_t *w,
struct stat *stat,
struct stat *prev);
Link with :command:`-lflux-core`.

Expand Down Expand Up @@ -55,13 +55,19 @@ RETURN VALUE
:func:`flux_stat_watcher_create` returns a :type:`flux_watcher_t` object
on success. On error, NULL is returned, and :var:`errno` is set appropriately.

:func:`flux_stat_watcher_get_rstat` returns 0 on success. On error, -1 is
returned and :var:`errno` is set appropriately.


ERRORS
======

ENOMEM
Out of memory.

EINVAL
Invalid argument.


RESOURCES
=========
Expand Down
21 changes: 21 additions & 0 deletions doc/man3/flux_watcher_start.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ SYNOPSIS
bool flux_watcher_is_active (flux_watcher_t *w);
void flux_watcher_unref (flux_watcher_t *w);
void flux_watcher_ref (flux_watcher_t *w);
bool flux_watcher_is_referenced (flux_watcher_t *w);
void flux_watcher_destroy (flux_watcher_t *w);
double flux_watcher_next_wakeup (flux_watcher_t *w);
Expand All @@ -36,6 +42,21 @@ callback.
:func:`flux_watcher_is_active` returns a true value if the watcher is active
(i.e. it has been started and not yet stopped) and false otherwise.

:func:`flux_watcher_unref` drops the watcher's reference on the reactor.
This function is idempotent. :func:`flux_reactor_run` normally runs until
there are no more active watchers with references.

:func:`flux_watcher_ref` restores the watcher's reference on the reactor.
This function is idempotent.

:func:`flux_watcher_is_referenced` returns true if the watcher is referenced.
All watchers are referenced unless updated with :func:`flux_watcher_unref`.

.. note::
All watchers in the public API support :func:`flux_watcher_unref`, but some
specialized watchers internal to flux-core do not. If in doubt about whether
the call had any effect, check with :func:`flux_watcher_is_referenced`.

:func:`flux_watcher_destroy` destroys a :type:`flux_watcher_t` object :var:`w`,
after stopping it. It is not safe to destroy a watcher object within a
:type:`flux_watcher_f` callback.
Expand Down
16 changes: 9 additions & 7 deletions doc/manpages.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
('man3/flux_msg_handler_create', 'flux_msg_handler_start', 'manage message handlers', [author], 3),
('man3/flux_msg_handler_create', 'flux_msg_handler_stop', 'manage message handlers', [author], 3),
('man3/flux_msg_handler_create', 'flux_msg_handler_create', 'manage message handlers', [author], 3),
('man3/flux_msg_handler_create', 'flux_get_handle_watcher', 'manage message handlers', [author], 3),
('man3/flux_open', 'flux_clone', 'open/close connection to Flux Message Broker', [author], 3),
('man3/flux_open', 'flux_close', 'open/close connection to Flux Message Broker', [author], 3),
('man3/flux_open', 'flux_open', 'open/close connection to Flux Message Broker', [author], 3),
Expand All @@ -202,8 +203,6 @@
('man3/flux_reactor_create', 'flux_reactor_run', 'create/destroy/control event reactor object', [author], 3),
('man3/flux_reactor_create', 'flux_reactor_stop', 'create/destroy/control event reactor object', [author], 3),
('man3/flux_reactor_create', 'flux_reactor_stop_error', 'create/destroy/control event reactor object', [author], 3),
('man3/flux_reactor_create', 'flux_reactor_active_incref', 'create/destroy/control event reactor object', [author], 3),
('man3/flux_reactor_create', 'flux_reactor_active_decref', 'create/destroy/control event reactor object', [author], 3),
('man3/flux_reactor_create', 'flux_reactor_create', 'create/destroy/control event reactor object', [author], 3),
('man3/flux_reactor_now', 'flux_reactor_now_update', 'get/update reactor time', [author], 3),
('man3/flux_reactor_now', 'flux_reactor_now', 'get/update reactor time', [author], 3),
Expand Down Expand Up @@ -282,11 +281,14 @@
('man3/flux_stat_watcher_create', 'flux_stat_watcher_create', 'create stat watcher', [author], 3),
('man3/flux_timer_watcher_create', 'flux_timer_watcher_reset', 'set/reset a timer', [author], 3),
('man3/flux_timer_watcher_create', 'flux_timer_watcher_create', 'set/reset a timer', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_stop', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_is_active', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_destroy', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_next_wakeup', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_start', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_stop', 'common reactor watcher methods', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_is_active', 'common reactor watcher methods', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_destroy', 'common reactor watcher methods', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_next_wakeup', 'common reactor watcher methods', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_start', 'common reactor watcher methods', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_ref', 'common reactor watcher methods', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_unref', 'common reactor watcher methods', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_is_referenced', 'common reactor watcher methods', [author], 3),
('man3/flux_watcher_set_priority', 'flux_watcher_set_priority', 'set watcher priority', [author], 3),
('man3/hostlist_create', 'hostlist_create', 'Manipulate lists of hostnames', [author], 3),
('man3/hostlist_create', 'hostlist_destroy', 'Manipulate lists of hostnames', [author], 3),
Expand Down
1 change: 1 addition & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -939,3 +939,4 @@ aTGTz
cPATH
SATTR
myprogram
unref
2 changes: 1 addition & 1 deletion src/bindings/python/flux/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ def progress_update(self, jobinfo=None, submit=False, submit_failed=False):

# Don't let this timer watcher contribute to the reactor's
# "active" reference count:
self.flux_handle.reactor_decref()
timer.unref()

if submit:
self.progress.update(
Expand Down
18 changes: 2 additions & 16 deletions src/bindings/python/flux/core/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,14 @@ def reactor_interrupt(handle, *_args):
reactor_interrupted = True
handle.reactor_stop(reactor)

with self.signal_watcher_create(signal.SIGINT, reactor_interrupt):
with self.signal_watcher_create(signal.SIGINT, reactor_interrupt) as w:
with self.in_reactor():
# This signal watcher should not take a reference on reactor
# o/w the reactor may not exit as expected when all other
# active watchers and msghandlers are complete.
#
self.reactor_active_decref(reactor)
w.unref()
rc = self.flux_reactor_run(reactor, flags)
# Re-establish signal watcher reference so reactor refcount
# doesn't underflow when signal watcher is destroyed
#
self.reactor_active_incref(reactor)
if reactor_interrupted:
raise KeyboardInterrupt
Flux.raise_if_exception()
Expand All @@ -361,16 +357,6 @@ def reactor_stop_error(self, reactor=None):
reactor = self.get_reactor()
self.flux_reactor_stop_error(reactor)

def reactor_incref(self, reactor=None):
if reactor is None:
reactor = self.get_reactor()
self.reactor_active_incref(reactor)

def reactor_decref(self, reactor=None):
if reactor is None:
reactor = self.get_reactor()
self.reactor_active_decref(reactor)

def service_register(self, name):
return Future(self.flux_service_register(name))

Expand Down
8 changes: 8 additions & 0 deletions src/bindings/python/flux/core/watchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ def stop(self):
raw.flux_watcher_stop(self.handle)
return self

def ref(self):
raw.flux_watcher_ref(self.handle)
return self

def unref(self):
raw.flux_watcher_unref(self.handle)
return self

def destroy(self):
# Remove this watcher from its owning Flux handle
# if the handle is still around. A try/except block
Expand Down
2 changes: 1 addition & 1 deletion src/bindings/python/flux/job/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def start(self):
# Don't let this timer watcher contribute to the reactor's
# "active" reference count:
#
self.timer.flux_handle.reactor_decref()
self.timer.unref()
super().start()
# Override superclass `_t0` attribute to elapsed time is computed
# from this value and not the time of super().start():
Expand Down
10 changes: 3 additions & 7 deletions src/cmd/flux-start.c
Original file line number Diff line number Diff line change
Expand Up @@ -1038,13 +1038,9 @@ void start_server_initialize (const char *rundir, bool verbose)
log_err_exit ("could not created embedded flux-start server");
if (flux_msg_handler_addvec (ctx.h, htab, NULL, &ctx.handlers) < 0)
log_err_exit ("could not register service methods");
/* Service related watchers:
* - usock server listen fd
* - flux_t handle watcher (adds 2 active prep/check watchers)
*/
int ignore_watchers = 3;
while (ignore_watchers-- > 0)
flux_reactor_active_decref (ctx.reactor);

flux_watcher_unref (flux_get_handle_watcher (ctx.h));
flux_watcher_unref (usock_service_listen_watcher (ctx.h));
}

void start_server_finalize (void)
Expand Down
22 changes: 22 additions & 0 deletions src/common/libflux/hwatcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,26 @@ static void hwatcher_stop (flux_watcher_t *w)
flux_watcher_stop (hw->idle_w);
}

static void hwatcher_ref (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);

flux_watcher_ref (hw->fd_w);
flux_watcher_ref (hw->prepare_w);
flux_watcher_ref (hw->idle_w);
flux_watcher_ref (hw->check_w);
}

static void hwatcher_unref (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);

flux_watcher_unref (hw->fd_w);
flux_watcher_unref (hw->prepare_w);
flux_watcher_unref (hw->idle_w);
flux_watcher_unref (hw->check_w);
}

static bool hwatcher_is_active (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);
Expand Down Expand Up @@ -106,6 +126,8 @@ static void hwatcher_check_cb (flux_reactor_t *r,
static struct flux_watcher_ops hwatcher_ops = {
.start = hwatcher_start,
.stop = hwatcher_stop,
.ref = hwatcher_ref,
.unref = hwatcher_unref,
.is_active = hwatcher_is_active,
.destroy = hwatcher_destroy,
};
Expand Down
6 changes: 6 additions & 0 deletions src/common/libflux/msg_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,12 @@ int flux_dispatch_requeue (flux_t *h)
return 0;
}

flux_watcher_t *flux_get_handle_watcher (flux_t *h)
{
struct dispatch *d = dispatch_get (h);
return d ? d->w : NULL;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
3 changes: 3 additions & 0 deletions src/common/libflux/msg_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ void flux_msg_handler_delvec (flux_msg_handler_t *msg_handlers[]);
*/
int flux_dispatch_requeue (flux_t *h);

// accessor for start/stop/ref/unref
flux_watcher_t *flux_get_handle_watcher (flux_t *h);

#ifdef __cplusplus
}
#endif
Expand Down
13 changes: 0 additions & 13 deletions src/common/libflux/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <stdbool.h>
Expand Down Expand Up @@ -127,18 +126,6 @@ void flux_reactor_stop_error (flux_reactor_t *r)
ev_break (r->loop, EVBREAK_ALL);
}

void flux_reactor_active_incref (flux_reactor_t *r)
{
if (r)
ev_ref (r->loop);
}

void flux_reactor_active_decref (flux_reactor_t *r)
{
if (r)
ev_unref (r->loop);
}

void *reactor_get_loop (flux_reactor_t *r)
{
return r ? r->loop : NULL;
Expand Down
7 changes: 0 additions & 7 deletions src/common/libflux/reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ double flux_reactor_now (flux_reactor_t *r);
void flux_reactor_now_update (flux_reactor_t *r);
double flux_reactor_time (void);

/* Change reactor reference count.
* Each active watcher holds a reference.
* When the reference count reaches zero, the reactor loop exits.
*/
void flux_reactor_active_incref (flux_reactor_t *r);
void flux_reactor_active_decref (flux_reactor_t *r);

#ifdef __cplusplus
}
#endif
Expand Down
Loading

0 comments on commit 59cbf5e

Please sign in to comment.