diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index 2bd7c5df65c04..e5d1e23f437a0 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -266,6 +266,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
 	builder.SetResourceGroupTagger(sv.StmtCtx.GetResourceGroupTagger())
 	if sv.EnablePaging {
 		builder.SetPaging(true)
+		builder.Request.MinPagingSize = uint64(sv.MinPagingSize)
 	}
 	builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
 	builder.RequestSource.RequestSourceType = sv.RequestSourceType
diff --git a/executor/distsql_test.go b/executor/distsql_test.go
index 71156d890a9a9..32cb9818e2452 100644
--- a/executor/distsql_test.go
+++ b/executor/distsql_test.go
@@ -19,7 +19,9 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"regexp"
 	"runtime/pprof"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -34,6 +36,7 @@ import (
 	"github.com/pingcap/tidb/testkit"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/mock"
+	"github.com/pingcap/tidb/util/paging"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/testutils"
 )
@@ -418,3 +421,61 @@ func TestPartitionTableIndexJoinIndexLookUp(t *testing.T) {
 		tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.* from t t1, t t2 use index(a) where t1.a=t2.b and " + cond).Sort().Check(result)
 	}
 }
+
+// TestCoprocessorPagingSize checks that tidb_min_paging_size changes the number of
+// coprocessor RPCs ('rpc_num') reported by EXPLAIN ANALYZE for a table scan.
+func TestCoprocessorPagingSize(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("use test")
+	tk.MustExec("create table t_paging (a int, b int, key(a), key(b))")
+	nRows := 512
+	values := make([]string, 0, nRows)
+	for i := 0; i < nRows; i++ {
+		values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(nRows), rand.Intn(nRows)))
+	}
+	tk.MustExec(fmt.Sprintf("insert into t_paging values %v", strings.Join(values, ", ")))
+	tk.MustQuery("select @@tidb_min_paging_size").Check(testkit.Rows(strconv.FormatUint(paging.MinPagingSize, 10)))
+
+	// When the min paging size is small, more RPC round trips are needed.
+	// Check 'rpc_num' in the execution information, for example (output abridged):
+	//
+	// mysql> explain analyze select * from t_paging;
+	// +--------------------+-----------+--------------------------------------------------------------+
+	// | id                 | task      | execution info                                               |
+	// +--------------------+-----------+--------------------------------------------------------------+
+	// | TableReader_5      | root      | cop_task: {num: 10, ..., rpc_num: 10, rpc_time: 6.69ms, ...} |
+	// | └─TableFullScan_4  | cop[tikv] | tikv_task:{proc max:1.48ms, ..., iters:0, tasks:10}          |
+	// +--------------------+-----------+--------------------------------------------------------------+
+	// 2 rows in set (0.01 sec)
+
+	getRPCNumFromExplain := func(rows [][]interface{}) uint64 {
+		re := regexp.MustCompile("rpc_num: ([0-9]+)")
+		for _, row := range rows {
+			// Format the whole row as text and take the first 'rpc_num' counter found in it.
+			if matched := re.FindStringSubmatch(fmt.Sprintf("%v", row)); matched != nil {
+				require.Len(t, matched, 2)
+				c, err := strconv.ParseUint(matched[1], 10, 64)
+				require.NoError(t, err)
+				return c
+			}
+		}
+		require.FailNow(t, "rpc_num not found in the explain analyze output")
+		return 0
+	}
+
+	// This is required here because only the chunk encoding collects the execution information and contains 'rpc_num'.
+	tk.MustExec("set @@tidb_enable_chunk_rpc = on")
+
+	tk.MustExec("set @@tidb_min_paging_size = 1")
+	rows := tk.MustQuery("explain analyze select * from t_paging").Rows()
+	rpcNum := getRPCNumFromExplain(rows)
+	require.Greater(t, rpcNum, uint64(2))
+
+	tk.MustExec("set @@tidb_min_paging_size = 1000")
+	rows = tk.MustQuery("explain analyze select * from t_paging").Rows()
+	rpcNum = getRPCNumFromExplain(rows)
+	require.Equal(t, uint64(1), rpcNum)
+}
diff --git a/kv/kv.go b/kv/kv.go
index d65d6498c12d1..5481d57ea0696 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -368,6 +368,8 @@ type Request struct {
 	ResourceGroupTagger tikvrpc.ResourceGroupTagger
 	// Paging indicates whether the request is a paging request.
 	Paging bool
+	// MinPagingSize is used when Paging is true.
+	MinPagingSize uint64
 	// RequestSource indicates whether the request is an internal request.
 	RequestSource util.RequestSource
 }
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index add261a90d449..54eb0a55de423 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -1433,6 +1433,7 @@ func NewSessionVars() *SessionVars {
 		IndexLookupSize: DefIndexLookupSize,
 		InitChunkSize: DefInitChunkSize,
 		MaxChunkSize: DefMaxChunkSize,
+		MinPagingSize: DefMinPagingSize,
 	}
 	vars.DMLBatchSize = DefDMLBatchSize
 	vars.AllowBatchCop = DefTiDBAllowBatchCop
