sql: move a single remote flow to the gateway in some cases
This commit updates the physical planner to move a single remote flow
onto the gateway in some cases, namely when both of the following hold:
- the flow contains a processor that might increase the cardinality of
the data flowing through it or that performs the KV work
- we estimate that the whole flow doesn't significantly reduce the
cardinality (i.e. the reduction ratio is less than 10) when compared
against the number of rows read by the table readers.
To be conservative, when there is no estimate, we don't apply this
change to the physical plan.

The justification for this change is that we pin the placement of the
whole physical plan based on the placement of the table readers. If the
plan consists of only a single flow, and that flow is quite expensive,
then with a high enough frequency of such flows the node holding the
lease for the ranges of the table readers becomes a hot spot (we saw
this in practice a few months ago). In such a scenario we might now
choose to run the flow locally to distribute the load on the cluster
better (assuming that the queries are issued against all nodes with
equal frequency).

The EXPLAIN output will correctly say "distribution: local" if the flow
is moved to the gateway.
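
For illustration, here is a minimal sketch (not part of this commit) of
how that could be checked from a Go test over a regular SQL connection.
The table names, the join query, and the assumption that the plan would
otherwise be a single remote flow containing a join reader are
hypothetical; stats are collected first because without an estimate the
planner conservatively keeps the distributed plan.

package sql_test

import (
	gosql "database/sql"
	"strings"
	"testing"
)

// checkDistributionIsLocal runs EXPLAIN on a hypothetical join query and
// asserts that the reported distribution is "local", i.e. the single
// remote flow was moved to the gateway.
func checkDistributionIsLocal(t *testing.T, db *gosql.DB) {
	// Collect table statistics so the planner has a row-count estimate.
	if _, err := db.Exec(`ANALYZE kv`); err != nil {
		t.Fatal(err)
	}
	rows, err := db.Query(`EXPLAIN SELECT * FROM kv JOIN kv2 ON kv.v = kv2.k`)
	if err != nil {
		t.Fatal(err)
	}
	defer rows.Close()
	var sawLocal bool
	for rows.Next() {
		var line string
		if err := rows.Scan(&line); err != nil {
			t.Fatal(err)
		}
		if strings.Contains(line, "distribution: local") {
			sawLocal = true
		}
	}
	if err := rows.Err(); err != nil {
		t.Fatal(err)
	}
	if !sawLocal {
		t.Fatal("expected the single remote flow to be moved to the gateway")
	}
}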

Release note (bug fix): Some query patterns that previously could cause
a single node to become a hot spot have been fixed so that the load is
evenly distributed across the whole cluster.
yuzefovich committed Oct 4, 2021
1 parent 4cc510b commit 7607dad
Showing 25 changed files with 381 additions and 38 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -81,7 +81,7 @@ sql.defaults.datestyle enumeration iso, mdy default value for DateStyle session
sql.defaults.datestyle.enabled boolean false default value for datestyle_enabled session setting
sql.defaults.default_int_size integer 8 the size, in bytes, of an INT type
sql.defaults.disallow_full_table_scans.enabled boolean false setting to true rejects queries that have planned a full table scan
sql.defaults.distsql enumeration auto default distributed SQL execution mode [off = 0, auto = 1, on = 2]
sql.defaults.distsql enumeration auto default distributed SQL execution mode [off = 0, auto = 1, on = 2, always = 3]
sql.defaults.experimental_alter_column_type.enabled boolean false default value for experimental_alter_column_type session setting; enables the use of ALTER COLUMN TYPE for general conversions
sql.defaults.experimental_auto_rehoming.enabled boolean false default value for experimental_enable_auto_rehoming; allows for rows in REGIONAL BY ROW tables to be auto-rehomed on UPDATE
sql.defaults.experimental_distsql_planning enumeration off default experimental_distsql_planning mode; enables experimental opt-driven DistSQL planning [off = 0, on = 1]
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -86,7 +86,7 @@
<tr><td><code>sql.defaults.datestyle.enabled</code></td><td>boolean</td><td><code>false</code></td><td>default value for datestyle_enabled session setting</td></tr>
<tr><td><code>sql.defaults.default_int_size</code></td><td>integer</td><td><code>8</code></td><td>the size, in bytes, of an INT type</td></tr>
<tr><td><code>sql.defaults.disallow_full_table_scans.enabled</code></td><td>boolean</td><td><code>false</code></td><td>setting to true rejects queries that have planned a full table scan</td></tr>
<tr><td><code>sql.defaults.distsql</code></td><td>enumeration</td><td><code>auto</code></td><td>default distributed SQL execution mode [off = 0, auto = 1, on = 2]</td></tr>
<tr><td><code>sql.defaults.distsql</code></td><td>enumeration</td><td><code>auto</code></td><td>default distributed SQL execution mode [off = 0, auto = 1, on = 2, always = 3]</td></tr>
<tr><td><code>sql.defaults.experimental_alter_column_type.enabled</code></td><td>boolean</td><td><code>false</code></td><td>default value for experimental_alter_column_type session setting; enables the use of ALTER COLUMN TYPE for general conversions</td></tr>
<tr><td><code>sql.defaults.experimental_auto_rehoming.enabled</code></td><td>boolean</td><td><code>false</code></td><td>default value for experimental_enable_auto_rehoming; allows for rows in REGIONAL BY ROW tables to be auto-rehomed on UPDATE</td></tr>
<tr><td><code>sql.defaults.experimental_distsql_planning</code></td><td>enumeration</td><td><code>off</code></td><td>default experimental_distsql_planning mode; enables experimental opt-driven DistSQL planning [off = 0, on = 1]</td></tr>
69 changes: 69 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -4049,9 +4049,76 @@ func (dsp *DistSQLPlanner) NewPlanningCtx(
	return planCtx
}

// maybeMoveSingleFlowToGateway checks whether the plan consists of a single
// flow on a remote node and would benefit from bringing that flow to the
// gateway.
func maybeMoveSingleFlowToGateway(planCtx *PlanningCtx, plan *PhysicalPlan, rowCount int64) {
	if !planCtx.isLocal && planCtx.ExtendedEvalCtx.SessionData().DistSQLMode != sessiondatapb.DistSQLAlways {
		// If we chose to distribute this plan, yet we created only a single
		// remote flow, it might be a good idea to bring that whole flow back
		// to the gateway.
		//
		// This comes from the limitation of pinning each flow based on the
		// physical planning of table readers. However, if later stages of the
		// plan contain other processors, e.g. joinReaders, the whole flow can
		// become quite expensive. With a high enough frequency of such flows,
		// the node having the lease for the ranges of the table readers
		// becomes the hot spot. In such a scenario we might choose to run the
		// flow locally to distribute the load on the cluster better (assuming
		// that the queries are issued against all nodes with equal frequency).

		// If we estimate that the plan reads far more rows than it returns in
		// the output, it probably makes sense to keep the flow as distributed
		// (i.e. keep the computation where the data is). Therefore, when we
		// don't have a good estimate (rowCount is negative) or the data
		// cardinality is reduced significantly (the reduction ratio is at
		// least 10), we keep the plan as is.
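		// For example, if the table readers are estimated to scan 100,000
		// rows and the plan is estimated to output 50,000 rows, the reduction
		// ratio is only 2, so the flow is a candidate for being moved to the
		// gateway; with an output estimate of 5,000 rows the ratio is 20 and
		// the plan stays distributed. (Illustrative numbers, not from this
		// commit.)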
		const rowReductionRatio = 10
		keepPlan := rowCount <= 0 || float64(plan.TotalEstimatedScannedRows)/float64(rowCount) >= rowReductionRatio
		if keepPlan {
			return
		}
		singleFlow := true
		moveFlowToGateway := false
		nodeID := plan.Processors[0].Node
		for _, p := range plan.Processors[1:] {
			if p.Node != nodeID {
				singleFlow = false
				break
			}
			core := p.Spec.Core
			if core.JoinReader != nil || core.MergeJoiner != nil || core.HashJoiner != nil ||
				core.ZigzagJoiner != nil || core.InvertedJoiner != nil {
				// We want to move the flow when it contains a processor that
				// might increase the cardinality of the data flowing through
				// it or that performs the KV work.
				moveFlowToGateway = true
			}
		}
		if singleFlow && moveFlowToGateway {
			for i := range plan.Processors {
				plan.Processors[i].Node = plan.GatewayNodeID
			}
			planCtx.isLocal = true
			planCtx.planner.curPlan.flags.Unset(planFlagFullyDistributed)
			planCtx.planner.curPlan.flags.Unset(planFlagPartiallyDistributed)
			plan.Distribution = physicalplan.LocalPlan
		}
	}
}

// FinalizePlan adds a final "result" stage and a final projection if necessary
// as well as populates the endpoints of the plan.
func (dsp *DistSQLPlanner) FinalizePlan(planCtx *PlanningCtx, plan *PhysicalPlan) {
	dsp.finalizePlanWithRowCount(planCtx, plan, -1 /* rowCount */)
}

// finalizePlanWithRowCount adds a final "result" stage and a final projection
// if necessary as well as populates the endpoints of the plan.
// - rowCount is the estimated number of rows that the plan outputs. Use a
// negative number if the stats were not available to make an estimate.
func (dsp *DistSQLPlanner) finalizePlanWithRowCount(
	planCtx *PlanningCtx, plan *PhysicalPlan, rowCount int64,
) {
	// Find all MetadataTestSenders in the plan, so that the MetadataTestReceiver
	// knows how many sender IDs it should expect.
	var metadataSenders []string
@@ -4061,6 +4128,8 @@ func (dsp *DistSQLPlanner) FinalizePlan(planCtx *PlanningCtx, plan *PhysicalPlan
		}
	}

	maybeMoveSingleFlowToGateway(planCtx, plan, rowCount)

	// Add a final "result" stage if necessary.
	plan.EnsureSingleStreamOnGateway()

10 changes: 6 additions & 4 deletions pkg/sql/distsql_physical_planner_test.go
@@ -370,12 +370,17 @@ func TestDistSQLRangeCachesIntegrationTest(t *testing.T) {
// precisely control the contents of the range cache on node 4.
tc.Server(3).DistSenderI().(*kvcoord.DistSender).DisableFirstRangeUpdates()
db3 := tc.ServerConn(3)
// Force DistSQL on this connection.
_, err := db3.Exec(`SET CLUSTER SETTING sql.defaults.distsql = always; SET distsql = always`)
if err != nil {
t.Fatal(err)
}
// Do a query on node 4 so that it populates its cache with an initial
// descriptor containing all the SQL key space. If we don't do this, the state
// of the cache is left at the whim of gossiping the first descriptor done
// during cluster startup - it can happen that the cache remains empty, which
// is not what this test wants.
_, err := db3.Exec(`SELECT * FROM "left"`)
_, err = db3.Exec(`SELECT * FROM "left"`)
if err != nil {
t.Fatal(err)
}
@@ -404,9 +409,6 @@ func TestDistSQLRangeCachesIntegrationTest(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, err := txn.Exec("SET DISTSQL = ALWAYS"); err != nil {
t.Fatal(err)
}

// Check that the initial planning is suboptimal: the cache on db3 is unaware
// of the splits and still holds the state after the first dummy query at the
4 changes: 2 additions & 2 deletions pkg/sql/distsql_running.go
@@ -1142,7 +1142,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
if err != nil {
return err
}
dsp.FinalizePlan(subqueryPlanCtx, subqueryPhysPlan)
dsp.finalizePlanWithRowCount(subqueryPlanCtx, subqueryPhysPlan, subqueryPlan.rowCount)

// TODO(arjun): #28264: We set up a row container, wrap it in a row
// receiver, and use it and serialize the results of the subquery. The type
@@ -1271,7 +1271,7 @@ func (dsp *DistSQLPlanner) PlanAndRun(
recv.SetError(err)
return physPlanCleanup
}
dsp.FinalizePlan(planCtx, physPlan)
dsp.finalizePlanWithRowCount(planCtx, physPlan, planCtx.planner.curPlan.mainRowCount)
recv.expectedRowsRead = int64(physPlan.TotalEstimatedScannedRows)
runCleanup := dsp.Run(planCtx, txn, physPlan, recv, evalCtx, nil /* finishedSetupFn */)
return func() {
8 changes: 6 additions & 2 deletions pkg/sql/distsql_spec_exec_factory.go
@@ -770,7 +770,11 @@ func (e *distSQLSpecExecFactory) ConstructWindow(
}

func (e *distSQLSpecExecFactory) ConstructPlan(
root exec.Node, subqueries []exec.Subquery, cascades []exec.Cascade, checks []exec.Node,
root exec.Node,
subqueries []exec.Subquery,
cascades []exec.Cascade,
checks []exec.Node,
rootRowCount int64,
) (exec.Plan, error) {
if len(subqueries) != 0 {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: subqueries")
@@ -781,7 +785,7 @@ func (e *distSQLSpecExecFactory) ConstructPlan(
if len(checks) != 0 {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: checks")
}
return constructPlan(e.planner, root, subqueries, cascades, checks)
return constructPlan(e.planner, root, subqueries, cascades, checks, rootRowCount)
}

func (e *distSQLSpecExecFactory) ConstructExplainOpt(
3 changes: 3 additions & 0 deletions pkg/sql/exec_factory_util.go
@@ -33,6 +33,7 @@ func constructPlan(
subqueries []exec.Subquery,
cascades []exec.Cascade,
checks []exec.Node,
rootRowCount int64,
) (exec.Plan, error) {
res := &planComponents{}
assignPlan := func(plan *planMaybePhysical, node exec.Node) {
@@ -46,6 +47,7 @@
}
}
assignPlan(&res.main, root)
res.mainRowCount = rootRowCount
if len(subqueries) > 0 {
res.subqueryPlans = make([]subquery, len(subqueries))
for i := range subqueries {
@@ -65,6 +67,7 @@
return nil, errors.Errorf("invalid SubqueryMode %d", in.Mode)
}
out.expanded = true
out.rowCount = in.RowCount
assignPlan(&out.plan, in.Root)
}
}
16 changes: 13 additions & 3 deletions pkg/sql/exec_util.go
@@ -528,9 +528,10 @@ var DistSQLClusterExecMode = settings.RegisterEnumSetting(
"default distributed SQL execution mode",
"auto",
map[int64]string{
int64(sessiondatapb.DistSQLOff): "off",
int64(sessiondatapb.DistSQLAuto): "auto",
int64(sessiondatapb.DistSQLOn): "on",
int64(sessiondatapb.DistSQLOff): "off",
int64(sessiondatapb.DistSQLAuto): "auto",
int64(sessiondatapb.DistSQLOn): "on",
int64(sessiondatapb.DistSQLAlways): "always",
},
).WithPublic()

@@ -1405,6 +1406,15 @@ func shouldDistributeGivenRecAndMode(
// is reused, but if plan has logical representation (i.e. it is a planNode
// tree), then we traverse that tree in order to determine the distribution of
// the plan.
// WARNING: in some cases when this method returns
// physicalplan.FullyDistributedPlan, the plan might actually run locally. This
// is the case when
// - the plan ends up with a single flow on the gateway, or
// - during the plan finalization (in DistSQLPlanner.finalizePlanWithRowCount)
// we decide that it is beneficial to move the single flow of the plan from the
// remote node to the gateway.
// TODO(yuzefovich): this will be easy to solve once the DistSQL spec factory is
// completed but is quite annoying to do at the moment.
func getPlanDistribution(
ctx context.Context,
p *planner,
2 changes: 1 addition & 1 deletion pkg/sql/explain_plan.go
@@ -83,7 +83,7 @@ func (e *explainPlanNode) startExec(params runParams) error {
} else {
// There might be an issue making the physical plan, but that should not
// cause an error or panic, so swallow the error. See #40677 for example.
distSQLPlanner.FinalizePlan(planCtx, physicalPlan)
distSQLPlanner.finalizePlanWithRowCount(planCtx, physicalPlan, plan.mainRowCount)
ob.AddDistribution(physicalPlan.Distribution.String())
flows := physicalPlan.GenerateFlowSpecs()
flowCtx := newFlowCtxForExplainPurposes(planCtx, params.p)
2 changes: 1 addition & 1 deletion pkg/sql/explain_vec.go
@@ -61,7 +61,7 @@ func (n *explainVecNode) startExec(params runParams) error {
return err
}

distSQLPlanner.FinalizePlan(planCtx, physPlan)
distSQLPlanner.finalizePlanWithRowCount(planCtx, physPlan, n.plan.mainRowCount)
flows := physPlan.GenerateFlowSpecs()
flowCtx := newFlowCtxForExplainPurposes(planCtx, params.p)

4 changes: 3 additions & 1 deletion pkg/sql/opt/exec/execbuilder/builder.go
@@ -172,7 +172,9 @@ func (b *Builder) Build() (_ exec.Plan, err error) {
if err != nil {
return nil, err
}
return b.factory.ConstructPlan(plan.root, b.subqueries, b.cascades, b.checks)

rootRowCount := int64(b.e.(memo.RelExpr).Relational().Stats.RowCountIfAvailable())
return b.factory.ConstructPlan(plan.root, b.subqueries, b.cascades, b.checks, rootRowCount)
}

func (b *Builder) build(e opt.Expr) (_ execPlan, err error) {
8 changes: 5 additions & 3 deletions pkg/sql/opt/exec/execbuilder/relational.go
@@ -1976,8 +1976,9 @@ func (b *Builder) buildWith(with *memo.WithExpr) (execPlan, error) {
// subquery mode that reads and discards all rows. This could possibly also
// be fixed by ensuring that bufferNode exhausts its input (and forcing it
// to behave like a spoolNode) and using the EXISTS mode.
Mode: exec.SubqueryAllRows,
Root: buffer,
Mode: exec.SubqueryAllRows,
Root: buffer,
RowCount: int64(with.Relational().Stats.RowCountIfAvailable()),
})

b.addBuiltWithExpr(with.ID, value.outputCols, buffer)
@@ -2031,7 +2032,8 @@ func (b *Builder) buildRecursiveCTE(rec *memo.RecursiveCTEExpr) (execPlan, error
if err != nil {
return nil, err
}
return innerBld.factory.ConstructPlan(plan.root, innerBld.subqueries, innerBld.cascades, innerBld.checks)
rootRowCount := int64(rec.Recursive.Relational().Stats.RowCountIfAvailable())
return innerBld.factory.ConstructPlan(plan.root, innerBld.subqueries, innerBld.cascades, innerBld.checks, rootRowCount)
}

label := fmt.Sprintf("working buffer (%s)", rec.Name)
23 changes: 18 additions & 5 deletions pkg/sql/opt/exec/execbuilder/scalar.go
@@ -460,7 +460,10 @@ func (b *Builder) buildArrayFlatten(
}

typ := b.mem.Metadata().ColumnMeta(af.RequestedCol).Type
e := b.addSubquery(exec.SubqueryAllRows, typ, root.root, af.OriginalExpr)
e := b.addSubquery(
exec.SubqueryAllRows, typ, root.root, af.OriginalExpr,
int64(af.Input.Relational().Stats.RowCountIfAvailable()),
)

return tree.NewTypedArrayFlattenExpr(e), nil
}
@@ -520,7 +523,10 @@ func (b *Builder) buildAny(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ
contents[val] = b.mem.Metadata().ColumnMeta(opt.ColumnID(key)).Type
})
typs := types.MakeTuple(contents)
subqueryExpr := b.addSubquery(exec.SubqueryAnyRows, typs, plan.root, any.OriginalExpr)
subqueryExpr := b.addSubquery(
exec.SubqueryAnyRows, typs, plan.root, any.OriginalExpr,
int64(any.Input.Relational().Stats.RowCountIfAvailable()),
)

// Build the scalar value that is compared against each row.
scalarExpr, err := b.buildScalar(ctx, any.Scalar)
@@ -553,7 +559,10 @@ func (b *Builder) buildExistsSubquery(
return nil, err
}

return b.addSubquery(exec.SubqueryExists, types.Bool, plan.root, exists.OriginalExpr), nil
return b.addSubquery(
exec.SubqueryExists, types.Bool, plan.root, exists.OriginalExpr,
int64(exists.Input.Relational().Stats.RowCountIfAvailable()),
), nil
}

func (b *Builder) buildSubquery(
@@ -580,13 +589,16 @@ func (b *Builder) buildSubquery(
return nil, err
}

return b.addSubquery(exec.SubqueryOneRow, subquery.Typ, plan.root, subquery.OriginalExpr), nil
return b.addSubquery(
exec.SubqueryOneRow, subquery.Typ, plan.root, subquery.OriginalExpr,
int64(input.Relational().Stats.RowCountIfAvailable()),
), nil
}

// addSubquery adds an entry to b.subqueries and creates a tree.Subquery
// expression node associated with it.
func (b *Builder) addSubquery(
mode exec.SubqueryMode, typ *types.T, root exec.Node, originalExpr *tree.Subquery,
mode exec.SubqueryMode, typ *types.T, root exec.Node, originalExpr *tree.Subquery, rowCount int64,
) *tree.Subquery {
var originalSelect tree.SelectStatement
if originalExpr != nil {
@@ -601,6 +613,7 @@
ExprNode: exprNode,
Mode: mode,
Root: root,
RowCount: rowCount,
})
// Associate the tree.Subquery expression node with this subquery
// by index (1-based).