Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: exempt certain requests from circuit breakers #76673

Merged
merged 1 commit into from
Feb 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 116 additions & 21 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver_test

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -461,6 +465,89 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
readOneVal(t, stream2)
}

func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tc := setupCircuitBreakerTest(t)
defer tc.Stopper().Stop(context.Background())

// Put the lease on n1 but then trip the breaker with the probe
// disabled, i.e. it will stay tripped.
require.NoError(t, tc.Write(n1))
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("boom"))

exemptRequests := []func() roachpb.Request{
func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} },
func() roachpb.Request {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(context.Background(), cluster.MakeTestingClusterSettings(), sstFile)
defer sst.Close()
require.NoError(t, sst.LogData([]byte("hello")))
require.NoError(t, sst.Finish())

addReq := &roachpb.AddSSTableRequest{
Data: sstFile.Data(),
IngestAsWrites: true,
}
return addReq
},
func() roachpb.Request {
return &roachpb.RevertRangeRequest{TargetTime: tc.Servers[0].Clock().Now()}
},
func() roachpb.Request {
return &roachpb.GCRequest{}
},
func() roachpb.Request {
return &roachpb.ClearRangeRequest{}
},
func() roachpb.Request {
return &roachpb.ProbeRequest{}
},
}

for _, reqFn := range exemptRequests {
req := reqFn()
t.Run(fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) {
require.NoError(t, tc.Send(n1, req))
})
}
for _, reqFn := range exemptRequests {
req := reqFn()
t.Run(fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) {
resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats)
resumeHeartbeats() // intentionally resume right now so that lease can be acquired
require.NoError(t, tc.Send(n1, req))
})
}

resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats)
defer resumeHeartbeats() // can't acquire leases until test ends

for _, reqFn := range exemptRequests {
req := reqFn()
if req.Method() == roachpb.Probe {
// Probe does not require the lease, and is the most-tested of the bunch
// already. We don't have to test it again here, which would require undue
// amounts of special-casing below.
continue
}
t.Run(fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond)
defer cancel()
const maxWait = 5 * time.Second
tBegin := timeutil.Now()
err := tc.SendCtx(ctx, n1, req)
t.Log(err) // usually: [NotLeaseHolderError] lease acquisition canceled because context canceled
require.Error(t, err)
require.Error(t, ctx.Err())
// Make sure we didn't run into the "long" timeout inside of SendCtx but
// actually terminated as a result of our ctx cancelling.
require.Less(t, timeutil.Since(tBegin), maxWait)
})
}
}

// Test infrastructure below.

func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) {
Expand Down Expand Up @@ -629,23 +716,37 @@ func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats boo
}
}

func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Request) error {
func (cbt *circuitBreakerTest) Send(idx int, req roachpb.Request) error {
return cbt.SendCtx(context.Background(), idx, req)

}

func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb.Request) error {
var ba roachpb.BatchRequest
repl := cbt.repls[idx]
ba.RangeID = repl.Desc().RangeID
ba.Timestamp = repl.Clock().Now()
ba.Add(req)
ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration)
if h := req.Header(); len(h.Key) == 0 {
h.Key = repl.Desc().StartKey.AsRawKey()
if roachpb.IsRange(req) {
h.EndKey = repl.Desc().EndKey.AsRawKey()
}
req.SetHeader(h)
}
parCtx := ctx
ctx, cancel := context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration)
defer cancel()
// Tag the breaker with the request. Once Send returns, we'll check that it's
// no longer tracked by the breaker. This gives good coverage that we're not
// going to leak memory.
ctx = context.WithValue(ctx, req, struct{}{})

defer cancel()
_, pErr := repl.Send(ctx, ba)
// If our context got canceled, return an opaque error regardless of presence or
// absence of actual error. This makes sure we don't accidentally pass tests as
// a result of our context cancellation.
if err := ctx.Err(); err != nil {
if err := ctx.Err(); err != nil && parCtx.Err() == nil {
pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr)
}
{
Expand All @@ -665,6 +766,11 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque
return pErr.GoError()
}

