Skip to content

Commit

Permalink
kvserver: deflake TestReplicaTombstone
Browse files Browse the repository at this point in the history
Like many other tests, this test could flake because we'd sometimes
catch a "cannot remove learner while snapshot is in flight" error.

I think the root cause is that sometimes there are errant Raft snapshots
in the system[^1] and these get mistaken for INITIAL snapshots to LEARNERs
that are still being caught up by the replicate queue. I tried to address
this general class of issues by making the check for in-flight learner
snapshots consider only INITIAL snapshots and ignore *raft* snapshots.

I was able to stress TestReplicaTombstone for 30+ minutes without a
failure using that approach, whereas previously it usually failed within
a few minutes.

```
./dev test --stress pkg/kv/kvserver/ --filter TestReplicaTombstone 2>&1 | tee stress.log
[...]
2461 runs so far, 0 failures, over 35m45s
```

[^1]: #87553

Fixes #98883.

Epic: none
Release note: None
  • Loading branch information
tbg committed Mar 22, 2023
1 parent 4dc10b5 commit 6444df5
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
raftStatus := r.raftStatusRLocked()

const anyRecipientStore roachpb.StoreID = 0
pendingSnapshotIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore)
_, pendingSnapshotIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore, false /* initialOnly */)
lastIndex := r.mu.lastIndexNotDurable
// NB: raftLogSize above adjusts for pending truncations that have already
// been successfully replicated via raft, but logSizeTrusted does not see if
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {

r.mu.state.RaftAppliedIndex = index1
// Add first constraint.
_, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
_, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, false /* initial */, storeID)
exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}

// Make sure it registered.
Expand All @@ -658,15 +658,15 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
// Add another constraint with the same id. Extremely unlikely in practice
// but we want to make sure it doesn't blow anything up. Collisions are
// handled by ignoring the colliding update.
_, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
_, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, false /* initial */, storeID)
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)

// Helper that grabs the min constraint index (which can trigger GC as a
// byproduct) and asserts.
assertMin := func(exp uint64, now time.Time) {
t.Helper()
const anyRecipientStore roachpb.StoreID = 0
if maxIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore); maxIndex != exp {
if _, maxIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore, false /* initialOnly */); maxIndex != exp {
t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
}
}
Expand All @@ -678,7 +678,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
r.mu.state.RaftAppliedIndex = index2
// Add another, higher, index. We're not going to notice it's around
// until the lower one disappears.
_, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, storeID)
_, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, false /* initial */, storeID)

