Skip to content

Commit

Permalink
Merge #40234 #40267
Browse files Browse the repository at this point in the history
40234: storage: test atomic replication changes r=nvanbenschoten a=tbg

This PR adds a number of tests that focus on the interaction between the various
queues and joint configurations.

We don't flip the switch yet since adding/removing only learners does not work
yet via the joint path. This isn't something we need per se, but it's an
annoying restriction to keep in mind and work around if it does happen. Tracked
in
#12768 (comment).

40267: tree: make int::regtype::text O(1) r=jordanlewis a=jordanlewis

Previously, casting an integer to a regtype and then text (which turns a
type OID into the string of its corresponding type) would run a select
over pg_type to figure out the answer, which under the hood
materializes all of the types into a table and filters, an O(n)
operation. This is silly because we already have a static lookup table
for this info. Use it.

This commonly shows up in visualization tools as O(n^2), since people
tend to run one of these casts once per type. So this improves metadata
query performance significantly.

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
Co-authored-by: Jordan Lewis <[email protected]>
  • Loading branch information
3 people committed Aug 28, 2019
3 parents 1606c9d + 4c3d5a5 + 79226f6 commit f81ac2b
Show file tree
Hide file tree
Showing 12 changed files with 544 additions and 202 deletions.
9 changes: 5 additions & 4 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,19 @@ func (r *RangeDescriptor) SetReplicas(replicas ReplicaDescriptors) {

// SetReplicaType changes the type of the replica with the given ID to the given
// type. Returns zero values if the replica was not found and the updated
// descriptor and true otherwise.
// descriptor, the previous type, and true otherwise.
func (r *RangeDescriptor) SetReplicaType(
nodeID NodeID, storeID StoreID, typ ReplicaType,
) (ReplicaDescriptor, bool) {
) (ReplicaDescriptor, ReplicaType, bool) {
for i := range r.InternalReplicas {
desc := &r.InternalReplicas[i]
if desc.StoreID == storeID && desc.NodeID == nodeID {
prevTyp := desc.GetType()
desc.Type = &typ
return *desc, true
return *desc, prevTyp, true
}
}
return ReplicaDescriptor{}, false
return ReplicaDescriptor{}, 0, false
}

// AddReplica adds a replica on the given node and store with the supplied type.
Expand Down
61 changes: 33 additions & 28 deletions pkg/roachpb/metadata_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ func (d ReplicaDescriptors) All() []ReplicaDescriptor {
return d.wrapped
}

// predVoterFullOrIncoming is a filter predicate that reports whether the
// given replica descriptor has type VOTER_FULL or VOTER_INCOMING. Every other
// replica type yields false.
func predVoterFullOrIncoming(rDesc ReplicaDescriptor) bool {
	typ := rDesc.GetType()
	return typ == VOTER_FULL || typ == VOTER_INCOMING
}

// predLearner is a filter predicate that reports whether the given replica
// descriptor has type LEARNER.
func predLearner(rDesc ReplicaDescriptor) bool {
	switch rDesc.GetType() {
	case LEARNER:
		return true
	default:
		return false
	}
}

// Voters returns the current and future voter replicas in the set. This means
// that during an atomic replication change, only the replicas that will be
// voters once the change completes will be returned; "outgoing" voters will not
Expand All @@ -102,26 +115,7 @@ func (d ReplicaDescriptors) All() []ReplicaDescriptor {
// different subset of voters. Consider renaming this method so that it's
// more descriptive.
func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
// Fastpath, most of the time, everything is a voter, so special case that and
// save the alloc.
fastpath := true
for i := range d.wrapped {
if d.wrapped[i].GetType() != VOTER_FULL {
fastpath = false
break
}
}
if fastpath {
return d.wrapped
}
voters := make([]ReplicaDescriptor, 0, len(d.wrapped))
for i := range d.wrapped {
switch d.wrapped[i].GetType() {
case VOTER_FULL, VOTER_INCOMING:
voters = append(voters, d.wrapped[i])
}
}
return voters
return d.Filter(predVoterFullOrIncoming)
}

// Learners returns the learner replicas in the set. This may allocate, but it
Expand Down Expand Up @@ -211,18 +205,29 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
//
// For some related mega-comments, see Replica.sendSnapshot.
func (d ReplicaDescriptors) Learners() []ReplicaDescriptor {
// Fastpath, most of the time, everything is a voter, so special case that and
// save the alloc.
var learners []ReplicaDescriptor
return d.Filter(predLearner)
}

// Filter returns only the replica descriptors for which the supplied method
// returns true. The memory returned may be shared with the receiver.
func (d ReplicaDescriptors) Filter(pred func(rDesc ReplicaDescriptor) bool) []ReplicaDescriptor {
// Fast path when all or none match to avoid allocations.
fastpath := true
out := d.wrapped
for i := range d.wrapped {
if d.wrapped[i].GetType() == LEARNER {
if learners == nil {
learners = make([]ReplicaDescriptor, 0, len(d.wrapped)-i)
if pred(d.wrapped[i]) {
if !fastpath {
out = append(out, d.wrapped[i])
}
} else {
if fastpath {
out = nil
out = append(out, d.wrapped[:i]...)
fastpath = false
}
learners = append(learners, d.wrapped[i])
}
}
return learners
return out
}

// AsProto returns the protobuf representation of these replicas, suitable for
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/sem/tree/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -3093,7 +3093,7 @@ func (expr *CastExpr) Eval(ctx *EvalContext) (Datum, error) {
}

// PerformCast performs a cast from the provided Datum to the specified
// CastTargetType.
// types.T.
func PerformCast(ctx *EvalContext, d Datum, t *types.T) (Datum, error) {
switch t.Family() {
case types.BitFamily:
Expand Down Expand Up @@ -3552,6 +3552,15 @@ func PerformCast(ctx *EvalContext, d Datum, t *types.T) (Datum, error) {
switch t.Oid() {
case oid.T_oid:
return &DOid{semanticType: t, DInt: v.DInt}, nil
case oid.T_regtype:
// Mapping an oid to a regtype is easy: we have a hardcoded map.
typ, ok := types.OidToType[oid.Oid(v.DInt)]
ret := &DOid{semanticType: t, DInt: v.DInt}
if !ok {
return ret, nil
}
ret.name = typ.PGName()
return ret, nil
default:
oid, err := queryOid(ctx, t, v)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/batcheval/cmd_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ func checkCanReceiveLease(rec EvalContext) error {
return errors.AssertionFailedf(
`could not find replica for store %s in %s`, rec.StoreID(), rec.Desc())
} else if t := repDesc.GetType(); t != roachpb.VOTER_FULL {
// NB: there's no harm in transferring the lease to a VOTER_INCOMING,
// but we disallow it anyway. On the other hand, transferring to
// VOTER_OUTGOING would be a pretty bad idea since those voters are
// dropped when transitioning out of the joint config, which then
// amounts to removing the leaseholder without any safety precautions.
// This would either wedge the range or allow illegal reads to be
// served.
//
// Since the leaseholder can't remove itself and is a VOTER_FULL, we
// also know that in any configuration there's at least one VOTER_FULL.
return errors.Errorf(`cannot transfer lease to replica of type %s`, t)
}
return nil
Expand Down
35 changes: 24 additions & 11 deletions pkg/storage/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,22 +188,22 @@ var _ purgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
) (roachpb.RangeDescriptor, enginepb.MVCCStats, float64, error) {
) (*roachpb.RangeDescriptor, enginepb.MVCCStats, float64, error) {
res, pErr := client.SendWrappedWith(ctx, mq.db.NonTransactionalSender(), roachpb.Header{
ReturnRangeInfo: true,
}, &roachpb.RangeStatsRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
})
if pErr != nil {
return roachpb.RangeDescriptor{}, enginepb.MVCCStats{}, 0, pErr.GoError()
return nil, enginepb.MVCCStats{}, 0, pErr.GoError()
}
rangeInfos := res.Header().RangeInfos
if len(rangeInfos) != 1 {
return roachpb.RangeDescriptor{}, enginepb.MVCCStats{}, 0, fmt.Errorf(
return nil, enginepb.MVCCStats{}, 0, fmt.Errorf(
"mergeQueue.requestRangeStats: response had %d range infos but exactly one was expected",
len(rangeInfos))
}
return rangeInfos[0].Desc, res.(*roachpb.RangeStatsResponse).MVCCStats,
return &rangeInfos[0].Desc, res.(*roachpb.RangeStatsResponse).MVCCStats,
res.(*roachpb.RangeStatsResponse).QueriesPerSecond, nil
}

Expand Down Expand Up @@ -275,21 +275,34 @@ func (mq *mergeQueue) process(
}

{
// AdminMerge errors if there are learners on either side and
// AdminRelocateRange removes any on the range it operates on. For the sake
// of obviousness, just remove them all upfront.
newLHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), lhsDesc)
store, db := lhsRepl.store, lhsRepl.store.DB()
// AdminMerge errors if there is a learner or joint config on either
// side and AdminRelocateRange removes any on the range it operates on.
// For the sake of obviousness, just fix this all upfront.
var err error
lhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, store, lhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
lhsDesc = newLHSDesc
newRHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), &rhsDesc)

lhsDesc, err = removeLearners(ctx, db, lhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}

rhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, store, rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}

rhsDesc, err = removeLearners(ctx, db, rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
rhsDesc = *newRHSDesc
}
lhsReplicas, rhsReplicas := lhsDesc.Replicas().All(), rhsDesc.Replicas().All()

Expand Down
72 changes: 48 additions & 24 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (r *Replica) adminSplitWithDescriptor(
// The split queue doesn't care about the set of replicas, so if we somehow
// are being handed one that's in a joint state, finalize that before
// continuing.
desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc)
desc, err = maybeLeaveAtomicChangeReplicas(ctx, r.store, desc)
if err != nil {
return roachpb.AdminSplitResponse{}, err
}
Expand Down Expand Up @@ -575,11 +575,21 @@ func (r *Replica) AdminMerge(
// are no non-voter replicas, in case some third type is later added).
// This behavior can be changed later if the complexity becomes worth
// it, but it's not right now.
//
// NB: the merge queue transitions out of any joint states and removes
// any learners it sees. It's sort of silly that we don't do that here
// instead; effectively any caller of AdminMerge that is not the merge
// queue won't be able to recover from these cases (though the replicate
// queues should fix things up quickly).
lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas()
if len(lReplicas.Voters()) != len(lReplicas.All()) {

predFullVoter := func(rDesc roachpb.ReplicaDescriptor) bool {
return rDesc.GetType() == roachpb.VOTER_FULL
}
if len(lReplicas.Filter(predFullVoter)) != len(lReplicas.All()) {
return errors.Errorf("cannot merge range with non-voter replicas on lhs: %s", lReplicas)
}
if len(rReplicas.Voters()) != len(rReplicas.All()) {
if len(rReplicas.Filter(predFullVoter)) != len(rReplicas.All()) {
return errors.Errorf("cannot merge range with non-voter replicas on rhs: %s", rReplicas)
}
if !replicaSetsEqual(lReplicas.All(), rReplicas.All()) {
Expand Down Expand Up @@ -912,7 +922,7 @@ func (r *Replica) changeReplicasImpl(
// If in a joint config, clean up. The assumption here is that the caller
// of ChangeReplicas didn't even realize that they were holding on to a
// joint descriptor and would rather not have to deal with that fact.
desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc)
desc, err = maybeLeaveAtomicChangeReplicas(ctx, r.store, desc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -957,7 +967,7 @@ func (r *Replica) changeReplicasImpl(
if err != nil {
// If the error occurred while transitioning out of an atomic replication change,
// try again here with a fresh descriptor; this is a noop otherwise.
if _, err := r.maybeLeaveAtomicChangeReplicas(ctx, r.Desc()); err != nil {
if _, err := maybeLeaveAtomicChangeReplicas(ctx, r.store, r.Desc()); err != nil {
return nil, err
}
if fn := r.store.cfg.TestingKnobs.ReplicaAddSkipLearnerRollback; fn != nil && fn() {
Expand All @@ -978,8 +988,8 @@ func (r *Replica) changeReplicasImpl(
// maybeLeaveAtomicChangeReplicas transitions out of the joint configuration if
// the descriptor indicates one. This involves running a distributed transaction
// updating said descriptor, the result of which will be returned.
func (r *Replica) maybeLeaveAtomicChangeReplicas(
ctx context.Context, desc *roachpb.RangeDescriptor,
func maybeLeaveAtomicChangeReplicas(
ctx context.Context, store *Store, desc *roachpb.RangeDescriptor,
) (*roachpb.RangeDescriptor, error) {
// We want execChangeReplicasTxn to be able to make sure it's only tasked
// with leaving a joint state when it's in one, so make sure we don't call
Expand All @@ -988,12 +998,15 @@ func (r *Replica) maybeLeaveAtomicChangeReplicas(
return desc, nil
}

// NB: this is matched on in TestMergeQueueSeesLearner.
log.Eventf(ctx, "transitioning out of joint configuration %s", desc)

// NB: reason and detail won't be used because no range log event will be
// emitted.
//
// TODO(tbg): reconsider this.
return execChangeReplicasTxn(
ctx, r.store, desc, storagepb.ReasonUnknown /* unused */, "", nil, /* iChgs */
ctx, store, desc, storagepb.ReasonUnknown /* unused */, "", nil, /* iChgs */
)
}

Expand Down Expand Up @@ -1087,7 +1100,7 @@ func addLearnerReplicas(
// two distributed transactions. On error, it is possible that the range is in
// the intermediate ("joint") configuration in which a quorum of both the old
// and new sets of voters is required. If a range is encountered in this state,
// r.maybeLeaveAtomicReplicationChange can fix this, but it is the caller's
// maybeLeaveAtomicChangeReplicas can fix this, but it is the caller's
// job to do this when necessary.
func (r *Replica) atomicReplicationChange(
ctx context.Context,
Expand Down Expand Up @@ -1140,10 +1153,8 @@ func (r *Replica) atomicReplicationChange(
}
}

if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil {
if fn() {
return desc, nil
}
if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil && fn() {
return desc, nil
}

for _, target := range chgs.Removals() {
Expand All @@ -1155,8 +1166,13 @@ func (r *Replica) atomicReplicationChange(
if err != nil {
return nil, err
}

if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterJointConfig; fn != nil && fn() {
return desc, nil
}

// Leave the joint config if we entered one.
return r.maybeLeaveAtomicChangeReplicas(ctx, desc)
return maybeLeaveAtomicChangeReplicas(ctx, r.store, desc)
}

// tryRollbackLearnerReplica attempts to remove a learner specified by the
Expand Down Expand Up @@ -1306,6 +1322,9 @@ func execChangeReplicasTxn(
}

useJoint := len(chgs) > 1
if fn := store.TestingKnobs().ReplicationAlwaysUseJointConfig; fn != nil && fn() {
useJoint = true
}
for _, chg := range chgs {
switch chg.typ {
case internalChangeTypeAddVoterViaPreemptiveSnap:
Expand All @@ -1320,21 +1339,26 @@ func execChangeReplicasTxn(
if useJoint {
typ = roachpb.VOTER_INCOMING
}
rDesc, ok := updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, typ)
if !ok {
rDesc, prevTyp, ok := updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, typ)
if !ok || prevTyp != roachpb.LEARNER {
return nil, errors.Errorf("cannot promote target %v which is missing as Learner", chg.target)
}
added = append(added, rDesc)
case internalChangeTypeRemove:
var rDesc roachpb.ReplicaDescriptor
var ok bool
if !useJoint {
rDesc, ok = updatedDesc.RemoveReplica(chg.target.NodeID, chg.target.StoreID)
} else {
rDesc, ok = updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, roachpb.VOTER_OUTGOING)
}
rDesc, ok := updatedDesc.GetReplicaDescriptor(chg.target.StoreID)
if !ok {
return nil, errors.Errorf("cannot remove nonexistent target %v", chg.target)
return nil, errors.Errorf("target %s not found", chg.target)
}
if typ := rDesc.GetType(); !useJoint || typ == roachpb.LEARNER {
rDesc, _ = updatedDesc.RemoveReplica(chg.target.NodeID, chg.target.StoreID)
} else {
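// NB: typ is already known to be VOTER_FULL because of !InAtomicReplicationChange() above.
// We check it anyway.
// NOTE(review): the SetReplicaType call below discards its second return
// value, so prevTyp keeps its zero value instead of the replica's actual
// previous type and the check is ineffective; it should likely be assigned
// via `rDesc, prevTyp, _ = updatedDesc.SetReplicaType(...)`.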
var prevTyp roachpb.ReplicaType
rDesc, _, _ = updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, roachpb.VOTER_OUTGOING)
if prevTyp != roachpb.VOTER_FULL {
return nil, errors.Errorf("cannot transition from %s to VOTER_OUTGOING", prevTyp)
}
}
removed = append(removed, rDesc)
default:
Expand Down
Loading

0 comments on commit f81ac2b

Please sign in to comment.