Skip to content

Commit

Permalink
Merge #77365
Browse files Browse the repository at this point in the history
77365: roachpb,kvserver: make ReplicaUnavailableError a wrapping error r=erikgrinaker a=tbg

`ReplicaUnavailableError` was previously a leaf error, so the only
way to attach additional information "on the way out" was to wrap *it*
with wrapper errors. This made it difficult to read the messages since
the result and cause were reversed.

This was exacerbated by the recent addition of `PoisonedError`,
which should really be a cause of `ReplicaUnavailableError` too.

In this commit, we make `ReplicaUnavailableError` a wrapper error and
rearrange breaker errors such that if a `PoisonedError` occurs, it is
a cause of the `ReplicaUnavailableError`.

The change in `pkg/kv/kvserver/testdata/replica_unavailable_error.txt`
illustrates the improved error users will see (and which will thus be
reported to us).

Because `ReplicaUnavailableError` is now a wrapping error, I needed to
register it with `cockroachdb/errors` to allow for proper
encoding/decoding. I'm unsure whether this worked properly before, but
now it definitely does (as it is tested).

Testing was improved to check for the presence of `ReplicaUnavailableError`
in all breaker errors.

Touches #33007.

Release justification: UX improvement for existing functionality
Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Mar 7, 2022
2 parents 3b7f761 + 227fbe7 commit fd66b4b
Show file tree
Hide file tree
Showing 16 changed files with 156 additions and 59 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/closedts/sidetransport:sidetransport_test",
"//pkg/kv/kvserver/closedts/tracker:tracker_test",
"//pkg/kv/kvserver/closedts:closedts_test",
"//pkg/kv/kvserver/concurrency/poison:poison_test",
"//pkg/kv/kvserver/concurrency:concurrency_test",
"//pkg/kv/kvserver/gc:gc_test",
"//pkg/kv/kvserver/idalloc:idalloc_test",
Expand Down
14 changes: 8 additions & 6 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) {
require.NoError(t, tc.Write(n1))
// Disable the probe so that when the breaker trips, it stays tripped.
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("injected breaker error"))
tc.TripBreaker(n1)

s1 := tc.GetFirstStoreFromServer(t, n1)
s2 := tc.GetFirstStoreFromServer(t, n2)
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) {
require.NoError(t, tc.Write(n1))
// Disable the probe on n2 so that when the breaker trips, it stays tripped.
tc.SetProbeEnabled(n2, false)
tc.Report(n2, errors.New("injected breaker error"))
tc.TripBreaker(n2)

// We didn't trip the leaseholder n1, so it is unaffected.
require.NoError(t, tc.Read(n1))
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) {
// disabled.
require.NoError(t, tc.Write(n1))
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("injected breaker error"))
tc.TripBreaker(n1)
resumeHeartbeats := tc.ExpireAllLeasesAndN1LivenessRecord(t, pauseHeartbeats)

// On n1, run into the circuit breaker when requesting lease. We have to
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) {
// disabled, i.e. it will stay tripped.
require.NoError(t, tc.Write(n1))
tc.SetProbeEnabled(n1, false)
tc.Report(n1, errors.New("injected breaker error"))
tc.TripBreaker(n1)

exemptRequests := []func() roachpb.Request{
func() roachpb.Request { return &roachpb.ExportRequest{ReturnSST: true} },
Expand Down Expand Up @@ -780,8 +780,9 @@ func (cbt *circuitBreakerTest) SetProbeEnabled(idx int, to bool) {
cbt.repls[idx].setProbeEnabled(to)
}

func (cbt *circuitBreakerTest) Report(idx int, err error) {
cbt.repls[idx].Replica.Breaker().Report(err)
// TripBreaker trips the circuit breaker of the replica stored at index idx by
// delegating to that replica's TripBreaker helper.
func (cbt *circuitBreakerTest) TripBreaker(idx int) {
	cbt.repls[idx].Replica.TripBreaker()
}

func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) error, idx int) {
Expand Down Expand Up @@ -938,6 +939,7 @@ func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) {
err = aErr.WrappedErr.GoError()
}
require.True(t, errors.Is(err, circuit.ErrBreakerOpen), "%+v", err)
require.True(t, errors.HasType(err, (*roachpb.ReplicaUnavailableError)(nil)), "%+v", err)
}

func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) {
Expand Down
19 changes: 18 additions & 1 deletion pkg/kv/kvserver/concurrency/poison/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")

proto_library(
Expand Down Expand Up @@ -42,3 +42,20 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
],
)

# Test target for the poison package. It builds error_test.go and makes
# everything under testdata/ available to the test at runtime.
go_test(
name = "poison_test",
srcs = ["error_test.go"],
data = glob(["testdata/**"]),
deps = [
":poison",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/testutils/echotest",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_stretchr_testify//require",
],
)
39 changes: 39 additions & 0 deletions pkg/kv/kvserver/concurrency/poison/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package poison_test

