Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed Feb 24, 2022
1 parent d7e418c commit 463a6ba
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 49 deletions.
100 changes: 52 additions & 48 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,27 +184,17 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) {
require.NoError(t, tc.Write(n1))
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("injected breaker error"))
resumeHeartbeats := tc.ExpireLeaseAndLivenessOnN1(t, pauseHeartbeats)
resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats)

// n2 (not n1) will return a NotLeaseholderError. This may be surprising -
// why isn't it trying and succeeding to acquire a lease - but it does
// not do that because it sees that the new leaseholder (n2) is not live
// itself. We'll revisit this after re-enabling liveness later in the test.
{
err := tc.Read(n2)
// At time of writing: not incrementing epoch on n1 because next
// leaseholder (n2) not live.
t.Log(err)
tc.RequireIsNotLeaseholderError(t, err)
// Same behavior for write on n2.
tc.RequireIsNotLeaseholderError(t, tc.Write(n2))
}
// On n1, run into the circuit breaker when requesting lease. We have to
// resume heartbeats for this to not time out, as requesting the new lease
// entails doing liveness checks which can't succeed if nobody is
// heartbeating, and we'd get stuck in liveness before reaching the circuit
// breaker. (In other words, replica circuit breaking doesn't fail-fast
// requests reliably if liveness is unavailable; this is tracked in #74616.
// requests reliably if liveness is unavailable; this is tracked in #74616).
// We don't attempt to acquire a lease on n2 since it would try and succeed
// (except the test harness categorically prevents n2 from getting a lease,
// injecting an error).
resumeHeartbeats()
{
tc.RequireIsBreakerOpen(t, tc.Read(n1))
Expand Down Expand Up @@ -282,7 +272,7 @@ func TestReplicaCircuitBreaker_Follower_QuorumLoss(t *testing.T) {
// Get lease to n2 so that we can lose it without taking down the system ranges.
desc := tc.LookupRangeOrFatal(t, tc.ScratchRange(t))
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(n2))
resumeHeartbeats := tc.ExpireLeaseAndLivenessOnN1(t, keepHeartbeats)
resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, keepHeartbeats)
tc.StopServer(n2) // lose quorum and leaseholder
resumeHeartbeats()

Expand Down Expand Up @@ -348,7 +338,7 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) {

// Expire all leases. We also pause all heartbeats but that doesn't really
// matter since the liveness range is unavailable anyway.
resume := tc.ExpireLeaseAndLivenessOnN1(t, pauseHeartbeats)
resume := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats)
defer resume()

// Since there isn't a lease, and the liveness range is down, the circuit
Expand All @@ -368,6 +358,10 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) {
}

type dummyStream struct {
t interface {
Helper()
Logf(string, ...interface{})
}
ctx context.Context
recv chan *roachpb.RangeFeedEvent
}
Expand All @@ -377,7 +371,9 @@ func (s *dummyStream) Context() context.Context {
}

