Skip to content

Commit

Permalink
opt: locality optimized scan for queries with a LIMIT clause
Browse files Browse the repository at this point in the history
This commit adds locality optimized scan support for queries which
place a hard limit on the number of rows returned via the LIMIT clause.
This optimization benefits tables with REGIONAL BY ROW locality by
splitting the spans accessed into a local spans set and a remote spans
set, combined via a UNION ALL operation where each branch of the UNION
ALL has the same hard limit as the original SELECT query block. If the
limit is reached by scanning just the local spans, then latency is
improved.

The optimization is not applied if the LIMIT is more than the KV batch
size of 100000 rows or if the number of spans in the scan exceeds 10000.

This commit also adds an improvement to span merging to avoid merging
local spans with remote spans in order to maximize the number of queries
that can utilize locality optimized scan.

Informs cockroachdb#64862

Release note (Performance Improvement): Queries with a LIMIT clause
applied against a single table, either explicitly written, or implicit
such as in an uncorrelated EXISTS subquery, now scan that table with
improved latency if the table is defined with LOCALITY REGIONAL BY ROW
and the number of qualified rows residing in the local region is less
than or equal to the hard limit (sum of the LIMIT clause and optional
OFFSET clause values). This optimization is only applied if the hard
limit is 100000 or less.
  • Loading branch information
Mark Sirek committed Feb 4, 2022
1 parent 1e96583 commit 3350612
Show file tree
Hide file tree
Showing 23 changed files with 2,056 additions and 414 deletions.

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/sql/opt/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/server/telemetry",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/opt/cat",
"//pkg/sql/opt/partition",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/privilege",
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt/constraint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"constraint_set.go",
"key.go",
"key_extension.go",
"locality.go",
"span.go",
"spans.go",
"testutils.go",
Expand All @@ -16,6 +17,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/sql/opt",
"//pkg/sql/opt/partition",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"@com_github_cockroachdb_errors//:errors",
Expand All @@ -38,9 +40,11 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/opt",
"//pkg/sql/opt/partition",
"//pkg/sql/opt/testutils/testcat",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/encoding",
"//pkg/util/leaktest",
"//pkg/util/randutil",
Expand Down
45 changes: 43 additions & 2 deletions pkg/sql/opt/constraint/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/partition"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -471,14 +472,53 @@ func (c *Constraint) Combine(evalCtx *tree.EvalContext, other *Constraint) {

