distsql: query hang due to blocked stream #14948

Closed · asubiotto opened this issue Apr 14, 2017 · 26 comments
Labels: A-admission-control, C-bug, T-sql-queries

@asubiotto (Contributor) commented Apr 14, 2017

[image: execution plan with the interesting blocked reads and writes labelled]

I've labelled the interesting blocked reads and writes on the execution plan above. I will refer to specific hash joiners as A and B (as labelled in the execution plan).

The reason this query hangs is that there are a number of blocked writes occupying the network connection that are never consumed (one feeds into B, and another feeds into a JoinReader that is in turn blocked on writing to A). Because these writes go unconsumed, the write in the circled read-write pair cannot use the connection, due to the limit on the gRPC connection window size.

So why aren't these writes that are using up the connection being read? They all eventually feed into the left source of a HashJoiner, and our logic to perform a hash join is to read all of the right source first and only then proceed to reading from the left source. B is blocked on A, and A is blocked on a read from its right source that will never be fulfilled because both A's left source and B's left source are using the connection.
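For illustration, here is a minimal sketch of that read order, using a hypothetical RowSource interface and a simplified joiner rather than the actual distsql processor code:

```go
// Illustrative sketch only; RowSource and hashJoiner are hypothetical
// simplifications, not the actual distsql implementations.
package sketch

type Row []string

// RowSource is a blocking iterator over rows arriving on a stream.
type RowSource interface {
	// Next returns the next row, or ok=false once the source is exhausted.
	Next() (row Row, ok bool)
}

type hashJoiner struct {
	left, right RowSource        // probe and build inputs
	buckets     map[string][]Row // hash table built from the right input
}

func (h *hashJoiner) run(emit func(Row)) {
	h.buckets = make(map[string][]Row)
	// Phase 1: read the right source to completion to build the hash table.
	// Nothing reads from h.left during this phase, so a network stream
	// feeding the left side can fill its window and sit there, unconsumed.
	for {
		row, ok := h.right.Next()
		if !ok {
			break
		}
		key := row[0] // join key, simplified to the first column
		h.buckets[key] = append(h.buckets[key], row)
	}
	// Phase 2: only now start reading the left source and probing.
	for {
		row, ok := h.left.Next()
		if !ok {
			return
		}
		for _, match := range h.buckets[row[0]] {
			emit(match)
		}
	}
}
```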

To alleviate this, I'm increasing the connection window size to be a multiple of the stream window size, but this doesn't fully solve the problem. One thing we could do is add a message with which a HashJoiner can notify a left source that it should begin sending data. However, all processors would need to be extended to know about and use this message, and maybe modifying flow control like this is not something we want to do. We need to discuss and decide on the best course of action.

cc @cockroachdb/distsql, @petermattis

Jira issue: CRDB-6097

@RaduBerinde (Member)

This kind of situation is not limited to joiners. Any ordered synchronizer could stop reading from one of its streams for arbitrary periods of time - especially when all values in one stream are greater than all values in another stream. I'm sure there are other cases. The design of the infrastructure relies on gRPC flow control.

Plumbing flow control messages in our layer constitutes reimplementing flow control ourselves rather than relying on gRPC. IMO it should be the last resort.

We should figure out what's the right thing to do in gRPC and see how hard it would be to implement it. We use our forked repo anyway, and we can contribute it upstream later. One idea: the stream window should not be fixed - it should start out at some fraction of the connection window, but as the connection window fills up, the stream window should get smaller and smaller (blocking new requests earlier). This negative feedback loop can be set up so that there is always some minimum window size available for every stream, up to some number of streams.
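As a rough illustration of that feedback loop, here is a sketch of how a sender-side stream-window target could shrink as the connection window fills. The names and constants are hypothetical (chosen to match the 256k/64k/32k example later in the thread), not grpc-go internals:

```go
// Hypothetical sketch of the negative-feedback idea; not grpc-go code.
package sketch

const (
	connWindow          = 256 << 10 // total connection window (256k)
	initialStreamWindow = 64 << 10  // window a stream starts with (64k)
	minStreamWindow     = 32 << 10  // guaranteed floor per stream (32k)
)

// streamWindowTarget returns how large any single stream's window may be,
// given how many bytes are currently outstanding on the whole connection.
// As the connection fills up, the target shrinks, but never below the
// guaranteed minimum, so every stream can always make some progress.
func streamWindowTarget(connOutstanding int64) int64 {
	free := connWindow - connOutstanding
	target := initialStreamWindow * free / connWindow // scale with free space
	if target < minStreamWindow {
		return minStreamWindow
	}
	if target > initialStreamWindow {
		return initialStreamWindow
	}
	return target
}
```

With these constants, once 128k of the 256k connection window is outstanding, the target drops to 32k, matching the example below.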

Another idea is to have a guaranteed stream window size in addition to the stream window size and the connection window size and say that each connection supports X streams. We reserve X * guaranteed stream window size from the connection window size ahead of time; as long as there are at most X streams, each should get its guaranteed window. Another way to phrase the same idea is that the connection window is not fixed, but grows with the number of streams (up to X). This is simpler than the previous idea but might not work as well.

In addition, we can limit the number of streams that use the same connection (maintaining a cache of multiple connections per host).

I will think some more about this and see if there is some literature out there.

@andreimatei (Contributor)

I'm also not a fan of adding our own flow control on top of the gRPC one, even in limited forms like what you're suggesting for joins (from unordered sources).

Radu, I'm not sure I understand your suggestions. For the first one - are you suggesting we dynamically adjust a stream window down after the stream has started? This would not help if the stream is currently using more window than what you want to adjust it to, right?
Or are you thinking that we'd just give new streams smaller and smaller windows based on the condition of the conn window at the time they start?

In the 2nd suggestion, I don't get what the interaction between the guaranteed stream window and the stream window is. How exactly do you reserve X * guaranteed sw from the conn window? How do you choose which stream gets one of these X slots versus going to the general conn window pool?

I think as a matter of principle, we do want to overcommit the conn window, in the expectation that no single stream will hog it for too long. We could build a deadlock detecting mechanism that detects when this overcommitting has gone way wrong.

Or my suggestion is still to get rid of the conn window altogether - make it infinite. We'd rely on higher level admission control to keep the size of receiver buffers in check. Or, besides that, we could also integrate those buffers with our memory accounting and start killing streams with large buffers when there's memory pressure.
I don't think such a change would be particularly risky. And to reduce the risk even more, we could isolate KV traffic from DistSQL traffic by moving DistSQL to dedicated gRPC connections.

@petermattis (Collaborator)

Thanks for the detailed explanation, @asubiotto. One question: do the DistSQL processors push data to their outputs? I imagine it would be a big change to reverse that and have the processors pull data from their inputs. And I'm not sure that would fix the problem, though it seems similar to your idea of having processors notify their inputs when they are ready for data.

@RaduBerinde (Member) commented Apr 15, 2017

Radu, I'm not sure I understand your suggestions. For the first one - are you suggesting we dynamically adjust a stream window down after the stream has started? This would not help if the stream is currently using more window than what you want to adjust it to, right?
Or are you thinking that we'd just give new streams smaller and smaller windows based on the condition of the conn window at the time they start?

The former. Right, streams that already filled their windows will be exceeding the reduced stream window, but that's ok. Here's a simple example, say one where we want to guarantee at least 32kb window for 6 streams:

  • conn window 256k, stream window 64k
  • stream 1 starts sending, it fills its 64k
  • stream 2 starts sending, it fills its 64k
  • now we are using half of our conn window, adjust the stream window target down to 32k
  • streams 3 through 6 start sending, filling their 32k each

So yes, we can't "take back" the window for streams 1 and 2, but because we adjust while there is still space in the conn window we achieve the goal. If some data is consumed from stream 1 and it tries to send more, it will bump against the 32k limit. When the total outstanding data decreases, the stream window target can go up again.

In the 2nd suggestion, I don't get what the interaction between the guaranteed stream window and the stream window is. How exactly do you reserve X * guaranteed sw from the conn window? How do you choose which stream gets one of these X slots versus going to the general conn window pool?

Each stream gets one of the slots. Effectively when you are trying to send, instead of checking against the conn window, check against the conn window - (X - number of established streams) * guaranteed sw. You can also think about it like this: the conn window starts at a lower value (by X * guaranteed sw), and it grows with each established stream (up to X).
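A small sketch of that check (hypothetical names and constants, not grpc-go code), where maxStreams is the X from the discussion above:

```go
// Hypothetical sketch of the reservation idea; not grpc-go code.
package sketch

const (
	connWindow       = 2 << 20  // total connection window
	guaranteedWindow = 32 << 10 // per-stream guaranteed window
	maxStreams       = 16       // X: streams the connection promises to support
)

// canSend reports whether a sender may put n more bytes on the wire, given
// how many bytes are already outstanding on the connection and how many
// streams have been established. The window reserved for streams that have
// not started yet is held back from the shared pool, which is equivalent to
// a connection window that grows with each established stream (up to X).
func canSend(n, connOutstanding, establishedStreams int64) bool {
	if establishedStreams > maxStreams {
		establishedStreams = maxStreams
	}
	reserved := (maxStreams - establishedStreams) * guaranteedWindow
	return connOutstanding+n <= connWindow-reserved
}
```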

Let's talk more offline.

@petermattis We considered pulling instead of pushing; the reason we didn't do it is that it can incur extra trips in terms of latency. Simply stated: it takes a trip to push something, but it takes a round-trip to pull something. Say node A is on the west coast and is 50ms away from node B on the east coast. We are trying to run a query from node A, we deploy flows on nodes A and B, and say there is a stream moving data from A to B. With pulling: it takes 50ms until the flow is deployed on B, then B issues a pull request to A which takes 50ms, and 50ms later B gets the first row (total 150ms until the first row). With pushing: it takes 50ms until the flow is deployed on B; by this time the flow on A is already running and B may already be getting the first row (total 50ms until the first row).

@RaduBerinde (Member)

Note: edited the stream example above a bit.

@petermattis (Collaborator)

@RaduBerinde In case you haven't looked, this is the code which handles the stream and connection windows. Notice the separate calls to wait here and here. It seems feasible to grow the connection window size based on the number of streams, though getting this right is important; otherwise you get starvation scenarios.
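As a conceptual model of those two waits (deliberately simplified; this is not the actual grpc-go transport code), a sender has to acquire quota from two independent budgets before it can write a frame:

```go
// Conceptual model only; not the actual grpc-go transport code.
package sketch

import "sync"

// quotaPool is a refillable budget of bytes, protected by a condition
// variable so senders can block until enough quota is available.
type quotaPool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	avail int
}

func newQuotaPool(n int) *quotaPool {
	q := &quotaPool{avail: n}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// acquire blocks until n bytes of quota can be taken from the pool.
func (q *quotaPool) acquire(n int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.avail < n {
		q.cond.Wait()
	}
	q.avail -= n
}

// release returns quota, e.g. when the receiver sends a window update.
func (q *quotaPool) release(n int) {
	q.mu.Lock()
	q.avail += n
	q.cond.Broadcast()
	q.mu.Unlock()
}

// sendFrame waits on both the per-stream and the per-connection budgets.
// A frame that fits within its stream window can still stall here if other
// streams have already consumed the connection window, which is exactly the
// situation described in this issue.
func sendFrame(streamQuota, connQuota *quotaPool, frame []byte) {
	streamQuota.acquire(len(frame))
	connQuota.acquire(len(frame))
	// ... write the frame to the wire ...
}
```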

@bdarnell (Contributor)

Plumbing flow control messages in our layer constitutes reimplementing flow control ourselves rather than relying on gRPC. IMO it should be the last resort.

True, although the same could be said for HTTP/2's flow control being implemented on top of TCP.

In the short term, I think raising the connection-level limit relative to the stream-level limit is the way to go. In the longer term, we could look at adjusting the stream-level limits based on the number of open streams.

@RaduBerinde (Member) commented Apr 17, 2017

True, although the same could be said for HTTP/2's flow control being implemented on top of TCP.

Right, but that was necessary because TCP lacked something fundamental to HTTP/2 (multi stream support). In our case we don't need something fundamentally new above what gRPC does, we just need to tweak the mechanics.

@petermattis I believe the heart of the logic is actually on the receiver side, which decides when to send window update messages (onRead called from places like this).
A question - in grpc the default stream window is 64k and the conn window is 1M. In our fork, you changed both to 2M. What motivated that change? Was there a performance benefit to the increase?

@a-robinson (Contributor)

A question - in grpc the default stream window is 64k and the conn window is 1M. In our fork, you changed both to 2M. What motivated that change? Was there a performance benefit to the increase?

Increasing the stream window was to improve snapshot performance on high-latency links. 64k was way too small for situations where one node is far away from the other two (#13687). I don't know that there was any strong reasoning for making it equal to the conn window.

@petermattis (Collaborator)

A question - in grpc the default stream window is 64k and the conn window is 1M. In our fork, you changed both to 2M. What motivated that change? Was there a performance benefit to the increase?

A 64KB stream window size combined with 60ms latencies (as seen on indigo) translates into ~1MB/sec which was causing snapshots to take too long to send.
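For reference, the back-of-the-envelope calculation behind that number, treating the 60ms as a round-trip time and assuming at most one stream window can be in flight per round trip:

```go
// Rough throughput estimate: one stream window in flight per round trip.
package main

import (
	"fmt"
	"time"
)

func main() {
	const window = 64 << 10           // 64KB stream window
	const rtt = 60 * time.Millisecond // latency seen on indigo
	throughput := float64(window) / rtt.Seconds()
	fmt.Printf("~%.1f MB/sec\n", throughput/(1<<20)) // prints ~1.0 MB/sec
}
```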

@RaduBerinde (Member)

I see. Yeah, for high bandwidth*delay connections we need big window sizes. But this is problematic if we want to make the connection window a multiple of the stream window. 2MB * num_streams * num_hosts can add up to a lot of memory.

@petermattis (Collaborator)

2MB * num_streams * num_hosts can add up to a lot of memory.

Yes it can. The current approach must be seen as a temporary bandaid.

@RaduBerinde (Member)

Another option would be to start with a small (64k) stream window and increase it as we receive data. That would allow streams that transfer data to have a large window while streams that are blocked from the beginning have a small window. The change would be along the lines of this (untested).
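A sketch of what that growth policy could look like on the receiver side (hypothetical, not the linked change): start every stream at 64KB and double its window only as data actually arrives, up to the current 2MB maximum.

```go
// Hypothetical receiver-side sketch of a growing stream window; not the
// actual grpc-go flow control code.
package sketch

const (
	initialWindow = 64 << 10 // 64KB starting window per stream
	maxWindow     = 2 << 20  // 2MB ceiling (the current fixed setting)
)

// streamFlow tracks how much a stream's window should grow based on how
// much data the peer has actually delivered on it.
type streamFlow struct {
	window   int64 // window currently advertised to the sender
	received int64 // bytes received since the last growth step
}

func newStreamFlow() *streamFlow {
	return &streamFlow{window: initialWindow}
}

// onData is called for each incoming data frame and returns the window to
// advertise in the next update. Streams that keep delivering data double
// their window; streams that are blocked from the start stay at 64KB, so
// they cannot pin down a large share of memory or of the conn window.
func (s *streamFlow) onData(n int64) int64 {
	s.received += n
	if s.received >= s.window && s.window < maxWindow {
		s.window *= 2
		if s.window > maxWindow {
			s.window = maxWindow
		}
		s.received = 0
	}
	return s.window
}
```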

@petermattis (Collaborator)

The gRPC-go folks are promising to have dynamic window sizing this quarter. But yeah, doing something like starting at 64K and doubling up to some max would be a better bandaid than what we have now. Question is whether we need to apply it right now.

@dianasaur323 (Contributor)

I'm not sure what the decision is here, but assigning to @RaduBerinde and marking as a 1.0 issue for now. Feel free to re-assign and move milestone back.

@rjnn (Contributor) commented Aug 15, 2017

It is my reading that this was fixed by #17328. Please reopen and reassign if you disagree.

@rjnn rjnn closed this as completed Aug 15, 2017
@RaduBerinde (Member)

I don't think that buffering messages in routers fixes this in general. It would if we buffered on the input side (in that case we would always drain gRPC streams).

As a very simple (though contrived) example, imagine we have two pieces of two tables on host A and we are sending both to a hashjoiner on B. We may fill up the stream window with one table while the hashjoiner is waiting to read from the other.

@RaduBerinde RaduBerinde reopened this Aug 16, 2017
@RaduBerinde (Member)

Moving to 1.2. The temporary solution we have in place (increased connection window limit) seems to be working fine.

@RaduBerinde RaduBerinde added this to the 1.2 milestone Aug 29, 2017
@asubiotto (Contributor, Author)

I wrote a little test scenario for this and found that we currently don't suffer from this anymore because connection flow control has been decoupled from stream flow control in GRPC, allowing an unbounded number of streams to send up to their stream window to a remote node. The downside of this is that the receiver could blow up due to unbounded memory usage. Filed this as a bug: grpc/grpc-go#2792.

I also experimented with dynamic window resizing in #35161 but found that that's done at the connection level, so new streams are created with the maximum window size, which doesn't help us much. I'm wondering whether this is desired behavior and might file a feature request for stream-level dynamic window resizing.

@jordanlewis jordanlewis removed the S-2-temp-unavailability Temp crashes or other availability problems. Can be worked around or resolved by restarting. label Sep 17, 2019
@jordanlewis (Member)

We're unmarking this as severe because it can't happen in 19.1 or 19.2.

@github-actions (bot) commented Jun 9, 2021

We have marked this issue as stale because it has been inactive for
18 months. If this issue is still relevant, removing the stale label
or adding a comment will keep it active. Otherwise, we'll close it in
10 days to keep the issue queue tidy. Thank you for your contribution
to CockroachDB!

@RaduBerinde RaduBerinde removed their assignment Jun 9, 2021
@jlinder jlinder added the T-sql-queries SQL Queries Team label Jun 16, 2021
@mgartner (Collaborator)

@yuzefovich do you think this is still an issue?

@yuzefovich (Member)

I'm not sure about this one. I skimmed over the comments on this issue, and the last one from Alfonso suggests that we shouldn't be seeing this problem in practice. Yet he didn't just close it, so I'm assuming that there is still some underlying issue, but it doesn't seem important to investigate further, thus I put it in the cold storage bucket (unless someone else watching this issue provides more context / insight).

@mgartner (Collaborator)

Thanks. I'm going to close this since there's no action item, as far as we can tell. Anyone should feel free to re-open if they disagree.

@asubiotto (Contributor, Author)

Hello friends! What Yahor says is correct, this cannot happen in practice, although that's due to grpc window semantics not being implemented properly in grpc-go. I opened an issue for this upstream back in the day but there was no interest in fixing it: grpc/grpc-go#2792. This can definitely be an issue if cockroach ever changes the underlying grpc implementation (e.g. moving to connect-go) or the windowing stuff gets fixed in grpc-go.

@mgartner (Collaborator) commented Jun 1, 2023

@asubiotto Hello! Thanks for chiming in!

This can definitely be an issue if cockroach ever changes the underlying grpc implementation (e.g. moving to connect-go) or the windowing stuff gets fixed in grpc-go.

Given this, is there a test we can put in place to ensure that we'll quickly catch this incompatibility if we ever change the grpc library? That'd help me sleep easier at night.

@mgartner mgartner moved this to Done in SQL Queries Jul 24, 2023