diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 63855311beb5..c9c63072bd6a 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -137,18 +137,19 @@ func (r *RangeDescriptor) SetReplicas(replicas ReplicaDescriptors) { // SetReplicaType changes the type of the replica with the given ID to the given // type. Returns zero values if the replica was not found and the updated -// descriptor and true otherwise. +// descriptor, the previous type, and true, otherwise. func (r *RangeDescriptor) SetReplicaType( nodeID NodeID, storeID StoreID, typ ReplicaType, -) (ReplicaDescriptor, bool) { +) (ReplicaDescriptor, ReplicaType, bool) { for i := range r.InternalReplicas { desc := &r.InternalReplicas[i] if desc.StoreID == storeID && desc.NodeID == nodeID { + prevTyp := desc.GetType() desc.Type = &typ - return *desc, true + return *desc, prevTyp, true } } - return ReplicaDescriptor{}, false + return ReplicaDescriptor{}, 0, false } // AddReplica adds a replica on the given node and store with the supplied type. diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 7d64c9151251..d13d8c4ef075 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -88,6 +88,19 @@ func (d ReplicaDescriptors) All() []ReplicaDescriptor { return d.wrapped } +func predVoterFullOrIncoming(rDesc ReplicaDescriptor) bool { + switch rDesc.GetType() { + case VOTER_FULL, VOTER_INCOMING: + return true + default: + } + return false +} + +func predLearner(rDesc ReplicaDescriptor) bool { + return rDesc.GetType() == LEARNER +} + // Voters returns the current and future voter replicas in the set. This means // that during an atomic replication change, only the replicas that will be // voters once the change completes will be returned; "outgoing" voters will not @@ -102,26 +115,7 @@ func (d ReplicaDescriptors) All() []ReplicaDescriptor { // different subset of voters. Consider renaming this method so that it's // more descriptive. func (d ReplicaDescriptors) Voters() []ReplicaDescriptor { - // Fastpath, most of the time, everything is a voter, so special case that and - // save the alloc. - fastpath := true - for i := range d.wrapped { - if d.wrapped[i].GetType() != VOTER_FULL { - fastpath = false - break - } - } - if fastpath { - return d.wrapped - } - voters := make([]ReplicaDescriptor, 0, len(d.wrapped)) - for i := range d.wrapped { - switch d.wrapped[i].GetType() { - case VOTER_FULL, VOTER_INCOMING: - voters = append(voters, d.wrapped[i]) - } - } - return voters + return d.Filter(predVoterFullOrIncoming) } // Learners returns the learner replicas in the set. This may allocate, but it @@ -211,18 +205,29 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor { // // For some related mega-comments, see Replica.sendSnapshot. func (d ReplicaDescriptors) Learners() []ReplicaDescriptor { - // Fastpath, most of the time, everything is a voter, so special case that and - // save the alloc. - var learners []ReplicaDescriptor + return d.Filter(predLearner) +} + +// Filter returns only the replica descriptors for which the supplied method +// returns true. The memory returned may be shared with the receiver. +func (d ReplicaDescriptors) Filter(pred func(rDesc ReplicaDescriptor) bool) []ReplicaDescriptor { + // Fast path when all or none match to avoid allocations. 
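Aside (not part of the patch): the new Filter helper is meant to compose with small predicates like predVoterFullOrIncoming and predLearner above, and callers can also pass ad-hoc predicates. A hypothetical caller outside this change might count the replicas participating in a joint change like so; note that, per the doc comment, the returned slice may share memory with the receiver, so copy it before mutating.

// Illustrative sketch only; countJointParticipants is hypothetical and not
// part of this change.
func countJointParticipants(desc *roachpb.RangeDescriptor) int {
	pred := func(rd roachpb.ReplicaDescriptor) bool {
		t := rd.GetType()
		return t == roachpb.VOTER_INCOMING || t == roachpb.VOTER_OUTGOING
	}
	return len(desc.Replicas().Filter(pred))
}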
+ fastpath := true + out := d.wrapped for i := range d.wrapped { - if d.wrapped[i].GetType() == LEARNER { - if learners == nil { - learners = make([]ReplicaDescriptor, 0, len(d.wrapped)-i) + if pred(d.wrapped[i]) { + if !fastpath { + out = append(out, d.wrapped[i]) + } + } else { + if fastpath { + out = nil + out = append(out, d.wrapped[:i]...) + fastpath = false } - learners = append(learners, d.wrapped[i]) } } - return learners + return out } // AsProto returns the protobuf representation of these replicas, suitable for diff --git a/pkg/storage/batcheval/cmd_lease.go b/pkg/storage/batcheval/cmd_lease.go index ba597922fda5..87b12cb433aa 100644 --- a/pkg/storage/batcheval/cmd_lease.go +++ b/pkg/storage/batcheval/cmd_lease.go @@ -48,6 +48,16 @@ func checkCanReceiveLease(rec EvalContext) error { return errors.AssertionFailedf( `could not find replica for store %s in %s`, rec.StoreID(), rec.Desc()) } else if t := repDesc.GetType(); t != roachpb.VOTER_FULL { + // NB: there's no harm in transferring the lease to a VOTER_INCOMING, + // but we disallow it anyway. On the other hand, transferring to + // VOTER_OUTGOING would be a pretty bad idea since those voters are + // dropped when transitioning out of the joint config, which then + // amounts to removing the leaseholder without any safety precautions. + // This would either wedge the range or allow illegal reads to be + // served. + // + // Since the leaseholder can't remove itself and is a VOTER_FULL, we + // also know that in any configuration there's at least one VOTER_FULL. return errors.Errorf(`cannot transfer lease to replica of type %s`, t) } return nil diff --git a/pkg/storage/merge_queue.go b/pkg/storage/merge_queue.go index 08557107cf2b..1d59b2300fa4 100644 --- a/pkg/storage/merge_queue.go +++ b/pkg/storage/merge_queue.go @@ -188,22 +188,22 @@ var _ purgatoryError = rangeMergePurgatoryError{} func (mq *mergeQueue) requestRangeStats( ctx context.Context, key roachpb.Key, -) (roachpb.RangeDescriptor, enginepb.MVCCStats, float64, error) { +) (*roachpb.RangeDescriptor, enginepb.MVCCStats, float64, error) { res, pErr := client.SendWrappedWith(ctx, mq.db.NonTransactionalSender(), roachpb.Header{ ReturnRangeInfo: true, }, &roachpb.RangeStatsRequest{ RequestHeader: roachpb.RequestHeader{Key: key}, }) if pErr != nil { - return roachpb.RangeDescriptor{}, enginepb.MVCCStats{}, 0, pErr.GoError() + return nil, enginepb.MVCCStats{}, 0, pErr.GoError() } rangeInfos := res.Header().RangeInfos if len(rangeInfos) != 1 { - return roachpb.RangeDescriptor{}, enginepb.MVCCStats{}, 0, fmt.Errorf( + return nil, enginepb.MVCCStats{}, 0, fmt.Errorf( "mergeQueue.requestRangeStats: response had %d range infos but exactly one was expected", len(rangeInfos)) } - return rangeInfos[0].Desc, res.(*roachpb.RangeStatsResponse).MVCCStats, + return &rangeInfos[0].Desc, res.(*roachpb.RangeStatsResponse).MVCCStats, res.(*roachpb.RangeStatsResponse).QueriesPerSecond, nil } @@ -275,21 +275,34 @@ func (mq *mergeQueue) process( } { - // AdminMerge errors if there are learners on either side and - // AdminRelocateRange removes any on the range it operates on. For the sake - // of obviousness, just remove them all upfront. - newLHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), lhsDesc) + store, db := lhsRepl.store, lhsRepl.store.DB() + // AdminMerge errors if there is a learner or joint config on either + // side and AdminRelocateRange removes any on the range it operates on. + // For the sake of obviousness, just fix this all upfront. 
+ var err error + lhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, store, lhsDesc) if err != nil { log.VEventf(ctx, 2, `%v`, err) return err } - lhsDesc = newLHSDesc - newRHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), &rhsDesc) + + lhsDesc, err = removeLearners(ctx, db, lhsDesc) + if err != nil { + log.VEventf(ctx, 2, `%v`, err) + return err + } + + rhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, store, rhsDesc) + if err != nil { + log.VEventf(ctx, 2, `%v`, err) + return err + } + + rhsDesc, err = removeLearners(ctx, db, rhsDesc) if err != nil { log.VEventf(ctx, 2, `%v`, err) return err } - rhsDesc = *newRHSDesc } lhsReplicas, rhsReplicas := lhsDesc.Replicas().All(), rhsDesc.Replicas().All() diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 77071b82c3ad..59f3ee603925 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -147,7 +147,7 @@ func (r *Replica) adminSplitWithDescriptor( // The split queue doesn't care about the set of replicas, so if we somehow // are being handed one that's in a joint state, finalize that before // continuing. - desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc) + desc, err = maybeLeaveAtomicChangeReplicas(ctx, r.store, desc) if err != nil { return roachpb.AdminSplitResponse{}, err } @@ -584,11 +584,21 @@ func (r *Replica) AdminMerge( // are no non-voter replicas, in case some third type is later added). // This behavior can be changed later if the complexity becomes worth // it, but it's not right now. + // + // NB: the merge queue transitions out of any joint states and removes + // any learners it sees. It's sort of silly that we don't do that here + // instead; effectively any caller of AdminMerge that is not the merge + // queue won't be able to recover from these cases (though the replicate + // queues should fix things up quickly). lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas() - if len(lReplicas.Voters()) != len(lReplicas.All()) { + + predFullVoter := func(rDesc roachpb.ReplicaDescriptor) bool { + return rDesc.GetType() == roachpb.VOTER_FULL + } + if len(lReplicas.Filter(predFullVoter)) != len(lReplicas.All()) { return errors.Errorf("cannot merge range with non-voter replicas on lhs: %s", lReplicas) } - if len(rReplicas.Voters()) != len(rReplicas.All()) { + if len(rReplicas.Filter(predFullVoter)) != len(rReplicas.All()) { return errors.Errorf("cannot merge range with non-voter replicas on rhs: %s", rReplicas) } if !replicaSetsEqual(lReplicas.All(), rReplicas.All()) { @@ -923,7 +933,7 @@ func (r *Replica) changeReplicasImpl( // If in a joint config, clean up. The assumption here is that the caller // of ChangeReplicas didn't even realize that they were holding on to a // joint descriptor and would rather not have to deal with that fact. - desc, err = r.maybeLeaveAtomicChangeReplicas(ctx, desc) + desc, err = maybeLeaveAtomicChangeReplicas(ctx, r.store, desc) if err != nil { return nil, err } @@ -970,7 +980,7 @@ func (r *Replica) changeReplicasImpl( if err != nil { // If the error occurred while transitioning out of an atomic replication change, // try again here with a fresh descriptor; this is a noop otherwise. 
- if _, err := r.maybeLeaveAtomicChangeReplicas(ctx, r.Desc()); err != nil { + if _, err := maybeLeaveAtomicChangeReplicas(ctx, r.store, r.Desc()); err != nil { return nil, err } if fn := r.store.cfg.TestingKnobs.ReplicaAddSkipLearnerRollback; fn != nil && fn() { @@ -991,8 +1001,8 @@ func (r *Replica) changeReplicasImpl( // maybeLeaveAtomicChangeReplicas transitions out of the joint configuration if // the descriptor indicates one. This involves running a distributed transaction // updating said descriptor, the result of which will be returned. -func (r *Replica) maybeLeaveAtomicChangeReplicas( - ctx context.Context, desc *roachpb.RangeDescriptor, +func maybeLeaveAtomicChangeReplicas( + ctx context.Context, store *Store, desc *roachpb.RangeDescriptor, ) (*roachpb.RangeDescriptor, error) { // We want execChangeReplicasTxn to be able to make sure it's only tasked // with leaving a joint state when it's in one, so make sure we don't call @@ -1001,12 +1011,15 @@ func (r *Replica) maybeLeaveAtomicChangeReplicas( return desc, nil } + // NB: this is matched on in TestMergeQueueSeesLearnerOrJointConfig. + log.Eventf(ctx, "transitioning out of joint configuration %s", desc) + // NB: reason and detail won't be used because no range log event will be // emitted. // // TODO(tbg): reconsider this. return execChangeReplicasTxn( - ctx, r.store, desc, storagepb.ReasonUnknown /* unused */, "", nil, /* iChgs */ + ctx, store, desc, storagepb.ReasonUnknown /* unused */, "", nil, /* iChgs */ ) } @@ -1100,7 +1113,7 @@ func addLearnerReplicas( // two distributed transactions. On error, it is possible that the range is in // the intermediate ("joint") configuration in which a quorum of both the old // and new sets of voters is required. If a range is encountered in this state, -// r.maybeLeaveAtomicReplicationChange can fix this, but it is the caller's +// maybeLeaveAtomicChangeReplicas can fix this, but it is the caller's // job to do this when necessary. func (r *Replica) atomicReplicationChange( ctx context.Context, @@ -1153,10 +1166,8 @@ func (r *Replica) atomicReplicationChange( } } - if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil { - if fn() { - return desc, nil - } + if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil && fn() { + return desc, nil } for _, target := range chgs.Removals() { @@ -1168,8 +1179,13 @@ func (r *Replica) atomicReplicationChange( if err != nil { return nil, err } + + if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterJointConfig; fn != nil && fn() { + return desc, nil + } + // Leave the joint config if we entered one.
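To make the last step concrete: in descriptor terms, leaving the joint configuration promotes every VOTER_INCOMING to VOTER_FULL and drops every VOTER_OUTGOING. The real work happens inside the distributed transaction run by maybeLeaveAtomicChangeReplicas below; the following is only a rough local sketch under that assumption, with a hypothetical helper name.

// Illustrative sketch only: the descriptor-level effect of leaving a joint
// configuration (the actual implementation does this transactionally).
func leaveJointLocally(desc *roachpb.RangeDescriptor) {
	// Copy first: RemoveReplica mutates the descriptor we're iterating over.
	replicas := append([]roachpb.ReplicaDescriptor(nil), desc.Replicas().All()...)
	for _, rd := range replicas {
		switch rd.GetType() {
		case roachpb.VOTER_INCOMING:
			// Incoming voters become regular (full) voters.
			desc.SetReplicaType(rd.NodeID, rd.StoreID, roachpb.VOTER_FULL)
		case roachpb.VOTER_OUTGOING:
			// Outgoing voters are dropped from the descriptor entirely.
			desc.RemoveReplica(rd.NodeID, rd.StoreID)
		}
	}
}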
- return r.maybeLeaveAtomicChangeReplicas(ctx, desc) + return maybeLeaveAtomicChangeReplicas(ctx, r.store, desc) } // tryRollbackLearnerReplica attempts to remove a learner specified by the @@ -1319,6 +1335,9 @@ func execChangeReplicasTxn( } useJoint := len(chgs) > 1 + if fn := store.TestingKnobs().ReplicationAlwaysUseJointConfig; fn != nil && fn() { + useJoint = true + } for _, chg := range chgs { switch chg.typ { case internalChangeTypeAddVoterViaPreemptiveSnap: @@ -1333,21 +1352,26 @@ func execChangeReplicasTxn( if useJoint { typ = roachpb.VOTER_INCOMING } - rDesc, ok := updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, typ) - if !ok { + rDesc, prevTyp, ok := updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, typ) + if !ok || prevTyp != roachpb.LEARNER { return nil, errors.Errorf("cannot promote target %v which is missing as Learner", chg.target) } added = append(added, rDesc) case internalChangeTypeRemove: - var rDesc roachpb.ReplicaDescriptor - var ok bool - if !useJoint { - rDesc, ok = updatedDesc.RemoveReplica(chg.target.NodeID, chg.target.StoreID) - } else { - rDesc, ok = updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, roachpb.VOTER_OUTGOING) - } + rDesc, ok := updatedDesc.GetReplicaDescriptor(chg.target.StoreID) if !ok { - return nil, errors.Errorf("cannot remove nonexistent target %v", chg.target) + return nil, errors.Errorf("target %s not found", chg.target) + } + if typ := rDesc.GetType(); !useJoint || typ == roachpb.LEARNER { + rDesc, _ = updatedDesc.RemoveReplica(chg.target.NodeID, chg.target.StoreID) + } else { + // NB: typ is already known to be VOTER_FULL because of !InAtomicReplicationChange() above. + // We check it anyway. + var prevTyp roachpb.ReplicaType + rDesc, prevTyp, _ = updatedDesc.SetReplicaType(chg.target.NodeID, chg.target.StoreID, roachpb.VOTER_OUTGOING) + if prevTyp != roachpb.VOTER_FULL { + return nil, errors.Errorf("cannot transition from %s to VOTER_OUTGOING", prevTyp) + } } removed = append(removed, rDesc) default: diff --git a/pkg/storage/replica_follower_read.go b/pkg/storage/replica_follower_read.go index 807c85a383b0..5344470562e4 100644 --- a/pkg/storage/replica_follower_read.go +++ b/pkg/storage/replica_follower_read.go @@ -37,16 +37,17 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting( func (r *Replica) canServeFollowerRead( ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error, ) *roachpb.Error { - // There's no known reason that a learner replica couldn't serve follower - // reads (or RangeFeed), but as of the time of writing, learners are expected + // There's no known reason that a non-VOTER_FULL replica couldn't serve follower + // reads (or RangeFeed), but as of the time of writing, these are expected // to be short-lived, so it's not worth working out the edge-cases. Revisit if - // we add long-lived learners. + // we add long-lived learners or feel that incoming/outgoing voters also need + // to be able to serve follower reads.
repDesc, err := r.GetReplicaDescriptor() if err != nil { return roachpb.NewError(err) } - if repDesc.GetType() == roachpb.LEARNER { - log.Event(ctx, "learner replicas cannot serve follower reads") + if typ := repDesc.GetType(); typ != roachpb.VOTER_FULL { + log.Eventf(ctx, "%s replicas cannot serve follower reads", typ) return pErr } diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go index 7a19a5bb97c5..028481bb96ac 100644 --- a/pkg/storage/replica_learner_test.go +++ b/pkg/storage/replica_learner_test.go @@ -38,16 +38,45 @@ import ( "github.com/stretchr/testify/require" ) -type learnerTestKnobs struct { +func predIncoming(rDesc roachpb.ReplicaDescriptor) bool { + return rDesc.GetType() == roachpb.VOTER_INCOMING +} +func predOutgoing(rDesc roachpb.ReplicaDescriptor) bool { + return rDesc.GetType() == roachpb.VOTER_OUTGOING +} + +type replicationTestKnobs struct { storeKnobs storage.StoreTestingKnobs replicaAddStopAfterLearnerAtomic int64 + replicaAddStopAfterJointConfig int64 + replicationAlwaysUseJointConfig int64 +} + +func (rtl *replicationTestKnobs) withStopAfterLearnerAtomic(f func()) { + prev := atomic.SwapInt64(&rtl.replicaAddStopAfterLearnerAtomic, 1) + defer atomic.StoreInt64(&rtl.replicaAddStopAfterLearnerAtomic, prev) + f() +} + +func (rtl *replicationTestKnobs) withStopAfterJointConfig(f func()) { + au := atomic.SwapInt64(&rtl.replicationAlwaysUseJointConfig, 1) + sa := atomic.SwapInt64(&rtl.replicaAddStopAfterJointConfig, 1) + defer atomic.StoreInt64(&rtl.replicationAlwaysUseJointConfig, au) + defer atomic.StoreInt64(&rtl.replicaAddStopAfterJointConfig, sa) + f() } -func makeLearnerTestKnobs() (base.TestingKnobs, *learnerTestKnobs) { - var k learnerTestKnobs +func makeReplicationTestKnobs() (base.TestingKnobs, *replicationTestKnobs) { + var k replicationTestKnobs k.storeKnobs.ReplicaAddStopAfterLearnerSnapshot = func() bool { return atomic.LoadInt64(&k.replicaAddStopAfterLearnerAtomic) > 0 } + k.storeKnobs.ReplicaAddStopAfterJointConfig = func() bool { + return atomic.LoadInt64(&k.replicaAddStopAfterJointConfig) > 0 + } + k.storeKnobs.ReplicationAlwaysUseJointConfig = func() bool { + return atomic.LoadInt64(&k.replicationAlwaysUseJointConfig) > 0 + } return base.TestingKnobs{Store: &k.storeKnobs}, &k } @@ -106,7 +135,7 @@ func TestAddReplicaViaLearner(t *testing.T) { blockUntilSnapshotCh := make(chan struct{}) blockSnapshotsCh := make(chan struct{}) - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error { close(blockUntilSnapshotCh) select { @@ -189,7 +218,7 @@ func TestLearnerRaftConfState(t *testing.T) { dir, cleanup := testutils.TempDir(t) defer cleanup() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() ctx := context.Background() const numNodes = 2 serverArgsPerNode := make(map[int]base.TestServerArgs) @@ -215,9 +244,10 @@ func TestLearnerRaftConfState(t *testing.T) { // Add a learner replica, send a snapshot so that it's materialized as a // Replica on the Store, but don't promote it to a voter. 
scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - desc := tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) + var desc roachpb.RangeDescriptor + ltk.withStopAfterLearnerAtomic(func() { + desc = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + }) require.Len(t, desc.Replicas().Learners(), 1) learnerReplicaID := desc.Replicas().Learners()[0].ReplicaID @@ -246,7 +276,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { defer leaktest.AfterTest(t)() var rejectSnapshots int64 - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error { if atomic.LoadInt64(&rejectSnapshots) > 0 { return errors.New(`nope`) @@ -278,11 +308,11 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { require.Empty(t, desc.Replicas().Learners()) } -func TestSplitWithLearner(t *testing.T) { +func TestSplitWithLearnerOrJointConfig(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, ReplicationMode: base.ReplicationManual, }) @@ -294,9 +324,9 @@ func TestSplitWithLearner(t *testing.T) { // Add a learner replica, send a snapshot so that it's materialized as a // Replica on the Store, but don't promote it to a voter. scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) + ltk.withStopAfterLearnerAtomic(func() { + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + }) // Splitting a learner is allowed. This orphans the two learners, but the // replication queue will eventually clean this up. left, right, err := tc.SplitRange(scratchStartKey.Next()) require.NoError(t, err) require.Len(t, left.Replicas().Learners(), 1) require.Len(t, right.Replicas().Learners(), 1) + + // Remove the learner on the RHS. + right = tc.RemoveReplicasOrFatal(t, right.StartKey.AsRawKey(), tc.Target(1)) + + // Put an incoming voter on the RHS and split again. This works because the + // split auto-transitions us out of the joint conf before doing work. + atomic.StoreInt64(&ltk.replicationAlwaysUseJointConfig, 1) + atomic.StoreInt64(&ltk.replicaAddStopAfterJointConfig, 1) + right = tc.AddReplicasOrFatal(t, right.StartKey.AsRawKey(), tc.Target(1)) + require.Len(t, right.Replicas().Filter(predIncoming), 1) + left, right, err = tc.SplitRange(right.StartKey.AsRawKey().Next()) + require.NoError(t, err) + require.False(t, left.Replicas().InAtomicReplicationChange(), left) + require.False(t, right.Replicas().InAtomicReplicationChange(), right) } -func TestReplicateQueueSeesLearner(t *testing.T) { +func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) { defer leaktest.AfterTest(t)() // NB also see TestAllocatorRemoveLearner for a lower-level test.
ctx := context.Background() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, ReplicationMode: base.ReplicationManual, }) @@ -323,31 +367,54 @@ func TestReplicateQueueSeesLearner(t *testing.T) { // Add a learner replica, send a snapshot so that it's materialized as a // Replica on the Store, but don't promote it to a voter. scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) + ltk.withStopAfterLearnerAtomic(func() { + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + }) // Run the replicate queue. store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) - require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`)) - _, errMsg, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */) - require.NoError(t, err) - require.Equal(t, ``, errMsg) - require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`)) + { + require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`)) + _, errMsg, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.Equal(t, ``, errMsg) + require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`)) - // Make sure it deleted the learner. - desc := tc.LookupRangeOrFatal(t, scratchStartKey) - require.Empty(t, desc.Replicas().Learners()) + // Make sure it deleted the learner. + desc := tc.LookupRangeOrFatal(t, scratchStartKey) + require.Empty(t, desc.Replicas().Learners()) + + // Bonus points: the replicate queue keeps processing until there is nothing + // to do, so it should have upreplicated the range to 3. + require.Len(t, desc.Replicas().Voters(), 3) + } + + // Create a VOTER_OUTGOING, i.e. a joint configuration. + ltk.withStopAfterJointConfig(func() { + desc := tc.RemoveReplicasOrFatal(t, scratchStartKey, tc.Target(2)) + require.True(t, desc.Replicas().InAtomicReplicationChange(), desc) + trace, errMsg, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.Equal(t, ``, errMsg) + formattedTrace := tracing.FormatRecordedSpans(trace) + expectedMessages := []string{ + `transitioning out of joint configuration`, + } + if err := testutils.MatchInOrder(formattedTrace, expectedMessages...); err != nil { + t.Fatal(err) + } - // Bonus points: the replicate queue keeps processing until there is nothing - // to do, so it should have upreplicated the range to 3. - require.Len(t, desc.Replicas().Voters(), 3) + desc = tc.LookupRangeOrFatal(t, scratchStartKey) + require.False(t, desc.Replicas().InAtomicReplicationChange(), desc) + // Queue processed again, so we're back to three replicas.
+ require.Len(t, desc.Replicas().Voters(), 3) + }) } -func TestGCQueueSeesLearner(t *testing.T) { +func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, ReplicationMode: base.ReplicationManual, }) @@ -359,28 +426,41 @@ func TestGCQueueSeesLearner(t *testing.T) { // Add a learner replica, send a snapshot so that it's materialized as a // Replica on the Store, but don't promote it to a voter. scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) + ltk.withStopAfterLearnerAtomic(func() { + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + }) // Run the replicaGC queue. - store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey) - trace, errMsg, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */) - require.NoError(t, err) - require.Equal(t, ``, errMsg) - const msg = `not gc'able, replica is still in range descriptor: (n2,s2):2LEARNER` - require.Contains(t, tracing.FormatRecordedSpans(trace), msg) + checkNoGC := func() roachpb.RangeDescriptor { + store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey) + trace, errMsg, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.Equal(t, ``, errMsg) + const msg = `not gc'able, replica is still in range descriptor: (n2,s2):` + require.Contains(t, tracing.FormatRecordedSpans(trace), msg) + return tc.LookupRangeOrFatal(t, scratchStartKey) + } + desc := checkNoGC() // Make sure it didn't collect the learner. - desc := tc.LookupRangeOrFatal(t, scratchStartKey) require.NotEmpty(t, desc.Replicas().Learners()) + + // Now get the range into a joint config.
+ tc.RemoveReplicasOrFatal(t, scratchStartKey, tc.Target(1)) // remove learner + ltk.withStopAfterJointConfig(func() { + desc = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + require.Len(t, desc.Replicas().Filter(predIncoming), 1, desc) + }) + + postDesc := checkNoGC() + require.Equal(t, desc, postDesc) } func TestRaftSnapshotQueueSeesLearner(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() blockSnapshotsCh := make(chan struct{}) - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.DisableRaftSnapshotQueue = true ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error { select { @@ -444,7 +524,7 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) { blockUntilSnapshotCh := make(chan struct{}, 2) blockSnapshotsCh := make(chan struct{}) - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error { blockUntilSnapshotCh <- struct{}{} <-blockSnapshotsCh @@ -501,7 +581,7 @@ func TestLearnerReplicateQueueRace(t *testing.T) { var skipReceiveSnapshotKnobAtomic int64 = 1 blockUntilSnapshotCh := make(chan struct{}, 2) blockSnapshotsCh := make(chan struct{}) - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error { if atomic.LoadInt64(&skipReceiveSnapshotKnobAtomic) > 0 { return nil @@ -574,7 +654,7 @@ func TestLearnerReplicateQueueRace(t *testing.T) { func TestLearnerNoAcceptLease(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, ReplicationMode: base.ReplicationManual, }) @@ -586,9 +666,9 @@ func TestLearnerNoAcceptLease(t *testing.T) { // Add a learner replica, send a snapshot so that it's materialized as a // Replica on the Store, but don't promote it to a voter. scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) + ltk.withStopAfterLearnerAtomic(func() { + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + }) desc := tc.LookupRangeOrFatal(t, scratchStartKey) err := tc.TransferRangeLease(desc, tc.Target(1)) @@ -597,7 +677,38 @@ func TestLearnerNoAcceptLease(t *testing.T) { } } -func TestLearnerFollowerRead(t *testing.T) { +// TestJointConfigLease verifies that incoming and outgoing voters can't have the +// lease transferred to them.
+func TestJointConfigLease(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + knobs, ltk := makeReplicationTestKnobs() + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + atomic.StoreInt64(&ltk.replicaAddStopAfterJointConfig, 1) + atomic.StoreInt64(&ltk.replicationAlwaysUseJointConfig, 1) + desc := tc.AddReplicasOrFatal(t, k, tc.Target(1)) + require.True(t, desc.Replicas().InAtomicReplicationChange(), desc) + + err := tc.TransferRangeLease(desc, tc.Target(1)) + exp := `cannot transfer lease to replica of type VOTER_INCOMING` + require.True(t, testutils.IsError(err, exp), err) + + // NB: we don't have to transition out of the previous joint config first + // because this is done automatically by ChangeReplicas before it does what + // it's asked to do. + desc = tc.RemoveReplicasOrFatal(t, k, tc.Target(1)) + err = tc.TransferRangeLease(desc, tc.Target(1)) + exp = `cannot transfer lease to replica of type VOTER_OUTGOING` + require.True(t, testutils.IsError(err, exp), err) +} + +func TestLearnerAndJointConfigFollowerRead(t *testing.T) { defer leaktest.AfterTest(t)() if util.RaceEnabled { @@ -607,7 +718,7 @@ func TestLearnerFollowerRead(t *testing.T) { } ctx := context.Background() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, ReplicationMode: base.ReplicationManual, }) @@ -620,43 +731,72 @@ func TestLearnerFollowerRead(t *testing.T) { db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true`) scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - scratchDesc := tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) - - req := roachpb.BatchRequest{Header: roachpb.Header{ - RangeID: scratchDesc.RangeID, - Timestamp: tc.Server(0).Clock().Now(), - }} - req.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{ - Key: scratchDesc.StartKey.AsRawKey(), EndKey: scratchDesc.EndKey.AsRawKey(), - }}) - - _, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey) - testutils.SucceedsSoon(t, func() error { - // Trace the Send call so we can verify that it hit the exact `learner - // replicas cannot serve follower reads` branch that we're trying to test.
- sendCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "manual read request") - defer cancel() - _, pErr := repl.Send(sendCtx, req) - err := pErr.GoError() - if !testutils.IsError(err, `not lease holder`) { - return errors.Errorf(`expected "not lease holder" error got: %+v`, err) - } - const msg = `learner replicas cannot serve follower reads` - formattedTrace := tracing.FormatRecordedSpans(collect()) - if !strings.Contains(formattedTrace, msg) { - return errors.Errorf("expected a trace with `%s` got:\n%s", msg, formattedTrace) - } - return nil + var scratchDesc roachpb.RangeDescriptor + ltk.withStopAfterLearnerAtomic(func() { + scratchDesc = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) }) + + check := func() { + req := roachpb.BatchRequest{Header: roachpb.Header{ + RangeID: scratchDesc.RangeID, + Timestamp: tc.Server(0).Clock().Now(), + }} + req.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{ + Key: scratchDesc.StartKey.AsRawKey(), EndKey: scratchDesc.EndKey.AsRawKey(), + }}) + + _, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey) + testutils.SucceedsSoon(t, func() error { + // Trace the Send call so we can verify that it hit the exact `learner + // replicas cannot serve follower reads` branch that we're trying to test. + sendCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "manual read request") + defer cancel() + _, pErr := repl.Send(sendCtx, req) + err := pErr.GoError() + if !testutils.IsError(err, `not lease holder`) { + return errors.Errorf(`expected "not lease holder" error got: %+v`, err) + } + const msg = `cannot serve follower reads` + formattedTrace := tracing.FormatRecordedSpans(collect()) + if !strings.Contains(formattedTrace, msg) { + return errors.Errorf("expected a trace with `%s` got:\n%s", msg, formattedTrace) + } + return nil + }) + } + + // Can't serve follower read from the LEARNER. + check() + + atomic.StoreInt64(&ltk.replicaAddStopAfterJointConfig, 1) + atomic.StoreInt64(&ltk.replicationAlwaysUseJointConfig, 1) + + scratchDesc = tc.RemoveReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + // Removing a learner doesn't get you into a joint state (no voters changed). + require.False(t, scratchDesc.Replicas().InAtomicReplicationChange(), scratchDesc) + scratchDesc = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + + // Re-adding the voter (and remaining in joint config) does. + require.True(t, scratchDesc.Replicas().InAtomicReplicationChange(), scratchDesc) + require.Len(t, scratchDesc.Replicas().Filter(predIncoming), 1) + + // Can't serve follower read from the VOTER_INCOMING. + check() + + // Removing the voter (and remaining in joint config) does. + scratchDesc = tc.RemoveReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + require.True(t, scratchDesc.Replicas().InAtomicReplicationChange(), scratchDesc) + require.Len(t, scratchDesc.Replicas().Filter(predOutgoing), 1) + + // Can't serve follower read from the VOTER_OUTGOING.
+ check() } func TestLearnerAdminRelocateRange(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, ReplicationMode: base.ReplicationManual, }) @@ -666,10 +806,10 @@ func TestLearnerAdminRelocateRange(t *testing.T) { db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`) scratchStartKey := tc.ScratchRange(t) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(2)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) + ltk.withStopAfterLearnerAtomic(func() { + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(2)) + }) // Test AdminRelocateRange's treatment of learners by having one that it has // to remove and one that should stay and become a voter. @@ -689,11 +829,11 @@ func TestLearnerAdminRelocateRange(t *testing.T) { require.Empty(t, desc.Replicas().Learners()) } -func TestLearnerAdminMerge(t *testing.T) { +func TestLearnerAndJointConfigAdminMerge(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, ReplicationMode: base.ReplicationManual, }) @@ -708,27 +848,79 @@ func TestLearnerAdminMerge(t *testing.T) { _, _ = tc.SplitRangeOrFatal(t, splitKey1) _, _ = tc.SplitRangeOrFatal(t, splitKey2) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - _ = tc.AddReplicasOrFatal(t, splitKey2, tc.Target(1)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) + // Three ranges (in that order): + // desc1: will have a learner (later joint voter) + // desc2 (unnamed): is always left vanilla + // desc3: like desc1 + // + // This allows testing merges that have a learner on the RHS (on desc2) and + // the LHS (on desc1). + var desc1, desc3 roachpb.RangeDescriptor + ltk.withStopAfterLearnerAtomic(func() { + desc1 = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + desc3 = tc.AddReplicasOrFatal(t, splitKey2, tc.Target(1)) + }) - // Learner on the lhs should fail. - err := tc.Server(0).DB().AdminMerge(ctx, scratchStartKey) - if !testutils.IsError(err, `cannot merge range with non-voter replicas on lhs`) { - t.Fatalf(`expected "cannot merge range with non-voter replicas on lhs" error got: %+v`, err) - } - // Learner on the rhs should fail. - err = tc.Server(0).DB().AdminMerge(ctx, splitKey1) - if !testutils.IsError(err, `cannot merge range with non-voter replicas on rhs`) { - t.Fatalf(`expected "cannot merge range with non-voter replicas on rhs" error got: %+v`, err) + checkFails := func() { + err := tc.Server(0).DB().AdminMerge(ctx, scratchStartKey) + if exp := `cannot merge range with non-voter replicas on`; !testutils.IsError(err, exp) { + t.Fatalf(`expected "%s" error got: %+v`, exp, err) + } + err = tc.Server(0).DB().AdminMerge(ctx, splitKey1) + if exp := `cannot merge range with non-voter replicas on`; !testutils.IsError(err, exp) { + t.Fatalf(`expected "%s" error got: %+v`, exp, err) + } } + + // LEARNER on the lhs or rhs should fail.
+ // desc{1,2,3} = (VOTER_FULL, LEARNER) (VOTER_FULL) (VOTER_FULL, LEARNER) + checkFails() + + // Turn the learners on desc1 and desc3 into VOTER_INCOMINGs. + atomic.StoreInt64(&ltk.replicaAddStopAfterJointConfig, 1) + atomic.StoreInt64(&ltk.replicationAlwaysUseJointConfig, 1) + desc1 = tc.RemoveReplicasOrFatal(t, desc1.StartKey.AsRawKey(), tc.Target(1)) + desc1 = tc.AddReplicasOrFatal(t, desc1.StartKey.AsRawKey(), tc.Target(1)) + require.Len(t, desc1.Replicas().Filter(predIncoming), 1) + desc3 = tc.RemoveReplicasOrFatal(t, desc3.StartKey.AsRawKey(), tc.Target(1)) + desc3 = tc.AddReplicasOrFatal(t, desc3.StartKey.AsRawKey(), tc.Target(1)) + require.Len(t, desc3.Replicas().Filter(predIncoming), 1) + + // VOTER_INCOMING on the lhs or rhs should fail. + // desc{1,2,3} = (VOTER_FULL, VOTER_INCOMING) (VOTER_FULL) (VOTER_FULL, VOTER_INCOMING) + checkFails() + + // Turn the incoming voters on desc1 and desc3 into VOTER_OUTGOINGs. + // desc{1,2,3} = (VOTER_FULL, VOTER_OUTGOING) (VOTER_FULL) (VOTER_FULL, VOTER_OUTGOING) + desc1 = tc.RemoveReplicasOrFatal(t, desc1.StartKey.AsRawKey(), tc.Target(1)) + require.Len(t, desc1.Replicas().Filter(predOutgoing), 1) + desc3 = tc.RemoveReplicasOrFatal(t, desc3.StartKey.AsRawKey(), tc.Target(1)) + require.Len(t, desc3.Replicas().Filter(predOutgoing), 1) + + // VOTER_OUTGOING on the lhs or rhs should fail. + checkFails() + + // Add a VOTER_INCOMING to desc2 to make sure it actually excludes this type + // of replicas from merges (rather than really just checking whether the + // replica sets are equal). + // desc{1,2,3} = (VOTER_FULL, VOTER_OUTGOING) (VOTER_FULL, VOTER_INCOMING) (VOTER_FULL, VOTER_OUTGOING) + desc2 := tc.AddReplicasOrFatal(t, splitKey1, tc.Target(1)) + require.Len(t, desc2.Replicas().Filter(predIncoming), 1) + + checkFails() + + // Ditto VOTER_OUTGOING. + // desc{1,2,3} = (VOTER_FULL, VOTER_OUTGOING) (VOTER_FULL, VOTER_OUTGOING) (VOTER_FULL, VOTER_OUTGOING) + desc2 = tc.RemoveReplicasOrFatal(t, desc2.StartKey.AsRawKey(), tc.Target(1)) + require.Len(t, desc2.Replicas().Filter(predOutgoing), 1) + + checkFails() } -func TestMergeQueueSeesLearner(t *testing.T) { +func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - knobs, ltk := makeLearnerTestKnobs() + knobs, ltk := makeReplicationTestKnobs() tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, ReplicationMode: base.ReplicationManual, }) @@ -743,34 +935,89 @@ func TestMergeQueueSeesLearner(t *testing.T) { origDesc := tc.LookupRangeOrFatal(t, scratchStartKey) splitKey := scratchStartKey.Next() - _, _ = tc.SplitRangeOrFatal(t, splitKey) - - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1) - _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) - atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0) + splitAndUnsplit := func() roachpb.RangeDescriptor { + desc, _ := tc.SplitRangeOrFatal(t, splitKey) + // Unsplit the range to clear the sticky bit. + require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey)) + return desc + } - // Unsplit the range to clear the sticky bit. - require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey)) + // Run the merge queue while there's a learner on the LHS. + { + splitAndUnsplit() - // Run the merge queue.
- store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) - trace, errMsg, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */) - require.NoError(t, err) - require.Equal(t, ``, errMsg) - formattedTrace := tracing.FormatRecordedSpans(trace) - expectedMessages := []string{ - `removing learner replicas \[n2,s2\]`, - `merging to produce range: /Table/Max-/Max`, - } - if err := testutils.MatchInOrder(formattedTrace, expectedMessages...); err != nil { - t.Fatal(err) + ltk.withStopAfterLearnerAtomic(func() { + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + }) + + store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) + trace, errMsg, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.Equal(t, ``, errMsg) + formattedTrace := tracing.FormatRecordedSpans(trace) + expectedMessages := []string{ + `removing learner replicas \[n2,s2\]`, + `merging to produce range: /Table/Max-/Max`, + } + if err := testutils.MatchInOrder(formattedTrace, expectedMessages...); err != nil { + t.Fatal(err) + } + + // Sanity check that the desc has the same bounds it did originally. + desc := tc.LookupRangeOrFatal(t, scratchStartKey) + require.Equal(t, origDesc.StartKey, desc.StartKey) + require.Equal(t, origDesc.EndKey, desc.EndKey) + // The merge removed the learner. + require.Len(t, desc.Replicas().Voters(), 1) + require.Empty(t, desc.Replicas().Learners()) } - // Sanity check that the desc has the same bounds it did originally. - desc := tc.LookupRangeOrFatal(t, scratchStartKey) - require.Equal(t, origDesc.StartKey, desc.StartKey) - require.Equal(t, origDesc.EndKey, desc.EndKey) - // The merge removed the learner. - require.Len(t, desc.Replicas().Voters(), 1) - require.Empty(t, desc.Replicas().Learners()) + // Create the RHS again and repeat the same game, except this time the LHS + // gets a VOTER_INCOMING for s2, and then the merge queue runs into it. It + // will transition the LHS out of the joint config and then do the merge. + { + desc := splitAndUnsplit() + + ltk.withStopAfterJointConfig(func() { + desc = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + }) + require.Len(t, desc.Replicas().Filter(predIncoming), 1, desc) + + checkTransitioningOut := func() { + t.Helper() + store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) + trace, errMsg, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.Equal(t, ``, errMsg) + formattedTrace := tracing.FormatRecordedSpans(trace) + expectedMessages := []string{ + `transitioning out of joint configuration`, + `merging to produce range: /Table/Max-/Max`, + } + if err := testutils.MatchInOrder(formattedTrace, expectedMessages...); err != nil { + t.Fatal(err) + } + } + + checkTransitioningOut() + desc = tc.LookupRangeOrFatal(t, scratchStartKey) + require.Len(t, desc.Replicas().Voters(), 2) + require.False(t, desc.Replicas().InAtomicReplicationChange(), desc) + + // Repeat the game, except now we start with two replicas and we're + // giving the RHS a VOTER_OUTGOING. + desc = splitAndUnsplit() + ltk.withStopAfterJointConfig(func() { + descRight := tc.RemoveReplicasOrFatal(t, desc.EndKey.AsRawKey(), tc.Target(1)) + require.Len(t, descRight.Replicas().Filter(predOutgoing), 1, desc) + }) + + // This should transition out (i.e. remove the voter on s2 for the RHS) + // and then do its thing, which means in the end we have two voters again. 
+ checkTransitioningOut() + desc = tc.LookupRangeOrFatal(t, scratchStartKey) + require.Len(t, desc.Replicas().Voters(), 2) + require.False(t, desc.Replicas().InAtomicReplicationChange(), desc) + } } diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index a1e2a40af580..f2c1b41cca0f 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -347,8 +347,10 @@ func (rq *replicateQueue) processOneChange( case AllocatorConsiderRebalance: return rq.considerRebalance(ctx, repl, voterReplicas, canTransferLease, dryRun) case AllocatorFinalizeAtomicReplicationChange: - _, err := repl.maybeLeaveAtomicChangeReplicas(ctx, repl.Desc()) - return false, err + _, err := maybeLeaveAtomicChangeReplicas(ctx, repl.store, repl.Desc()) + // Requeue because either we failed to transition out of a joint state + // (bad) or we did and there might be more to do for that range. + return true, err } return true, nil } diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 92e3d910856e..8a8548d5bac5 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -198,6 +198,17 @@ type StoreTestingKnobs struct { // This ensures the `*Replica` will be materialized on the Store when it // returns. ReplicaAddStopAfterLearnerSnapshot func() bool + // ReplicaAddStopAfterJointConfig causes replica addition to return early if + // the func returns true. This happens before transitioning out of a joint + // configuration, after the joint configuration has been entered by means + // of a first ChangeReplicas transaction. If the replication change does + // not use joint consensus, this early return is identical to the regular + // return path. + ReplicaAddStopAfterJointConfig func() bool + // ReplicationAlwaysUseJointConfig causes replica addition to always go + // through a joint configuration, even when this isn't necessary (because + // the replication change affects only one replica). + ReplicationAlwaysUseJointConfig func() bool // BeforeSnapshotSSTIngestion is run just before the SSTs are ingested when // applying a snapshot. BeforeSnapshotSSTIngestion func(IncomingSnapshot, SnapshotRequest_Type, []string) error diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index 37f9d925a2bf..efa54c9b2f10 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -68,6 +68,12 @@ type TestClusterInterface interface { startKey roachpb.Key, targets ...roachpb.ReplicationTarget, ) (roachpb.RangeDescriptor, error) + // RemoveReplicasOrFatal is the same as RemoveReplicas but will Fatal the test on + // error. + RemoveReplicasOrFatal( + t testing.TB, startKey roachpb.Key, targets ...roachpb.ReplicationTarget, + ) roachpb.RangeDescriptor + // FindRangeLeaseHolder returns the current lease holder for the given range. // In particular, it returns one particular node's (the hint, if specified) view // of the current lease. diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 23127ff92705..666c7c679488 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -506,6 +506,19 @@ func (tc *TestCluster) RemoveReplicas( return tc.changeReplicas(roachpb.REMOVE_REPLICA, keys.MustAddr(startKey), targets...) } +// RemoveReplicasOrFatal is part of TestClusterInterface. 
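For illustration, a hypothetical call site for the new helper (not part of this change), assuming the usual test harness variables (t, tc, scratchStartKey), the predicate helpers from replica_learner_test.go, and that the joint-config testing knobs are not engaged:

// Illustrative sketch only: shed a replica and assert that the descriptor has
// settled out of any joint configuration.
desc := tc.RemoveReplicasOrFatal(t, scratchStartKey, tc.Target(1))
require.False(t, desc.Replicas().InAtomicReplicationChange(), desc)
require.Empty(t, desc.Replicas().Filter(predOutgoing))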
+func (tc *TestCluster) RemoveReplicasOrFatal( + t testing.TB, startKey roachpb.Key, targets ...roachpb.ReplicationTarget, +) roachpb.RangeDescriptor { + t.Helper() + desc, err := tc.RemoveReplicas(startKey, targets...) + if err != nil { + t.Fatalf(`could not remove %v replicas from range containing %s: %+v`, + targets, startKey, err) + } + return desc +} + // TransferRangeLease is part of the TestServerInterface. func (tc *TestCluster) TransferRangeLease( rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget,