Skip to content

Commit

Permalink
tikv: fix infinite retry when kv region continuing to return StaleCom…
Browse files Browse the repository at this point in the history
…mand error (pingcap#16481) (pingcap#16528)
  • Loading branch information
sre-bot authored Apr 23, 2020
1 parent c015874 commit 445e69b
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v3.0.13-0.20200415122949-7873549f01a5+incompatible
github.com/pingcap/parser v3.0.13-0.20200422071807-70ac9ca4680d+incompatible
github.com/pingcap/pd v1.1.0-beta.0.20191223090411-ea2b748f6ee2
github.com/pingcap/tidb-tools v3.0.6-0.20191119150227-ff0a3c6e5763+incompatible
github.com/pingcap/tipb v0.0.0-20200401051346-bec3080a5428
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0 h1:dXXNHvDwAEN1YNg
github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v3.0.13-0.20200415122949-7873549f01a5+incompatible h1:YEfvpsi69NTq6icPNU+WLdqowCXJWLhM00UD2qyRQkc=
github.com/pingcap/parser v3.0.13-0.20200415122949-7873549f01a5+incompatible/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v3.0.13-0.20200422071807-70ac9ca4680d+incompatible h1:YxOIHiijAxm2smT47tqThaEjm8Lo9ZOM3zilsx6OpZ8=
github.com/pingcap/parser v3.0.13-0.20200422071807-70ac9ca4680d+incompatible/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v1.1.0-beta.0.20191223090411-ea2b748f6ee2 h1:NL23b8tsg6M1QpSQedK14/Jx++QeyKL2rGiBvXAQVfA=
github.com/pingcap/pd v1.1.0-beta.0.20191223090411-ea2b748f6ee2/go.mod h1:b4gaAPSxaVVtaB+EHamV4Nsv8JmTdjlw0cTKmp4+dRQ=
github.com/pingcap/tidb-tools v3.0.6-0.20191119150227-ff0a3c6e5763+incompatible h1:I8HirWsu1MZp6t9G/g8yKCEjJJxtHooKakEgccvdJ4M=
Expand Down
11 changes: 11 additions & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
tikvBackoffCounterRegionMiss = metrics.TiKVBackoffCounter.WithLabelValues("regionMiss")
tikvBackoffCounterUpdateLeader = metrics.TiKVBackoffCounter.WithLabelValues("updateLeader")
tikvBackoffCounterServerBusy = metrics.TiKVBackoffCounter.WithLabelValues("serverBusy")
tikvBackoffCounterStaleCmd = metrics.TiKVBackoffCounter.WithLabelValues("staleCommand")
tikvBackoffCounterEmpty = metrics.TiKVBackoffCounter.WithLabelValues("")
tikvBackoffHistogramRPC = metrics.TiKVBackoffHistogram.WithLabelValues("tikvRPC")
tikvBackoffHistogramLock = metrics.TiKVBackoffHistogram.WithLabelValues("txnLock")
Expand All @@ -61,6 +62,7 @@ var (
tikvBackoffHistogramRegionMiss = metrics.TiKVBackoffHistogram.WithLabelValues("regionMiss")
tikvBackoffHistogramUpdateLeader = metrics.TiKVBackoffHistogram.WithLabelValues("updateLeader")
tikvBackoffHistogramServerBusy = metrics.TiKVBackoffHistogram.WithLabelValues("serverBusy")
tikvBackoffHistogramStaleCmd = metrics.TiKVBackoffHistogram.WithLabelValues("staleCommand")
tikvBackoffHistogramEmpty = metrics.TiKVBackoffHistogram.WithLabelValues("")
)

Expand All @@ -80,6 +82,8 @@ func (t backoffType) metric() (prometheus.Counter, prometheus.Observer) {
return tikvBackoffCounterUpdateLeader, tikvBackoffHistogramUpdateLeader
case boServerBusy:
return tikvBackoffCounterServerBusy, tikvBackoffHistogramServerBusy
case boStaleCmd:
return tikvBackoffCounterStaleCmd, tikvBackoffHistogramStaleCmd
}
return tikvBackoffCounterEmpty, tikvBackoffHistogramEmpty
}
Expand Down Expand Up @@ -143,6 +147,7 @@ const (
BoRegionMiss
BoUpdateLeader
boServerBusy
boStaleCmd
)

