From dbaf491e6942018622a9b77053837646711e3b6b Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 22 Dec 2022 15:15:50 +0800 Subject: [PATCH] move row hint into ranges Signed-off-by: you06 --- distsql/request_builder.go | 30 +++++--- distsql/request_builder_test.go | 26 +++---- executor/builder.go | 8 +-- kv/kv.go | 34 ++++++--- store/copr/BUILD.bazel | 1 + store/copr/copr_test/coprocessor_test.go | 62 ++++++++-------- store/copr/coprocessor.go | 80 +++++++++++++++------ store/copr/coprocessor_test.go | 92 +++++++++++++++--------- store/copr/key_ranges.go | 15 ++-- tablecodec/tablecodec.go | 2 +- 10 files changed, 221 insertions(+), 129 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 09ab4094ab732..0626f98389ea9 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -131,17 +131,16 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon // SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles // "handles" to "KeyRanges" firstly. func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder { - var keyRanges []kv.KeyRange - keyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles) - builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges) + keyRanges, hints := TableHandlesToKVRanges(tid, handles) + builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints) return builder } // SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting ParitionHandles to KeyRanges. // handles in slice must be kv.PartitionHandle. func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder { - keyRanges := PartitionHandlesToKVRanges(handles) - builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges) + keyRanges, hints := PartitionHandlesToKVRanges(handles) + builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints) return builder } @@ -194,6 +193,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui return builder } +// SetKeyRangesWithHints sets "KeyRanges" for "kv.Request" with row count hints. +func (builder *RequestBuilder) SetKeyRangesWithHints(keyRanges []kv.KeyRange, hints []int) *RequestBuilder { + builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints) + return builder +} + // SetWrappedKeyRanges sets "KeyRanges" for "kv.Request". func (builder *RequestBuilder) SetWrappedKeyRanges(keyRanges *kv.KeyRanges) *RequestBuilder { builder.Request.KeyRanges = keyRanges @@ -543,7 +548,7 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc // For continuous handles, we should merge them to a single key range. func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []int) { krs := make([]kv.KeyRange, 0, len(handles)) - hint := make([]int, 0, len(handles)) + hints := make([]int, 0, len(handles)) i := 0 for i < len(handles) { if commonHandle, ok := handles[i].(*kv.CommonHandle); ok { @@ -552,7 +557,7 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in EndKey: tablecodec.EncodeRowKey(tid, kv.Key(commonHandle.Encoded()).Next()), } krs = append(krs, ran) - hint = append(hint, 1) + hints = append(hints, 1) i++ continue } @@ -568,16 +573,17 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in startKey := tablecodec.EncodeRowKey(tid, low) endKey := tablecodec.EncodeRowKey(tid, high) krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) - hint = append(hint, j-i) + hints = append(hints, j-i) i = j } - return krs, hint + return krs, hints } // PartitionHandlesToKVRanges convert ParitionHandles to kv ranges. // Handle in slices must be kv.PartitionHandle -func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange { +func PartitionHandlesToKVRanges(handles []kv.Handle) ([]kv.KeyRange, []int) { krs := make([]kv.KeyRange, 0, len(handles)) + hints := make([]int, 0, len(handles)) i := 0 for i < len(handles) { ph := handles[i].(kv.PartitionHandle) @@ -589,6 +595,7 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange { EndKey: tablecodec.EncodeRowKey(pid, append(commonHandle.Encoded(), 0)), } krs = append(krs, ran) + hints = append(hints, 1) i++ continue } @@ -607,9 +614,10 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange { startKey := tablecodec.EncodeRowKey(pid, low) endKey := tablecodec.EncodeRowKey(pid, high) krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) + hints = append(hints, j-i) i = j } - return krs + return krs, hints } // IndexRangesToKVRanges converts index ranges to "KeyRange". diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index fa55229e36fa5..1f0696f964f5b 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -61,10 +61,11 @@ func TestTableHandlesToKVRanges(t *testing.T) { // Build key ranges. expect := getExpectedRanges(1, hrs) - actual, _ := TableHandlesToKVRanges(1, handles) + actual, hints := TableHandlesToKVRanges(1, handles) // Compare key ranges and expected key ranges. require.Equal(t, len(expect), len(actual)) + require.Equal(t, hints, []int{1, 4, 2, 1, 2}) for i := range actual { require.Equal(t, expect[i].StartKey, actual[i].StartKey) require.Equal(t, expect[i].EndKey, actual[i].EndKey) @@ -378,7 +379,7 @@ func TestRequestBuilder3(t *testing.T) { Tp: 103, StartTs: 0x0, Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0}, - KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{ + KeyRanges: kv.NewNonParitionedKeyRangesWithHint([]kv.KeyRange{ { StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, @@ -395,17 +396,16 @@ func TestRequestBuilder3(t *testing.T) { StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64}, EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65}, }, - }), - Cacheable: true, - KeepOrder: false, - Desc: false, - Concurrency: variable.DefDistSQLScanConcurrency, - IsolationLevel: 0, - Priority: 0, - NotFillCache: false, - ReplicaRead: kv.ReplicaReadLeader, - ReadReplicaScope: kv.GlobalReplicaScope, - FixedRowCountHint: []int{1, 4, 2, 1}, + }, []int{1, 4, 2, 1}), + Cacheable: true, + KeepOrder: false, + Desc: false, + Concurrency: variable.DefDistSQLScanConcurrency, + IsolationLevel: 0, + Priority: 0, + NotFillCache: false, + ReplicaRead: kv.ReplicaReadLeader, + ReadReplicaScope: kv.GlobalReplicaScope, } expect.Paging.MinPagingSize = paging.MinPagingSize expect.Paging.MaxPagingSize = paging.MaxPagingSize diff --git a/executor/builder.go b/executor/builder.go index d4270397eecd0..dfc1fd52805cc 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -4219,13 +4219,13 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte continue } handle := kv.IntHandle(content.keys[0].GetInt64()) - tmp, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle}) - kvRanges = append(kvRanges, tmp...) + ranges, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle}) + kvRanges = append(kvRanges, ranges...) } } else { for _, p := range usedPartitionList { - tmp, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles) - kvRanges = append(kvRanges, tmp...) + ranges, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles) + kvRanges = append(kvRanges, ranges...) } } diff --git a/kv/kv.go b/kv/kv.go index 38243aa13db08..f394d4a3f9bac 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -340,25 +340,41 @@ func (t StoreType) Name() string { // KeyRanges wrap the ranges for partitioned table cases. // We might send ranges from different in the one request. type KeyRanges struct { - ranges [][]KeyRange + ranges [][]KeyRange + rowCountHints [][]int isPartitioned bool } // NewPartitionedKeyRanges constructs a new RequestRange for partitioned table. func NewPartitionedKeyRanges(ranges [][]KeyRange) *KeyRanges { + return NewPartitionedKeyRangesWithHints(ranges, nil) +} + +// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table. +func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges { + return NewNonParitionedKeyRangesWithHint(ranges, nil) +} + +// NewPartitionedKeyRangesWithHints constructs a new RequestRange for partitioned table with row count hint. +func NewPartitionedKeyRangesWithHints(ranges [][]KeyRange, hints [][]int) *KeyRanges { return &KeyRanges{ ranges: ranges, + rowCountHints: hints, isPartitioned: true, } } -// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table. -func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges { - return &KeyRanges{ +// NewNonParitionedKeyRangesWithHint constructs a new RequestRange for a non partitioned table with rou count hint. +func NewNonParitionedKeyRangesWithHint(ranges []KeyRange, hints []int) *KeyRanges { + rr := &KeyRanges{ ranges: [][]KeyRange{ranges}, isPartitioned: false, } + if hints != nil { + rr.rowCountHints = [][]int{hints} + } + return rr } // FirstPartitionRange returns the the result of first range. @@ -416,9 +432,13 @@ func (rr *KeyRanges) SortByFunc(sortFunc func(i, j KeyRange) bool) { } // ForEachPartitionWithErr runs the func for each partition with an error check. -func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange) error) (err error) { +func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange, []int) error) (err error) { for i := range rr.ranges { - err = theFunc(rr.ranges[i]) + var hints []int + if len(rr.rowCountHints) > i { + hints = rr.rowCountHints[i] + } + err = theFunc(rr.ranges[i], hints) if err != nil { return err } @@ -535,8 +555,6 @@ type Request struct { } // RequestSource indicates whether the request is an internal request. RequestSource util.RequestSource - // FixedRowCountHint is the optimization hint for copr request for task scheduling. - FixedRowCountHint []int // StoreBatchSize indicates the batch size of coprocessor in the same store. StoreBatchSize int } diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index f6cbe57efa2d7..39db5660f7760 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -77,6 +77,7 @@ go_test( "//store/driver/backoff", "//testkit/testsetup", "//util/paging", + "//util/trxevents", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/coprocessor", "@com_github_pingcap_kvproto//pkg/mpp", diff --git a/store/copr/copr_test/coprocessor_test.go b/store/copr/copr_test/coprocessor_test.go index a54c5048e12cb..7931fb8432675 100644 --- a/store/copr/copr_test/coprocessor_test.go +++ b/store/copr/copr_test/coprocessor_test.go @@ -41,11 +41,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { vars := kv.NewVariables(&killed) opt := &kv.ClientSendOption{} + ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z") req := &kv.Request{ - Tp: kv.ReqTypeDAG, - KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), - FixedRowCountHint: []int{1, 1, 3, copr.CopSmallTaskRow}, - Concurrency: 15, + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, copr.CopSmallTaskRow}), + Concurrency: 15, } it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt) require.Nil(t, errRes) @@ -55,11 +55,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { require.Equal(t, smallConc, 1) require.Equal(t, rateLimit.GetCapacity(), 2) + ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z") req = &kv.Request{ - Tp: kv.ReqTypeDAG, - KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), - FixedRowCountHint: []int{1, 1, 3, 3}, - Concurrency: 15, + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}), + Concurrency: 15, } it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) require.Nil(t, errRes) @@ -70,11 +70,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { require.Equal(t, rateLimit.GetCapacity(), 3) // cross-region long range + ranges = copr.BuildKeyRanges("a", "z") req = &kv.Request{ - Tp: kv.ReqTypeDAG, - KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")), - FixedRowCountHint: []int{10}, - Concurrency: 15, + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{10}), + Concurrency: 15, } it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) require.Nil(t, errRes) @@ -84,11 +84,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) { require.Equal(t, smallConc, 2) require.Equal(t, rateLimit.GetCapacity(), 3) + ranges = copr.BuildKeyRanges("a", "z") req = &kv.Request{ - Tp: kv.ReqTypeDAG, - KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")), - FixedRowCountHint: []int{copr.CopSmallTaskRow + 1}, - Concurrency: 15, + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{copr.CopSmallTaskRow + 1}), + Concurrency: 15, } it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) require.Nil(t, errRes) @@ -115,12 +115,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { vars := kv.NewVariables(&killed) opt := &kv.ClientSendOption{} + ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z") req := &kv.Request{ - Tp: kv.ReqTypeDAG, - KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), - FixedRowCountHint: []int{1, 1, 3, 3}, - Concurrency: 15, - StoreBatchSize: 1, + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}), + Concurrency: 15, + StoreBatchSize: 1, } it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt) require.Nil(t, errRes) @@ -131,12 +131,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1) require.Equal(t, tasks[1].RowCountHint, 9) + ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z") req = &kv.Request{ - Tp: kv.ReqTypeDAG, - KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), - FixedRowCountHint: []int{1, 1, 3, 3}, - Concurrency: 15, - StoreBatchSize: 3, + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}), + Concurrency: 15, + StoreBatchSize: 3, } it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) require.Nil(t, errRes) @@ -146,12 +146,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { require.Equal(t, tasks[0].RowCountHint, 14) // paging will disable store batch. + ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z") req = &kv.Request{ - Tp: kv.ReqTypeDAG, - KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")), - FixedRowCountHint: []int{1, 1, 3, 3}, - Concurrency: 15, - StoreBatchSize: 3, + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}), + Concurrency: 15, + StoreBatchSize: 3, Paging: struct { Enable bool MinPagingSize uint64 diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 390c5ffe8e63a..01476536267c7 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -112,7 +112,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars if req.StoreType == kv.TiDB { // coprocessor on TiDB doesn't support paging req.Paging.Enable = false - req.FixedRowCountHint = nil } if req.Tp != kv.ReqTypeDAG { // coprocessor request but type is not DAG @@ -125,13 +124,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } } }) - if req.RequestSource.RequestSourceInternal || req.Tp != kv.ReqTypeDAG { - // disable extra concurrency for internal tasks. - req.FixedRowCountHint = nil - } - failpoint.Inject("disableFixedRowCountHint", func(_ failpoint.Value) { - req.FixedRowCountHint = nil - }) if req.Tp != kv.ReqTypeDAG || req.StoreType != kv.TiKV { req.StoreBatchSize = 0 } @@ -150,9 +142,19 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars tasks []*copTask err error ) - buildTaskFunc := func(ranges []kv.KeyRange) error { + tryRowHint := optRowHint(req) + buildOpt := &buildCopTaskOpt{ + req: req, + cache: c.store.GetRegionCache(), + eventCb: eventCb, + respChan: req.KeepOrder, + } + buildTaskFunc := func(ranges []kv.KeyRange, hints []int) error { keyRanges := NewKeyRanges(ranges) - tasksFromRanges, err := buildCopTasks(bo, c.store.GetRegionCache(), keyRanges, req, eventCb) + if tryRowHint { + buildOpt.rowHints = hints + } + tasksFromRanges, err := buildCopTasks(bo, keyRanges, buildOpt) if err != nil { return err } @@ -194,7 +196,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars if it.concurrency > len(tasks) { it.concurrency = len(tasks) } - if req.FixedRowCountHint != nil { + if tryRowHint { var smallTasks int smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks) if len(tasks)-smallTasks < it.concurrency { @@ -292,13 +294,21 @@ func (r *copTask) ToPBBatchTasks() []*coprocessor.StoreBatchTask { // rangesPerTask limits the length of the ranges slice sent in one copTask. const rangesPerTask = 25000 -func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) { +type buildCopTaskOpt struct { + req *kv.Request + cache *RegionCache + eventCb trxevents.EventCallback + respChan bool + rowHints []int +} + +func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*copTask, error) { + req, cache, eventCb, hints := opt.req, opt.cache, opt.eventCb, opt.rowHints start := time.Now() cmdType := tikvrpc.CmdCop if req.StoreType == kv.TiDB { return buildTiDBMemCopTasks(ranges, req) } - rangesLen := ranges.Len() // TODO(youjiali1995): is there any request type that needn't be splitted by buckets? @@ -336,31 +346,30 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv nextI := mathutil.Min(i+rangesPerTask, rLen) hint := -1 // calculate the row count hint - if req.FixedRowCountHint != nil { - startKey, endKey := loc.Ranges.At(i).StartKey, loc.Ranges.At(nextI-1).EndKey + if hints != nil { + startKey, endKey := loc.Ranges.RefAt(i).StartKey, loc.Ranges.RefAt(nextI-1).EndKey // move to the previous range if startKey of current range is lower than endKey of previous location. // In the following example, task1 will move origRangeIdx to region(i, z). // When counting the row hint for task2, we need to move origRangeIdx back to region(a, h). // |<- region(a, h) ->| |<- region(i, z) ->| // |<- task1 ->| |<- task2 ->| ... - if origRangeIdx > 0 && ranges.At(origRangeIdx-1).EndKey.Cmp(startKey) > 0 { + if origRangeIdx > 0 && ranges.RefAt(origRangeIdx-1).EndKey.Cmp(startKey) > 0 { origRangeIdx-- } hint = 0 for nextOrigRangeIdx := origRangeIdx; nextOrigRangeIdx < ranges.Len(); nextOrigRangeIdx++ { - rangeStart := ranges.At(nextOrigRangeIdx).StartKey + rangeStart := ranges.RefAt(nextOrigRangeIdx).StartKey if rangeStart.Cmp(endKey) > 0 { origRangeIdx = nextOrigRangeIdx break } - hint += req.FixedRowCountHint[nextOrigRangeIdx] + hint += hints[nextOrigRangeIdx] } } task := &copTask{ region: loc.Location.Region, bucketsVer: loc.getBucketVersion(), ranges: loc.Ranges.Slice(i, nextI), - respChan: make(chan *copResponse, chanSize), cmdType: cmdType, storeType: req.StoreType, eventCb: eventCb, @@ -369,6 +378,10 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv requestSource: req.RequestSource, RowCountHint: hint, } + // only keep-order need chan inside task. + if req.KeepOrder { + task.respChan = make(chan *copResponse, chanSize) + } if err = builder.handle(task); err != nil { return nil, err } @@ -1221,7 +1234,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R return nil, errors.Trace(err) } // We may meet RegionError at the first packet, but not during visiting the stream. - remains, err := buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb) + remains, err := buildCopTasks(bo, task.ranges, &buildCopTaskOpt{ + req: worker.req, + cache: worker.store.GetRegionCache(), + respChan: false, + eventCb: task.eventCb, + }) if err != nil { return remains, err } @@ -1369,7 +1387,12 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp if err := bo.Backoff(tikv.BoRegionMiss(), errors.New(errStr)); err != nil { return nil, errors.Trace(err) } - remains, err := buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb) + remains, err := buildCopTasks(bo, task.ranges, &buildCopTaskOpt{ + req: worker.req, + cache: worker.store.GetRegionCache(), + respChan: false, + eventCb: task.eventCb, + }) if err != nil { return nil, err } @@ -1803,3 +1826,18 @@ func BuildKeyRanges(keys ...string) []kv.KeyRange { } return ranges } + +func optRowHint(req *kv.Request) bool { + opt := true + if req.StoreType == kv.TiDB { + return false + } + if req.RequestSource.RequestSourceInternal || req.Tp != kv.ReqTypeDAG { + // disable extra concurrency for internal tasks. + return false + } + failpoint.Inject("disableFixedRowCountHint", func(_ failpoint.Value) { + opt = false + }) + return opt +} diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index 7790e8f7661fc..82cfcfb4d0496 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -22,11 +22,21 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/driver/backoff" "github.com/pingcap/tidb/util/paging" + "github.com/pingcap/tidb/util/trxevents" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" ) +func buildTestCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) { + return buildCopTasks(bo, ranges, &buildCopTaskOpt{ + req: req, + cache: cache, + eventCb: eventCb, + respChan: true, + }) +} + func TestBuildTasksWithoutBuckets(t *testing.T) { // nil --- 'g' --- 'n' --- 't' --- nil // <- 0 -> <- 1 -> <- 2 -> <- 3 -> @@ -50,49 +60,49 @@ func TestBuildTasksWithoutBuckets(t *testing.T) { req := &kv.Request{} flashReq := &kv.Request{} flashReq.StoreType = kv.TiFlash - tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[1], 0, "m", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[1], 0, "m", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g") taskEqual(t, tasks[1], regionIDs[1], 0, "g", "k") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g") taskEqual(t, tasks[1], regionIDs[1], 0, "g", "k") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil) require.NoError(t, err) require.Len(t, tasks, 4) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g") @@ -100,7 +110,7 @@ func TestBuildTasksWithoutBuckets(t *testing.T) { taskEqual(t, tasks[2], regionIDs[2], 0, "n", "t") taskEqual(t, tasks[3], regionIDs[3], 0, "t", "x") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 4) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "g") @@ -108,45 +118,45 @@ func TestBuildTasksWithoutBuckets(t *testing.T) { taskEqual(t, tasks[2], regionIDs[2], 0, "n", "t") taskEqual(t, tasks[3], regionIDs[3], 0, "t", "x") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "b", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "b", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "e", "f") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 1) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "b", "e", "f") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n") taskEqual(t, tasks[1], regionIDs[2], 0, "o", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[1], 0, "g", "n") taskEqual(t, tasks[1], regionIDs[2], 0, "o", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[1], 0, "h", "k", "m", "n") taskEqual(t, tasks[1], regionIDs[2], 0, "n", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[1], 0, "h", "k", "m", "n") @@ -191,7 +201,7 @@ func TestBuildTasksByBuckets(t *testing.T) { } for _, regionRange := range regionRanges { regionID, ranges := regionRange.regionID, regionRange.ranges - tasks, err := buildCopTasks(bo, cache, buildCopRanges(ranges...), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges(ranges...), req, nil) require.NoError(t, err) require.Len(t, tasks, len(ranges)/2) for i, task := range tasks { @@ -204,7 +214,7 @@ func TestBuildTasksByBuckets(t *testing.T) { for _, regionRange := range regionRanges { allRanges = append(allRanges, regionRange.ranges...) } - tasks, err := buildCopTasks(bo, cache, buildCopRanges(allRanges...), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges(allRanges...), req, nil) require.NoError(t, err) require.Len(t, tasks, len(allRanges)/2) taskIdx := 0 @@ -230,7 +240,7 @@ func TestBuildTasksByBuckets(t *testing.T) { "h", "i", "j", "k", "k", "l", "m", "n", } - tasks, err = buildCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil) require.NoError(t, err) require.Len(t, tasks, len(keyRanges)/4) for i, task := range tasks { @@ -251,7 +261,7 @@ func TestBuildTasksByBuckets(t *testing.T) { {"c", "d", "e", "g"}, {"g", "h", "i", "j"}, } - tasks, err = buildCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges(keyRanges...), req, nil) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -277,7 +287,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'n'}, {'q'}, {'r'}, {'t'}, {'u'}, {'v'}, {'x'}}, regionIDs[1]) cache = NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() - tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "o", "p", "q", "s", "w"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "o", "p", "q", "s", "w"), req, nil) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -301,7 +311,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'q'}, {'s'}, {'u'}}, regionIDs[1]) cache = NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() - tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "o", "p", "s", "t", "v", "w", "x"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "o", "p", "s", "t", "v", "w", "x"), req, nil) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -321,7 +331,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'g'}, {'t'}, {'z'}}, regionIDs[1]) cache = NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() - tasks, err = buildCopTasks(bo, cache, buildCopRanges("o", "p", "u", "w"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("o", "p", "u", "w"), req, nil) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -343,7 +353,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'n'}, {'q'}, {'r'}, {'x'}}, regionIDs[1]) cache = NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() - tasks, err = buildCopTasks(bo, cache, buildCopRanges("n", "x"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("n", "x"), req, nil) require.NoError(t, err) require.Len(t, tasks, len(expectedTaskRanges)) for i, task := range tasks { @@ -432,7 +442,7 @@ func TestRebuild(t *testing.T) { bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) req := &kv.Request{} - tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) require.NoError(t, err) require.Len(t, tasks, 2) taskEqual(t, tasks[0], regionIDs[0], 0, "a", "m") @@ -446,7 +456,7 @@ func TestRebuild(t *testing.T) { cache.InvalidateCachedRegion(tasks[1].region) req.Desc = true - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) + tasks, err = buildTestCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) require.NoError(t, err) require.Len(t, tasks, 3) taskEqual(t, tasks[2], regionIDs[0], 0, "a", "m") @@ -501,7 +511,7 @@ func TestBuildPagingTasks(t *testing.T) { req.Paging.MinPagingSize = paging.MinPagingSize flashReq := &kv.Request{} flashReq.StoreType = kv.TiFlash - tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) require.NoError(t, err) require.Len(t, tasks, 1) require.Len(t, tasks, 1) @@ -674,8 +684,12 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) req := &kv.Request{} - req.FixedRowCountHint = []int{1, 1, 3, CopSmallTaskRow} - tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z"), req, nil) + ranges := buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z") + tasks, err := buildCopTasks(bo, ranges, &buildCopTaskOpt{ + req: req, + cache: cache, + rowHints: []int{1, 1, 3, CopSmallTaskRow}, + }) require.Nil(t, err) require.Equal(t, len(tasks), 4) // task[0] ["a"-"c", "d"-"e"] @@ -689,8 +703,12 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { _, conc := smallTaskConcurrency(tasks) require.Equal(t, conc, 1) - req.FixedRowCountHint = []int{1, 1, 3, 3} - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z"), req, nil) + ranges = buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z") + tasks, err = buildCopTasks(bo, ranges, &buildCopTaskOpt{ + req: req, + cache: cache, + rowHints: []int{1, 1, 3, 3}, + }) require.Nil(t, err) require.Equal(t, len(tasks), 4) // task[0] ["a"-"c", "d"-"e"] @@ -705,8 +723,12 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, conc, 2) // cross-region long range - req.FixedRowCountHint = []int{10} - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) + ranges = buildCopRanges("a", "z") + tasks, err = buildCopTasks(bo, ranges, &buildCopTaskOpt{ + req: req, + cache: cache, + rowHints: []int{10}, + }) require.Nil(t, err) require.Equal(t, len(tasks), 4) // task[0] ["a"-"g"] diff --git a/store/copr/key_ranges.go b/store/copr/key_ranges.go index 86dcf036fed4e..67effbbc8b7a1 100644 --- a/store/copr/key_ranges.go +++ b/store/copr/key_ranges.go @@ -58,18 +58,23 @@ func (r *KeyRanges) Len() int { return l } -// At returns the range at the ith position. -func (r *KeyRanges) At(i int) kv.KeyRange { +// RefAt returns the reference at the ith position without copy. +func (r *KeyRanges) RefAt(i int) *kv.KeyRange { if r.first != nil { if i == 0 { - return *r.first + return r.first } i-- } if i < len(r.mid) { - return r.mid[i] + return &r.mid[i] } - return *r.last + return r.last +} + +// At returns the range at the ith position. +func (r *KeyRanges) At(i int) kv.KeyRange { + return *r.RefAt(i) } // Slice returns the sub ranges [from, to). diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index c757bb4f82aa0..ff547b0d06419 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1725,7 +1725,7 @@ func IndexKVIsUnique(value []byte) bool { // VerifyTableIDForRanges verifies that all given ranges are valid to decode the table id. func VerifyTableIDForRanges(keyRanges *kv.KeyRanges) ([]int64, error) { tids := make([]int64, 0, keyRanges.PartitionNum()) - collectFunc := func(ranges []kv.KeyRange) error { + collectFunc := func(ranges []kv.KeyRange, _ []int) error { if len(ranges) == 0 { return nil }