diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index 2a13c5569d16b..a305c56ae2990 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -2699,11 +2699,17 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown( PartitionName: task.PartitionName, SampleRateReason: sampleRateReason, } - + var concurrency int + if b.ctx.GetSessionVars().InRestrictedSQL { + // In restricted SQL, we use the default value of DistSQLScanConcurrency. it is copied from tidb_sysproc_scan_concurrency. + concurrency = b.ctx.GetSessionVars().DistSQLScanConcurrency() + } else { + concurrency = b.ctx.GetSessionVars().AnalyzeDistSQLScanConcurrency() + } base := baseAnalyzeExec{ ctx: b.ctx, tableID: task.TableID, - concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency(), + concurrency: concurrency, analyzePB: &tipb.AnalyzeReq{ Tp: tipb.AnalyzeType_TypeFullSampling, Flags: sc.PushDownFlags(), @@ -2833,11 +2839,17 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown( failpoint.Inject("injectAnalyzeSnapshot", func(val failpoint.Value) { startTS = uint64(val.(int)) }) - + var concurrency int + if b.ctx.GetSessionVars().InRestrictedSQL { + // In restricted SQL, we use the default value of DistSQLScanConcurrency. it is copied from tidb_sysproc_scan_concurrency. + concurrency = b.ctx.GetSessionVars().DistSQLScanConcurrency() + } else { + concurrency = b.ctx.GetSessionVars().AnalyzeDistSQLScanConcurrency() + } base := baseAnalyzeExec{ ctx: b.ctx, tableID: task.TableID, - concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency(), + concurrency: concurrency, analyzePB: &tipb.AnalyzeReq{ Tp: tipb.AnalyzeType_TypeColumn, Flags: sc.PushDownFlags(), diff --git a/pkg/executor/set_test.go b/pkg/executor/set_test.go index 98146d8eb686c..682a5a3b897d9 100644 --- a/pkg/executor/set_test.go +++ b/pkg/executor/set_test.go @@ -976,7 +976,19 @@ func TestValidateSetVar(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) - err := tk.ExecToErr("set global tidb_distsql_scan_concurrency='fff';") + err := tk.ExecToErr("set global tidb_analyze_distsql_scan_concurrency='fff';") + require.True(t, terror.ErrorEqual(err, variable.ErrWrongTypeForVar), fmt.Sprintf("err %v", err)) + + tk.MustExec("set global tidb_analyze_distsql_scan_concurrency=-2;") + tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_analyze_distsql_scan_concurrency value: '-2'")) + + err = tk.ExecToErr("set @@tidb_analyze_distsql_scan_concurrency='fff';") + require.True(t, terror.ErrorEqual(err, variable.ErrWrongTypeForVar), fmt.Sprintf("err %v", err)) + + tk.MustExec("set @@tidb_analyze_distsql_scan_concurrency=-2;") + tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_analyze_distsql_scan_concurrency value: '-2'")) + + err = tk.ExecToErr("set global tidb_distsql_scan_concurrency='fff';") require.True(t, terror.ErrorEqual(err, variable.ErrWrongTypeForVar), fmt.Sprintf("err %v", err)) tk.MustExec("set global tidb_distsql_scan_concurrency=-2;") diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index 670ab863c6fd9..eac7ef14c9f9b 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -2037,6 +2037,7 @@ func NewSessionVars(hctx HookContext) *SessionVars { hashJoinConcurrency: DefTiDBHashJoinConcurrency, projectionConcurrency: DefTiDBProjectionConcurrency, distSQLScanConcurrency: DefDistSQLScanConcurrency, + analyzeDistSQLScanConcurrency: DefAnalyzeDistSQLScanConcurrency, hashAggPartialConcurrency: DefTiDBHashAggPartialConcurrency, hashAggFinalConcurrency: DefTiDBHashAggFinalConcurrency, windowConcurrency: DefTiDBWindowConcurrency, @@ -2740,6 +2741,9 @@ type Concurrency struct { // distSQLScanConcurrency is the number of concurrent dist SQL scan worker. distSQLScanConcurrency int + // analyzeDistSQLScanConcurrency is the number of concurrent dist SQL scan worker when to analyze. + analyzeDistSQLScanConcurrency int + // hashJoinConcurrency is the number of concurrent hash join outer worker. // hashJoinConcurrency is deprecated, use ExecutorConcurrency instead. hashJoinConcurrency int @@ -2799,6 +2803,11 @@ func (c *Concurrency) SetDistSQLScanConcurrency(n int) { c.distSQLScanConcurrency = n } +// SetAnalyzeDistSQLScanConcurrency set the number of concurrent dist SQL scan worker when to analyze. +func (c *Concurrency) SetAnalyzeDistSQLScanConcurrency(n int) { + c.analyzeDistSQLScanConcurrency = n +} + // SetHashJoinConcurrency set the number of concurrent hash join outer worker. func (c *Concurrency) SetHashJoinConcurrency(n int) { c.hashJoinConcurrency = n @@ -2865,6 +2874,11 @@ func (c *Concurrency) DistSQLScanConcurrency() int { return c.distSQLScanConcurrency } +// AnalyzeDistSQLScanConcurrency return the number of concurrent dist SQL scan worker when to analyze. +func (c *Concurrency) AnalyzeDistSQLScanConcurrency() int { + return c.analyzeDistSQLScanConcurrency +} + // HashJoinConcurrency return the number of concurrent hash join outer worker. func (c *Concurrency) HashJoinConcurrency() int { if c.hashJoinConcurrency != ConcurrencyUnset { diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 84e8c358b7491..532cd0fdab3a6 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -1671,6 +1671,10 @@ var defaultSysVars = []*SysVar{ s.distSQLScanConcurrency = tidbOptPositiveInt32(val, DefDistSQLScanConcurrency) return nil }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzeDistSQLScanConcurrency, Value: strconv.Itoa(DefAnalyzeDistSQLScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error { + s.analyzeDistSQLScanConcurrency = tidbOptPositiveInt32(val, DefAnalyzeDistSQLScanConcurrency) + return nil + }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptInSubqToJoinAndAgg, Value: BoolToOnOff(DefOptInSubqToJoinAndAgg), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { s.SetAllowInSubqToJoinAndAgg(TiDBOptOn(val)) return nil diff --git a/pkg/sessionctx/variable/tidb_vars.go b/pkg/sessionctx/variable/tidb_vars.go index 276fe055c7c17..2ef9ba709befb 100644 --- a/pkg/sessionctx/variable/tidb_vars.go +++ b/pkg/sessionctx/variable/tidb_vars.go @@ -298,6 +298,9 @@ const ( // If the query has a LIMIT clause, high concurrency makes the system do much more work than needed. TiDBDistSQLScanConcurrency = "tidb_distsql_scan_concurrency" + // TiDBAnalyzeDistSQLScanConcurrency is used to set the concurrency of a distsql scan task for analyze statement. + TiDBAnalyzeDistSQLScanConcurrency = "tidb_analyze_distsql_scan_concurrency" + // TiDBOptInSubqToJoinAndAgg is used to enable/disable the optimizer rule of rewriting IN subquery. TiDBOptInSubqToJoinAndAgg = "tidb_opt_insubq_to_join_and_agg" @@ -1141,6 +1144,7 @@ const ( DefIndexJoinBatchSize = 25000 DefIndexLookupSize = 20000 DefDistSQLScanConcurrency = 15 + DefAnalyzeDistSQLScanConcurrency = 4 DefBuildStatsConcurrency = 2 DefBuildSamplingStatsConcurrency = 2 DefAutoAnalyzeRatio = 0.5