import (
"context"
"path/filepath"
"testing"

_ "github.com/cockroachdb/cockroach/pkg/keys" // to init roachpb.PrettyPrintRange
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
)

// TestPoisonedError passes a PoisonedError through errors.EncodeError and
// errors.DecodeError, then checks that the decoded error still has the
// *poison.PoisonedError type and that its redactable rendering matches the
// testdata file.
func TestPoisonedError(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	span := roachpb.Span{Key: roachpb.Key("a")}
	ts := hlc.Timestamp{WallTime: 1}

	// Encode and immediately decode so that the assertions below run against
	// the decoded form of the error rather than the value we constructed.
	encoded := errors.EncodeError(ctx, poison.NewPoisonedError(span, ts))
	decoded := errors.DecodeError(ctx, encoded)
	require.True(t, errors.HasType(decoded, (*poison.PoisonedError)(nil)), "%+v", decoded)

	var sb redact.StringBuilder
	sb.Printf("%s", decoded)
	expectedPath := filepath.Join("testdata", "poisoned_error.txt")
	echotest.Require(t, string(sb.RedactableString()), expectedPath)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
encountered poisoned latch ‹a›@0.000000001,0
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ func (r *Replica) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
return r.closedTimestampPolicyRLocked()
}

// TripBreaker trips the replica's circuit breaker synchronously, using a
// freshly created "injected error" as the reason.
func (r *Replica) TripBreaker() {
	injected := errors.New("injected error")
	r.breaker.tripSync(injected)
}

