From 3777c384feb13fcaf7c77378fa4ded888a34f6e4 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Fri, 13 Sep 2024 17:05:12 +0800 Subject: [PATCH] [pick-8.1] region_request: ignore resource group errors that not relative storage layer (#1354) (#1463) ref tikv/client-go#1322 Signed-off-by: nolouch --- internal/locate/region_request.go | 30 +++++++++++++++--------- internal/locate/region_request_test.go | 32 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 8bb277d43..a5b207d2b 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -65,6 +65,7 @@ import ( "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" + "github.com/tikv/pd/client/errs" pderr "github.com/tikv/pd/client/errs" ) @@ -2124,7 +2125,9 @@ func (s *RegionRequestSender) sendReqToRegion( } if err != nil { - s.rpcError = err + if isRPCError(err) { + s.rpcError = err + } if s.Stats != nil { errStr := getErrMsg(err) s.Stats.RecordRPCErrorStats(errStr) @@ -2151,6 +2154,11 @@ func (s *RegionRequestSender) sendReqToRegion( return } +func isRPCError(err error) bool { + // exclude ErrClientResourceGroupThrottled + return err != nil && errs.ErrClientResourceGroupThrottled.NotEqual(err) +} + func storeIDLabel(rpcCtx *RPCContext) string { if rpcCtx != nil && rpcCtx.Store != nil { return strconv.FormatUint(rpcCtx.Store.storeID, 10) @@ -2228,16 +2236,6 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r metrics.TiKVRPCErrorCounter.WithLabelValues("unknown", storeLabel).Inc() } - if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute { - s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err) - } else if ctx.Meta != nil { - if s.replicaSelector != nil { - s.replicaSelector.onSendFailure(bo, err) - } else { - s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err) - } - } - // don't need to retry for ResourceGroup error if errors.Is(err, pderr.ErrClientResourceGroupThrottled) { return err @@ -2250,6 +2248,16 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r return err } + if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute { + s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err) + } else if ctx.Meta != nil { + if s.replicaSelector != nil { + s.replicaSelector.onSendFailure(bo, err) + } else { + s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err) + } + } + // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. // TODO: the number of retry time should be limited:since region may be unavailable diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 230bb9a51..5a66645f7 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -65,6 +65,7 @@ import ( "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/tikvrpc" + pderr "github.com/tikv/pd/client/errs" "google.golang.org/grpc" ) @@ -160,6 +161,37 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() { }() } +func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrottled() { + req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ + Key: []byte("key"), + Value: []byte("value"), + }) + region, err := s.cache.LocateRegionByID(s.bo, s.region) + s.Nil(err) + s.NotNil(region) + + // test ErrClientResourceGroupThrottled handled by regionRequestSender + func() { + oc := s.regionRequestSender.client + defer func() { + s.regionRequestSender.client = oc + }() + storeOld, _ := s.regionRequestSender.regionCache.stores.get(1) + epoch := storeOld.epoch + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + return nil, pderr.ErrClientResourceGroupThrottled + }} + bo := retry.NewBackofferWithVars(context.Background(), 5, nil) + _, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) + s.NotNil(err) + storeNew, _ := s.regionRequestSender.regionCache.stores.get(1) + // not mark the store need be refill, then the epoch should not be changed. + s.Equal(epoch, storeNew.epoch) + // no rpc error if the error is ErrClientResourceGroupThrottled + s.Nil(s.regionRequestSender.rpcError) + }() +} + func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"),