-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
finish removing json_object from public flux api #215
Conversation
Attempting to perturb the source as little as possible, move the request/response/rpc functions that expose json_object arguments to a new internal library called libjsonc. This is a stopgap measure while transitioning the public API from json-c-specific 'json_object' parameters to 'const char *json_str', and other planned API changes. Eventually libjsonc will become a convenience library for internal use. Some build system gymnastics were required: - incorporate libsonc.la into libflux-internal.la - build libflux tests against libflux-internal.la + libflux-core.la to avoid cyclic dependency (libflux uses jsonc uses libflux) - #define new jsonc_ prefixed names for deprecated functions in the jsonc.h header, to avoid exporting them in libflux-core.la - include jsonc.h in the internal <flux/core.h> include file so that internal users get the name #defines and the new include without needing to be modified
New functions: flux_request_encode() flux_request_decode() Add unit tests.
This function includes code to check storage allocated to new payload overlaps old, to avoid zframe_reset() use-after-free. Add an exception for the case where the proposed payload *is* the old payload (as in echo tests), and do nothing in that case. Adjust unit tests accordingly.
Here's an example of int echo_cb (flux_t h, int type, zmsg_t **zmsg, void *arg)
{
int errnum = 0;
const char *json_str;
if (flux_request_decode (*zmsg, NULL, &json_str) < 0) {
errnum = errno;
goto done;
}
done:
if (flux_respond (h, zmsg, errnum, json_str) < 0)
zmsg_destroy (&zmsg);
return 0;
} (commit e78534e makes the reuse of the request-owned payload in the response valid). So the flow is basically: do stuff, falling through with errnum == 0 and a payload on success, or bail out early and goto done with errnum set nonzero. I think this structure could be applied to most of the message handers I have written, and will simplify them somewhat. This function doesn't try to log or do anything if the respond fails. It might want to at least try to log that, although depending on what is wrong, that might not work either. |
Wow, that seems like a huge improvement at first glance! So, |
Correct. I guess I backslid there a little. It could take a |
So is there anything to do if Also, would the new example for int echo_cb (flux_t h, int type, zmsg_t **zmsg, void *arg)
{
int rc;
int errnum = 0;
const char *json_str;
if (flux_request_decode (*zmsg, NULL, &json_str) < 0) {
errnum = errno;
goto done;
}
/* ... */
done:
if ((rc = flux_respond (h, *zmsg, errnum, json_str)) < 0)
/* flux_log (LOG_ERR, ...)? */
zmsg_destroy (zmsg);
return rc;
} Just trying to wrap my head around usage idioms... |
Reflecting on this a bit: if the underlying send fails, this is most likely fatal and may indicate that logging to the same handle would fail too. Same goes for ENOMEM errors. On the other hand if there is a message encoding error, this is unremarkable from a server standpoint, and might (at best) be worth a I think there are two questions here:
Hmm. |
In @grondo's example, message manipulation errors resulting from a malformed message are unlikely as the message would have already passed through the dispatch code and maybe some request decoding. Maybe it is reasonable to assume that any error returned by done:
if ((rc = flux_respond (h, *zmsg, errnum, json_str)) < 0)
flux_log (h, LOG_ERR, "%s: flux_respond: %s", __FUNCTION__, strerror (errno));
zmsg_destroy (zmsg);
return rc; // -1 stops reactor
} This is a little verbose to have to repeat in every handler. Maybe in reworking the reactor handlers (issue #124) we could say that returning -1 here indicates a "fatal application error", require errno to be set, and then either directly log/terminate reactor as above, or (better) call a user-defined application panic function. Issue #110 is open to revisit |
Another idea, not sure if I like it: Add a mechanism like |
@garlick -- you had also mentioned an idea of registering a fatal error handler with the flux handle. This seemed like a really good idea to me. For testing we could register a handler that ensures test failure (i.e. call Also, as you've said, this could simplify a lot of internal code because instead trying to pass error all the way back up the stack you can call the registered error handler in place. |
see commit 8977fbd - is this looking like the right interface? |
This is looking pretty good to me. Also, you may want to have a separate call of out-of-memory situations. It could call the same fatal error handler for now, but in the future, we may want to tie into some other memory-release hook or something (e.g. a callback to plugins or other flux services to free up caches, etc -- hey, you never know). Having the call signature be different in OOM situation would make this easier in the future. |
Hmm, yeah, maybe whether the |
I'm not ignoring your OOM suggestion. Just pondering how and whether a |
Actually probably not quite as necessary for separate OOM handler if |
Maybe @dun can comment if I'm missing something, but it looks like the two functions in lsd-tools are overriding different behaviors. The The proposed |
@garlick, agreed. In the new rpc tests using |
Shall I squash down the review-commits and submit a new PR? |
Yeah, this is looking good to me, though I hope we can get some comment from @trws at least for some more eyes. I don't think you need to close PR and open another. Can you just force-update the branch referenced here? (Doesn't actually matter to be one way or the other, but is nice to have the entire discussion in one thread) |
Applications may register a fatal error handler with the flux_t handle. If an internal error occurs such that using the handle afterwards would be inadvisable, the user's handler will be called. If the handler returns, internal errors will still be propagated back to callers. The user's handler will only be called once. Internal libflux-core code, or the application, may call flux_fatal_error() to initiate fatal error handling. If no handler was registered, or the handler returns, flux_fatal_error() will return, and error handling proceeds as with non-fatal errors. A macro FLUX_FATAL(h) is provided which calls flux_fatal_error() with the calling __FUNCTION__ and a message of strerror (errno).
Add message copy function that should be used in place of zmsg_dup(). It adds an option to avoid copying the payload, although in the initial implementation we copy the whole message and optionally delete the payload (fix later when reworking flux_msg_t).
New functions: flux_response_decode() flux_response_encode() flux_respond() Add unit tests.
if (!rpc->rx_msg) | ||
rpc_addmsg (rpc, zmsg); | ||
rpc->then_cb (rpc, rpc->then_arg); | ||
} while (*zmsg != NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The interface looks great to me, but I'm not sure I understand how the rpc message cleanup is being handled in the case of a continuation. This looks a bit like the zmsg is presumed to be destroyed by get, and if it isn't it loops forever. I guess the questions are, does get collect the message such that a subsequent get would error, and why the loop here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A subsequent get would block until the next response arrives.
The loop is to handle the case where a new response is received but there was a previous response left in the flux_rpc_t
e.g. by a _check()
call. In retrospect it seems like I should queue responses to an abitrary depth and not necessarily care if _get()
is called in the then handler.
Thoughts on how this should work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it depends on the semantics we want for then. If it is meant to consume the request, then it would be reasonable to discard that response whether get is called or not. Is there a reasonable way to grab a handle to the request that the callback is being passed so it can be collected after the call if the call doesn't collect it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an invariant here to be considered: the payload pointer returned by _get()
points directly into the message, so flux_rpc_t
has to keep the response message around for that pointer to remain valid. What we decided was that the next _get()
could invalidate the pointer returned by the previous _get()
. If this was not done, then all the responses would have to be kept in there, an unnecessary memory overhead if the payloads are just being converted to JSON objects as they arrive.
The current _then()
handling is trying to be level triggered, which is consistent with how the reactor works. We probably want that though it means an infinite loop if a callback is registered that doesn't consume responses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One alternative would be to pass each callback both the RPC and the
actual message intended for it, rather than relying on it to call get
explicitly that moves the get out to a deterministic location in the
caller.
On 4 Jun 2015, at 16:57, Jim Garlick wrote:
return rc;
}+/* N.B. if the user's 'then' callback doesn't call flux_rpc_get(),
- * the callback will be called again (ad infinitum)
- _/
+static int rpc_cb (flux_t h, int type, zmsg_t *_zmsg, void *arg)
+{- flux_rpc_t rpc = arg;
- assert (rpc->then_cb != NULL);
- do {
garbage_collect (rpc);
if (!rpc->rx_msg)
rpc_addmsg (rpc, zmsg);
rpc->then_cb (rpc, rpc->then_arg);
- } while (*zmsg != NULL);
There is an invariant here to be considered: the payload pointer
returned by_get()
points directly into the message, soflux_rpc_t
has to keep the response message around for that pointer to remain
valid. What we decided was that the next_get()
could invalidate
the pointer returned by the previous_get()
. If this was not done,
then all the responses would have to be kept in there, an unnecessary
memory overhead if the payloads are just being converted to JSON
objects as they arrive.The current
_then()
handling is trying to be level triggered, which
is consistent with how the reactor works. We probably want that
though it means an infinite loop if a callback is registered that
doesn't consume responses.
Reply to this email directly or view it on GitHub:
https://github.com/flux-framework/flux-core/pull/215/files#r31777981
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point I'm not convinced there is a problem here. The infinite loop is what you'd get in most reactor callbacks if you don't clear the condition that triggered the callback.
It feels like the wrong direction to expose the message in the _then()
continuation when we've got a fairly nice abstraction that hides messages behind _get()
.
Keep pushing if you feel strongly on this one, but I'm leaning towards leaving it as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't feel strongly about any of the implementation details really. It
just seems like a source of potentially nasty bugs later to have an
infinite loop dependent on external callback code to continue. That
said, it may just be that I'm being over-cautious.
On 5 Jun 2015, at 10:40, Jim Garlick wrote:
return rc;
}+/* N.B. if the user's 'then' callback doesn't call flux_rpc_get(),
- * the callback will be called again (ad infinitum)
- _/
+static int rpc_cb (flux_t h, int type, zmsg_t *_zmsg, void *arg)
+{- flux_rpc_t rpc = arg;
- assert (rpc->then_cb != NULL);
- do {
garbage_collect (rpc);
if (!rpc->rx_msg)
rpc_addmsg (rpc, zmsg);
rpc->then_cb (rpc, rpc->then_arg);
- } while (*zmsg != NULL);
At this point I'm not convinced there is a problem here. The infinite
loop is what you'd get in most reactor callbacks if you don't clear
the condition that triggered the callback.It feels like the wrong direction to expose the message in the
_then()
continuation when we've got a fairly nice abstraction that
hides messages behind_get()
.Keep pushing if you feel strongly on this one, but I'm leaning towards
leaving it as is.
Reply to this email directly or view it on GitHub:
https://github.com/flux-framework/flux-core/pull/215/files#r31834323
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, compromise: I opened issue #216 on this. We can revisit before we finalize the first pass through the API. I need to make progress in other areas and for some reason I'm having a hard time getting my head around this one right now, but your point seems like a good one.
This test was silently dumping core and passing. As this is not our failure, and planned changes to the flux message type may get us weaned off of zmsg_t in the near term, I chose to disable the test rather than run it down.
Add a simple connector that echoes messages back to the sender, to be used in testing.
Add tests that exercise new rpc functions via the 'loop' connector. Since the loop connector must be built before the test can run, a new test directory 't/loop' was added.
Also: refactor the flog.c code to reflect the current simplistic design. At some point we need to revisit and reimplement the advanced features that were originally prototyped like log coalescing duplicate logs on the TBON, circular debug buffer, log filtering, multi-TBON-level syslog bridge etc.. Changed external interface to broker to flux_log_json() instead of flux_log_zmsg() and updated broker accordingly.
Also, change flux_lspeer() to return a json string rather than json_object, and update its users, cmd/flux-comms.c and module/live/live.c.
Also get rid of json_objects in the module API. As a result, several functions changed prototypes, and an intermediate object flux_modlist_t was introduced to simplify the rather convoluted set of functions we had for manipulating lists of modules. Update users (broker, flux-module command, unit and system tests).
Now that libflux no longer depends on libjsonc (which in turn depends on libflux), stop linking libflux unit tests against ../libflux-core.la ../libflux-internal.la to resolve the circular dependency. That was confounding an edit/make-check/edit/makecheck workflow in libflux (one had to remake the ../*.la's in order for unit tests to see changes).
return -1; | ||
} | ||
rpc->then_cb = cb; | ||
rpc->then_arg = arg; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Come to think of it, is a single then
that can't be reset or changed the desired behavior here? I would somewhat expect either to have it be a set mechanism, removing the old `then, or for it to allow a list of them to be specified by calling it multiple times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great point. I like the idea of multiple then
callbacks, but if that isn't needed then maybe we could return the old callback (kind of like sigaction(2)
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah that makes sense, why'd I do that?
Let's return the old then cb.
Let's not do multiple thens unless we have a specific use case in mind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is returning the old function useful without also returning the old arg? If not, maybe it's OK to just return 0 and allow the cb to be updated...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@trws, very reasonable suggestion imo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I'd have been fine with overwriting the arg
. The intention (which maybe I've failed to express) is that it's the sole property of the caller, and is only passed through to a continuation callback so the caller can recover its context or whatever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So do you guys think we need flux_rpc_then_get()
? Easy to add.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The arg would be property of the caller, but the nature of the interface of the callback is such that the caller may not wait for it to complete right? In a case like that, arg may be implicitly owned by the continuation and require cleanup. If that's not going to be allowed, then just overwriting it would be fine, but we should document the restriction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, for now I will
- modify
flux_rpc_then()
to allow multiple calls - document that internal (cb, arg) refs are overwritten by each call
- handle setting cb to NULL (unregister reactor callback)
In addition, I noticed a bug: if _check()
is called before _then()
, a response may be cached in the flux_rpc_t
that the user's continuation won't see until the next response is received. I'll fix that and add a test.
Re: |
I'm going to go ahead and force update this PR with the feedback commits so far squashed, then we can iterate some more if necessary. |
thanks @dun for commenting on your 13yo code! |
Unregister reactor handler if called with 'cb' set to NULL. Overwrite (cb, arg) if already set.
If a response is cached in the rpc handle before flux_rpc_then() is called, push the response back into the handle to make the reactor ready, so the continuation will be called.
Rather than looping in the rpc implementation if there are still responses to consume after the user's continuation callback runs, push messages back into the receive queue and let the reactor call the handler again based on its concept of fairnes.
If a message is cached in the rpc handle before _then continuation is registered, e.g. due to an earlier flux_rpc_check() call, the callback would not be called until the next response arrived.
Ok, e640d9f (just pushed) should address multiple calls to bf5913e takes the loop out of the reactor callback for continuations, and instead ensures that any unconsumed messages are requeued in the flux_t handle and so the reactor will call the callback again. This does not avoid the infinite loop discussed, but it does place the loop in the reactor where there is at least a hope that the reactor/dispatch will at some point be smart enough to allow other things to run in between continuations for the same rpc. I took the opportunity to simplify the code a bit. |
Looks good, and it sounds like this is ready to go in. |
I agree, just waiting on travis result. On Fri, Jun 5, 2015 at 2:00 PM, Tom Scogland [email protected]
|
finish removing json_object from public flux api, including new rpc API
Eliminates warnings from autogen.sh by removing references to tests introduced and then removed in pr flux-framework#215.
drop detrius in the aftermath of pr #215, fatalize more key functions, tweak flux_rpc_get()
Eliminates warnings from autogen.sh by removing references to tests introduced and then removed in pr flux-framework#215.
Eliminates warnings from autogen.sh by removing references to tests introduced and then removed in pr flux-framework#215.
This is a rework of PR #210 which was rework of PR #204.
This PR abolishes the specialized send/recv functions in earlier attempts. With the enhanced RPC, there seems little justification for wrapping
flux_sendmsg()
,_recvmsg()
/_recvmsg_match()
for each message type.event
Reverted
flux_event_send()
andflux_event_recv()
to be consistent with the other message types.request
response
flux_respond()
seems to work out a little nicer in module message handlers than its predecessors. Since it takes both errnum and payload args, ignoring payload if errnum is nonzero, only one call is needed in a handler, and there is less temptation to call the errnum one if the payload one fails, which was likely futile anyway.rpc
Discussed at length previously; main change is
flux_rpc_multi()
name change.