Skip to content

Commit

Permalink
executor, store: Plumb the query max execution time to timebox the PD…
Browse files Browse the repository at this point in the history
… GetRegion grpc calls (#56923)

ref #56753
  • Loading branch information
HaoW30 authored Nov 20, 2024
1 parent 2ff351d commit c068b39
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ go_test(
embed = [":distsql"],
flaky = True,
race = "on",
shard_count = 28,
shard_count = 29,
deps = [
"//pkg/distsql/context",
"//pkg/errctx",
Expand Down
1 change: 1 addition & 0 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type DistSQLContext struct {
LoadBasedReplicaReadThreshold time.Duration
RunawayChecker resourcegroup.RunawayChecker
TiKVClientReadTimeout uint64
MaxExecutionTime uint64

ReplicaClosestReadThreshold int64
ConnectionID uint64
Expand Down
1 change: 1 addition & 0 deletions pkg/distsql/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestContextDetach(t *testing.T) {
ResourceGroupName: "c",
LoadBasedReplicaReadThreshold: time.Second,
TiKVClientReadTimeout: 1,
MaxExecutionTime: 1,

ReplicaClosestReadThreshold: 1,
ConnectionID: 1,
Expand Down
1 change: 1 addition & 0 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func (builder *RequestBuilder) SetFromSessionVars(dctx *distsqlctx.DistSQLContex
builder.Request.StoreBusyThreshold = dctx.LoadBasedReplicaReadThreshold
builder.Request.RunawayChecker = dctx.RunawayChecker
builder.Request.TiKVClientReadTimeout = dctx.TiKVClientReadTimeout
builder.Request.MaxExecutionTime = dctx.MaxExecutionTime
return builder
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,33 @@ func TestRequestBuilderTiKVClientReadTimeout(t *testing.T) {
require.Equal(t, expect, actual)
}

func TestRequestBuilderMaxExecutionTime(t *testing.T) {
dctx := NewDistSQLContextForTest()
dctx.MaxExecutionTime = 100
actual, err := (&RequestBuilder{}).
SetFromSessionVars(dctx).
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
Data: []uint8(nil),
KeyRanges: kv.NewNonPartitionedKeyRanges(nil),
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
MemTracker: (*memory.Tracker)(nil),
SchemaVar: 0,
ReadReplicaScope: kv.GlobalReplicaScope,
MaxExecutionTime: 100,
ResourceGroupName: resourcegroup.DefaultResourceGroupName,
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

func TestTableRangesToKVRangesWithFbs(t *testing.T) {
ranges := []*ranger.Range{
{
Expand Down
7 changes: 7 additions & 0 deletions pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"slices"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
Expand Down Expand Up @@ -235,6 +236,12 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
var indexKeys []kv.Key
var err error
batchGetter := e.batchGetter
if e.Ctx().GetSessionVars().MaxExecutionTime > 0 {
// If MaxExecutionTime is set, we need to set the context deadline for the batch get.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
defer cancel()
}
rc := e.Ctx().GetSessionVars().IsPessimisticReadConsistency()
if e.idxInfo != nil && !isCommonHandleRead(e.tblInfo, e.idxInfo) {
// `SELECT a, b FROM t WHERE (a, b) IN ((1, 2), (1, 2), (2, 1), (1, 2))` should not return duplicated rows
Expand Down
7 changes: 7 additions & 0 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"sort"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -672,6 +673,12 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
}
}
// if not read lock or table was unlock then snapshot get
if e.Ctx().GetSessionVars().MaxExecutionTime > 0 {
// if the query has max execution time set, we need to set the context deadline for the get request
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
defer cancel()
return e.snapshot.Get(ctxWithTimeout, key)
}
return e.snapshot.Get(ctx, key)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,8 @@ type Request struct {
StoreBusyThreshold time.Duration
// TiKVClientReadTimeout is the timeout of kv read request
TiKVClientReadTimeout uint64
// MaxExecutionTime is the timeout of the whole query execution
MaxExecutionTime uint64

RunawayChecker resourcegroup.RunawayChecker

Expand Down
1 change: 1 addition & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2644,6 +2644,7 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
LoadBasedReplicaReadThreshold: vars.LoadBasedReplicaReadThreshold,
RunawayChecker: sc.RunawayChecker,
TiKVClientReadTimeout: vars.GetTiKVClientReadTimeout(),
MaxExecutionTime: vars.GetMaxExecutionTime(),

ReplicaClosestReadThreshold: vars.ReplicaClosestReadThreshold,
ConnectionID: vars.ConnectionID,
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,13 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)

if req.MaxExecutionTime > 0 {
// If MaxExecutionTime is set, we need to set the deadline for the whole batch coprocessor request context.
ctxWithTimeout, cancel := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
defer cancel()
bo.TiKVBackoffer().SetCtx(ctxWithTimeout)
}

var tasks []*batchCopTask
var err error
if req.PartitionIDAndRanges != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,13 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
}
})

if req.MaxExecutionTime > 0 {
// If the request has a MaxExecutionTime, we need to set the deadline of the context.
ctxWithTimeout, cancel := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
defer cancel()
bo.TiKVBackoffer().SetCtx(ctxWithTimeout)
}

// TODO(youjiali1995): is there any request type that needn't be split by buckets?
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
if err != nil {
Expand Down

0 comments on commit c068b39

Please sign in to comment.