Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[store/tikv] support batch coprocessor for TiFlash (#16030) #16226

Merged
merged 2 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ func (builder *RequestBuilder) SetStoreType(storeType kv.StoreType) *RequestBuil
return builder
}

// SetAllowBatchCop sets `BatchCop` property.
func (builder *RequestBuilder) SetAllowBatchCop(batchCop bool) *RequestBuilder {
builder.Request.BatchCop = batchCop
return builder
}

func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel {
switch builder.Tp {
case kv.ReqTypeAnalyze:
Expand Down
21 changes: 21 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,26 @@ func containsLimit(execs []*tipb.Executor) bool {
return false
}

// When allow batch cop is 1, only agg / topN uses batch cop.
// When allow batch cop is 2, every query uses batch cop.
func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) {
if e.storeType != kv.TiFlash || e.keepOrder {
return
}
switch e.ctx.GetSessionVars().AllowBatchCop {
case 1:
for _, p := range v.TablePlans {
switch p.(type) {
case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN:
e.batchCop = true
}
}
case 2:
e.batchCop = true
}
return
}

func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) {
dagReq, streaming, err := b.constructDAGReq(v.TablePlans)
if err != nil {
Expand Down Expand Up @@ -2235,6 +2255,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
plans: v.TablePlans,
storeType: v.StoreType,
}
e.setBatchCop(v)
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand Down
3 changes: 3 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type TableReaderExecutor struct {
virtualColumnIndex []int
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType
// batchCop indicates whether use super batch coprocessor request, only works for TiFlash engine.
batchCop bool
}

// Open initialzes necessary variables for using this executor.
Expand Down Expand Up @@ -201,6 +203,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop).
Build()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904
github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200407074807-436f1c8c4cff
github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904 h1:pMFUXvhJ62hX8m0Q4RsL7L+hSW1mAMG26So5eFMoAtI=
github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef h1:t+bOucRUlIlzW+6S32qG8ufu4iC8F8LEld4Rdhhp1Aw=
github.com/pingcap/kvproto v0.0.0-20200409034505-a5af800ca2ef/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ type Request struct {
Cacheable bool
// SchemaVer is for any schema-ful storage to validate schema correctness if necessary.
SchemaVar int64
// BatchCop indicates whether send batch coprocessor request to tiflash.
BatchCop bool
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1944,6 +1944,7 @@ var builtinGlobalVariable = []string{
variable.TiDBEnableNoopFuncs,
variable.TiDBEnableIndexMerge,
variable.TiDBTxnMode,
variable.TiDBAllowBatchCop,
variable.TiDBRowFormatVersion,
variable.TiDBEnableStmtSummary,
variable.TiDBStmtSummaryInternalQuery,
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ type SessionVars struct {
// This variable is currently not recommended to be turned on.
AllowWriteRowID bool

// AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
// If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
AllowBatchCop int

// CorrelationThreshold is the guard to enable row count estimation using column order correlation.
CorrelationThreshold float64

Expand Down Expand Up @@ -721,6 +725,8 @@ func NewSessionVars() *SessionVars {
}
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

vars.AllowBatchCop = DefTiDBAllowBatchCop

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
enableChunkRPC = "1"
Expand Down Expand Up @@ -1080,6 +1086,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.IndexLookupJoinConcurrency = tidbOptPositiveInt32(val, DefIndexLookupJoinConcurrency)
case TiDBIndexJoinBatchSize:
s.IndexJoinBatchSize = tidbOptPositiveInt32(val, DefIndexJoinBatchSize)
case TiDBAllowBatchCop:
s.AllowBatchCop = int(tidbOptInt64(val, DefTiDBAllowBatchCop))
case TiDBIndexLookupSize:
s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize)
case TiDBHashJoinConcurrency:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
{ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)},
{ScopeGlobal | ScopeSession, TiDBAllowBatchCop, strconv.Itoa(DefTiDBAllowBatchCop)},
{ScopeGlobal | ScopeSession, TiDBInitChunkSize, strconv.Itoa(DefInitChunkSize)},
{ScopeGlobal | ScopeSession, TiDBEnableCascadesPlanner, "0"},
{ScopeGlobal | ScopeSession, TiDBEnableIndexMerge, "0"},
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ const (
// TiDBMaxChunkSize is used to control the max chunk size during query execution.
TiDBMaxChunkSize = "tidb_max_chunk_size"

// TiDBAllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
// If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
TiDBAllowBatchCop = "tidb_allow_batch_cop"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
TiDBInitChunkSize = "tidb_init_chunk_size"

Expand Down Expand Up @@ -442,6 +446,7 @@ const (
DefTiDBHashJoinConcurrency = 5
DefTiDBProjectionConcurrency = 4
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBAllowBatchCop = 1
DefTiDBTxnMode = ""
DefTiDBRowFormatV1 = 1
DefTiDBRowFormatV2 = 2
Expand Down
9 changes: 9 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,15 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return value, nil
case TiDBAllowBatchCop:
v, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
}
if v < 0 || v > 2 {
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return value, nil
case TiDBOptCPUFactor,
TiDBOptCopCPUFactor,
TiDBOptNetworkFactor,
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.IndexSerialScanConcurrency, Equals, DefIndexSerialScanConcurrency)
c.Assert(vars.IndexLookupJoinConcurrency, Equals, DefIndexLookupJoinConcurrency)
c.Assert(vars.HashJoinConcurrency, Equals, DefTiDBHashJoinConcurrency)
c.Assert(vars.AllowBatchCop, Equals, DefTiDBAllowBatchCop)
c.Assert(vars.ProjectionConcurrency, Equals, int64(DefTiDBProjectionConcurrency))
c.Assert(vars.HashAggPartialConcurrency, Equals, DefTiDBHashAggPartialConcurrency)
c.Assert(vars.HashAggFinalConcurrency, Equals, DefTiDBHashAggFinalConcurrency)
Expand Down
Loading