@@ -2173,6 +2174,9 @@ type BatchSize struct {
 
 	// MaxChunkSize defines max row count of a Chunk during query execution.
 	MaxChunkSize int
+
+	// MinPagingSize defines the min size used by the coprocessor paging protocol.
+	MinPagingSize int
 }
 
 const (
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index dba7781dfbe1f..fe83218698719 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -35,6 +35,7 @@ import (
 	"github.com/pingcap/tidb/util/collate"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/mathutil"
+	"github.com/pingcap/tidb/util/paging"
 	"github.com/pingcap/tidb/util/stmtsummary"
 	"github.com/pingcap/tidb/util/tikvutil"
 	"github.com/pingcap/tidb/util/tls"
@@ -1667,6 +1668,10 @@ var defaultSysVars = []*SysVar{
 		metrics.ToggleSimplifiedMode(TiDBOptOn(s))
 		return nil
 	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBMinPagingSize, Value: strconv.Itoa(DefMinPagingSize), Type: TypeUnsigned, MinValue: 1, MaxValue: paging.MaxPagingSize, SetSession: func(s *SessionVars, val string) error {
+		s.MinPagingSize = tidbOptPositiveInt32(val, DefMinPagingSize)
+		return nil
+	}},
 	{Scope: ScopeSession, Name: TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
 		s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
 		return nil
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 54b49f840f494..ff3cfdb41dd4a 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -19,6 +19,7 @@ import (
 
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/util/paging"
 	"go.uber.org/atomic"
 )
 
@@ -363,6 +364,9 @@ const (
 	// TiDBInitChunkSize is used to control the init chunk size during query execution.
 	TiDBInitChunkSize = "tidb_init_chunk_size"
 
+	// TiDBMinPagingSize is used to control the min paging size in the coprocessor paging protocol.
+	TiDBMinPagingSize = "tidb_min_paging_size"
+
 	// TiDBEnableCascadesPlanner is used to control whether to enable the cascades planner.
 	TiDBEnableCascadesPlanner = "tidb_enable_cascades_planner"
 
@@ -818,6 +822,7 @@ const (
 	DefBatchCommit = false
 	DefCurretTS = 0
 	DefInitChunkSize = 32
+	DefMinPagingSize = int(paging.MinPagingSize)
 	DefMaxChunkSize = 1024
 	DefDMLBatchSize = 0
 	DefMaxPreparedStmtCount = -1
diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go
index 4641a8c2f1e0d..49362e0fc3969 100644
--- a/sessionctx/variable/varsutil_test.go
+++ b/sessionctx/variable/varsutil_test.go
@@ -432,6 +432,14 @@ func TestVarsutil(t *testing.T) {
 	err = SetSessionSystemVar(v, TiDBTableCacheLease, "123")
 	require.Error(t, err)
 	require.Regexp(t, "'tidb_table_cache_lease' is a GLOBAL variable and should be set with SET GLOBAL", err.Error())
+
+	val, err = GetSessionOrGlobalSystemVar(v, TiDBMinPagingSize)
+	require.NoError(t, err)
+	require.Equal(t, strconv.Itoa(DefMinPagingSize), val)
+
+	err = SetSessionSystemVar(v, TiDBMinPagingSize, "123")
+	require.NoError(t, err)
+	require.Equal(t, 123, v.MinPagingSize)
 }
 
 func TestValidate(t *testing.T) {
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index 29cc437b182e6..e7529fe6a5201 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -211,7 +211,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 			// the size will grow every round.
 			pagingSize := uint64(0)
 			if req.Paging {
-				pagingSize = paging.MinPagingSize
+				pagingSize = req.MinPagingSize
 			}
 			tasks = append(tasks, &copTask{
 				region: loc.Location.Region,
@@ -868,6 +868,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
 		// So we finish here.
 		return nil, nil
 	}
+
 	// calculate next ranges and grow the paging size
 	task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)
 	if task.ranges.Len() == 0 {
diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go
index 7f2efa0e2db71..c32ce383374d1 100644
--- a/store/copr/coprocessor_test.go
+++ b/store/copr/coprocessor_test.go
@@ -509,6 +509,7 @@ func TestBuildPagingTasks(t *testing.T) {
 
 	req := &kv.Request{}
 	req.Paging = true
+	req.MinPagingSize = paging.MinPagingSize
 	flashReq := &kv.Request{}
 	flashReq.StoreType = kv.TiFlash
 	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)