From 9a03a0c0deba816d52dd7b35c04fd067378c2401 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 8 Jun 2022 17:57:15 +0800 Subject: [PATCH 1/6] *: support paging protocol on unistore --- executor/analyze_test.go | 4 ++ executor/copr_cache_test.go | 4 ++ planner/core/cbo_test.go | 4 ++ planner/core/find_best_task.go | 1 + planner/core/integration_test.go | 18 ++++++ planner/core/physical_plan_test.go | 4 ++ privilege/privileges/cache.go | 1 + session/session.go | 2 + sessionctx/variable/sysvar.go | 2 +- statistics/handle/update_test.go | 33 ++++++++++ store/copr/coprocessor.go | 17 +++++ store/gcworker/gc_worker_test.go | 19 ++++-- store/mockstore/mockcopr/executor_test.go | 6 ++ .../unistore/cophandler/cop_handler.go | 43 ++++++++++--- .../unistore/cophandler/cop_handler_test.go | 4 +- store/mockstore/unistore/cophandler/mpp.go | 3 + .../mockstore/unistore/cophandler/mpp_exec.go | 64 ++++++++++++++++++- store/mockstore/unistore/tikv/server.go | 2 + testkit/mockstore.go | 3 + .../realtikvtest/sessiontest/session_test.go | 7 ++ 20 files changed, 220 insertions(+), 21 deletions(-) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 736fb1bc6bf0b..e034b9087c594 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -1019,6 +1019,10 @@ func TestAnalyzeIncremental(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("set @@tidb_analyze_version = 1") + + // TODO(tiancaiamao) analyze protocol has not implement paging yet. + tk.MustExec("set @@tidb_enable_paging = off") + testAnalyzeIncremental(tk, t, dom) } diff --git a/executor/copr_cache_test.go b/executor/copr_cache_test.go index 3d9e73f7cd212..1bf1a84b20d52 100644 --- a/executor/copr_cache_test.go +++ b/executor/copr_cache_test.go @@ -53,6 +53,10 @@ func TestIntegrationCopCache(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t (a int primary key)") + // TODO(tiancaiamao) update the test and support cop cache for paging. + tk.MustExec("set @@tidb_enable_paging = off") + + tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) tid := tblInfo.Meta().ID diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index 12d0555ae31f7..44add53a2698b 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -822,6 +822,10 @@ func TestLimitIndexEstimation(t *testing.T) { SQL string Plan []string } + // When paging is used, there is a 'paging:true' makes the explain output differ. + // IndexLookUp 0.00 root paging:true + tk.MustExec("set @@tidb_enable_paging = off") + analyzeSuiteData := core.GetAnalyzeSuiteData() analyzeSuiteData.GetTestCases(t, &input, &output) for i, tt := range input { diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index c720879d15ed4..1d3709faacfc2 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -507,6 +507,7 @@ func (p *LogicalMemTable) findBestTask(prop *property.PhysicalProperty, planCoun if !prop.IsSortItemEmpty() || planCounter.Empty() { return invalidTask, 0, nil } + memTable := PhysicalMemTable{ DBName: p.DBName, Table: p.TableInfo, diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index ac8a18beb381f..ac7a57292eeb4 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -177,6 +177,9 @@ func TestPushLimitDownIndexLookUpReader(t *testing.T) { tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)") tk.MustExec("analyze table tbl") + // When paging is enabled, there would be a 'paging: true' in the explain result. + tk.MustExec("set @@tidb_enable_paging = off") + var input []string var output []struct { SQL string @@ -3681,6 +3684,10 @@ func TestExtendedStatsSwitch(t *testing.T) { tk.MustQuery("select stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows( "1.000000 1", )) + + // When paging is enabled, there would be a 'paging: true' in the explain result. + tk.MustExec("set @@tidb_enable_paging = off") + // Estimated index scan count is 4 using extended stats. tk.MustQuery("explain format = 'brief' select * from t use index(b) where a > 3 order by b limit 1").Check(testkit.Rows( "Limit 1.00 root offset:0, count:1", @@ -4550,6 +4557,9 @@ func TestLimitIndexLookUpKeepOrder(t *testing.T) { tk.MustExec("drop table if exists t;") tk.MustExec("create table t(a int, b int, c int, d int, index idx(a,b,c));") + // When paging is enabled, there would be a 'paging: true' in the explain result. + tk.MustExec("set @@tidb_enable_paging = off") + var input []string var output []struct { SQL string @@ -4748,6 +4758,9 @@ func TestMultiColMaxOneRow(t *testing.T) { tk.MustExec("create table t1(a int)") tk.MustExec("create table t2(a int, b int, c int, primary key(a,b))") + // When paging is enabled, there would be a 'paging: true' in the explain result. + tk.MustExec("set @@tidb_enable_paging = off") + var input []string var output []struct { SQL string @@ -5520,6 +5533,8 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) { // Default RPC encoding may cause statistics explain result differ and then the test unstable. tk.MustExec("set @@tidb_enable_chunk_rpc = on") + // When paging is enabled, there would be a 'paging: true' in the explain result. + tk.MustExec("set @@tidb_enable_paging = off") var input []string var output []struct { @@ -5559,6 +5574,9 @@ func TestIssue27083(t *testing.T) { require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll)) tk.MustExec("analyze table t") + // When paging is enabled, there would be a 'paging: true' in the explain result. + tk.MustExec("set @@tidb_enable_paging = off") + var input []string var output []struct { SQL string diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index fe5c5cba7da00..97b80f92251c1 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -538,6 +538,10 @@ func TestEliminateMaxOneRow(t *testing.T) { tk.MustExec("create table t2(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL)") tk.MustExec("create table t3(a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, c int(11) DEFAULT NULL, UNIQUE KEY idx_abc (a, b, c))") + // When paging is used, there is a 'paging:true' makes the explain output differ. + // IndexLookUp 0.00 root paging:true + tk.MustExec("set @@tidb_enable_paging = off") + for i, ts := range input { testdata.OnRecord(func() { output[i].SQL = ts diff --git a/privilege/privileges/cache.go b/privilege/privileges/cache.go index 9e8198650d25b..ee0dfd17b1467 100644 --- a/privilege/privileges/cache.go +++ b/privilege/privileges/cache.go @@ -567,6 +567,7 @@ func (p *MySQLPrivilege) LoadDefaultRoles(ctx sessionctx.Context) error { func (p *MySQLPrivilege) loadTable(sctx sessionctx.Context, sql string, decodeTableRow func(chunk.Row, []*ast.ResultField) error) error { ctx := context.Background() + fmt.Println(" sql ==", sql) rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql) if err != nil { return errors.Trace(err) diff --git a/session/session.go b/session/session.go index e6b5997b78c4d..5a41ef80cae25 100644 --- a/session/session.go +++ b/session/session.go @@ -2882,7 +2882,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } if !config.GetGlobalConfig().Security.SkipGrantTable { + fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") err = dom.LoadPrivilegeLoop(ses[3]) + fmt.Println("after !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") if err != nil { return nil, err } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 84f6ca8a2bbdb..423eb40851a31 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1502,7 +1502,7 @@ var defaultSysVars = []*SysVar{ s.RegardNULLAsPoint = TiDBOptOn(val) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePaging, Value: Off, Type: TypeBool, Hidden: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePaging, Value: On, Type: TypeBool, Hidden: true, SetSession: func(s *SessionVars, val string) error { s.EnablePaging = TiDBOptOn(val) return nil }}, diff --git a/statistics/handle/update_test.go b/statistics/handle/update_test.go index 174c8d48b139e..cfbcb55689766 100644 --- a/statistics/handle/update_test.go +++ b/statistics/handle/update_test.go @@ -733,6 +733,10 @@ func TestUpdateErrorRate(t *testing.T) { testKit := testkit.NewTestKit(t, store) testKit.MustExec("use test") + + // TODO(tiancaiamao): query feedback is broken when paging is on. + testKit.MustExec("set @@tidb_enable_paging = off") + testKit.MustExec("set @@session.tidb_analyze_version = 0") testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))") err := h.HandleDDLEvent(<-h.DDLEventCh()) @@ -916,6 +920,10 @@ func TestQueryFeedback(t *testing.T) { defer clean() testKit := testkit.NewTestKit(t, store) testKit.MustExec("use test") + + // TODO(tiancaiamao): query feedback is broken when paging is on. + testKit.MustExec("set @@tidb_enable_paging = off") + testKit.MustExec("set @@session.tidb_analyze_version = 0") testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))") testKit.MustExec("insert into t values (1,2),(2,2),(4,5)") @@ -1174,6 +1182,10 @@ func TestUpdateStatsByLocalFeedback(t *testing.T) { defer clean() testKit := testkit.NewTestKit(t, store) testKit.MustExec("use test") + + // TODO(tiancaiamao): query feedback is broken when paging is on. + testKit.MustExec("set @@tidb_enable_paging = off") + testKit.MustExec("set @@session.tidb_analyze_version = 0") testKit.MustExec(`set @@tidb_partition_prune_mode='` + string(variable.Static) + `'`) testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))") @@ -1613,6 +1625,10 @@ func TestIndexQueryFeedback4TopN(t *testing.T) { handle.MinLogErrorRate.Store(0) testKit.MustExec("use test") + + // TODO(tiancaiamao): query feedback is broken when paging is on. + testKit.MustExec("set @@tidb_enable_paging = off") + testKit.MustExec("set @@session.tidb_analyze_version = 0") testKit.MustExec("create table t (a bigint(64), index idx(a))") for i := 0; i < 20; i++ { @@ -1664,6 +1680,10 @@ func TestAbnormalIndexFeedback(t *testing.T) { handle.MinLogErrorRate.Store(0) testKit.MustExec("use test") + + // TODO(tiancaiamao): query feedback is broken when paging is on. + testKit.MustExec("set @@tidb_enable_paging = off") + testKit.MustExec("create table t (a bigint(64), b bigint(64), index idx_ab(a,b))") for i := 0; i < 20; i++ { testKit.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i/5, i)) @@ -1741,6 +1761,10 @@ func TestFeedbackRanges(t *testing.T) { handle.MinLogErrorRate.Store(0) testKit.MustExec("use test") + + // TODO(tiancaiamao): query feedback is broken when paging is on. + testKit.MustExec("set @@tidb_enable_paging = off") + testKit.MustExec("create table t (a tinyint, b tinyint, primary key(a), index idx(a, b))") for i := 0; i < 20; i++ { testKit.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i)) @@ -1820,6 +1844,10 @@ func TestUnsignedFeedbackRanges(t *testing.T) { handle.MinLogErrorRate.Store(0) testKit.MustExec("use test") + + // TODO(tiancaiamao): query feedback is broken when paging is on. + testKit.MustExec("set @@tidb_enable_paging = off") + testKit.MustExec("set @@session.tidb_analyze_version = 0") testKit.MustExec("create table t (a tinyint unsigned, primary key(a))") testKit.MustExec("create table t1 (a bigint unsigned, primary key(a))") @@ -1881,6 +1909,7 @@ func TestUnsignedFeedbackRanges(t *testing.T) { for _, test := range tests { table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr(test.tblName)) require.NoError(t, err) + fmt.Println(" sql ===", test.sql) testKit.MustQuery(test.sql) require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) require.NoError(t, h.DumpStatsFeedbackToKV()) @@ -2013,6 +2042,10 @@ func TestFeedbackCounter(t *testing.T) { err := metrics.StoreQueryFeedbackCounter.WithLabelValues(metrics.LblOK).Write(oldNum) require.NoError(t, err) testKit.MustExec("use test") + + // TODO(tiancaiamao): query feedback is broken when paging is on. + testKit.MustExec("set @@tidb_enable_paging = off") + testKit.MustExec("create table t (a int, b int, index idx_a(a))") testKit.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (5, 5)") testKit.MustExec("analyze table t with 0 topn") diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 09b48bfbe2679..234752ce442b2 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -82,6 +82,21 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa logutil.BgLogger().Debug("send batch requests") return c.sendBatch(ctx, req, vars, option) } + + failpoint.Inject("DisablePaging", func(_ failpoint.Value) { + req.Paging = false + }) + + if req.StoreType == kv.TiDB { + // TiDB coprocessor doesn't support paging + req.Paging = false + fmt.Println("set the xxxxxxx to false ... because store type ks tidb!!!!!") + } + if req.Tp != kv.ReqTypeDAG { + // coprocessor request but type is not DAG + req.Paging = false + fmt.Println("set the xxxxxxx to false ... because req type not dag!!!!!") + } if req.Streaming && req.Paging { return copErrorResponse{errors.New("streaming and paging are both on")} } @@ -921,6 +936,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti return nil, errors.New("lastRange in paging should not be nil") } // calculate next ranges and grow the paging size + fmt.Println("handle cop paging result ... paging range ==", pagingRange) task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc) if task.ranges.Len() == 0 { return nil, nil @@ -1152,6 +1168,7 @@ func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *copro left, _ := ranges.Split(split.Start) return left } + fmt.Println("calculate remain ...", split.End) _, right := ranges.Split(split.End) return right } diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index 137cfb6eab2bb..33587cc2d85b1 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -97,16 +97,18 @@ type mockGCWorkerSuite struct { } func createGCWorkerSuite(t *testing.T) (s *mockGCWorkerSuite, clean func()) { - s = new(mockGCWorkerSuite) + return createGCWorkerSuiteWithStoreType(t, mockstore.EmbedUnistore) +} +func createGCWorkerSuiteWithStoreType(t *testing.T, storeType mockstore.StoreType) (s *mockGCWorkerSuite, clean func()) { + s = new(mockGCWorkerSuite) hijackClient := func(client tikv.Client) tikv.Client { s.client = &mockGCWorkerClient{Client: client} client = s.client return client } - opts := []mockstore.MockTiKVStoreOption{ - mockstore.WithStoreType(mockstore.MockTiKV), + mockstore.WithStoreType(storeType), mockstore.WithClusterInspector(func(c testutils.Cluster) { s.initRegion.storeIDs, s.initRegion.peerIDs, s.initRegion.regionID, _ = mockstore.BootstrapWithMultiStores(c, 3) s.cluster = c @@ -117,7 +119,6 @@ func createGCWorkerSuite(t *testing.T) (s *mockGCWorkerSuite, clean func()) { return c }), } - s.oracle = &oracles.MockOracle{} s.store, s.dom, clean = testkit.CreateMockStoreWithOracle(t, s.oracle, opts...) s.tikvStore = s.store.(tikv.Storage) @@ -943,9 +944,17 @@ func TestResolveLockRangeMeetRegionCacheMiss(t *testing.T) { } func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) { - s, clean := createGCWorkerSuite(t) + // TODO: Update the test code. + // This test rely on the obsolete mock tikv, but mock tikv does not implement paging. + // So use this failpoint to force non-paging protocol. + failpoint.Enable("github.com/pingcap/tidb/store/copr/DisablePaging", `return`) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/DisablePaging")) + }() + s, clean := createGCWorkerSuiteWithStoreType(t, mockstore.MockTiKV) defer clean() + var ( firstAccess = true firstAccessRef = &firstAccess diff --git a/store/mockstore/mockcopr/executor_test.go b/store/mockstore/mockcopr/executor_test.go index af75c322e6fd6..7dc269c80acf7 100644 --- a/store/mockstore/mockcopr/executor_test.go +++ b/store/mockstore/mockcopr/executor_test.go @@ -37,6 +37,12 @@ import ( // This test checks the resolve lock functionality. When a txn meets the lock of a large transaction, // it should not block by the lock. func TestResolvedLargeTxnLocks(t *testing.T) { + // This is required since mock tikv does not support paging. + failpoint.Enable("github.com/pingcap/tidb/store/copr/DisablePaging", `return`) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/DisablePaging")) + }() + rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) require.NoError(t, err) diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index b5fbeaeb0841b..8526e0795c6a8 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -141,7 +141,7 @@ func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemSt return resp } - exec, chunks, counts, ndvs, err := buildAndRunMPPExecutor(dagCtx, dagReq) + exec, chunks, lastRange, counts, ndvs, err := buildAndRunMPPExecutor(dagCtx, dagReq, req.PagingSize) if err != nil { errMsg := err.Error() @@ -149,12 +149,12 @@ func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemSt resp.OtherError = err.Error() return resp } - return genRespWithMPPExec(nil, nil, nil, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) + return genRespWithMPPExec(nil, lastRange, nil, nil, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) } - return genRespWithMPPExec(chunks, counts, ndvs, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) + return genRespWithMPPExec(chunks, lastRange, counts, ndvs, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) } -func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest) (mppExec, []tipb.Chunk, []int64, []int64, error) { +func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest, pagingSize uint64) (mppExec, []tipb.Chunk, *coprocessor.KeyRange, []int64, []int64, error) { rootExec := dagReq.RootExecutor if rootExec == nil { rootExec = ExecutorListsToTree(dagReq.Executors) @@ -175,15 +175,25 @@ func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest) (mppExe counts: counts, ndvs: ndvs, } + var lastRange *coprocessor.KeyRange + if pagingSize > 0 { + lastRange = &coprocessor.KeyRange{} + builder.paging = lastRange + } exec, err := builder.buildMPPExecutor(rootExec) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err + } + chunks, err := mppExecute(exec, dagCtx, dagReq, pagingSize) + fmt.Println("paging range result ==", lastRange, dagCtx.keyRanges) + if lastRange != nil && len(lastRange.Start)==0 && len(lastRange.End) == 0 { + fmt.Println("what the fuck???") + lastRange = nil } - chunks, err := mppExecute(exec, dagCtx, dagReq) - return exec, chunks, counts, ndvs, err + return exec, chunks, lastRange, counts, ndvs, err } -func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest) (chunks []tipb.Chunk, err error) { +func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest, pagingSize uint64) (chunks []tipb.Chunk, err error) { err = exec.open() defer func() { err := exec.stop() @@ -195,6 +205,9 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest) (chun return } + fmt.Println(" 666666666666 mpp execute?") + + var totalRows uint64 var chk *chunk.Chunk fields := exec.getFieldTypes() for { @@ -208,6 +221,14 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest) (chun chunks, err = useDefaultEncoding(chk, dagCtx, dagReq, fields, chunks) case tipb.EncodeType_TypeChunk: chunks = useChunkEncoding(chk, dagReq, fields, chunks) + if pagingSize > 0 { + fmt.Println("paging size ==", pagingSize) + totalRows += uint64(chk.NumRows()) + if totalRows > pagingSize { + fmt.Println("break ... total rows ==", totalRows) + break + } + } default: err = fmt.Errorf("unsupported DAG request encode type %s", dagReq.EncodeType) } @@ -444,8 +465,10 @@ func (e *ErrLocked) Error() string { return fmt.Sprintf("key is locked, key: %q, Type: %v, primary: %q, startTS: %v", e.Key, e.LockType, e.Primary, e.StartTS) } -func genRespWithMPPExec(chunks []tipb.Chunk, counts, ndvs []int64, exec mppExec, dagReq *tipb.DAGRequest, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response { - resp := &coprocessor.Response{} +func genRespWithMPPExec(chunks []tipb.Chunk, lastRange *coprocessor.KeyRange, counts, ndvs []int64, exec mppExec, dagReq *tipb.DAGRequest, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response { + resp := &coprocessor.Response{ + Range: lastRange, + } selResp := &tipb.SelectResponse{ Error: toPBError(err), Chunks: chunks, diff --git a/store/mockstore/unistore/cophandler/cop_handler_test.go b/store/mockstore/unistore/cophandler/cop_handler_test.go index 1408a8c83fe4f..127b233b98a82 100644 --- a/store/mockstore/unistore/cophandler/cop_handler_test.go +++ b/store/mockstore/unistore/cophandler/cop_handler_test.go @@ -410,7 +410,7 @@ func TestMppExecutor(t *testing.T) { dagCtx := newDagContext(store, []kv.KeyRange{getTestPointRange(tableID, 1)}, dagRequest, dagRequestStartTs) - _, _, rowCount, _, err := buildAndRunMPPExecutor(dagCtx, dagRequest) + _, _, _, rowCount, _, err := buildAndRunMPPExecutor(dagCtx, dagRequest, 0) require.Equal(t, rowCount[0], int64(1)) require.NoError(t, err) } @@ -614,7 +614,7 @@ func BenchmarkExecutors(b *testing.B) { // }) b.Run(fmt.Sprintf("(row=%d, limit=%d)", row, lim), func(b *testing.B) { for i := 0; i < b.N; i++ { - _, _, _, _, err := buildAndRunMPPExecutor(dagCtx, dagReq) + _, _, _, _, _, err := buildAndRunMPPExecutor(dagCtx, dagReq, 0) if err != nil { b.Fatal(err) } diff --git a/store/mockstore/unistore/cophandler/mpp.go b/store/mockstore/unistore/cophandler/mpp.go index 8b25a5e1c0974..d7ba1b96ff02e 100644 --- a/store/mockstore/unistore/cophandler/mpp.go +++ b/store/mockstore/unistore/cophandler/mpp.go @@ -58,6 +58,7 @@ type mppExecBuilder struct { dagCtx *dagContext counts []int64 ndvs []int64 + paging *coprocessor.KeyRange } func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, error) { @@ -73,6 +74,7 @@ func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, counts: b.counts, ndvs: b.ndvs, desc: pb.Desc, + paging: b.paging, } if b.dagCtx != nil { ts.lockStore = b.dagCtx.lockStore @@ -180,6 +182,7 @@ func (b *mppExecBuilder) buildIdxScan(pb *tipb.IndexScan) (*indexScanExec, error hdlStatus: hdlStatus, desc: pb.Desc, physTblIDColIdx: physTblIDColIdx, + paging: b.paging, } return idxScan, nil } diff --git a/store/mockstore/unistore/cophandler/mpp_exec.go b/store/mockstore/unistore/cophandler/mpp_exec.go index 724cf42b90ec1..4bdf4670868d0 100644 --- a/store/mockstore/unistore/cophandler/mpp_exec.go +++ b/store/mockstore/unistore/cophandler/mpp_exec.go @@ -15,6 +15,7 @@ package cophandler import ( + "fmt" "bytes" "encoding/binary" "io" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -101,6 +103,7 @@ func (b *baseMPPExec) stop() error { type scanResult struct { chk *chunk.Chunk + lastProcessedKey kv.Key err error } @@ -126,11 +129,14 @@ type tableScanExec struct { // if ExtraPhysTblIDCol is requested, fill in the physical table id in this column position physTblIDColIdx *int + // This is used to update the paging range result, updated in next(). + paging *coprocessor.KeyRange } func (e *tableScanExec) SkipValue() bool { return false } func (e *tableScanExec) Process(key, value []byte) error { + // fmt.Println("process ... last key ==", kv.Key(key)) handle, err := tablecodec.DecodeRowKey(key) if err != nil { return errors.Trace(err) @@ -148,8 +154,9 @@ func (e *tableScanExec) Process(key, value []byte) error { if e.chk.IsFull() { select { - case e.result <- scanResult{chk: e.chk, err: nil}: + case e.result <- scanResult{chk: e.chk, lastProcessedKey: kv.Key(key), err: nil}: e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize) + // fmt.Println("run here!!!!") case <-e.done: return dbreader.ErrScanBreak } @@ -173,12 +180,14 @@ func (e *tableScanExec) open() error { } } e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize) - e.result = make(chan scanResult, 1) + e.result = make(chan scanResult, 0) e.done = make(chan struct{}) e.wg.Run(func() { // close the channel when done scanning, so that next() will got nil chunk defer close(e.result) - for i, ran := range e.kvRanges { + var i int + var ran kv.KeyRange + for i, ran = range e.kvRanges { oldCnt := e.rowCnt if e.desc { err = e.dbReader.ReverseScan(ran.StartKey, ran.EndKey, math.MaxInt64, e.startTS, e) @@ -211,6 +220,26 @@ func (e *tableScanExec) open() error { func (e *tableScanExec) next() (*chunk.Chunk, error) { result := <-e.result + + fmt.Println("run in table scan next()", e.paging) + // Update the range for coprocessor paging protocol. + if e.paging != nil && result.err == nil { + fmt.Println("e.paging is SET!") + if e.desc { + if result.lastProcessedKey != nil { + *e.paging = coprocessor.KeyRange{Start: result.lastProcessedKey} + } else { + *e.paging = coprocessor.KeyRange{Start: e.kvRanges[len(e.kvRanges)-1].StartKey} + } + } else { + if result.lastProcessedKey != nil { + *e.paging = coprocessor.KeyRange{End: result.lastProcessedKey.Next()} + } else { + *e.paging = coprocessor.KeyRange{End: e.kvRanges[len(e.kvRanges)-1].EndKey} + } + } + } + if result.chk == nil || result.err != nil { return nil, result.err } @@ -251,6 +280,9 @@ type indexScanExec struct { // if ExtraPhysTblIDCol is requested, fill in the physical table id in this column position physTblIDColIdx *int + // This is used to update the paging range result, updated in next(). + paging *coprocessor.KeyRange + chunkLastProcessedKeys []kv.Key } func (e *indexScanExec) SkipValue() bool { return false } @@ -291,6 +323,9 @@ func (e *indexScanExec) Process(key, value []byte) error { } if e.chk.IsFull() { e.chunks = append(e.chunks, e.chk) + if e.paging != nil { + e.chunkLastProcessedKeys = append(e.chunkLastProcessedKeys, key) + } e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize) } return nil @@ -331,8 +366,31 @@ func (e *indexScanExec) next() (*chunk.Chunk, error) { if e.chkIdx < len(e.chunks) { e.chkIdx += 1 e.execSummary.updateOnlyRows(e.chunks[e.chkIdx-1].NumRows()) + if e.paging != nil { + if e.desc { + if e.chkIdx == len(e.chunks) { + *e.paging = coprocessor.KeyRange{Start: e.kvRanges[len(e.kvRanges)-1].StartKey} + } else { + *e.paging = coprocessor.KeyRange{Start: e.chunkLastProcessedKeys[e.chkIdx-1]} + } + } else { + if e.chkIdx == len(e.chunks) { + *e.paging = coprocessor.KeyRange{End: e.kvRanges[len(e.kvRanges)-1].EndKey} + } else { + *e.paging = coprocessor.KeyRange{End: e.chunkLastProcessedKeys[e.chkIdx-1].Next()} + } + } + } return e.chunks[e.chkIdx-1], nil } + + if e.paging != nil { + if e.desc { + *e.paging = coprocessor.KeyRange{Start: e.kvRanges[len(e.kvRanges)-1].StartKey} + } else { + *e.paging = coprocessor.KeyRange{End: e.kvRanges[len(e.kvRanges)-1].EndKey} + } + } return nil, nil } diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index 3e736cc7f53a5..b6387c037a000 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -15,6 +15,7 @@ package tikv import ( + "fmt" "context" "io" "sync/atomic" @@ -554,6 +555,7 @@ func (svr *Server) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeReques // Coprocessor implements implements the tikvpb.TikvServer interface. func (svr *Server) Coprocessor(_ context.Context, req *coprocessor.Request) (*coprocessor.Response, error) { + fmt.Println("HERE ... .Coprocessor???") reqCtx, err := newRequestCtx(svr, req.Context, "Coprocessor") if err != nil { return &coprocessor.Response{OtherError: convertToKeyError(err).String()}, nil diff --git a/testkit/mockstore.go b/testkit/mockstore.go index 9ab3493c3a70c..3b8308fad1694 100644 --- a/testkit/mockstore.go +++ b/testkit/mockstore.go @@ -17,6 +17,7 @@ package testkit import ( + "fmt" "testing" "time" @@ -46,7 +47,9 @@ func CreateMockStoreAndDomain(t testing.TB, opts ...mockstore.MockTiKVStoreOptio func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) (*domain.Domain, func()) { session.SetSchemaLease(lease) session.DisableStats4Test() + fmt.Println("before bootstrap session.....===========") dom, err := session.BootstrapSession(store) + fmt.Println("after bootstrap session.....===========") require.NoError(t, err) dom.SetStatsUpdating(true) diff --git a/tests/realtikvtest/sessiontest/session_test.go b/tests/realtikvtest/sessiontest/session_test.go index e3464f5cd6907..c3808cf8a15d2 100644 --- a/tests/realtikvtest/sessiontest/session_test.go +++ b/tests/realtikvtest/sessiontest/session_test.go @@ -1341,6 +1341,13 @@ func TestSetTxnScope(t *testing.T) { } func TestDoDDLJobQuit(t *testing.T) { + // This is required since mock tikv does not support paging. + failpoint.Enable("github.com/pingcap/tidb/store/copr/DisablePaging", `return`) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/DisablePaging")) + }() + + // test https://github.com/pingcap/tidb/issues/18714, imitate DM's use environment // use isolated store, because in below failpoint we will cancel its context store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.MockTiKV)) From a17a85ec9db60882f793c54c29c9f6ab01aa2949 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 8 Jun 2022 22:03:58 +0800 Subject: [PATCH 2/6] find a dynamic partition mode bug --- ddl/db_partition_test.go | 10 ++++++++++ executor/copr_cache_test.go | 1 - planner/core/cbo_test.go | 6 +++--- store/copr/coprocessor.go | 10 +++++++++- store/gcworker/gc_worker_test.go | 3 +-- store/mockstore/unistore/cophandler/cop_handler.go | 4 +--- store/mockstore/unistore/cophandler/mpp.go | 6 +++--- store/mockstore/unistore/cophandler/mpp_exec.go | 13 ++++++------- store/mockstore/unistore/tikv/server.go | 2 -- tests/realtikvtest/sessiontest/session_test.go | 1 - 10 files changed, 33 insertions(+), 23 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index bfd2dca76a6bf..c402a50338b7b 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1415,9 +1415,15 @@ func TestAlterTableTruncatePartitionByList(t *testing.T) { partition p3 values in (5,null) );`) tk.MustExec(`insert into t values (1),(3),(5),(null)`) + + // TODO(tiancaiamao): remove this line after https://github.com/pingcap/tidb/issues/35242 + tk.MustExec("set @@tidb_partition_prune_mode = static") + oldTbl := external.GetTableByName(t, tk, "test", "t") tk.MustExec(`alter table t truncate partition p1`) + fmt.Println("===============") tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1", "5", "")) + fmt.Println("--------------") tbl := external.GetTableByName(t, tk, "test", "t") require.NotNil(t, tbl.Meta().Partition) part := tbl.Meta().Partition @@ -1447,6 +1453,10 @@ func TestAlterTableTruncatePartitionByListColumns(t *testing.T) { partition p1 values in ((3,'a'),(4,'b')), partition p3 values in ((5,'a'),(null,null)) );`) + + // TODO(tiancaiamao): remove this line after https://github.com/pingcap/tidb/issues/35242 + tk.MustExec("set @@tidb_partition_prune_mode = static") + tk.MustExec(`insert into t values (1,'a'),(3,'a'),(5,'a'),(null,null)`) oldTbl := external.GetTableByName(t, tk, "test", "t") tk.MustExec(`alter table t truncate partition p1`) diff --git a/executor/copr_cache_test.go b/executor/copr_cache_test.go index 1bf1a84b20d52..3f1ed4f1c3bed 100644 --- a/executor/copr_cache_test.go +++ b/executor/copr_cache_test.go @@ -56,7 +56,6 @@ func TestIntegrationCopCache(t *testing.T) { // TODO(tiancaiamao) update the test and support cop cache for paging. tk.MustExec("set @@tidb_enable_paging = off") - tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) tid := tblInfo.Meta().ID diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index 44add53a2698b..a135eadf959a0 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -822,9 +822,9 @@ func TestLimitIndexEstimation(t *testing.T) { SQL string Plan []string } - // When paging is used, there is a 'paging:true' makes the explain output differ. - // IndexLookUp 0.00 root paging:true - tk.MustExec("set @@tidb_enable_paging = off") + // When paging is used, there is a 'paging:true' makes the explain output differ. + // IndexLookUp 0.00 root paging:true + tk.MustExec("set @@tidb_enable_paging = off") analyzeSuiteData := core.GetAnalyzeSuiteData() analyzeSuiteData.GetTestCases(t, &input, &output) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 234752ce442b2..5428eb378049c 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -82,11 +82,13 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa logutil.BgLogger().Debug("send batch requests") return c.sendBatch(ctx, req, vars, option) } - + failpoint.Inject("DisablePaging", func(_ failpoint.Value) { req.Paging = false }) + fmt.Println("Send() ... kv ranges ==", req.KeyRanges) + if req.StoreType == kv.TiDB { // TiDB coprocessor doesn't support paging req.Paging = false @@ -107,6 +109,11 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa if err != nil { return copErrorResponse{err} } + + for _, t := range tasks { + fmt.Println("tasks ====", t.ranges) + } + it := &copIterator{ store: c.store, req: req, @@ -941,6 +948,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti if task.ranges.Len() == 0 { return nil, nil } + fmt.Println("calculate remain ranges ==== ", task.ranges) task.pagingSize = paging.GrowPagingSize(task.pagingSize) return []*copTask{task}, nil } diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index 33587cc2d85b1..f0d9317b82d45 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -101,7 +101,7 @@ func createGCWorkerSuite(t *testing.T) (s *mockGCWorkerSuite, clean func()) { } func createGCWorkerSuiteWithStoreType(t *testing.T, storeType mockstore.StoreType) (s *mockGCWorkerSuite, clean func()) { - s = new(mockGCWorkerSuite) + s = new(mockGCWorkerSuite) hijackClient := func(client tikv.Client) tikv.Client { s.client = &mockGCWorkerClient{Client: client} client = s.client @@ -954,7 +954,6 @@ func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) { s, clean := createGCWorkerSuiteWithStoreType(t, mockstore.MockTiKV) defer clean() - var ( firstAccess = true firstAccessRef = &firstAccess diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index 8526e0795c6a8..ea239374fd66a 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -186,7 +186,7 @@ func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest, pagingS } chunks, err := mppExecute(exec, dagCtx, dagReq, pagingSize) fmt.Println("paging range result ==", lastRange, dagCtx.keyRanges) - if lastRange != nil && len(lastRange.Start)==0 && len(lastRange.End) == 0 { + if lastRange != nil && len(lastRange.Start) == 0 && len(lastRange.End) == 0 { fmt.Println("what the fuck???") lastRange = nil } @@ -205,8 +205,6 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest, pagin return } - fmt.Println(" 666666666666 mpp execute?") - var totalRows uint64 var chk *chunk.Chunk fields := exec.getFieldTypes() diff --git a/store/mockstore/unistore/cophandler/mpp.go b/store/mockstore/unistore/cophandler/mpp.go index d7ba1b96ff02e..fb9cfeaf1aff1 100644 --- a/store/mockstore/unistore/cophandler/mpp.go +++ b/store/mockstore/unistore/cophandler/mpp.go @@ -58,7 +58,7 @@ type mppExecBuilder struct { dagCtx *dagContext counts []int64 ndvs []int64 - paging *coprocessor.KeyRange + paging *coprocessor.KeyRange } func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, error) { @@ -74,7 +74,7 @@ func (b *mppExecBuilder) buildMPPTableScan(pb *tipb.TableScan) (*tableScanExec, counts: b.counts, ndvs: b.ndvs, desc: pb.Desc, - paging: b.paging, + paging: b.paging, } if b.dagCtx != nil { ts.lockStore = b.dagCtx.lockStore @@ -182,7 +182,7 @@ func (b *mppExecBuilder) buildIdxScan(pb *tipb.IndexScan) (*indexScanExec, error hdlStatus: hdlStatus, desc: pb.Desc, physTblIDColIdx: physTblIDColIdx, - paging: b.paging, + paging: b.paging, } return idxScan, nil } diff --git a/store/mockstore/unistore/cophandler/mpp_exec.go b/store/mockstore/unistore/cophandler/mpp_exec.go index 4bdf4670868d0..fceb877b146bd 100644 --- a/store/mockstore/unistore/cophandler/mpp_exec.go +++ b/store/mockstore/unistore/cophandler/mpp_exec.go @@ -15,9 +15,9 @@ package cophandler import ( - "fmt" "bytes" "encoding/binary" + "fmt" "io" "math" "sort" @@ -25,8 +25,8 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -102,9 +102,9 @@ func (b *baseMPPExec) stop() error { } type scanResult struct { - chk *chunk.Chunk + chk *chunk.Chunk lastProcessedKey kv.Key - err error + err error } type tableScanExec struct { @@ -186,7 +186,7 @@ func (e *tableScanExec) open() error { // close the channel when done scanning, so that next() will got nil chunk defer close(e.result) var i int - var ran kv.KeyRange + var ran kv.KeyRange for i, ran = range e.kvRanges { oldCnt := e.rowCnt if e.desc { @@ -224,7 +224,6 @@ func (e *tableScanExec) next() (*chunk.Chunk, error) { fmt.Println("run in table scan next()", e.paging) // Update the range for coprocessor paging protocol. if e.paging != nil && result.err == nil { - fmt.Println("e.paging is SET!") if e.desc { if result.lastProcessedKey != nil { *e.paging = coprocessor.KeyRange{Start: result.lastProcessedKey} @@ -281,7 +280,7 @@ type indexScanExec struct { // if ExtraPhysTblIDCol is requested, fill in the physical table id in this column position physTblIDColIdx *int // This is used to update the paging range result, updated in next(). - paging *coprocessor.KeyRange + paging *coprocessor.KeyRange chunkLastProcessedKeys []kv.Key } diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index b6387c037a000..3e736cc7f53a5 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -15,7 +15,6 @@ package tikv import ( - "fmt" "context" "io" "sync/atomic" @@ -555,7 +554,6 @@ func (svr *Server) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeReques // Coprocessor implements implements the tikvpb.TikvServer interface. func (svr *Server) Coprocessor(_ context.Context, req *coprocessor.Request) (*coprocessor.Response, error) { - fmt.Println("HERE ... .Coprocessor???") reqCtx, err := newRequestCtx(svr, req.Context, "Coprocessor") if err != nil { return &coprocessor.Response{OtherError: convertToKeyError(err).String()}, nil diff --git a/tests/realtikvtest/sessiontest/session_test.go b/tests/realtikvtest/sessiontest/session_test.go index c3808cf8a15d2..f726c1118b39f 100644 --- a/tests/realtikvtest/sessiontest/session_test.go +++ b/tests/realtikvtest/sessiontest/session_test.go @@ -1347,7 +1347,6 @@ func TestDoDDLJobQuit(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/DisablePaging")) }() - // test https://github.com/pingcap/tidb/issues/18714, imitate DM's use environment // use isolated store, because in below failpoint we will cancel its context store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.MockTiKV)) From 7fc9f67fdba0b91d09306e620c8bf696ce0e0b0a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 9 Jun 2022 14:16:53 +0800 Subject: [PATCH 3/6] fix issue 35242 --- ddl/db_partition_test.go | 6 ------ executor/builder.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index c402a50338b7b..ee054a2b82ec9 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1416,9 +1416,6 @@ func TestAlterTableTruncatePartitionByList(t *testing.T) { );`) tk.MustExec(`insert into t values (1),(3),(5),(null)`) - // TODO(tiancaiamao): remove this line after https://github.com/pingcap/tidb/issues/35242 - tk.MustExec("set @@tidb_partition_prune_mode = static") - oldTbl := external.GetTableByName(t, tk, "test", "t") tk.MustExec(`alter table t truncate partition p1`) fmt.Println("===============") @@ -1454,9 +1451,6 @@ func TestAlterTableTruncatePartitionByListColumns(t *testing.T) { partition p3 values in ((5,'a'),(null,null)) );`) - // TODO(tiancaiamao): remove this line after https://github.com/pingcap/tidb/issues/35242 - tk.MustExec("set @@tidb_partition_prune_mode = static") - tk.MustExec(`insert into t values (1,'a'),(3,'a'),(5,'a'),(null,null)`) oldTbl := external.GetTableByName(t, tk, "test", "t") tk.MustExec(`alter table t truncate partition p1`) diff --git a/executor/builder.go b/executor/builder.go index 2973693b2c1f6..d64d316681b83 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3309,6 +3309,9 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E if len(partitions) == 0 { return &TableDualExec{baseExecutor: *ret.base()} } + + // Sort the partition is necessary to make the final multiple partition key ranges ordered. + sort.Sort(partitionSlice(partitions)) ret.kvRangeBuilder = kvRangeBuilderFromRangeAndPartition{ sctx: b.ctx, partitions: partitions, @@ -3428,6 +3431,9 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table usedPartition = append(usedPartition, p) } } + + // To make the final key ranges involving multiple partitions ordered. + sort.Sort(partitionSlice(usedPartition)) return usedPartition, true, contentPos, nil } @@ -4012,6 +4018,10 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte kvRanges = append(tmp, kvRanges...) } } + // The key ranges should be ordered. + sort.Slice(kvRanges, func(i, j int) bool { + return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0 + }) return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges) } @@ -4042,6 +4052,11 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte kvRanges = append(kvRanges, tmp...) } } + + // The key ranges should be ordered. + sort.Slice(kvRanges, func(i, j int) bool { + return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0 + }) return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges) } @@ -4070,6 +4085,21 @@ type kvRangeBuilderFromRangeAndPartition struct { partitions []table.PhysicalTable } +// partitionSlice implement the sort interface. +type partitionSlice []table.PhysicalTable + +func (s partitionSlice) Len() int { + return len(s) +} + +func (s partitionSlice) Less(i, j int) bool { + return s[i].GetPhysicalID() < s[j].GetPhysicalID() +} + +func (s partitionSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + func (h kvRangeBuilderFromRangeAndPartition) buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error) { ret := make([][]kv.KeyRange, 0, len(h.partitions)) pids := make([]int64, 0, len(h.partitions)) From 43df0867e381eac38c6a2556f39c07d04f253d07 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 9 Jun 2022 15:19:59 +0800 Subject: [PATCH 4/6] clean debug log --- ddl/db_partition_test.go | 4 ---- planner/core/find_best_task.go | 1 - privilege/privileges/cache.go | 1 - session/session.go | 2 -- statistics/handle/update_test.go | 1 - store/copr/coprocessor.go | 16 +--------------- store/gcworker/gc_worker_test.go | 1 + store/mockstore/mockcopr/executor_test.go | 1 + .../mockstore/unistore/cophandler/cop_handler.go | 5 +---- store/mockstore/unistore/cophandler/mpp_exec.go | 5 ----- testkit/mockstore.go | 3 --- 11 files changed, 4 insertions(+), 36 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index ee054a2b82ec9..bfd2dca76a6bf 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1415,12 +1415,9 @@ func TestAlterTableTruncatePartitionByList(t *testing.T) { partition p3 values in (5,null) );`) tk.MustExec(`insert into t values (1),(3),(5),(null)`) - oldTbl := external.GetTableByName(t, tk, "test", "t") tk.MustExec(`alter table t truncate partition p1`) - fmt.Println("===============") tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1", "5", "")) - fmt.Println("--------------") tbl := external.GetTableByName(t, tk, "test", "t") require.NotNil(t, tbl.Meta().Partition) part := tbl.Meta().Partition @@ -1450,7 +1447,6 @@ func TestAlterTableTruncatePartitionByListColumns(t *testing.T) { partition p1 values in ((3,'a'),(4,'b')), partition p3 values in ((5,'a'),(null,null)) );`) - tk.MustExec(`insert into t values (1,'a'),(3,'a'),(5,'a'),(null,null)`) oldTbl := external.GetTableByName(t, tk, "test", "t") tk.MustExec(`alter table t truncate partition p1`) diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 1d3709faacfc2..c720879d15ed4 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -507,7 +507,6 @@ func (p *LogicalMemTable) findBestTask(prop *property.PhysicalProperty, planCoun if !prop.IsSortItemEmpty() || planCounter.Empty() { return invalidTask, 0, nil } - memTable := PhysicalMemTable{ DBName: p.DBName, Table: p.TableInfo, diff --git a/privilege/privileges/cache.go b/privilege/privileges/cache.go index ee0dfd17b1467..9e8198650d25b 100644 --- a/privilege/privileges/cache.go +++ b/privilege/privileges/cache.go @@ -567,7 +567,6 @@ func (p *MySQLPrivilege) LoadDefaultRoles(ctx sessionctx.Context) error { func (p *MySQLPrivilege) loadTable(sctx sessionctx.Context, sql string, decodeTableRow func(chunk.Row, []*ast.ResultField) error) error { ctx := context.Background() - fmt.Println(" sql ==", sql) rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql) if err != nil { return errors.Trace(err) diff --git a/session/session.go b/session/session.go index 5a41ef80cae25..e6b5997b78c4d 100644 --- a/session/session.go +++ b/session/session.go @@ -2882,9 +2882,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } if !config.GetGlobalConfig().Security.SkipGrantTable { - fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") err = dom.LoadPrivilegeLoop(ses[3]) - fmt.Println("after !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") if err != nil { return nil, err } diff --git a/statistics/handle/update_test.go b/statistics/handle/update_test.go index cfbcb55689766..0218e99dcce5a 100644 --- a/statistics/handle/update_test.go +++ b/statistics/handle/update_test.go @@ -1909,7 +1909,6 @@ func TestUnsignedFeedbackRanges(t *testing.T) { for _, test := range tests { table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr(test.tblName)) require.NoError(t, err) - fmt.Println(" sql ===", test.sql) testKit.MustQuery(test.sql) require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) require.NoError(t, h.DumpStatsFeedbackToKV()) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 5428eb378049c..19df570d6cdce 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -82,22 +82,16 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa logutil.BgLogger().Debug("send batch requests") return c.sendBatch(ctx, req, vars, option) } - failpoint.Inject("DisablePaging", func(_ failpoint.Value) { req.Paging = false }) - - fmt.Println("Send() ... kv ranges ==", req.KeyRanges) - if req.StoreType == kv.TiDB { - // TiDB coprocessor doesn't support paging + // coprocessor on TiDB doesn't support paging req.Paging = false - fmt.Println("set the xxxxxxx to false ... because store type ks tidb!!!!!") } if req.Tp != kv.ReqTypeDAG { // coprocessor request but type is not DAG req.Paging = false - fmt.Println("set the xxxxxxx to false ... because req type not dag!!!!!") } if req.Streaming && req.Paging { return copErrorResponse{errors.New("streaming and paging are both on")} @@ -109,11 +103,6 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa if err != nil { return copErrorResponse{err} } - - for _, t := range tasks { - fmt.Println("tasks ====", t.ranges) - } - it := &copIterator{ store: c.store, req: req, @@ -943,12 +932,10 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti return nil, errors.New("lastRange in paging should not be nil") } // calculate next ranges and grow the paging size - fmt.Println("handle cop paging result ... paging range ==", pagingRange) task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc) if task.ranges.Len() == 0 { return nil, nil } - fmt.Println("calculate remain ranges ==== ", task.ranges) task.pagingSize = paging.GrowPagingSize(task.pagingSize) return []*copTask{task}, nil } @@ -1176,7 +1163,6 @@ func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *copro left, _ := ranges.Split(split.Start) return left } - fmt.Println("calculate remain ...", split.End) _, right := ranges.Split(split.End) return right } diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index f0d9317b82d45..71c1f0ffea3e0 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -119,6 +119,7 @@ func createGCWorkerSuiteWithStoreType(t *testing.T, storeType mockstore.StoreTyp return c }), } + s.oracle = &oracles.MockOracle{} s.store, s.dom, clean = testkit.CreateMockStoreWithOracle(t, s.oracle, opts...) s.tikvStore = s.store.(tikv.Storage) diff --git a/store/mockstore/mockcopr/executor_test.go b/store/mockstore/mockcopr/executor_test.go index 7dc269c80acf7..3572c51b4e88c 100644 --- a/store/mockstore/mockcopr/executor_test.go +++ b/store/mockstore/mockcopr/executor_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index ea239374fd66a..3351f01f71888 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -185,9 +185,8 @@ func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest, pagingS return nil, nil, nil, nil, nil, err } chunks, err := mppExecute(exec, dagCtx, dagReq, pagingSize) - fmt.Println("paging range result ==", lastRange, dagCtx.keyRanges) if lastRange != nil && len(lastRange.Start) == 0 && len(lastRange.End) == 0 { - fmt.Println("what the fuck???") + // When should this happen, something is wrong? lastRange = nil } return exec, chunks, lastRange, counts, ndvs, err @@ -220,10 +219,8 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest, pagin case tipb.EncodeType_TypeChunk: chunks = useChunkEncoding(chk, dagReq, fields, chunks) if pagingSize > 0 { - fmt.Println("paging size ==", pagingSize) totalRows += uint64(chk.NumRows()) if totalRows > pagingSize { - fmt.Println("break ... total rows ==", totalRows) break } } diff --git a/store/mockstore/unistore/cophandler/mpp_exec.go b/store/mockstore/unistore/cophandler/mpp_exec.go index fceb877b146bd..eb105fdbd0883 100644 --- a/store/mockstore/unistore/cophandler/mpp_exec.go +++ b/store/mockstore/unistore/cophandler/mpp_exec.go @@ -17,7 +17,6 @@ package cophandler import ( "bytes" "encoding/binary" - "fmt" "io" "math" "sort" @@ -136,7 +135,6 @@ type tableScanExec struct { func (e *tableScanExec) SkipValue() bool { return false } func (e *tableScanExec) Process(key, value []byte) error { - // fmt.Println("process ... last key ==", kv.Key(key)) handle, err := tablecodec.DecodeRowKey(key) if err != nil { return errors.Trace(err) @@ -156,7 +154,6 @@ func (e *tableScanExec) Process(key, value []byte) error { select { case e.result <- scanResult{chk: e.chk, lastProcessedKey: kv.Key(key), err: nil}: e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize) - // fmt.Println("run here!!!!") case <-e.done: return dbreader.ErrScanBreak } @@ -220,8 +217,6 @@ func (e *tableScanExec) open() error { func (e *tableScanExec) next() (*chunk.Chunk, error) { result := <-e.result - - fmt.Println("run in table scan next()", e.paging) // Update the range for coprocessor paging protocol. if e.paging != nil && result.err == nil { if e.desc { diff --git a/testkit/mockstore.go b/testkit/mockstore.go index 3b8308fad1694..9ab3493c3a70c 100644 --- a/testkit/mockstore.go +++ b/testkit/mockstore.go @@ -17,7 +17,6 @@ package testkit import ( - "fmt" "testing" "time" @@ -47,9 +46,7 @@ func CreateMockStoreAndDomain(t testing.TB, opts ...mockstore.MockTiKVStoreOptio func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) (*domain.Domain, func()) { session.SetSchemaLease(lease) session.DisableStats4Test() - fmt.Println("before bootstrap session.....===========") dom, err := session.BootstrapSession(store) - fmt.Println("after bootstrap session.....===========") require.NoError(t, err) dom.SetStatsUpdating(true) From 49212402686a54f502f6ccb4ed6da6208bf1bc43 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 9 Jun 2022 15:20:18 +0800 Subject: [PATCH 5/6] reset the paging default value to disabled --- sessionctx/variable/sysvar.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 423eb40851a31..84f6ca8a2bbdb 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1502,7 +1502,7 @@ var defaultSysVars = []*SysVar{ s.RegardNULLAsPoint = TiDBOptOn(val) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePaging, Value: On, Type: TypeBool, Hidden: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePaging, Value: Off, Type: TypeBool, Hidden: true, SetSession: func(s *SessionVars, val string) error { s.EnablePaging = TiDBOptOn(val) return nil }}, From 2291f310f858d4890e62d58b953b0b58d9931be5 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 9 Jun 2022 15:37:59 +0800 Subject: [PATCH 6/6] make golint happy --- store/mockstore/unistore/cophandler/mpp_exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/mockstore/unistore/cophandler/mpp_exec.go b/store/mockstore/unistore/cophandler/mpp_exec.go index eb105fdbd0883..941e9996f5acc 100644 --- a/store/mockstore/unistore/cophandler/mpp_exec.go +++ b/store/mockstore/unistore/cophandler/mpp_exec.go @@ -177,7 +177,7 @@ func (e *tableScanExec) open() error { } } e.chk = chunk.NewChunkWithCapacity(e.fieldTypes, DefaultBatchSize) - e.result = make(chan scanResult, 0) + e.result = make(chan scanResult, 1) e.done = make(chan struct{}) e.wg.Run(func() { // close the channel when done scanning, so that next() will got nil chunk