Skip to content

Commit

Permalink
Merge pull request #6494 from garlick/reactor_cleanup
Browse files Browse the repository at this point in the history
libflux: refactor reactor/watcher implementation
  • Loading branch information
mergify[bot] authored Dec 16, 2024
2 parents 23e0f78 + 971eef2 commit 1088270
Show file tree
Hide file tree
Showing 35 changed files with 1,841 additions and 2,125 deletions.
2 changes: 1 addition & 1 deletion src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include "src/common/libzmqutil/msg_zsock.h"
#include "src/common/libzmqutil/sockopt.h"
#include "src/common/libzmqutil/reactor.h"
#include "src/common/libzmqutil/zwatcher.h"
#include "src/common/libzmqutil/zap.h"
#include "src/common/libzmqutil/cert.h"
#include "src/common/libzmqutil/monitor.h"
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,13 @@ flux_job_LDADD = \
$(top_builddir)/src/common/libczmqcontainers/libczmqcontainers.la \
$(top_builddir)/src/common/libdebugged/libdebugged.la \
$(top_builddir)/src/common/libterminus/libterminus.la \
$(fluxcmd_ldadd)
$(fluxcmd_ldadd) \
$(top_builddir)/src/common/libflux/libflux.la

flux_exec_LDADD = \
$(top_builddir)/src/common/libsubprocess/libsubprocess.la \
$(fluxcmd_ldadd)
$(fluxcmd_ldadd) \
$(top_builddir)/src/common/libflux/libflux.la

flux_terminus_LDADD = \
$(top_builddir)/src/common/libterminus/libterminus.la \
Expand Down
7 changes: 5 additions & 2 deletions src/common/libflux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ fluxcoreinclude_HEADERS = \
handle.h \
connector.h \
reactor.h \
watcher.h \
msg_handler.h \
message.h \
msglist.h \
Expand Down Expand Up @@ -62,6 +63,10 @@ libflux_la_SOURCES = \
connector_local.c \
reactor.c \
reactor_private.h \
watcher.c \
watcher_private.h \
watcher_wrap.c \
hwatcher.c \
msg_handler.c \
message.c \
message_private.h \
Expand All @@ -79,8 +84,6 @@ libflux_la_SOURCES = \
module.c \
conf_private.h \
conf.c \
ev_flux.h \
ev_flux.c \
control.c \
future.c \
composite_future.c \
Expand Down
107 changes: 0 additions & 107 deletions src/common/libflux/ev_flux.c

This file was deleted.

37 changes: 0 additions & 37 deletions src/common/libflux/ev_flux.h

This file was deleted.

1 change: 1 addition & 0 deletions src/common/libflux/flux.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "message.h"
#include "handle.h"
#include "reactor.h"
#include "watcher.h"
#include "msg_handler.h"
#include "connector.h"
#include "msglist.h"
Expand Down
154 changes: 154 additions & 0 deletions src/common/libflux/hwatcher.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/************************************************************\
* Copyright 2014 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* hwatcher.c - reactor watcher for flux_t handle */

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <flux/core.h>

#include "src/common/libutil/errno_safe.h"

#include "watcher_private.h"

struct hwatcher {
flux_watcher_t *fd_w;
flux_watcher_t *prepare_w;
flux_watcher_t *idle_w;
flux_watcher_t *check_w;
flux_t *h;
int events;
};

static void hwatcher_start (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);

flux_watcher_start (hw->prepare_w);
flux_watcher_start (hw->check_w);
}

static void hwatcher_stop (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);

flux_watcher_stop (hw->prepare_w);
flux_watcher_stop (hw->check_w);
flux_watcher_stop (hw->fd_w);
flux_watcher_stop (hw->idle_w);
}

static bool hwatcher_is_active (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);

return flux_watcher_is_active (hw->prepare_w);
}

static void hwatcher_destroy (flux_watcher_t *w)
{
struct hwatcher *hw = watcher_get_data (w);
if (hw) {
flux_watcher_destroy (hw->prepare_w);
flux_watcher_destroy (hw->check_w);
flux_watcher_destroy (hw->fd_w);
flux_watcher_destroy (hw->idle_w);
}
}

static void hwatcher_prepare_cb (flux_reactor_t *r,
flux_watcher_t *prepare_w,
int prepare_revents,
void *arg)
{
flux_watcher_t *w = arg;
struct hwatcher *hw = watcher_get_data (w);
int hevents;

if ((hevents = flux_pollevents (hw->h)) < 0)
hevents = FLUX_POLLERR;

if ((hevents & hw->events))
flux_watcher_start (hw->idle_w);
else
flux_watcher_start (hw->fd_w);
}

static void hwatcher_check_cb (flux_reactor_t *r,
flux_watcher_t *check_w,
int check_revents,
void *arg)
{
flux_watcher_t *w = arg;
struct hwatcher *hw = watcher_get_data (w);
int hevents;
int revents;

flux_watcher_stop (hw->fd_w);
flux_watcher_stop (hw->idle_w);

if ((hevents = flux_pollevents (hw->h)) < 0)
hevents = FLUX_POLLERR;
revents = (hevents & hw->events);

if (revents)
watcher_call (w, revents);
}

static struct flux_watcher_ops hwatcher_ops = {
.start = hwatcher_start,
.stop = hwatcher_stop,
.is_active = hwatcher_is_active,
.destroy = hwatcher_destroy,
};

flux_watcher_t *flux_handle_watcher_create (flux_reactor_t *r,
flux_t *h,
int events,
flux_watcher_f cb,
void *arg)
{
struct hwatcher *hw;
flux_watcher_t *w;
if (!(w = watcher_create (r, sizeof (*hw), &hwatcher_ops, cb, arg)))
return NULL;
hw = watcher_get_data (w);
hw->events = events | FLUX_POLLERR;
hw->h = h;

if (!(hw->prepare_w = flux_prepare_watcher_create (r,
hwatcher_prepare_cb,
w))
|| !(hw->check_w = flux_check_watcher_create (r, hwatcher_check_cb, w))
|| !(hw->idle_w = flux_idle_watcher_create (r, NULL, NULL)))
goto error;

int fd;
if ((fd = flux_pollfd (h)) < 0
|| !(hw->fd_w = flux_fd_watcher_create (r, fd, FLUX_POLLIN, NULL, w)))
goto error;
return w;
error:
ERRNO_SAFE_WRAP (flux_watcher_destroy, w);
return NULL;
}

flux_t *flux_handle_watcher_get_flux (flux_watcher_t *w)
{
if (watcher_get_ops (w) != &hwatcher_ops) {
errno = EINVAL;
return NULL;
}
struct hwatcher *hw = watcher_get_data (w);
return hw->h;
}

// vi:ts=4 sw=4 expandtab
Loading

0 comments on commit 1088270

Please sign in to comment.