Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support paging protocol on unistore #35244

Merged
merged 12 commits into from
Jun 14, 2022
30 changes: 30 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3304,6 +3304,9 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
if len(partitions) == 0 {
return &TableDualExec{baseExecutor: *ret.base()}
}

// Sort the partition is necessary to make the final multiple partition key ranges ordered.
sort.Sort(partitionSlice(partitions))
ret.kvRangeBuilder = kvRangeBuilderFromRangeAndPartition{
sctx: b.ctx,
partitions: partitions,
Expand Down Expand Up @@ -3423,6 +3426,9 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table
usedPartition = append(usedPartition, p)
}
}

// To make the final key ranges involving multiple partitions ordered.
sort.Sort(partitionSlice(usedPartition))
return usedPartition, true, contentPos, nil
}

Expand Down Expand Up @@ -3998,6 +4004,10 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
kvRanges = append(tmp, kvRanges...)
}
}
// The key ranges should be ordered.
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
})
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}

Expand Down Expand Up @@ -4028,6 +4038,11 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
kvRanges = append(kvRanges, tmp...)
}
}

// The key ranges should be ordered.
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
})
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}

Expand Down Expand Up @@ -4056,6 +4071,21 @@ type kvRangeBuilderFromRangeAndPartition struct {
partitions []table.PhysicalTable
}

// partitionSlice implement the sort interface.
type partitionSlice []table.PhysicalTable

func (s partitionSlice) Len() int {
return len(s)
}

func (s partitionSlice) Less(i, j int) bool {
return s[i].GetPhysicalID() < s[j].GetPhysicalID()
}

