Skip to content

Commit

Permalink
Merge #39034 #39469
Browse files Browse the repository at this point in the history
39034: storage: use learner replicas for replica addition by default r=tbg a=danhhz

This was previously available by flipping a cluster setting that
defaulted to off, this defaults it to on. With any luck, we'll be
confident enough in this to remove the cluster setting in 19.2, which
will allow us to rip out a bunch of code at the beginning of the 20.1
cycle.

Closes #38902

Release note (general change): Replicas are now added using a raft
learner and going through the normal raft snapshot process to catch them
up, eliminating technical debt. No user facing changes are expected.

39469: opt: fix panic due to incorrect type of ArrayFlatten r=rytaft a=rytaft

This commit fixes a panic caused by incorrect typing of an `ArrayFlatten`
expression. If the input to an `ArrayFlatten` expression is sorted, there
may be more than one output column (although the columns used for sorting
are hidden). If one of these hidden columns is chosen to infer the type
of the expression, the type could be incorrect. This commit fixes the
problem so that only the requested column is chosen for type inference.

Fixes #38867

Release note (bug fix): Fixed a panic due to incorrect type inference
of some ARRAY(...) expressions.

Co-authored-by: Daniel Harrison <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
  • Loading branch information
3 people committed Aug 13, 2019
3 parents 5e6c20d + cd173ff + 1bceb6c commit f11604f
Show file tree
Hide file tree
Showing 27 changed files with 557 additions and 465 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<tr><td><code>kv.closed_timestamp.target_duration</code></td><td>duration</td><td><code>30s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td></tr>
<tr><td><code>kv.follower_read.target_multiple</code></td><td>float</td><td><code>3</code></td><td>if above 1, encourages the distsender to perform a read against the closest replica if a request is older than kv.closed_timestamp.target_duration * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty interval. This value also is used to create follower_timestamp(). (WARNING: may compromise cluster stability or correctness; do not edit without supervision)</td></tr>
<tr><td><code>kv.import.batch_size</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the maximum size of the payload in an AddSSTable request (WARNING: may compromise cluster stability or correctness; do not edit without supervision)</td></tr>
<tr><td><code>kv.learner_replicas.enabled</code></td><td>boolean</td><td><code>false</code></td><td>use learner replicas for replica addition</td></tr>
<tr><td><code>kv.learner_replicas.enabled</code></td><td>boolean</td><td><code>true</code></td><td>use learner replicas for replica addition</td></tr>
<tr><td><code>kv.raft.command.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of a raft command</td></tr>
<tr><td><code>kv.raft_log.disable_synchronization_unsafe</code></td><td>boolean</td><td><code>false</code></td><td>set to true to disable synchronization on Raft log writes to persistent storage. Setting to true risks data loss or data corruption on server crashes. The setting is meant for internal testing only and SHOULD NOT be used in production.</td></tr>
<tr><td><code>kv.range.backpressure_range_size_multiplier</code></td><td>float</td><td><code>2</code></td><td>multiple of range_max_bytes that a range is allowed to grow to without splitting before writes to that range are blocked, or 0 to disable</td></tr>
Expand Down
18 changes: 15 additions & 3 deletions pkg/roachpb/metadata_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
// - Learner replicas cannot become raft leaders, so we also don't allow them to
// become leaseholders. As a result, DistSender and the various oracles don't
// try to send them traffic.
// - The raft snapshot queue does not send snapshots to learners for reasons
// described below.
// - The raft snapshot queue tries to avoid sending snapshots to learners for
// reasons described below.
// - Merges won't run while a learner replica is present.
//
// Replicas are now added in two ConfChange transactions. The first creates the
Expand All @@ -135,7 +135,19 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
// There is another race between the learner snapshot being sent and the raft
// snapshot queue happening to check the replica at the same time, also sending
// it a snapshot. This is safe but wasteful, so the raft snapshot queue won't
// try to send snapshots to learners.
// try to send snapshots to learners if there is already a snapshot to that
// range in flight.
//
// *However*, raft is currently pickier than the needs to be about the snapshots
// it requests and it can get stuck in StateSnapshot if it doesn't receive
// exactly the index it wants. As a result, for now, the raft snapshot queue
// will send one if it's still needed after the learner snapshot finishes (or
// times out). To make this work in a timely manner (i.e. without relying on the
// replica scanner) but without blocking the raft snapshot queue, when a
// snapshot is skipped, this is reported to raft as an error sending the
// snapshot. This causes raft to eventually re-enqueue it in the raft snapshot
// queue. All of this is quite hard to reason about, so it'd be nice to make
// this go away at some point.
//
// Merges are blocked if either side has a learner (to avoid working out the
// edge cases) but it's historically turned out to be a bad idea to get in the
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/subquery_correlated
Original file line number Diff line number Diff line change
Expand Up @@ -1150,3 +1150,23 @@ ORDER BY
statement ok
DROP TABLE stuff;
DROP TABLE users;

