Skip to content

Commit

Permalink
Merge pull request #1188 from garlick/compose_future
Browse files Browse the repository at this point in the history
libflux/future: minor interface changes to support composite futures
  • Loading branch information
grondo authored Sep 14, 2017
2 parents 4157888 + 0b3aa3a commit 06a2ec8
Show file tree
Hide file tree
Showing 6 changed files with 456 additions and 75 deletions.
114 changes: 60 additions & 54 deletions doc/man3/flux_future_create.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ SYNOPSIS
#include <flux/core.h>

typedef void (*flux_future_init_f)(flux_future_t *f,
flux_reactor_t *r, void *arg);
flux_reactor_t *r, void *arg);

flux_future_t *flux_future_create (flux_reactor_t *r,
flux_future_init_f cb, void *arg);
flux_future_t *flux_future_create (flux_future_init_f cb, void *arg);

void flux_future_fulfill (flux_future_t *f,
void *result, flux_free_f free_fn);
Expand All @@ -28,6 +27,10 @@ SYNOPSIS
int flux_future_aux_set (flux_future_t *f, const char *name,
void *aux, flux_free_f destroy);

void flux_future_set_reactor (flux_future_t *f, flux_t *h);

flux_reactor_t *flux_future_get_reactor (flux_future_t *f);

void flux_future_set_flux (flux_future_t *f, flux_t *h);

flux_t *flux_future_get_flux (flux_future_t *f);
Expand All @@ -39,29 +42,30 @@ DESCRIPTION
A Flux future represents some activity that may be completed with reactor
watchers and/or message handlers. It is intended to be returned by other
classes as a handle for synchronization and a container for results.
Such a class provides two user-facing functions, one to initiate the
activity and return a future which internally calls `flux_future_create()`,
and one to access class-specific result(s), which internally calls
`flux_future_get()`. The class also provides a _flux_future_init_f_
function that is called lazily by the future implementation to perform
class-specific reactor setup, such as installing watchers and message
handlers. This page describes the future interfaces used by such classes.
This page describes the future interfaces used by such classes.
Class users and users seeking an introduction to Flux futures are referred
to `flux_future_then(3)`.
`flux_future_create()` creates a future, associates a reactor with it,
and registers the class-specific initialization callback _cb_, and an
opaque argument _arg_ that will be passed to _cb_. The callback sets
up class-specific watchers on the reactor to handle asynchronous events.
The watchers must eventually call `flux_future_fulfill()` or
`flux_future_fulfill_error()` to fulfill the future. The callback may
occur in one or both of two contexts. A call in the first context occurs
when the user calls `flux_future_then()`. A call in the second context
occurs when the user (or `flux_future_get()`) calls `flux_future_wait_for()`.
In the former case, the callback receives the reactor _r_ passed to
`flux_future_create()`. In the latter case, it receives a temporary reactor
created within the `flux_future_wait_for()` implementation. See REACTOR
CONTEXTS below for more information.
A class that returns a future usually provides a creation function
that internally calls `flux_future_create()`, and may provide functions
to access class-specific result(s), that internally call `flux_future_get()`.
The create function internally registers a _flux_future_init_f_
function that is called lazily by the future implementation to perform
class-specific reactor setup, such as installing watchers and message
handlers.
`flux_future_create()` creates a future and registers the class-specific
initialization callback _cb_, and an opaque argument _arg_ that will be
passed to _cb_. The purpose of the initialization callback is to set up
class-specific watchers on a reactor obtained with `flux_future_get_reactor()`,
or message handlers on a flux_t handle obtained with `flux_future_get_flux()`,
or both. `flux_future_get_reactor()` and `flux_future_get_flux()` return
different results depending on whether the initialization callback is
triggered by a user calling `flux_future_then()` or `flux_future_wait_for()`.
The function may be triggered in one or both contexts, at most once for each.
The watchers or message handlers must eventually call `flux_future_fulfill()`
or `flux_future_fulfill_error()` to fulfill the future. See REACTOR CONTEXTS
below for more information.
`flux_future_fulfill()` fulfills the future, assigning an opaque
_result_ value with optional destructor _free_fn_ to the future.
Expand All @@ -79,10 +83,13 @@ retrieves an object by _name_. Destructors are called when the future is
destroyed. Objects may be stored anonymously under a NULL _name_ to be
scheduled for destruction without the option of retrieval.
`flux_future_set_reactor()` may be used to associate a Flux reactor
with a future. The reactor (or a temporary one, depending on the context)
may be retrieved using `flux_future_get_reactor()`.
`flux_future_set_flux()` may be used to associate a Flux broker handle
with a future. The handle may be retrieved from within an init callback using
`flux_future_get_flux()` and used to set up message handlers that
fulfill the future in the same manner as described for reactor watchers.
with a future. The handle (or a clone associated with a temporary reactor,
depending on the context) may be retrieved using `flux_future_get_flux()`.
Futures may "contain" other futures, to arbitrary depth. That is, an
init callback may create futures and use their continuations to fulfill
Expand All @@ -93,22 +100,31 @@ handlers.
REACTOR CONTEXTS
----------------

