From 335061293a1a1515cf9a1a2d59899c2d3a05b9a1 Mon Sep 17 00:00:00 2001 From: Mark Sirek Date: Fri, 14 Jan 2022 03:15:36 -0800 Subject: [PATCH] opt: locality optimized scan for queries with a LIMIT clause This commit adds locality optimized scan support for queries which place a hard limit on the number of rows returned via the LIMIT clause. This optimization benefits tables with REGIONAL BY ROW locality by splitting the spans accessed into a local spans set and a remote spans set, combined via a UNION ALL operation where each branch of the UNION ALL has the same hard limit as the original SELECT query block. If the limit is reached by scanning just the local spans, then latency is improved. The optimization is not applied if the LIMIT is more than the KV batch size of 100000 rows or if the number of spans in the scan exceeds 10000. This commit also adds an improvement to span merging to avoid merging local spans with remote spans in order to maximize the number of queries that can utilize locality optimized scan. Informs #64862 Release note (Performance Improvement): Queries with a LIMIT clause applied against a single table, either explicitly written, or implicit such as in an uncorrelated EXISTS subquery, now scan that table with improved latency if the table is defined with LOCALITY REGIONAL BY ROW and the number of qualified rows residing in the local region is less than or equal to the hard limit (sum of the LIMIT clause and optional OFFSET clause values). This optimization is only applied if the hard limit is 100000 or less. --- .../logic_test/regional_by_row_query_behavior | 592 ++++++++++++++++++ pkg/sql/opt/BUILD.bazel | 1 + pkg/sql/opt/constraint/BUILD.bazel | 4 + pkg/sql/opt/constraint/constraint.go | 45 +- pkg/sql/opt/constraint/constraint_test.go | 113 +++- pkg/sql/opt/constraint/locality.go | 136 ++++ pkg/sql/opt/idxconstraint/BUILD.bazel | 1 + .../opt/idxconstraint/index_constraints.go | 4 +- .../idxconstraint/index_constraints_test.go | 4 +- .../opt/invertedidx/inverted_index_expr.go | 3 +- pkg/sql/opt/metadata.go | 15 +- pkg/sql/opt/partition/BUILD.bazel | 14 + pkg/sql/opt/partition/locality.go | 272 ++++++++ pkg/sql/opt/table_meta.go | 46 +- pkg/sql/opt/testutils/testcat/test_catalog.go | 10 + pkg/sql/opt/xform/BUILD.bazel | 2 + pkg/sql/opt/xform/general_funcs.go | 154 +---- pkg/sql/opt/xform/general_funcs_test.go | 3 +- pkg/sql/opt/xform/join_funcs.go | 69 +- pkg/sql/opt/xform/scan_funcs.go | 228 ++++--- pkg/sql/opt/xform/select_funcs.go | 286 +++++---- pkg/sql/opt/xform/testdata/rules/scan | 460 +++++++++++++- pkg/sql/sem/tree/eval.go | 8 + 23 files changed, 2056 insertions(+), 414 deletions(-) create mode 100644 pkg/sql/opt/constraint/locality.go create mode 100644 pkg/sql/opt/partition/BUILD.bazel create mode 100644 pkg/sql/opt/partition/locality.go diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior index 5d158dbb0b03..61604a8ad3af 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior +++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior @@ -2259,3 +2259,595 @@ query I SELECT * FROM t73024 ---- 100 + +############################################## +# Locality optimized scans with LIMIT clause # +############################################## +# In this section we are checking expected results of queries similar to those +# in pkg/sql/opt/xform/testdata/rules/scan, where query plans and rule firing +# is checked. + +statement ok +SET database = multi_region_test_db + +# LIMIT clause enables locality optimized scan on a REGIONAL BY ROW table +query T +SELECT * FROM [ +EXPLAIN SELECT + pk, pk2, a, b, crdb_region +FROM + regional_by_row_table +LIMIT + 1] +---- +distribution: local +vectorized: true +· +• union all +│ limit: 1 +│ +├── • scan +│ missing stats +│ table: regional_by_row_table@regional_by_row_table_pkey +│ spans: [/'ap-southeast-2' - /'ap-southeast-2'] +│ limit: 1 +│ +└── • scan + missing stats + table: regional_by_row_table@regional_by_row_table_pkey + spans: [/'ca-central-1' - /'us-east-1'] + limit: 1 + +query IIIIT +SELECT + pk, pk2, a, b, crdb_region +FROM + regional_by_row_table +LIMIT + 1 +---- +1 1 2 3 ap-southeast-2 + +## Need to scan to remote region. Only 2 rows in local region. +query IIIIT +SELECT + pk, pk2, a, b, crdb_region +FROM + regional_by_row_table +LIMIT + 1 +OFFSET + 2 +---- +6 6 5 -5 ca-central-1 + +query IIII +SELECT + pk, pk2, a, b +FROM + regional_by_row_table AS a +WHERE + pk + IN (SELECT pk FROM regional_by_row_table AS b LIMIT 3) +---- +1 1 2 3 +4 4 5 6 +6 6 5 -5 + +query II +SELECT + * +FROM + child +WHERE + EXISTS(SELECT * FROM parent) +LIMIT + 3 +---- +10 10 +20 20 +30 30 + +query II +SELECT + * +FROM + child +WHERE + NOT EXISTS(SELECT * FROM parent) +LIMIT + 3 +---- + +# Test partitioning on an index column +statement ok +CREATE TABLE regional_by_row_table_as4 ( + pk + INT8 PRIMARY KEY, + a + INT8, + crdb_region_col + crdb_internal_region + AS ( + CASE + WHEN (a % 3) = 0 THEN 'ap-southeast-2' + WHEN (a % 3) = 1 THEN 'ca-central-1' + ELSE 'us-east-1' + END + ) VIRTUAL + NOT NULL, + INDEX a_idx (a), + FAMILY (pk, a) +) + LOCALITY REGIONAL BY ROW AS crdb_region_col + +statement ok +INSERT +INTO + regional_by_row_table_as4 +SELECT + g, g +FROM + ROWS FROM (generate_series(1, 1000)) AS g (g) + +# Locality optimized scan with a range query +query I nodeidx=0 +SET database = multi_region_test_db; +SELECT + count(*) +FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a BETWEEN 1 AND 100 + LIMIT + 10 + ) +---- +10 + +statement ok +SET vectorize = "on" + +statement ok nodeidx=0 +SET database = multi_region_test_db; +SET TRACING = "on", kv, results; +SELECT + count(*) +FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a BETWEEN 1 AND 100 + LIMIT + 10 + ); +SET TRACING = off + +# If the rows are found in the local region, the other regions are not +# searched. +query T +SELECT + message +FROM + [SHOW KV TRACE FOR SESSION] WITH ORDINALITY +WHERE + message LIKE 'fetched:%' OR message LIKE 'output row%' +ORDER BY + "ordinality" ASC +---- +fetched: /regional_by_row_table_as4/a_idx/?/3/? -> +fetched: /regional_by_row_table_as4/a_idx/?/6/? -> +fetched: /regional_by_row_table_as4/a_idx/?/9/? -> +fetched: /regional_by_row_table_as4/a_idx/?/12/? -> +fetched: /regional_by_row_table_as4/a_idx/?/15/? -> +fetched: /regional_by_row_table_as4/a_idx/?/18/? -> +fetched: /regional_by_row_table_as4/a_idx/?/21/? -> +fetched: /regional_by_row_table_as4/a_idx/?/24/? -> +fetched: /regional_by_row_table_as4/a_idx/?/27/? -> +fetched: /regional_by_row_table_as4/a_idx/?/30/? -> +output row: [10] + +statement ok +SET vectorize = off + +statement ok +SET TRACING = "on", kv, results; +SELECT + count(*) +FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a BETWEEN 1 AND 100 + LIMIT + 10 + ); +SET TRACING = off + +statement ok +RESET vectorize + +# If the rows are found in the local region, the other regions are not searched. +query T +SELECT + message +FROM + [SHOW KV TRACE FOR SESSION] WITH ORDINALITY +WHERE + message LIKE 'fetched:%' OR message LIKE 'output row%' +ORDER BY + "ordinality" ASC +---- +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/3/3 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/6/6 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/9/9 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/12/12 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/15/15 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/18/18 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/21/21 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/24/24 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/27/27 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/30/30 -> +output row: [10] + +statement ok +SET vectorize = "on" + +# Locality optimized scan with an IN list +query T +SELECT + * +FROM + [ + EXPLAIN (OPT) + SELECT + count(*) + FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a + IN (1, 2, 4, 5, 6, 8, 10, 11, 12, 14, 15, 16, 17, 18, + 18, 19, 22, 23, 24, 25, 28, 30, 33, 34, 39, 40) + LIMIT + 5 + ) + ] +OFFSET + 2 +---- + │ ├── scan regional_by_row_table_as4@a_idx + │ │ ├── constraint: /11/10/9 + │ │ │ ├── [/'ap-southeast-2'/1 - /'ap-southeast-2'/2] + │ │ │ ├── [/'ap-southeast-2'/4 - /'ap-southeast-2'/6] + │ │ │ ├── [/'ap-southeast-2'/8 - /'ap-southeast-2'/8] + │ │ │ ├── [/'ap-southeast-2'/10 - /'ap-southeast-2'/12] + │ │ │ ├── [/'ap-southeast-2'/14 - /'ap-southeast-2'/19] + │ │ │ ├── [/'ap-southeast-2'/22 - /'ap-southeast-2'/25] + │ │ │ ├── [/'ap-southeast-2'/28 - /'ap-southeast-2'/28] + │ │ │ ├── [/'ap-southeast-2'/30 - /'ap-southeast-2'/30] + │ │ │ ├── [/'ap-southeast-2'/33 - /'ap-southeast-2'/34] + │ │ │ └── [/'ap-southeast-2'/39 - /'ap-southeast-2'/40] + │ │ ├── limit: 5 + │ │ └── flags: force-index=a_idx + │ └── scan regional_by_row_table_as4@a_idx + │ ├── constraint: /16/15/14 + │ │ ├── [/'ca-central-1'/1 - /'ca-central-1'/2] + │ │ ├── [/'ca-central-1'/4 - /'ca-central-1'/6] + │ │ ├── [/'ca-central-1'/8 - /'ca-central-1'/8] + │ │ ├── [/'ca-central-1'/10 - /'ca-central-1'/12] + │ │ ├── [/'ca-central-1'/14 - /'ca-central-1'/19] + │ │ ├── [/'ca-central-1'/22 - /'ca-central-1'/25] + │ │ ├── [/'ca-central-1'/28 - /'ca-central-1'/28] + │ │ ├── [/'ca-central-1'/30 - /'ca-central-1'/30] + │ │ ├── [/'ca-central-1'/33 - /'ca-central-1'/34] + │ │ ├── [/'ca-central-1'/39 - /'ca-central-1'/40] + │ │ ├── [/'us-east-1'/1 - /'us-east-1'/2] + │ │ ├── [/'us-east-1'/4 - /'us-east-1'/6] + │ │ ├── [/'us-east-1'/8 - /'us-east-1'/8] + │ │ ├── [/'us-east-1'/10 - /'us-east-1'/12] + │ │ ├── [/'us-east-1'/14 - /'us-east-1'/19] + │ │ ├── [/'us-east-1'/22 - /'us-east-1'/25] + │ │ ├── [/'us-east-1'/28 - /'us-east-1'/28] + │ │ ├── [/'us-east-1'/30 - /'us-east-1'/30] + │ │ ├── [/'us-east-1'/33 - /'us-east-1'/34] + │ │ └── [/'us-east-1'/39 - /'us-east-1'/40] + │ ├── limit: 5 + │ └── flags: force-index=a_idx + └── aggregations + └── count-rows + +statement ok +SET TRACING = "on", kv, results; +SELECT + count(*) +FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a + IN (1, 2, 4, 5, 6, 8, 10, 11, 12, 14, 15, 16, 17, 18, + 18, 19, 22, 23, 24, 25, 28, 30, 33, 34, 39, 40) + LIMIT + 5 + ); +SET TRACING = off + +statement ok +RESET vectorize + +# If the rows are found in the local region, the other regions are not searched. +query T +SELECT + message +FROM + [SHOW KV TRACE FOR SESSION] WITH ORDINALITY +WHERE + message LIKE 'fetched:%' OR message LIKE 'output row%' +ORDER BY + "ordinality" ASC +---- +fetched: /regional_by_row_table_as4/a_idx/?/6/? -> +fetched: /regional_by_row_table_as4/a_idx/?/12/? -> +fetched: /regional_by_row_table_as4/a_idx/?/15/? -> +fetched: /regional_by_row_table_as4/a_idx/?/18/? -> +fetched: /regional_by_row_table_as4/a_idx/?/24/? -> +output row: [5] + +statement ok +SET vectorize = off + +statement ok +SET TRACING = "on", kv, results; +SELECT + count(*) +FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a + IN (1, 2, 4, 5, 6, 8, 10, 11, 12, 14, 15, 16, 17, 18, + 18, 19, 22, 23, 24, 25, 28, 30, 33, 34, 39, 40) + LIMIT + 5 + ); +SET TRACING = off + +statement ok +RESET vectorize + +# If the rows are found in the local region, the other regions are not searched. +query T +SELECT + message +FROM + [SHOW KV TRACE FOR SESSION] WITH ORDINALITY +WHERE + message LIKE 'fetched:%' OR message LIKE 'output row%' +ORDER BY + "ordinality" ASC +---- +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/6/6 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/12/12 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/15/15 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/18/18 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/24/24 -> +output row: [5] + +statement ok +SET vectorize = "on" + +# Locality optimized scan with multiple range predicates +query T +SELECT + * +FROM + [ + EXPLAIN (OPT) + SELECT + count(*) + FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a BETWEEN -1 AND 10 + OR a BETWEEN 100 AND 110 + OR a BETWEEN 990 AND 1010 + LIMIT + 9 + ) + ] +OFFSET + 2 +---- + │ ├── scan regional_by_row_table_as4@a_idx + │ │ ├── constraint: /11/10/9 + │ │ │ ├── [/'ap-southeast-2'/-1 - /'ap-southeast-2'/10] + │ │ │ ├── [/'ap-southeast-2'/100 - /'ap-southeast-2'/110] + │ │ │ └── [/'ap-southeast-2'/990 - /'ap-southeast-2'/1010] + │ │ ├── limit: 9 + │ │ └── flags: force-index=a_idx + │ └── scan regional_by_row_table_as4@a_idx + │ ├── constraint: /16/15/14 + │ │ ├── [/'ca-central-1'/-1 - /'ca-central-1'/10] + │ │ ├── [/'ca-central-1'/100 - /'ca-central-1'/110] + │ │ ├── [/'ca-central-1'/990 - /'ca-central-1'/1010] + │ │ ├── [/'us-east-1'/-1 - /'us-east-1'/10] + │ │ ├── [/'us-east-1'/100 - /'us-east-1'/110] + │ │ └── [/'us-east-1'/990 - /'us-east-1'/1010] + │ ├── limit: 9 + │ └── flags: force-index=a_idx + └── aggregations + └── count-rows + +statement ok +SET TRACING = "on", kv, results; +SELECT + count(*) +FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a BETWEEN -1 AND 10 + OR a BETWEEN 100 AND 110 + OR a BETWEEN 990 AND 1010 + LIMIT + 9 + ); +SET TRACING = off + +statement ok +RESET vectorize + +# If the rows are found in the local region, the other regions are not searched. +query T +SELECT + message +FROM + [SHOW KV TRACE FOR SESSION] WITH ORDINALITY +WHERE + message LIKE 'fetched:%' OR message LIKE 'output row%' +ORDER BY + "ordinality" ASC +---- +fetched: /regional_by_row_table_as4/a_idx/?/3/? -> +fetched: /regional_by_row_table_as4/a_idx/?/6/? -> +fetched: /regional_by_row_table_as4/a_idx/?/9/? -> +fetched: /regional_by_row_table_as4/a_idx/?/102/? -> +fetched: /regional_by_row_table_as4/a_idx/?/105/? -> +fetched: /regional_by_row_table_as4/a_idx/?/108/? -> +fetched: /regional_by_row_table_as4/a_idx/?/990/? -> +fetched: /regional_by_row_table_as4/a_idx/?/993/? -> +fetched: /regional_by_row_table_as4/a_idx/?/996/? -> +output row: [9] + +statement ok +SET vectorize = off + +statement ok +SET TRACING = "on", kv, results; +SELECT + count(*) +FROM + ( + SELECT + * + FROM + regional_by_row_table_as4@a_idx + WHERE + a BETWEEN -1 AND 10 + OR a BETWEEN 100 AND 110 + OR a BETWEEN 990 AND 1010 + LIMIT + 9 + ); +SET TRACING = off + +statement ok +RESET vectorize + +# If the rows are found in the local region, the other regions are not searched. +query T +SELECT + message +FROM + [SHOW KV TRACE FOR SESSION] WITH ORDINALITY +WHERE + message LIKE 'fetched:%' OR message LIKE 'output row%' +ORDER BY + "ordinality" ASC +---- +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/3/3 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/6/6 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/9/9 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/102/102 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/105/105 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/108/108 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/990/990 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/993/993 -> +fetched: /regional_by_row_table_as4/a_idx/'ap-southeast-2'/996/996 -> +output row: [9] + +statement ok +RESET vectorize + +# REGIONAL BY ROW AS table with an explicit crdb_internal_region check +# constraint. +statement ok +CREATE TABLE regional_by_row_table_as1 ( + pk int PRIMARY KEY, + a int, + b int, + crdb_region_col1 crdb_internal_region NOT NULL AS ( + CASE + WHEN pk <= 10 THEN 'ca-central-1' + ELSE 'us-east-1' + END + ) VIRTUAL CHECK(crdb_region_col1 BETWEEN 'ap-southeast-2' AND 'us-east-1'), + crdb_region_col crdb_internal_region NOT NULL AS ( + CASE + WHEN pk <= 1 THEN 'ca-central-1' + ELSE 'us-east-1' + END + ) VIRTUAL, + INDEX (a), + UNIQUE (b), + FAMILY (pk, a, b) +) LOCALITY REGIONAL BY ROW AS crdb_region_col1 + +statement ok +INSERT INTO regional_by_row_table_as1 (pk) VALUES (1), (2), (3), (10), (20) + +# An extra crdb_region check constraint should still allow locality optimized scan. +query T +SELECT * FROM [EXPLAIN SELECT * FROM regional_by_row_table_as1 LIMIT 3] OFFSET 2 +---- +· +• render +│ +└── • union all + │ limit: 3 + │ + ├── • scan + │ missing stats + │ table: regional_by_row_table_as1@regional_by_row_table_as1_pkey + │ spans: [/'ap-southeast-2' - /'ap-southeast-2'] + │ limit: 3 + │ + └── • scan + missing stats + table: regional_by_row_table_as1@regional_by_row_table_as1_pkey + spans: [/'ca-central-1' - /'us-east-1'] + limit: 3 + +query IIITT +SELECT pk, a, b, crdb_region_col, crdb_region_col1 FROM regional_by_row_table_as1 LIMIT 3 +---- +1 NULL NULL ca-central-1 ca-central-1 +2 NULL NULL us-east-1 ca-central-1 +3 NULL NULL us-east-1 ca-central-1 diff --git a/pkg/sql/opt/BUILD.bazel b/pkg/sql/opt/BUILD.bazel index 458a4135921e..8b43cc41f661 100644 --- a/pkg/sql/opt/BUILD.bazel +++ b/pkg/sql/opt/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/server/telemetry", "//pkg/sql/catalog/colinfo", "//pkg/sql/opt/cat", + "//pkg/sql/opt/partition", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/privilege", diff --git a/pkg/sql/opt/constraint/BUILD.bazel b/pkg/sql/opt/constraint/BUILD.bazel index 7a380c03b2b7..28e3c5c419eb 100644 --- a/pkg/sql/opt/constraint/BUILD.bazel +++ b/pkg/sql/opt/constraint/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "constraint_set.go", "key.go", "key_extension.go", + "locality.go", "span.go", "spans.go", "testutils.go", @@ -16,6 +17,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/sql/opt", + "//pkg/sql/opt/partition", "//pkg/sql/sem/tree", "//pkg/sql/types", "@com_github_cockroachdb_errors//:errors", @@ -38,9 +40,11 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/catalog/typedesc", "//pkg/sql/opt", + "//pkg/sql/opt/partition", "//pkg/sql/opt/testutils/testcat", "//pkg/sql/sem/tree", "//pkg/sql/types", + "//pkg/util", "//pkg/util/encoding", "//pkg/util/leaktest", "//pkg/util/randutil", diff --git a/pkg/sql/opt/constraint/constraint.go b/pkg/sql/opt/constraint/constraint.go index 02aaa3b18386..b094577dc70d 100644 --- a/pkg/sql/opt/constraint/constraint.go +++ b/pkg/sql/opt/constraint/constraint.go @@ -14,6 +14,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/sql/opt" + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/errors" ) @@ -471,14 +472,53 @@ func (c *Constraint) Combine(evalCtx *tree.EvalContext, other *Constraint) { // ConsolidateSpans merges spans that have consecutive boundaries. For example: // [/1 - /2] [/3 - /4] becomes [/1 - /4]. -func (c *Constraint) ConsolidateSpans(evalCtx *tree.EvalContext) { +// An optional PrefixSorter parameter describes the localities of partitions in +// the index for which the Constraint is being built. Spans belonging to 100% +// local partitions will not be consolidated with spans that overlap any remote +// row ranges. A local row range is one whose leaseholder region preference is +// the same region as the gateway region. +func (c *Constraint) ConsolidateSpans(evalCtx *tree.EvalContext, ps *partition.PrefixSorter) { keyCtx := KeyContext{Columns: c.Columns, EvalCtx: evalCtx} var result Spans + + if c.Spans.Count() < 1 { + return + } + indexHasLocalAndRemoteParts := ps != nil + spanIsLocal, lastSpanIsLocal, localRemoteCrossover := false, false, false + + // Initializations for the first span so we avoid putting a conditional in the + // below 'for' loop + if indexHasLocalAndRemoteParts { + last := c.Spans.Get(0) + if match, ok := FindMatch(last, ps); ok { + if match.IsLocal { + lastSpanIsLocal = true + } + } + } + for i := 1; i < c.Spans.Count(); i++ { last := c.Spans.Get(i - 1) sp := c.Spans.Get(i) + if indexHasLocalAndRemoteParts { + spanIsLocal = false + if match, ok := FindMatch(sp, ps); ok { + if match.IsLocal { + spanIsLocal = true + } + } + // If last span is in the local gateway region and the current span is + // not, or vice versa, save this info so we don't combine these spans. + localRemoteCrossover = spanIsLocal != lastSpanIsLocal + } + // Do not merge local spans with remote spans because a span must be 100% + // local in order to utilize locality optimized search. + // An example query on a LOCALITY REGIONAL BY ROW table which this + // benefits is: + // SELECT * FROM regional_by_row_table WHERE pk <> 4 LIMIT 3; if last.endBoundary == IncludeBoundary && sp.startBoundary == IncludeBoundary && - sp.start.IsNextKey(&keyCtx, last.end) { + sp.start.IsNextKey(&keyCtx, last.end) && !localRemoteCrossover { // We only initialize `result` if we need to change something. if result.Count() == 0 { result.Alloc(c.Spans.Count() - 1) @@ -494,6 +534,7 @@ func (c *Constraint) ConsolidateSpans(evalCtx *tree.EvalContext) { result.Append(sp) } } + lastSpanIsLocal = spanIsLocal } if result.Count() != 0 { c.Spans = result diff --git a/pkg/sql/opt/constraint/constraint_test.go b/pkg/sql/opt/constraint/constraint_test.go index 30f2a6dbf312..b59100780cb2 100644 --- a/pkg/sql/opt/constraint/constraint_test.go +++ b/pkg/sql/opt/constraint/constraint_test.go @@ -16,7 +16,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/opt" + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" + "github.com/cockroachdb/cockroach/pkg/sql/opt/testutils/testcat" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) @@ -512,7 +515,7 @@ func TestConsolidateSpans(t *testing.T) { spans := parseSpans(&evalCtx, tc.s) var c Constraint c.Init(kc, &spans) - c.ConsolidateSpans(kc.EvalCtx) + c.ConsolidateSpans(kc.EvalCtx, nil) if res := c.Spans.String(); res != tc.e { t.Errorf("expected %s got %s", tc.e, res) } @@ -520,6 +523,114 @@ func TestConsolidateSpans(t *testing.T) { } } +func TestConsolidateLocalAndRemoteSpans(t *testing.T) { + defer leaktest.AfterTest(t)() + st := cluster.MakeTestingClusterSettings() + evalCtx := tree.MakeTestingEvalContext(st) + + testData := []struct { + spanInputs string + // expected value + expected string + + // Partition Spans + // The start key of each span defines the PARTITION BY LIST value of a + // single partition, one span per partition. + // The end key is ignored. + partitionSpans string + + // Partition Localities. true == local, false == remote + // There must be the same number of entries as partition spans. + localities []bool + }{ + { + partitionSpans: "[/1 - /1] [/3 - /3] [/7 - /9]", + localities: []bool{true, false, false}, + spanInputs: "[/1/2 - /1/3] [/1/4 - /6] [/7 - /9]", + expected: "[/1/2 - /1/3] [/1/4 - /9]", + }, + // TODO(msirek): This result is expected to change if span-based + // partition locality checking is enabled, in which case + // the span [/3 - /4] would not get consolidated. + { + partitionSpans: "[/3 - /3] [/4 - /4]", + localities: []bool{true, true}, + spanInputs: "[/1 - /2] [/3 - /4] [/7 - /9]", + expected: "[/1 - /4] [/7 - /9]", + }, + { + partitionSpans: "[/1/2 - /1/2] [/1/4 - /1/4] [/1 - /1]", + localities: []bool{true, true, false}, + spanInputs: "[/1/2 - /1/4] [/1/5 - /5]", + expected: "[/1/2 - /5]", + }, + // TODO(msirek): This result is expected to change when span-based + // partition locality checking is enabled. + { + partitionSpans: "[/1/2 - /1/2] [/1/3 - /1/3] [/1/4 - /1/4] [/1 - /1]", + localities: []bool{true, true, true, false}, + spanInputs: "[/1/2 - /1/4] [/1/5 - /5]", + expected: "[/1/2 - /5]", + }, + // TODO(msirek): This result is expected to change when span-based + // partition locality checking is enabled. + { + partitionSpans: "[/1/2 - /1/2] [/1/3 - /1/3] [/1/4 - /1/4] [/1/2/3 - /1/2/3]", + localities: []bool{true, true, true, false}, + spanInputs: "[/1/2 - /1/4] [/1/5 - /5]", + expected: "[/1/2 - /5]", + }, + { + partitionSpans: "[/1/2/3 - /1/2/3] [/9 - /9]", + localities: []bool{true, false}, + spanInputs: "[/1/2/1 - /1/2/2] [/1/2/3 - /1/2/3] [/1/2/4 - /9]", + expected: "[/1/2/1 - /1/2/2] [/1/2/3 - /1/2/3] [/1/2/4 - /9]", + }, + } + + kc := testKeyContext(1, 2, 3) + for i, tc := range testData { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + // Read the partitionSpans and localities entries to make an index that + // only has the partitions and ps (PrefixSorter) elements populated. + partitionSpans := parseSpans(&evalCtx, tc.partitionSpans) + partitions := make([]testcat.Partition, partitionSpans.Count()) + localPartitions := util.FastIntSet{} + for j := 0; j < partitionSpans.Count(); j++ { + span := partitionSpans.Get(j) + spanDatums := make([]tree.Datums, 1) + datumSlice := make(tree.Datums, span.StartKey().Length()) + for k := 0; k < span.StartKey().Length(); k++ { + datumSlice[k] = span.StartKey().Value(k) + } + spanDatums[0] = datumSlice + partitions[j] = testcat.Partition{} + partitions[j].SetDatums(spanDatums) + + if tc.localities[j] { + localPartitions.Add(j) + } + } + + // Make the index + index := &testcat.Index{} + index.SetPartitions(partitions) + // Make the PrefixSorter. + ps := partition.GetSortedPrefixes(index, localPartitions, &evalCtx) + + // Run the test. + spans := parseSpans(&evalCtx, tc.spanInputs) + var c Constraint + c.Init(kc, &spans) + c.ConsolidateSpans(kc.EvalCtx, ps) + if res := c.Spans.String(); res != tc.expected { + t.Errorf("expected %s got %s", tc.expected, res) + } + }) + } + +} + func TestExactPrefix(t *testing.T) { defer leaktest.AfterTest(t)() st := cluster.MakeTestingClusterSettings() diff --git a/pkg/sql/opt/constraint/locality.go b/pkg/sql/opt/constraint/locality.go new file mode 100644 index 000000000000..10e1b18af664 --- /dev/null +++ b/pkg/sql/opt/constraint/locality.go @@ -0,0 +1,136 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package constraint + +import ( + "math" + "sort" + + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" +) + +// compare compares the key prefix in prefixInfo with the span prefix. The key +// prefix is considered less than the span prefix if it is longer than the +// span prefix, or if it sorts less according to the Datum.Compare interface. +func compare(prefixInfo partition.PrefixIsLocal, span *Span, ps *partition.PrefixSorter) int { + prefix := prefixInfo.Prefix + prefixLength := len(prefix) + spanPrefixLength := span.Prefix(ps.EvalCtx) + // Longer prefixes sort before shorter ones. + // The span prefix is allowed to be longer than the partition prefix and still + // match. + if prefixLength > spanPrefixLength { + return -1 + } + + // Look for an exact match on the shared prefix. + for k, datum := range prefix { + compareResult := datum.Compare(ps.EvalCtx, span.StartKey().Value(k)) + if compareResult != 0 { + return compareResult + } + } + return 0 +} + +// searchPrefixes searches a sorted slice of PrefixIsLocals in ps for a full +// match on all Datums in the Prefix with the given span, and returns the index +// of the match, or -1 if there is no match. +// The slice must be sorted in ascending order, with longer prefixes sorting +// before short prefixes and sorted by prefix values within each group +// of equal-length prefixes. +// Each equal-length prefix group is searched separately because there could be +// more than one possible match for a given span, and we want to match the +// longest-length prefix possible, because that reflects the actual locality +// of the span's owning range. +// If prefixSearchUpperBound is non-negative, only equal-length prefix groups of +// length prefixSearchUpperBound or less will be searched. A negative value for +// prefixSearchUpperBound means the same as passing the max upper bound of +// math.MaxInt32. A zero value for prefixSearchUpperBound means only match on +// the DEFAULT partition, which has a zero-length prefix. +func searchPrefixes(span *Span, ps *partition.PrefixSorter, prefixSearchUpperBound int) int { + if prefixSearchUpperBound < 0 { + prefixSearchUpperBound = math.MaxInt32 + } + spanPrefix := span.Prefix(ps.EvalCtx) + i := 0 + // Get the first slice in the PrefixSorter + prefixSlice, startIndex, ok := ps.Slice(i) + + // return 'prefix >= span' result + matchFunction := func(i int) bool { + prefix := prefixSlice[i].Prefix + // For nonzero-length partition prefixes, the span prefix must be at least + // as long as it in order to match, whereas zero-length default partitions + // match anything. + if len(prefix) > spanPrefix { + return false + } else if len(prefix) == 0 { + return true + } + + for k, datum := range prefix { + compareResult := datum.Compare(ps.EvalCtx, span.StartKey().Value(k)) + if compareResult != 0 { + return compareResult > 0 + } + } + return true + } + + for ; ok; prefixSlice, startIndex, ok = ps.Slice(i) { + i++ + + if len(prefixSlice[0].Prefix) > prefixSearchUpperBound { + continue + } + + // Binary search for matching entry or insertion point in the prefix slices. + index := sort.Search(len(prefixSlice), matchFunction) + if index >= len(prefixSlice) { + continue + } + // Need to requalify for equality because we might have just found an + // insertion point instead of an actual match. + if compare(prefixSlice[index], span, ps) == 0 { + return index + startIndex + } + } + return -1 +} + +// FindMatch finds the Entry in PrefixSorter which matches the span prefix on a +// prefix subset of its keys, including a zero-length match in the case of the +// DEFAULT partition. +func FindMatch(span *Span, ps *partition.PrefixSorter) (match *partition.PrefixIsLocal, ok bool) { + index := searchPrefixes(span, ps, math.MaxInt32 /* prefixSearchUpperBound*/) + if index == -1 { + return nil, false + } + return &ps.Entry[index], true +} + +// FindMatchOnSingleColumn finds the Entry in PrefixSorter with a prefix length +// of 1 or less which matches the span prefix, including a zero-length match in +// the case of the DEFAULT partition. +func FindMatchOnSingleColumn( + datum tree.Datum, ps *partition.PrefixSorter, +) (match *partition.PrefixIsLocal, ok bool) { + sp := &Span{} + key := Key{firstVal: datum} + sp.Init(key, IncludeBoundary, key, IncludeBoundary) + index := searchPrefixes(sp, ps, 1 /* prefixSearchUpperBound */) + if index == -1 { + return nil, false + } + return &ps.Entry[index], true +} diff --git a/pkg/sql/opt/idxconstraint/BUILD.bazel b/pkg/sql/opt/idxconstraint/BUILD.bazel index 2cfb9b9e323c..4febcd5ea1de 100644 --- a/pkg/sql/opt/idxconstraint/BUILD.bazel +++ b/pkg/sql/opt/idxconstraint/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/sql/opt/constraint", "//pkg/sql/opt/memo", "//pkg/sql/opt/norm", + "//pkg/sql/opt/partition", "//pkg/sql/sem/tree", "//pkg/sql/types", "//pkg/util", diff --git a/pkg/sql/opt/idxconstraint/index_constraints.go b/pkg/sql/opt/idxconstraint/index_constraints.go index ead9fbf6ea88..95beed32c3a1 100644 --- a/pkg/sql/opt/idxconstraint/index_constraints.go +++ b/pkg/sql/opt/idxconstraint/index_constraints.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/opt/norm" + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" @@ -1052,6 +1053,7 @@ func (ic *Instance) Init( consolidate bool, evalCtx *tree.EvalContext, factory *norm.Factory, + ps *partition.PrefixSorter, ) { // This initialization pattern ensures that fields are not unwittingly // reused. Field reuse must be explicit. @@ -1089,7 +1091,7 @@ func (ic *Instance) Init( // we have [/1/1 - /1/2]. if consolidate { ic.consolidatedConstraint = ic.constraint - ic.consolidatedConstraint.ConsolidateSpans(evalCtx) + ic.consolidatedConstraint.ConsolidateSpans(evalCtx, ps) ic.consolidated = true } ic.initialized = true diff --git a/pkg/sql/opt/idxconstraint/index_constraints_test.go b/pkg/sql/opt/idxconstraint/index_constraints_test.go index 69a1230e120f..25383143efd8 100644 --- a/pkg/sql/opt/idxconstraint/index_constraints_test.go +++ b/pkg/sql/opt/idxconstraint/index_constraints_test.go @@ -125,7 +125,7 @@ func TestIndexConstraints(t *testing.T) { var ic idxconstraint.Instance ic.Init( filters, optionalFilters, indexCols, sv.NotNullCols(), computedCols, - true /* consolidate */, &evalCtx, &f, + true /* consolidate */, &evalCtx, &f, nil, /* prefixSorter */ ) result := ic.Constraint() var buf bytes.Buffer @@ -241,7 +241,7 @@ func BenchmarkIndexConstraints(b *testing.B) { ic.Init( filters, nil /* optionalFilters */, indexCols, sv.NotNullCols(), nil /* computedCols */, true, /* consolidate */ - &evalCtx, &f, + &evalCtx, &f, nil, /* prefixSorter */ ) _ = ic.Constraint() _ = ic.RemainingFilters() diff --git a/pkg/sql/opt/invertedidx/inverted_index_expr.go b/pkg/sql/opt/invertedidx/inverted_index_expr.go index 10fa6cd96870..22969e64bdfc 100644 --- a/pkg/sql/opt/invertedidx/inverted_index_expr.go +++ b/pkg/sql/opt/invertedidx/inverted_index_expr.go @@ -373,6 +373,7 @@ func constrainPrefixColumns( ) (constraint *constraint.Constraint, remainingFilters memo.FiltersExpr, ok bool) { tabMeta := factory.Metadata().TableMeta(tabID) prefixColumnCount := index.NonInvertedPrefixColumnCount() + ps, _ := tabMeta.IndexPartitionLocality(index.Ordinal(), index, evalCtx) // If this is a single-column inverted index, there are no prefix columns to // constrain. @@ -413,7 +414,7 @@ func constrainPrefixColumns( filters, optionalFilters, prefixColumns, notNullCols, tabMeta.ComputedCols, false, /* consolidate */ - evalCtx, factory, + evalCtx, factory, ps, ) constraint = ic.UnconsolidatedConstraint() if constraint.Prefix(evalCtx) < prefixColumnCount { diff --git a/pkg/sql/opt/metadata.go b/pkg/sql/opt/metadata.go index 473155ba8159..de85bfcc98b7 100644 --- a/pkg/sql/opt/metadata.go +++ b/pkg/sql/opt/metadata.go @@ -467,13 +467,14 @@ func (md *Metadata) DuplicateTable( } md.tables = append(md.tables, TableMeta{ - MetaID: newTabID, - Table: tabMeta.Table, - Alias: tabMeta.Alias, - IgnoreForeignKeys: tabMeta.IgnoreForeignKeys, - Constraints: constraints, - ComputedCols: computedCols, - partialIndexPredicates: partialIndexPredicates, + MetaID: newTabID, + Table: tabMeta.Table, + Alias: tabMeta.Alias, + IgnoreForeignKeys: tabMeta.IgnoreForeignKeys, + Constraints: constraints, + ComputedCols: computedCols, + partialIndexPredicates: partialIndexPredicates, + indexPartitionLocalities: tabMeta.indexPartitionLocalities, }) return newTabID diff --git a/pkg/sql/opt/partition/BUILD.bazel b/pkg/sql/opt/partition/BUILD.bazel new file mode 100644 index 000000000000..7b8d277e5cd3 --- /dev/null +++ b/pkg/sql/opt/partition/BUILD.bazel @@ -0,0 +1,14 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "partition", + srcs = ["locality.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/opt/partition", + visibility = ["//visibility:public"], + deps = [ + "//pkg/sql/opt/cat", + "//pkg/sql/sem/tree", + "//pkg/util", + "@com_github_cockroachdb_errors//:errors", + ], +) diff --git a/pkg/sql/opt/partition/locality.go b/pkg/sql/opt/partition/locality.go new file mode 100644 index 000000000000..fd2b79e73949 --- /dev/null +++ b/pkg/sql/opt/partition/locality.go @@ -0,0 +1,272 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package partition + +import ( + "sort" + + "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/errors" +) + +const regionKey = "region" + +// PrefixIsLocal contains a PARTITION BY LIST Prefix, a boolean indicating +// whether the Prefix is from a local partition, and the name of the partition. +type PrefixIsLocal struct { + Prefix tree.Datums + IsLocal bool + + partitionName string +} + +// PrefixSorter sorts prefixes (which are wrapped in PrefixIsLocal structs) so +// that longer prefixes are ordered first and within each group of equal-length +// prefixes so that they are ordered by value. +type PrefixSorter struct { + EvalCtx *tree.EvalContext + Entry []PrefixIsLocal + + // A slice of indices of the last element of each equal-length group of + // entries in the Entry array above. Used by Slice(i int). + idx []int + + // The set of local partitions + LocalPartitions util.FastIntSet + + // A slice of local partition prefixes which can be used to construct + // covering spans for better detection of local spans. + LocalPrefixes []tree.Datums +} + +var _ sort.Interface = &PrefixSorter{} + +// Len is part of sort.Interface. +func (ps PrefixSorter) Len() int { + return len(ps.Entry) +} + +// Less is part of sort.Interface. +func (ps PrefixSorter) Less(i, j int) bool { + if len(ps.Entry[i].Prefix) != len(ps.Entry[j].Prefix) { + return len(ps.Entry[i].Prefix) > len(ps.Entry[j].Prefix) + } + // A zero length prefix is never less than any prefix. + if len(ps.Entry[i].Prefix) == 0 { + return false + } + compareResult := ps.Entry[i].Prefix.Compare(ps.EvalCtx, ps.Entry[j].Prefix) + return compareResult == -1 +} + +// Swap is part of sort.Interface. +func (ps PrefixSorter) Swap(i, j int) { + ps.Entry[i], ps.Entry[j] = ps.Entry[j], ps.Entry[i] +} + +// Slice returns the ith slice of PrefixIsLocal entries, all having the same +// partition prefix length, along with the start index of that slice in the +// main PrefixSorter Entry slice. Slices are sorted by prefix length with those +// of the longest prefix length occurring at i==0. +func (ps PrefixSorter) Slice(i int) (prefixSplice []PrefixIsLocal, sliceStartIndex int, ok bool) { + if i < 0 || i >= len(ps.idx) { + return nil, -1, false + } + inclusiveStartIndex := 0 + if i > 0 { + // The start of the slice is the end of the previous slice plus one. + inclusiveStartIndex = ps.idx[i-1] + 1 + } + nonInclusiveEndIndex := ps.idx[i] + 1 + if (nonInclusiveEndIndex < inclusiveStartIndex) || (nonInclusiveEndIndex > len(ps.Entry)) { + panic(errors.AssertionFailedf("Partition prefix slice not found. inclusiveStartIndex = %d, nonInclusiveEndIndex = %d", + inclusiveStartIndex, nonInclusiveEndIndex)) + } + return ps.Entry[inclusiveStartIndex:nonInclusiveEndIndex], inclusiveStartIndex, true +} + +// HasMixOfLocalAndRemotePartitions tests if the given index has at least one +// local and one remote partition as used in the current evaluation context. +// This function also returns the set of local partitions when the number of +// partitions in the index is 2 or greater and the local gateway region can be +// determined. +func HasMixOfLocalAndRemotePartitions( + evalCtx *tree.EvalContext, index cat.Index, +) (localPartitions *util.FastIntSet, ok bool) { + if index == nil || index.PartitionCount() < 2 { + return nil, false + } + var localRegion string + if localRegion, ok = evalCtx.GetLocalRegion(); !ok { + return nil, false + } + var foundLocal, foundRemote bool + localPartitions = &util.FastIntSet{} + for i, n := 0, index.PartitionCount(); i < n; i++ { + part := index.Partition(i) + if IsZoneLocal(part.Zone(), localRegion) { + foundLocal = true + localPartitions.Add(i) + } else { + foundRemote = true + } + } + return localPartitions, foundLocal && foundRemote +} + +// GetSortedPrefixes collects all the prefixes from all the different partitions +// in the index (remembering which ones came from local partitions), and sorts +// them so that longer prefixes come before shorter prefixes and within each +// group of equal-length prefixes they are ordered by value. +// This is the main function for building a PrefixSorter. +func GetSortedPrefixes( + index cat.Index, localPartitions util.FastIntSet, evalCtx *tree.EvalContext, +) *PrefixSorter { + if index == nil || index.PartitionCount() < 2 { + return nil + } + allPrefixes := make([]PrefixIsLocal, 0, index.PartitionCount()) + localPrefixes := make([]tree.Datums, 0, localPartitions.Len()) + + for i, n := 0, index.PartitionCount(); i < n; i++ { + part := index.Partition(i) + isLocal := localPartitions.Contains(i) + partitionPrefixes := part.PartitionByListPrefixes() + if len(partitionPrefixes) == 0 { + // This can happen when the partition value is DEFAULT. + allPrefixes = append(allPrefixes, PrefixIsLocal{ + Prefix: nil, + IsLocal: isLocal, + partitionName: part.Name(), + }) + } + for j := range partitionPrefixes { + allPrefixes = append(allPrefixes, PrefixIsLocal{ + Prefix: partitionPrefixes[j], + IsLocal: isLocal, + partitionName: part.Name(), + }) + if isLocal && len(partitionPrefixes[j]) > 0 { + localPrefixes = append(localPrefixes, partitionPrefixes[j]) + } + } + } + ps := PrefixSorter{evalCtx, allPrefixes, []int{}, localPartitions, localPrefixes} + sort.Sort(ps) + lastPrefixLength := len(ps.Entry[0].Prefix) + // Mark the index of each prefix group of a different length. + // We must search each group separately, so the boundaries need tagging. + for i := 1; i < len(allPrefixes); i++ { + if len(ps.Entry[i].Prefix) != lastPrefixLength { + ps.idx = append(ps.idx, i-1) + lastPrefixLength = len(ps.Entry[i].Prefix) + } + } + ps.LocalPartitions = localPartitions + + // The end of the last slice is always the last element. + ps.idx = append(ps.idx, len(allPrefixes)-1) + return &ps +} + +// isConstraintLocal returns isLocal=true and ok=true if the given constraint is +// a required constraint matching the given localRegion. Returns isLocal=false +// and ok=true if the given constraint is a prohibited constraint matching the +// given local region or if it is a required constraint matching a different +// region. Any other scenario returns ok=false, since this constraint gives no +// information about whether the constrained replicas are local or remote. +func isConstraintLocal(constraint cat.Constraint, localRegion string) (isLocal bool, ok bool) { + if constraint.GetKey() != regionKey { + // We only care about constraints on the region. + return false /* isLocal */, false /* ok */ + } + if constraint.GetValue() == localRegion { + if constraint.IsRequired() { + // The local region is required. + return true /* isLocal */, true /* ok */ + } + // The local region is prohibited. + return false /* isLocal */, true /* ok */ + } + if constraint.IsRequired() { + // A remote region is required. + return false /* isLocal */, true /* ok */ + } + // A remote region is prohibited, so this constraint gives no information + // about whether the constrained replicas are local or remote. + return false /* isLocal */, false /* ok */ +} + +// IsZoneLocal returns true if the given zone config indicates that the replicas +// it constrains will be primarily located in the localRegion. +func IsZoneLocal(zone cat.Zone, localRegion string) bool { + // First count the number of local and remote replica constraints. If all + // are local or all are remote, we can return early. + local, remote := 0, 0 + for i, n := 0, zone.ReplicaConstraintsCount(); i < n; i++ { + replicaConstraint := zone.ReplicaConstraints(i) + for j, m := 0, replicaConstraint.ConstraintCount(); j < m; j++ { + constraint := replicaConstraint.Constraint(j) + if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { + if isLocal { + local++ + } else { + remote++ + } + } + } + } + if local > 0 && remote == 0 { + return true + } + if remote > 0 && local == 0 { + return false + } + + // Next check the voter replica constraints. Once again, if all are local or + // all are remote, we can return early. + local, remote = 0, 0 + for i, n := 0, zone.VoterConstraintsCount(); i < n; i++ { + replicaConstraint := zone.VoterConstraint(i) + for j, m := 0, replicaConstraint.ConstraintCount(); j < m; j++ { + constraint := replicaConstraint.Constraint(j) + if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { + if isLocal { + local++ + } else { + remote++ + } + } + } + } + if local > 0 && remote == 0 { + return true + } + if remote > 0 && local == 0 { + return false + } + + // Use the lease preferences as a tie breaker. We only really care about the + // first one, since subsequent lease preferences only apply in edge cases. + if zone.LeasePreferenceCount() > 0 { + leasePref := zone.LeasePreference(0) + for i, n := 0, leasePref.ConstraintCount(); i < n; i++ { + constraint := leasePref.Constraint(i) + if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { + return isLocal + } + } + } + + return false +} diff --git a/pkg/sql/opt/table_meta.go b/pkg/sql/opt/table_meta.go index d4cf12bd882c..689b97976bd3 100644 --- a/pkg/sql/opt/table_meta.go +++ b/pkg/sql/opt/table_meta.go @@ -12,6 +12,7 @@ package opt import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/errors" @@ -40,10 +41,10 @@ func (t TableID) ColumnID(ord int) ColumnID { return t.firstColID() + ColumnID(ord) } -// IndexColumnID returns the metadata id of the idxOrd-th index column in the +// IndexColumnID returns the metadata id of the ith ordinal index column in the // given index. -func (t TableID) IndexColumnID(idx cat.Index, idxOrd int) ColumnID { - return t.ColumnID(idx.Column(idxOrd).Ordinal()) +func (t TableID) IndexColumnID(index cat.Index, i int) ColumnID { + return t.ColumnID(index.Column(i).Ordinal()) } // ColumnOrdinal returns the ordinal position of the given column in its base @@ -164,6 +165,17 @@ type TableMeta struct { // the map. partialIndexPredicates map[cat.IndexOrdinal]ScalarExpr + // indexPartitionLocalities is a map from an index ordinal on the table to a + // *PrefixSorter representing the PARTITION BY LIST values of the index and + // whether each of those partitions is region-local with respect to the query + // being run. If an index is partitioned BY LIST, and has both local and + // remote partitions, it will have an entry in the map. Local partitions are + // those where all row ranges they own have a preferred region for leaseholder + // nodes the same as the gateway region of the current connection that is + // running the query. Remote partitions have at least one row range with a + // leaseholder preferred region which is different from the gateway region. + indexPartitionLocalities map[cat.IndexOrdinal]*partition.PrefixSorter + // anns annotates the table metadata with arbitrary data. anns [maxTableAnnIDCount]interface{} } @@ -202,6 +214,10 @@ func (tm *TableMeta) copyFrom(from *TableMeta, copyScalarFn func(Expr) Expr) { tm.partialIndexPredicates[idx] = copyScalarFn(e).(ScalarExpr) } } + + // This map has no ColumnID or TableID specific information in it, so it can + // be shared. + tm.indexPartitionLocalities = from.indexPartitionLocalities } // IndexColumns returns the set of table columns in the given index. @@ -286,6 +302,30 @@ func (tm *TableMeta) AddPartialIndexPredicate(ord cat.IndexOrdinal, pred ScalarE tm.partialIndexPredicates[ord] = pred } +// AddIndexPartitionLocality adds a PrefixSorter to the table's metadata for the +// index with IndexOrdinal ord. +func (tm *TableMeta) AddIndexPartitionLocality(ord cat.IndexOrdinal, ps *partition.PrefixSorter) { + if tm.indexPartitionLocalities == nil { + tm.indexPartitionLocalities = make(map[cat.IndexOrdinal]*partition.PrefixSorter) + } + tm.indexPartitionLocalities[ord] = ps +} + +// IndexPartitionLocality returns the given index's PrefixSorter. +func (tm *TableMeta) IndexPartitionLocality( + ord cat.IndexOrdinal, index cat.Index, evalCtx *tree.EvalContext, +) (ps *partition.PrefixSorter, ok bool) { + ps, ok = tm.indexPartitionLocalities[ord] + if !ok { + if localPartitions, ok := + partition.HasMixOfLocalAndRemotePartitions(evalCtx, index); ok { + ps = partition.GetSortedPrefixes(index, *localPartitions, evalCtx) + } + tm.AddIndexPartitionLocality(ord, ps) + } + return ps, ps != nil +} + // PartialIndexPredicate returns the given index's predicate scalar expression, // if the index is a partial index. Returns ok=false if the index is not a // partial index. Panics if the index is a partial index according to the diff --git a/pkg/sql/opt/testutils/testcat/test_catalog.go b/pkg/sql/opt/testutils/testcat/test_catalog.go index cc6262a93eb3..5934ec8acfa9 100644 --- a/pkg/sql/opt/testutils/testcat/test_catalog.go +++ b/pkg/sql/opt/testutils/testcat/test_catalog.go @@ -983,6 +983,11 @@ func (ti *Index) Partition(i int) cat.Partition { return &ti.partitions[i] } +// SetPartitions manually sets the partitions. +func (ti *Index) SetPartitions(partitions []Partition) { + ti.partitions = partitions +} + // Partition implements the cat.Partition interface for testing purposes. type Partition struct { name string @@ -1007,6 +1012,11 @@ func (p *Partition) PartitionByListPrefixes() []tree.Datums { return p.datums } +// SetDatums manually sets the partitioning values. +func (p *Partition) SetDatums(datums []tree.Datums) { + p.datums = datums +} + // TableStat implements the cat.TableStatistic interface for testing purposes. type TableStat struct { js stats.JSONStatistic diff --git a/pkg/sql/opt/xform/BUILD.bazel b/pkg/sql/opt/xform/BUILD.bazel index 332a5c035269..25aed09ec2fd 100644 --- a/pkg/sql/opt/xform/BUILD.bazel +++ b/pkg/sql/opt/xform/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "//pkg/sql/opt/norm", "//pkg/sql/opt/ordering", "//pkg/sql/opt/partialidx", + "//pkg/sql/opt/partition", "//pkg/sql/opt/props", "//pkg/sql/opt/props/physical", "//pkg/sql/rowinfra", @@ -81,6 +82,7 @@ go_test( "//pkg/sql/opt/constraint", "//pkg/sql/opt/memo", "//pkg/sql/opt/norm", + "//pkg/sql/opt/partition", "//pkg/sql/opt/testutils", "//pkg/sql/opt/testutils/opttester", "//pkg/sql/opt/testutils/testcat", diff --git a/pkg/sql/opt/xform/general_funcs.go b/pkg/sql/opt/xform/general_funcs.go index b6ed5d9053da..cc771035eb66 100644 --- a/pkg/sql/opt/xform/general_funcs.go +++ b/pkg/sql/opt/xform/general_funcs.go @@ -11,10 +11,7 @@ package xform import ( - "sort" - "github.com/cockroachdb/cockroach/pkg/sql/opt" - "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" "github.com/cockroachdb/cockroach/pkg/sql/opt/idxconstraint" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" @@ -23,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/partialidx" "github.com/cockroachdb/cockroach/pkg/sql/opt/props" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" ) @@ -173,6 +169,7 @@ func (c *CustomFuncs) initIdxConstraintForIndex( md := c.e.mem.Metadata() tabMeta := md.TableMeta(tabID) index := tabMeta.Table.Index(indexOrd) + ps, _ := tabMeta.IndexPartitionLocality(index.Ordinal(), index, c.e.evalCtx) columns := make([]opt.OrderingColumn, index.LaxKeyColumnCount()) var notNullCols opt.ColSet for i := range columns { @@ -190,7 +187,7 @@ func (c *CustomFuncs) initIdxConstraintForIndex( ic.Init( requiredFilters, optionalFilters, columns, notNullCols, tabMeta.ComputedCols, - true /* consolidate */, c.e.evalCtx, c.e.f, + true /* consolidate */, c.e.evalCtx, c.e.f, ps, ) return ic } @@ -342,153 +339,6 @@ func (c *CustomFuncs) findConstantFilterCols( } } -// isZoneLocal returns true if the given zone config indicates that the replicas -// it constrains will be primarily located in the localRegion. -func isZoneLocal(zone cat.Zone, localRegion string) bool { - // First count the number of local and remote replica constraints. If all - // are local or all are remote, we can return early. - local, remote := 0, 0 - for i, n := 0, zone.ReplicaConstraintsCount(); i < n; i++ { - replicaConstraint := zone.ReplicaConstraints(i) - for j, m := 0, replicaConstraint.ConstraintCount(); j < m; j++ { - constraint := replicaConstraint.Constraint(j) - if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { - if isLocal { - local++ - } else { - remote++ - } - } - } - } - if local > 0 && remote == 0 { - return true - } - if remote > 0 && local == 0 { - return false - } - - // Next check the voter replica constraints. Once again, if all are local or - // all are remote, we can return early. - local, remote = 0, 0 - for i, n := 0, zone.VoterConstraintsCount(); i < n; i++ { - replicaConstraint := zone.VoterConstraint(i) - for j, m := 0, replicaConstraint.ConstraintCount(); j < m; j++ { - constraint := replicaConstraint.Constraint(j) - if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { - if isLocal { - local++ - } else { - remote++ - } - } - } - } - if local > 0 && remote == 0 { - return true - } - if remote > 0 && local == 0 { - return false - } - - // Use the lease preferences as a tie breaker. We only really care about the - // first one, since subsequent lease preferences only apply in edge cases. - if zone.LeasePreferenceCount() > 0 { - leasePref := zone.LeasePreference(0) - for i, n := 0, leasePref.ConstraintCount(); i < n; i++ { - constraint := leasePref.Constraint(i) - if isLocal, ok := isConstraintLocal(constraint, localRegion); ok { - return isLocal - } - } - } - - return false -} - -// isConstraintLocal returns isLocal=true and ok=true if the given constraint is -// a required constraint matching the given localRegion. Returns isLocal=false -// and ok=true if the given constraint is a prohibited constraint matching the -// given local region or if it is a required constraint matching a different -// region. Any other scenario returns ok=false, since this constraint gives no -// information about whether the constrained replicas are local or remote. -func isConstraintLocal(constraint cat.Constraint, localRegion string) (isLocal bool, ok bool) { - if constraint.GetKey() != regionKey { - // We only care about constraints on the region. - return false /* isLocal */, false /* ok */ - } - if constraint.GetValue() == localRegion { - if constraint.IsRequired() { - // The local region is required. - return true /* isLocal */, true /* ok */ - } - // The local region is prohibited. - return false /* isLocal */, true /* ok */ - } - if constraint.IsRequired() { - // A remote region is required. - return false /* isLocal */, true /* ok */ - } - // A remote region is prohibited, so this constraint gives no information - // about whether the constrained replicas are local or remote. - return false /* isLocal */, false /* ok */ -} - -// prefixIsLocal contains a PARTITION BY LIST prefix, and a boolean indicating -// whether the prefix is from a local partition. -type prefixIsLocal struct { - prefix tree.Datums - isLocal bool -} - -// prefixSorter sorts prefixes (which are wrapped in prefixIsLocal structs) so -// that longer prefixes are ordered first. -type prefixSorter []prefixIsLocal - -var _ sort.Interface = &prefixSorter{} - -// Len is part of sort.Interface. -func (ps prefixSorter) Len() int { - return len(ps) -} - -// Less is part of sort.Interface. -func (ps prefixSorter) Less(i, j int) bool { - return len(ps[i].prefix) > len(ps[j].prefix) -} - -// Swap is part of sort.Interface. -func (ps prefixSorter) Swap(i, j int) { - ps[i], ps[j] = ps[j], ps[i] -} - -// getSortedPrefixes collects all the prefixes from all the different partitions -// in the index (remembering which ones came from local partitions), and sorts -// them so that longer prefixes come before shorter prefixes. -func getSortedPrefixes(index cat.Index, localPartitions util.FastIntSet) []prefixIsLocal { - allPrefixes := make(prefixSorter, 0, index.PartitionCount()) - for i, n := 0, index.PartitionCount(); i < n; i++ { - part := index.Partition(i) - isLocal := localPartitions.Contains(i) - partitionPrefixes := part.PartitionByListPrefixes() - if len(partitionPrefixes) == 0 { - // This can happen when the partition value is DEFAULT. - allPrefixes = append(allPrefixes, prefixIsLocal{ - prefix: nil, - isLocal: isLocal, - }) - } - for j := range partitionPrefixes { - allPrefixes = append(allPrefixes, prefixIsLocal{ - prefix: partitionPrefixes[j], - isLocal: isLocal, - }) - } - } - sort.Sort(allPrefixes) - return allPrefixes -} - // splitScanIntoUnionScans tries to find a UnionAll of Scan operators (with an // optional hard limit) that each scan over a single key from the original // Scan's constraints. The UnionAll is returned if the scans can provide the diff --git a/pkg/sql/opt/xform/general_funcs_test.go b/pkg/sql/opt/xform/general_funcs_test.go index 5e2e1c6e1d13..567884c1f5aa 100644 --- a/pkg/sql/opt/xform/general_funcs_test.go +++ b/pkg/sql/opt/xform/general_funcs_test.go @@ -14,6 +14,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "gopkg.in/yaml.v2" @@ -101,7 +102,7 @@ func TestIsZoneLocal(t *testing.T) { } } - actual := isZoneLocal(zone, tc.localRegion) + actual := partition.IsZoneLocal(zone, tc.localRegion) if actual != tc.expected { t.Errorf("locality=%v, constraints=%v, voterConstraints=%v, leasePrefs=%v: expected %v, got %v", tc.localRegion, tc.constraints, tc.voterConstraints, tc.leasePrefs, tc.expected, actual) diff --git a/pkg/sql/opt/xform/join_funcs.go b/pkg/sql/opt/xform/join_funcs.go index 24199fec20af..734a806c1b93 100644 --- a/pkg/sql/opt/xform/join_funcs.go +++ b/pkg/sql/opt/xform/join_funcs.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/invertedidx" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/opt/ordering" + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" "github.com/cockroachdb/cockroach/pkg/sql/opt/props" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -1453,35 +1454,21 @@ func (c *CustomFuncs) GetLocalityOptimizedLookupJoinExprs( // value of private.LookupColsAreTableKey. if private.JoinType != opt.AntiJoinOp { if (private.JoinType != opt.SemiJoinOp || len(on) > 0) && !private.LookupColsAreTableKey { - return + return nil, nil, false } } - - // The local region must be set, or we won't be able to determine which - // partitions are local. - localRegion, found := c.e.evalCtx.Locality.Find(regionKey) - if !found { - return nil, nil, false - } - - // There should be at least two partitions, or we won't be able to - // differentiate between local and remote partitions. tabMeta := c.e.mem.Metadata().TableMeta(private.Table) index := tabMeta.Table.Index(private.Index) - if index.PartitionCount() < 2 { - return nil, nil, false - } - // Determine whether the index has both local and remote partitions. - var localPartitions util.FastIntSet - for i, n := 0, index.PartitionCount(); i < n; i++ { - part := index.Partition(i) - if isZoneLocal(part.Zone(), localRegion) { - localPartitions.Add(i) - } - } - if localPartitions.Len() == 0 || localPartitions.Len() == index.PartitionCount() { - // The partitions are either all local or all remote. + // The PrefixSorter has collected all the prefixes from all the different + // partitions (remembering which ones came from local partitions), and has + // sorted them so that longer prefixes come before shorter prefixes. For each + // span in the scanConstraint, we will iterate through the list of prefixes + // until we find a match, so ordering them with longer prefixes first ensures + // that the correct match is found. The PrefixSorter is only non-nil when this + // index has at least one local and one remote partition. + var ps *partition.PrefixSorter + if ps, ok = tabMeta.IndexPartitionLocality(private.Index, index, c.e.evalCtx); !ok { return nil, nil, false } @@ -1501,7 +1488,7 @@ func (c *CustomFuncs) GetLocalityOptimizedLookupJoinExprs( } // Determine whether the values target both local and remote partitions. - localValOrds := c.getLocalValues(index, localPartitions, vals) + localValOrds := c.getLocalValues(vals, ps) if localValOrds.Len() == 0 || localValOrds.Len() == len(vals) { // The values target all local or all remote partitions. return nil, nil, false @@ -1551,31 +1538,19 @@ func (c CustomFuncs) getConstPrefixFilter( // getLocalValues returns the indexes of the values in the given Datums slice // that target local partitions. func (c *CustomFuncs) getLocalValues( - index cat.Index, localPartitions util.FastIntSet, values tree.Datums, + values tree.Datums, ps *partition.PrefixSorter, ) util.FastIntSet { - // Collect all the prefixes from all the different partitions (remembering - // which ones came from local partitions), and sort them so that longer - // prefixes come before shorter prefixes. For each value in the given Datums, - // we will iterate through the list of prefixes until we find a match, so - // ordering them with longer prefixes first ensures that the correct match is - // found. - allPrefixes := getSortedPrefixes(index, localPartitions) - - // TODO(rytaft): Sort the prefixes by key in addition to length, and use - // binary search here. + // The PrefixSorter has collected all the prefixes from all the different + // partitions (remembering which ones came from local partitions), and has + // sorted them so that longer prefixes come before shorter prefixes. For each + // span in the scanConstraint, we will iterate through the list of prefixes + // until we find a match, so ordering them with longer prefixes first ensures + // that the correct match is found. var localVals util.FastIntSet for i, val := range values { - for j := range allPrefixes { - prefix := allPrefixes[j].prefix - isLocal := allPrefixes[j].isLocal - if len(prefix) > 1 { - continue - } - if val.Compare(c.e.evalCtx, prefix[0]) == 0 { - if isLocal { - localVals.Add(i) - } - break + if match, ok := constraint.FindMatchOnSingleColumn(val, ps); ok { + if match.IsLocal { + localVals.Add(i) } } } diff --git a/pkg/sql/opt/xform/scan_funcs.go b/pkg/sql/opt/xform/scan_funcs.go index ab870c075d17..8371c91e502d 100644 --- a/pkg/sql/opt/xform/scan_funcs.go +++ b/pkg/sql/opt/xform/scan_funcs.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" @@ -91,8 +92,6 @@ func (c *CustomFuncs) GenerateIndexScans(grp memo.RelExpr, scanPrivate *memo.Sca }) } -const regionKey = "region" - // CanMaybeGenerateLocalityOptimizedScan returns true if it may be possible to // generate a locality optimized scan from the given scan private. // CanMaybeGenerateLocalityOptimizedScan performs simple checks that are @@ -110,35 +109,41 @@ func (c *CustomFuncs) CanMaybeGenerateLocalityOptimizedScan(scanPrivate *memo.Sc return false } - if scanPrivate.HardLimit != 0 { - // This optimization doesn't apply to limited scans. - return false - } - - // This scan should have at least two spans, or we won't be able to move one - // of the spans to a separate remote scan. - if scanPrivate.Constraint == nil || scanPrivate.Constraint.Spans.Count() < 2 { - return false - } + if scanPrivate.Constraint == nil { + // Since we have no constraint, we must have a limit to use this + // optimization. We also require the limit to be less than the kv batch + // size, since it's probably better to use DistSQL once we're scanning + // multiple batches. + // TODO(rytaft): Revisit this when we have a more accurate cost model for + // data distribution. + if scanPrivate.HardLimit == 0 || rowinfra.KeyLimit(scanPrivate.HardLimit) > rowinfra.ProductionKVBatchSize { + return false + } + } else { + // This scan should have at least two spans, or we won't be able to move one + // of the spans to a separate remote scan. + if scanPrivate.Constraint.Spans.Count() < 2 { + return false + } - // Don't apply the rule if there are too many spans, since the rule code is - // O(# spans * # prefixes * # datums per prefix). - if scanPrivate.Constraint.Spans.Count() > 10000 { - return false + // Don't apply the rule if there are too many spans, since the rule code is + // O(# spans * # prefixes * # datums per prefix). + if scanPrivate.Constraint.Spans.Count() > 10000 { + return false + } } // There should be at least two partitions, or we won't be able to // differentiate between local and remote partitions. + // This information is encapsulated in the PrefixSorter. If a PrefixSorter was + // not created for this index, then either all partitions are local, all + // are remote, or the index is not partitioned. tabMeta := c.e.mem.Metadata().TableMeta(scanPrivate.Table) index := tabMeta.Table.Index(scanPrivate.Index) - if index.PartitionCount() < 2 { + if _, ok := tabMeta.IndexPartitionLocality(scanPrivate.Index, index, c.e.evalCtx); !ok { return false } - - // The local region must be set, or we won't be able to determine which - // partitions are local. - _, found := c.e.evalCtx.Locality.Find(regionKey) - return found + return true } // GenerateLocalityOptimizedScan generates a locality optimized scan if possible @@ -154,45 +159,54 @@ func (c *CustomFuncs) GenerateLocalityOptimizedScan( // we're scanning multiple batches. // TODO(rytaft): Revisit this when we have a more accurate cost model for data // distribution. - if rowinfra.KeyLimit(grp.Relational().Cardinality.Max) > rowinfra.ProductionKVBatchSize { + maxRows := rowinfra.KeyLimit(grp.Relational().Cardinality.Max) + if maxRows > rowinfra.ProductionKVBatchSize { return } tabMeta := c.e.mem.Metadata().TableMeta(scanPrivate.Table) index := tabMeta.Table.Index(scanPrivate.Index) - // We already know that a local region exists from calling - // CanMaybeGenerateLocalityOptimizedScan. - localRegion, _ := c.e.evalCtx.Locality.Find(regionKey) - - // Determine whether the index has both local and remote partitions, and - // if so, which spans target local partitions. - var localPartitions util.FastIntSet - for i, n := 0, index.PartitionCount(); i < n; i++ { - part := index.Partition(i) - if isZoneLocal(part.Zone(), localRegion) { - localPartitions.Add(i) - } - } - if localPartitions.Len() == 0 || localPartitions.Len() == index.PartitionCount() { - // The partitions are either all local or all remote. + // The PrefixSorter has collected all the prefixes from all the different + // partitions (remembering which ones came from local partitions), and has + // sorted them so that longer prefixes come before shorter prefixes. For each + // span in the scanConstraint, we will iterate through the list of prefixes + // until we find a match, so ordering them with longer prefixes first ensures + // that the correct match is found. The PrefixSorter is only non-nil when this + // index has at least one local and one remote partition. + var ps *partition.PrefixSorter + var ok bool + if ps, ok = tabMeta.IndexPartitionLocality(scanPrivate.Index, index, c.e.evalCtx); !ok { return } - localSpans := c.getLocalSpans(index, localPartitions, scanPrivate.Constraint) - if localSpans.Len() == 0 || localSpans.Len() == scanPrivate.Constraint.Spans.Count() { + // If the Scan has no Constraint, retrieve the constraint of the form + // 'part_col IN (, ... )' plus an expression + // representing any gaps between defined partitions (if any), all combined + // in a single constraint. + // It is expected that this constraint covers all rows in the table, so it is + // equivalent to a nil Constraint. + idxConstraint := scanPrivate.Constraint + if idxConstraint == nil { + if idxConstraint, ok = c.buildAllPartitionsConstraint(tabMeta, index, ps, scanPrivate); !ok { + return + } + } + localSpans := c.getLocalSpans(idxConstraint, ps) + if localSpans.Len() == 0 || localSpans.Len() == idxConstraint.Spans.Count() { // The spans target all local or all remote partitions. return } // Split the spans into local and remote sets. - localConstraint, remoteConstraint := c.splitSpans(scanPrivate.Constraint, localSpans) + localConstraint, remoteConstraint := c.splitSpans(idxConstraint, localSpans) // Create the local scan. localScanPrivate := c.DuplicateScanPrivate(scanPrivate) localScanPrivate.LocalityOptimized = true localConstraint.Columns = localConstraint.Columns.RemapColumns(scanPrivate.Table, localScanPrivate.Table) localScanPrivate.SetConstraint(c.e.evalCtx, &localConstraint) + localScanPrivate.HardLimit = scanPrivate.HardLimit localScan := c.e.f.ConstructScan(localScanPrivate) // Create the remote scan. @@ -200,6 +214,7 @@ func (c *CustomFuncs) GenerateLocalityOptimizedScan( remoteScanPrivate.LocalityOptimized = true remoteConstraint.Columns = remoteConstraint.Columns.RemapColumns(scanPrivate.Table, remoteScanPrivate.Table) remoteScanPrivate.SetConstraint(c.e.evalCtx, &remoteConstraint) + remoteScanPrivate.HardLimit = scanPrivate.HardLimit remoteScan := c.e.f.ConstructScan(remoteScanPrivate) // Add the LocalityOptimizedSearchExpr to the same group as the original scan. @@ -215,45 +230,110 @@ func (c *CustomFuncs) GenerateLocalityOptimizedScan( c.e.mem.AddLocalityOptimizedSearchToGroup(&locOptSearch, grp) } +// buildAllPartitionsConstraint retrieves the partition filters and in between +// filters for the "index" belonging to the table described by "tabMeta", and +// builds the full set of spans covering both defined partitions and rows +// belonging to no defined partition (or partitions defined as DEFAULT). If a +// Constraint fails to be built or if the Constraint is unconstrained, this +// function returns (nil, false). +// Partition spans that are 100% local will not be merged with other spans. Note +// that if the partitioning columns have no CHECK constraint defined, suboptimal +// spans may be produced which don't maximize the number of rows accessed as a +// 100% local operation. +// For example: +// CREATE TABLE abc_part ( +// r STRING NOT NULL , +// t INT NOT NULL, +// a INT PRIMARY KEY, +// b INT, +// c INT, +// d INT, +// UNIQUE INDEX c_idx (r, t, c) PARTITION BY LIST (r, t) ( +// PARTITION west VALUES IN (('west', 1), ('east', 4)), +// PARTITION east VALUES IN (('east', DEFAULT), ('east', 2)), +// PARTITION default VALUES IN (DEFAULT) +// ) +// ); +// ALTER PARTITION "east" OF INDEX abc_part@c_idx CONFIGURE ZONE USING +// num_voters = 5, +// voter_constraints = '{+region=east: 2}', +// lease_preferences = '[[+region=east]]' +// +// ALTER PARTITION "west" OF INDEX abc_part@c_idx CONFIGURE ZONE USING +// num_voters = 5, +// voter_constraints = '{+region=west: 2}', +// lease_preferences = '[[+region=west]]' +// +// ALTER PARTITION "default" OF INDEX abc_part@c_idx CONFIGURE ZONE USING +// num_voters = 5, +// lease_preferences = '[[+region=central]]'; +// +// EXPLAIN SELECT c FROM abc_part@c_idx LIMIT 3; +// info +// ---------------------------------------------- +// distribution: local +// vectorized: true +// +// • union all +// │ limit: 3 +// │ +// ├── • scan +// │ missing stats +// │ table: abc_part@c_idx +// │ spans: [/'east'/2 - /'east'/3] +// │ limit: 3 +// │ +// └── • scan +// missing stats +// table: abc_part@c_idx +// spans: [ - /'east'/1] [/'east'/4 - ] +// limit: 3 +// +// Because of the partial-default east partition, ('east', DEFAULT), the spans +// in the local (left) branch of the union all should be +// [/'east' - /'east'/3] [/'east'/5 - /'east']. Adding in the following check +// constraint achieves this: CHECK (r IN ('east', 'west', 'central')) +func (c *CustomFuncs) buildAllPartitionsConstraint( + tabMeta *opt.TableMeta, index cat.Index, ps *partition.PrefixSorter, sp *memo.ScanPrivate, +) (*constraint.Constraint, bool) { + var ok bool + var remainingFilters memo.FiltersExpr + var combinedConstraint *constraint.Constraint + + // CHECK constraint and computed column filters + optionalFilters, filterColumns := + c.GetOptionalFiltersAndFilterColumns(nil /* explicitFilters */, sp) + + if _, remainingFilters, combinedConstraint, ok = c.MakeCombinedFiltersConstraint( + tabMeta, index, sp, ps, + nil /* explicitFilters */, optionalFilters, filterColumns, + ); !ok { + return nil, false + } + + // All partitionFilters are expected to build constraints. If they don't, + // let's not hide the problem by still generating a locality-optimized search + // that doesn't fully cover local spans. + if remainingFilters != nil && len(remainingFilters) > 0 { + return nil, false + } + + return combinedConstraint, true +} + // getLocalSpans returns the indexes of the spans from the given constraint that // target local partitions. func (c *CustomFuncs) getLocalSpans( - index cat.Index, localPartitions util.FastIntSet, constraint *constraint.Constraint, + scanConstraint *constraint.Constraint, ps *partition.PrefixSorter, ) util.FastIntSet { - // Collect all the prefixes from all the different partitions (remembering - // which ones came from local partitions), and sort them so that longer - // prefixes come before shorter prefixes. For each span in the constraint, we - // will iterate through the list of prefixes until we find a match, so - // ordering them with longer prefixes first ensures that the correct match is - // found. - allPrefixes := getSortedPrefixes(index, localPartitions) - - // Now iterate through the spans and determine whether each one matches + // Iterate through the spans and determine whether each one matches // with a prefix from a local partition. - // TODO(rytaft): Sort the prefixes by key in addition to length, and use - // binary search here. var localSpans util.FastIntSet - for i, n := 0, constraint.Spans.Count(); i < n; i++ { - span := constraint.Spans.Get(i) - spanPrefix := span.Prefix(c.e.evalCtx) - for j := range allPrefixes { - prefix := allPrefixes[j].prefix - isLocal := allPrefixes[j].isLocal - if len(prefix) > spanPrefix { - continue - } - matches := true - for k, datum := range prefix { - if span.StartKey().Value(k).Compare(c.e.evalCtx, datum) != 0 { - matches = false - break - } - } - if matches { - if isLocal { - localSpans.Add(i) - } - break + for i, n := 0, scanConstraint.Spans.Count(); i < n; i++ { + span := scanConstraint.Spans.Get(i) + if match, ok := constraint.FindMatch(span, ps); ok { + if match.IsLocal { + localSpans.Add(i) } } } diff --git a/pkg/sql/opt/xform/select_funcs.go b/pkg/sql/opt/xform/select_funcs.go index 99e2853978cc..fc6950560721 100644 --- a/pkg/sql/opt/xform/select_funcs.go +++ b/pkg/sql/opt/xform/select_funcs.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" "github.com/cockroachdb/cockroach/pkg/sql/opt/invertedidx" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" + "github.com/cockroachdb/cockroach/pkg/sql/opt/partition" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" @@ -144,6 +145,161 @@ func (c *CustomFuncs) GeneratePartialIndexScans( }) } +// MakeCombinedFiltersConstraint builds a constraint from explicitFilters, +// optionalFilters and conditionally an IN list filter generated from the +// index's PARTITION BY LIST values if both of these conditions are true: +// 1) The first partitioning column is not referenced in either +// optionalFilters or explicitFilters +// 2) No index key columns are referenced in optionalFilters or +// explicitFilters. +// These filters are passed in a single call to tryConstrainIndex. +// In all known uses, optionalFilters consists of the CHECK constraint filters +// and computed column filters. +// Returns: +// partitionFilters as the IN list of PARTITION BY values, if it was built +// remainingFilters as any filters which weren't used in combinedConstraint +// combinedConstraint as the collection of Spans to scan +// ok==false if we failed to constrain the scan +// See additional comments below. +func (c *CustomFuncs) MakeCombinedFiltersConstraint( + tabMeta *opt.TableMeta, + index cat.Index, + scanPrivate *memo.ScanPrivate, + ps *partition.PrefixSorter, + explicitFilters memo.FiltersExpr, + optionalFilters memo.FiltersExpr, + filterColumns opt.ColSet, +) ( + partitionFilters memo.FiltersExpr, + remainingFilters memo.FiltersExpr, + combinedConstraint *constraint.Constraint, + ok bool, +) { + // We only consider the partition values when a particular index can otherwise + // not be constrained. For indexes that are constrained, the partitioned + // values add no benefit as they don't really constrain anything. + // Furthermore, if the filters don't take advantage of the index (use any of + // the index columns), using the partition values add no benefit. + // + // If the index is partitioned (by list), we generate two constraints and + // union them: the "main" constraint and the "in-between" constraint.The + // "main" constraint restricts the index to the known partition ranges. The + // "in-between" constraint restricts the index to the rest of the ranges + // (i.e. everything that falls in-between the main ranges); the in-between + // constraint is necessary for correctness (there can be rows outside of the + // partitioned ranges). + // + // For both constraints, the partition-related filters are passed as + // "optional" which guarantees that they return no remaining filters. This + // allows us to merge the remaining filters from both constraints. + // + // Consider the following index and its partition: + // + // CREATE INDEX orders_by_seq_num + // ON orders (region, seq_num, id) + // STORING (total) + // PARTITION BY LIST (region) + // ( + // PARTITION us_east1 VALUES IN ('us-east1'), + // PARTITION us_west1 VALUES IN ('us-west1'), + // PARTITION europe_west2 VALUES IN ('europe-west2') + // ) + // + // The constraint generated for the query: + // SELECT sum(total) FROM orders WHERE seq_num >= 100 AND seq_num < 200 + // is: + // [/'europe-west2'/100 - /'europe-west2'/199] + // [/'us-east1'/100 - /'us-east1'/199] + // [/'us-west1'/100 - /'us-west1'/199] + // + // The spans before europe-west2, after us-west1 and in between the defined + // partitions are missing. We must add these spans now, appropriately + // constrained using the filters. + // + // It is important that we add these spans after the partition spans are + // generated because otherwise these spans would merge with the partition + // spans and would disallow the partition spans (and the in between ones) to + // be constrained further. Using the partitioning example and the query above, + // if we added the in between spans at the same time as the partitioned ones, + // we would end up with a span that looked like: + // [ - /'europe-west2'/99] + // + // Allowing the partition spans to be constrained further and then adding + // the spans give us a more constrained index scan as shown below: + // [ - /'europe-west2') + // [/'europe-west2'/100 - /'europe-west2'/199] + // [/e'europe-west2\x00'/100 - /'us-east1') + // [/'us-east1'/100 - /'us-east1'/199] + // [/e'us-east1\x00'/100 - /'us-west1') + // [/'us-west1'/100 - /'us-west1'/199] + // [/e'us-west1\x00'/100 - ] + // + // Notice how we 'skip' all the europe-west2 rows with seq_num < 100. + // + var inBetweenFilters memo.FiltersExpr + + indexColumns := tabMeta.IndexKeyColumns(index.Ordinal()) + firstIndexCol := scanPrivate.Table.IndexColumnID(index, 0) + if !filterColumns.Contains(firstIndexCol) && + indexColumns.Intersects(filterColumns) { + // Calculate any partition filters if appropriate (see below). + partitionFilters, inBetweenFilters = c.partitionValuesFilters(scanPrivate.Table, index) + } + + // Check whether the filter (along with any partitioning filters) can constrain the index. + combinedConstraint, remainingFilters, ok = c.tryConstrainIndex( + explicitFilters, + append(optionalFilters, partitionFilters...), + scanPrivate.Table, + index.Ordinal(), + ) + if !ok { + return nil, nil, nil, false + } + + if len(partitionFilters) > 0 { + inBetweenConstraint, inBetweenRemainingFilters, ok := c.tryConstrainIndex( + explicitFilters, + append(optionalFilters, inBetweenFilters...), + scanPrivate.Table, + index.Ordinal(), + ) + if !ok { + panic(errors.AssertionFailedf("in-between filters didn't yield a constraint")) + } + + combinedConstraint.UnionWith(c.e.evalCtx, inBetweenConstraint) + + // Even though the partitioned constraints and the inBetween constraints + // were consolidated, we must make sure their Union is as well. + combinedConstraint.ConsolidateSpans(c.e.evalCtx, ps) + // Add all remaining filters that need to be present in the + // inBetween spans. Some of the remaining filters are common + // between them, so we must deduplicate them. + remainingFilters = c.ConcatFilters(remainingFilters, inBetweenRemainingFilters) + remainingFilters.Sort() + remainingFilters.Deduplicate() + } + return partitionFilters, remainingFilters, combinedConstraint, true +} + +// GetOptionalFiltersAndFilterColumns generates implicit filters from +// constraints and computed columns as optional filters to help constrain an +// index scan. filterColumns returns the outer columns found in either the +// implicit filters or the explicitFilters. +func (c *CustomFuncs) GetOptionalFiltersAndFilterColumns( + explicitFilters memo.FiltersExpr, scanPrivate *memo.ScanPrivate, +) (optionalFilters memo.FiltersExpr, filterColumns opt.ColSet) { + + optionalFilters = c.checkConstraintFilters(scanPrivate.Table) + computedColFilters := c.computedColFilters(scanPrivate, explicitFilters, optionalFilters) + optionalFilters = append(optionalFilters, computedColFilters...) + + filterColumns = c.FilterOuterCols(explicitFilters) + filterColumns.UnionWith(c.FilterOuterCols(optionalFilters)) + return optionalFilters, filterColumns +} + // GenerateConstrainedScans enumerates all non-inverted secondary indexes on the // Scan operator's table and tries to push the given Select filter into new // constrained Scan operators using those indexes. Since this only needs to be @@ -212,134 +368,40 @@ func (c *CustomFuncs) GenerateConstrainedScans( ) { var pkCols opt.ColSet var sb indexScanBuilder - sb.Init(c, scanPrivate.Table) + var ok bool + var partitionFilters, remainingFilters memo.FiltersExpr + var combinedConstraint *constraint.Constraint + md := c.e.mem.Metadata() + tabMeta := md.TableMeta(scanPrivate.Table) - // Generate implicit filters from constraints and computed columns as - // optional filters to help constrain an index scan. - optionalFilters := c.checkConstraintFilters(scanPrivate.Table) - computedColFilters := c.computedColFilters(scanPrivate, explicitFilters, optionalFilters) - optionalFilters = append(optionalFilters, computedColFilters...) + sb.Init(c, scanPrivate.Table) - filterColumns := c.FilterOuterCols(explicitFilters) - filterColumns.UnionWith(c.FilterOuterCols(optionalFilters)) + // Check constraint and computed column filters + optionalFilters, filterColumns := + c.GetOptionalFiltersAndFilterColumns(explicitFilters, scanPrivate) // Iterate over all non-inverted indexes. - md := c.e.mem.Metadata() - tabMeta := md.TableMeta(scanPrivate.Table) var iter scanIndexIter iter.Init(c.e.evalCtx, c.e.f, c.e.mem, &c.im, scanPrivate, explicitFilters, rejectInvertedIndexes) iter.ForEach(func(index cat.Index, filters memo.FiltersExpr, indexCols opt.ColSet, isCovering bool, constProj memo.ProjectionsExpr) { - // We only consider the partition values when a particular index can otherwise - // not be constrained. For indexes that are constrained, the partitioned values - // add no benefit as they don't really constrain anything. - // Furthermore, if the filters don't take advantage of the index (use any of the - // index columns), using the partition values add no benefit. - // - // If the index is partitioned (by list), we generate two constraints and - // union them: the "main" constraint and the "in-between" constraint.The - // "main" constraint restricts the index to the known partition ranges. The - // "in-between" constraint restricts the index to the rest of the ranges - // (i.e. everything that falls in-between the main ranges); the in-between - // constraint is necessary for correctness (there can be rows outside of the - // partitioned ranges). - // - // For both constraints, the partition-related filters are passed as - // "optional" which guarantees that they return no remaining filters. This - // allows us to merge the remaining filters from both constraints. - // - // Consider the following index and its partition: - // - // CREATE INDEX orders_by_seq_num - // ON orders (region, seq_num, id) - // STORING (total) - // PARTITION BY LIST (region) - // ( - // PARTITION us_east1 VALUES IN ('us-east1'), - // PARTITION us_west1 VALUES IN ('us-west1'), - // PARTITION europe_west2 VALUES IN ('europe-west2') - // ) - // - // The constraint generated for the query: - // SELECT sum(total) FROM orders WHERE seq_num >= 100 AND seq_num < 200 - // is: - // [/'europe-west2'/100 - /'europe-west2'/199] - // [/'us-east1'/100 - /'us-east1'/199] - // [/'us-west1'/100 - /'us-west1'/199] - // - // The spans before europe-west2, after us-west1 and in between the defined - // partitions are missing. We must add these spans now, appropriately - // constrained using the filters. - // - // It is important that we add these spans after the partition spans are generated - // because otherwise these spans would merge with the partition spans and would - // disallow the partition spans (and the in between ones) to be constrained further. - // Using the partitioning example and the query above, if we added the in between - // spans at the same time as the partitioned ones, we would end up with a span that - // looked like: - // [ - /'europe-west2'/99] - // - // Allowing the partition spans to be constrained further and then adding - // the spans give us a more constrained index scan as shown below: - // [ - /'europe-west2') - // [/'europe-west2'/100 - /'europe-west2'/199] - // [/e'europe-west2\x00'/100 - /'us-east1') - // [/'us-east1'/100 - /'us-east1'/199] - // [/e'us-east1\x00'/100 - /'us-west1') - // [/'us-west1'/100 - /'us-west1'/199] - // [/e'us-west1\x00'/100 - ] - // - // Notice how we 'skip' all the europe-west2 rows with seq_num < 100. - // - var partitionFilters, inBetweenFilters memo.FiltersExpr - indexColumns := tabMeta.IndexKeyColumns(index.Ordinal()) - firstIndexCol := scanPrivate.Table.IndexColumnID(index, 0) - if !filterColumns.Contains(firstIndexCol) && indexColumns.Intersects(filterColumns) { - // Calculate any partition filters if appropriate (see below). - partitionFilters, inBetweenFilters = c.partitionValuesFilters(scanPrivate.Table, index) - } + // A structure describing which index partitions are local to the gateway region + prefixSorter, _ := tabMeta.IndexPartitionLocality(scanPrivate.Index, index, c.e.evalCtx) - // Check whether the filter (along with any partitioning filters) can constrain the index. - constraint, remainingFilters, ok := c.tryConstrainIndex( - filters, - append(optionalFilters, partitionFilters...), - scanPrivate.Table, - index.Ordinal(), - ) - if !ok { + // Build Constraints to scan a subset of the table Spans. + if partitionFilters, remainingFilters, combinedConstraint, ok = + c.MakeCombinedFiltersConstraint( + tabMeta, index, scanPrivate, prefixSorter, + filters, optionalFilters, filterColumns, + ); !ok { return } - if len(partitionFilters) > 0 { - inBetweenConstraint, inBetweenRemainingFilters, ok := c.tryConstrainIndex( - filters, - append(optionalFilters, inBetweenFilters...), - scanPrivate.Table, - index.Ordinal(), - ) - if !ok { - panic(errors.AssertionFailedf("in-between filters didn't yield a constraint")) - } - - constraint.UnionWith(c.e.evalCtx, inBetweenConstraint) - - // Even though the partitioned constraints and the inBetween constraints - // were consolidated, we must make sure their Union is as well. - constraint.ConsolidateSpans(c.e.evalCtx) - - // Add all remaining filters that need to be present in the - // inBetween spans. Some of the remaining filters are common - // between them, so we must deduplicate them. - remainingFilters = c.ConcatFilters(remainingFilters, inBetweenRemainingFilters) - remainingFilters.Sort() - remainingFilters.Deduplicate() - } - // Construct new constrained ScanPrivate. newScanPrivate := *scanPrivate newScanPrivate.Index = index.Ordinal() newScanPrivate.Cols = indexCols.Intersection(scanPrivate.Cols) - newScanPrivate.SetConstraint(c.e.evalCtx, constraint) + newScanPrivate.SetConstraint(c.e.evalCtx, combinedConstraint) // Record whether we were able to use partitions to constrain the scan. newScanPrivate.PartitionConstrainedScan = (len(partitionFilters) > 0) diff --git a/pkg/sql/opt/xform/testdata/rules/scan b/pkg/sql/opt/xform/testdata/rules/scan index acf0a69390ce..2532a280d177 100644 --- a/pkg/sql/opt/xform/testdata/rules/scan +++ b/pkg/sql/opt/xform/testdata/rules/scan @@ -790,7 +790,8 @@ distribute │ ├── columns: a:3!null │ ├── constraint: /1/2/5 │ │ ├── [/'central'/1 - /'central'/3] - │ │ ├── [/'east'/1 - /'east'/3] + │ │ ├── [/'east'/1 - /'east'/2] + │ │ ├── [/'east'/3 - /'east'/3] │ │ └── [/'west'/1 - /'west'/3] │ └── key: (3) └── filters @@ -822,30 +823,37 @@ distribute └── fd: ()-->(1,3,4) # The scan is limited. -opt locality=(region=east) expect-not=GenerateLocalityOptimizedScan +opt locality=(region=east) expect=GenerateLocalityOptimizedScan SELECT a FROM abc_part WHERE d = 1 LIMIT 1 ---- -distribute +project ├── columns: a:3!null ├── cardinality: [0 - 1] ├── key: () ├── fd: ()-->(3) ├── distribution: east - ├── input distribution: central,east,west - └── project - ├── columns: a:3!null + └── locality-optimized-search + ├── columns: a:3!null d:6!null + ├── left columns: a:11 d:14 + ├── right columns: a:19 d:22 ├── cardinality: [0 - 1] ├── key: () - ├── fd: ()-->(3) + ├── fd: ()-->(3,6) + ├── distribution: east + ├── scan abc_part@d_idx + │ ├── columns: a:11!null d:14!null + │ ├── constraint: /9/14/11: [/'east'/1 - /'east'/1] + │ ├── limit: 1 + │ ├── key: () + │ └── fd: ()-->(11,14) └── scan abc_part@d_idx - ├── columns: a:3!null d:6!null - ├── constraint: /1/6/3 + ├── columns: a:19!null d:22!null + ├── constraint: /17/22/19 │ ├── [/'central'/1 - /'central'/1] - │ ├── [/'east'/1 - /'east'/1] │ └── [/'west'/1 - /'west'/1] ├── limit: 1 ├── key: () - └── fd: ()-->(3,6) + └── fd: ()-->(19,22) # The scan is limited, but b is known to be a key, so the limit is discarded. opt locality=(region=east) expect=GenerateLocalityOptimizedScan @@ -879,3 +887,433 @@ project ├── cardinality: [0 - 1] ├── key: () └── fd: ()-->(19,20) + +############################################## +# Locality optimized scans with LIMIT clause # +############################################## +# LIMIT clause enables locality optimized scan. +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT a FROM abc_part LIMIT 1 +---- +locality-optimized-search + ├── columns: a:3!null + ├── left columns: a:11 + ├── right columns: a:19 + ├── cardinality: [0 - 1] + ├── key: () + ├── fd: ()-->(3) + ├── distribution: east + ├── scan abc_part@b_idx + │ ├── columns: a:11!null + │ ├── constraint: /9/12: [/'east' - /'east'] + │ ├── limit: 1 + │ ├── key: () + │ └── fd: ()-->(11) + └── scan abc_part@b_idx + ├── columns: a:19!null + ├── constraint: /17/20 + │ ├── [/'central' - /'central'] + │ └── [/'west' - /'west'] + ├── limit: 1 + ├── key: () + └── fd: ()-->(19) + +# LIMIT and OFFSET clause enables locality optimized scan. +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT a FROM abc_part LIMIT 1 OFFSET 2 +---- +offset + ├── columns: a:3!null + ├── cardinality: [0 - 1] + ├── key: (3) + ├── distribution: east + ├── locality-optimized-search + │ ├── columns: a:3!null + │ ├── left columns: a:11 + │ ├── right columns: a:19 + │ ├── cardinality: [0 - 3] + │ ├── key: (3) + │ ├── distribution: east + │ ├── scan abc_part@b_idx + │ │ ├── columns: a:11!null + │ │ ├── constraint: /9/12: [/'east' - /'east'] + │ │ ├── limit: 3 + │ │ └── key: (11) + │ └── scan abc_part@b_idx + │ ├── columns: a:19!null + │ ├── constraint: /17/20 + │ │ ├── [/'central' - /'central'] + │ │ └── [/'west' - /'west'] + │ ├── limit: 3 + │ └── key: (19) + └── 2 + +# LIMIT in IN subquery enables locality optimized scan. +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT * FROM abc_part WHERE a IN (SELECT a FROM abc_part LIMIT 3) +---- +project + ├── columns: r:1!null t:2!null a:3!null b:4 c:5 d:6 + ├── cardinality: [0 - 3] + ├── key: (3) + ├── fd: (3)-->(1,2,4-6), (4)~~>(1-3,5,6), (5)~~>(1-4,6) + ├── distribution: east + └── inner-join (lookup abc_part) + ├── columns: r:1!null t:2!null a:3!null b:4 c:5 d:6 a:11!null + ├── key columns: [11] = [3] + ├── lookup columns are key + ├── cardinality: [0 - 3] + ├── key: (11) + ├── fd: (3)-->(1,2,4-6), (4)~~>(1-3,5,6), (5)~~>(1-4,6), (3)==(11), (11)==(3) + ├── distribution: east + ├── locality-optimized-search + │ ├── columns: a:11!null + │ ├── left columns: a:19 + │ ├── right columns: a:27 + │ ├── cardinality: [0 - 3] + │ ├── key: (11) + │ ├── distribution: east + │ ├── scan abc_part@b_idx + │ │ ├── columns: a:19!null + │ │ ├── constraint: /17/20: [/'east' - /'east'] + │ │ ├── limit: 3 + │ │ └── key: (19) + │ └── scan abc_part@b_idx + │ ├── columns: a:27!null + │ ├── constraint: /25/28 + │ │ ├── [/'central' - /'central'] + │ │ └── [/'west' - /'west'] + │ ├── limit: 3 + │ └── key: (27) + └── filters (true) + +# Correlated semijoin with LIMIT in outer query block should not enable +# locality optimized scan. +opt locality=(region=east) expect-not=GenerateLocalityOptimizedScan +SELECT * FROM abc_part a WHERE EXISTS (SELECT 1 FROM abc_part b WHERE a.a=b.a) +---- +scan abc_part [as=a] + ├── columns: r:1!null t:2!null a:3!null b:4 c:5 d:6 + ├── check constraint expressions + │ ├── a.r:1 IN ('central', 'east', 'west') [outer=(1), constraints=(/1: [/'central' - /'central'] [/'east' - /'east'] [/'west' - /'west']; tight)] + │ └── a.t:2 IN (1, 2, 3) [outer=(2), constraints=(/2: [/1 - /1] [/2 - /2] [/3 - /3]; tight)] + ├── key: (3) + ├── fd: (3)-->(1,2,4-6), (4)~~>(1-3,5,6), (5)~~>(1-4,6) + └── distribution: east + +# Uncorrelated semijoin with LIMIT can't enable locality optimized scan in the +# outer query block yet. +# TODO(msirek) Push the LIMIT into the outer table scan via issue #75301. +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT * FROM abc_part a WHERE EXISTS (SELECT 1 FROM abc_part) +---- +distribute + ├── columns: r:1!null t:2!null a:3!null b:4 c:5 d:6 + ├── key: (3) + ├── fd: (3)-->(1,2,4-6), (4)~~>(1-3,5,6), (5)~~>(1-4,6) + ├── distribution: east + ├── input distribution: east,west + └── index-join abc_part + ├── columns: a.r:1!null a.t:2!null a.a:3!null a.b:4 a.c:5 a.d:6 + ├── key: (3) + ├── fd: (3)-->(1,2,4-6), (4)~~>(1-3,5,6), (5)~~>(1-4,6) + └── select + ├── columns: a.r:1!null a.t:2!null a.a:3!null a.c:5 + ├── key: (3) + ├── fd: (3)-->(1,2,5), (5)~~>(1-3) + ├── scan abc_part@c_idx [as=a] + │ ├── columns: a.r:1!null a.t:2!null a.a:3!null a.c:5 + │ ├── constraint: /1/2/5 + │ │ ├── [/'central'/1 - /'central'/3] + │ │ ├── [/'east'/1 - /'east'/2] + │ │ ├── [/'east'/3 - /'east'/3] + │ │ └── [/'west'/1 - /'west'/3] + │ ├── key: (3) + │ └── fd: (3)-->(1,2,5), (5)~~>(1-3) + └── filters + └── exists [subquery] + └── locality-optimized-search + ├── cardinality: [0 - 1] + ├── key: () + ├── scan abc_part@b_idx + │ ├── constraint: /18/21: [/'east' - /'east'] + │ ├── limit: 1 + │ └── key: () + └── scan abc_part@b_idx + ├── constraint: /26/29 + │ ├── [/'central' - /'central'] + │ └── [/'west' - /'west'] + ├── limit: 1 + └── key: () + +# Partitioning without CHECK constraints +exec-ddl +CREATE TABLE abc_part_no_check ( + r STRING NOT NULL, + t INT NOT NULL, + a INT PRIMARY KEY, + b INT, + c INT, + d INT, + UNIQUE WITHOUT INDEX (b), + UNIQUE WITHOUT INDEX (c), + UNIQUE INDEX c_idx (r, t, c) PARTITION BY LIST (r, t) ( + PARTITION east VALUES IN (('east', 1), ('east', 3)), + PARTITION west VALUES IN (('west', DEFAULT)), + PARTITION default VALUES IN (DEFAULT) + ) +) +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX abc_part_no_check@c_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX abc_part_no_check@c_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 2}', + lease_preferences = '[[+region=west]]' +---- + +exec-ddl +ALTER PARTITION "default" OF INDEX abc_part_no_check@c_idx CONFIGURE ZONE USING + num_voters = 5, + lease_preferences = '[[+region=central]]'; +---- + +# Queries on most partitioned tables without CHECK constraints won't pick +# locality optimized search until issue #75887 is implemented. +opt locality=(region=east) expect-not=GenerateLocalityOptimizedScan +SELECT a FROM abc_part_no_check@c_idx LIMIT 3 +---- +distribute + ├── columns: a:3!null + ├── cardinality: [0 - 3] + ├── key: (3) + ├── distribution: east + ├── input distribution: central,east,west + └── scan abc_part_no_check@c_idx + ├── columns: a:3!null + ├── limit: 3 + ├── flags: force-index=c_idx + └── key: (3) + +# Simpler partitioned table without CHECK constraints +exec-ddl +CREATE TABLE abc_part_no_check2 ( + r STRING NOT NULL, + t INT NOT NULL, + a INT PRIMARY KEY, + b INT, + c INT, + d INT, + UNIQUE WITHOUT INDEX (b), + UNIQUE WITHOUT INDEX (c), + UNIQUE INDEX b_idx (r, b) PARTITION BY LIST (r) ( + PARTITION east VALUES IN (('east')), + PARTITION west VALUES IN (('west')), + PARTITION central VALUES IN (('central')) + ) +) +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX abc_part@b_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX abc_part@b_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 2}', + lease_preferences = '[[+region=west]]'; +---- + +exec-ddl +ALTER PARTITION "central" OF INDEX abc_part@b_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=central: 2}', + lease_preferences = '[[+region=central]]'; +---- + +# Queries on most partitioned tables without CHECK constraints won't pick +# locality optimized search until issue #75887 is implemented. +opt locality=(region=east) expect-not=GenerateLocalityOptimizedScan +SELECT a FROM abc_part_no_check2@b_idx LIMIT 5 +---- +scan abc_part_no_check2@b_idx + ├── columns: a:3!null + ├── limit: 5 + ├── flags: force-index=b_idx + ├── key: (3) + └── distribution: east + +# Mimic a REGIONAL BY ROW table +exec-ddl +CREATE TABLE def_part ( + r STRING NOT NULL CHECK (r IN ('east', 'west', 'central')), + d INT NOT NULL, + e INT, + f INT, + PRIMARY KEY (r, d), + UNIQUE WITHOUT INDEX (d), + UNIQUE WITHOUT INDEX (e), + UNIQUE INDEX e_idx (r, e) PARTITION BY LIST (r) ( + PARTITION east VALUES IN (('east')), + PARTITION west VALUES IN (('west')), + PARTITION central VALUES IN (('central')) + ), + INDEX f_idx (r, f) PARTITION BY LIST (r) ( + PARTITION east VALUES IN (('east')), + PARTITION west VALUES IN (('west')), + PARTITION central VALUES IN (('central')) + ) +) PARTITION BY LIST (r) ( + PARTITION east VALUES IN (('east')), + PARTITION west VALUES IN (('west')), + PARTITION central VALUES IN (('central')) +) +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX def_part@def_part_pkey CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX def_part@def_part_pkey CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 2}', + lease_preferences = '[[+region=west]]'; +---- + +exec-ddl +ALTER PARTITION "central" OF INDEX def_part@def_part_pkey CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=central: 2}', + lease_preferences = '[[+region=central]]'; +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX def_part@e_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX def_part@e_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 2}', + lease_preferences = '[[+region=west]]'; +---- + +exec-ddl +ALTER PARTITION "central" OF INDEX def_part@e_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=central: 2}', + lease_preferences = '[[+region=central]]'; +---- + +exec-ddl +ALTER PARTITION "east" OF INDEX def_part@f_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=east: 2}', + lease_preferences = '[[+region=east]]' +---- + +exec-ddl +ALTER PARTITION "west" OF INDEX def_part@f_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=west: 2}', + lease_preferences = '[[+region=west]]'; +---- + +exec-ddl +ALTER PARTITION "central" OF INDEX def_part@f_idx CONFIGURE ZONE USING + num_voters = 5, + voter_constraints = '{+region=central: 2}', + lease_preferences = '[[+region=central]]'; +---- + +# Selecting from each of the indexes and the pkey should use locality optimized +# search +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT e FROM def_part@e_idx LIMIT 3 +---- +locality-optimized-search + ├── columns: e:3 + ├── left columns: e:9 + ├── right columns: e:15 + ├── cardinality: [0 - 3] + ├── lax-key: (3) + ├── distribution: east + ├── scan def_part@e_idx + │ ├── columns: e:9 + │ ├── constraint: /7/9: [/'east' - /'east'] + │ ├── limit: 3 + │ ├── flags: force-index=e_idx + │ └── lax-key: (9) + └── scan def_part@e_idx + ├── columns: e:15 + ├── constraint: /13/15 + │ ├── [/'central' - /'central'] + │ └── [/'west' - /'west'] + ├── limit: 3 + ├── flags: force-index=e_idx + └── lax-key: (15) + +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT f FROM def_part@f_idx LIMIT 3 +---- +locality-optimized-search + ├── columns: f:4 + ├── left columns: f:10 + ├── right columns: f:16 + ├── cardinality: [0 - 3] + ├── distribution: east + ├── scan def_part@f_idx + │ ├── columns: f:10 + │ ├── constraint: /7/10/8: [/'east' - /'east'] + │ ├── limit: 3 + │ └── flags: force-index=f_idx + └── scan def_part@f_idx + ├── columns: f:16 + ├── constraint: /13/16/14 + │ ├── [/'central' - /'central'] + │ └── [/'west' - /'west'] + ├── limit: 3 + └── flags: force-index=f_idx + +opt locality=(region=east) expect=GenerateLocalityOptimizedScan +SELECT d FROM def_part@def_part_pkey LIMIT 3 +---- +locality-optimized-search + ├── columns: d:2!null + ├── left columns: d:8 + ├── right columns: d:14 + ├── cardinality: [0 - 3] + ├── key: (2) + ├── distribution: east + ├── scan def_part + │ ├── columns: d:8!null + │ ├── constraint: /7/8: [/'east' - /'east'] + │ ├── limit: 3 + │ ├── flags: force-index=def_part_pkey + │ └── key: (8) + └── scan def_part + ├── columns: d:14!null + ├── constraint: /13/14 + │ ├── [/'central' - /'central'] + │ └── [/'west' - /'west'] + ├── limit: 3 + ├── flags: force-index=def_part_pkey + └── key: (14) diff --git a/pkg/sql/sem/tree/eval.go b/pkg/sql/sem/tree/eval.go index 6ddd0052fcfa..00d51ab20fcb 100644 --- a/pkg/sql/sem/tree/eval.go +++ b/pkg/sql/sem/tree/eval.go @@ -3791,6 +3791,14 @@ func (ctx *EvalContext) HasPlaceholders() bool { return ctx.Placeholders != nil } +const regionKey = "region" + +// GetLocalRegion returns the region name of the local processor +// on which we're executing. +func (ctx *EvalContext) GetLocalRegion() (regionName string, ok bool) { + return ctx.Locality.Find(regionKey) +} + // TimestampToDecimal converts the logical timestamp into a decimal // value with the number of nanoseconds in the integer part and the // logical counter in the decimal part.