Skip to content

Commit

Permalink
opt: support locality optimized anti join
Browse files Browse the repository at this point in the history
This commit adds a new transformation rule, GenerateLocalityOptimizedAntiJoin.
GenerateLocalityOptimizedAntiJoin converts an anti join into a locality
optimized anti join if possible. A locality optimized anti join is implemented
as a nested pair of anti lookup joins and is designed to avoid communicating
with remote nodes (relative to the gateway region) if at all possible.

A locality optimized anti join can be planned under the following conditions:
 - The anti join can be planned as a lookup join.
 - The lookup join scans multiple spans in the lookup index for each input
   row, with some spans targeting partitions on local nodes (relative to the
   gateway region), and some targeting partitions on remote nodes. It is not
   known which span(s) will contain the matching row(s).

The result of GenerateLocalityOptimizedAntiJoin will be a nested pair of anti
lookup joins in which the first lookup join is an anti join targeting the
local values from the original join, and the second lookup join is an anti
join targeting the remote values. Because of the way anti join is defined, a
row will only be returned by the first anti join if a match is *not* found
locally. If a match is found, no row will be returned and therefore the second
lookup join will not need to search the remote nodes. This nested pair of anti
joins is logically equivalent to the original, single anti join.

This is a useful optimization if there is locality of access in the workload,
such that rows tend to be accessed from the region where they are located. If
there is no locality of access, using a locality optimized anti join could be
a slight pessimization, since rows residing in remote regions will be found
slightly more slowly than they would be otherwise.

For example, suppose we have a multi-region database with regions 'us-east1',
'us-west1' and 'europe-west1', and we have the following tables and query,
issued from 'us-east1':

  CREATE TABLE parent (
    p_id INT PRIMARY KEY
  ) LOCALITY REGIONAL BY ROW;

  CREATE TABLE child (
    c_id INT PRIMARY KEY,
    c_p_id INT REFERENCES parent (p_id)
  ) LOCALITY REGIONAL BY ROW;

  SELECT * FROM child WHERE NOT EXISTS (
    SELECT * FROM parent WHERE p_id = c_p_id
  ) AND c_id = 10;

Normally, this would produce the following plan:

  anti-join (lookup parent)
   ├── lookup columns are key
   ├── lookup expr: (p_id = c_p_id) AND (crdb_region IN ('europe-west1', 'us-east1', 'us-west1'))
   ├── scan child
   │    └── constraint: /7/5
   │         ├── [/'europe-west1'/10 - /'europe-west1'/10]
   │         ├── [/'us-east1'/10 - /'us-east1'/10]
   │         └── [/'us-west1'/10 - /'us-west1'/10]
   └── filters (true)

but if the session setting locality_optimized_partitioned_index_scan is enabled,
the optimizer will produce this plan, using locality optimized search, both for
the scan of child and for the lookup join with parent. See the rule
GenerateLocalityOptimizedScan for details about how the optimization is applied
for scans.

  anti-join (lookup parent)
   ├── lookup columns are key
   ├── lookup expr: (p_id = c_p_id) AND (crdb_region IN ('europe-west1', 'us-west1'))
   ├── anti-join (lookup parent)
   │    ├── lookup columns are key
   │    ├── lookup expr: (p_id = c_p_id) AND (crdb_region = 'us-east1')
   │    ├── locality-optimized-search
   │    │    ├── scan child
   │    │    │    └── constraint: /13/11: [/'us-east1'/10 - /'us-east1'/10]
   │    │    └── scan child
   │    │         └── constraint: /18/16
   │    │              ├── [/'europe-west1'/10 - /'europe-west1'/10]
   │    │              └── [/'us-west1'/10 - /'us-west1'/10]
   │    └── filters (true)
   └── filters (true)

As long as child.c_id = 10 and the matching row in parent are both located in
'us-east1', the second plan will be much faster. But if they are located in
one of the other regions, the first plan would be slightly faster.

Informs #55185

Release note (performance improvement): The optimizer will now try
to plan anti lookup joins using "locality optimized search". This optimization
applies for anti lookup joins into REGIONAL BY ROW tables (i.e., the right side
of the join is a REGIONAL BY ROW table), and if enabled, it means that the
execution engine will first search locally for matching rows before searching
remote nodes. If a matching row is found in a local node, remote nodes will not
be searched. This optimization may improve the performance of foreign key
checks when rows are inserted or updated in a table that references a foreign
key in a REGIONAL BY ROW table.
  • Loading branch information
