diff --git a/doc/man3/flux_future_create.adoc b/doc/man3/flux_future_create.adoc index fc7c4e2a09ae..e22b72bb89de 100644 --- a/doc/man3/flux_future_create.adoc +++ b/doc/man3/flux_future_create.adoc @@ -13,10 +13,9 @@ SYNOPSIS #include typedef void (*flux_future_init_f)(flux_future_t *f, - flux_reactor_t *r, void *arg); + flux_reactor_t *r, void *arg); - flux_future_t *flux_future_create (flux_reactor_t *r, - flux_future_init_f cb, void *arg); + flux_future_t *flux_future_create (flux_future_init_f cb, void *arg); void flux_future_fulfill (flux_future_t *f, void *result, flux_free_f free_fn); @@ -28,6 +27,10 @@ SYNOPSIS int flux_future_aux_set (flux_future_t *f, const char *name, void *aux, flux_free_f destroy); + void flux_future_set_reactor (flux_future_t *f, flux_t *h); + + flux_reactor_t *flux_future_get_reactor (flux_future_t *f); + void flux_future_set_flux (flux_future_t *f, flux_t *h); flux_t *flux_future_get_flux (flux_future_t *f); @@ -39,29 +42,30 @@ DESCRIPTION A Flux future represents some activity that may be completed with reactor watchers and/or message handlers. It is intended to be returned by other classes as a handle for synchronization and a container for results. -Such a class provides two user-facing functions, one to initiate the -activity and return a future which internally calls `flux_future_create()`, -and one to access class-specific result(s), which internally calls -`flux_future_get()`. The class also provides a _flux_future_init_f_ -function that is called lazily by the future implementation to perform -class-specific reactor setup, such as installing watchers and message -handlers. This page describes the future interfaces used by such classes. +This page describes the future interfaces used by such classes. Class users and users seeking an introduction to Flux futures are referred to `flux_future_then(3)`. -`flux_future_create()` creates a future, associates a reactor with it, - and registers the class-specific initialization callback _cb_, and an -opaque argument _arg_ that will be passed to _cb_. The callback sets -up class-specific watchers on the reactor to handle asynchronous events. -The watchers must eventually call `flux_future_fulfill()` or -`flux_future_fulfill_error()` to fulfill the future. The callback may -occur in one or both of two contexts. A call in the first context occurs -when the user calls `flux_future_then()`. A call in the second context -occurs when the user (or `flux_future_get()`) calls `flux_future_wait_for()`. -In the former case, the callback receives the reactor _r_ passed to -`flux_future_create()`. In the latter case, it receives a temporary reactor -created within the `flux_future_wait_for()` implementation. See REACTOR -CONTEXTS below for more information. +A class that returns a future usually provides a creation function +that internally calls `flux_future_create()`, and may provide functions +to access class-specific result(s), that internally call `flux_future_get()`. +The create function internally registers a _flux_future_init_f_ +function that is called lazily by the future implementation to perform +class-specific reactor setup, such as installing watchers and message +handlers. + +`flux_future_create()` creates a future and registers the class-specific +initialization callback _cb_, and an opaque argument _arg_ that will be +passed to _cb_. The purpose of the initialization callback is to set up +class-specific watchers on a reactor obtained with `flux_future_get_reactor()`, +or message handlers on a flux_t handle obtained with `flux_future_get_flux()`, +or both. `flux_future_get_reactor()` and `flux_future_get_flux()` return +different results depending on whether the initialization callback is +triggered by a user calling `flux_future_then()` or `flux_future_wait_for()`. +The function may be triggered in one or both contexts, at most once for each. +The watchers or message handlers must eventually call `flux_future_fulfill()` +or `flux_future_fulfill_error()` to fulfill the future. See REACTOR CONTEXTS +below for more information. `flux_future_fulfill()` fulfills the future, assigning an opaque _result_ value with optional destructor _free_fn_ to the future. @@ -79,10 +83,13 @@ retrieves an object by _name_. Destructors are called when the future is destroyed. Objects may be stored anonymously under a NULL _name_ to be scheduled for destruction without the option of retrieval. +`flux_future_set_reactor()` may be used to associate a Flux reactor +with a future. The reactor (or a temporary one, depending on the context) +may be retrieved using `flux_future_get_reactor()`. + `flux_future_set_flux()` may be used to associate a Flux broker handle -with a future. The handle may be retrieved from within an init callback using -`flux_future_get_flux()` and used to set up message handlers that -fulfill the future in the same manner as described for reactor watchers. +with a future. The handle (or a clone associated with a temporary reactor, +depending on the context) may be retrieved using `flux_future_get_flux()`. Futures may "contain" other futures, to arbitrary depth. That is, an init callback may create futures and use their continuations to fulfill @@ -93,22 +100,31 @@ handlers. REACTOR CONTEXTS ---------------- -Internally, a future can operate in two reactor contexts. The init +Internally, a future can operate in two reactor contexts. The initialization callback may be called in either or both contexts, depending on which -synchronization functions are called by the user. - -The main reactor context involves the reactor passed to `flux_future_create()`. -This reactor is expected to be run or re-entered by the user, and can process -the future's watchers in parallel with other watchers registered by the -application. The call to `flux_future_then()` triggers the init callback -in this context. - -Alternatively, an internal reactor is created when `flux_future_wait_for()` -is called before the future is complete. The separate reactor allows these -functions to wait _only_ for the future's events, without allowing unrelated -watchers registered in the main reactor to run, which might complicate the -application's control flow. After the internal reactor is created, the -init callback is made in this context. +synchronization functions are called by the user. `flux_future_get_reactor()` +and `flux_future_get_flux()` return a result that depends on which context +they are called from. + +When the user calls `flux_future_then()`, this triggers a call to the +initialization callback. The callback would typically call +`flux_future_get_reactor()` and/or `flux_future_get_flux()` to obtain the +reactor or flux_t handle to be used to set up watchers or message handlers. +In this context, the reactor or flux_t handle are exactly the ones passed +to `flux_future_set_reactor()` and `flux_future_set_flux()`. + +When the user calls `flux_future_wait_for()`, this triggers the creation +of a temporary reactor, then a call to the initialization callback. +The temporary reactor allows these functions to wait _only_ for the future's +events, without allowing unrelated watchers registered in the main reactor +to run, which might complicate the application's control flow. In this +context, `flux_future_get_reactor()` returns the temporary reactor, not +the one passed in with `flux_future_set_reactor()`. `flux_future_get_flux()` +returns a temporary flux_t handle cloned from the one passed to +`flux_future_set_flux()`, and associated with the temporary reactor. +After the internal reactor returns, any messages unmatched by the dispatcher +on the cloned handle are requeued in the main flux_t handle with +`flux_dispatch_requeue()`. Since the init callback may be made in either reactor context (at most once each), and is unaware of which context that is, it should take care when @@ -118,19 +134,6 @@ with `flux_future_aux_set()` may be useful for managing the life cycle of reactor watchers and message handlers created by init callbacks. -MESSAGE HANDLERS ----------------- - -To allow message handlers to be registered in either reactor context, -`flux_future_get_flux()` is context sensitive. If called in the main -reactor context, it directly returns the broker handle registered with -`flux_future_set_flux()`. If called in the internal reactor context, -it returns a clone of that handle, obtained with `flux_clone()`, and -associated with the internal reactor. After the internal reactor returns, -any message unmatched by the dispatcher on the cloned handle are requeued -in the handle with `flux_dispatch_requeue()`. - - RETURN VALUE ------------ @@ -146,6 +149,9 @@ error, NULL is returned and errno is set appropriately. `flux_future_get_flux()` returns a flux_t handle on success. On error, NULL is returned and errno is set appropriately. +`flux_future_get_reactor()` returns a flux_reactor_t on success. On error, +NULL is returned and errno is set appropriately. + ERRORS ------ diff --git a/doc/man3/flux_future_then.adoc b/doc/man3/flux_future_then.adoc index 1e69a37702e7..a3226843a0f4 100644 --- a/doc/man3/flux_future_then.adoc +++ b/doc/man3/flux_future_then.adoc @@ -37,14 +37,17 @@ occurred. Flux futures were inspired by similar constructs in other programming environments mentioned in RESOURCES, but are not a faithful implementation of any particular one. -Generally other Flux classes return futures, and provide a class-specific +Generally other Flux classes return futures, and may provide class-specific access function for results. The functions described in this page can be used to access, synchronize, and destroy futures returned from any such class. Authors of classes that return futures are referred to `flux_future_create(3)`. `flux_future_then()` sets up a continuation callback _cb_ that is called with opaque argument _arg_ once the future is fulfilled. The continuation -is registered on the main reactor passed in to `flux_future_create()`. +is registered on the reactor passed to a class-specific create function +(some create functions accept a flux_t handle, and the reactor is +derived from that). + The continuation will normally use `flux_future_get()` or a class-specific access function to obtain the result from the future container without blocking. `flux_future_then()` may only be called once on a given future. diff --git a/src/common/libflux/future.c b/src/common/libflux/future.c index 8ade58469d7e..70ec26e6ca58 100644 --- a/src/common/libflux/future.c +++ b/src/common/libflux/future.c @@ -211,8 +211,7 @@ void flux_future_destroy (flux_future_t *f) /* Create a future. */ -flux_future_t *flux_future_create (flux_reactor_t *r, - flux_future_init_f cb, void *arg) +flux_future_t *flux_future_create (flux_future_init_f cb, void *arg) { flux_future_t *f = calloc (1, sizeof (*f)); if (!f) { @@ -221,19 +220,58 @@ flux_future_t *flux_future_create (flux_reactor_t *r, } f->init = cb; f->init_arg = arg; - f->r = r; return f; error: flux_future_destroy (f); return NULL; } -/* Set flux handle +/* Set the flux reactor to be used for 'then' context. + * In 'now' context, reactor will be a temporary one. */ -void flux_future_set_flux (flux_future_t *f, flux_t *h) +void flux_future_set_reactor (flux_future_t *f, flux_reactor_t *r) { if (f) + f->r = r; +} + +/* Context dependent get of reactor. + * If "now" context, return one off reactor. + * If "then" context, return the one that was set. + */ +flux_reactor_t *flux_future_get_reactor (flux_future_t *f) +{ + flux_reactor_t *r; + + if (!f) + goto inval; + if (!f->now || !f->now->running) { + if (!f->r) + goto inval; + r = f->r; + } + else { + if (!f->now->r) + goto inval; + r = f->now->r; + } + return r; +inval: + errno = EINVAL; + return NULL; +} + +/* Set the flux handle to be used for 'then' context. + * In 'now' context, handle will be a clone of this one, + * associated with the temporary reactor. + */ +void flux_future_set_flux (flux_future_t *f, flux_t *h) +{ + if (f) { f->h = h; + if (!f->r) + f->r = flux_get_reactor (h); + } } /* Context dependent get of flux handle. @@ -295,7 +333,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout) } f->now->running = true; if (f->init && !f->now->init_called) { - f->init (f, f->now->r, f->init_arg); // might set error + f->init (f, f->init_arg); // might set error f->now->init_called = true; } if (!f->result_valid && !f->result_errnum_valid) { @@ -346,7 +384,7 @@ int flux_future_then (flux_future_t *f, double timeout, f->then->continuation = cb; f->then->continuation_arg = arg; if (f->init) - f->init (f, f->then->r, f->init_arg); // might set error + f->init (f, f->init_arg); // might set error if (f->result_errnum_valid) { errno = f->result_errnum; return -1; diff --git a/src/common/libflux/future.h b/src/common/libflux/future.h index a3bace8e0b21..254b564c25e5 100644 --- a/src/common/libflux/future.h +++ b/src/common/libflux/future.h @@ -6,6 +6,10 @@ #include "handle.h" #include "msg_handler.h" +/* Interfaces useful for all classes that return futures. + * See flux_future_then(3). + */ + typedef struct flux_future flux_future_t; typedef void (*flux_continuation_f)(flux_future_t *f, void *arg); @@ -21,12 +25,13 @@ void *flux_future_aux_get (flux_future_t *f, const char *name); int flux_future_aux_set (flux_future_t *f, const char *name, void *aux, flux_free_f destroy); +/* Functions primarily used by implementors of classes that return futures. + * See flux_future_create(3). + */ -typedef void (*flux_future_init_f)(flux_future_t *f, - flux_reactor_t *r, void *arg); +typedef void (*flux_future_init_f)(flux_future_t *f, void *arg); -flux_future_t *flux_future_create (flux_reactor_t *r, - flux_future_init_f cb, void *arg); +flux_future_t *flux_future_create (flux_future_init_f cb, void *arg); int flux_future_get (flux_future_t *f, void *result); @@ -36,6 +41,8 @@ void flux_future_fulfill_error (flux_future_t *f, int errnum); void flux_future_set_flux (flux_future_t *f, flux_t *h); flux_t *flux_future_get_flux (flux_future_t *f); +void flux_future_set_reactor (flux_future_t *f, flux_reactor_t *r); +flux_reactor_t *flux_future_get_reactor (flux_future_t *f); #endif /* !_FLUX_CORE_FUTURE_H */ diff --git a/src/common/libflux/rpc.c b/src/common/libflux/rpc.c index bfefdc477fd6..c263ca418f77 100644 --- a/src/common/libflux/rpc.c +++ b/src/common/libflux/rpc.c @@ -173,7 +173,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, /* Callback to initialize future in main or alternate reactor contexts. * Install a message handler for the response. */ -static void initialize_cb (flux_future_t *f, flux_reactor_t *r, void *arg) +static void initialize_cb (flux_future_t *f, void *arg) { struct flux_rpc *rpc = flux_future_aux_get (f, "flux::rpc"); flux_msg_handler_t *w; @@ -204,7 +204,7 @@ static flux_future_t *flux_rpc_msg (flux_t *h, flux_future_t *f; int msgflags = 0; - if (!(f = flux_future_create (flux_get_reactor (h), initialize_cb, NULL))) + if (!(f = flux_future_create (initialize_cb, NULL))) goto error; if (!(rpc = rpc_create (h, flags))) goto error; diff --git a/src/common/libflux/test/future.c b/src/common/libflux/test/future.c index 8c5202eba72f..228890635695 100644 --- a/src/common/libflux/test/future.c +++ b/src/common/libflux/test/future.c @@ -1,5 +1,6 @@ #include #include +#include #include "future.h" #include "reactor.h" @@ -26,11 +27,15 @@ void result_destroy (void *arg) int contin_called; void *contin_arg; int contin_get_rc; +flux_reactor_t *contin_reactor; +flux_t *contin_flux; void *contin_get_result; void contin (flux_future_t *f, void *arg) { contin_called = 1; contin_arg = arg; + contin_flux = flux_future_get_flux (f); + contin_reactor = flux_future_get_reactor (f); contin_get_rc = flux_future_get (f, &contin_get_result); } @@ -44,11 +49,14 @@ void test_simple (void) BAIL_OUT ("flux_reactor_create failed"); /* create */ - f = flux_future_create (r, NULL, NULL); + f = flux_future_create (NULL, NULL); ok (f != NULL, "flux_future_create works"); if (!f) BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); + ok (flux_future_get_reactor (f) == r, + "flux_future_get_reactor matches what was set"); /* before aux is set */ errno = 0; @@ -116,6 +124,7 @@ void test_simple (void) contin_arg = NULL; contin_get_rc = -42; contin_get_result = NULL; + contin_reactor = NULL; ok (flux_future_then (f, -1., contin, "nerp") == 0, "flux_future_then registered continuation"); ok (flux_reactor_run (r, 0) == 0, @@ -125,6 +134,8 @@ void test_simple (void) ok (contin_get_rc == 0 && contin_get_result != NULL && !strcmp (contin_get_result, "Hello"), "continuation obtained correct result with flux_future_get"); + ok (contin_reactor == r, + "flux_future_get_reactor from continuation returned set reactor"); /* destructors */ flux_future_destroy (f); @@ -142,7 +153,7 @@ void test_timeout_now (void) { flux_future_t *f; - f = flux_future_create (NULL, NULL, NULL); + f = flux_future_create (NULL, NULL); ok (f != NULL, "flux_future_create works"); if (!f) @@ -172,11 +183,12 @@ void test_timeout_then (void) if (!r) BAIL_OUT ("flux_reactor_create failed"); - f = flux_future_create (r, NULL, NULL); + f = flux_future_create (NULL, NULL); ok (f != NULL, "flux_future_create works"); if (!f) BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); ok (flux_future_then (f, 0.1, timeout_contin, &errnum) == 0, "flux_future_then registered continuation with timeout"); @@ -201,13 +213,16 @@ void simple_init_timer_cb (flux_reactor_t *r, flux_watcher_t *w, int simple_init_called; void *simple_init_arg; -void simple_init (flux_future_t *f, flux_reactor_t *r, void *arg) +flux_reactor_t *simple_init_r; +void simple_init (flux_future_t *f, void *arg) { + flux_reactor_t *r = flux_future_get_reactor (f); flux_watcher_t *w; simple_init_called++; simple_init_arg = arg; + simple_init_r = r; w = flux_timer_watcher_create (r, 0.1, 0., simple_init_timer_cb, f); if (!w) goto error; @@ -227,13 +242,14 @@ void test_init_now (void) flux_future_t *f; char *result; - f = flux_future_create (NULL, simple_init, "testarg"); + f = flux_future_create (simple_init, "testarg"); ok (f != NULL, "flux_future_create works"); if (!f) BAIL_OUT ("flux_future_create failed"); simple_init_called = 0; simple_init_arg = NULL; + simple_init_r = NULL; result = NULL; ok (flux_future_get (f, &result) == 0, "flux_future_get worked"); @@ -242,6 +258,8 @@ void test_init_now (void) ok (simple_init_called == 1 && simple_init_arg != NULL && !strcmp (simple_init_arg, "testarg"), "init was called once with correct arg"); + ok (simple_init_r != NULL, + "flux_future_get_reactor returned tmp reactor in init"); flux_future_destroy (f); @@ -266,13 +284,15 @@ void test_init_then (void) if (!r) BAIL_OUT ("flux_reactor_create failed"); - f = flux_future_create (r, simple_init, "testarg"); + f = flux_future_create (simple_init, "testarg"); ok (f != NULL, "flux_future_create works"); if (!f) BAIL_OUT ("flux_future_create failed"); + flux_future_set_reactor (f, r); simple_init_called = 0; simple_init_arg = &f; + simple_init_r = NULL; simple_contin_result = NULL; simple_contin_called = 0; simple_contin_rc = -42; @@ -281,6 +301,8 @@ void test_init_then (void) ok (simple_init_called == 1 && simple_init_arg != NULL && !strcmp (simple_init_arg, "testarg"), "init was called once with correct arg"); + ok (simple_init_r == r, + "flux_future_get_reactor return set reactor in init"); ok (flux_reactor_run (r, 0) == 0, "reactor successfully run"); ok (simple_contin_called == 1, @@ -297,6 +319,307 @@ void test_init_then (void) diag ("%s: init works in reactor context", __FUNCTION__); } +/* mumble - a 0.01s timer wrapped in a future. + */ + +void mumble_timer_cb (flux_reactor_t *r, flux_watcher_t *w, + int revents, void *arg) +{ + flux_future_t *f = arg; + flux_future_fulfill (f, xstrdup ("Hello"), free); +} + +void mumble_init (flux_future_t *f, void *arg) +{ + flux_reactor_t *r = flux_future_get_reactor (f); + flux_watcher_t *w; + if (!(w = flux_timer_watcher_create (r, 0.01, 0., mumble_timer_cb, f))) + goto error; + if (flux_future_aux_set (f, NULL, w, + (flux_free_f)flux_watcher_destroy) < 0) { + flux_watcher_destroy (w); + goto error; + } + flux_watcher_start (w); + return; +error: + flux_future_fulfill_error (f, errno); +} + +flux_future_t *mumble_create (void) +{ + return flux_future_create (mumble_init, NULL); +} + +int fclass_contin_rc; +void fclass_contin (flux_future_t *f, void *arg) +{ + fclass_contin_rc = flux_future_get (f, arg); +} + +void test_fclass_synchronous (char *tag, flux_future_t *f, const char *expected) +{ + char *s; + + ok (flux_future_wait_for (f, -1.) == 0, + "%s: flux_future_wait_for returned successfully", tag); + ok (flux_future_get (f, &s) == 0 && s != NULL && !strcmp (s, expected), + "%s: flux_future_get worked", tag); +} + +void test_fclass_asynchronous (char *tag, + flux_future_t *f, const char *expected) +{ + flux_reactor_t *r; + char *s; + + r = flux_reactor_create (0); + if (!r) + BAIL_OUT ("flux_reactor_create failed"); + + flux_future_set_reactor (f, r); + s = NULL; + fclass_contin_rc = 42; + ok (flux_future_then (f, -1., fclass_contin, &s) == 0, + "%s: flux_future_then worked", tag); + ok (flux_reactor_run (r, 0) == 0, + "%s: flux_reactor_run returned", tag); + ok (fclass_contin_rc == 0, + "%s: continuation called flux_future_get with success", tag); + ok (s != NULL && !strcmp (s, expected), + "%s: continuation fetched expected value", tag); + + flux_reactor_destroy (r); +} + +void test_mumble (void) +{ + flux_future_t *f; + + f = mumble_create (); + ok (f != NULL, + "mumble_create worked"); + test_fclass_synchronous ("mumble", f, "Hello"); + flux_future_destroy (f); + + f = mumble_create (); + ok (f != NULL, + "mumble_create worked"); + test_fclass_asynchronous ("mumble", f, "Hello"); + flux_future_destroy (f); +} + +/* incept - two mumbles wrapped in a future, wrapped in an engima. + * No not the last bit. + */ +struct incept { + flux_future_t *f1; + flux_future_t *f2; + int count; +}; +void ic_free (struct incept *ic) +{ + if (ic) { + flux_future_destroy (ic->f1); + flux_future_destroy (ic->f2); + free (ic); + } +} +struct incept *ic_alloc (void) +{ + struct incept *ic = xzmalloc (sizeof (*ic)); + ic->f1 = mumble_create (); + ic->f2 = mumble_create (); + if (!ic->f2 || !ic->f1) { + ic_free (ic); + return NULL; + } + return ic; +} +void incept_mumble_contin (flux_future_t *f, void *arg) +{ + flux_future_t *incept_f = arg; + struct incept *ic = flux_future_aux_get (incept_f, "ic"); + if (ic == NULL) + goto error; + if (--ic->count == 0) + flux_future_fulfill (incept_f, xstrdup ("Blorg"), free); + return; +error: + flux_future_fulfill_error (incept_f, errno); +} +void incept_init (flux_future_t *f, void *arg) +{ + flux_reactor_t *r = flux_future_get_reactor (f); + struct incept *ic = arg; + + flux_future_set_reactor (ic->f1, r); + flux_future_set_reactor (ic->f2, r); + if (flux_future_then (ic->f1, -1., incept_mumble_contin, f) < 0) + goto error; + if (flux_future_then (ic->f2, -1., incept_mumble_contin, f) < 0) + goto error; + return; +error: + flux_future_fulfill_error (f, errno); +} +flux_future_t *incept_create (void) +{ + flux_future_t *f = NULL; + struct incept *ic; + + if (!(ic = ic_alloc ())) + goto error; + if (!(f = flux_future_create (incept_init, ic))) { + ic_free (ic); + goto error; + } + if (flux_future_aux_set (f, "ic", ic, (flux_free_f)ic_free) < 0) { + ic_free (ic); + goto error; + } + ic->count = 2; + return f; +error: + flux_future_destroy (f); + return NULL; +} + +void test_mumble_inception (void) +{ + flux_future_t *f; + + f = incept_create (); + ok (f != NULL, + "incept_create worked"); + test_fclass_synchronous ("incept", f, "Blorg"); + flux_future_destroy (f); + + f = incept_create (); + ok (f != NULL, + "incept_create worked"); + test_fclass_asynchronous ("incept", f, "Blorg"); + flux_future_destroy (f); +} + +/* walk - multiple mumbles wrapped in a future, executed serially + * The next future is created in the current future's contination. + */ +struct walk { + zlist_t *f; // stack of futures + int count; // number of steps requested +}; +void walk_free (struct walk *walk) +{ + if (walk) { + if (walk->f) { + flux_future_t *f; + while ((f = zlist_pop (walk->f))) + flux_future_destroy (f); + zlist_destroy (&walk->f); + } + free (walk); + } +} +struct walk *walk_alloc (void) +{ + struct walk *walk = xzmalloc (sizeof (*walk)); + if (!(walk->f = zlist_new ())) { + walk_free (walk); + return NULL; + } + return walk; +} +void walk_mumble_contin (flux_future_t *f, void *arg) +{ + flux_future_t *walk_f = arg; + struct walk *walk = flux_future_aux_get (walk_f, "walk"); + + if (walk == NULL) + goto error; + if (--walk->count > 0) { + flux_reactor_t *r = flux_future_get_reactor (walk_f); + flux_future_t *nf; + if (!(nf = mumble_create ())) + goto error; + flux_future_set_reactor (nf, r); + if (flux_future_then (nf, -1., walk_mumble_contin, walk_f) < 0) { + flux_future_destroy (nf); + goto error; + } + if (zlist_push (walk->f, nf) < 0) { + flux_future_destroy (nf); + goto error; + } + } + else + flux_future_fulfill (walk_f, xstrdup ("Nerg"), free); + diag ("%s: count=%d", __FUNCTION__, walk->count); + return; +error: + flux_future_fulfill_error (walk_f, errno); +} +void walk_init (flux_future_t *f, void *arg) +{ + flux_reactor_t *r = flux_future_get_reactor (f); + struct walk *walk = arg; + + assert (walk->count > 0); + + flux_future_t *nf; + if (!(nf = mumble_create ())) + goto error; + flux_future_set_reactor (nf, r); + if (flux_future_then (nf, -1., walk_mumble_contin, f) < 0) { + flux_future_destroy (nf); + goto error; + } + if (zlist_push (walk->f, nf) < 0) { + flux_future_destroy (nf); + goto error; + } + return; +error: + flux_future_fulfill_error (f, errno); +} +flux_future_t *walk_create (int count) +{ + flux_future_t *f = NULL; + struct walk *walk; + + if (!(walk = walk_alloc ())) + goto error; + if (!(f = flux_future_create (walk_init, walk))) { + walk_free (walk); + goto error; + } + if (flux_future_aux_set (f, "walk", walk, (flux_free_f)walk_free) < 0) { + walk_free (walk); + goto error; + } + walk->count = count; + return f; +error: + flux_future_destroy (f); + return NULL; +} + +void test_walk (void) +{ + flux_future_t *f; + + f = walk_create (4); + ok (f != NULL, + "walk_create worked"); + test_fclass_synchronous ("walk", f, "Nerg"); + flux_future_destroy (f); + + f = walk_create (10); + ok (f != NULL, + "walk_create worked"); + test_fclass_asynchronous ("walk", f, "Nerg"); + flux_future_destroy (f); +} int main (int argc, char *argv[]) { @@ -309,6 +632,10 @@ int main (int argc, char *argv[]) test_init_now (); test_init_then (); + test_mumble (); + test_mumble_inception (); + test_walk (); + done_testing(); return (0); }