diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index ae8e526f7235..8c1f1da0595f 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -673,10 +673,12 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { }) } - // Restore the breaker via the probe. + // Restore the breaker via the probe, and wait for any pending (re)proposals + // from previous tests to be flushed. resumeHeartbeats() tc.SetProbeEnabled(n1, true) tc.UntripsSoon(t, tc.Write, n1) + tc.WaitForProposals(t, n1) // Lose quorum (liveness stays intact). tc.SetSlowThreshold(10 * time.Millisecond) @@ -886,6 +888,19 @@ func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) er }) } +func (cbt *circuitBreakerTest) WaitForProposals(t *testing.T, idx int) { + t.Helper() + testutils.SucceedsSoon(t, func() error { + t.Helper() + + repl := cbt.repls[idx].Replica + if n := repl.NumPendingProposals(); n > 0 { + return errors.Errorf("%d pending proposals", n) + } + return nil + }) +} + func (cbt *circuitBreakerTest) ExpireAllLeasesAndN1LivenessRecord( t *testing.T, pauseHeartbeats bool, ) (undo func()) { diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 4d3ae1475080..6d8b8602b7ea 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -400,6 +400,12 @@ func (r *Replica) QuotaReleaseQueueLen() int { return len(r.mu.quotaReleaseQueue) } +func (r *Replica) NumPendingProposals() int { + r.mu.RLock() + defer r.mu.RUnlock() + return r.numPendingProposalsRLocked() +} + func (r *Replica) IsFollowerActiveSince( ctx context.Context, followerID roachpb.ReplicaID, threshold time.Duration, ) bool {