sql: move a single remote flow to the gateway in some cases #70648
Conversation
Force-pushed from 5a8a790 to a2b7ae8.
This PR is like an extension to #68524 (which was meant to resolve the hot spot scenario for one of our largest customers). I have also seen this hot spot pattern when running the TPCH workload with high concurrency, and I believe that this PR should generally be a beneficial change (and might be justifiable to backport for 21.2.1). My main questions are:
Curious about opinions from @cockroachdb/sql-queries.
Force-pushed from a2b7ae8 to 3d8327a.
This is cool! This is definitely the kind of decision that the optimizer should be making eventually, but I think it makes sense to do this as a stopgap for now. But I'm also a bit worried that this could cause us to send a lot more data over the network than we would otherwise. Maybe you could take advantage of the optimizer row count estimates to make the decision? e.g., if the output row count from the remote flow is significantly lower than the scan row counts, you might want to leave the remote flow as-is. Otherwise, you can move it to the gateway.
I picked a subset of processors that could cause the hot spot scenario and don't reduce the cardinality of the data. An alternative is to move the flow back to the gateway by default, except for limited subset of plans (like a scan with a filter and nothing else).
I'd start with a smaller change that you know is likely to be beneficial.
what do we want the EXPLAIN to say in case the single flow is moved back to the gateway? I'm probably leaning towards saying "distribution: local".
The plan is effectively local in this case, right? If so, I think I agree that we should say "distribution: local".
Reviewed 9 of 9 files at r1, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)
-- commits, line 27 at r1:
nit: node become -> node to become
pkg/sql/distsql_physical_planner.go, line 4088 at r1 (raw file):
	// or that performs the KV work.
	moveFlowToGateway = true
}
what if there is also a Select / InvertedFilter / group by somewhere in the flow? wouldn't that possibly make the distributed plan better?
Force-pushed from 3d8327a to 4a78bd4.
Maybe you could take advantage of the optimizer row count estimates to make the decision?
Thanks, I like this idea, implemented. I also updated the EXPLAIN output to show the actual distribution.
I think I only want to add some tests to make sure that
- if there are no stats, the flow is not moved
- if the data cardinality is reduced significantly, then the flow is also not moved.
But otherwise, RFAL.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft)
pkg/sql/distsql_physical_planner.go, line 4088 at r1 (raw file):
Previously, rytaft (Rebecca Taft) wrote…
what if there is also a Select / InvertedFilter / group by somewhere in the flow? wouldn't that possibly make the distributed plan better?
Yes, it might. I think now that we rely on the estimation of "row reduction ratio", it should cover those cases.
Force-pushed from 7a543f7 to 5244b2d.
Added some tests, PTAL.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft)
pkg/sql/opt/exec/execbuilder/testdata/distsql_single_flow, line 15 at r5 (raw file):
# There are no stats on the table, so the single flow should stay on the remote
# node.
# TODO(yuzefovich): currently the optimizer assumes that t has 1000 rows, so it
Can the optimizer tell us whether the stats are missing?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft)
pkg/sql/opt/exec/execbuilder/testdata/distsql_single_flow, line 15 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Can the optimizer tell us whether the stats are missing?
nvm, I see where to check for this.
Force-pushed from 5244b2d to 6a5c876.
Alright, done updating, sorry for the noise. RFAL.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft)
pkg/sql/opt/exec/execbuilder/relational.go, line 2035 at r7 (raw file):
	return nil, err
}
// TODO: I'm not sure whether this is correct.
Not sure about this one.
Nice!
Reviewed 17 of 30 files at r2, 24 of 24 files at r4, 15 of 26 files at r5, 12 of 12 files at r7, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft and @yuzefovich)
pkg/sql/distsql_physical_planner.go, line 4077 at r7 (raw file):
const rowReductionRatio = 10
keepPlan := rowCount <= 0 ||
	float64(plan.TotalEstimatedScannedRows)/float64(rowCount) >= rowReductionRatio
if !keepPlan {
nit: you could just return here if keepPlan is true
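For illustration, a minimal self-contained sketch of that check with the early return applied; the function name and signature are stand-ins for the example, not the actual planner API:

```go
package main

import "fmt"

// shouldMoveFlowToGateway is an illustrative stand-in for the check above:
// keep the remote flow when there is no row-count estimate or when the flow
// reduces the scanned cardinality by at least rowReductionRatio; otherwise
// move the single remote flow to the gateway.
func shouldMoveFlowToGateway(totalEstimatedScannedRows uint64, rowCount int64) bool {
	const rowReductionRatio = 10
	keepPlan := rowCount <= 0 ||
		float64(totalEstimatedScannedRows)/float64(rowCount) >= rowReductionRatio
	if keepPlan {
		// Early return, per the review suggestion.
		return false
	}
	return true
}

func main() {
	fmt.Println(shouldMoveFlowToGateway(1000, 900)) // true: the flow barely reduces cardinality
	fmt.Println(shouldMoveFlowToGateway(1000, 10))  // false: the flow filters out most rows
}
```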
pkg/sql/opt/exec/execbuilder/relational.go, line 2035 at r7 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Not sure about this one.
I would think it should be rec.Recursive, since that's what is used to build the plan here.
pkg/sql/physicalplan/physical_plan.go, line 1243 at r4 (raw file):
// two parts of it.
func (a PlanDistribution) compose(
	b PlanDistribution, allowPartialDistribution bool,
I'm struggling to understand the need for this parameter. Seems like you had to update a lot of code to include it, and I still don't quite see why it's necessary. Is there any way to simplify this?
Force-pushed from 6a5c876 to 7607dad.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft)
pkg/sql/physicalplan/physical_plan.go, line 1243 at r4 (raw file):
Previously, rytaft (Rebecca Taft) wrote…
I'm struggling to understand the need for this parameter. Seems like you had to update a lot of code to include it, and I still don't quite see why it's necessary. Is there any way to simplify this?
The idea behind "partial distribution" is to show that a part of the plan is fully distributed whereas another part of the plan couldn't be distributed, so the latter is executed locally. For example, we could plan table readers on multiple nodes for a single scan (i.e. "full" distribution), but then if we have a render expression involving a function that is Distsql-blocklisted, we have to evaluate that render on the gateway; such a plan with the new distsql spec factory would get "partial distribution".
The need for the allowPartialDistribution argument came from the desire to not change "distribution" info for plans created by the old exec factory. If we were to keep the previous code of compose, then we would often see "partial" distribution when we have a stage of multiple remote processors and then a noop processor on the gateway - we would misleadingly say that we couldn't distribute the last stage, so we got the "partial" distribution. However, we actually don't want to "distribute" the last noop processor that is placed on the gateway simply to propagate the result to the client.
I think we have several options:
- plumb the allowPartialDistribution parameter here (what I'm currently proposing) - this mostly preserves the existing behavior of the old exec factory and the new distsql spec factory
- disallow the "partial" distribution entirely - this will change the behavior of the new distsql spec factory and would call some plans "fully" distributed when there are parts that have to run only on the gateway
- allow the "partial" distribution for the old exec factory - this will likely be confusing to users.
I've just added a temporary third commit that shows the change to the existing tests for the third option from above.
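To make the first option concrete, here is a toy sketch of how a plumbed allowPartialDistribution flag could gate the composition; this is one possible shape consistent with the description above, not the actual CockroachDB implementation:

```go
package main

import "fmt"

// PlanDistribution is a toy stand-in for the planner's distribution kind;
// the real type and its compose method live in pkg/sql/physicalplan.
type PlanDistribution int

const (
	LocalPlan PlanDistribution = iota
	PartiallyDistributedPlan
	FullyDistributedPlan
)

// compose sketches the first option: mixing different distributions yields
// "partial" only when allowPartialDistribution is set (new distsql spec
// factory); otherwise the stronger distribution wins, so a final noop on the
// gateway does not downgrade a fully distributed plan (old exec factory).
func (a PlanDistribution) compose(
	b PlanDistribution, allowPartialDistribution bool,
) PlanDistribution {
	if allowPartialDistribution && a != b {
		return PartiallyDistributedPlan
	}
	if a > b {
		return a
	}
	return b
}

func main() {
	// A distributed stage composed with a gateway-only noop stage.
	fmt.Println(FullyDistributedPlan.compose(LocalPlan, false)) // 2: still "full"
	fmt.Println(FullyDistributedPlan.compose(LocalPlan, true))  // 1: "partial"
}
```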
Reviewed 27 of 27 files at r8, 25 of 25 files at r9, 18 of 18 files at r10, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft and @yuzefovich)
pkg/sql/opt/exec/execbuilder/testdata/experimental_distsql_planning_5node, line 145 at r10 (raw file):
SET disable_partially_distributed_plans = true

# TODO(yuzefovich): for some reason the plan below is not local, figure it out.
can you remove this TODO now?
pkg/sql/physicalplan/physical_plan.go, line 1243 at r4 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
The idea behind "partial distribution" is to show that a part of the plan is fully distributed whereas another part of the plan couldn't be distributed, so the latter is executed locally. For example, we could plan table readers on multiple nodes for a single scan (i.e. "full" distribution), but then if we have a render expression involving a function that is Distsql-blocklisted, we have to evaluate that render on the gateway; such a plan with the new distsql spec factory would get "partial distribution".
The need for
allowPartialDistribution
argument came from the desire to not change "distribution" info for plans created by the old exec factory. If we were to keep the previous code ofcompose
, then we would often see "partial" distribution when we have a stage of multiple remote processors and then we have a noop processor on the gateway - we would misleadingly say that we couldn't distribute the last stage, so we got the "partial" distribution. However, we actually don't want to "distribute" the last noop processor that is placed on the gateway simply to propagate the result to the client.I think we have several options:
- plumb
allowPartialDistribution
parameter here (what I'm currently proposing) - this mostly preserves the existing behavior of the old exec factory and the new distsql spec factory- disallow the "partial" distribution entirely - this will change the behavior of the new distsql spec factory and would call some plans as "fully" distributed when there are parts that have to only run on the gateway
- allow the "partial" distribution for the old exec factory - this will likely be confusing to users.
I've just added a temporary third commit that shows the change to the existing tests for the third option from above.
Hmm... so basically with this change we would never show distribution: full, either in the new or old exec factory? Doesn't seem ideal...
Is there a way to just make an exception for the final noop processor? Seems like we'd need that exception anyway to get the desired behavior for the new exec factory.
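A rough sketch of what such an exception could look like, using made-up types (the actual planner tracks processor placement differently):

```go
package main

import "fmt"

// stage is a toy description of one stage of a physical plan: the nodes its
// processors run on and whether it is the final noop that merely forwards
// results to the client on the gateway. These types are illustrative only.
type stage struct {
	nodes       []int
	isFinalNoop bool
}

// distribution sketches the suggested exception: ignore the final noop when
// deciding whether the plan is "full", "partial", or "local".
func distribution(stages []stage) string {
	anyDistributed, anyLocal := false, false
	for _, s := range stages {
		if s.isFinalNoop {
			continue // the exception: don't let the noop downgrade the result
		}
		if len(s.nodes) > 1 {
			anyDistributed = true
		} else {
			anyLocal = true
		}
	}
	switch {
	case anyDistributed && anyLocal:
		return "partial"
	case anyDistributed:
		return "full"
	default:
		return "local"
	}
}

func main() {
	plan := []stage{
		{nodes: []int{1, 2, 3}},              // distributed table readers
		{nodes: []int{1}, isFinalNoop: true}, // noop on the gateway
	}
	fmt.Println(distribution(plan)) // full
}
```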
Force-pushed from 3ad6931 to cab31e2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @rytaft, and @yuzefovich)
pkg/sql/physicalplan/physical_plan.go, line 1243 at r4 (raw file):
Hmm... so basically with this change we would never show distribution: full, either in the new or old exec factory?
Yeah, that was pretty much the case with the previous version of the third commit.
I adjusted the last commit to implement your suggestion. Things are better, but there are still some plans that are called "partial" that I'd like to be called "full". For example, we have a couple of stages of TableReaders that are planned on the gateway because they read from a single range that lives on the gateway; however, we call such distribution "local", so when composing things, we'll end up with "partial", which is confusing.
We currently don't have enough expressivity for the example above. I think I prefer my original approach of essentially disabling "partial" distribution for the old factory so that we mostly keep the existing behavior unchanged (given that we might backport this to 21.2) and so that we can think through things better during the distsql spec work.
Reviewed 21 of 21 files at r11, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft)
pkg/sql/physicalplan/physical_plan.go, line 1243 at r4 (raw file):
I think I prefer my original approach...
Ok, that sounds fine
Force-pushed from cab31e2 to 7607dad.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft)
pkg/sql/opt/exec/execbuilder/testdata/experimental_distsql_planning_5node, line 145 at r10 (raw file):
Previously, rytaft (Rebecca Taft) wrote…
can you remove this TODO now?
No, but I don't think it's critical to figure this out since this only matters with the experimental settings enabled.
pkg/sql/physicalplan/physical_plan.go, line 1243 at r4 (raw file):
Previously, rytaft (Rebecca Taft) wrote…
I think I prefer my original approach...
Ok, that sounds fine
Removed the third commit, PTAL.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @rytaft)
TFTR! bors r+
Build succeeded:
sql: show distribution info based on actual physical plan in EXPLAIN
Previously, the `distribution` info in `EXPLAIN` output was printed
based on the recommendation about the distribution of the plan. For
example, if the plan is determined as "should be distributed", yet
it only contains a single flow on the gateway, we would say that the
plan has "full" distribution.
This commit updates the code to print the distribution based on the
actual physical plan (in the example above it would say "local"),
regardless of the reason - whether it is the recommendation to plan
locally or the data happened to be only on the gateway.
I think it makes more sense this way since now DISTSQL diagram
consisting of a single flow on the gateway more appropriately
corresponds to "local" distribution. Additionally, this change is
motivated by the follow-up commit which will introduce changes to the
physical plan during the plan finalization, and we want to show the
correct distribution in the EXPLAIN output for that too.
Release note: None
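A toy illustration of the change described in the commit message above, with made-up types: the reported distribution is derived from where the processors of the finalized plan actually ended up, not from the planning recommendation.

```go
package main

import "fmt"

// physicalPlan is a toy stand-in that only tracks the nodes on which at
// least one processor of the finalized plan was placed.
type physicalPlan struct {
	nodes map[int]bool
}

// explainDistribution sketches the change described above: the reported
// distribution follows the actual processor placement of the finalized plan
// rather than the pre-planning recommendation.
func explainDistribution(recommendedDistributed bool, plan physicalPlan, gatewayNodeID int) string {
	_ = recommendedDistributed // previously this recommendation drove the output
	if len(plan.nodes) == 1 && plan.nodes[gatewayNodeID] {
		return "local"
	}
	return "full"
}

func main() {
	// Recommended for distribution, but the finalized plan is a single flow
	// on the gateway (node 1), e.g. after being moved there.
	p := physicalPlan{nodes: map[int]bool{1: true}}
	fmt.Println(explainDistribution(true, p, 1)) // local
}
```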
sql: move a single remote flow to the gateway in some cases
This commit updates the physical planner to move a single remote flow
onto the gateway in some cases, namely when
- the flow contains a processor that might increase the cardinality of
  the data flowing through it or that performs the KV work
- we estimate that the whole flow doesn't reduce the cardinality when
  compared against the number of rows read by the table readers.
To be conservative, when there is no estimate, we don't apply this
change to the physical plan.
The justification behind this change is the fact that we're pinning the
whole physical planning based on the placement of table readers. If the
plan consists only of a single flow, and the flow is quite expensive,
then with high enough frequency of such flows, the node having the
lease for the ranges of the table readers becomes the hot spot (we have
seen this in practice a few months ago). In such a scenario we might now
choose to run the flow locally to distribute the load on the cluster
better (assuming that the queries are issued against all nodes with
equal frequency).
The EXPLAIN output will correctly say "distribution: local" if the flow
is moved to the gateway.
Informs: #59014.
Release note (bug fix): Some query patterns that previously could cause
a single node to become a hot spot have been fixed so that the load is
evenly distributed across the whole cluster.