// ConsolidateSpans merges spans that have consecutive boundaries. For example:
// [/1 - /2] [/3 - /4] becomes [/1 - /4].
func (c *Constraint) ConsolidateSpans(evalCtx *tree.EvalContext) {
// An optional PrefixSorter parameter describes the localities of partitions in
// the index for which the Constraint is being built. Spans belonging to 100%
// local partitions will not be consolidated with spans that overlap any remote
// row ranges. A local row range is one whose leaseholder region preference is
// the same region as the gateway region.
func (c *Constraint) ConsolidateSpans(evalCtx *tree.EvalContext, ps *partition.PrefixSorter) {
keyCtx := KeyContext{Columns: c.Columns, EvalCtx: evalCtx}
var result Spans

if c.Spans.Count() < 1 {
return
}
indexHasLocalAndRemoteParts := ps != nil
spanIsLocal, lastSpanIsLocal, localRemoteCrossover := false, false, false

// Determine the locality of the first span up front so that the 'for' loop
// below does not need a special case for it.
if indexHasLocalAndRemoteParts {
last := c.Spans.Get(0)
if match, ok := FindMatch(last, ps); ok {
if match.IsLocal {
lastSpanIsLocal = true
}
}
}

for i := 1; i < c.Spans.Count(); i++ {
last := c.Spans.Get(i - 1)
sp := c.Spans.Get(i)
if indexHasLocalAndRemoteParts {
spanIsLocal = false
if match, ok := FindMatch(sp, ps); ok {
if match.IsLocal {
spanIsLocal = true
}
}
// If last span is in the local gateway region and the current span is
// not, or vice versa, save this info so we don't combine these spans.
localRemoteCrossover = spanIsLocal != lastSpanIsLocal
}
// Do not merge local spans with remote spans because a span must be 100%
// local in order to utilize locality optimized search.
// An example query on a LOCALITY REGIONAL BY ROW table which this
// benefits is:
// SELECT * FROM regional_by_row_table WHERE pk <> 4 LIMIT 3;
if last.endBoundary == IncludeBoundary && sp.startBoundary == IncludeBoundary &&
sp.start.IsNextKey(&keyCtx, last.end) {
sp.start.IsNextKey(&keyCtx, last.end) && !localRemoteCrossover {
// We only initialize `result` if we need to change something.
if result.Count() == 0 {
result.Alloc(c.Spans.Count() - 1)
Expand All @@ -494,6 +534,7 @@ func (c *Constraint) ConsolidateSpans(evalCtx *tree.EvalContext) {
result.Append(sp)
}
}
lastSpanIsLocal = spanIsLocal
}
if result.Count() != 0 {
c.Spans = result
Expand Down
113 changes: 112 additions & 1 deletion pkg/sql/opt/constraint/constraint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/partition"
"github.com/cockroachdb/cockroach/pkg/sql/opt/testutils/testcat"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

Expand Down Expand Up @@ -512,14 +515,122 @@ func TestConsolidateSpans(t *testing.T) {
spans := parseSpans(&evalCtx, tc.s)
var c Constraint
c.Init(kc, &spans)
c.ConsolidateSpans(kc.EvalCtx)
c.ConsolidateSpans(kc.EvalCtx, nil)
if res := c.Spans.String(); res != tc.e {
t.Errorf("expected %s got %s", tc.e, res)
}
})
}
}

// TestConsolidateLocalAndRemoteSpans runs ConsolidateSpans with a non-nil
// PrefixSorter and checks the resulting spans against the expected string for
// each table entry.
func TestConsolidateLocalAndRemoteSpans(t *testing.T) {
	defer leaktest.AfterTest(t)()
	st := cluster.MakeTestingClusterSettings()
	evalCtx := tree.MakeTestingEvalContext(st)

	testData := []struct {
		// spanInputs holds the constraint spans handed to ConsolidateSpans.
		spanInputs string
		// expected is the string form of the spans after consolidation.
		expected string

		// partitionSpans describes the partitions of the index. The start key of
		// each span defines the PARTITION BY LIST value of a single partition,
		// one span per partition. The end key is ignored.
		partitionSpans string

		// localities flags each partition as local (true) or remote (false).
		// There must be the same number of entries as partition spans.
		localities []bool
	}{
		{
			partitionSpans: "[/1 - /1] [/3 - /3] [/7 - /9]",
			localities:     []bool{true, false, false},
			spanInputs:     "[/1/2 - /1/3] [/1/4 - /6] [/7 - /9]",
			expected:       "[/1/2 - /1/3] [/1/4 - /9]",
		},
		// TODO(msirek): This result is expected to change if span-based
		//               partition locality checking is enabled, in which case
		//               the span [/3 - /4] would not get consolidated.
		{
			partitionSpans: "[/3 - /3] [/4 - /4]",
			localities:     []bool{true, true},
			spanInputs:     "[/1 - /2] [/3 - /4] [/7 - /9]",
			expected:       "[/1 - /4] [/7 - /9]",
		},
		{
			partitionSpans: "[/1/2 - /1/2] [/1/4 - /1/4] [/1 - /1]",
			localities:     []bool{true, true, false},
			spanInputs:     "[/1/2 - /1/4] [/1/5 - /5]",
			expected:       "[/1/2 - /5]",
		},
		// TODO(msirek): This result is expected to change when span-based
		//               partition locality checking is enabled.
		{
			partitionSpans: "[/1/2 - /1/2] [/1/3 - /1/3] [/1/4 - /1/4] [/1 - /1]",
			localities:     []bool{true, true, true, false},
			spanInputs:     "[/1/2 - /1/4] [/1/5 - /5]",
			expected:       "[/1/2 - /5]",
		},
		// TODO(msirek): This result is expected to change when span-based
		//               partition locality checking is enabled.
		{
			partitionSpans: "[/1/2 - /1/2] [/1/3 - /1/3] [/1/4 - /1/4] [/1/2/3 - /1/2/3]",
			localities:     []bool{true, true, true, false},
			spanInputs:     "[/1/2 - /1/4] [/1/5 - /5]",
			expected:       "[/1/2 - /5]",
		},
		{
			partitionSpans: "[/1/2/3 - /1/2/3] [/9 - /9]",
			localities:     []bool{true, false},
			spanInputs:     "[/1/2/1 - /1/2/2] [/1/2/3 - /1/2/3] [/1/2/4 - /9]",
			expected:       "[/1/2/1 - /1/2/2] [/1/2/3 - /1/2/3] [/1/2/4 - /9]",
		},
	}

	kc := testKeyContext(1, 2, 3)
	for caseNum, tc := range testData {
		t.Run(fmt.Sprintf("%d", caseNum), func(t *testing.T) {
			// Turn the partitionSpans and localities entries into a test index
			// that has nothing but its partitions populated.
			partSpans := parseSpans(&evalCtx, tc.partitionSpans)
			numPartitions := partSpans.Count()
			partitions := make([]testcat.Partition, numPartitions)
			var localPartitions util.FastIntSet
			for p := 0; p < numPartitions; p++ {
				// Copy the start key of the span into the partition's single
				// list value.
				startKey := partSpans.Get(p).StartKey()
				values := make(tree.Datums, startKey.Length())
				for col := range values {
					values[col] = startKey.Value(col)
				}
				partitions[p].SetDatums([]tree.Datums{values})

				if tc.localities[p] {
					localPartitions.Add(p)
				}
			}

			index := &testcat.Index{}
			index.SetPartitions(partitions)
			// Build the PrefixSorter from the index and the local partition set.
			ps := partition.GetSortedPrefixes(index, localPartitions, &evalCtx)

			// Consolidate the input spans and compare with the expectation.
			inputSpans := parseSpans(&evalCtx, tc.spanInputs)
			var c Constraint
			c.Init(kc, &inputSpans)
			c.ConsolidateSpans(kc.EvalCtx, ps)
			if got := c.Spans.String(); got != tc.expected {
				t.Errorf("expected %s got %s", tc.expected, got)
			}
		})
	}
}

func TestExactPrefix(t *testing.T) {
defer leaktest.AfterTest(t)()
st := cluster.MakeTestingClusterSettings()
Expand Down
136 changes: 136 additions & 0 deletions pkg/sql/opt/constraint/locality.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package constraint

import (
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/sql/opt/partition"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// compare orders the partition key prefix held in prefixInfo against the
// prefix of the given span. It returns -1 when the partition prefix has more
// values than the span prefix (longer prefixes sort first). Otherwise it
// returns the first non-zero Datum.Compare result between a partition prefix
// value and the span start key value at the same position, or 0 when every
// value of the partition prefix matches. The span prefix may be longer than the
// partition prefix and still compare equal.
func compare(prefixInfo partition.PrefixIsLocal, span *Span, ps *partition.PrefixSorter) int {
	partitionPrefix := prefixInfo.Prefix
	if len(partitionPrefix) > span.Prefix(ps.EvalCtx) {
		// The partition prefix is longer than the span prefix: no match possible.
		return -1
	}

	// Walk the shared prefix looking for the first position that differs.
	for pos := range partitionPrefix {
		if cmp := partitionPrefix[pos].Compare(ps.EvalCtx, span.StartKey().Value(pos)); cmp != 0 {
			return cmp
		}
	}
	return 0
}

// searchPrefixes looks through the sorted PrefixIsLocal entries of ps for one
// whose Prefix fully matches the given span, and returns the index of that
// entry, or -1 when nothing matches.
//
// The entries must be sorted in ascending order, with longer prefixes ahead of
// shorter ones and entries ordered by prefix value within each group of
// equal-length prefixes. Every equal-length group is searched on its own,
// longest first, because several groups may contain a candidate for the span
// and the longest matching prefix is the one that reflects the actual locality
// of the span's owning range.
//
// A non-negative prefixSearchUpperBound restricts the search to groups whose
// prefix length is at most that value; zero therefore only matches the DEFAULT
// partition, which has a zero-length prefix. A negative value is treated as
// math.MaxInt32, i.e. no restriction.
func searchPrefixes(span *Span, ps *partition.PrefixSorter, prefixSearchUpperBound int) int {
	maxPrefixLen := prefixSearchUpperBound
	if maxPrefixLen < 0 {
		maxPrefixLen = math.MaxInt32
	}
	spanPrefixLen := span.Prefix(ps.EvalCtx)

	for groupNum := 0; ; groupNum++ {
		group, groupStart, found := ps.Slice(groupNum)
		if !found {
			// All equal-length groups have been examined.
			return -1
		}
		if len(group[0].Prefix) > maxPrefixLen {
			// Prefixes in this group are longer than the caller allows.
			continue
		}

		// Binary search for the first entry with 'prefix >= span', which is
		// either a match or merely an insertion point.
		pos := sort.Search(len(group), func(j int) bool {
			candidate := group[j].Prefix
			switch {
			case len(candidate) > spanPrefixLen:
				// A non-empty partition prefix can only match a span prefix that
				// is at least as long.
				return false
			case len(candidate) == 0:
				// The zero-length DEFAULT partition prefix matches anything.
				return true
			}
			for k, datum := range candidate {
				if cmp := datum.Compare(ps.EvalCtx, span.StartKey().Value(k)); cmp != 0 {
					return cmp > 0
				}
			}
			return true
		})

		// Re-check for equality, since the search may have landed on an
		// insertion point rather than on an actual match.
		if pos < len(group) && compare(group[pos], span, ps) == 0 {
			return groupStart + pos
		}
	}
}

// FindMatch returns a pointer to the Entry of the PrefixSorter whose prefix
// matches a leading subset of the span's prefix keys; the zero-length prefix of
// the DEFAULT partition counts as a match. ok is false, and match is nil, when
// no entry matches.
func FindMatch(span *Span, ps *partition.PrefixSorter) (match *partition.PrefixIsLocal, ok bool) {
	if idx := searchPrefixes(span, ps, math.MaxInt32 /* prefixSearchUpperBound */); idx != -1 {
		return &ps.Entry[idx], true
	}
	return nil, false
}

// FindMatchOnSingleColumn returns a pointer to the Entry of the PrefixSorter
// with a prefix length of 1 or less that matches the given datum; the
// zero-length prefix of the DEFAULT partition counts as a match. The datum is
// wrapped in a single-value span with inclusive boundaries for the lookup. ok
// is false, and match is nil, when no entry matches.
func FindMatchOnSingleColumn(
	datum tree.Datum, ps *partition.PrefixSorter,
) (match *partition.PrefixIsLocal, ok bool) {
	// Build the span [datum - datum] so the regular prefix search can be reused.
	var point Span
	pointKey := Key{firstVal: datum}
	point.Init(pointKey, IncludeBoundary, pointKey, IncludeBoundary)

	if idx := searchPrefixes(&point, ps, 1 /* prefixSearchUpperBound */); idx != -1 {
		return &ps.Entry[idx], true
	}
	return nil, false
}
1 change: 1 addition & 0 deletions pkg/sql/opt/idxconstraint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/sql/opt/constraint",
"//pkg/sql/opt/memo",
"//pkg/sql/opt/norm",
"//pkg/sql/opt/partition",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
Expand Down
Loading

0 comments on commit 3350612

Please sign in to comment.