Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/copr: move row hint into key range #40105

Merged
merged 10 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,16 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon
// SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles
// "handles" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder {
var keyRanges []kv.KeyRange
keyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
keyRanges, hints := TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

// SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting ParitionHandles to KeyRanges.
// handles in slice must be kv.PartitionHandle.
func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder {
keyRanges := PartitionHandlesToKVRanges(handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
keyRanges, hints := PartitionHandlesToKVRanges(handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

Expand Down Expand Up @@ -194,6 +193,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
return builder
}

// SetKeyRangesWithHints sets "KeyRanges" for "kv.Request" with row count hints.
func (builder *RequestBuilder) SetKeyRangesWithHints(keyRanges []kv.KeyRange, hints []int) *RequestBuilder {
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

// SetWrappedKeyRanges sets "KeyRanges" for "kv.Request".
func (builder *RequestBuilder) SetWrappedKeyRanges(keyRanges *kv.KeyRanges) *RequestBuilder {
builder.Request.KeyRanges = keyRanges
Expand Down Expand Up @@ -551,7 +556,7 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc
// For continuous handles, we should merge them to a single key range.
func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hint := make([]int, 0, len(handles))
hints := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
if commonHandle, ok := handles[i].(*kv.CommonHandle); ok {
Expand All @@ -560,7 +565,7 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
EndKey: tablecodec.EncodeRowKey(tid, kv.Key(commonHandle.Encoded()).Next()),
}
krs = append(krs, ran)
hint = append(hint, 1)
hints = append(hints, 1)
i++
continue
}
Expand All @@ -576,16 +581,17 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hint = append(hint, j-i)
hints = append(hints, j-i)
i = j
}
return krs, hint
return krs, hints
}

// PartitionHandlesToKVRanges convert ParitionHandles to kv ranges.
// Handle in slices must be kv.PartitionHandle
func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
func PartitionHandlesToKVRanges(handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hints := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
ph := handles[i].(kv.PartitionHandle)
Expand All @@ -597,6 +603,7 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
EndKey: tablecodec.EncodeRowKey(pid, append(commonHandle.Encoded(), 0)),
}
krs = append(krs, ran)
hints = append(hints, 1)
i++
continue
}
Expand All @@ -615,9 +622,10 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
startKey := tablecodec.EncodeRowKey(pid, low)
endKey := tablecodec.EncodeRowKey(pid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hints = append(hints, j-i)
i = j
}
return krs
return krs, hints
}

// IndexRangesToKVRanges converts index ranges to "KeyRange".
Expand Down
26 changes: 13 additions & 13 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func TestTableHandlesToKVRanges(t *testing.T) {

// Build key ranges.
expect := getExpectedRanges(1, hrs)
actual, _ := TableHandlesToKVRanges(1, handles)
actual, hints := TableHandlesToKVRanges(1, handles)

// Compare key ranges and expected key ranges.
require.Equal(t, len(expect), len(actual))
require.Equal(t, hints, []int{1, 4, 2, 1, 2})
for i := range actual {
require.Equal(t, expect[i].StartKey, actual[i].StartKey)
require.Equal(t, expect[i].EndKey, actual[i].EndKey)
Expand Down Expand Up @@ -378,7 +379,7 @@ func TestRequestBuilder3(t *testing.T) {
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{
KeyRanges: kv.NewNonParitionedKeyRangesWithHint([]kv.KeyRange{
{
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Expand All @@ -395,17 +396,16 @@ func TestRequestBuilder3(t *testing.T) {
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64},
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
},
}),
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
FixedRowCountHint: []int{1, 4, 2, 1},
}, []int{1, 4, 2, 1}),
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
Expand Down
8 changes: 4 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4220,13 +4220,13 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
continue
}
handle := kv.IntHandle(content.keys[0].GetInt64())
tmp, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, tmp...)
ranges, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, ranges...)
}
} else {
for _, p := range usedPartitionList {
tmp, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, tmp...)
ranges, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, ranges...)
}
}

Expand Down
58 changes: 58 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,3 +633,61 @@ func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) {
tk.MustExec(`set @a=0x61219F79C90D3541F70E, @b=5501707547099269248, @c=0xEC43EFD30131DEA2CB8B, @d="呣丼蒢咿卻鹻铴础湜僂頃dž縍套衞陀碵碼幓9", @e="鹹楞睕堚尛鉌翡佾搁紟精廬姆燵藝潐楻翇慸嵊";`)
tk.MustExec(`execute stmt using @a,@b,@c,@d,@e;`)
}

