Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: support locality optimized anti join #63044

Merged
merged 3 commits into from
Apr 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 275 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,281 @@ vectorized: true
table: regional_by_row_table@primary
spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1] [/'us-east-1'/1 - /'us-east-1'/1]

# Tests using locality optimized search for lookup anti joins (including foreign
# key checks).
statement ok
CREATE TABLE parent (
p_id INT PRIMARY KEY,
FAMILY (p_id)
) LOCALITY REGIONAL BY ROW;

statement ok
CREATE TABLE child (
c_id INT PRIMARY KEY,
c_p_id INT REFERENCES parent (p_id),
INDEX (c_p_id),
FAMILY (c_id, c_p_id)
) LOCALITY REGIONAL BY ROW;

statement ok
INSERT INTO parent (crdb_region, p_id)
VALUES ('ap-southeast-2', 10), ('ca-central-1', 20), ('us-east-1', 30)

statement ok
INSERT INTO child (crdb_region, c_id, c_p_id)
VALUES ('ap-southeast-2', 10, 10), ('ca-central-1', 20, 20), ('us-east-1', 30, 30)

statement ok
SET locality_optimized_partitioned_index_scan = false

# Query with locality optimized search disabled.
query T
EXPLAIN SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 10
----
distribution: full
vectorized: true
·
• lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (p_id = c_p_id) AND (crdb_region IN ('ap-southeast-2', 'ca-central-1', 'us-east-1'))
└── • scan
missing stats
table: child@primary
spans: [/'ap-southeast-2'/10 - /'ap-southeast-2'/10] [/'ca-central-1'/10 - /'ca-central-1'/10] [/'us-east-1'/10 - /'us-east-1'/10]

statement ok
SET tracing = on,kv,results; SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 10; SET tracing = off

