Implement /restate/invocation/{id}/attach and /restate/invocation/{id}/output #1503
Conversation
The important bits of this PR are the new …
Tested with restatedev/e2e#334 locally and it works
Thanks for creating this PR @slinkydeveloper. The changes look good to me; I mainly left minor comments. The one comment we should probably discuss is about linearizable reads and the need to complete requests by request type. I think with this approach we will not be able to provide linearizable reads of the result, because the result might originate from before a read request was sent. What was the reasoning behind batch-completing requests (not having to store multiple request ids on the PP side)?
waiting_responses: DashMap<
    (InvocationIdOrIdempotencyId, RequestType),
    HashMap<IngressResponseWaiterId, IngressInvocationResponseSender>,
>,
Grouping and fulfilling requests by request type instead of by IngressResponseWaiterId can have some undesired side effects. Assume, for example, the following scenario: we have a service invocation that, on completion, calls another service which writes to an external DB. The user observes this effect and then sends a get output request to the system, anticipating the fulfilled result of the first invocation. Now assume there was an earlier get output request from before the service invocation completed. If the not-yet-completed get output response is slow, it might arrive after the second get output request has been created. Consequently, the not-completed get output response would also complete the second get output request, even though one would expect to see a result.
So, long story short: with this implementation we won't be able to achieve linearizable reads, because the result of a read might originate from before the read request was sent.
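A minimal sketch of the alternative discussed here, keying pending responses per waiter so a response can only complete the waiters it was actually produced for. All names and types below are hypothetical stand-ins (a plain `HashMap` and `String` senders instead of `DashMap` and the real ingress sender types), not the PR's actual code:

```rust
use std::collections::HashMap;

// Stand-in types for illustration; the real code uses invocation ids,
// waiter ids, and response senders from the ingress dispatcher.
type InvocationId = u64;
type WaiterId = u64;
type ResponseSender = String;

#[derive(Default)]
struct PendingResponses {
    // Keyed per (invocation, waiter) rather than batch-completed by
    // request type: completing a response names the exact waiters
    // it belongs to.
    waiters: HashMap<InvocationId, HashMap<WaiterId, ResponseSender>>,
}

impl PendingResponses {
    fn register(&mut self, inv: InvocationId, waiter: WaiterId, sender: ResponseSender) {
        self.waiters.entry(inv).or_default().insert(waiter, sender);
    }

    // Complete only the listed waiters; waiters registered after the
    // response was produced stay pending, which is what preserves
    // linearizable reads in the scenario above.
    fn complete(&mut self, inv: InvocationId, targets: &[WaiterId]) -> Vec<ResponseSender> {
        let mut completed = Vec::new();
        if let Some(map) = self.waiters.get_mut(&inv) {
            for w in targets {
                if let Some(s) = map.remove(w) {
                    completed.push(s);
                }
            }
        }
        completed
    }
}
```

With this shape, a slow response for the early get output request would only ever complete the waiter that registered it, never a later one.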
Force-pushed from c807cce to bfcecc5
@tillrohrmann this is ready for another review pass. To make the rebase simpler, I had to squash everything together, I'm sorry. Among the changes I've made:
Thanks for updating this PR @slinkydeveloper. The changes make sense to me. There is one thing I would like to understand better and one comment we might want to address:
- Why is it not possible to model the workflow calls as idempotent calls where the idempotency key is derived from the workflow name and key? If it were possible, then we might be able to simplify some special cases in the partition processor state machine.
- Giving the ingress direct access to the PartitionStoreManager is not ideal, since we want to make the ingress independent of the worker (sooner rather than later). Instead, it would be better to use the network to let the ingress communicate with the worker to retrieve the output results.
impl InvocationStorageReader for InvocationStorageReaderImpl {
I think it would be great if the ingress didn't get direct access to the PartitionStoreManager, because we want to make the ingress independent of the worker (being able to start it as a separate role). Instead, it would be great if the ingress talked to the respective node via our network to ask for the output data. Otherwise, we somewhat cement the co-location of the ingress and the worker role.
As said below, this is purely an implementation detail of the InvocationStorageReaderImpl and we can change this later.
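A hedged sketch of what hiding that choice behind the reader abstraction could look like. The trait and both implementations below are hypothetical names, not the PR's actual code; the point is that a local, store-backed reader could later be swapped for a networked one without touching the ingress handlers:

```rust
// Hypothetical trait hiding where the output actually comes from.
trait OutputReader {
    fn get_output(&self, invocation_id: &str) -> Option<Vec<u8>>;
}

// Today: reads via local partition store access (ingress co-located
// with the worker).
struct LocalStoreReader;

impl OutputReader for LocalStoreReader {
    fn get_output(&self, _invocation_id: &str) -> Option<Vec<u8>> {
        // Would query the local partition store here.
        Some(b"local-result".to_vec())
    }
}

// Later: asks the node owning the partition over the network instead.
struct NetworkedReader;

impl OutputReader for NetworkedReader {
    fn get_output(&self, _invocation_id: &str) -> Option<Vec<u8>> {
        // Would issue a network request to the worker here.
        Some(b"remote-result".to_vec())
    }
}

// The ingress handler only depends on the trait, so swapping the
// implementation does not change the handler.
fn handle_get_output(reader: &dyn OutputReader, id: &str) -> Option<Vec<u8>> {
    reader.get_output(id)
}
```

This is the sense in which the direct PartitionStoreManager access stays an implementation detail that can be replaced later.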
if let Some(idempotency_id) = &idempotency_id {
    if service_invocation.invocation_target.invocation_target_ty()
        == InvocationTargetType::Workflow(WorkflowHandlerType::Workflow)
    {
        warn!("The idempotency key for workflow methods is ignored!");
Can you help me understand again why we cannot model workflow calls as idempotent invocations where the idempotency key is derived from the workflow name and key? Maybe this is also something we can document somewhere (the differences between workflow calls and normal invocations).
See my comment below. Where to document this, IDK; feel free to suggest a place.
Because:
I also don't want to over-generalize now: I think we're not completely sure about the workflow semantics yet, and while keeping it specialized is admittedly not elegant and adds special cases, it makes it easier to tune for now.
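For illustration only, the alternative raised in the question could look like the hypothetical derivation below. The function name and key format are invented for this sketch; the PR deliberately does not do this and keeps workflows specialized, for the reasons given above:

```rust
// Hypothetical: derive an idempotency key from workflow name + key, so a
// workflow call could in principle be treated as a plain idempotent
// invocation. The PR keeps workflow semantics specialized instead.
fn derive_workflow_idempotency_key(workflow_name: &str, workflow_key: &str) -> String {
    format!("{}/{}", workflow_name, workflow_key)
}
```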
Agree, but this can be hidden behind that "reader" abstraction. If that's OK for you, I would like to go ahead with the current implementation and iterate on it later.
Force-pushed from b9a7d0e to b1ecf03
* Minor stuff
* We now correctly implement the distinction between get output and attach responses. This makes sure we don't complete pending attaches with get output responses.
* Also deduplicated the ingress response types by adding a wrapping type for the target node.
* Another test, just in case, plus make sure the ingress dispatcher will try to unblock waiting responses both with idempotency id and invocation id
* Implement /restate/invocation/{id}/attach and /restate/invocation/{id}/output.
* Now the invocation id returned on /send with idempotency id is the one of the original invocation.
* Implement the new commands and the invocation id notification in the PP
* New ingress response variant to notify the invocation_id of an existing idempotent invocation. This is required to make sure that on /send idempotent requests, we return the invocation_id of the invocation we "attach to" for a given idempotent invocation.
* New state machine command to attach to an existing invocation

Second part of the PR:
* Add InvocationStorageReader to implement GetOutput, remove the GetOutput command
* Send attach notification for workflow calls.
* Feedback

Add /workflow/.../output and /workflow/.../attach
Force-pushed from b1ecf03 to 9e10002
Thanks for the clarification. The different behavior with respect to state makes sense.
Alrighty. Let's tackle this step after the release.
LGTM. +1 for merging :-)
Fix #1495 and #1494