// GetCircuitBreaker returns the circuit breaker controlling
// connection attempts to the specified node.
func (t *RaftTransport) GetCircuitBreaker(
Expand Down
36 changes: 17 additions & 19 deletions pkg/kv/kvserver/replica_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type replicaInCircuitBreaker interface {
Desc() *roachpb.RangeDescriptor
Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
slowReplicationThreshold(ba *roachpb.BatchRequest) (time.Duration, bool)
replicaUnavailableError() error
replicaUnavailableError(err error) error
poisonInflightLatches(err error)
}

Expand Down Expand Up @@ -102,23 +102,23 @@ func (br *replicaCircuitBreaker) enabled() bool {
return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 && br.canEnable()
}

func (br *replicaCircuitBreaker) newError() error {
return br.r.replicaUnavailableError()
}

func (br *replicaCircuitBreaker) TripAsync() {
func (br *replicaCircuitBreaker) TripAsync(err error) {
if !br.enabled() {
return
}

_ = br.stopper.RunAsyncTask(
br.ambCtx.AnnotateCtx(context.Background()), "trip-breaker",
func(ctx context.Context) {
br.wrapped.Report(br.newError())
br.tripSync(err)
},
)
}

// tripSync passes err to the replica's replicaUnavailableError and reports
// the resulting error to the wrapped breaker. Unlike TripAsync, it does so
// directly rather than from an async task.
func (br *replicaCircuitBreaker) tripSync(err error) {
	unavailableErr := br.r.replicaUnavailableError(err)
	br.wrapped.Report(unavailableErr)
}

type signaller interface {
Err() error
C() <-chan struct{}
Expand Down Expand Up @@ -240,12 +240,13 @@ func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error {
return pErr.GoError()
},
); err != nil {
return errors.CombineErrors(r.replicaUnavailableError(), err)
return r.replicaUnavailableError(err)
}
return nil
}

func replicaUnavailableError(
err error,
desc *roachpb.RangeDescriptor,
replDesc roachpb.ReplicaDescriptor,
lm liveness.IsLiveMap,
Expand All @@ -270,25 +271,22 @@ func replicaUnavailableError(
var _ redact.SafeFormatter = desc
var _ redact.SafeFormatter = replDesc

err := roachpb.NewReplicaUnavailableError(desc, replDesc)
if len(nonLiveRepls.AsProto()) > 0 {
err = errors.Wrapf(err, "replicas on non-live nodes: %v (lost quorum: %t)", nonLiveRepls, !canMakeProgress)
}

err = errors.Wrapf(
err,
"raft status: %+v", redact.Safe(rs), // raft status contains no PII
)
if len(nonLiveRepls.AsProto()) > 0 {
err = errors.Wrapf(err, "replicas on non-live nodes: %v (lost quorum: %t)", nonLiveRepls, !canMakeProgress)
}

return err
return roachpb.NewReplicaUnavailableError(err, desc, replDesc)
}

func (r *Replica) replicaUnavailableError() error {
func (r *Replica) replicaUnavailableError(err error) error {
desc := r.Desc()
replDesc, _ := desc.GetReplicaDescriptor(r.store.StoreID())

var isLiveMap liveness.IsLiveMap
if nl := r.store.cfg.NodeLiveness; nl != nil { // exclude unit test
isLiveMap = nl.GetIsLiveMap()
}
return replicaUnavailableError(desc, replDesc, isLiveMap, r.RaftStatus())
isLiveMap, _ := r.store.livenessMap.Load().(liveness.IsLiveMap)
return replicaUnavailableError(err, desc, replDesc, isLiveMap, r.RaftStatus())
}
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package kvserver

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
Expand All @@ -19,7 +20,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3"
)

Expand All @@ -36,7 +39,12 @@ func TestReplicaUnavailableError(t *testing.T) {
lm := liveness.IsLiveMap{
1: liveness.IsLiveMapEntry{IsLive: true},
}
wrappedErr := errors.New("probe failed")
rs := raft.Status{}
err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs)
ctx := context.Background()
err := errors.DecodeError(ctx, errors.EncodeError(ctx, replicaUnavailableError(
wrappedErr, desc, desc.Replicas().AsProto()[0], lm, &rs),
))
require.True(t, errors.Is(err, wrappedErr), "%+v", err)
echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt"))
}
9 changes: 4 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,16 +1227,15 @@ func (r *Replica) refreshProposalsLocked(
// request got added in while the probe was about to shut down, there will
// be regular attempts at healing the breaker.
if maxSlowProposalDuration > 0 && r.breaker.Signal().Err() == nil {
log.Warningf(ctx,
"have been waiting %.2fs for slow proposal %s",
maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest,
)
err := errors.Errorf("have been waiting %.2fs for slow proposal %s",
maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest)
log.Warningf(ctx, "%s", err)
// NB: this is async because we're holding lots of locks here, and we want
// to avoid having to pass all the information about the replica into the
// breaker (since the breaker needs access to this information at will to
// power the probe anyway). Over time, we anticipate there being multiple
// mechanisms which trip the breaker.
r.breaker.TripAsync()
r.breaker.TripAsync(err)
}

if log.V(1) && len(reproposals) > 0 {
Expand Down
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -450,17 +451,20 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
LockSpans: lockSpans, // nil if g != nil
}, requestEvalKind)
if pErr != nil {
if errors.HasType(pErr.GoError(), (*poison.PoisonedError)(nil)) {
brErr := r.breaker.Signal().Err()
if brErr == nil {
// The breaker may have healed in the meantime.
//
// TODO(tbg): it would be nicer if poisoning took an err and it
// came wrapped with the PoisonedError instead. Or we could
// retry the request.
brErr = r.replicaUnavailableError()
}
pErr = roachpb.NewError(errors.CombineErrors(brErr, pErr.GoError()))
if poisonErr := (*poison.PoisonedError)(nil); errors.As(pErr.GoError(), &poisonErr) {
// NB: we make the breaker error (which may be nil at this point, but
// usually is not) a secondary error, meaning it is not in the error
// chain. That is fine; the important bits to investigate
// programmatically are the ReplicaUnavailableError (which contains the
// descriptor) and the *PoisonedError (which contains the concrete
// subspan that caused this request to fail). We mark
// circuit.ErrBreakerOpen into the chain as well so that we have the
// invariant that all replica circuit breaker errors contain both
// ErrBreakerOpen and ReplicaUnavailableError.
pErr = roachpb.NewError(r.replicaUnavailableError(errors.CombineErrors(
errors.Mark(poisonErr, circuit.ErrBreakerOpen),
r.breaker.Signal().Err(),
)))
}
return nil, pErr
} else if resp != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/testdata/replica_unavailable_error.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
replicas on non-live nodes: (n2,s20):2 (lost quorum: true): raft status: {"id":"0","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":0,"progress":{},"leadtransferee":"0"}: replica (n1,s10):1 unable to serve request to r10:‹{a-z}› [(n1,s10):1, (n2,s20):2, next=3, gen=0]
replica (n1,s10):1 unable to serve request to r10:‹{a-z}› [(n1,s10):1, (n2,s20):2, next=3, gen=0]: raft status: {"id":"0","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":0,"progress":{},"leadtransferee":"0"}: replicas on non-live nodes: (n2,s20):2 (lost quorum: true): probe failed
1 change: 1 addition & 0 deletions pkg/roachpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_library(
"@com_github_cockroachdb_errors//errorspb",
"@com_github_cockroachdb_errors//extgrpc",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_mock//gomock", # keep
"@io_etcd_go_etcd_raft_v3//raftpb",
"@org_golang_google_grpc//metadata", # keep
Expand Down
1 change: 1 addition & 0 deletions pkg/roachpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ message ReplicaUnavailableError {

optional roachpb.RangeDescriptor desc = 2 [(gogoproto.nullable) = false];
optional roachpb.ReplicaDescriptor replica = 4 [(gogoproto.nullable) = false];
optional errorspb.EncodedError cause = 5 [(gogoproto.nullable) = false];
}

// A RaftGroupDeletedError indicates a raft group has been deleted for
Expand Down
Loading

0 comments on commit fd66b4b

Please sign in to comment.