Internally, a future can operate in two reactor contexts. The init
Internally, a future can operate in two reactor contexts. The initialization
callback may be called in either or both contexts, depending on which
synchronization functions are called by the user.

The main reactor context involves the reactor passed to `flux_future_create()`.
This reactor is expected to be run or re-entered by the user, and can process
the future's watchers in parallel with other watchers registered by the
application. The call to `flux_future_then()` triggers the init callback
in this context.

Alternatively, an internal reactor is created when `flux_future_wait_for()`
is called before the future is complete. The separate reactor allows these
functions to wait _only_ for the future's events, without allowing unrelated
watchers registered in the main reactor to run, which might complicate the
application's control flow. After the internal reactor is created, the
init callback is made in this context.
synchronization functions are called by the user. `flux_future_get_reactor()`
and `flux_future_get_flux()` return a result that depends on which context
they are called from.

When the user calls `flux_future_then()`, this triggers a call to the
initialization callback. The callback would typically call
`flux_future_get_reactor()` and/or `flux_future_get_flux()` to obtain the
reactor or flux_t handle to be used to set up watchers or message handlers.
In this context, the reactor or flux_t handle are exactly the ones passed
to `flux_future_set_reactor()` and `flux_future_set_flux()`.

When the user calls `flux_future_wait_for()`, this triggers the creation
of a temporary reactor, then a call to the initialization callback.
The temporary reactor allows these functions to wait _only_ for the future's
events, without allowing unrelated watchers registered in the main reactor
to run, which might complicate the application's control flow. In this
context, `flux_future_get_reactor()` returns the temporary reactor, not
the one passed in with `flux_future_set_reactor()`. `flux_future_get_flux()`
returns a temporary flux_t handle cloned from the one passed to
`flux_future_set_flux()`, and associated with the temporary reactor.
After the internal reactor returns, any messages unmatched by the dispatcher
on the cloned handle are requeued in the main flux_t handle with
`flux_dispatch_requeue()`.

Since the init callback may be made in either reactor context (at most once
each), and is unaware of which context that is, it should take care when
Expand All @@ -118,19 +134,6 @@ with `flux_future_aux_set()` may be useful for managing the life cycle
of reactor watchers and message handlers created by init callbacks.


MESSAGE HANDLERS
----------------
To allow message handlers to be registered in either reactor context,
`flux_future_get_flux()` is context sensitive. If called in the main
reactor context, it directly returns the broker handle registered with
`flux_future_set_flux()`. If called in the internal reactor context,
it returns a clone of that handle, obtained with `flux_clone()`, and
associated with the internal reactor. After the internal reactor returns,
any message unmatched by the dispatcher on the cloned handle are requeued
in the handle with `flux_dispatch_requeue()`.
RETURN VALUE
------------
Expand All @@ -146,6 +149,9 @@ error, NULL is returned and errno is set appropriately.
`flux_future_get_flux()` returns a flux_t handle on success. On error,
NULL is returned and errno is set appropriately.
`flux_future_get_reactor()` returns a flux_reactor_t on success. On error,
NULL is returned and errno is set appropriately.
ERRORS
------
Expand Down
7 changes: 5 additions & 2 deletions doc/man3/flux_future_then.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ occurred. Flux futures were inspired by similar constructs in other
programming environments mentioned in RESOURCES, but are not a faithful
implementation of any particular one.
Generally other Flux classes return futures, and provide a class-specific
Generally other Flux classes return futures, and may provide class-specific
access function for results. The functions described in this page can be
used to access, synchronize, and destroy futures returned from any such class.
Authors of classes that return futures are referred to `flux_future_create(3)`.
`flux_future_then()` sets up a continuation callback _cb_ that is called
with opaque argument _arg_ once the future is fulfilled. The continuation
is registered on the main reactor passed in to `flux_future_create()`.
is registered on the reactor passed to a class-specific create function
(some create functions accept a flux_t handle, and the reactor is
derived from that).
The continuation will normally use `flux_future_get()` or a class-specific
access function to obtain the result from the future container without
blocking. `flux_future_then()` may only be called once on a given future.
Expand Down
52 changes: 45 additions & 7 deletions src/common/libflux/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ void flux_future_destroy (flux_future_t *f)

