diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 295ae01f942e..7b9f7466b62a 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "replica_backpressure.go", "replica_batch_updates.go", "replica_circuit_breaker.go", + "replica_circuit_breaker_cancelstorage.go", "replica_closedts.go", "replica_command.go", "replica_consistency.go", @@ -223,6 +224,7 @@ go_test( "client_rangefeed_test.go", "client_relocate_range_test.go", "client_replica_backpressure_test.go", + "client_replica_circuit_breaker_bench_test.go", "client_replica_circuit_breaker_test.go", "client_replica_gc_test.go", "client_replica_test.go", diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go new file mode 100644 index 000000000000..686a6a6c557e --- /dev/null +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go @@ -0,0 +1,124 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver_test + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" +) + +type replicaCircuitBreakerBench struct { + *testcluster.TestCluster + pool *sync.Pool // *BatchRequest +} + +func (tc *replicaCircuitBreakerBench) repl(b *testing.B) *kvserver.Replica { + return tc.GetFirstStoreFromServer(b, 0).LookupReplica(keys.MustAddr(tc.ScratchRange(b))) +} + +func setupCircuitBreakerReplicaBench( + b *testing.B, breakerEnabled bool, cs string, +) (*replicaCircuitBreakerBench, *stop.Stopper) { + b.Helper() + + var numShards int + { + _, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards) + require.NoError(b, err) + } + sFn := func() kvserver.CancelStorage { return &kvserver.MapCancelStorage{NumShards: numShards} } + + var knobs kvserver.StoreTestingKnobs + knobs.CancelStorageFactory = sFn + + var args base.TestClusterArgs + args.ServerArgs.Knobs.Store = &knobs + tc := testcluster.StartTestCluster(b, 1, args) + + stmt := `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '1000s'` + if !breakerEnabled { + stmt = `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '0s'` + } + _, err := tc.ServerConn(0).Exec(stmt) + require.NoError(b, err) + wtc := &replicaCircuitBreakerBench{ + TestCluster: tc, + } + wtc.pool = &sync.Pool{ + New: func() interface{} { + repl := wtc.repl(b) + var ba roachpb.BatchRequest + ba.RangeID = repl.RangeID + ba.Timestamp = repl.Clock().NowAsClockTimestamp().ToTimestamp() + var k roachpb.Key + k = append(k, repl.Desc().StartKey.AsRawKey()...) + k = encoding.EncodeUint64Ascending(k, uint64(rand.Intn(1000))) + ba.Add(roachpb.NewGet(k, false)) + return &ba + }, + } + return wtc, tc.Stopper() +} + +func BenchmarkReplicaCircuitBreakerSendOverhead(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + ctx := context.Background() + + for _, enabled := range []bool{false, true} { + b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) { + dss := []string{ + "mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16", + "mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64", + } + if !enabled { + dss = dss[:1] + } + + for _, ds := range dss { + b.Run(ds, func(b *testing.B) { + b.ReportAllocs() + tc, stopper := setupCircuitBreakerReplicaBench(b, enabled, ds) + defer stopper.Stop(ctx) + + repl := tc.repl(b) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + ba := tc.pool.Get().(*roachpb.BatchRequest) + _, err := repl.Send(ctx, *ba) + tc.pool.Put(ba) + if err != nil { + b.Fatal(err) + } + } + }) + }) + } + }) + } +} diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index f75dec6bb1f0..5a5c106f5e8f 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -540,6 +540,11 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque ba.Timestamp = repl.Clock().Now() ba.Add(req) ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) + // Tag the breaker with the request. Once Send returns, we'll check that it's + // no longer tracked by the breaker. This gives good coverage that we're not + // going to leak memory. + ctx = context.WithValue(ctx, req, struct{}{}) + defer cancel() _, pErr := repl.Send(ctx, ba) // If our context got canceled, return an opaque error regardless of presence or @@ -548,6 +553,20 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque if err := ctx.Err(); err != nil { pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) } + { + var err error + repl.VisitBreakerContexts(func(ctx context.Context) { + if err == nil && ctx.Value(req) != nil { + err = errors.Errorf( + "request %s returned but context still tracked in breaker", req, + ) + } + }) + if err != nil { + pErr = roachpb.NewErrorf("%s; after %v", err, pErr) + } + } + return pErr.GoError() } diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 42b4ec4e6f78..886a230addf2 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -226,6 +226,13 @@ func (r *Replica) Breaker() *circuit2.Breaker { return r.breaker.wrapped } +func (r *Replica) VisitBreakerContexts(fn func(ctx context.Context)) { + r.breaker.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) { + fn(ctx) + return false // keep + }) +} + func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) { r.raftMu.Lock() defer r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go index 11815e0d9b41..09ccf070c6f8 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker.go +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -12,6 +12,7 @@ package kvserver import ( "context" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -61,12 +62,139 @@ type replicaCircuitBreaker struct { stopper *stop.Stopper r replicaInCircuitBreaker st *cluster.Settings + cancels CancelStorage wrapped *circuit.Breaker + + versionIsActive int32 // atomic +} + +// Register takes a cancelable context and its cancel function (which the caller +// must cancel when the request has finished), and registers them with the +// circuit breaker. If the breaker is already tripped, its error is returned +// immediately and the caller should not continue processing the request. +// Otherwise, the cancel function is invoked if the breaker trips. The caller is +// provided with a token and signaller for use in a call to +// UnregisterAndAdjustError upon request completion. That method also takes the +// error (if any) resulting from the request to ensure that in the case of a +// tripped breaker, the error reflects this fact. +func (br *replicaCircuitBreaker) Register( + ctx context.Context, cancel func(), +) (_token interface{}, _ signaller, _ error) { + brSig := br.Signal() + + // TODO(tbg): we may want to exclude more requests from this check, or allow + // requests to exclude themselves from the check (via their header). This + // latter mechanism could also replace isCircuitBreakerProbe. + if isCircuitBreakerProbe(ctx) { + // NB: brSig.C() == nil. + brSig = neverTripSignaller{} + } + + if brSig.C() == nil { + // Circuit breakers are disabled and/or this is a probe request, so don't do + // any work registering the context. UnregisterAndAdjustError will know that we didn't + // since it checks the same brSig for a nil C(). + return ctx, brSig, nil + } + + // NB: it might be tempting to check the breaker error first to avoid the call + // to Set below if the breaker is tripped at this point. However, the ordering + // here, subtly, is required to avoid situations in which the cancel is still + // in the map despite the probe having shut down (in which case cancel will + // not be invoked until the probe is next triggered, which maybe "never"). + // + // To see this, consider the case in which the breaker is initially not + // tripped when we check, but then trips immediately and has the probe fail + // (and terminate). Since the probe is in charge of cancelling all tracked + // requests, we must ensure that this probe sees our request. Adding the + // request prior to calling Signal() means that if we see an untripped + // breaker, no probe is running - consequently should the breaker then trip, + // it will observe our cancel, thus avoiding a leak. If we observe a tripped + // breaker, we also need to remove our own cancel, as the probe may already + // have passed the point at which it iterates through the cancels prior to us + // inserting it. The cancel may be invoked twice, but that's ok. + // + // See TestReplicaCircuitBreaker_NoCancelRace. + tok := br.cancels.Set(ctx, cancel) + if err := brSig.Err(); err != nil { + br.cancels.Del(tok) + cancel() + return nil, nil, err + } + + return tok, brSig, nil +} + +// UnregisterAndAdjustError releases a tracked cancel function upon request +// completion. The error resulting from the request is passed in to allow +// decorating it in case the breaker tripped while the request was in-flight. +// +// See Register. +func (br *replicaCircuitBreaker) UnregisterAndAdjustError( + tok interface{}, sig signaller, pErr *roachpb.Error, +) *roachpb.Error { + if sig.C() == nil { + // Breakers were disabled and we never put the cancel in the registry. + return pErr + } + + br.cancels.Del(tok) + + brErr := sig.Err() + if pErr == nil || brErr == nil { + return pErr + } + + // The breaker tripped and the command is returning an error. Make sure the + // error reflects the tripped breaker. + + err := pErr.GoError() + if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) { + // The breaker tripped while a command was inflight, so we have to + // propagate an ambiguous result. We don't want to replace it, but there + // is a way to stash an Error in it so we use that. + // + // TODO(tbg): could also wrap it; there is no other write to WrappedErr + // in the codebase and it might be better to remove it. Nested *Errors + // are not a good idea. + wrappedErr := brErr + if ae.WrappedErr != nil { + wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr) + } + ae.WrappedErr = roachpb.NewError(wrappedErr) + return roachpb.NewError(ae) + } else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) { + // When a lease acquisition triggered by this request is short-circuited + // by the breaker, it will return an opaque NotLeaseholderError, which we + // replace with the breaker's error. + return roachpb.NewError(errors.CombineErrors(brErr, le)) + } + return pErr +} + +func (br *replicaCircuitBreaker) cancelAllTrackedContexts() { + br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) { + cancel() + return true // remove + }) +} + +func (br *replicaCircuitBreaker) canEnable() bool { + b := atomic.LoadInt32(&br.versionIsActive) == 1 + if b { + return true // fast path + } + // IsActive is mildly expensive since it has to unmarshal + // a protobuf. + if br.st.Version.IsActive(context.Background(), clusterversion.ProbeRequest) { + atomic.StoreInt32(&br.versionIsActive, 1) + return true + } + return false // slow path } func (br *replicaCircuitBreaker) enabled() bool { - return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 && - br.st.Version.IsActive(context.Background(), clusterversion.ProbeRequest) + return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 && br.canEnable() } func (br *replicaCircuitBreaker) newError() error { @@ -108,6 +236,7 @@ func newReplicaCircuitBreaker( stopper *stop.Stopper, ambientCtx log.AmbientContext, r replicaInCircuitBreaker, + s CancelStorage, onTrip func(), onReset func(), ) *replicaCircuitBreaker { @@ -117,7 +246,8 @@ func newReplicaCircuitBreaker( r: r, st: cs, } - + br.cancels = s + br.cancels.Reset() br.wrapped = circuit.NewBreaker(circuit.Options{ Name: "breaker", // log bridge has ctx tags AsyncProbe: br.asyncProbe, @@ -173,6 +303,13 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { return } + // First, tell all current requests to fail fast. Note that clients insert + // first, then check the breaker (and remove themselves if breaker already + // tripped then). This prevents any cancels from sneaking in after the probe + // gets past this point, which could otherwise leave cancels hanging until + // "something" triggers the next probe (which may be never if no more traffic + // arrives at the Replica). See Register. + br.cancelAllTrackedContexts() err := sendProbe(ctx, br.r) report(err) }); err != nil { diff --git a/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go b/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go new file mode 100644 index 000000000000..c8f46bcffa16 --- /dev/null +++ b/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go @@ -0,0 +1,115 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "sync" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// CancelStorage implements tracking of context cancellation functions +// for use by Replica circuit breakers. +type CancelStorage interface { + // Reset initializes the storage. Not thread safe. + Reset() + // Set adds context and associated cancel func to the storage. Returns a token + // that can be passed to Del. + // + // Set is thread-safe. + Set(_ context.Context, cancel func()) (token interface{}) + // Del removes a cancel func, as identified by the token returned from Set. + // + // Del is thread-safe. + Del(token interface{}) + // Visit invokes the provided closure with each (context,cancel) pair currently + // present in the storage. Items for which the visitor returns true are removed + // from the storage. + // + // Visit is thread-safe, but it is illegal to invoke methods of the + // CancelStorage from within the visitor. + Visit(func(context.Context, func()) (remove bool)) +} + +type cancelToken struct { + ctx context.Context +} + +func (tok *cancelToken) fasthash() int { + // From https://github.com/taylorza/go-lfsr/blob/7ec2b93980f950da1e36c6682771e6fe14c144c2/lfsr.go#L46-L48. + s := int(uintptr(unsafe.Pointer(tok))) + b := (s >> 0) ^ (s >> 2) ^ (s >> 3) ^ (s >> 4) + return (s >> 1) | (b << 7) +} + +var cancelTokenPool = sync.Pool{ + New: func() interface{} { return &cancelToken{} }, +} + +type mapCancelShard struct { + syncutil.Mutex + m map[*cancelToken]func() +} + +// A MapCancelStorage implements CancelStorage via shards of mutex-protected +// maps. +type MapCancelStorage struct { + NumShards int + sl []*mapCancelShard +} + +// Reset implements CancelStorage. +func (m *MapCancelStorage) Reset() { + if m.NumShards == 0 { + m.NumShards = 1 + } + m.sl = make([]*mapCancelShard, m.NumShards) + for i := range m.sl { + s := &mapCancelShard{} + s.m = map[*cancelToken]func(){} + m.sl[i] = s + } +} + +// Set implements CancelStorage. +func (m *MapCancelStorage) Set(ctx context.Context, cancel func()) interface{} { + tok := cancelTokenPool.Get().(*cancelToken) + tok.ctx = ctx + shard := m.sl[tok.fasthash()%len(m.sl)] + shard.Lock() + shard.m[tok] = cancel + shard.Unlock() + return tok +} + +// Del implements CancelStorage. +func (m *MapCancelStorage) Del(tok interface{}) { + ttok := tok.(*cancelToken) + shard := m.sl[ttok.fasthash()%len(m.sl)] + shard.Lock() + delete(shard.m, tok.(*cancelToken)) + shard.Unlock() +} + +// Visit implements CancelStorage. +func (m *MapCancelStorage) Visit(fn func(context.Context, func()) (remove bool)) { + for _, shard := range m.sl { + shard.Lock() + for tok, cancel := range shard.m { + if fn(tok.ctx, cancel) { + delete(shard.m, tok) + } + } + shard.Unlock() + } +} diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go index b056713c36a2..e76db6e507ce 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -11,15 +11,27 @@ package kvserver import ( + "context" + "fmt" + "math/rand" + "runtime" + "strconv" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" ) @@ -40,3 +52,151 @@ func TestReplicaUnavailableError(t *testing.T) { err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs) echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt")) } + +type circuitBreakerReplicaMock struct { + clock *hlc.Clock +} + +func (c *circuitBreakerReplicaMock) Clock() *hlc.Clock { + return c.clock +} + +func (c *circuitBreakerReplicaMock) Desc() *roachpb.RangeDescriptor { + return &roachpb.RangeDescriptor{} +} + +func (c *circuitBreakerReplicaMock) Send( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, *roachpb.Error) { + return ba.CreateReply(), nil +} + +func (c *circuitBreakerReplicaMock) slowReplicationThreshold( + ba *roachpb.BatchRequest, +) (time.Duration, bool) { + return 0, false +} + +func (c *circuitBreakerReplicaMock) replicaUnavailableError() error { + return errors.New("unavailable") +} + +// This test verifies that when the breaker trips and untrips again, +// there is no scenario under which the request's cancel leaks. +func TestReplicaCircuitBreaker_NoCancelRace(t *testing.T) { + defer leaktest.AfterTest(t)() + br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer stopper.Stop(ctx) + + g := ctxgroup.WithContext(ctx) + const count = 100 + for i := 0; i < count; i++ { + i := i // for goroutine + g.GoCtx(func(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + tok, sig, err := br.Register(ctx, cancel) + if err != nil { + _ = err // ignoring intentionally + return nil + } + if i == count/2 { + br.TripAsync() // probe will succeed + } + runtime.Gosched() + time.Sleep(time.Duration(rand.Intn(int(time.Millisecond)))) + var pErr *roachpb.Error + if i%2 == 0 { + pErr = roachpb.NewErrorf("boom") + } + _ = br.UnregisterAndAdjustError(tok, sig, pErr) + return nil + }) + } + require.NoError(t, g.Wait()) + var n int + br.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) { + n++ + return false // keep + }) + require.Zero(t, n, "found tracked requests") +} + +func TestReplicaCircuitBreaker_Register(t *testing.T) { + defer leaktest.AfterTest(t)() + br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") + defer stopper.Stop(context.Background()) + ctx := withCircuitBreakerProbeMarker(context.Background()) + tok, sig, err := br.Register(ctx, func() {}) + require.NoError(t, err) + defer br.UnregisterAndAdjustError(tok, sig, nil /* pErr */) + require.Zero(t, sig.C()) + var n int + br.cancels.Visit(func(ctx context.Context, f func()) (remove bool) { + n++ + return false // keep + }) + require.Zero(t, n, "probe context got added to CancelStorage") +} + +func setupCircuitBreakerTest(t testing.TB, cs string) (*replicaCircuitBreaker, *stop.Stopper) { + st := cluster.MakeTestingClusterSettings() + // Enable circuit breakers. + replicaCircuitBreakerSlowReplicationThreshold.Override(context.Background(), &st.SV, time.Hour) + r := &circuitBreakerReplicaMock{clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond)} + var numShards int + { + _, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards) + require.NoError(t, err) + } + s := &MapCancelStorage{NumShards: numShards} + onTrip := func() {} + onReset := func() {} + stopper := stop.NewStopper() + br := newReplicaCircuitBreaker(st, stopper, log.AmbientContext{}, r, s, onTrip, onReset) + return br, stopper +} + +func BenchmarkReplicaCircuitBreaker_Register(b *testing.B) { + defer leaktest.AfterTest(b)() + + for _, enabled := range []bool{false, true} { + b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) { + dss := []string{ + "mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16", + "mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64", + } + if !enabled { + dss = dss[:1] + } + for _, ds := range dss { + b.Run(ds, func(b *testing.B) { + b.ReportAllocs() + br, stopper := setupCircuitBreakerTest(b, ds) + defer stopper.Stop(context.Background()) + + var dur time.Duration + if enabled { + dur = time.Hour + } + replicaCircuitBreakerSlowReplicationThreshold.Override(context.Background(), &br.st.SV, dur) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + ctx, cancel := context.WithCancel(context.Background()) + tok, sig, err := br.Register(ctx, cancel) + if err != nil { + b.Error(err) + } + if pErr := br.UnregisterAndAdjustError(tok, sig, nil); pErr != nil { + b.Error(pErr) + } + cancel() + } + }) + }) + } + }) + } +} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 44a810af6798..dd3bf9f591ed 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -137,8 +137,14 @@ func newUnloadedReplica( onReset := func() { store.Metrics().ReplicaCircuitBreakerCurTripped.Dec(1) } + var cancelStorage CancelStorage + if f := r.store.cfg.TestingKnobs.CancelStorageFactory; f != nil { + cancelStorage = f() + } else { + cancelStorage = &MapCancelStorage{} + } r.breaker = newReplicaCircuitBreaker( - store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset, + store.cfg.Settings, store.stopper, r.AmbientContext, r, cancelStorage, onTrip, onReset, ) return r } diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index d00d219db103..c09c251d2b5e 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -56,6 +56,8 @@ var optimisticEvalLimitedScans = settings.RegisterBoolSetting( // ▼ // Replica.Send // │ +// Circuit breaker +// │ // ▼ // Replica.maybeBackpressureBatch (if Range too large) // │ @@ -96,73 +98,6 @@ func (r *Replica) Send( return r.sendWithoutRangeID(ctx, &ba) } -// checkCircuitBreaker takes a cancelable context and its cancel function. The -// context is cancelled when the circuit breaker trips. If the breaker is -// already tripped , its error is returned immediately and the caller should not -// continue processing the request. Otherwise, the caller is provided with a -// signaller for use in a deferred call to maybeAdjustWithBreakerError, which -// will annotate the outgoing error in the event of the breaker tripping while -// the request is processing. -func (r *Replica) checkCircuitBreaker(ctx context.Context, cancel func()) (signaller, error) { - // NB: brSig will never trip if circuit breakers are not enabled. - brSig := r.breaker.Signal() - if isCircuitBreakerProbe(ctx) { - brSig = neverTripSignaller{} - } - - if err := brSig.Err(); err != nil { - // TODO(tbg): we may want to exclude some requests from this check, or allow - // requests to exclude themselves from the check (via their header). - cancel() - return nil, err - } - - // NB: this is a total crutch, see: - // https://github.com/cockroachdb/cockroach/issues/74707 - // It will do until breakers default to on: - // https://github.com/cockroachdb/cockroach/issues/74705 - if ch := brSig.C(); ch != nil { - _ = r.store.Stopper().RunAsyncTask(ctx, "watch", func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-ch: - cancel() - } - }) - } - - return brSig, nil -} - -func maybeAdjustWithBreakerError(pErr *roachpb.Error, brErr error) *roachpb.Error { - if pErr == nil || brErr == nil { - return pErr - } - err := pErr.GoError() - if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) { - // The breaker tripped while a command was inflight, so we have to - // propagate an ambiguous result. We don't want to replace it, but there - // is a way to stash an Error in it so we use that. - // - // TODO(tbg): could also wrap it; there is no other write to WrappedErr - // in the codebase and it might be better to remove it. Nested *Errors - // are not a good idea. - wrappedErr := brErr - if ae.WrappedErr != nil { - wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr) - } - ae.WrappedErr = roachpb.NewError(wrappedErr) - return roachpb.NewError(ae) - } else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) { - // When a lease acquisition triggered by this request is short-circuited - // by the breaker, it will return an opaque NotLeaseholderError, which we - // replace with the breaker's error. - return roachpb.NewError(errors.CombineErrors(brErr, le)) - } - return pErr -} - // sendWithoutRangeID used to be called sendWithRangeID, accepted a `_forStacks // roachpb.RangeID` argument, and had the description below. Ever since Go // switched to the register-based calling convention though, this stopped @@ -204,12 +139,12 @@ func (r *Replica) sendWithoutRangeID( // Circuit breaker handling. ctx, cancel := context.WithCancel(ctx) - brSig, err := r.checkCircuitBreaker(ctx, cancel) + tok, brSig, err := r.breaker.Register(ctx, cancel) if err != nil { return nil, roachpb.NewError(err) } defer func() { - rErr = maybeAdjustWithBreakerError(rErr, brSig.Err()) + rErr = r.breaker.UnregisterAndAdjustError(tok, brSig, rErr) cancel() }() diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 060c1240273a..5d5c7b5e39f5 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -85,6 +85,10 @@ type StoreTestingKnobs struct { // per-Batch SlowReplicationThreshold. SlowReplicationThresholdOverride func(ba *roachpb.BatchRequest) time.Duration + // CancelStorageFactory overrides the default CancelStorage used by Replica + // circuit breakers. + CancelStorageFactory func() CancelStorage + // TestingRangefeedFilter is called before a replica processes a rangefeed // in order for unit tests to modify the request, error returned to the client // or data.