From cfe5c02bc206d8b752ada4d56322d831ac803f77 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 16 Nov 2018 07:30:38 -0800 Subject: [PATCH 1/6] libflux: abstract ready test for futures Problem: several places in libflux/future.c test if a future is ready or not ready by checking both f->result_valid *and* f->fatal_errnum_valid. This requirement could too easily lead to a future maintainer (hah) forgetting one of these checks, so abstract this simple test into a convenience function and use it throughout the code. This change also cleans up `flux_future_is_ready()` to use the new function. Though the function handily used `flux_future_wait_for (f, 0.)` to test for readiness, in the end that amounted to the same check implemented in the new `future_is_ready`, and use of that function is more clear. --- src/common/libflux/future.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/common/libflux/future.c b/src/common/libflux/future.c index d5dcb9b04669..988d2b463234 100644 --- a/src/common/libflux/future.c +++ b/src/common/libflux/future.c @@ -443,6 +443,11 @@ flux_t *flux_future_get_flux (flux_future_t *f) return h; } +static bool future_is_ready (flux_future_t *f) +{ + return (f->result_valid || f->fatal_errnum_valid); +} + /* Block until future is fulfilled or timeout expires. * This function can be called multiple times, with different timeouts. * If timeout <= 0., there is no timeout. @@ -459,7 +464,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout) errno = EINVAL; return -1; } - if (!f->result_valid && !f->fatal_errnum_valid) { + if (!future_is_ready (f)) { if (timeout == 0.) { // don't setup 'now' context in this case errno = ETIMEDOUT; return -1; @@ -475,7 +480,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout) f->init (f, f->init_arg); // might set error f->now->init_called = true; } - if (!f->result_valid && !f->fatal_errnum_valid) { + if (!future_is_ready (f)) { if (flux_reactor_run (f->now->r, 0) < 0) return -1; // errno set by now_timer_cb or other watcher } @@ -483,7 +488,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout) flux_dispatch_requeue (f->now->h); f->now->running = false; } - if (!f->result_valid && !f->fatal_errnum_valid) + if (!future_is_ready (f)) return -1; return 0; } @@ -493,7 +498,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout) */ bool flux_future_is_ready (flux_future_t *f) { - if (f && flux_future_wait_for (f, 0.) == 0) + if (f && future_is_ready (f)) return true; return false; } @@ -707,7 +712,7 @@ static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w, assert (f->then != NULL); - if (f->result_valid || f->fatal_errnum_valid) + if (future_is_ready (f)) flux_watcher_start (f->then->idle); // prevent reactor from blocking } @@ -721,7 +726,7 @@ static void check_cb (flux_reactor_t *r, flux_watcher_t *w, assert (f->then != NULL); flux_watcher_stop (f->then->idle); - if (f->result_valid || f->fatal_errnum_valid) { + if (future_is_ready (f)) { flux_watcher_stop (f->then->timer); flux_watcher_stop (f->then->prepare); flux_watcher_stop (f->then->check); From 0d0e43bad322cac36142ba32159e85c8813028bf Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 15 Nov 2018 14:57:04 -0800 Subject: [PATCH 2/6] libflux: don't run prep/check for unready futures Problem: futures run in asynchronous mode have their prepare and check watchers started immediately when `flux_future_then(3)` is called. This means that the `prepare_cb` and `check_cb` are run for every unfulfilled future on every reactor loop iteration. In a process with many futures (e.g. thousands of outstanding RPCs) this can result in a large slowdown. Instead of starting the prepare and check watchers at the time `flux_future_then` is called, start the watchers only after the future has been fulfilled (with result or fatal error) by calling `then_context_start` from `post_fulfill` Fixes #1839 --- src/common/libflux/future.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/common/libflux/future.c b/src/common/libflux/future.c index 988d2b463234..e7d29a2cba59 100644 --- a/src/common/libflux/future.c +++ b/src/common/libflux/future.c @@ -187,6 +187,12 @@ static void then_context_start (struct then_context *then) flux_watcher_start (then->check); } +static void then_context_stop (struct then_context *then) +{ + flux_watcher_stop (then->prepare); + flux_watcher_stop (then->check); +} + static int then_context_set_timeout (struct then_context *then, double timeout, void *arg) { @@ -342,9 +348,8 @@ static void post_fulfill (flux_future_t *f) then_context_clear_timer (f->then); if (f->now) flux_reactor_stop (f->now->r); - /* in "then" context, the main reactor prepare/check/idle watchers - * will run continuation in the next reactor loop for fairness. - */ + if (f->then) + then_context_start (f->then); } /* Reset (unfulfill) a future. @@ -355,7 +360,7 @@ void flux_future_reset (flux_future_t *f) clear_result (&f->result); f->result_valid = false; if (f->then) - then_context_start (f->then); + then_context_stop (f->then); if (f->queue && zlist_size (f->queue) > 0) { struct future_result *fs = zlist_pop (f->queue); move_result (&f->result, fs); @@ -543,7 +548,8 @@ int flux_future_then (flux_future_t *f, double timeout, if (!(f->then = then_context_create (f->r, f))) return -1; } - then_context_start (f->then); + if (future_is_ready (f)) + then_context_start (f->then); if (then_context_set_timeout (f->then, timeout, f) < 0) return -1; f->then->continuation = cb; From b3810a380a145f51aad566a968b1a50accbe9087 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 16 Nov 2018 08:08:21 -0800 Subject: [PATCH 3/6] libflux: eliminate prepare watcher for futures The flux_future_t prepare watcher callback is currently used only to start the idle watcher. Eliminate the middle man and start the idle watcher directly in `then_context_start`. --- src/common/libflux/future.c | 38 +++++++------------------------------ 1 file changed, 7 insertions(+), 31 deletions(-) diff --git a/src/common/libflux/future.c b/src/common/libflux/future.c index e7d29a2cba59..7ffc134de4ed 100644 --- a/src/common/libflux/future.c +++ b/src/common/libflux/future.c @@ -46,7 +46,6 @@ struct now_context { struct then_context { flux_reactor_t *r; // external reactor for then flux_watcher_t *timer; // timer watcher (if timeout set) - flux_watcher_t *prepare;// doorbell for fulfill flux_watcher_t *check; flux_watcher_t *idle; bool init_called; @@ -78,8 +77,6 @@ struct flux_future { zlist_t *queue; }; -static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w, - int revents, void *arg); static void check_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg); static void now_timer_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -154,7 +151,6 @@ static void then_context_destroy (struct then_context *then) { if (then) { flux_watcher_destroy (then->timer); - flux_watcher_destroy (then->prepare); flux_watcher_destroy (then->check); flux_watcher_destroy (then->idle); free (then); @@ -169,8 +165,6 @@ static struct then_context *then_context_create (flux_reactor_t *r, void *arg) goto error; } then->r = r; - if (!(then->prepare = flux_prepare_watcher_create (r, prepare_cb, arg))) - goto error; if (!(then->check = flux_check_watcher_create (r, check_cb, arg))) goto error; if (!(then->idle = flux_idle_watcher_create (r, NULL, NULL))) @@ -183,13 +177,13 @@ static struct then_context *then_context_create (flux_reactor_t *r, void *arg) static void then_context_start (struct then_context *then) { - flux_watcher_start (then->prepare); + flux_watcher_start (then->idle); // prevent reactor from blocking flux_watcher_start (then->check); } static void then_context_stop (struct then_context *then) { - flux_watcher_stop (then->prepare); + flux_watcher_stop (then->idle); flux_watcher_stop (then->check); } @@ -708,20 +702,6 @@ static void now_timer_cb (flux_reactor_t *r, flux_watcher_t *w, flux_reactor_stop_error (r); } -/* prepare - if results are ready, ensure loop doesn't block - * so check can call continuation on next loop iteration - */ -static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w, - int revents, void *arg) -{ - flux_future_t *f = arg; - - assert (f->then != NULL); - - if (future_is_ready (f)) - flux_watcher_start (f->then->idle); // prevent reactor from blocking -} - /* check - if results are ready, call the continuation */ static void check_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -731,15 +711,11 @@ static void check_cb (flux_reactor_t *r, flux_watcher_t *w, assert (f->then != NULL); - flux_watcher_stop (f->then->idle); - if (future_is_ready (f)) { - flux_watcher_stop (f->then->timer); - flux_watcher_stop (f->then->prepare); - flux_watcher_stop (f->then->check); - if (f->then->continuation) - f->then->continuation (f, f->then->continuation_arg); - // N.B. callback might destroy future - } + flux_watcher_stop (f->then->timer); + then_context_stop (f->then); + if (f->then->continuation) + f->then->continuation (f, f->then->continuation_arg); + // N.B. callback might destroy future } From 375bc88565ca9dc96537fbb633679018578f4655 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 16 Nov 2018 09:54:36 -0800 Subject: [PATCH 4/6] test: libflux: test fatal errors on futures in async mode Add unit tests to ensure fatal errors in flux_future_t are handled in asynchronous mode (then context) both before and after a synchronous get of the error. --- src/common/libflux/test/future.c | 55 ++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/src/common/libflux/test/future.c b/src/common/libflux/test/future.c index dfea4995bece..0b5680afd073 100644 --- a/src/common/libflux/test/future.c +++ b/src/common/libflux/test/future.c @@ -754,6 +754,60 @@ void test_fatal_error (void) flux_future_destroy (f); } +void fatal_error_continuation (flux_future_t *f, void *arg) +{ + int *fp = arg; + int rc = flux_future_get (f, NULL); + *fp = errno; + cmp_ok (rc, "<", 0, + "flux_future_get() returns < 0 in continuation after fatal err "); +} + +void test_fatal_error_async (void) +{ + int fatalerr = 0; + flux_reactor_t *r; + flux_future_t *f; + + if (!(r = flux_reactor_create (0))) + BAIL_OUT ("flux_reactor_create failed"); + if (!(f = flux_future_create (NULL, NULL))) + BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); + + flux_future_fatal_error (f, EPERM, NULL); + + ok (flux_future_then (f, -1., fatal_error_continuation, &fatalerr) == 0, + "flux_future_then on future with fatal error"); + if (flux_reactor_run (r, FLUX_REACTOR_NOWAIT) < 0) + BAIL_OUT ("flux_reactor_run NOWAIT failed"); + cmp_ok (fatalerr, "==", EPERM, + "continuation runs after fatal error"); + + flux_future_destroy (f); + + fatalerr = 0; + if (!(f = flux_future_create (NULL, NULL))) + BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); + + flux_future_fatal_error (f, EPERM, NULL); + + ok (flux_future_get (f, NULL) < 0 + && errno == EPERM, + "flux_future_get returns fatal error EPERM"); + + ok (flux_future_then (f, -1., fatal_error_continuation, &fatalerr) == 0, + "flux_future_then on future with fatal error and previous get"); + if (flux_reactor_run (r, FLUX_REACTOR_NOWAIT) < 0) + BAIL_OUT ("flux_reactor_run NOWAIT failed"); + cmp_ok (fatalerr, "==", EPERM, + "continuation runs after fatal error syncrhnously retrieved"); + + flux_future_destroy (f); + flux_reactor_destroy (r); +} + void test_error_string (void) { flux_future_t *f; @@ -894,6 +948,7 @@ int main (int argc, char *argv[]) test_reset (); test_fatal_error (); + test_fatal_error_async (); test_error_string (); From 529c205670d9244de84d4390adb1000907177c6e Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 16 Nov 2018 09:57:47 -0800 Subject: [PATCH 5/6] test: libflux: free reactor in future unit tests Clean up leaked flux_reactor_t in libflux/test/future.c: test_simple(). --- src/common/libflux/test/future.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/libflux/test/future.c b/src/common/libflux/test/future.c index 0b5680afd073..2981bde1166f 100644 --- a/src/common/libflux/test/future.c +++ b/src/common/libflux/test/future.c @@ -150,6 +150,7 @@ void test_simple (void) && !strcmp (result_destroy_arg, "Hello"), "flux_future_destroy called result destructor correctly"); + flux_reactor_destroy (r); diag ("%s: simple tests completed", __FUNCTION__); } From 21cf90ab66de46871a78c477252c68139b4e2943 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 16 Nov 2018 10:32:25 -0800 Subject: [PATCH 6/6] test: libflux: cover queued result futures in async mode Ensure a case where a multiple-result future is use first synchronously then asynchronously is covered in the unit tests. --- src/common/libflux/test/future.c | 44 ++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/common/libflux/test/future.c b/src/common/libflux/test/future.c index 2981bde1166f..54b6cc84e71a 100644 --- a/src/common/libflux/test/future.c +++ b/src/common/libflux/test/future.c @@ -931,6 +931,49 @@ void test_multiple_fulfill (void) flux_reactor_destroy (r); } +void multiple_fulfill_continuation (flux_future_t *f, void *arg) +{ + const void **resultp = arg; + ok (flux_future_get (f, resultp) == 0, + "flux_future_get() in async continuation works"); + flux_future_reset (f); +} + +void test_multiple_fulfill_asynchronous (void) +{ + int rc; + flux_reactor_t *r; + flux_future_t *f; + const void *result; + + if (!(r = flux_reactor_create (0))) + BAIL_OUT ("flux_reactor_create failed"); + + if (!(f = flux_future_create (NULL, NULL))) + BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); + + flux_future_fulfill (f, "foo", NULL); + flux_future_fulfill (f, "bar", NULL); + + /* Call continuation once to get first value and reset future */ + multiple_fulfill_continuation (f, &result); + + ok (strcmp (result, "foo") == 0, + "calling multiple_fulfill_continuation synchronously worked"); + + rc = flux_future_then (f, -1., multiple_fulfill_continuation, &result); + cmp_ok (rc, "==", 0, + "flux_future_then() works for multiple fulfilled future"); + if (flux_reactor_run (r, FLUX_REACTOR_NOWAIT) < 0) + BAIL_OUT ("flux_reactor_run NOWAIT failed"); + ok (strcmp (result, "bar") == 0, + "continuation was called for multiple-fulfilled future"); + + flux_future_destroy (f); + flux_reactor_destroy (r); +} + int main (int argc, char *argv[]) { plan (NO_PLAN); @@ -954,6 +997,7 @@ int main (int argc, char *argv[]) test_error_string (); test_multiple_fulfill (); + test_multiple_fulfill_asynchronous (); done_testing(); return (0);