From bbc37ba2128e4dd233ce1036fc7b9e6d3eecc7bc Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 21 Feb 2022 18:03:01 +0100 Subject: [PATCH] kvserver: allow circuit-breaker to serve reads This commit revamps an earlier implementation (#71806) of per-Replica circuit breakers (#33007). The earlier implementation relied on context cancellation and coarsely failed all requests addressing the Replica when the breaker was tripped. This had two downsides: First, there was a (small) performance overhead for implementing the cancellation that was paid even in the common case of a healthy Replica. Second, and more importantly, the coarseness meant that we'd potentially fail many requests that would otherwise succeed, and in particular follower reads. @nvanbenschoten suggested in #74799 that latching could be extended with the concept of "poisoning" and that this could result in fine-grained circuit breaker behavior where only requests that are truly affected by unavailability (at the replication layer) would be rejected. This commit implements that strategy: A request's latches are poisoned if its completion is predicated on the replication layer being healthy. In other words, when the breaker trips, all inflight proposals have their latches poisoned and new proposals are failed fast. However, and this is the big difference, reads can potentially still be accepted in either of two scenarios: - a valid follower read remains valid regardless of the circuit breaker status, and also regardless of inflight proposals (follower reads don't observe latches). - a read that can be served under the current lease and which does not conflict with any of the stuck proposals in the replication layer (= poisoned latches) can also be served. In short, reads only fail fast if they encounter a poisoned latch or need to request a lease. (If they opted out of fail-fast behavior, they behave as today). Latch poisoning is added as a first-class concept in the `concurrency` package, and a structured error `PoisonError` is introduced. This error in particular contains the span and timestamp of the poisoned latch that prompted the fail-fast. Lease proposals now always use `poison.Policy_Wait`, leaving the fail-fast behavior to the caller. This simplifies things since multiple callers with their own `poison.Policy` can multiplex onto a single inflight lease proposal. Addresses #74799. Release note: None Release justification: 22.1 project work --- pkg/gen/protobuf.bzl | 1 + pkg/kv/kvserver/BUILD.bazel | 3 +- ...ient_replica_circuit_breaker_bench_test.go | 124 ----- .../client_replica_circuit_breaker_test.go | 449 +++++++++++++----- pkg/kv/kvserver/concurrency/BUILD.bazel | 2 + .../concurrency/concurrency_control.go | 15 +- .../concurrency/concurrency_manager.go | 11 +- .../concurrency/concurrency_manager_test.go | 19 +- .../concurrency/datadriven_util_test.go | 19 + pkg/kv/kvserver/concurrency/latch_manager.go | 15 +- .../kvserver/concurrency/lock_table_test.go | 7 +- .../kvserver/concurrency/poison/BUILD.bazel | 44 ++ pkg/kv/kvserver/concurrency/poison/error.go | 42 ++ .../kvserver/concurrency/poison/error.proto | 22 + .../kvserver/concurrency/poison/policy.proto | 35 ++ .../concurrency_manager/poison_policy_err | 60 +++ .../poison_policy_err_indirect | 61 +++ .../poison_policy_wait_disjoint | 59 +++ .../poison_policy_wait_overlapping | 61 +++ pkg/kv/kvserver/helpers_test.go | 7 - pkg/kv/kvserver/replica_circuit_breaker.go | 138 +----- .../replica_circuit_breaker_cancelstorage.go | 115 ----- .../kvserver/replica_circuit_breaker_test.go | 160 ------- pkg/kv/kvserver/replica_init.go | 8 +- pkg/kv/kvserver/replica_proposal.go | 10 + pkg/kv/kvserver/replica_raft.go | 23 +- pkg/kv/kvserver/replica_range_lease.go | 52 +- pkg/kv/kvserver/replica_send.go | 45 +- pkg/kv/kvserver/replica_write.go | 8 + pkg/kv/kvserver/spanlatch/BUILD.bazel | 3 + pkg/kv/kvserver/spanlatch/manager.go | 68 ++- pkg/kv/kvserver/spanlatch/manager_test.go | 88 +++- pkg/kv/kvserver/spanlatch/signal.go | 21 + pkg/kv/kvserver/testing_knobs.go | 4 - pkg/roachpb/api.go | 14 +- 35 files changed, 1051 insertions(+), 762 deletions(-) delete mode 100644 pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go create mode 100644 pkg/kv/kvserver/concurrency/poison/BUILD.bazel create mode 100644 pkg/kv/kvserver/concurrency/poison/error.go create mode 100644 pkg/kv/kvserver/concurrency/poison/error.proto create mode 100644 pkg/kv/kvserver/concurrency/poison/policy.proto create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping delete mode 100644 pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index df87c11dff58..1a2eac484301 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -20,6 +20,7 @@ PROTOBUF_SRCS = [ "//pkg/kv/kvnemesis:kvnemesis_go_proto", "//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto", "//pkg/kv/kvserver/concurrency/lock:lock_go_proto", + "//pkg/kv/kvserver/concurrency/poison:poison_go_proto", "//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e24b2673dcf9..63e1223b84c0 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -37,7 +37,6 @@ go_library( "replica_backpressure.go", "replica_batch_updates.go", "replica_circuit_breaker.go", - "replica_circuit_breaker_cancelstorage.go", "replica_closedts.go", "replica_command.go", "replica_consistency.go", @@ -124,6 +123,7 @@ go_library( "//pkg/kv/kvserver/closedts/sidetransport", "//pkg/kv/kvserver/closedts/tracker", "//pkg/kv/kvserver/concurrency", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/constraint", "//pkg/kv/kvserver/gc", "//pkg/kv/kvserver/idalloc", @@ -227,7 +227,6 @@ go_test( "client_rangefeed_test.go", "client_relocate_range_test.go", "client_replica_backpressure_test.go", - "client_replica_circuit_breaker_bench_test.go", "client_replica_circuit_breaker_test.go", "client_replica_gc_test.go", "client_replica_test.go", diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go deleted file mode 100644 index 686a6a6c557e..000000000000 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_bench_test.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package kvserver_test - -import ( - "context" - "fmt" - "math/rand" - "strconv" - "sync" - "testing" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/util/encoding" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/stretchr/testify/require" -) - -type replicaCircuitBreakerBench struct { - *testcluster.TestCluster - pool *sync.Pool // *BatchRequest -} - -func (tc *replicaCircuitBreakerBench) repl(b *testing.B) *kvserver.Replica { - return tc.GetFirstStoreFromServer(b, 0).LookupReplica(keys.MustAddr(tc.ScratchRange(b))) -} - -func setupCircuitBreakerReplicaBench( - b *testing.B, breakerEnabled bool, cs string, -) (*replicaCircuitBreakerBench, *stop.Stopper) { - b.Helper() - - var numShards int - { - _, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards) - require.NoError(b, err) - } - sFn := func() kvserver.CancelStorage { return &kvserver.MapCancelStorage{NumShards: numShards} } - - var knobs kvserver.StoreTestingKnobs - knobs.CancelStorageFactory = sFn - - var args base.TestClusterArgs - args.ServerArgs.Knobs.Store = &knobs - tc := testcluster.StartTestCluster(b, 1, args) - - stmt := `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '1000s'` - if !breakerEnabled { - stmt = `SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '0s'` - } - _, err := tc.ServerConn(0).Exec(stmt) - require.NoError(b, err) - wtc := &replicaCircuitBreakerBench{ - TestCluster: tc, - } - wtc.pool = &sync.Pool{ - New: func() interface{} { - repl := wtc.repl(b) - var ba roachpb.BatchRequest - ba.RangeID = repl.RangeID - ba.Timestamp = repl.Clock().NowAsClockTimestamp().ToTimestamp() - var k roachpb.Key - k = append(k, repl.Desc().StartKey.AsRawKey()...) - k = encoding.EncodeUint64Ascending(k, uint64(rand.Intn(1000))) - ba.Add(roachpb.NewGet(k, false)) - return &ba - }, - } - return wtc, tc.Stopper() -} - -func BenchmarkReplicaCircuitBreakerSendOverhead(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - ctx := context.Background() - - for _, enabled := range []bool{false, true} { - b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) { - dss := []string{ - "mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16", - "mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64", - } - if !enabled { - dss = dss[:1] - } - - for _, ds := range dss { - b.Run(ds, func(b *testing.B) { - b.ReportAllocs() - tc, stopper := setupCircuitBreakerReplicaBench(b, enabled, ds) - defer stopper.Stop(ctx) - - repl := tc.repl(b) - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - ba := tc.pool.Get().(*roachpb.BatchRequest) - _, err := repl.Send(ctx, *ba) - tc.pool.Put(ba) - if err != nil { - b.Fatal(err) - } - } - }) - }) - } - }) - } -} diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 0293834831bf..dc47f762a756 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -82,12 +83,13 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { defer log.Scope(t).Close(t) tc := setupCircuitBreakerTest(t) defer tc.Stopper().Stop(context.Background()) + k := tc.ScratchRange(t) // Get lease on n1. require.NoError(t, tc.Write(n1)) // Disable the probe so that when the breaker trips, it stays tripped. tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) + tc.Report(n1, errors.New("injected breaker error")) s1 := tc.GetFirstStoreFromServer(t, n1) s2 := tc.GetFirstStoreFromServer(t, n2) @@ -97,10 +99,10 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCurTripped.Value()) require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCumTripped.Count()) - // n1 could theoretically still serve reads (there is a valid lease - // and none of the latches are taken), but since it is hard to determine - // that upfront we currently fail all reads as well. - tc.RequireIsBreakerOpen(t, tc.Read(n1)) + // n1 can still serve reads despite the breaker having tripped, as there is a + // valid lease and no poisoned latches prevent the read. + require.NoError(t, tc.Read(n1)) + require.NoError(t, tc.FollowerRead(n1)) tc.RequireIsBreakerOpen(t, tc.Write(n1)) // When we go through the KV client stack, we still get the breaker error @@ -108,23 +110,29 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { tc.RequireIsBreakerOpen(t, tc.WriteDS(n1)) tc.RequireIsBreakerOpen(t, tc.WriteDS(n2)) + // Can't transfer the lease away while breaker is tripped. (This would be + // a bad idea, since n1 would stop serving strong reads, thus making the + // outage worse). + tc.RequireIsBreakerOpen(t, + tc.TransferRangeLease(tc.LookupRangeOrFatal(t, k), tc.Target(n2)), + ) + // n2 does not have the lease so all it does is redirect to the leaseholder - // n1. + // n1, but it can serve follower reads. tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + require.NoError(t, tc.FollowerRead(n2)) tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) // Enable the probe. Even a read should trigger the probe // and within due time the breaker should heal. tc.SetProbeEnabled(n1, true) - tc.UntripsSoon(t, tc.Read, n1) - // Same behavior on writes. - tc.Report(n1, errors.New("boom again")) + require.NoError(t, tc.Read(n1)) // this always worked + // Writes heal soon. tc.UntripsSoon(t, tc.Write, n1) - // Currently tripped drops back to zero, all-time is two (since we tripped - // it twice) + // Currently tripped drops back to zero, all-time remains at one. require.EqualValues(t, 0, s1.Metrics().ReplicaCircuitBreakerCurTripped.Value()) - require.EqualValues(t, 2, s1.Metrics().ReplicaCircuitBreakerCumTripped.Count()) + require.EqualValues(t, 1, s1.Metrics().ReplicaCircuitBreakerCumTripped.Count()) // s2 wasn't affected by any breaker events. require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCurTripped.Value()) require.Zero(t, s2.Metrics().ReplicaCircuitBreakerCumTripped.Count()) @@ -134,9 +142,9 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { // breaker on follower n2. Before the breaker is tripped, we see // NotLeaseholderError. When it's tripped, those are supplanted by the breaker // errors. Once we allow the breaker to probe, the breaker untrips. In -// particular, this tests that the probe can succeed even when run on a -// follower (which would not be true if it required the local Replica to -// execute an operation that requires the lease). +// particular, this tests that the probe can succeed even when run on a follower +// (which would not be true if it required the local Replica to execute an +// operation that requires the lease). func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -147,7 +155,7 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { require.NoError(t, tc.Write(n1)) // Disable the probe on n2 so that when the breaker trips, it stays tripped. tc.SetProbeEnabled(n2, false) - tc.Report(n2, errors.New("boom")) + tc.Report(n2, errors.New("injected breaker error")) // We didn't trip the leaseholder n1, so it is unaffected. require.NoError(t, tc.Read(n1)) @@ -158,31 +166,21 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { // time of writing it would propagate it. require.NoError(t, tc.WriteDS(n1)) - tc.RequireIsBreakerOpen(t, tc.Read(n2)) - tc.RequireIsBreakerOpen(t, tc.Write(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) + require.NoError(t, tc.FollowerRead(n2)) - // Enable the probe. Even a read should trigger the probe - // and within due time the breaker should heal, giving us - // NotLeaseholderErrors again. - // - // TODO(tbg): this test would be more meaningful with follower reads. They - // should succeed when the breaker is open and fail if the breaker is - // tripped. However knowing that the circuit breaker check sits at the top - // of Replica.sendWithRangeID, it's clear that it won't make a difference. + // Enable the probe again. n2 should untrip soon. tc.SetProbeEnabled(n2, true) + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) testutils.SucceedsSoon(t, func() error { - if err := tc.Read(n2); !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { - return err - } - return nil - }) - // Same behavior on writes. - tc.Report(n2, errors.New("boom again")) - testutils.SucceedsSoon(t, func() error { - if err := tc.Write(n2); !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { - return err - } - return nil + // NB: this is slightly contrived - the mere act of accessing Err() is what + // triggers the probe! Regular requests on the replica wouldn'd do that, + // since we're intentionally preferring a NotLeaseholderError over a breaker + // error (and thus aren't ever accessing the breaker when we can't serve the + // request). + return tc.repls[n2].Breaker().Signal().Err() }) } @@ -202,34 +200,45 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) { // disabled. require.NoError(t, tc.Write(n1)) tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) - - // n2 (not n1) will return a NotLeaseholderError. This may be surprising - - // why isn't it trying and succeeding to acquire a lease - but it does - // not do that because it sees that the new leaseholder (n2) is not live - // itself. We'll revisit this after re-enabling liveness later in the test. - { - err := tc.Read(n2) - // At time of writing: not incrementing epoch on n1 because next - // leaseholder (n2) not live. - t.Log(err) - tc.RequireIsNotLeaseholderError(t, err) - // Same behavior for write on n2. - tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) - } - // On n1, run into the circuit breaker when requesting lease. - { - tc.RequireIsBreakerOpen(t, tc.Read(n1)) + tc.Report(n1, errors.New("injected breaker error")) + resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) + + // On n1, run into the circuit breaker when requesting lease. We have to + // resume heartbeats for this to not time out, as requesting the new lease + // entails doing liveness checks which can't succeed if nobody is + // heartbeating, and we'd get stuck in liveness before reaching the circuit + // breaker. (In other words, replica circuit breaking doesn't fail-fast + // requests reliably if liveness is unavailable; this is tracked in #74616). + // We don't attempt to acquire a lease on n2 since it would try and succeed + // (except the test harness categorically prevents n2 from getting a lease, + // injecting an error). + resumeHeartbeats() + testutils.SucceedsSoon(t, func() error { + err := tc.Read(n1) + if errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { + // Retriable errors can occur when manipulating the liveness record in + // preparation for requesting a lease, such as: + // + // [NotLeaseHolderError] failed to manipulate liveness record: heartbeat + // failed on epoch increment; r45: replica (n1,s1):1 not lease holder; + // current lease is repl=(n1,s1):1 seq=1 start=0,0 epo=1 pro=[...] + return err + } + tc.RequireIsBreakerOpen(t, err) tc.RequireIsBreakerOpen(t, tc.Write(n1)) - } + return nil + }) + + // Can still perform follower reads on both nodes, as this does not rely on + // the lease and does not consult the breaker. + tc.FollowerRead(n1) + tc.FollowerRead(n2) // Let the breaker heal and things should go back to normal. This is not a // trivial thing to hold, as the probe needs to go through for this, and if // we're not careful, the probe itself is held up by the breaker as well, or // the probe will try to acquire a lease (which we're currently careful to // avoid). - resumeHeartbeats() tc.SetProbeEnabled(n1, true) tc.UntripsSoon(t, tc.Read, n1) tc.UntripsSoon(t, tc.Write, n1) @@ -254,21 +263,33 @@ func TestReplicaCircuitBreaker_Leaseholder_QuorumLoss(t *testing.T) { tc.StopServer(n2) // lose quorum // We didn't lose the liveness range (which is only on n1). - require.NoError(t, tc.Server(n1).HeartbeatNodeLiveness()) + tc.HeartbeatNodeLiveness(t, n1) + + // Read still works, as we have valid lease and no poisoned latch + // underneath. + require.NoError(t, tc.Read(n1)) tc.SetSlowThreshold(10 * time.Millisecond) { err := tc.Write(n1) var ae *roachpb.AmbiguousResultError require.True(t, errors.As(err, &ae), "%+v", err) t.Log(err) + tc.RequireIsBreakerOpen(t, err) + } + // We still have a valid lease, but now the above write is holding a poisoned + // latch (this is true despite the write itself having returned already). + // However, can still serve follower reads because those don't check latches + // (nor do they need the lease, though there is a valid one in this case). + { + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + require.NoError(t, tc.FollowerRead(n1)) } - tc.RequireIsBreakerOpen(t, tc.Read(n1)) // Bring n2 back and service should be restored. tc.SetSlowThreshold(0) // reset require.NoError(t, tc.RestartServer(n2)) - tc.UntripsSoon(t, tc.Read, n1) - require.NoError(t, tc.Write(n1)) + tc.UntripsSoon(t, tc.Write, n1) // poisoned latch goes away + require.NoError(t, tc.Read(n1)) } // In this test, the range is on n1 and n2 and we place the lease on n2 and @@ -287,12 +308,13 @@ func TestReplicaCircuitBreaker_Follower_QuorumLoss(t *testing.T) { // Get lease to n2 so that we can lose it without taking down the system ranges. desc := tc.LookupRangeOrFatal(t, tc.ScratchRange(t)) tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(n2)) - resumeHeartbeats := tc.ExpireAllLeases(t, keepHeartbeats) + resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, keepHeartbeats) tc.StopServer(n2) // lose quorum and leaseholder resumeHeartbeats() // We didn't lose the liveness range (which is only on n1). - require.NoError(t, tc.Server(n1).HeartbeatNodeLiveness()) + tc.HeartbeatNodeLiveness(t, n1) + tc.SetSlowThreshold(10 * time.Millisecond) tc.RequireIsBreakerOpen(t, tc.Write(n1)) tc.RequireIsBreakerOpen(t, tc.Read(n1)) @@ -300,8 +322,8 @@ func TestReplicaCircuitBreaker_Follower_QuorumLoss(t *testing.T) { // Bring n2 back and service should be restored. tc.SetSlowThreshold(0) // reset require.NoError(t, tc.RestartServer(n2)) - tc.UntripsSoon(t, tc.Read, n1) - require.NoError(t, tc.Write(n1)) + tc.UntripsSoon(t, tc.Write, n1) + require.NoError(t, tc.Read(n1)) } // This test is skipped but documents that the current circuit breakers cannot @@ -353,7 +375,7 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) { // Expire all leases. We also pause all heartbeats but that doesn't really // matter since the liveness range is unavailable anyway. - resume := tc.ExpireAllLeases(t, pauseHeartbeats) + resume := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) defer resume() // Since there isn't a lease, and the liveness range is down, the circuit @@ -373,6 +395,11 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) { } type dummyStream struct { + name string + t interface { + Helper() + Logf(string, ...interface{}) + } ctx context.Context recv chan *roachpb.RangeFeedEvent } @@ -382,7 +409,8 @@ func (s *dummyStream) Context() context.Context { } func (s *dummyStream) Send(ev *roachpb.RangeFeedEvent) error { - if ev.Val == nil { + if ev.Val == nil && ev.Error == nil { + s.t.Logf("%s: ignoring event: %v", s.name, ev) return nil } select { @@ -410,39 +438,44 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { args := &roachpb.RangeFeedRequest{ Span: roachpb.Span{Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey()}, } - // This test shouldn't take in excess of 45s even under the worst of conditions. - ctx, cancel := context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration) + + ctx, cancel := context.WithCancel(ctx) defer cancel() - stream1 := &dummyStream{ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)} + stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) { err := tc.repls[0].RangeFeed(args, stream1).GoError() if ctx.Err() != nil { - return + return // main goroutine stopping } assert.NoError(t, err) // avoid Fatal on goroutine })) - readOneVal := func(t *testing.T, stream *dummyStream) { + readOneVal := func(ctx context.Context, stream *dummyStream, timeout time.Duration) error { for { - var done bool select { + case <-time.After(timeout): + return errors.Errorf("%s: read timed out after %.2fs", stream.name, timeout.Seconds()) case <-ctx.Done(): - t.Fatal(ctx.Err()) + return ctx.Err() case ev := <-stream.recv: - t.Log(ev) - done = true - } - if done { - break + if ev.Error != nil { + return ev.Error.Error.GoError() + } + if ev.Val != nil { + t.Logf("%s: %s", stream.name, ev) + return nil + } } } } - require.NoError(t, tc.Write(n1)) - readOneVal(t, stream1) + testutils.SucceedsSoon(t, func() error { + require.NoError(t, tc.Write(n1)) + return readOneVal(ctx, stream1, time.Millisecond) + }) // NB: keep heartbeats because we're not trying to lose the liveness range. - undo := tc.ExpireAllLeases(t, keepHeartbeats) + undo := tc.ExpireAllLeasesAndN1LivenessRecord(t, keepHeartbeats) undo() tc.SetSlowThreshold(10 * time.Millisecond) tc.StopServer(n2) @@ -450,11 +483,11 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { // Start another stream during the "outage" to make sure it isn't rejected by // the breaker. - stream2 := &dummyStream{ctx: ctx, recv: make(chan *roachpb.RangeFeedEvent)} + stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) { err := tc.repls[0].RangeFeed(args, stream2).GoError() if ctx.Err() != nil { - return + return // main goroutine stopping } assert.NoError(t, err) // avoid Fatal on goroutine })) @@ -463,8 +496,13 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { require.NoError(t, tc.RestartServer(n2)) tc.UntripsSoon(t, tc.Write, n1) - readOneVal(t, stream1) - readOneVal(t, stream2) + require.NoError(t, readOneVal(ctx, stream1, testutils.DefaultSucceedsSoonDuration)) + // For the stream that started mid-way through the outage, we expect it to + // return a circuit breaker error, but in theory it could also never have + // tried to acquire a lease, in which case it might return a value as well. + if err := readOneVal(ctx, stream2, testutils.DefaultSucceedsSoonDuration); err != nil { + tc.RequireIsBreakerOpen(t, err) + } } func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { @@ -477,7 +515,7 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { // disabled, i.e. it will stay tripped. require.NoError(t, tc.Write(n1)) tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("boom")) + tc.Report(n1, errors.New("injected breaker error")) exemptRequests := []func() roachpb.Request{ func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} }, @@ -510,31 +548,41 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { for _, reqFn := range exemptRequests { req := reqFn() - t.Run(fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) { + tc.Run(t, fmt.Sprintf("with-existing-lease/%s", req.Method()), func(t *testing.T) { require.NoError(t, tc.Send(n1, req)) }) } for _, reqFn := range exemptRequests { req := reqFn() - t.Run(fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) { - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + tc.Run(t, fmt.Sprintf("with-acquire-lease/%s", req.Method()), func(t *testing.T) { + resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) resumeHeartbeats() // intentionally resume right now so that lease can be acquired - require.NoError(t, tc.Send(n1, req)) + // NB: when looking into the traces here, we sometimes see - as expected - + // that when the request tries to acquire a lease, the breaker is still + // tripped. That's why there is a retry loop here. + testutils.SucceedsSoon(t, func() error { + err := tc.Send(n1, req) + if errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { + return err + } + require.NoError(t, err) + return nil + }) }) } - resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) - defer resumeHeartbeats() // can't acquire leases until test ends + resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) for _, reqFn := range exemptRequests { req := reqFn() - if req.Method() == roachpb.Probe { - // Probe does not require the lease, and is the most-tested of the bunch - // already. We don't have to test it again here, which would require undue - // amounts of special-casing below. - continue - } - t.Run(fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) { + tc.Run(t, fmt.Sprintf("with-unavailable-lease/%s", req.Method()), func(t *testing.T) { + if m := req.Method(); m == roachpb.Probe { + // Probe does not require the lease, and is the most-tested of the bunch + // already. We don't have to test it again here, which would require undue + // amounts of special-casing below. + skip.IgnoreLintf(t, "subtest does not apply to %s", m) + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond) defer cancel() const maxWait = 5 * time.Second @@ -548,6 +596,46 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { require.Less(t, timeutil.Since(tBegin), maxWait) }) } + + // Restore the breaker via the probe. + resumeHeartbeats() + tc.SetProbeEnabled(n1, true) + tc.UntripsSoon(t, tc.Write, n1) + + // Lose quorum (liveness stays intact). + tc.SetSlowThreshold(10 * time.Millisecond) + tc.StopServer(n2) + // Let the breaker trip. This leaves a poisoned latch behind that at least some of + // the requests will interact with. + tc.RequireIsBreakerOpen(t, tc.Write(n1)) + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + + for _, reqFn := range exemptRequests { + req := reqFn() + tc.Run(t, fmt.Sprintf("with-poisoned-latch/%s", req.Method()), func(t *testing.T) { + if m := req.Method(); m == roachpb.GC { + // GC without GCKeys acquires no latches and is a pure read. If we want + // to put a key in there, we need to pick the right timestamp (since you + // can't GC a live key); it's all rather annoying and not worth it. In + // the long run, we also completely want to avoid acquiring latches for + // this request (since it should only mutate keyspace that has since + // fallen under the GCThreshold), so avoid cooking up anything special + // here. + skip.IgnoreLintf(t, "subtest does not apply to %s", m) + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Millisecond) + defer cancel() + const maxWait = 5 * time.Second + tBegin := timeutil.Now() + err := tc.SendCtx(ctx, n1, req) + t.Log(err) + require.Error(t, err) + require.Error(t, ctx.Err()) + // Make sure we didn't run into the "long" timeout inside of SendCtx but + // actually terminated as a result of our ctx cancelling. + require.Less(t, timeutil.Since(tBegin), maxWait) + }) + } } // Test infrastructure below. @@ -573,19 +661,18 @@ func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) { } } -type replWithKnob struct { - *kvserver.Replica - setProbeEnabled func(bool) -} - type circuitBreakerTest struct { + t decoT *testcluster.TestCluster slowThresh *atomic.Value // time.Duration ManualClock *hlc.HybridManualClock repls []replWithKnob // 0 -> repl on Servers[0], etc + + seq int } func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { + skip.UnderStressRace(t) manualClock := hlc.NewHybridManualClock() var rangeID int64 // atomic slowThresh := &atomic.Value{} // supports .SetSlowThreshold(x) @@ -665,6 +752,7 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { repls = append(repls, replWithKnob{repl, enableProbe}) } return &circuitBreakerTest{ + t: decoT{t}, TestCluster: tc, ManualClock: manualClock, repls: repls, @@ -672,6 +760,21 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { } } +// Run is a wrapper around t.Run that allows the test harness to print traces +// using the subtest's *testing.T. +func (cbt *circuitBreakerTest) Run(t *testing.T, name string, f func(t *testing.T)) { + t.Helper() + t.Run(name, func(t *testing.T) { + t.Helper() + outerT := cbt.t + cbt.t = decoT{t} + defer func() { + cbt.t = outerT + }() + f(t) + }) +} + func (cbt *circuitBreakerTest) SetProbeEnabled(idx int, to bool) { cbt.repls[idx].setProbeEnabled(to) } @@ -686,31 +789,60 @@ func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) er t.Helper() err := method(idx) // All errors coming out should be annotated as coming from - // the circuit breaker. - if err != nil && !errors.Is(err, circuit.ErrBreakerOpen) { + // the circuit breaker. In rare cases, we can also see a + // NotLeaseholderError such as this one: + // [NotLeaseHolderError] failed to manipulate liveness record: heartbeat + // failed on epoch increment; r45: replica (n1,s1):1 not lease holder; + // current lease is repl=(n1,s1):1 seq=1 start=0,0 epo=1 pro=[...] + if err != nil && + !errors.Is(err, circuit.ErrBreakerOpen) && + !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { + t.Fatalf("saw unexpected error %+v", err) } return err }) } -func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats bool) (undo func()) { +func (cbt *circuitBreakerTest) ExpireAllLeasesAndN1LivenessRecord( + t *testing.T, pauseHeartbeats bool, +) (undo func()) { t.Helper() - var maxWT int64 var fs []func() - for _, srv := range cbt.Servers { + for idx, srv := range cbt.Servers { lv := srv.NodeLiveness().(*liveness.NodeLiveness) + if pauseHeartbeats { undo := lv.PauseAllHeartbeatsForTest() fs = append(fs, undo) } + self, ok := lv.Self() require.True(t, ok) - if maxWT < self.Expiration.WallTime { - maxWT = self.Expiration.WallTime + + cbt.ManualClock.Forward(self.Expiration.WallTime) + if idx == n1 { + // Invalidate n1's liveness record, to make sure that ranges on n1 need + // to acquire a new lease (vs waiting for a heartbeat to the liveness + // record resuscitating the old one). + // + // Needing to do this is the reason for special-casing this entire method + // around n1; if we stop heartbeats for both nodes, they can't increment + // each others liveness records: if a node's liveness is paused, it doesn't + // allow incrementing records neither. (This is silly). + lv2 := cbt.Server(n2).NodeLiveness().(*liveness.NodeLiveness) + testutils.SucceedsSoon(t, func() error { + self, ok := lv.Self() + require.True(t, ok) + if self.IsLive(cbt.Server(n2).Clock().Now().GoTime()) { + // Someone else must have incremented epoch. + return nil + } + return lv2.IncrementEpoch(context.Background(), self) + }) } } - cbt.ManualClock.Forward(maxWT + 1) + return func() { for _, f := range fs { f() @@ -719,15 +851,32 @@ func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats boo } func (cbt *circuitBreakerTest) Send(idx int, req roachpb.Request) error { + cbt.t.Helper() return cbt.SendCtx(context.Background(), idx, req) - } func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb.Request) error { + return cbt.SendCtxTS(ctx, idx, req, cbt.repls[idx].Clock().Now()) +} + +func (cbt *circuitBreakerTest) SendCtxTS( + ctx context.Context, idx int, req roachpb.Request, ts hlc.Timestamp, +) error { + cbt.t.Helper() + ctx, finishAndGet := tracing.ContextWithRecordingSpan(ctx, cbt.repls[idx].Tracer, "SendCtx("+req.Method().String()+")") + defer time.AfterFunc(10*time.Second, func() { + rec := tracing.SpanFromContext(ctx).GetConfiguredRecording() + cbt.t.Logf("slow request: %s", rec) + }).Stop() + defer func() { + cbt.t.Helper() + rec := finishAndGet() + cbt.t.Logf("%s", rec) + }() var ba roachpb.BatchRequest repl := cbt.repls[idx] ba.RangeID = repl.Desc().RangeID - ba.Timestamp = repl.Clock().Now() + ba.Timestamp = ts ba.Add(req) if h := req.Header(); len(h.Key) == 0 { h.Key = repl.Desc().StartKey.AsRawKey() @@ -751,29 +900,20 @@ func (cbt *circuitBreakerTest) SendCtx(ctx context.Context, idx int, req roachpb if err := ctx.Err(); err != nil && parCtx.Err() == nil { pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) } - { - var err error - repl.VisitBreakerContexts(func(ctx context.Context) { - if err == nil && ctx.Value(req) != nil { - err = errors.Errorf( - "request %s returned but context still tracked in breaker", req, - ) - } - }) - if err != nil { - pErr = roachpb.NewErrorf("%s; after %v", err, pErr) - } - } return pErr.GoError() } func (cbt *circuitBreakerTest) WriteDS(idx int) error { + cbt.t.Helper() put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put) } -func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb.Request) error { +func (cbt *circuitBreakerTest) sendViaDistSender( + ds *kvcoord.DistSender, req roachpb.Request, +) error { + cbt.t.Helper() var ba roachpb.BatchRequest ba.Add(req) ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) @@ -790,6 +930,7 @@ func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) { t.Helper() + t.Log(err) // We also accept an ambiguous result wrapping a breaker error; this occurs // when the breaker trips while a write is already inflight. if aErr := (*roachpb.AmbiguousResultError)(nil); errors.As(err, &aErr) && aErr.WrappedErr != nil { @@ -800,6 +941,7 @@ func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) { func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) { t.Helper() + t.Log(err) ok := errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) require.True(t, ok, "%+v", err) } @@ -812,13 +954,56 @@ func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) { } func (cbt *circuitBreakerTest) Write(idx int) error { + cbt.t.Helper() repl := cbt.repls[idx] - put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) + cbt.seq++ + put := roachpb.NewPut( + repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString(fmt.Sprintf("hello-%d", cbt.seq)), + ) return cbt.Send(idx, put) } func (cbt *circuitBreakerTest) Read(idx int) error { + cbt.t.Helper() repl := cbt.repls[idx] get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) return cbt.Send(idx, get) } + +func (cbt *circuitBreakerTest) FollowerRead(idx int) error { + cbt.t.Helper() + repl := cbt.repls[idx] + get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) + ctx := context.Background() + ts := repl.GetClosedTimestamp(ctx) + return cbt.SendCtxTS(ctx, idx, get, ts) +} + +func (cbt *circuitBreakerTest) HeartbeatNodeLiveness(t *testing.T, idx int) { + // Retry loop is needed because heartbeat may race with internal heartbeat + // loop. + testutils.SucceedsSoon(t, func() error { + return cbt.Server(idx).HeartbeatNodeLiveness() + }) +} + +type replWithKnob struct { + *kvserver.Replica + setProbeEnabled func(bool) +} + +type logT interface { + Helper() + Logf(string, ...interface{}) +} + +type decoT struct { + logT +} + +func (t *decoT) Logf(format string, args ...interface{}) { + // It can be difficult to spot the actual failure among all of the + // traces, so this is a convenient place to annotate the logging + // (or disable it one-off). + t.logT.Logf("info:\n"+format, args...) +} diff --git a/pkg/kv/kvserver/concurrency/BUILD.bazel b/pkg/kv/kvserver/concurrency/BUILD.bazel index 4e28adfbb895..b895b75d5a91 100644 --- a/pkg/kv/kvserver/concurrency/BUILD.bazel +++ b/pkg/kv/kvserver/concurrency/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/concurrency/lock", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/intentresolver", "//pkg/kv/kvserver/spanlatch", "//pkg/kv/kvserver/spanset", @@ -54,6 +55,7 @@ go_test( deps = [ "//pkg/kv/kvserver/batcheval", "//pkg/kv/kvserver/concurrency/lock", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/intentresolver", "//pkg/kv/kvserver/spanlatch", "//pkg/kv/kvserver/spanset", diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index 883a83e6037c..3a676f08c752 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -189,6 +190,10 @@ type RequestSequencer interface { // does so, it will not return a request guard. SequenceReq(context.Context, *Guard, Request, RequestEvalKind) (*Guard, Response, *Error) + // PoisonReq marks a request's latches as poisoned. This allows requests + // waiting for the latches to fail fast. + PoisonReq(*Guard) + // FinishReq marks the request as complete, releasing any protection // the request had against conflicting requests and allowing conflicting // requests that are blocked on this one to proceed. The guard should not @@ -385,6 +390,9 @@ type Request struct { // with a WriteIntentError instead of entering the queue and waiting. MaxLockWaitQueueLength int + // The poison.Policy to use for this Request. + PoisonPolicy poison.Policy + // The individual requests in the batch. Requests []roachpb.RequestUnion @@ -464,9 +472,12 @@ type latchManager interface { // WaitFor waits for conflicting latches on the specified spans without adding // any latches itself. Fast path for operations that only require flushing out // old operations without blocking any new ones. - WaitFor(ctx context.Context, spans *spanset.SpanSet) *Error + WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) *Error + + // Poison a guard's latches, allowing waiters to fail fast. + Poison(latchGuard) - // Releases latches, relinquish its protection from conflicting requests. + // Release a guard's latches, relinquish its protection from conflicting requests. Release(latchGuard) // Metrics returns information about the state of the latchManager. diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index c3ee4f1cd150..5bc0d2241494 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -230,7 +230,7 @@ func (m *managerImpl) sequenceReqWithGuard(ctx context.Context, g *Guard) (Respo // them. if shouldWaitOnLatchesWithoutAcquiring(g.Req) { log.Event(ctx, "waiting on latches without acquiring") - return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans) + return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy) } // Provide the manager with an opportunity to intercept the request. It @@ -382,6 +382,15 @@ func shouldWaitOnLatchesWithoutAcquiring(req Request) bool { return req.isSingle(roachpb.Barrier) } +// PoisonReq implements the RequestSequencer interface. +func (m *managerImpl) PoisonReq(g *Guard) { + // NB: g.lg == nil is the case for requests that ignore latches, see + // shouldIgnoreLatches. + if g.lg != nil { + m.lm.Poison(g.lg) + } +} + // FinishReq implements the RequestSequencer interface. func (m *managerImpl) FinishReq(g *Guard) { // NOTE: we release latches _before_ exiting lock wait-queues deliberately. diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index e533b5174872..a10bc1786dab 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -54,9 +54,10 @@ import ( // The input files use the following DSL: // // new-txn name= ts=[,] epoch= [uncertainty-limit=[,]] -// new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] [lock-timeout] [max-lock-wait-queue-length=] +// new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] [lock-timeout] [max-lock-wait-queue-length=] [poison-policy=[err|wait]] // [=...] (hint: see scanSingleRequest) // sequence req= [eval-kind= // finish req= // // handle-write-intent-error req= txn= key= lease-seq= @@ -169,6 +170,8 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) } + pp := scanPoisonPolicy(t, d) + // Each roachpb.Request is provided on an indented line. reqs, reqUnions := scanRequests(t, d, c) latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs) @@ -184,6 +187,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { Requests: reqUnions, LatchSpans: latchSpans, LockSpans: lockSpans, + PoisonPolicy: pp, } return "" @@ -256,7 +260,20 @@ func TestConcurrencyManagerBasic(t *testing.T) { c.mu.Unlock() }) return c.waitAndCollect(t, mon) + case "poison": + var reqName string + d.ScanArgs(t, "req", &reqName) + guard, ok := c.guardsByReqName[reqName] + if !ok { + d.Fatalf(t, "unknown request: %s", reqName) + } + opName := fmt.Sprintf("poison %s", reqName) + mon.runSync(opName, func(ctx context.Context) { + log.Event(ctx, "poisoning request") + m.PoisonReq(guard) + }) + return c.waitAndCollect(t, mon) case "handle-write-intent-error": var reqName string d.ScanArgs(t, "req", &reqName) diff --git a/pkg/kv/kvserver/concurrency/datadriven_util_test.go b/pkg/kv/kvserver/concurrency/datadriven_util_test.go index e9e8f9483a33..11067bce44c8 100644 --- a/pkg/kv/kvserver/concurrency/datadriven_util_test.go +++ b/pkg/kv/kvserver/concurrency/datadriven_util_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -75,6 +76,24 @@ func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.Wa } } +func scanPoisonPolicy(t *testing.T, d *datadriven.TestData) poison.Policy { + const key = "poison-policy" + if !d.HasArg(key) { + return poison.Policy_Error + } + var policy string + d.ScanArgs(t, key, &policy) + switch policy { + case "error": + return poison.Policy_Error + case "wait": + return poison.Policy_Wait + default: + d.Fatalf(t, "unknown poison policy: %s", policy) + return 0 + } +} + func scanSingleRequest( t *testing.T, d *datadriven.TestData, line string, txns map[string]*roachpb.Transaction, ) roachpb.Request { diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go index b0a4b8eb1073..b0f07057344b 100644 --- a/pkg/kv/kvserver/concurrency/latch_manager.go +++ b/pkg/kv/kvserver/concurrency/latch_manager.go @@ -13,6 +13,7 @@ package concurrency import ( "context" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -24,7 +25,7 @@ type latchManagerImpl struct { } func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard, *Error) { - lg, err := m.m.Acquire(ctx, req.LatchSpans) + lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy) if err != nil { return nil, roachpb.NewError(err) } @@ -32,7 +33,7 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard } func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard { - lg := m.m.AcquireOptimistic(req.LatchSpans) + lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy) return lg } @@ -50,14 +51,20 @@ func (m *latchManagerImpl) WaitUntilAcquired( return lg, nil } -func (m *latchManagerImpl) WaitFor(ctx context.Context, ss *spanset.SpanSet) *Error { - err := m.m.WaitFor(ctx, ss) +func (m *latchManagerImpl) WaitFor( + ctx context.Context, ss *spanset.SpanSet, pp poison.Policy, +) *Error { + err := m.m.WaitFor(ctx, ss, pp) if err != nil { return roachpb.NewError(err) } return nil } +func (m *latchManagerImpl) Poison(lg latchGuard) { + m.m.Poison(lg.(*spanlatch.Guard)) +} + func (m *latchManagerImpl) Release(lg latchGuard) { m.m.Release(lg.(*spanlatch.Guard)) } diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index c78ec55fb09d..b5a1e30ce159 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -870,7 +871,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error { // cancellation, the code makes sure to release latches when returning // early due to error. Otherwise other requests will get stuck and // group.Wait() will not return until the test times out. - lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans) + lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, poison.Policy_Error) if err != nil { return err } @@ -1414,7 +1415,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { var err error firstIter := true for { - if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil { doneCh <- err return } @@ -1449,7 +1450,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { return } // Release locks. - if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil { doneCh <- err return } diff --git a/pkg/kv/kvserver/concurrency/poison/BUILD.bazel b/pkg/kv/kvserver/concurrency/poison/BUILD.bazel new file mode 100644 index 000000000000..fa267c099677 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/BUILD.bazel @@ -0,0 +1,44 @@ +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "poison_proto", + srcs = [ + "error.proto", + "policy.proto", + ], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb:roachpb_proto", + "//pkg/util/hlc:hlc_proto", + "@com_github_gogo_protobuf//gogoproto:gogo_proto", + ], +) + +go_proto_library( + name = "poison_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison", + proto = ":poison_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/util/hlc", + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "poison", + srcs = ["error.go"], + embed = [":poison_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", + ], +) diff --git a/pkg/kv/kvserver/concurrency/poison/error.go b/pkg/kv/kvserver/concurrency/poison/error.go new file mode 100644 index 000000000000..3c473fc40a50 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/error.go @@ -0,0 +1,42 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package poison + +import ( + "fmt" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// NewPoisonedError instantiates a *PoisonedError referencing a poisoned latch +// (as identified by span and timestamp). +func NewPoisonedError(span roachpb.Span, ts hlc.Timestamp) *PoisonedError { + return &PoisonedError{Span: span, Timestamp: ts} +} + +var _ errors.SafeFormatter = (*PoisonedError)(nil) +var _ fmt.Formatter = (*PoisonedError)(nil) + +// SafeFormatError implements errors.SafeFormatter. +func (e *PoisonedError) SafeFormatError(p errors.Printer) error { + p.Printf("encountered poisoned latch %s@%s", e.Span, e.Timestamp) + return nil +} + +// Format implements fmt.Formatter. +func (e *PoisonedError) Format(s fmt.State, verb rune) { errors.FormatError(e, s, verb) } + +// Error implements error. +func (e *PoisonedError) Error() string { + return fmt.Sprint(e) +} diff --git a/pkg/kv/kvserver/concurrency/poison/error.proto b/pkg/kv/kvserver/concurrency/poison/error.proto new file mode 100644 index 000000000000..84fbbeb5770b --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/error.proto @@ -0,0 +1,22 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.kv.kvserver.concurrency.poison; +option go_package = "poison"; + +import "util/hlc/timestamp.proto"; +import "roachpb/data.proto"; +import "gogoproto/gogo.proto"; + +message PoisonedError { + roachpb.Span span = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; +} diff --git a/pkg/kv/kvserver/concurrency/poison/policy.proto b/pkg/kv/kvserver/concurrency/poison/policy.proto new file mode 100644 index 000000000000..5f3371cbf281 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/policy.proto @@ -0,0 +1,35 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.kv.kvserver.concurrency.poison; +option go_package = "poison"; + +import "gogoproto/gogo.proto"; + +// Policy determines how a request will react to encountering a poisoned +// latch. A poisoned latch is a latch for which the holder is unable to make +// progress. That is, waiters of this latch should not expect to be able to +// acquire this latch "for some time"; in practice this is the case of an +// unavailable Replica. +// +// The name is inspired by Rust's mutexes, which undergo poisoning[^1] when a +// thread panics while holding the mutex. +// +// [^1]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#poisoning +enum Policy { + // Policy_Wait instructs a request to return an error upon encountering + // a poisoned latch. + Wait = 0; + + // Policy_Error instructs a request to return an error upon encountering + // a poisoned latch. + Error = 1; +} diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err new file mode 100644 index 000000000000..2fffae01a743 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err @@ -0,0 +1,60 @@ +# This test sets up the following situation: +# +# e <- put (PoisonPolicyErr; waiting) +# b---f <- scan (PoisonPolicyErr; waiting) +# c <- put (PoisonPolicyErr; sequenced, poisoned) +# +# Since everyone uses PoisonPolicyErr, the chain unwinds. However, only `b---f` +# gets an error, since it overlaps `c`. `e` can proceed once `c` and `b---f` +# have finished. + +new-request txn=none name=putc ts=10,0 + put key=c value=hi +---- + +sequence req=putc +---- +[1] sequence putc: sequencing request +[1] sequence putc: acquiring latches +[1] sequence putc: scanning lock table for conflicting locks +[1] sequence putc: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=pute ts=11,0 + put key=e value=hi +---- + +sequence req=pute +---- +[3] sequence pute: sequencing request +[3] sequence pute: acquiring latches +[3] sequence pute: waiting to acquire write latch e@11.000000000,0, held by read latch {b-f}@11.000000000,1 +[3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=putc +---- +[-] poison putc: poisoning request +[2] sequence readbf: sequencing complete, returned error: encountered poisoned latch c@10.000000000,0 +[3] sequence pute: scanning lock table for conflicting locks +[3] sequence pute: sequencing complete, returned guard + +finish req=putc +---- +[-] finish putc: finishing request + +finish req=pute +---- +[-] finish pute: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect new file mode 100644 index 000000000000..2615d3524c16 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect @@ -0,0 +1,61 @@ +# This test sets up the following situation: +# +# e <- put (PoisonPolicyError; waiting) +# b---f <- scan (PoisonPolicyWait; waiting) +# c <- put (PoisonPolicyWait; sequenced, poisoned) +# +# When `c` gets poisoned (and continues waiting), the same +# happens to `b---f`, which prompts `e` to fail fast. + +new-request txn=none name=putc ts=10,0 poison-policy=wait + put key=c value=hi +---- + +sequence req=putc +---- +[1] sequence putc: sequencing request +[1] sequence putc: acquiring latches +[1] sequence putc: scanning lock table for conflicting locks +[1] sequence putc: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 poison-policy=wait + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=pute ts=11,0 + put key=e value=hi +---- + +sequence req=pute +---- +[3] sequence pute: sequencing request +[3] sequence pute: acquiring latches +[3] sequence pute: waiting to acquire write latch e@11.000000000,0, held by read latch {b-f}@11.000000000,1 +[3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=putc +---- +[-] poison putc: poisoning request +[2] sequence readbf: encountered poisoned latch; continuing to wait +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal +[3] sequence pute: sequencing complete, returned error: encountered poisoned latch {b-f}@11.000000000,1 + +finish req=putc +---- +[-] finish putc: finishing request +[2] sequence readbf: scanning lock table for conflicting locks +[2] sequence readbf: sequencing complete, returned guard + +finish req=readbf +---- +[-] finish readbf: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint new file mode 100644 index 000000000000..bf7646ff84b6 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint @@ -0,0 +1,59 @@ +# This test sets up the following situation: +# +# e <- put (PoisonPolicyWait; waiting) +# b---f <- scan (PoisonPolicyError; waiting) +# c <- put (PoisonPolicyWait; sequenced, poisoned) +# +# The top and bottom request use PoisonPolicyWait, so the scan returns when `c` +# is poisoned, which in turn lets `e` through. However, `c` continues to wait. + +new-request txn=none name=putc ts=10,0 poison-policy=wait + put key=c value=hi +---- + +sequence req=putc +---- +[1] sequence putc: sequencing request +[1] sequence putc: acquiring latches +[1] sequence putc: scanning lock table for conflicting locks +[1] sequence putc: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=pute ts=11,0 poison-policy=wait + put key=e value=hi +---- + +sequence req=pute +---- +[3] sequence pute: sequencing request +[3] sequence pute: acquiring latches +[3] sequence pute: waiting to acquire write latch e@11.000000000,0, held by read latch {b-f}@11.000000000,1 +[3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=putc +---- +[-] poison putc: poisoning request +[2] sequence readbf: sequencing complete, returned error: encountered poisoned latch c@10.000000000,0 +[3] sequence pute: scanning lock table for conflicting locks +[3] sequence pute: sequencing complete, returned guard + +finish req=putc +---- +[-] finish putc: finishing request + +finish req=pute +---- +[-] finish pute: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping new file mode 100644 index 000000000000..4e8799dde333 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping @@ -0,0 +1,61 @@ +# This test sets up the following situation: +# +# c <- put (PoisonPolicyWait; waiting) +# b---f <- scan (PoisonPolicyError; waiting) +# c <- put (PoisonPolicyWait; sequenced, poisoned) +# +# When the bottom `c` is poisoned, `b---f` fails fast, and +# the top `c` poisons itself but continues to wait. + +new-request txn=none name=put1 ts=10,0 poison-policy=wait + put key=c value=hi +---- + +sequence req=put1 +---- +[1] sequence put1: sequencing request +[1] sequence put1: acquiring latches +[1] sequence put1: scanning lock table for conflicting locks +[1] sequence put1: sequencing complete, returned guard + +new-request txn=none name=readbf ts=11,1 + scan key=b endkey=f +---- + +sequence req=readbf +---- +[2] sequence readbf: sequencing request +[2] sequence readbf: acquiring latches +[2] sequence readbf: waiting to acquire read latch {b-f}@11.000000000,1, held by write latch c@10.000000000,0 +[2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal + +new-request txn=none name=put2 ts=11,0 poison-policy=wait + put key=c value=bar +---- + +sequence req=put2 +---- +[3] sequence put2: sequencing request +[3] sequence put2: acquiring latches +[3] sequence put2: waiting to acquire write latch c@11.000000000,0, held by write latch c@10.000000000,0 +[3] sequence put2: blocked on select in spanlatch.(*Manager).waitForSignal + +poison req=put1 +---- +[-] poison put1: poisoning request +[2] sequence readbf: sequencing complete, returned error: encountered poisoned latch c@10.000000000,0 +[3] sequence put2: encountered poisoned latch; continuing to wait +[3] sequence put2: blocked on select in spanlatch.(*Manager).waitForSignal + +finish req=put1 +---- +[-] finish put1: finishing request +[3] sequence put2: scanning lock table for conflicting locks +[3] sequence put2: sequencing complete, returned guard + +finish req=put2 +---- +[-] finish put2: finishing request + +reset +---- diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index a6e92183026d..773cf9a0e65d 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -227,13 +227,6 @@ func (r *Replica) Breaker() *circuit2.Breaker { return r.breaker.wrapped } -func (r *Replica) VisitBreakerContexts(fn func(ctx context.Context)) { - r.breaker.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) { - fn(ctx) - return false // keep - }) -} - func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) { r.raftMu.Lock() defer r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go index e64ddb75f3a9..9957511c02f5 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker.go +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -38,6 +38,7 @@ type replicaInCircuitBreaker interface { Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) slowReplicationThreshold(ba *roachpb.BatchRequest) (time.Duration, bool) replicaUnavailableError() error + poisonInflightLatches(err error) } var defaultReplicaCircuitBreakerSlowReplicationThreshold = envutil.EnvOrDefaultDuration( @@ -74,127 +75,15 @@ type replicaCircuitBreaker struct { stopper *stop.Stopper r replicaInCircuitBreaker st *cluster.Settings - cancels CancelStorage wrapped *circuit.Breaker versionIsActive int32 // atomic } -// Register takes a cancelable context and its cancel function (which the caller -// must cancel when the request has finished), and registers them with the -// circuit breaker. If the breaker is already tripped, its error is returned -// immediately and the caller should not continue processing the request. -// Otherwise, the cancel function is invoked if the breaker trips. The caller is -// provided with a token and signaller for use in a call to -// UnregisterAndAdjustError upon request completion. That method also takes the -// error (if any) resulting from the request to ensure that in the case of a -// tripped breaker, the error reflects this fact. -func (br *replicaCircuitBreaker) Register( - ctx context.Context, cancel func(), -) (_token interface{}, _ signaller, _ error) { - brSig := br.Signal() - - // TODO(tbg): we may want to exclude more requests from this check, or allow - // requests to exclude themselves from the check (via their header). This - // latter mechanism could also replace hasBypassCircuitBreakerMarker. - if hasBypassCircuitBreakerMarker(ctx) { - // NB: brSig.C() == nil. - brSig = neverTripSignaller{} - } - - if brSig.C() == nil { - // Circuit breakers are disabled and/or this is a probe request, so don't do - // any work registering the context. UnregisterAndAdjustError will know that we didn't - // since it checks the same brSig for a nil C(). - return ctx, brSig, nil - } - - // NB: it might be tempting to check the breaker error first to avoid the call - // to Set below if the breaker is tripped at this point. However, the ordering - // here, subtly, is required to avoid situations in which the cancel is still - // in the map despite the probe having shut down (in which case cancel will - // not be invoked until the probe is next triggered, which maybe "never"). - // - // To see this, consider the case in which the breaker is initially not - // tripped when we check, but then trips immediately and has the probe fail - // (and terminate). Since the probe is in charge of cancelling all tracked - // requests, we must ensure that this probe sees our request. Adding the - // request prior to calling Signal() means that if we see an untripped - // breaker, no probe is running - consequently should the breaker then trip, - // it will observe our cancel, thus avoiding a leak. If we observe a tripped - // breaker, we also need to remove our own cancel, as the probe may already - // have passed the point at which it iterates through the cancels prior to us - // inserting it. The cancel may be invoked twice, but that's ok. - // - // See TestReplicaCircuitBreaker_NoCancelRace. - tok := br.cancels.Set(ctx, cancel) - if err := brSig.Err(); err != nil { - br.cancels.Del(tok) - cancel() - return nil, nil, err - } - - return tok, brSig, nil -} - -// UnregisterAndAdjustError releases a tracked cancel function upon request -// completion. The error resulting from the request is passed in to allow -// decorating it in case the breaker tripped while the request was in-flight. -// -// See Register. -func (br *replicaCircuitBreaker) UnregisterAndAdjustError( - tok interface{}, sig signaller, pErr *roachpb.Error, -) *roachpb.Error { - if sig.C() == nil { - // Breakers were disabled and we never put the cancel in the registry. - return pErr - } - - br.cancels.Del(tok) - - brErr := sig.Err() - if pErr == nil || brErr == nil { - return pErr - } - - // The breaker tripped and the command is returning an error. Make sure the - // error reflects the tripped breaker. - - err := pErr.GoError() - if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) { - // The breaker tripped while a command was inflight, so we have to - // propagate an ambiguous result. We don't want to replace it, but there - // is a way to stash an Error in it so we use that. - // - // TODO(tbg): could also wrap it; there is no other write to WrappedErr - // in the codebase and it might be better to remove it. Nested *Errors - // are not a good idea. - wrappedErr := brErr - if ae.WrappedErr != nil { - wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr) - } - ae.WrappedErr = roachpb.NewError(wrappedErr) - return roachpb.NewError(ae) - } else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) { - // When a lease acquisition triggered by this request is short-circuited - // by the breaker, it will return an opaque NotLeaseholderError, which we - // replace with the breaker's error. - return roachpb.NewError(errors.CombineErrors(brErr, le)) - } - return pErr -} - func (br *replicaCircuitBreaker) HasMark(err error) bool { return br.wrapped.HasMark(err) } -func (br *replicaCircuitBreaker) cancelAllTrackedContexts() { - br.cancels.Visit(func(ctx context.Context, cancel func()) (remove bool) { - cancel() - return true // remove - }) -} - func (br *replicaCircuitBreaker) canEnable() bool { b := atomic.LoadInt32(&br.versionIsActive) == 1 if b { @@ -252,7 +141,6 @@ func newReplicaCircuitBreaker( stopper *stop.Stopper, ambientCtx log.AmbientContext, r replicaInCircuitBreaker, - s CancelStorage, onTrip func(), onReset func(), ) *replicaCircuitBreaker { @@ -262,8 +150,6 @@ func newReplicaCircuitBreaker( r: r, st: cs, } - br.cancels = s - br.cancels.Reset() br.wrapped = circuit.NewBreaker(circuit.Options{ Name: "breaker", // log bridge has ctx tags AsyncProbe: br.asyncProbe, @@ -306,6 +192,8 @@ func hasBypassCircuitBreakerMarker(ctx context.Context) bool { } func withBypassCircuitBreakerMarker(ctx context.Context) context.Context { + // TODO(tbg): when/if we let requests chose their poison.Policy via the + // request header, we can phase out this context. return context.WithValue(ctx, probeKey{}, probeKey{}) } @@ -319,13 +207,19 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { return } - // First, tell all current requests to fail fast. Note that clients insert - // first, then check the breaker (and remove themselves if breaker already - // tripped then). This prevents any cancels from sneaking in after the probe - // gets past this point, which could otherwise leave cancels hanging until - // "something" triggers the next probe (which may be never if no more traffic - // arrives at the Replica). See Register. - br.cancelAllTrackedContexts() + brErr := br.Signal().Err() + if brErr == nil { + // This shouldn't happen, but if we're not even tripped, don't do + // anything. + return + } + + // Poison any inflight latches. Note that any new request that is added in + // while the probe is running but after poisonInflightLatches has been + // invoked will remain untouched. We rely on the replica to periodically + // access the circuit breaker to trigger additional probes in that case. + // (This happens in refreshProposalsLocked). + br.r.poisonInflightLatches(brErr) err := sendProbe(ctx, br.r) report(err) }); err != nil { diff --git a/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go b/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go deleted file mode 100644 index c8f46bcffa16..000000000000 --- a/pkg/kv/kvserver/replica_circuit_breaker_cancelstorage.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package kvserver - -import ( - "context" - "sync" - "unsafe" - - "github.com/cockroachdb/cockroach/pkg/util/syncutil" -) - -// CancelStorage implements tracking of context cancellation functions -// for use by Replica circuit breakers. -type CancelStorage interface { - // Reset initializes the storage. Not thread safe. - Reset() - // Set adds context and associated cancel func to the storage. Returns a token - // that can be passed to Del. - // - // Set is thread-safe. - Set(_ context.Context, cancel func()) (token interface{}) - // Del removes a cancel func, as identified by the token returned from Set. - // - // Del is thread-safe. - Del(token interface{}) - // Visit invokes the provided closure with each (context,cancel) pair currently - // present in the storage. Items for which the visitor returns true are removed - // from the storage. - // - // Visit is thread-safe, but it is illegal to invoke methods of the - // CancelStorage from within the visitor. - Visit(func(context.Context, func()) (remove bool)) -} - -type cancelToken struct { - ctx context.Context -} - -func (tok *cancelToken) fasthash() int { - // From https://github.com/taylorza/go-lfsr/blob/7ec2b93980f950da1e36c6682771e6fe14c144c2/lfsr.go#L46-L48. - s := int(uintptr(unsafe.Pointer(tok))) - b := (s >> 0) ^ (s >> 2) ^ (s >> 3) ^ (s >> 4) - return (s >> 1) | (b << 7) -} - -var cancelTokenPool = sync.Pool{ - New: func() interface{} { return &cancelToken{} }, -} - -type mapCancelShard struct { - syncutil.Mutex - m map[*cancelToken]func() -} - -// A MapCancelStorage implements CancelStorage via shards of mutex-protected -// maps. -type MapCancelStorage struct { - NumShards int - sl []*mapCancelShard -} - -// Reset implements CancelStorage. -func (m *MapCancelStorage) Reset() { - if m.NumShards == 0 { - m.NumShards = 1 - } - m.sl = make([]*mapCancelShard, m.NumShards) - for i := range m.sl { - s := &mapCancelShard{} - s.m = map[*cancelToken]func(){} - m.sl[i] = s - } -} - -// Set implements CancelStorage. -func (m *MapCancelStorage) Set(ctx context.Context, cancel func()) interface{} { - tok := cancelTokenPool.Get().(*cancelToken) - tok.ctx = ctx - shard := m.sl[tok.fasthash()%len(m.sl)] - shard.Lock() - shard.m[tok] = cancel - shard.Unlock() - return tok -} - -// Del implements CancelStorage. -func (m *MapCancelStorage) Del(tok interface{}) { - ttok := tok.(*cancelToken) - shard := m.sl[ttok.fasthash()%len(m.sl)] - shard.Lock() - delete(shard.m, tok.(*cancelToken)) - shard.Unlock() -} - -// Visit implements CancelStorage. -func (m *MapCancelStorage) Visit(fn func(context.Context, func()) (remove bool)) { - for _, shard := range m.sl { - shard.Lock() - for tok, cancel := range shard.m { - if fn(tok.ctx, cancel) { - delete(shard.m, tok) - } - } - shard.Unlock() - } -} diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go index c7b6029b7d70..b056713c36a2 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -11,27 +11,15 @@ package kvserver import ( - "context" - "fmt" - "math/rand" - "runtime" - "strconv" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" - "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" ) @@ -52,151 +40,3 @@ func TestReplicaUnavailableError(t *testing.T) { err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs) echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt")) } - -type circuitBreakerReplicaMock struct { - clock *hlc.Clock -} - -func (c *circuitBreakerReplicaMock) Clock() *hlc.Clock { - return c.clock -} - -func (c *circuitBreakerReplicaMock) Desc() *roachpb.RangeDescriptor { - return &roachpb.RangeDescriptor{} -} - -func (c *circuitBreakerReplicaMock) Send( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - return ba.CreateReply(), nil -} - -func (c *circuitBreakerReplicaMock) slowReplicationThreshold( - ba *roachpb.BatchRequest, -) (time.Duration, bool) { - return 0, false -} - -func (c *circuitBreakerReplicaMock) replicaUnavailableError() error { - return errors.New("unavailable") -} - -// This test verifies that when the breaker trips and untrips again, -// there is no scenario under which the request's cancel leaks. -func TestReplicaCircuitBreaker_NoCancelRace(t *testing.T) { - defer leaktest.AfterTest(t)() - br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - defer stopper.Stop(ctx) - - g := ctxgroup.WithContext(ctx) - const count = 100 - for i := 0; i < count; i++ { - i := i // for goroutine - g.GoCtx(func(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) - tok, sig, err := br.Register(ctx, cancel) - if err != nil { - _ = err // ignoring intentionally - return nil - } - if i == count/2 { - br.TripAsync() // probe will succeed - } - runtime.Gosched() - time.Sleep(time.Duration(rand.Intn(int(time.Millisecond)))) - var pErr *roachpb.Error - if i%2 == 0 { - pErr = roachpb.NewErrorf("boom") - } - _ = br.UnregisterAndAdjustError(tok, sig, pErr) - return nil - }) - } - require.NoError(t, g.Wait()) - var n int - br.cancels.Visit(func(ctx context.Context, _ func()) (remove bool) { - n++ - return false // keep - }) - require.Zero(t, n, "found tracked requests") -} - -func TestReplicaCircuitBreaker_Register(t *testing.T) { - defer leaktest.AfterTest(t)() - br, stopper := setupCircuitBreakerTest(t, "mutexmap-1") - defer stopper.Stop(context.Background()) - ctx := withBypassCircuitBreakerMarker(context.Background()) - tok, sig, err := br.Register(ctx, func() {}) - require.NoError(t, err) - defer br.UnregisterAndAdjustError(tok, sig, nil /* pErr */) - require.Zero(t, sig.C()) - var n int - br.cancels.Visit(func(ctx context.Context, f func()) (remove bool) { - n++ - return false // keep - }) - require.Zero(t, n, "probe context got added to CancelStorage") -} - -func setupCircuitBreakerTest(t testing.TB, cs string) (*replicaCircuitBreaker, *stop.Stopper) { - st := cluster.MakeTestingClusterSettings() - // Enable circuit breakers. - replicaCircuitBreakerSlowReplicationThreshold.Override(context.Background(), &st.SV, time.Hour) - r := &circuitBreakerReplicaMock{clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond)} - var numShards int - { - _, err := fmt.Sscanf(cs, "mutexmap-%d", &numShards) - require.NoError(t, err) - } - s := &MapCancelStorage{NumShards: numShards} - onTrip := func() {} - onReset := func() {} - stopper := stop.NewStopper() - br := newReplicaCircuitBreaker(st, stopper, log.AmbientContext{}, r, s, onTrip, onReset) - return br, stopper -} - -func BenchmarkReplicaCircuitBreaker_Register(b *testing.B) { - defer leaktest.AfterTest(b)() - - for _, enabled := range []bool{false, true} { - b.Run("enabled="+strconv.FormatBool(enabled), func(b *testing.B) { - dss := []string{ - "mutexmap-1", "mutexmap-2", "mutexmap-4", "mutexmap-8", "mutexmap-12", "mutexmap-16", - "mutexmap-20", "mutexmap-24", "mutexmap-32", "mutexmap-64", - } - if !enabled { - dss = dss[:1] - } - for _, ds := range dss { - b.Run(ds, func(b *testing.B) { - b.ReportAllocs() - br, stopper := setupCircuitBreakerTest(b, ds) - defer stopper.Stop(context.Background()) - - var dur time.Duration - if enabled { - dur = time.Hour - } - replicaCircuitBreakerSlowReplicationThreshold.Override(context.Background(), &br.st.SV, dur) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - ctx, cancel := context.WithCancel(context.Background()) - tok, sig, err := br.Register(ctx, cancel) - if err != nil { - b.Error(err) - } - if pErr := br.UnregisterAndAdjustError(tok, sig, nil); pErr != nil { - b.Error(pErr) - } - cancel() - } - }) - }) - } - }) - } -} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 2545b21644d7..5f7e1629d379 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -137,14 +137,8 @@ func newUnloadedReplica( onReset := func() { store.Metrics().ReplicaCircuitBreakerCurTripped.Dec(1) } - var cancelStorage CancelStorage - if f := r.store.cfg.TestingKnobs.CancelStorageFactory; f != nil { - cancelStorage = f() - } else { - cancelStorage = &MapCancelStorage{} - } r.breaker = newReplicaCircuitBreaker( - store.cfg.Settings, store.stopper, r.AmbientContext, r, cancelStorage, onTrip, onReset, + store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset, ) return r } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index bbef1c9cfa95..f958db11f9ea 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -158,6 +158,16 @@ func (proposal *ProposalData) signalProposalResult(pr proposalResult) { if proposal.doneCh != nil { proposal.doneCh <- pr proposal.doneCh = nil + // Need to remove any span from the proposal, as the signalled caller + // will likely finish it, and if we then end up applying this proposal + // we'll try to make a ChildSpan off `proposal.ctx` and this will + // trigger the Span use-after-finish assertions. + // + // See: https://github.com/cockroachdb/cockroach/pull/76858#issuecomment-1048179588 + // + // NB: `proposal.ec.repl` might already have been cleared if we arrive here + // through finishApplication. + proposal.ctx = context.Background() } } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 68cf20533c0b..fae81f2bd21a 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -1219,7 +1220,12 @@ func (r *Replica) refreshProposalsLocked( // which could avoid build-up of raft log entries during outages, see // for example: // https://github.com/cockroachdb/cockroach/issues/60612 - if r.breaker.Signal().Err() == nil && maxSlowProposalDuration > 0 { + // + // NB: the call to Err() here also re-triggers the probe if the breaker is + // already tripped and no probe is running, thus ensuring that even if a + // request got added in while the probe was about to shut down, there will + // be regular attempts at healing the breaker. + if maxSlowProposalDuration > 0 && r.breaker.Signal().Err() == nil { log.Warningf(ctx, "have been waiting %.2fs for slow proposal %s", maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest, @@ -1256,6 +1262,21 @@ func (r *Replica) refreshProposalsLocked( } } +func (r *Replica) poisonInflightLatches(err error) { + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() + for _, p := range r.mu.proposals { + p.ec.poison() + if p.ec.g.Req.PoisonPolicy == poison.Policy_Error { + aErr := roachpb.NewAmbiguousResultError("circuit breaker tripped") + aErr.WrappedErr = roachpb.NewError(err) + p.signalProposalResult(proposalResult{Err: roachpb.NewError(aErr)}) + } + } +} + // maybeCoalesceHeartbeat returns true if the heartbeat was coalesced and added // to the appropriate queue. func (r *Replica) maybeCoalesceHeartbeat( diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 46dab0b90d50..f470914c6ca2 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -318,20 +318,16 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // coalesced requests timeout/cancel. p.cancelLocked (defined below) is the // cancel function that must be called; calling just cancel is insufficient. ctx := p.repl.AnnotateCtx(context.Background()) - if hasBypassCircuitBreakerMarker(ctx) { - // If the caller bypasses the circuit breaker, allow the lease to do the - // same. Otherwise, the lease will be refused by the circuit breaker as - // well. - // - // Note that there is a tiny race: if a request is in flight, but the - // request that triggered it (i.e. parentCtx here) does *not* bypass the - // probe, and before the circuit breaker rejects the inflight lease another - // request that *does* want to bypass the probe joins the request, it too - // will receive the circuit breaker error. This is special-cased in - // `redirectOnOrAcquireLease`, where such a caller needs to retry instead of - // propagating the error. - ctx = withBypassCircuitBreakerMarker(ctx) - } + // Lease requests always bypass the circuit breaker (i.e. will prefer to get + // stuck on an unavailable range rather than failing fast). This enables the + // caller to chose between either behavior for themselves: if they too want to + // bypass the circuit breaker, they simply don't check for the circuit breaker + // while waiting for their lease handle. If they want to fail-fast, they do. + // If the lease instead adopted the caller's preference, we'd have to handle + // the case of multiple preferences joining onto one lease request, which is + // more difficult. + ctx = withBypassCircuitBreakerMarker(ctx) + const opName = "request range lease" tr := p.repl.AmbientContext.Tracer tagsOpt := tracing.WithLogTags(logtags.FromContext(parentCtx)) @@ -452,7 +448,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync( if pErr == nil { // The Replica circuit breakers together with round-tripping a ProbeRequest // here before asking for the lease could provide an alternative, simpler - // solution to the the below issue: + // solution to the below issue: // // https://github.com/cockroachdb/cockroach/issues/37906 ba := roachpb.BatchRequest{} @@ -779,6 +775,7 @@ func (r *Replica) requestLeaseLocked( newNotLeaseHolderError(roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc, "refusing to take the lease; node is draining"))) } + // Propose a Raft command to get a lease for this replica. repDesc, err := r.getReplicaDescriptorRLocked() if err != nil { @@ -1126,10 +1123,9 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat func (r *Replica) redirectOnOrAcquireLeaseForRequest( ctx context.Context, reqTS hlc.Timestamp, ) (kvserverpb.LeaseStatus, *roachpb.Error) { + brSig := r.breaker.Signal() if hasBypassCircuitBreakerMarker(ctx) { - defer func() { - log.Infof(ctx, "hello") - }() + brSig = neverTripSignaller{} } // Try fast-path. now := r.store.Clock().NowAsClockTimestamp() @@ -1145,6 +1141,10 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( } } + if err := brSig.Err(); err != nil { + return kvserverpb.LeaseStatus{}, roachpb.NewError(err) + } + // Loop until the lease is held or the replica ascertains the actual lease // holder. Returns also on context.Done() (timeout or cancellation). for attempt := 1; ; attempt++ { @@ -1264,12 +1264,6 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( // cannot be reproposed so we get this ambiguity. // We'll just loop around. return nil - case r.breaker.HasMark(goErr) && hasBypassCircuitBreakerMarker(ctx): - // If this request wanted to bypass the circuit breaker but still got a - // breaker error back, it joined a lease request started by an operation - // that did not bypass circuit breaker errors. Loop around and try again. - // See requestLeaseAsync for details. - return nil case errors.HasType(goErr, (*roachpb.LeaseRejectedError)(nil)): var tErr *roachpb.LeaseRejectedError errors.As(goErr, &tErr) @@ -1302,6 +1296,11 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( } log.VEventf(ctx, 2, "lease acquisition succeeded: %+v", status.Lease) return nil + case <-brSig.C(): + llHandle.Cancel() + err := brSig.Err() + log.VErrEventf(ctx, 2, "lease acquisition failed: %s", err) + return roachpb.NewError(err) case <-slowTimer.C: slowTimer.Read = true log.Warningf(ctx, "have been waiting %s attempting to acquire lease (%d attempts)", @@ -1361,6 +1360,11 @@ func (r *Replica) maybeExtendLeaseAsync(ctx context.Context, st kvserverpb.Lease log.Infof(ctx, "extending lease %s at %s", st.Lease, st.Now) } // We explicitly ignore the returned handle as we won't block on it. + // + // TODO(tbg): this ctx is likely cancelled very soon, which will in turn + // cancel the lease acquisition (unless joined by another more long-lived + // ctx). So this possibly isn't working as advertised (which only plays a role + // for expiration-based leases, at least). _ = r.requestLeaseLocked(ctx, st) } diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 8554c35a663b..6e1faa00eae9 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" @@ -138,19 +139,13 @@ func (r *Replica) sendWithoutRangeID( return nil, roachpb.NewError(err) } - // Circuit breaker handling. - ctx, cancel := context.WithCancel(ctx) if bypassReplicaCircuitBreakerForBatch(ba) { + // NB: we use the context to propagate whether to observe the circuit + // breaker since this information matters in a few places that today don't + // have access to the batch, for example the redirectOnOrAcquireLeaseLocked, + // for which several callers don't even have a BatchRequest. ctx = withBypassCircuitBreakerMarker(ctx) } - tok, brSig, err := r.breaker.Register(ctx, cancel) - if err != nil { - return nil, roachpb.NewError(err) - } - defer func() { - rErr = r.breaker.UnregisterAndAdjustError(tok, brSig, rErr) - cancel() - }() if err := r.maybeBackpressureBatch(ctx, ba); err != nil { return nil, roachpb.NewError(err) @@ -163,7 +158,7 @@ func (r *Replica) sendWithoutRangeID( } // NB: must be performed before collecting request spans. - ba, err = maybeStripInFlightWrites(ba) + ba, err := maybeStripInFlightWrites(ba) if err != nil { return nil, roachpb.NewError(err) } @@ -414,6 +409,10 @@ func (r *Replica) executeBatchWithConcurrencyRetries( r.concMgr.FinishReq(g) } }() + pp := poison.Policy_Wait + if !r.breaker.enabled() || !hasBypassCircuitBreakerMarker(ctx) { + pp = poison.Policy_Error + } for first := true; ; first = false { // Exit loop if context has been canceled or timed out. if err := ctx.Err(); err != nil { @@ -454,8 +453,21 @@ func (r *Replica) executeBatchWithConcurrencyRetries( Requests: ba.Requests, LatchSpans: latchSpans, // nil if g != nil LockSpans: lockSpans, // nil if g != nil + PoisonPolicy: pp, }, requestEvalKind) if pErr != nil { + if errors.HasType(pErr.GoError(), (*poison.PoisonedError)(nil)) { + brErr := r.breaker.Signal().Err() + if brErr == nil { + // The breaker may have healed in the meantime. + // + // TODO(tbg): it would be nicer if poisoning took an err and it + // came wrapped with the PoisonedError instead. Or we could + // retry the request. + brErr = r.replicaUnavailableError() + } + pErr = roachpb.NewError(errors.CombineErrors(brErr, pErr.GoError())) + } return nil, pErr } else if resp != nil { br = new(roachpb.BatchResponse) @@ -876,6 +888,9 @@ func (r *Replica) executeAdminBatch( } _, err := r.checkExecutionCanProceed(ctx, ba, nil /* g */) + if err == nil && !hasBypassCircuitBreakerMarker(ctx) { + err = r.breaker.Signal().Err() + } if err == nil { break } @@ -1156,6 +1171,14 @@ func (ec *endCmds) move() endCmds { return res } +func (ec *endCmds) poison() { + if ec.repl == nil { + // Already cleared. + return + } + ec.repl.concMgr.PoisonReq(ec.g) +} + // done releases the latches acquired by the command and updates the timestamp // cache using the final timestamp of each command. // diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 681395c07e29..594a62095088 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -94,6 +94,14 @@ func (r *Replica) executeWriteBatch( return nil, g, roachpb.NewError(err) } + // Check the breaker. Note that we do this after checkExecutionCanProceed, + // so that NotLeaseholderError has precedence. + if !hasBypassCircuitBreakerMarker(ctx) { + if err := r.breaker.Signal().Err(); err != nil { + return nil, g, roachpb.NewError(err) + } + } + // Compute the transaction's local uncertainty limit using observed // timestamps, which can help avoid uncertainty restarts. ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset()) diff --git a/pkg/kv/kvserver/spanlatch/BUILD.bazel b/pkg/kv/kvserver/spanlatch/BUILD.bazel index da2b43c4130d..a8109b363db8 100644 --- a/pkg/kv/kvserver/spanlatch/BUILD.bazel +++ b/pkg/kv/kvserver/spanlatch/BUILD.bazel @@ -14,6 +14,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/spanset", "//pkg/roachpb", "//pkg/util/hlc", @@ -37,12 +38,14 @@ go_test( embed = [":spanlatch"], deps = [ "//pkg/keys", + "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/spanset", "//pkg/roachpb", "//pkg/testutils", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/timeutil", # keep + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go index 92a67b87d8f3..bdbaaa71c7f9 100644 --- a/pkg/kv/kvserver/spanlatch/manager.go +++ b/pkg/kv/kvserver/spanlatch/manager.go @@ -16,6 +16,7 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -87,6 +88,7 @@ type latch struct { span roachpb.Span ts hlc.Timestamp done *signal + poison *poisonSignal next, prev *latch // readSet linked-list. } @@ -109,7 +111,9 @@ func (la *latch) SetEndKey(v []byte) { la.span.EndKey = v } // Guard is a handle to a set of acquired latches. It is returned by // Manager.Acquire and accepted by Manager.Release. type Guard struct { - done signal + pp poison.Policy + done signal + poison poisonSignal // latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size. latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32 @@ -166,9 +170,10 @@ func allocGuardAndLatches(nLatches int) (*Guard, []latch) { return new(Guard), make([]latch, nLatches) } -func newGuard(spans *spanset.SpanSet) *Guard { +func newGuard(spans *spanset.SpanSet, pp poison.Policy) *Guard { nLatches := spans.Len() guard, latches := allocGuardAndLatches(nLatches) + guard.pp = pp for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { ss := spans.GetSpans(a, s) @@ -182,6 +187,7 @@ func newGuard(spans *spanset.SpanSet) *Guard { latch := &latches[i] latch.span = ss[i].Span latch.done = &guard.done + latch.poison = &guard.poison latch.ts = ss[i].Timestamp // latch.setID() in Manager.insert, under lock. } @@ -203,8 +209,10 @@ func newGuard(spans *spanset.SpanSet) *Guard { // acquired. // // It returns a Guard which must be provided to Release. -func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, error) { - lg, snap := m.sequence(spans) +func (m *Manager) Acquire( + ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, +) (*Guard, error) { + lg, snap := m.sequence(spans, pp) defer snap.close() err := m.wait(ctx, lg, snap) @@ -227,8 +235,8 @@ func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, // // The method returns a Guard which must be provided to the // CheckOptimisticNoConflicts, Release methods. -func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet) *Guard { - lg, snap := m.sequence(spans) +func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, pp poison.Policy) *Guard { + lg, snap := m.sequence(spans, pp) lg.snap = &snap return lg } @@ -236,10 +244,10 @@ func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet) *Guard { // WaitFor waits for conflicting latches on the spans without adding // any latches itself. Fast path for operations that only require past latches // to be released without blocking new latches. -func (m *Manager) WaitFor(ctx context.Context, spans *spanset.SpanSet) error { +func (m *Manager) WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) error { // The guard is only used to store latches by this request. These latches // are not actually inserted using insertLocked. - lg := newGuard(spans) + lg := newGuard(spans, pp) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -335,8 +343,8 @@ func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, err // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition // attempts. -func (m *Manager) sequence(spans *spanset.SpanSet) (*Guard, snapshot) { - lg := newGuard(spans) +func (m *Manager) sequence(spans *spanset.SpanSet, pp poison.Policy) (*Guard, snapshot) { + lg := newGuard(spans, pp) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -454,7 +462,7 @@ func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error { // Wait for writes at equal or lower timestamps. a2 := spanset.SpanReadWrite it := tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreLater); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreLater); err != nil { return err } case spanset.SpanReadWrite: @@ -466,13 +474,13 @@ func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error { // to release their latches, so we wait on them first. a2 := spanset.SpanReadWrite it := tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreNothing); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreNothing); err != nil { return err } // Wait for reads at equal or higher timestamps. a2 = spanset.SpanReadOnly it = tr[a2].MakeIter() - if err := m.iterAndWait(ctx, timer, &it, a, a2, latch, ignoreEarlier); err != nil { + if err := m.iterAndWait(ctx, timer, &it, lg.pp, a, a2, latch, ignoreEarlier); err != nil { return err } default: @@ -491,6 +499,7 @@ func (m *Manager) iterAndWait( ctx context.Context, t *timeutil.Timer, it *iterator, + pp poison.Policy, waitType, heldType spanset.SpanAccess, wait *latch, ignore ignoreFn, @@ -503,7 +512,7 @@ func (m *Manager) iterAndWait( if ignore(wait.ts, held.ts) { continue } - if err := m.waitForSignal(ctx, t, waitType, heldType, wait, held); err != nil { + if err := m.waitForSignal(ctx, t, pp, waitType, heldType, wait, held); err != nil { return err } } @@ -512,13 +521,34 @@ func (m *Manager) iterAndWait( // waitForSignal waits for the latch that is currently held to be signaled. func (m *Manager) waitForSignal( - ctx context.Context, t *timeutil.Timer, waitType, heldType spanset.SpanAccess, wait, held *latch, + ctx context.Context, + t *timeutil.Timer, + pp poison.Policy, + waitType, heldType spanset.SpanAccess, + wait, held *latch, ) error { log.Eventf(ctx, "waiting to acquire %s latch %s, held by %s latch %s", waitType, wait, heldType, held) + poisonCh := held.poison.signalChan() for { select { case <-held.done.signalChan(): return nil + case <-poisonCh: + // The latch we're waiting on was poisoned. If we continue to wait, we have to + // poison our latches as well (so that waiters blocked on us which want to + // fail fast don't get stuck). If we fail fast, we're momentarily removing + // ourselves anyway, so we don't need to self-poison. + switch pp { + case poison.Policy_Error: + return poison.NewPoisonedError(held.span, held.ts) + case poison.Policy_Wait: + log.Eventf(ctx, "encountered poisoned latch; continuing to wait") + wait.poison.signal() + // No need to self-poison multiple times. + poisonCh = nil + default: + return errors.Errorf("unsupported poison.Policy %d", pp) + } case <-t.C: t.Read = true defer t.Reset(base.SlowRequestThreshold) @@ -541,6 +571,14 @@ func (m *Manager) waitForSignal( } } +// Poison marks the Guard as poisoned, meaning that the request will not be +// expected to be releasing its latches (any time soon). This gives requests +// blocking on the Guard's latches an opportunity to fail fast, according to +// their poison.Policy. +func (m *Manager) Poison(lg *Guard) { + lg.poison.signal() +} + // Release releases the latches held by the provided Guard. After being called, // dependent latch acquisition attempts can complete if not blocked on any other // owned latches. diff --git a/pkg/kv/kvserver/spanlatch/manager_test.go b/pkg/kv/kvserver/spanlatch/manager_test.go index 115577567470..93aa79423844 100644 --- a/pkg/kv/kvserver/spanlatch/manager_test.go +++ b/pkg/kv/kvserver/spanlatch/manager_test.go @@ -20,11 +20,13 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -91,35 +93,40 @@ func testLatchBlocks(t *testing.T, lgC <-chan *Guard) { // MustAcquire is like Acquire, except it can't return context cancellation // errors. func (m *Manager) MustAcquire(spans *spanset.SpanSet) *Guard { - lg, err := m.Acquire(context.Background(), spans) + lg, err := m.Acquire(context.Background(), spans, poison.Policy_Error) if err != nil { panic(err) } return lg } -// MustAcquireCh is like Acquire, except it only sequences the latch latch -// attempt synchronously and waits on dependent latches asynchronously. It -// returns a channel that provides the Guard when the latches are acquired (i.e. -// after waiting). If the context expires, a nil Guard will be delivered on the -// channel. +// MustAcquireCh is like Acquire, except it only sequences the latch attempt +// synchronously and waits on dependent latches asynchronously. It returns a +// a channel that is signaled with the *Guard returned after waiting (and nil +// on error). Use MustAcquireChExt if more control is desired. func (m *Manager) MustAcquireCh(spans *spanset.SpanSet) <-chan *Guard { - return m.MustAcquireChCtx(context.Background(), spans) + chG, _ := m.MustAcquireChExt(context.Background(), spans, poison.Policy_Error) + return chG } -// MustAcquireChCtx is like MustAcquireCh, except it accepts a context. -func (m *Manager) MustAcquireChCtx(ctx context.Context, spans *spanset.SpanSet) <-chan *Guard { - ch := make(chan *Guard) - lg, snap := m.sequence(spans) +// MustAcquireChExt is like MustAcquireCh, except it accepts a context and +// poison.Policy, and returns an additional channel that returns an error +// if a nil guard is returned on the first channel. +func (m *Manager) MustAcquireChExt( + ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, +) (<-chan *Guard, <-chan error) { + chErr := make(chan error, 1) + chG := make(chan *Guard, 1) + lg, snap := m.sequence(spans, pp) go func() { err := m.wait(ctx, lg, snap) if err != nil { m.Release(lg) - lg = nil } - ch <- lg + chErr <- err + chG <- lg }() - return ch + return chG, chErr } func TestLatchManager(t *testing.T) { @@ -509,6 +516,39 @@ func TestLatchManagerDependentLatches(t *testing.T) { } } +func TestLatchManagerPoison(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + var m Manager + + cha1, _ := m.MustAcquireChExt(ctx, spans("a", "", write, zeroTS), poison.Policy_Wait) + cha2, erra2 := m.MustAcquireChExt(ctx, spans("a", "", write, zeroTS), poison.Policy_Error) + cha3, erra3 := m.MustAcquireChExt(ctx, spans("a", "", write, zeroTS), poison.Policy_Error) + + ga1 := testLatchSucceeds(t, cha1) + defer m.Release(ga1) + + // cha3 blocks on cha2 blocks on cha1. + testLatchBlocks(t, cha3) + testLatchBlocks(t, cha2) + + // Poison a1. + ga1.poison.signal() + + testLatchPoisons := func(t *testing.T, ch <-chan error) { + t.Helper() + select { + case err := <-ch: + require.True(t, errors.HasType(err, (*poison.PoisonedError)(nil)), "%+v", err) + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatal("timed out") + } + } + + testLatchPoisons(t, erra2) + testLatchPoisons(t, erra3) +} + func TestLatchManagerContextCancellation(t *testing.T) { defer leaktest.AfterTest(t)() var m Manager @@ -517,7 +557,7 @@ func TestLatchManagerContextCancellation(t *testing.T) { lg1 := m.MustAcquire(spans("a", "", write, zeroTS)) // The second one is given a cancelable context. ctx2, cancel2 := context.WithCancel(context.Background()) - lg2C := m.MustAcquireChCtx(ctx2, spans("a", "", write, zeroTS)) + lg2C, _ := m.MustAcquireChExt(ctx2, spans("a", "", write, zeroTS), poison.Policy_Error) lg3C := m.MustAcquireCh(spans("a", "", write, zeroTS)) // The second and third latch attempt block on the first. @@ -541,13 +581,13 @@ func TestLatchManagerOptimistic(t *testing.T) { var m Manager // Acquire latches, no conflict. - lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS)) - require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS))) + lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS), poison.Policy_Error) + require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS)), poison.Policy_Error) lg1, err := m.WaitUntilAcquired(context.Background(), lg1) require.NoError(t, err) // Optimistic acquire encounters conflict in some cases. - lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS)) + lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS), poison.Policy_Error) require.False(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "e", read, zeroTS))) require.True(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "d", read, zeroTS))) waitUntilAcquiredCh := func(g *Guard) <-chan *Guard { @@ -565,7 +605,7 @@ func TestLatchManagerOptimistic(t *testing.T) { testLatchSucceeds(t, ch2) // Optimistic acquire encounters conflict. - lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS)) + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error) require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) m.Release(lg2) // There is still a conflict even though lg2 has been released. @@ -577,7 +617,7 @@ func TestLatchManagerOptimistic(t *testing.T) { // Optimistic acquire for read below write encounters no conflict. oneTS, twoTS := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2} lg4 := m.MustAcquire(spans("c", "e", write, twoTS)) - lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS)) + lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS), poison.Policy_Error) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "e", read, oneTS))) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "c", read, oneTS))) lg5, err = m.WaitUntilAcquired(context.Background(), lg5) @@ -591,14 +631,14 @@ func TestLatchManagerWaitFor(t *testing.T) { var m Manager // Acquire latches, no conflict. - lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS)) + lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS), poison.Policy_Error) require.NoError(t, err) // See if WaitFor waits for above latch. waitForCh := func() <-chan *Guard { ch := make(chan *Guard) go func() { - err := m.WaitFor(context.Background(), spans("a", "e", read, zeroTS)) + err := m.WaitFor(context.Background(), spans("a", "e", read, zeroTS), poison.Policy_Error) require.NoError(t, err) ch <- &Guard{} }() @@ -611,7 +651,7 @@ func TestLatchManagerWaitFor(t *testing.T) { // Optimistic acquire should _not_ encounter conflict - as WaitFor should // not lay any latches. - lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS)) + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error) require.True(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) lg3, err = m.WaitUntilAcquired(context.Background(), lg3) require.NoError(t, err) @@ -659,7 +699,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) { b.ResetTimer() for i := range spans { - lg, snap := m.sequence(&spans[i]) + lg, snap := m.sequence(&spans[i], poison.Policy_Error) snap.close() if len(lgBuf) == cap(lgBuf) { m.Release(<-lgBuf) diff --git a/pkg/kv/kvserver/spanlatch/signal.go b/pkg/kv/kvserver/spanlatch/signal.go index 71deb5621918..da31c605108d 100644 --- a/pkg/kv/kvserver/spanlatch/signal.go +++ b/pkg/kv/kvserver/spanlatch/signal.go @@ -41,9 +41,17 @@ type signal struct { } func (s *signal) signal() { + s.signalWithChoice(false /* idempotent */) +} + +func (s *signal) signalWithChoice(idempotent bool) { if !atomic.CompareAndSwapInt32(&s.a, noSig, sig) { + if idempotent { + return + } panic("signaled twice") } + // Close the channel if it was ever initialized. if cPtr := atomic.LoadPointer(&s.c); cPtr != nil { // Coordinate with signalChan to avoid double-closing. @@ -82,6 +90,19 @@ func (s *signal) signalChan() <-chan struct{} { return c } +// poisonSignal is like signal, but its signal method is idempotent. +type poisonSignal struct { + sig signal +} + +func (s *poisonSignal) signal() { + s.sig.signalWithChoice(true /* idempotent */) +} + +func (s *poisonSignal) signalChan() <-chan struct{} { + return s.sig.signalChan() +} + func chanToPtr(c chan struct{}) unsafe.Pointer { return *(*unsafe.Pointer)(unsafe.Pointer(&c)) } diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 77798a352a8d..5e70695cb359 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -86,10 +86,6 @@ type StoreTestingKnobs struct { // per-Batch SlowReplicationThreshold. SlowReplicationThresholdOverride func(ba *roachpb.BatchRequest) time.Duration - // CancelStorageFactory overrides the default CancelStorage used by Replica - // circuit breakers. - CancelStorageFactory func() CancelStorage - // TestingRangefeedFilter is called before a replica processes a rangefeed // in order for unit tests to modify the request, error returned to the client // or data. diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 2dd836e61cdb..2ab9e465855e 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1294,8 +1294,9 @@ func (*AdminChangeReplicasRequest) flags() flag { return isAdmin | isAlone } func (*AdminRelocateRangeRequest) flags() flag { return isAdmin | isAlone } func (*GCRequest) flags() flag { - // We let GCRequest bypass the circuit breaker because otherwise, the GC queue may - // busy loop on an unavailable range, doing lots of work but never making progress. + // We defensively let GCRequest bypass the circuit breaker because otherwise, + // the GC queue might busy loop on an unavailable range, doing lots of work + // but never making progress. return isWrite | isRange | bypassesReplicaCircuitBreaker } @@ -1321,7 +1322,9 @@ func (*ResolveIntentRequest) flags() flag { return isWrite } func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange } func (*TruncateLogRequest) flags() flag { return isWrite } func (*MergeRequest) flags() flag { return isWrite | canBackpressure } -func (*RequestLeaseRequest) flags() flag { return isWrite | isAlone | skipsLeaseCheck } +func (*RequestLeaseRequest) flags() flag { + return isWrite | isAlone | skipsLeaseCheck | bypassesReplicaCircuitBreaker +} // LeaseInfoRequest is usually executed in an INCONSISTENT batch, which has the // effect of the `skipsLeaseCheck` flag that lease write operations have. @@ -1339,6 +1342,11 @@ func (*TransferLeaseRequest) flags() flag { // the store has registered that a transfer is in progress and // `redirectOnOrAcquireLease` would already tentatively redirect to the future // lease holder. + // + // Note that we intentionally don't let TransferLease bypass the Replica + // circuit breaker. Transferring a lease while the replication layer is + // unavailable results in the "old" leaseholder relinquishing the ability + // to serve (strong) reads, without being able to hand over the lease. return isWrite | isAlone | skipsLeaseCheck } func (*ProbeRequest) flags() flag {