
Fixing hasNext behaviour #1745

Merged: 14 commits merged into main from geal/fix-hasnext on Sep 14, 2022

Conversation


@Geal Geal commented Sep 12, 2022

We previously returned an empty GraphQL response at the end of the
response stream to set the `hasNext` field to false, to indicate that no
more responses will come.

That empty response is causing issues in some clients, so 24a00e6 was a
fix that set `hasNext` on a deferred response from inside query
planner execution. But that approach does not account for parallel deferred
response executions: one response might arrive with `hasNext` set to false,
and then another response could follow.

This commit uses another approach, where we go through an
intermediate task that checks whether the response stream is closed.

Fixes #1687
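The old behaviour can be sketched as follows; the `Response` type and `old_terminate` helper here are hypothetical stand-ins for illustration, not the router's actual types:

```rust
// Minimal illustration of the behaviour described above. `Response` and
// `old_terminate` are hypothetical, not the router's real code.
#[derive(Debug, Clone, PartialEq)]
struct Response {
    data: Option<&'static str>,
    has_next: bool,
}

// Old approach: every real response claims `has_next: true`, and a final
// empty response is appended just to flip `has_next` to false.
fn old_terminate(mut responses: Vec<Response>) -> Vec<Response> {
    for r in &mut responses {
        r.has_next = true;
    }
    responses.push(Response { data: None, has_next: false });
    responses
}

fn main() {
    let out = old_terminate(vec![
        Response { data: Some("primary"), has_next: true },
        Response { data: Some("deferred"), has_next: true },
    ]);
    // The trailing element is the empty response that confused some clients.
    assert_eq!(out.last().unwrap().data, None);
    assert!(!out.last().unwrap().has_next);
}
```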

we previously returned an empty GraphQL response at the end of the
response stream to set the `hasNext` field to false, to indicate that no
more responses will come.

That empty response is causing issues in some clients, so 24a00e6 was a
fix to set `hasNext` on a deferred response from inside query
planner execution, but it does not account for parallel deferred
response executions, so one response might come with `hasNext` set to false
and then another one could follow.

This commit attempts another solution, where we go through an
intermediate task that checks if the response stream is closed (it is a
channel, so it implements `FusedStream`). Unfortunately, right now it
fails to recognize when the stream is closed.
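The intermediate-task idea can be sketched synchronously. This is a minimal analogy using `std::sync::mpsc` in place of futures' channels and streams; all names are illustrative, not the router's code:

```rust
use std::sync::mpsc;
use std::thread;

// Synchronous sketch of the "intermediate task": it sits between the
// execution side and the client, and it learns that the stream is closed
// when every sender has been dropped (iter() then ends).
fn forward_until_closed(input: mpsc::Receiver<&'static str>) -> Vec<&'static str> {
    let (client_tx, client_rx) = mpsc::channel();
    for msg in input.iter() {
        client_tx.send(msg).unwrap();
    }
    drop(client_tx);
    client_rx.iter().collect()
}

fn main() {
    let (exec_tx, exec_rx) = mpsc::channel();
    // The "execution" side produces responses, then drops its sender,
    // which is what closes the stream.
    thread::spawn(move || {
        exec_tx.send("primary").unwrap();
        exec_tx.send("deferred").unwrap();
    });
    let received = forward_until_closed(exec_rx);
    assert_eq!(received, vec!["primary", "deferred"]);
}
```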

let stream = once(ready(first)).chain(rest).boxed();
let (mut sender2, receiver2) = futures::channel::mpsc::channel(10);
Contributor
Can we find a better name ? :p

Contributor Author
that's "WiP"

Contributor Author
I'll find a better name once I get it working :D

since the stream is marked as terminated from inside poll_next, we need
to call it a second time after getting a message, to check whether it is
closed. We cannot do that with an async method, because we need to
send the current message as soon as possible. So we call `try_next` and,
depending on its result, we either send the current message; or, if the
channel is closed, set `hasNext` on the current message and send it; or
send the current message, get the next one, and try again to see if
there is another one.
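The look-ahead described in this commit message can be sketched synchronously with std's `try_recv`, which plays a role similar to `try_next`. All names here are illustrative, not the router's actual code:

```rust
use std::sync::mpsc::{self, TryRecvError};

#[derive(Debug, PartialEq)]
struct Response {
    data: &'static str,
    has_next: bool,
}

// Sketch of the look-ahead, mapped onto std's try_recv:
//  - Ok(next)          -> current is not last; emit it, continue with next
//  - Err(Disconnected) -> stream closed; current is the last response,
//                         so flip has_next to false before emitting
//  - Err(Empty)        -> stream still open but no message yet; the async
//                         version would send `current` now and await the
//                         next message, here we simply block on recv()
fn forward(rx: mpsc::Receiver<&'static str>) -> Vec<Response> {
    let mut out = Vec::new();
    let mut current = match rx.recv() {
        Ok(m) => m,
        Err(_) => return out,
    };
    loop {
        match rx.try_recv() {
            Ok(next) => {
                out.push(Response { data: current, has_next: true });
                current = next;
            }
            Err(TryRecvError::Disconnected) => {
                out.push(Response { data: current, has_next: false });
                return out;
            }
            Err(TryRecvError::Empty) => match rx.recv() {
                Ok(next) => {
                    out.push(Response { data: current, has_next: true });
                    current = next;
                }
                Err(_) => {
                    out.push(Response { data: current, has_next: false });
                    return out;
                }
            },
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("primary").unwrap();
    tx.send("deferred").unwrap();
    drop(tx);
    let out = forward(rx);
    assert!(out[0].has_next);
    assert!(!out[1].has_next);
}
```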
@jpvajda jpvajda added the bug label Sep 12, 2022
@abernix abernix removed the bug label Sep 13, 2022
abernix commented Sep 13, 2022

@jpvajda I removed the bug label as we don't put bug labels on PRs but rather the issues that PRs close with fixes. Since #1687 is the issue this PR aims to close, the bug label is over there now 🪰 .

Geal commented Sep 13, 2022

so this solution with channels appears to be working when testing manually, but not in integration tests. I am testing another solution using an atomic counter, but it is currently very racy.

Some issues I'm encountering here:

  • some deferred responses can be entirely created from the primary query, but their actual content is generated in the supergraph service, in the format_response call. In some cases, that can result in an empty deferred response that should not be sent. But at this point we might have already decided that this was the last response and set has_next on it
  • some deferred responses can be generated in parallel, and decrease the counter faster than the other end can keep up. That leads to cases where we're waiting for 2 deferred responses (counter = 2), both finish at the same time (counter = 0), now the response filter sees the counter at 0 and sets has_next to false on the first one, then it receives the second deferred response
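The second bullet's interleaving can be replayed deterministically; this is a sketch of the (abandoned) counter-based design, with hypothetical names, not the router's final code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical counter-based design: the response filter believes more
// responses are coming as long as the pending counter is non-zero.
fn has_next_according_to_counter(pending: &AtomicUsize) -> bool {
    pending.load(Ordering::SeqCst) != 0
}

fn main() {
    let pending = AtomicUsize::new(2); // waiting for 2 deferred responses

    // Both deferred executions complete before the filter gets to run:
    pending.fetch_sub(1, Ordering::SeqCst); // response A finishes
    pending.fetch_sub(1, Ordering::SeqCst); // response B finishes

    // The filter now handles response A: the counter already reads 0, so it
    // wrongly marks A as the last response...
    assert!(!has_next_according_to_counter(&pending));
    // ...yet response B still arrives after a response that said
    // `has_next: false`.
}
```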

Geal commented Sep 13, 2022

I will need #1640 to land first: to make the behaviour more coherent, I will move the response formatting step to the execution service, so the execution service will always return correct responses

@Geal Geal changed the title WIP: attempt at fixing hasNext Fixing hasNext behaviour Sep 13, 2022
@Geal Geal marked this pull request as ready for review September 13, 2022 15:43
Geal commented Sep 13, 2022

This is now working and can be reviewed (I'll clean up the remaining println calls tomorrow)

receiver
}

async fn consume_responses(
@BrynCooke BrynCooke Sep 14, 2022
I think there may be a race condition here.

In the case where `stream.try_next()` is called and returns an error because there may be more items in the stream, if the stream is then closed before the call to `next` in `filter_stream`, there will be no final empty response with `has_next: false`.

Contributor Author

`try_next` is like `next` but without awaiting: if there is a message in the stream, it is returned by `try_next`; if not, `try_next` returns an error, and the next call to `next` would wait for one.

In the case you are describing, `try_next` would not return an error if there are in-flight messages. In the case where `try_next` does return an error, and somehow between returning from `consume_responses` and the call to `next` the stream gets new messages and is then closed, `next` would return a message, some calls to `try_next` would return messages, and when nothing remains `try_next` would return `Ok(None)`.

The one possible race I worry about is if messages are received and re-sent, then we await on `next`, and then for whatever reason the stream is disconnected (maybe all the senders are dropped). Then we would need to add a final `has_next = false` response. But I don't see how this could play out.

Contributor Author

@BrynCooke that last case should be addressed by 5210765

Contributor

LGTM!

there could be a race condition where we consume and send all messages,
then await the next one, then the stream is closed and we don't have
any message on which to set `has_next`. So we detect that case and
add a last message
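A minimal sketch of that fallback, assuming responses are collected into a vector; the `close_stream` helper and `Response` type are hypothetical:

```rust
#[derive(Debug, PartialEq)]
struct Response {
    data: Option<&'static str>,
    has_next: bool,
}

// Hypothetical helper for the race described above: if the last forwarded
// response still says has_next: true when the stream closes (or nothing was
// sent at all), append one last empty response so the client learns the
// stream is finished.
fn close_stream(sent: &mut Vec<Response>) {
    if sent.last().map_or(true, |r| r.has_next) {
        sent.push(Response { data: None, has_next: false });
    }
}

fn main() {
    // Normal case: the last real response already carries has_next: false.
    let mut done = vec![Response { data: Some("d"), has_next: false }];
    close_stream(&mut done);
    assert_eq!(done.len(), 1);

    // Race case: we awaited the next message and the stream closed instead,
    // so a final empty response is synthesized.
    let mut racy = vec![Response { data: Some("d"), has_next: true }];
    close_stream(&mut racy);
    assert_eq!(racy.len(), 2);
    assert!(!racy.last().unwrap().has_next);
}
```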
@Geal Geal enabled auto-merge (squash) September 14, 2022 09:57
@Geal Geal merged commit be96131 into main Sep 14, 2022
@Geal Geal deleted the geal/fix-hasnext branch September 14, 2022 10:10
@abernix abernix mentioned this pull request Sep 14, 2022
Successfully merging this pull request may close these issues.

eager hasNext: false in @defer payloads