diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 12fc8fdb249b..7fd2338ebf89 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -1336,6 +1336,33 @@ func (r *Replica) atomicReplicationChange( // this may want to detect that and retry, sending a snapshot and promoting // both sides. + // Wait for our replica to catch up with the descriptor change. The replica is + // expected to usually be already caught up because it's expected to usually + // be the leaseholder - but it doesn't have to be. Being caught up is + // important because we might need to send snapshots below to newly-added + // replicas, and those snapshots would be invalid if our stale descriptor + // doesn't contain the respective replicas. + // TODO(andrei): Find a better way to wait for replication. If we knew the + // LAI of the respective command, we could use waitForApplication(). + descriptorOK := false + start := timeutil.Now() + retOpts := retry.Options{InitialBackoff: time.Second, MaxBackoff: time.Second, MaxRetries: 10} + for re := retry.StartWithCtx(ctx, retOpts); ; re.Next() { + rDesc := r.Desc() + if rDesc.Generation >= desc.Generation { + descriptorOK = true + break + } + log.VEventf(ctx, 1, "stale descriptor detected; waiting to catch up to replication. want: %s, have: %s", + desc, rDesc) + if _, err := r.IsDestroyed(); err != nil { + return nil, errors.Wrapf(err, "replica destroyed while waiting desc replication") + } + } + if !descriptorOK { + return nil, errors.Newf("waited for %s and replication hasn't caught up with descriptor update", time.Since(start)) + } + iChgs := make([]internalReplicationChange, 0, len(chgs)) for _, target := range chgs.VoterAdditions() { @@ -1609,6 +1636,11 @@ func prepareChangeReplicasTrigger( return crt, nil } +// execChangeReplicasTxn runs a txn updating a range descriptor. The txn commit +// will carry a ChangeReplicasTrigger. Returns the updated descriptor. Note +// that, if the current node does not have the leaseholder for the respective +// range, then upon return the node's replica of the range (if any) might not +// reflect the updated descriptor yet until it applies the transaction. func execChangeReplicasTxn( ctx context.Context, store *Store,