opt: ensure validation of unique constraints is efficient
This commit adds two new exploration rules: SplitGroupByScanIntoUnionScans
and SplitGroupByFilteredScanIntoUnionScans.

SplitGroupByScanIntoUnionScans splits a non-inverted scan under a GroupBy,
DistinctOn, or EnsureUpsertDistinctOn into a union-all of scans, where each
scan is ordered on the grouping columns. This ordering is then maintained by
the union-all operation and passed on to the grouping operation. Ordering on
the grouping columns is important since it enables the grouping operation to
execute in a streaming fashion, which is more efficient. Example:

   CREATE TABLE tab (
     region STRING NOT NULL CHECK (region IN ('ASIA', 'EUROPE')),
     data INT NOT NULL,
     INDEX (region, data)
   );

   SELECT DISTINCT data
   FROM tab;

   =>

   SELECT DISTINCT data
   FROM (
     SELECT data FROM tab WHERE region='ASIA'
     UNION ALL
     SELECT data FROM tab WHERE region='EUROPE'
   );

This rule does not build the streaming grouping operation itself; it allows
another rule, GenerateStreamingGroupBy, to fire and use the new interesting
orderings provided by the UnionAll of scans to build the streaming grouping
operation.
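
As a quick sanity check (a hypothetical session sketch using the example
table above), EXPLAIN shows whether the rules fired: the plan should contain
a union all of ordered scans feeding a distinct operator with an order key,
rather than a hash-based distinct:

   EXPLAIN (VERBOSE)
   SELECT DISTINCT data
   FROM tab;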

SplitGroupByFilteredScanIntoUnionScans is like SplitGroupByScanIntoUnionScans,
but the scan is wrapped in a Select.
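
For illustration (a sketch reusing the hypothetical tab table above, but
assuming the data column were nullable), the filtered variant targets queries
where a filter sits between the grouping operation and the scan:

   SELECT DISTINCT data
   FROM tab
   WHERE data IS NOT NULL;

The scan under the Select is split into the same union-all of per-region
scans, and the filter is still applied before the grouping operation, so the
ordering provided by the scans can reach the DistinctOn.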

These transformations are important for ensuring that validation of the
unique constraint in an implicitly-partitioned unique index is efficient. The
validation query to verify that (a, b) is UNIQUE on table tbl looks like this:

   SELECT a, b
   FROM tbl
   WHERE a IS NOT NULL AND b IS NOT NULL
   GROUP BY a, b
   HAVING count(*) > 1
   LIMIT 1;

Without SplitGroupByFilteredScanIntoUnionScans, this query would require an
inefficient and memory-intensive hash group by operation. Note that the first
rule, SplitGroupByScanIntoUnionScans, is also needed, since it applies in
cases where a and b are not nullable (the IS NOT NULL filters are then
eliminated, so no Select wraps the scan).
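
Conceptually (a sketch only, assuming a REGIONAL BY ROW table tbl whose
hidden crdb_region column takes the regions 'ap-southeast-2', 'ca-central-1',
and 'us-east-1', as in the test below), the split rewrites the validation
query into something like:

   SELECT a, b
   FROM (
     SELECT a, b FROM tbl
     WHERE crdb_region = 'ap-southeast-2' AND a IS NOT NULL AND b IS NOT NULL
     UNION ALL
     SELECT a, b FROM tbl
     WHERE crdb_region = 'ca-central-1' AND a IS NOT NULL AND b IS NOT NULL
     UNION ALL
     SELECT a, b FROM tbl
     WHERE crdb_region = 'us-east-1' AND a IS NOT NULL AND b IS NOT NULL
   )
   GROUP BY a, b
   HAVING count(*) > 1
   LIMIT 1;

Each branch is a constrained scan of the implicitly-partitioned unique index
(whose key is effectively (crdb_region, a, b)), so it is already ordered on
(a, b); the UnionAll preserves that ordering, and the GROUP BY can execute in
a streaming fashion.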

Fixes #56201

Release note (performance improvement): Validation of a new UNIQUE index in a
REGIONAL BY ROW table no longer requires an inefficient and memory-intensive
hash aggregation query. The optimizer can now plan the validation query so
that it uses all streaming operations, which are much more efficient.
rytaft committed May 19, 2021
1 parent 65f685b commit b033f6c
Showing 7 changed files with 751 additions and 66 deletions.
103 changes: 94 additions & 9 deletions pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
@@ -1070,22 +1070,107 @@ SELECT * FROM [EXPLAIN DELETE FROM parent WHERE p_id = 1] OFFSET 2
└── • error if rows
└── • lookup join (semi)
│ table: child@child_c_p_id_idx
│ equality: (lookup_join_const_col_@12, p_id) = (crdb_region,c_p_id)
└── • merge join
│ equality: (p_id) = (c_p_id)
│ left cols are key
│ right cols are key
└── • cross join
├── • values
│ size: 1 column, 3 rows
├── • scan buffer
│ label: buffer 1
└── • distinct
│ distinct on: c_p_id
│ order key: c_p_id
└── • scan buffer
label: buffer 1
└── • union all
├── • union all
│ │
│ ├── • scan
│ │ missing stats
│ │ table: child@child_c_p_id_idx
│ │ spans: [/'ap-southeast-2' - /'ap-southeast-2']
│ │
│ └── • scan
│ missing stats
│ table: child@child_c_p_id_idx
│ spans: [/'ca-central-1' - /'ca-central-1']
└── • scan
missing stats
table: child@child_c_p_id_idx
spans: [/'us-east-1' - /'us-east-1']

# Tests creating an index and a unique constraint on a REGIONAL BY ROW table.
statement ok
CREATE INDEX new_idx ON regional_by_row_table(a, b)

# The validation query to create the unique constraint should be efficient (see
# #56201).
statement ok
CREATE TABLE t56201 (a INT, b STRING) LOCALITY REGIONAL BY ROW;
ALTER TABLE t56201 ADD CONSTRAINT key_a_b UNIQUE (a, b);

query T
SELECT * FROM [EXPLAIN (VERBOSE) SELECT a, b
FROM t56201
GROUP BY a, b
HAVING count(*) > 1
LIMIT 1] OFFSET 2
----
·
• project
│ columns: (a, b)
│ estimated row count: 1 (missing stats)
└── • limit
│ columns: (a, b, count_rows)
│ estimated row count: 1 (missing stats)
│ count: 1
└── • filter
│ columns: (a, b, count_rows)
│ estimated row count: 330 (missing stats)
│ filter: count_rows > 1
└── • group
│ columns: (a, b, count_rows)
│ estimated row count: 991 (missing stats)
│ aggregate 0: count_rows()
│ group by: a, b
│ ordered: +a,+b
└── • union all
│ columns: (a, b)
│ ordering: +a,+b
│ estimated row count: 30 (missing stats)
├── • union all
│ │ columns: (a, b)
│ │ ordering: +a,+b
│ │ estimated row count: 20 (missing stats)
│ │
│ ├── • scan
│ │ columns: (a, b)
│ │ ordering: +a,+b
│ │ estimated row count: 10 (missing stats)
│ │ table: t56201@key_a_b
│ │ spans: /"@"-/"@"/PrefixEnd
│ │
│ └── • scan
│ columns: (a, b)
│ ordering: +a,+b
│ estimated row count: 10 (missing stats)
│ table: t56201@key_a_b
│ spans: /"\x80"-/"\x80"/PrefixEnd
└── • scan
columns: (a, b)
ordering: +a,+b
estimated row count: 10 (missing stats)
table: t56201@key_a_b
spans: /"\xc0"-/"\xc0"/PrefixEnd

statement ok
ALTER TABLE regional_by_row_table ADD CONSTRAINT unique_b_a UNIQUE(b, a)

64 changes: 61 additions & 3 deletions pkg/sql/opt/xform/groupby_funcs.go
@@ -56,9 +56,10 @@ func (c *CustomFuncs) MakeProjectFromPassthroughAggs(
	}, grp)
}

// GenerateStreamingGroupBy generates variants of a GroupBy or DistinctOn
// expression with more specific orderings on the grouping columns, using the
// interesting orderings property. See the GenerateStreamingGroupBy rule.
// GenerateStreamingGroupBy generates variants of a GroupBy, DistinctOn,
// EnsureDistinctOn, UpsertDistinctOn, or EnsureUpsertDistinctOn expression
// with more specific orderings on the grouping columns, using the interesting
// orderings property. See the GenerateStreamingGroupBy rule.
func (c *CustomFuncs) GenerateStreamingGroupBy(
grp memo.RelExpr,
op opt.Operator,
@@ -198,3 +199,60 @@ func (c *CustomFuncs) MakeOrderingChoiceFromColumn(
	}
	return oc
}

// SplitGroupByScanIntoUnionScans splits a non-inverted scan under a GroupBy,
// DistinctOn, or EnsureUpsertDistinctOn into a UnionAll of scans, where each
// scan can provide an ordering on the grouping columns. If no such UnionAll
// can be built, returns ok=false.
//
// This is useful because the GenerateStreamingGroupBy rule can then create a
// streaming grouping operation, which is more efficient.
// GenerateStreamingGroupBy will use the new interesting orderings provided by
// the UnionAll of scans to build the streaming operation.
//
// See the SplitGroupByScanIntoUnionScans rule for more details.
func (c *CustomFuncs) SplitGroupByScanIntoUnionScans(
	scan memo.RelExpr, sp *memo.ScanPrivate, private *memo.GroupingPrivate,
) (_ memo.RelExpr, ok bool) {
	cons, ok := c.getKnownScanConstraint(sp)
	if !ok {
		// No valid constraint was found.
		return nil, false
	}

	intraOrd := private.Ordering

	// Find the length of the prefix of index columns preceding the first groupby
	// ordering column. We will verify later that the entire ordering sequence is
	// represented in the index. Ex:
	//
	//   Index: +1/+2/-3, Group By internal ordering +3 opt(4) => Prefix Length: 2
	//
	keyPrefixLength := cons.Columns.Count()
	for i := 0; i < cons.Columns.Count(); i++ {
		col := cons.Columns.Get(i).ID()
		if private.GroupingCols.Contains(col) || intraOrd.Optional.Contains(col) {
			// Grouping or optional column.
			keyPrefixLength = i
			break
		}
		if len(intraOrd.Columns) > 0 &&
			intraOrd.Columns[0].Group.Contains(col) {
			// Column matches the one in the ordering.
			keyPrefixLength = i
			break
		}
	}
	if keyPrefixLength == 0 {
		// This case can be handled by GenerateStreamingGroupBy.
		return nil, false
	}

	// Create a UnionAll of scans that can provide the ordering of the
	// GroupingPrivate (if no such UnionAll is possible this will return
	// ok=false). We pass a limit of 0 since the scans are unlimited
	// (splitScanIntoUnionScans is also used for another rule with limited scans).
	return c.SplitScanIntoUnionScans(
		intraOrd, scan, sp, cons, 0 /* limit */, keyPrefixLength,
	)
}
65 changes: 43 additions & 22 deletions pkg/sql/opt/xform/limit_funcs.go
@@ -169,18 +169,15 @@ func (c *CustomFuncs) ScanIsInverted(sp *memo.ScanPrivate) bool {
	return idx.IsInverted()
}

// SplitScanIntoUnionScans returns a UnionAll tree of Scan operators with hard
// limits that each scan over a single key from the original Scan's constraints.
// If no such UnionAll of Scans can be found, ok=false is returned. This is
// beneficial in cases where the original Scan had to scan over many rows but
// had relatively few keys to scan over.
// SplitLimitedScanIntoUnionScans returns a UnionAll tree of Scan operators with
// hard limits that each scan over a single key from the original Scan's
// constraints. If no such UnionAll of Scans can be found, ok=false is returned.
// This is beneficial in cases where the original Scan had to scan over many
// rows but had relatively few keys to scan over.
// TODO(drewk): handle inverted scans.
func (c *CustomFuncs) SplitScanIntoUnionScans(
func (c *CustomFuncs) SplitLimitedScanIntoUnionScans(
	limitOrdering props.OrderingChoice, scan memo.RelExpr, sp *memo.ScanPrivate, limit tree.Datum,
) (_ memo.RelExpr, ok bool) {
	const maxScanCount = 16
	const threshold = 4

	cons, ok := c.getKnownScanConstraint(sp)
	if !ok {
		// No valid constraint was found.
Expand Down Expand Up @@ -209,6 +206,30 @@ func (c *CustomFuncs) SplitScanIntoUnionScans(
		return nil, false
	}

	limitVal := int(*limit.(*tree.DInt))
	return c.SplitScanIntoUnionScans(limitOrdering, scan, sp, cons, limitVal, keyPrefixLength)
}

// SplitScanIntoUnionScans tries to find a UnionAll of Scan operators (with an
// optional hard limit) that each scan over a single key from the original
// Scan's constraints. The UnionAll is returned if the scans can provide the
// given ordering, and if the statistics suggest that splitting the scan could
// be beneficial. If no such UnionAll of Scans can be found, ok=false is
// returned. This is beneficial in cases where an ordering is required on a
// suffix of the index columns, and constraining the first column(s) allows the
// scan to provide that ordering.
// TODO(drewk): handle inverted scans.
func (c *CustomFuncs) SplitScanIntoUnionScans(
	ordering props.OrderingChoice,
	scan memo.RelExpr,
	sp *memo.ScanPrivate,
	cons *constraint.Constraint,
	limit int,
	keyPrefixLength int,
) (_ memo.RelExpr, ok bool) {
	const maxScanCount = 16
	const threshold = 4

	keyCtx := constraint.MakeKeyContext(&cons.Columns, c.e.evalCtx)
	spans := cons.Spans

@@ -240,24 +261,24 @@ func (c *CustomFuncs) SplitScanIntoUnionScans(
		scanCount = maxScanCount
	}

	limitVal := int(*limit.(*tree.DInt))

	if scan.Relational().Stats.Available &&
		float64(scanCount*limitVal*threshold) >= scan.Relational().Stats.RowCount {
		// Splitting the Scan may not be worth the overhead. Creating a sequence of
		// Scans and Unions is expensive, so we only want to create the plan if it
		// is likely to be used.
		return nil, false
	if limit > 0 {
		if scan.Relational().Stats.Available &&
			float64(scanCount*limit*threshold) >= scan.Relational().Stats.RowCount {
			// Splitting the Scan may not be worth the overhead. Creating a sequence of
			// Scans and Unions is expensive, so we only want to create the plan if it
			// is likely to be used.
			return nil, false
		}
	}

	// The index ordering must have a prefix of columns of length keyLength
	// followed by the limitOrdering columns either in order or in reverse order.
	hasLimitOrderingSeq, reverse := indexHasOrderingSequence(
		c.e.mem.Metadata(), scan, sp, limitOrdering, keyPrefixLength)
	if !hasLimitOrderingSeq {
	// followed by the ordering columns either in order or in reverse order.
	hasOrderingSeq, reverse := indexHasOrderingSequence(
		c.e.mem.Metadata(), scan, sp, ordering, keyPrefixLength)
	if !hasOrderingSeq {
		return nil, false
	}
	newHardLimit := memo.MakeScanLimit(int64(limitVal), reverse)
	newHardLimit := memo.MakeScanLimit(int64(limit), reverse)

	// makeNewUnion extends the UnionAll tree rooted at 'last' to include
	// 'newScan'. The ColumnIDs of the original Scan are used by the resulting