Skip to content

Commit

Permalink
roachpb,kvserver: make ReplicaUnavailableError a wrapping error
Browse files Browse the repository at this point in the history
`ReplicaUnavailableError` was previously a leaf error, so the only
way to attach additional information "on the way out" was to wrap *it*
with wrapper errors. This made it difficult to read the messages since
the result and cause were reversed.

This was exacerbated by the recent addition of `PoisonedError`,
which should really be a cause of `ReplicaUnavailableError` too.

In this commit, we make `ReplicaUnavailableError` a wrapper error and
rearrange breaker errors such that if a `PoisonedError` occurs, it is
a cause of the `ReplicaUnavailableError`.

The change in `pkg/kv/kvserver/testdata/replica_unavailable_error.txt`
illustrates the improved error users will see (and which will thus be
reported to us).

Because `ReplicaUnavailableError` is now a wrapping error, I needed to
register it with `cockroachdb/errors` to allow for proper
encoding/decoding. I'm unsure whether this worked properly before, but
now it definitely does (as it is tested).

Testing was improved to check for presence of `ReplicaUnavailableError`
in all breaker errors.

Touches cockroachdb#33007.

Release justification: UX improvement for existing functionality
Release note: None
  • Loading branch information
tbg committed Mar 7, 2022
1 parent eebc802 commit 68566b6
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 48 deletions.
14 changes: 8 additions & 6 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) {
require.NoError(t, tc.Write(n1))
// Disable the probe so that when the breaker trips, it stays tripped.
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("injected breaker error"))
tc.TripBreaker(n1)

s1 := tc.GetFirstStoreFromServer(t, n1)
s2 := tc.GetFirstStoreFromServer(t, n2)
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) {
require.NoError(t, tc.Write(n1))
// Disable the probe on n2 so that when the breaker trips, it stays tripped.
tc.SetProbeEnabled(n2, false)
tc.Report(n2, errors.New("injected breaker error"))
tc.TripBreaker(n2)

// We didn't trip the leaseholder n1, so it is unaffected.
require.NoError(t, tc.Read(n1))
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) {
// disabled.
require.NoError(t, tc.Write(n1))
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("injected breaker error"))
tc.TripBreaker(n1)
resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats)

// On n1, run into the circuit breaker when requesting lease. We have to
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) {
// disabled, i.e. it will stay tripped.
require.NoError(t, tc.Write(n1))
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("injected breaker error"))
tc.TripBreaker(n1)

exemptRequests := []func() roachpb.Request{
func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} },
Expand Down Expand Up @@ -780,8 +780,9 @@ func (cbt *circuitBreakerTest) SetProbeEnabled(idx int, to bool) {
cbt.repls[idx].setProbeEnabled(to)
}

func (cbt *circuitBreakerTest) Report(idx int, err error) {
cbt.repls[idx].Replica.Breaker().Report(err)
func (cbt *circuitBreakerTest) TripBreaker(idx int) {
repl := cbt.repls[idx].Replica
repl.TripBreaker()
}

func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) error, idx int) {
Expand Down Expand Up @@ -938,6 +939,7 @@ func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) {
err = aErr.WrappedErr.GoError()
}
require.True(t, errors.Is(err, circuit.ErrBreakerOpen), "%+v", err)
require.True(t, errors.HasType(err, (*roachpb.ReplicaUnavailableError)(nil)), "%+v", err)
}