func (s *dummyStream) Send(ev *roachpb.RangeFeedEvent) error {
s.t.Helper()
if ev.Val == nil {
s.t.Logf("ignoring event: %v", ev)
return nil
}
select {
Expand Down Expand Up @@ -407,7 +403,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream1 := &dummyStream{ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)}
stream1 := &dummyStream{t: t, ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(args, stream1).GoError()
if ctx.Err() != nil {
Expand All @@ -421,7 +417,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
for {
var done bool
select {
case <-time.After(testutils.DefaultSucceedsSoonDuration):
case <-time.After(testutils.DefaultSucceedsSoonDuration / 5):
t.Fatal("timed out")
case <-ctx.Done():
t.Fatal(ctx.Err())
Expand All @@ -439,15 +435,15 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
readOneVal(t, ctx, stream1)

// NB: keep heartbeats because we're not trying to lose the liveness range.
undo := tc.ExpireLeaseAndLivenessOnN1(t, keepHeartbeats)
undo := tc.ExpireAllLeasesAndN1LivenessRecord(t, keepHeartbeats)
undo()
tc.SetSlowThreshold(10 * time.Millisecond)
tc.StopServer(n2)
tc.RequireIsBreakerOpen(t, tc.Write(n1))

// Start another stream during the "outage" to make sure it isn't rejected by
// the breaker.
stream2 := &dummyStream{ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)}
stream2 := &dummyStream{t: t, ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(args, stream2).GoError()
if ctx.Err() != nil {
Expand Down Expand Up @@ -514,15 +510,15 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) {
for _, reqFn := range exemptRequests {
req := reqFn()
tc.Run(t, fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) {
resumeHeartbeats := tc.ExpireLeaseAndLivenessOnN1(t, pauseHeartbeats)
resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats)
resumeHeartbeats() // intentionally resume right now so that lease can be acquired
// NB: when looking into the traces here, we sometimes see - as expected - that
// when the request tries to acquire a lease, the breaker is still tripped and
require.NoError(t, tc.Send(n1, req))
})
}

resumeHeartbeats := tc.ExpireLeaseAndLivenessOnN1(t, pauseHeartbeats)
resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats)

for _, reqFn := range exemptRequests {
req := reqFn()
Expand Down Expand Up @@ -626,9 +622,12 @@ type circuitBreakerTest struct {
slowThresh *atomic.Value // time.Duration
ManualClock *hlc.HybridManualClock
repls []replWithKnob // 0 -> repl on Servers[0], etc

seq int
}

func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest {
skip.UnderStressRace(t)
manualClock := hlc.NewHybridManualClock()
var rangeID int64 // atomic
slowThresh := &atomic.Value{} // supports .SetSlowThreshold(x)
Expand Down Expand Up @@ -753,35 +752,37 @@ func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) er
})
}

func (cbt *circuitBreakerTest) ExpireLeaseAndLivenessOnN1(
func (cbt *circuitBreakerTest) ExpireAllLeasesAndN1LivenessRecord(
t *testing.T, pauseHeartbeats bool,
) (undo func()) {
t.Helper()
var maxWT int64
var fs []func()
srv := cbt.Servers[0]
lv := srv.NodeLiveness().(*liveness.NodeLiveness)
if pauseHeartbeats {
undo := lv.PauseAllHeartbeatsForTest()
fs = append(fs, undo)
}
self, ok := lv.Self()
require.True(t, ok)
if maxWT < self.Expiration.WallTime {
maxWT = self.Expiration.WallTime
for idx, srv := range cbt.Servers {
lv := srv.NodeLiveness().(*liveness.NodeLiveness)

if pauseHeartbeats {
undo := lv.PauseAllHeartbeatsForTest()
fs = append(fs, undo)
}

self, ok := lv.Self()
require.True(t, ok)

cbt.ManualClock.Forward(self.Expiration.WallTime)
if idx == n1 {
// Invalidate n1's liveness record, to make sure that ranges on n1 need
// to acquire a new lease (vs waiting for a heartbeat to the liveness
// record resuscitating the old one).
//
// Needing to do this is the reason for special-casing this entire method
// around n1; if we stop heartbeats for both nodes, they can't increment
// each others liveness records: if a node's liveness is paused, it doesn't
// allow incrementing records neither. (This is silly).
lv2 := cbt.Server(n2).NodeLiveness().(*liveness.NodeLiveness)
require.NoError(t, lv2.IncrementEpoch(context.Background(), self))
}
}
cbt.ManualClock.Forward(maxWT + 1)
testutils.SucceedsSoon(t, func() error {
// Invalidate n1's liveness record, to make sure that ranges on n1 need
// to acquire a new lease (vs waiting for a heartbeat to the liveness
// record resuscitating the old one).
//
// Needing to do this is the reason for special-casing this entire method
// around n1; if we stop heartbeats for both nodes, they can't increment
// each others liveness records.
nl2 := cbt.Servers[n2].NodeLiveness().(*liveness.NodeLiveness)
return nl2.IncrementEpoch(context.Background(), self)
})

return func() {
for _, f := range fs {
f()
Expand Down Expand Up @@ -889,7 +890,10 @@ func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) {
func (cbt *circuitBreakerTest) Write(idx int) error {
cbt.t.Helper()
repl := cbt.repls[idx]
put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello"))
cbt.seq++
put := roachpb.NewPut(
repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString(fmt.Sprintf("hello-%d", cbt.seq)),
)
return cbt.Send(idx, put)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
// TODO(during review): the breaker could no longer be tripped at this point,
// so we'd need to manufacture an error. This is at odds with the old setup
// which tried to have the breaker be the source of the error.
pErr = roachpb.NewError(errors.Wrapf(brErr, "%s", spanlatch.ErrPoisonedLatch))
pErr = roachpb.NewError(errors.CombineErrors(brErr, spanlatch.ErrPoisonedLatch))
}
return nil, pErr
} else if resp != nil {
Expand Down

0 comments on commit 463a6ba

Please sign in to comment.