kvserver: wait for replication before sending snapshots
This patch handles an edge case in config changes:
1. Replica.ChangeReplicas() is called on r1 to add a replica to the range
2. ChangeReplicas runs a transaction that modifies the range descriptor
   Say that the replica doing this loses the lease, or never had it to
   begin with (though ChangeReplicas is usually called on the
   leaseholder).
4. ChangeReplicas attempts to send a snapshot to the newly-added replica
   (which has been added as a learner, but that doesn’t matter here)
5. If r1 has not applied the descriptor change yet, the snapshot it
   produces is invalid: the descriptor embedded in the snapshot doesn't
   contain the recipient. We then attempt to remove the new replica(s)
   and the whole ChangeReplicas fails. (See the sketch below.)
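
For illustration, the validity condition in step 5 boils down to a membership
check on the descriptor embedded in the snapshot. A minimal, self-contained
sketch with stand-in types (ReplicaDescriptor, RangeDescriptor, and
containsRecipient here are illustrative only, not CockroachDB's actual API):

package main

import "fmt"

// Stand-ins for illustration; the real roachpb.RangeDescriptor and
// roachpb.ReplicaDescriptor carry many more fields.
type ReplicaDescriptor struct{ StoreID int32 }

type RangeDescriptor struct {
	Generation int64
	Replicas   []ReplicaDescriptor
}

// containsRecipient reports whether the descriptor embedded in a snapshot
// includes the intended recipient. If it doesn't (because the sender's
// descriptor is stale), the recipient cannot use the snapshot.
func containsRecipient(snapDesc RangeDescriptor, recipientStoreID int32) bool {
	for _, rd := range snapDesc.Replicas {
		if rd.StoreID == recipientStoreID {
			return true
		}
	}
	return false
}

func main() {
	stale := RangeDescriptor{Generation: 5, Replicas: []ReplicaDescriptor{{StoreID: 1}}}
	fmt.Println(containsRecipient(stale, 2)) // false: the snapshot would be invalid
}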

This patch makes snapshot generation wait until the sender replica has
caught up with the descriptor that it has previously written. At that
point, it can send the snapshot just fine even if it isn't the
leaseholder.
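
The fix, distilled: before producing a snapshot, poll the locally applied
descriptor until its generation is at least the generation the transaction
wrote. Below is a self-contained sketch of that idea; waitForDescriptor and
the types are hypothetical stand-ins, and a plain ticker replaces CockroachDB's
retry package (compare with the actual loop in the diff further down):

package main

import (
	"context"
	"fmt"
	"time"
)

// Stand-in for the real roachpb.RangeDescriptor; only the generation matters
// here. Every descriptor change bumps the generation.
type RangeDescriptor struct {
	Generation int64
}

type Replica struct {
	desc func() RangeDescriptor // returns the locally applied descriptor
}

// waitForDescriptor polls until the local replica has applied a descriptor at
// least as new as the one the config-change txn wrote, giving up after a
// deadline. Once caught up, it is safe to generate snapshots.
func waitForDescriptor(ctx context.Context, r *Replica, want RangeDescriptor) error {
	deadline := time.After(time.Second)
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for {
		if have := r.desc(); have.Generation >= want.Generation {
			return nil // caught up; snapshots will contain the new replicas
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline:
			return fmt.Errorf("replication did not catch up with descriptor generation %d", want.Generation)
		case <-tick.C:
		}
	}
}

func main() {
	gen := int64(0)
	r := &Replica{desc: func() RangeDescriptor {
		gen++ // simulate the descriptor change eventually applying
		return RangeDescriptor{Generation: gen}
	}}
	err := waitForDescriptor(context.Background(), r, RangeDescriptor{Generation: 3})
	fmt.Println("caught up:", err == nil)
}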

I think this is generally a sane thing to do, but what prompted it is a
test that directly calls r.ChangeReplicas() on a replica that's not
necessarily the leaseholder. That test becomes flaky with #55148: the
replica can no longer simply take the lease when it isn't the Raft
leader, so it ends up executing ChangeReplicas() without holding the
lease, triggering the scenario described above. I'm separately
improving that test to direct its request at the leaseholder
explicitly, but I'm afraid there might be others.

Release note: None
andreimatei committed Dec 1, 2020
1 parent 07a6851 commit 2caacbd
Showing 1 changed file with 32 additions and 0 deletions.
pkg/kv/kvserver/replica_command.go: 32 additions, 0 deletions
@@ -1336,6 +1336,33 @@ func (r *Replica) atomicReplicationChange(
	// this may want to detect that and retry, sending a snapshot and promoting
	// both sides.

	// Wait for our replica to catch up with the descriptor change. The replica
	// is usually already caught up, because it is usually the leaseholder -
	// but it doesn't have to be. Being caught up is important because we might
	// need to send snapshots below to newly-added replicas, and those
	// snapshots would be invalid if our stale descriptor doesn't contain the
	// respective replicas.
	// TODO(andrei): Find a better way to wait for replication. If we knew the
	// LAI of the respective command, we could use waitForApplication().
	descriptorOK := false
	start := timeutil.Now()
	retOpts := retry.Options{InitialBackoff: time.Second, MaxBackoff: time.Second, MaxRetries: 10}
	for re := retry.StartWithCtx(ctx, retOpts); re.Next(); {
		rDesc := r.Desc()
		if rDesc.Generation >= desc.Generation {
			descriptorOK = true
			break
		}
		log.VEventf(ctx, 1, "stale descriptor detected; waiting to catch up to replication. want: %s, have: %s",
			desc, rDesc)
		if _, err := r.IsDestroyed(); err != nil {
			return nil, errors.Wrapf(err, "replica destroyed while waiting for desc replication")
		}
	}
	if !descriptorOK {
		return nil, errors.Newf("waited for %s and replication hasn't caught up with descriptor update", timeutil.Since(start))
	}

	iChgs := make([]internalReplicationChange, 0, len(chgs))

	for _, target := range chgs.VoterAdditions() {
@@ -1609,6 +1636,11 @@ func prepareChangeReplicasTrigger(
	return crt, nil
}

// execChangeReplicasTxn runs a txn that updates a range descriptor. The txn
// commit will carry a ChangeReplicasTrigger. Returns the updated descriptor.
// Note that, if the current node does not hold the lease for the respective
// range, then upon return the node's replica of the range (if any) might not
// yet reflect the updated descriptor; it will once it applies the transaction.
func execChangeReplicasTxn(
	ctx context.Context,
	store *Store,