rytaft committed Apr 2, 2021
1 parent f27a41b commit 6862953
Show file tree
Hide file tree
Showing 14 changed files with 1,211 additions and 0 deletions.
275 changes: 275 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,281 @@ vectorized: true
table: regional_by_row_table@primary
spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1] [/'us-east-1'/1 - /'us-east-1'/1]

# Tests using locality optimized search for lookup anti joins (including foreign
# key checks).
statement ok
CREATE TABLE parent (
p_id INT PRIMARY KEY,
FAMILY (p_id)
) LOCALITY REGIONAL BY ROW;

statement ok
CREATE TABLE child (
c_id INT PRIMARY KEY,
c_p_id INT REFERENCES parent (p_id),
INDEX (c_p_id),
FAMILY (c_id, c_p_id)
) LOCALITY REGIONAL BY ROW;

statement ok
INSERT INTO parent (crdb_region, p_id)
VALUES ('ap-southeast-2', 10), ('ca-central-1', 20), ('us-east-1', 30)

statement ok
INSERT INTO child (crdb_region, c_id, c_p_id)
VALUES ('ap-southeast-2', 10, 10), ('ca-central-1', 20, 20), ('us-east-1', 30, 30)

statement ok
SET locality_optimized_partitioned_index_scan = false

# Query with locality optimized search disabled.
query T
EXPLAIN SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 10
----
distribution: full
vectorized: true
·
• lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (p_id = c_p_id) AND (crdb_region IN ('ap-southeast-2', 'ca-central-1', 'us-east-1'))
└── • scan
missing stats
table: child@primary
spans: [/'ap-southeast-2'/10 - /'ap-southeast-2'/10] [/'ca-central-1'/10 - /'ca-central-1'/10] [/'us-east-1'/10 - /'us-east-1'/10]

statement ok
SET tracing = on,kv,results; SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 10; SET tracing = off

