Skip to content

Commit

Permalink
[pick-7.5]region_request: ignore resource group errors that not relat…
Browse files Browse the repository at this point in the history
…ive storage layer (tikv#1354) (tikv#1462)

ref tikv#1322

Signed-off-by: nolouch <[email protected]>
Signed-off-by: crazycs <[email protected]>

Co-authored-by: crazycs <[email protected]>
  • Loading branch information
nolouch and crazycs520 authored Sep 20, 2024
1 parent db90b0d commit 3725b31
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 12 deletions.
47 changes: 35 additions & 12 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"github.com/tikv/pd/client/errs"
pderr "github.com/tikv/pd/client/errs"
)

Expand Down Expand Up @@ -173,6 +174,17 @@ func (r *RequestErrorStats) RecordRPCErrorStats(errLabel string) {
}
}

// getErrMsg extracts a human-readable message from err.
//
// A nil err yields the empty string. Otherwise the message of the root cause
// reported by errors.Cause is returned; when errors.Cause itself yields nil
// (for example because a custom Cause method returns nil), the message of err
// is used instead so that calling Error on a nil value is avoided.
func getErrMsg(err error) string {
	if err == nil {
		return ""
	}
	root := errors.Cause(err)
	if root == nil {
		// No usable cause: fall back to the error we were given.
		return err.Error()
	}
	return root.Error()
}

// String implements fmt.Stringer interface.
func (r *RegionRequestRuntimeStats) String() string {
if r == nil {
Expand Down Expand Up @@ -1954,9 +1966,11 @@ func (s *RegionRequestSender) sendReqToRegion(
}

if err != nil {
s.rpcError = err
if isRPCError(err) {
s.rpcError = err
}
if s.Stats != nil {
errStr := errors.Cause(err).Error()
errStr := getErrMsg(err)
s.Stats.RecordRPCErrorStats(errStr)
s.recordRPCAccessInfo(req, rpcCtx, errStr)
}
Expand All @@ -1981,6 +1995,11 @@ func (s *RegionRequestSender) sendReqToRegion(
return
}

// isRPCError reports whether err should be treated as an RPC error.
//
// A nil err is never an RPC error. Any other error counts as one unless it is
// ErrClientResourceGroupThrottled, which is deliberately excluded.
func isRPCError(err error) bool {
	if err == nil {
		return false
	}
	// Everything except ErrClientResourceGroupThrottled is an RPC error.
	return errs.ErrClientResourceGroupThrottled.NotEqual(err)
}

func storeIDLabel(rpcCtx *RPCContext) string {
if rpcCtx != nil && rpcCtx.Store != nil {
return strconv.FormatUint(rpcCtx.Store.storeID, 10)
Expand Down Expand Up @@ -2043,16 +2062,10 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal from remote", zap.Error(err))
}
}
metrics.TiKVRPCErrorCounter.WithLabelValues(errors.Cause(err).Error(), storeLabel).Inc()

if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute {
s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err)
} else if ctx.Meta != nil {
if s.replicaSelector != nil {
s.replicaSelector.onSendFailure(bo, err)
} else {
s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err)
}
if errStr := getErrMsg(err); len(errStr) > 0 {
metrics.TiKVRPCErrorCounter.WithLabelValues(getErrMsg(err), storeLabel).Inc()
} else {
metrics.TiKVRPCErrorCounter.WithLabelValues("unknown", storeLabel).Inc()
}

// don't need to retry for ResourceGroup error
Expand All @@ -2067,6 +2080,16 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
return err
}

if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute {
s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err)
} else if ctx.Meta != nil {
if s.replicaSelector != nil {
s.replicaSelector.onSendFailure(bo, err)
} else {
s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err)
}
}

// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retries should be limited, since the region may be unavailable
Expand Down
54 changes: 54 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/apicodec"
Expand All @@ -62,6 +63,7 @@ import (
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/tikvrpc"
pderr "github.com/tikv/pd/client/errs"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -145,6 +147,41 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() {
}()
}

// TestOnSendFailByResourceGroupThrottled verifies how the region request
// sender reacts when the underlying client returns
// pderr.ErrClientResourceGroupThrottled: SendReq must return an error, the
// epoch of the store entry must stay unchanged, and no rpcError may be
// recorded on the sender.
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrottled() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
})
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)

// Check that ErrClientResourceGroupThrottled is handled by regionRequestSender.
func() {
// Remember the real client so it can be restored once this case finishes.
oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
}()
// Snapshot the store entry keyed by 1 (under the store lock) before sending.
s.regionRequestSender.regionCache.storeMu.Lock()
storeOld := s.regionRequestSender.regionCache.storeMu.stores[1]
s.regionRequestSender.regionCache.storeMu.Unlock()
epoch := storeOld.epoch
// Stub client: every request fails with the throttling error.
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
return nil, pderr.ErrClientResourceGroupThrottled
}}
bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
// err here shadows the outer err; only the failure of this SendReq matters.
_, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.NotNil(err)
// Re-read the same store entry after the failed request.
s.regionRequestSender.regionCache.storeMu.Lock()
storeNew := s.regionRequestSender.regionCache.storeMu.stores[1]
s.regionRequestSender.regionCache.storeMu.Unlock()
// The store must not be marked as needing a refill, so its epoch should be unchanged.
s.Equal(epoch, storeNew.epoch)
// No rpc error is recorded when the error is ErrClientResourceGroupThrottled.
s.Nil(s.regionRequestSender.rpcError)
}()
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Expand Down Expand Up @@ -827,3 +864,20 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() {
}
s.Contains(expecteds, access.String())
}

// noCauseError is a test helper that wraps an error and exposes a Cause
// method which always returns nil. The Error method is promoted from the
// embedded error, so the wrapper still reports the wrapped message.
type noCauseError struct {
	error
}

// Cause always returns nil, regardless of the embedded error.
// The receiver is unused, so it is left unnamed rather than named "_".
func (noCauseError) Cause() error {
	return nil
}

// TestGetErrMsg covers an error whose Cause method returns nil. It asserts
// that errors.Cause yields nil for such an error, that calling Error on that
// result panics, and that getErrMsg still returns the wrapped error's message.
func TestGetErrMsg(t *testing.T) {
	const msg = "no cause err"
	wrapped := noCauseError{error: errors.New(msg)}

	// Precondition: the reported cause is nil.
	require.Equal(t, nil, errors.Cause(wrapped))

	// Calling Error directly on the nil cause must panic.
	callErrorOnCause := func() {
		_ = errors.Cause(wrapped).Error()
	}
	require.Panicsf(t, callErrorOnCause, "should panic")

	// getErrMsg falls back to the message of the error itself.
	require.Equal(t, msg, getErrMsg(wrapped))
}

0 comments on commit 3725b31

Please sign in to comment.