func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int {
Expand All @@ -165,6 +170,8 @@ func (t backoffType) createFn(vars *kv.Variables) func(context.Context, int) int
return NewBackoffFn(1, 10, NoJitter)
case boServerBusy:
return NewBackoffFn(2000, 10000, EqualJitter)
case boStaleCmd:
return NewBackoffFn(2, 1000, NoJitter)
}
return nil
}
Expand All @@ -185,6 +192,8 @@ func (t backoffType) String() string {
return "updateLeader"
case boServerBusy:
return "serverBusy"
case boStaleCmd:
return "staleCommand"
}
return ""
}
Expand All @@ -201,6 +210,8 @@ func (t backoffType) TError() error {
return ErrRegionUnavailable
case boServerBusy:
return ErrTiKVServerBusy
case boStaleCmd:
return ErrTiKVStaleCommand
}
return terror.ClassTiKV.New(mysql.ErrUnknown, mysql.MySQLErrName[mysql.ErrUnknown])
}
Expand Down
1 change: 1 addition & 0 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
ErrLockWaitTimeout = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout])
ErrTokenLimit = terror.ClassTiKV.New(mysql.ErrTiKVStoreLimit, mysql.MySQLErrName[mysql.ErrTiKVStoreLimit])
ErrTiKVStaleCommand = terror.ClassTiKV.New(mysql.ErrTiKVStaleCommand, mysql.MySQLErrName[mysql.ErrTiKVStaleCommand])
)

// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
Expand Down
4 changes: 4 additions & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, regi
}
if regionErr.GetStaleCommand() != nil {
logutil.Logger(context.Background()).Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx))
err = bo.Backoff(boStaleCmd, errors.Errorf("stale command, ctx: %v", ctx))
if err != nil {
return false, errors.Trace(err)
}
return true, nil
}
if regionErr.GetRaftEntryTooLarge() != nil {
Expand Down
42 changes: 42 additions & 0 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -88,6 +89,47 @@ func (s *testStoreLimitSuite) TearDownTest(c *C) {
s.cache.Close()
}

// fnClient is a test double that is installed as the region request sender's
// client in tests. Its SendRequest hands every call to fn, and its Close does
// nothing, so a test controls the outcome of each RPC by supplying fn.
type fnClient struct {
// fn is invoked by SendRequest with the arguments it received, unchanged;
// whatever fn returns is what SendRequest returns.
fn func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error)
}

// Close releases nothing and always reports success; fnClient holds no
// resources of its own.
func (*fnClient) Close() error {
	return nil
}

// SendRequest forwards the call, with its arguments untouched, to the
// function stored in f.fn and returns that function's response and error
// as-is.
func (f *fnClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
	resp, err := f.fn(ctx, addr, req, timeout)
	return resp, err
}

// TestOnRegionError checks how SendReq behaves when every RPC comes back with
// a region error. Currently it covers the StaleCommand case: with a client
// that always answers StaleCommand and a backoffer created with a budget of 5,
// SendReq is expected to return an error and a nil response.
func (s *testRegionRequestSuite) TestOnRegionError(c *C) {
	putReq := &tikvrpc.Request{
		Type: tikvrpc.CmdRawPut,
		RawPut: &kvrpcpb.RawPutRequest{
			Key:   []byte("key"),
			Value: []byte("value"),
		},
	}
	loc, err := s.cache.LocateRegionByID(s.bo, s.region)
	c.Assert(err, IsNil)
	c.Assert(loc, NotNil)

	// Stale command retry: swap in a fake client for the duration of the
	// test and put the real one back when the test returns.
	origClient := s.regionRequestSender.client
	defer func() { s.regionRequestSender.client = origClient }()

	// NOTE(review): the request above is a RawPut, but the fake response only
	// populates Get. Confirm the sender still sees the StaleCommand error for
	// this pairing; otherwise the assertions below could pass for another reason.
	alwaysStale := func(context.Context, string, *tikvrpc.Request, time.Duration) (*tikvrpc.Response, error) {
		regionErr := &errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}
		return &tikvrpc.Response{Get: &kvrpcpb.GetResponse{RegionError: regionErr}}, nil
	}
	s.regionRequestSender.client = &fnClient{fn: alwaysStale}

	bo := NewBackoffer(context.Background(), 5)
	resp, err := s.regionRequestSender.SendReq(bo, putReq, loc.Region, time.Second)
	c.Assert(err, NotNil)
	c.Assert(resp, IsNil)
}

func (s *testStoreLimitSuite) TestStoreTokenLimit(c *C) {
req := &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, Prewrite: &kvrpcpb.PrewriteRequest{}, Context: kvrpcpb.Context{}}
region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
Expand Down

0 comments on commit 445e69b

Please sign in to comment.