Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve efficiency of asynchronous futures #1840

Merged
merged 6 commits into from
Nov 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 26 additions & 39 deletions src/common/libflux/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ struct now_context {
struct then_context {
flux_reactor_t *r; // external reactor for then
flux_watcher_t *timer; // timer watcher (if timeout set)
flux_watcher_t *prepare;// doorbell for fulfill
flux_watcher_t *check;
flux_watcher_t *idle;
bool init_called;
Expand Down Expand Up @@ -78,8 +77,6 @@ struct flux_future {
zlist_t *queue;
};

static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
static void now_timer_cb (flux_reactor_t *r, flux_watcher_t *w,
Expand Down Expand Up @@ -154,7 +151,6 @@ static void then_context_destroy (struct then_context *then)
{
if (then) {
flux_watcher_destroy (then->timer);
flux_watcher_destroy (then->prepare);
flux_watcher_destroy (then->check);
flux_watcher_destroy (then->idle);
free (then);
Expand All @@ -169,8 +165,6 @@ static struct then_context *then_context_create (flux_reactor_t *r, void *arg)
goto error;
}
then->r = r;
if (!(then->prepare = flux_prepare_watcher_create (r, prepare_cb, arg)))
goto error;
if (!(then->check = flux_check_watcher_create (r, check_cb, arg)))
goto error;
if (!(then->idle = flux_idle_watcher_create (r, NULL, NULL)))
Expand All @@ -183,10 +177,16 @@ static struct then_context *then_context_create (flux_reactor_t *r, void *arg)

static void then_context_start (struct then_context *then)
{
flux_watcher_start (then->prepare);
flux_watcher_start (then->idle); // prevent reactor from blocking
flux_watcher_start (then->check);
}

static void then_context_stop (struct then_context *then)
{
flux_watcher_stop (then->idle);
flux_watcher_stop (then->check);
}

static int then_context_set_timeout (struct then_context *then,
double timeout, void *arg)
{
Expand Down Expand Up @@ -342,9 +342,8 @@ static void post_fulfill (flux_future_t *f)
then_context_clear_timer (f->then);
if (f->now)
flux_reactor_stop (f->now->r);
/* in "then" context, the main reactor prepare/check/idle watchers
* will run continuation in the next reactor loop for fairness.
*/
if (f->then)
then_context_start (f->then);
}

/* Reset (unfulfill) a future.
Expand All @@ -355,7 +354,7 @@ void flux_future_reset (flux_future_t *f)
clear_result (&f->result);
f->result_valid = false;
if (f->then)
then_context_start (f->then);
then_context_stop (f->then);
if (f->queue && zlist_size (f->queue) > 0) {
struct future_result *fs = zlist_pop (f->queue);
move_result (&f->result, fs);
Expand Down Expand Up @@ -443,6 +442,11 @@ flux_t *flux_future_get_flux (flux_future_t *f)
return h;
}

static bool future_is_ready (flux_future_t *f)
{
return (f->result_valid || f->fatal_errnum_valid);
}

/* Block until future is fulfilled or timeout expires.
* This function can be called multiple times, with different timeouts.
* If timeout <= 0., there is no timeout.
Expand All @@ -459,7 +463,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout)
errno = EINVAL;
return -1;
}
if (!f->result_valid && !f->fatal_errnum_valid) {
if (!future_is_ready (f)) {
if (timeout == 0.) { // don't setup 'now' context in this case
errno = ETIMEDOUT;
return -1;
Expand All @@ -475,15 +479,15 @@ int flux_future_wait_for (flux_future_t *f, double timeout)
f->init (f, f->init_arg); // might set error
f->now->init_called = true;
}
if (!f->result_valid && !f->fatal_errnum_valid) {
if (!future_is_ready (f)) {
if (flux_reactor_run (f->now->r, 0) < 0)
return -1; // errno set by now_timer_cb or other watcher
}
if (f->now->h)
flux_dispatch_requeue (f->now->h);
f->now->running = false;
}
if (!f->result_valid && !f->fatal_errnum_valid)
if (!future_is_ready (f))
return -1;
return 0;
}
Expand All @@ -493,7 +497,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout)
*/
bool flux_future_is_ready (flux_future_t *f)
{
if (f && flux_future_wait_for (f, 0.) == 0)
if (f && future_is_ready (f))
return true;
return false;
}
Expand Down Expand Up @@ -538,7 +542,8 @@ int flux_future_then (flux_future_t *f, double timeout,
if (!(f->then = then_context_create (f->r, f)))
return -1;
}
then_context_start (f->then);
if (future_is_ready (f))
then_context_start (f->then);
if (then_context_set_timeout (f->then, timeout, f) < 0)
return -1;
f->then->continuation = cb;
Expand Down Expand Up @@ -697,20 +702,6 @@ static void now_timer_cb (flux_reactor_t *r, flux_watcher_t *w,
flux_reactor_stop_error (r);
}

/* prepare - if results are ready, ensure loop doesn't block
* so check can call continuation on next loop iteration
*/
static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
flux_future_t *f = arg;

assert (f->then != NULL);

if (f->result_valid || f->fatal_errnum_valid)
flux_watcher_start (f->then->idle); // prevent reactor from blocking
}

/* check - if results are ready, call the continuation
*/
static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
Expand All @@ -720,15 +711,11 @@ static void check_cb (flux_reactor_t *r, flux_watcher_t *w,

assert (f->then != NULL);

flux_watcher_stop (f->then->idle);
if (f->result_valid || f->fatal_errnum_valid) {
flux_watcher_stop (f->then->timer);
flux_watcher_stop (f->then->prepare);
flux_watcher_stop (f->then->check);
if (f->then->continuation)
f->then->continuation (f, f->then->continuation_arg);
// N.B. callback might destroy future
}
flux_watcher_stop (f->then->timer);
then_context_stop (f->then);
if (f->then->continuation)
f->then->continuation (f, f->then->continuation_arg);
// N.B. callback might destroy future
}


Expand Down
100 changes: 100 additions & 0 deletions src/common/libflux/test/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ void test_simple (void)
&& !strcmp (result_destroy_arg, "Hello"),
"flux_future_destroy called result destructor correctly");

flux_reactor_destroy (r);
diag ("%s: simple tests completed", __FUNCTION__);
}

Expand Down Expand Up @@ -754,6 +755,60 @@ void test_fatal_error (void)
flux_future_destroy (f);
}