func TestCoprocessorBatchByStore(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t(id int primary key, c1 int, c2 int, key i(c1))")
tk.MustExec(`create table t1(id int primary key, c1 int, c2 int, key i(c1)) partition by range(id) (
partition p0 values less than(10000),
partition p1 values less than (50000),
partition p2 values less than (100000))`)
for i := 0; i < 10; i++ {
tk.MustExec("insert into t values(?, ?, ?)", i*10000, i*10000, i%2)
tk.MustExec("insert into t1 values(?, ?, ?)", i*10000, i*10000, i%2)
}
tk.MustQuery("split table t between (0) and (100000) regions 20").Check(testkit.Rows("20 1"))
tk.MustQuery("split table t1 between (0) and (100000) regions 20").Check(testkit.Rows("60 1"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/setRangesPerTask", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/setRangesPerTask"))
}()
ranges := []string{
"(c1 >= 0 and c1 < 5000)",
"(c1 >= 10000 and c1 < 15000)",
"(c1 >= 20000 and c1 < 25000)",
"(c1 >= 30000 and c1 < 35000)",
"(c1 >= 40000 and c1 < 45000)",
"(c1 >= 50000 and c1 < 55000)",
"(c1 >= 60000 and c1 < 65000)",
"(c1 >= 70000 and c1 < 75000)",
"(c1 >= 80000 and c1 < 85000)",
"(c1 >= 90000 and c1 < 95000)",
}
evenRows := testkit.Rows("0 0 0", "20000 20000 0", "40000 40000 0", "60000 60000 0", "80000 80000 0")
oddRows := testkit.Rows("10000 10000 1", "30000 30000 1", "50000 50000 1", "70000 70000 1", "90000 90000 1")
reverseOddRows := testkit.Rows("90000 90000 1", "70000 70000 1", "50000 50000 1", "30000 30000 1", "10000 10000 1")
for _, table := range []string{"t", "t1"} {
baseSQL := fmt.Sprintf("select * from %s force index(i) where id < 100000 and (%s)", table, strings.Join(ranges, " or "))
for _, paging := range []string{"on", "off"} {
tk.MustExec("set session tidb_enable_paging=?", paging)
for size := 0; size < 10; size++ {
tk.MustExec("set session tidb_store_batch_size=?", size)
tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows)
tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows)
// every batched task will get region error and fallback.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/batchCopRegionError", "return"))
tk.MustQuery(baseSQL + " and c2 = 0").Sort().Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1").Sort().Check(oddRows)
tk.MustQuery(baseSQL + " and c2 = 0 order by c1 asc").Check(evenRows)
tk.MustQuery(baseSQL + " and c2 = 1 order by c1 desc").Check(reverseOddRows)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/batchCopRegionError"))
}
}
}
}
34 changes: 26 additions & 8 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,25 +354,41 @@ func (t StoreType) Name() string {
// KeyRanges wrap the ranges for partitioned table cases.
// We might send ranges from different in the one request.
type KeyRanges struct {
ranges [][]KeyRange
ranges [][]KeyRange
rowCountHints [][]int

isPartitioned bool
}

// NewPartitionedKeyRanges constructs a new RequestRange for partitioned table.
func NewPartitionedKeyRanges(ranges [][]KeyRange) *KeyRanges {
return NewPartitionedKeyRangesWithHints(ranges, nil)
}

// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
return NewNonParitionedKeyRangesWithHint(ranges, nil)
}

// NewPartitionedKeyRangesWithHints constructs a new RequestRange for partitioned table with row count hint.
func NewPartitionedKeyRangesWithHints(ranges [][]KeyRange, hints [][]int) *KeyRanges {
return &KeyRanges{
ranges: ranges,
rowCountHints: hints,
isPartitioned: true,
}
}

// NewNonParitionedKeyRanges constructs a new RequestRange for a non partitioned table.
func NewNonParitionedKeyRanges(ranges []KeyRange) *KeyRanges {
return &KeyRanges{
// NewNonParitionedKeyRangesWithHint constructs a new RequestRange for a non partitioned table with rou count hint.
func NewNonParitionedKeyRangesWithHint(ranges []KeyRange, hints []int) *KeyRanges {
rr := &KeyRanges{
ranges: [][]KeyRange{ranges},
isPartitioned: false,
}
if hints != nil {
rr.rowCountHints = [][]int{hints}
}
return rr
}

// FirstPartitionRange returns the the result of first range.
Expand Down Expand Up @@ -430,9 +446,13 @@ func (rr *KeyRanges) SortByFunc(sortFunc func(i, j KeyRange) bool) {
}

// ForEachPartitionWithErr runs the func for each partition with an error check.
func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange) error) (err error) {
func (rr *KeyRanges) ForEachPartitionWithErr(theFunc func([]KeyRange, []int) error) (err error) {
for i := range rr.ranges {
err = theFunc(rr.ranges[i])
var hints []int
if len(rr.rowCountHints) > i {
hints = rr.rowCountHints[i]
}
err = theFunc(rr.ranges[i], hints)
if err != nil {
return err
}
Expand Down Expand Up @@ -549,8 +569,6 @@ type Request struct {
}
// RequestSource indicates whether the request is an internal request.
RequestSource util.RequestSource
// FixedRowCountHint is the optimization hint for copr request for task scheduling.
FixedRowCountHint []int
// StoreBatchSize indicates the batch size of coprocessor in the same store.
StoreBatchSize int
// ResourceGroupName is the name of the bind resource group.
Expand Down
2 changes: 2 additions & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
Expand Down Expand Up @@ -77,6 +78,7 @@ go_test(
"//store/driver/backoff",
"//testkit/testsetup",
"//util/paging",
"//util/trxevents",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/mpp",
Expand Down
Loading