Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Introduce a field for RPC error code and Resource Exhausted error type #3005

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/aggregator/server/http/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func writeResponse(w http.ResponseWriter, resp interface{}, err error) {

if err == nil {
w.WriteHeader(http.StatusOK)
} else if xerrors.IsInvalidParams(err) {
} else if xerrors.IsInvalidParams(err) || xerrors.IsResourceExhausted(err) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are quite a few cases in this PR grouping invalid param with resource exhausted errors does make sense, e.g.:

  • returning same status code (404 in this case);
  • incrementing the same metric.

In such cases I wanted to limit the amount of changes in a single PR and I erred on maintaining externally observable behavior (status codes, published metrics) as it was up until now. This would be fixed in upcoming commits.

w.WriteHeader(http.StatusBadRequest)
} else {
w.WriteHeader(http.StatusInternalServerError)
Expand Down
3 changes: 2 additions & 1 deletion src/cluster/etcd/watchmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
return watchChan, cancelFn, nil
case <-time.After(timeout):
cancelFn()
return nil, nil, fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key)
return nil, nil, fmt.Errorf("etcd watch create timed out after %s for key: %s",
timeout.String(), key)
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/cluster/etcd/watchmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/cluster/mocks"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
"golang.org/x/net/context"

"github.com/m3db/m3/src/cluster/mocks"
)

func TestWatchChan(t *testing.T) {
wh, ec, _, _, _, closer := testSetup(t)
wh, ec, _, _, _, closer := testSetup(t) //nolint:dogsled
defer closer()

wc, _, err := wh.watchChanWithTimeout("foo", 0)
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/kv/etcd/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/retry"

"go.etcd.io/etcd/clientv3"
"github.com/golang/protobuf/proto"
"github.com/uber-go/tally"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"golang.org/x/net/context"
)
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/services/heartbeat/etcd/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (
"github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/watch"

"go.etcd.io/etcd/clientv3"
"github.com/golang/protobuf/proto"
"github.com/uber-go/tally"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"golang.org/x/net/context"
)
Expand Down
35 changes: 32 additions & 3 deletions src/dbnode/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,35 @@ func IsBadRequestError(err error) bool {
return false
}

// IsResourceExhaustedError determines if the error is a resource exhausted error.
func IsResourceExhaustedError(err error) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Call this IsResourceExhaustedErrorCode to match the fact that it's matching on ErrorCode not ErrorType?

for err != nil {
if e, ok := err.(*rpc.Error); ok && tterrors.IsResourceExhaustedError(e) { //nolint:errorlint
return true
}
if e := xerrors.GetInnerResourceExhaustedError(err); e != nil {
return true
}
err = xerrors.InnerError(err)
}
return false
}

// WrapIfNonRetryable wraps the error with non-retryable type if appropriate.
func WrapIfNonRetryable(err error) error {
// NB: due to resource exhausted RPC error implementation, it has to be checked
// before bad request error. See NewResourceExhausted() in
// src/dbnode/network/server/tchannelthrift/errors/errors.go for more details.
if IsResourceExhaustedError(err) {
err = xerrors.NewResourceExhaustedError(err)
err = xerrors.NewNonRetryableError(err)
} else if IsBadRequestError(err) {
err = xerrors.NewInvalidParamsError(err)
err = xerrors.NewNonRetryableError(err)
}
return err
}

