Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: test atomic replication changes #40234

Merged
merged 27 commits into from
Aug 28, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ddf8e15
storage: rename a test
tbg Aug 27, 2019
63c49c5
storage: add ReplicaAddStopAfterJointConfig store knob
tbg Aug 27, 2019
5df1dc5
storage: add ReplicationAlwaysUseJointConfig testing knob
tbg Aug 27, 2019
e604fb1
testcluster: add RemoveReplicasOrFatal
tbg Aug 27, 2019
d3d8dd8
batcheval: explain ban on lease transfer to VOTER_OUTGOING
tbg Aug 27, 2019
30df723
storage: add TestJointConfigLease
tbg Aug 27, 2019
224441e
storage: test that incoming/outcoming voters don't serve follower reads
tbg Aug 27, 2019
c207c17
storage: test AdminMerge in presence of joint config
tbg Aug 27, 2019
1afd7b3
storage: test split when in joint configuration
tbg Aug 27, 2019
85eacfc
storage: test merge queue seeing joint configs
tbg Aug 27, 2019
4b523c7
storage: test replicaGCQueue seeing joint config
tbg Aug 27, 2019
b7a10e2
storage: test that replicate queue transitions out of joint configs
tbg Aug 27, 2019
6d983ec
fixup! storage: test that incoming/outcoming voters don't serve follo…
tbg Aug 27, 2019
235df82
fixup! storage: test merge queue seeing joint configs
tbg Aug 27, 2019
218476c
fixup! storage: test that replicate queue transitions out of joint co…
tbg Aug 27, 2019
0698167
fixup! storage: add ReplicationAlwaysUseJointConfig testing knob
tbg Aug 27, 2019
837c16c
fixup! storage: add ReplicaAddStopAfterJointConfig store knob
tbg Aug 27, 2019
5e234b4
fixup! storage: add TestJointConfigLease
tbg Aug 27, 2019
cdd70ff
fixup! storage: test merge queue seeing joint configs
tbg Aug 27, 2019
43b5d31
fixup! storage: test merge queue seeing joint configs
tbg Aug 27, 2019
732d3a7
fixup! storage: test that incoming/outcoming voters don't serve follo…
tbg Aug 27, 2019
9b3247b
fixup! batcheval: explain ban on lease transfer to VOTER_OUTGOING
tbg Aug 27, 2019
11eb483
fixup! storage: test merge queue seeing joint configs
tbg Aug 27, 2019
fb13fe7
fixup! storage: test merge queue seeing joint configs
tbg Aug 27, 2019
d69a54e
fixup! storage: test AdminMerge in presence of joint config
tbg Aug 27, 2019
ae17a5e
fixup! testcluster: add RemoveReplicasOrFatal
tbg Aug 27, 2019
4c3d5a5
fixup! storage: test AdminMerge in presence of joint config
tbg Aug 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,19 @@ func (r *RangeDescriptor) SetReplicas(replicas ReplicaDescriptors) {

// SetReplicaType changes the type of the replica with the given ID to the given
// type. Returns zero values if the replica was not found and the updated
// descriptor and true otherwise.
// descriptor, the previous type, and true, otherwise.
func (r *RangeDescriptor) SetReplicaType(
nodeID NodeID, storeID StoreID, typ ReplicaType,
) (ReplicaDescriptor, bool) {
) (ReplicaDescriptor, ReplicaType, bool) {
for i := range r.InternalReplicas {
desc := &r.InternalReplicas[i]
if desc.StoreID == storeID && desc.NodeID == nodeID {
prevTyp := desc.GetType()
desc.Type = &typ
return *desc, true
return *desc, prevTyp, true
}
}
return ReplicaDescriptor{}, false
return ReplicaDescriptor{}, 0, false
}

// AddReplica adds a replica on the given node and store with the supplied type.
Expand Down
63 changes: 35 additions & 28 deletions pkg/roachpb/metadata_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ func (d ReplicaDescriptors) All() []ReplicaDescriptor {
return d.wrapped
}

func predVoterFullOrIncoming(rDesc ReplicaDescriptor) bool {
switch rDesc.GetType() {
case VOTER_FULL, VOTER_INCOMING:
return true
default:
}
return false
}

func predLearner(rDesc ReplicaDescriptor) bool {
return rDesc.GetType() == LEARNER
}

// Voters returns the current and future voter replicas in the set. This means
// that during an atomic replication change, only the replicas that will be
// voters once the change completes will be returned; "outgoing" voters will not
Expand All @@ -102,26 +115,7 @@ func (d ReplicaDescriptors) All() []ReplicaDescriptor {
// different subset of voters. Consider renaming this method so that it's
// more descriptive.
func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
// Fastpath, most of the time, everything is a voter, so special case that and
// save the alloc.
fastpath := true
for i := range d.wrapped {
if d.wrapped[i].GetType() != VOTER_FULL {
fastpath = false
break
}
}
if fastpath {
return d.wrapped
}
voters := make([]ReplicaDescriptor, 0, len(d.wrapped))
for i := range d.wrapped {
switch d.wrapped[i].GetType() {
case VOTER_FULL, VOTER_INCOMING:
voters = append(voters, d.wrapped[i])
}
}
return voters
return d.Filter(predVoterFullOrIncoming)
}

// Learners returns the learner replicas in the set. This may allocate, but it
Expand Down Expand Up @@ -211,18 +205,31 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
//
// For some related mega-comments, see Replica.sendSnapshot.
func (d ReplicaDescriptors) Learners() []ReplicaDescriptor {
// Fastpath, most of the time, everything is a voter, so special case that and
// save the alloc.
var learners []ReplicaDescriptor
return d.Filter(predLearner)
}

// Filter returns only the replica descriptors for which the supplied method
// returns true. The memory returned may be shared with the receiver.
func (d ReplicaDescriptors) Filter(
pred func(descriptor ReplicaDescriptor) bool,
) []ReplicaDescriptor {
// Fast path when all or none match to avoid allocations.
fastpath := true
out := d.wrapped
for i := range d.wrapped {
if d.wrapped[i].GetType() == LEARNER {
if learners == nil {
learners = make([]ReplicaDescriptor, 0, len(d.wrapped)-i)
if pred(d.wrapped[i]) {
if !fastpath {
out = append(out, d.wrapped[i])
}
} else {
if fastpath {
out = nil
out = append(out, d.wrapped[:i]...)
fastpath = false
}
learners = append(learners, d.wrapped[i])
}
}
return learners
return out
}

// AsProto returns the protobuf representation of these replicas, suitable for
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/batcheval/cmd_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ func checkCanReceiveLease(rec EvalContext) error {
return errors.AssertionFailedf(
`could not find replica for store %s in %s`, rec.StoreID(), rec.Desc())
} else if t := repDesc.GetType(); t != roachpb.VOTER_FULL {
// NB: there's no harm in transferring the lease to a VOTER_INCOMING,
// but we disallow it anyway. On the other hand, transferring to
// VOTER_OUTGOING would be a pretty bad idea since those voters are
// dropped when transitioning out of the joint config, which then
// amounts to removing the leaseholder without any safety precautions.
// This would either wedge the range or allow illegal reads to be
// served.
return errors.Errorf(`cannot transfer lease to replica of type %s`, t)
}
return nil
Expand Down
34 changes: 23 additions & 11 deletions pkg/storage/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,22 +188,22 @@ var _ purgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
) (roachpb.RangeDescriptor, enginepb.MVCCStats, float64, error) {
) (*roachpb.RangeDescriptor, enginepb.MVCCStats, float64, error) {
res, pErr := client.SendWrappedWith(ctx, mq.db.NonTransactionalSender(), roachpb.Header{
ReturnRangeInfo: true,
}, &roachpb.RangeStatsRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
})
if pErr != nil {
return roachpb.RangeDescriptor{}, enginepb.MVCCStats{}, 0, pErr.GoError()
return nil, enginepb.MVCCStats{}, 0, pErr.GoError()
}
rangeInfos := res.Header().RangeInfos
if len(rangeInfos) != 1 {
return roachpb.RangeDescriptor{}, enginepb.MVCCStats{}, 0, fmt.Errorf(
return nil, enginepb.MVCCStats{}, 0, fmt.Errorf(
"mergeQueue.requestRangeStats: response had %d range infos but exactly one was expected",
len(rangeInfos))
}
return rangeInfos[0].Desc, res.(*roachpb.RangeStatsResponse).MVCCStats,
return &rangeInfos[0].Desc, res.(*roachpb.RangeStatsResponse).MVCCStats,
res.(*roachpb.RangeStatsResponse).QueriesPerSecond, nil
}

Expand Down Expand Up @@ -275,21 +275,33 @@ func (mq *mergeQueue) process(
}

{
// AdminMerge errors if there are learners on either side and
// AdminRelocateRange removes any on the range it operates on. For the sake
// of obviousness, just remove them all upfront.
newLHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), lhsDesc)
// AdminMerge errors if there is a learner or joint config on either
// side and AdminRelocateRange removes any on the range it operates on.
// For the sake of obviousness, just fix this all upfront.
var err error
lhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, lhsRepl.store, lhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
lhsDesc = newLHSDesc
newRHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), &rhsDesc)