func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/concurrency/poison/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ func (e *PoisonedError) Format(s fmt.State, verb rune) { errors.FormatError(e, s
func (e *PoisonedError) Error() string {
return fmt.Sprint(e)
}

// TODO(tbg): need an init() function similar to the one that
// ReplicaUnavailableError has, or PoisonedError will not survive network
// round-trips.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ func (r *Replica) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
return r.closedTimestampPolicyRLocked()
}

// TripBreaker synchronously trips the breaker.
func (r *Replica) TripBreaker() {
r.breaker.tripSync(errors.New("injected error"))
}

// GetCircuitBreaker returns the circuit breaker controlling
// connection attempts to the specified node.
func (t *RaftTransport) GetCircuitBreaker(
Expand Down
36 changes: 17 additions & 19 deletions pkg/kv/kvserver/replica_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type replicaInCircuitBreaker interface {
Desc() *roachpb.RangeDescriptor
Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
slowReplicationThreshold(ba *roachpb.BatchRequest) (time.Duration, bool)
replicaUnavailableError() error
replicaUnavailableError(err error) error
poisonInflightLatches(err error)
}

Expand Down Expand Up @@ -102,23 +102,23 @@ func (br *replicaCircuitBreaker) enabled() bool {
return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 && br.canEnable()
}

func (br *replicaCircuitBreaker) newError() error {
return br.r.replicaUnavailableError()
}

func (br *replicaCircuitBreaker) TripAsync() {
func (br *replicaCircuitBreaker) TripAsync(err error) {
if !br.enabled() {
return
}

_ = br.stopper.RunAsyncTask(
br.ambCtx.AnnotateCtx(context.Background()), "trip-breaker",
func(ctx context.Context) {
br.wrapped.Report(br.newError())
br.tripSync(err)
},
)
}

// tripSync passes err through br.r.replicaUnavailableError and reports the
// resulting error to the wrapped breaker. It does not consult br.enabled();
// TripAsync performs that check before invoking tripSync in an async task.
func (br *replicaCircuitBreaker) tripSync(err error) {
br.wrapped.Report(br.r.replicaUnavailableError(err))
}

type signaller interface {
Err() error
C() <-chan struct{}
Expand Down Expand Up @@ -240,12 +240,13 @@ func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error {
return pErr.GoError()
},
); err != nil {
return errors.CombineErrors(r.replicaUnavailableError(), err)
return r.replicaUnavailableError(err)
}
return nil
}

func replicaUnavailableError(
err error,
desc *roachpb.RangeDescriptor,
replDesc roachpb.ReplicaDescriptor,
lm liveness.IsLiveMap,
Expand All @@ -270,25 +271,22 @@ func replicaUnavailableError(
var _ redact.SafeFormatter = desc
var _ redact.SafeFormatter = replDesc

err := roachpb.NewReplicaUnavailableError(desc, replDesc)
if len(nonLiveRepls.AsProto()) > 0 {
err = errors.Wrapf(err, "replicas on non-live nodes: %v (lost quorum: %t)", nonLiveRepls, !canMakeProgress)
}

err = errors.Wrapf(
err,
"raft status: %+v", redact.Safe(rs), // raft status contains no PII
)
if len(nonLiveRepls.AsProto()) > 0 {
err = errors.Wrapf(err, "replicas on non-live nodes: %v (lost quorum: %t)", nonLiveRepls, !canMakeProgress)
}

return err
return roachpb.NewReplicaUnavailableError(err, desc, replDesc)
}

func (r *Replica) replicaUnavailableError() error {
func (r *Replica) replicaUnavailableError(err error) error {
desc := r.Desc()
replDesc, _ := desc.GetReplicaDescriptor(r.store.StoreID())

var isLiveMap liveness.IsLiveMap
if nl := r.store.cfg.NodeLiveness; nl != nil { // exclude unit test
isLiveMap = nl.GetIsLiveMap()
}
return replicaUnavailableError(desc, replDesc, isLiveMap, r.RaftStatus())
isLiveMap, _ := r.store.livenessMap.Load().(liveness.IsLiveMap)
return replicaUnavailableError(err, desc, replDesc, isLiveMap, r.RaftStatus())
}
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package kvserver

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
Expand All @@ -19,7 +20,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3"
)

Expand All @@ -36,7 +39,12 @@ func TestReplicaUnavailableError(t *testing.T) {
lm := liveness.IsLiveMap{
1: liveness.IsLiveMapEntry{IsLive: true},
}
wrappedErr := errors.New("probe failed")
rs := raft.Status{}
err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs)
ctx := context.Background()
err := errors.DecodeError(ctx, errors.EncodeError(ctx, replicaUnavailableError(
wrappedErr, desc, desc.Replicas().AsProto()[0], lm, &rs),
))
require.True(t, errors.Is(err, wrappedErr), "%+v", err)
echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt"))
}
9 changes: 4 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,16 +1227,15 @@ func (r *Replica) refreshProposalsLocked(
// request got added in while the probe was about to shut down, there will
// be regular attempts at healing the breaker.
if maxSlowProposalDuration > 0 && r.breaker.Signal().Err() == nil {
log.Warningf(ctx,
"have been waiting %.2fs for slow proposal %s",
maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest,
)
err := errors.Errorf("have been waiting %.2fs for slow proposal %s",
maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest)
log.Warningf(ctx, "%s", err)
// NB: this is async because we're holding lots of locks here, and we want
// to avoid having to pass all the information about the replica into the
// breaker (since the breaker needs access to this information at will to
// power the probe anyway). Over time, we anticipate there being multiple
// mechanisms which trip the breaker.
r.breaker.TripAsync()
r.breaker.TripAsync(err)
}

if log.V(1) && len(reproposals) > 0 {
Expand Down
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -450,17 +451,20 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
LockSpans: lockSpans, // nil if g != nil
}, requestEvalKind)
if pErr != nil {
if errors.HasType(pErr.GoError(), (*poison.PoisonedError)(nil)) {
brErr := r.breaker.Signal().Err()
if brErr == nil {
// The breaker may have healed in the meantime.
//
// TODO(tbg): it would be nicer if poisoning took an err and it
// came wrapped with the PoisonedError instead. Or we could
// retry the request.
brErr = r.replicaUnavailableError()
}
pErr = roachpb.NewError(errors.CombineErrors(brErr, pErr.GoError()))
if poisonErr := (*poison.PoisonedError)(nil); errors.As(pErr.GoError(), &poisonErr) {
// NB: we make the breaker error (which may be nil at this point, but
// usually is not) a secondary error, meaning it is not in the error
// chain. That is fine; the important bits to investigate
// programmatically are the ReplicaUnavailableError (which contains the
// descriptor) and the *PoisonedError (which contains the concrete
// subspan that caused this request to fail). We mark
// circuit.ErrBreakerOpen into the chain as well so that we have the
// invariant that all replica circuit breaker errors contain both
// ErrBreakerOpen and ReplicaUnavailableError.
pErr = roachpb.NewError(r.replicaUnavailableError(errors.CombineErrors(
errors.Mark(poisonErr, circuit.ErrBreakerOpen),
r.breaker.Signal().Err(),
)))
}
return nil, pErr
} else if resp != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/testdata/replica_unavailable_error.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
replicas on non-live nodes: (n2,s20):2 (lost quorum: true): raft status: {"id":"0","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":0,"progress":{},"leadtransferee":"0"}: replica (n1,s10):1 unable to serve request to r10:‹{a-z}› [(n1,s10):1, (n2,s20):2, next=3, gen=0]
replica (n1,s10):1 unable to serve request to r10:‹{a-z}› [(n1,s10):1, (n2,s20):2, next=3, gen=0]: raft status: {"id":"0","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":0,"progress":{},"leadtransferee":"0"}: replicas on non-live nodes: (n2,s20):2 (lost quorum: true): probe failed
1 change: 1 addition & 0 deletions pkg/roachpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_library(
"@com_github_cockroachdb_errors//errorspb",
"@com_github_cockroachdb_errors//extgrpc",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_mock//gomock", # keep
"@io_etcd_go_etcd_raft_v3//raftpb",
"@org_golang_google_grpc//metadata", # keep
Expand Down
1 change: 1 addition & 0 deletions pkg/roachpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ message ReplicaUnavailableError {

optional roachpb.RangeDescriptor desc = 2 [(gogoproto.nullable) = false];
optional roachpb.ReplicaDescriptor replica = 4 [(gogoproto.nullable) = false];
optional errorspb.EncodedError cause = 5 [(gogoproto.nullable) = false];
}

// A RaftGroupDeletedError indicates a raft group has been deleted for
Expand Down
28 changes: 26 additions & 2 deletions pkg/roachpb/replica_unavailable_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,33 @@
package roachpb

import (
context "context"
"fmt"

"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
)

// NewReplicaUnavailableError initializes a new *ReplicaUnavailableError. It is
// provided with the error to wrap as its cause (stored in encoded form), the
// range descriptor known to the replica, and the relevant replica descriptor
// within.
func NewReplicaUnavailableError(desc *RangeDescriptor, replDesc ReplicaDescriptor) error {
func NewReplicaUnavailableError(
cause error, desc *RangeDescriptor, replDesc ReplicaDescriptor,
) error {
return &ReplicaUnavailableError{
Desc: *desc,
Replica: replDesc,
Cause: errors.EncodeError(context.Background(), cause),
}
}

var _ errors.SafeFormatter = (*ReplicaUnavailableError)(nil)
var _ fmt.Formatter = (*ReplicaUnavailableError)(nil)
var _ errors.Wrapper = (*ReplicaUnavailableError)(nil)

// SafeFormatError implements errors.SafeFormatter.
func (e *ReplicaUnavailableError) SafeFormatError(p errors.Printer) error {
p.Printf("replica %s unable to serve request to %s", e.Replica, e.Desc)
p.Printf("replica %s unable to serve request to %s: %s", e.Replica, e.Desc, e.Unwrap())
return nil
}

Expand All @@ -42,3 +48,21 @@ func (e *ReplicaUnavailableError) Format(s fmt.State, verb rune) { errors.Format
func (e *ReplicaUnavailableError) Error() string {
return fmt.Sprint(e)
}

// Unwrap implements errors.Wrapper.
func (e *ReplicaUnavailableError) Unwrap() error {
return errors.DecodeError(context.Background(), e.Cause)
}

// init registers a wrapper encoder and a wrapper decoder for
// *ReplicaUnavailableError with the errors package, both keyed by the type key
// of that type.
func init() {
// encode uses the proto.Message found in err's chain as the payload. It
// supplies neither a message prefix nor safe details.
encode := func(ctx context.Context, err error) (msgPrefix string, safeDetails []string, payload proto.Message) {
errors.As(err, &payload) // payload = err.(proto.Message)
return "", nil, payload
}
// decode returns the payload itself as the error. The separately supplied
// cause, msgPrefix and safeDetails are not used here.
decode := func(ctx context.Context, cause error, msgPrefix string, safeDetails []string, payload proto.Message) error {
return payload.(*ReplicaUnavailableError)
}
typeName := errors.GetTypeKey((*ReplicaUnavailableError)(nil))
errors.RegisterWrapperEncoder(typeName, encode)
errors.RegisterWrapperDecoder(typeName, decode)
}
6 changes: 5 additions & 1 deletion pkg/roachpb/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,12 @@ func TestReplicaUnavailableError(t *testing.T) {
set.AddReplica(rDesc)
desc := roachpb.NewRangeDescriptor(123, roachpb.RKeyMin, roachpb.RKeyMax, set)

var err = roachpb.NewReplicaUnavailableError(desc, rDesc)
errSlowProposal := errors.New("slow proposal")
var err = roachpb.NewReplicaUnavailableError(errSlowProposal, desc, rDesc)
err = errors.DecodeError(ctx, errors.EncodeError(ctx, err))
// Sanity check that Unwrap() was implemented.
require.True(t, errors.Is(err, errSlowProposal), "%+v", err)
require.True(t, errors.HasType(err, (*roachpb.ReplicaUnavailableError)(nil)), "%+v", err)

s := fmt.Sprintf("%s\n%s", err, redact.Sprint(err))
echotest.Require(t, s, filepath.Join("testdata", "replica_unavailable_error.txt"))
Expand Down
4 changes: 2 additions & 2 deletions pkg/roachpb/testdata/replica_unavailable_error.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
echo
----
replica (n1,s2):3 unable to serve request to r123:/M{in-ax} [(n1,s2):1, next=2, gen=0]
replica (n1,s2):3 unable to serve request to r123:‹/M{in-ax}› [(n1,s2):1, next=2, gen=0]
replica (n1,s2):3 unable to serve request to r123:/M{in-ax} [(n1,s2):1, next=2, gen=0]: slow proposal
replica (n1,s2):3 unable to serve request to r123:‹/M{in-ax}› [(n1,s2):1, next=2, gen=0]: slow proposal

0 comments on commit 68566b6

Please sign in to comment.