func (cbt *circuitBreakerTest) WriteDS(idx int) error {
put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello"))
return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put)
}

func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb.Request) error {
var ba roachpb.BatchRequest
ba.Add(req)
Expand Down Expand Up @@ -696,32 +802,21 @@ func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error)
require.True(t, ok, "%+v", err)
}

func (cbt *circuitBreakerTest) Write(idx int) error {
return cbt.writeViaRepl(cbt.repls[idx].Replica)
}

func (cbt *circuitBreakerTest) WriteDS(idx int) error {
put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello"))
return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put)
}

// SetSlowThreshold sets the SlowReplicationThreshold for requests sent through the
// test harness (i.e. via Write) to the provided duration. The zero value restores
// the default.
func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) {
cbt.slowThresh.Store(dur)
}

func (cbt *circuitBreakerTest) Read(idx int) error {
return cbt.readViaRepl(cbt.repls[idx].Replica)
}

func (cbt *circuitBreakerTest) writeViaRepl(repl *kvserver.Replica) error {
func (cbt *circuitBreakerTest) Write(idx int) error {
repl := cbt.repls[idx]
put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello"))
return cbt.sendViaRepl(repl, put)
return cbt.Send(idx, put)
}

func (cbt *circuitBreakerTest) readViaRepl(repl *kvserver.Replica) error {
func (cbt *circuitBreakerTest) Read(idx int) error {
repl := cbt.repls[idx]
get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */)
return cbt.sendViaRepl(repl, get)
return cbt.Send(idx, get)
}
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/replica_backpressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ func canBackpressureBatch(ba *roachpb.BatchRequest) bool {
return false
}

// bypassReplicaCircuitBreakerForBatch returns whether the provided
// BatchRequest bypasses the per-Replica circuit breaker. This is the
// case if any request in the batch is requesting to do so.
func bypassReplicaCircuitBreakerForBatch(ba *roachpb.BatchRequest) bool {
for _, ru := range ba.Requests {
req := ru.GetInner()
if roachpb.BypassesReplicaCircuitBreaker(req) {
return true
}
}
return false
}

// shouldBackpressureWrites returns whether writes to the range should be
// subject to backpressure. This is based on the size of the range in
// relation to the split size. The method returns true if the range is more
Expand Down
17 changes: 12 additions & 5 deletions pkg/kv/kvserver/replica_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (br *replicaCircuitBreaker) Register(

// TODO(tbg): we may want to exclude more requests from this check, or allow
// requests to exclude themselves from the check (via their header). This
// latter mechanism could also replace isCircuitBreakerProbe.
if isCircuitBreakerProbe(ctx) {
// latter mechanism could also replace hasBypassCircuitBreakerMarker.
if hasBypassCircuitBreakerMarker(ctx) {
// NB: brSig.C() == nil.
brSig = neverTripSignaller{}
}
Expand Down Expand Up @@ -184,6 +184,10 @@ func (br *replicaCircuitBreaker) UnregisterAndAdjustError(
return pErr
}

func (br *replicaCircuitBreaker) HasMark(err error) bool {
return br.wrapped.HasMark(err)
}

func (br *replicaCircuitBreaker) cancelAllTrackedContexts() {
br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) {
cancel()
Expand Down Expand Up @@ -297,11 +301,11 @@ func (r replicaCircuitBreakerLogger) OnReset(br *circuit.Breaker) {

type probeKey struct{}

func isCircuitBreakerProbe(ctx context.Context) bool {
func hasBypassCircuitBreakerMarker(ctx context.Context) bool {
return ctx.Value(probeKey{}) != nil
}

func withCircuitBreakerProbeMarker(ctx context.Context) context.Context {
func withBypassCircuitBreakerMarker(ctx context.Context) context.Context {
return context.WithValue(ctx, probeKey{}, probeKey{})
}

Expand Down Expand Up @@ -330,7 +334,10 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) {
}

func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error {
ctx = withCircuitBreakerProbeMarker(ctx)
// NB: we don't need to put this marker since ProbeRequest has the
// canBypassReplicaCircuitBreaker flag, but if in the future we do
// additional work in this method we may need it.
ctx = withBypassCircuitBreakerMarker(ctx)
desc := r.Desc()
if !desc.IsInitialized() {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestReplicaCircuitBreaker_Register(t *testing.T) {
defer leaktest.AfterTest(t)()
br, stopper := setupCircuitBreakerTest(t, "mutexmap-1")
defer stopper.Stop(context.Background())
ctx := withCircuitBreakerProbeMarker(context.Background())
ctx := withBypassCircuitBreakerMarker(context.Background())
tok, sig, err := br.Register(ctx, func() {})
require.NoError(t, err)
defer br.UnregisterAndAdjustError(tok, sig, nil /* pErr */)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,7 @@ func (r *Replica) refreshProposalsLocked(
}

var maxSlowProposalDurationRequest *roachpb.BatchRequest
// TODO(tbg): don't track exempt requests for tripping the breaker?
var maxSlowProposalDuration time.Duration
var slowProposalCount int64
var reproposals pendingCmdSlice
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,20 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
// coalesced requests timeout/cancel. p.cancelLocked (defined below) is the
// cancel function that must be called; calling just cancel is insufficient.
ctx := p.repl.AnnotateCtx(context.Background())
if hasBypassCircuitBreakerMarker(ctx) {
// If the caller bypasses the circuit breaker, allow the lease to do the
// same. Otherwise, the lease will be refused by the circuit breaker as
// well.
//
// Note that there is a tiny race: if a request is in flight, but the
// request that triggered it (i.e. parentCtx here) does *not* bypass the
// probe, and before the circuit breaker rejects the inflight lease another
// request that *does* want to bypass the probe joins the request, it too
// will receive the circuit breaker error. This is special-cased in
// `redirectOnOrAcquireLease`, where such a caller needs to retry instead of
// propagating the error.
ctx = withBypassCircuitBreakerMarker(ctx)
}
const opName = "request range lease"
tr := p.repl.AmbientContext.Tracer
tagsOpt := tracing.WithLogTags(logtags.FromContext(parentCtx))
Expand Down Expand Up @@ -1112,6 +1126,11 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat
func (r *Replica) redirectOnOrAcquireLeaseForRequest(
ctx context.Context, reqTS hlc.Timestamp,
) (kvserverpb.LeaseStatus, *roachpb.Error) {
if hasBypassCircuitBreakerMarker(ctx) {
defer func() {
log.Infof(ctx, "hello")
}()
}
// Try fast-path.
now := r.store.Clock().NowAsClockTimestamp()
{
Expand Down Expand Up @@ -1209,6 +1228,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
}
if llHandle == nil {
// We own a valid lease.
log.Eventf(ctx, "valid lease %+v", status)
return status, nil
}

Expand Down Expand Up @@ -1244,6 +1264,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// cannot be reproposed so we get this ambiguity.
// We'll just loop around.
return nil
case r.breaker.HasMark(goErr) && hasBypassCircuitBreakerMarker(ctx):
// If this request wanted to bypass the circuit breaker but still got a
// breaker error back, it joined a lease request started by an operation
// that did not bypass circuit breaker errors. Loop around and try again.
// See requestLeaseAsync for details.
return nil
case errors.HasType(goErr, (*roachpb.LeaseRejectedError)(nil)):
var tErr *roachpb.LeaseRejectedError
errors.As(goErr, &tErr)
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (r *Replica) sendWithoutRangeID(

// Circuit breaker handling.
ctx, cancel := context.WithCancel(ctx)
if bypassReplicaCircuitBreakerForBatch(ba) {
ctx = withBypassCircuitBreakerMarker(ctx)
}
tok, brSig, err := r.breaker.Register(ctx, cancel)
if err != nil {
return nil, roachpb.NewError(err)
Expand Down
Loading