From d7e418cde9209a3560ca1f67ed37c7c642bca59c Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 21 Feb 2022 18:03:01 +0100 Subject: [PATCH] kvserver: allow circuit-breaker to serve reads Informs #74799. I hope it will ultimately close it, but not quite there yet. I left extensive `TODO(during review)` comments. Release note: None --- pkg/kv/kvserver/BUILD.bazel | 3 +- ...ient_replica_circuit_breaker_bench_test.go | 124 -------- .../client_replica_circuit_breaker_test.go | 269 +++++++++++------- .../concurrency/concurrency_control.go | 16 +- .../concurrency/concurrency_manager.go | 10 +- .../concurrency/concurrency_manager_test.go | 19 +- .../concurrency/datadriven_util_test.go | 19 ++ pkg/kv/kvserver/concurrency/latch_manager.go | 14 +- .../kvserver/concurrency/lock_table_test.go | 6 +- .../concurrency_manager/poison_policy_err | 55 ++++ .../concurrency_manager/poison_policy_wait | 60 ++++ pkg/kv/kvserver/helpers_test.go | 7 - pkg/kv/kvserver/replica_circuit_breaker.go | 140 ++------- .../replica_circuit_breaker_cancelstorage.go | 115 -------- .../kvserver/replica_circuit_breaker_test.go | 160 ----------- pkg/kv/kvserver/replica_init.go | 8 +- pkg/kv/kvserver/replica_proposal.go | 10 + pkg/kv/kvserver/replica_raft.go | 16 ++ pkg/kv/kvserver/replica_range_lease.go | 26 +- pkg/kv/kvserver/replica_send.go | 57 +++- pkg/kv/kvserver/replica_write.go | 9 + pkg/kv/kvserver/spanlatch/manager.go | 86 +++++- pkg/kv/kvserver/spanlatch/manager_test.go | 22 +- pkg/kv/kvserver/spanlatch/signal.go | 4 +- pkg/kv/kvserver/testing_knobs.go | 4 - pkg/roachpb/api.go | 5 +- 26 files changed, 570 insertions(+), 694 deletions(-) delete mode 100644 pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait delete mode 100644 pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 0aca5a8ace03..e9be57b75af9 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -35,7 +35,6 @@ go_library( "replica_backpressure.go", "replica_batch_updates.go", "replica_circuit_breaker.go", - "replica_circuit_breaker_cancelstorage.go", "replica_closedts.go", "replica_command.go", "replica_consistency.go", @@ -137,6 +136,7 @@ go_library( "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", "//pkg/kv/kvserver/readsummary/rspb", + "//pkg/kv/kvserver/spanlatch", "//pkg/kv/kvserver/spanset", "//pkg/kv/kvserver/split", "//pkg/kv/kvserver/stateloader", @@ -224,7 +224,6 @@ go_test( "client_rangefeed_test.go", "client_relocate_range_test.go", "client_replica_backpressure_test.go", - "client_replica_circuit_breaker_bench_test.go", "client_replica_circuit_breaker_test.go", "client_replica_gc_test.go", "client_replica_test.go", diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go deleted file mode 100644 index 686a6a6c557e..000000000000 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. 
-// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package kvserver_test - -import ( - "context" - "fmt" - "math/rand" - "strconv" - "sync" - "testing" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/util/encoding" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/stretchr/testify/require" -) - -type replicaCircuitBreakerBench struct { - *testcluster.TestCluster - pool *sync.Pool // *BatchRequest -} - -func (tc *replicaCircuitBreakerBench) repl(b *testing.B) *kvserver.Replica { - return tc.GetFirstStoreFromServer(b, 0).LookupReplica(keys.MustAddr(tc.ScratchRange(b))) -} - -func setupCircuitBreakerReplicaBench( - b *testing.B, breakerEnabled bool, cs string, -) (*replicaCircuitBreakerBench, *stop.Stopper) { - b.Helper() - - var numShards int - { - _, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards) - require.NoError(b, err) - } - sFn := func() kvserver.CancelStorage { return &kvserver.MapCancelStorage{NumShards: numShards} } - - var knobs kvserver.StoreTestingKnobs - knobs.CancelStorageFactory = sFn - - var args base.TestClusterArgs - args.ServerArgs.Knobs.Store = &knobs - tc := testcluster.StartTestCluster(b, 1, args) - - stmt := `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '1000s'` - if !breakerEnabled { - stmt = `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '0s'` - } - _, err := tc.ServerConn(0).Exec(stmt) - require.NoError(b, err) - wtc := &replicaCircuitBreakerBench{ - TestCluster: tc, - } - wtc.pool = &sync.Pool{ - New: func() interface{} { - repl := wtc.repl(b) - var ba roachpb.BatchRequest - ba.RangeID = repl.RangeID - ba.Timestamp = repl.Clock().NowAsClockTimestamp().ToTimestamp() - var k roachpb.Key - k = append(k, repl.Desc().StartKey.AsRawKey()...) 
- k = encoding.EncodeUint64Ascending(k, uint64(rand.Intn(1000))) - ba.Add(roachpb.NewGet(k, false)) - return &ba - }, - } - return wtc, tc.Stopper() -} - -func BenchmarkReplicaCircuitBreakerSendOverhead(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - ctx := context.Background() - - for _, enabled := range []bool{false, true} { - b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) { - dss := []string{ - "mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16", - "mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64", - } - if !enabled { - dss = dss[:1] - } - - for _, ds := range dss { - b.Run(ds, func(b *testing.B) { - b.ReportAllocs() - tc, stopper := setupCircuitBreakerReplicaBench(b, enabled, ds) - defer stopper.Stop(ctx) - - repl := tc.repl(b) - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - ba := tc.pool.Get().(*roachpb.BatchRequest) - _, err := repl.Send(ctx, *ba) - tc.pool.Put(ba) - if err != nil { - b.Fatal(err) - } - } - }) - }) - } - }) - } -} diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index d7d171cd2be8..340adad34c74 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -87,7 +88,7 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { require.NoError(t, tc.Write(n1)) // Disable the probe so that when the breaker trips, it stays tripped. tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) + tc.Report(n1, errors.New("injected breaker error")) s1 := tc.GetFirstStoreFromServer(t, n1) s2 := tc.GetFirstStoreFromServer(t, n2) @@ -97,10 +98,9 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCurTripped.Value()) require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCumTripped.Count()) - // n1 could theoretically still serve reads (there is a valid lease - // and none of the latches are taken), but since it is hard to determine - // that upfront we currently fail all reads as well. - tc.RequireIsBreakerOpen(t, tc.Read(n1)) + // n1 can still serve reads despite the breaker having tripped, as there is a + // valid lease and no poisoned latches prevent the read. + require.NoError(t, tc.Read(n1)) tc.RequireIsBreakerOpen(t, tc.Write(n1)) // When we go through the KV client stack, we still get the breaker error @@ -116,15 +116,13 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { // Enable the probe. Even a read should trigger the probe // and within due time the breaker should heal. tc.SetProbeEnabled(n1, true) - tc.UntripsSoon(t, tc.Read, n1) - // Same behavior on writes. - tc.Report(n1, errors.New("boom again")) + require.NoError(t, tc.Read(n1)) // this always worked + // Writes heal soon. tc.UntripsSoon(t, tc.Write, n1) - // Currently tripped drops back to zero, all-time is two (since we tripped - // it twice) + // Currently tripped drops back to zero, all-time remains at one. 
require.EqualValues(t, 0, s1.Metrics().ReplicaCircuitBreakerCurTripped.Value()) - require.EqualValues(t, 2, s1.Metrics().ReplicaCircuitBreakerCumTripped.Count()) + require.EqualValues(t, 1, s1.Metrics().ReplicaCircuitBreakerCumTripped.Count()) // s2 wasn't affected by any breaker events. require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCurTripped.Value()) require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCumTripped.Count()) @@ -147,7 +145,7 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { require.NoError(t, tc.Write(n1)) // Disable the probe on n2 so that when the breaker trips, it stays tripped. tc.SetProbeEnabled(n2, false) - tc.Report(n2, errors.New("boom")) + tc.Report(n2, errors.New("injected breaker error")) // We didn't trip the leaseholder n1, so it is unaffected. require.NoError(t, tc.Read(n1)) @@ -158,32 +156,15 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { // time of writing it would propagate it. require.NoError(t, tc.WriteDS(n1)) - tc.RequireIsBreakerOpen(t, tc.Read(n2)) - tc.RequireIsBreakerOpen(t, tc.Write(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) + // TODO(tbg): add tc.FollowerRead(n2) here which should succeed (if that + // method does what it promises). // Enable the probe. Even a read should trigger the probe - // and within due time the breaker should heal, giving us - // NotLeaseholderErrors again. - // - // TODO(tbg): this test would be more meaningful with follower reads. They - // should succeed when the breaker is open and fail if the breaker is - // tripped. However knowing that the circuit breaker check sits at the top - // of Replica.sendWithRangeID, it's clear that it won't make a difference. tc.SetProbeEnabled(n2, true) - testutils.SucceedsSoon(t, func() error { - if err := tc.Read(n2); !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { - return err - } - return nil - }) - // Same behavior on writes. - tc.Report(n2, errors.New("boom again")) - testutils.SucceedsSoon(t, func() error { - if err := tc.Write(n2); !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { - return err - } - return nil - }) + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) } // In this scenario, the breaker is tripped and the probe is disabled and @@ -202,8 +183,8 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) { // disabled. require.NoError(t, tc.Write(n1)) tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + tc.Report(n1, errors.New("injected breaker error")) + resumeHeartbeats := tc.ExpireLeaseAndLivenessOnN1(t, pauseHeartbeats) // n2 (not n1) will return a NotLeaseholderError. This may be surprising - // why isn't it trying and succeeding to acquire a lease - but it does @@ -218,7 +199,13 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) { // Same behavior for write on n2. tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) } - // On n1, run into the circuit breaker when requesting lease. + // On n1, run into the circuit breaker when requesting lease. We have to + // resume heartbeats for this to not time out, as requesting the new lease + // entails doing liveness checks which can't succeed if nobody is + // heartbeating, and we'd get stuck in liveness before reaching the circuit + // breaker. 
(In other words, replica circuit breaking doesn't fail-fast + // requests reliably if liveness is unavailable; this is tracked in #74616. + resumeHeartbeats() { tc.RequireIsBreakerOpen(t, tc.Read(n1)) tc.RequireIsBreakerOpen(t, tc.Write(n1)) @@ -229,7 +216,6 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) { // we're not careful, the probe itself is held up by the breaker as well, or // the probe will try to acquire a lease (which we're currently careful to // avoid). - resumeHeartbeats() tc.SetProbeEnabled(n1, true) tc.UntripsSoon(t, tc.Read, n1) tc.UntripsSoon(t, tc.Write, n1) @@ -255,20 +241,30 @@ func TestReplicaCircuitBreaker_Leaseholder_QuorumLoss(t *testing.T) { // We didn't lose the liveness range (which is only on n1). require.NoError(t, tc.Server(n1).HeartbeatNodeLiveness()) + // Read still works, as we have valid lease and no poisoned latch + // underneath. + require.NoError(t, tc.Read(n1)) tc.SetSlowThreshold(10 * time.Millisecond) { err := tc.Write(n1) var ae *roachpb.AmbiguousResultError require.True(t, errors.As(err, &ae), "%+v", err) t.Log(err) + tc.RequireIsBreakerOpen(t, err) + } + // We still have a valid lease, but now the above write is holding a poisoned + // latch (this is true despite the write itself having returned already). + { + err := tc.Read(n1) + t.Log(err) + tc.RequireIsBreakerOpen(t, err) } - tc.RequireIsBreakerOpen(t, tc.Read(n1)) // Bring n2 back and service should be restored. tc.SetSlowThreshold(0) // reset require.NoError(t, tc.RestartServer(n2)) - tc.UntripsSoon(t, tc.Read, n1) - require.NoError(t, tc.Write(n1)) + tc.UntripsSoon(t, tc.Write, n1) // poisoned latch goes away + require.NoError(t, tc.Read(n1)) } // In this test, the range is on n1 and n2 and we place the lease on n2 and @@ -286,7 +282,7 @@ func TestReplicaCircuitBreaker_Follower_QuorumLoss(t *testing.T) { // Get lease to n2 so that we can lose it without taking down the system ranges. desc := tc.LookupRangeOrFatal(t, tc.ScratchRange(t)) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(n2)) - resumeHeartbeats := tc.ExpireAllLeases(t, keepHeartbeats) + resumeHeartbeats := tc.ExpireLeaseAndLivenessOnN1(t, keepHeartbeats) tc.StopServer(n2) // lose quorum and leaseholder resumeHeartbeats() @@ -299,8 +295,8 @@ func TestReplicaCircuitBreaker_Follower_QuorumLoss(t *testing.T) { // Bring n2 back and service should be restored. tc.SetSlowThreshold(0) // reset require.NoError(t, tc.RestartServer(n2)) - tc.UntripsSoon(t, tc.Read, n1) - require.NoError(t, tc.Write(n1)) + tc.UntripsSoon(t, tc.Write, n1) + require.NoError(t, tc.Read(n1)) } // This test is skipped but documents that the current circuit breakers cannot @@ -352,7 +348,7 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) { // Expire all leases. We also pause all heartbeats but that doesn't really // matter since the liveness range is unavailable anyway. - resume := tc.ExpireAllLeases(t, pauseHeartbeats) + resume := tc.ExpireLeaseAndLivenessOnN1(t, pauseHeartbeats) defer resume() // Since there isn't a lease, and the liveness range is down, the circuit @@ -408,22 +404,25 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { args := &roachpb.RangeFeedRequest{ Span: roachpb.Span{Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey()}, } - // This test shouldn't take in excess of 45s even under the worst of conditions. 
- ctx, cancel := context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration) + + ctx, cancel := context.WithCancel(ctx) defer cancel() stream1 := &dummyStream{ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) { err := tc.repls[0].RangeFeed(args, stream1).GoError() if ctx.Err() != nil { - return + return // main goroutine stopping } assert.NoError(t, err) // avoid Fatal on goroutine })) - readOneVal := func(t *testing.T, stream *dummyStream) { + readOneVal := func(t *testing.T, ctx context.Context, stream *dummyStream) { + t.Helper() for { var done bool select { + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatal("timed out") case <-ctx.Done(): t.Fatal(ctx.Err()) case ev := <-stream.recv: @@ -437,10 +436,10 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { } require.NoError(t, tc.Write(n1)) - readOneVal(t, stream1) + readOneVal(t, ctx, stream1) // NB: keep heartbeats because we're not trying to lose the liveness range. - undo := tc.ExpireAllLeases(t, keepHeartbeats) + undo := tc.ExpireLeaseAndLivenessOnN1(t, keepHeartbeats) undo() tc.SetSlowThreshold(10 * time.Millisecond) tc.StopServer(n2) @@ -452,7 +451,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) { err := tc.repls[0].RangeFeed(args, stream2).GoError() if ctx.Err() != nil { - return + return // main goroutine stopping } assert.NoError(t, err) // avoid Fatal on goroutine })) @@ -461,8 +460,8 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { require.NoError(t, tc.RestartServer(n2)) tc.UntripsSoon(t, tc.Write, n1) - readOneVal(t, stream1) - readOneVal(t, stream2) + readOneVal(t, ctx, stream1) + readOneVal(t, ctx, stream2) } func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { @@ -475,7 +474,7 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { // disabled, i.e. it will stay tripped. 
require.NoError(t, tc.Write(n1)) tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) + tc.Report(n1, errors.New("injected breaker error")) exemptRequests := []func() roachpb.Request{ func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} }, @@ -508,31 +507,33 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { for _, reqFn := range exemptRequests { req := reqFn() - t.Run(fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) { + tc.Run(t, fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) { require.NoError(t, tc.Send(n1, req)) }) } for _, reqFn := range exemptRequests { req := reqFn() - t.Run(fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) { - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + tc.Run(t, fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) { + resumeHeartbeats := tc.ExpireLeaseAndLivenessOnN1(t, pauseHeartbeats) resumeHeartbeats() // intentionally resume right now so that lease can be acquired + // NB: when looking into the traces here, we sometimes see - as expected - that + // when the request tries to acquire a lease, the breaker is still tripped and require.NoError(t, tc.Send(n1, req)) }) } - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) - defer resumeHeartbeats() // can't acquire leases until test ends + resumeHeartbeats := tc.ExpireLeaseAndLivenessOnN1(t, pauseHeartbeats) for _, reqFn := range exemptRequests { req := reqFn() - if req.Method() == roachpb.Probe { - // Probe does not require the lease, and is the most-tested of the bunch - // already. We don't have to test it again here, which would require undue - // amounts of special-casing below. - continue - } - t.Run(fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) { + tc.Run(t, fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) { + if m := req.Method(); m == roachpb.Probe { + // Probe does not require the lease, and is the most-tested of the bunch + // already. We don't have to test it again here, which would require undue + // amounts of special-casing below. + skip.IgnoreLintf(t, "subtest does not apply to %s", m) + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond) defer cancel() const maxWait = 5 * time.Second @@ -546,6 +547,46 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { require.Less(t, timeutil.Since(tBegin), maxWait) }) } + + // Restore the breaker via the probe. + resumeHeartbeats() + tc.SetProbeEnabled(n1, true) + tc.UntripsSoon(t, tc.Write, n1) + + // Lose quorum (liveness stays intact). + tc.SetSlowThreshold(10 * time.Millisecond) + tc.StopServer(n2) + // Let the breaker trip. This leaves a poisoned latch behind that at least some of + // the requests will interact with. + tc.RequireIsBreakerOpen(t, tc.Write(n1)) + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + + for _, reqFn := range exemptRequests { + req := reqFn() + tc.Run(t, fmt.Sprintf("with-poisoned-latch/%s", req.Method()), func(t *testing.T) { + if m := req.Method(); m == roachpb.GC { + // GC without GCKeys acquires no latches and is a pure read. If we want + // to put a key in there, we need to pick the right timestamp (since you + // can't GC a live key); it's all rather annoying and not worth it. 
In + // the long run, we also completely want to avoid acquiring latches for + // this request (since it should only mutate keyspace that has since + // fallen under the GCThreshold), so avoid cooking up anything special + // here. + skip.IgnoreLint(t, "subtest does not apply to %s", m) + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond) + defer cancel() + const maxWait = 5 * time.Second + tBegin := timeutil.Now() + err := tc.SendCtx(ctx, n1, req) + t.Log(err) + require.Error(t, err) + require.Error(t, ctx.Err()) + // Make sure we didn't run into the "long" timeout inside of SendCtx but + // actually terminated as a result of our ctx cancelling. + require.Less(t, timeutil.Since(tBegin), maxWait) + }) + } } // Test infrastructure below. @@ -577,6 +618,10 @@ type replWithKnob struct { } type circuitBreakerTest struct { + t interface { + Helper() + Logf(string, ...interface{}) + } *testcluster.TestCluster slowThresh *atomic.Value // time.Duration ManualClock *hlc.HybridManualClock @@ -663,6 +708,7 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { repls = append(repls, replWithKnob{repl, enableProbe}) } return &circuitBreakerTest{ + t: t, TestCluster: tc, ManualClock: manualClock, repls: repls, @@ -670,6 +716,21 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { } } +// Run is a wrapper around t.Run that allows the test harness to print traces +// using the subtest's *testing.T. +func (cbt *circuitBreakerTest) Run(t *testing.T, name string, f func(t *testing.T)) { + t.Helper() + t.Run(name, func(t *testing.T) { + t.Helper() + outerT := cbt.t + cbt.t = t + defer func() { + cbt.t = outerT + }() + f(t) + }) +} + func (cbt *circuitBreakerTest) SetProbeEnabled(idx int, to bool) { cbt.repls[idx].setProbeEnabled(to) } @@ -692,23 +753,35 @@ func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) er }) } -func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats bool) (undo func()) { +func (cbt *circuitBreakerTest) ExpireLeaseAndLivenessOnN1( + t *testing.T, pauseHeartbeats bool, +) (undo func()) { t.Helper() var maxWT int64 var fs []func() - for _, srv := range cbt.Servers { - lv := srv.NodeLiveness().(*liveness.NodeLiveness) - if pauseHeartbeats { - undo := lv.PauseAllHeartbeatsForTest() - fs = append(fs, undo) - } - self, ok := lv.Self() - require.True(t, ok) - if maxWT < self.Expiration.WallTime { - maxWT = self.Expiration.WallTime - } + srv := cbt.Servers[0] + lv := srv.NodeLiveness().(*liveness.NodeLiveness) + if pauseHeartbeats { + undo := lv.PauseAllHeartbeatsForTest() + fs = append(fs, undo) + } + self, ok := lv.Self() + require.True(t, ok) + if maxWT < self.Expiration.WallTime { + maxWT = self.Expiration.WallTime } cbt.ManualClock.Forward(maxWT + 1) + testutils.SucceedsSoon(t, func() error { + // Invalidate n1's liveness record, to make sure that ranges on n1 need + // to acquire a new lease (vs waiting for a heartbeat to the liveness + // record resuscitating the old one). + // + // Needing to do this is the reason for special-casing this entire method + // around n1; if we stop heartbeats for both nodes, they can't increment + // each others liveness records. 
+ nl2 := cbt.Servers[n2].NodeLiveness().(*liveness.NodeLiveness) + return nl2.IncrementEpoch(context.Background(), self) + }) return func() { for _, f := range fs { f() @@ -717,11 +790,22 @@ func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats boo } func (cbt *circuitBreakerTest) Send(idx int, req roachpb.Request) error { + cbt.t.Helper() return cbt.SendCtx(context.Background(), idx, req) - } func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb.Request) error { + cbt.t.Helper() + ctx, finishAndGet := tracing.ContextWithRecordingSpan(ctx, cbt.repls[idx].Tracer, "SendCtx("+req.Method().String()+")") + defer time.AfterFunc(10*time.Second, func() { + rec := tracing.SpanFromContext(ctx).GetConfiguredRecording() + cbt.t.Logf("slow request: %s", rec) + }).Stop() + defer func() { + cbt.t.Helper() + rec := finishAndGet() + cbt.t.Logf("%s", rec) + }() var ba roachpb.BatchRequest repl := cbt.repls[idx] ba.RangeID = repl.Desc().RangeID @@ -749,29 +833,20 @@ func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb if err := ctx.Err(); err != nil && parCtx.Err() == nil { pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) } - { - var err error - repl.VisitBreakerContexts(func(ctx context.Context) { - if err == nil && ctx.Value(req) != nil { - err = errors.Errorf( - "request %s returned but context still tracked in breaker", req, - ) - } - }) - if err != nil { - pErr = roachpb.NewErrorf("%s; after %v", err, pErr) - } - } return pErr.GoError() } func (cbt *circuitBreakerTest) WriteDS(idx int) error { + cbt.t.Helper() put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put) } -func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb.Request) error { +func (cbt *circuitBreakerTest) sendViaDistSender( + ds *kvcoord.DistSender, req roachpb.Request, +) error { + cbt.t.Helper() var ba roachpb.BatchRequest ba.Add(req) ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) @@ -788,6 +863,7 @@ func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) { t.Helper() + t.Log(err) // We also accept an ambiguous result wrapping a breaker error; this occurs // when the breaker trips while a write is already inflight. 
if aErr := (*roachpb.AmbiguousResultError)(nil); errors.As(err, &aErr) && aErr.WrappedErr != nil { @@ -798,6 +874,7 @@ func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) { func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) { t.Helper() + t.Log(err) ok := errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) require.True(t, ok, "%+v", err) } @@ -810,12 +887,14 @@ func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) { } func (cbt *circuitBreakerTest) Write(idx int) error { + cbt.t.Helper() repl := cbt.repls[idx] put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) return cbt.Send(idx, put) } func (cbt *circuitBreakerTest) Read(idx int) error { + cbt.t.Helper() repl := cbt.repls[idx] get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) return cbt.Send(idx, get) diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index 883a83e6037c..b9adcf252191 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -189,6 +190,10 @@ type RequestSequencer interface { // does so, it will not return a request guard. SequenceReq(context.Context, *Guard, Request, RequestEvalKind) (*Guard, Response, *Error) + // PoisonReq marks a request's latches as poisoned. This allows requests + // waiting for the latches to fail fast. + PoisonReq(*Guard) + // FinishReq marks the request as complete, releasing any protection // the request had against conflicting requests and allowing conflicting // requests that are blocked on this one to proceed. The guard should not @@ -385,6 +390,10 @@ type Request struct { // with a WriteIntentError instead of entering the queue and waiting. MaxLockWaitQueueLength int + // PoisonPolicy . + // TODO. + PoisonPolicy spanlatch.PoisonPolicy + // The individual requests in the batch. Requests []roachpb.RequestUnion @@ -464,9 +473,12 @@ type latchManager interface { // WaitFor waits for conflicting latches on the specified spans without adding // any latches itself. Fast path for operations that only require flushing out // old operations without blocking any new ones. - WaitFor(ctx context.Context, spans *spanset.SpanSet) *Error + WaitFor(ctx context.Context, spans *spanset.SpanSet, pp spanlatch.PoisonPolicy) *Error + + // Poison a guard's latches, allowing waiters to fail fast. + Poison(latchGuard) - // Releases latches, relinquish its protection from conflicting requests. + // Release a guard's latches, relinquish its protection from conflicting requests. Release(latchGuard) // Metrics returns information about the state of the latchManager. diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index c3ee4f1cd150..6815dda4d52a 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -230,7 +230,7 @@ func (m *managerImpl) sequenceReqWithGuard(ctx context.Context, g *Guard) (Respo // them. 
if shouldWaitOnLatchesWithoutAcquiring(g.Req) { log.Event(ctx, "waiting on latches without acquiring") - return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans) + return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy) } // Provide the manager with an opportunity to intercept the request. It @@ -382,6 +382,14 @@ func shouldWaitOnLatchesWithoutAcquiring(req Request) bool { return req.isSingle(roachpb.Barrier) } +// PoisonReq implements the RequestSequencer interface. +func (m *managerImpl) PoisonReq(g *Guard) { + // TODO(during review): write a comment here on the case g.lg == nil. + if g.lg != nil { + m.lm.Poison(g.lg) + } +} + // FinishReq implements the RequestSequencer interface. func (m *managerImpl) FinishReq(g *Guard) { // NOTE: we release latches _before_ exiting lock wait-queues deliberately. diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index e533b5174872..a10bc1786dab 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -54,9 +54,10 @@ import ( // The input files use the following DSL: // // new-txn name= ts=[,] epoch= [uncertainty-limit=[,]] -// new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] [lock-timeout] [max-lock-wait-queue-length=] +// new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] [lock-timeout] [max-lock-wait-queue-length=] [poison-policy=[err|wait]] // [=...] (hint: see scanSingleRequest) // sequence req= [eval-kind= // finish req= // // handle-write-intent-error req= txn= key= lease-seq= @@ -169,6 +170,8 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) } + pp := scanPoisonPolicy(t, d) + // Each roachpb.Request is provided on an indented line. 
reqs, reqUnions := scanRequests(t, d, c) latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs) @@ -184,6 +187,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { Requests: reqUnions, LatchSpans: latchSpans, LockSpans: lockSpans, + PoisonPolicy: pp, } return "" @@ -256,7 +260,20 @@ func TestConcurrencyManagerBasic(t *testing.T) { c.mu.Unlock() }) return c.waitAndCollect(t, mon) + case "poison": + var reqName string + d.ScanArgs(t, "req", &reqName) + guard, ok := c.guardsByReqName[reqName] + if !ok { + d.Fatalf(t, "unknown request: %s", reqName) + } + opName := fmt.Sprintf("poison %s", reqName) + mon.runSync(opName, func(ctx context.Context) { + log.Event(ctx, "poisoning request") + m.PoisonReq(guard) + }) + return c.waitAndCollect(t, mon) case "handle-write-intent-error": var reqName string d.ScanArgs(t, "req", &reqName) diff --git a/pkg/kv/kvserver/concurrency/datadriven_util_test.go b/pkg/kv/kvserver/concurrency/datadriven_util_test.go index e9e8f9483a33..7062995268eb 100644 --- a/pkg/kv/kvserver/concurrency/datadriven_util_test.go +++ b/pkg/kv/kvserver/concurrency/datadriven_util_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -75,6 +76,24 @@ func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.Wa } } +func scanPoisonPolicy(t *testing.T, d *datadriven.TestData) spanlatch.PoisonPolicy { + const key = "poison-policy" + if !d.HasArg(key) { + return spanlatch.PoisonPolicyError + } + var policy string + d.ScanArgs(t, key, &policy) + switch policy { + case "error": + return spanlatch.PoisonPolicyError + case "wait": + return spanlatch.PoisonPolicyWait + default: + d.Fatalf(t, "unknown poison policy: %s", policy) + return 0 + } +} + func scanSingleRequest( t *testing.T, d *datadriven.TestData, line string, txns map[string]*roachpb.Transaction, ) roachpb.Request { diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go index b0a4b8eb1073..5b8e51d53988 100644 --- a/pkg/kv/kvserver/concurrency/latch_manager.go +++ b/pkg/kv/kvserver/concurrency/latch_manager.go @@ -24,7 +24,7 @@ type latchManagerImpl struct { } func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard, *Error) { - lg, err := m.m.Acquire(ctx, req.LatchSpans) + lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy) if err != nil { return nil, roachpb.NewError(err) } @@ -32,7 +32,7 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard } func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard { - lg := m.m.AcquireOptimistic(req.LatchSpans) + lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy) return lg } @@ -50,14 +50,20 @@ func (m *latchManagerImpl) WaitUntilAcquired( return lg, nil } -func (m *latchManagerImpl) WaitFor(ctx context.Context, ss *spanset.SpanSet) *Error { - err := m.m.WaitFor(ctx, ss) +func (m *latchManagerImpl) WaitFor( + ctx context.Context, ss *spanset.SpanSet, pp spanlatch.PoisonPolicy, +) *Error { + err := m.m.WaitFor(ctx, ss, pp) if err != nil { return roachpb.NewError(err) } return nil } +func (m *latchManagerImpl) Poison(lg latchGuard) { + m.m.Poison(lg.(*spanlatch.Guard)) +} + func (m *latchManagerImpl) Release(lg latchGuard) { m.m.Release(lg.(*spanlatch.Guard)) } diff --git 
a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index c78ec55fb09d..91e72d8c24d7 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -870,7 +870,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error { // cancellation, the code makes sure to release latches when returning // early due to error. Otherwise other requests will get stuck and // group.Wait() will not return until the test times out. - lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans) + lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, spanlatch.PoisonPolicyError) if err != nil { return err } @@ -1414,7 +1414,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { var err error firstIter := true for { - if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, spanlatch.PoisonPolicyError); err != nil { doneCh <- err return } @@ -1449,7 +1449,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { return } // Release locks. - if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, spanlatch.PoisonPolicyError); err != nil { doneCh <- err return } diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err new file mode 100644 index 000000000000..83f8e5c928d2 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err @@ -0,0 +1,55 @@ +# This test sets up the following situation: +# +# e <- put (waiting) +# b---f <- scan (waiting) +# c <- put (sequenced, poisoned) +# +# Since everyone uses PoisonPolicyErr, the +# chain unwinds, with all requests failing +# fast. 
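+#
+# (None of the requests below set poison-policy explicitly; they rely on the
+# default, which is PoisonPolicyError.)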
+ +new-request txn=none name=putc ts=10,0 + put key=c value=hi +---- + +sequence req=putc +---- +[1] sequence putc: sequencing request +[1] sequence putc: acquiring latches +[1] sequence putc: scanning lock table for conflicting locks +[1] sequence putc: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=pute ts=11,0 + put key=e value=hi +---- + +sequence req=pute +---- +[3] sequence pute: sequencing request +[3] sequence pute: acquiring latches +[3] sequence pute: waiting to acquire write latch e@11.000000000,0, held by read latch {b-f}@11.000000000,1 +[3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=putc +---- +[-] poison putc: poisoning request +[2] sequence readbf: sequencing complete, returned error: encountered poisoned latch +[3] sequence pute: sequencing complete, returned error: encountered poisoned latch + +finish req=putc +---- +[-] finish putc: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait new file mode 100644 index 000000000000..f9067a2620b0 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait @@ -0,0 +1,60 @@ +# This test sets up the following situation: +# +# e <- put (waiting) +# b---f <- scan (waiting) +# c <- put (sequenced, poisoned) +# +# The top and bottom request use PoisonPolicyWait, +# so only the scan returns. 
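+#
+# (The scan does not set a poison-policy and thus uses the default,
+# PoisonPolicyError, which is why it is the request that fails fast once
+# putc is poisoned.)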
+ +new-request txn=none name=putc ts=10,0 poison-policy=wait + put key=c value=hi +---- + +sequence req=putc +---- +[1] sequence putc: sequencing request +[1] sequence putc: acquiring latches +[1] sequence putc: scanning lock table for conflicting locks +[1] sequence putc: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=pute ts=11,0 poison-policy=wait + put key=e value=hi +---- + +sequence req=pute +---- +[3] sequence pute: sequencing request +[3] sequence pute: acquiring latches +[3] sequence pute: waiting to acquire write latch e@11.000000000,0, held by read latch {b-f}@11.000000000,1 +[3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=putc +---- +[-] poison putc: poisoning request +[2] sequence readbf: sequencing complete, returned error: encountered poisoned latch +[3] sequence pute: encountered poisoned latch; continuing to wait +[3] sequence pute: scanning lock table for conflicting locks +[3] sequence pute: sequencing complete, returned guard + +finish req=putc +---- +[-] finish putc: finishing request + +finish req=pute +---- +[-] finish pute: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index a6e92183026d..773cf9a0e65d 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -227,13 +227,6 @@ func (r *Replica) Breaker() *circuit2.Breaker { return r.breaker.wrapped } -func (r *Replica) VisitBreakerContexts(fn func(ctx context.Context)) { - r.breaker.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) { - fn(ctx) - return false // keep - }) -} - func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) { r.raftMu.Lock() defer r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go index e64ddb75f3a9..2b3d9de04d4b 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker.go +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -38,6 +38,7 @@ type replicaInCircuitBreaker interface { Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) slowReplicationThreshold(ba *roachpb.BatchRequest) (time.Duration, bool) replicaUnavailableError() error + poisonInflightLatches(err error) } var defaultReplicaCircuitBreakerSlowReplicationThreshold = envutil.EnvOrDefaultDuration( @@ -74,127 +75,15 @@ type replicaCircuitBreaker struct { stopper *stop.Stopper r replicaInCircuitBreaker st *cluster.Settings - cancels CancelStorage wrapped *circuit.Breaker versionIsActive int32 // atomic } -// Register takes a cancelable context and its cancel function (which the caller -// must cancel when the request has finished), and registers them with the -// circuit breaker. If the breaker is already tripped, its error is returned -// immediately and the caller should not continue processing the request. -// Otherwise, the cancel function is invoked if the breaker trips. The caller is -// provided with a token and signaller for use in a call to -// UnregisterAndAdjustError upon request completion. 
That method also takes the -// error (if any) resulting from the request to ensure that in the case of a -// tripped breaker, the error reflects this fact. -func (br *replicaCircuitBreaker) Register( - ctx context.Context, cancel func(), -) (_token interface{}, _ signaller, _ error) { - brSig := br.Signal() - - // TODO(tbg): we may want to exclude more requests from this check, or allow - // requests to exclude themselves from the check (via their header). This - // latter mechanism could also replace hasBypassCircuitBreakerMarker. - if hasBypassCircuitBreakerMarker(ctx) { - // NB: brSig.C() == nil. - brSig = neverTripSignaller{} - } - - if brSig.C() == nil { - // Circuit breakers are disabled and/or this is a probe request, so don't do - // any work registering the context. UnregisterAndAdjustError will know that we didn't - // since it checks the same brSig for a nil C(). - return ctx, brSig, nil - } - - // NB: it might be tempting to check the breaker error first to avoid the call - // to Set below if the breaker is tripped at this point. However, the ordering - // here, subtly, is required to avoid situations in which the cancel is still - // in the map despite the probe having shut down (in which case cancel will - // not be invoked until the probe is next triggered, which maybe "never"). - // - // To see this, consider the case in which the breaker is initially not - // tripped when we check, but then trips immediately and has the probe fail - // (and terminate). Since the probe is in charge of cancelling all tracked - // requests, we must ensure that this probe sees our request. Adding the - // request prior to calling Signal() means that if we see an untripped - // breaker, no probe is running - consequently should the breaker then trip, - // it will observe our cancel, thus avoiding a leak. If we observe a tripped - // breaker, we also need to remove our own cancel, as the probe may already - // have passed the point at which it iterates through the cancels prior to us - // inserting it. The cancel may be invoked twice, but that's ok. - // - // See TestReplicaCircuitBreaker_NoCancelRace. - tok := br.cancels.Set(ctx, cancel) - if err := brSig.Err(); err != nil { - br.cancels.Del(tok) - cancel() - return nil, nil, err - } - - return tok, brSig, nil -} - -// UnregisterAndAdjustError releases a tracked cancel function upon request -// completion. The error resulting from the request is passed in to allow -// decorating it in case the breaker tripped while the request was in-flight. -// -// See Register. -func (br *replicaCircuitBreaker) UnregisterAndAdjustError( - tok interface{}, sig signaller, pErr *roachpb.Error, -) *roachpb.Error { - if sig.C() == nil { - // Breakers were disabled and we never put the cancel in the registry. - return pErr - } - - br.cancels.Del(tok) - - brErr := sig.Err() - if pErr == nil || brErr == nil { - return pErr - } - - // The breaker tripped and the command is returning an error. Make sure the - // error reflects the tripped breaker. - - err := pErr.GoError() - if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) { - // The breaker tripped while a command was inflight, so we have to - // propagate an ambiguous result. We don't want to replace it, but there - // is a way to stash an Error in it so we use that. - // - // TODO(tbg): could also wrap it; there is no other write to WrappedErr - // in the codebase and it might be better to remove it. Nested *Errors - // are not a good idea. 
- wrappedErr := brErr - if ae.WrappedErr != nil { - wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr) - } - ae.WrappedErr = roachpb.NewError(wrappedErr) - return roachpb.NewError(ae) - } else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) { - // When a lease acquisition triggered by this request is short-circuited - // by the breaker, it will return an opaque NotLeaseholderError, which we - // replace with the breaker's error. - return roachpb.NewError(errors.CombineErrors(brErr, le)) - } - return pErr -} - func (br *replicaCircuitBreaker) HasMark(err error) bool { return br.wrapped.HasMark(err) } -func (br *replicaCircuitBreaker) cancelAllTrackedContexts() { - br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) { - cancel() - return true // remove - }) -} - func (br *replicaCircuitBreaker) canEnable() bool { b := atomic.LoadInt32(&br.versionIsActive) == 1 if b { @@ -252,7 +141,6 @@ func newReplicaCircuitBreaker( stopper *stop.Stopper, ambientCtx log.AmbientContext, r replicaInCircuitBreaker, - s CancelStorage, onTrip func(), onReset func(), ) *replicaCircuitBreaker { @@ -262,8 +150,6 @@ func newReplicaCircuitBreaker( r: r, st: cs, } - br.cancels = s - br.cancels.Reset() br.wrapped = circuit.NewBreaker(circuit.Options{ Name: "breaker", // log bridge has ctx tags AsyncProbe: br.asyncProbe, @@ -319,13 +205,23 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { return } - // First, tell all current requests to fail fast. Note that clients insert - // first, then check the breaker (and remove themselves if breaker already - // tripped then). This prevents any cancels from sneaking in after the probe - // gets past this point, which could otherwise leave cancels hanging until - // "something" triggers the next probe (which may be never if no more traffic - // arrives at the Replica). See Register. - br.cancelAllTrackedContexts() + brErr := br.Signal().Err() + if brErr == nil { + // This shouldn't happen, but if we're not even tripped, don't do + // anything. + return + } + + // Poison any inflight latches. + // + // TODO(during review): discuss what happens if a request sneaks in between + // the next line and sendProbe. I think the answer is just that we will + // start a new probe with each call to refreshProposalsLocked that + // reproposes entries, so there is something that will catch any stragglers. + // We need to discuss if that mechanism is OK since it means we'll roughly + // have a probe per tripped Replica in-flight most of the time, so in the + // worst case as many probes as Replicas. + br.r.poisonInflightLatches(brErr) err := sendProbe(ctx, br.r) report(err) }); err != nil { diff --git a/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go b/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go deleted file mode 100644 index c8f46bcffa16..000000000000 --- a/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -package kvserver - -import ( - "context" - "sync" - "unsafe" - - "github.com/cockroachdb/cockroach/pkg/util/syncutil" -) - -// CancelStorage implements tracking of context cancellation functions -// for use by Replica circuit breakers. -type CancelStorage interface { - // Reset initializes the storage. Not thread safe. - Reset() - // Set adds context and associated cancel func to the storage. Returns a token - // that can be passed to Del. - // - // Set is thread-safe. - Set(_ context.Context, cancel func()) (token interface{}) - // Del removes a cancel func, as identified by the token returned from Set. - // - // Del is thread-safe. - Del(token interface{}) - // Visit invokes the provided closure with each (context,cancel) pair currently - // present in the storage. Items for which the visitor returns true are removed - // from the storage. - // - // Visit is thread-safe, but it is illegal to invoke methods of the - // CancelStorage from within the visitor. - Visit(func(context.Context, func()) (remove bool)) -} - -type cancelToken struct { - ctx context.Context -} - -func (tok *cancelToken) fasthash() int { - // From https://github.com/taylorza/go-lfsr/blob/7ec2b93980f950da1e36c6682771e6fe14c144c2/lfsr.go#L46-L48. - s := int(uintptr(unsafe.Pointer(tok))) - b := (s >> 0) ^ (s >> 2) ^ (s >> 3) ^ (s >> 4) - return (s >> 1) | (b << 7) -} - -var cancelTokenPool = sync.Pool{ - New: func() interface{} { return &cancelToken{} }, -} - -type mapCancelShard struct { - syncutil.Mutex - m map[*cancelToken]func() -} - -// A MapCancelStorage implements CancelStorage via shards of mutex-protected -// maps. -type MapCancelStorage struct { - NumShards int - sl []*mapCancelShard -} - -// Reset implements CancelStorage. -func (m *MapCancelStorage) Reset() { - if m.NumShards == 0 { - m.NumShards = 1 - } - m.sl = make([]*mapCancelShard, m.NumShards) - for i := range m.sl { - s := &mapCancelShard{} - s.m = map[*cancelToken]func(){} - m.sl[i] = s - } -} - -// Set implements CancelStorage. -func (m *MapCancelStorage) Set(ctx context.Context, cancel func()) interface{} { - tok := cancelTokenPool.Get().(*cancelToken) - tok.ctx = ctx - shard := m.sl[tok.fasthash()%len(m.sl)] - shard.Lock() - shard.m[tok] = cancel - shard.Unlock() - return tok -} - -// Del implements CancelStorage. -func (m *MapCancelStorage) Del(tok interface{}) { - ttok := tok.(*cancelToken) - shard := m.sl[ttok.fasthash()%len(m.sl)] - shard.Lock() - delete(shard.m, tok.(*cancelToken)) - shard.Unlock() -} - -// Visit implements CancelStorage. 
-func (m *MapCancelStorage) Visit(fn func(context.Context, func()) (remove bool)) { - for _, shard := range m.sl { - shard.Lock() - for tok, cancel := range shard.m { - if fn(tok.ctx, cancel) { - delete(shard.m, tok) - } - } - shard.Unlock() - } -} diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go index c7b6029b7d70..b056713c36a2 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -11,27 +11,15 @@ package kvserver import ( - "context" - "fmt" - "math/rand" - "runtime" - "strconv" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" - "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" ) @@ -52,151 +40,3 @@ func TestReplicaUnavailableError(t *testing.T) { err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs) echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt")) } - -type circuitBreakerReplicaMock struct { - clock *hlc.Clock -} - -func (c *circuitBreakerReplicaMock) Clock() *hlc.Clock { - return c.clock -} - -func (c *circuitBreakerReplicaMock) Desc() *roachpb.RangeDescriptor { - return &roachpb.RangeDescriptor{} -} - -func (c *circuitBreakerReplicaMock) Send( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - return ba.CreateReply(), nil -} - -func (c *circuitBreakerReplicaMock) slowReplicationThreshold( - ba *roachpb.BatchRequest, -) (time.Duration, bool) { - return 0, false -} - -func (c *circuitBreakerReplicaMock) replicaUnavailableError() error { - return errors.New("unavailable") -} - -// This test verifies that when the breaker trips and untrips again, -// there is no scenario under which the request's cancel leaks. 
-func TestReplicaCircuitBreaker_NoCancelRace(t *testing.T) { - defer leaktest.AfterTest(t)() - br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - defer stopper.Stop(ctx) - - g := ctxgroup.WithContext(ctx) - const count = 100 - for i := 0; i < count; i++ { - i := i // for goroutine - g.GoCtx(func(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) - tok, sig, err := br.Register(ctx, cancel) - if err != nil { - _ = err // ignoring intentionally - return nil - } - if i == count/2 { - br.TripAsync() // probe will succeed - } - runtime.Gosched() - time.Sleep(time.Duration(rand.Intn(int(time.Millisecond)))) - var pErr *roachpb.Error - if i%2 == 0 { - pErr = roachpb.NewErrorf("boom") - } - _ = br.UnregisterAndAdjustError(tok, sig, pErr) - return nil - }) - } - require.NoError(t, g.Wait()) - var n int - br.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) { - n++ - return false // keep - }) - require.Zero(t, n, "found tracked requests") -} - -func TestReplicaCircuitBreaker_Register(t *testing.T) { - defer leaktest.AfterTest(t)() - br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") - defer stopper.Stop(context.Background()) - ctx := withBypassCircuitBreakerMarker(context.Background()) - tok, sig, err := br.Register(ctx, func() {}) - require.NoError(t, err) - defer br.UnregisterAndAdjustError(tok, sig, nil /* pErr */) - require.Zero(t, sig.C()) - var n int - br.cancels.Visit(func(ctx context.Context, f func()) (remove bool) { - n++ - return false // keep - }) - require.Zero(t, n, "probe context got added to CancelStorage") -} - -func setupCircuitBreakerTest(t testing.TB, cs string) (*replicaCircuitBreaker, *stop.Stopper) { - st := cluster.MakeTestingClusterSettings() - // Enable circuit breakers. 
- replicaCircuitBreakerSlowReplicationThreshold.Override(context.Background(), &st.SV, time.Hour) - r := &circuitBreakerReplicaMock{clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond)} - var numShards int - { - _, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards) - require.NoError(t, err) - } - s := &MapCancelStorage{NumShards: numShards} - onTrip := func() {} - onReset := func() {} - stopper := stop.NewStopper() - br := newReplicaCircuitBreaker(st, stopper, log.AmbientContext{}, r, s, onTrip, onReset) - return br, stopper -} - -func BenchmarkReplicaCircuitBreaker_Register(b *testing.B) { - defer leaktest.AfterTest(b)() - - for _, enabled := range []bool{false, true} { - b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) { - dss := []string{ - "mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16", - "mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64", - } - if !enabled { - dss = dss[:1] - } - for _, ds := range dss { - b.Run(ds, func(b *testing.B) { - b.ReportAllocs() - br, stopper := setupCircuitBreakerTest(b, ds) - defer stopper.Stop(context.Background()) - - var dur time.Duration - if enabled { - dur = time.Hour - } - replicaCircuitBreakerSlowReplicationThreshold.Override(context.Background(), &br.st.SV, dur) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - ctx, cancel := context.WithCancel(context.Background()) - tok, sig, err := br.Register(ctx, cancel) - if err != nil { - b.Error(err) - } - if pErr := br.UnregisterAndAdjustError(tok, sig, nil); pErr != nil { - b.Error(pErr) - } - cancel() - } - }) - }) - } - }) - } -} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 2545b21644d7..5f7e1629d379 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -137,14 +137,8 @@ func newUnloadedReplica( onReset := func() { store.Metrics().ReplicaCircuitBreakerCurTripped.Dec(1) } - var cancelStorage CancelStorage - if f := r.store.cfg.TestingKnobs.CancelStorageFactory; f != nil { - cancelStorage = f() - } else { - cancelStorage = &MapCancelStorage{} - } r.breaker = newReplicaCircuitBreaker( - store.cfg.Settings, store.stopper, r.AmbientContext, r, cancelStorage, onTrip, onReset, + store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset, ) return r } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index bbef1c9cfa95..f958db11f9ea 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -158,6 +158,16 @@ func (proposal *ProposalData) signalProposalResult(pr proposalResult) { if proposal.doneCh != nil { proposal.doneCh <- pr proposal.doneCh = nil + // Need to remove any span from the proposal, as the signalled caller + // will likely finish it, and if we then end up applying this proposal + // we'll try to make a ChildSpan off `proposal.ctx` and this will + // trigger the Span use-after-finish assertions. + // + // See: https://github.com/cockroachdb/cockroach/pull/76858#issuecomment-1048179588 + // + // NB: `proposal.ec.repl` might already have been cleared if we arrive here + // through finishApplication. 
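+		// (Replacing the ctx wholesale also sheds the ctx's log tags and any
+		// other attached values, not just the span.)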
+ proposal.ctx = context.Background() } } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 1cb46178ec4b..c44263da3631 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -1256,6 +1257,21 @@ func (r *Replica) refreshProposalsLocked( } } +func (r *Replica) poisonInflightLatches(err error) { + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() + for _, p := range r.mu.proposals { + p.ec.poison() + if p.ec.g.Req.PoisonPolicy == spanlatch.PoisonPolicyError { + aErr := roachpb.NewAmbiguousResultError("circuit breaker tripped") + aErr.WrappedErr = roachpb.NewError(err) + p.signalProposalResult(proposalResult{Err: roachpb.NewError(aErr)}) + } + } +} + // maybeCoalesceHeartbeat returns true if the heartbeat was coalesced and added // to the appropriate queue. func (r *Replica) maybeCoalesceHeartbeat( diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 46dab0b90d50..04ac29510774 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -318,7 +318,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // coalesced requests timeout/cancel. p.cancelLocked (defined below) is the // cancel function that must be called; calling just cancel is insufficient. ctx := p.repl.AnnotateCtx(context.Background()) - if hasBypassCircuitBreakerMarker(ctx) { + if hasBypassCircuitBreakerMarker(parentCtx) { // If the caller bypasses the circuit breaker, allow the lease to do the // same. Otherwise, the lease will be refused by the circuit breaker as // well. @@ -330,7 +330,21 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // will receive the circuit breaker error. This is special-cased in // `redirectOnOrAcquireLease`, where such a caller needs to retry instead of // propagating the error. + // + // TODO(during review): just highlighting this code. When we switch to + // PoisonPolicy, we get new (undesired) behavior where a req with + // PoisonPolicyError might join on a fail-slow (PoisonPolicyWait) lease. So + // we'll need extra one-off checks while joining leases under an open + // breaker. I wonder why we can't use latches to join on an existing lease + // request. That way, poisoning could do the job for us (though more + // coarsely). Basically, before requesting or transferring a lease, go + // through latching for a lease-specific key (perhaps without inserting + // yourself, to avoid long chains, but then we get a thundering herd + // instead) using the caller's PoisonPolicy. When queue has resolved, + // reassess the situation. That way, fail-fast requests will fail-fast and + // fail-slow requests will wait. 
ctx = withBypassCircuitBreakerMarker(ctx) + log.Event(ctx, "lease request bypasses circuit breaker") } const opName = "request range lease" tr := p.repl.AmbientContext.Tracer @@ -452,7 +466,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync( if pErr == nil { // The Replica circuit breakers together with round-tripping a ProbeRequest // here before asking for the lease could provide an alternative, simpler - // solution to the the below issue: + // solution to the below issue: // // https://github.com/cockroachdb/cockroach/issues/37906 ba := roachpb.BatchRequest{} @@ -1126,11 +1140,6 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat func (r *Replica) redirectOnOrAcquireLeaseForRequest( ctx context.Context, reqTS hlc.Timestamp, ) (kvserverpb.LeaseStatus, *roachpb.Error) { - if hasBypassCircuitBreakerMarker(ctx) { - defer func() { - log.Infof(ctx, "hello") - }() - } // Try fast-path. now := r.store.Clock().NowAsClockTimestamp() { @@ -1269,6 +1278,9 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( // breaker error back, it joined a lease request started by an operation // that did not bypass circuit breaker errors. Loop around and try again. // See requestLeaseAsync for details. + // + // TODO(during review): this case might go away if we're getting better + // about not joining leases with different PoisonPolicies. return nil case errors.HasType(goErr, (*roachpb.LeaseRejectedError)(nil)): var tErr *roachpb.LeaseRejectedError diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 8554c35a663b..23ea1da5e214 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -138,19 +139,30 @@ func (r *Replica) sendWithoutRangeID( return nil, roachpb.NewError(err) } - // Circuit breaker handling. - ctx, cancel := context.WithCancel(ctx) + // TODO(during review): will need to (re-)address the race in which the + // breaker trips and the probe launches just before a proposal gets inserted, + // such that the probe's invocation of poisonInflightLatches misses the + // proposal. The ctx cancellation based setup avoids this (in Register) using + // an "insert first then check breaker" ordering. Translating that to proposals, + // it would be "insert proposal before checking breaker", which is reasonable; + // an additional earlier check should be added to avoid adding to the raft log + // though. + // + // TODO(during review): PoisonPolicyWait replaces withCircuitBreakerProbeMarker. + // + // TODO(during review): if a lease acquisition is triggered by a request that + // wants PoisonPolicyWait, it must also use PoisonPolicyWait. And we have to + // ensure that requests that want PoisonPolicyError don't join onto that + // lease request, or they won't get the fail-fast behavior that they want. + // This is not a problem that exists under ctx cancellation because in that + // case requests using PoisonPolicyError never get to acquire a lease. They + // get cancelled immediately and un-join the inflight lease request. 
(But the + // opposite race exists: a fail-slow request might join on a lease that then + // cancels when the breaker trips). + //ctx, cancel := context.WithCancel(ctx) if bypassReplicaCircuitBreakerForBatch(ba) { ctx = withBypassCircuitBreakerMarker(ctx) } - tok, brSig, err := r.breaker.Register(ctx, cancel) - if err != nil { - return nil, roachpb.NewError(err) - } - defer func() { - rErr = r.breaker.UnregisterAndAdjustError(tok, brSig, rErr) - cancel() - }() if err := r.maybeBackpressureBatch(ctx, ba); err != nil { return nil, roachpb.NewError(err) @@ -163,7 +175,7 @@ func (r *Replica) sendWithoutRangeID( } // NB: must be performed before collecting request spans. - ba, err = maybeStripInFlightWrites(ba) + ba, err := maybeStripInFlightWrites(ba) if err != nil { return nil, roachpb.NewError(err) } @@ -414,6 +426,13 @@ func (r *Replica) executeBatchWithConcurrencyRetries( r.concMgr.FinishReq(g) } }() + pp := spanlatch.PoisonPolicyError + if hasBypassCircuitBreakerMarker(ctx) { + // TODO(during review): if breakers are disabled via the cluster setting + // while a latch is poisoned, this should result in PoisonPolicyWait being + // applied to all requests. + pp = spanlatch.PoisonPolicyWait + } for first := true; ; first = false { // Exit loop if context has been canceled or timed out. if err := ctx.Err(); err != nil { @@ -454,8 +473,16 @@ func (r *Replica) executeBatchWithConcurrencyRetries( Requests: ba.Requests, LatchSpans: latchSpans, // nil if g != nil LockSpans: lockSpans, // nil if g != nil + PoisonPolicy: pp, }, requestEvalKind) if pErr != nil { + if brErr := r.breaker.Signal().Err(); errors.Is(pErr.GoError(), spanlatch.ErrPoisonedLatch) && brErr != nil { + // TODO(during review): include the poisoned span. + // TODO(during review): the breaker could no longer be tripped at this point, + // so we'd need to manufacture an error. This is at odds with the old setup + // which tried to have the breaker be the source of the error. + pErr = roachpb.NewError(errors.Wrapf(brErr, "%s", spanlatch.ErrPoisonedLatch)) + } return nil, pErr } else if resp != nil { br = new(roachpb.BatchResponse) @@ -1156,6 +1183,14 @@ func (ec *endCmds) move() endCmds { return res } +func (ec *endCmds) poison() { + if ec.repl == nil { + // Already cleared. + return + } + ec.repl.concMgr.PoisonReq(ec.g) +} + // done releases the latches acquired by the command and updates the timestamp // cache using the final timestamp of each command. // diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 681395c07e29..e4dc868a722d 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -94,6 +94,15 @@ func (r *Replica) executeWriteBatch( return nil, g, roachpb.NewError(err) } + // Check the breaker. Note that we do this after checkExecutionCanProceed, + // so that NotLeaseholderError has precedence. + if !hasBypassCircuitBreakerMarker(ctx) { + brSig := r.breaker.Signal() + if err := brSig.Err(); err != nil { + return nil, g, roachpb.NewError(err) + } + } + // Compute the transaction's local uncertainty limit using observed // timestamps, which can help avoid uncertainty restarts. 
ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset()) diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go index 92a67b87d8f3..1cf3b5dc2bad 100644 --- a/pkg/kv/kvserver/spanlatch/manager.go +++ b/pkg/kv/kvserver/spanlatch/manager.go @@ -87,6 +87,7 @@ type latch struct { span roachpb.Span ts hlc.Timestamp done *signal + poison *signal next, prev *latch // readSet linked-list. } @@ -109,7 +110,8 @@ func (la *latch) SetEndKey(v []byte) { la.span.EndKey = v } // Guard is a handle to a set of acquired latches. It is returned by // Manager.Acquire and accepted by Manager.Release. type Guard struct { - done signal + pp PoisonPolicy + done, poison signal // latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size. latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32 @@ -166,9 +168,10 @@ func allocGuardAndLatches(nLatches int) (*Guard, []latch) { return new(Guard), make([]latch, nLatches) } -func newGuard(spans *spanset.SpanSet) *Guard { +func newGuard(spans *spanset.SpanSet, pp PoisonPolicy) *Guard { nLatches := spans.Len() guard, latches := allocGuardAndLatches(nLatches) + guard.pp = pp for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { ss := spans.GetSpans(a, s) @@ -182,6 +185,7 @@ func newGuard(spans *spanset.SpanSet) *Guard { latch := &latches[i] latch.span = ss[i].Span latch.done = &guard.done + latch.poison = &guard.poison latch.ts = ss[i].Timestamp // latch.setID() in Manager.insert, under lock. } @@ -203,8 +207,10 @@ func newGuard(spans *spanset.SpanSet) *Guard { // acquired. // // It returns a Guard which must be provided to Release. -func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, error) { - lg, snap := m.sequence(spans) +func (m *Manager) Acquire( + ctx context.Context, spans *spanset.SpanSet, pp PoisonPolicy, +) (*Guard, error) { + lg, snap := m.sequence(spans, pp) defer snap.close() err := m.wait(ctx, lg, snap) @@ -227,8 +233,8 @@ func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, // // The method returns a Guard which must be provided to the // CheckOptimisticNoConflicts, Release methods. -func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet) *Guard { - lg, snap := m.sequence(spans) +func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, pp PoisonPolicy) *Guard { + lg, snap := m.sequence(spans, pp) lg.snap = &snap return lg } @@ -236,10 +242,10 @@ func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet) *Guard { // WaitFor waits for conflicting latches on the spans without adding // any latches itself. Fast path for operations that only require past latches // to be released without blocking new latches. -func (m *Manager) WaitFor(ctx context.Context, spans *spanset.SpanSet) error { +func (m *Manager) WaitFor(ctx context.Context, spans *spanset.SpanSet, pp PoisonPolicy) error { // The guard is only used to store latches by this request. These latches // are not actually inserted using insertLocked. - lg := newGuard(spans) + lg := newGuard(spans, pp) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -335,8 +341,8 @@ func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, err // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition // attempts. 
-func (m *Manager) sequence(spans *spanset.SpanSet) (*Guard, snapshot) { - lg := newGuard(spans) +func (m *Manager) sequence(spans *spanset.SpanSet, pp PoisonPolicy) (*Guard, snapshot) { + lg := newGuard(spans, pp) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -454,7 +460,7 @@ func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error { // Wait for writes at equal or lower timestamps. a2 := spanset.SpanReadWrite it := tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreLater); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreLater); err != nil { return err } case spanset.SpanReadWrite: @@ -466,13 +472,13 @@ func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error { // to release their latches, so we wait on them first. a2 := spanset.SpanReadWrite it := tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreNothing); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreNothing); err != nil { return err } // Wait for reads at equal or higher timestamps. a2 = spanset.SpanReadOnly it = tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreEarlier); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreEarlier); err != nil { return err } default: @@ -491,6 +497,7 @@ func (m *Manager) iterAndWait( ctx context.Context, t *timeutil.Timer, it *iterator, + pp PoisonPolicy, waitType, heldType spanset.SpanAccess, wait *latch, ignore ignoreFn, @@ -503,22 +510,64 @@ func (m *Manager) iterAndWait( if ignore(wait.ts, held.ts) { continue } - if err := m.waitForSignal(ctx, t, waitType, heldType, wait, held); err != nil { + if err := m.waitForSignal(ctx, t, pp, waitType, heldType, wait, held); err != nil { return err } } return nil } +// A PoisonPolicy determines how a request will react to encountering a poisoned +// latch. +type PoisonPolicy byte + +const ( + // PoisonPolicyError instructs a request to return an error upon encountering + // a poisoned latch. + PoisonPolicyError PoisonPolicy = iota + // PoisonPolicyWait instructs a request to treat poisoned latches like + // regular latches, i.e. to wait for them to be released. + PoisonPolicyWait +) + +// PoisonPolicyTODO is a placeholder that will need to be replaced with +// either of PoisonPolicy{Err,Wait}. +const PoisonPolicyTODO = PoisonPolicyError + +// ErrPoisonedLatch is returned when a request using PoisonPolicyError +// encounters a poisoned latch. +var ErrPoisonedLatch = errors.Errorf("encountered poisoned latch") + // waitForSignal waits for the latch that is currently held to be signaled. func (m *Manager) waitForSignal( - ctx context.Context, t *timeutil.Timer, waitType, heldType spanset.SpanAccess, wait, held *latch, + ctx context.Context, + t *timeutil.Timer, + pp PoisonPolicy, + waitType, heldType spanset.SpanAccess, + wait, held *latch, ) error { log.Eventf(ctx, "waiting to acquire %s latch %s, held by %s latch %s", waitType, wait, heldType, held) + poisonCh := held.poison.signalChan() for { select { case <-held.done.signalChan(): return nil + case <-poisonCh: + // A dependent latch was poisoned, which poisons this latch as well + // (transitively notifying all waiters of this circumstance, giving them a + // chance to decide what to do). The current request can choose between + // failing fast (the common case) and waiting anyway (as poisoned latches + // could ultimately be released as well). 
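+			//
+			// NB: wait.poison may be signaled more than once, for example when
+			// several of the held latches we wait on are poisoned, or when
+			// Manager.Poison is additionally called on this Guard directly; this
+			// is why signal() tolerates repeated calls (see the signal.go change
+			// in this patch).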
+			wait.poison.signal()
+			switch pp {
+			case PoisonPolicyError:
+				return ErrPoisonedLatch
+			case PoisonPolicyWait:
+				log.Eventf(ctx, "encountered poisoned latch; continuing to wait")
+				poisonCh = nil
+			default:
+				return errors.Errorf("unsupported PoisonPolicy %d", pp)
+			}
 		case <-t.C:
 			t.Read = true
 			defer t.Reset(base.SlowRequestThreshold)
@@ -541,6 +590,13 @@ func (m *Manager) waitForSignal(
 	}
 }
 
+// Poison marks the Guard as poisoned, meaning that the request is not expected
+// to release its latches (any time soon). This gives requests blocking on the
+// Guard's latches an opportunity to fail fast.
+func (m *Manager) Poison(lg *Guard) {
+	lg.poison.signal()
+}
+
 // Release releases the latches held by the provided Guard. After being called,
 // dependent latch acquisition attempts can complete if not blocked on any other
 // owned latches.
diff --git a/pkg/kv/kvserver/spanlatch/manager_test.go b/pkg/kv/kvserver/spanlatch/manager_test.go
index 115577567470..dd05ee20ad0a 100644
--- a/pkg/kv/kvserver/spanlatch/manager_test.go
+++ b/pkg/kv/kvserver/spanlatch/manager_test.go
@@ -91,7 +91,7 @@ func testLatchBlocks(t *testing.T, lgC <-chan *Guard) {
 // MustAcquire is like Acquire, except it can't return context cancellation
 // errors.
 func (m *Manager) MustAcquire(spans *spanset.SpanSet) *Guard {
-	lg, err := m.Acquire(context.Background(), spans)
+	lg, err := m.Acquire(context.Background(), spans, PoisonPolicyTODO)
 	if err != nil {
 		panic(err)
 	}
@@ -110,7 +110,7 @@ func (m *Manager) MustAcquireCh(spans *spanset.SpanSet) <-chan *Guard {
 // MustAcquireChCtx is like MustAcquireCh, except it accepts a context.
 func (m *Manager) MustAcquireChCtx(ctx context.Context, spans *spanset.SpanSet) <-chan *Guard {
 	ch := make(chan *Guard)
-	lg, snap := m.sequence(spans)
+	lg, snap := m.sequence(spans, PoisonPolicyTODO)
 	go func() {
 		err := m.wait(ctx, lg, snap)
 		if err != nil {
@@ -541,13 +541,13 @@ func TestLatchManagerOptimistic(t *testing.T) {
 	var m Manager
 
 	// Acquire latches, no conflict.
-	lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS))
+	lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS), PoisonPolicyTODO)
 	require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS)))
 	lg1, err := m.WaitUntilAcquired(context.Background(), lg1)
 	require.NoError(t, err)
 
 	// Optimistic acquire encounters conflict in some cases.
-	lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS))
+	lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS), PoisonPolicyTODO)
 	require.False(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "e", read, zeroTS)))
 	require.True(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "d", read, zeroTS)))
 	waitUntilAcquiredCh := func(g *Guard) <-chan *Guard {
@@ -565,7 +565,7 @@ func TestLatchManagerOptimistic(t *testing.T) {
 	testLatchSucceeds(t, ch2)
 
 	// Optimistic acquire encounters conflict.
-	lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS))
+	lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), PoisonPolicyTODO)
 	require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS)))
 	m.Release(lg2)
 	// There is still a conflict even though lg2 has been released.
@@ -577,7 +577,7 @@ func TestLatchManagerOptimistic(t *testing.T) {
 	// Optimistic acquire for read below write encounters no conflict.
oneTS, twoTS := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2} lg4 := m.MustAcquire(spans("c", "e", write, twoTS)) - lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS)) + lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS), PoisonPolicyTODO) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "e", read, oneTS))) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "c", read, oneTS))) lg5, err = m.WaitUntilAcquired(context.Background(), lg5) @@ -591,14 +591,14 @@ func TestLatchManagerWaitFor(t *testing.T) { var m Manager // Acquire latches, no conflict. - lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS)) + lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS), PoisonPolicyTODO) require.NoError(t, err) // See if WaitFor waits for above latch. waitForCh := func() <-chan *Guard { ch := make(chan *Guard) go func() { - err := m.WaitFor(context.Background(), spans("a", "e", read, zeroTS)) + err := m.WaitFor(context.Background(), spans("a", "e", read, zeroTS), PoisonPolicyTODO) require.NoError(t, err) ch <- &Guard{} }() @@ -611,7 +611,7 @@ func TestLatchManagerWaitFor(t *testing.T) { // Optimistic acquire should _not_ encounter conflict - as WaitFor should // not lay any latches. - lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS)) + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), PoisonPolicyTODO) require.True(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) lg3, err = m.WaitUntilAcquired(context.Background(), lg3) require.NoError(t, err) @@ -659,7 +659,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) { b.ResetTimer() for i := range spans { - lg, snap := m.sequence(&spans[i]) + lg, snap := m.sequence(&spans[i], PoisonPolicyTODO) snap.close() if len(lgBuf) == cap(lgBuf) { m.Release(<-lgBuf) diff --git a/pkg/kv/kvserver/spanlatch/signal.go b/pkg/kv/kvserver/spanlatch/signal.go index 71deb5621918..0e33b13da97e 100644 --- a/pkg/kv/kvserver/spanlatch/signal.go +++ b/pkg/kv/kvserver/spanlatch/signal.go @@ -42,7 +42,9 @@ type signal struct { func (s *signal) signal() { if !atomic.CompareAndSwapInt32(&s.a, noSig, sig) { - panic("signaled twice") + // TODO(tbg): could add an assertion for signals that must only be signaled + // once. + return } // Close the channel if it was ever initialized. if cPtr := atomic.LoadPointer(&s.c); cPtr != nil { diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 77798a352a8d..5e70695cb359 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -86,10 +86,6 @@ type StoreTestingKnobs struct { // per-Batch SlowReplicationThreshold. SlowReplicationThresholdOverride func(ba *roachpb.BatchRequest) time.Duration - // CancelStorageFactory overrides the default CancelStorage used by Replica - // circuit breakers. - CancelStorageFactory func() CancelStorage - // TestingRangefeedFilter is called before a replica processes a rangefeed // in order for unit tests to modify the request, error returned to the client // or data. 
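// A minimal stand-alone sketch of the signaling contract the signal.go change
// above relies on: a signal that may be fired any number of times, where only
// the first call has an effect and all readers of the channel observe it. The
// names below (idemSignal, lazyInit) are illustrative only; the real spanlatch
// signal is built on atomics and a lazily allocated channel rather than
// sync.Once.
package signalsketch

import "sync"

type idemSignal struct {
	initOnce sync.Once
	fireOnce sync.Once
	ch       chan struct{}
}

func (s *idemSignal) lazyInit() {
	s.initOnce.Do(func() { s.ch = make(chan struct{}) })
}

// signal fires the signal; repeated calls are no-ops.
func (s *idemSignal) signal() {
	s.lazyInit()
	s.fireOnce.Do(func() { close(s.ch) })
}

// signalChan returns a channel that is closed once signal() has been called.
func (s *idemSignal) signalChan() <-chan struct{} {
	s.lazyInit()
	return s.ch
}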
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 2514b5ef09e3..e9d3f68c7eda 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1303,8 +1303,9 @@ func (*AdminChangeReplicasRequest) flags() flag { return isAdmin | isAlone } func (*AdminRelocateRangeRequest) flags() flag { return isAdmin | isAlone } func (*GCRequest) flags() flag { - // We let GCRequest bypass the circuit breaker because otherwise, the GC queue may - // busy loop on an unavailable range, doing lots of work but never making progress. + // We defensively let GCRequest bypass the circuit breaker because otherwise, + // the GC queue might busy loop on an unavailable range, doing lots of work + // but never making progress. return isWrite | isRange | bypassesReplicaCircuitBreaker }
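To make the intended end-to-end behavior concrete, here is a small self-contained toy model of the two poison policies; the type and function names are invented for illustration and none of this is CockroachDB code. A request holding a latch is poisoned when the replica circuit breaker trips; a waiter using PoisonPolicyError fails fast, while a waiter that bypasses the breaker (such as a lease request or the breaker's own probe) uses PoisonPolicyWait and keeps waiting for the eventual release.

// Toy model of PoisonPolicyError vs PoisonPolicyWait. A "latch" here is just a
// done channel plus a poison channel.
package main

import (
	"errors"
	"fmt"
	"time"
)

type PoisonPolicy int

const (
	PoisonPolicyError PoisonPolicy = iota
	PoisonPolicyWait
)

var errPoisoned = errors.New("encountered poisoned latch")

type toyLatch struct {
	done   chan struct{}
	poison chan struct{}
}

func newToyLatch() *toyLatch {
	return &toyLatch{done: make(chan struct{}), poison: make(chan struct{})}
}

// waitFor blocks until the held latch is released. Under PoisonPolicyError it
// returns errPoisoned as soon as the latch is poisoned; under PoisonPolicyWait
// it notes the poison and keeps waiting for the release.
func waitFor(held *toyLatch, pp PoisonPolicy) error {
	poisonCh := held.poison
	for {
		select {
		case <-held.done:
			return nil
		case <-poisonCh:
			if pp == PoisonPolicyError {
				return errPoisoned
			}
			// PoisonPolicyWait: stop listening for poison, wait for the release.
			poisonCh = nil
		}
	}
}

func main() {
	held := newToyLatch()

	// Simulate a tripped breaker: the in-flight request holding the latch is
	// poisoned right away, but only releases the latch later.
	go func() {
		close(held.poison)
		time.Sleep(50 * time.Millisecond)
		close(held.done)
	}()

	// A regular request fails fast.
	fmt.Println("error policy:", waitFor(held, PoisonPolicyError)) // encountered poisoned latch

	// A request bypassing the breaker waits for the release instead.
	fmt.Println("wait policy: ", waitFor(held, PoisonPolicyWait)) // <nil>
}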