Skip to content

Commit

Permalink
move row hint into ranges
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <[email protected]>
  • Loading branch information
you06 committed Dec 22, 2022
1 parent 5c04d78 commit dbaf491
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 129 deletions.
30 changes: 19 additions & 11 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,16 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon
// SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles
// "handles" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder {
var keyRanges []kv.KeyRange
keyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
keyRanges, hints := TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

// SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting ParitionHandles to KeyRanges.
// handles in slice must be kv.PartitionHandle.
func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder {
keyRanges := PartitionHandlesToKVRanges(handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
keyRanges, hints := PartitionHandlesToKVRanges(handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

Expand Down Expand Up @@ -194,6 +193,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
return builder
}

// SetKeyRangesWithHints sets "KeyRanges" for "kv.Request" with row count hints.
func (builder *RequestBuilder) SetKeyRangesWithHints(keyRanges []kv.KeyRange, hints []int) *RequestBuilder {
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

// SetWrappedKeyRanges sets "KeyRanges" for "kv.Request".
func (builder *RequestBuilder) SetWrappedKeyRanges(keyRanges *kv.KeyRanges) *RequestBuilder {
builder.Request.KeyRanges = keyRanges
Expand Down Expand Up @@ -543,7 +548,7 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc
// For continuous handles, we should merge them to a single key range.
func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hint := make([]int, 0, len(handles))
hints := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
if commonHandle, ok := handles[i].(*kv.CommonHandle); ok {
Expand All @@ -552,7 +557,7 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
EndKey: tablecodec.EncodeRowKey(tid, kv.Key(commonHandle.Encoded()).Next()),
}
krs = append(krs, ran)
hint = append(hint, 1)
hints = append(hints, 1)
i++
continue
}
Expand All @@ -568,16 +573,17 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hint = append(hint, j-i)
hints = append(hints, j-i)
i = j
}
return krs, hint
return krs, hints
}

// PartitionHandlesToKVRanges convert ParitionHandles to kv ranges.
// Handle in slices must be kv.PartitionHandle
func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
func PartitionHandlesToKVRanges(handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hints := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
ph := handles[i].(kv.PartitionHandle)
Expand All @@ -589,6 +595,7 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
EndKey: tablecodec.EncodeRowKey(pid, append(commonHandle.Encoded(), 0)),
}
krs = append(krs, ran)
hints = append(hints, 1)
i++
continue
}
Expand All @@ -607,9 +614,10 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
startKey := tablecodec.EncodeRowKey(pid, low)
endKey := tablecodec.EncodeRowKey(pid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hints = append(hints, j-i)
i = j
}
return krs
return krs, hints
}

// IndexRangesToKVRanges converts index ranges to "KeyRange".
Expand Down
26 changes: 13 additions & 13 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func TestTableHandlesToKVRanges(t *testing.T) {

// Build key ranges.
expect := getExpectedRanges(1, hrs)
actual, _ := TableHandlesToKVRanges(1, handles)
actual, hints := TableHandlesToKVRanges(1, handles)

// Compare key ranges and expected key ranges.
require.Equal(t, len(expect), len(actual))
require.Equal(t, hints, []int{1, 4, 2, 1, 2})
for i := range actual {
require.Equal(t, expect[i].StartKey, actual[i].StartKey)
require.Equal(t, expect[i].EndKey, actual[i].EndKey)
Expand Down Expand Up @@ -378,7 +379,7 @@ func TestRequestBuilder3(t *testing.T) {
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{
KeyRanges: kv.NewNonParitionedKeyRangesWithHint([]kv.KeyRange{
{
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Expand All @@ -395,17 +396,16 @@ func TestRequestBuilder3(t *testing.T) {
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64},
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
},
}),
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
FixedRowCountHint: []int{1, 4, 2, 1},
}, []int{1, 4, 2, 1}),
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
Expand Down
8 changes: 4 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4219,13 +4219,13 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
continue
}
handle := kv.IntHandle(content.keys[0].GetInt64())
tmp, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, tmp...)
ranges, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, ranges...)
}
} else {
for _, p := range usedPartitionList {
tmp, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, tmp...)
ranges, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, ranges...)
}
}

Expand Down
34 changes: 26 additions & 8 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,25 +340,41 @@ func (t StoreType) Name() string {
// KeyRanges wrap the ranges for partitioned table cases.
// We might send ranges from different in the one request.
type KeyRanges struct {
ranges [][]KeyRange
ranges [][]KeyRange
rowCountHints [][]int

isPartitioned bool
}

// NewPartitionedKeyRanges constructs a new RequestRange for partitioned table.
func NewPartitionedKeyRanges(ranges [][]KeyRange) *KeyRanges {
return NewPartitionedKeyRangesWithHints(ranges, nil)
}

// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
return NewNonParitionedKeyRangesWithHint(ranges, nil)
}

