From 06dde3cf7cf46147314ce5e01b64b2333219d774 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 13 Aug 2019 23:32:23 +0200 Subject: [PATCH] storage: unify replica addition and removal paths This continues the reworking of the various replication change APIs with the goal of allowing a) testing of general atomic replication changes b) issuing replica swaps from the replicate queue (in 19.2). For previous steps, see: https://github.com/cockroachdb/cockroach/pull/39485 https://github.com/cockroachdb/cockroach/pull/39611 This change is not a pure plumbing PR. Instead, it unifies `(*Replica).addReplica` and `(*Replica).removeReplica` into a method that can do both, `(*Replica).addAndRemoveReplicas`. Given a slice of ReplicationChanges, this method first adds learner replicas corresponding to the desired new voters. After having sent snapshots to all of them, the method issues a configuration change that atomically - upgrades all learners to voters - removes any undesired replicas. Note that no atomic membership changes are *actually* carried out yet. This is because the callers of `addAndRemoveReplicas` pass in only a single change (i.e. an addition or removal), which the method also verifies. Three pieces are missing after this PR: First, we need to be able to instruct raft to carry out atomic configuration changes: https://github.com/cockroachdb/cockroach/blob/2e8db6ca53c59d3d281e64939f79d937195403d4/pkg/storage/replica_proposal_buf.go#L448-L451 which in particular requires being able to store the ConfState corresponding to a joint configuration in the unreplicated local state (under a new key). Second, we must pass the slice of changes handed to `AdminChangeReplicas` through to `addAndRemoveReplicas` without unrolling it first, see: https://github.com/cockroachdb/cockroach/blob/3b316bac6ef342590ddc68d2989714d6e126371a/pkg/storage/replica_command.go#L870-L891 and https://github.com/cockroachdb/cockroach/blob/3b316bac6ef342590ddc68d2989714d6e126371a/pkg/storage/replica.go#L1314-L1325 Third, we must to teach the replicate queue to issue the "atomic swaps"; this is the reason we're introducing atomic membership changes in the first place. Release note: None --- pkg/roachpb/api.go | 23 ++ pkg/storage/client_replica_test.go | 10 +- pkg/storage/replica_command.go | 311 +++++++++++++++++----------- pkg/storage/replica_learner_test.go | 2 +- pkg/storage/replica_test.go | 2 +- pkg/storage/replicate_queue.go | 3 +- 6 files changed, 218 insertions(+), 133 deletions(-) diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index ac3a94e08d4c..f0a0125bcf87 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1212,6 +1212,29 @@ func (acrr *AdminChangeReplicasRequest) AddChanges(chgs ...ReplicationChange) { } } +// ReplicationChanges is a slice of ReplicationChange. +type ReplicationChanges []ReplicationChange + +func (rc ReplicationChanges) byType(typ ReplicaChangeType) []ReplicationChange { + var sl []ReplicationChange + for _, chg := range rc { + if chg.ChangeType == typ { + sl = append(sl, chg) + } + } + return sl +} + +// Additions returns a slice of all contained replication changes that add replicas. +func (rc ReplicationChanges) Additions() []ReplicationChange { + return rc.byType(ADD_REPLICA) +} + +// Removals returns a slice of all contained replication changes that remove replicas. +func (rc ReplicationChanges) Removals() []ReplicationChange { + return rc.byType(REMOVE_REPLICA) +} + // Changes returns the changes requested by this AdminChangeReplicasRequest, taking // the deprecated method of doing so into account. func (acrr *AdminChangeReplicasRequest) Changes() []ReplicationChange { diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go index e45d3d849e50..67262a5260b2 100644 --- a/pkg/storage/client_replica_test.go +++ b/pkg/storage/client_replica_test.go @@ -1673,20 +1673,22 @@ func TestChangeReplicasGeneration(t *testing.T) { assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+2) oldGeneration = repl.Desc().GetGeneration() - if _, err := repl.ChangeReplicas( + oldDesc := repl.Desc() + newDesc, err := repl.ChangeReplicas( context.Background(), roachpb.REMOVE_REPLICA, roachpb.ReplicationTarget{ NodeID: mtc.idents[1].NodeID, StoreID: mtc.idents[1].StoreID, }, - repl.Desc(), + oldDesc, storagepb.ReasonRangeOverReplicated, "", - ); err != nil { + ) + if err != nil { t.Fatalf("unexpected error: %v", err) } - assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+1) + assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+1, "\nold: %+v\nnew: %+v", oldDesc, newDesc) } func TestSystemZoneConfigs(t *testing.T) { diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 8483715adec8..f3bb04f3c5fb 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -879,54 +879,86 @@ func (r *Replica) ChangeReplicas( return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r) } + var chgs []roachpb.ReplicationChange switch changeType { case roachpb.ADD_REPLICA: - return r.addReplica(ctx, target, desc, SnapshotRequest_REBALANCE, reason, details) + chgs = roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target) case roachpb.REMOVE_REPLICA: - return r.removeReplica(ctx, target, desc, SnapshotRequest_REBALANCE, reason, details) + chgs = roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, target) default: return nil, errors.Errorf(`unknown change type: %s`, changeType) } + return r.addAndRemoveReplicas(ctx, desc, SnapshotRequest_REBALANCE, reason, details, chgs) } func validateReplicationChanges( - desc *roachpb.RangeDescriptor, target roachpb.ReplicationTarget, + desc *roachpb.RangeDescriptor, chgs roachpb.ReplicationChanges, ) error { + // First make sure that the changes don't self-overlap (i.e. we're not adding + // a replica twice, or removing and immediately re-adding it). + byNodeID := make(map[roachpb.NodeID]roachpb.ReplicationChange, len(chgs)) + for _, chg := range chgs { + if _, ok := byNodeID[chg.Target.NodeID]; ok { + return fmt.Errorf("changes %+v refer to n%d twice", chgs, chg.Target.NodeID) + } + byNodeID[chg.Target.NodeID] = chg + } + + // Then, check that we're not adding a second replica on nodes that already + // have one, or "re-add" an existing replica. We delete from byNodeID so that + // after this loop, it contains only StoreIDs that we haven't seen in desc. for _, rDesc := range desc.Replicas().All() { - if rDesc.NodeID == target.NodeID { - // Two replicas from the same range are not allowed on the same node, even - // in different stores. - if rDesc.StoreID != target.StoreID { - return errors.Errorf("unable to add replica %v; node already has a replica in %s", target, desc) - } + chg, ok := byNodeID[rDesc.NodeID] + delete(byNodeID, rDesc.NodeID) + if !ok || chg.ChangeType == roachpb.REMOVE_REPLICA { + continue + } + // We're adding a replica that's already there. This isn't allowed, even + // when the newly added one would be on a different store. + if rDesc.StoreID != chg.Target.StoreID { + return errors.Errorf("unable to add replica %v; node already has a replica in %s", chg.Target.StoreID, desc) + } - // Looks like we found a replica with the same store and node id. If the - // replica is already a learner, then either some previous leaseholder was - // trying to add it with the learner+snapshot+voter cycle and got - // interrupted or else we hit a race between the replicate queue and - // AdminChangeReplicas. - if rDesc.GetType() == roachpb.ReplicaType_LEARNER { - return errors.Errorf( - "unable to add replica %v which is already present as a learner in %s", target, desc) - } + // Looks like we found a replica with the same store and node id. If the + // replica is already a learner, then either some previous leaseholder was + // trying to add it with the learner+snapshot+voter cycle and got + // interrupted or else we hit a race between the replicate queue and + // AdminChangeReplicas. + if rDesc.GetType() == roachpb.ReplicaType_LEARNER { + return errors.Errorf( + "unable to add replica %v which is already present as a learner in %s", chg.Target, desc) + } - // Otherwise, we already had a full voter replica. Can't add another to - // this store. - return errors.Errorf("unable to add replica %v which is already present in %s", target, desc) + // Otherwise, we already had a full voter replica. Can't add another to + // this store. + return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc) + } + + // Any removals left in the map now refer to nonexisting replicas, and we refuse them. + for _, chg := range byNodeID { + if chg.ChangeType == roachpb.ADD_REPLICA { + continue } + return errors.Errorf("removing %v which is not in %s", chg.Target, desc) } return nil } -func (r *Replica) addReplica( +func (r *Replica) addAndRemoveReplicas( ctx context.Context, - target roachpb.ReplicationTarget, desc *roachpb.RangeDescriptor, priority SnapshotRequest_Priority, reason storagepb.RangeLogEventReason, details string, + chgs roachpb.ReplicationChanges, ) (*roachpb.RangeDescriptor, error) { - if err := validateReplicationChanges(desc, target); err != nil { + if len(chgs) != 1 { + // TODO(tbg): lift this restriction when atomic membership changes are + // plumbed into raft. + return nil, errors.Errorf("need exactly one change, got %+v", chgs) + } + + if err := validateReplicationChanges(desc, chgs); err != nil { return nil, err } @@ -934,114 +966,140 @@ func (r *Replica) addReplica( useLearners := useLearnerReplicas.Get(&settings.SV) useLearners = useLearners && settings.Version.IsActive(cluster.VersionLearnerReplicas) if !useLearners { + // NB: we will never use atomic replication changes while learners are not + // also active. + target := chgs[0].Target return r.addReplicaLegacyPreemptiveSnapshot(ctx, target, desc, priority, reason, details) } - // First add the replica as a raft learner. This means it accepts raft traffic - // (so it can catch up) but doesn't vote (so it doesn't affect quorum and thus - // doesn't introduce fragility into the system). For details see + // For all newly added nodes, first add raft learner replicas. They accept raft traffic + // (so they can catch up) but don't get to vote (so they don't affect quorum and thus + // don't introduce fragility into the system). For details see: _ = roachpb.ReplicaDescriptors.Learners - learnerDesc, err := addLearnerReplica(ctx, r.store, desc, target, reason, details) + learnerDesc, err := addLearnerReplicas(ctx, r.store, desc, reason, details, chgs.Additions()) if err != nil { return nil, err } // Now move it to be a full voter (waiting on it to get a raft snapshot first, // so it's not immediately way behind). - voterDesc, err := r.promoteLearnerReplicaToVoter(ctx, learnerDesc, target, priority, reason, details) + voterDesc, err := r.finalizeConfChange(ctx, learnerDesc, priority, reason, details, chgs) if err != nil { // Don't leave a learner replica lying around if we didn't succeed in // promoting it to a voter. - log.Infof(ctx, "could not promote %s to voter, rolling back: %v", target, err) - r.rollbackLearnerReplica(ctx, learnerDesc, target, reason, details) + adds := chgs.Additions() + log.Infof(ctx, "could not promote %v to voter, rolling back: %v", adds, err) + for _, chg := range adds { + r.rollbackLearnerReplica(ctx, learnerDesc, chg.Target, reason, details) + } return nil, err } return voterDesc, nil } -func addLearnerReplica( +func addLearnerReplicas( ctx context.Context, store *Store, desc *roachpb.RangeDescriptor, - target roachpb.ReplicationTarget, reason storagepb.RangeLogEventReason, details string, + chgs []roachpb.ReplicationChange, ) (*roachpb.RangeDescriptor, error) { + if len(chgs) == 0 { + // If there's nothing to do, return early to avoid redundant work. + return desc, nil + } newDesc := *desc newDesc.SetReplicas(desc.Replicas().DeepCopy()) - replDesc := roachpb.ReplicaDescriptor{ - NodeID: target.NodeID, - StoreID: target.StoreID, - ReplicaID: desc.NextReplicaID, - Type: roachpb.ReplicaTypeLearner(), - } - newDesc.NextReplicaID++ - newDesc.AddReplica(replDesc) + for _, chg := range chgs { + replDesc := roachpb.ReplicaDescriptor{ + NodeID: chg.Target.NodeID, + StoreID: chg.Target.StoreID, + ReplicaID: desc.NextReplicaID, + Type: roachpb.ReplicaTypeLearner(), + } + newDesc.NextReplicaID++ + newDesc.AddReplica(replDesc) + } err := execChangeReplicasTxn( - ctx, store, roachpb.ADD_REPLICA, desc, replDesc, &newDesc, reason, details, + ctx, store, desc, &newDesc, reason, details, chgs, ) return &newDesc, err } -func (r *Replica) promoteLearnerReplicaToVoter( +// finalizeConfChange carries out the atomic membership change that finalizes +// the addition and/or removal of replicas. Any voters in the process of being +// added (as reflected by the replication changes) must have been added as +// learners already and will be caught up before being promoted to voters. Any +// replica removals (from the replication changes) will be removed. All of this +// occurs in one atomic raft membership change. +func (r *Replica) finalizeConfChange( ctx context.Context, desc *roachpb.RangeDescriptor, - target roachpb.ReplicationTarget, priority SnapshotRequest_Priority, reason storagepb.RangeLogEventReason, details string, + chgs roachpb.ReplicationChanges, ) (*roachpb.RangeDescriptor, error) { // TODO(dan): We allow ranges with learner replicas to split, so in theory // this may want to detect that and retry, sending a snapshot and promoting // both sides. + adds, removes := chgs.Additions(), chgs.Removals() newReplicas := desc.Replicas().DeepCopy().All() for i, rDesc := range newReplicas { - if rDesc.NodeID != target.NodeID || rDesc.StoreID != target.StoreID { - continue - } - if rDesc.GetType() != roachpb.ReplicaType_LEARNER { - return nil, errors.Errorf(`%s: cannot promote replica of type %s`, r, rDesc.Type) - } + for _, chg := range adds { + if rDesc.StoreID != chg.Target.StoreID { + continue + } + if rDesc.GetType() != roachpb.ReplicaType_LEARNER { + return nil, errors.Errorf(`%s: cannot promote replica of type %s`, r, rDesc.Type) + } - // Note that raft snapshot queue will refuse to send a snapshot to a learner - // replica if its store is already sending a snapshot to that replica. That - // races with this snapshot. Most of the time, this side will win the race, - // which avoids needlessly sending the snapshot twice. If the raft snapshot - // queue wins, it's wasteful, but doesn't impact correctness. - // - // Replicas are added to the raft snapshot queue by the raft leader. This - // code can be run anywhere (though it's usually run on the leaseholder, - // which is usually co-located with the raft leader). This means that - // they're usually on the same node, but not always, so that's about as good - // a guarantee as we can offer, anyway. - // - // We originally tried always refusing to send snapshots from the raft - // snapshot queue to learner replicas, but this turned out to be brittle. - // First, if the snapshot failed, any attempt to use the learner's raft - // group would hang until the replicate queue got around to cleaning up the - // orphaned learner. Second, this tickled some bugs in etcd/raft around - // switching between StateSnapshot and StateProbe. Even if we worked through - // these, it would be susceptible to future similar issues. - if err := r.sendSnapshot(ctx, rDesc, SnapshotRequest_LEARNER, priority); err != nil { - return nil, err - } - - if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil { - if fn() { - return desc, nil + // Note that raft snapshot queue will refuse to send a snapshot to a learner + // replica if its store is already sending a snapshot to that replica. That + // races with this snapshot. Most of the time, this side will win the race, + // which avoids needlessly sending the snapshot twice. If the raft snapshot + // queue wins, it's wasteful, but doesn't impact correctness. + // + // Replicas are added to the raft snapshot queue by the raft leader. This + // code can be run anywhere (though it's usually run on the leaseholder, + // which is usually co-located with the raft leader). This means that + // they're usually on the same node, but not always, so that's about as good + // a guarantee as we can offer, anyway. + // + // We originally tried always refusing to send snapshots from the raft + // snapshot queue to learner replicas, but this turned out to be brittle. + // First, if the snapshot failed, any attempt to use the learner's raft + // group would hang until the replicate queue got around to cleaning up the + // orphaned learner. Second, this tickled some bugs in etcd/raft around + // switching between StateSnapshot and StateProbe. Even if we worked through + // these, it would be susceptible to future similar issues. + if err := r.sendSnapshot(ctx, rDesc, SnapshotRequest_LEARNER, priority); err != nil { + return nil, err } + + rDesc.Type = roachpb.ReplicaTypeVoter() + newReplicas[i] = rDesc } + } - rDesc.Type = roachpb.ReplicaTypeVoter() - newReplicas[i] = rDesc + if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil { + if fn() { + return desc, nil + } + } - updatedDesc := *desc - updatedDesc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas)) - err := execChangeReplicasTxn(ctx, r.store, roachpb.ADD_REPLICA, desc, rDesc, &updatedDesc, reason, details) - return &updatedDesc, err + updatedDesc := *desc + updatedDesc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas)) + for _, chg := range removes { + if _, found := updatedDesc.RemoveReplica(chg.Target.NodeID, chg.Target.StoreID); !found { + return nil, errors.Errorf("target to remove %v not found in %s", chg.Target, &updatedDesc) + } } - return nil, errors.Errorf(`%s: could not find replica to promote %s`, r, target) + + err := execChangeReplicasTxn(ctx, r.store, desc, &updatedDesc, reason, details, chgs) + return &updatedDesc, err } func (r *Replica) rollbackLearnerReplica( @@ -1067,8 +1125,9 @@ func (r *Replica) rollbackLearnerReplica( // blocking the caller indefinitely). const rollbackTimeout = 10 * time.Second rollbackFn := func(ctx context.Context) error { + chgs := roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, target) return execChangeReplicasTxn( - ctx, r.store, roachpb.REMOVE_REPLICA, desc, replDesc, &newDesc, reason, details, + ctx, r.store, desc, &newDesc, reason, details, chgs, ) } rollbackCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) @@ -1154,42 +1213,19 @@ func (r *Replica) addReplicaLegacyPreemptiveSnapshot( updatedDesc.NextReplicaID++ updatedDesc.AddReplica(repDesc) - err := execChangeReplicasTxn(ctx, r.store, roachpb.ADD_REPLICA, desc, repDesc, &updatedDesc, reason, details) - return &updatedDesc, err -} - -func (r *Replica) removeReplica( - ctx context.Context, - target roachpb.ReplicationTarget, - desc *roachpb.RangeDescriptor, - priority SnapshotRequest_Priority, - reason storagepb.RangeLogEventReason, - details string, -) (*roachpb.RangeDescriptor, error) { - if desc == nil { - return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r) - } - updatedDesc := *desc - updatedDesc.SetReplicas(desc.Replicas().DeepCopy()) - // If that exact node-store combination does not have the replica, - // abort the removal. - removed, ok := updatedDesc.RemoveReplica(target.NodeID, target.StoreID) - if !ok { - return nil, errors.Errorf("%s: unable to remove replica %v which is not present", r, target) - } - err := execChangeReplicasTxn(ctx, r.store, roachpb.REMOVE_REPLICA, desc, removed, &updatedDesc, reason, details) + chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target) + err := execChangeReplicasTxn(ctx, r.store, desc, &updatedDesc, reason, details, chgs) return &updatedDesc, err } func execChangeReplicasTxn( ctx context.Context, store *Store, - changeType roachpb.ReplicaChangeType, desc *roachpb.RangeDescriptor, - repDesc roachpb.ReplicaDescriptor, updatedDesc *roachpb.RangeDescriptor, reason storagepb.RangeLogEventReason, details string, + chgs roachpb.ReplicationChanges, ) error { generationComparableEnabled := store.ClusterSettings().Version.IsActive(cluster.VersionGenerationComparable) if generationComparableEnabled { @@ -1197,6 +1233,22 @@ func execChangeReplicasTxn( updatedDesc.GenerationComparable = proto.Bool(true) } + var added, removed []roachpb.ReplicaDescriptor + for _, chg := range chgs.Additions() { + repDesc, ok := updatedDesc.GetReplicaDescriptor(chg.Target.StoreID) + if !ok { + return errors.Errorf("unable to find s%d in updated desc %s", chg.Target.StoreID, updatedDesc) + } + added = append(added, repDesc) + } + for _, chg := range chgs.Removals() { + repDesc, ok := desc.GetReplicaDescriptor(chg.Target.StoreID) + if !ok { + return errors.Errorf("unable to find s%d in desc %s", chg.Target.StoreID, desc) + } + removed = append(removed, repDesc) + } + descKey := keys.RangeDescriptorKey(desc.StartKey) if err := store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error { log.Event(ctx, "attempting txn") @@ -1205,7 +1257,7 @@ func execChangeReplicasTxn( if err != nil { return err } - log.Infof(ctx, "change replicas (%v %s): existing descriptor %s", changeType, repDesc, desc) + log.Infof(ctx, "change replicas (%v): existing descriptor %s", chgs, desc) { b := txn.NewBatch() @@ -1223,10 +1275,20 @@ func execChangeReplicasTxn( } // Log replica change into range event log. - if err := store.logChange( - ctx, txn, changeType, repDesc, *updatedDesc, reason, details, - ); err != nil { - return err + for _, tup := range []struct { + typ roachpb.ReplicaChangeType + repDescs []roachpb.ReplicaDescriptor + }{ + {roachpb.ADD_REPLICA, added}, + {roachpb.REMOVE_REPLICA, removed}, + } { + for _, repDesc := range tup.repDescs { + if err := store.logChange( + ctx, txn, tup.typ, repDesc, *updatedDesc, reason, details, + ); err != nil { + return err + } + } } // End the transaction manually instead of letting RunTransaction @@ -1240,21 +1302,18 @@ func execChangeReplicasTxn( var crt *roachpb.ChangeReplicasTrigger if !store.ClusterSettings().Version.IsActive(cluster.VersionAtomicChangeReplicasTrigger) { + var deprecatedRepDesc roachpb.ReplicaDescriptor + if len(added) > 0 { + deprecatedRepDesc = added[0] + } else { + deprecatedRepDesc = removed[0] + } crt = &roachpb.ChangeReplicasTrigger{ - DeprecatedChangeType: changeType, - DeprecatedReplica: repDesc, + DeprecatedChangeType: chgs[0].ChangeType, + DeprecatedReplica: deprecatedRepDesc, Desc: updatedDesc, } } else { - var added, removed []roachpb.ReplicaDescriptor - switch changeType { - case roachpb.ADD_REPLICA: - added = append(added, repDesc) - case roachpb.REMOVE_REPLICA: - removed = append(removed, repDesc) - default: - return errors.Errorf("unknown change type: %d", changeType) - } crt = &roachpb.ChangeReplicasTrigger{ Desc: updatedDesc, InternalAddedReplicas: added, diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go index 4c49dfacf45e..2ad18972f8c4 100644 --- a/pkg/storage/replica_learner_test.go +++ b/pkg/storage/replica_learner_test.go @@ -542,7 +542,7 @@ func TestLearnerReplicateQueueRace(t *testing.T) { } formattedTrace := tracing.FormatRecordedSpans(trace) expectedMessages := []string{ - `could not promote n3,s3 to voter, rolling back: change replicas of r\d+ failed: descriptor changed`, + `could not promote .*n3,s3.* to voter, rolling back: change replicas of r\d+ failed: descriptor changed`, // TODO(dan): Consider skipping the rollback when trying to promote a // learner to a voter results in a "descriptor changed" error. `failed to rollback learner n3,s3, abandoning it for the replicate queue: change replicas of r\d+ failed: descriptor changed`, diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index c099c4ec7110..945e4911351c 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -5990,7 +5990,7 @@ func TestChangeReplicasDuplicateError(t *testing.T) { storagepb.ReasonRebalance, "", ); err == nil || !strings.Contains(err.Error(), "node already has a replica") { - t.Fatalf("must not be able to add second replica to same node (err=%s)", err) + t.Fatalf("must not be able to add second replica to same node (err=%+v)", err) } } diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 3550538f2245..adfbe1d9dbfd 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -760,7 +760,8 @@ func (rq *replicateQueue) addReplica( if dryRun { return nil } - if _, err := repl.addReplica(ctx, target, desc, priority, reason, details); err != nil { + chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target) + if _, err := repl.addAndRemoveReplicas(ctx, desc, priority, reason, details, chgs); err != nil { return err } rangeUsageInfo := rangeUsageInfoForRepl(repl)