Skip to content

Commit

Permalink
*: support paging protocol on unistore (#35244)
Browse files Browse the repository at this point in the history
ref #35242, close #35243
  • Loading branch information
tiancaiamao authored Jun 14, 2022
1 parent 4c443d5 commit ad1cb78
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 20 deletions.
30 changes: 30 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3304,6 +3304,9 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
if len(partitions) == 0 {
return &TableDualExec{baseExecutor: *ret.base()}
}

// Sort the partition is necessary to make the final multiple partition key ranges ordered.
sort.Sort(partitionSlice(partitions))
ret.kvRangeBuilder = kvRangeBuilderFromRangeAndPartition{
sctx: b.ctx,
partitions: partitions,
Expand Down Expand Up @@ -3423,6 +3426,9 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table
usedPartition = append(usedPartition, p)
}
}

// To make the final key ranges involving multiple partitions ordered.
sort.Sort(partitionSlice(usedPartition))
return usedPartition, true, contentPos, nil
}

Expand Down Expand Up @@ -3998,6 +4004,10 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
kvRanges = append(tmp, kvRanges...)
}
}
// The key ranges should be ordered.
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
})
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}

Expand Down Expand Up @@ -4028,6 +4038,11 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
kvRanges = append(kvRanges, tmp...)
}
}

// The key ranges should be ordered.
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
})
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}

Expand Down Expand Up @@ -4056,6 +4071,21 @@ type kvRangeBuilderFromRangeAndPartition struct {
partitions []table.PhysicalTable
}

// partitionSlice implement the sort interface.
type partitionSlice []table.PhysicalTable

func (s partitionSlice) Len() int {
return len(s)
}

func (s partitionSlice) Less(i, j int) bool {
return s[i].GetPhysicalID() < s[j].GetPhysicalID()
}

