From 9f47d30e58ac37e2cb8eb2b2bdcd7097c9b957e4 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Thu, 10 Dec 2020 12:50:55 +0200 Subject: [PATCH 1/8] update thrift definitions --- src/dbnode/generated/thrift/rpc.thrift | 9 +- src/dbnode/generated/thrift/rpc/rpc.go | 193 ++++++++++++++++++++++++- 2 files changed, 199 insertions(+), 3 deletions(-) diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index 112405c525..61f1018077 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -29,12 +29,19 @@ enum TimeType { enum ErrorType { INTERNAL_ERROR, - BAD_REQUEST + BAD_REQUEST, + RESOURCE_EXHAUSTED +} + +enum ErrorCode { + NONE, + RESOURCE_EXHAUSTED } exception Error { 1: required ErrorType type = ErrorType.INTERNAL_ERROR 2: required string message + 3: optional ErrorCode code = ErrorCode.NONE } exception WriteBatchRawErrors { diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index 52d5696af3..f19b2010ab 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -107,8 +107,9 @@ func (p *TimeType) Value() (driver.Value, error) { type ErrorType int64 const ( - ErrorType_INTERNAL_ERROR ErrorType = 0 - ErrorType_BAD_REQUEST ErrorType = 1 + ErrorType_INTERNAL_ERROR ErrorType = 0 + ErrorType_BAD_REQUEST ErrorType = 1 + ErrorType_RESOURCE_EXHAUSTED ErrorType = 2 ) func (p ErrorType) String() string { @@ -117,6 +118,8 @@ func (p ErrorType) String() string { return "INTERNAL_ERROR" case ErrorType_BAD_REQUEST: return "BAD_REQUEST" + case ErrorType_RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED" } return "" } @@ -127,6 +130,8 @@ func ErrorTypeFromString(s string) (ErrorType, error) { return ErrorType_INTERNAL_ERROR, nil case "BAD_REQUEST": return ErrorType_BAD_REQUEST, nil + case "RESOURCE_EXHAUSTED": + return ErrorType_RESOURCE_EXHAUSTED, nil } return ErrorType(0), fmt.Errorf("not a valid ErrorType string") } @@ -162,6 +167,64 @@ func (p *ErrorType) Value() (driver.Value, error) { return int64(*p), nil } +type ErrorCode int64 + +const ( + ErrorCode_NONE ErrorCode = 0 + ErrorCode_RESOURCE_EXHAUSTED ErrorCode = 1 +) + +func (p ErrorCode) String() string { + switch p { + case ErrorCode_NONE: + return "NONE" + case ErrorCode_RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED" + } + return "" +} + +func ErrorCodeFromString(s string) (ErrorCode, error) { + switch s { + case "NONE": + return ErrorCode_NONE, nil + case "RESOURCE_EXHAUSTED": + return ErrorCode_RESOURCE_EXHAUSTED, nil + } + return ErrorCode(0), fmt.Errorf("not a valid ErrorCode string") +} + +func ErrorCodePtr(v ErrorCode) *ErrorCode { return &v } + +func (p ErrorCode) MarshalText() ([]byte, error) { + return []byte(p.String()), nil +} + +func (p *ErrorCode) UnmarshalText(text []byte) error { + q, err := ErrorCodeFromString(string(text)) + if err != nil { + return err + } + *p = q + return nil +} + +func (p *ErrorCode) Scan(value interface{}) error { + v, ok := value.(int64) + if !ok { + return errors.New("Scan value is not int64") + } + *p = ErrorCode(v) + return nil +} + +func (p *ErrorCode) Value() (driver.Value, error) { + if p == nil { + return nil, nil + } + return int64(*p), nil +} + type AggregateQueryType int64 const ( @@ -223,14 +286,18 @@ func (p *AggregateQueryType) Value() (driver.Value, error) { // Attributes: // - Type // - Message +// - Code type Error struct { Type ErrorType `thrift:"type,1,required" db:"type" json:"type"` Message string `thrift:"message,2,required" db:"message" json:"message"` + Code ErrorCode `thrift:"code,3" db:"code" json:"code,omitempty"` } func NewError() *Error { return &Error{ Type: 0, + + Code: 0, } } @@ -241,6 +308,16 @@ func (p *Error) GetType() ErrorType { func (p *Error) GetMessage() string { return p.Message } + +var Error_Code_DEFAULT ErrorCode = 0 + +func (p *Error) GetCode() ErrorCode { + return p.Code +} +func (p *Error) IsSetCode() bool { + return p.Code != Error_Code_DEFAULT +} + func (p *Error) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -268,6 +345,10 @@ func (p *Error) Read(iprot thrift.TProtocol) error { return err } issetMessage = true + case 3: + if err := p.ReadField3(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -308,6 +389,16 @@ func (p *Error) ReadField2(iprot thrift.TProtocol) error { return nil } +func (p *Error) ReadField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI32(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + temp := ErrorCode(v) + p.Code = temp + } + return nil +} + func (p *Error) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("Error"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -319,6 +410,9 @@ func (p *Error) Write(oprot thrift.TProtocol) error { if err := p.writeField2(oprot); err != nil { return err } + if err := p.writeField3(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -355,6 +449,21 @@ func (p *Error) writeField2(oprot thrift.TProtocol) (err error) { return err } +func (p *Error) writeField3(oprot thrift.TProtocol) (err error) { + if p.IsSetCode() { + if err := oprot.WriteFieldBegin("code", thrift.I32, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:code: ", p), err) + } + if err := oprot.WriteI32(int32(p.Code)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.code (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:code: ", p), err) + } + } + return err +} + func (p *Error) String() string { if p == nil { return "" @@ -2657,6 +2766,8 @@ func (p *FetchRawResult_) ReadField1(iprot thrift.TProtocol) error { func (p *FetchRawResult_) ReadField2(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -3980,6 +4091,8 @@ func (p *FetchTaggedIDResult_) ReadField4(iprot thrift.TProtocol) error { func (p *FetchTaggedIDResult_) ReadField5(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -4907,6 +5020,8 @@ func (p *Block) ReadField2(iprot thrift.TProtocol) error { func (p *Block) ReadField3(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -5955,6 +6070,8 @@ func (p *BlockMetadataV2) ReadField2(iprot thrift.TProtocol) error { func (p *BlockMetadataV2) ReadField3(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -7688,6 +7805,8 @@ func (p *WriteBatchRawError) ReadField1(iprot thrift.TProtocol) error { func (p *WriteBatchRawError) ReadField2(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -19038,6 +19157,8 @@ func (p *NodeQueryResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeQueryResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -19293,6 +19414,8 @@ func (p *NodeAggregateResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeAggregateResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -19548,6 +19671,8 @@ func (p *NodeFetchResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeFetchResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -19772,6 +19897,8 @@ func (p *NodeWriteResult) Read(iprot thrift.TProtocol) error { func (p *NodeWriteResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -19978,6 +20105,8 @@ func (p *NodeWriteTaggedResult) Read(iprot thrift.TProtocol) error { func (p *NodeWriteTaggedResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -20215,6 +20344,8 @@ func (p *NodeAggregateRawResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeAggregateRawResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -20468,6 +20599,8 @@ func (p *NodeFetchBatchRawResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeFetchBatchRawResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -20719,6 +20852,8 @@ func (p *NodeFetchBatchRawV2Result) ReadField0(iprot thrift.TProtocol) error { func (p *NodeFetchBatchRawV2Result) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -20970,6 +21105,8 @@ func (p *NodeFetchBlocksRawResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeFetchBlocksRawResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -21225,6 +21362,8 @@ func (p *NodeFetchTaggedResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeFetchTaggedResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -21476,6 +21615,8 @@ func (p *NodeFetchBlocksMetadataRawV2Result) ReadField0(iprot thrift.TProtocol) func (p *NodeFetchBlocksMetadataRawV2Result) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -22470,6 +22611,8 @@ func (p *NodeRepairResult) Read(iprot thrift.TProtocol) error { func (p *NodeRepairResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -22703,6 +22846,8 @@ func (p *NodeTruncateResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeTruncateResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -22956,6 +23101,8 @@ func (p *NodeAggregateTilesResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeAggregateTilesResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -23161,6 +23308,8 @@ func (p *NodeHealthResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeHealthResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -23366,6 +23515,8 @@ func (p *NodeBootstrappedResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeBootstrappedResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -23571,6 +23722,8 @@ func (p *NodeBootstrappedInPlacementOrNoPlacementResult) ReadField0(iprot thrift func (p *NodeBootstrappedInPlacementOrNoPlacementResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -23776,6 +23929,8 @@ func (p *NodeGetPersistRateLimitResult) ReadField0(iprot thrift.TProtocol) error func (p *NodeGetPersistRateLimitResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -24027,6 +24182,8 @@ func (p *NodeSetPersistRateLimitResult) ReadField0(iprot thrift.TProtocol) error func (p *NodeSetPersistRateLimitResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -24232,6 +24389,8 @@ func (p *NodeGetWriteNewSeriesAsyncResult) ReadField0(iprot thrift.TProtocol) er func (p *NodeGetWriteNewSeriesAsyncResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -24483,6 +24642,8 @@ func (p *NodeSetWriteNewSeriesAsyncResult) ReadField0(iprot thrift.TProtocol) er func (p *NodeSetWriteNewSeriesAsyncResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -24688,6 +24849,8 @@ func (p *NodeGetWriteNewSeriesBackoffDurationResult) ReadField0(iprot thrift.TPr func (p *NodeGetWriteNewSeriesBackoffDurationResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -24941,6 +25104,8 @@ func (p *NodeSetWriteNewSeriesBackoffDurationResult) ReadField0(iprot thrift.TPr func (p *NodeSetWriteNewSeriesBackoffDurationResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -25146,6 +25311,8 @@ func (p *NodeGetWriteNewSeriesLimitPerShardPerSecondResult) ReadField0(iprot thr func (p *NodeGetWriteNewSeriesLimitPerShardPerSecondResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -25397,6 +25564,8 @@ func (p *NodeSetWriteNewSeriesLimitPerShardPerSecondResult) ReadField0(iprot thr func (p *NodeSetWriteNewSeriesLimitPerShardPerSecondResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -25648,6 +25817,8 @@ func (p *NodeDebugProfileStartResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeDebugProfileStartResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -25899,6 +26070,8 @@ func (p *NodeDebugProfileStopResult) ReadField0(iprot thrift.TProtocol) error { func (p *NodeDebugProfileStopResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -26150,6 +26323,8 @@ func (p *NodeDebugIndexMemorySegmentsResult) ReadField0(iprot thrift.TProtocol) func (p *NodeDebugIndexMemorySegmentsResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -27381,6 +27556,8 @@ func (p *ClusterHealthResult) ReadField0(iprot thrift.TProtocol) error { func (p *ClusterHealthResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -27605,6 +27782,8 @@ func (p *ClusterWriteResult) Read(iprot thrift.TProtocol) error { func (p *ClusterWriteResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -27811,6 +27990,8 @@ func (p *ClusterWriteTaggedResult) Read(iprot thrift.TProtocol) error { func (p *ClusterWriteTaggedResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -28048,6 +28229,8 @@ func (p *ClusterQueryResult) ReadField0(iprot thrift.TProtocol) error { func (p *ClusterQueryResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -28303,6 +28486,8 @@ func (p *ClusterAggregateResult) ReadField0(iprot thrift.TProtocol) error { func (p *ClusterAggregateResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -28558,6 +28743,8 @@ func (p *ClusterFetchResult) ReadField0(iprot thrift.TProtocol) error { func (p *ClusterFetchResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) @@ -28809,6 +28996,8 @@ func (p *ClusterTruncateResult) ReadField0(iprot thrift.TProtocol) error { func (p *ClusterTruncateResult) ReadField1(iprot thrift.TProtocol) error { p.Err = &Error{ Type: 0, + + Code: 0, } if err := p.Err.Read(iprot); err != nil { return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) From d0cadd4b2f1154cd086ce7f22904c99cf84d4147 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Fri, 11 Dec 2020 12:12:14 +0200 Subject: [PATCH 2/8] Resource Exhausted helper functions --- .../server/tchannelthrift/errors/errors.go | 24 ++++++++++-- .../tchannelthrift/errors/errors_test.go | 39 +++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 src/dbnode/network/server/tchannelthrift/errors/errors_test.go diff --git a/src/dbnode/network/server/tchannelthrift/errors/errors.go b/src/dbnode/network/server/tchannelthrift/errors/errors.go index 035e3f4fb8..ba1d77bc59 100644 --- a/src/dbnode/network/server/tchannelthrift/errors/errors.go +++ b/src/dbnode/network/server/tchannelthrift/errors/errors.go @@ -26,9 +26,10 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" ) -func newError(errType rpc.ErrorType, err error) *rpc.Error { +func newError(errType rpc.ErrorType, errCode rpc.ErrorCode, err error) *rpc.Error { rpcErr := rpc.NewError() rpcErr.Type = errType + rpcErr.Code = errCode rpcErr.Message = fmt.Sprintf("%v", err) return rpcErr } @@ -43,14 +44,31 @@ func IsBadRequestError(err *rpc.Error) bool { return err != nil && err.Type == rpc.ErrorType_BAD_REQUEST } +// IsResourceExhaustedError returns whether the error is a resource exhausted error. +func IsResourceExhaustedError(err *rpc.Error) bool { + // NB: To maintain better backwards compatibility, Resource Exhausted errors might also be + // defined by BAD_REQUEST error type coupled with RESOURCE_EXHAUSTED error code + return err != nil && + (err.Type == rpc.ErrorType_RESOURCE_EXHAUSTED || + (err.Type == rpc.ErrorType_BAD_REQUEST && err.Code == rpc.ErrorCode_RESOURCE_EXHAUSTED)) +} + // NewInternalError creates a new internal error func NewInternalError(err error) *rpc.Error { - return newError(rpc.ErrorType_INTERNAL_ERROR, err) + return newError(rpc.ErrorType_INTERNAL_ERROR, rpc.ErrorCode_NONE, err) } // NewBadRequestError creates a new bad request error func NewBadRequestError(err error) *rpc.Error { - return newError(rpc.ErrorType_BAD_REQUEST, err) + return newError(rpc.ErrorType_BAD_REQUEST, rpc.ErrorCode_NONE, err) +} + +// NewResourceExhaustedError creates a new resource exhausted error. +func NewResourceExhaustedError(err error) *rpc.Error { + // NB: To maintain better backwards compatibility, using BAD_REQUEST error type coupled with + // RESOURCE_EXHAUSTED error code. After a reasonable amount of time this should be switched to + // RESOURCE_EXHAUSTED error code (added after M3 release v1.0.0) + return newError(rpc.ErrorType_BAD_REQUEST, rpc.ErrorCode_RESOURCE_EXHAUSTED, err) } // NewWriteBatchRawError creates a new write batch error diff --git a/src/dbnode/network/server/tchannelthrift/errors/errors_test.go b/src/dbnode/network/server/tchannelthrift/errors/errors_test.go new file mode 100644 index 0000000000..dbf1419da1 --- /dev/null +++ b/src/dbnode/network/server/tchannelthrift/errors/errors_test.go @@ -0,0 +1,39 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package errors + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestErrorTypesAreRecognized(t *testing.T) { + someError := errors.New("") + assert.True(t, IsBadRequestError(NewBadRequestError(someError))) + assert.True(t, IsResourceExhaustedError(NewResourceExhaustedError(someError))) + assert.True(t, IsInternalError(NewInternalError(someError))) +} + +func TestResourceExhaustedBackwardsCompatibility(t *testing.T) { + assert.True(t, IsBadRequestError(NewResourceExhaustedError(errors.New("")))) +} From ecdea2b57e5ff7254677ea33e0fa9cd4837c931b Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Fri, 11 Dec 2020 12:30:23 +0200 Subject: [PATCH 3/8] Resource Exhausted error in x/errors --- src/x/errors/errors.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/x/errors/errors.go b/src/x/errors/errors.go index 611710a57f..b50e0e0344 100644 --- a/src/x/errors/errors.go +++ b/src/x/errors/errors.go @@ -130,6 +130,40 @@ func GetInnerInvalidParamsError(err error) error { return nil } +type resourceExhaustedError struct { + containedError +} + +// NewResourceExhaustedError creates a new resource exhausted error. +func NewResourceExhaustedError(inner error) error { + return resourceExhaustedError{containedError{inner}} +} + +func (e resourceExhaustedError) Error() string { + return e.inner.Error() +} + +func (e resourceExhaustedError) InnerError() error { + return e.inner +} + +// IsResourceExhausted returns true if this is a resource exhausted error. +func IsResourceExhausted(err error) bool { + return GetInnerResourceExhaustedError(err) != nil +} + +// GetInnerResourceExhaustedError returns an inner resource exhausted error if contained by +// this error, nil otherwise. +func GetInnerResourceExhaustedError(err error) error { + for err != nil { + if _, ok := err.(resourceExhaustedError); ok { //nolint:errorlint + return InnerError(err) + } + err = InnerError(err) + } + return nil +} + type retryableError struct { containedError } From b92ee9acf1625fd2181315438d4a5b7c5df61f8d Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Fri, 11 Dec 2020 15:24:34 +0200 Subject: [PATCH 4/8] use ResourceExhausted error in code --- src/aggregator/server/http/handlers.go | 2 +- src/dbnode/client/errors.go | 35 +++++++++++++++++-- src/dbnode/client/fetch_attempt.go | 2 +- src/dbnode/client/fetch_state.go | 8 +---- .../fetch_tagged_results_accumulator.go | 6 +++- src/dbnode/client/session.go | 18 +++++----- src/dbnode/client/write_attempt.go | 2 +- src/dbnode/client/write_state.go | 7 +--- .../network/server/httpjson/handlers.go | 2 +- .../server/tchannelthrift/cluster/service.go | 25 ++++++------- .../server/tchannelthrift/convert/convert.go | 3 ++ .../server/tchannelthrift/node/service.go | 11 +++--- src/dbnode/storage/limits/query_limits.go | 2 +- .../storage/limits/query_limits_test.go | 8 ++--- src/dbnode/storage/shard.go | 2 +- .../api/experimental/annotated/handler.go | 2 +- src/query/api/v1/handler/influxdb/write.go | 4 +-- .../api/v1/handler/prometheus/remote/write.go | 4 +-- src/x/errors/errors.go | 22 ++++++++---- src/x/net/http/errors.go | 2 +- 20 files changed, 102 insertions(+), 65 deletions(-) diff --git a/src/aggregator/server/http/handlers.go b/src/aggregator/server/http/handlers.go index e14d90d5d5..60bc031db6 100644 --- a/src/aggregator/server/http/handlers.go +++ b/src/aggregator/server/http/handlers.go @@ -148,7 +148,7 @@ func writeResponse(w http.ResponseWriter, resp interface{}, err error) { if err == nil { w.WriteHeader(http.StatusOK) - } else if xerrors.IsInvalidParams(err) { + } else if xerrors.IsInvalidParams(err) || xerrors.IsResourceExhausted(err) { w.WriteHeader(http.StatusBadRequest) } else { w.WriteHeader(http.StatusInternalServerError) diff --git a/src/dbnode/client/errors.go b/src/dbnode/client/errors.go index 896f1998b4..0b7f33814b 100644 --- a/src/dbnode/client/errors.go +++ b/src/dbnode/client/errors.go @@ -54,6 +54,35 @@ func IsBadRequestError(err error) bool { return false } +// IsResourceExhaustedError determines if the error is a resource exhausted error. +func IsResourceExhaustedError(err error) bool { + for err != nil { + if e, ok := err.(*rpc.Error); ok && tterrors.IsResourceExhaustedError(e) { //nolint:errorlint + return true + } + if e := xerrors.GetInnerResourceExhaustedError(err); e != nil { + return true + } + err = xerrors.InnerError(err) + } + return false +} + +// WrapIfNonRetryable wraps the error with non-retryable type if appropriate. +func WrapIfNonRetryable(err error) error { + // NB: due to resource exhausted RPC error implementation, it has to be checked + // before bad request error. See NewResourceExhausted() in + // src/dbnode/network/server/tchannelthrift/errors/errors.go for more details. + if IsResourceExhaustedError(err) { + err = xerrors.NewResourceExhaustedError(err) + err = xerrors.NewNonRetryableError(err) + } else if IsBadRequestError(err) { + err = xerrors.NewInvalidParamsError(err) + err = xerrors.NewNonRetryableError(err) + } + return err +} + // IsConsistencyResultError determines if the error is a consistency result error. func IsConsistencyResultError(err error) bool { _, ok := err.(consistencyResultErr) @@ -137,15 +166,15 @@ func newConsistencyResultError( enqueued, responded int, errs []error, ) consistencyResultError { - // NB(r): if any errors are bad request errors, encapsulate that error - // to ensure the error itself is wholly classified as a bad request error + // NB(r): if any errors are bad request or resource exhausted errors, encapsulate that error + // to ensure the error itself is wholly classified accordingly var topLevelErr error for i := 0; i < len(errs); i++ { if topLevelErr == nil { topLevelErr = errs[i] continue } - if IsBadRequestError(errs[i]) { + if IsBadRequestError(errs[i]) || IsResourceExhaustedError(errs[i]) { topLevelErr = errs[i] break } diff --git a/src/dbnode/client/fetch_attempt.go b/src/dbnode/client/fetch_attempt.go index cb6b84c99b..2f3329bc6c 100644 --- a/src/dbnode/client/fetch_attempt.go +++ b/src/dbnode/client/fetch_attempt.go @@ -59,7 +59,7 @@ func (f *fetchAttempt) perform() error { f.args.ids, f.args.start, f.args.end) f.result = result - if IsBadRequestError(err) { + if IsBadRequestError(err) || IsResourceExhaustedError(err) { // Do not retry bad request errors err = xerrors.NewNonRetryableError(err) } diff --git a/src/dbnode/client/fetch_state.go b/src/dbnode/client/fetch_state.go index c991e19db5..600da7ec58 100644 --- a/src/dbnode/client/fetch_state.go +++ b/src/dbnode/client/fetch_state.go @@ -31,7 +31,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/x/xpool" - xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/serialize" ) @@ -137,12 +136,7 @@ func (f *fetchState) completionFn( result interface{}, resultErr error, ) { - if IsBadRequestError(resultErr) { - // Wrap with invalid params and non-retryable so it is - // not retried. - resultErr = xerrors.NewInvalidParamsError(resultErr) - resultErr = xerrors.NewNonRetryableError(resultErr) - } + resultErr = WrapIfNonRetryable(resultErr) f.Lock() defer func() { diff --git a/src/dbnode/client/fetch_tagged_results_accumulator.go b/src/dbnode/client/fetch_tagged_results_accumulator.go index 32f3b3aa2d..4a3a2aa56b 100644 --- a/src/dbnode/client/fetch_tagged_results_accumulator.go +++ b/src/dbnode/client/fetch_tagged_results_accumulator.go @@ -208,7 +208,11 @@ func (accum *fetchTaggedResultAccumulator) accumulatedResult( err := fmt.Errorf("unable to satisfy consistency requirements: shards=%d, err=%v", accum.numShardsPending, accum.errors) for i := range accum.errors { - if IsBadRequestError(accum.errors[i]) { + if IsResourceExhaustedError(accum.errors[i]) { + err = xerrors.NewResourceExhaustedError(err) + err = xerrors.NewNonRetryableError(err) + break + } else if IsBadRequestError(accum.errors[i]) { err = xerrors.NewInvalidParamsError(err) err = xerrors.NewNonRetryableError(err) break diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index be79aa8331..3aef7e90e5 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -446,7 +446,7 @@ func (s *session) newPeerMetadataStreamingProgressMetrics( func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32, start time.Time) { if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 { - if IsBadRequestError(consistencyResultErr) { + if IsBadRequestError(consistencyResultErr) || IsResourceExhaustedError(consistencyResultErr) { s.metrics.writeNodesRespondingBadRequestErrors[idx].Inc(1) } else { s.metrics.writeNodesRespondingErrors[idx].Inc(1) @@ -454,7 +454,8 @@ func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32, } if consistencyResultErr == nil { s.metrics.writeSuccess.Inc(1) - } else if IsBadRequestError(consistencyResultErr) { + } else if IsBadRequestError(consistencyResultErr) || + IsResourceExhaustedError(consistencyResultErr) { s.metrics.writeErrorsBadRequest.Inc(1) } else { s.metrics.writeErrorsInternalError.Inc(1) @@ -470,7 +471,8 @@ func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32, func (s *session) recordFetchMetrics(consistencyResultErr error, respErrs int32, start time.Time) { if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 { - if IsBadRequestError(consistencyResultErr) { + if IsBadRequestError(consistencyResultErr) || + IsResourceExhaustedError(consistencyResultErr) { s.metrics.fetchNodesRespondingBadRequestErrors[idx].Inc(1) } else { s.metrics.fetchNodesRespondingErrors[idx].Inc(1) @@ -478,7 +480,8 @@ func (s *session) recordFetchMetrics(consistencyResultErr error, respErrs int32, } if consistencyResultErr == nil { s.metrics.fetchSuccess.Inc(1) - } else if IsBadRequestError(consistencyResultErr) { + } else if IsBadRequestError(consistencyResultErr) || + IsResourceExhaustedError(consistencyResultErr) { s.metrics.fetchErrorsBadRequest.Inc(1) } else { s.metrics.fetchErrorsInternalError.Inc(1) @@ -1741,12 +1744,7 @@ func (s *session) fetchIDsAttempt( completionFn := func(result interface{}, err error) { var snapshotSuccess int32 if err != nil { - if IsBadRequestError(err) { - // Wrap with invalid params and non-retryable so it is - // not retried. - err = xerrors.NewInvalidParamsError(err) - err = xerrors.NewNonRetryableError(err) - } + err = WrapIfNonRetryable(err) atomic.AddInt32(&errs, 1) // NB(r): reuse the error lock here as we do not want to create // a whole lot of locks for every single ID fetched due to size diff --git a/src/dbnode/client/write_attempt.go b/src/dbnode/client/write_attempt.go index 03e5a81127..e38bffc26f 100644 --- a/src/dbnode/client/write_attempt.go +++ b/src/dbnode/client/write_attempt.go @@ -69,7 +69,7 @@ func (w *writeAttempt) perform() error { w.args.namespace, w.args.id, w.args.tags, w.args.t, w.args.value, w.args.unit, w.args.annotation) - if IsBadRequestError(err) { + if IsBadRequestError(err) || IsResourceExhaustedError(err) { // Do not retry bad request errors err = xerrors.NewNonRetryableError(err) } diff --git a/src/dbnode/client/write_state.go b/src/dbnode/client/write_state.go index 61a39d7b61..c1ab8db0dd 100644 --- a/src/dbnode/client/write_state.go +++ b/src/dbnode/client/write_state.go @@ -116,12 +116,7 @@ func (w *writeState) completionFn(result interface{}, err error) { var wErr error if err != nil { - if IsBadRequestError(err) { - // Wrap with invalid params and non-retryable so it is - // not retried. - err = xerrors.NewInvalidParamsError(err) - err = xerrors.NewNonRetryableError(err) - } + err = WrapIfNonRetryable(err) wErr = xerrors.NewRenamedError(err, fmt.Errorf("error writing to host %s: %v", hostID, err)) } else if hostShardSet, ok := w.topoMap.LookupHostShardSet(hostID); !ok { errStr := "missing host shard in writeState completionFn: %s" diff --git a/src/dbnode/network/server/httpjson/handlers.go b/src/dbnode/network/server/httpjson/handlers.go index fd3c49ea9f..b641bda459 100644 --- a/src/dbnode/network/server/httpjson/handlers.go +++ b/src/dbnode/network/server/httpjson/handlers.go @@ -266,7 +266,7 @@ func writeError(w http.ResponseWriter, errValue interface{}) { case Error: w.WriteHeader(v.StatusCode()) case error: - if xerrors.IsInvalidParams(v) { + if xerrors.IsInvalidParams(v) || xerrors.IsResourceExhausted(v) { w.WriteHeader(http.StatusBadRequest) } else { w.WriteHeader(http.StatusInternalServerError) diff --git a/src/dbnode/network/server/tchannelthrift/cluster/service.go b/src/dbnode/network/server/tchannelthrift/cluster/service.go index 40891ef573..2f7598d75f 100644 --- a/src/dbnode/network/server/tchannelthrift/cluster/service.go +++ b/src/dbnode/network/server/tchannelthrift/cluster/service.go @@ -241,10 +241,7 @@ func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchR it, err := session.Fetch(nsID, tsID, start, end) if err != nil { - if client.IsBadRequestError(err) { - return nil, tterrors.NewBadRequestError(err) - } - return nil, tterrors.NewInternalError(err) + return nil, toRPCError(err) } defer it.Close() @@ -340,10 +337,7 @@ func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error { tsID := s.idPool.GetStringID(ctx, req.ID) err = session.Write(nsID, tsID, ts, dp.Value, unit, dp.Annotation) if err != nil { - if client.IsBadRequestError(err) { - return tterrors.NewBadRequestError(err) - } - return tterrors.NewInternalError(err) + return toRPCError(err) } return nil } @@ -377,10 +371,7 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) err = session.WriteTagged(nsID, tsID, ident.NewTagsIterator(tags), ts, dp.Value, unit, dp.Annotation) if err != nil { - if client.IsBadRequestError(err) { - return tterrors.NewBadRequestError(err) - } - return tterrors.NewInternalError(err) + return toRPCError(err) } return nil } @@ -406,3 +397,13 @@ func (s *service) Truncate(tctx thrift.Context, req *rpc.TruncateRequest) (*rpc. res.NumSeries = truncated return res, nil } + +func toRPCError(err error) *rpc.Error { + if client.IsResourceExhaustedError(err) { + return tterrors.NewResourceExhaustedError(err) + } + if client.IsBadRequestError(err) { + return tterrors.NewBadRequestError(err) + } + return tterrors.NewInternalError(err) +} diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert.go b/src/dbnode/network/server/tchannelthrift/convert/convert.go index b2d4ac20f1..52f1b345e9 100644 --- a/src/dbnode/network/server/tchannelthrift/convert/convert.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert.go @@ -192,6 +192,9 @@ func ToRPCError(err error) *rpc.Error { if xerrors.IsInvalidParams(err) { return tterrors.NewBadRequestError(err) } + if xerrors.IsResourceExhausted(err) { + return tterrors.NewResourceExhaustedError(err) + } return tterrors.NewInternalError(err) } diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 2250b1ba5a..95531dc108 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -1027,7 +1027,8 @@ func (s *service) FetchBatchRaw(tctx thrift.Context, req *rpc.FetchBatchRawReque segments, rpcErr := s.readEncodedResult(ctx, nsID, encodedResults[i].result) if rpcErr != nil { rawResult.Err = rpcErr - if tterrors.IsBadRequestError(rawResult.Err) { + if tterrors.IsBadRequestError(rawResult.Err) || + tterrors.IsResourceExhaustedError(rawResult.Err) { nonRetryableErrors++ } else { retryableErrors++ @@ -1093,7 +1094,8 @@ func (s *service) FetchBatchRawV2(tctx thrift.Context, req *rpc.FetchBatchRawV2R encodedResult, err := db.ReadEncoded(ctx, nsIdx, tsID, start, end) if err != nil { rawResult.Err = convert.ToRPCError(err) - if tterrors.IsBadRequestError(rawResult.Err) { + if tterrors.IsBadRequestError(rawResult.Err) || + tterrors.IsResourceExhaustedError(rawResult.Err) { nonRetryableErrors++ } else { retryableErrors++ @@ -1104,7 +1106,8 @@ func (s *service) FetchBatchRawV2(tctx thrift.Context, req *rpc.FetchBatchRawV2R segments, rpcErr := s.readEncodedResult(ctx, nsIdx, encodedResult) if rpcErr != nil { rawResult.Err = rpcErr - if tterrors.IsBadRequestError(rawResult.Err) { + if tterrors.IsBadRequestError(rawResult.Err) || + tterrors.IsResourceExhaustedError(rawResult.Err) { nonRetryableErrors++ } else { retryableErrors++ @@ -2542,7 +2545,7 @@ func (r *writeBatchPooledReq) HandleError(index int, err error) { return } - if xerrors.IsInvalidParams(err) { + if xerrors.IsInvalidParams(err) || xerrors.IsResourceExhausted(err) { r.nonRetryableErrors++ r.errs = append( r.errs, diff --git a/src/dbnode/storage/limits/query_limits.go b/src/dbnode/storage/limits/query_limits.go index 49274f83b7..d8646b55a2 100644 --- a/src/dbnode/storage/limits/query_limits.go +++ b/src/dbnode/storage/limits/query_limits.go @@ -192,7 +192,7 @@ func (q *lookbackLimit) exceeded() error { func (q *lookbackLimit) checkLimit(recent int64) error { if q.options.Limit > 0 && recent > q.options.Limit { q.metrics.exceeded.Inc(1) - return xerrors.NewInvalidParamsError(fmt.Errorf( + return xerrors.NewResourceExhaustedError(fmt.Errorf( "query aborted due to limit: name=%s, limit=%d, current=%d, within=%s", q.name, q.options.Limit, recent, q.options.Lookback)) } diff --git a/src/dbnode/storage/limits/query_limits_test.go b/src/dbnode/storage/limits/query_limits_test.go index 5ea202bbc1..ca8342c33d 100644 --- a/src/dbnode/storage/limits/query_limits_test.go +++ b/src/dbnode/storage/limits/query_limits_test.go @@ -66,7 +66,7 @@ func TestQueryLimits(t *testing.T) { require.Error(t, queryLimits.DocsLimit().Inc(2, nil)) err = queryLimits.AnyExceeded() require.Error(t, err) - require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, xerrors.IsResourceExhausted(err)) opts = testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) queryLimits, err = NewQueryLimits(opts) @@ -81,7 +81,7 @@ func TestQueryLimits(t *testing.T) { require.Error(t, queryLimits.BytesReadLimit().Inc(2, nil)) err = queryLimits.AnyExceeded() require.Error(t, err) - require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, xerrors.IsResourceExhausted(err)) } func TestLookbackLimit(t *testing.T) { @@ -160,7 +160,7 @@ func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int6 require.NoError(t, err) } else { require.Error(t, err) - require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, xerrors.IsResourceExhausted(err)) exceededCount++ } err = limit.exceeded() @@ -168,7 +168,7 @@ func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int6 require.NoError(t, err) } else { require.Error(t, err) - require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, xerrors.IsResourceExhausted(err)) exceededCount++ } return exceededCount diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 5b1d8e0034..df5908c300 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1567,7 +1567,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { _, _, err = entry.Series.Write(ctx, write.timestamp, write.value, write.unit, annotationBytes, write.opts) if err != nil { - if xerrors.IsInvalidParams(err) { + if xerrors.IsInvalidParams(err) || xerrors.IsResourceExhausted(err) { s.metrics.insertAsyncWriteInvalidParamsErrors.Inc(1) } else { s.metrics.insertAsyncWriteInternalErrors.Inc(1) diff --git a/src/query/api/experimental/annotated/handler.go b/src/query/api/experimental/annotated/handler.go index 0625606f90..cf91ea3392 100644 --- a/src/query/api/experimental/annotated/handler.go +++ b/src/query/api/experimental/annotated/handler.go @@ -92,7 +92,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if batchErr != nil { var foundInternalErr bool for _, err := range batchErr.Errors() { - if client.IsBadRequestError(err) { + if client.IsBadRequestError(err) || client.IsResourceExhaustedError(err) { continue } foundInternalErr = true diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go index dbced1317b..a5a7e2f29d 100644 --- a/src/query/api/v1/handler/influxdb/write.go +++ b/src/query/api/v1/handler/influxdb/write.go @@ -308,10 +308,10 @@ func (iwh *ingestWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) ) for _, err := range errs { switch { - case client.IsBadRequestError(err): + case client.IsBadRequestError(err) || client.IsResourceExhaustedError(err): numBadRequest++ lastBadRequestErr = err.Error() - case xerrors.IsInvalidParams(err): + case xerrors.IsInvalidParams(err) || xerrors.IsResourceExhausted(err): numBadRequest++ lastBadRequestErr = err.Error() default: diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index 6850e4e8eb..7d0cd25a62 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -361,10 +361,10 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ) for _, err := range errs { switch { - case client.IsBadRequestError(err): + case client.IsBadRequestError(err) || client.IsResourceExhaustedError(err): numBadRequest++ lastBadRequestErr = err.Error() - case xerrors.IsInvalidParams(err): + case xerrors.IsInvalidParams(err) || xerrors.IsResourceExhausted(err): numBadRequest++ lastBadRequestErr = err.Error() default: diff --git a/src/x/errors/errors.go b/src/x/errors/errors.go index b50e0e0344..90c581fd77 100644 --- a/src/x/errors/errors.go +++ b/src/x/errors/errors.go @@ -318,17 +318,27 @@ func (e MultiError) FinalError() error { if e.err == nil { return nil } - allInvalidParamsErr := IsInvalidParams(e.err) - for _, containedErr := range e.errors { - if !IsInvalidParams(containedErr) { - allInvalidParamsErr = false - break + + allSatisfy := func(predicate func(error) bool) bool { + if !predicate(e.err) { + return false + } + for _, containedErr := range e.errors { + if !predicate(containedErr) { + return false + } } + return true } - if allInvalidParamsErr { + + if allSatisfy(IsInvalidParams) { // Make sure to correctly wrap this error as an invalid params error. return NewInvalidParamsError(e) } + if allSatisfy(IsResourceExhausted) { + // Make sure to correctly wrap this error as an resource exhausted error. + return NewResourceExhaustedError(e) + } return e } diff --git a/src/x/net/http/errors.go b/src/x/net/http/errors.go index 331e23e4d3..9c7a200bed 100644 --- a/src/x/net/http/errors.go +++ b/src/x/net/http/errors.go @@ -108,7 +108,7 @@ func getStatusCode(err error) int { case Error: return v.Code() case error: - if xerrors.IsInvalidParams(v) { + if xerrors.IsInvalidParams(v) || xerrors.IsResourceExhausted(v) { return http.StatusBadRequest } else if errors.Is(err, context.DeadlineExceeded) { return http.StatusGatewayTimeout From bde9a4f74baf71bf5e072a17054d4ca738365a08 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Mon, 14 Dec 2020 10:02:00 +0200 Subject: [PATCH 5/8] remove ErrorType.RESOURCE_EXHAUSTED --- src/dbnode/generated/thrift/rpc.thrift | 3 +-- src/dbnode/generated/thrift/rpc/rpc.go | 9 ++------- .../network/server/tchannelthrift/errors/errors.go | 9 ++------- 3 files changed, 5 insertions(+), 16 deletions(-) diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index 61f1018077..feae823e45 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -29,8 +29,7 @@ enum TimeType { enum ErrorType { INTERNAL_ERROR, - BAD_REQUEST, - RESOURCE_EXHAUSTED + BAD_REQUEST } enum ErrorCode { diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index f19b2010ab..b803c3d69b 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -107,9 +107,8 @@ func (p *TimeType) Value() (driver.Value, error) { type ErrorType int64 const ( - ErrorType_INTERNAL_ERROR ErrorType = 0 - ErrorType_BAD_REQUEST ErrorType = 1 - ErrorType_RESOURCE_EXHAUSTED ErrorType = 2 + ErrorType_INTERNAL_ERROR ErrorType = 0 + ErrorType_BAD_REQUEST ErrorType = 1 ) func (p ErrorType) String() string { @@ -118,8 +117,6 @@ func (p ErrorType) String() string { return "INTERNAL_ERROR" case ErrorType_BAD_REQUEST: return "BAD_REQUEST" - case ErrorType_RESOURCE_EXHAUSTED: - return "RESOURCE_EXHAUSTED" } return "" } @@ -130,8 +127,6 @@ func ErrorTypeFromString(s string) (ErrorType, error) { return ErrorType_INTERNAL_ERROR, nil case "BAD_REQUEST": return ErrorType_BAD_REQUEST, nil - case "RESOURCE_EXHAUSTED": - return ErrorType_RESOURCE_EXHAUSTED, nil } return ErrorType(0), fmt.Errorf("not a valid ErrorType string") } diff --git a/src/dbnode/network/server/tchannelthrift/errors/errors.go b/src/dbnode/network/server/tchannelthrift/errors/errors.go index ba1d77bc59..728081a728 100644 --- a/src/dbnode/network/server/tchannelthrift/errors/errors.go +++ b/src/dbnode/network/server/tchannelthrift/errors/errors.go @@ -46,11 +46,7 @@ func IsBadRequestError(err *rpc.Error) bool { // IsResourceExhaustedError returns whether the error is a resource exhausted error. func IsResourceExhaustedError(err *rpc.Error) bool { - // NB: To maintain better backwards compatibility, Resource Exhausted errors might also be - // defined by BAD_REQUEST error type coupled with RESOURCE_EXHAUSTED error code - return err != nil && - (err.Type == rpc.ErrorType_RESOURCE_EXHAUSTED || - (err.Type == rpc.ErrorType_BAD_REQUEST && err.Code == rpc.ErrorCode_RESOURCE_EXHAUSTED)) + return err != nil && err.Code == rpc.ErrorCode_RESOURCE_EXHAUSTED } // NewInternalError creates a new internal error @@ -66,8 +62,7 @@ func NewBadRequestError(err error) *rpc.Error { // NewResourceExhaustedError creates a new resource exhausted error. func NewResourceExhaustedError(err error) *rpc.Error { // NB: To maintain better backwards compatibility, using BAD_REQUEST error type coupled with - // RESOURCE_EXHAUSTED error code. After a reasonable amount of time this should be switched to - // RESOURCE_EXHAUSTED error code (added after M3 release v1.0.0) + // RESOURCE_EXHAUSTED error code return newError(rpc.ErrorType_BAD_REQUEST, rpc.ErrorCode_RESOURCE_EXHAUSTED, err) } From fba214e74c2b4ebc76db8bd0ae2ce8e46d0c9abc Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Mon, 14 Dec 2020 10:06:32 +0200 Subject: [PATCH 6/8] use convert.ToRPCError() --- .../server/tchannelthrift/cluster/service.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/src/dbnode/network/server/tchannelthrift/cluster/service.go b/src/dbnode/network/server/tchannelthrift/cluster/service.go index 2f7598d75f..0e2548acdd 100644 --- a/src/dbnode/network/server/tchannelthrift/cluster/service.go +++ b/src/dbnode/network/server/tchannelthrift/cluster/service.go @@ -241,7 +241,7 @@ func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchR it, err := session.Fetch(nsID, tsID, start, end) if err != nil { - return nil, toRPCError(err) + return nil, convert.ToRPCError(err) } defer it.Close() @@ -337,7 +337,7 @@ func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error { tsID := s.idPool.GetStringID(ctx, req.ID) err = session.Write(nsID, tsID, ts, dp.Value, unit, dp.Annotation) if err != nil { - return toRPCError(err) + return convert.ToRPCError(err) } return nil } @@ -371,7 +371,7 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) err = session.WriteTagged(nsID, tsID, ident.NewTagsIterator(tags), ts, dp.Value, unit, dp.Annotation) if err != nil { - return toRPCError(err) + return convert.ToRPCError(err) } return nil } @@ -397,13 +397,3 @@ func (s *service) Truncate(tctx thrift.Context, req *rpc.TruncateRequest) (*rpc. res.NumSeries = truncated return res, nil } - -func toRPCError(err error) *rpc.Error { - if client.IsResourceExhaustedError(err) { - return tterrors.NewResourceExhaustedError(err) - } - if client.IsBadRequestError(err) { - return tterrors.NewBadRequestError(err) - } - return tterrors.NewInternalError(err) -} From ca338326551f92d1ac8a7ac147aa2747a2737ab1 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Mon, 14 Dec 2020 11:03:54 +0200 Subject: [PATCH 7/8] fix lint errors --- src/cluster/etcd/watchmanager/manager.go | 3 ++- src/cluster/etcd/watchmanager/manager_test.go | 6 +++--- src/cluster/kv/etcd/store.go | 2 +- src/cluster/services/heartbeat/etcd/store.go | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/cluster/etcd/watchmanager/manager.go b/src/cluster/etcd/watchmanager/manager.go index cef548c8fa..71a70a1fce 100644 --- a/src/cluster/etcd/watchmanager/manager.go +++ b/src/cluster/etcd/watchmanager/manager.go @@ -91,7 +91,8 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha return watchChan, cancelFn, nil case <-time.After(timeout): cancelFn() - return nil, nil, fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key) + return nil, nil, fmt.Errorf("etcd watch create timed out after %s for key: %s", + timeout.String(), key) } } diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index 1b8b96c70f..1eae166044 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -27,18 +27,18 @@ import ( "testing" "time" - "github.com/m3db/m3/src/cluster/mocks" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/integration" "golang.org/x/net/context" + + "github.com/m3db/m3/src/cluster/mocks" ) func TestWatchChan(t *testing.T) { - wh, ec, _, _, _, closer := testSetup(t) + wh, ec, _, _, _, closer := testSetup(t) //nolint:dogsled defer closer() wc, _, err := wh.watchChanWithTimeout("foo", 0) diff --git a/src/cluster/kv/etcd/store.go b/src/cluster/kv/etcd/store.go index a144dcc541..bd119fbb3c 100644 --- a/src/cluster/kv/etcd/store.go +++ b/src/cluster/kv/etcd/store.go @@ -33,9 +33,9 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/retry" - "go.etcd.io/etcd/clientv3" "github.com/golang/protobuf/proto" "github.com/uber-go/tally" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "golang.org/x/net/context" ) diff --git a/src/cluster/services/heartbeat/etcd/store.go b/src/cluster/services/heartbeat/etcd/store.go index 35fcdcb48c..b4d412ecf3 100644 --- a/src/cluster/services/heartbeat/etcd/store.go +++ b/src/cluster/services/heartbeat/etcd/store.go @@ -35,9 +35,9 @@ import ( "github.com/m3db/m3/src/x/retry" "github.com/m3db/m3/src/x/watch" - "go.etcd.io/etcd/clientv3" "github.com/golang/protobuf/proto" "github.com/uber-go/tally" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "golang.org/x/net/context" ) From 3f67cf80a35289a080ba9b7b9c2a0ca230008469 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Mon, 14 Dec 2020 12:14:15 +0200 Subject: [PATCH 8/8] check least significant byte of the error code --- src/dbnode/generated/thrift/rpc.thrift | 4 ++-- src/dbnode/network/server/tchannelthrift/errors/errors.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index feae823e45..fe3c84ef56 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -33,8 +33,8 @@ enum ErrorType { } enum ErrorCode { - NONE, - RESOURCE_EXHAUSTED + NONE = 0x00, + RESOURCE_EXHAUSTED = 0x01 } exception Error { diff --git a/src/dbnode/network/server/tchannelthrift/errors/errors.go b/src/dbnode/network/server/tchannelthrift/errors/errors.go index 728081a728..5ea9e3eec9 100644 --- a/src/dbnode/network/server/tchannelthrift/errors/errors.go +++ b/src/dbnode/network/server/tchannelthrift/errors/errors.go @@ -46,7 +46,8 @@ func IsBadRequestError(err *rpc.Error) bool { // IsResourceExhaustedError returns whether the error is a resource exhausted error. func IsResourceExhaustedError(err *rpc.Error) bool { - return err != nil && err.Code == rpc.ErrorCode_RESOURCE_EXHAUSTED + lowestByteMask := rpc.ErrorCode(0xff) + return err != nil && err.Code&lowestByteMask == rpc.ErrorCode_RESOURCE_EXHAUSTED } // NewInternalError creates a new internal error