diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 04f43093b1b9..709deda57a47 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -139,6 +139,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/closedts/sidetransport:sidetransport_test", "//pkg/kv/kvserver/closedts/tracker:tracker_test", "//pkg/kv/kvserver/closedts:closedts_test", + "//pkg/kv/kvserver/concurrency/poison:poison_test", "//pkg/kv/kvserver/concurrency:concurrency_test", "//pkg/kv/kvserver/gc:gc_test", "//pkg/kv/kvserver/idalloc:idalloc_test", diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index bdade53f7f26..11c19089eb8d 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -89,7 +89,7 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { require.NoError(t, tc.Write(n1)) // Disable the probe so that when the breaker trips, it stays tripped. tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("injected breaker error")) + tc.TripBreaker(n1) s1 := tc.GetFirstStoreFromServer(t, n1) s2 := tc.GetFirstStoreFromServer(t, n2) @@ -155,7 +155,7 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { require.NoError(t, tc.Write(n1)) // Disable the probe on n2 so that when the breaker trips, it stays tripped. tc.SetProbeEnabled(n2, false) - tc.Report(n2, errors.New("injected breaker error")) + tc.TripBreaker(n2) // We didn't trip the leaseholder n1, so it is unaffected. require.NoError(t, tc.Read(n1)) @@ -200,7 +200,7 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) { // disabled. require.NoError(t, tc.Write(n1)) tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("injected breaker error")) + tc.TripBreaker(n1) resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats) // On n1, run into the circuit breaker when requesting lease. We have to @@ -515,7 +515,7 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { // disabled, i.e. it will stay tripped. require.NoError(t, tc.Write(n1)) tc.SetProbeEnabled(n1, false) - tc.Report(n1, errors.New("injected breaker error")) + tc.TripBreaker(n1) exemptRequests := []func() roachpb.Request{ func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} }, @@ -780,8 +780,9 @@ func (cbt *circuitBreakerTest) SetProbeEnabled(idx int, to bool) { cbt.repls[idx].setProbeEnabled(to) } -func (cbt *circuitBreakerTest) Report(idx int, err error) { - cbt.repls[idx].Replica.Breaker().Report(err) +func (cbt *circuitBreakerTest) TripBreaker(idx int) { + repl := cbt.repls[idx].Replica + repl.TripBreaker() } func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) error, idx int) { @@ -938,6 +939,7 @@ func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) { err = aErr.WrappedErr.GoError() } require.True(t, errors.Is(err, circuit.ErrBreakerOpen), "%+v", err) + require.True(t, errors.HasType(err, (*roachpb.ReplicaUnavailableError)(nil)), "%+v", err) } func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) { diff --git a/pkg/kv/kvserver/concurrency/poison/BUILD.bazel b/pkg/kv/kvserver/concurrency/poison/BUILD.bazel index fa267c099677..cf282b9e15d4 100644 --- a/pkg/kv/kvserver/concurrency/poison/BUILD.bazel +++ b/pkg/kv/kvserver/concurrency/poison/BUILD.bazel @@ -1,5 +1,5 @@ load("@rules_proto//proto:defs.bzl", "proto_library") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") proto_library( @@ -42,3 +42,20 @@ go_library( "@com_github_cockroachdb_errors//:errors", ], ) + +go_test( + name = "poison_test", + srcs = ["error_test.go"], + data = glob(["testdata/**"]), + deps = [ + ":poison", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/testutils/echotest", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/kv/kvserver/concurrency/poison/error_test.go b/pkg/kv/kvserver/concurrency/poison/error_test.go new file mode 100644 index 000000000000..360501d4e68e --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/error_test.go @@ -0,0 +1,39 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package poison_test + +import ( + "context" + "path/filepath" + "testing" + + _ "github.com/cockroachdb/cockroach/pkg/keys" // to init roachpb.PrettyPrintRange + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/echotest" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" + "github.com/stretchr/testify/require" +) + +func TestPoisonedError(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + err := errors.DecodeError(ctx, errors.EncodeError(ctx, poison.NewPoisonedError( + roachpb.Span{Key: roachpb.Key("a")}, hlc.Timestamp{WallTime: 1}, + ))) + require.True(t, errors.HasType(err, (*poison.PoisonedError)(nil)), "%+v", err) + var buf redact.StringBuilder + buf.Printf("%s", err) + echotest.Require(t, string(buf.RedactableString()), filepath.Join("testdata", "poisoned_error.txt")) +} diff --git a/pkg/kv/kvserver/concurrency/poison/testdata/poisoned_error.txt b/pkg/kv/kvserver/concurrency/poison/testdata/poisoned_error.txt new file mode 100644 index 000000000000..b4b6bf406111 --- /dev/null +++ b/pkg/kv/kvserver/concurrency/poison/testdata/poisoned_error.txt @@ -0,0 +1,3 @@ +echo +---- +encountered poisoned latch ‹a›@0.000000001,0 diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index c25aa8641a14..2d0156d68969 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -494,6 +494,11 @@ func (r *Replica) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy { return r.closedTimestampPolicyRLocked() } +// TripBreaker synchronously trips the breaker. +func (r *Replica) TripBreaker() { + r.breaker.tripSync(errors.New("injected error")) +} + // GetCircuitBreaker returns the circuit breaker controlling // connection attempts to the specified node. func (t *RaftTransport) GetCircuitBreaker( diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go index 05d8e8ee5957..300f1db2fd62 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker.go +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -37,7 +37,7 @@ type replicaInCircuitBreaker interface { Desc() *roachpb.RangeDescriptor Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) slowReplicationThreshold(ba *roachpb.BatchRequest) (time.Duration, bool) - replicaUnavailableError() error + replicaUnavailableError(err error) error poisonInflightLatches(err error) } @@ -102,11 +102,7 @@ func (br *replicaCircuitBreaker) enabled() bool { return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 && br.canEnable() } -func (br *replicaCircuitBreaker) newError() error { - return br.r.replicaUnavailableError() -} - -func (br *replicaCircuitBreaker) TripAsync() { +func (br *replicaCircuitBreaker) TripAsync(err error) { if !br.enabled() { return } @@ -114,11 +110,15 @@ func (br *replicaCircuitBreaker) TripAsync() { _ = br.stopper.RunAsyncTask( br.ambCtx.AnnotateCtx(context.Background()), "trip-breaker", func(ctx context.Context) { - br.wrapped.Report(br.newError()) + br.tripSync(err) }, ) } +func (br *replicaCircuitBreaker) tripSync(err error) { + br.wrapped.Report(br.r.replicaUnavailableError(err)) +} + type signaller interface { Err() error C() <-chan struct{} @@ -240,12 +240,13 @@ func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error { return pErr.GoError() }, ); err != nil { - return errors.CombineErrors(r.replicaUnavailableError(), err) + return r.replicaUnavailableError(err) } return nil } func replicaUnavailableError( + err error, desc *roachpb.RangeDescriptor, replDesc roachpb.ReplicaDescriptor, lm liveness.IsLiveMap, @@ -270,25 +271,22 @@ func replicaUnavailableError( var _ redact.SafeFormatter = desc var _ redact.SafeFormatter = replDesc - err := roachpb.NewReplicaUnavailableError(desc, replDesc) + if len(nonLiveRepls.AsProto()) > 0 { + err = errors.Wrapf(err, "replicas on non-live nodes: %v (lost quorum: %t)", nonLiveRepls, !canMakeProgress) + } + err = errors.Wrapf( err, "raft status: %+v", redact.Safe(rs), // raft status contains no PII ) - if len(nonLiveRepls.AsProto()) > 0 { - err = errors.Wrapf(err, "replicas on non-live nodes: %v (lost quorum: %t)", nonLiveRepls, !canMakeProgress) - } - return err + return roachpb.NewReplicaUnavailableError(err, desc, replDesc) } -func (r *Replica) replicaUnavailableError() error { +func (r *Replica) replicaUnavailableError(err error) error { desc := r.Desc() replDesc, _ := desc.GetReplicaDescriptor(r.store.StoreID()) - var isLiveMap liveness.IsLiveMap - if nl := r.store.cfg.NodeLiveness; nl != nil { // exclude unit test - isLiveMap = nl.GetIsLiveMap() - } - return replicaUnavailableError(desc, replDesc, isLiveMap, r.RaftStatus()) + isLiveMap, _ := r.store.livenessMap.Load().(liveness.IsLiveMap) + return replicaUnavailableError(err, desc, replDesc, isLiveMap, r.RaftStatus()) } diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go index b056713c36a2..ab1f9aa32dff 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -11,6 +11,7 @@ package kvserver import ( + "context" "testing" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -19,7 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/echotest" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" ) @@ -36,7 +39,12 @@ func TestReplicaUnavailableError(t *testing.T) { lm := liveness.IsLiveMap{ 1: liveness.IsLiveMapEntry{IsLive: true}, } + wrappedErr := errors.New("probe failed") rs := raft.Status{} - err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs) + ctx := context.Background() + err := errors.DecodeError(ctx, errors.EncodeError(ctx, replicaUnavailableError( + wrappedErr, desc, desc.Replicas().AsProto()[0], lm, &rs), + )) + require.True(t, errors.Is(err, wrappedErr), "%+v", err) echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt")) } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index d5b1e17233c0..9ea60963228c 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1227,16 +1227,15 @@ func (r *Replica) refreshProposalsLocked( // request got added in while the probe was about to shut down, there will // be regular attempts at healing the breaker. if maxSlowProposalDuration > 0 && r.breaker.Signal().Err() == nil { - log.Warningf(ctx, - "have been waiting %.2fs for slow proposal %s", - maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest, - ) + err := errors.Errorf("have been waiting %.2fs for slow proposal %s", + maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest) + log.Warningf(ctx, "%s", err) // NB: this is async because we're holding lots of locks here, and we want // to avoid having to pass all the information about the replica into the // breaker (since the breaker needs access to this information at will to // power the probe anyway). Over time, we anticipate there being multiple // mechanisms which trip the breaker. - r.breaker.TripAsync() + r.breaker.TripAsync(err) } if log.V(1) && len(reproposals) > 0 { diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 127fc14627d7..f94ef1d4e6a4 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/util/circuit" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -450,17 +451,20 @@ func (r *Replica) executeBatchWithConcurrencyRetries( LockSpans: lockSpans, // nil if g != nil }, requestEvalKind) if pErr != nil { - if errors.HasType(pErr.GoError(), (*poison.PoisonedError)(nil)) { - brErr := r.breaker.Signal().Err() - if brErr == nil { - // The breaker may have healed in the meantime. - // - // TODO(tbg): it would be nicer if poisoning took an err and it - // came wrapped with the PoisonedError instead. Or we could - // retry the request. - brErr = r.replicaUnavailableError() - } - pErr = roachpb.NewError(errors.CombineErrors(brErr, pErr.GoError())) + if poisonErr := (*poison.PoisonedError)(nil); errors.As(pErr.GoError(), &poisonErr) { + // NB: we make the breaker error (which may be nil at this point, but + // usually is not) a secondary error, meaning it is not in the error + // chain. That is fine; the important bits to investigate + // programmatically are the ReplicaUnavailableError (which contains the + // descriptor) and the *PoisonedError (which contains the concrete + // subspan that caused this request to fail). We mark + // circuit.ErrBreakerOpen into the chain as well so that we have the + // invariant that all replica circuit breaker errors contain both + // ErrBreakerOpen and ReplicaUnavailableError. + pErr = roachpb.NewError(r.replicaUnavailableError(errors.CombineErrors( + errors.Mark(poisonErr, circuit.ErrBreakerOpen), + r.breaker.Signal().Err(), + ))) } return nil, pErr } else if resp != nil { diff --git a/pkg/kv/kvserver/testdata/replica_unavailable_error.txt b/pkg/kv/kvserver/testdata/replica_unavailable_error.txt index 226adfe058b3..e685782c302e 100644 --- a/pkg/kv/kvserver/testdata/replica_unavailable_error.txt +++ b/pkg/kv/kvserver/testdata/replica_unavailable_error.txt @@ -1,3 +1,3 @@ echo ---- -replicas on non-live nodes: (n2,s20):2 (lost quorum: true): raft status: {"id":"0","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":0,"progress":{},"leadtransferee":"0"}: replica (n1,s10):1 unable to serve request to r10:‹{a-z}› [(n1,s10):1, (n2,s20):2, next=3, gen=0] +replica (n1,s10):1 unable to serve request to r10:‹{a-z}› [(n1,s10):1, (n2,s20):2, next=3, gen=0]: raft status: {"id":"0","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":0,"progress":{},"leadtransferee":"0"}: replicas on non-live nodes: (n2,s20):2 (lost quorum: true): probe failed diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index 727432e40804..4c83cab37d32 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -54,6 +54,7 @@ go_library( "@com_github_cockroachdb_errors//errorspb", "@com_github_cockroachdb_errors//extgrpc", "@com_github_cockroachdb_redact//:redact", + "@com_github_gogo_protobuf//proto", "@com_github_golang_mock//gomock", # keep "@io_etcd_go_etcd_raft_v3//raftpb", "@org_golang_google_grpc//metadata", # keep diff --git a/pkg/roachpb/errors.proto b/pkg/roachpb/errors.proto index 48c6b2d5f8e8..9a556db87931 100644 --- a/pkg/roachpb/errors.proto +++ b/pkg/roachpb/errors.proto @@ -385,6 +385,7 @@ message ReplicaUnavailableError { optional roachpb.RangeDescriptor desc = 2 [(gogoproto.nullable) = false]; optional roachpb.ReplicaDescriptor replica = 4 [(gogoproto.nullable) = false]; + optional errorspb.EncodedError cause = 5 [(gogoproto.nullable) = false]; } // A RaftGroupDeletedError indicates a raft group has been deleted for diff --git a/pkg/roachpb/replica_unavailable_error.go b/pkg/roachpb/replica_unavailable_error.go index 4b4e5b9d3c57..4259ee97eb94 100644 --- a/pkg/roachpb/replica_unavailable_error.go +++ b/pkg/roachpb/replica_unavailable_error.go @@ -11,43 +11,58 @@ package roachpb import ( + context "context" "fmt" "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/proto" ) // NewReplicaUnavailableError initializes a new *ReplicaUnavailableError. It is // provided with the range descriptor known to the replica, and the relevant // replica descriptor within. -func NewReplicaUnavailableError(desc *RangeDescriptor, replDesc ReplicaDescriptor) error { +func NewReplicaUnavailableError( + cause error, desc *RangeDescriptor, replDesc ReplicaDescriptor, +) error { return &ReplicaUnavailableError{ Desc: *desc, Replica: replDesc, + Cause: errors.EncodeError(context.Background(), cause), } } var _ errors.SafeFormatter = (*ReplicaUnavailableError)(nil) var _ fmt.Formatter = (*ReplicaUnavailableError)(nil) +var _ errors.Wrapper = (*ReplicaUnavailableError)(nil) // SafeFormatError implements errors.SafeFormatter. func (e *ReplicaUnavailableError) SafeFormatError(p errors.Printer) error { - e.printTo(p.Printf) + p.Printf("replica %s unable to serve request to %s: %s", e.Replica, e.Desc, e.Unwrap()) return nil } -// See https://github.com/cockroachdb/errors/issues/88. -func (e *ReplicaUnavailableError) printTo(printf func(string, ...interface{})) { - printf("replica %s unable to serve request to %s", e.Replica, e.Desc) -} - // Format implements fmt.Formatter. func (e *ReplicaUnavailableError) Format(s fmt.State, verb rune) { errors.FormatError(e, s, verb) } // Error implements error. func (e *ReplicaUnavailableError) Error() string { - var s string - e.printTo(func(format string, args ...interface{}) { - s = fmt.Sprintf(format, args...) - }) - return s + return fmt.Sprint(e) +} + +// Unwrap implements errors.Wrapper. +func (e *ReplicaUnavailableError) Unwrap() error { + return errors.DecodeError(context.Background(), e.Cause) +} + +func init() { + encode := func(ctx context.Context, err error) (msgPrefix string, safeDetails []string, payload proto.Message) { + errors.As(err, &payload) // payload = err.(proto.Message) + return "", nil, payload + } + decode := func(ctx context.Context, cause error, msgPrefix string, safeDetails []string, payload proto.Message) error { + return payload.(*ReplicaUnavailableError) + } + typeName := errors.GetTypeKey((*ReplicaUnavailableError)(nil)) + errors.RegisterWrapperEncoder(typeName, encode) + errors.RegisterWrapperDecoder(typeName, decode) } diff --git a/pkg/roachpb/string_test.go b/pkg/roachpb/string_test.go index 2689e594e96f..07977c2e3db2 100644 --- a/pkg/roachpb/string_test.go +++ b/pkg/roachpb/string_test.go @@ -156,8 +156,12 @@ func TestReplicaUnavailableError(t *testing.T) { set.AddReplica(rDesc) desc := roachpb.NewRangeDescriptor(123, roachpb.RKeyMin, roachpb.RKeyMax, set) - var err = roachpb.NewReplicaUnavailableError(desc, rDesc) + errSlowProposal := errors.New("slow proposal") + var err = roachpb.NewReplicaUnavailableError(errSlowProposal, desc, rDesc) err = errors.DecodeError(ctx, errors.EncodeError(ctx, err)) + // Sanity check that Unwrap() was implemented. + require.True(t, errors.Is(err, errSlowProposal), "%+v", err) + require.True(t, errors.HasType(err, (*roachpb.ReplicaUnavailableError)(nil)), "%+v", err) s := fmt.Sprintf("%s\n%s", err, redact.Sprint(err)) echotest.Require(t, s, filepath.Join("testdata", "replica_unavailable_error.txt")) diff --git a/pkg/roachpb/testdata/replica_unavailable_error.txt b/pkg/roachpb/testdata/replica_unavailable_error.txt index 2f06a1b72678..f2ae80eb8b9b 100644 --- a/pkg/roachpb/testdata/replica_unavailable_error.txt +++ b/pkg/roachpb/testdata/replica_unavailable_error.txt @@ -1,4 +1,4 @@ echo ---- -replica (n1,s2):3 unable to serve request to r123:/M{in-ax} [(n1,s2):1, next=2, gen=0] -replica (n1,s2):3 unable to serve request to r123:‹/M{in-ax}› [(n1,s2):1, next=2, gen=0] +replica (n1,s2):3 unable to serve request to r123:/M{in-ax} [(n1,s2):1, next=2, gen=0]: slow proposal +replica (n1,s2):3 unable to serve request to r123:‹/M{in-ax}› [(n1,s2):1, next=2, gen=0]: slow proposal