Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add tidb_min_paging_size system variable #36107

Merged
merged 7 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.SetResourceGroupTagger(sv.StmtCtx.GetResourceGroupTagger())
if sv.EnablePaging {
builder.SetPaging(true)
builder.Request.MinPagingSize = uint64(sv.MinPagingSize)
}
builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
builder.RequestSource.RequestSourceType = sv.RequestSourceType
Expand Down
59 changes: 59 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"context"
"fmt"
"math/rand"
"regexp"
"runtime/pprof"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/paging"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
)
Expand Down Expand Up @@ -418,3 +421,59 @@ func TestPartitionTableIndexJoinIndexLookUp(t *testing.T) {
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.* from t t1, t t2 use index(a) where t1.a=t2.b and " + cond).Sort().Check(result)
}
}

func TestCoprocessorPagingSize(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t_paging (a int, b int, key(a), key(b))")
nRows := 512
values := make([]string, 0, nRows)
for i := 0; i < nRows; i++ {
values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(nRows), rand.Intn(nRows)))
}
tk.MustExec(fmt.Sprintf("insert into t_paging values %v", strings.Join(values, ", ")))
tk.MustQuery("select @@tidb_min_paging_size").Check(testkit.Rows(strconv.FormatUint(paging.MinPagingSize, 10)))

// When the min paging size is small, we need more RPC roundtrip!
// Check 'rpc_num' in the execution information
//
// mysql> explain analyze select * from t_paging;
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, rpc_num: 10, rpc_time: 6.69ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// 2 rows in set (0.01 sec)

getRPCNumFromExplain := func(rows [][]interface{}) (res uint64) {
re := regexp.MustCompile("rpc_num: ([0-9]+)")
for _, row := range rows {
buf := bytes.NewBufferString("")
_, _ = fmt.Fprintf(buf, "%s\n", row)
if matched := re.FindStringSubmatch(buf.String()); matched != nil {
require.Equal(t, len(matched), 2)
c, err := strconv.ParseUint(matched[1], 10, 64)
require.NoError(t, err)
return c
}
}
return res
}

// This is required here because only the chunk encoding collect the execution information and contains 'rpc_num'.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

tk.MustExec("set @@tidb_min_paging_size = 1")
rows := tk.MustQuery("explain analyze select * from t_paging").Rows()
rpcNum := getRPCNumFromExplain(rows)
require.Greater(t, rpcNum, uint64(2))

tk.MustExec("set @@tidb_min_paging_size = 1000")
rows = tk.MustQuery("explain analyze select * from t_paging").Rows()
rpcNum = getRPCNumFromExplain(rows)
require.Equal(t, rpcNum, uint64(1))
}
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ type Request struct {
ResourceGroupTagger tikvrpc.ResourceGroupTagger
// Paging indicates whether the request is a paging request.
Paging bool
// MinPagingSize is used when Paging is true.
MinPagingSize uint64
// RequestSource indicates whether the request is an internal request.
RequestSource util.RequestSource
}
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,7 @@ func NewSessionVars() *SessionVars {
IndexLookupSize: DefIndexLookupSize,
InitChunkSize: DefInitChunkSize,
MaxChunkSize: DefMaxChunkSize,
MinPagingSize: DefMinPagingSize,
}
vars.DMLBatchSize = DefDMLBatchSize
vars.AllowBatchCop = DefTiDBAllowBatchCop
Expand Down Expand Up @@ -2173,6 +2174,9 @@ type BatchSize struct {

// MaxChunkSize defines max row count of a Chunk during query execution.
MaxChunkSize int

// MinPagingSize defines the min size used by the coprocessor paging protocol.
MinPagingSize int
}