# Regression test for #38867.
query T
SELECT (
SELECT
ARRAY (
SELECT c.relname
FROM pg_inherits AS i JOIN pg_class AS c ON c.oid = i.inhparent
WHERE i.inhrelid = rel.oid
ORDER BY inhseqno
)
)
FROM pg_class AS rel
LIMIT 5;
----
{}
{}
{}
{}
{}
15 changes: 14 additions & 1 deletion pkg/sql/opt/colset.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

package opt

import "github.com/cockroachdb/cockroach/pkg/util"
import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)

// ColSet efficiently stores an unordered set of column ids.
type ColSet struct {
Expand Down Expand Up @@ -85,3 +88,13 @@ func (s ColSet) SubsetOf(rhs ColSet) bool { return s.set.SubsetOf(rhs.set) }
// numbers are shown as ranges. For example, for the set {1, 2, 3 5, 6, 10},
// the output is "(1-3,5,6,10)".
func (s ColSet) String() string { return s.set.String() }

// SingleColumn returns the single column in s. Panics if s does not contain
// exactly one column.
func (s ColSet) SingleColumn() ColumnID {
if s.Len() != 1 {
panic(errors.AssertionFailedf("expected a single column but found %d columns", s.Len()))
}
col, _ := s.Next(0)
return col
}
3 changes: 2 additions & 1 deletion pkg/sql/opt/exec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ type Factory interface {
//
// memo, rightProps, and right are the memo, required physical properties, and
// RelExpr of the right side of the join that will be repeatedly modified,
// re-planned and executed for every row from the left side.
// re-planned and executed for every row from the left side. The rightProps
// always includes a presentation.
//
// fakeRight is a pre-planned node that is the right side of the join with
// all outer columns replaced by NULL. The physical properties of this node
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/memo/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func typeCollate(e opt.ScalarExpr) *types.T {
// typeArrayFlatten returns the type of the subquery as an array.
func typeArrayFlatten(e opt.ScalarExpr) *types.T {
input := e.Child(0).(RelExpr)
colID, _ := input.Relational().OutputCols.Next(0)
colID := e.(*ArrayFlattenExpr).RequestedCol
return types.MakeArray(input.Memo().Metadata().ColumnMeta(colID).Type)
}

Expand Down Expand Up @@ -354,7 +354,7 @@ func typeCast(e opt.ScalarExpr) *types.T {
// its first (and only) column.
func typeSubquery(e opt.ScalarExpr) *types.T {
input := e.Child(0).(RelExpr)
colID, _ := input.Relational().OutputCols.Next(0)
colID := input.Relational().OutputCols.SingleColumn()
return input.Memo().Metadata().ColumnMeta(colID).Type
}

Expand Down
13 changes: 6 additions & 7 deletions pkg/sql/opt/norm/custom_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,9 @@ func (c *CustomFuncs) CanConstructBinary(op opt.Operator, left, right opt.Scalar
return memo.BinaryOverloadExists(op, left.DataType(), right.DataType())
}

// ArrayType returns the type of the first output column wrapped
// ArrayType returns the type of the given column wrapped
// in an array.
func (c *CustomFuncs) ArrayType(in memo.RelExpr) *types.T {
inCol, _ := c.OutputCols(in).Next(0)
func (c *CustomFuncs) ArrayType(inCol opt.ColumnID) *types.T {
inTyp := c.mem.Metadata().ColumnMeta(inCol).Type
return types.MakeArray(inTyp)
}
Expand Down Expand Up @@ -1626,10 +1625,10 @@ func (c *CustomFuncs) SubqueryOrdering(sub *memo.SubqueryPrivate) physical.Order
return oc
}

// FirstCol returns the first column in the input expression.
func (c *CustomFuncs) FirstCol(in memo.RelExpr) opt.ColumnID {
inCol, _ := c.OutputCols(in).Next(0)
return inCol
// SubqueryRequestedCol returns the requested column from a SubqueryPrivate.
// This function should only be used with ArrayFlatten expressions.
func (c *CustomFuncs) SubqueryRequestedCol(sub *memo.SubqueryPrivate) opt.ColumnID {
return sub.RequestedCol
}

// MakeArrayAggCol returns a ColPrivate with the given type and an "array_agg" label.
Expand Down
16 changes: 9 additions & 7 deletions pkg/sql/opt/norm/decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,7 @@ func (c *CustomFuncs) ConstructNoColsRow() memo.RelExpr {
// referenceSingleColumn returns a Variable operator that refers to the one and
// only column that is projected by the input expression.
func (c *CustomFuncs) referenceSingleColumn(in memo.RelExpr) opt.ScalarExpr {
cols := in.Relational().OutputCols
if cols.Len() != 1 {
panic(errors.AssertionFailedf("expression does not have exactly one column"))
}
colID, _ := cols.Next(0)
colID := in.Relational().OutputCols.SingleColumn()
return c.f.ConstructVariable(colID)
}

Expand Down Expand Up @@ -801,8 +797,14 @@ func (r *subqueryHoister) hoistAll(scalar opt.ScalarExpr) opt.ScalarExpr {
}

// Replace the Subquery operator with a Variable operator referring to
// the first (and only) column in the hoisted query.
colID, _ := subqueryProps.OutputCols.Next(0)
// the output column of the hoisted query.
var colID opt.ColumnID
switch t := scalar.(type) {
case *memo.ArrayFlattenExpr:
colID = t.RequestedCol
default:
colID = subqueryProps.OutputCols.SingleColumn()
}
return r.f.ConstructVariable(colID)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/opt/norm/rules/scalar.opt
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@
$input
[
(AggregationsItem
(ArrayAgg (Variable (FirstCol $input)))
(MakeArrayAggCol (ArrayType $input))
(ArrayAgg (Variable $requestedCol:(SubqueryRequestedCol $subquery)))
(MakeArrayAggCol (ArrayType $requestedCol))
)
]
(MakeOrderedGrouping
Expand All @@ -299,6 +299,6 @@
)
(MakeUnorderedSubquery)
)
(Array [] (ArrayType $input))
(Array [] (ArrayType $requestedCol))
]
)
124 changes: 124 additions & 0 deletions pkg/sql/opt/norm/testdata/rules/scalar
Original file line number Diff line number Diff line change
Expand Up @@ -1287,3 +1287,127 @@ project
└── scan a
├── columns: k:7(int!null)
└── key: (7)

exec-ddl
CREATE TABLE pg_class (
oid OID NULL,
relname NAME NOT NULL,
relnamespace OID NULL,
reltype OID NULL,
reloftype OID NULL,
relowner OID NULL,
relam OID NULL,
relfilenode OID NULL,
reltablespace OID NULL,
relpages INT4 NULL,
reltuples FLOAT4 NULL,
relallvisible INT4 NULL,
reltoastrelid OID NULL,
relhasindex BOOL NULL,
relisshared BOOL NULL,
relpersistence CHAR NULL,
relistemp BOOL NULL,
relkind CHAR NULL,
relnatts INT2 NULL,
relchecks INT2 NULL,
relhasoids BOOL NULL,
relhaspkey BOOL NULL,
relhasrules BOOL NULL,
relhastriggers BOOL NULL,
relhassubclass BOOL NULL,
relfrozenxid INT8 NULL,
relacl STRING[] NULL,
reloptions STRING[] NULL
)
----

exec-ddl
CREATE TABLE pg_inherits (
inhrelid OID NULL,
inhparent OID NULL,
inhseqno INT4 NULL
)
----

# Regression test for #38867.
norm expect=NormalizeArrayFlattenToAgg
SELECT (
SELECT
ARRAY (
SELECT c.relname
FROM pg_inherits AS i JOIN pg_class AS c ON c.oid = i.inhparent
WHERE i.inhrelid = rel.oid
ORDER BY inhseqno
)
)
FROM pg_class AS rel
----
project
├── columns: array:66(name[])
├── inner-join-apply
│ ├── columns: rel.oid:1(oid) array_agg:63(name[]) array:64(name[])
│ ├── scan rel
│ │ └── columns: rel.oid:1(oid)
│ ├── inner-join-apply
│ │ ├── columns: array_agg:63(name[]) array:64(name[])
│ │ ├── outer: (1)
│ │ ├── cardinality: [1 - 1]
│ │ ├── key: ()
│ │ ├── fd: ()-->(63,64)
│ │ ├── project
│ │ │ ├── columns: array_agg:63(name[])
│ │ │ ├── outer: (1)
│ │ │ ├── cardinality: [1 - 1]
│ │ │ ├── key: ()
│ │ │ ├── fd: ()-->(63)
│ │ │ ├── group-by
│ │ │ │ ├── columns: inhrelid:30(oid) array_agg:65(name[])
│ │ │ │ ├── internal-ordering: +32 opt(30)
│ │ │ │ ├── outer: (1)
│ │ │ │ ├── cardinality: [1 - 1]
│ │ │ │ ├── key: ()
│ │ │ │ ├── fd: ()-->(30,65)
│ │ │ │ ├── sort
│ │ │ │ │ ├── columns: inhrelid:30(oid) inhparent:31(oid) inhseqno:32(int4) c.oid:34(oid) c.relname:35(name)
│ │ │ │ │ ├── outer: (1)
│ │ │ │ │ ├── cardinality: [1 - ]
│ │ │ │ │ ├── fd: (31)==(34), (34)==(31)
│ │ │ │ │ ├── ordering: +32 opt(30) [actual: +32]
│ │ │ │ │ └── left-join (hash)
│ │ │ │ │ ├── columns: inhrelid:30(oid) inhparent:31(oid) inhseqno:32(int4) c.oid:34(oid) c.relname:35(name)
│ │ │ │ │ ├── outer: (1)
│ │ │ │ │ ├── cardinality: [1 - ]
│ │ │ │ │ ├── fd: (31)==(34), (34)==(31)
│ │ │ │ │ ├── values
│ │ │ │ │ │ ├── cardinality: [1 - 1]
│ │ │ │ │ │ ├── key: ()
│ │ │ │ │ │ └── tuple [type=tuple]
│ │ │ │ │ ├── inner-join (hash)
│ │ │ │ │ │ ├── columns: inhrelid:30(oid) inhparent:31(oid!null) inhseqno:32(int4) c.oid:34(oid!null) c.relname:35(name!null)
│ │ │ │ │ │ ├── fd: (31)==(34), (34)==(31)
│ │ │ │ │ │ ├── scan i
│ │ │ │ │ │ │ └── columns: inhrelid:30(oid) inhparent:31(oid) inhseqno:32(int4)
│ │ │ │ │ │ ├── scan c
│ │ │ │ │ │ │ └── columns: c.oid:34(oid) c.relname:35(name!null)
│ │ │ │ │ │ └── filters
│ │ │ │ │ │ └── c.oid = inhparent [type=bool, outer=(31,34), constraints=(/31: (/NULL - ]; /34: (/NULL - ]), fd=(31)==(34), (34)==(31)]
│ │ │ │ │ └── filters
│ │ │ │ │ └── inhrelid = rel.oid [type=bool, outer=(1,30), constraints=(/1: (/NULL - ]; /30: (/NULL - ]), fd=(1)==(30), (30)==(1)]
│ │ │ │ └── aggregations
│ │ │ │ ├── array-agg [type=name[], outer=(35)]
│ │ │ │ │ └── variable: c.relname [type=name]
│ │ │ │ └── any-not-null-agg [type=oid, outer=(30)]
│ │ │ │ └── variable: inhrelid [type=oid]
│ │ │ └── projections
│ │ │ └── CASE WHEN inhrelid IS NOT NULL THEN array_agg END [type=name[], outer=(30,65)]
│ │ ├── values
│ │ │ ├── columns: array:64(name[])
│ │ │ ├── outer: (63)
│ │ │ ├── cardinality: [1 - 1]
│ │ │ ├── key: ()
│ │ │ ├── fd: ()-->(64)
│ │ │ └── (COALESCE(array_agg, ARRAY[]),) [type=tuple{name[]}]
│ │ └── filters (true)
│ └── filters (true)
└── projections
└── variable: array [type=name[], outer=(64)]
2 changes: 1 addition & 1 deletion pkg/sql/opt/ops/scalar.opt
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ define SubqueryPrivate {

# RequestedCol is set if there could possibly be other columns in the input
# (say, if there was an ordering that must be respected) besides the one that
# will eventually be output.
# will eventually be output. It is only used for ArrayFlatten expressions.
RequestedCol ColumnID

# Cmp is only used for AnyOp.
Expand Down
Loading

0 comments on commit f11604f

Please sign in to comment.