// IsConsistencyResultError determines if the error is a consistency result error.
func IsConsistencyResultError(err error) bool {
_, ok := err.(consistencyResultErr)
Expand Down Expand Up @@ -137,15 +166,15 @@ func newConsistencyResultError(
enqueued, responded int,
errs []error,
) consistencyResultError {
// NB(r): if any errors are bad request errors, encapsulate that error
// to ensure the error itself is wholly classified as a bad request error
// NB(r): if any errors are bad request or resource exhausted errors, encapsulate that error
// to ensure the error itself is wholly classified accordingly
var topLevelErr error
for i := 0; i < len(errs); i++ {
if topLevelErr == nil {
topLevelErr = errs[i]
continue
}
if IsBadRequestError(errs[i]) {
if IsBadRequestError(errs[i]) || IsResourceExhaustedError(errs[i]) {
topLevelErr = errs[i]
break
}
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/client/fetch_attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (f *fetchAttempt) perform() error {
f.args.ids, f.args.start, f.args.end)
f.result = result

if IsBadRequestError(err) {
if IsBadRequestError(err) || IsResourceExhaustedError(err) {
// Do not retry bad request errors
err = xerrors.NewNonRetryableError(err)
}
Expand Down
8 changes: 1 addition & 7 deletions src/dbnode/client/fetch_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/x/xpool"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/serialize"
)
Expand Down Expand Up @@ -137,12 +136,7 @@ func (f *fetchState) completionFn(
result interface{},
resultErr error,
) {
if IsBadRequestError(resultErr) {
// Wrap with invalid params and non-retryable so it is
// not retried.
resultErr = xerrors.NewInvalidParamsError(resultErr)
resultErr = xerrors.NewNonRetryableError(resultErr)
}
resultErr = WrapIfNonRetryable(resultErr)

f.Lock()
defer func() {
Expand Down
6 changes: 5 additions & 1 deletion src/dbnode/client/fetch_tagged_results_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ func (accum *fetchTaggedResultAccumulator) accumulatedResult(
err := fmt.Errorf("unable to satisfy consistency requirements: shards=%d, err=%v",
accum.numShardsPending, accum.errors)
for i := range accum.errors {
if IsBadRequestError(accum.errors[i]) {
if IsResourceExhaustedError(accum.errors[i]) {
err = xerrors.NewResourceExhaustedError(err)
err = xerrors.NewNonRetryableError(err)
break
} else if IsBadRequestError(accum.errors[i]) {
err = xerrors.NewInvalidParamsError(err)
err = xerrors.NewNonRetryableError(err)
break
Expand Down
18 changes: 8 additions & 10 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,16 @@ func (s *session) newPeerMetadataStreamingProgressMetrics(

func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32, start time.Time) {
if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 {
if IsBadRequestError(consistencyResultErr) {
if IsBadRequestError(consistencyResultErr) || IsResourceExhaustedError(consistencyResultErr) {
s.metrics.writeNodesRespondingBadRequestErrors[idx].Inc(1)
} else {
s.metrics.writeNodesRespondingErrors[idx].Inc(1)
}
}
if consistencyResultErr == nil {
s.metrics.writeSuccess.Inc(1)
} else if IsBadRequestError(consistencyResultErr) {
} else if IsBadRequestError(consistencyResultErr) ||
IsResourceExhaustedError(consistencyResultErr) {
s.metrics.writeErrorsBadRequest.Inc(1)
} else {
s.metrics.writeErrorsInternalError.Inc(1)
Expand All @@ -470,15 +471,17 @@ func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32,

func (s *session) recordFetchMetrics(consistencyResultErr error, respErrs int32, start time.Time) {
if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 {
if IsBadRequestError(consistencyResultErr) {
if IsBadRequestError(consistencyResultErr) ||
IsResourceExhaustedError(consistencyResultErr) {
s.metrics.fetchNodesRespondingBadRequestErrors[idx].Inc(1)
} else {
s.metrics.fetchNodesRespondingErrors[idx].Inc(1)
}
}
if consistencyResultErr == nil {
s.metrics.fetchSuccess.Inc(1)
} else if IsBadRequestError(consistencyResultErr) {
} else if IsBadRequestError(consistencyResultErr) ||
IsResourceExhaustedError(consistencyResultErr) {
s.metrics.fetchErrorsBadRequest.Inc(1)
} else {
s.metrics.fetchErrorsInternalError.Inc(1)
Expand Down Expand Up @@ -1741,12 +1744,7 @@ func (s *session) fetchIDsAttempt(
completionFn := func(result interface{}, err error) {
var snapshotSuccess int32
if err != nil {
if IsBadRequestError(err) {
// Wrap with invalid params and non-retryable so it is
// not retried.
err = xerrors.NewInvalidParamsError(err)
err = xerrors.NewNonRetryableError(err)
}
err = WrapIfNonRetryable(err)
atomic.AddInt32(&errs, 1)
// NB(r): reuse the error lock here as we do not want to create
// a whole lot of locks for every single ID fetched due to size
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/client/write_attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (w *writeAttempt) perform() error {
w.args.namespace, w.args.id, w.args.tags, w.args.t,
w.args.value, w.args.unit, w.args.annotation)

if IsBadRequestError(err) {
if IsBadRequestError(err) || IsResourceExhaustedError(err) {
// Do not retry bad request errors
err = xerrors.NewNonRetryableError(err)
}
Expand Down
7 changes: 1 addition & 6 deletions src/dbnode/client/write_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,7 @@ func (w *writeState) completionFn(result interface{}, err error) {
var wErr error

if err != nil {
if IsBadRequestError(err) {
// Wrap with invalid params and non-retryable so it is
// not retried.
err = xerrors.NewInvalidParamsError(err)
err = xerrors.NewNonRetryableError(err)
}
err = WrapIfNonRetryable(err)
wErr = xerrors.NewRenamedError(err, fmt.Errorf("error writing to host %s: %v", hostID, err))
} else if hostShardSet, ok := w.topoMap.LookupHostShardSet(hostID); !ok {
errStr := "missing host shard in writeState completionFn: %s"
Expand Down
6 changes: 6 additions & 0 deletions src/dbnode/generated/thrift/rpc.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ enum ErrorType {
BAD_REQUEST
}

enum ErrorCode {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's use ErrorFlags instead of ErrorCode as we arrived at during our catchup? Will map the naming of the error value to how we would like to use them if flags&RESOURCE_EXHAUSTED { ... }.

NONE = 0x00,
RESOURCE_EXHAUSTED = 0x01
}

exception Error {
1: required ErrorType type = ErrorType.INTERNAL_ERROR
2: required string message
3: optional ErrorCode code = ErrorCode.NONE
}

exception WriteBatchRawErrors {
Expand Down
Loading