From 46a2573f3a08d279c99417e8fc0ec99b6780c9bf Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 12 Sep 2017 10:26:22 -0700 Subject: [PATCH 1/7] libflux/future: add flux_future_set_reactor() Problem: a composite future's init function needs to set up continuations for contained futures using the reactor it is passed, but the reactor for the contained futures was pre-set when they were created. Drop reactor argument from flux_future_create(), and add flux_future_set_reactor(). Also, if a flux_t handle is set with flux_future_set_flux(), set the set the reactor to the one associated with the flux_t handle as a side-effect. For symmetry add flux_future_get_reactor() that behaves like flux_future_get_flux(). A composite future's init function must call flux_future_get_flux () on the composite future, and flux_future set_flux() on the contained futures. Or if there is no flux_t handle involved, call flux_future_set_reactor() on the contained future with reactor passed to the init function. If neither is called on a future, flux_future_then() fails with EINVAL. Update rpc.c and future unit test. --- src/common/libflux/future.c | 48 ++++++++++++++++++++++++++++---- src/common/libflux/future.h | 5 ++-- src/common/libflux/rpc.c | 2 +- src/common/libflux/test/future.c | 38 +++++++++++++++++++++---- 4 files changed, 80 insertions(+), 13 deletions(-) diff --git a/src/common/libflux/future.c b/src/common/libflux/future.c index 8ade58469d7e..dee59179d709 100644 --- a/src/common/libflux/future.c +++ b/src/common/libflux/future.c @@ -211,8 +211,7 @@ void flux_future_destroy (flux_future_t *f) /* Create a future. */ -flux_future_t *flux_future_create (flux_reactor_t *r, - flux_future_init_f cb, void *arg) +flux_future_t *flux_future_create (flux_future_init_f cb, void *arg) { flux_future_t *f = calloc (1, sizeof (*f)); if (!f) { @@ -221,19 +220,58 @@ flux_future_t *flux_future_create (flux_reactor_t *r, } f->init = cb; f->init_arg = arg; - f->r = r; return f; error: flux_future_destroy (f); return NULL; } -/* Set flux handle +/* Set the flux reactor to be used for 'then' context. + * In 'now' context, reactor will be a temporary one. */ -void flux_future_set_flux (flux_future_t *f, flux_t *h) +void flux_future_set_reactor (flux_future_t *f, flux_reactor_t *r) { if (f) + f->r = r; +} + +/* Context dependent get of reactor. + * If "now" context, return one off reactor. + * If "then" context, return the one that was set. + */ +flux_reactor_t *flux_future_get_reactor (flux_future_t *f) +{ + flux_reactor_t *r; + + if (!f) + goto inval; + if (!f->now || !f->now->running) { + if (!f->r) + goto inval; + r = f->r; + } + else { + if (!f->now->r) + goto inval; + r = f->now->r; + } + return r; +inval: + errno = EINVAL; + return NULL; +} + +/* Set the flux handle to be used for 'then' context. + * In 'now' context, handle will be a clone of this one, + * associated with the temporary reactor. + */ +void flux_future_set_flux (flux_future_t *f, flux_t *h) +{ + if (f) { f->h = h; + if (!f->r) + f->r = flux_get_reactor (h); + } } /* Context dependent get of flux handle. diff --git a/src/common/libflux/future.h b/src/common/libflux/future.h index a3bace8e0b21..8e56bab1947c 100644 --- a/src/common/libflux/future.h +++ b/src/common/libflux/future.h @@ -25,8 +25,7 @@ int flux_future_aux_set (flux_future_t *f, const char *name, typedef void (*flux_future_init_f)(flux_future_t *f, flux_reactor_t *r, void *arg); -flux_future_t *flux_future_create (flux_reactor_t *r, - flux_future_init_f cb, void *arg); +flux_future_t *flux_future_create (flux_future_init_f cb, void *arg); int flux_future_get (flux_future_t *f, void *result); @@ -36,6 +35,8 @@ void flux_future_fulfill_error (flux_future_t *f, int errnum); void flux_future_set_flux (flux_future_t *f, flux_t *h); flux_t *flux_future_get_flux (flux_future_t *f); +void flux_future_set_reactor (flux_future_t *f, flux_reactor_t *r); +flux_reactor_t *flux_future_get_reactor (flux_future_t *f); #endif /* !_FLUX_CORE_FUTURE_H */ diff --git a/src/common/libflux/rpc.c b/src/common/libflux/rpc.c index bfefdc477fd6..2f7b958b4156 100644 --- a/src/common/libflux/rpc.c +++ b/src/common/libflux/rpc.c @@ -204,7 +204,7 @@ static flux_future_t *flux_rpc_msg (flux_t *h, flux_future_t *f; int msgflags = 0; - if (!(f = flux_future_create (flux_get_reactor (h), initialize_cb, NULL))) + if (!(f = flux_future_create (initialize_cb, NULL))) goto error; if (!(rpc = rpc_create (h, flags))) goto error; diff --git a/src/common/libflux/test/future.c b/src/common/libflux/test/future.c index 8c5202eba72f..68c8a3672b63 100644 --- a/src/common/libflux/test/future.c +++ b/src/common/libflux/test/future.c @@ -26,11 +26,15 @@ void result_destroy (void *arg) int contin_called; void *contin_arg; int contin_get_rc; +flux_reactor_t *contin_reactor; +flux_t *contin_flux; void *contin_get_result; void contin (flux_future_t *f, void *arg) { contin_called = 1; contin_arg = arg; + contin_flux = flux_future_get_flux (f); + contin_reactor = flux_future_get_reactor (f); contin_get_rc = flux_future_get (f, &contin_get_result); } @@ -44,11 +48,14 @@ void test_simple (void) BAIL_OUT ("flux_reactor_create failed"); /* create */ - f = flux_future_create (r, NULL, NULL); + f = flux_future_create (NULL, NULL); ok (f != NULL, "flux_future_create works"); if (!f) BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); + ok (flux_future_get_reactor (f) == r, + "flux_future_get_reactor matches what was set"); /* before aux is set */ errno = 0; @@ -116,6 +123,7 @@ void test_simple (void) contin_arg = NULL; contin_get_rc = -42; contin_get_result = NULL; + contin_reactor = NULL; ok (flux_future_then (f, -1., contin, "nerp") == 0, "flux_future_then registered continuation"); ok (flux_reactor_run (r, 0) == 0, @@ -125,6 +133,8 @@ void test_simple (void) ok (contin_get_rc == 0 && contin_get_result != NULL && !strcmp (contin_get_result, "Hello"), "continuation obtained correct result with flux_future_get"); + ok (contin_reactor == r, + "flux_future_get_reactor from continuation returned set reactor"); /* destructors */ flux_future_destroy (f); @@ -142,7 +152,7 @@ void test_timeout_now (void) { flux_future_t *f; - f = flux_future_create (NULL, NULL, NULL); + f = flux_future_create (NULL, NULL); ok (f != NULL, "flux_future_create works"); if (!f) @@ -172,11 +182,12 @@ void test_timeout_then (void) if (!r) BAIL_OUT ("flux_reactor_create failed"); - f = flux_future_create (r, NULL, NULL); + f = flux_future_create (NULL, NULL); ok (f != NULL, "flux_future_create works"); if (!f) BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); ok (flux_future_then (f, 0.1, timeout_contin, &errnum) == 0, "flux_future_then registered continuation with timeout"); @@ -201,6 +212,8 @@ void simple_init_timer_cb (flux_reactor_t *r, flux_watcher_t *w, int simple_init_called; void *simple_init_arg; +flux_reactor_t *simple_init_reactor; +flux_reactor_t *simple_init_r; void simple_init (flux_future_t *f, flux_reactor_t *r, void *arg) { flux_watcher_t *w; @@ -208,6 +221,8 @@ void simple_init (flux_future_t *f, flux_reactor_t *r, void *arg) simple_init_called++; simple_init_arg = arg; + simple_init_reactor = flux_future_get_reactor (f); + simple_init_r = r; w = flux_timer_watcher_create (r, 0.1, 0., simple_init_timer_cb, f); if (!w) goto error; @@ -227,13 +242,15 @@ void test_init_now (void) flux_future_t *f; char *result; - f = flux_future_create (NULL, simple_init, "testarg"); + f = flux_future_create (simple_init, "testarg"); ok (f != NULL, "flux_future_create works"); if (!f) BAIL_OUT ("flux_future_create failed"); simple_init_called = 0; simple_init_arg = NULL; + simple_init_r = NULL; + simple_init_reactor = NULL; result = NULL; ok (flux_future_get (f, &result) == 0, "flux_future_get worked"); @@ -242,6 +259,10 @@ void test_init_now (void) ok (simple_init_called == 1 && simple_init_arg != NULL && !strcmp (simple_init_arg, "testarg"), "init was called once with correct arg"); + ok (simple_init_reactor != NULL, + "flux_future_get_reactor returned tmp reactor in init"); + ok (simple_init_r == simple_init_reactor, + "flux_future_get_reactor got same reactor as argument"); flux_future_destroy (f); @@ -266,13 +287,16 @@ void test_init_then (void) if (!r) BAIL_OUT ("flux_reactor_create failed"); - f = flux_future_create (r, simple_init, "testarg"); + f = flux_future_create (simple_init, "testarg"); ok (f != NULL, "flux_future_create works"); if (!f) BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); simple_init_called = 0; simple_init_arg = &f; + simple_init_r = NULL; + simple_init_reactor = NULL; simple_contin_result = NULL; simple_contin_called = 0; simple_contin_rc = -42; @@ -281,6 +305,10 @@ void test_init_then (void) ok (simple_init_called == 1 && simple_init_arg != NULL && !strcmp (simple_init_arg, "testarg"), "init was called once with correct arg"); + ok (simple_init_reactor == r, + "flux_future_get_reactor return set reactor in init"); + ok (simple_init_r == simple_init_reactor, + "flux_future_get_reactor got same reactor as argument"); ok (flux_reactor_run (r, 0) == 0, "reactor successfully run"); ok (simple_contin_called == 1, From a230c16935ab5405e55dee7fde9b227c4f1f4c91 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2017 09:34:55 -0700 Subject: [PATCH 2/7] libflux/future: drop reactor arg from init callback Drop the reactor argument from the future's initialization callback. It should be accessed in the same manner as the flux_t handle now that we haver reactor get/set functions. This makes the API more uniform, and the reactor is often not used if the future contains message handlers instead of reactor watchers. Update rpc implementation and unit test. --- src/common/libflux/future.c | 4 ++-- src/common/libflux/future.h | 3 +-- src/common/libflux/rpc.c | 2 +- src/common/libflux/test/future.c | 15 ++++----------- 4 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/common/libflux/future.c b/src/common/libflux/future.c index dee59179d709..70ec26e6ca58 100644 --- a/src/common/libflux/future.c +++ b/src/common/libflux/future.c @@ -333,7 +333,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout) } f->now->running = true; if (f->init && !f->now->init_called) { - f->init (f, f->now->r, f->init_arg); // might set error + f->init (f, f->init_arg); // might set error f->now->init_called = true; } if (!f->result_valid && !f->result_errnum_valid) { @@ -384,7 +384,7 @@ int flux_future_then (flux_future_t *f, double timeout, f->then->continuation = cb; f->then->continuation_arg = arg; if (f->init) - f->init (f, f->then->r, f->init_arg); // might set error + f->init (f, f->init_arg); // might set error if (f->result_errnum_valid) { errno = f->result_errnum; return -1; diff --git a/src/common/libflux/future.h b/src/common/libflux/future.h index 8e56bab1947c..21d92157aa35 100644 --- a/src/common/libflux/future.h +++ b/src/common/libflux/future.h @@ -22,8 +22,7 @@ int flux_future_aux_set (flux_future_t *f, const char *name, void *aux, flux_free_f destroy); -typedef void (*flux_future_init_f)(flux_future_t *f, - flux_reactor_t *r, void *arg); +typedef void (*flux_future_init_f)(flux_future_t *f, void *arg); flux_future_t *flux_future_create (flux_future_init_f cb, void *arg); diff --git a/src/common/libflux/rpc.c b/src/common/libflux/rpc.c index 2f7b958b4156..c263ca418f77 100644 --- a/src/common/libflux/rpc.c +++ b/src/common/libflux/rpc.c @@ -173,7 +173,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, /* Callback to initialize future in main or alternate reactor contexts. * Install a message handler for the response. */ -static void initialize_cb (flux_future_t *f, flux_reactor_t *r, void *arg) +static void initialize_cb (flux_future_t *f, void *arg) { struct flux_rpc *rpc = flux_future_aux_get (f, "flux::rpc"); flux_msg_handler_t *w; diff --git a/src/common/libflux/test/future.c b/src/common/libflux/test/future.c index 68c8a3672b63..5429a26183d8 100644 --- a/src/common/libflux/test/future.c +++ b/src/common/libflux/test/future.c @@ -212,16 +212,15 @@ void simple_init_timer_cb (flux_reactor_t *r, flux_watcher_t *w, int simple_init_called; void *simple_init_arg; -flux_reactor_t *simple_init_reactor; flux_reactor_t *simple_init_r; -void simple_init (flux_future_t *f, flux_reactor_t *r, void *arg) +void simple_init (flux_future_t *f, void *arg) { + flux_reactor_t *r = flux_future_get_reactor (f); flux_watcher_t *w; simple_init_called++; simple_init_arg = arg; - simple_init_reactor = flux_future_get_reactor (f); simple_init_r = r; w = flux_timer_watcher_create (r, 0.1, 0., simple_init_timer_cb, f); if (!w) @@ -250,7 +249,6 @@ void test_init_now (void) simple_init_called = 0; simple_init_arg = NULL; simple_init_r = NULL; - simple_init_reactor = NULL; result = NULL; ok (flux_future_get (f, &result) == 0, "flux_future_get worked"); @@ -259,10 +257,8 @@ void test_init_now (void) ok (simple_init_called == 1 && simple_init_arg != NULL && !strcmp (simple_init_arg, "testarg"), "init was called once with correct arg"); - ok (simple_init_reactor != NULL, + ok (simple_init_r != NULL, "flux_future_get_reactor returned tmp reactor in init"); - ok (simple_init_r == simple_init_reactor, - "flux_future_get_reactor got same reactor as argument"); flux_future_destroy (f); @@ -296,7 +292,6 @@ void test_init_then (void) simple_init_called = 0; simple_init_arg = &f; simple_init_r = NULL; - simple_init_reactor = NULL; simple_contin_result = NULL; simple_contin_called = 0; simple_contin_rc = -42; @@ -305,10 +300,8 @@ void test_init_then (void) ok (simple_init_called == 1 && simple_init_arg != NULL && !strcmp (simple_init_arg, "testarg"), "init was called once with correct arg"); - ok (simple_init_reactor == r, + ok (simple_init_r == r, "flux_future_get_reactor return set reactor in init"); - ok (simple_init_r == simple_init_reactor, - "flux_future_get_reactor got same reactor as argument"); ok (flux_reactor_run (r, 0) == 0, "reactor successfully run"); ok (simple_contin_called == 1, From 7116138b7701490dc7452a3d61c52dd6476feed2 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 12 Sep 2017 16:27:49 -0700 Subject: [PATCH 3/7] test/future: test composite future Add a test for a future containing a reactor watcher, and then a future containing two of those futures. --- src/common/libflux/test/future.c | 185 +++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) diff --git a/src/common/libflux/test/future.c b/src/common/libflux/test/future.c index 5429a26183d8..d6b1f15ab67a 100644 --- a/src/common/libflux/test/future.c +++ b/src/common/libflux/test/future.c @@ -318,6 +318,188 @@ void test_init_then (void) diag ("%s: init works in reactor context", __FUNCTION__); } +/* mumble - a 0.1s timer wrapped in a future. + */ + +void mumble_timer_cb (flux_reactor_t *r, flux_watcher_t *w, + int revents, void *arg) +{ + flux_future_t *f = arg; + flux_future_fulfill (f, xstrdup ("Hello"), free); +} + +void mumble_init (flux_future_t *f, void *arg) +{ + flux_reactor_t *r = flux_future_get_reactor (f); + flux_watcher_t *w; + if (!(w = flux_timer_watcher_create (r, 0.1, 0., mumble_timer_cb, f))) + goto error; + if (flux_future_aux_set (f, NULL, w, + (flux_free_f)flux_watcher_destroy) < 0) { + flux_watcher_destroy (w); + goto error; + } + flux_watcher_start (w); + return; +error: + flux_future_fulfill_error (f, errno); +} + +flux_future_t *mumble_create (void) +{ + return flux_future_create (mumble_init, NULL); +} + +int fclass_contin_rc; +void fclass_contin (flux_future_t *f, void *arg) +{ + fclass_contin_rc = flux_future_get (f, arg); +} + +void test_fclass_synchronous (char *tag, flux_future_t *f, const char *expected) +{ + char *s; + + ok (flux_future_wait_for (f, -1.) == 0, + "%s: flux_future_wait_for returned successfully", tag); + ok (flux_future_get (f, &s) == 0 && s != NULL && !strcmp (s, expected), + "%s: flux_future_get worked", tag); +} + +void test_fclass_asynchronous (char *tag, + flux_future_t *f, const char *expected) +{ + flux_reactor_t *r; + char *s; + + r = flux_reactor_create (0); + if (!r) + BAIL_OUT ("flux_reactor_create failed"); + + flux_future_set_reactor (f, r); + s = NULL; + fclass_contin_rc = 42; + ok (flux_future_then (f, -1., fclass_contin, &s) == 0, + "%s: flux_future_then worked", tag); + ok (flux_reactor_run (r, 0) == 0, + "%s: flux_reactor_run returned", tag); + ok (fclass_contin_rc == 0, + "%s: continuation called flux_future_get with success", tag); + ok (s != NULL && !strcmp (s, expected), + "%s: continuation fetched expected value", tag); + + flux_reactor_destroy (r); +} + +void test_mumble (void) +{ + flux_future_t *f; + + f = mumble_create (); + ok (f != NULL, + "mumble_create worked"); + test_fclass_synchronous ("mumble", f, "Hello"); + flux_future_destroy (f); + + f = mumble_create (); + ok (f != NULL, + "mumble_create worked"); + test_fclass_asynchronous ("mumble", f, "Hello"); + flux_future_destroy (f); +} + +/* incept - two mumbles wrapped in a future, wrapped in an engima. + * No not the last bit. + */ +struct incept { + flux_future_t *f1; + flux_future_t *f2; + int count; +}; +void ic_free (struct incept *ic) +{ + if (ic) { + flux_future_destroy (ic->f1); + flux_future_destroy (ic->f2); + free (ic); + } +} +struct incept *ic_alloc (void) +{ + struct incept *ic = xzmalloc (sizeof (*ic)); + ic->f1 = mumble_create (); + ic->f2 = mumble_create (); + if (!ic->f2 || !ic->f1) { + ic_free (ic); + return NULL; + } + return ic; +} +void incept_mumble_contin (flux_future_t *f, void *arg) +{ + flux_future_t *incept_f = arg; + struct incept *ic = flux_future_aux_get (incept_f, "ic"); + if (ic == NULL) + goto error; + if (--ic->count == 0) + flux_future_fulfill (incept_f, xstrdup ("Blorg"), free); + return; +error: + flux_future_fulfill_error (incept_f, errno); +} +void incept_init (flux_future_t *f, void *arg) +{ + flux_reactor_t *r = flux_future_get_reactor (f); + struct incept *ic = arg; + + flux_future_set_reactor (ic->f1, r); + flux_future_set_reactor (ic->f2, r); + if (flux_future_then (ic->f1, -1., incept_mumble_contin, f) < 0) + goto error; + if (flux_future_then (ic->f2, -1., incept_mumble_contin, f) < 0) + goto error; + return; +error: + flux_future_fulfill_error (f, errno); +} +flux_future_t *incept_create (void) +{ + flux_future_t *f = NULL; + struct incept *ic; + + if (!(ic = ic_alloc ())) + goto error; + if (!(f = flux_future_create (incept_init, ic))) { + ic_free (ic); + goto error; + } + if (flux_future_aux_set (f, "ic", ic, (flux_free_f)ic_free) < 0) { + ic_free (ic); + goto error; + } + ic->count = 2; + return f; +error: + flux_future_destroy (f); + return NULL; +} + +void test_mumble_inception (void) +{ + flux_future_t *f; + + f = incept_create (); + ok (f != NULL, + "incept_create worked"); + test_fclass_synchronous ("incept", f, "Blorg"); + flux_future_destroy (f); + + f = incept_create (); + ok (f != NULL, + "incept_create worked"); + test_fclass_asynchronous ("incept", f, "Blorg"); + flux_future_destroy (f); +} int main (int argc, char *argv[]) { @@ -330,6 +512,9 @@ int main (int argc, char *argv[]) test_init_now (); test_init_then (); + test_mumble (); + test_mumble_inception (); + done_testing(); return (0); } From 8b5bbdadeea6b813c3a9d5c082c54c7e7d8f9503 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2017 08:52:16 -0700 Subject: [PATCH 4/7] test/future: add iterative composite future test Add a test for a future containing an arbitrary number of futures that are created on the fly from the contained futures' continuations until the job is done. --- src/common/libflux/test/future.c | 125 ++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/src/common/libflux/test/future.c b/src/common/libflux/test/future.c index d6b1f15ab67a..228890635695 100644 --- a/src/common/libflux/test/future.c +++ b/src/common/libflux/test/future.c @@ -1,5 +1,6 @@ #include #include +#include #include "future.h" #include "reactor.h" @@ -318,7 +319,7 @@ void test_init_then (void) diag ("%s: init works in reactor context", __FUNCTION__); } -/* mumble - a 0.1s timer wrapped in a future. +/* mumble - a 0.01s timer wrapped in a future. */ void mumble_timer_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -332,7 +333,7 @@ void mumble_init (flux_future_t *f, void *arg) { flux_reactor_t *r = flux_future_get_reactor (f); flux_watcher_t *w; - if (!(w = flux_timer_watcher_create (r, 0.1, 0., mumble_timer_cb, f))) + if (!(w = flux_timer_watcher_create (r, 0.01, 0., mumble_timer_cb, f))) goto error; if (flux_future_aux_set (f, NULL, w, (flux_free_f)flux_watcher_destroy) < 0) { @@ -501,6 +502,125 @@ void test_mumble_inception (void) flux_future_destroy (f); } +/* walk - multiple mumbles wrapped in a future, executed serially + * The next future is created in the current future's contination. + */ +struct walk { + zlist_t *f; // stack of futures + int count; // number of steps requested +}; +void walk_free (struct walk *walk) +{ + if (walk) { + if (walk->f) { + flux_future_t *f; + while ((f = zlist_pop (walk->f))) + flux_future_destroy (f); + zlist_destroy (&walk->f); + } + free (walk); + } +} +struct walk *walk_alloc (void) +{ + struct walk *walk = xzmalloc (sizeof (*walk)); + if (!(walk->f = zlist_new ())) { + walk_free (walk); + return NULL; + } + return walk; +} +void walk_mumble_contin (flux_future_t *f, void *arg) +{ + flux_future_t *walk_f = arg; + struct walk *walk = flux_future_aux_get (walk_f, "walk"); + + if (walk == NULL) + goto error; + if (--walk->count > 0) { + flux_reactor_t *r = flux_future_get_reactor (walk_f); + flux_future_t *nf; + if (!(nf = mumble_create ())) + goto error; + flux_future_set_reactor (nf, r); + if (flux_future_then (nf, -1., walk_mumble_contin, walk_f) < 0) { + flux_future_destroy (nf); + goto error; + } + if (zlist_push (walk->f, nf) < 0) { + flux_future_destroy (nf); + goto error; + } + } + else + flux_future_fulfill (walk_f, xstrdup ("Nerg"), free); + diag ("%s: count=%d", __FUNCTION__, walk->count); + return; +error: + flux_future_fulfill_error (walk_f, errno); +} +void walk_init (flux_future_t *f, void *arg) +{ + flux_reactor_t *r = flux_future_get_reactor (f); + struct walk *walk = arg; + + assert (walk->count > 0); + + flux_future_t *nf; + if (!(nf = mumble_create ())) + goto error; + flux_future_set_reactor (nf, r); + if (flux_future_then (nf, -1., walk_mumble_contin, f) < 0) { + flux_future_destroy (nf); + goto error; + } + if (zlist_push (walk->f, nf) < 0) { + flux_future_destroy (nf); + goto error; + } + return; +error: + flux_future_fulfill_error (f, errno); +} +flux_future_t *walk_create (int count) +{ + flux_future_t *f = NULL; + struct walk *walk; + + if (!(walk = walk_alloc ())) + goto error; + if (!(f = flux_future_create (walk_init, walk))) { + walk_free (walk); + goto error; + } + if (flux_future_aux_set (f, "walk", walk, (flux_free_f)walk_free) < 0) { + walk_free (walk); + goto error; + } + walk->count = count; + return f; +error: + flux_future_destroy (f); + return NULL; +} + +void test_walk (void) +{ + flux_future_t *f; + + f = walk_create (4); + ok (f != NULL, + "walk_create worked"); + test_fclass_synchronous ("walk", f, "Nerg"); + flux_future_destroy (f); + + f = walk_create (10); + ok (f != NULL, + "walk_create worked"); + test_fclass_asynchronous ("walk", f, "Nerg"); + flux_future_destroy (f); +} + int main (int argc, char *argv[]) { plan (NO_PLAN); @@ -514,6 +634,7 @@ int main (int argc, char *argv[]) test_mumble (); test_mumble_inception (); + test_walk (); done_testing(); return (0); From b5ee174bb44c2bf1efde5ee5507c7609f90c6c68 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2017 09:56:04 -0700 Subject: [PATCH 5/7] libflux/future: add man page refs to header file --- src/common/libflux/future.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/common/libflux/future.h b/src/common/libflux/future.h index 21d92157aa35..254b564c25e5 100644 --- a/src/common/libflux/future.h +++ b/src/common/libflux/future.h @@ -6,6 +6,10 @@ #include "handle.h" #include "msg_handler.h" +/* Interfaces useful for all classes that return futures. + * See flux_future_then(3). + */ + typedef struct flux_future flux_future_t; typedef void (*flux_continuation_f)(flux_future_t *f, void *arg); @@ -21,6 +25,9 @@ void *flux_future_aux_get (flux_future_t *f, const char *name); int flux_future_aux_set (flux_future_t *f, const char *name, void *aux, flux_free_f destroy); +/* Functions primarily used by implementors of classes that return futures. + * See flux_future_create(3). + */ typedef void (*flux_future_init_f)(flux_future_t *f, void *arg); From 297471486842e2f87a92fe2ae1488a4e19520094 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2017 10:04:56 -0700 Subject: [PATCH 6/7] doc/flux_future_then(3): adapt for reactor arg change --- doc/man3/flux_future_then.adoc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/doc/man3/flux_future_then.adoc b/doc/man3/flux_future_then.adoc index 1e69a37702e7..a3226843a0f4 100644 --- a/doc/man3/flux_future_then.adoc +++ b/doc/man3/flux_future_then.adoc @@ -37,14 +37,17 @@ occurred. Flux futures were inspired by similar constructs in other programming environments mentioned in RESOURCES, but are not a faithful implementation of any particular one. -Generally other Flux classes return futures, and provide a class-specific +Generally other Flux classes return futures, and may provide class-specific access function for results. The functions described in this page can be used to access, synchronize, and destroy futures returned from any such class. Authors of classes that return futures are referred to `flux_future_create(3)`. `flux_future_then()` sets up a continuation callback _cb_ that is called with opaque argument _arg_ once the future is fulfilled. The continuation -is registered on the main reactor passed in to `flux_future_create()`. +is registered on the reactor passed to a class-specific create function +(some create functions accept a flux_t handle, and the reactor is +derived from that). + The continuation will normally use `flux_future_get()` or a class-specific access function to obtain the result from the future container without blocking. `flux_future_then()` may only be called once on a given future. From 0b3aa3a056c0d682ca9faf128609d7c26879608f Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2017 10:45:49 -0700 Subject: [PATCH 7/7] doc/flux_future_create(3): adapt for reactor arg change --- doc/man3/flux_future_create.adoc | 114 ++++++++++++++++--------------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/doc/man3/flux_future_create.adoc b/doc/man3/flux_future_create.adoc index fc7c4e2a09ae..e22b72bb89de 100644 --- a/doc/man3/flux_future_create.adoc +++ b/doc/man3/flux_future_create.adoc @@ -13,10 +13,9 @@ SYNOPSIS #include typedef void (*flux_future_init_f)(flux_future_t *f, - flux_reactor_t *r, void *arg); + flux_reactor_t *r, void *arg); - flux_future_t *flux_future_create (flux_reactor_t *r, - flux_future_init_f cb, void *arg); + flux_future_t *flux_future_create (flux_future_init_f cb, void *arg); void flux_future_fulfill (flux_future_t *f, void *result, flux_free_f free_fn); @@ -28,6 +27,10 @@ SYNOPSIS int flux_future_aux_set (flux_future_t *f, const char *name, void *aux, flux_free_f destroy); + void flux_future_set_reactor (flux_future_t *f, flux_t *h); + + flux_reactor_t *flux_future_get_reactor (flux_future_t *f); + void flux_future_set_flux (flux_future_t *f, flux_t *h); flux_t *flux_future_get_flux (flux_future_t *f); @@ -39,29 +42,30 @@ DESCRIPTION A Flux future represents some activity that may be completed with reactor watchers and/or message handlers. It is intended to be returned by other classes as a handle for synchronization and a container for results. -Such a class provides two user-facing functions, one to initiate the -activity and return a future which internally calls `flux_future_create()`, -and one to access class-specific result(s), which internally calls -`flux_future_get()`. The class also provides a _flux_future_init_f_ -function that is called lazily by the future implementation to perform -class-specific reactor setup, such as installing watchers and message -handlers. This page describes the future interfaces used by such classes. +This page describes the future interfaces used by such classes. Class users and users seeking an introduction to Flux futures are referred to `flux_future_then(3)`. -`flux_future_create()` creates a future, associates a reactor with it, - and registers the class-specific initialization callback _cb_, and an -opaque argument _arg_ that will be passed to _cb_. The callback sets -up class-specific watchers on the reactor to handle asynchronous events. -The watchers must eventually call `flux_future_fulfill()` or -`flux_future_fulfill_error()` to fulfill the future. The callback may -occur in one or both of two contexts. A call in the first context occurs -when the user calls `flux_future_then()`. A call in the second context -occurs when the user (or `flux_future_get()`) calls `flux_future_wait_for()`. -In the former case, the callback receives the reactor _r_ passed to -`flux_future_create()`. In the latter case, it receives a temporary reactor -created within the `flux_future_wait_for()` implementation. See REACTOR -CONTEXTS below for more information. +A class that returns a future usually provides a creation function +that internally calls `flux_future_create()`, and may provide functions +to access class-specific result(s), that internally call `flux_future_get()`. +The create function internally registers a _flux_future_init_f_ +function that is called lazily by the future implementation to perform +class-specific reactor setup, such as installing watchers and message +handlers. + +`flux_future_create()` creates a future and registers the class-specific +initialization callback _cb_, and an opaque argument _arg_ that will be +passed to _cb_. The purpose of the initialization callback is to set up +class-specific watchers on a reactor obtained with `flux_future_get_reactor()`, +or message handlers on a flux_t handle obtained with `flux_future_get_flux()`, +or both. `flux_future_get_reactor()` and `flux_future_get_flux()` return +different results depending on whether the initialization callback is +triggered by a user calling `flux_future_then()` or `flux_future_wait_for()`. +The function may be triggered in one or both contexts, at most once for each. +The watchers or message handlers must eventually call `flux_future_fulfill()` +or `flux_future_fulfill_error()` to fulfill the future. See REACTOR CONTEXTS +below for more information. `flux_future_fulfill()` fulfills the future, assigning an opaque _result_ value with optional destructor _free_fn_ to the future. @@ -79,10 +83,13 @@ retrieves an object by _name_. Destructors are called when the future is destroyed. Objects may be stored anonymously under a NULL _name_ to be scheduled for destruction without the option of retrieval. +`flux_future_set_reactor()` may be used to associate a Flux reactor +with a future. The reactor (or a temporary one, depending on the context) +may be retrieved using `flux_future_get_reactor()`. + `flux_future_set_flux()` may be used to associate a Flux broker handle -with a future. The handle may be retrieved from within an init callback using -`flux_future_get_flux()` and used to set up message handlers that -fulfill the future in the same manner as described for reactor watchers. +with a future. The handle (or a clone associated with a temporary reactor, +depending on the context) may be retrieved using `flux_future_get_flux()`. Futures may "contain" other futures, to arbitrary depth. That is, an init callback may create futures and use their continuations to fulfill @@ -93,22 +100,31 @@ handlers. REACTOR CONTEXTS ---------------- -Internally, a future can operate in two reactor contexts. The init +Internally, a future can operate in two reactor contexts. The initialization callback may be called in either or both contexts, depending on which -synchronization functions are called by the user. - -The main reactor context involves the reactor passed to `flux_future_create()`. -This reactor is expected to be run or re-entered by the user, and can process -the future's watchers in parallel with other watchers registered by the -application. The call to `flux_future_then()` triggers the init callback -in this context. - -Alternatively, an internal reactor is created when `flux_future_wait_for()` -is called before the future is complete. The separate reactor allows these -functions to wait _only_ for the future's events, without allowing unrelated -watchers registered in the main reactor to run, which might complicate the -application's control flow. After the internal reactor is created, the -init callback is made in this context. +synchronization functions are called by the user. `flux_future_get_reactor()` +and `flux_future_get_flux()` return a result that depends on which context +they are called from. + +When the user calls `flux_future_then()`, this triggers a call to the +initialization callback. The callback would typically call +`flux_future_get_reactor()` and/or `flux_future_get_flux()` to obtain the +reactor or flux_t handle to be used to set up watchers or message handlers. +In this context, the reactor or flux_t handle are exactly the ones passed +to `flux_future_set_reactor()` and `flux_future_set_flux()`. + +When the user calls `flux_future_wait_for()`, this triggers the creation +of a temporary reactor, then a call to the initialization callback. +The temporary reactor allows these functions to wait _only_ for the future's +events, without allowing unrelated watchers registered in the main reactor +to run, which might complicate the application's control flow. In this +context, `flux_future_get_reactor()` returns the temporary reactor, not +the one passed in with `flux_future_set_reactor()`. `flux_future_get_flux()` +returns a temporary flux_t handle cloned from the one passed to +`flux_future_set_flux()`, and associated with the temporary reactor. +After the internal reactor returns, any messages unmatched by the dispatcher +on the cloned handle are requeued in the main flux_t handle with +`flux_dispatch_requeue()`. Since the init callback may be made in either reactor context (at most once each), and is unaware of which context that is, it should take care when @@ -118,19 +134,6 @@ with `flux_future_aux_set()` may be useful for managing the life cycle of reactor watchers and message handlers created by init callbacks. -MESSAGE HANDLERS ----------------- - -To allow message handlers to be registered in either reactor context, -`flux_future_get_flux()` is context sensitive. If called in the main -reactor context, it directly returns the broker handle registered with -`flux_future_set_flux()`. If called in the internal reactor context, -it returns a clone of that handle, obtained with `flux_clone()`, and -associated with the internal reactor. After the internal reactor returns, -any message unmatched by the dispatcher on the cloned handle are requeued -in the handle with `flux_dispatch_requeue()`. - - RETURN VALUE ------------ @@ -146,6 +149,9 @@ error, NULL is returned and errno is set appropriately. `flux_future_get_flux()` returns a flux_t handle on success. On error, NULL is returned and errno is set appropriately. +`flux_future_get_reactor()` returns a flux_reactor_t on success. On error, +NULL is returned and errno is set appropriately. + ERRORS ------