lhsDesc, err = removeLearners(ctx, lhsRepl.store.DB(), lhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}

rhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, lhsRepl.store, rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}

rhsDesc, err = removeLearners(ctx, lhsRepl.store.DB(), rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
rhsDesc = *newRHSDesc
}
lhsReplicas, rhsReplicas := lhsDesc.Replicas().All(), rhsDesc.Replicas().All()

Expand Down
72 changes: 48 additions & 24 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (r *Replica) adminSplitWithDescriptor(
// The split queue doesn't care about the set of replicas, so if we somehow
// are being handed one that's in a joint state, finalize that before
// continuing.
desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc)
desc, err = maybeLeaveAtomicChangeReplicas(ctx, r.store, desc)
if err != nil {
return roachpb.AdminSplitResponse{}, err
}
Expand Down Expand Up @@ -584,11 +584,21 @@ func (r *Replica) AdminMerge(
// are no non-voter replicas, in case some third type is later added).
// This behavior can be changed later if the complexity becomes worth
// it, but it's not right now.
//
// NB: the merge queue transitions out of any joint states and removes
// any learners it sees. It's sort of silly that we don't do that here
// instead; effectively any caller of AdminMerge that is not the merge
// queue won't be able to recover from these cases (though the replicate
// queues should fix things up quickly).
lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas()
if len(lReplicas.Voters()) != len(lReplicas.All()) {

predFullVoter := func(rDesc roachpb.ReplicaDescriptor) bool {
return rDesc.GetType() == roachpb.VOTER_FULL
}
if len(lReplicas.Filter(predFullVoter)) != len(lReplicas.All()) {
return errors.Errorf("cannot merge range with non-voter replicas on lhs: %s", lReplicas)
}
if len(rReplicas.Voters()) != len(rReplicas.All()) {
if len(rReplicas.Filter(predFullVoter)) != len(rReplicas.All()) {
return errors.Errorf("cannot merge range with non-voter replicas on rhs: %s", rReplicas)
}
if !replicaSetsEqual(lReplicas.All(), rReplicas.All()) {
Expand Down Expand Up @@ -923,7 +933,7 @@ func (r *Replica) changeReplicasImpl(
// If in a joint config, clean up. The assumption here is that the caller
// of ChangeReplicas didn't even realize that they were holding on to a
// joint descriptor and would rather not have to deal with that fact.
desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc)
desc, err = maybeLeaveAtomicChangeReplicas(ctx, r.store, desc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -970,7 +980,7 @@ func (r *Replica) changeReplicasImpl(
if err != nil {
// If the error occurred while transitioning out of an atomic replication change,
// try again here with a fresh descriptor; this is a noop otherwise.
if _, err := r.maybeLeaveAtomicChangeReplicas(ctx, r.Desc()); err != nil {
if _, err := maybeLeaveAtomicChangeReplicas(ctx, r.store, r.Desc()); err != nil {
return nil, err
}
if fn := r.store.cfg.TestingKnobs.ReplicaAddSkipLearnerRollback; fn != nil && fn() {
Expand All @@ -991,8 +1001,8 @@ func (r *Replica) changeReplicasImpl(
// maybeLeaveAtomicChangeReplicas transitions out of the joint configuration if
// the descriptor indicates one. This involves running a distributed transaction
// updating said descriptor, the result of which will be returned.
func (r *Replica) maybeLeaveAtomicChangeReplicas(
ctx context.Context, desc *roachpb.RangeDescriptor,
func maybeLeaveAtomicChangeReplicas(
ctx context.Context, store *Store, desc *roachpb.RangeDescriptor,
) (*roachpb.RangeDescriptor, error) {
// We want execChangeReplicasTxn to be able to make sure it's only tasked
// with leaving a joint state when it's in one, so make sure we don't call
Expand All @@ -1001,12 +1011,15 @@ func (r *Replica) maybeLeaveAtomicChangeReplicas(
return desc, nil
}

// NB: this is matched on in TestMergeQueueSeesLearner.
log.Eventf(ctx, "transitioning out of joint configuration %s", desc)

// NB: reason and detail won't be used because no range log event will be
// emitted.
//
// TODO(tbg): reconsider this.
return execChangeReplicasTxn(
ctx, r.store, desc, storagepb.ReasonUnknown /* unused */, "", nil, /* iChgs */
ctx, store, desc, storagepb.ReasonUnknown /* unused */, "", nil, /* iChgs */
)
}

Expand Down Expand Up @@ -1100,7 +1113,7 @@ func addLearnerReplicas(
// two distributed transactions. On error, it is possible that the range is in
// the intermediate ("joint") configuration in which a quorum of both the old
// and new sets of voters is required. If a range is encountered in this state,
// r.maybeLeaveAtomicReplicationChange can fix this, but it is the caller's
// maybeLeaveAtomicReplicationChange can fix this, but it is the caller's
// job to do this when necessary.
func (r *Replica) atomicReplicationChange(
ctx context.Context,
Expand Down Expand Up @@ -1153,10 +1166,8 @@ func (r *Replica) atomicReplicationChange(
}
}

if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil {
if fn() {
return desc, nil
}
if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil && fn() {
return desc, nil
}

for _, target := range chgs.Removals() {
Expand All @@ -1168,8 +1179,13 @@ func (r *Replica) atomicReplicationChange(
if err != nil {
return nil, err
}

if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterJointConfig; fn != nil && fn() {
return desc, nil
}

// Leave the joint config if we entered one.
return r.maybeLeaveAtomicChangeReplicas(ctx, desc)
return maybeLeaveAtomicChangeReplicas(ctx, r.store, desc)
}

// tryRollbackLearnerReplica attempts to remove a learner specified by the
Expand Down Expand Up @@ -1319,6 +1335,9 @@ func execChangeReplicasTxn(
}

useJoint := len(chgs) > 1
if fn := store.TestingKnobs().ReplicationAlwaysUseJointConfig; fn != nil && fn() {
useJoint = true
}
for _, chg := range chgs {
switch chg.typ {
case internalChangeTypeAddVoterViaPreemptiveSnap:
Expand All @@ -1333,21 +1352,26 @@ func execChangeReplicasTxn(
if useJoint {
typ = roachpb.VOTER_INCOMING
}
rDesc, ok := updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, typ)
if !ok {
rDesc, prevTyp, ok := updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, typ)
if !ok || prevTyp != roachpb.LEARNER {
return nil, errors.Errorf("cannot promote target %v which is missing as Learner", chg.target)
}
added = append(added, rDesc)
case internalChangeTypeRemove:
var rDesc roachpb.ReplicaDescriptor
var ok bool
if !useJoint {
rDesc, ok = updatedDesc.RemoveReplica(chg.target.NodeID, chg.target.StoreID)
} else {
rDesc, ok = updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, roachpb.VOTER_OUTGOING)
}
rDesc, ok := updatedDesc.GetReplicaDescriptor(chg.target.StoreID)
if !ok {
return nil, errors.Errorf("cannot remove nonexistent target %v", chg.target)
return nil, errors.Errorf("target %s not found", chg.target)
}
if typ := rDesc.GetType(); !useJoint || typ != roachpb.VOTER_FULL {
// NB: typ != VOTER_FULL means it's a LEARNER since we verified above that we
// did not start in a joint config.
rDesc, _ = updatedDesc.RemoveReplica(chg.target.NodeID, chg.target.StoreID)
} else {
var prevTyp roachpb.ReplicaType
rDesc, prevTyp, _ = updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, roachpb.VOTER_OUTGOING)
if prevTyp != roachpb.VOTER_FULL {
return nil, errors.Errorf("cannot transition from %s to VOTER_OUTGOING", prevTyp)
}
}
removed = append(removed, rDesc)
default:
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,17 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting(
func (r *Replica) canServeFollowerRead(
ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error,
) *roachpb.Error {
// There's no known reason that a learner replica couldn't serve follower
// reads (or RangeFeed), but as of the time of writing, learners are expected
// There's no known reason that a non-VOTER_FULL replica couldn't serve follower
// reads (or RangeFeed), but as of the time of writing, these are expected
// to be short-lived, so it's not worth working out the edge-cases. Revisit if
// we add long-lived learners.
// we add long-lived learners or feel that incoming/outgoing voters also need
// to be able to serve follower reads.
repDesc, err := r.GetReplicaDescriptor()
if err != nil {
return roachpb.NewError(err)
}
if repDesc.GetType() == roachpb.LEARNER {
log.Event(ctx, "learner replicas cannot serve follower reads")
if typ := repDesc.GetType(); typ != roachpb.VOTER_FULL {
log.Eventf(ctx, "%s replicas cannot serve follower reads", typ)
return pErr
}

Expand Down
Loading