const (
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/paging"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/tikvutil"
"github.com/pingcap/tidb/util/tls"
Expand Down Expand Up @@ -1667,6 +1668,10 @@ var defaultSysVars = []*SysVar{
metrics.ToggleSimplifiedMode(TiDBOptOn(s))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMinPagingSize, Value: strconv.Itoa(DefMinPagingSize), Type: TypeUnsigned, MinValue: 1, MaxValue: paging.MaxPagingSize, SetSession: func(s *SessionVars, val string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is only for testing, is it necessary to be a global variable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but I see some other variable use global | session, such as tidb_init_chunk_size, tidb_max_chunk_size

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can not come up with a scenario of set global, then no. TiDBInitChunkSize is historical.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the benchbot, we can test with this:

python3 main.py gen-benchbot-case --email "[email protected]" --token 'tcmsp_XXX' --testbed_size "2xl" \
--bench_type sysbench --bench_sub_types "select_random_ranges" \
--duration=30m --threads 100 \
--arch amd64 \
--versions nightly \
--tidb_urls "http://fileserver.pingcap.net/download/builds/pingcap/tidb-check/pr/80b30c51fb509e267d1fca9ed5122b70ded1a172/centos7/tidb-server.tar.gz" \
--testbed_size "2xl" \
--tidb_configs "{prepared-plan-cache: {enabled: true}}" \
--tidb_globals "set global tidb_enable_paging = 1; set global tidb_min_paging_size = 128;" \
--tidb_globals "set global tidb_enable_paging = 1; set global tidb_min_paging_size = 256;" \
--tidb_globals "set global tidb_enable_paging = 1; set global tidb_min_paging_size = 512;" \
--tidb_globals "set global tidb_enable_paging = 0;"

--tidb_globals requires global variables @xhebox

s.MinPagingSize = tidbOptPositiveInt32(val, DefMinPagingSize)
return nil
}},
{Scope: ScopeSession, Name: TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
return nil
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/paging"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -363,6 +364,9 @@ const (
// TiDBInitChunkSize is used to control the init chunk size during query execution.
TiDBInitChunkSize = "tidb_init_chunk_size"

// TiDBMinPagingSize is used to control the min paging size in the coprocessor paging protocol.
TiDBMinPagingSize = "tidb_min_paging_size"

// TiDBEnableCascadesPlanner is used to control whether to enable the cascades planner.
TiDBEnableCascadesPlanner = "tidb_enable_cascades_planner"

Expand Down Expand Up @@ -818,6 +822,7 @@ const (
DefBatchCommit = false
DefCurretTS = 0
DefInitChunkSize = 32
DefMinPagingSize = int(paging.MinPagingSize)
DefMaxChunkSize = 1024
DefDMLBatchSize = 0
DefMaxPreparedStmtCount = -1
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,14 @@ func TestVarsutil(t *testing.T) {
err = SetSessionSystemVar(v, TiDBTableCacheLease, "123")
require.Error(t, err)
require.Regexp(t, "'tidb_table_cache_lease' is a GLOBAL variable and should be set with SET GLOBAL", err.Error())

val, err = GetSessionOrGlobalSystemVar(v, TiDBMinPagingSize)
require.NoError(t, err)
require.Equal(t, strconv.Itoa(DefMinPagingSize), val)

err = SetSessionSystemVar(v, TiDBMinPagingSize, "123")
require.NoError(t, err)
require.Equal(t, v.MinPagingSize, 123)
}

func TestValidate(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
// the size will grow every round.
pagingSize := uint64(0)
if req.Paging {
pagingSize = paging.MinPagingSize
pagingSize = req.MinPagingSize
}
tasks = append(tasks, &copTask{
region: loc.Location.Region,
Expand Down Expand Up @@ -868,6 +868,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
// So we finish here.
return nil, nil
}

// calculate next ranges and grow the paging size
task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)
if task.ranges.Len() == 0 {
Expand Down
1 change: 1 addition & 0 deletions store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func TestBuildPagingTasks(t *testing.T) {

req := &kv.Request{}
req.Paging = true
req.MinPagingSize = paging.MinPagingSize
flashReq := &kv.Request{}
flashReq.StoreType = kv.TiFlash
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
Expand Down