# All regions are scanned without the optimization.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/75/1/"@"/10{-/#}, /Table/75/1/"\x80"/10{-/#}, /Table/75/1/"\xc0"/10{-/#}
fetched: /child/primary/'ap-southeast-2'/10/c_p_id -> /10
Scan /Table/74/1/"@"/10{-/#}, /Table/74/1/"\x80"/10{-/#}, /Table/74/1/"\xc0"/10{-/#}
fetched: /parent/primary/'ap-southeast-2'/10 -> NULL

statement ok
SET locality_optimized_partitioned_index_scan = true

# Same query with locality optimized search enabled.
query T
EXPLAIN (DISTSQL) SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 10
----
distribution: local
vectorized: true
·
• lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (p_id = c_p_id) AND (crdb_region IN ('ca-central-1', 'us-east-1'))
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (p_id = c_p_id) AND (crdb_region = 'ap-southeast-2')
└── • union all
│ limit: 1
├── • scan
│ missing stats
│ table: child@primary
│ spans: [/'ap-southeast-2'/10 - /'ap-southeast-2'/10]
└── • scan
missing stats
table: child@primary
spans: [/'ca-central-1'/10 - /'ca-central-1'/10] [/'us-east-1'/10 - /'us-east-1'/10]
·
Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJy0k1Fr2zAQx9_3KY7bQ5KhEtlxoQgKLqvLXDynSwwrLKZ41tF6SyRPliEl5LuP2GGNwxLSjr35Tve3fv_T3QqrX3MUGNzfRVdhDP3rcJpMv0QDmAZR8DGBD3AzGX-G_KmYS_j6KZgE0I_HCQT3m0Lod8vKzJCy27ryoZBwCfnD5mMwgKv4Gvp5m3T4ABkqLSnOFlSh-IYOpgxLo3OqKm02qVVTEMolCs6wUGVtN-mUYa4NoVihLeycUGCSfZ_ThDJJZsiRoSSbFfPmtw25X5pikZlnZDgtM1UJGM5wNlte8BkOHX7WiYbvIVMSHND2iQwyHNdWgO8w38V0zVDX9oWkstkjoXDW7G20zqm0_h9Sf0t5Gpl7kOwFqFbaSDIkOzDp-i_ssT7T5dDtUkfForDgHGTgr-nOrS7Utjmj7jXJc0kCouAmgas4CeF2HMbIsB26naZFWv-sS_ihCwVaCej7I7gE392OoO_BJSx7Hu8JIXyHc37uDU5r5uiNRrz_ZWSzssvexa4VBste3vF2ojnvNeYmVJVaVbQ3MYfeP2VI8pHasat0bXK6MzpvrmnDcaNrEpIq2566bRCq5qhZsl2x8y9i96h41BHzffHoqNg7LvaOis_3xOn63e8AAAD__2Pcwcc=

statement ok
SET tracing = on,kv,results; SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 10; SET tracing = off

# If the row is found in the local region, the other regions are not searched.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/75/1/"@"/10{-/#}
fetched: /child/primary/'ap-southeast-2'/10/c_p_id -> /10
Scan /Table/74/1/"@"/10{-/#}
fetched: /parent/primary/'ap-southeast-2'/10 -> NULL

statement ok
SET tracing = on,kv,results; SELECT * FROM child WHERE NOT EXISTS (SELECT * FROM parent WHERE p_id = c_p_id) AND c_id = 20; SET tracing = off

# If the row is not found in the local region, the other regions are searched in
# parallel.
query T
SELECT message FROM [SHOW KV TRACE FOR SESSION] WITH ORDINALITY
WHERE message LIKE 'fetched:%' OR message LIKE 'output row%'
OR message LIKE 'Scan%'
ORDER BY ordinality ASC
----
Scan /Table/75/1/"@"/20{-/#}
Scan /Table/75/1/"\x80"/20{-/#}, /Table/75/1/"\xc0"/20{-/#}
fetched: /child/primary/'ca-central-1'/20/c_p_id -> /20
Scan /Table/74/1/"@"/20{-/#}
Scan /Table/74/1/"\x80"/20{-/#}, /Table/74/1/"\xc0"/20{-/#}
fetched: /parent/primary/'ca-central-1'/20 -> NULL

query T
EXPLAIN INSERT INTO child VALUES (1, 1)
----
distribution: local
vectorized: true
·
• root
├── • insert
│ │ into: child(c_id, c_p_id, crdb_region)
│ │
│ └── • buffer
│ │ label: buffer 1
│ │
│ └── • values
│ size: 4 columns, 1 row
├── • constraint-check
│ │
│ └── • error if rows
│ │
│ └── • lookup join (semi)
│ │ table: child@primary
│ │ equality: (lookup_join_const_col_@12, column1) = (crdb_region,c_id)
│ │ equality cols are key
│ │ pred: column8 != crdb_region
│ │
│ └── • cross join
│ │ estimated row count: 3
│ │
│ ├── • values
│ │ size: 1 column, 3 rows
│ │
│ └── • scan buffer
│ label: buffer 1
└── • constraint-check
└── • error if rows
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (column2 = p_id) AND (crdb_region IN ('ca-central-1', 'us-east-1'))
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (column2 = p_id) AND (crdb_region = 'ap-southeast-2')
└── • scan buffer
label: buffer 1

query T
EXPLAIN UPSERT INTO child VALUES (1, 1)
----
distribution: local
vectorized: true
·
• root
├── • upsert
│ │ into: child(c_id, c_p_id, crdb_region)
│ │ arbiter constraints: primary
│ │
│ └── • buffer
│ │ label: buffer 1
│ │
│ └── • render
│ │
│ └── • cross join (left outer)
│ │
│ ├── • values
│ │ size: 3 columns, 1 row
│ │
│ └── • union all
│ │ limit: 1
│ │
│ ├── • scan
│ │ missing stats
│ │ table: child@primary
│ │ spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1]
│ │
│ └── • scan
│ missing stats
│ table: child@primary
│ spans: [/'ca-central-1'/1 - /'ca-central-1'/1] [/'us-east-1'/1 - /'us-east-1'/1]
└── • constraint-check
└── • error if rows
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (column2 = p_id) AND (crdb_region IN ('ca-central-1', 'us-east-1'))
└── • lookup join (anti)
│ table: parent@primary
│ equality cols are key
│ lookup condition: (column2 = p_id) AND (crdb_region = 'ap-southeast-2')
└── • scan buffer
label: buffer 1

# We don't yet support locality optimized search for semi join.
query T
EXPLAIN DELETE FROM parent WHERE p_id = 1
----
distribution: local
vectorized: true
·
• root
├── • delete
│ │ from: parent
│ │
│ └── • buffer
│ │ label: buffer 1
│ │
│ └── • union all
│ │ limit: 1
│ │
│ ├── • scan
│ │ missing stats
│ │ table: parent@primary
│ │ spans: [/'ap-southeast-2'/1 - /'ap-southeast-2'/1]
│ │
│ └── • scan
│ missing stats
│ table: parent@primary
│ spans: [/'ca-central-1'/1 - /'ca-central-1'/1] [/'us-east-1'/1 - /'us-east-1'/1]
└── • constraint-check
└── • error if rows
└── • lookup join (semi)
│ table: child@child_c_p_id_idx
│ equality: (lookup_join_const_col_@12, p_id) = (crdb_region,c_p_id)
└── • cross join
├── • values
│ size: 1 column, 3 rows
└── • scan buffer
label: buffer 1

# Tests creating a index and a unique constraint on a REGIONAL BY ROW table.
statement ok
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return cannotDistribute, cannotDistributeRowLevelLockingErr
}

if n.localityOptimized {
// This is a locality optimized lookup join.
return cannotDistribute, nil
}
if err := checkExpr(n.lookupExpr); err != nil {
return cannotDistribute, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin(
isSecondJoinInPairedJoiner bool,
reqOrdering exec.OutputOrdering,
locking *tree.LockingItem,
localityOptimized bool,
) (exec.Node, error) {
// TODO (rohany): Implement production of system columns by the underlying scan here.
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: lookup join")
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type lookupJoinNode struct {
isSecondJoinInPairedJoiner bool

reqOrdering ReqOrdering

// localityOptimized is true if this lookup join is part of a locality
// optimized search strategy, which tries to find a row on nodes in the
// gateway's region before fanning out to remote nodes. In order for this
// optimization to work, the DistSQL planner must create a local plan.
localityOptimized bool
}

func (lj *lookupJoinNode) startExec(params runParams) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
join.IsSecondJoinInPairedJoiner,
res.reqOrdering(join),
locking,
join.LocalityOptimized,
)
if err != nil {
return execPlan{}, err
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ define IndexJoin {
# The node produces the columns in the input and (unless join type is
# LeftSemiJoin or LeftAntiJoin) the lookupCols, ordered by ordinal. The ON
# condition can refer to these using IndexedVars.
#
# If LocalityOptimized is true, we are performing a locality optimized search.
# In order for this to work correctly, the execution engine must create a local
# DistSQL plan for the main query (subqueries and postqueries need not be local).
define LookupJoin {
JoinType descpb.JoinType
Input exec.Node
Expand All @@ -234,6 +238,7 @@ define LookupJoin {
IsSecondJoinInPairedJoiner bool
ReqOrdering exec.OutputOrdering
Locking *tree.LockingItem
LocalityOptimized bool
}

# InvertedJoin performs a lookup join into an inverted index.
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/opt/ops/relational.opt
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,17 @@ define LookupJoinPrivate {
# paired-joiner used for left joins.
IsSecondJoinInPairedJoiner bool

# LocalityOptimized is true if this lookup join is part of a locality
# optimized search strategy, indicating that it either requires all local
# (relative to the gateway region) or all remote lookups. Currently, only
# anti joins can be locality optimized, and they are implemented with two
# nested anti lookup joins in which the first join targets local partitions
# and the second join targets remote partitions. Therefore,
# LocalityOptimized is used as a hint to the coster to reduce the cost of
# this lookup join. It is also used to ensure that the DistSQL planner
# creates a local plan.
LocalityOptimized bool

# ConstFilters contains the constant filters that are represented as equality
# conditions on the KeyCols. These filters are needed by the statistics code to
# correctly estimate selectivity.
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/opt/xform/coster.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ func (c *coster) computeIndexJoinCost(
join.Table,
cat.PrimaryIndex,
memo.JoinFlags(0),
false, /* localityOptimized */
)
}

Expand All @@ -791,6 +792,7 @@ func (c *coster) computeLookupJoinCost(
join.Table,
join.Index,
join.Flags,
join.LocalityOptimized,
)
}

Expand All @@ -803,6 +805,7 @@ func (c *coster) computeIndexLookupJoinCost(
table opt.TableID,
index cat.IndexOrdinal,
flags memo.JoinFlags,
localityOptimized bool,
) memo.Cost {
input := join.Child(0).(memo.RelExpr)
lookupCount := input.Relational().Stats.RowCount
Expand Down Expand Up @@ -872,6 +875,15 @@ func (c *coster) computeIndexLookupJoinCost(
// If we prefer a lookup join, make the cost much smaller.
cost *= preferLookupJoinFactor
}

// If this lookup join is locality optimized, divide the cost by two in order to make
// the total cost of the two lookup joins in the locality optimized plan less then
// the cost of the single lookup join in the non-locality optimized plan.
// TODO(rytaft): This is hacky. We should really be making this determination
// based on the latency between regions.
if localityOptimized {
cost /= 2
}
return cost
}

Expand Down
Loading

0 comments on commit 6862953

Please sign in to comment.