now := timeutil.Now()
// The colliding snapshot comes back. Or the original, we can't tell.
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
if fn := repl.store.cfg.TestingKnobs.RaftSnapshotQueueSkipReplica; fn != nil && fn() {
return false, nil
}
if repl.hasOutstandingSnapshotInFlightToStore(repDesc.StoreID) {
// There is a snapshot being transferred. It's probably an INITIAL snap,
// so bail for now and try again later.
// NB: we could pass `false` for initialOnly as well, but we are the "other"
// possible sender.
if _, ok := repl.hasOutstandingSnapshotInFlightToStore(repDesc.StoreID, true /* initialOnly */); ok {
// There is an INITIAL snapshot being transferred, so bail for now and try again later.
err := errors.Errorf(
"skipping snapshot; replica is likely a %s in the process of being added: %s",
typ,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2314,7 +2314,7 @@ func (r *Replica) GetLoadStatsForTesting() *load.ReplicaLoad {
// HasOutstandingLearnerSnapshotInFlightForTesting is for use only by tests to
// gather whether there are in-flight snapshots to learner replicas.
func (r *Replica) HasOutstandingLearnerSnapshotInFlightForTesting() bool {
return r.hasOutstandingLearnerSnapshotInFlight()
// errOnOutstandingLearnerSnapshotInflight returns a non-nil error when a
// snapshot to a learner is in flight, so a non-nil result maps to true.
return r.errOnOutstandingLearnerSnapshotInflight() != nil
}

// ReadProtectedTimestampsForTesting is for use only by tests to read and update
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,9 +1334,9 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
// periods of time on a single range without making progress, which can stall
// other operations that they are expected to perform (see
// https://github.com/cockroachdb/cockroach/issues/79249 for example).
if r.hasOutstandingLearnerSnapshotInFlight() {
if err := r.errOnOutstandingLearnerSnapshotInflight(); err != nil {
return nil /* desc */, 0, /* learnersRemoved */
errCannotRemoveLearnerWhileSnapshotInFlight
errors.WithSecondaryError(errCannotRemoveLearnerWhileSnapshotInFlight, err)
}

if fn := r.store.TestingKnobs().BeforeRemovingDemotedLearner; fn != nil {
Expand Down Expand Up @@ -1839,7 +1839,7 @@ func (r *Replica) lockLearnerSnapshot(
var cleanups []func()
for _, addition := range additions {
lockUUID := uuid.MakeV4()
_, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, addition.StoreID)
_, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, true /* initial */, addition.StoreID)
cleanups = append(cleanups, cleanup)
}
return func() {
Expand Down Expand Up @@ -2793,7 +2793,7 @@ func (r *Replica) sendSnapshotUsingDelegate(
senderQueuePriority = 0
}
snapUUID := uuid.MakeV4()
appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, recipient.StoreID)
appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, snapType == kvserverpb.SnapshotRequest_INITIAL, recipient.StoreID)
// The cleanup function needs to be called regardless of success or failure of
// sending to release the log truncation constraint.
defer cleanup()
Expand Down
55 changes: 32 additions & 23 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,6 +1756,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
type snapTruncationInfo struct {
// index is the applied index recorded when the constraint was added.
index uint64
// recipientStore is the store the snapshot is being sent to.
recipientStore roachpb.StoreID
// initial is true if the snapshot is being sent by the replicate queue to a
// new replica; some callers only care about these snapshots.
initial bool
}

// addSnapshotLogTruncationConstraint creates a log truncation record which will prevent
Expand All @@ -1771,8 +1772,10 @@ type snapTruncationInfo struct {
// a possibly stale value here is harmless since the value increases
// monotonically. The actual snapshot index may preserve more from a log
// truncation perspective.
// If initial is true, the snapshot is marked as being sent by the replicate
// queue to a new replica; some callers only care about these snapshots.
func (r *Replica) addSnapshotLogTruncationConstraint(
ctx context.Context, snapUUID uuid.UUID, recipientStore roachpb.StoreID,
ctx context.Context, snapUUID uuid.UUID, initial bool, recipientStore roachpb.StoreID,
) (uint64, func()) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -1794,6 +1797,7 @@ func (r *Replica) addSnapshotLogTruncationConstraint(
r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
index: appliedIndex,
recipientStore: recipientStore,
initial: initial,
}

return appliedIndex, func() {
Expand All @@ -1814,48 +1818,53 @@ func (r *Replica) addSnapshotLogTruncationConstraint(
}
}

// getSnapshotLogTruncationConstraints returns the minimum index of any
// getSnapshotLogTruncationConstraintsRLocked returns the minimum index of any
// currently outstanding snapshot being sent from this replica to the specified
// recipient or 0 if there isn't one. Passing 0 for recipientStore means any
// recipient.
func (r *Replica) getSnapshotLogTruncationConstraints(
	recipientStore roachpb.StoreID,
) (minSnapIndex uint64) {
	// Hold r.mu for reading for the duration of the RLocked lookup.
	r.mu.RLock()
	defer r.mu.RUnlock()

	minSnapIndex = r.getSnapshotLogTruncationConstraintsRLocked(recipientStore)
	return minSnapIndex
}

// recipient. If initialOnly is set, only snapshots sent by the replicate queue
// to new replicas are considered. The matching constraints themselves are
// returned alongside the minimum index.
func (r *Replica) getSnapshotLogTruncationConstraintsRLocked(
recipientStore roachpb.StoreID,
) (minSnapIndex uint64) {
recipientStore roachpb.StoreID, initialOnly bool,
) (_ []snapTruncationInfo, minSnapIndex uint64) {
// sl collects every constraint that passes both filters below.
var sl []snapTruncationInfo
for _, item := range r.mu.snapshotLogTruncationConstraints {
// With initialOnly set, skip constraints that were not registered as initial.
if initialOnly && !item.initial {
continue
}
// A recipientStore of 0 matches any recipient; otherwise the stores must match.
if recipientStore != 0 && item.recipientStore != recipientStore {
continue
}
// Track the smallest index seen; 0 doubles as "nothing seen yet".
if minSnapIndex == 0 || minSnapIndex > item.index {
minSnapIndex = item.index
}
sl = append(sl, item)
}
return minSnapIndex
return sl, minSnapIndex
}

// hasOutstandingLearnerSnapshotInFlight returns true if there is a snapshot in
// progress from this replica to a learner replica for this range.
func (r *Replica) hasOutstandingLearnerSnapshotInFlight() bool {
// errOnOutstandingLearnerSnapshotInflight returns an error if there is a
// snapshot in progress from this replica to a learner replica for this range.
func (r *Replica) errOnOutstandingLearnerSnapshotInflight() error {
// Check each learner replica listed in r.Desc().
learners := r.Desc().Replicas().LearnerDescriptors()
for _, repl := range learners {
if r.hasOutstandingSnapshotInFlightToStore(repl.StoreID) {
return true
// Only INITIAL snapshots are considered (initialOnly is true). The boolean
// result is discarded; the returned slice is checked directly and included
// in the error message.
sl, _ := r.hasOutstandingSnapshotInFlightToStore(repl.StoreID, true /* initialOnly */)
if len(sl) > 0 {
return errors.Errorf("INITIAL snapshots in flight to s%d: %v", repl.StoreID, sl)
}
}
return false
return nil
}

// hasOutstandingSnapshotInFlightToStore returns true if there is a snapshot in
// flight from this replica to the store with the given ID.
func (r *Replica) hasOutstandingSnapshotInFlightToStore(storeID roachpb.StoreID) bool {
return r.getSnapshotLogTruncationConstraints(storeID) > 0
// flight from this replica to the store with the given ID. If initialOnly is
// true, only snapshots sent by the replicate queue to new replicas are considered.
// The matching constraints are returned alongside the boolean.
func (r *Replica) hasOutstandingSnapshotInFlightToStore(
	storeID roachpb.StoreID, initialOnly bool,
) ([]snapTruncationInfo, bool) {
	// Hold r.mu for reading while calling the RLocked helper.
	r.mu.RLock()
	defer r.mu.RUnlock()

	inFlight, minIndex := r.getSnapshotLogTruncationConstraintsRLocked(storeID, initialOnly)
	// A nonzero minimum index is what marks a snapshot as in flight.
	found := minIndex != 0
	return inFlight, found
}

// HasRaftLeader returns true if the raft group has a raft leader currently.
Expand Down

0 comments on commit 6444df5

Please sign in to comment.