60831: opt,sql: add support for locality optimized scans r=rytaft a=rytaft

This commit adds support for a new type of operator called
`LocalityOptimizedSearch`. If the session setting
`locality_optimized_partitioned_index_scan` is true, the optimizer now plans
a `LocalityOptimizedSearch` operation whenever possible.
`LocalityOptimizedSearch` is similar to `UnionAll`, but it is designed to avoid
communicating with remote nodes (relative to the gateway region) if at all
possible.

`LocalityOptimizedSearch` can be planned under the following conditions:
 - A scan is known to produce at most one row.
 - The scan contains multiple spans, with some spans targeting partitions on
   local nodes (relative to the gateway region), and some targeting
   partitions on remote nodes. It is not known which span will produce the
   row.

This commit adds a new exploration rule called `GenerateLocalityOptimizedScan`
that generates a `LocalityOptimizedSearch` if the above conditions hold.
The result of `GenerateLocalityOptimizedScan` will be a `LocalityOptimizedSearch`
in which the left child contains a new scan operator with the local spans
from the original scan, and the right child contains a new scan operator with
the remote spans. The `LocalityOptimizedSearch` operator ensures that the right
child (containing remote spans) is only executed if the left child
(containing local spans) does not return any rows.
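The span split performed by `GenerateLocalityOptimizedScan` can be sketched as follows. This is an illustrative stand-in, not the real rule: `Span` and `splitSpans` are hypothetical names, and the actual implementation operates on `opt/constraint` spans keyed by the `crdb_region` column.

```go
package main

import "fmt"

// Span is a simplified stand-in for a constrained index span whose first
// column is the region value (hypothetical type, not the real
// opt/constraint.Span).
type Span struct {
	Region string
	Key    int
}

// splitSpans partitions spans into those targeting partitions in the
// gateway region and those targeting remote regions, mirroring how the
// rule builds the local and remote child scans.
func splitSpans(spans []Span, gatewayRegion string) (local, remote []Span) {
	for _, s := range spans {
		if s.Region == gatewayRegion {
			local = append(local, s)
		} else {
			remote = append(remote, s)
		}
	}
	return local, remote
}

func main() {
	spans := []Span{
		{"europe-west1", 10},
		{"us-east1", 10},
		{"us-west1", 10},
	}
	local, remote := splitSpans(spans, "us-east1")
	fmt.Println(len(local), len(remote)) // 1 2
}
```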

This is a useful optimization if there is locality of access in the workload,
such that rows tend to be accessed from the region where they are located.
If there is no locality of access, using `LocalityOptimizedSearch` could be a
slight pessimization, since rows residing in remote regions will be fetched
slightly more slowly than they would be otherwise.

For example, suppose we have a multi-region database with regions 'us-east1',
'us-west1' and 'europe-west1', and we have the following table and query,
issued from 'us-east1':
```
  CREATE TABLE tab (
    k INT PRIMARY KEY,
    v INT
  ) LOCALITY REGIONAL BY ROW;

  SELECT * FROM tab WHERE k = 10;
```
Normally, this would produce the following plan:
```
  scan tab
   └── constraint: /3/1
        ├── [/'europe-west1'/10 - /'europe-west1'/10]
        ├── [/'us-east1'/10 - /'us-east1'/10]
        └── [/'us-west1'/10 - /'us-west1'/10]
```
but if the session setting `locality_optimized_partitioned_index_scan` is
enabled, the optimizer will produce this plan, using locality optimized
search:
```
  locality-optimized-search
   ├── scan tab
   │    └── constraint: /9/7: [/'us-east1'/10 - /'us-east1'/10]
   └── scan tab
        └── constraint: /14/12
             ├── [/'europe-west1'/10 - /'europe-west1'/10]
             └── [/'us-west1'/10 - /'us-west1'/10]
```
As long as `k = 10` is located in 'us-east1', the second plan will be much
faster. But if `k = 10` is located in one of the other regions, the first plan
would be slightly faster.
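The runtime contract described above can be sketched as a simple short-circuit. This is a conceptual sketch only: the children are modeled as plain functions (hypothetical signatures, not the real execution-engine interfaces), and since the scan is known to produce at most one row, any row from the local child satisfies the limit.

```go
package main

import "fmt"

// runSearch models locality-optimized-search: the remote child is
// executed only if the local child returns no rows.
func runSearch(local, remote func() []string) []string {
	if rows := local(); len(rows) > 0 {
		return rows // short-circuit: remote regions are never contacted
	}
	return remote()
}

func main() {
	local := func() []string { return []string{"row-from-us-east1"} }
	remote := func() []string { return []string{"row-from-us-west1"} }
	fmt.Println(runSearch(local, remote)) // [row-from-us-east1]
}
```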

Informs cockroachdb#55185

Release justification: This commit is a low risk, high benefit change to
existing functionality. The session setting that enables the change is
locality_optimized_partitioned_index_scan, which is still disabled by default.

Release note (performance improvement): If the session setting
locality_optimized_partitioned_index_scan is enabled, the optimizer will try
to plan scans known to produce at most one row using "locality optimized
search". This optimization applies to REGIONAL BY ROW tables, and if enabled,
it means that the execution engine will first search locally for the row
before searching remote nodes. If the row is found in a local node, remote
nodes will not be searched.

Co-authored-by: Rebecca Taft <[email protected]>
craig[bot] and rytaft committed Feb 25, 2021
2 parents 65ce294 + e77bf5e commit 2d16ce1
Showing 24 changed files with 1,316 additions and 39 deletions.
121 changes: 116 additions & 5 deletions pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
Original file line number Diff line number Diff line change
@@ -530,6 +530,110 @@ pk pk2 a b j
statement error cannot drop column crdb_region as it is used to store the region in a REGIONAL BY ROW table\nHINT: You must change the table locality before dropping this table
ALTER TABLE regional_by_row_table DROP COLUMN crdb_region

# Query with locality optimized search disabled.
query T
EXPLAIN SELECT * FROM regional_by_row_table WHERE pk = 1
----
distribution: local
vectorized: true
·
• scan
missing stats
table: regional_by_row_table@primary
spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1] [/'ca-central-1'/1 - /'ca-central-1'/1] [/'us-east-1'/1 - /'us-east-1'/1]

statement ok
SET tracing = on,kv,results; SELECT * FROM regional_by_row_table WHERE pk = 1; SET tracing = off

# All rows are scanned at once without the optimization.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/66/1/"@"/1{-/#}, /Table/66/1/"\x80"/1{-/#}, /Table/66/1/"\xc0"/1{-/#}
fetched: /regional_by_row_table/primary/'ap-southeast-2'/1/pk2/a/b/j -> /1/2/3/'{"a": "b"}'
output row: [1 1 2 3 '{"a": "b"}']

statement ok
SET locality_optimized_partitioned_index_scan = true

# Same query with locality optimized search enabled.
query T
EXPLAIN (DISTSQL) SELECT * FROM regional_by_row_table WHERE pk = 1
----
distribution: local
vectorized: true
·
• union all
│ limit: 1
├── • scan
│ missing stats
│ table: regional_by_row_table@primary
│ spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1]
└── • scan
missing stats
table: regional_by_row_table@primary
spans: [/'ca-central-1'/1 - /'ca-central-1'/1] [/'us-east-1'/1 - /'us-east-1'/1]
·
Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJykkVEL0zAQx9_9FOF8Uclo0ylIQIhoxULdZjtQsGVkzTGLXVKTFDdGv7usRbbJJmw-3l3-l98vOYD72QCH-OsifZvMyLP3Sb7MP6fPSR6n8bsleUE-ZPNPxOKmNlo2q_V-Zc2vlZfrBsmXj3EWk_YHeUMYUNBG4Uxu0QH_BgxKCq01FTpn7LF1GA4kagc8pFDrtvPHdkmhMhaBH8DXvkHgsDxuz1AqtEEIFBR6WTfD2qsgorX1Vto9UMhbqR0nQQFFsXsdFhCwyXkRPCVSK8KI8d_RAoV55zkRjIqIiikVL6l4BWVPwXT-BOi83CBw1tPHJNijEuKPgBjh7wSObgKfODttrEKL6oKx7K8ozczEtEF0KZPW29oTdpMhvOfRMnSt0Q7_Yrm1uaSAaoOjkDOdrXBhTTVcM5bzITc0FDo_TqOxSPQwGn71PMz-Jxz9Mzy9CId92T_5HQAA__-b4SGh

statement ok
SET tracing = on,kv,results; SELECT * FROM regional_by_row_table WHERE pk = 1; SET tracing = off

# If the row is found in the local region, the other regions are not searched.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/66/1/"@"/1{-/#}
fetched: /regional_by_row_table/primary/'ap-southeast-2'/1/pk2/a/b/j -> /1/2/3/'{"a": "b"}'
output row: [1 1 2 3 '{"a": "b"}']

statement ok
SET tracing = on,kv,results; SELECT * FROM regional_by_row_table WHERE pk = 10; SET tracing = off

# If the row is not found in the local region, the other regions are searched
# in parallel.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/66/1/"@"/10{-/#}
Scan /Table/66/1/"\x80"/10{-/#}, /Table/66/1/"\xc0"/10{-/#}
fetched: /regional_by_row_table/primary/'ca-central-1'/10/pk2/a/b -> /10/11/12
output row: [10 10 11 12 NULL]

# The local region for this query is ca-central-1, so that span should be
# scanned in the first child of the limited union all.
query T nodeidx=3
USE multi_region_test_db; SET locality_optimized_partitioned_index_scan = true;
EXPLAIN SELECT * FROM regional_by_row_table WHERE pk = 1
----
distribution: local
vectorized: true
·
• union all
│ limit: 1
├── • scan
│ missing stats
│ table: regional_by_row_table@primary
│ spans: [/'ca-central-1'/1 - /'ca-central-1'/1]
└── • scan
missing stats
table: regional_by_row_table@primary
spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1] [/'us-east-1'/1 - /'us-east-1'/1]


# Tests creating a index and a unique constraint on a REGIONAL BY ROW table.
statement ok
CREATE INDEX new_idx ON regional_by_row_table(a, b)
@@ -637,11 +741,18 @@ vectorized: true
│ ├── • values
│ │ size: 6 columns, 1 row
│ │
│ └── • scan
│ missing stats
│ table: regional_by_row_table@primary
│ spans: [/'ap-southeast-2'/2 - /'ap-southeast-2'/2] [/'ca-central-1'/2 - /'ca-central-1'/2] [/'us-east-1'/2 - /'us-east-1'/2]
│ locking strength: for update
│ └── • union all
│ │ limit: 1
│ │
│ ├── • scan
│ │ missing stats
│ │ table: regional_by_row_table@primary
│ │ spans: [/'ap-southeast-2'/2 - /'ap-southeast-2'/2]
│ │
│ └── • scan
│ missing stats
│ table: regional_by_row_table@primary
│ spans: [/'ca-central-1'/2 - /'ca-central-1'/2] [/'us-east-1'/2 - /'us-east-1'/2]
├── • constraint-check
│ │
70 changes: 53 additions & 17 deletions pkg/sql/distsql_physical_planner.go
@@ -471,18 +471,22 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return cannotDistribute, cannotDistributeRowLevelLockingErr
}

// Although we don't yet recommend distributing plans where soft limits
// propagate to scan nodes because we don't have infrastructure to only
// plan for a few ranges at a time, the propagation of the soft limits
// to scan nodes has been added in 20.1 release, so to keep the
// previous behavior we continue to ignore the soft limits for now.
// TODO(yuzefovich): pay attention to the soft limits.
rec := canDistribute
// Check if we are doing a full scan.
if n.isFull {
rec = rec.compose(shouldDistribute)
switch {
case n.localityOptimized:
// This is a locality optimized scan.
return cannotDistribute, nil
case n.isFull:
// This is a full scan.
return shouldDistribute, nil
default:
// Although we don't yet recommend distributing plans where soft limits
// propagate to scan nodes because we don't have infrastructure to only
// plan for a few ranges at a time, the propagation of the soft limits
// to scan nodes has been added in 20.1 release, so to keep the
// previous behavior we continue to ignore the soft limits for now.
// TODO(yuzefovich): pay attention to the soft limits.
return canDistribute, nil
}
return rec, nil

case *sortNode:
rec, err := checkSupportForPlanNode(n.plan)
@@ -3303,6 +3307,10 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
p.SetMergeOrdering(mergeOrdering)

if !n.all {
if n.hardLimit != 0 {
return nil, errors.AssertionFailedf("a hard limit is not supported for UNION (only for UNION ALL)")
}

// TODO(abhimadan): use columns from mergeOrdering to fill in the
// OrderingColumns field in DistinctSpec once the unused columns
// are projected out.
@@ -3325,12 +3333,36 @@
// on a single node (which is always the case when there are mutations),
// we can fuse everything so there are no concurrent KV operations (see
// #40487, #41307).
//
// Furthermore, in order to disable auto-parallelism that could occur
// when merging multiple streams on the same node, we force the
// serialization of the merge operation (otherwise, it would be
// possible that we have a source of unbounded parallelism, see #51548).
p.EnsureSingleStreamPerNode(true /* forceSerialization */)

if n.hardLimit == 0 {
// In order to disable auto-parallelism that could occur when merging
// multiple streams on the same node, we force the serialization of the
// merge operation (otherwise, it would be possible that we have a
// source of unbounded parallelism, see #51548).
p.EnsureSingleStreamPerNode(true /* forceSerialization */, execinfrapb.PostProcessSpec{})
} else {
if p.GetLastStageDistribution() != physicalplan.LocalPlan {
return nil, errors.AssertionFailedf("we expect that limited UNION ALL queries are only planned locally")
}
if len(p.MergeOrdering.Columns) != 0 {
return nil, errors.AssertionFailedf(
"we expect that limited UNION ALL queries do not require a specific ordering",
)
}
// Here we don't force the serialization so that the unordered
// synchronizer is used. Additionally, because the plan will be fully
// local, we will use the flowinfra.FuseAggressively option. As a
// result, the plan will end up with a serial unordered synchronizer,
// which has exactly the behavior that we want (in particular, it won't
// execute the right child if the limit is reached by the left child).
// TODO(rytaft,yuzefovich): This currently only works with the
// vectorized engine. We should consider adding support for the serial
// unordered synchronizer in the row-based engine (see #61081).
p.EnsureSingleStreamPerNode(
false, /* forceSerialization */
execinfrapb.PostProcessSpec{Limit: n.hardLimit},
)
}

// UNION ALL is special: it doesn't have any required downstream
// processor, so its two inputs might have different post-processing
Expand All @@ -3341,6 +3373,10 @@ func (dsp *DistSQLPlanner) createPlanForSetOp(
}
}
} else {
if n.hardLimit != 0 {
return nil, errors.AssertionFailedf("a hard limit is not supported for INTERSECT or EXCEPT")
}

// We plan INTERSECT and EXCEPT queries with joiners. Get the appropriate
// join type.
joinType := distsqlSetOpJoinType(n.unionType)
5 changes: 4 additions & 1 deletion pkg/sql/distsql_spec_exec_factory.go
@@ -175,6 +175,9 @@ func (e *distSQLSpecExecFactory) ConstructScan(
// previous behavior we continue to ignore the soft limits for now.
// TODO(yuzefovich): pay attention to the soft limits.
recommendation := canDistribute
if params.LocalityOptimized {
recommendation = recommendation.compose(cannotDistribute)
}
planCtx := e.getPlanCtx(recommendation)
p := planCtx.NewPhysicalPlan()

@@ -589,7 +592,7 @@ func (e *distSQLSpecExecFactory) ConstructDistinct(
}

func (e *distSQLSpecExecFactory) ConstructSetOp(
typ tree.UnionType, all bool, left, right exec.Node,
typ tree.UnionType, all bool, left, right exec.Node, hardLimit uint64,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: set op")
}
15 changes: 15 additions & 0 deletions pkg/sql/opt/constraint/span.go
@@ -88,6 +88,21 @@ func (sp *Span) HasSingleKey(evalCtx *tree.EvalContext) bool {
return true
}

// Prefix returns the length of the longest prefix of values for which the
// span has the same start and end values. For example, [/1/1/1 - /1/1/2]
// has prefix 2.
func (sp *Span) Prefix(evalCtx *tree.EvalContext) int {
start := sp.StartKey()
end := sp.EndKey()

for prefix := 0; ; prefix++ {
if start.Length() <= prefix || end.Length() <= prefix ||
start.Value(prefix).Compare(evalCtx, end.Value(prefix)) != 0 {
return prefix
}
}
}

// StartKey returns the start key.
func (sp *Span) StartKey() Key {
return sp.start
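The logic of `Span.Prefix` above can be illustrated on simplified keys. This is a sketch under stated assumptions: keys are modeled as integer slices rather than datums compared through an `EvalContext`, and `prefix` is a hypothetical helper, not the real method.

```go
package main

import "fmt"

// prefix returns the length of the longest prefix of positions at which
// the start and end keys hold equal values, mirroring Span.Prefix.
func prefix(start, end []int) int {
	p := 0
	for p < len(start) && p < len(end) && start[p] == end[p] {
		p++
	}
	return p
}

func main() {
	// [/1/1/1 - /1/1/2] has prefix 2, matching the doc comment's example.
	fmt.Println(prefix([]int{1, 1, 1}, []int{1, 1, 2})) // 2
}
```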
19 changes: 17 additions & 2 deletions pkg/sql/opt/exec/execbuilder/relational.go
@@ -569,6 +569,7 @@ func (b *Builder) scanParams(
Parallelize: parallelize,
Locking: locking,
EstimatedRowCount: rowCount,
LocalityOptimized: scan.LocalityOptimized,
}, outputMap, nil
}

@@ -1314,7 +1315,7 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) {
switch set.Op() {
case opt.UnionOp:
typ, all = tree.UnionOp, false
case opt.UnionAllOp:
case opt.UnionAllOp, opt.LocalityOptimizedSearchOp:
typ, all = tree.UnionOp, true
case opt.IntersectOp:
typ, all = tree.IntersectOp, false
@@ -1328,7 +1329,21 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) {
panic(errors.AssertionFailedf("invalid operator %s", log.Safe(set.Op())))
}

node, err := b.factory.ConstructSetOp(typ, all, left.root, right.root)
hardLimit := uint64(0)
if set.Op() == opt.LocalityOptimizedSearchOp {
// If we are performing locality optimized search, set a limit equal to
// the maximum possible number of rows. This will tell the execution engine
// not to execute the right child if the limit is reached by the left
// child.
// TODO(rytaft): Store the limit in the expression.
hardLimit = uint64(set.Relational().Cardinality.Max)
if hardLimit > 1 {
panic(errors.AssertionFailedf(
"locality optimized search is not yet supported for more than one row at a time",
))
}
}
node, err := b.factory.ConstructSetOp(typ, all, left.root, right.root, hardLimit)
if err != nil {
return execPlan{}, err
}
7 changes: 6 additions & 1 deletion pkg/sql/opt/exec/explain/emit.go
@@ -447,6 +447,12 @@ func (e *emitter) emitNodeAttributes(n *Node) error {
ob.Attr("already ordered", colinfo.ColumnOrdering(a.Ordering[:p]).String(n.Columns()))
}

case setOpOp:
a := n.args.(*setOpArgs)
if a.HardLimit > 0 {
ob.Attr("limit", a.HardLimit)
}

case indexJoinOp:
a := n.args.(*indexJoinArgs)
ob.Attrf("table", "%s@%s", a.Table.Name(), a.Table.Index(0).Name())
@@ -720,7 +726,6 @@ func (e *emitter) emitNodeAttributes(n *Node) error {

case simpleProjectOp,
serializingProjectOp,
setOpOp,
ordinalityOp,
max1RowOp,
explainOptOp,
5 changes: 5 additions & 0 deletions pkg/sql/opt/exec/factory.go
@@ -61,6 +61,11 @@ type ScanParams struct {
Locking *tree.LockingItem

EstimatedRowCount float64

// If true, we are performing a locality optimized search. In order for this
// to work correctly, the execution engine must create a local DistSQL plan
// for the main query (subqueries and postqueries need not be local).
LocalityOptimized bool
}

// OutputOrdering indicates the required output ordering on a Node that is being
7 changes: 7 additions & 0 deletions pkg/sql/opt/exec/factory.opt
@@ -164,11 +164,18 @@ define Distinct {
# SetOp performs a UNION / INTERSECT / EXCEPT operation (either the ALL or the
# DISTINCT version). The left and right nodes must have the same number of
# columns.
#
# HardLimit can only be set for UNION ALL operations. It is used to implement
# locality optimized search, and instructs the execution engine that it should
# execute the left node to completion and possibly short-circuit if the limit is
# reached before executing the right node. The limit is guaranteed but the
# short-circuit behavior is not.
define SetOp {
Typ tree.UnionType
All bool
Left exec.Node
Right exec.Node
HardLimit uint64
}

# Sort performs a resorting of the rows produced by the input node.
2 changes: 1 addition & 1 deletion pkg/sql/opt/memo/check_expr.go
@@ -107,7 +107,7 @@ func (m *Memo) CheckExpr(e opt.Expr) {
case *SelectExpr:
checkFilters(t.Filters)

case *UnionExpr, *UnionAllExpr:
case *UnionExpr, *UnionAllExpr, *LocalityOptimizedSearchExpr:
setPrivate := t.Private().(*SetPrivate)
outColSet := setPrivate.OutCols.ToSet()

3 changes: 2 additions & 1 deletion pkg/sql/opt/memo/expr.go
@@ -633,7 +633,8 @@ func (f *WindowFrame) String() string {
func (s *ScanPrivate) IsCanonical() bool {
return s.Index == cat.PrimaryIndex &&
s.Constraint == nil &&
s.HardLimit == 0
s.HardLimit == 0 &&
!s.LocalityOptimized
}

// IsUnfiltered returns true if the ScanPrivate will produce all rows in the
