Skip to content

Commit

Permalink
*: make Backoff perceive the Killed flag to fix MAX_EXECUTION_TIME #1…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Feb 4, 2020
1 parent 0c426b1 commit 80d515c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 3 deletions.
11 changes: 9 additions & 2 deletions kv/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,25 @@ type Variables struct {

// Hook is used for test to verify the variable take effect.
Hook func(name string, vars *Variables)

// Pointer to SessionVars.Killed
// Killed is a flag to indicate that this query is killed.
Killed *uint32
}

// NewVariables create a new Variables instance with default values.
func NewVariables() *Variables {
func NewVariables(killed *uint32) *Variables {
return &Variables{
BackoffLockFast: DefBackoffLockFast,
BackOffWeight: DefBackOffWeight,
Killed: killed,
}
}

var ignoreKill uint32

// DefaultVars is the default variables instance.
var DefaultVars = NewVariables()
var DefaultVars = NewVariables(&ignoreKill)

// Default values
const (
Expand Down
20 changes: 20 additions & 0 deletions session/session_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ package session_test

import (
"context"
"sync/atomic"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -126,3 +128,21 @@ func (s *testSessionSuite) TestRetryPreparedSleep(c *C) {

tk.MustQuery("select c1 from t").Check(testkit.Rows("21", "0"))
}

func (s *testSessionSuite) TestKillFlagInBackoff(c *C) {
// This test checks the `killed` flag is passed down to the backoffer through
// session.KVVars. It works by setting the `killed = 3` first, then using
// failpoint to run backoff() and check the vars.Killed using the Hook() function.
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table kill_backoff (id int)")
var killValue uint32
tk.Se.GetSessionVars().KVVars.Hook = func(name string, vars *kv.Variables) {
killValue = atomic.LoadUint32(vars.Killed)
}
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("callBackofferHook")`), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult")
// Set kill flag and check its passed to backoffer.
tk.Se.GetSessionVars().Killed = 3
tk.MustQuery("select * from kill_backoff")
c.Assert(killValue, Equals, uint32(3))
}
2 changes: 1 addition & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ func NewSessionVars() *SessionVars {
PreparedStmtNameToID: make(map[string]uint32),
PreparedParams: make([]types.Datum, 0, 10),
TxnCtx: &TransactionContext{},
KVVars: kv.NewVariables(),
RetryInfo: &RetryInfo{},
ActiveRoles: make([]*auth.RoleIdentity, 0, 10),
StrictSQLMode: true,
Expand All @@ -513,6 +512,7 @@ func NewSessionVars() *SessionVars {
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
}
vars.KVVars = kv.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
IndexSerialScanConcurrency: DefIndexSerialScanConcurrency,
Expand Down
7 changes: 7 additions & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"math/rand"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -322,6 +323,12 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err
backoffDuration.Observe(float64(realSleep) / 1000)
b.totalSleep += realSleep

if b.vars != nil && b.vars.Killed != nil {
if atomic.CompareAndSwapUint32(b.vars.Killed, 1, 0) {
return ErrQueryInterrupted
}
}

var startTs interface{}
if ts := b.ctx.Value(txnStartKey); ts != nil {
startTs = ts
Expand Down
4 changes: 4 additions & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, re
GC: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}},
}, nil, nil)
}
case "callBackofferHook":
if bo.vars != nil && bo.vars.Hook != nil {
bo.vars.Hook("callBackofferHook", bo.vars)
}
}
})

Expand Down

0 comments on commit 80d515c

Please sign in to comment.