diff --git a/doc/man3/Makefile.am b/doc/man3/Makefile.am index f24db8ac7f94..fec9a6eb60b0 100644 --- a/doc/man3/Makefile.am +++ b/doc/man3/Makefile.am @@ -11,7 +11,10 @@ MAN3_FILES_PRIMARY = \ flux_event_subscribe.3 \ flux_pollevents.3 \ flux_msg_encode.3 \ - flux_msg_sendfd.3 + flux_msg_sendfd.3 \ + flux_rpc.3 \ + flux_rpc_then.3 \ + flux_rpc_multi.3 # These files are generated as roff .so includes of a primary page. # A2X handles this automatically if mentioned in NAME section @@ -25,7 +28,11 @@ MAN3_FILES_SECONDARY = \ flux_event_unsubscribe.3 \ flux_pollfd.3 \ flux_msg_decode.3 \ - flux_msg_recvfd.3 + flux_msg_recvfd.3 \ + flux_rpc_destroy.3 \ + flux_rpc_check.3 \ + flux_rpc_get.3 \ + flux_rpc_completed.3 ADOC_FILES = $(MAN3_FILES_PRIMARY:%.3=%.adoc) XML_FILES = $(MAN3_FILES_PRIMARY:%.3=%.xml) @@ -63,6 +70,10 @@ flux_event_unsubscribe.3: flux_event_subscribe.3 flux_pollfd.3: flux_pollevents.3 flux_msg_decode.3: flux_msg_encode.3 flux_msg_recvfd.3: flux_msg_sendfd.3 +flux_rpc_destroy.3: flux_rpc.3 +flux_rpc_get.3: flux_rpc.3 +flux_rpc_check.3: flux_rpc_then.3 +flux_rpc_completed.3: flux_rpc_multi.3 EXTRA_DIST = $(ADOC_FILES) COPYRIGHT.adoc @@ -74,3 +85,21 @@ TESTS_ENVIRONMENT = \ man_dir=$(abs_srcdir) TESTS = spellcheck + +AM_CFLAGS = @GCCWARN@ + +AM_CPPFLAGS = \ + $(JSON_CFLAGS) $(ZMQ_CFLAGS) \ + -I$(top_srcdir) -I$(top_srcdir)/src/include + +LDADD = \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libtap/libtap.la \ + $(JSON_LIBS) $(ZMQ_LIBS) $(LIBPTHREAD) + +check_PROGRAMS = \ + trpc \ + trpc_then \ + trpc_then_multi + diff --git a/doc/man3/flux_rpc.adoc b/doc/man3/flux_rpc.adoc new file mode 100644 index 000000000000..07cdd6b40af3 --- /dev/null +++ b/doc/man3/flux_rpc.adoc @@ -0,0 +1,134 @@ +flux_rpc(3) +=========== +:doctype: manpage + + +NAME +---- +flux_rpc, flux_rpc_get, flux_rpc_destroy - perform a remote procedure call to a Flux service + + +SYNOPSIS +-------- +#include + +flux_rpc_t *flux_rpc (flux_t h, const char *topic, const char *json_str, + uint32_t nodeid, int flags); + +void flux_rpc_destroy (flux_rpc_t *rpc); + +int flux_rpc_get (flux_rpc_t *rpc, uint32_t *nodeid, const char **json_str); + + +DESCRIPTION +----------- + +`flux_rpc()` constructs a request for the Flux service identified by +_topic_, with optional payload _json_str_, that will routed be according +to _nodeid_. The request message is assigned a matchtag from the handle +matchtag pool, and is then sent to the broker via handle _h_. +A flux_rpc_t object is returned to the caller, used to complete the RPC. + +_nodeid_ affects how the broker will route the request, and may be set +to one of the following values: + +FLUX_NODEID_ANY:: +Route to first available matching service instance. + +FLUX_NODEID_UPSTREAM:: +Route upstream via the tree based overlay network, then to first available +matching service instance, skipping any instance on the sending rank. + +integer:: +Route to a specific broker rank. + +If _json_str_ is non-NULL, it must represent valid serialized JSON, +and will be attached as request payload. + +Flags may be zero or a mask of the following values: + +FLUX_RPC_NORESPONSE:: +No response is expected, and the request will not be assigned a matchtag. +The flux_rpc_t may be immediately destroyed. + +`flux_rpc_get()` obtains the result of the RPC, blocking until the response +is received. If non-NULL, _nodeid_ is set to the _nodeid_ argument given +to `flux_rpc()` -- primarily useful with `flux_rpc_multi(3)`. Similarly, +_json_str_, if non-NULL, is set the response payload. The storage associated +with _json_str_ belongs to the flux_rpc_t object. It is a protocol error +if _json_str_ is NULL and the response has an unexpected payload, or if +_json_str_ is non-NULL and the payload is missing. + +`flux_rpc_destroy()` destroys a completed `flux_rpc_t`, invalidating +any payload returned by `flux_rpc_get()`, and freeing matchtags. + + +CANCELLATION +------------ + +Flux RFC 6 does not currently specify a cancellation protocol for an +individual RPC, but does stipulate that an RPC may be canceled if a disconnect +message is received, as is automatically generated by the local connector +upon client disconnection. + +If `flux_rpc_destroy()` is called before a response is received, a +matchtag value from the handle _h_'s matchtag pool is leaked. +If enough matchtags are leaked, it will be impossible to make RPC calls +on that handle. + + +RETURN VALUE +------------ + +`flux_rpc()` returns a flux_rpc_t object on success. On error, NULL +is returned, and errno is set appropriately. + +`flux_rpc_get()` returns zero on success. On error, -1 is returned, +and errno is set appropriately. + + +ERRORS +------ + +ENOSYS:: +Handle has no send operation. + +EINVAL:: +Some arguments were invalid. + +EPROTO:: +A protocol error was encountered. + + +EXAMPLES +-------- + +This example performs a synchronous RPC with the broker's "cmb.info" service +and extracts the broker's rank. + +.... +include::trpc.c[] +.... + + +AUTHOR +------ +This page is maintained by the Flux community. + + +RESOURCES +--------- +Github: + + +COPYRIGHT +--------- +include::COPYRIGHT.adoc[] + + +SEE ALSO +--------- +flux_rpc_then(3), flux_rpc_multi(3) + +https://github.com/flux-framework/rfc/blob/master/spec_6.adoc[RFC 6: Flux +Remote Procedure Call Protocol] diff --git a/doc/man3/flux_rpc_multi.adoc b/doc/man3/flux_rpc_multi.adoc new file mode 100644 index 000000000000..4e210ad7036b --- /dev/null +++ b/doc/man3/flux_rpc_multi.adoc @@ -0,0 +1,116 @@ +flux_rpc_multi(3) +================= +:doctype: manpage + + +NAME +---- +flux_rpc_multi, flux_rpc_completed, - send a remote procedure call to a Flux service on multiple ranks + + +SYNOPSIS +-------- +#include + +flux_rpc_t *flux_rpc_multi (flux_t h, const char *topic, const char *json_str, + const char *nodeset, int flags); + +bool flux_rpc_completed (flux_rpc_t *rpc); + +DESCRIPTION +----------- + +`flux_rpc_multi()` sends requests to a Flux service identified by _topic_ +via Flux broker handle _h_. The _nodeset_ represents a set of ranks +that will receive the request, and is specified in bracketed range format, +e.g. "[0-255]", "[1,3,5-10]", or "all" for all ranks in the session. + +If _json_str_ is non-NULL, it should represent a valid JSON string +that will be attached as request payload. + +Flags may be zero or a mask of the following values: + +FLUX_RPC_NORESPONSE:: +No response will be expected to the request, and the request will not be +assigned a matchtag. + +`flux_rpc_get(3)` may be used, as with `flux_rpc(3)`, to process responses. +Each call to `flux_rpc_get(3)` invalidates any payload obtained via a +previous call. + +`flux_rpc_check(3)` may be used, as with `flux_rpc(3)`, to determine +whether `flux_rpc_get(3)` will block. + +`flux_rpc_completed()` returns true once all the RPC responses have been +received and handled via `flux_rpc_get()`. It can be used to terminate +synchronous response collection, e.g. +.... +while (!flux_rpc_completed (rpc)) + flux_rpc_get (rpc, &nodeid, &payload); +.... + +`flux_rpc_then(3)` may be used, as with `flux_rpc(3)`, to register a +reactor callback to handle each RPC responses message. + +`flux_rpc_destroy(3)` should be used to dispose of the flux_rpc_t object +once the RPC has completed. After this function is called, any payload +returned by `flux_rpc_get()` is invalidated. + + +RETURN VALUE +------------ + +`flux_rpc_multi()` returns a flux_rpc_t object on success. On error, NULL +is returned, and errno is set appropriately. + +`flux_rpc_completed()` returns true if the RPC has completed, else false. +It does not report any errors. + + +ERRORS +------ + +ENOSYS:: +Handle has no send operation. + +EINVAL:: +Some arguments were invalid. + +EPROTO:: +A protocol error was encountered. + + +EXAMPLE +------- + +This example performs an RPC with the broker's "cmb.info" service on all +ranks. A continuation is registered to process the responses as they arrive. +The reactor loop terminates once the RPC is completed since the completion +is its only event handler. + +.... +include::trpc_then_multi.c[] +.... + + +AUTHOR +------ +This page is maintained by the Flux community. + + +RESOURCES +--------- +Github: + + +COPYRIGHT +--------- +include::COPYRIGHT.adoc[] + + +SEE ALSO +--------- +flux_rpc(3), flux_rpc_then(3) + +https://github.com/flux-framework/rfc/blob/master/spec_6.adoc[RFC 6: Flux +Remote Procedure Call Protocol] diff --git a/doc/man3/flux_rpc_then.adoc b/doc/man3/flux_rpc_then.adoc new file mode 100644 index 000000000000..2400724ddecd --- /dev/null +++ b/doc/man3/flux_rpc_then.adoc @@ -0,0 +1,93 @@ +flux_rpc_then(3) +================ +:doctype: manpage + + +NAME +---- +flux_rpc_check, flux_rpc_then - asynchronous RPC interface + + +SYNOPSIS +-------- +#include + +typedef void (*flux_then_f)(flux_rpc_t *rpc, void *arg); + +bool flux_rpc_check (flux_rpc_t *rpc); + +int flux_rpc_then (flux_rpc_t *rpc, flux_then_f cb, void *arg); + + +DESCRIPTION +----------- + +`flux_rpc_check()` and `flux_rpc_then()` may be used to conditionally +complete an RPC operation that was started with `flux_rpc(3)` or +`flux_rpc_multi(3)`, asynchronously. + +`flux_rpc_check()` returns true if the RPC response has been received +and a call to `flux_rpc_get(3)` would not block. It returns false if +the response has not yet been received. + +`flux_rpc_then()` may be used to register a continuation callback with +the broker handle's reactor. The continuation will be called when the +RPC response is received. It should call `flux_rpc_get(3)` to obtain the +RPC result. + +When the RPC is complete, the continuation is stopped as +far as the reactor is concerned. It is not safe to destroy the +flux_rpc_t object from within the continuation callback. + + +RETURN VALUE +------------ + +`flux_rpc_check()` returns true or false and does not detect failure. + +`flux_rpc_then()` returns zero on success. On error, -1 is returned, +and errno is set appropriately. + + +ERRORS +------ + +EINVAL:: +Some arguments were invalid. + + +EXAMPLES +-------- + +This example performs an RPC with the broker's "cmb.info" service +to obtain the broker's rank. If the response is available immediately, +it is processed synchronously. Otherwise, a continuation is registered to +process the response when it arrives, meanwhile allowing other reactor +watchers (not shown) to execute. + +.... +include::trpc_then.c[] +.... + + +AUTHOR +------ +This page is maintained by the Flux community. + + +RESOURCES +--------- +Github: + + +COPYRIGHT +--------- +include::COPYRIGHT.adoc[] + + +SEE ALSO +--------- +flux_rpc(3), flux_rpc_multi(3) + +https://github.com/flux-framework/rfc/blob/master/spec_6.adoc[RFC 6: Flux +Remote Procedure Call Protocol] diff --git a/doc/man3/trpc.c b/doc/man3/trpc.c new file mode 100644 index 000000000000..b3d7657b38ff --- /dev/null +++ b/doc/man3/trpc.c @@ -0,0 +1,31 @@ +#include +#include "src/common/libutil/shortjson.h" + +void get_rank (flux_rpc_t *rpc) +{ + const char *json_str; + JSON o; + int rank; + + if (flux_rpc_get (rpc, NULL, &json_str) < 0) + err_exit ("flux_rpc_get"); + if (!(o = Jfromstr (json_str)) || !Jget_int (o, "rank", &rank)) + msg_exit ("response protocol error"); + printf ("rank is %d\n", rank); + Jput (o); +} + +int main (int argc, char **argv) +{ + flux_t h; + flux_rpc_t *rpc; + + if (!(h = flux_open (NULL, 0))) + err_exit ("flux_open"); + if (!(rpc = flux_rpc (h, "cmb.info", NULL, FLUX_NODEID_ANY, 0))) + err_exit ("flux_rpc"); + get_rank (rpc); + flux_rpc_destroy (rpc); + flux_close (h); + return (0); +} diff --git a/doc/man3/trpc_then.c b/doc/man3/trpc_then.c new file mode 100644 index 000000000000..05bf5c779109 --- /dev/null +++ b/doc/man3/trpc_then.c @@ -0,0 +1,37 @@ +#include +#include "src/common/libutil/shortjson.h" + +void get_rank (flux_rpc_t *rpc, void *arg) +{ + const char *json_str; + JSON o; + int rank; + + if (flux_rpc_get (rpc, NULL, &json_str) < 0) + err_exit ("flux_rpc_get"); + if (!(o = Jfromstr (json_str)) || !Jget_int (o, "rank", &rank)) + msg_exit ("response protocol error"); + printf ("rank is %d\n", rank); + Jput (o); +} + +int main (int argc, char **argv) +{ + flux_t h; + flux_rpc_t *rpc; + + if (!(h = flux_open (NULL, 0))) + err_exit ("flux_open"); + if (!(rpc = flux_rpc (h, "cmb.info", NULL, FLUX_NODEID_ANY, 0))) + err_exit ("flux_rpc"); + if (flux_rpc_check (rpc)) + get_rank (rpc, NULL); + else if (flux_rpc_then (rpc, get_rank, NULL)) + err_exit ("flux_rpc_then"); + if (flux_reactor_start (h) < 0) + err_exit ("flux_reactor_start"); + + flux_rpc_destroy (rpc); + flux_close (h); + return (0); +} diff --git a/doc/man3/trpc_then_multi.c b/doc/man3/trpc_then_multi.c new file mode 100644 index 000000000000..c4a712b534e3 --- /dev/null +++ b/doc/man3/trpc_then_multi.c @@ -0,0 +1,36 @@ +#include +#include "src/common/libutil/shortjson.h" + +void get_rank (flux_rpc_t *rpc, void *arg) +{ + const char *json_str; + JSON o; + int rank; + uint32_t nodeid; + + if (flux_rpc_get (rpc, &nodeid, &json_str) < 0) + err_exit ("flux_rpc_get"); + if (!(o = Jfromstr (json_str)) || !Jget_int (o, "rank", &rank)) + msg_exit ("response protocol error"); + printf ("[%u] rank is %d\n", nodeid, rank); + Jput (o); +} + +int main (int argc, char **argv) +{ + flux_t h; + flux_rpc_t *rpc; + + if (!(h = flux_open (NULL, 0))) + err_exit ("flux_open"); + if (!(rpc = flux_rpc_multi (h, "cmb.info", NULL, "all", 0))) + err_exit ("flux_rpc"); + if (flux_rpc_then (rpc, get_rank, NULL) < 0) + err_exit ("flux_rpc_then"); + if (flux_reactor_start (h) < 0) + err_exit ("flux_reactor_start"); + + flux_rpc_destroy (rpc); + flux_close (h); + return (0); +} diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index 3726df399256..2f01e225aac4 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -224,3 +224,10 @@ fd iobuf recvfd sendfd +matchtags +multi +NORESPONSE +rpc +str +trpc +bool diff --git a/t/t0002-request.t b/t/t0002-request.t index 5c37d6b40439..116d8cd7d8b9 100755 --- a/t/t0002-request.t +++ b/t/t0002-request.t @@ -10,6 +10,18 @@ Verify basic request/response/rpc handling. . `dirname $0`/sharness.sh test_under_flux 2 +test_expect_success 'flux_rpc(3) example runs' ' + ${FLUX_BUILD_DIR}/doc/man3/trpc +' + +test_expect_success 'flux_rpc_then(3) example runs' ' + ${FLUX_BUILD_DIR}/doc/man3/trpc_then +' + +test_expect_success 'flux_rpc_multi(3) example runs' ' + ${FLUX_BUILD_DIR}/doc/man3/trpc_then_multi +' + test_expect_success 'request: load req module on rank 0' ' flux module load -d --rank=0 \ ${FLUX_BUILD_DIR}/src/test/request/.libs/req.so