Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libflux: support setting prep watcher priority #6367

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/common/libflux/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,12 @@ double flux_watcher_next_wakeup (flux_watcher_t *w)

/* Prepare
*/

static void prepare_set_priority (flux_watcher_t *w, int priority)
{
ev_set_priority ((ev_prepare *)w->data, priority);
}

static void prepare_start (flux_watcher_t *w)
{
ev_prepare_start (w->r->loop, (ev_prepare *)w->data);
Expand All @@ -543,6 +549,7 @@ static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents)
}

static struct flux_watcher_ops prepare_watcher = {
.set_priority = prepare_set_priority,
.start = prepare_start,
.stop = prepare_stop,
.destroy = NULL,
Expand Down
93 changes: 76 additions & 17 deletions src/common/libflux/test/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -636,21 +636,73 @@ static void test_reactor_flags (flux_reactor_t *r)

static char cblist[6] = {0};
static int cblist_index = 0;

static void priority_prepare_watcher_prep_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
char *s = arg;
/* stick the char name of this watcher into the array, we'll
* compare later
*/
cblist[cblist_index++] = s[0];
flux_watcher_stop (w);
}

static void test_priority_prepare_watcher (flux_reactor_t *r)
{
flux_watcher_t *a, *b, *c, *d, *e;

memset (cblist, '\0', sizeof (cblist));
cblist_index = 0;

a = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "A");
b = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "B");
c = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "C");
d = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "D");
e = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "E");
ok (a != NULL && b != NULL && c != NULL && d != NULL && e != NULL,
"prepare watcher create worked");
// Don't set priority of 'a', it'll be default
flux_watcher_set_priority (b, 1);
flux_watcher_set_priority (c, -2);
flux_watcher_set_priority (d, -1);
flux_watcher_set_priority (e, 2);
flux_watcher_start (a);
flux_watcher_start (b);
flux_watcher_start (c);
flux_watcher_start (d);
flux_watcher_start (e);
ok (flux_reactor_run (r, 0) == 0,
"reactor ran to completion");
/* given priorities, callbacks should be called in the following order
* EBADC
*/
ok (memcmp (cblist, "EBADC", 5) == 0,
"prepare callbacks called in the correct order");
flux_watcher_destroy (a);
flux_watcher_destroy (b);
flux_watcher_destroy (c);
flux_watcher_destroy (d);
flux_watcher_destroy (e);
}

static flux_watcher_t *priority_prep = NULL;
static flux_watcher_t *priority_idle = NULL;

static void priority_prep_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
static void priority_check_watcher_prep_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
flux_watcher_start (priority_idle);
}

static void priority_check_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
static void priority_check_watcher_check_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
char *s = arg;
/* stick the char name of this watcher into the array, we'll
Expand All @@ -664,20 +716,26 @@ static void priority_check_cb (flux_reactor_t *r,
flux_watcher_stop (w);
}

static void test_priority (flux_reactor_t *r)
static void test_priority_check_watcher (flux_reactor_t *r)
{
flux_watcher_t *a, *b, *c, *d, *e;
priority_prep = flux_prepare_watcher_create (r, priority_prep_cb, NULL);

memset (cblist, '\0', sizeof (cblist));
cblist_index = 0;

priority_prep = flux_prepare_watcher_create (r,
priority_check_watcher_prep_cb,
NULL);
ok (priority_prep != NULL,
"prep watcher create worked");
priority_idle = flux_idle_watcher_create (r, NULL, NULL);
ok (priority_idle != NULL,
"idle watcher create worked");
a = flux_check_watcher_create (r, priority_check_cb, "A");
b = flux_check_watcher_create (r, priority_check_cb, "B");
c = flux_check_watcher_create (r, priority_check_cb, "C");
d = flux_check_watcher_create (r, priority_check_cb, "D");
e = flux_check_watcher_create (r, priority_check_cb, "E");
a = flux_check_watcher_create (r, priority_check_watcher_check_cb, "A");
b = flux_check_watcher_create (r, priority_check_watcher_check_cb, "B");
c = flux_check_watcher_create (r, priority_check_watcher_check_cb, "C");
d = flux_check_watcher_create (r, priority_check_watcher_check_cb, "D");
e = flux_check_watcher_create (r, priority_check_watcher_check_cb, "E");
ok (a != NULL && b != NULL && c != NULL && d != NULL && e != NULL,
"check watcher create worked");
// Don't set priority of 'a', it'll be default
Expand All @@ -697,7 +755,7 @@ static void test_priority (flux_reactor_t *r)
* DCAEB
*/
ok (memcmp (cblist, "DCAEB", 5) == 0,
"callbacks called in the correct order");
"check callbacks called in the correct order");
flux_watcher_destroy (a);
flux_watcher_destroy (b);
flux_watcher_destroy (c);
Expand Down Expand Up @@ -731,7 +789,8 @@ int main (int argc, char *argv[])
test_stat (reactor);
test_active_ref (reactor);
test_reactor_flags (reactor);
test_priority (reactor);
test_priority_prepare_watcher (reactor);
test_priority_check_watcher (reactor);

flux_reactor_destroy (reactor);

Expand Down
Loading