func (s partitionSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (h kvRangeBuilderFromRangeAndPartition) buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error) {
ret := make([][]kv.KeyRange, 0, len(h.partitions))
pids := make([]int64, 0, len(h.partitions))
Expand Down
3 changes: 3 additions & 0 deletions executor/copr_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func TestIntegrationCopCache(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("create table t (a int primary key)")

// TODO(tiancaiamao) update the test and support cop cache for paging.
tk.MustExec("set @@tidb_enable_paging = off")

tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tid := tblInfo.Meta().ID
Expand Down
4 changes: 4 additions & 0 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,10 @@ func TestLimitIndexEstimation(t *testing.T) {
SQL string
Plan []string
}
// When paging is used, there is a 'paging:true' makes the explain output differ.
// IndexLookUp 0.00 root paging:true
tk.MustExec("set @@tidb_enable_paging = off")

analyzeSuiteData := core.GetAnalyzeSuiteData()
analyzeSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
Expand Down
18 changes: 18 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func TestPushLimitDownIndexLookUpReader(t *testing.T) {
tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustExec("analyze table tbl")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -3681,6 +3684,10 @@ func TestExtendedStatsSwitch(t *testing.T) {
tk.MustQuery("select stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows(
"1.000000 1",
))

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

// Estimated index scan count is 4 using extended stats.
tk.MustQuery("explain format = 'brief' select * from t use index(b) where a > 3 order by b limit 1").Check(testkit.Rows(
"Limit 1.00 root offset:0, count:1",
Expand Down Expand Up @@ -4550,6 +4557,9 @@ func TestLimitIndexLookUpKeepOrder(t *testing.T) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int, b int, c int, d int, index idx(a,b,c));")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -4748,6 +4758,9 @@ func TestMultiColMaxOneRow(t *testing.T) {
tk.MustExec("create table t1(a int)")
tk.MustExec("create table t2(a int, b int, c int, primary key(a,b))")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -5520,6 +5533,8 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) {

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")
// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
Expand Down Expand Up @@ -5559,6 +5574,9 @@ func TestIssue27083(t *testing.T) {
require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table t")

// When paging is enabled, there would be a 'paging: true' in the explain result.
tk.MustExec("set @@tidb_enable_paging = off")

var input []string
var output []struct {
SQL string
Expand Down
4 changes: 4 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ func TestEliminateMaxOneRow(t *testing.T) {
tk.MustExec("create table t2(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL)")
tk.MustExec("create table t3(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, c int(11) DEFAULT NULL, UNIQUE KEY idx_abc (a, b, c))")

// When paging is used, there is a 'paging:true' makes the explain output differ.
// IndexLookUp 0.00 root paging:true
tk.MustExec("set @@tidb_enable_paging = off")

for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
Expand Down
32 changes: 32 additions & 0 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ func TestUpdateErrorRate(t *testing.T) {

testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))")
err := h.HandleDDLEvent(<-h.DDLEventCh())
Expand Down Expand Up @@ -916,6 +920,10 @@ func TestQueryFeedback(t *testing.T) {
defer clean()
testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))")
testKit.MustExec("insert into t values (1,2),(2,2),(4,5)")
Expand Down Expand Up @@ -1174,6 +1182,10 @@ func TestUpdateStatsByLocalFeedback(t *testing.T) {
defer clean()
testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec(`set @@tidb_partition_prune_mode='` + string(variable.Static) + `'`)
testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))")
Expand Down Expand Up @@ -1613,6 +1625,10 @@ func TestIndexQueryFeedback4TopN(t *testing.T) {
handle.MinLogErrorRate.Store(0)

testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec("create table t (a bigint(64), index idx(a))")
for i := 0; i < 20; i++ {
Expand Down Expand Up @@ -1664,6 +1680,10 @@ func TestAbnormalIndexFeedback(t *testing.T) {
handle.MinLogErrorRate.Store(0)

testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("create table t (a bigint(64), b bigint(64), index idx_ab(a,b))")
for i := 0; i < 20; i++ {
testKit.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i/5, i))
Expand Down Expand Up @@ -1741,6 +1761,10 @@ func TestFeedbackRanges(t *testing.T) {
handle.MinLogErrorRate.Store(0)

testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("create table t (a tinyint, b tinyint, primary key(a), index idx(a, b))")
for i := 0; i < 20; i++ {
testKit.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
Expand Down Expand Up @@ -1820,6 +1844,10 @@ func TestUnsignedFeedbackRanges(t *testing.T) {
handle.MinLogErrorRate.Store(0)

testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("set @@session.tidb_analyze_version = 0")
testKit.MustExec("create table t (a tinyint unsigned, primary key(a))")
testKit.MustExec("create table t1 (a bigint unsigned, primary key(a))")
Expand Down Expand Up @@ -2013,6 +2041,10 @@ func TestFeedbackCounter(t *testing.T) {
err := metrics.StoreQueryFeedbackCounter.WithLabelValues(metrics.LblOK).Write(oldNum)
require.NoError(t, err)
testKit.MustExec("use test")

// TODO(tiancaiamao): query feedback is broken when paging is on.
testKit.MustExec("set @@tidb_enable_paging = off")

testKit.MustExec("create table t (a int, b int, index idx_a(a))")
testKit.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (5, 5)")
testKit.MustExec("analyze table t with 0 topn")
Expand Down
11 changes: 11 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars, option)
}
failpoint.Inject("DisablePaging", func(_ failpoint.Value) {
req.Paging = false
})
if req.StoreType == kv.TiDB {
// coprocessor on TiDB doesn't support paging
req.Paging = false
}
if req.Tp != kv.ReqTypeDAG {
// coprocessor request but type is not DAG
req.Paging = false
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := NewKeyRanges(req.KeyRanges)
Expand Down
17 changes: 13 additions & 4 deletions store/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,18 @@ type mockGCWorkerSuite struct {
}

func createGCWorkerSuite(t *testing.T) (s *mockGCWorkerSuite, clean func()) {
s = new(mockGCWorkerSuite)
return createGCWorkerSuiteWithStoreType(t, mockstore.EmbedUnistore)
}

func createGCWorkerSuiteWithStoreType(t *testing.T, storeType mockstore.StoreType) (s *mockGCWorkerSuite, clean func()) {
s = new(mockGCWorkerSuite)
hijackClient := func(client tikv.Client) tikv.Client {
s.client = &mockGCWorkerClient{Client: client}
client = s.client
return client
}

opts := []mockstore.MockTiKVStoreOption{
mockstore.WithStoreType(mockstore.MockTiKV),
mockstore.WithStoreType(storeType),
mockstore.WithClusterInspector(func(c testutils.Cluster) {
s.initRegion.storeIDs, s.initRegion.peerIDs, s.initRegion.regionID, _ = mockstore.BootstrapWithMultiStores(c, 3)
s.cluster = c
Expand Down Expand Up @@ -943,7 +945,14 @@ func TestResolveLockRangeMeetRegionCacheMiss(t *testing.T) {
}

func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) {
s, clean := createGCWorkerSuite(t)
// TODO: Update the test code.
// This test rely on the obsolete mock tikv, but mock tikv does not implement paging.
// So use this failpoint to force non-paging protocol.
failpoint.Enable("github.com/pingcap/tidb/store/copr/DisablePaging", `return`)
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/DisablePaging"))
}()
s, clean := createGCWorkerSuiteWithStoreType(t, mockstore.MockTiKV)
defer clean()

var (
Expand Down
7 changes: 7 additions & 0 deletions store/mockstore/mockcopr/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand All @@ -37,6 +38,12 @@ import (
// This test checks the resolve lock functionality. When a txn meets the lock of a large transaction,
// it should not block by the lock.
func TestResolvedLargeTxnLocks(t *testing.T) {
// This is required since mock tikv does not support paging.
failpoint.Enable("github.com/pingcap/tidb/store/copr/DisablePaging", `return`)
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/DisablePaging"))
}()

rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
require.NoError(t, err)

Expand Down
Loading

0 comments on commit ad1cb78

Please sign in to comment.