func (s partitionSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (h kvRangeBuilderFromRangeAndPartition) buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error) {
ret := make([][]kv.KeyRange, 0, len(h.partitions))
pids := make([]int64, 0, len(h.partitions))
Expand Down
3 changes: 3 additions & 0 deletions executor/copr_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func TestIntegrationCopCache(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("create table t (a int primary key)")

// TODO(tiancaiamao) update the test and support cop cache for paging.
tk.MustExec("set @@tidb_enable_paging = off")

tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tid := tblInfo.Meta().ID
Expand Down
4 changes: 4 additions & 0 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,10 @@ func TestLimitIndexEstimation(t *testing.T) {
SQL string
Plan []string
}
// When paging is used, there is a 'paging:true' makes the explain output differ.
// IndexLookUp 0.00 root paging:true
tk.MustExec("set @@tidb_enable_paging = off")

analyzeSuiteData := core.GetAnalyzeSuiteData()
analyzeSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
Expand Down
18 changes: 18 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func TestPushLimitDownIndexLookUpReader(t *testing.T) {
tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustExec("analyze table tbl")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -3681,6 +3684,10 @@ func TestExtendedStatsSwitch(t *testing.T) {
tk.MustQuery("select stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows(
"1.000000 1",
))

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

// Estimated index scan count is 4 using extended stats.
tk.MustQuery("explain format = 'brief' select * from t use index(b) where a > 3 order by b limit 1").Check(testkit.Rows(
"Limit 1.00 root offset:0, count:1",
Expand Down Expand Up @@ -4550,6 +4557,9 @@ func TestLimitIndexLookUpKeepOrder(t *testing.T) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int, b int, c int, d int, index idx(a,b,c));")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -4748,6 +4758,9 @@ func TestMultiColMaxOneRow(t *testing.T) {
tk.MustExec("create table t1(a int)")
tk.MustExec("create table t2(a int, b int, c int, primary key(a,b))")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -5520,6 +5533,8 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) {

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")
// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
Expand Down Expand Up @@ -5559,6 +5574,9 @@ func TestIssue27083(t *testing.T) {
require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table t")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down
4 changes: 4 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ func TestEliminateMaxOneRow(t *testing.T) {
tk.MustExec("create table t2(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL)")
tk.MustExec("create table t3(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, c int(11) DEFAULT NULL, UNIQUE KEY idx_abc (a, b, c))")

// When paging is used, there is a 'paging:true' makes the explain output differ.
// IndexLookUp 0.00 root paging:true
tk.MustExec("set @@tidb_enable_paging = off")

for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
Expand Down
32 changes: 32 additions & 0 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ func TestUpdateErrorRate(t *testing.T) {

testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))")
err := h.HandleDDLEvent(<-h.DDLEventCh())
Expand Down Expand Up @@ -916,6 +920,10 @@ func TestQueryFeedback(t *testing.T) {
defer clean()
testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))")
testKit.MustExec("insert into t values (1,2),(2,2),(4,5)")
Expand Down Expand Up @@ -1174,6 +1182,10 @@ func TestUpdateStatsByLocalFeedback(t *testing.T) {
defer clean()
testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec(`set @@tidb_partition_prune_mode='` + string(variable.Static) + `'`)
testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))")
Expand Down Expand Up @@ -1613,6 +1625,10 @@ func TestIndexQueryFeedback4TopN(t *testing.T) {
handle.MinLogErrorRate.Store(0)

testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec("create table t (a bigint(64), index idx(a))")
for i := 0; i < 20; i++ {
Expand Down Expand Up @@ -1664,6 +1680,10 @@ func TestAbnormalIndexFeedback(t *testing.T) {
handle.MinLogErrorRate.Store(0)

testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("create table t (a bigint(64), b bigint(64), index idx_ab(a,b))")
for i := 0; i < 20; i++ {
testKit.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i/5, i))
Expand Down Expand Up @@ -1741,6 +1761,10 @@ func TestFeedbackRanges(t *testing.T) {
handle.MinLogErrorRate.Store(0)

testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("create table t (a tinyint, b tinyint, primary key(a), index idx(a, b))")
for i := 0; i < 20; i++ {
testKit.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
Expand Down Expand Up @@ -1820,6 +1844,10 @@ func TestUnsignedFeedbackRanges(t *testing.T) {
handle.MinLogErrorRate.Store(0)

testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec("create table t (a tinyint unsigned, primary key(a))")
testKit.MustExec("create table t1 (a bigint unsigned, primary key(a))")
Expand Down Expand Up @@ -2013,6 +2041,10 @@ func TestFeedbackCounter(t *testing.T) {
err := metrics.StoreQueryFeedbackCounter.WithLabelValues(metrics.LblOK).Write(oldNum)
require.NoError(t, err)
testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("create table t (a int, b int, index idx_a(a))")
testKit.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (5, 5)")
testKit.MustExec("analyze table t with 0 topn")
Expand Down
11 changes: 11 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars, option)
}
failpoint.Inject("DisablePaging", func(_ failpoint.Value) {
req.Paging = false
})
if req.StoreType == kv.TiDB {
// coprocessor on TiDB doesn't support paging
req.Paging = false
}
if req.Tp != kv.ReqTypeDAG {
// coprocessor request but type is not DAG
req.Paging = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that ReqTypeAnalyze will not use paging. We do not need to set enable_paging = false for the analyze tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paging is a property of the kv request,
kv request can be one of the following kinds:

// ReqTypes.
const (
	ReqTypeSelect   = 101
	ReqTypeIndex    = 102
	ReqTypeDAG      = 103
	ReqTypeAnalyze  = 104
	ReqTypeChecksum = 105

	ReqSubTypeBasic      = 0
	ReqSubTypeDesc       = 10000
	ReqSubTypeGroupBy    = 10001
	ReqSubTypeTopN       = 10002
	ReqSubTypeSignature  = 10003
	ReqSubTypeAnalyzeIdx = 10004
	ReqSubTypeAnalyzeCol = 10005
)

As you can see, there are ReqTypeDAG and ReqTypeAnalyze, and also ReqTypeChecksum for admin check table ... for now, we just support ReqTypeDAG, so all other cases req.Paging = false is set.

A possible change is that we can move the paging property to coprocessor.Request.
That change would make it clear that paging is a property of ReqTypeDAG (or coprocessor.Request to be more specific), not a property kv.Request.
The drawback of the change is, if one day, we need paging protocol also for Analyze/Checksum, we need to add the property to kv.Request...

}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := NewKeyRanges(req.KeyRanges)
Expand Down
17 changes: 13 additions & 4 deletions store/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,18 @@ type mockGCWorkerSuite struct {
}

func createGCWorkerSuite(t *testing.T) (s *mockGCWorkerSuite, clean func()) {
s = new(mockGCWorkerSuite)
return createGCWorkerSuiteWithStoreType(t, mockstore.EmbedUnistore)
}

func createGCWorkerSuiteWithStoreType(t *testing.T, storeType mockstore.StoreType) (s *mockGCWorkerSuite, clean func()) {
s = new(mockGCWorkerSuite)
hijackClient := func(client tikv.Client) tikv.Client {
s.client = &mockGCWorkerClient{Client: client}
client = s.client
return client
}

opts := []mockstore.MockTiKVStoreOption{
mockstore.WithStoreType(mockstore.MockTiKV),
mockstore.WithStoreType(storeType),
mockstore.WithClusterInspector(func(c testutils.Cluster) {
s.initRegion.storeIDs, s.initRegion.peerIDs, s.initRegion.regionID, _ = mockstore.BootstrapWithMultiStores(c, 3)
s.cluster = c
Expand Down Expand Up @@ -943,7 +945,14 @@ func TestResolveLockRangeMeetRegionCacheMiss(t *testing.T) {
}

func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) {
s, clean := createGCWorkerSuite(t)
// TODO: Update the test code.
// This test rely on the obsolete mock tikv, but mock tikv does not implement paging.
// So use this failpoint to force non-paging protocol.
failpoint.Enable("github.com/pingcap/tidb/store/copr/DisablePaging", `return`)
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/DisablePaging"))
}()
s, clean := createGCWorkerSuiteWithStoreType(t, mockstore.MockTiKV)
defer clean()

var (
Expand Down
7 changes: 7 additions & 0 deletions store/mockstore/mockcopr/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand All @@ -37,6 +38,12 @@ import (
// This test checks the resolve lock functionality. When a txn meets the lock of a large transaction,
// it should not block by the lock.
func TestResolvedLargeTxnLocks(t *testing.T) {
// This is required since mock tikv does not support paging.
failpoint.Enable("github.com/pingcap/tidb/store/copr/DisablePaging", `return`)
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/DisablePaging"))
}()

rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
require.NoError(t, err)

Expand Down
Loading