void fatal_error_continuation (flux_future_t *f, void *arg)
{
int *fp = arg;
int rc = flux_future_get (f, NULL);
*fp = errno;
cmp_ok (rc, "<", 0,
"flux_future_get() returns < 0 in continuation after fatal err ");
}

void test_fatal_error_async (void)
{
int fatalerr = 0;
flux_reactor_t *r;
flux_future_t *f;

if (!(r = flux_reactor_create (0)))
BAIL_OUT ("flux_reactor_create failed");
if (!(f = flux_future_create (NULL, NULL)))
BAIL_OUT ("flux_future_create failed");
flux_future_set_reactor (f, r);

flux_future_fatal_error (f, EPERM, NULL);

ok (flux_future_then (f, -1., fatal_error_continuation, &fatalerr) == 0,
"flux_future_then on future with fatal error");
if (flux_reactor_run (r, FLUX_REACTOR_NOWAIT) < 0)
BAIL_OUT ("flux_reactor_run NOWAIT failed");
cmp_ok (fatalerr, "==", EPERM,
"continuation runs after fatal error");

flux_future_destroy (f);

fatalerr = 0;
if (!(f = flux_future_create (NULL, NULL)))
BAIL_OUT ("flux_future_create failed");
flux_future_set_reactor (f, r);

flux_future_fatal_error (f, EPERM, NULL);

ok (flux_future_get (f, NULL) < 0
&& errno == EPERM,
"flux_future_get returns fatal error EPERM");

ok (flux_future_then (f, -1., fatal_error_continuation, &fatalerr) == 0,
"flux_future_then on future with fatal error and previous get");
if (flux_reactor_run (r, FLUX_REACTOR_NOWAIT) < 0)
BAIL_OUT ("flux_reactor_run NOWAIT failed");
cmp_ok (fatalerr, "==", EPERM,
"continuation runs after fatal error syncrhnously retrieved");

flux_future_destroy (f);
flux_reactor_destroy (r);
}

void test_error_string (void)
{
flux_future_t *f;
Expand Down Expand Up @@ -876,6 +931,49 @@ void test_multiple_fulfill (void)
flux_reactor_destroy (r);
}

void multiple_fulfill_continuation (flux_future_t *f, void *arg)
{
const void **resultp = arg;
ok (flux_future_get (f, resultp) == 0,
"flux_future_get() in async continuation works");
flux_future_reset (f);
}

void test_multiple_fulfill_asynchronous (void)
{
int rc;
flux_reactor_t *r;
flux_future_t *f;
const void *result;

if (!(r = flux_reactor_create (0)))
BAIL_OUT ("flux_reactor_create failed");

if (!(f = flux_future_create (NULL, NULL)))
BAIL_OUT ("flux_future_create failed");
flux_future_set_reactor (f, r);

flux_future_fulfill (f, "foo", NULL);
flux_future_fulfill (f, "bar", NULL);

/* Call continuation once to get first value and reset future */
multiple_fulfill_continuation (f, &result);

ok (strcmp (result, "foo") == 0,
"calling multiple_fulfill_continuation synchronously worked");

rc = flux_future_then (f, -1., multiple_fulfill_continuation, &result);
cmp_ok (rc, "==", 0,
"flux_future_then() works for multiple fulfilled future");
if (flux_reactor_run (r, FLUX_REACTOR_NOWAIT) < 0)
BAIL_OUT ("flux_reactor_run NOWAIT failed");
ok (strcmp (result, "bar") == 0,
"continuation was called for multiple-fulfilled future");

flux_future_destroy (f);
flux_reactor_destroy (r);
}

int main (int argc, char *argv[])
{
plan (NO_PLAN);
Expand All @@ -894,10 +992,12 @@ int main (int argc, char *argv[])
test_reset ();

test_fatal_error ();
test_fatal_error_async ();

test_error_string ();

test_multiple_fulfill ();
test_multiple_fulfill_asynchronous ();

done_testing();
return (0);
Expand Down