# All regions are scanned without the optimization.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/75/1/"@"/10{-/#}, /Table/75/1/"\x80"/10{-/#}, /Table/75/1/"\xc0"/10{-/#}
fetched: /child/primary/'ap-southeast-2'/10/c_p_id -> /10
Scan /Table/74/1/"@"/10{-/#}, /Table/74/1/"\x80"/10{-/#}, /Table/74/1/"\xc0"/10{-/#}
fetched: /parent/primary/'ap-southeast-2'/10 -> NULL

statement ok
SET locality_optimized_partitioned_index_scan = true

# Same query with locality optimized search enabled.
query T
EXPLAIN (DISTSQL) SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 10
----
distribution: local
vectorized: true
·
• lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (p_id = c_p_id) AND (crdb_region IN ('ca-central-1', 'us-east-1'))
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (p_id = c_p_id) AND (crdb_region = 'ap-southeast-2')
└── • union all
│ limit: 1
├── • scan
│ missing stats
│ table: child@primary
│ spans: [/'ap-southeast-2'/10 - /'ap-southeast-2'/10]
└── • scan
missing stats
table: child@primary
spans: [/'ca-central-1'/10 - /'ca-central-1'/10] [/'us-east-1'/10 - /'us-east-1'/10]
·
Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJy0k1Fr2zAQx9_3KY7bQ5KhEtlxoQgKLqvLXDynSwwrLKZ41tF6SyRPliEl5LuP2GGNwxLSjr35Tve3fv_T3QqrX3MUGNzfRVdhDP3rcJpMv0QDmAZR8DGBD3AzGX-G_KmYS_j6KZgE0I_HCQT3m0Lod8vKzJCy27ryoZBwCfnD5mMwgKv4Gvp5m3T4ABkqLSnOFlSh-IYOpgxLo3OqKm02qVVTEMolCs6wUGVtN-mUYa4NoVihLeycUGCSfZ_ThDJJZsiRoSSbFfPmtw25X5pikZlnZDgtM1UJGM5wNlte8BkOHX7WiYbvIVMSHND2iQwyHNdWgO8w38V0zVDX9oWkstkjoXDW7G20zqm0_h9Sf0t5Gpl7kOwFqFbaSDIkOzDp-i_ssT7T5dDtUkfForDgHGTgr-nOrS7Utjmj7jXJc0kCouAmgas4CeF2HMbIsB26naZFWv-sS_ihCwVaCej7I7gE392OoO_BJSx7Hu8JIXyHc37uDU5r5uiNRrz_ZWSzssvexa4VBste3vF2ojnvNeYmVJVaVbQ3MYfeP2VI8pHasat0bXK6MzpvrmnDcaNrEpIq2566bRCq5qhZsl2x8y9i96h41BHzffHoqNg7LvaOis_3xOn63e8AAAD__2Pcwcc=

statement ok
SET tracing = on,kv,results; SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 10; SET tracing = off

# If the row is found in the local region, the other regions are not searched.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/75/1/"@"/10{-/#}
fetched: /child/primary/'ap-southeast-2'/10/c_p_id -> /10
Scan /Table/74/1/"@"/10{-/#}
fetched: /parent/primary/'ap-southeast-2'/10 -> NULL

statement ok
SET tracing = on,kv,results; SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 20; SET tracing = off

# If the row is not found in the local region, the other regions are searched in
# parallel.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/75/1/"@"/20{-/#}
Scan /Table/75/1/"\x80"/20{-/#}, /Table/75/1/"\xc0"/20{-/#}
fetched: /child/primary/'ca-central-1'/20/c_p_id -> /20
Scan /Table/74/1/"@"/20{-/#}
Scan /Table/74/1/"\x80"/20{-/#}, /Table/74/1/"\xc0"/20{-/#}
fetched: /parent/primary/'ca-central-1'/20 -> NULL

query T
EXPLAIN INSERT INTO child VALUES (1, 1)
----
distribution: local
vectorized: true
·
• root
├── • insert
│ │ into: child(c_id, c_p_id, crdb_region)
│ │
│ └── • buffer
│ │ label: buffer 1
│ │
│ └── • values
│ size: 4 columns, 1 row
├── • constraint-check
│ │
│ └── • error if rows
│ │
│ └── • lookup join (semi)
│ │ table: child@primary
│ │ equality: (lookup_join_const_col_@12, column1) = (crdb_region,c_id)
│ │ equality cols are key
│ │ pred: column8 != crdb_region
│ │
│ └── • cross join
│ │ estimated row count: 3
│ │
│ ├── • values
│ │ size: 1 column, 3 rows
│ │
│ └── • scan buffer
│ label: buffer 1
└── • constraint-check
└── • error if rows
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (column2 = p_id) AND (crdb_region IN ('ca-central-1', 'us-east-1'))
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (column2 = p_id) AND (crdb_region = 'ap-southeast-2')
└── • scan buffer
label: buffer 1

query T
EXPLAIN UPSERT INTO child VALUES (1, 1)
----
distribution: local
vectorized: true
·
• root
├── • upsert
│ │ into: child(c_id, c_p_id, crdb_region)
│ │ arbiter constraints: primary
│ │
│ └── • buffer
│ │ label: buffer 1
│ │
│ └── • render
│ │
│ └── • cross join (left outer)
│ │
│ ├── • values
│ │ size: 3 columns, 1 row
│ │
│ └── • union all
│ │ limit: 1
│ │
│ ├── • scan
│ │ missing stats
│ │ table: child@primary
│ │ spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1]
│ │
│ └── • scan
│ missing stats
│ table: child@primary
│ spans: [/'ca-central-1'/1 - /'ca-central-1'/1] [/'us-east-1'/1 - /'us-east-1'/1]
└── • constraint-check
└── • error if rows
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (column2 = p_id) AND (crdb_region IN ('ca-central-1', 'us-east-1'))
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (column2 = p_id) AND (crdb_region = 'ap-southeast-2')
└── • scan buffer
label: buffer 1

# We don't yet support locality optimized search for semi joins.
query T
EXPLAIN DELETE FROM parent WHERE p_id = 1
----
distribution: local
vectorized: true
·
• root
├── • delete
│ │ from: parent
│ │
│ └── • buffer
│ │ label: buffer 1
│ │
│ └── • union all
│ │ limit: 1
│ │
│ ├── • scan
│ │ missing stats
│ │ table: parent@primary
│ │ spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1]
│ │
│ └── • scan
│ missing stats
│ table: parent@primary
│ spans: [/'ca-central-1'/1 - /'ca-central-1'/1] [/'us-east-1'/1 - /'us-east-1'/1]
└── • constraint-check
└── • error if rows
└── • lookup join (semi)
│ table: child@child_c_p_id_idx
│ equality: (lookup_join_const_col_@12, p_id) = (crdb_region,c_p_id)
└── • cross join
├── • values
│ size: 1 column, 3 rows
└── • scan buffer
label: buffer 1

# Tests creating an index and a unique constraint on a REGIONAL BY ROW table.
statement ok
Expand Down
17 changes: 14 additions & 3 deletions pkg/sql/opt/ops/relational.opt
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@ define LookupJoinPrivate {
# paired-joiner used for left joins.
IsSecondJoinInPairedJoiner bool

# LocalityOptimized is true if this lookup join is part of a locality
# optimized search strategy, indicating that it either requires all local
# (relative to the gateway region) or all remote lookups. Currently, only
# anti joins can be locality optimized, and they are implemented with two
# nested anti lookup joins in which the first join targets local partitions
# and the second join targets remote partitions. Therefore,
# LocalityOptimized is used as a hint to the coster to reduce the cost of
# this lookup join.
LocalityOptimized bool

# ConstFilters contains the constant filters that are represented as equality
# conditions on the KeyCols. These filters are needed by the statistics code to
# correctly estimate selectivity.
Expand Down Expand Up @@ -765,9 +775,10 @@ define Union {
}

# SetPrivate contains fields used by the relational set operators: Union,
# Intersect, Except, UnionAll, IntersectAll and ExceptAll. It matches columns
# from the left and right inputs of the operator with the output columns, since
# OutputCols are not ordered and may not correspond to each other.
# Intersect, Except, UnionAll, IntersectAll, ExceptAll, and
# LocalityOptimizedSearch. It matches columns from the left and right inputs of
# the operator with the output columns, since OutputCols are not ordered and may
# not correspond to each other.
#
# For example, consider the following query:
# SELECT y, x FROM xy UNION SELECT b, a FROM ab
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/xform/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ go_test(
size = "small",
srcs = [
"coster_test.go",
"general_funcs_test.go",
"join_order_builder_test.go",
"main_test.go",
"optimizer_test.go",
"physical_props_test.go",
"scan_funcs_test.go",
],
data = glob(["testdata/**"]) + [
"@cockroach//c-deps:libgeos",
Expand Down
22 changes: 17 additions & 5 deletions pkg/sql/opt/xform/coster.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (c *coster) computeScanCost(scan *memo.ScanExpr, required *physical.Require
cost := baseCost + memo.Cost(rowCount)*(seqIOCostFactor+perRowCost)

// If this scan is locality optimized, divide the cost in two in order to make
// the total cost of the two scans in the locality optimized plan less then
// the total cost of the two scans in the locality optimized plan less than
// the cost of the single scan in the non-locality optimized plan.
// TODO(rytaft): This is hacky. We should really be making this determination
// based on the latency between regions.
Expand Down Expand Up @@ -773,6 +773,7 @@ func (c *coster) computeIndexJoinCost(
join.Table,
cat.PrimaryIndex,
memo.JoinFlags(0),
false, /* localityOptimized */
)
}

Expand All @@ -791,6 +792,7 @@ func (c *coster) computeLookupJoinCost(
join.Table,
join.Index,
join.Flags,
join.LocalityOptimized,
)
}

Expand All @@ -803,6 +805,7 @@ func (c *coster) computeIndexLookupJoinCost(
table opt.TableID,
index cat.IndexOrdinal,
flags memo.JoinFlags,
localityOptimized bool,
) memo.Cost {
input := join.Child(0).(memo.RelExpr)
lookupCount := input.Relational().Stats.RowCount
Expand Down Expand Up @@ -872,6 +875,15 @@ func (c *coster) computeIndexLookupJoinCost(
// If we prefer a lookup join, make the cost much smaller.
cost *= preferLookupJoinFactor
}

// If this lookup join is locality optimized, divide the cost by two in order to make
// the total cost of the two lookup joins in the locality optimized plan less than
// the cost of the single lookup join in the non-locality optimized plan.
// TODO(rytaft): This is hacky. We should really be making this determination
// based on the latency between regions.
if localityOptimized {
cost /= 2
}
return cost
}

Expand Down Expand Up @@ -1010,10 +1022,10 @@ func (c *coster) computeSetCost(set memo.RelExpr) memo.Cost {
// Add the CPU cost of emitting the rows.
cost := memo.Cost(set.Relational().Stats.RowCount) * cpuCostFactor

// A set operation must process every row from both tables once.
// UnionAll can avoid any extra computation, but all other set operations
// must perform a hash table lookup or update for each input row.
if set.Op() != opt.UnionAllOp {
// A set operation must process every row from both tables once. UnionAll and
// LocalityOptimizedSearch can avoid any extra computation, but all other set
// operations must perform a hash table lookup or update for each input row.
if set.Op() != opt.UnionAllOp && set.Op() != opt.LocalityOptimizedSearchOp {
leftRowCount := set.Child(0).(memo.RelExpr).Relational().Stats.RowCount
rightRowCount := set.Child(1).(memo.RelExpr).Relational().Stats.RowCount
cost += memo.Cost(leftRowCount+rightRowCount) * cpuCostFactor
Expand Down
Loading