diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index ac3a94e08d4c..f0a0125bcf87 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -1212,6 +1212,29 @@ func (acrr *AdminChangeReplicasRequest) AddChanges(chgs ...ReplicationChange) {
 	}
 }
 
+// ReplicationChanges is a slice of ReplicationChange.
+type ReplicationChanges []ReplicationChange
+
+func (rc ReplicationChanges) byType(typ ReplicaChangeType) []ReplicationChange {
+	var sl []ReplicationChange
+	for _, chg := range rc {
+		if chg.ChangeType == typ {
+			sl = append(sl, chg)
+		}
+	}
+	return sl
+}
+
+// Additions returns a slice of all contained replication changes that add replicas.
+func (rc ReplicationChanges) Additions() []ReplicationChange {
+	return rc.byType(ADD_REPLICA)
+}
+
+// Removals returns a slice of all contained replication changes that remove replicas.
+func (rc ReplicationChanges) Removals() []ReplicationChange {
+	return rc.byType(REMOVE_REPLICA)
+}
+
 // Changes returns the changes requested by this AdminChangeReplicasRequest, taking
 // the deprecated method of doing so into account.
 func (acrr *AdminChangeReplicasRequest) Changes() []ReplicationChange {
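(Aside, as a reading aid rather than part of the patch: the new accessors are plain filters over the slice. A caller mixing additions and removals might use them as below, where target1 and target2 are hypothetical ReplicationTargets and MakeReplicationChanges is the variadic constructor used throughout the storage changes that follow.)

	// Sketch: build a mixed batch of changes, then split it by type.
	chgs := roachpb.ReplicationChanges(append(
		roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target1),
		roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, target2)...,
	))
	for _, chg := range chgs.Additions() {
		_ = chg.Target // additions first join the range as raft learners
	}
	for _, chg := range chgs.Removals() {
		_ = chg.Target // removals are applied when the change is finalized
	}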
diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go
index e45d3d849e50..67262a5260b2 100644
--- a/pkg/storage/client_replica_test.go
+++ b/pkg/storage/client_replica_test.go
@@ -1673,20 +1673,22 @@ func TestChangeReplicasGeneration(t *testing.T) {
 	assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+2)
 
 	oldGeneration = repl.Desc().GetGeneration()
-	if _, err := repl.ChangeReplicas(
+	oldDesc := repl.Desc()
+	newDesc, err := repl.ChangeReplicas(
 		context.Background(),
 		roachpb.REMOVE_REPLICA,
 		roachpb.ReplicationTarget{
 			NodeID:  mtc.idents[1].NodeID,
 			StoreID: mtc.idents[1].StoreID,
 		},
-		repl.Desc(),
+		oldDesc,
 		storagepb.ReasonRangeOverReplicated,
 		"",
-	); err != nil {
+	)
+	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
 	}
-	assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+1)
+	assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+1, "\nold: %+v\nnew: %+v", oldDesc, newDesc)
 }
 
 func TestSystemZoneConfigs(t *testing.T) {
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 8483715adec8..f3bb04f3c5fb 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -879,54 +879,86 @@ func (r *Replica) ChangeReplicas(
 		return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r)
 	}
 
+	var chgs []roachpb.ReplicationChange
 	switch changeType {
 	case roachpb.ADD_REPLICA:
-		return r.addReplica(ctx, target, desc, SnapshotRequest_REBALANCE, reason, details)
+		chgs = roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target)
 	case roachpb.REMOVE_REPLICA:
-		return r.removeReplica(ctx, target, desc, SnapshotRequest_REBALANCE, reason, details)
+		chgs = roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, target)
 	default:
 		return nil, errors.Errorf(`unknown change type: %s`, changeType)
 	}
+	return r.addAndRemoveReplicas(ctx, desc, SnapshotRequest_REBALANCE, reason, details, chgs)
 }
 
 func validateReplicationChanges(
-	desc *roachpb.RangeDescriptor, target roachpb.ReplicationTarget,
+	desc *roachpb.RangeDescriptor, chgs roachpb.ReplicationChanges,
 ) error {
+	// First make sure that the changes don't self-overlap (i.e. we're not adding
+	// a replica twice, or removing and immediately re-adding it).
+	byNodeID := make(map[roachpb.NodeID]roachpb.ReplicationChange, len(chgs))
+	for _, chg := range chgs {
+		if _, ok := byNodeID[chg.Target.NodeID]; ok {
+			return fmt.Errorf("changes %+v refer to n%d twice", chgs, chg.Target.NodeID)
+		}
+		byNodeID[chg.Target.NodeID] = chg
+	}
+
+	// Then, check that we're not adding a second replica on nodes that already
+	// have one, or "re-adding" an existing replica. We delete from byNodeID so
+	// that after this loop, it contains only NodeIDs that we haven't seen in
+	// desc.
 	for _, rDesc := range desc.Replicas().All() {
-		if rDesc.NodeID == target.NodeID {
-			// Two replicas from the same range are not allowed on the same node, even
-			// in different stores.
-			if rDesc.StoreID != target.StoreID {
-				return errors.Errorf("unable to add replica %v; node already has a replica in %s", target, desc)
-			}
+		chg, ok := byNodeID[rDesc.NodeID]
+		delete(byNodeID, rDesc.NodeID)
+		if !ok || chg.ChangeType == roachpb.REMOVE_REPLICA {
+			continue
+		}
+		// We're adding a replica that's already there. This isn't allowed, even
+		// when the newly added one would be on a different store.
+		if rDesc.StoreID != chg.Target.StoreID {
+			return errors.Errorf("unable to add replica %v; node already has a replica in %s", chg.Target, desc)
+		}
 
-			// Looks like we found a replica with the same store and node id. If the
-			// replica is already a learner, then either some previous leaseholder was
-			// trying to add it with the learner+snapshot+voter cycle and got
-			// interrupted or else we hit a race between the replicate queue and
-			// AdminChangeReplicas.
-			if rDesc.GetType() == roachpb.ReplicaType_LEARNER {
-				return errors.Errorf(
-					"unable to add replica %v which is already present as a learner in %s", target, desc)
-			}
+		// Looks like we found a replica with the same store and node id. If the
+		// replica is already a learner, then either some previous leaseholder was
+		// trying to add it with the learner+snapshot+voter cycle and got
+		// interrupted or else we hit a race between the replicate queue and
+		// AdminChangeReplicas.
+		if rDesc.GetType() == roachpb.ReplicaType_LEARNER {
+			return errors.Errorf(
+				"unable to add replica %v which is already present as a learner in %s", chg.Target, desc)
+		}
 
-			// Otherwise, we already had a full voter replica. Can't add another to
-			// this store.
-			return errors.Errorf("unable to add replica %v which is already present in %s", target, desc)
+		// Otherwise, we already had a full voter replica. Can't add another to
+		// this store.
+		return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc)
+	}
+
+	// Any removals left in the map now refer to nonexistent replicas, and we
+	// refuse them.
+	for _, chg := range byNodeID {
+		if chg.ChangeType == roachpb.ADD_REPLICA {
+			continue
 		}
+		return errors.Errorf("removing %v which is not in %s", chg.Target, desc)
 	}
 	return nil
 }
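(Reading aid, not part of the patch: the two new rejection paths can be exercised as sketched below, assuming desc is a *roachpb.RangeDescriptor whose range has replicas on n1 through n3 only.)

	// Rejected by the self-overlap check: one batch refers to n2 twice.
	chgs := roachpb.ReplicationChanges{
		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
	}
	err := validateReplicationChanges(desc, chgs) // "changes ... refer to n2 twice"

	// Rejected by the leftover-removals check: n9 holds no replica of this range.
	chgs = roachpb.ReplicationChanges{
		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 9, StoreID: 9}},
	}
	err = validateReplicationChanges(desc, chgs) // "removing ... which is not in ..."
	_ = err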
 
-func (r *Replica) addReplica(
+func (r *Replica) addAndRemoveReplicas(
 	ctx context.Context,
-	target roachpb.ReplicationTarget,
 	desc *roachpb.RangeDescriptor,
 	priority SnapshotRequest_Priority,
 	reason storagepb.RangeLogEventReason,
 	details string,
+	chgs roachpb.ReplicationChanges,
 ) (*roachpb.RangeDescriptor, error) {
-	if err := validateReplicationChanges(desc, target); err != nil {
+	if len(chgs) != 1 {
+		// TODO(tbg): lift this restriction when atomic membership changes are
+		// plumbed into raft.
+		return nil, errors.Errorf("need exactly one change, got %+v", chgs)
+	}
+
+	if err := validateReplicationChanges(desc, chgs); err != nil {
 		return nil, err
 	}
 
@@ -934,114 +966,140 @@ func (r *Replica) addReplica(
 	useLearners := useLearnerReplicas.Get(&settings.SV)
 	useLearners = useLearners && settings.Version.IsActive(cluster.VersionLearnerReplicas)
 	if !useLearners {
+		// NB: we will never use atomic replication changes while learners are not
+		// also active.
+		target := chgs[0].Target
 		return r.addReplicaLegacyPreemptiveSnapshot(ctx, target, desc, priority, reason, details)
 	}
 
-	// First add the replica as a raft learner. This means it accepts raft traffic
-	// (so it can catch up) but doesn't vote (so it doesn't affect quorum and thus
-	// doesn't introduce fragility into the system). For details see
+	// For all newly added nodes, first add raft learner replicas. They accept raft traffic
+	// (so they can catch up) but don't get to vote (so they don't affect quorum and thus
+	// don't introduce fragility into the system). For details see:
 	_ = roachpb.ReplicaDescriptors.Learners
-	learnerDesc, err := addLearnerReplica(ctx, r.store, desc, target, reason, details)
+	learnerDesc, err := addLearnerReplicas(ctx, r.store, desc, reason, details, chgs.Additions())
 	if err != nil {
 		return nil, err
 	}
 
 	// Now move it to be a full voter (waiting on it to get a raft snapshot first,
 	// so it's not immediately way behind).
-	voterDesc, err := r.promoteLearnerReplicaToVoter(ctx, learnerDesc, target, priority, reason, details)
+	voterDesc, err := r.finalizeConfChange(ctx, learnerDesc, priority, reason, details, chgs)
 	if err != nil {
 		// Don't leave a learner replica lying around if we didn't succeed in
 		// promoting it to a voter.
-		log.Infof(ctx, "could not promote %s to voter, rolling back: %v", target, err)
-		r.rollbackLearnerReplica(ctx, learnerDesc, target, reason, details)
+		adds := chgs.Additions()
+		log.Infof(ctx, "could not promote %v to voter, rolling back: %v", adds, err)
+		for _, chg := range adds {
+			r.rollbackLearnerReplica(ctx, learnerDesc, chg.Target, reason, details)
+		}
 		return nil, err
 	}
 	return voterDesc, nil
 }
 
-func addLearnerReplica(
+func addLearnerReplicas(
 	ctx context.Context,
 	store *Store,
 	desc *roachpb.RangeDescriptor,
-	target roachpb.ReplicationTarget,
 	reason storagepb.RangeLogEventReason,
 	details string,
+	chgs []roachpb.ReplicationChange,
 ) (*roachpb.RangeDescriptor, error) {
+	if len(chgs) == 0 {
+		// If there's nothing to do, return early to avoid redundant work.
+		return desc, nil
+	}
 	newDesc := *desc
 	newDesc.SetReplicas(desc.Replicas().DeepCopy())
-	replDesc := roachpb.ReplicaDescriptor{
-		NodeID:    target.NodeID,
-		StoreID:   target.StoreID,
-		ReplicaID: desc.NextReplicaID,
-		Type:      roachpb.ReplicaTypeLearner(),
-	}
-	newDesc.NextReplicaID++
-	newDesc.AddReplica(replDesc)
+	for _, chg := range chgs {
+		replDesc := roachpb.ReplicaDescriptor{
+			NodeID:    chg.Target.NodeID,
+			StoreID:   chg.Target.StoreID,
+			ReplicaID: newDesc.NextReplicaID,
+			Type:      roachpb.ReplicaTypeLearner(),
+		}
+		newDesc.NextReplicaID++
+		newDesc.AddReplica(replDesc)
+	}
 	err := execChangeReplicasTxn(
-		ctx, store, roachpb.ADD_REPLICA, desc, replDesc, &newDesc, reason, details,
+		ctx, store, desc, &newDesc, reason, details, chgs,
 	)
 	return &newDesc, err
 }
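(The TODO above is the crux of this refactor: until atomic membership changes are plumbed into raft, addAndRemoveReplicas insists on exactly one change. The call shape it is preparing for, sketched with hypothetical addTarget and removeTarget, would be the following.)

	// Hypothetical future rebalance: add one voter and remove another in a
	// single atomic membership change. Today the len(chgs) != 1 guard above
	// still rejects this.
	chgs := append(
		roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, addTarget),
		roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, removeTarget)...,
	)
	newDesc, err := r.addAndRemoveReplicas(
		ctx, desc, SnapshotRequest_REBALANCE, reason, details, chgs,
	)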
 
-func (r *Replica) promoteLearnerReplicaToVoter(
+// finalizeConfChange carries out the atomic membership change that finalizes
+// the addition and/or removal of replicas. Any voters in the process of being
+// added (as reflected by the replication changes) must have been added as
+// learners already and will be caught up before being promoted to voters. Any
+// replicas slated for removal (as reflected by the replication changes) will
+// be removed. All of this occurs in one atomic raft membership change.
+func (r *Replica) finalizeConfChange(
 	ctx context.Context,
 	desc *roachpb.RangeDescriptor,
-	target roachpb.ReplicationTarget,
 	priority SnapshotRequest_Priority,
 	reason storagepb.RangeLogEventReason,
 	details string,
+	chgs roachpb.ReplicationChanges,
 ) (*roachpb.RangeDescriptor, error) {
 	// TODO(dan): We allow ranges with learner replicas to split, so in theory
 	// this may want to detect that and retry, sending a snapshot and promoting
 	// both sides.
+	adds, removes := chgs.Additions(), chgs.Removals()
 	newReplicas := desc.Replicas().DeepCopy().All()
 	for i, rDesc := range newReplicas {
-		if rDesc.NodeID != target.NodeID || rDesc.StoreID != target.StoreID {
-			continue
-		}
-		if rDesc.GetType() != roachpb.ReplicaType_LEARNER {
-			return nil, errors.Errorf(`%s: cannot promote replica of type %s`, r, rDesc.Type)
-		}
+		for _, chg := range adds {
+			if rDesc.StoreID != chg.Target.StoreID {
+				continue
+			}
+			if rDesc.GetType() != roachpb.ReplicaType_LEARNER {
+				return nil, errors.Errorf(`%s: cannot promote replica of type %s`, r, rDesc.Type)
+			}
 
-		// Note that raft snapshot queue will refuse to send a snapshot to a learner
-		// replica if its store is already sending a snapshot to that replica. That
-		// races with this snapshot. Most of the time, this side will win the race,
-		// which avoids needlessly sending the snapshot twice. If the raft snapshot
-		// queue wins, it's wasteful, but doesn't impact correctness.
-		//
-		// Replicas are added to the raft snapshot queue by the raft leader. This
-		// code can be run anywhere (though it's usually run on the leaseholder,
-		// which is usually co-located with the raft leader). This means that
-		// they're usually on the same node, but not always, so that's about as good
-		// a guarantee as we can offer, anyway.
-		//
-		// We originally tried always refusing to send snapshots from the raft
-		// snapshot queue to learner replicas, but this turned out to be brittle.
-		// First, if the snapshot failed, any attempt to use the learner's raft
-		// group would hang until the replicate queue got around to cleaning up the
-		// orphaned learner. Second, this tickled some bugs in etcd/raft around
-		// switching between StateSnapshot and StateProbe. Even if we worked through
-		// these, it would be susceptible to future similar issues.
-		if err := r.sendSnapshot(ctx, rDesc, SnapshotRequest_LEARNER, priority); err != nil {
-			return nil, err
-		}
-
-		if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil {
-			if fn() {
-				return desc, nil
+			// Note that raft snapshot queue will refuse to send a snapshot to a learner
+			// replica if its store is already sending a snapshot to that replica. That
+			// races with this snapshot. Most of the time, this side will win the race,
+			// which avoids needlessly sending the snapshot twice. If the raft snapshot
+			// queue wins, it's wasteful, but doesn't impact correctness.
+			//
+			// Replicas are added to the raft snapshot queue by the raft leader. This
+			// code can be run anywhere (though it's usually run on the leaseholder,
+			// which is usually co-located with the raft leader). This means that
+			// they're usually on the same node, but not always, so that's about as good
+			// a guarantee as we can offer, anyway.
+			//
+			// We originally tried always refusing to send snapshots from the raft
+			// snapshot queue to learner replicas, but this turned out to be brittle.
+			// First, if the snapshot failed, any attempt to use the learner's raft
+			// group would hang until the replicate queue got around to cleaning up the
+			// orphaned learner. Second, this tickled some bugs in etcd/raft around
+			// switching between StateSnapshot and StateProbe. Even if we worked through
+			// these, it would be susceptible to future similar issues.
+			if err := r.sendSnapshot(ctx, rDesc, SnapshotRequest_LEARNER, priority); err != nil {
+				return nil, err
 			}
+
+			rDesc.Type = roachpb.ReplicaTypeVoter()
+			newReplicas[i] = rDesc
 		}
+	}
 
-		rDesc.Type = roachpb.ReplicaTypeVoter()
-		newReplicas[i] = rDesc
+	if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil {
+		if fn() {
+			return desc, nil
+		}
+	}
 
-		updatedDesc := *desc
-		updatedDesc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas))
-		err := execChangeReplicasTxn(ctx, r.store, roachpb.ADD_REPLICA, desc, rDesc, &updatedDesc, reason, details)
-		return &updatedDesc, err
+	updatedDesc := *desc
+	updatedDesc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas))
+	for _, chg := range removes {
+		if _, found := updatedDesc.RemoveReplica(chg.Target.NodeID, chg.Target.StoreID); !found {
+			return nil, errors.Errorf("target to remove %v not found in %s", chg.Target, &updatedDesc)
+		}
 	}
-	return nil, errors.Errorf(`%s: could not find replica to promote %s`, r, target)
+
+	err := execChangeReplicasTxn(ctx, r.store, desc, &updatedDesc, reason, details, chgs)
+	return &updatedDesc, err
 }
 
 func (r *Replica) rollbackLearnerReplica(
@@ -1067,8 +1125,9 @@ func (r *Replica) rollbackLearnerReplica(
 	// blocking the caller indefinitely).
 	const rollbackTimeout = 10 * time.Second
 	rollbackFn := func(ctx context.Context) error {
+		chgs := roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, target)
 		return execChangeReplicasTxn(
-			ctx, r.store, roachpb.REMOVE_REPLICA, desc, replDesc, &newDesc, reason, details,
+			ctx, r.store, desc, &newDesc, reason, details, chgs,
 		)
 	}
 	rollbackCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
@@ -1154,42 +1213,19 @@ func (r *Replica) addReplicaLegacyPreemptiveSnapshot(
 	updatedDesc.NextReplicaID++
 	updatedDesc.AddReplica(repDesc)
 
-	err := execChangeReplicasTxn(ctx, r.store, roachpb.ADD_REPLICA, desc, repDesc, &updatedDesc, reason, details)
-	return &updatedDesc, err
-}
-
-func (r *Replica) removeReplica(
-	ctx context.Context,
-	target roachpb.ReplicationTarget,
-	desc *roachpb.RangeDescriptor,
-	priority SnapshotRequest_Priority,
-	reason storagepb.RangeLogEventReason,
-	details string,
-) (*roachpb.RangeDescriptor, error) {
-	if desc == nil {
-		return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r)
-	}
-	updatedDesc := *desc
-	updatedDesc.SetReplicas(desc.Replicas().DeepCopy())
-	// If that exact node-store combination does not have the replica,
-	// abort the removal.
-	removed, ok := updatedDesc.RemoveReplica(target.NodeID, target.StoreID)
-	if !ok {
-		return nil, errors.Errorf("%s: unable to remove replica %v which is not present", r, target)
-	}
-	err := execChangeReplicasTxn(ctx, r.store, roachpb.REMOVE_REPLICA, desc, removed, &updatedDesc, reason, details)
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target)
+	err := execChangeReplicasTxn(ctx, r.store, desc, &updatedDesc, reason, details, chgs)
 	return &updatedDesc, err
 }
 
 func execChangeReplicasTxn(
 	ctx context.Context,
 	store *Store,
-	changeType roachpb.ReplicaChangeType,
 	desc *roachpb.RangeDescriptor,
-	repDesc roachpb.ReplicaDescriptor,
 	updatedDesc *roachpb.RangeDescriptor,
 	reason storagepb.RangeLogEventReason,
 	details string,
+	chgs roachpb.ReplicationChanges,
 ) error {
 	generationComparableEnabled := store.ClusterSettings().Version.IsActive(cluster.VersionGenerationComparable)
 	if generationComparableEnabled {
@@ -1197,6 +1233,22 @@ func execChangeReplicasTxn(
 		updatedDesc.GenerationComparable = proto.Bool(true)
 	}
 
+	var added, removed []roachpb.ReplicaDescriptor
+	for _, chg := range chgs.Additions() {
+		repDesc, ok := updatedDesc.GetReplicaDescriptor(chg.Target.StoreID)
+		if !ok {
+			return errors.Errorf("unable to find s%d in updated desc %s", chg.Target.StoreID, updatedDesc)
+		}
+		added = append(added, repDesc)
+	}
+	for _, chg := range chgs.Removals() {
+		repDesc, ok := desc.GetReplicaDescriptor(chg.Target.StoreID)
+		if !ok {
+			return errors.Errorf("unable to find s%d in desc %s", chg.Target.StoreID, desc)
+		}
+		removed = append(removed, repDesc)
+	}
+
 	descKey := keys.RangeDescriptorKey(desc.StartKey)
 	if err := store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
 		log.Event(ctx, "attempting txn")
@@ -1205,7 +1257,7 @@ func execChangeReplicasTxn(
 		if err != nil {
 			return err
 		}
-		log.Infof(ctx, "change replicas (%v %s): existing descriptor %s", changeType, repDesc, desc)
+		log.Infof(ctx, "change replicas (%v): existing descriptor %s", chgs, desc)
 
 		{
 			b := txn.NewBatch()
@@ -1223,10 +1275,20 @@ func execChangeReplicasTxn(
 		}
 
 		// Log replica change into range event log.
-		if err := store.logChange(
-			ctx, txn, changeType, repDesc, *updatedDesc, reason, details,
-		); err != nil {
-			return err
+		for _, tup := range []struct {
+			typ      roachpb.ReplicaChangeType
+			repDescs []roachpb.ReplicaDescriptor
+		}{
+			{roachpb.ADD_REPLICA, added},
+			{roachpb.REMOVE_REPLICA, removed},
+		} {
+			for _, repDesc := range tup.repDescs {
+				if err := store.logChange(
+					ctx, txn, tup.typ, repDesc, *updatedDesc, reason, details,
+				); err != nil {
+					return err
+				}
+			}
 		}
 
 		// End the transaction manually instead of letting RunTransaction
@@ -1240,21 +1302,18 @@ func execChangeReplicasTxn(
 
 		var crt *roachpb.ChangeReplicasTrigger
 		if !store.ClusterSettings().Version.IsActive(cluster.VersionAtomicChangeReplicasTrigger) {
+			var deprecatedRepDesc roachpb.ReplicaDescriptor
+			if len(added) > 0 {
+				deprecatedRepDesc = added[0]
+			} else {
+				deprecatedRepDesc = removed[0]
+			}
 			crt = &roachpb.ChangeReplicasTrigger{
-				DeprecatedChangeType: changeType,
-				DeprecatedReplica:    repDesc,
+				DeprecatedChangeType: chgs[0].ChangeType,
+				DeprecatedReplica:    deprecatedRepDesc,
 				Desc:                 updatedDesc,
 			}
 		} else {
-			var added, removed []roachpb.ReplicaDescriptor
-			switch changeType {
-			case roachpb.ADD_REPLICA:
-				added = append(added, repDesc)
-			case roachpb.REMOVE_REPLICA:
-				removed = append(removed, repDesc)
-			default:
-				return errors.Errorf("unknown change type: %d", changeType)
-			}
 			crt = &roachpb.ChangeReplicasTrigger{
 				Desc:                   updatedDesc,
 				InternalAddedReplicas:  added,
diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go
index 4c49dfacf45e..2ad18972f8c4 100644
--- a/pkg/storage/replica_learner_test.go
+++ b/pkg/storage/replica_learner_test.go
@@ -542,7 +542,7 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
 	}
 	formattedTrace := tracing.FormatRecordedSpans(trace)
 	expectedMessages := []string{
-		`could not promote n3,s3 to voter, rolling back: change replicas of r\d+ failed: descriptor changed`,
+		`could not promote .*n3,s3.* to voter, rolling back: change replicas of r\d+ failed: descriptor changed`,
 		// TODO(dan): Consider skipping the rollback when trying to promote a
 		// learner to a voter results in a "descriptor changed" error.
 		`failed to rollback learner n3,s3, abandoning it for the replicate queue: change replicas of r\d+ failed: descriptor changed`,
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index c099c4ec7110..945e4911351c 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -5990,7 +5990,7 @@ func TestChangeReplicasDuplicateError(t *testing.T) {
 		storagepb.ReasonRebalance,
 		"",
 	); err == nil || !strings.Contains(err.Error(), "node already has a replica") {
-		t.Fatalf("must not be able to add second replica to same node (err=%s)", err)
+		t.Fatalf("must not be able to add second replica to same node (err=%+v)", err)
 	}
 }
 
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index 3550538f2245..adfbe1d9dbfd 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -760,7 +760,8 @@ func (rq *replicateQueue) addReplica(
 	if dryRun {
 		return nil
 	}
-	if _, err := repl.addReplica(ctx, target, desc, priority, reason, details); err != nil {
+	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target)
+	if _, err := repl.addAndRemoveReplicas(ctx, desc, priority, reason, details, chgs); err != nil {
 		return err
 	}
 	rangeUsageInfo := rangeUsageInfoForRepl(repl)
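(MakeReplicationChanges is relied on throughout this diff but not defined in it. It is presumably a small variadic constructor in pkg/roachpb along the lines of the sketch below; the actual definition may differ.)

	// MakeReplicationChanges returns a ReplicationChange for each target, all
	// carrying the given change type. (Sketch of the helper this diff assumes;
	// not part of the patch.)
	func MakeReplicationChanges(
		changeType ReplicaChangeType, targets ...ReplicationTarget,
	) []ReplicationChange {
		chgs := make([]ReplicationChange, 0, len(targets))
		for _, target := range targets {
			chgs = append(chgs, ReplicationChange{
				ChangeType: changeType,
				Target:     target,
			})
		}
		return chgs
	}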