Skip to content

Commit

Permalink
*: add tidb_analyze_distsql_scan_concurrency (#48829)
Browse files Browse the repository at this point in the history
close #48949
  • Loading branch information
hawkingrei authored Dec 5, 2023
1 parent 555048b commit 32cf9b9
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 5 deletions.
20 changes: 16 additions & 4 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2699,11 +2699,17 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(
PartitionName: task.PartitionName,
SampleRateReason: sampleRateReason,
}

var concurrency int
if b.ctx.GetSessionVars().InRestrictedSQL {
// In restricted SQL, we use the default value of DistSQLScanConcurrency. it is copied from tidb_sysproc_scan_concurrency.
concurrency = b.ctx.GetSessionVars().DistSQLScanConcurrency()
} else {
concurrency = b.ctx.GetSessionVars().AnalyzeDistSQLScanConcurrency()
}
base := baseAnalyzeExec{
ctx: b.ctx,
tableID: task.TableID,
concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency(),
concurrency: concurrency,
analyzePB: &tipb.AnalyzeReq{
Tp: tipb.AnalyzeType_TypeFullSampling,
Flags: sc.PushDownFlags(),
Expand Down Expand Up @@ -2833,11 +2839,17 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(
failpoint.Inject("injectAnalyzeSnapshot", func(val failpoint.Value) {
startTS = uint64(val.(int))
})

var concurrency int
if b.ctx.GetSessionVars().InRestrictedSQL {
// In restricted SQL, we use the default value of DistSQLScanConcurrency. it is copied from tidb_sysproc_scan_concurrency.
concurrency = b.ctx.GetSessionVars().DistSQLScanConcurrency()
} else {
concurrency = b.ctx.GetSessionVars().AnalyzeDistSQLScanConcurrency()
}
base := baseAnalyzeExec{
ctx: b.ctx,
tableID: task.TableID,
concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency(),
concurrency: concurrency,
analyzePB: &tipb.AnalyzeReq{
Tp: tipb.AnalyzeType_TypeColumn,
Flags: sc.PushDownFlags(),
Expand Down
14 changes: 13 additions & 1 deletion pkg/executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,19 @@ func TestValidateSetVar(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

err := tk.ExecToErr("set global tidb_distsql_scan_concurrency='fff';")
err := tk.ExecToErr("set global tidb_analyze_distsql_scan_concurrency='fff';")
require.True(t, terror.ErrorEqual(err, variable.ErrWrongTypeForVar), fmt.Sprintf("err %v", err))

tk.MustExec("set global tidb_analyze_distsql_scan_concurrency=-2;")
tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_analyze_distsql_scan_concurrency value: '-2'"))

err = tk.ExecToErr("set @@tidb_analyze_distsql_scan_concurrency='fff';")
require.True(t, terror.ErrorEqual(err, variable.ErrWrongTypeForVar), fmt.Sprintf("err %v", err))

tk.MustExec("set @@tidb_analyze_distsql_scan_concurrency=-2;")
tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_analyze_distsql_scan_concurrency value: '-2'"))

err = tk.ExecToErr("set global tidb_distsql_scan_concurrency='fff';")
require.True(t, terror.ErrorEqual(err, variable.ErrWrongTypeForVar), fmt.Sprintf("err %v", err))

tk.MustExec("set global tidb_distsql_scan_concurrency=-2;")
Expand Down
14 changes: 14 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2037,6 +2037,7 @@ func NewSessionVars(hctx HookContext) *SessionVars {
hashJoinConcurrency: DefTiDBHashJoinConcurrency,
projectionConcurrency: DefTiDBProjectionConcurrency,
distSQLScanConcurrency: DefDistSQLScanConcurrency,
analyzeDistSQLScanConcurrency: DefAnalyzeDistSQLScanConcurrency,
hashAggPartialConcurrency: DefTiDBHashAggPartialConcurrency,
hashAggFinalConcurrency: DefTiDBHashAggFinalConcurrency,
windowConcurrency: DefTiDBWindowConcurrency,
Expand Down Expand Up @@ -2740,6 +2741,9 @@ type Concurrency struct {
// distSQLScanConcurrency is the number of concurrent dist SQL scan worker.
distSQLScanConcurrency int

// analyzeDistSQLScanConcurrency is the number of concurrent dist SQL scan worker when to analyze.
analyzeDistSQLScanConcurrency int

// hashJoinConcurrency is the number of concurrent hash join outer worker.
// hashJoinConcurrency is deprecated, use ExecutorConcurrency instead.
hashJoinConcurrency int
Expand Down Expand Up @@ -2799,6 +2803,11 @@ func (c *Concurrency) SetDistSQLScanConcurrency(n int) {
c.distSQLScanConcurrency = n
}

// SetAnalyzeDistSQLScanConcurrency set the number of concurrent dist SQL scan worker when to analyze.
func (c *Concurrency) SetAnalyzeDistSQLScanConcurrency(n int) {
c.analyzeDistSQLScanConcurrency = n
}

// SetHashJoinConcurrency set the number of concurrent hash join outer worker.
func (c *Concurrency) SetHashJoinConcurrency(n int) {
c.hashJoinConcurrency = n
Expand Down Expand Up @@ -2865,6 +2874,11 @@ func (c *Concurrency) DistSQLScanConcurrency() int {
return c.distSQLScanConcurrency
}

// AnalyzeDistSQLScanConcurrency return the number of concurrent dist SQL scan worker when to analyze.
func (c *Concurrency) AnalyzeDistSQLScanConcurrency() int {
return c.analyzeDistSQLScanConcurrency
}

// HashJoinConcurrency return the number of concurrent hash join outer worker.
func (c *Concurrency) HashJoinConcurrency() int {
if c.hashJoinConcurrency != ConcurrencyUnset {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,10 @@ var defaultSysVars = []*SysVar{
s.distSQLScanConcurrency = tidbOptPositiveInt32(val, DefDistSQLScanConcurrency)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzeDistSQLScanConcurrency, Value: strconv.Itoa(DefAnalyzeDistSQLScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error {
s.analyzeDistSQLScanConcurrency = tidbOptPositiveInt32(val, DefAnalyzeDistSQLScanConcurrency)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptInSubqToJoinAndAgg, Value: BoolToOnOff(DefOptInSubqToJoinAndAgg), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.SetAllowInSubqToJoinAndAgg(TiDBOptOn(val))
return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ const (
// If the query has a LIMIT clause, high concurrency makes the system do much more work than needed.
TiDBDistSQLScanConcurrency = "tidb_distsql_scan_concurrency"

// TiDBAnalyzeDistSQLScanConcurrency is used to set the concurrency of a distsql scan task for analyze statement.
TiDBAnalyzeDistSQLScanConcurrency = "tidb_analyze_distsql_scan_concurrency"

// TiDBOptInSubqToJoinAndAgg is used to enable/disable the optimizer rule of rewriting IN subquery.
TiDBOptInSubqToJoinAndAgg = "tidb_opt_insubq_to_join_and_agg"

Expand Down Expand Up @@ -1141,6 +1144,7 @@ const (
DefIndexJoinBatchSize = 25000
DefIndexLookupSize = 20000
DefDistSQLScanConcurrency = 15
DefAnalyzeDistSQLScanConcurrency = 4
DefBuildStatsConcurrency = 2
DefBuildSamplingStatsConcurrency = 2
DefAutoAnalyzeRatio = 0.5
Expand Down

0 comments on commit 32cf9b9

Please sign in to comment.