From e77bf5e77e34dbcf7bb85be10ad8b173354a2abd Mon Sep 17 00:00:00 2001
From: Rebecca Taft
Date: Sat, 20 Feb 2021 00:17:52 -0700
Subject: [PATCH] opt,sql: add support for locality optimized scans
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit adds support for a new type of operator called LocalityOptimizedSearch. If the session setting locality_optimized_partitioned_index_scan is true, the optimizer now plans a LocalityOptimizedSearch operation whenever possible.

LocalityOptimizedSearch is similar to UnionAll, but it is designed to avoid communicating with remote nodes (relative to the gateway region) if at all possible. LocalityOptimizedSearch can be planned under the following conditions:
- A scan is known to produce at most one row.
- The scan contains multiple spans, with some spans targeting partitions on local nodes (relative to the gateway region), and some targeting partitions on remote nodes. It is not known which span will produce the row.

This commit adds a new exploration rule called GenerateLocalityOptimizedScan that generates a LocalityOptimizedSearch if the above conditions hold. The result of GenerateLocalityOptimizedScan will be a LocalityOptimizedSearch in which the left child contains a new scan operator with the local spans from the original scan, and the right child contains a new scan operator with the remote spans. The LocalityOptimizedSearch operator ensures that the right child (containing remote spans) is only executed if the left child (containing local spans) does not return any rows.

This is a useful optimization if there is locality of access in the workload, such that rows tend to be accessed from the region where they are located. If there is no locality of access, using LocalityOptimizedSearch could be a slight pessimization, since rows residing in remote regions will be fetched slightly more slowly than they would be otherwise.

For example, suppose we have a multi-region database with regions 'us-east1', 'us-west1' and 'europe-west1', and we have the following table and query, issued from 'us-east1':

  CREATE TABLE tab (
    k INT PRIMARY KEY,
    v INT
  ) LOCALITY REGIONAL BY ROW;

  SELECT * FROM tab WHERE k = 10;

Normally, this would produce the following plan:

  scan tab
   └── constraint: /3/1
        ├── [/'europe-west1'/10 - /'europe-west1'/10]
        ├── [/'us-east1'/10 - /'us-east1'/10]
        └── [/'us-west1'/10 - /'us-west1'/10]

but if the session setting locality_optimized_partitioned_index_scan is enabled, the optimizer will produce this plan, using locality optimized search:

  locality-optimized-search
   ├── scan tab
   │    └── constraint: /9/7: [/'us-east1'/10 - /'us-east1'/10]
   └── scan tab
        └── constraint: /14/12
             ├── [/'europe-west1'/10 - /'europe-west1'/10]
             └── [/'us-west1'/10 - /'us-west1'/10]

As long as k = 10 is located in 'us-east1', the second plan will be much faster. But if k = 10 is located in one of the other regions, the first plan would be slightly faster.

Informs #55185

Release justification: This commit is a low risk, high benefit change to existing functionality. The session setting that enables the change is locality_optimized_partitioned_index_scan, which is still disabled by default.

Release note (performance improvement): If the session setting locality_optimized_partitioned_index_scan is enabled, the optimizer will try to plan scans known to produce at most one row using "locality optimized search".
This optimization applies for REGIONAL BY ROW tables, and if enabled, it means that the execution engine will first search locally for the row before searching remote nodes. If the row is found in a local node, remote nodes will not be searched. --- .../testdata/logic_test/regional_by_row | 121 +++++- pkg/sql/distsql_physical_planner.go | 70 +++- pkg/sql/distsql_spec_exec_factory.go | 5 +- pkg/sql/opt/constraint/span.go | 15 + pkg/sql/opt/exec/execbuilder/relational.go | 19 +- pkg/sql/opt/exec/explain/emit.go | 7 +- pkg/sql/opt/exec/factory.go | 5 + pkg/sql/opt/exec/factory.opt | 7 + pkg/sql/opt/memo/check_expr.go | 2 +- pkg/sql/opt/memo/expr.go | 3 +- pkg/sql/opt/memo/expr_format.go | 4 +- pkg/sql/opt/memo/logical_props_builder.go | 6 + pkg/sql/opt/ops/relational.opt | 68 ++++ pkg/sql/opt/xform/BUILD.bazel | 1 + pkg/sql/opt/xform/coster.go | 15 +- pkg/sql/opt/xform/rules/scan.opt | 65 +++ pkg/sql/opt/xform/scan_funcs.go | 336 +++++++++++++++ pkg/sql/opt/xform/scan_funcs_test.go | 110 +++++ pkg/sql/opt/xform/testdata/coster/zone | 72 ++++ pkg/sql/opt/xform/testdata/rules/scan | 381 ++++++++++++++++++ pkg/sql/opt_exec_factory.go | 13 +- pkg/sql/physicalplan/physical_plan.go | 13 +- pkg/sql/scan.go | 7 + pkg/sql/union.go | 10 +- 24 files changed, 1316 insertions(+), 39 deletions(-) create mode 100644 pkg/sql/opt/xform/scan_funcs_test.go diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row index 8209c741846e..bab2509ff733 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row +++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row @@ -530,6 +530,110 @@ pk pk2 a b j statement error cannot drop column crdb_region as it is used to store the region in a REGIONAL BY ROW table\nHINT: You must change the table locality before dropping this table ALTER TABLE regional_by_row_table DROP COLUMN crdb_region +# Query with locality optimized search disabled. +query T +EXPLAIN SELECT * FROM regional_by_row_table WHERE pk = 1 +---- +distribution: local +vectorized: true +· +• scan + missing stats + table: regional_by_row_table@primary + spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1] [/'ca-central-1'/1 - /'ca-central-1'/1] [/'us-east-1'/1 - /'us-east-1'/1] + +statement ok +SET tracing = on,kv,results; SELECT * FROM regional_by_row_table WHERE pk = 1; SET tracing = off + +# All rows are scanned at once without the optimization. +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY + WHERE message LIKE 'fetched:%' OR message LIKE 'output row%' + OR message LIKE 'Scan%' + ORDER BY ordinality ASC +---- +Scan /Table/66/1/"@"/1{-/#}, /Table/66/1/"\x80"/1{-/#}, /Table/66/1/"\xc0"/1{-/#} +fetched: /regional_by_row_table/primary/'ap-southeast-2'/1/pk2/a/b/j -> /1/2/3/'{"a": "b"}' +output row: [1 1 2 3 '{"a": "b"}'] + +statement ok +SET locality_optimized_partitioned_index_scan = true + +# Same query with locality optimized search enabled. 
+query T +EXPLAIN (DISTSQL) SELECT * FROM regional_by_row_table WHERE pk = 1 +---- +distribution: local +vectorized: true +· +• union all +│ limit: 1 +│ +├── • scan +│ missing stats +│ table: regional_by_row_table@primary +│ spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1] +│ +└── • scan + missing stats + table: regional_by_row_table@primary + spans: [/'ca-central-1'/1 - /'ca-central-1'/1] [/'us-east-1'/1 - /'us-east-1'/1] +· +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJykkVEL0zAQx9_9FOF8Uclo0ylIQIhoxULdZjtQsGVkzTGLXVKTFDdGv7usRbbJJmw-3l3-l98vOYD72QCH-OsifZvMyLP3Sb7MP6fPSR6n8bsleUE-ZPNPxOKmNlo2q_V-Zc2vlZfrBsmXj3EWk_YHeUMYUNBG4Uxu0QH_BgxKCq01FTpn7LF1GA4kagc8pFDrtvPHdkmhMhaBH8DXvkHgsDxuz1AqtEEIFBR6WTfD2qsgorX1Vto9UMhbqR0nQQFFsXsdFhCwyXkRPCVSK8KI8d_RAoV55zkRjIqIiikVL6l4BWVPwXT-BOi83CBw1tPHJNijEuKPgBjh7wSObgKfODttrEKL6oKx7K8ozczEtEF0KZPW29oTdpMhvOfRMnSt0Q7_Yrm1uaSAaoOjkDOdrXBhTTVcM5bzITc0FDo_TqOxSPQwGn71PMz-Jxz9Mzy9CId92T_5HQAA__-b4SGh + +statement ok +SET tracing = on,kv,results; SELECT * FROM regional_by_row_table WHERE pk = 1; SET tracing = off + +# If the row is found in the local region, the other regions are not searched. +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY + WHERE message LIKE 'fetched:%' OR message LIKE 'output row%' + OR message LIKE 'Scan%' + ORDER BY ordinality ASC +---- +Scan /Table/66/1/"@"/1{-/#} +fetched: /regional_by_row_table/primary/'ap-southeast-2'/1/pk2/a/b/j -> /1/2/3/'{"a": "b"}' +output row: [1 1 2 3 '{"a": "b"}'] + +statement ok +SET tracing = on,kv,results; SELECT * FROM regional_by_row_table WHERE pk = 10; SET tracing = off + +# If the row is not found in the local region, the other regions are searched +# in parallel. +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY + WHERE message LIKE 'fetched:%' OR message LIKE 'output row%' + OR message LIKE 'Scan%' + ORDER BY ordinality ASC +---- +Scan /Table/66/1/"@"/10{-/#} +Scan /Table/66/1/"\x80"/10{-/#}, /Table/66/1/"\xc0"/10{-/#} +fetched: /regional_by_row_table/primary/'ca-central-1'/10/pk2/a/b -> /10/11/12 +output row: [10 10 11 12 NULL] + +# The local region for this query is ca-central-1, so that span should be +# scanned in the first child of the limited union all. +query T nodeidx=3 +USE multi_region_test_db; SET locality_optimized_partitioned_index_scan = true; +EXPLAIN SELECT * FROM regional_by_row_table WHERE pk = 1 +---- +distribution: local +vectorized: true +· +• union all +│ limit: 1 +│ +├── • scan +│ missing stats +│ table: regional_by_row_table@primary +│ spans: [/'ca-central-1'/1 - /'ca-central-1'/1] +│ +└── • scan + missing stats + table: regional_by_row_table@primary + spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1] [/'us-east-1'/1 - /'us-east-1'/1] + + # Tests creating a index and a unique constraint on a REGIONAL BY ROW table. 
statement ok CREATE INDEX new_idx ON regional_by_row_table(a, b) @@ -637,11 +741,18 @@ vectorized: true │ ├── • values │ │ size: 6 columns, 1 row │ │ -│ └── • scan -│ missing stats -│ table: regional_by_row_table@primary -│ spans: [/'ap-southeast-2'/2 - /'ap-southeast-2'/2] [/'ca-central-1'/2 - /'ca-central-1'/2] [/'us-east-1'/2 - /'us-east-1'/2] -│ locking strength: for update +│ └── • union all +│ │ limit: 1 +│ │ +│ ├── • scan +│ │ missing stats +│ │ table: regional_by_row_table@primary +│ │ spans: [/'ap-southeast-2'/2 - /'ap-southeast-2'/2] +│ │ +│ └── • scan +│ missing stats +│ table: regional_by_row_table@primary +│ spans: [/'ca-central-1'/2 - /'ca-central-1'/2] [/'us-east-1'/2 - /'us-east-1'/2] │ ├── • constraint-check │ │ diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 8b52dbd96e5d..df639d69f83e 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -471,18 +471,22 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) { return cannotDistribute, cannotDistributeRowLevelLockingErr } - // Although we don't yet recommend distributing plans where soft limits - // propagate to scan nodes because we don't have infrastructure to only - // plan for a few ranges at a time, the propagation of the soft limits - // to scan nodes has been added in 20.1 release, so to keep the - // previous behavior we continue to ignore the soft limits for now. - // TODO(yuzefovich): pay attention to the soft limits. - rec := canDistribute - // Check if we are doing a full scan. - if n.isFull { - rec = rec.compose(shouldDistribute) + switch { + case n.localityOptimized: + // This is a locality optimized scan. + return cannotDistribute, nil + case n.isFull: + // This is a full scan. + return shouldDistribute, nil + default: + // Although we don't yet recommend distributing plans where soft limits + // propagate to scan nodes because we don't have infrastructure to only + // plan for a few ranges at a time, the propagation of the soft limits + // to scan nodes has been added in 20.1 release, so to keep the + // previous behavior we continue to ignore the soft limits for now. + // TODO(yuzefovich): pay attention to the soft limits. + return canDistribute, nil } - return rec, nil case *sortNode: rec, err := checkSupportForPlanNode(n.plan) @@ -3303,6 +3307,10 @@ func (dsp *DistSQLPlanner) createPlanForSetOp( p.SetMergeOrdering(mergeOrdering) if !n.all { + if n.hardLimit != 0 { + return nil, errors.AssertionFailedf("a hard limit is not supported for UNION (only for UNION ALL)") + } + // TODO(abhimadan): use columns from mergeOrdering to fill in the // OrderingColumns field in DistinctSpec once the unused columns // are projected out. @@ -3325,12 +3333,36 @@ func (dsp *DistSQLPlanner) createPlanForSetOp( // on a single node (which is always the case when there are mutations), // we can fuse everything so there are no concurrent KV operations (see // #40487, #41307). - // - // Furthermore, in order to disable auto-parallelism that could occur - // when merging multiple streams on the same node, we force the - // serialization of the merge operation (otherwise, it would be - // possible that we have a source of unbounded parallelism, see #51548). 
- p.EnsureSingleStreamPerNode(true /* forceSerialization */) + + if n.hardLimit == 0 { + // In order to disable auto-parallelism that could occur when merging + // multiple streams on the same node, we force the serialization of the + // merge operation (otherwise, it would be possible that we have a + // source of unbounded parallelism, see #51548). + p.EnsureSingleStreamPerNode(true /* forceSerialization */, execinfrapb.PostProcessSpec{}) + } else { + if p.GetLastStageDistribution() != physicalplan.LocalPlan { + return nil, errors.AssertionFailedf("we expect that limited UNION ALL queries are only planned locally") + } + if len(p.MergeOrdering.Columns) != 0 { + return nil, errors.AssertionFailedf( + "we expect that limited UNION ALL queries do not require a specific ordering", + ) + } + // Here we don't force the serialization so that the unordered + // synchronizer is used. Additionally, because the plan will be fully + // local, we will use the flowinfra.FuseAggressively option. As a + // result, the plan will end up with a serial unordered synchronizer, + // which has exactly the behavior that we want (in particular, it won't + // execute the right child if the limit is reached by the left child). + // TODO(rytaft,yuzefovich): This currently only works with the + // vectorized engine. We should consider adding support for the serial + // unordered synchronizer in the row-based engine (see #61081). + p.EnsureSingleStreamPerNode( + false, /* forceSerialization */ + execinfrapb.PostProcessSpec{Limit: n.hardLimit}, + ) + } // UNION ALL is special: it doesn't have any required downstream // processor, so its two inputs might have different post-processing @@ -3341,6 +3373,10 @@ func (dsp *DistSQLPlanner) createPlanForSetOp( } } } else { + if n.hardLimit != 0 { + return nil, errors.AssertionFailedf("a hard limit is not supported for INTERSECT or EXCEPT") + } + // We plan INTERSECT and EXCEPT queries with joiners. Get the appropriate // join type. joinType := distsqlSetOpJoinType(n.unionType) diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 85ac8c9ef97c..47ef7ed60b0c 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -175,6 +175,9 @@ func (e *distSQLSpecExecFactory) ConstructScan( // previous behavior we continue to ignore the soft limits for now. // TODO(yuzefovich): pay attention to the soft limits. recommendation := canDistribute + if params.LocalityOptimized { + recommendation = recommendation.compose(cannotDistribute) + } planCtx := e.getPlanCtx(recommendation) p := planCtx.NewPhysicalPlan() @@ -589,7 +592,7 @@ func (e *distSQLSpecExecFactory) ConstructDistinct( } func (e *distSQLSpecExecFactory) ConstructSetOp( - typ tree.UnionType, all bool, left, right exec.Node, + typ tree.UnionType, all bool, left, right exec.Node, hardLimit uint64, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: set op") } diff --git a/pkg/sql/opt/constraint/span.go b/pkg/sql/opt/constraint/span.go index c15aed9e04ae..05ba15268793 100644 --- a/pkg/sql/opt/constraint/span.go +++ b/pkg/sql/opt/constraint/span.go @@ -88,6 +88,21 @@ func (sp *Span) HasSingleKey(evalCtx *tree.EvalContext) bool { return true } +// Prefix returns the length of the longest prefix of values for which the +// span has the same start and end values. For example, [/1/1/1 - /1/1/2] +// has prefix 2. 
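+// Additional examples that follow directly from this definition: the span
+// [/1/2 - /1/3] has prefix 1, since only the first value is shared, and
+// [/1 - /2] has prefix 0, since the start and end keys diverge at the first
+// value.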
+func (sp *Span) Prefix(evalCtx *tree.EvalContext) int { + start := sp.StartKey() + end := sp.EndKey() + + for prefix := 0; ; prefix++ { + if start.Length() <= prefix || end.Length() <= prefix || + start.Value(prefix).Compare(evalCtx, end.Value(prefix)) != 0 { + return prefix + } + } +} + // StartKey returns the start key. func (sp *Span) StartKey() Key { return sp.start diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index b4a8c3aba67a..7bc861419916 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -569,6 +569,7 @@ func (b *Builder) scanParams( Parallelize: parallelize, Locking: locking, EstimatedRowCount: rowCount, + LocalityOptimized: scan.LocalityOptimized, }, outputMap, nil } @@ -1314,7 +1315,7 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) { switch set.Op() { case opt.UnionOp: typ, all = tree.UnionOp, false - case opt.UnionAllOp: + case opt.UnionAllOp, opt.LocalityOptimizedSearchOp: typ, all = tree.UnionOp, true case opt.IntersectOp: typ, all = tree.IntersectOp, false @@ -1328,7 +1329,21 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) { panic(errors.AssertionFailedf("invalid operator %s", log.Safe(set.Op()))) } - node, err := b.factory.ConstructSetOp(typ, all, left.root, right.root) + hardLimit := uint64(0) + if set.Op() == opt.LocalityOptimizedSearchOp { + // If we are performing locality optimized search, set a limit equal to + // the maximum possible number of rows. This will tell the execution engine + // not to execute the right child if the limit is reached by the left + // child. + // TODO(rytaft): Store the limit in the expression. + hardLimit = uint64(set.Relational().Cardinality.Max) + if hardLimit > 1 { + panic(errors.AssertionFailedf( + "locality optimized search is not yet supported for more than one row at a time", + )) + } + } + node, err := b.factory.ConstructSetOp(typ, all, left.root, right.root, hardLimit) if err != nil { return execPlan{}, err } diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index 6466077eb31c..28419d8e3607 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -447,6 +447,12 @@ func (e *emitter) emitNodeAttributes(n *Node) error { ob.Attr("already ordered", colinfo.ColumnOrdering(a.Ordering[:p]).String(n.Columns())) } + case setOpOp: + a := n.args.(*setOpArgs) + if a.HardLimit > 0 { + ob.Attr("limit", a.HardLimit) + } + case indexJoinOp: a := n.args.(*indexJoinArgs) ob.Attrf("table", "%s@%s", a.Table.Name(), a.Table.Index(0).Name()) @@ -720,7 +726,6 @@ func (e *emitter) emitNodeAttributes(n *Node) error { case simpleProjectOp, serializingProjectOp, - setOpOp, ordinalityOp, max1RowOp, explainOptOp, diff --git a/pkg/sql/opt/exec/factory.go b/pkg/sql/opt/exec/factory.go index 9be03805d27c..ee9cb09a28f7 100644 --- a/pkg/sql/opt/exec/factory.go +++ b/pkg/sql/opt/exec/factory.go @@ -61,6 +61,11 @@ type ScanParams struct { Locking *tree.LockingItem EstimatedRowCount float64 + + // If true, we are performing a locality optimized search. In order for this + // to work correctly, the execution engine must create a local DistSQL plan + // for the main query (subqueries and postqueries need not be local). 
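+	// This field mirrors the LocalityOptimized flag on the optimizer's
+	// ScanPrivate (see pkg/sql/opt/ops/relational.opt), which the execbuilder
+	// copies into these parameters.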
+ LocalityOptimized bool } // OutputOrdering indicates the required output ordering on a Node that is being diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index 6891356b3a1e..acf3189658e1 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -164,11 +164,18 @@ define Distinct { # SetOp performs a UNION / INTERSECT / EXCEPT operation (either the ALL or the # DISTINCT version). The left and right nodes must have the same number of # columns. +# +# HardLimit can only be set for UNION ALL operations. It is used to implement +# locality optimized search, and instructs the execution engine that it should +# execute the left node to completion and possibly short-circuit if the limit is +# reached before executing the right node. The limit is guaranteed but the +# short-circuit behavior is not. define SetOp { Typ tree.UnionType All bool Left exec.Node Right exec.Node + HardLimit uint64 } # Sort performs a resorting of the rows produced by the input node. diff --git a/pkg/sql/opt/memo/check_expr.go b/pkg/sql/opt/memo/check_expr.go index 8068cc970ef0..1d008319c0b8 100644 --- a/pkg/sql/opt/memo/check_expr.go +++ b/pkg/sql/opt/memo/check_expr.go @@ -107,7 +107,7 @@ func (m *Memo) CheckExpr(e opt.Expr) { case *SelectExpr: checkFilters(t.Filters) - case *UnionExpr, *UnionAllExpr: + case *UnionExpr, *UnionAllExpr, *LocalityOptimizedSearchExpr: setPrivate := t.Private().(*SetPrivate) outColSet := setPrivate.OutCols.ToSet() diff --git a/pkg/sql/opt/memo/expr.go b/pkg/sql/opt/memo/expr.go index b8c5ce6e8100..f58d7263c8f4 100644 --- a/pkg/sql/opt/memo/expr.go +++ b/pkg/sql/opt/memo/expr.go @@ -633,7 +633,8 @@ func (f *WindowFrame) String() string { func (s *ScanPrivate) IsCanonical() bool { return s.Index == cat.PrimaryIndex && s.Constraint == nil && - s.HardLimit == 0 + s.HardLimit == 0 && + !s.LocalityOptimized } // IsUnfiltered returns true if the ScanPrivate will produce all rows in the diff --git a/pkg/sql/opt/memo/expr_format.go b/pkg/sql/opt/memo/expr_format.go index 813c259741e3..97703be7e545 100644 --- a/pkg/sql/opt/memo/expr_format.go +++ b/pkg/sql/opt/memo/expr_format.go @@ -277,7 +277,7 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) { colList = t.Cols case *UnionExpr, *IntersectExpr, *ExceptExpr, - *UnionAllExpr, *IntersectAllExpr, *ExceptAllExpr: + *UnionAllExpr, *IntersectAllExpr, *ExceptAllExpr, *LocalityOptimizedSearchExpr: colList = e.Private().(*SetPrivate).OutCols default: @@ -321,7 +321,7 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) { // Special-case handling for set operators to show the left and right // input columns that correspond to the output columns. 
case *UnionExpr, *IntersectExpr, *ExceptExpr, - *UnionAllExpr, *IntersectAllExpr, *ExceptAllExpr: + *UnionAllExpr, *IntersectAllExpr, *ExceptAllExpr, *LocalityOptimizedSearchExpr: if !f.HasFlags(ExprFmtHideColumns) { private := e.Private().(*SetPrivate) f.formatColList(e, tp, "left columns:", private.LeftCols) diff --git a/pkg/sql/opt/memo/logical_props_builder.go b/pkg/sql/opt/memo/logical_props_builder.go index 95dd0540a1ab..2fec18202c3a 100644 --- a/pkg/sql/opt/memo/logical_props_builder.go +++ b/pkg/sql/opt/memo/logical_props_builder.go @@ -670,6 +670,12 @@ func (b *logicalPropsBuilder) buildExceptAllProps(except *ExceptAllExpr, rel *pr b.buildSetProps(except, rel) } +func (b *logicalPropsBuilder) buildLocalityOptimizedSearchProps( + locOptSearch *LocalityOptimizedSearchExpr, rel *props.Relational, +) { + b.buildSetProps(locOptSearch, rel) +} + func (b *logicalPropsBuilder) buildSetProps(setNode RelExpr, rel *props.Relational) { BuildSharedProps(setNode, &rel.Shared) diff --git a/pkg/sql/opt/ops/relational.opt b/pkg/sql/opt/ops/relational.opt index cccab2a1f421..498feb8c582a 100644 --- a/pkg/sql/opt/ops/relational.opt +++ b/pkg/sql/opt/ops/relational.opt @@ -77,6 +77,17 @@ define ScanPrivate { # list will always be empty when part of a ScanPrivate. Locking LockingItem + # LocalityOptimized is true if this scan is a child of a + # LocalityOptimizedSearch operator, indicating that it either contains all + # local (relative to the gateway region) or all remote spans. The + # LocalityOptimizedSearch operator is similar to a UNION ALL, but ensures + # that the right child (containing remote spans) is only executed if the + # left child (containing local spans) does not return any rows. Therefore, + # LocalityOptimized is used as a hint to the coster to reduce the cost of + # this scan. It is also used to ensure that the DistSQL planner creates a + # local plan. + LocalityOptimized bool + # PartitionConstrainedScan records whether or not we were able to use partitions # to constrain the lookup spans further. This flag is used to record telemetry # about how often this optimization is getting applied. @@ -883,6 +894,63 @@ define ExceptAll { _ SetPrivate } +# LocalityOptimizedSearch is similar to UnionAll, but it is designed to avoid +# communicating with remote nodes (relative to the gateway region) if at all +# possible. LocalityOptimizedSearch can be planned when a scan is known to +# produce at most one row, but it is not known which region contains that row +# (if any). In this case, the scan can be split in two, and the resulting scans +# will become the children of the LocalityOptimizedSearch operator. The left +# scan should only contain spans targeting partitions on local nodes, and the +# right scan should contain the remaining spans. The LocalityOptimizedSearch +# operator ensures that the right child (containing remote spans) is only +# executed if the left child (containing local spans) does not return any rows. +# +# This is a useful optimization if there is locality of access in the workload, +# such that rows tend to be accessed from the region where they are located. +# If there is no locality of access, using LocalityOptimizedSearch could be a +# slight pessimization, since rows residing in remote regions will be fetched +# slightly more slowly than they would be otherwise. 
+# +# For example, suppose we have a multi-region database with regions 'us-east1', +# 'us-west1' and 'europe-west1', and we have the following table and query, +# issued from 'us-east1': +# +# CREATE TABLE tab ( +# k INT PRIMARY KEY, +# v INT +# ) LOCALITY REGIONAL BY ROW; +# +# SELECT * FROM tab WHERE k = 10; +# +# Normally, this would produce the following plan: +# +# scan tab +# └── constraint: /3/1 +# ├── [/'europe-west1'/10 - /'europe-west1'/10] +# ├── [/'us-east1'/10 - /'us-east1'/10] +# └── [/'us-west1'/10 - /'us-west1'/10] +# +# but if the session setting locality_optimized_partitioned_index_scan is enabled, +# the optimizer will produce this plan, using locality optimized search: +# +# locality-optimized-search +# ├── scan tab +# │ └── constraint: /9/7: [/'us-east1'/10 - /'us-east1'/10] +# └── scan tab +# └── constraint: /14/12 +# ├── [/'europe-west1'/10 - /'europe-west1'/10] +# └── [/'us-west1'/10 - /'us-west1'/10] +# +# As long as k = 10 is located in 'us-east1', the second plan will be much faster. +# But if k = 10 is located in one of the other regions, the first plan would be +# slightly faster. +[Relational, Set] +define LocalityOptimizedSearch { + Local RelExpr + Remote RelExpr + _ SetPrivate +} + # Limit returns a limited subset of the results in the input relation. The limit # expression is a scalar value; the operator returns at most this many rows. The # Orering field is a physical.OrderingChoice which indicates the row ordering diff --git a/pkg/sql/opt/xform/BUILD.bazel b/pkg/sql/opt/xform/BUILD.bazel index e958fe9f8763..d28b1687ef57 100644 --- a/pkg/sql/opt/xform/BUILD.bazel +++ b/pkg/sql/opt/xform/BUILD.bazel @@ -58,6 +58,7 @@ go_test( "main_test.go", "optimizer_test.go", "physical_props_test.go", + "scan_funcs_test.go", ], data = glob(["testdata/**"]), embed = [":xform"], diff --git a/pkg/sql/opt/xform/coster.go b/pkg/sql/opt/xform/coster.go index c85c605b1aca..1bfd31718cd4 100644 --- a/pkg/sql/opt/xform/coster.go +++ b/pkg/sql/opt/xform/coster.go @@ -466,7 +466,7 @@ func (c *coster) ComputeCost(candidate memo.RelExpr, required *physical.Required cost = c.computeZigzagJoinCost(candidate.(*memo.ZigzagJoinExpr)) case opt.UnionOp, opt.IntersectOp, opt.ExceptOp, - opt.UnionAllOp, opt.IntersectAllOp, opt.ExceptAllOp: + opt.UnionAllOp, opt.IntersectAllOp, opt.ExceptAllOp, opt.LocalityOptimizedSearchOp: cost = c.computeSetCost(candidate) case opt.GroupByOp, opt.ScalarGroupByOp, opt.DistinctOnOp, opt.EnsureDistinctOnOp, @@ -609,7 +609,18 @@ func (c *coster) computeScanCost(scan *memo.ScanExpr, required *physical.Require if scan.IsUnfiltered(c.mem.Metadata()) { baseCost += cpuCostFactor } - return baseCost + memo.Cost(rowCount)*(seqIOCostFactor+perRowCost) + + cost := baseCost + memo.Cost(rowCount)*(seqIOCostFactor+perRowCost) + + // If this scan is locality optimized, divide the cost in two in order to make + // the total cost of the two scans in the locality optimized plan less than + // the cost of the single scan in the non-locality optimized plan. + // TODO(rytaft): This is hacky. We should really be making this determination + // based on the latency between regions.
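+	// As a purely illustrative example with hypothetical numbers: if the local
+	// and remote scans would each cost 2.0 on their own, halving makes the
+	// pair cost 2.0 in total, so the locality optimized plan can win against a
+	// single scan over all the spans costing, say, 2.5.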
+ if scan.LocalityOptimized { + cost /= 2 + } + return cost } func (c *coster) computeSelectCost(sel *memo.SelectExpr) memo.Cost { diff --git a/pkg/sql/opt/xform/rules/scan.opt b/pkg/sql/opt/xform/rules/scan.opt index 3fcbfbfd1285..5742b9ff3651 100644 --- a/pkg/sql/opt/xform/rules/scan.opt +++ b/pkg/sql/opt/xform/rules/scan.opt @@ -8,3 +8,68 @@ (Scan $scanPrivate:* & (IsCanonicalScan $scanPrivate)) => (GenerateIndexScans $scanPrivate) + +# GenerateLocalityOptimizedScan plans a LocalityOptimizedSearch operation if +# possible. LocalityOptimizedSearch is similar to UnionAll, but it is designed +# to avoid communicating with remote nodes (relative to the gateway region) if +# at all possible. +# +# LocalityOptimizedSearch can be planned under the following conditions: +# - A scan is known to produce at most one row. +# - The scan contains multiple spans, with some spans targeting partitions on +# local nodes (relative to the gateway region), and some targeting partitions +# on remote nodes. It is not known which span will produce the row. +# +# The result of GenerateLocalityOptimizedScan will be a LocalityOptimizedSearch +# in which the left child contains a new scan operator with the local spans from +# the original scan, and the right child contains a new scan operator with the +# remote spans. The LocalityOptimizedSearch operator ensures that the right +# child (containing remote spans) is only executed if the left child (containing +# local spans) does not return any rows. +# +# This is a useful optimization if there is locality of access in the workload, +# such that rows tend to be accessed from the region where they are located. +# If there is no locality of access, using LocalityOptimizedSearch could be a +# slight pessimization, since rows residing in remote regions will be fetched +# slightly more slowly than they would be otherwise. +# +# For example, suppose we have a multi-region database with regions 'us-east1', +# 'us-west1' and 'europe-west1', and we have the following table and query, +# issued from 'us-east1': +# +# CREATE TABLE tab ( +# k INT PRIMARY KEY, +# v INT +# ) LOCALITY REGIONAL BY ROW; +# +# SELECT * FROM tab WHERE k = 10; +# +# Normally, this would produce the following plan: +# +# scan tab +# └── constraint: /3/1 +# ├── [/'europe-west1'/10 - /'europe-west1'/10] +# ├── [/'us-east1'/10 - /'us-east1'/10] +# └── [/'us-west1'/10 - /'us-west1'/10] +# +# but if the session setting locality_optimized_partitioned_index_scan is enabled, +# the optimizer will produce this plan, using locality optimized search: +# +# locality-optimized-search +# ├── scan tab +# │ └── constraint: /9/7: [/'us-east1'/10 - /'us-east1'/10] +# └── scan tab +# └── constraint: /14/12 +# ├── [/'europe-west1'/10 - /'europe-west1'/10] +# └── [/'us-west1'/10 - /'us-west1'/10] +# +# As long as k = 10 is located in 'us-east1', the second plan will be much faster. +# But if k = 10 is located in one of the other regions, the first plan would be +# slightly faster. 
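+#
+# Note that CanMaybeGenerateLocalityOptimizedScan performs only inexpensive
+# checks (the session setting, span and partition counts, and whether the
+# gateway region is known); the more expensive cardinality and
+# partition-matching analysis happens inside GenerateLocalityOptimizedScan
+# itself.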
+[GenerateLocalityOptimizedScan, Explore] +(Scan + $scanPrivate:* & + (CanMaybeGenerateLocalityOptimizedScan $scanPrivate) +) +=> +(GenerateLocalityOptimizedScan $scanPrivate) diff --git a/pkg/sql/opt/xform/scan_funcs.go b/pkg/sql/opt/xform/scan_funcs.go index a1d50b79f6c2..9b38e13fe8da 100644 --- a/pkg/sql/opt/xform/scan_funcs.go +++ b/pkg/sql/opt/xform/scan_funcs.go @@ -11,9 +11,14 @@ package xform import ( + "sort" + "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" + "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" ) // GenerateIndexScans enumerates all non-inverted secondary indexes on the given @@ -69,3 +74,334 @@ func (c *CustomFuncs) GenerateIndexScans(grp memo.RelExpr, scanPrivate *memo.Sca sb.build(grp) }) } + +const regionKey = "region" + +// CanMaybeGenerateLocalityOptimizedScan returns true if it may be possible to +// generate a locality optimized scan from the given scan private. +// CanMaybeGenerateLocalityOptimizedScan performs simple checks that are +// inexpensive to execute and can filter out cases where the optimization +// definitely cannot apply. See the comment above the +// GenerateLocalityOptimizedScan rule for details. +func (c *CustomFuncs) CanMaybeGenerateLocalityOptimizedScan(scanPrivate *memo.ScanPrivate) bool { + // Respect the session setting LocalityOptimizedSearch. + if !c.e.evalCtx.SessionData.LocalityOptimizedSearch { + return false + } + + if scanPrivate.LocalityOptimized { + // This scan has already been locality optimized. + return false + } + + if scanPrivate.HardLimit != 0 { + // This optimization doesn't apply to limited scans. + return false + } + + // This scan should have at least two spans, or we won't be able to move one + // of the spans to a separate remote scan. + if scanPrivate.Constraint == nil || scanPrivate.Constraint.Spans.Count() < 2 { + return false + } + + // Don't apply the rule if there are too many spans, since the rule code is + // O(# spans * # prefixes * # datums per prefix). + if scanPrivate.Constraint.Spans.Count() > 10000 { + return false + } + + // There should be at least two partitions, or we won't be able to + // differentiate between local and remote partitions. + tabMeta := c.e.mem.Metadata().TableMeta(scanPrivate.Table) + index := tabMeta.Table.Index(scanPrivate.Index) + if index.PartitionCount() < 2 { + return false + } + + // The local region must be set, or we won't be able to determine which + // partitions are local. + _, found := c.e.evalCtx.Locality.Find(regionKey) + return found +} + +// GenerateLocalityOptimizedScan generates a locality optimized scan if possible +// from the given scan private. This function should only be called if +// CanMaybeGenerateLocalityOptimizedScan returns true. See the comment above the +// GenerateLocalityOptimizedScan rule for more details. +func (c *CustomFuncs) GenerateLocalityOptimizedScan( + grp memo.RelExpr, scanPrivate *memo.ScanPrivate, +) { + // We can only generate a locality optimized scan if we know there is at + // most one row produced by the local spans. + // TODO(rytaft): We may be able to expand this to allow any number of rows, + // as long as there is a hard upper bound. 
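+	// (The execbuilder depends on this bound: it passes the expression's
+	// maximum cardinality to ConstructSetOp as the hard limit on the resulting
+	// set operation.)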
+ if !grp.Relational().Cardinality.IsZeroOrOne() { + return + } + + tabMeta := c.e.mem.Metadata().TableMeta(scanPrivate.Table) + index := tabMeta.Table.Index(scanPrivate.Index) + + // We already know that a local region exists from calling + // CanMaybeGenerateLocalityOptimizedScan. + localRegion, _ := c.e.evalCtx.Locality.Find(regionKey) + + // Determine whether the index has both local and remote partitions, and + // if so, which spans target local partitions. + var localPartitions util.FastIntSet + for i, n := 0, index.PartitionCount(); i < n; i++ { + part := index.Partition(i) + if isZoneLocal(part.Zone(), localRegion) { + localPartitions.Add(i) + } + } + if localPartitions.Len() == 0 || localPartitions.Len() == index.PartitionCount() { + // The partitions are either all local or all remote. + return + } + + localSpans := c.getLocalSpans(index, localPartitions, scanPrivate.Constraint) + if localSpans.Len() == 0 || localSpans.Len() == scanPrivate.Constraint.Spans.Count() { + // The spans target all local or all remote partitions. + return + } + + // Split the spans into local and remote sets. + localConstraint, remoteConstraint := c.splitSpans(scanPrivate.Constraint, localSpans) + + // Create the local scan. + localScanPrivate := c.DuplicateScanPrivate(scanPrivate) + localScanPrivate.LocalityOptimized = true + localConstraint.Columns = localConstraint.Columns.RemapColumns(scanPrivate.Table, localScanPrivate.Table) + localScanPrivate.Constraint = &localConstraint + localScan := c.e.f.ConstructScan(localScanPrivate) + + // Create the remote scan. + remoteScanPrivate := c.DuplicateScanPrivate(scanPrivate) + remoteScanPrivate.LocalityOptimized = true + remoteConstraint.Columns = remoteConstraint.Columns.RemapColumns(scanPrivate.Table, remoteScanPrivate.Table) + remoteScanPrivate.Constraint = &remoteConstraint + remoteScan := c.e.f.ConstructScan(remoteScanPrivate) + + // Add the LocalityOptimizedSearchExpr to the same group as the original scan. + locOptSearch := memo.LocalityOptimizedSearchExpr{ + Local: localScan, + Remote: remoteScan, + SetPrivate: memo.SetPrivate{ + LeftCols: localScan.Relational().OutputCols.ToList(), + RightCols: remoteScan.Relational().OutputCols.ToList(), + OutCols: grp.Relational().OutputCols.ToList(), + }, + } + c.e.mem.AddLocalityOptimizedSearchToGroup(&locOptSearch, grp) +} + +// isZoneLocal returns true if the given zone config indicates that the replicas +// it constrains will be primarily located in the localRegion. +func isZoneLocal(zone cat.Zone, localRegion string) bool { + // First count the number of local and remote replica constraints. If all + // are local or all are remote, we can return early. + local, remote := 0, 0 + for i, n := 0, zone.ReplicaConstraintsCount(); i < n; i++ { + replicaConstraint := zone.ReplicaConstraints(i) + for j, m := 0, replicaConstraint.ConstraintCount(); j < m; j++ { + constraint := replicaConstraint.Constraint(j) + if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { + if isLocal { + local++ + } else { + remote++ + } + } + } + } + if local > 0 && remote == 0 { + return true + } + if remote > 0 && local == 0 { + return false + } + + // Next check the voter replica constraints. Once again, if all are local or + // all are remote, we can return early. 
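+	// (In the multi-region zone configs exercised by this patch's tests,
+	// voters are typically pinned with a '+region=<region>' constraint, so
+	// this check often decides the result.)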
+ local, remote = 0, 0 + for i, n := 0, zone.VoterConstraintsCount(); i < n; i++ { + replicaConstraint := zone.VoterConstraint(i) + for j, m := 0, replicaConstraint.ConstraintCount(); j < m; j++ { + constraint := replicaConstraint.Constraint(j) + if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { + if isLocal { + local++ + } else { + remote++ + } + } + } + } + if local > 0 && remote == 0 { + return true + } + if remote > 0 && local == 0 { + return false + } + + // Use the lease preferences as a tie breaker. We only really care about the + // first one, since subsequent lease preferences only apply in edge cases. + if zone.LeasePreferenceCount() > 0 { + leasePref := zone.LeasePreference(0) + for i, n := 0, leasePref.ConstraintCount(); i < n; i++ { + constraint := leasePref.Constraint(i) + if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { + return isLocal + } + } + } + + return false +} + +// isConstraintLocal returns isLocal=true and ok=true if the given constraint is +// a required constraint matching the given localRegion. Returns isLocal=false +// and ok=true if the given constraint is a prohibited constraint matching the +// given local region or if it is a required constraint matching a different +// region. Any other scenario returns ok=false, since this constraint gives no +// information about whether the constrained replicas are local or remote. +func isConstraintLocal(constraint cat.Constraint, localRegion string) (isLocal bool, ok bool) { + if constraint.GetKey() != regionKey { + // We only care about constraints on the region. + return false /* isLocal */, false /* ok */ + } + if constraint.GetValue() == localRegion { + if constraint.IsRequired() { + // The local region is required. + return true /* isLocal */, true /* ok */ + } + // The local region is prohibited. + return false /* isLocal */, true /* ok */ + } + if constraint.IsRequired() { + // A remote region is required. + return false /* isLocal */, true /* ok */ + } + // A remote region is prohibited, so this constraint gives no information + // about whether the constrained replicas are local or remote. + return false /* isLocal */, false /* ok */ +} + +// prefixIsLocal contains a PARTITION BY LIST prefix, and a boolean indicating +// whether the prefix is from a local partition. +type prefixIsLocal struct { + prefix tree.Datums + isLocal bool +} + +// prefixSorter sorts prefixes (which are wrapped in prefixIsLocal structs) so +// that longer prefixes are ordered first. +type prefixSorter []prefixIsLocal + +var _ sort.Interface = &prefixSorter{} + +// Len is part of sort.Interface. +func (ps prefixSorter) Len() int { + return len(ps) +} + +// Less is part of sort.Interface. +func (ps prefixSorter) Less(i, j int) bool { + return len(ps[i].prefix) > len(ps[j].prefix) +} + +// Swap is part of sort.Interface. +func (ps prefixSorter) Swap(i, j int) { + ps[i], ps[j] = ps[j], ps[i] +} + +// getLocalSpans returns the indexes of the spans from the given constraint that +// target local partitions. +func (c *CustomFuncs) getLocalSpans( + index cat.Index, localPartitions util.FastIntSet, constraint *constraint.Constraint, +) util.FastIntSet { + // Collect all the prefixes from all the different partitions (remembering + // which ones came from local partitions), and sort them so that longer + // prefixes come before shorter prefixes. 
For each span in the constraint, we + // will iterate through the list of prefixes until we find a match, so + // ordering them with longer prefixes first ensures that the correct match is + // found. + allPrefixes := make(prefixSorter, 0, index.PartitionCount()) + for i, n := 0, index.PartitionCount(); i < n; i++ { + part := index.Partition(i) + isLocal := localPartitions.Contains(i) + partitionPrefixes := part.PartitionByListPrefixes() + if len(partitionPrefixes) == 0 { + // This can happen when the partition value is DEFAULT. + allPrefixes = append(allPrefixes, prefixIsLocal{ + prefix: nil, + isLocal: isLocal, + }) + } + for j := range partitionPrefixes { + allPrefixes = append(allPrefixes, prefixIsLocal{ + prefix: partitionPrefixes[j], + isLocal: isLocal, + }) + } + } + sort.Sort(allPrefixes) + + // Now iterate through the spans and determine whether each one matches + // with a prefix from a local partition. + // TODO(rytaft): Sort the prefixes by key in addition to length, and use + // binary search here. + var localSpans util.FastIntSet + for i, n := 0, constraint.Spans.Count(); i < n; i++ { + span := constraint.Spans.Get(i) + spanPrefix := span.Prefix(c.e.evalCtx) + for j := range allPrefixes { + prefix := allPrefixes[j].prefix + isLocal := allPrefixes[j].isLocal + if len(prefix) > spanPrefix { + continue + } + matches := true + for k, datum := range prefix { + if span.StartKey().Value(k).Compare(c.e.evalCtx, datum) != 0 { + matches = false + break + } + } + if matches { + if isLocal { + localSpans.Add(i) + } + break + } + } + } + return localSpans +} + +// splitSpans splits the original constraint into a local and remote constraint +// by putting the spans at positions identified by localSpanOrds into the local +// constraint, and the remaining spans into the remote constraint. +func (c *CustomFuncs) splitSpans( + origConstraint *constraint.Constraint, localSpanOrds util.FastIntSet, +) (localConstraint, remoteConstraint constraint.Constraint) { + allSpansCount := origConstraint.Spans.Count() + localSpansCount := localSpanOrds.Len() + var localSpans, remoteSpans constraint.Spans + localSpans.Alloc(localSpansCount) + remoteSpans.Alloc(allSpansCount - localSpansCount) + for i := 0; i < allSpansCount; i++ { + span := origConstraint.Spans.Get(i) + if localSpanOrds.Contains(i) { + localSpans.Append(span) + } else { + remoteSpans.Append(span) + } + } + keyCtx := constraint.MakeKeyContext(&origConstraint.Columns, c.e.evalCtx) + localConstraint.Init(&keyCtx, &localSpans) + remoteConstraint.Init(&keyCtx, &remoteSpans) + return localConstraint, remoteConstraint +} diff --git a/pkg/sql/opt/xform/scan_funcs_test.go b/pkg/sql/opt/xform/scan_funcs_test.go new file mode 100644 index 000000000000..5e2e1c6e1d13 --- /dev/null +++ b/pkg/sql/opt/xform/scan_funcs_test.go @@ -0,0 +1,110 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package xform + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "gopkg.in/yaml.v2" +) + +func TestIsZoneLocal(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + localRegion string + constraints string + voterConstraints string + leasePrefs string + expected bool + }{ + {localRegion: "us", constraints: "[]", expected: false}, + {localRegion: "us", constraints: "[+region=eu,+dc=uk]", expected: false}, + {localRegion: "us", constraints: "[-region=us,+dc=east]", expected: false}, + {localRegion: "us", constraints: "[-region=eu]", expected: false}, + {localRegion: "us", constraints: "[+region=us]", expected: true}, + + {localRegion: "us", voterConstraints: "[+region=us,-dc=east]", expected: true}, + {localRegion: "us", voterConstraints: "[+region=us,+dc=west]", expected: true}, + {localRegion: "us", voterConstraints: "[+dc=east]", expected: false}, + {localRegion: "us", voterConstraints: "[+dc=west,+ssd]", expected: false}, + {localRegion: "us", voterConstraints: "[-region=eu,+dc=east]", expected: false}, + {localRegion: "us", voterConstraints: "[+region=us,+dc=east,+rack=1,-ssd]", expected: true}, + + {localRegion: "us", constraints: `{"+region=us,+dc=east":3,"-dc=east":2}`, expected: true}, + {localRegion: "us", constraints: `{"+region=us,+dc=east":3,"+region=us,+dc=west":2}`, expected: true}, + {localRegion: "us", constraints: `{"+region=us,+dc=east":3,"+region=eu":2}`, expected: false}, + + {localRegion: "us", leasePrefs: "[[]]", expected: false}, + {localRegion: "us", leasePrefs: "[[+dc=west]]", expected: false}, + {localRegion: "us", leasePrefs: "[[+region=us]]", expected: true}, + {localRegion: "us", leasePrefs: "[[+region=us,+dc=east]]", expected: true}, + + {localRegion: "us", constraints: "[+region=eu]", voterConstraints: "[+region=eu]", + leasePrefs: "[[+dc=west]]", expected: false}, + {localRegion: "us", constraints: "[+region=eu]", voterConstraints: "[+region=eu]", + leasePrefs: "[[+region=us]]", expected: false}, + {localRegion: "us", constraints: "[+region=us]", voterConstraints: "[+region=us]", + leasePrefs: "[[+dc=west]]", expected: true}, + {localRegion: "us", constraints: "[+region=us]", voterConstraints: "[+region=us]", + leasePrefs: "[[+region=us]]", expected: true}, + {localRegion: "us", constraints: "[+dc=east]", voterConstraints: "[+region=us]", + leasePrefs: "[[+region=us]]", expected: true}, + {localRegion: "us", constraints: "[+dc=east]", voterConstraints: "[+dc=east]", + leasePrefs: "[[+region=us]]", expected: true}, + {localRegion: "us", constraints: "[+dc=east]", voterConstraints: "[+dc=east]", + leasePrefs: "[[+dc=east]]", expected: false}, + {localRegion: "us", constraints: "[+region=us,+dc=east]", voterConstraints: "[-region=eu]", + leasePrefs: "[[+region=us,+dc=east]]", expected: true}, + {localRegion: "us", constraints: `{"+region=us":3,"+region=eu":2}`, + voterConstraints: `[+region=us]`, expected: true}, + {localRegion: "us", constraints: `{"+region=us":3,"+region=eu":2}`, + voterConstraints: `{"+region=us":1,"+region=eu":1}`, expected: false}, + {localRegion: "us", constraints: `{"+region=us":3,"+region=eu":2}`, + voterConstraints: `{"+region=us":1,"+region=eu":1}`, leasePrefs: "[[+region=us]]", expected: true}, + } + + for _, tc := range testCases { + zone := &zonepb.ZoneConfig{} + + if tc.constraints != "" { + constraintsList := &zonepb.ConstraintsList{} + if 
err := yaml.UnmarshalStrict([]byte(tc.constraints), constraintsList); err != nil { + t.Fatal(err) + } + zone.Constraints = constraintsList.Constraints + } + + if tc.voterConstraints != "" { + constraintsList := &zonepb.ConstraintsList{} + if err := yaml.UnmarshalStrict([]byte(tc.voterConstraints), constraintsList); err != nil { + t.Fatal(err) + } + zone.VoterConstraints = constraintsList.Constraints + } + + if tc.leasePrefs != "" { + if err := yaml.UnmarshalStrict([]byte(tc.leasePrefs), &zone.LeasePreferences); err != nil { + t.Fatal(err) + } + } + + actual := isZoneLocal(zone, tc.localRegion) + if actual != tc.expected { + t.Errorf("locality=%v, constraints=%v, voterConstraints=%v, leasePrefs=%v: expected %v, got %v", + tc.localRegion, tc.constraints, tc.voterConstraints, tc.leasePrefs, tc.expected, actual) + } + } +} diff --git a/pkg/sql/opt/xform/testdata/coster/zone b/pkg/sql/opt/xform/testdata/coster/zone index 4efdfd1ed247..02bf05be485d 100644 --- a/pkg/sql/opt/xform/testdata/coster/zone +++ b/pkg/sql/opt/xform/testdata/coster/zone @@ -668,3 +668,75 @@ scan t.public.abc@bc2 ├── fd: ()-->(2) ├── prune: (3) └── interesting orderings: (+2,+3) + +# -------------------------------------------------- +# Partition zones. +# -------------------------------------------------- + +exec-ddl +CREATE TABLE abc_part ( + r STRING NOT NULL CHECK (r IN ('east', 'west')), + a INT PRIMARY KEY, + b INT, + c STRING, + UNIQUE WITHOUT INDEX (b, c), + UNIQUE INDEX bc_idx (r, b, c) PARTITION BY LIST (r) ( + PARTITION east VALUES IN (('east')), + PARTITION west VALUES IN (('west')) + ), + INDEX b_idx (r, b) PARTITION BY LIST (r) ( + PARTITION east VALUES IN (('east')), + PARTITION west VALUES IN (('west')) + ) +) +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX abc_part@bc_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX abc_part@bc_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 2}', + lease_preferences = '[[+region=west]]'; +---- + +# We should prefer the locality optimized search here. 
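+# The gateway locality below is region=east, and the "east" partition's zone
+# config places voters and lease preferences in region=east, so the east span
+# is treated as local.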
+opt format=show-all locality=(region=east,dc=a) +SELECT * FROM abc_part WHERE b = 1 AND c = 'foo' +---- +locality-optimized-search + ├── columns: r:1(string!null) a:2(int!null) b:3(int!null) c:4(string!null) + ├── left columns: t.public.abc_part.r:6(string) t.public.abc_part.a:7(int) t.public.abc_part.b:8(int) t.public.abc_part.c:9(string) + ├── right columns: t.public.abc_part.r:11(string) t.public.abc_part.a:12(int) t.public.abc_part.b:13(int) t.public.abc_part.c:14(string) + ├── cardinality: [0 - 1] + ├── stats: [rows=0.910000001, distinct(3)=0.910000001, null(3)=0, distinct(4)=0.910000001, null(4)=0, distinct(3,4)=0.910000001, null(3,4)=0] + ├── cost: 5.101218 + ├── key: () + ├── fd: ()-->(1-4) + ├── prune: (1,2) + ├── interesting orderings: (+2) (+1,+3,+4,+2) (+1,+3,+2) + ├── scan t.public.abc_part@bc_idx + │ ├── columns: t.public.abc_part.r:6(string!null) t.public.abc_part.a:7(int!null) t.public.abc_part.b:8(int!null) t.public.abc_part.c:9(string!null) + │ ├── constraint: /6/8/9: [/'east'/1/'foo' - /'east'/1/'foo'] + │ ├── cardinality: [0 - 1] + │ ├── stats: [rows=0.9001, distinct(6)=0.9001, null(6)=0, distinct(8)=0.9001, null(8)=0, distinct(9)=0.9001, null(9)=0, distinct(6,8,9)=0.9001, null(6,8,9)=0] + │ ├── cost: 2.532058 + │ ├── key: () + │ ├── fd: ()-->(6-9) + │ ├── prune: (6-9) + │ └── interesting orderings: (+7) (+6,+8,+9,+7) (+6,+8,+7) + └── scan t.public.abc_part@bc_idx + ├── columns: t.public.abc_part.r:11(string!null) t.public.abc_part.a:12(int!null) t.public.abc_part.b:13(int!null) t.public.abc_part.c:14(string!null) + ├── constraint: /11/13/14: [/'west'/1/'foo' - /'west'/1/'foo'] + ├── cardinality: [0 - 1] + ├── stats: [rows=0.9001, distinct(11)=0.9001, null(11)=0, distinct(13)=0.9001, null(13)=0, distinct(14)=0.9001, null(14)=0, distinct(11,13,14)=0.9001, null(11,13,14)=0] + ├── cost: 2.532058 + ├── key: () + ├── fd: ()-->(11-14) + ├── prune: (11-14) + └── interesting orderings: (+12) (+11,+13,+14,+12) (+11,+13,+12) diff --git a/pkg/sql/opt/xform/testdata/rules/scan b/pkg/sql/opt/xform/testdata/rules/scan index 91eb69b23c3b..4303021ae3d1 100644 --- a/pkg/sql/opt/xform/testdata/rules/scan +++ b/pkg/sql/opt/xform/testdata/rules/scan @@ -429,3 +429,384 @@ insert fk_b │ ├── key: () │ └── fd: ()-->(6) └── filters (true) + +# -------------------------------------------------- +# GenerateLocalityOptimizedScan +# -------------------------------------------------- + +exec-ddl +CREATE TABLE abc_part ( + r STRING NOT NULL CHECK (r IN ('east', 'west', 'central')), + t INT NOT NULL CHECK (t IN (1, 2, 3)), + a INT PRIMARY KEY, + b INT, + c INT, + d INT, + UNIQUE WITHOUT INDEX (b), + UNIQUE WITHOUT INDEX (c), + UNIQUE INDEX b_idx (r, b) PARTITION BY LIST (r) ( + PARTITION east VALUES IN (('east')), + PARTITION west VALUES IN (('west')), + PARTITION central VALUES IN (('central')) + ), + UNIQUE INDEX c_idx (r, t, c) PARTITION BY LIST (r, t) ( + PARTITION east VALUES IN (('east', 1), ('east', 2)), + PARTITION west VALUES IN (('west', DEFAULT)), + PARTITION default VALUES IN (DEFAULT) + ), + INDEX d_idx (r, d) PARTITION BY LIST (r) ( + PARTITION east VALUES IN (('east')), + PARTITION west VALUES IN (('west')), + PARTITION central VALUES IN (('central')) + ) +) +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX abc_part@b_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX abc_part@b_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 
2}', + lease_preferences = '[[+region=west]]'; +---- + +exec-ddl +ALTER PARTITION "central" OF INDEX abc_part@b_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=central: 2}', + lease_preferences = '[[+region=central]]'; +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX abc_part@c_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX abc_part@c_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 2}', + lease_preferences = '[[+region=west]]' +---- + +exec-ddl +ALTER PARTITION "default" OF INDEX abc_part@c_idx CONFIGURE ZONE USING + num_voters = 5, + lease_preferences = '[[+region=central]]'; +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX abc_part@d_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX abc_part@d_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 2}', + lease_preferences = '[[+region=west]]'; +---- + +exec-ddl +ALTER PARTITION "central" OF INDEX abc_part@d_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=central: 2}', + lease_preferences = '[[+region=central]]'; +---- + +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT a FROM abc_part WHERE b = 1 +---- +project + ├── columns: a:3!null + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(3) + └── locality-optimized-search + ├── columns: a:3!null b:4!null + ├── left columns: a:10 b:11 + ├── right columns: a:17 b:18 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(3,4) + ├── scan abc_part@b_idx + │ ├── columns: a:10!null b:11!null + │ ├── constraint: /8/11: [/'east'/1 - /'east'/1] + │ ├── cardinality: [0 - 1] + │ ├── key: () + │ └── fd: ()-->(10,11) + └── scan abc_part@b_idx + ├── columns: a:17!null b:18!null + ├── constraint: /15/18 + │ ├── [/'central'/1 - /'central'/1] + │ └── [/'west'/1 - /'west'/1] + ├── cardinality: [0 - 1] + ├── key: () + └── fd: ()-->(17,18) + +opt locality=(region=west) expect=GenerateLocalityOptimizedScan +SELECT * FROM abc_part WHERE b = 1 +---- +index-join abc_part + ├── columns: r:1!null t:2!null a:3!null b:4!null c:5 d:6 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1-6) + └── locality-optimized-search + ├── columns: r:1!null a:3!null b:4!null + ├── left columns: r:8 a:10 b:11 + ├── right columns: r:15 a:17 b:18 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1,3,4) + ├── scan abc_part@b_idx + │ ├── columns: r:8!null a:10!null b:11!null + │ ├── constraint: /8/11: [/'west'/1 - /'west'/1] + │ ├── cardinality: [0 - 1] + │ ├── key: () + │ └── fd: ()-->(8,10,11) + └── scan abc_part@b_idx + ├── columns: r:15!null a:17!null b:18!null + ├── constraint: /15/18 + │ ├── [/'central'/1 - /'central'/1] + │ └── [/'east'/1 - /'east'/1] + ├── cardinality: [0 - 1] + ├── key: () + └── fd: ()-->(15,17,18) + +opt locality=(region=central) expect=GenerateLocalityOptimizedScan +SELECT * FROM abc_part WHERE b = 1 +---- +index-join abc_part + ├── columns: r:1!null t:2!null a:3!null b:4!null c:5 d:6 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1-6) + └── locality-optimized-search + ├── columns: r:1!null a:3!null b:4!null + ├── left columns: r:8 a:10 b:11 + ├── right columns: r:15 a:17 b:18 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1,3,4) + ├── scan abc_part@b_idx + │ ├── columns: 
r:8!null a:10!null b:11!null + │ ├── constraint: /8/11: [/'central'/1 - /'central'/1] + │ ├── cardinality: [0 - 1] + │ ├── key: () + │ └── fd: ()-->(8,10,11) + └── scan abc_part@b_idx + ├── columns: r:15!null a:17!null b:18!null + ├── constraint: /15/18 + │ ├── [/'east'/1 - /'east'/1] + │ └── [/'west'/1 - /'west'/1] + ├── cardinality: [0 - 1] + ├── key: () + └── fd: ()-->(15,17,18) + +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT * FROM abc_part WHERE c = 1 +---- +index-join abc_part + ├── columns: r:1!null t:2!null a:3!null b:4 c:5!null d:6 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1-6) + └── locality-optimized-search + ├── columns: r:1!null t:2!null a:3!null c:5!null + ├── left columns: r:8 t:9 a:10 c:12 + ├── right columns: r:15 t:16 a:17 c:19 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1-3,5) + ├── scan abc_part@c_idx + │ ├── columns: r:8!null t:9!null a:10!null c:12!null + │ ├── constraint: /8/9/12 + │ │ ├── [/'east'/1/1 - /'east'/1/1] + │ │ └── [/'east'/2/1 - /'east'/2/1] + │ ├── cardinality: [0 - 1] + │ ├── key: () + │ └── fd: ()-->(8-10,12) + └── scan abc_part@c_idx + ├── columns: r:15!null t:16!null a:17!null c:19!null + ├── constraint: /15/16/19 + │ ├── [/'central'/1/1 - /'central'/1/1] + │ ├── [/'central'/2/1 - /'central'/2/1] + │ ├── [/'central'/3/1 - /'central'/3/1] + │ ├── [/'east'/3/1 - /'east'/3/1] + │ ├── [/'west'/1/1 - /'west'/1/1] + │ ├── [/'west'/2/1 - /'west'/2/1] + │ └── [/'west'/3/1 - /'west'/3/1] + ├── cardinality: [0 - 1] + ├── key: () + └── fd: ()-->(15-17,19) + +opt locality=(region=west) expect=GenerateLocalityOptimizedScan +SELECT * FROM abc_part WHERE c = 1 +---- +index-join abc_part + ├── columns: r:1!null t:2!null a:3!null b:4 c:5!null d:6 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1-6) + └── locality-optimized-search + ├── columns: r:1!null t:2!null a:3!null c:5!null + ├── left columns: r:8 t:9 a:10 c:12 + ├── right columns: r:15 t:16 a:17 c:19 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1-3,5) + ├── scan abc_part@c_idx + │ ├── columns: r:8!null t:9!null a:10!null c:12!null + │ ├── constraint: /8/9/12 + │ │ ├── [/'west'/1/1 - /'west'/1/1] + │ │ ├── [/'west'/2/1 - /'west'/2/1] + │ │ └── [/'west'/3/1 - /'west'/3/1] + │ ├── cardinality: [0 - 1] + │ ├── key: () + │ └── fd: ()-->(8-10,12) + └── scan abc_part@c_idx + ├── columns: r:15!null t:16!null a:17!null c:19!null + ├── constraint: /15/16/19 + │ ├── [/'central'/1/1 - /'central'/1/1] + │ ├── [/'central'/2/1 - /'central'/2/1] + │ ├── [/'central'/3/1 - /'central'/3/1] + │ ├── [/'east'/1/1 - /'east'/1/1] + │ ├── [/'east'/2/1 - /'east'/2/1] + │ └── [/'east'/3/1 - /'east'/3/1] + ├── cardinality: [0 - 1] + ├── key: () + └── fd: ()-->(15-17,19) + +opt locality=(region=central) expect=GenerateLocalityOptimizedScan +SELECT * FROM abc_part WHERE c = 1 +---- +index-join abc_part + ├── columns: r:1!null t:2!null a:3!null b:4 c:5!null d:6 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1-6) + └── locality-optimized-search + ├── columns: r:1!null t:2!null a:3!null c:5!null + ├── left columns: r:8 t:9 a:10 c:12 + ├── right columns: r:15 t:16 a:17 c:19 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(1-3,5) + ├── scan abc_part@c_idx + │ ├── columns: r:8!null t:9!null a:10!null c:12!null + │ ├── constraint: /8/9/12 + │ │ ├── [/'central'/1/1 - /'central'/1/1] + │ │ ├── [/'central'/2/1 - /'central'/2/1] + │ │ ├── [/'central'/3/1 - /'central'/3/1] + │ │ └── [/'east'/3/1 - /'east'/3/1] + │ ├── cardinality: [0 - 1] + │ ├── key: () + │ 
└── fd: ()-->(8-10,12) + └── scan abc_part@c_idx + ├── columns: r:15!null t:16!null a:17!null c:19!null + ├── constraint: /15/16/19 + │ ├── [/'east'/1/1 - /'east'/1/1] + │ ├── [/'east'/2/1 - /'east'/2/1] + │ ├── [/'west'/1/1 - /'west'/1/1] + │ ├── [/'west'/2/1 - /'west'/2/1] + │ └── [/'west'/3/1 - /'west'/3/1] + ├── cardinality: [0 - 1] + ├── key: () + └── fd: ()-->(15-17,19) + +# b is not constrained to a single value. +opt locality=(region=east) expect-not=GenerateLocalityOptimizedScan +SELECT a FROM abc_part WHERE b IN (1, 2) +---- +project + ├── columns: a:3!null + ├── cardinality: [0 - 2] + ├── key: (3) + └── scan abc_part@b_idx + ├── columns: a:3!null b:4!null + ├── constraint: /1/4 + │ ├── [/'central'/1 - /'central'/2] + │ ├── [/'east'/1 - /'east'/2] + │ └── [/'west'/1 - /'west'/2] + ├── cardinality: [0 - 2] + ├── key: (3) + └── fd: (3)-->(4), (4)-->(3) + +# The spans target all remote partitions. +opt locality=(region=east) expect-not=GenerateLocalityOptimizedScan +SELECT a FROM abc_part WHERE b = 1 AND r IN ('west', 'central') +---- +project + ├── columns: a:3!null + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(3) + └── scan abc_part@b_idx + ├── columns: r:1!null a:3!null b:4!null + ├── constraint: /1/4 + │ ├── [/'central'/1 - /'central'/1] + │ └── [/'west'/1 - /'west'/1] + ├── cardinality: [0 - 1] + ├── key: () + └── fd: ()-->(1,3,4) + +# The scan is limited. +opt locality=(region=east) expect-not=GenerateLocalityOptimizedScan +SELECT a FROM abc_part WHERE d = 1 LIMIT 1 +---- +project + ├── columns: a:3!null + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(3) + └── scan abc_part@d_idx + ├── columns: a:3!null d:6!null + ├── constraint: /1/6/3 + │ ├── [/'central'/1 - /'central'/1] + │ ├── [/'east'/1 - /'east'/1] + │ └── [/'west'/1 - /'west'/1] + ├── limit: 1 + ├── key: () + └── fd: ()-->(3,6) + +# The scan is limited, but b is known to be a key, so the limit is discarded. +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT a FROM abc_part WHERE b = 1 LIMIT 1 +---- +project + ├── columns: a:3!null + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(3) + └── locality-optimized-search + ├── columns: a:3!null b:4!null + ├── left columns: a:10 b:11 + ├── right columns: a:17 b:18 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(3,4) + ├── scan abc_part@b_idx + │ ├── columns: a:10!null b:11!null + │ ├── constraint: /8/11: [/'east'/1 - /'east'/1] + │ ├── cardinality: [0 - 1] + │ ├── key: () + │ └── fd: ()-->(10,11) + └── scan abc_part@b_idx + ├── columns: a:17!null b:18!null + ├── constraint: /15/18 + │ ├── [/'central'/1 - /'central'/1] + │ └── [/'west'/1 - /'west'/1] + ├── cardinality: [0 - 1] + ├── key: () + └── fd: ()-->(17,18) diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 36b2c35814d7..09147f4bbd74 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -128,6 +128,7 @@ func (ef *execFactory) ConstructScan( scan.lockingStrength = descpb.ToScanLockingStrength(params.Locking.Strength) scan.lockingWaitPolicy = descpb.ToScanLockingWaitPolicy(params.Locking.WaitPolicy) } + scan.localityOptimized = params.LocalityOptimized return scan, nil } @@ -496,9 +497,17 @@ func (ef *execFactory) ConstructDistinct( // ConstructSetOp is part of the exec.Factory interface. 
func (ef *execFactory) ConstructSetOp( - typ tree.UnionType, all bool, left, right exec.Node, + typ tree.UnionType, all bool, left, right exec.Node, hardLimit uint64, ) (exec.Node, error) { - return ef.planner.newUnionNode(typ, all, left.(planNode), right.(planNode)) + if hardLimit != 0 && (typ != tree.UnionOp || !all) { + return nil, errors.AssertionFailedf("a hard limit on a set operator is only supported for UNION ALL") + } + if hardLimit > 1 { + return nil, errors.AssertionFailedf( + "locality optimized search is not yet supported for more than one row at a time", + ) + } + return ef.planner.newUnionNode(typ, all, left.(planNode), right.(planNode), hardLimit) } // ConstructSort is part of the exec.Factory interface. diff --git a/pkg/sql/physicalplan/physical_plan.go b/pkg/sql/physicalplan/physical_plan.go index c3c993140d8a..fdef2842c374 100644 --- a/pkg/sql/physicalplan/physical_plan.go +++ b/pkg/sql/physicalplan/physical_plan.go @@ -1172,7 +1172,9 @@ func (p *PhysicalPlan) AddDistinctSetOpStage( // TODO(radu): a no-op processor is not ideal if the next processor is on the // same node. A fix for that is much more complicated, requiring remembering // extra state in the PhysicalPlan. -func (p *PhysicalPlan) EnsureSingleStreamPerNode(forceSerialization bool) { +func (p *PhysicalPlan) EnsureSingleStreamPerNode( + forceSerialization bool, post execinfrapb.PostProcessSpec, +) { // Fast path - check if we need to do anything. var nodes util.FastIntSet var foundDuplicates bool @@ -1217,6 +1219,7 @@ func (p *PhysicalPlan) EnsureSingleStreamPerNode(forceSerialization bool) { // The other fields will be filled in by MergeResultStreams. ColumnTypes: p.GetResultTypes(), }}, + Post: post, Core: execinfrapb.ProcessorCoreUnion{Noop: &execinfrapb.NoopCoreSpec{}}, Output: []execinfrapb.OutputRouterSpec{{Type: execinfrapb.OutputRouterSpec_PASS_THROUGH}}, ResultTypes: p.GetResultTypes(), @@ -1232,10 +1235,12 @@ func (p *PhysicalPlan) EnsureSingleStreamPerNode(forceSerialization bool) { // Note that if the last stage consists of a single processor planned on a // remote node, such stage is considered distributed. func (p *PhysicalPlan) GetLastStageDistribution() PlanDistribution { - if len(p.ResultRouters) == 1 && p.Processors[p.ResultRouters[0]].Node == p.GatewayNodeID { - return LocalPlan + for i := range p.ResultRouters { + if p.Processors[p.ResultRouters[i]].Node != p.GatewayNodeID { + return FullyDistributedPlan + } } - return FullyDistributedPlan + return LocalPlan } // IsLastStageDistributed returns whether the last stage of processors is diff --git a/pkg/sql/scan.go b/pkg/sql/scan.go index 0faf972d4fa2..91ad24885d8f 100644 --- a/pkg/sql/scan.go +++ b/pkg/sql/scan.go @@ -102,6 +102,13 @@ type scanNode struct { // containsSystemColumns holds whether or not this scan is expected to // produce any system columns. containsSystemColumns bool + + // localityOptimized is true if this scan is part of a locality optimized + // search strategy, which uses a limited UNION ALL operator to try to find a + // row on nodes in the gateway's region before fanning out to remote nodes. In + // order for this optimization to work, the DistSQL planner must create a + // local plan. + localityOptimized bool } // scanColumnsConfig controls the "schema" of a scan node. 
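The contract that the new hardLimit field (added to unionNode in the next
hunk) establishes between the optimizer and the execution engine can be
sketched in isolation. This is a minimal illustrative sketch, not the actual
DistSQL execution code: the rowSource interface and the runLocalitySearch
helper are hypothetical stand-ins for plan nodes and processor wiring.

package sketch

// rowSource is a hypothetical stand-in for an executable plan node.
type rowSource interface {
	// Next returns the next row, or nil once the source is exhausted.
	Next() []interface{}
}

// runLocalitySearch illustrates the semantics of a hard-limited UNION ALL:
// the left (local) branch is drained first, and the right (remote) branch
// is started only if fewer than hardLimit rows were produced locally. With
// hardLimit == 1, the only value planned today, a row found in the gateway
// region means remote nodes are never contacted. In the real engine the
// limit itself is guaranteed, but skipping the right branch is best-effort.
func runLocalitySearch(local, remote rowSource, hardLimit int) [][]interface{} {
	var out [][]interface{}
	for row := local.Next(); row != nil; row = local.Next() {
		out = append(out, row)
		if len(out) >= hardLimit {
			// Short-circuit: in this sketch the remote branch never runs.
			return out
		}
	}
	for row := remote.Next(); row != nil && len(out) < hardLimit; row = remote.Next() {
		out = append(out, row)
	}
	return out
}

The assertions added to ConstructSetOp above enforce exactly these
preconditions: a non-zero hardLimit is only legal for UNION ALL, and only a
limit of one row is supported for now.
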
diff --git a/pkg/sql/union.go b/pkg/sql/union.go index 02913d47eef2..3ab9ddb5a428 100644 --- a/pkg/sql/union.go +++ b/pkg/sql/union.go @@ -76,10 +76,17 @@ type unionNode struct { unionType tree.UnionType // all indicates if the operation is the ALL or DISTINCT version all bool + + // hardLimit can only be set for UNION ALL operations. It is used to implement + // locality optimized search, and instructs the execution engine that it + // should execute the left node to completion and possibly short-circuit if + // the limit is reached before executing the right node. The limit is + // guaranteed but the short-circuit behavior is not. + hardLimit uint64 } func (p *planner) newUnionNode( - typ tree.UnionType, all bool, left, right planNode, + typ tree.UnionType, all bool, left, right planNode, hardLimit uint64, ) (planNode, error) { emitAll := false switch typ { @@ -137,6 +144,7 @@ func (p *planner) newUnionNode( emitAll: emitAll, unionType: typ, all: all, + hardLimit: hardLimit, } return node, nil }
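
End to end, the optimization is exercised purely through the session setting.
A minimal client-side sketch, assuming a multi-region cluster with a
partitioned table like the abc_part fixture above; the connection string,
database name, and driver choice are illustrative assumptions, not part of
this patch:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	// Placeholder address; connect to a gateway node in the region whose
	// partition should be probed first (e.g. region=east).
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The optimization is opt-in; the setting is still disabled by default.
	if _, err := db.Exec(
		"SET locality_optimized_partitioned_index_scan = true",
	); err != nil {
		log.Fatal(err)
	}

	// A point lookup on a column known to be unique: the optimizer can now
	// plan a locality-optimized-search that scans the gateway region's
	// partition of b_idx before falling back to the remote partitions.
	var a int
	if err := db.QueryRow(
		"SELECT a FROM abc_part WHERE b = 1",
	).Scan(&a); err != nil {
		log.Fatal(err) // sql.ErrNoRows if the key exists in no region
	}
	fmt.Println("found a =", a)
}

The lookup returns the same result regardless of where the row lives;
locality only determines which spans are read first, per the hardLimit and
localityOptimized comments above.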