diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index d6a5d5293d..da7b1ab239 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -173,6 +173,17 @@ func (r *RequestErrorStats) RecordRPCErrorStats(errLabel string) { } } +// getErrMsg returns the message of err's cause as reported by errors.Cause, or err's own message when that cause is nil; a nil err yields "". +func getErrMsg(err error) string { + if err == nil { + return "" + } + if causeErr := errors.Cause(err); causeErr != nil { + return causeErr.Error() + } + return err.Error() +} + // String implements fmt.Stringer interface. func (r *RegionRequestRuntimeStats) String() string { if r == nil { @@ -1954,9 +1965,11 @@ func (s *RegionRequestSender) sendReqToRegion( } if err != nil { - s.rpcError = err + if isRPCError(err) { + s.rpcError = err + } if s.Stats != nil { - errStr := errors.Cause(err).Error() + errStr := getErrMsg(err) s.Stats.RecordRPCErrorStats(errStr) s.recordRPCAccessInfo(req, rpcCtx, errStr) } @@ -1981,6 +1994,11 @@ func (s *RegionRequestSender) sendReqToRegion( return } +func isRPCError(err error) bool { + // ErrClientResourceGroupThrottled is deliberately not counted as an RPC error. + return err != nil && pderr.ErrClientResourceGroupThrottled.NotEqual(err) +} + func storeIDLabel(rpcCtx *RPCContext) string { if rpcCtx != nil && rpcCtx.Store != nil { return strconv.FormatUint(rpcCtx.Store.storeID, 10) @@ -2043,16 +2061,10 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal from remote", zap.Error(err)) } } - metrics.TiKVRPCErrorCounter.WithLabelValues(errors.Cause(err).Error(), storeLabel).Inc() - - if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute { - 
s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err) - } else if ctx.Meta != nil { - if s.replicaSelector != nil { - s.replicaSelector.onSendFailure(bo, err) - } else { - s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err) - } + if errStr := getErrMsg(err); len(errStr) > 0 { + metrics.TiKVRPCErrorCounter.WithLabelValues(errStr, storeLabel).Inc() + } else { + metrics.TiKVRPCErrorCounter.WithLabelValues("unknown", storeLabel).Inc() } // don't need to retry for ResourceGroup error @@ -2067,6 +2079,16 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r return err } + if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute { + s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err) + } else if ctx.Meta != nil { + if s.replicaSelector != nil { + s.replicaSelector.onSendFailure(bo, err) + } else { + s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err) + } + } + // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. 
// TODO: the number of retry time should be limited:since region may be unavailable diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 6105fe5c46..40133913f2 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -54,6 +54,7 @@ import ( "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/internal/apicodec" @@ -62,6 +63,7 @@ import ( "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/tikvrpc" + pderr "github.com/tikv/pd/client/errs" "google.golang.org/grpc" ) @@ -145,6 +147,41 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() { }() } +func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrottled() { + req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ + Key: []byte("key"), + Value: []byte("value"), + }) + region, err := s.cache.LocateRegionByID(s.bo, s.region) + s.Nil(err) + s.NotNil(region) + + // test ErrClientResourceGroupThrottled handled by regionRequestSender + func() { + oc := s.regionRequestSender.client + defer func() { + s.regionRequestSender.client = oc + }() + s.regionRequestSender.regionCache.storeMu.Lock() + storeOld := s.regionRequestSender.regionCache.storeMu.stores[1] + s.regionRequestSender.regionCache.storeMu.Unlock() + epoch := storeOld.epoch + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + return nil, pderr.ErrClientResourceGroupThrottled + }} + bo := retry.NewBackofferWithVars(context.Background(), 5, nil) + _, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) + 
s.NotNil(err) + s.regionRequestSender.regionCache.storeMu.Lock() + storeNew := s.regionRequestSender.regionCache.storeMu.stores[1] + s.regionRequestSender.regionCache.storeMu.Unlock() + // the store must not be marked as needing a refill, so its epoch should be unchanged. + s.Equal(epoch, storeNew.epoch) + // no rpc error if the error is ErrClientResourceGroupThrottled + s.Nil(s.regionRequestSender.rpcError) + }() +} + func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), @@ -827,3 +864,21 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() { } s.Contains(expecteds, access.String()) } + +// noCauseError wraps an error whose Cause method returns nil, so errors.Cause yields nil for it. +type noCauseError struct { + error +} + +func (noCauseError) Cause() error { + return nil +} + +func TestGetErrMsg(t *testing.T) { + err := noCauseError{error: errors.New("no cause err")} + require.Nil(t, errors.Cause(err)) + require.Panicsf(t, func() { + _ = errors.Cause(err).Error() + }, "should panic") + require.Equal(t, "no cause err", getErrMsg(err)) +}