Commit 8a08c17
39151: storage: handle learner replicas in merge code r=tbg a=danhhz

For simplicity of implementation and concept, AdminMerge disallows learner
replicas. The merge queue now removes them before calling it.

AdminRelocateRange similarly removes learners before doing its work (even
if a learner is one of the supplied targets). This is not strictly
necessary for its use in the merge queue, since the merge queue does its
own removal, but decoupling the two is appealing. Additionally,
AdminRelocateRange has callers other than the merge queue.

Touches cockroachdb#38902

Release note: None
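
For illustration, here is a condensed sketch of the defensive check the merge
queue now performs after stripping learners and before merging. It mirrors the
merge_queue.go diff below; the helper name is made up and the surrounding
plumbing is omitted, so treat it as a sketch rather than the exact committed
code.

// verifyVotersOnly returns an error if the descriptor still contains any
// non-voter (e.g. learner) replica. The merge queue runs an equivalent check
// on both the LHS and RHS descriptors after calling removeLearners and before
// issuing AdminRelocateRange/AdminMerge.
// Assumes the imports used in the diff below:
//   "github.com/cockroachdb/cockroach/pkg/roachpb"
//   "github.com/pkg/errors"
func verifyVotersOnly(desc *roachpb.RangeDescriptor) error {
	for _, rd := range desc.Replicas().All() {
		if rd.GetType() != roachpb.ReplicaType_VOTER {
			return errors.Errorf(`cannot merge non-voter replicas: %v`, rd)
		}
	}
	return nil
}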

39170: opt: stop storing references to WITH expressions r=justinj a=justinj

It's sketchy to hold references to RelExprs that live "outside the
system," and the only remaining use for them was computing stats, so
those stats are now reported as unknown for the time being.

Release note: None
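
In code, the change amounts to replacing the memo's stored slice of WITH
expressions with a plain counter. A minimal self-contained sketch of the
pattern follows (types are simplified here; the actual change is in
pkg/sql/opt/memo/memo.go below, where the ID type is opt.WithID):

// WithID identifies a WITH expression within a query. The memo no longer
// retains the referenced expressions themselves; it only hands out fresh IDs.
type WithID uint64

// Memo tracks the highest WithID assigned so far.
type Memo struct {
	curWithID WithID
}

// NextWithID returns a not-yet-assigned identifier for a WITH expression.
func (m *Memo) NextWithID() WithID {
	m.curWithID++
	return m.curWithID
}

Callers such as buildCTE and buildFKChecks (see the optbuilder diffs below) now
call NextWithID instead of registering the expression via AddWithBinding.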

Co-authored-by: Daniel Harrison <[email protected]>
Co-authored-by: Justin Jaffray <[email protected]>
3 people committed Jul 30, 2019
3 parents 95a3d42 + 5bda9fb + e40faa9 commit 8a08c17
Showing 14 changed files with 272 additions and 137 deletions.
9 changes: 5 additions & 4 deletions pkg/internal/client/db.go
@@ -451,10 +451,11 @@ func (db *DB) DelRange(ctx context.Context, begin, end interface{}) error {
return getOneErr(db.Run(ctx, b), b)
}

// AdminMerge merges the range containing key and the subsequent
// range. After the merge operation is complete, the range containing
// key will contain all of the key/value pairs of the subsequent range
// and the subsequent range will no longer exist.
// AdminMerge merges the range containing key and the subsequent range. After
// the merge operation is complete, the range containing key will contain all of
// the key/value pairs of the subsequent range and the subsequent range will no
// longer exist. Neither range may contain learner replicas; if one does, an
// error is returned.
//
// key can be either a byte slice or a string.
func (db *DB) AdminMerge(ctx context.Context, key interface{}) error {
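
For context, a hypothetical caller of this API could look like the following.
The helper name and the merge-the-right-neighbor framing are illustrative;
only db.AdminMerge itself comes from this package.

// mergeRightNeighbor merges the range containing key with the range
// immediately to its right. With this commit, the call returns an error if
// either range still contains learner replicas.
func mergeRightNeighbor(ctx context.Context, db *DB, key string) error {
	// key can be either a byte slice or a string.
	return db.AdminMerge(ctx, key)
}
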
30 changes: 7 additions & 23 deletions pkg/sql/opt/memo/memo.go
@@ -142,9 +142,8 @@ type Memo struct {
// curID is the highest currently in-use scalar expression ID.
curID opt.ScalarID

// withExprs is the set of With expressions that have been constructed thus
// far.
withExprs []RelExpr
// curWithID is the highest currently in-use WITH ID.
curWithID opt.WithID

// WARNING: if you add more members, add initialization code in Init.
}
@@ -162,7 +161,6 @@ func (m *Memo) Init(evalCtx *tree.EvalContext) {
m.rootExpr = nil
m.rootProps = nil
m.memEstimate = 0
m.withExprs = nil

m.dataConversion = evalCtx.SessionData.DataConversion
m.reorderJoinsLimit = evalCtx.SessionData.ReorderJoinsLimit
@@ -172,6 +170,7 @@ func (m *Memo) Init(evalCtx *tree.EvalContext) {
m.saveTablesPrefix = evalCtx.SessionData.SaveTablesPrefix

m.curID = 0
m.curWithID = 0
}

// IsEmpty returns true if there are no expressions in the memo.
@@ -359,23 +358,8 @@ func (m *Memo) RequestColStat(
return nil, false
}

// AddWithBinding adds a new expression to the set of referenceable With
// expressions, returning its associated ID that can be used to retrieve it.
func (m *Memo) AddWithBinding(e RelExpr) opt.WithID {
m.withExprs = append(m.withExprs, e)
return opt.WithID(len(m.withExprs))
}

// WithExpr returns the expression associated with the given WithID.
// This shouldn't be used for anything relating to physical props.
func (m *Memo) WithExpr(id opt.WithID) RelExpr {
return m.withExprs[id-1]
}

// CopyWiths copies over the set of WITH expressions used by the other memo.
// TODO(justin): These are just used to compute stats, we should have a way to
// avoid hanging on to expressions like this.
func (m *Memo) CopyWiths(o *Memo, replace func(opt.Expr) opt.Expr) {
m.withExprs = make([]RelExpr, len(o.withExprs))
copy(m.withExprs, o.withExprs)
// NextWithID returns a not-yet-assigned identifier for a WITH expression.
func (m *Memo) NextWithID() opt.WithID {
m.curWithID++
return m.curWithID
}
37 changes: 5 additions & 32 deletions pkg/sql/opt/memo/statistics_builder.go
@@ -349,7 +349,11 @@ func (sb *statisticsBuilder) colStat(colSet opt.ColSet, e RelExpr) *props.Column
return sb.colStat(colSet, e.Child(1).(RelExpr))

case opt.WithScanOp:
return sb.colStatWithScan(colSet, e.(*WithScanExpr))
// This is tricky, since if we deferred to the expression being referenced,
// the computation of stats for a WithScan would depend on something
// outside of the expression itself. Just call it unknown for now.
// TODO(justin): find a real solution for this.
return sb.colStatUnknown(colSet, e.Relational())

case opt.FakeRelOp:
panic(errors.AssertionFailedf("FakeRelOp does not contain col stat for %v", colSet))
@@ -2061,37 +2065,6 @@ func (sb *statisticsBuilder) colStatSequenceSelect(
return colStat
}

// +-----------+
// | With Scan |
// +-----------+

func (sb *statisticsBuilder) colStatWithScan(
colSet opt.ColSet, ws *WithScanExpr,
) *props.ColumnStatistic {
relProps := ws.Relational()
s := &relProps.Stats

withExpr := ws.Memo().WithExpr(ws.ID)

// We need to pass on the colStat request to the referenced expression, but
// we need to translate the columns to the ones returned by the original
// expression, rather than the reference.
cols := translateColSet(colSet, ws.OutCols, ws.InCols)

// Note that if the WITH contained placeholders, we still hold onto the
// original, unreplaced expression, and so we won't be able to generate stats
// corresponding to the replaced expression.
// TODO(justin): find a way to lift this limitation. One way could be to
// rebuild WithScans when the WITH they reference has placeholders that get
// replaced.

colstat, _ := s.ColStats.Add(colSet)
*colstat = *sb.colStat(cols, withExpr)
colstat.Cols = colSet

return colstat
}

// +---------+
// | Unknown |
// +---------+
14 changes: 7 additions & 7 deletions pkg/sql/opt/memo/testdata/stats/groupby
@@ -468,45 +468,45 @@ GROUP BY q.b
with &1 (q)
├── columns: "?column?":6(int!null)
├── cardinality: [0 - 3]
├── stats: [rows=0.22654092]
├── stats: [rows=1]
├── fd: ()-->(6)
├── values
│ ├── columns: column1:1(bool!null) column2:2(int)
│ ├── cardinality: [3 - 3]
│ ├── stats: [rows=3, distinct(1)=2, null(1)=0, distinct(2)=2, null(2)=2]
│ ├── stats: [rows=3]
│ ├── (true, NULL) [type=tuple{bool, int}]
│ ├── (false, NULL) [type=tuple{bool, int}]
│ └── (true, 5) [type=tuple{bool, int}]
└── project
├── columns: "?column?":6(int!null)
├── cardinality: [0 - 3]
├── stats: [rows=0.22654092]
├── stats: [rows=1]
├── fd: ()-->(6)
├── select
│ ├── columns: b:4(int) bool_or:5(bool!null)
│ ├── cardinality: [0 - 3]
│ ├── stats: [rows=0.22654092, distinct(5)=0.22654092, null(5)=0]
│ ├── stats: [rows=1, distinct(5)=1, null(5)=0]
│ ├── key: (4)
│ ├── fd: ()-->(5)
│ ├── group-by
│ │ ├── columns: b:4(int) bool_or:5(bool)
│ │ ├── grouping columns: b:4(int)
│ │ ├── cardinality: [0 - 3]
│ │ ├── stats: [rows=1.29289322, distinct(4)=1.29289322, null(4)=1, distinct(5)=1.29289322, null(5)=1]
│ │ ├── stats: [rows=1, distinct(4)=1, null(4)=0, distinct(5)=1, null(5)=0]
│ │ ├── key: (4)
│ │ ├── fd: (4)-->(5)
│ │ ├── select
│ │ │ ├── columns: a:3(bool!null) b:4(int)
│ │ │ ├── cardinality: [0 - 3]
│ │ │ ├── stats: [rows=1.5, distinct(3)=1, null(3)=0, distinct(4)=1.29289322, null(4)=1]
│ │ │ ├── stats: [rows=1, distinct(3)=1, null(3)=0, distinct(4)=1, null(4)=0]
│ │ │ ├── fd: ()-->(3)
│ │ │ ├── with-scan &1 (q)
│ │ │ │ ├── columns: a:3(bool!null) b:4(int)
│ │ │ │ ├── mapping:
│ │ │ │ │ ├── column1:1(bool) => a:3(bool)
│ │ │ │ │ └── column2:2(int) => b:4(int)
│ │ │ │ ├── cardinality: [3 - 3]
│ │ │ │ ├── stats: [rows=3, distinct(3)=2, null(3)=0, distinct(4)=2, null(4)=2]
│ │ │ │ ├── stats: [rows=3]
│ │ │ │ └── fd: (1)-->(3), (2)-->(4)
│ │ │ └── filters
│ │ │ └── variable: a [type=bool, outer=(3), constraints=(/3: [/true - /true]; tight), fd=()-->(3)]
16 changes: 8 additions & 8 deletions pkg/sql/opt/memo/testdata/stats/scan
@@ -539,38 +539,38 @@ WHERE
----
with &1 (subq)
├── columns: "?column?":26(int!null)
├── stats: [rows=0.95099005]
├── stats: [rows=1]
├── fd: ()-->(26)
├── project
│ ├── columns: col1:23(bool) tab1.g:18(int4!null)
│ ├── stats: [rows=333333.333, distinct(18)=100, null(18)=0, distinct(23)=333333.333, null(23)=16336.65]
│ ├── stats: [rows=333333.333]
│ ├── inner-join (hash)
│ │ ├── columns: tab0.e:5(varchar) tab0.f:6("char") tab0.h:8(varchar) tab0.j:10(float!null) tab1.e:16(varchar) tab1.f:17("char") tab1.g:18(int4!null) tab1.j:21(float!null)
│ │ ├── stats: [rows=333333.333, distinct(10)=100, null(10)=0, distinct(18)=100, null(18)=0, distinct(21)=100, null(21)=0, distinct(5,6,8,16,17)=333333.333, null(5,6,8,16,17)=16336.65]
│ │ ├── stats: [rows=333333.333, distinct(10)=100, null(10)=0, distinct(18)=100, null(18)=0, distinct(21)=100, null(21)=0]
│ │ ├── scan tab0
│ │ │ ├── columns: tab0.e:5(varchar) tab0.f:6("char") tab0.h:8(varchar) tab0.j:10(float!null)
│ │ │ └── stats: [rows=1000, distinct(10)=100, null(10)=0, distinct(5,6,8)=1000, null(5,6,8)=29.701]
│ │ │ └── stats: [rows=1000, distinct(10)=100, null(10)=0]
│ │ ├── scan tab1
│ │ │ ├── columns: tab1.e:16(varchar) tab1.f:17("char") tab1.g:18(int4!null) tab1.j:21(float!null)
│ │ │ └── stats: [rows=1000, distinct(18)=100, null(18)=0, distinct(21)=100, null(21)=0, distinct(16,17)=1000, null(16,17)=19.9]
│ │ │ └── stats: [rows=1000, distinct(18)=100, null(18)=0, distinct(21)=100, null(21)=0]
│ │ └── filters
│ │ └── tab0.j IN (tab1.j,) [type=bool, outer=(10,21)]
│ └── projections
│ └── CASE WHEN ilike_escape(regexp_replace(tab0.h, tab1.e, tab0.f, tab0.e::STRING), tab1.f, '') THEN true ELSE false END [type=bool, outer=(5,6,8,16,17)]
└── project
├── columns: "?column?":26(int!null)
├── stats: [rows=0.95099005]
├── stats: [rows=1]
├── fd: ()-->(26)
├── select
│ ├── columns: col0:24(int4!null) col1:25(bool!null)
│ ├── stats: [rows=0.95099005, distinct(24)=0.95099005, null(24)=0, distinct(25)=0.95099005, null(25)=0]
│ ├── stats: [rows=1, distinct(24)=1, null(24)=0, distinct(25)=1, null(25)=0]
│ ├── fd: ()-->(25)
│ ├── with-scan &1 (subq)
│ │ ├── columns: col0:24(int4!null) col1:25(bool)
│ │ ├── mapping:
│ │ │ ├── tab1.g:18(int4) => col0:24(int4)
│ │ │ └── col1:23(bool) => col1:25(bool)
│ │ ├── stats: [rows=333333.333, distinct(24)=100, null(24)=0, distinct(25)=333333.333, null(25)=16336.65]
│ │ ├── stats: [rows=333333.333]
│ │ └── fd: (18)-->(24), (23)-->(25)
│ └── filters
│ └── variable: col1 [type=bool, outer=(25), constraints=(/25: [/true - /true]; tight), fd=()-->(25)]
14 changes: 7 additions & 7 deletions pkg/sql/opt/memo/testdata/stats/select
@@ -1369,32 +1369,32 @@ SELECT x FROM t WHERE x
----
with &1 (t)
├── columns: x:6(bool!null)
├── stats: [rows=4e+10, distinct(6)=1, null(6)=0]
├── stats: [rows=1, distinct(6)=1, null(6)=0]
├── fd: ()-->(6)
├── project
│ ├── columns: x:5(bool)
│ ├── stats: [rows=4e+20, distinct(5)=1, null(5)=4e+20]
│ ├── stats: [rows=4e+20]
│ ├── left-join (hash)
│ │ ├── columns: t1.x:1(bool) t2.x:3(bool)
│ │ ├── stats: [rows=4e+20, distinct(1,3)=1, null(1,3)=4e+20]
│ │ ├── stats: [rows=4e+20]
│ │ ├── scan t1
│ │ │ ├── columns: t1.x:1(bool)
│ │ │ └── stats: [rows=2e+10, distinct(1)=1, null(1)=2e+10]
│ │ │ └── stats: [rows=2e+10]
│ │ ├── scan t2
│ │ │ ├── columns: t2.x:3(bool)
│ │ │ └── stats: [rows=2e+10, distinct(3)=1, null(3)=2e+10]
│ │ │ └── stats: [rows=2e+10]
│ │ └── filters (true)
│ └── projections
│ └── (t1.x::INT8 << 5533)::BOOL OR t2.x [type=bool, outer=(1,3)]
└── select
├── columns: x:6(bool!null)
├── stats: [rows=4e+10, distinct(6)=1, null(6)=0]
├── stats: [rows=1, distinct(6)=1, null(6)=0]
├── fd: ()-->(6)
├── with-scan &1 (t)
│ ├── columns: x:6(bool)
│ ├── mapping:
│ │ └── x:5(bool) => x:6(bool)
│ ├── stats: [rows=4e+20, distinct(6)=1, null(6)=4e+20]
│ ├── stats: [rows=4e+20]
│ └── fd: (5)-->(6)
└── filters
└── variable: x [type=bool, outer=(6), constraints=(/6: [/true - /true]; tight), fd=()-->(6)]
4 changes: 2 additions & 2 deletions pkg/sql/opt/memo/testdata/stats/with
@@ -41,7 +41,7 @@ with &1 (foo)
├── fd: (1)-->(2-4), (2)-->(5), (3)-->(6)
├── scan a
│ ├── columns: a.x:1(int!null) a.y:2(int) a.s:3(string)
│ ├── stats: [rows=5000, distinct(1)=5000, null(1)=0, distinct(2)=400, null(2)=0, distinct(3)=10, null(3)=0]
│ ├── stats: [rows=5000]
│ ├── key: (1)
│ └── fd: (1)-->(2,3)
└── with-scan &1 (foo)
@@ -50,6 +50,6 @@ with &1 (foo)
│ ├── a.x:1(int) => x:4(int)
│ ├── a.y:2(int) => y:5(int)
│ └── a.s:3(string) => s:6(string)
├── stats: [rows=5000, distinct(4)=5000, null(4)=0, distinct(5)=400, null(5)=0, distinct(6)=10, null(6)=0]
├── stats: [rows=5000]
├── key: (1)
└── fd: (1)-->(2-4), (2)-->(5), (3)-->(6)
1 change: 0 additions & 1 deletion pkg/sql/opt/norm/factory.go
@@ -189,7 +189,6 @@ func (f *Factory) CopyAndReplace(
// Copy all metadata to the target memo so that referenced tables and columns
// can keep the same ids they had in the "from" memo.
f.mem.Metadata().CopyFrom(from.Memo().Metadata())
f.mem.CopyWiths(from.Memo(), replace)

// Perform copy and replacement, and store result as the root of this
// factory's memo.
2 changes: 1 addition & 1 deletion pkg/sql/opt/optbuilder/mutation_builder.go
@@ -759,7 +759,7 @@ func (mb *mutationBuilder) buildFKChecks() {
// need to buffer it. This could be a normalization rule, but it's probably
// more efficient if we did it in here (or we'd end up building the entire FK
// subtrees twice).
mb.withID = mb.b.factory.Memo().AddWithBinding(mb.outScope.expr)
mb.withID = mb.b.factory.Memo().NextWithID()

for i, n := 0, mb.tab.OutboundForeignKeyCount(); i < n; i++ {
fk := mb.tab.OutboundForeignKey(i)
2 changes: 1 addition & 1 deletion pkg/sql/opt/optbuilder/select.go
@@ -557,7 +557,7 @@ func (b *Builder) buildCTE(

cteScope = projectionsScope

id := b.factory.Memo().AddWithBinding(cteScope.expr)
id := b.factory.Memo().NextWithID()

// No good way to show non-select expressions, like INSERT, here.
var stmt tree.SelectStatement
56 changes: 45 additions & 11 deletions pkg/storage/merge_queue.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

const (
@@ -239,6 +240,15 @@ func (mq *mergeQueue) process(
return nil
}

// Range was manually split and not expired, so skip merging.
now := mq.store.Clock().Now()
if now.Less(rhsDesc.GetStickyBit()) {
log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
// repeatedly processing ranges that cannot be merged.
return nil
}

mergedDesc := &roachpb.RangeDescriptor{
StartKey: lhsDesc.StartKey,
EndKey: rhsDesc.EndKey,
@@ -264,13 +274,46 @@ func (mq *mergeQueue) process(
return nil
}

if !replicaSetsEqual(lhsDesc.Replicas().Unwrap(), rhsDesc.Replicas().Unwrap()) {
{
// AdminMerge errors if there are learners on either side and
// AdminRelocateRange removes any on the range it operates on. For the sake
// of obviousness, just remove them all upfront.
newLHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), lhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
lhsDesc = newLHSDesc
newRHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), &rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
rhsDesc = *newRHSDesc
}
lhsReplicas, rhsReplicas := lhsDesc.Replicas().All(), rhsDesc.Replicas().All()

// Defensive sanity check that everything is now a voter.
for i := range lhsReplicas {
if lhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
return errors.Errorf(`cannot merge non-voter replicas on lhs: %v`, lhsReplicas)
}
}
for i := range rhsReplicas {
if rhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
return errors.Errorf(`cannot merge non-voter replicas on rhs: %v`, rhsReplicas)
}
}

if !replicaSetsEqual(lhsReplicas, rhsReplicas) {
var targets []roachpb.ReplicationTarget
for _, lhsReplDesc := range lhsDesc.Replicas().Unwrap() {
for _, lhsReplDesc := range lhsReplicas {
targets = append(targets, roachpb.ReplicationTarget{
NodeID: lhsReplDesc.NodeID, StoreID: lhsReplDesc.StoreID,
})
}
// AdminRelocateRange moves the lease to the first target in the list, so
// sort the existing leaseholder there to leave it unchanged.
lease, _ := lhsRepl.GetLease()
for i := range targets {
if targets[i].NodeID == lease.Replica.NodeID && targets[i].StoreID == lease.Replica.StoreID {
@@ -287,15 +330,6 @@ func (mq *mergeQueue) process(
}
}

// Range was manually split and not expired, so skip merging.
now := mq.store.Clock().Now()
if now.Less(rhsDesc.GetStickyBit()) {
log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
// repeatedly processing ranges that cannot be merged.
return nil
}

log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey)
reason := fmt.Sprintf("lhs+rhs has (size=%s+%s qps=%.2f+%.2f --> %.2fqps) below threshold (size=%s, qps=%.2f)",
humanizeutil.IBytes(lhsStats.Total()),