/* Create a future.
*/
flux_future_t *flux_future_create (flux_reactor_t *r,
flux_future_init_f cb, void *arg)
flux_future_t *flux_future_create (flux_future_init_f cb, void *arg)
{
flux_future_t *f = calloc (1, sizeof (*f));
if (!f) {
Expand All @@ -221,19 +220,58 @@ flux_future_t *flux_future_create (flux_reactor_t *r,
}
f->init = cb;
f->init_arg = arg;
f->r = r;
return f;
error:
flux_future_destroy (f);
return NULL;
}

/* Set flux handle
/* Set the flux reactor to be used for 'then' context.
* In 'now' context, reactor will be a temporary one.
*/
void flux_future_set_flux (flux_future_t *f, flux_t *h)
void flux_future_set_reactor (flux_future_t *f, flux_reactor_t *r)
{
if (f)
f->r = r;
}

/* Context dependent get of reactor.
* If "now" context, return one off reactor.
* If "then" context, return the one that was set.
*/
flux_reactor_t *flux_future_get_reactor (flux_future_t *f)
{
flux_reactor_t *r;

if (!f)
goto inval;
if (!f->now || !f->now->running) {
if (!f->r)
goto inval;
r = f->r;
}
else {
if (!f->now->r)
goto inval;
r = f->now->r;
}
return r;
inval:
errno = EINVAL;
return NULL;
}

/* Set the flux handle to be used for 'then' context.
* In 'now' context, handle will be a clone of this one,
* associated with the temporary reactor.
*/
void flux_future_set_flux (flux_future_t *f, flux_t *h)
{
if (f) {
f->h = h;
if (!f->r)
f->r = flux_get_reactor (h);
}
}

/* Context dependent get of flux handle.
Expand Down Expand Up @@ -295,7 +333,7 @@ int flux_future_wait_for (flux_future_t *f, double timeout)
}
f->now->running = true;
if (f->init && !f->now->init_called) {
f->init (f, f->now->r, f->init_arg); // might set error
f->init (f, f->init_arg); // might set error
f->now->init_called = true;
}
if (!f->result_valid && !f->result_errnum_valid) {
Expand Down Expand Up @@ -346,7 +384,7 @@ int flux_future_then (flux_future_t *f, double timeout,
f->then->continuation = cb;
f->then->continuation_arg = arg;
if (f->init)
f->init (f, f->then->r, f->init_arg); // might set error
f->init (f, f->init_arg); // might set error
if (f->result_errnum_valid) {
errno = f->result_errnum;
return -1;
Expand Down
15 changes: 11 additions & 4 deletions src/common/libflux/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#include "handle.h"
#include "msg_handler.h"

/* Interfaces useful for all classes that return futures.
* See flux_future_then(3).
*/

typedef struct flux_future flux_future_t;

typedef void (*flux_continuation_f)(flux_future_t *f, void *arg);
Expand All @@ -21,12 +25,13 @@ void *flux_future_aux_get (flux_future_t *f, const char *name);
int flux_future_aux_set (flux_future_t *f, const char *name,
void *aux, flux_free_f destroy);

/* Functions primarily used by implementors of classes that return futures.
* See flux_future_create(3).
*/

typedef void (*flux_future_init_f)(flux_future_t *f,
flux_reactor_t *r, void *arg);
typedef void (*flux_future_init_f)(flux_future_t *f, void *arg);

flux_future_t *flux_future_create (flux_reactor_t *r,
flux_future_init_f cb, void *arg);
flux_future_t *flux_future_create (flux_future_init_f cb, void *arg);

int flux_future_get (flux_future_t *f, void *result);

Expand All @@ -36,6 +41,8 @@ void flux_future_fulfill_error (flux_future_t *f, int errnum);
void flux_future_set_flux (flux_future_t *f, flux_t *h);
flux_t *flux_future_get_flux (flux_future_t *f);

void flux_future_set_reactor (flux_future_t *f, flux_reactor_t *r);
flux_reactor_t *flux_future_get_reactor (flux_future_t *f);

#endif /* !_FLUX_CORE_FUTURE_H */

Expand Down
4 changes: 2 additions & 2 deletions src/common/libflux/rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w,
/* Callback to initialize future in main or alternate reactor contexts.
* Install a message handler for the response.
*/
static void initialize_cb (flux_future_t *f, flux_reactor_t *r, void *arg)
static void initialize_cb (flux_future_t *f, void *arg)
{
struct flux_rpc *rpc = flux_future_aux_get (f, "flux::rpc");
flux_msg_handler_t *w;
Expand Down Expand Up @@ -204,7 +204,7 @@ static flux_future_t *flux_rpc_msg (flux_t *h,
flux_future_t *f;
int msgflags = 0;

if (!(f = flux_future_create (flux_get_reactor (h), initialize_cb, NULL)))
if (!(f = flux_future_create (initialize_cb, NULL)))
goto error;
if (!(rpc = rpc_create (h, flags)))
goto error;
Expand Down
Loading

0 comments on commit 06a2ec8

Please sign in to comment.