Skip to content

Commit

Permalink
Merge branch 'master' into fix-get-range
Browse files Browse the repository at this point in the history
  • Loading branch information
joccau authored Jul 13, 2022
2 parents a68aaeb + 0b427e1 commit 468765a
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 1 deletion.
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.SetResourceGroupTagger(sv.StmtCtx.GetResourceGroupTagger())
if sv.EnablePaging {
builder.SetPaging(true)
builder.Request.MinPagingSize = uint64(sv.MinPagingSize)
}
builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
builder.RequestSource.RequestSourceType = sv.RequestSourceType
Expand Down
59 changes: 59 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"context"
"fmt"
"math/rand"
"regexp"
"runtime/pprof"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/paging"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
)
Expand Down Expand Up @@ -418,3 +421,59 @@ func TestPartitionTableIndexJoinIndexLookUp(t *testing.T) {
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.* from t t1, t t2 use index(a) where t1.a=t2.b and " + cond).Sort().Check(result)
}
}

func TestCoprocessorPagingSize(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t_paging (a int, b int, key(a), key(b))")
nRows := 512
values := make([]string, 0, nRows)
for i := 0; i < nRows; i++ {
values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(nRows), rand.Intn(nRows)))
}
tk.MustExec(fmt.Sprintf("insert into t_paging values %v", strings.Join(values, ", ")))
tk.MustQuery("select @@tidb_min_paging_size").Check(testkit.Rows(strconv.FormatUint(paging.MinPagingSize, 10)))

// When the min paging size is small, we need more RPC roundtrip!
// Check 'rpc_num' in the execution information
//
// mysql> explain analyze select * from t_paging;
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, rpc_num: 10, rpc_time: 6.69ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// 2 rows in set (0.01 sec)

getRPCNumFromExplain := func(rows [][]interface{}) (res uint64) {
re := regexp.MustCompile("rpc_num: ([0-9]+)")
for _, row := range rows {
buf := bytes.NewBufferString("")
_, _ = fmt.Fprintf(buf, "%s\n", row)
if matched := re.FindStringSubmatch(buf.String()); matched != nil {
require.Equal(t, len(matched), 2)
c, err := strconv.ParseUint(matched[1], 10, 64)
require.NoError(t, err)
return c
}
}
return res
}

// This is required here because only the chunk encoding collect the execution information and contains 'rpc_num'.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

tk.MustExec("set @@tidb_min_paging_size = 1")
rows := tk.MustQuery("explain analyze select * from t_paging").Rows()
rpcNum := getRPCNumFromExplain(rows)
require.Greater(t, rpcNum, uint64(2))

tk.MustExec("set @@tidb_min_paging_size = 1000")
rows = tk.MustQuery("explain analyze select * from t_paging").Rows()
rpcNum = getRPCNumFromExplain(rows)
require.Equal(t, rpcNum, uint64(1))
}
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ type Request struct {
ResourceGroupTagger tikvrpc.ResourceGroupTagger
// Paging indicates whether the request is a paging request.
Paging bool
// MinPagingSize is used when Paging is true.
MinPagingSize uint64
// RequestSource indicates whether the request is an internal request.
RequestSource util.RequestSource
}
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,7 @@ func NewSessionVars() *SessionVars {
IndexLookupSize: DefIndexLookupSize,
InitChunkSize: DefInitChunkSize,
MaxChunkSize: DefMaxChunkSize,
MinPagingSize: DefMinPagingSize,
}
vars.DMLBatchSize = DefDMLBatchSize
vars.AllowBatchCop = DefTiDBAllowBatchCop
Expand Down Expand Up @@ -2173,6 +2174,9 @@ type BatchSize struct {

// MaxChunkSize defines max row count of a Chunk during query execution.
MaxChunkSize int

// MinPagingSize defines the min size used by the coprocessor paging protocol.
MinPagingSize int
}

const (
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/paging"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/tikvutil"
"github.com/pingcap/tidb/util/tls"
Expand Down Expand Up @@ -1667,6 +1668,10 @@ var defaultSysVars = []*SysVar{
metrics.ToggleSimplifiedMode(TiDBOptOn(s))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMinPagingSize, Value: strconv.Itoa(DefMinPagingSize), Type: TypeUnsigned, MinValue: 1, MaxValue: paging.MaxPagingSize, SetSession: func(s *SessionVars, val string) error {
s.MinPagingSize = tidbOptPositiveInt32(val, DefMinPagingSize)
return nil
}},
{Scope: ScopeSession, Name: TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
return nil
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/paging"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -363,6 +364,9 @@ const (
// TiDBInitChunkSize is used to control the init chunk size during query execution.
TiDBInitChunkSize = "tidb_init_chunk_size"

// TiDBMinPagingSize is used to control the min paging size in the coprocessor paging protocol.
TiDBMinPagingSize = "tidb_min_paging_size"

// TiDBEnableCascadesPlanner is used to control whether to enable the cascades planner.
TiDBEnableCascadesPlanner = "tidb_enable_cascades_planner"

Expand Down Expand Up @@ -818,6 +822,7 @@ const (
DefBatchCommit = false
DefCurretTS = 0
DefInitChunkSize = 32
DefMinPagingSize = int(paging.MinPagingSize)
DefMaxChunkSize = 1024
DefDMLBatchSize = 0
DefMaxPreparedStmtCount = -1
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,14 @@ func TestVarsutil(t *testing.T) {
err = SetSessionSystemVar(v, TiDBTableCacheLease, "123")
require.Error(t, err)
require.Regexp(t, "'tidb_table_cache_lease' is a GLOBAL variable and should be set with SET GLOBAL", err.Error())

val, err = GetSessionOrGlobalSystemVar(v, TiDBMinPagingSize)
require.NoError(t, err)
require.Equal(t, strconv.Itoa(DefMinPagingSize), val)

err = SetSessionSystemVar(v, TiDBMinPagingSize, "123")
require.NoError(t, err)
require.Equal(t, v.MinPagingSize, 123)
}

func TestValidate(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
// the size will grow every round.
pagingSize := uint64(0)
if req.Paging {
pagingSize = paging.MinPagingSize
pagingSize = req.MinPagingSize
}
tasks = append(tasks, &copTask{
region: loc.Location.Region,
Expand Down Expand Up @@ -868,6 +868,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
// So we finish here.
return nil, nil
}

// calculate next ranges and grow the paging size
task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)
if task.ranges.Len() == 0 {
Expand Down
1 change: 1 addition & 0 deletions store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func TestBuildPagingTasks(t *testing.T) {

req := &kv.Request{}
req.Paging = true
req.MinPagingSize = paging.MinPagingSize
flashReq := &kv.Request{}
flashReq.StoreType = kv.TiFlash
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
Expand Down

0 comments on commit 468765a

Please sign in to comment.