diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index df87c11dff58..1a2eac484301 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -20,6 +20,7 @@ PROTOBUF_SRCS = [ "//pkg/kv/kvnemesis:kvnemesis_go_proto", "//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto", "//pkg/kv/kvserver/concurrency/lock:lock_go_proto", + "//pkg/kv/kvserver/concurrency/poison:poison_go_proto", "//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e24b2673dcf9..63e1223b84c0 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -37,7 +37,6 @@ go_library( "replica_backpressure.go", "replica_batch_updates.go", "replica_circuit_breaker.go", - "replica_circuit_breaker_cancelstorage.go", "replica_closedts.go", "replica_command.go", "replica_consistency.go", @@ -124,6 +123,7 @@ go_library( "//pkg/kv/kvserver/closedts/sidetransport", "//pkg/kv/kvserver/closedts/tracker", "//pkg/kv/kvserver/concurrency", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/constraint", "//pkg/kv/kvserver/gc", "//pkg/kv/kvserver/idalloc", @@ -227,7 +227,6 @@ go_test( "client_rangefeed_test.go", "client_relocate_range_test.go", "client_replica_backpressure_test.go", - "client_replica_circuit_breaker_bench_test.go", "client_replica_circuit_breaker_test.go", "client_replica_gc_test.go", "client_replica_test.go", diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go deleted file mode 100644 index 686a6a6c557e..000000000000 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -package kvserver_test - -import ( - "context" - "fmt" - "math/rand" - "strconv" - "sync" - "testing" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/util/encoding" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/stretchr/testify/require" -) - -type replicaCircuitBreakerBench struct { - *testcluster.TestCluster - pool *sync.Pool // *BatchRequest -} - -func (tc *replicaCircuitBreakerBench) repl(b *testing.B) *kvserver.Replica { - return tc.GetFirstStoreFromServer(b, 0).LookupReplica(keys.MustAddr(tc.ScratchRange(b))) -} - -func setupCircuitBreakerReplicaBench( - b *testing.B, breakerEnabled bool, cs string, -) (*replicaCircuitBreakerBench, *stop.Stopper) { - b.Helper() - - var numShards int - { - _, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards) - require.NoError(b, err) - } - sFn := func() kvserver.CancelStorage { return &kvserver.MapCancelStorage{NumShards: numShards} } - - var knobs kvserver.StoreTestingKnobs - knobs.CancelStorageFactory = sFn - - var args base.TestClusterArgs - args.ServerArgs.Knobs.Store = &knobs - tc := testcluster.StartTestCluster(b, 1, args) - - stmt := `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '1000s'` - if !breakerEnabled { - stmt = `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '0s'` - } - _, err := tc.ServerConn(0).Exec(stmt) - require.NoError(b, err) - wtc := &replicaCircuitBreakerBench{ - TestCluster: tc, - } - wtc.pool = &sync.Pool{ - New: func() interface{} { - repl := wtc.repl(b) - var ba roachpb.BatchRequest - ba.RangeID = repl.RangeID - ba.Timestamp = repl.Clock().NowAsClockTimestamp().ToTimestamp() - var k roachpb.Key - k = append(k, repl.Desc().StartKey.AsRawKey()...) 
- k = encoding.EncodeUint64Ascending(k, uint64(rand.Intn(1000))) - ba.Add(roachpb.NewGet(k, false)) - return &ba - }, - } - return wtc, tc.Stopper() -} - -func BenchmarkReplicaCircuitBreakerSendOverhead(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - ctx := context.Background() - - for _, enabled := range []bool{false, true} { - b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) { - dss := []string{ - "mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16", - "mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64", - } - if !enabled { - dss = dss[:1] - } - - for _, ds := range dss { - b.Run(ds, func(b *testing.B) { - b.ReportAllocs() - tc, stopper := setupCircuitBreakerReplicaBench(b, enabled, ds) - defer stopper.Stop(ctx) - - repl := tc.repl(b) - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - ba := tc.pool.Get().(*roachpb.BatchRequest) - _, err := repl.Send(ctx, *ba) - tc.pool.Put(ba) - if err != nil { - b.Fatal(err) - } - } - }) - }) - } - }) - } -} diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 0293834831bf..bdade53f7f26 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -82,12 +83,13 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { defer log.Scope(t).Close(t) tc := setupCircuitBreakerTest(t) defer tc.Stopper().Stop(context.Background()) + k := tc.ScratchRange(t) // Get lease on n1. require.NoError(t, tc.Write(n1)) // Disable the probe so that when the breaker trips, it stays tripped. tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) + tc.Report(n1, errors.New("injected breaker error")) s1 := tc.GetFirstStoreFromServer(t, n1) s2 := tc.GetFirstStoreFromServer(t, n2) @@ -97,10 +99,10 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCurTripped.Value()) require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCumTripped.Count()) - // n1 could theoretically still serve reads (there is a valid lease - // and none of the latches are taken), but since it is hard to determine - // that upfront we currently fail all reads as well. - tc.RequireIsBreakerOpen(t, tc.Read(n1)) + // n1 can still serve reads despite the breaker having tripped, as there is a + // valid lease and no poisoned latches prevent the read. + require.NoError(t, tc.Read(n1)) + require.NoError(t, tc.FollowerRead(n1)) tc.RequireIsBreakerOpen(t, tc.Write(n1)) // When we go through the KV client stack, we still get the breaker error @@ -108,23 +110,29 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { tc.RequireIsBreakerOpen(t, tc.WriteDS(n1)) tc.RequireIsBreakerOpen(t, tc.WriteDS(n2)) + // Can't transfer the lease away while breaker is tripped. (This would be + // a bad idea, since n1 would stop serving strong reads, thus making the + // outage worse). 
+ tc.RequireIsBreakerOpen(t, + tc.TransferRangeLease(tc.LookupRangeOrFatal(t, k), tc.Target(n2)), + ) + // n2 does not have the lease so all it does is redirect to the leaseholder - // n1. + // n1, but it can serve follower reads. tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + require.NoError(t, tc.FollowerRead(n2)) tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) // Enable the probe. Even a read should trigger the probe // and within due time the breaker should heal. tc.SetProbeEnabled(n1, true) - tc.UntripsSoon(t, tc.Read, n1) - // Same behavior on writes. - tc.Report(n1, errors.New("boom again")) + require.NoError(t, tc.Read(n1)) // this always worked + // Writes heal soon. tc.UntripsSoon(t, tc.Write, n1) - // Currently tripped drops back to zero, all-time is two (since we tripped - // it twice) + // Currently tripped drops back to zero, all-time remains at one. require.EqualValues(t, 0, s1.Metrics().ReplicaCircuitBreakerCurTripped.Value()) - require.EqualValues(t, 2, s1.Metrics().ReplicaCircuitBreakerCumTripped.Count()) + require.EqualValues(t, 1, s1.Metrics().ReplicaCircuitBreakerCumTripped.Count()) // s2 wasn't affected by any breaker events. require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCurTripped.Value()) require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCumTripped.Count()) @@ -134,9 +142,9 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { // breaker on follower n2. Before the breaker is tripped, we see // NotLeaseholderError. When it's tripped, those are supplanted by the breaker // errors. Once we allow the breaker to probe, the breaker untrips. In -// particular, this tests that the probe can succeed even when run on a -// follower (which would not be true if it required the local Replica to -// execute an operation that requires the lease). +// particular, this tests that the probe can succeed even when run on a follower +// (which would not be true if it required the local Replica to execute an +// operation that requires the lease). func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -147,7 +155,7 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { require.NoError(t, tc.Write(n1)) // Disable the probe on n2 so that when the breaker trips, it stays tripped. tc.SetProbeEnabled(n2, false) - tc.Report(n2, errors.New("boom")) + tc.Report(n2, errors.New("injected breaker error")) // We didn't trip the leaseholder n1, so it is unaffected. require.NoError(t, tc.Read(n1)) @@ -158,31 +166,21 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { // time of writing it would propagate it. require.NoError(t, tc.WriteDS(n1)) - tc.RequireIsBreakerOpen(t, tc.Read(n2)) - tc.RequireIsBreakerOpen(t, tc.Write(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) + require.NoError(t, tc.FollowerRead(n2)) - // Enable the probe. Even a read should trigger the probe - // and within due time the breaker should heal, giving us - // NotLeaseholderErrors again. - // - // TODO(tbg): this test would be more meaningful with follower reads. They - // should succeed when the breaker is open and fail if the breaker is - // tripped. However knowing that the circuit breaker check sits at the top - // of Replica.sendWithRangeID, it's clear that it won't make a difference. + // Enable the probe again. n2 should untrip soon. 
tc.SetProbeEnabled(n2, true) + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) testutils.SucceedsSoon(t, func() error { - if err := tc.Read(n2); !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { - return err - } - return nil - }) - // Same behavior on writes. - tc.Report(n2, errors.New("boom again")) - testutils.SucceedsSoon(t, func() error { - if err := tc.Write(n2); !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { - return err - } - return nil + // NB: this is slightly contrived - the mere act of accessing Err() is what + // triggers the probe! Regular requests on the replica wouldn't do that, + // since we're intentionally preferring a NotLeaseholderError over a breaker + // error (and thus aren't ever accessing the breaker when we can't serve the + // request). + return tc.repls[n2].Breaker().Signal().Err() }) } @@ -202,34 +200,45 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) { // disabled. require.NoError(t, tc.Write(n1)) tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) - - // n2 (not n1) will return a NotLeaseholderError. This may be surprising - - // why isn't it trying and succeeding to acquire a lease - but it does - // not do that because it sees that the new leaseholder (n2) is not live - // itself. We'll revisit this after re-enabling liveness later in the test. - { - err := tc.Read(n2) - // At time of writing: not incrementing epoch on n1 because next - // leaseholder (n2) not live. - t.Log(err) - tc.RequireIsNotLeaseholderError(t, err) - // Same behavior for write on n2. - tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) - } - // On n1, run into the circuit breaker when requesting lease. - { - tc.RequireIsBreakerOpen(t, tc.Read(n1)) + tc.Report(n1, errors.New("injected breaker error")) + resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) + + // On n1, run into the circuit breaker when requesting lease. We have to + // resume heartbeats for this to not time out, as requesting the new lease + // entails doing liveness checks which can't succeed if nobody is + // heartbeating, and we'd get stuck in liveness before reaching the circuit + // breaker. (In other words, replica circuit breaking doesn't fail-fast + // requests reliably if liveness is unavailable; this is tracked in #74616). + // We don't attempt to acquire a lease on n2 since it would try and succeed + // (except the test harness categorically prevents n2 from getting a lease, + // injecting an error). + resumeHeartbeats() + testutils.SucceedsSoon(t, func() error { + err := tc.Read(n1) + if errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { + // Retriable errors can occur when manipulating the liveness record in + // preparation for requesting a lease, such as: + // + // [NotLeaseHolderError] failed to manipulate liveness record: heartbeat + // failed on epoch increment; r45: replica (n1,s1):1 not lease holder; + // current lease is repl=(n1,s1):1 seq=1 start=0,0 epo=1 pro=[...] + return err + } + tc.RequireIsBreakerOpen(t, err) tc.RequireIsBreakerOpen(t, tc.Write(n1)) - } + return nil + }) + + // Can still perform follower reads on both nodes, as this does not rely on + // the lease and does not consult the breaker. + require.NoError(t, tc.FollowerRead(n1)) + require.NoError(t, tc.FollowerRead(n2)) // Let the breaker heal and things should go back to normal.
This is not a // trivial thing to hold, as the probe needs to go through for this, and if // we're not careful, the probe itself is held up by the breaker as well, or // the probe will try to acquire a lease (which we're currently careful to // avoid). - resumeHeartbeats() tc.SetProbeEnabled(n1, true) tc.UntripsSoon(t, tc.Read, n1) tc.UntripsSoon(t, tc.Write, n1) @@ -254,21 +263,33 @@ func TestReplicaCircuitBreaker_Leaseholder_QuorumLoss(t *testing.T) { tc.StopServer(n2) // lose quorum // We didn't lose the liveness range (which is only on n1). - require.NoError(t, tc.Server(n1).HeartbeatNodeLiveness()) + tc.HeartbeatNodeLiveness(t, n1) + + // Read still works, as we have valid lease and no poisoned latch + // underneath. + require.NoError(t, tc.Read(n1)) tc.SetSlowThreshold(10 * time.Millisecond) { err := tc.Write(n1) var ae *roachpb.AmbiguousResultError require.True(t, errors.As(err, &ae), "%+v", err) t.Log(err) + tc.RequireIsBreakerOpen(t, err) + } + // We still have a valid lease, but now the above write is holding a poisoned + // latch (this is true despite the write itself having returned already). + // However, can still serve follower reads because those don't check latches + // (nor do they need the lease, though there is a valid one in this case). + { + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + require.NoError(t, tc.FollowerRead(n1)) } - tc.RequireIsBreakerOpen(t, tc.Read(n1)) // Bring n2 back and service should be restored. tc.SetSlowThreshold(0) // reset require.NoError(t, tc.RestartServer(n2)) - tc.UntripsSoon(t, tc.Read, n1) - require.NoError(t, tc.Write(n1)) + tc.UntripsSoon(t, tc.Write, n1) // poisoned latch goes away + require.NoError(t, tc.Read(n1)) } // In this test, the range is on n1 and n2 and we place the lease on n2 and @@ -287,12 +308,13 @@ func TestReplicaCircuitBreaker_Follower_QuorumLoss(t *testing.T) { // Get lease to n2 so that we can lose it without taking down the system ranges. desc := tc.LookupRangeOrFatal(t, tc.ScratchRange(t)) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(n2)) - resumeHeartbeats := tc.ExpireAllLeases(t, keepHeartbeats) + resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, keepHeartbeats) tc.StopServer(n2) // lose quorum and leaseholder resumeHeartbeats() // We didn't lose the liveness range (which is only on n1). - require.NoError(t, tc.Server(n1).HeartbeatNodeLiveness()) + tc.HeartbeatNodeLiveness(t, n1) + tc.SetSlowThreshold(10 * time.Millisecond) tc.RequireIsBreakerOpen(t, tc.Write(n1)) tc.RequireIsBreakerOpen(t, tc.Read(n1)) @@ -300,8 +322,8 @@ func TestReplicaCircuitBreaker_Follower_QuorumLoss(t *testing.T) { // Bring n2 back and service should be restored. tc.SetSlowThreshold(0) // reset require.NoError(t, tc.RestartServer(n2)) - tc.UntripsSoon(t, tc.Read, n1) - require.NoError(t, tc.Write(n1)) + tc.UntripsSoon(t, tc.Write, n1) + require.NoError(t, tc.Read(n1)) } // This test is skipped but documents that the current circuit breakers cannot @@ -353,7 +375,7 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) { // Expire all leases. We also pause all heartbeats but that doesn't really // matter since the liveness range is unavailable anyway. 
- resume := tc.ExpireAllLeases(t, pauseHeartbeats) + resume := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) defer resume() // Since there isn't a lease, and the liveness range is down, the circuit @@ -373,6 +395,11 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) { } type dummyStream struct { + name string + t interface { + Helper() + Logf(string, ...interface{}) + } ctx context.Context recv chan *roachpb.RangeFeedEvent } @@ -382,7 +409,8 @@ func (s *dummyStream) Context() context.Context { } func (s *dummyStream) Send(ev *roachpb.RangeFeedEvent) error { - if ev.Val == nil { + if ev.Val == nil && ev.Error == nil { + s.t.Logf("%s: ignoring event: %v", s.name, ev) return nil } select { @@ -410,39 +438,44 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { args := &roachpb.RangeFeedRequest{ Span: roachpb.Span{Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey()}, } - // This test shouldn't take in excess of 45s even under the worst of conditions. - ctx, cancel := context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration) + + ctx, cancel := context.WithCancel(ctx) defer cancel() - stream1 := &dummyStream{ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)} + stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) { err := tc.repls[0].RangeFeed(args, stream1).GoError() if ctx.Err() != nil { - return + return // main goroutine stopping } assert.NoError(t, err) // avoid Fatal on goroutine })) - readOneVal := func(t *testing.T, stream *dummyStream) { + readOneVal := func(ctx context.Context, stream *dummyStream, timeout time.Duration) error { for { - var done bool select { + case <-time.After(timeout): + return errors.Errorf("%s: read timed out after %.2fs", stream.name, timeout.Seconds()) case <-ctx.Done(): - t.Fatal(ctx.Err()) + return ctx.Err() case ev := <-stream.recv: - t.Log(ev) - done = true - } - if done { - break + if ev.Error != nil { + return ev.Error.Error.GoError() + } + if ev.Val != nil { + t.Logf("%s: %s", stream.name, ev) + return nil + } } } } - require.NoError(t, tc.Write(n1)) - readOneVal(t, stream1) + testutils.SucceedsSoon(t, func() error { + require.NoError(t, tc.Write(n1)) + return readOneVal(ctx, stream1, time.Millisecond) + }) // NB: keep heartbeats because we're not trying to lose the liveness range. - undo := tc.ExpireAllLeases(t, keepHeartbeats) + undo := tc.ExpireAllLeasesAndN1LivenessRecord(t, keepHeartbeats) undo() tc.SetSlowThreshold(10 * time.Millisecond) tc.StopServer(n2) @@ -450,11 +483,11 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { // Start another stream during the "outage" to make sure it isn't rejected by // the breaker. 
- stream2 := &dummyStream{ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)} + stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) { err := tc.repls[0].RangeFeed(args, stream2).GoError() if ctx.Err() != nil { - return + return // main goroutine stopping } assert.NoError(t, err) // avoid Fatal on goroutine })) @@ -463,8 +496,13 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { require.NoError(t, tc.RestartServer(n2)) tc.UntripsSoon(t, tc.Write, n1) - readOneVal(t, stream1) - readOneVal(t, stream2) + require.NoError(t, readOneVal(ctx, stream1, testutils.DefaultSucceedsSoonDuration)) + // For the stream that started mid-way through the outage, we expect it to + // return a circuit breaker error, but in theory it could also never have + // tried to acquire a lease, in which case it might return a value as well. + if err := readOneVal(ctx, stream2, testutils.DefaultSucceedsSoonDuration); err != nil { + tc.RequireIsBreakerOpen(t, err) + } } func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { @@ -477,7 +515,7 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { // disabled, i.e. it will stay tripped. require.NoError(t, tc.Write(n1)) tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) + tc.Report(n1, errors.New("injected breaker error")) exemptRequests := []func() roachpb.Request{ func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} }, @@ -510,31 +548,41 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { for _, reqFn := range exemptRequests { req := reqFn() - t.Run(fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) { + tc.Run(t, fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) { require.NoError(t, tc.Send(n1, req)) }) } for _, reqFn := range exemptRequests { req := reqFn() - t.Run(fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) { - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + tc.Run(t, fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) { + resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) resumeHeartbeats() // intentionally resume right now so that lease can be acquired - require.NoError(t, tc.Send(n1, req)) + // NB: when looking into the traces here, we sometimes see - as expected - + // that when the request tries to acquire a lease, the breaker is still + // tripped. That's why there is a retry loop here. + testutils.SucceedsSoon(t, func() error { + err := tc.Send(n1, req) + if errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { + return err + } + require.NoError(t, err) + return nil + }) }) } - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) - defer resumeHeartbeats() // can't acquire leases until test ends + resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) for _, reqFn := range exemptRequests { req := reqFn() - if req.Method() == roachpb.Probe { - // Probe does not require the lease, and is the most-tested of the bunch - // already. We don't have to test it again here, which would require undue - // amounts of special-casing below. 
- continue - } - t.Run(fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) { + tc.Run(t, fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) { + if m := req.Method(); m == roachpb.Probe { + // Probe does not require the lease, and is the most-tested of the bunch + // already. We don't have to test it again here, which would require undue + // amounts of special-casing below. + skip.IgnoreLintf(t, "subtest does not apply to %s", m) + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond) defer cancel() const maxWait = 5 * time.Second @@ -548,6 +596,46 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { require.Less(t, timeutil.Since(tBegin), maxWait) }) } + + // Restore the breaker via the probe. + resumeHeartbeats() + tc.SetProbeEnabled(n1, true) + tc.UntripsSoon(t, tc.Write, n1) + + // Lose quorum (liveness stays intact). + tc.SetSlowThreshold(10 * time.Millisecond) + tc.StopServer(n2) + // Let the breaker trip. This leaves a poisoned latch behind that at least some of + // the requests will interact with. + tc.RequireIsBreakerOpen(t, tc.Write(n1)) + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + + for _, reqFn := range exemptRequests { + req := reqFn() + tc.Run(t, fmt.Sprintf("with-poisoned-latch/%s", req.Method()), func(t *testing.T) { + if m := req.Method(); m == roachpb.GC { + // GC without GCKeys acquires no latches and is a pure read. If we want + // to put a key in there, we need to pick the right timestamp (since you + // can't GC a live key); it's all rather annoying and not worth it. In + // the long run, we also completely want to avoid acquiring latches for + // this request (since it should only mutate keyspace that has since + // fallen under the GCThreshold), so avoid cooking up anything special + // here. + skip.IgnoreLintf(t, "subtest does not apply to %s", m) + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond) + defer cancel() + const maxWait = 5 * time.Second + tBegin := timeutil.Now() + err := tc.SendCtx(ctx, n1, req) + t.Log(err) + require.Error(t, err) + require.Error(t, ctx.Err()) + // Make sure we didn't run into the "long" timeout inside of SendCtx but + // actually terminated as a result of our ctx cancelling. + require.Less(t, timeutil.Since(tBegin), maxWait) + }) + } } // Test infrastructure below. 
@@ -573,19 +661,18 @@ func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) { } } -type replWithKnob struct { - *kvserver.Replica - setProbeEnabled func(bool) -} - type circuitBreakerTest struct { + t decoT *testcluster.TestCluster slowThresh *atomic.Value // time.Duration ManualClock *hlc.HybridManualClock repls []replWithKnob // 0 -> repl on Servers[0], etc + + seq int } func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { + skip.UnderStressRace(t) manualClock := hlc.NewHybridManualClock() var rangeID int64 // atomic slowThresh := &atomic.Value{} // supports .SetSlowThreshold(x) @@ -656,7 +743,8 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { k := tc.ScratchRange(t) atomic.StoreInt64(&rangeID, int64(tc.LookupRangeOrFatal(t, k).RangeID)) - tc.AddVotersOrFatal(t, k, tc.Target(1)) + tc.AddVotersOrFatal(t, k, tc.Target(n2)) + require.NoError(t, tc.WaitForVoters(k, tc.Target(n2))) var repls []replWithKnob for i := range tc.Servers { @@ -665,6 +753,7 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { repls = append(repls, replWithKnob{repl, enableProbe}) } return &circuitBreakerTest{ + t: decoT{t}, TestCluster: tc, ManualClock: manualClock, repls: repls, @@ -672,6 +761,21 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { } } +// Run is a wrapper around t.Run that allows the test harness to print traces +// using the subtest's *testing.T. +func (cbt *circuitBreakerTest) Run(t *testing.T, name string, f func(t *testing.T)) { + t.Helper() + t.Run(name, func(t *testing.T) { + t.Helper() + outerT := cbt.t + cbt.t = decoT{t} + defer func() { + cbt.t = outerT + }() + f(t) + }) +} + func (cbt *circuitBreakerTest) SetProbeEnabled(idx int, to bool) { cbt.repls[idx].setProbeEnabled(to) } @@ -686,31 +790,60 @@ func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) er t.Helper() err := method(idx) // All errors coming out should be annotated as coming from - // the circuit breaker. - if err != nil && !errors.Is(err, circuit.ErrBreakerOpen) { + // the circuit breaker. In rare cases, we can also see a + // NotLeaseholderError such as this one: + // [NotLeaseHolderError] failed to manipulate liveness record: heartbeat + // failed on epoch increment; r45: replica (n1,s1):1 not lease holder; + // current lease is repl=(n1,s1):1 seq=1 start=0,0 epo=1 pro=[...] + if err != nil && + !errors.Is(err, circuit.ErrBreakerOpen) && + !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { + t.Fatalf("saw unexpected error %+v", err) } return err }) } -func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats bool) (undo func()) { +func (cbt *circuitBreakerTest) ExpireAllLeasesAndN1LivenessRecord( + t *testing.T, pauseHeartbeats bool, +) (undo func()) { t.Helper() - var maxWT int64 var fs []func() - for _, srv := range cbt.Servers { + for idx, srv := range cbt.Servers { lv := srv.NodeLiveness().(*liveness.NodeLiveness) + if pauseHeartbeats { undo := lv.PauseAllHeartbeatsForTest() fs = append(fs, undo) } + self, ok := lv.Self() require.True(t, ok) - if maxWT < self.Expiration.WallTime { - maxWT = self.Expiration.WallTime + + cbt.ManualClock.Forward(self.Expiration.WallTime) + if idx == n1 { + // Invalidate n1's liveness record, to make sure that ranges on n1 need + // to acquire a new lease (vs waiting for a heartbeat to the liveness + // record resuscitating the old one). 
+ // + // Needing to do this is the reason for special-casing this entire method + // around n1; if we stop heartbeats for both nodes, they can't increment + // each other's liveness records: if a node's liveness is paused, it doesn't + // allow incrementing records either. (This is silly). + lv2 := cbt.Server(n2).NodeLiveness().(*liveness.NodeLiveness) + testutils.SucceedsSoon(t, func() error { + self, ok := lv.Self() + require.True(t, ok) + if self.IsLive(cbt.Server(n2).Clock().Now().GoTime()) { + // Someone else must have incremented epoch. + return nil + } + return lv2.IncrementEpoch(context.Background(), self) + }) } } - cbt.ManualClock.Forward(maxWT + 1) + return func() { for _, f := range fs { f() } @@ -719,15 +852,32 @@ func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats boo } func (cbt *circuitBreakerTest) Send(idx int, req roachpb.Request) error { + cbt.t.Helper() return cbt.SendCtx(context.Background(), idx, req) - } func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb.Request) error { + return cbt.SendCtxTS(ctx, idx, req, cbt.repls[idx].Clock().Now()) +} + +func (cbt *circuitBreakerTest) SendCtxTS( + ctx context.Context, idx int, req roachpb.Request, ts hlc.Timestamp, +) error { + cbt.t.Helper() + ctx, finishAndGet := tracing.ContextWithRecordingSpan(ctx, cbt.repls[idx].Tracer, "SendCtx("+req.Method().String()+")") + defer time.AfterFunc(10*time.Second, func() { + rec := tracing.SpanFromContext(ctx).GetConfiguredRecording() + cbt.t.Logf("slow request: %s", rec) + }).Stop() + defer func() { + cbt.t.Helper() + rec := finishAndGet() + cbt.t.Logf("%s", rec) + }() var ba roachpb.BatchRequest repl := cbt.repls[idx] ba.RangeID = repl.Desc().RangeID - ba.Timestamp = repl.Clock().Now() + ba.Timestamp = ts ba.Add(req) if h := req.Header(); len(h.Key) == 0 { h.Key = repl.Desc().StartKey.AsRawKey() @@ -751,29 +901,20 @@ func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb if err := ctx.Err(); err != nil && parCtx.Err() == nil { pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) } - { - var err error - repl.VisitBreakerContexts(func(ctx context.Context) { - if err == nil && ctx.Value(req) != nil { - err = errors.Errorf( - "request %s returned but context still tracked in breaker", req, - ) - } - }) - if err != nil { - pErr = roachpb.NewErrorf("%s; after %v", err, pErr) - } - } return pErr.GoError() } func (cbt *circuitBreakerTest) WriteDS(idx int) error { + cbt.t.Helper() put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put) } -func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb.Request) error { +func (cbt *circuitBreakerTest) sendViaDistSender( + ds *kvcoord.DistSender, req roachpb.Request, +) error { + cbt.t.Helper() var ba roachpb.BatchRequest ba.Add(req) ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) defer cancel() @@ -790,6 +931,7 @@ func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) { t.Helper() + t.Log(err) // We also accept an ambiguous result wrapping a breaker error; this occurs // when the breaker trips while a write is already inflight.
if aErr := (*roachpb.AmbiguousResultError)(nil); errors.As(err, &aErr) && aErr.WrappedErr != nil { @@ -800,6 +942,7 @@ func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) { func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) { t.Helper() + t.Log(err) ok := errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) require.True(t, ok, "%+v", err) } @@ -812,13 +955,56 @@ func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) { } func (cbt *circuitBreakerTest) Write(idx int) error { + cbt.t.Helper() repl := cbt.repls[idx] - put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) + cbt.seq++ + put := roachpb.NewPut( + repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString(fmt.Sprintf("hello-%d", cbt.seq)), + ) return cbt.Send(idx, put) } func (cbt *circuitBreakerTest) Read(idx int) error { + cbt.t.Helper() repl := cbt.repls[idx] get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) return cbt.Send(idx, get) } + +func (cbt *circuitBreakerTest) FollowerRead(idx int) error { + cbt.t.Helper() + repl := cbt.repls[idx] + get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) + ctx := context.Background() + ts := repl.GetClosedTimestamp(ctx) + return cbt.SendCtxTS(ctx, idx, get, ts) +} + +func (cbt *circuitBreakerTest) HeartbeatNodeLiveness(t *testing.T, idx int) { + // Retry loop is needed because heartbeat may race with internal heartbeat + // loop. + testutils.SucceedsSoon(t, func() error { + return cbt.Server(idx).HeartbeatNodeLiveness() + }) +} + +type replWithKnob struct { + *kvserver.Replica + setProbeEnabled func(bool) +} + +type logT interface { + Helper() + Logf(string, ...interface{}) +} + +type decoT struct { + logT +} + +func (t *decoT) Logf(format string, args ...interface{}) { + // It can be difficult to spot the actual failure among all of the + // traces, so this is a convenient place to annotate the logging + // (or disable it one-off). + t.logT.Logf("info:\n"+format, args...) +} diff --git a/pkg/kv/kvserver/concurrency/BUILD.bazel b/pkg/kv/kvserver/concurrency/BUILD.bazel index 4e28adfbb895..b895b75d5a91 100644 --- a/pkg/kv/kvserver/concurrency/BUILD.bazel +++ b/pkg/kv/kvserver/concurrency/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/concurrency/lock", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/intentresolver", "//pkg/kv/kvserver/spanlatch", "//pkg/kv/kvserver/spanset", @@ -54,6 +55,7 @@ go_test( deps = [ "//pkg/kv/kvserver/batcheval", "//pkg/kv/kvserver/concurrency/lock", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/intentresolver", "//pkg/kv/kvserver/spanlatch", "//pkg/kv/kvserver/spanset", diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index 883a83e6037c..c5d27ebe1f90 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -189,6 +190,15 @@ type RequestSequencer interface { // does so, it will not return a request guard. 
SequenceReq(context.Context, *Guard, Request, RequestEvalKind) (*Guard, Response, *Error) + // PoisonReq idempotently marks a Guard as poisoned, indicating that its + // latches may be held for an indefinite amount of time. Requests waiting on + // this Guard will be notified. Latch acquisitions under poison.Policy_Error + // react to this by failing with a poison.PoisonedError, while requests under + // poison.Policy_Wait continue waiting, but propagate the poisoning upwards. + // + // See poison.Policy for details. + PoisonReq(*Guard) + // FinishReq marks the request as complete, releasing any protection // the request had against conflicting requests and allowing conflicting // requests that are blocked on this one to proceed. The guard should not @@ -385,6 +395,9 @@ type Request struct { // with a WriteIntentError instead of entering the queue and waiting. MaxLockWaitQueueLength int + // The poison.Policy to use for this Request. + PoisonPolicy poison.Policy + // The individual requests in the batch. Requests []roachpb.RequestUnion @@ -464,9 +477,12 @@ type latchManager interface { // WaitFor waits for conflicting latches on the specified spans without adding // any latches itself. Fast path for operations that only require flushing out // old operations without blocking any new ones. - WaitFor(ctx context.Context, spans *spanset.SpanSet) *Error + WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) *Error + + // Poison a guard's latches, allowing waiters to fail fast. + Poison(latchGuard) - // Releases latches, relinquish its protection from conflicting requests. + // Release a guard's latches, relinquish its protection from conflicting requests. Release(latchGuard) // Metrics returns information about the state of the latchManager. diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index c3ee4f1cd150..5bc0d2241494 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -230,7 +230,7 @@ func (m *managerImpl) sequenceReqWithGuard(ctx context.Context, g *Guard) (Respo // them. if shouldWaitOnLatchesWithoutAcquiring(g.Req) { log.Event(ctx, "waiting on latches without acquiring") - return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans) + return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy) } // Provide the manager with an opportunity to intercept the request. It @@ -382,6 +382,15 @@ func shouldWaitOnLatchesWithoutAcquiring(req Request) bool { return req.isSingle(roachpb.Barrier) } +// PoisonReq implements the RequestSequencer interface. +func (m *managerImpl) PoisonReq(g *Guard) { + // NB: g.lg == nil is the case for requests that ignore latches, see + // shouldIgnoreLatches. + if g.lg != nil { + m.lm.Poison(g.lg) + } +} + // FinishReq implements the RequestSequencer interface. func (m *managerImpl) FinishReq(g *Guard) { // NOTE: we release latches _before_ exiting lock wait-queues deliberately. 
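[editor's note] The PoisonReq contract above is easiest to see in miniature. The following self-contained Go sketch is illustrative only -- none of these names exist in the CockroachDB tree, and it assumes a single latch with two waiters -- but it shows the semantics the contract describes: poisoning a held latch makes an error-policy waiter fail fast, while a wait-policy waiter keeps waiting and merely records the poisoning so it can propagate it to its own latches once it proceeds.

// poisondemo: toy illustration of poisoned-latch semantics (not CockroachDB code).
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Policy mirrors poison.Policy: Wait keeps waiting, Error fails fast.
type Policy int

const (
	PolicyWait Policy = iota
	PolicyError
)

// latch stands in for a single held latch that can be poisoned or released.
type latch struct {
	mu       sync.Mutex
	cond     *sync.Cond
	poisoned bool
	released bool
}

func newLatch() *latch {
	l := &latch{}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// poison marks the latch as poisoned and wakes all waiters.
func (l *latch) poison() {
	l.mu.Lock()
	l.poisoned = true
	l.cond.Broadcast()
	l.mu.Unlock()
}

// release marks the latch as released and wakes all waiters.
func (l *latch) release() {
	l.mu.Lock()
	l.released = true
	l.cond.Broadcast()
	l.mu.Unlock()
}

// wait blocks until the latch is released. Under PolicyError it returns an
// error as soon as it observes poisoning; under PolicyWait it keeps waiting
// and reports the observed poisoning to the caller, which would then poison
// its own latches in turn.
func (l *latch) wait(pp Policy) (sawPoison bool, _ error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for {
		if l.poisoned {
			if pp == PolicyError {
				return true, errors.New("encountered poisoned latch")
			}
			sawPoison = true
		}
		if l.released {
			return sawPoison, nil
		}
		l.cond.Wait()
	}
}

func main() {
	l := newLatch() // held by some request that cannot make progress
	var wg sync.WaitGroup
	for _, pp := range []Policy{PolicyError, PolicyWait} {
		pp := pp
		wg.Add(1)
		go func() {
			defer wg.Done()
			saw, err := l.wait(pp)
			fmt.Printf("policy=%d sawPoison=%t err=%v\n", pp, saw, err)
		}()
	}
	l.poison()  // the PolicyError waiter fails fast
	l.release() // the PolicyWait waiter unblocks, having observed the poisoning
	wg.Wait()
}

In the actual change, this logic lives in the spanlatch manager (see the Acquire/Poison/WaitFor plumbing in latch_manager.go below), and the error surfaced under Policy_Error is the poison.PoisonedError introduced in this PR.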
diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index e533b5174872..cfe06d29e982 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -54,9 +54,10 @@ import ( // The input files use the following DSL: // // new-txn name= ts=[,] epoch= [uncertainty-limit=[,]] -// new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] [lock-timeout] [max-lock-wait-queue-length=] +// new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] [lock-timeout] [max-lock-wait-queue-length=] [poison-policy=[err|wait]] // [=...] (hint: see scanSingleRequest) // sequence req= [eval-kind= // finish req= // // handle-write-intent-error req= txn= key= lease-seq= @@ -169,6 +170,8 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) } + pp := scanPoisonPolicy(t, d) + // Each roachpb.Request is provided on an indented line. reqs, reqUnions := scanRequests(t, d, c) latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs) @@ -184,6 +187,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { Requests: reqUnions, LatchSpans: latchSpans, LockSpans: lockSpans, + PoisonPolicy: pp, } return "" @@ -257,6 +261,20 @@ func TestConcurrencyManagerBasic(t *testing.T) { }) return c.waitAndCollect(t, mon) + case "poison": + var reqName string + d.ScanArgs(t, "req", &reqName) + guard, ok := c.guardsByReqName[reqName] + if !ok { + d.Fatalf(t, "unknown request: %s", reqName) + } + + opName := fmt.Sprintf("poison %s", reqName) + mon.runSync(opName, func(ctx context.Context) { + log.Event(ctx, "poisoning request") + m.PoisonReq(guard) + }) + return c.waitAndCollect(t, mon) case "handle-write-intent-error": var reqName string d.ScanArgs(t, "req", &reqName) diff --git a/pkg/kv/kvserver/concurrency/datadriven_util_test.go b/pkg/kv/kvserver/concurrency/datadriven_util_test.go index e9e8f9483a33..11067bce44c8 100644 --- a/pkg/kv/kvserver/concurrency/datadriven_util_test.go +++ b/pkg/kv/kvserver/concurrency/datadriven_util_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -75,6 +76,24 @@ func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.Wa } } +func scanPoisonPolicy(t *testing.T, d *datadriven.TestData) poison.Policy { + const key = "poison-policy" + if !d.HasArg(key) { + return poison.Policy_Error + } + var policy string + d.ScanArgs(t, key, &policy) + switch policy { + case "error": + return poison.Policy_Error + case "wait": + return poison.Policy_Wait + default: + d.Fatalf(t, "unknown poison policy: %s", policy) + return 0 + } +} + func scanSingleRequest( t *testing.T, d *datadriven.TestData, line string, txns map[string]*roachpb.Transaction, ) roachpb.Request { diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go index b0a4b8eb1073..b0f07057344b 100644 --- a/pkg/kv/kvserver/concurrency/latch_manager.go +++ b/pkg/kv/kvserver/concurrency/latch_manager.go @@ -13,6 +13,7 @@ package concurrency import ( "context" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -24,7 +25,7 @@ type latchManagerImpl struct { } func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard, *Error) { - lg, err := m.m.Acquire(ctx, req.LatchSpans) + lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy) if err != nil { return nil, roachpb.NewError(err) } @@ -32,7 +33,7 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard } func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard { - lg := m.m.AcquireOptimistic(req.LatchSpans) + lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy) return lg } @@ -50,14 +51,20 @@ func (m *latchManagerImpl) WaitUntilAcquired( return lg, nil } -func (m *latchManagerImpl) WaitFor(ctx context.Context, ss *spanset.SpanSet) *Error { - err := m.m.WaitFor(ctx, ss) +func (m *latchManagerImpl) WaitFor( + ctx context.Context, ss *spanset.SpanSet, pp poison.Policy, +) *Error { + err := m.m.WaitFor(ctx, ss, pp) if err != nil { return roachpb.NewError(err) } return nil } +func (m *latchManagerImpl) Poison(lg latchGuard) { + m.m.Poison(lg.(*spanlatch.Guard)) +} + func (m *latchManagerImpl) Release(lg latchGuard) { m.m.Release(lg.(*spanlatch.Guard)) } diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index c78ec55fb09d..b5a1e30ce159 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -870,7 +871,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error { // cancellation, the code makes sure to release latches when returning // early due to error. Otherwise other requests will get stuck and // group.Wait() will not return until the test times out. - lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans) + lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, poison.Policy_Error) if err != nil { return err } @@ -1414,7 +1415,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { var err error firstIter := true for { - if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil { doneCh <- err return } @@ -1449,7 +1450,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { return } // Release locks. 
- if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil { doneCh <- err return } diff --git a/pkg/kv/kvserver/concurrency/poison/BUILD.bazel b/pkg/kv/kvserver/concurrency/poison/BUILD.bazel new file mode 100644 index 000000000000..fa267c099677 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/BUILD.bazel @@ -0,0 +1,44 @@ +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "poison_proto", + srcs = [ + "error.proto", + "policy.proto", + ], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb:roachpb_proto", + "//pkg/util/hlc:hlc_proto", + "@com_github_gogo_protobuf//gogoproto:gogo_proto", + ], +) + +go_proto_library( + name = "poison_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison", + proto = ":poison_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/util/hlc", + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "poison", + srcs = ["error.go"], + embed = [":poison_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", + ], +) diff --git a/pkg/kv/kvserver/concurrency/poison/error.go b/pkg/kv/kvserver/concurrency/poison/error.go new file mode 100644 index 000000000000..3c473fc40a50 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/error.go @@ -0,0 +1,42 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package poison + +import ( + "fmt" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// NewPoisonedError instantiates a *PoisonedError referencing a poisoned latch +// (as identified by span and timestamp). +func NewPoisonedError(span roachpb.Span, ts hlc.Timestamp) *PoisonedError { + return &PoisonedError{Span: span, Timestamp: ts} +} + +var _ errors.SafeFormatter = (*PoisonedError)(nil) +var _ fmt.Formatter = (*PoisonedError)(nil) + +// SafeFormatError implements errors.SafeFormatter. +func (e *PoisonedError) SafeFormatError(p errors.Printer) error { + p.Printf("encountered poisoned latch %s@%s", e.Span, e.Timestamp) + return nil +} + +// Format implements fmt.Formatter. +func (e *PoisonedError) Format(s fmt.State, verb rune) { errors.FormatError(e, s, verb) } + +// Error implements error. +func (e *PoisonedError) Error() string { + return fmt.Sprint(e) +} diff --git a/pkg/kv/kvserver/concurrency/poison/error.proto b/pkg/kv/kvserver/concurrency/poison/error.proto new file mode 100644 index 000000000000..3ebe47586f81 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/error.proto @@ -0,0 +1,26 @@ +// Copyright 2022 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.kv.kvserver.concurrency.poison; +option go_package = "poison"; + +import "util/hlc/timestamp.proto"; +import "roachpb/data.proto"; +import "gogoproto/gogo.proto"; + +// PoisonedError indicates that a request failed fast during sequencing as a +// result of having encountered a poisoned latch under Policy_Error. +// +// See also concurrency.RequestSequencer. +message PoisonedError { + roachpb.Span span = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; +} diff --git a/pkg/kv/kvserver/concurrency/poison/policy.proto b/pkg/kv/kvserver/concurrency/poison/policy.proto new file mode 100644 index 000000000000..5f3371cbf281 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/policy.proto @@ -0,0 +1,35 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.kv.kvserver.concurrency.poison; +option go_package = "poison"; + +import "gogoproto/gogo.proto"; + +// Policy determines how a request will react to encountering a poisoned +// latch. A poisoned latch is a latch for which the holder is unable to make +// progress. That is, waiters of this latch should not expect to be able to +// acquire this latch "for some time"; in practice this is the case of an +// unavailable Replica. +// +// The name is inspired by Rust's mutexes, which undergo poisoning[^1] when a +// thread panics while holding the mutex. +// +// [^1]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#poisoning +enum Policy { + // Policy_Wait instructs a request to continue waiting upon encountering + // a poisoned latch, while propagating the poisoning to its own latches. + Wait = 0; + + // Policy_Error instructs a request to return an error upon encountering + // a poisoned latch. + Error = 1; +} diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err new file mode 100644 index 000000000000..2fffae01a743 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err @@ -0,0 +1,60 @@ +# This test sets up the following situation: +# +# e <- put (PoisonPolicyErr; waiting) +# b---f <- scan (PoisonPolicyErr; waiting) +# c <- put (PoisonPolicyErr; sequenced, poisoned) +# +# Since everyone uses PoisonPolicyErr, the chain unwinds. However, only `b---f` +# gets an error, since it overlaps `c`. `e` can proceed as soon as `b---f` has +# failed fast, since `e` does not overlap `c`.
+ +new-request txn=none name=putc ts=10,0 + put key=c value=hi +---- + +sequence req=putc +---- +[1] sequence putc: sequencing request +[1] sequence putc: acquiring latches +[1] sequence putc: scanning lock table for conflicting locks +[1] sequence putc: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=pute ts=11,0 + put key=e value=hi +---- + +sequence req=pute +---- +[3] sequence pute: sequencing request +[3] sequence pute: acquiring latches +[3] sequence pute: waiting to acquire write latch e@11.000000000,0, held by read latch {b-f}@11.000000000,1 +[3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=putc +---- +[-] poison putc: poisoning request +[2] sequence readbf: sequencing complete, returned error: encountered poisoned latch c@10.000000000,0 +[3] sequence pute: scanning lock table for conflicting locks +[3] sequence pute: sequencing complete, returned guard + +finish req=putc +---- +[-] finish putc: finishing request + +finish req=pute +---- +[-] finish pute: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect new file mode 100644 index 000000000000..2615d3524c16 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect @@ -0,0 +1,61 @@ +# This test sets up the following situation: +# +# e <- put (PoisonPolicyError; waiting) +# b---f <- scan (PoisonPolicyWait; waiting) +# c <- put (PoisonPolicyWait; sequenced, poisoned) +# +# When `c` gets poisoned (and continues waiting), the same +# happens to `b---f`, which prompts `e` to fail fast. 
+ +new-request txn=none name=putc ts=10,0 poison-policy=wait + put key=c value=hi +---- + +sequence req=putc +---- +[1] sequence putc: sequencing request +[1] sequence putc: acquiring latches +[1] sequence putc: scanning lock table for conflicting locks +[1] sequence putc: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 poison-policy=wait + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=pute ts=11,0 + put key=e value=hi +---- + +sequence req=pute +---- +[3] sequence pute: sequencing request +[3] sequence pute: acquiring latches +[3] sequence pute: waiting to acquire write latch e@11.000000000,0, held by read latch {b-f}@11.000000000,1 +[3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=putc +---- +[-] poison putc: poisoning request +[2] sequence readbf: encountered poisoned latch; continuing to wait +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal +[3] sequence pute: sequencing complete, returned error: encountered poisoned latch {b-f}@11.000000000,1 + +finish req=putc +---- +[-] finish putc: finishing request +[2] sequence readbf: scanning lock table for conflicting locks +[2] sequence readbf: sequencing complete, returned guard + +finish req=readbf +---- +[-] finish readbf: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint new file mode 100644 index 000000000000..bf7646ff84b6 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint @@ -0,0 +1,59 @@ +# This test sets up the following situation: +# +# e <- put (PoisonPolicyWait; waiting) +# b---f <- scan (PoisonPolicyError; waiting) +# c <- put (PoisonPolicyWait; sequenced, poisoned) +# +# The top and bottom request use PoisonPolicyWait, so the scan returns when `c` +# is poisoned, which in turn lets `e` through. However, `c` continues to wait. 
+ +new-request txn=none name=putc ts=10,0 poison-policy=wait + put key=c value=hi +---- + +sequence req=putc +---- +[1] sequence putc: sequencing request +[1] sequence putc: acquiring latches +[1] sequence putc: scanning lock table for conflicting locks +[1] sequence putc: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=pute ts=11,0 poison-policy=wait + put key=e value=hi +---- + +sequence req=pute +---- +[3] sequence pute: sequencing request +[3] sequence pute: acquiring latches +[3] sequence pute: waiting to acquire write latch e@11.000000000,0, held by read latch {b-f}@11.000000000,1 +[3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=putc +---- +[-] poison putc: poisoning request +[2] sequence readbf: sequencing complete, returned error: encountered poisoned latch c@10.000000000,0 +[3] sequence pute: scanning lock table for conflicting locks +[3] sequence pute: sequencing complete, returned guard + +finish req=putc +---- +[-] finish putc: finishing request + +finish req=pute +---- +[-] finish pute: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping new file mode 100644 index 000000000000..4e8799dde333 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping @@ -0,0 +1,61 @@ +# This test sets up the following situation: +# +# c <- put (PoisonPolicyWait; waiting) +# b---f <- scan (PoisonPolicyError; waiting) +# c <- put (PoisonPolicyWait; sequenced, poisoned) +# +# When the bottom `c` is poisoned, `b---f` fails fast, and +# the top `c` poisons itself but continues to wait. 
+ +new-request txn=none name=put1 ts=10,0 poison-policy=wait + put key=c value=hi +---- + +sequence req=put1 +---- +[1] sequence put1: sequencing request +[1] sequence put1: acquiring latches +[1] sequence put1: scanning lock table for conflicting locks +[1] sequence put1: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=put2 ts=11,0 poison-policy=wait + put key=c value=bar +---- + +sequence req=put2 +---- +[3] sequence put2: sequencing request +[3] sequence put2: acquiring latches +[3] sequence put2: waiting to acquire write latch c@11.000000000,0, held by write latch c@10.000000000,0 +[3] sequence put2: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=put1 +---- +[-] poison put1: poisoning request +[2] sequence readbf: sequencing complete, returned error: encountered poisoned latch c@10.000000000,0 +[3] sequence put2: encountered poisoned latch; continuing to wait +[3] sequence put2: blocked on select in spanlatch.(*Manager).waitForSignal + +finish req=put1 +---- +[-] finish put1: finishing request +[3] sequence put2: scanning lock table for conflicting locks +[3] sequence put2: sequencing complete, returned guard + +finish req=put2 +---- +[-] finish put2: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index a6e92183026d..773cf9a0e65d 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -227,13 +227,6 @@ func (r *Replica) Breaker() *circuit2.Breaker { return r.breaker.wrapped } -func (r *Replica) VisitBreakerContexts(fn func(ctx context.Context)) { - r.breaker.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) { - fn(ctx) - return false // keep - }) -} - func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) { r.raftMu.Lock() defer r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_backpressure.go b/pkg/kv/kvserver/replica_backpressure.go index 3ae46e823c95..8ac038879425 100644 --- a/pkg/kv/kvserver/replica_backpressure.go +++ b/pkg/kv/kvserver/replica_backpressure.go @@ -108,17 +108,19 @@ func canBackpressureBatch(ba *roachpb.BatchRequest) bool { return false } -// bypassReplicaCircuitBreakerForBatch returns whether the provided -// BatchRequest bypasses the per-Replica circuit breaker. This is the -// case if any request in the batch is requesting to do so. -func bypassReplicaCircuitBreakerForBatch(ba *roachpb.BatchRequest) bool { +// signallerForBatch returns the signaller to use for this batch. This is the +// Replica's breaker's signaller except if any request in the batch uses +// poison.Policy_Wait, in which case it's a neverTripSignaller. In particular, +// `(signaller).C() == nil` signals that the request bypasses the circuit +// breakers. 
+func (r *Replica) signallerForBatch(ba *roachpb.BatchRequest) signaller { for _, ru := range ba.Requests { req := ru.GetInner() if roachpb.BypassesReplicaCircuitBreaker(req) { - return true + return neverTripSignaller{} } } - return false + return r.breaker.Signal() } // shouldBackpressureWrites returns whether writes to the range should be diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go index e64ddb75f3a9..05d8e8ee5957 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker.go +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -38,6 +38,7 @@ type replicaInCircuitBreaker interface { Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) slowReplicationThreshold(ba *roachpb.BatchRequest) (time.Duration, bool) replicaUnavailableError() error + poisonInflightLatches(err error) } var defaultReplicaCircuitBreakerSlowReplicationThreshold = envutil.EnvOrDefaultDuration( @@ -74,127 +75,15 @@ type replicaCircuitBreaker struct { stopper *stop.Stopper r replicaInCircuitBreaker st *cluster.Settings - cancels CancelStorage wrapped *circuit.Breaker versionIsActive int32 // atomic } -// Register takes a cancelable context and its cancel function (which the caller -// must cancel when the request has finished), and registers them with the -// circuit breaker. If the breaker is already tripped, its error is returned -// immediately and the caller should not continue processing the request. -// Otherwise, the cancel function is invoked if the breaker trips. The caller is -// provided with a token and signaller for use in a call to -// UnregisterAndAdjustError upon request completion. That method also takes the -// error (if any) resulting from the request to ensure that in the case of a -// tripped breaker, the error reflects this fact. -func (br *replicaCircuitBreaker) Register( - ctx context.Context, cancel func(), -) (_token interface{}, _ signaller, _ error) { - brSig := br.Signal() - - // TODO(tbg): we may want to exclude more requests from this check, or allow - // requests to exclude themselves from the check (via their header). This - // latter mechanism could also replace hasBypassCircuitBreakerMarker. - if hasBypassCircuitBreakerMarker(ctx) { - // NB: brSig.C() == nil. - brSig = neverTripSignaller{} - } - - if brSig.C() == nil { - // Circuit breakers are disabled and/or this is a probe request, so don't do - // any work registering the context. UnregisterAndAdjustError will know that we didn't - // since it checks the same brSig for a nil C(). - return ctx, brSig, nil - } - - // NB: it might be tempting to check the breaker error first to avoid the call - // to Set below if the breaker is tripped at this point. However, the ordering - // here, subtly, is required to avoid situations in which the cancel is still - // in the map despite the probe having shut down (in which case cancel will - // not be invoked until the probe is next triggered, which maybe "never"). - // - // To see this, consider the case in which the breaker is initially not - // tripped when we check, but then trips immediately and has the probe fail - // (and terminate). Since the probe is in charge of cancelling all tracked - // requests, we must ensure that this probe sees our request. Adding the - // request prior to calling Signal() means that if we see an untripped - // breaker, no probe is running - consequently should the breaker then trip, - // it will observe our cancel, thus avoiding a leak. 
If we observe a tripped - // breaker, we also need to remove our own cancel, as the probe may already - // have passed the point at which it iterates through the cancels prior to us - // inserting it. The cancel may be invoked twice, but that's ok. - // - // See TestReplicaCircuitBreaker_NoCancelRace. - tok := br.cancels.Set(ctx, cancel) - if err := brSig.Err(); err != nil { - br.cancels.Del(tok) - cancel() - return nil, nil, err - } - - return tok, brSig, nil -} - -// UnregisterAndAdjustError releases a tracked cancel function upon request -// completion. The error resulting from the request is passed in to allow -// decorating it in case the breaker tripped while the request was in-flight. -// -// See Register. -func (br *replicaCircuitBreaker) UnregisterAndAdjustError( - tok interface{}, sig signaller, pErr *roachpb.Error, -) *roachpb.Error { - if sig.C() == nil { - // Breakers were disabled and we never put the cancel in the registry. - return pErr - } - - br.cancels.Del(tok) - - brErr := sig.Err() - if pErr == nil || brErr == nil { - return pErr - } - - // The breaker tripped and the command is returning an error. Make sure the - // error reflects the tripped breaker. - - err := pErr.GoError() - if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) { - // The breaker tripped while a command was inflight, so we have to - // propagate an ambiguous result. We don't want to replace it, but there - // is a way to stash an Error in it so we use that. - // - // TODO(tbg): could also wrap it; there is no other write to WrappedErr - // in the codebase and it might be better to remove it. Nested *Errors - // are not a good idea. - wrappedErr := brErr - if ae.WrappedErr != nil { - wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr) - } - ae.WrappedErr = roachpb.NewError(wrappedErr) - return roachpb.NewError(ae) - } else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) { - // When a lease acquisition triggered by this request is short-circuited - // by the breaker, it will return an opaque NotLeaseholderError, which we - // replace with the breaker's error. 
- return roachpb.NewError(errors.CombineErrors(brErr, le)) - } - return pErr -} - func (br *replicaCircuitBreaker) HasMark(err error) bool { return br.wrapped.HasMark(err) } -func (br *replicaCircuitBreaker) cancelAllTrackedContexts() { - br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) { - cancel() - return true // remove - }) -} - func (br *replicaCircuitBreaker) canEnable() bool { b := atomic.LoadInt32(&br.versionIsActive) == 1 if b { @@ -252,7 +141,6 @@ func newReplicaCircuitBreaker( stopper *stop.Stopper, ambientCtx log.AmbientContext, r replicaInCircuitBreaker, - s CancelStorage, onTrip func(), onReset func(), ) *replicaCircuitBreaker { @@ -262,8 +150,6 @@ func newReplicaCircuitBreaker( r: r, st: cs, } - br.cancels = s - br.cancels.Reset() br.wrapped = circuit.NewBreaker(circuit.Options{ Name: "breaker", // log bridge has ctx tags AsyncProbe: br.asyncProbe, @@ -299,16 +185,6 @@ func (r replicaCircuitBreakerLogger) OnReset(br *circuit.Breaker) { r.EventHandler.OnReset(br) } -type probeKey struct{} - -func hasBypassCircuitBreakerMarker(ctx context.Context) bool { - return ctx.Value(probeKey{}) != nil -} - -func withBypassCircuitBreakerMarker(ctx context.Context) context.Context { - return context.WithValue(ctx, probeKey{}, probeKey{}) -} - func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { bgCtx := br.ambCtx.AnnotateCtx(context.Background()) if err := br.stopper.RunAsyncTask(bgCtx, "replica-probe", func(ctx context.Context) { @@ -319,13 +195,19 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { return } - // First, tell all current requests to fail fast. Note that clients insert - // first, then check the breaker (and remove themselves if breaker already - // tripped then). This prevents any cancels from sneaking in after the probe - // gets past this point, which could otherwise leave cancels hanging until - // "something" triggers the next probe (which may be never if no more traffic - // arrives at the Replica). See Register. - br.cancelAllTrackedContexts() + brErr := br.Signal().Err() + if brErr == nil { + // This shouldn't happen, but if we're not even tripped, don't do + // anything. + return + } + + // Poison any inflight latches. Note that any new request that is added in + // while the probe is running but after poisonInflightLatches has been + // invoked will remain untouched. We rely on the replica to periodically + // access the circuit breaker to trigger additional probes in that case. + // (This happens in refreshProposalsLocked). + br.r.poisonInflightLatches(brErr) err := sendProbe(ctx, br.r) report(err) }); err != nil { @@ -334,10 +216,9 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { } func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error { - // NB: we don't need to put this marker since ProbeRequest has the - // canBypassReplicaCircuitBreaker flag, but if in the future we do - // additional work in this method we may need it. - ctx = withBypassCircuitBreakerMarker(ctx) + // NB: ProbeRequest has the bypassesCircuitBreaker flag. If in the future we + // enhance the probe, we may need to allow any additional requests we send to + // chose to bypass the circuit breaker explicitly. 
desc := r.Desc() if !desc.IsInitialized() { return nil diff --git a/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go b/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go deleted file mode 100644 index c8f46bcffa16..000000000000 --- a/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package kvserver - -import ( - "context" - "sync" - "unsafe" - - "github.com/cockroachdb/cockroach/pkg/util/syncutil" -) - -// CancelStorage implements tracking of context cancellation functions -// for use by Replica circuit breakers. -type CancelStorage interface { - // Reset initializes the storage. Not thread safe. - Reset() - // Set adds context and associated cancel func to the storage. Returns a token - // that can be passed to Del. - // - // Set is thread-safe. - Set(_ context.Context, cancel func()) (token interface{}) - // Del removes a cancel func, as identified by the token returned from Set. - // - // Del is thread-safe. - Del(token interface{}) - // Visit invokes the provided closure with each (context,cancel) pair currently - // present in the storage. Items for which the visitor returns true are removed - // from the storage. - // - // Visit is thread-safe, but it is illegal to invoke methods of the - // CancelStorage from within the visitor. - Visit(func(context.Context, func()) (remove bool)) -} - -type cancelToken struct { - ctx context.Context -} - -func (tok *cancelToken) fasthash() int { - // From https://github.com/taylorza/go-lfsr/blob/7ec2b93980f950da1e36c6682771e6fe14c144c2/lfsr.go#L46-L48. - s := int(uintptr(unsafe.Pointer(tok))) - b := (s >> 0) ^ (s >> 2) ^ (s >> 3) ^ (s >> 4) - return (s >> 1) | (b << 7) -} - -var cancelTokenPool = sync.Pool{ - New: func() interface{} { return &cancelToken{} }, -} - -type mapCancelShard struct { - syncutil.Mutex - m map[*cancelToken]func() -} - -// A MapCancelStorage implements CancelStorage via shards of mutex-protected -// maps. -type MapCancelStorage struct { - NumShards int - sl []*mapCancelShard -} - -// Reset implements CancelStorage. -func (m *MapCancelStorage) Reset() { - if m.NumShards == 0 { - m.NumShards = 1 - } - m.sl = make([]*mapCancelShard, m.NumShards) - for i := range m.sl { - s := &mapCancelShard{} - s.m = map[*cancelToken]func(){} - m.sl[i] = s - } -} - -// Set implements CancelStorage. -func (m *MapCancelStorage) Set(ctx context.Context, cancel func()) interface{} { - tok := cancelTokenPool.Get().(*cancelToken) - tok.ctx = ctx - shard := m.sl[tok.fasthash()%len(m.sl)] - shard.Lock() - shard.m[tok] = cancel - shard.Unlock() - return tok -} - -// Del implements CancelStorage. -func (m *MapCancelStorage) Del(tok interface{}) { - ttok := tok.(*cancelToken) - shard := m.sl[ttok.fasthash()%len(m.sl)] - shard.Lock() - delete(shard.m, tok.(*cancelToken)) - shard.Unlock() -} - -// Visit implements CancelStorage. 
-func (m *MapCancelStorage) Visit(fn func(context.Context, func()) (remove bool)) { - for _, shard := range m.sl { - shard.Lock() - for tok, cancel := range shard.m { - if fn(tok.ctx, cancel) { - delete(shard.m, tok) - } - } - shard.Unlock() - } -} diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go index c7b6029b7d70..b056713c36a2 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -11,27 +11,15 @@ package kvserver import ( - "context" - "fmt" - "math/rand" - "runtime" - "strconv" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" - "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" ) @@ -52,151 +40,3 @@ func TestReplicaUnavailableError(t *testing.T) { err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs) echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt")) } - -type circuitBreakerReplicaMock struct { - clock *hlc.Clock -} - -func (c *circuitBreakerReplicaMock) Clock() *hlc.Clock { - return c.clock -} - -func (c *circuitBreakerReplicaMock) Desc() *roachpb.RangeDescriptor { - return &roachpb.RangeDescriptor{} -} - -func (c *circuitBreakerReplicaMock) Send( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - return ba.CreateReply(), nil -} - -func (c *circuitBreakerReplicaMock) slowReplicationThreshold( - ba *roachpb.BatchRequest, -) (time.Duration, bool) { - return 0, false -} - -func (c *circuitBreakerReplicaMock) replicaUnavailableError() error { - return errors.New("unavailable") -} - -// This test verifies that when the breaker trips and untrips again, -// there is no scenario under which the request's cancel leaks. 
-func TestReplicaCircuitBreaker_NoCancelRace(t *testing.T) { - defer leaktest.AfterTest(t)() - br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - defer stopper.Stop(ctx) - - g := ctxgroup.WithContext(ctx) - const count = 100 - for i := 0; i < count; i++ { - i := i // for goroutine - g.GoCtx(func(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) - tok, sig, err := br.Register(ctx, cancel) - if err != nil { - _ = err // ignoring intentionally - return nil - } - if i == count/2 { - br.TripAsync() // probe will succeed - } - runtime.Gosched() - time.Sleep(time.Duration(rand.Intn(int(time.Millisecond)))) - var pErr *roachpb.Error - if i%2 == 0 { - pErr = roachpb.NewErrorf("boom") - } - _ = br.UnregisterAndAdjustError(tok, sig, pErr) - return nil - }) - } - require.NoError(t, g.Wait()) - var n int - br.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) { - n++ - return false // keep - }) - require.Zero(t, n, "found tracked requests") -} - -func TestReplicaCircuitBreaker_Register(t *testing.T) { - defer leaktest.AfterTest(t)() - br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") - defer stopper.Stop(context.Background()) - ctx := withBypassCircuitBreakerMarker(context.Background()) - tok, sig, err := br.Register(ctx, func() {}) - require.NoError(t, err) - defer br.UnregisterAndAdjustError(tok, sig, nil /* pErr */) - require.Zero(t, sig.C()) - var n int - br.cancels.Visit(func(ctx context.Context, f func()) (remove bool) { - n++ - return false // keep - }) - require.Zero(t, n, "probe context got added to CancelStorage") -} - -func setupCircuitBreakerTest(t testing.TB, cs string) (*replicaCircuitBreaker, *stop.Stopper) { - st := cluster.MakeTestingClusterSettings() - // Enable circuit breakers. 
- replicaCircuitBreakerSlowReplicationThreshold.Override(context.Background(), &st.SV, time.Hour) - r := &circuitBreakerReplicaMock{clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond)} - var numShards int - { - _, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards) - require.NoError(t, err) - } - s := &MapCancelStorage{NumShards: numShards} - onTrip := func() {} - onReset := func() {} - stopper := stop.NewStopper() - br := newReplicaCircuitBreaker(st, stopper, log.AmbientContext{}, r, s, onTrip, onReset) - return br, stopper -} - -func BenchmarkReplicaCircuitBreaker_Register(b *testing.B) { - defer leaktest.AfterTest(b)() - - for _, enabled := range []bool{false, true} { - b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) { - dss := []string{ - "mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16", - "mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64", - } - if !enabled { - dss = dss[:1] - } - for _, ds := range dss { - b.Run(ds, func(b *testing.B) { - b.ReportAllocs() - br, stopper := setupCircuitBreakerTest(b, ds) - defer stopper.Stop(context.Background()) - - var dur time.Duration - if enabled { - dur = time.Hour - } - replicaCircuitBreakerSlowReplicationThreshold.Override(context.Background(), &br.st.SV, dur) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - ctx, cancel := context.WithCancel(context.Background()) - tok, sig, err := br.Register(ctx, cancel) - if err != nil { - b.Error(err) - } - if pErr := br.UnregisterAndAdjustError(tok, sig, nil); pErr != nil { - b.Error(pErr) - } - cancel() - } - }) - }) - } - }) - } -} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 2545b21644d7..5f7e1629d379 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -137,14 +137,8 @@ func newUnloadedReplica( onReset := func() { store.Metrics().ReplicaCircuitBreakerCurTripped.Dec(1) } - var cancelStorage CancelStorage - if f := r.store.cfg.TestingKnobs.CancelStorageFactory; f != nil { - cancelStorage = f() - } else { - cancelStorage = &MapCancelStorage{} - } r.breaker = newReplicaCircuitBreaker( - store.cfg.Settings, store.stopper, r.AmbientContext, r, cancelStorage, onTrip, onReset, + store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset, ) return r } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index bbef1c9cfa95..f958db11f9ea 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -158,6 +158,16 @@ func (proposal *ProposalData) signalProposalResult(pr proposalResult) { if proposal.doneCh != nil { proposal.doneCh <- pr proposal.doneCh = nil + // Need to remove any span from the proposal, as the signalled caller + // will likely finish it, and if we then end up applying this proposal + // we'll try to make a ChildSpan off `proposal.ctx` and this will + // trigger the Span use-after-finish assertions. + // + // See: https://github.com/cockroachdb/cockroach/pull/76858#issuecomment-1048179588 + // + // NB: `proposal.ec.repl` might already have been cleared if we arrive here + // through finishApplication. 
+ proposal.ctx = context.Background() } } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 68cf20533c0b..8193e6710c30 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -1219,7 +1220,12 @@ func (r *Replica) refreshProposalsLocked( // which could avoid build-up of raft log entries during outages, see // for example: // https://github.com/cockroachdb/cockroach/issues/60612 - if r.breaker.Signal().Err() == nil && maxSlowProposalDuration > 0 { + // + // NB: the call to Err() here also re-triggers the probe if the breaker is + // already tripped and no probe is running, thus ensuring that even if a + // request got added in while the probe was about to shut down, there will + // be regular attempts at healing the breaker. + if maxSlowProposalDuration > 0 && r.breaker.Signal().Err() == nil { log.Warningf(ctx, "have been waiting %.2fs for slow proposal %s", maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest, @@ -1256,6 +1262,23 @@ func (r *Replica) refreshProposalsLocked( } } +func (r *Replica) poisonInflightLatches(err error) { + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() + for _, p := range r.mu.proposals { + p.ec.poison() + if p.ec.g.Req.PoisonPolicy == poison.Policy_Error { + aErr := roachpb.NewAmbiguousResultError("circuit breaker tripped") + aErr.WrappedErr = roachpb.NewError(err) + // NB: this does not release the request's latches. It's important that + // the latches stay in place, since the command could still apply. + p.signalProposalResult(proposalResult{Err: roachpb.NewError(aErr)}) + } + } +} + // maybeCoalesceHeartbeat returns true if the heartbeat was coalesced and added // to the appropriate queue. func (r *Replica) maybeCoalesceHeartbeat( diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 46dab0b90d50..6aad6682d460 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -318,20 +318,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // coalesced requests timeout/cancel. p.cancelLocked (defined below) is the // cancel function that must be called; calling just cancel is insufficient. ctx := p.repl.AnnotateCtx(context.Background()) - if hasBypassCircuitBreakerMarker(ctx) { - // If the caller bypasses the circuit breaker, allow the lease to do the - // same. Otherwise, the lease will be refused by the circuit breaker as - // well. - // - // Note that there is a tiny race: if a request is in flight, but the - // request that triggered it (i.e. parentCtx here) does *not* bypass the - // probe, and before the circuit breaker rejects the inflight lease another - // request that *does* want to bypass the probe joins the request, it too - // will receive the circuit breaker error. This is special-cased in - // `redirectOnOrAcquireLease`, where such a caller needs to retry instead of - // propagating the error. 
- ctx = withBypassCircuitBreakerMarker(ctx) - } + const opName = "request range lease" tr := p.repl.AmbientContext.Tracer tagsOpt := tracing.WithLogTags(logtags.FromContext(parentCtx)) @@ -452,12 +439,26 @@ func (p *pendingLeaseRequest) requestLeaseAsync( if pErr == nil { // The Replica circuit breakers together with round-tripping a ProbeRequest // here before asking for the lease could provide an alternative, simpler - // solution to the the below issue: + // solution to the below issue: // // https://github.com/cockroachdb/cockroach/issues/37906 ba := roachpb.BatchRequest{} ba.Timestamp = p.repl.store.Clock().Now() ba.RangeID = p.repl.RangeID + // NB: + // RequestLease always bypasses the circuit breaker (i.e. will prefer to + // get stuck on an unavailable range rather than failing fast; see + // `(*RequestLeaseRequest).flags()`). This enables the caller to chose + // between either behavior for themselves: if they too want to bypass + // the circuit breaker, they simply don't check for the circuit breaker + // while waiting for their lease handle. If they want to fail-fast, they + // do. If the lease instead adopted the caller's preference, we'd have + // to handle the case of multiple preferences joining onto one lease + // request, which is more difficult. + // + // TransferLease will observe the circuit breaker, as transferring a + // lease when the range is unavailable results in, essentially, giving + // up on the lease and thus worsening the situation. ba.Add(leaseReq) _, pErr = p.repl.Send(ctx, ba) } @@ -779,6 +780,7 @@ func (r *Replica) requestLeaseLocked( newNotLeaseHolderError(roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc, "refusing to take the lease; node is draining"))) } + // Propose a Raft command to get a lease for this replica. repDesc, err := r.getReplicaDescriptorRLocked() if err != nil { @@ -1109,7 +1111,7 @@ func (r *Replica) leaseGoodToGo( func (r *Replica) redirectOnOrAcquireLease( ctx context.Context, ) (kvserverpb.LeaseStatus, *roachpb.Error) { - return r.redirectOnOrAcquireLeaseForRequest(ctx, hlc.Timestamp{}) + return r.redirectOnOrAcquireLeaseForRequest(ctx, hlc.Timestamp{}, r.breaker.Signal()) } // TestingAcquireLease is redirectOnOrAcquireLease exposed for tests. @@ -1124,13 +1126,8 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat // but it accepts a specific request timestamp instead of assuming that // the request is operating at the current time. func (r *Replica) redirectOnOrAcquireLeaseForRequest( - ctx context.Context, reqTS hlc.Timestamp, + ctx context.Context, reqTS hlc.Timestamp, brSig signaller, ) (kvserverpb.LeaseStatus, *roachpb.Error) { - if hasBypassCircuitBreakerMarker(ctx) { - defer func() { - log.Infof(ctx, "hello") - }() - } // Try fast-path. now := r.store.Clock().NowAsClockTimestamp() { @@ -1145,6 +1142,10 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( } } + if err := brSig.Err(); err != nil { + return kvserverpb.LeaseStatus{}, roachpb.NewError(err) + } + // Loop until the lease is held or the replica ascertains the actual lease // holder. Returns also on context.Done() (timeout or cancellation). for attempt := 1; ; attempt++ { @@ -1264,12 +1265,6 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( // cannot be reproposed so we get this ambiguity. // We'll just loop around. 
return nil - case r.breaker.HasMark(goErr) && hasBypassCircuitBreakerMarker(ctx): - // If this request wanted to bypass the circuit breaker but still got a - // breaker error back, it joined a lease request started by an operation - // that did not bypass circuit breaker errors. Loop around and try again. - // See requestLeaseAsync for details. - return nil case errors.HasType(goErr, (*roachpb.LeaseRejectedError)(nil)): var tErr *roachpb.LeaseRejectedError errors.As(goErr, &tErr) @@ -1302,6 +1297,11 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( } log.VEventf(ctx, 2, "lease acquisition succeeded: %+v", status.Lease) return nil + case <-brSig.C(): + llHandle.Cancel() + err := brSig.Err() + log.VErrEventf(ctx, 2, "lease acquisition failed: %s", err) + return roachpb.NewError(err) case <-slowTimer.C: slowTimer.Read = true log.Warningf(ctx, "have been waiting %s attempting to acquire lease (%d attempts)", @@ -1361,6 +1361,11 @@ func (r *Replica) maybeExtendLeaseAsync(ctx context.Context, st kvserverpb.Lease log.Infof(ctx, "extending lease %s at %s", st.Lease, st.Now) } // We explicitly ignore the returned handle as we won't block on it. + // + // TODO(tbg): this ctx is likely cancelled very soon, which will in turn + // cancel the lease acquisition (unless joined by another more long-lived + // ctx). So this possibly isn't working as advertised (which only plays a role + // for expiration-based leases, at least). _ = r.requestLeaseLocked(ctx, st) } diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 8554c35a663b..127fc14627d7 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" @@ -138,20 +139,6 @@ func (r *Replica) sendWithoutRangeID( return nil, roachpb.NewError(err) } - // Circuit breaker handling. - ctx, cancel := context.WithCancel(ctx) - if bypassReplicaCircuitBreakerForBatch(ba) { - ctx = withBypassCircuitBreakerMarker(ctx) - } - tok, brSig, err := r.breaker.Register(ctx, cancel) - if err != nil { - return nil, roachpb.NewError(err) - } - defer func() { - rErr = r.breaker.UnregisterAndAdjustError(tok, brSig, rErr) - cancel() - }() - if err := r.maybeBackpressureBatch(ctx, ba); err != nil { return nil, roachpb.NewError(err) } @@ -163,7 +150,7 @@ func (r *Replica) sendWithoutRangeID( } // NB: must be performed before collecting request spans. - ba, err = maybeStripInFlightWrites(ba) + ba, err := maybeStripInFlightWrites(ba) if err != nil { return nil, roachpb.NewError(err) } @@ -414,6 +401,12 @@ func (r *Replica) executeBatchWithConcurrencyRetries( r.concMgr.FinishReq(g) } }() + pp := poison.Policy_Error + if r.signallerForBatch(ba).C() == nil { + // The request wishes to ignore the circuit breaker, i.e. attempt to propose + // commands and wait even if the circuit breaker is tripped. + pp = poison.Policy_Wait + } for first := true; ; first = false { // Exit loop if context has been canceled or timed out. 
if err := ctx.Err(); err != nil { @@ -451,11 +444,24 @@ func (r *Replica) executeBatchWithConcurrencyRetries( ReadConsistency: ba.ReadConsistency, WaitPolicy: ba.WaitPolicy, LockTimeout: ba.LockTimeout, + PoisonPolicy: pp, Requests: ba.Requests, LatchSpans: latchSpans, // nil if g != nil LockSpans: lockSpans, // nil if g != nil }, requestEvalKind) if pErr != nil { + if errors.HasType(pErr.GoError(), (*poison.PoisonedError)(nil)) { + brErr := r.breaker.Signal().Err() + if brErr == nil { + // The breaker may have healed in the meantime. + // + // TODO(tbg): it would be nicer if poisoning took an err and it + // came wrapped with the PoisonedError instead. Or we could + // retry the request. + brErr = r.replicaUnavailableError() + } + pErr = roachpb.NewError(errors.CombineErrors(brErr, pErr.GoError())) + } return nil, pErr } else if resp != nil { br = new(roachpb.BatchResponse) @@ -799,7 +805,7 @@ func (r *Replica) handleInvalidLeaseError( // On an invalid lease error, attempt to acquire a new lease. If in the // process of doing so, we determine that the lease now lives elsewhere, // redirect. - _, pErr := r.redirectOnOrAcquireLeaseForRequest(ctx, ba.Timestamp) + _, pErr := r.redirectOnOrAcquireLeaseForRequest(ctx, ba.Timestamp, r.signallerForBatch(ba)) // If we managed to get a lease (i.e. pErr == nil), the request evaluation // will be retried. return pErr @@ -876,6 +882,9 @@ func (r *Replica) executeAdminBatch( } _, err := r.checkExecutionCanProceed(ctx, ba, nil /* g */) + if err == nil { + err = r.signallerForBatch(ba).Err() + } if err == nil { break } @@ -883,7 +892,7 @@ func (r *Replica) executeAdminBatch( case errors.HasType(err, (*roachpb.InvalidLeaseError)(nil)): // If the replica does not have the lease, attempt to acquire it, or // redirect to the current leaseholder by returning an error. - _, pErr := r.redirectOnOrAcquireLeaseForRequest(ctx, ba.Timestamp) + _, pErr := r.redirectOnOrAcquireLeaseForRequest(ctx, ba.Timestamp, r.signallerForBatch(ba)) if pErr != nil { return nil, pErr } @@ -1156,6 +1165,14 @@ func (ec *endCmds) move() endCmds { return res } +func (ec *endCmds) poison() { + if ec.repl == nil { + // Already cleared. + return + } + ec.repl.concMgr.PoisonReq(ec.g) +} + // done releases the latches acquired by the command and updates the timestamp // cache using the final timestamp of each command. // diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 681395c07e29..b10e69c2dd99 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -94,6 +94,12 @@ func (r *Replica) executeWriteBatch( return nil, g, roachpb.NewError(err) } + // Check the breaker. Note that we do this after checkExecutionCanProceed, + // so that NotLeaseholderError has precedence. + if err := r.signallerForBatch(ba).Err(); err != nil { + return nil, g, roachpb.NewError(err) + } + // Compute the transaction's local uncertainty limit using observed // timestamps, which can help avoid uncertainty restarts. 
ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset()) diff --git a/pkg/kv/kvserver/spanlatch/BUILD.bazel b/pkg/kv/kvserver/spanlatch/BUILD.bazel index da2b43c4130d..56b5b2c7da9f 100644 --- a/pkg/kv/kvserver/spanlatch/BUILD.bazel +++ b/pkg/kv/kvserver/spanlatch/BUILD.bazel @@ -14,6 +14,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/spanset", "//pkg/roachpb", "//pkg/util/hlc", @@ -37,12 +38,15 @@ go_test( embed = [":spanlatch"], deps = [ "//pkg/keys", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/spanset", "//pkg/roachpb", "//pkg/testutils", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/timeutil", # keep + "//pkg/util/tracing", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go index 92a67b87d8f3..7dac899c7d83 100644 --- a/pkg/kv/kvserver/spanlatch/manager.go +++ b/pkg/kv/kvserver/spanlatch/manager.go @@ -16,6 +16,7 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -83,10 +84,10 @@ func Make(stopper *stop.Stopper, slowReqs *metric.Gauge) Manager { // latches are stored in the Manager's btrees. They represent the latching // of a single key span. type latch struct { + *signals id uint64 span roachpb.Span ts hlc.Timestamp - done *signal next, prev *latch // readSet linked-list. } @@ -106,10 +107,16 @@ func (la *latch) SetID(v uint64) { la.id = v } func (la *latch) SetKey(v []byte) { la.span.Key = v } func (la *latch) SetEndKey(v []byte) { la.span.EndKey = v } +type signals struct { + done signal + poison idempotentSignal +} + // Guard is a handle to a set of acquired latches. It is returned by // Manager.Acquire and accepted by Manager.Release. type Guard struct { - done signal + signals + pp poison.Policy // latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size. latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32 @@ -166,9 +173,10 @@ func allocGuardAndLatches(nLatches int) (*Guard, []latch) { return new(Guard), make([]latch, nLatches) } -func newGuard(spans *spanset.SpanSet) *Guard { +func newGuard(spans *spanset.SpanSet, pp poison.Policy) *Guard { nLatches := spans.Len() guard, latches := allocGuardAndLatches(nLatches) + guard.pp = pp for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { ss := spans.GetSpans(a, s) @@ -181,7 +189,7 @@ func newGuard(spans *spanset.SpanSet) *Guard { for i := range ssLatches { latch := &latches[i] latch.span = ss[i].Span - latch.done = &guard.done + latch.signals = &guard.signals latch.ts = ss[i].Timestamp // latch.setID() in Manager.insert, under lock. } @@ -203,8 +211,10 @@ func newGuard(spans *spanset.SpanSet) *Guard { // acquired. // // It returns a Guard which must be provided to Release. 
-func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, error) { - lg, snap := m.sequence(spans) +func (m *Manager) Acquire( + ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, +) (*Guard, error) { + lg, snap := m.sequence(spans, pp) defer snap.close() err := m.wait(ctx, lg, snap) @@ -227,8 +237,8 @@ func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, // // The method returns a Guard which must be provided to the // CheckOptimisticNoConflicts, Release methods. -func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet) *Guard { - lg, snap := m.sequence(spans) +func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, pp poison.Policy) *Guard { + lg, snap := m.sequence(spans, pp) lg.snap = &snap return lg } @@ -236,10 +246,10 @@ func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet) *Guard { // WaitFor waits for conflicting latches on the spans without adding // any latches itself. Fast path for operations that only require past latches // to be released without blocking new latches. -func (m *Manager) WaitFor(ctx context.Context, spans *spanset.SpanSet) error { +func (m *Manager) WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) error { // The guard is only used to store latches by this request. These latches // are not actually inserted using insertLocked. - lg := newGuard(spans) + lg := newGuard(spans, pp) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -335,8 +345,8 @@ func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, err // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition // attempts. -func (m *Manager) sequence(spans *spanset.SpanSet) (*Guard, snapshot) { - lg := newGuard(spans) +func (m *Manager) sequence(spans *spanset.SpanSet, pp poison.Policy) (*Guard, snapshot) { + lg := newGuard(spans, pp) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -454,7 +464,7 @@ func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error { // Wait for writes at equal or lower timestamps. a2 := spanset.SpanReadWrite it := tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreLater); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreLater); err != nil { return err } case spanset.SpanReadWrite: @@ -466,13 +476,13 @@ func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error { // to release their latches, so we wait on them first. a2 := spanset.SpanReadWrite it := tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreNothing); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreNothing); err != nil { return err } // Wait for reads at equal or higher timestamps. 
a2 = spanset.SpanReadOnly it = tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreEarlier); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreEarlier); err != nil { return err } default: @@ -491,6 +501,7 @@ func (m *Manager) iterAndWait( ctx context.Context, t *timeutil.Timer, it *iterator, + pp poison.Policy, waitType, heldType spanset.SpanAccess, wait *latch, ignore ignoreFn, @@ -503,7 +514,7 @@ func (m *Manager) iterAndWait( if ignore(wait.ts, held.ts) { continue } - if err := m.waitForSignal(ctx, t, waitType, heldType, wait, held); err != nil { + if err := m.waitForSignal(ctx, t, pp, waitType, heldType, wait, held); err != nil { return err } } @@ -512,13 +523,34 @@ func (m *Manager) iterAndWait( // waitForSignal waits for the latch that is currently held to be signaled. func (m *Manager) waitForSignal( - ctx context.Context, t *timeutil.Timer, waitType, heldType spanset.SpanAccess, wait, held *latch, + ctx context.Context, + t *timeutil.Timer, + pp poison.Policy, + waitType, heldType spanset.SpanAccess, + wait, held *latch, ) error { log.Eventf(ctx, "waiting to acquire %s latch %s, held by %s latch %s", waitType, wait, heldType, held) + poisonCh := held.poison.signalChan() for { select { case <-held.done.signalChan(): return nil + case <-poisonCh: + // The latch we're waiting on was poisoned. If we continue to wait, we have to + // poison our latches as well (so that waiters blocked on us which want to + // fail fast don't get stuck). If we fail fast, we're momentarily removing + // ourselves anyway, so we don't need to self-poison. + switch pp { + case poison.Policy_Error: + return poison.NewPoisonedError(held.span, held.ts) + case poison.Policy_Wait: + log.Eventf(ctx, "encountered poisoned latch; continuing to wait") + wait.poison.signal() + // No need to self-poison multiple times. + poisonCh = nil + default: + return errors.Errorf("unsupported poison.Policy %d", pp) + } case <-t.C: t.Read = true defer t.Reset(base.SlowRequestThreshold) @@ -541,6 +573,14 @@ func (m *Manager) waitForSignal( } } +// Poison marks the Guard as poisoned, meaning that the request will not be +// expected to be releasing its latches (any time soon). This gives requests +// blocking on the Guard's latches an opportunity to fail fast, according to +// their poison.Policy. +func (m *Manager) Poison(lg *Guard) { + lg.poison.signal() +} + // Release releases the latches held by the provided Guard. After being called, // dependent latch acquisition attempts can complete if not blocked on any other // owned latches. 
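The waitForSignal select above is the heart of the poisoning mechanism: a waiter that sees the held latch get poisoned either fails fast with a PoisonedError (poison.Policy_Error) or poisons its own latches and keeps waiting (poison.Policy_Wait), so that requests queued behind it can still fail fast. Below is a minimal, self-contained sketch of that hand-off using plain channels; it deliberately does not use the real spanlatch types, and every name in it (toyLatch, waitFor, policyWait, ...) is illustrative only.

// Toy model of the done/poison hand-off implemented by waitForSignal.
// All names are stand-ins; none of them exist in the real spanlatch package.
package main

import (
	"errors"
	"fmt"
)

type policy int

const (
	policyError policy = iota // fail fast when a latch ahead of us is poisoned
	policyWait                // self-poison and keep waiting
)

// toyLatch stands in for a held latch: done is closed on release, poison is
// closed when the holder is poisoned (e.g. by a tripped circuit breaker probe).
type toyLatch struct {
	done   chan struct{}
	poison chan struct{}
}

func newToyLatch() *toyLatch {
	return &toyLatch{done: make(chan struct{}), poison: make(chan struct{})}
}

var errPoisoned = errors.New("encountered poisoned latch")

// waitFor mimics the select in waitForSignal: it returns nil once the held
// latch is released; under policyError it fails fast when the holder is
// poisoned; under policyWait it propagates the poison to `self` (so waiters
// behind it can fail fast) and keeps waiting for the release.
func waitFor(held, self *toyLatch, pp policy) error {
	poisonCh := held.poison
	for {
		select {
		case <-held.done:
			return nil
		case <-poisonCh:
			if pp == policyError {
				return errPoisoned
			}
			close(self.poison) // propagate the poison
			poisonCh = nil     // no need to self-poison twice
		}
	}
}

func main() {
	held := newToyLatch()
	self := newToyLatch()

	// The request holding `held` gets poisoned but does not release its latch.
	close(held.poison)

	// A policyError waiter fails fast.
	fmt.Println("policyError waiter:", waitFor(held, newToyLatch(), policyError))

	// A policyWait waiter self-poisons and keeps waiting.
	resCh := make(chan error, 1)
	go func() { resCh <- waitFor(held, self, policyWait) }()
	<-self.poison    // poison has been propagated to `self`...
	close(held.done) // ...and only the actual release
	fmt.Println("policyWait waiter:", <-resCh) // ...lets the waiter through
}

In the real Manager the waiter's latches all share the Guard's signals, and the propagation goes through the new idempotentSignal, so poisoning the same Guard repeatedly is a no-op rather than a double-close.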
diff --git a/pkg/kv/kvserver/spanlatch/manager_test.go b/pkg/kv/kvserver/spanlatch/manager_test.go index 115577567470..8e9fdc4e3484 100644 --- a/pkg/kv/kvserver/spanlatch/manager_test.go +++ b/pkg/kv/kvserver/spanlatch/manager_test.go @@ -20,11 +20,13 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -64,11 +66,12 @@ func add(spanSet *spanset.SpanSet, from, to string, write bool, ts hlc.Timestamp } } -func testLatchSucceeds(t *testing.T, lgC <-chan *Guard) *Guard { +func testLatchSucceeds(t *testing.T, a attempt) *Guard { t.Helper() select { - case lg := <-lgC: - return lg + case err := <-a.errCh: + require.NoError(t, err) + return a.lg case <-time.After(testutils.DefaultSucceedsSoonDuration): // False positives are not ok, so we use a more // conservative timeout than in testLatchBlocks. @@ -77,10 +80,10 @@ func testLatchSucceeds(t *testing.T, lgC <-chan *Guard) *Guard { return nil } -func testLatchBlocks(t *testing.T, lgC <-chan *Guard) { +func testLatchBlocks(t *testing.T, a attempt) { t.Helper() select { - case <-lgC: + case <-a.errCh: t.Fatal("latch acquisition should block") case <-time.After(3 * time.Millisecond): // False positives are ok as long as they are rare, so we @@ -91,35 +94,43 @@ func testLatchBlocks(t *testing.T, lgC <-chan *Guard) { // MustAcquire is like Acquire, except it can't return context cancellation // errors. func (m *Manager) MustAcquire(spans *spanset.SpanSet) *Guard { - lg, err := m.Acquire(context.Background(), spans) + lg, err := m.Acquire(context.Background(), spans, poison.Policy_Error) if err != nil { panic(err) } return lg } -// MustAcquireCh is like Acquire, except it only sequences the latch latch -// attempt synchronously and waits on dependent latches asynchronously. It -// returns a channel that provides the Guard when the latches are acquired (i.e. -// after waiting). If the context expires, a nil Guard will be delivered on the -// channel. -func (m *Manager) MustAcquireCh(spans *spanset.SpanSet) <-chan *Guard { - return m.MustAcquireChCtx(context.Background(), spans) +// attempt is a testing helper returned from MustAcquireCh. +type attempt struct { + lg *Guard + errCh <-chan error } -// MustAcquireChCtx is like MustAcquireCh, except it accepts a context. -func (m *Manager) MustAcquireChCtx(ctx context.Context, spans *spanset.SpanSet) <-chan *Guard { - ch := make(chan *Guard) - lg, snap := m.sequence(spans) +// MustAcquireCh is like Acquire, except it only sequences the latch attempt +// synchronously and waits on dependent latches asynchronously. It returns an +// `attempt` helper whose error channel is signaled with the result of +// sequencing. Use MustAcquireChExt when testing context cancellation or +// poisoning. +func (m *Manager) MustAcquireCh(spans *spanset.SpanSet) attempt { + return m.MustAcquireChExt(context.Background(), spans, poison.Policy_Error) +} + +// MustAcquireChExt is like MustAcquireCh, except it accepts a context and +// poison.Policy. 
+func (m *Manager) MustAcquireChExt( + ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, +) attempt { + errCh := make(chan error, 1) + lg, snap := m.sequence(spans, pp) go func() { err := m.wait(ctx, lg, snap) if err != nil { m.Release(lg) - lg = nil } - ch <- lg + errCh <- err }() - return ch + return attempt{lg: lg, errCh: errCh} } func TestLatchManager(t *testing.T) { @@ -170,20 +181,20 @@ func TestLatchManagerAcquireOverlappingSpans(t *testing.T) { // We acquire reads at lower timestamps than writes to check for blocked // acquisitions based on the original latch, not the latches declared in // earlier test cases. - var latchCs []<-chan *Guard - latchCs = append(latchCs, m.MustAcquireCh(spans("a", "b", write, ts1))) - latchCs = append(latchCs, m.MustAcquireCh(spans("b", "c", read, ts0))) - latchCs = append(latchCs, m.MustAcquireCh(spans("b", "c", write, ts1))) - latchCs = append(latchCs, m.MustAcquireCh(spans("c", "d", write, ts1))) - latchCs = append(latchCs, m.MustAcquireCh(spans("c", "d", read, ts0))) - - for _, lgC := range latchCs { + var attempts []attempt + attempts = append(attempts, m.MustAcquireCh(spans("a", "b", write, ts1))) + attempts = append(attempts, m.MustAcquireCh(spans("b", "c", read, ts0))) + attempts = append(attempts, m.MustAcquireCh(spans("b", "c", write, ts1))) + attempts = append(attempts, m.MustAcquireCh(spans("c", "d", write, ts1))) + attempts = append(attempts, m.MustAcquireCh(spans("c", "d", read, ts0))) + + for _, lgC := range attempts { testLatchBlocks(t, lgC) } m.Release(lg1) - for _, lgC := range latchCs { + for _, lgC := range attempts { lg := testLatchSucceeds(t, lgC) m.Release(lg) } @@ -205,19 +216,19 @@ func TestLatchManagerAcquiringReadsVaryingTimestamps(t *testing.T) { m.Release(lg) } - var latchCs []<-chan *Guard + var attempts []attempt for _, walltime := range []int64{0, 1, 2} { ts := hlc.Timestamp{WallTime: walltime} - latchCs = append(latchCs, m.MustAcquireCh(spans("a", "", write, ts))) + attempts = append(attempts, m.MustAcquireCh(spans("a", "", write, ts))) } - for _, lgC := range latchCs { + for _, lgC := range attempts { testLatchBlocks(t, lgC) } m.Release(lg1) - for _, lgC := range latchCs { + for _, lgC := range attempts { lg := testLatchSucceeds(t, lgC) m.Release(lg) } @@ -267,18 +278,18 @@ func TestLatchManagerMultipleOverlappingLatches(t *testing.T) { var m Manager // Acquire multiple latches. - lg1C := m.MustAcquireCh(spans("a", "", write, zeroTS)) - lg2C := m.MustAcquireCh(spans("b", "c", write, zeroTS)) - lg3C := m.MustAcquireCh(spans("a", "d", write, zeroTS)) + a1 := m.MustAcquireCh(spans("a", "", write, zeroTS)) + a2 := m.MustAcquireCh(spans("b", "c", write, zeroTS)) + a3 := m.MustAcquireCh(spans("a", "d", write, zeroTS)) // Attempt to acquire latch which overlaps them all. 
lg4C := m.MustAcquireCh(spans("0", "z", write, zeroTS)) testLatchBlocks(t, lg4C) - m.Release(<-lg1C) + m.Release(testLatchSucceeds(t, a1)) testLatchBlocks(t, lg4C) - m.Release(<-lg2C) + m.Release(testLatchSucceeds(t, a2)) testLatchBlocks(t, lg4C) - m.Release(<-lg3C) + m.Release(testLatchSucceeds(t, a3)) testLatchSucceeds(t, lg4C) } @@ -509,6 +520,61 @@ func TestLatchManagerDependentLatches(t *testing.T) { } } +func TestLatchManagerPoison(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + testLatchPoisons := func(t *testing.T, a attempt) { + t.Helper() + select { + case err := <-a.errCh: + require.True(t, errors.HasType(err, (*poison.PoisonedError)(nil)), "%+v", err) + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatal("timed out") + } + } + + var m Manager + a1 := m.MustAcquireChExt(ctx, spans("a", "", write, zeroTS), poison.Policy_Wait) + a2 := m.MustAcquireChExt(ctx, spans("a", "", write, zeroTS), poison.Policy_Error) + a3 := m.MustAcquireChExt(ctx, spans("a", "", write, zeroTS), poison.Policy_Error) + a4 := m.MustAcquireChExt(ctx, spans("a", "", write, zeroTS), poison.Policy_Wait) + a5 := m.MustAcquireChExt(ctx, spans("a", "", write, zeroTS), poison.Policy_Error) + + ga1 := testLatchSucceeds(t, a1) + + // chga5 blocks on chga4 blocks on chga3 blocks on chga2 blocks on chga1. + testLatchBlocks(t, a2) + testLatchBlocks(t, a3) + testLatchBlocks(t, a4) + testLatchBlocks(t, a5) + + // Poison a1. This poisons a2, a3, a4, and a5. However, a4 is Policy_Wait so + // we don't see that yet (but since a5 waits on a4 and a5 poisons, we are + // pretty sure). + ga1.poison.signal() + + testLatchPoisons(t, a2) + testLatchPoisons(t, a3) + testLatchPoisons(t, a5) + + // NB: we must use SucceedsSoon here & do this before we release ga1 below. + // Otherwise, it is possible for a4 to never observe a poisoned latch during + // sequencing. + testutils.SucceedsSoon(t, func() error { + if !a4.lg.poison.sig.signaled() { + return errors.New("not signaled yet") + } + return nil + }) + + // Release ga1, which allows ga4 to sequence. At that point, we can check + // directly that it is poisoned. + m.Release(ga1) + ga4 := testLatchSucceeds(t, a4) + m.Release(ga4) +} + func TestLatchManagerContextCancellation(t *testing.T) { defer leaktest.AfterTest(t)() var m Manager @@ -517,23 +583,23 @@ func TestLatchManagerContextCancellation(t *testing.T) { lg1 := m.MustAcquire(spans("a", "", write, zeroTS)) // The second one is given a cancelable context. ctx2, cancel2 := context.WithCancel(context.Background()) - lg2C := m.MustAcquireChCtx(ctx2, spans("a", "", write, zeroTS)) - lg3C := m.MustAcquireCh(spans("a", "", write, zeroTS)) + a2C := m.MustAcquireChExt(ctx2, spans("a", "", write, zeroTS), poison.Policy_Error) + a3C := m.MustAcquireCh(spans("a", "", write, zeroTS)) // The second and third latch attempt block on the first. - testLatchBlocks(t, lg2C) - testLatchBlocks(t, lg3C) + testLatchBlocks(t, a2C) + testLatchBlocks(t, a3C) // Cancel the second acquisition's context. It should stop waiting. cancel2() - require.Nil(t, <-lg2C) + require.ErrorIs(t, <-a2C.errCh, context.Canceled) // The third latch attempt still blocks. - testLatchBlocks(t, lg3C) + testLatchBlocks(t, a3C) // Release the first latch. The third succeeds in acquiring the latch. m.Release(lg1) - testLatchSucceeds(t, lg3C) + testLatchSucceeds(t, a3C) } func TestLatchManagerOptimistic(t *testing.T) { @@ -541,31 +607,30 @@ func TestLatchManagerOptimistic(t *testing.T) { var m Manager // Acquire latches, no conflict. 
- lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS)) - require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS))) + lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS), poison.Policy_Error) + require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS)), poison.Policy_Error) lg1, err := m.WaitUntilAcquired(context.Background(), lg1) require.NoError(t, err) // Optimistic acquire encounters conflict in some cases. - lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS)) + lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS), poison.Policy_Error) require.False(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "e", read, zeroTS))) require.True(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "d", read, zeroTS))) - waitUntilAcquiredCh := func(g *Guard) <-chan *Guard { - ch := make(chan *Guard) + waitUntilAcquiredCh := func(g *Guard) attempt { + errCh := make(chan error, 1) go func() { - lg, err := m.WaitUntilAcquired(context.Background(), g) - require.NoError(t, err) - ch <- lg + _, err := m.WaitUntilAcquired(context.Background(), g) + errCh <- err }() - return ch + return attempt{lg: g, errCh: errCh} } - ch2 := waitUntilAcquiredCh(lg2) - testLatchBlocks(t, ch2) + a2 := waitUntilAcquiredCh(lg2) + testLatchBlocks(t, a2) m.Release(lg1) - testLatchSucceeds(t, ch2) + testLatchSucceeds(t, a2) // Optimistic acquire encounters conflict. - lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS)) + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error) require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) m.Release(lg2) // There is still a conflict even though lg2 has been released. @@ -577,7 +642,7 @@ func TestLatchManagerOptimistic(t *testing.T) { // Optimistic acquire for read below write encounters no conflict. oneTS, twoTS := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2} lg4 := m.MustAcquire(spans("c", "e", write, twoTS)) - lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS)) + lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS), poison.Policy_Error) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "e", read, oneTS))) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "c", read, oneTS))) lg5, err = m.WaitUntilAcquired(context.Background(), lg5) @@ -591,18 +656,16 @@ func TestLatchManagerWaitFor(t *testing.T) { var m Manager // Acquire latches, no conflict. - lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS)) + lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS), poison.Policy_Error) require.NoError(t, err) // See if WaitFor waits for above latch. - waitForCh := func() <-chan *Guard { - ch := make(chan *Guard) + waitForCh := func() attempt { + errCh := make(chan error) go func() { - err := m.WaitFor(context.Background(), spans("a", "e", read, zeroTS)) - require.NoError(t, err) - ch <- &Guard{} + errCh <- m.WaitFor(context.Background(), spans("a", "e", read, zeroTS), poison.Policy_Error) }() - return ch + return attempt{errCh: errCh} } ch2 := waitForCh() testLatchBlocks(t, ch2) @@ -611,7 +674,7 @@ func TestLatchManagerWaitFor(t *testing.T) { // Optimistic acquire should _not_ encounter conflict - as WaitFor should // not lay any latches. 
diff --git a/pkg/kv/kvserver/spanlatch/signal.go b/pkg/kv/kvserver/spanlatch/signal.go
index 71deb5621918..1a7fdceb1968 100644
--- a/pkg/kv/kvserver/spanlatch/signal.go
+++ b/pkg/kv/kvserver/spanlatch/signal.go
@@ -41,9 +41,17 @@ type signal struct {
 }
 
 func (s *signal) signal() {
+	s.signalWithChoice(false /* idempotent */)
+}
+
+func (s *signal) signalWithChoice(idempotent bool) {
 	if !atomic.CompareAndSwapInt32(&s.a, noSig, sig) {
+		if idempotent {
+			return
+		}
 		panic("signaled twice")
 	}
+	// Close the channel if it was ever initialized.
 	if cPtr := atomic.LoadPointer(&s.c); cPtr != nil {
 		// Coordinate with signalChan to avoid double-closing.
@@ -82,6 +90,19 @@ func (s *signal) signalChan() <-chan struct{} {
 	return c
 }
 
+// idempotentSignal is like signal, but its signal method is idempotent.
+type idempotentSignal struct {
+	sig signal
+}
+
+func (s *idempotentSignal) signal() {
+	s.sig.signalWithChoice(true /* idempotent */)
+}
+
+func (s *idempotentSignal) signalChan() <-chan struct{} {
+	return s.sig.signalChan()
+}
+
 func chanToPtr(c chan struct{}) unsafe.Pointer {
 	return *(*unsafe.Pointer)(unsafe.Pointer(&c))
 }
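The `signalWithChoice` plumbing and the `idempotentSignal` wrapper above keep the panic-on-double-signal invariant for regular latch release while allowing poisoning to be signaled any number of times. Below is a minimal standalone sketch of the same one-shot, idempotent-signal idea; it leans on `sync.Once` for brevity, so it is a simplification rather than the actual spanlatch implementation (which uses the CAS shown above and allocates the waiter channel lazily).

package main

import (
	"fmt"
	"sync"
)

// oneShot is a simplified stand-in for the signal/idempotentSignal pair in the
// diff above: it may be fired any number of times, only the first call has an
// effect, and waiters observe the event through a channel that is closed
// exactly once.
type oneShot struct {
	once sync.Once
	ch   chan struct{}
}

func newOneShot() *oneShot {
	return &oneShot{ch: make(chan struct{})}
}

// signal is idempotent: calls after the first are no-ops rather than panics.
func (s *oneShot) signal() {
	s.once.Do(func() { close(s.ch) })
}

// signalChan returns a channel that is closed once signal has been called.
func (s *oneShot) signalChan() <-chan struct{} {
	return s.ch
}

func main() {
	s := newOneShot()
	done := make(chan struct{})
	go func() {
		<-s.signalChan() // blocks until the first signal
		fmt.Println("observed signal")
		close(done)
	}()
	s.signal()
	s.signal() // safe: idempotent, unlike the panicking non-idempotent variant
	<-done
}

Behaviorally the two line up: the first signal closes the channel that waiters select on, and any later signal is a no-op instead of a panic.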
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 77798a352a8d..5e70695cb359 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -86,10 +86,6 @@ type StoreTestingKnobs struct {
 	// per-Batch SlowReplicationThreshold.
 	SlowReplicationThresholdOverride func(ba *roachpb.BatchRequest) time.Duration
 
-	// CancelStorageFactory overrides the default CancelStorage used by Replica
-	// circuit breakers.
-	CancelStorageFactory func() CancelStorage
-
 	// TestingRangefeedFilter is called before a replica processes a rangefeed
 	// in order for unit tests to modify the request, error returned to the client
 	// or data.
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index 2dd836e61cdb..2ab9e465855e 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -1294,8 +1294,9 @@ func (*AdminChangeReplicasRequest) flags() flag { return isAdmin | isAlone }
 func (*AdminRelocateRangeRequest) flags() flag { return isAdmin | isAlone }
 func (*GCRequest) flags() flag {
-	// We let GCRequest bypass the circuit breaker because otherwise, the GC queue may
-	// busy loop on an unavailable range, doing lots of work but never making progress.
+	// We defensively let GCRequest bypass the circuit breaker because otherwise,
+	// the GC queue might busy loop on an unavailable range, doing lots of work
+	// but never making progress.
 	return isWrite | isRange | bypassesReplicaCircuitBreaker
 }
 
@@ -1321,7 +1322,9 @@ func (*ResolveIntentRequest) flags() flag { return isWrite }
 func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange }
 func (*TruncateLogRequest) flags() flag { return isWrite }
 func (*MergeRequest) flags() flag { return isWrite | canBackpressure }
-func (*RequestLeaseRequest) flags() flag { return isWrite | isAlone | skipsLeaseCheck }
+func (*RequestLeaseRequest) flags() flag {
+	return isWrite | isAlone | skipsLeaseCheck | bypassesReplicaCircuitBreaker
+}
 
 // LeaseInfoRequest is usually executed in an INCONSISTENT batch, which has the
 // effect of the `skipsLeaseCheck` flag that lease write operations have.
@@ -1339,6 +1342,11 @@ func (*TransferLeaseRequest) flags() flag {
 	// the store has registered that a transfer is in progress and
 	// `redirectOnOrAcquireLease` would already tentatively redirect to the future
 	// lease holder.
+	//
+	// Note that we intentionally don't let TransferLease bypass the Replica
+	// circuit breaker. Transferring a lease while the replication layer is
+	// unavailable results in the "old" leaseholder relinquishing the ability
+	// to serve (strong) reads, without being able to hand over the lease.
 	return isWrite | isAlone | skipsLeaseCheck
 }
 func (*ProbeRequest) flags() flag {
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 0c12f9399dbc..891fd8f23198 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -744,6 +744,13 @@ func (tc *TestCluster) WaitForVoters(
 // respective replica has caught up with the config change).
 //
 // targets are replication target for change replica.
+//
+// TODO(tbg): it seems silly that most callers pass `waitForVoter==false` even
+// when they are adding a voter, and instead well over a dozen tests then go and
+// call `.WaitForVoter` instead. It is very rare for a test to want to add a
+// voter but not wait for this voter to show up on the target replica (perhaps
+// when some strange error is injected), so the rare test should have to do the
+// extra work instead.
 func (tc *TestCluster) waitForNewReplicas(
 	startKey roachpb.Key, waitForVoter bool, targets ...roachpb.ReplicationTarget,
 ) error {
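For context on the `flags()` changes in pkg/roachpb/api.go: each request type contributes bits to a flag bitmask, and `bypassesReplicaCircuitBreaker` is the bit consulted when deciding whether a request may proceed on a replica whose circuit breaker has tripped. The sketch below is illustrative only; `checkBreaker`, the flag values, and the error are made up for this example and are not the CockroachDB definitions. It shows why adding the bit to RequestLease but withholding it from TransferLease only changes behavior while the breaker is tripped.

package main

import (
	"errors"
	"fmt"
)

// flag is a stand-in for the roachpb request flag bitmask; the names mirror
// the diff but the values and the checkBreaker helper are hypothetical.
type flag int

const (
	isWrite flag = 1 << iota
	isRange
	isAlone
	skipsLeaseCheck
	bypassesReplicaCircuitBreaker
)

var errBreakerTripped = errors.New("replica circuit breaker tripped")

// checkBreaker models the gating decision the flag feeds into: requests that
// carry bypassesReplicaCircuitBreaker (such as GC, or RequestLease after this
// change) are let through even while the breaker is tripped; everything else
// fails fast.
func checkBreaker(f flag, breakerTripped bool) error {
	if breakerTripped && f&bypassesReplicaCircuitBreaker == 0 {
		return errBreakerTripped
	}
	return nil
}

func main() {
	requestLease := isWrite | isAlone | skipsLeaseCheck | bypassesReplicaCircuitBreaker
	transferLease := isWrite | isAlone | skipsLeaseCheck

	// With the breaker tripped, RequestLease-style requests still go through,
	// while TransferLease-style requests are rejected.
	fmt.Println(checkBreaker(requestLease, true))  // <nil>
	fmt.Println(checkBreaker(transferLease, true)) // replica circuit breaker tripped
}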