// NewPartitionedKeyRangesWithHints constructs a new RequestRange for partitioned table with row count hint.
func NewPartitionedKeyRangesWithHints(ranges [][]KeyRange, hints [][]int) *KeyRanges {
return &KeyRanges{
ranges: ranges,
rowCountHints: hints,
isPartitioned: true,
}
}

// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
return &KeyRanges{
// NewNonParitionedKeyRangesWithHint constructs a new RequestRange for a non partitioned table with rou count hint.
func NewNonParitionedKeyRangesWithHint(ranges []KeyRange, hints []int) *KeyRanges {
rr := &KeyRanges{
ranges: [][]KeyRange{ranges},
isPartitioned: false,
}
if hints != nil {
rr.rowCountHints = [][]int{hints}
}
return rr
}

// FirstPartitionRange returns the the result of first range.
Expand Down Expand Up @@ -416,9 +432,13 @@ func (rr *KeyRanges) SortByFunc(sortFunc func(i, j KeyRange) bool) {
}

// ForEachPartitionWithErr runs the func for each partition with an error check.
func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange) error) (err error) {
func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange, []int) error) (err error) {
for i := range rr.ranges {
err = theFunc(rr.ranges[i])
var hints []int
if len(rr.rowCountHints) > i {
hints = rr.rowCountHints[i]
}
err = theFunc(rr.ranges[i], hints)
if err != nil {
return err
}
Expand Down Expand Up @@ -535,8 +555,6 @@ type Request struct {
}
// RequestSource indicates whether the request is an internal request.
RequestSource util.RequestSource
// FixedRowCountHint is the optimization hint for copr request for task scheduling.
FixedRowCountHint []int
// StoreBatchSize indicates the batch size of coprocessor in the same store.
StoreBatchSize int
}
Expand Down
1 change: 1 addition & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ go_test(
"//store/driver/backoff",
"//testkit/testsetup",
"//util/paging",
"//util/trxevents",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/mpp",
Expand Down
62 changes: 31 additions & 31 deletions store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
vars := kv.NewVariables(&killed)
opt := &kv.ClientSendOption{}

ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
req := &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, copr.CopSmallTaskRow},
Concurrency: 15,
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, copr.CopSmallTaskRow}),
Concurrency: 15,
}
it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
Expand All @@ -55,11 +55,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
require.Equal(t, smallConc, 1)
require.Equal(t, rateLimit.GetCapacity(), 2)

ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
Concurrency: 15,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
Expand All @@ -70,11 +70,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
require.Equal(t, rateLimit.GetCapacity(), 3)

// cross-region long range
ranges = copr.BuildKeyRanges("a", "z")
req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")),
FixedRowCountHint: []int{10},
Concurrency: 15,
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{10}),
Concurrency: 15,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
Expand All @@ -84,11 +84,11 @@ func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
require.Equal(t, smallConc, 2)
require.Equal(t, rateLimit.GetCapacity(), 3)

ranges = copr.BuildKeyRanges("a", "z")
req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "z")),
FixedRowCountHint: []int{copr.CopSmallTaskRow + 1},
Concurrency: 15,
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{copr.CopSmallTaskRow + 1}),
Concurrency: 15,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
Expand All @@ -115,12 +115,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
vars := kv.NewVariables(&killed)
opt := &kv.ClientSendOption{}

ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
req := &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 1,
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
Concurrency: 15,
StoreBatchSize: 1,
}
it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
Expand All @@ -131,12 +131,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 1)
require.Equal(t, tasks[1].RowCountHint, 9)

ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 3,
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
Concurrency: 15,
StoreBatchSize: 3,
}
it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt)
require.Nil(t, errRes)
Expand All @@ -146,12 +146,12 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
require.Equal(t, tasks[0].RowCountHint, 14)

// paging will disable store batch.
ranges = copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
req = &kv.Request{
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRanges(copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")),
FixedRowCountHint: []int{1, 1, 3, 3},
Concurrency: 15,
StoreBatchSize: 3,
Tp: kv.ReqTypeDAG,
KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
Concurrency: 15,
StoreBatchSize: 3,
Paging: struct {
Enable bool
MinPagingSize uint64
Expand Down
Loading

0 comments on commit dbaf491

Please sign in to comment.