diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index 91b9f394e6fd5..f4564169cbc64 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -975,7 +975,8 @@ var defaultSysVars = []*SysVar{ return nil }, }, - {Scope: ScopeGlobal, Name: TiDBEnableAutoAnalyze, Value: BoolToOnOff(DefTiDBEnableAutoAnalyze), Type: TypeBool, + { + Scope: ScopeGlobal, Name: TiDBEnableAutoAnalyze, Value: BoolToOnOff(DefTiDBEnableAutoAnalyze), Type: TypeBool, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { return BoolToOnOff(RunAutoAnalyze.Load()), nil }, @@ -983,6 +984,15 @@ var defaultSysVars = []*SysVar{ RunAutoAnalyze.Store(TiDBOptOn(val)) return nil }, + }, { + Scope: ScopeGlobal, Name: TiDBEnableAutoAnalyzePriorityQueue, Value: BoolToOnOff(DefTiDBEnableAutoAnalyzePriorityQueue), Type: TypeBool, + GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { + return BoolToOnOff(EnableAutoAnalyzePriorityQueue.Load()), nil + }, + SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + EnableAutoAnalyzePriorityQueue.Store(TiDBOptOn(val)) + return nil + }, }, {Scope: ScopeGlobal, Name: TiDBGOGCTunerThreshold, Value: strconv.FormatFloat(DefTiDBGOGCTunerThreshold, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { diff --git a/pkg/sessionctx/variable/tidb_vars.go b/pkg/sessionctx/variable/tidb_vars.go index dc4462535b412..92f6edbd32589 100644 --- a/pkg/sessionctx/variable/tidb_vars.go +++ b/pkg/sessionctx/variable/tidb_vars.go @@ -995,6 +995,8 @@ const ( TiDBMemQuotaAnalyze = "tidb_mem_quota_analyze" // TiDBEnableAutoAnalyze determines whether TiDB executes automatic analysis. TiDBEnableAutoAnalyze = "tidb_enable_auto_analyze" + // TiDBEnableAutoAnalyzePriorityQueue determines whether TiDB executes automatic analysis with priority queue. + TiDBEnableAutoAnalyzePriorityQueue = "tidb_enable_auto_analyze_priority_queue" // TiDBMemOOMAction indicates what operation TiDB perform when a single SQL statement exceeds // the memory quota specified by tidb_mem_quota_query and cannot be spilled to disk. TiDBMemOOMAction = "tidb_mem_oom_action" @@ -1357,6 +1359,7 @@ const ( DefTiDBBatchDMLIgnoreError = false DefTiDBMemQuotaAnalyze = -1 DefTiDBEnableAutoAnalyze = true + DefTiDBEnableAutoAnalyzePriorityQueue = false DefTiDBMemOOMAction = "CANCEL" DefTiDBMaxAutoAnalyzeTime = 12 * 60 * 60 DefTiDBEnablePrepPlanCache = true @@ -1478,19 +1481,20 @@ const ( // Process global variables. 
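The new variable follows the same pattern as tidb_enable_auto_analyze above: a global atomic flag (added to the var block just below) that the GetGlobal/SetGlobal hooks load and store, so background workers can read it without a session. A minimal, self-contained sketch of that pattern — the names and the ON/1 parsing here are illustrative stand-ins, not code from this patch:

package main

import (
    "fmt"

    "go.uber.org/atomic"
)

// enablePriorityQueue mirrors variable.EnableAutoAnalyzePriorityQueue:
// a process-wide switch initialized from a compile-time default.
var enablePriorityQueue = atomic.NewBool(false)

// setGlobal and getGlobal stand in for the SetGlobal/GetGlobal hooks
// registered on the system variable.
func setGlobal(val string) { enablePriorityQueue.Store(val == "ON" || val == "1") }

func getGlobal() string {
    if enablePriorityQueue.Load() {
        return "ON"
    }
    return "OFF"
}

func main() {
    fmt.Println(getGlobal()) // OFF (the default)
    setGlobal("ON")          // roughly what `SET GLOBAL tidb_enable_auto_analyze_priority_queue=ON` ends up doing
    fmt.Println(getGlobal()) // ON
}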
var ( - ProcessGeneralLog = atomic.NewBool(false) - RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze) - GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays)) - QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen) - EnablePProfSQLCPU = atomic.NewBool(false) - EnableBatchDML = atomic.NewBool(false) - EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM) - ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount - ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize - ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency - ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit - ddlReorgRowFormat int64 = DefTiDBRowFormatV2 - maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount + ProcessGeneralLog = atomic.NewBool(false) + RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze) + EnableAutoAnalyzePriorityQueue = atomic.NewBool(DefTiDBEnableAutoAnalyzePriorityQueue) + GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays)) + QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen) + EnablePProfSQLCPU = atomic.NewBool(false) + EnableBatchDML = atomic.NewBool(false) + EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM) + ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount + ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize + ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency + ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit + ddlReorgRowFormat int64 = DefTiDBRowFormatV2 + maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. DDLSlowOprThreshold = config.GetGlobalConfig().Instance.DDLSlowOprThreshold ForcePriority = int32(DefTiDBForcePriority) diff --git a/pkg/statistics/handle/autoanalyze/BUILD.bazel b/pkg/statistics/handle/autoanalyze/BUILD.bazel index 1b67c6cd08df5..edb582fef634e 100644 --- a/pkg/statistics/handle/autoanalyze/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/sessionctx/variable", "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/exec", + "//pkg/statistics/handle/autoanalyze/refresher", "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/types", @@ -33,7 +34,7 @@ go_test( timeout = "short", srcs = ["autoanalyze_test.go"], flaky = True, - shard_count = 11, + shard_count = 12, deps = [ ":autoanalyze", "//pkg/domain/infosync", diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index c7c49ba801897..a5422881939c2 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher" "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" @@ -290,6 +291,15 @@ func HandleAutoAnalyze( ) } }() + if variable.EnableAutoAnalyzePriorityQueue.Load() { + r := refresher.NewRefresher(statsHandle, sysProcTracker) + err := r.RebuildTableAnalysisJobQueue() + if err != nil { + statslogutil.StatsLogger().Error("rebuild table analysis job queue failed", zap.Error(err)) + return false + } + 
return r.PickOneTableAndAnalyzeByPriority() + } parameters := exec.GetAutoAnalyzeParameters(sctx) autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go index cb799c0aaf947..bdc003c3fb3d8 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go @@ -41,6 +41,28 @@ import ( "go.uber.org/mock/gomock" ) +func TestEnableAutoAnalyzePriorityQueue(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int)") + tk.MustExec("insert into t values (1)") + // Enable auto analyze priority queue. + tk.MustExec("SET GLOBAL tidb_enable_auto_analyze_priority_queue=ON") + require.True(t, variable.EnableAutoAnalyzePriorityQueue.Load()) + h := dom.StatsHandle() + err := h.HandleDDLEvent(<-h.DDLEventCh()) + require.NoError(t, err) + require.NoError(t, h.DumpStatsDeltaToKV(true)) + is := dom.InfoSchema() + require.NoError(t, h.Update(is)) + exec.AutoAnalyzeMinCnt = 0 + defer func() { + exec.AutoAnalyzeMinCnt = 1000 + }() + require.True(t, dom.StatsHandle().HandleAutoAnalyze()) +} + func TestAutoAnalyzeLockedTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel index e3b073514a1c6..5c9e7bb61a225 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel @@ -31,10 +31,10 @@ go_test( "main_test.go", "queue_test.go", ], - embed = [":priorityqueue"], flaky = True, shard_count = 17, deps = [ + ":priorityqueue", "//pkg/parser/model", "//pkg/session", "//pkg/sessionctx", diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/calculator.go b/pkg/statistics/handle/autoanalyze/priorityqueue/calculator.go index 3e4357c0d9bd7..7989e0b46a476 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/calculator.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/calculator.go @@ -18,9 +18,9 @@ import "math" const ( // EventNone represents no special event. - eventNone = 0.0 + EventNone = 0.0 // EventNewIndex represents a special event for newly added indexes. - eventNewIndex = 2.0 + EventNewIndex = 2.0 ) // TODO: make these configurable. @@ -58,13 +58,15 @@ func (pc *PriorityCalculator) CalculateWeight(job *TableAnalysisJob) float64 { return changeRatioWeight*math.Log10(1+changeRatio) + sizeWeight*(1-math.Log10(1+job.TableSize)) + analysisInterval*math.Log10(1+math.Sqrt(job.LastAnalysisDuration.Seconds())) + - pc.getSpecialEvent(job) + pc.GetSpecialEvent(job) } -func (*PriorityCalculator) getSpecialEvent(job *TableAnalysisJob) float64 { +// GetSpecialEvent returns the special event weight. +// Exported for testing purposes. 
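For reference, the scoring that consumes the now-exported EventNone/EventNewIndex constants is the CalculateWeight formula shown above. A rough, runnable sketch of that shape follows; the coefficient values and the way changeRatio is derived are not part of this patch, so treat them as placeholders:

package main

import (
    "fmt"
    "math"
    "time"
)

// Assumed coefficients -- the real ones live in unexported constants that this
// patch does not touch, so these numbers are only for illustration.
const (
    changeRatioWeight = 0.6
    sizeWeight        = 0.1
    analysisInterval  = 0.3

    eventNone     = 0.0
    eventNewIndex = 2.0
)

// weight mimics the structure of PriorityCalculator.CalculateWeight:
// more churn and staler stats push the score up, a huge table pulls it down,
// and a newly added index adds a flat bonus.
func weight(changeRatio, tableSize float64, sinceLastAnalysis time.Duration, hasNewIndex bool) float64 {
    special := eventNone
    if hasNewIndex {
        special = eventNewIndex
    }
    return changeRatioWeight*math.Log10(1+changeRatio) +
        sizeWeight*(1-math.Log10(1+tableSize)) +
        analysisInterval*math.Log10(1+math.Sqrt(sinceLastAnalysis.Seconds())) +
        special
}

func main() {
    // A small, heavily modified table analyzed a day ago outranks a large,
    // mildly modified one analyzed an hour ago.
    fmt.Println(weight(5.0, 1e3, 24*time.Hour, false))
    fmt.Println(weight(0.5, 1e7, time.Hour, false))
    // A newly added index dominates everything else.
    fmt.Println(weight(0.5, 1e7, time.Hour, true))
}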
+func (*PriorityCalculator) GetSpecialEvent(job *TableAnalysisJob) float64 { if job.HasNewlyAddedIndex() { - return eventNewIndex + return EventNewIndex } - return eventNone + return EventNone } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/calculator_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/calculator_test.go index 17de40d6fcaff..b7ed3dacec87a 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/calculator_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/calculator_test.go @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package priorityqueue +package priorityqueue_test import ( "testing" "time" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" "github.com/stretchr/testify/require" ) @@ -29,8 +30,8 @@ type testData struct { } func TestCalculateWeight(t *testing.T) { - // Note: all group are sorted by weight in ascending order. - pc := NewPriorityCalculator() + // Note: all groups are sorted by weight in ascending order. + pc := priorityqueue.NewPriorityCalculator() // Only focus on change percentage. Bigger change percentage, higher weight. changePercentageGroup := []testData{ { @@ -106,10 +107,10 @@ func TestCalculateWeight(t *testing.T) { // testWeightCalculation is a helper function to test the weight calculation. // It will check if the weight is increasing for each test data group. -func testWeightCalculation(t *testing.T, pc *PriorityCalculator, group []testData) { +func testWeightCalculation(t *testing.T, pc *priorityqueue.PriorityCalculator, group []testData) { prevWeight := -1.0 for _, tc := range group { - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ ChangePercentage: tc.ChangePercentage, TableSize: tc.TableSize, LastAnalysisDuration: tc.LastAnalysisDuration, @@ -122,23 +123,23 @@ func testWeightCalculation(t *testing.T, pc *PriorityCalculator, group []testDat } func TestGetSpecialEvent(t *testing.T) { - pc := NewPriorityCalculator() + pc := priorityqueue.NewPriorityCalculator() - jobWithIndex := &TableAnalysisJob{ + jobWithIndex := &priorityqueue.TableAnalysisJob{ PartitionIndexes: map[string][]string{ "index1": {"p1", "p2"}, }, } - require.Equal(t, eventNewIndex, pc.getSpecialEvent(jobWithIndex)) + require.Equal(t, priorityqueue.EventNewIndex, pc.GetSpecialEvent(jobWithIndex)) - jobWithIndex = &TableAnalysisJob{ + jobWithIndex = &priorityqueue.TableAnalysisJob{ Indexes: []string{"index1"}, } - require.Equal(t, eventNewIndex, pc.getSpecialEvent(jobWithIndex)) + require.Equal(t, priorityqueue.EventNewIndex, pc.GetSpecialEvent(jobWithIndex)) - jobWithoutIndex := &TableAnalysisJob{ + jobWithoutIndex := &priorityqueue.TableAnalysisJob{ PartitionIndexes: map[string][]string{}, Indexes: []string{}, } - require.Equal(t, eventNone, pc.getSpecialEvent(jobWithoutIndex)) + require.Equal(t, priorityqueue.EventNone, pc.GetSpecialEvent(jobWithoutIndex)) } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go b/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go index 9b4ddd2dedbb1..95f09bfddbcff 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/interval.go @@ -21,8 +21,8 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/util" ) -// noRecord is used to indicate that there is no related record in mysql.analyze_jobs. 
-const noRecord = -1 +// NoRecord is used to indicate that there is no related record in mysql.analyze_jobs. +const NoRecord = -1 // justFailed is used to indicate that the last analysis has just failed. const justFailed = 0 @@ -80,9 +80,9 @@ const lastFailedDurationQueryForPartition = ` JOIN mysql.analyze_jobs aj ON aj.id = latest_failures.max_id; ` -// getAverageAnalysisDuration returns the average duration of the last 5 successful analyses for each specified partition. +// GetAverageAnalysisDuration returns the average duration of the last 5 successful analyses for each specified partition. // If there are no successful analyses, it returns 0. -func getAverageAnalysisDuration( +func GetAverageAnalysisDuration( sctx sessionctx.Context, schema, tableName string, partitionNames ...string, @@ -99,25 +99,25 @@ func getAverageAnalysisDuration( rows, _, err := util.ExecRows(sctx, query, params...) if err != nil { - return noRecord, err + return NoRecord, err } // NOTE: if there are no successful analyses, we return 0. if len(rows) == 0 || rows[0].IsNull(0) { - return noRecord, nil + return NoRecord, nil } avgDuration := rows[0].GetMyDecimal(0) duration, err := avgDuration.ToFloat64() if err != nil { - return noRecord, err + return NoRecord, err } return time.Duration(duration) * time.Second, nil } -// getLastFailedAnalysisDuration returns the duration since the last failed analysis. +// GetLastFailedAnalysisDuration returns the duration since the last failed analysis. // If there is no failed analysis, it returns 0. -func getLastFailedAnalysisDuration( +func GetLastFailedAnalysisDuration( sctx sessionctx.Context, schema, tableName string, partitionNames ...string, @@ -134,12 +134,12 @@ func getLastFailedAnalysisDuration( rows, _, err := util.ExecRows(sctx, query, params...) if err != nil { - return noRecord, err + return NoRecord, err } // NOTE: if there are no failed analyses, we return 0. if len(rows) == 0 || rows[0].IsNull(0) { - return noRecord, nil + return NoRecord, nil } lastFailedDuration := rows[0].GetUint64(0) if lastFailedDuration == 0 { diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/interval_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/interval_test.go index a8014c3cc98a5..980c41fc5ef23 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/interval_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/interval_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package priorityqueue +package priorityqueue_test import ( "testing" @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) @@ -32,26 +33,26 @@ func TestGetAverageAnalysisDuration(t *testing.T) { // Empty table. se := tk.Session() sctx := se.(sessionctx.Context) - avgDuration, err := getAverageAnalysisDuration( + avgDuration, err := priorityqueue.GetAverageAnalysisDuration( sctx, "example_schema", "example_table", "example_partition", ) require.NoError(t, err) - require.Equal(t, time.Duration(noRecord), avgDuration) + require.Equal(t, time.Duration(priorityqueue.NoRecord), avgDuration) initJobs(tk) // Partitioned table. insertMultipleFinishedJobs(tk, "example_table", "example_partition") // Only one partition. 
- avgDuration, err = getAverageAnalysisDuration( + avgDuration, err = priorityqueue.GetAverageAnalysisDuration( sctx, "example_schema", "example_table", "example_partition", ) require.NoError(t, err) require.Equal(t, time.Duration(3600)*time.Second, avgDuration) // Multiple partitions. - avgDuration, err = getAverageAnalysisDuration( + avgDuration, err = priorityqueue.GetAverageAnalysisDuration( sctx, "example_schema", "example_table", "example_partition", "example_partition1", ) @@ -59,7 +60,7 @@ func TestGetAverageAnalysisDuration(t *testing.T) { require.Equal(t, time.Duration(3600)*time.Second, avgDuration) // Non-partitioned table. insertMultipleFinishedJobs(tk, "example_table1", "") - avgDuration, err = getAverageAnalysisDuration(sctx, "example_schema", "example_table1") + avgDuration, err = priorityqueue.GetAverageAnalysisDuration(sctx, "example_schema", "example_table1") require.NoError(t, err) require.Equal(t, time.Duration(3600)*time.Second, avgDuration) } @@ -97,26 +98,26 @@ func TestGetLastFailedAnalysisDuration(t *testing.T) { // Empty table. se := tk.Session() sctx := se.(sessionctx.Context) - lastFailedDuration, err := getLastFailedAnalysisDuration( + lastFailedDuration, err := priorityqueue.GetLastFailedAnalysisDuration( sctx, "example_schema", "example_table", "example_partition", ) require.NoError(t, err) - require.Equal(t, time.Duration(noRecord), lastFailedDuration) + require.Equal(t, time.Duration(priorityqueue.NoRecord), lastFailedDuration) initJobs(tk) // Partitioned table. insertFailedJob(tk, "example_schema", "example_table", "example_partition") insertFailedJob(tk, "example_schema", "example_table", "example_partition1") // Only one partition. - lastFailedDuration, err = getLastFailedAnalysisDuration( + lastFailedDuration, err = priorityqueue.GetLastFailedAnalysisDuration( sctx, "example_schema", "example_table", "example_partition", ) require.NoError(t, err) require.GreaterOrEqual(t, lastFailedDuration, time.Duration(24)*time.Hour) // Multiple partitions. - lastFailedDuration, err = getLastFailedAnalysisDuration( + lastFailedDuration, err = priorityqueue.GetLastFailedAnalysisDuration( sctx, "example_schema", "example_table", "example_partition", "example_partition1", ) @@ -124,7 +125,7 @@ func TestGetLastFailedAnalysisDuration(t *testing.T) { require.GreaterOrEqual(t, lastFailedDuration, time.Duration(24)*time.Hour) // Non-partitioned table. insertFailedJob(tk, "example_schema", "example_table1", "") - lastFailedDuration, err = getLastFailedAnalysisDuration(sctx, "example_schema", "example_table1") + lastFailedDuration, err = priorityqueue.GetLastFailedAnalysisDuration(sctx, "example_schema", "example_table1") require.NoError(t, err) require.GreaterOrEqual(t, lastFailedDuration, time.Duration(24)*time.Hour) } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index 86faf05fb2d8c..dd5ae973e7127 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -124,7 +124,7 @@ func isValidToAnalyze( partitionNames ...string, ) (bool, string) { lastFailedAnalysisDuration, err := - getLastFailedAnalysisDuration(sctx, schema, table, partitionNames...) + GetLastFailedAnalysisDuration(sctx, schema, table, partitionNames...) 
if err != nil { statslogutil.StatsLogger().Warn( "Fail to get last failed analysis duration", @@ -137,7 +137,7 @@ func isValidToAnalyze( } averageAnalysisDuration, err := - getAverageAnalysisDuration(sctx, schema, table, partitionNames...) + GetAverageAnalysisDuration(sctx, schema, table, partitionNames...) if err != nil { statslogutil.StatsLogger().Warn( "Fail to get average analysis duration", @@ -163,7 +163,7 @@ func isValidToAnalyze( // Failed analysis duration is less than 2 times the average analysis duration. // Skip this table to avoid too much failed analysis. - onlyFailedAnalysis := lastFailedAnalysisDuration != noRecord && averageAnalysisDuration == noRecord + onlyFailedAnalysis := lastFailedAnalysisDuration != NoRecord && averageAnalysisDuration == NoRecord if onlyFailedAnalysis && lastFailedAnalysisDuration < defaultFailedAnalysisWaitTime { statslogutil.StatsLogger().Info( fmt.Sprintf("Skip analysis because the last failed analysis duration is less than %v", defaultFailedAnalysisWaitTime), @@ -176,7 +176,7 @@ func isValidToAnalyze( return false, fmt.Sprintf("last failed analysis duration is less than %v", defaultFailedAnalysisWaitTime) } // Failed analysis duration is less than 2 times the average analysis duration. - meetSkipCondition := lastFailedAnalysisDuration != noRecord && + meetSkipCondition := lastFailedAnalysisDuration != NoRecord && lastFailedAnalysisDuration < 2*averageAnalysisDuration if meetSkipCondition { statslogutil.StatsLogger().Info( @@ -199,12 +199,18 @@ func (j *TableAnalysisJob) Execute( sysProcTracker sessionctx.SysProcTracker, ) error { return statsutil.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error { - j.analyze(sctx, statsHandle, sysProcTracker) + j.Analyze(sctx, statsHandle, sysProcTracker) return nil }) } -func (j *TableAnalysisJob) analyze(sctx sessionctx.Context, statsHandle statstypes.StatsHandle, sysProcTracker sessionctx.SysProcTracker) { +// Analyze performs analysis on the specified table, indexes, partitions, or partition indexes. +// Exported for testing purposes. +func (j *TableAnalysisJob) Analyze( + sctx sessionctx.Context, + statsHandle statstypes.StatsHandle, + sysProcTracker sessionctx.SysProcTracker, +) { switch j.getAnalyzeType() { case analyzeTable: j.analyzeTable(sctx, statsHandle, sysProcTracker) @@ -213,7 +219,7 @@ func (j *TableAnalysisJob) analyze(sctx sessionctx.Context, statsHandle statstyp case analyzePartition: j.analyzePartitions(sctx, statsHandle, sysProcTracker) case analyzePartitionIndex: - j.analyzePartitionIndexes(sctx, statsHandle, sysProcTracker) + j.AnalyzePartitionIndexes(sctx, statsHandle, sysProcTracker) } } @@ -235,7 +241,7 @@ func (j *TableAnalysisJob) analyzeTable( statsHandle statstypes.StatsHandle, sysProcTracker sessionctx.SysProcTracker, ) { - sql, params := j.genSQLForAnalyzeTable() + sql, params := j.GenSQLForAnalyzeTable() exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...) } @@ -245,7 +251,7 @@ func (j *TableAnalysisJob) analyzeIndexes( sysProcTracker sessionctx.SysProcTracker, ) { for _, index := range j.Indexes { - sql, params := j.genSQLForAnalyzeIndex(index) + sql, params := j.GenSQLForAnalyzeIndex(index) exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...) } } @@ -277,8 +283,8 @@ func (j *TableAnalysisJob) analyzePartitions( } } -// analyzePartitionIndexes performs analysis on the specified partition indexes. 
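The exported GetLastFailedAnalysisDuration/GetAverageAnalysisDuration feed the two back-off rules in isValidToAnalyze above. A condensed, dependency-free sketch of that decision, with the failed-analysis wait time assumed (the real constant is not shown in this diff):

package main

import (
    "fmt"
    "time"
)

const (
    // noRecord mirrors priorityqueue.NoRecord: no matching row in mysql.analyze_jobs.
    noRecord = time.Duration(-1)
    // The real wait time is an unexported constant elsewhere in the package;
    // 30 minutes here is only a placeholder.
    failedAnalysisWaitTime = 30 * time.Minute
)

// shouldSkip condenses the two back-off rules from isValidToAnalyze:
//  1. only failed runs on record, and the failure is still fresh;
//  2. the last failure happened less than twice the average run time ago,
//     so a retry would likely still be doomed.
func shouldSkip(lastFailed, average time.Duration) (bool, string) {
    if lastFailed == noRecord {
        return false, "" // never failed, nothing to back off from
    }
    if average == noRecord {
        if lastFailed < failedAnalysisWaitTime {
            return true, "last failed analysis is too recent"
        }
        return false, ""
    }
    if lastFailed < 2*average {
        return true, "last failure is within 2x the average analysis duration"
    }
    return false, ""
}

func main() {
    fmt.Println(shouldSkip(10*time.Minute, noRecord))  // true: only failures, too fresh
    fmt.Println(shouldSkip(3*time.Hour, time.Hour))    // false: enough time has passed
    fmt.Println(shouldSkip(90*time.Minute, time.Hour)) // true: < 2x average
    fmt.Println(shouldSkip(noRecord, time.Hour))       // false: never failed
}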
-func (j *TableAnalysisJob) analyzePartitionIndexes( +// AnalyzePartitionIndexes performs analysis on the specified partition indexes. +func (j *TableAnalysisJob) AnalyzePartitionIndexes( sctx sessionctx.Context, statsHandle statstypes.StatsHandle, sysProcTracker sessionctx.SysProcTracker, @@ -318,16 +324,16 @@ func getPartitionSQL(prefix, suffix string, numPartitions int) string { return sqlBuilder.String() } -// genSQLForAnalyzeTable generates the SQL for analyzing the specified table. -func (j *TableAnalysisJob) genSQLForAnalyzeTable() (string, []any) { +// GenSQLForAnalyzeTable generates the SQL for analyzing the specified table. +func (j *TableAnalysisJob) GenSQLForAnalyzeTable() (string, []any) { sql := "analyze table %n.%n" params := []any{j.TableSchema, j.TableName} return sql, params } -// genSQLForAnalyzeIndex generates the SQL for analyzing the specified index. -func (j *TableAnalysisJob) genSQLForAnalyzeIndex(index string) (string, []any) { +// GenSQLForAnalyzeIndex generates the SQL for analyzing the specified index. +func (j *TableAnalysisJob) GenSQLForAnalyzeIndex(index string) (string, []any) { sql := "analyze table %n.%n index %n" params := []any{j.TableSchema, j.TableName, index} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job_test.go index f8a31116e1098..a3e143abed2d4 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package priorityqueue +package priorityqueue_test import ( "testing" @@ -20,12 +20,13 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) func TestGenSQLForAnalyzeTable(t *testing.T) { - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "test_schema", TableName: "test_table", } @@ -33,14 +34,14 @@ func TestGenSQLForAnalyzeTable(t *testing.T) { expectedSQL := "analyze table %n.%n" expectedParams := []any{"test_schema", "test_table"} - sql, params := job.genSQLForAnalyzeTable() + sql, params := job.GenSQLForAnalyzeTable() require.Equal(t, expectedSQL, sql) require.Equal(t, expectedParams, params) } func TestGenSQLForAnalyzeIndex(t *testing.T) { - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "test_schema", TableName: "test_table", } @@ -50,7 +51,7 @@ func TestGenSQLForAnalyzeIndex(t *testing.T) { expectedSQL := "analyze table %n.%n index %n" expectedParams := []any{"test_schema", "test_table", index} - sql, params := job.genSQLForAnalyzeIndex(index) + sql, params := job.GenSQLForAnalyzeIndex(index) require.Equal(t, expectedSQL, sql) require.Equal(t, expectedParams, params) @@ -63,7 +64,7 @@ func TestAnalyzeTable(t *testing.T) { tk.MustExec("create table t (a int, b int, index idx(a))") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)") - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "test", TableName: "t", TableStatsVer: 2, @@ -80,7 +81,7 @@ func TestAnalyzeTable(t *testing.T) { tblStats := handle.GetTableStats(tbl.Meta()) require.True(t, tblStats.Pseudo) - job.analyze(sctx, handle, dom.SysProcTracker()) + job.Analyze(sctx, handle, 
dom.SysProcTracker()) // Check the result of analyze. is = dom.InfoSchema() tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) @@ -96,7 +97,7 @@ func TestAnalyzeIndexes(t *testing.T) { tk.MustExec("create table t (a int, b int, index idx(a))") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)") - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "test", TableName: "t", Indexes: []string{"idx"}, @@ -112,7 +113,7 @@ func TestAnalyzeIndexes(t *testing.T) { tblStats := handle.GetTableStats(tbl.Meta()) require.False(t, tblStats.Indices[1].IsAnalyzed()) - job.analyze(sctx, handle, dom.SysProcTracker()) + job.Analyze(sctx, handle, dom.SysProcTracker()) // Check the result of analyze. is = dom.InfoSchema() tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) @@ -122,7 +123,7 @@ func TestAnalyzeIndexes(t *testing.T) { require.True(t, tblStats.Indices[1].IsAnalyzed()) // Add a new index. tk.MustExec("alter table t add index idx2(b)") - job = &TableAnalysisJob{ + job = &priorityqueue.TableAnalysisJob{ TableSchema: "test", TableName: "t", Indexes: []string{"idx", "idx2"}, @@ -136,7 +137,7 @@ func TestAnalyzeIndexes(t *testing.T) { tblStats = handle.GetTableStats(tbl.Meta()) require.Len(t, tblStats.Indices, 1) - job.analyze(sctx, handle, dom.SysProcTracker()) + job.Analyze(sctx, handle, dom.SysProcTracker()) // Check the result of analyze. is = dom.InfoSchema() tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) @@ -153,7 +154,7 @@ func TestAnalyzePartitions(t *testing.T) { tk.MustExec("create table t (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)") - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "test", TableName: "t", Partitions: []string{"p0", "p1"}, @@ -172,7 +173,7 @@ func TestAnalyzePartitions(t *testing.T) { tblStats := handle.GetPartitionStats(tbl.Meta(), pid) require.True(t, tblStats.Pseudo) - job.analyze(sctx, handle, dom.SysProcTracker()) + job.Analyze(sctx, handle, dom.SysProcTracker()) // Check the result of analyze. is = dom.InfoSchema() tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) @@ -190,7 +191,7 @@ func TestAnalyzePartitionIndexes(t *testing.T) { tk.MustExec("create table t (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3)") - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "test", TableName: "t", PartitionIndexes: map[string][]string{ @@ -214,7 +215,7 @@ func TestAnalyzePartitionIndexes(t *testing.T) { require.NotNil(t, tblStats.Indices[1]) require.False(t, tblStats.Indices[1].IsAnalyzed()) - job.analyzePartitionIndexes(sctx, handle, dom.SysProcTracker()) + job.AnalyzePartitionIndexes(sctx, handle, dom.SysProcTracker()) // Check the result of analyze. 
is = dom.InfoSchema() tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) @@ -241,7 +242,7 @@ func TestIsValidToAnalyze(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec(session.CreateAnalyzeJobs) - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "example_schema", TableName: "example_table1", Weight: 2, @@ -281,7 +282,7 @@ func TestIsValidToAnalyzeWhenOnlyHasFailedAnalysisRecords(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec(session.CreateAnalyzeJobs) - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "example_schema", TableName: "example_table1", Weight: 2, @@ -311,7 +312,7 @@ func TestIsValidToAnalyzeForPartitionedTba(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec(session.CreateAnalyzeJobs) - job := &TableAnalysisJob{ + job := &priorityqueue.TableAnalysisJob{ TableSchema: "example_schema", TableName: "example_table", Weight: 2, @@ -356,12 +357,12 @@ func TestIsValidToAnalyzeForPartitionedTba(t *testing.T) { func TestStringer(t *testing.T) { tests := []struct { name string - job *TableAnalysisJob + job *priorityqueue.TableAnalysisJob want string }{ { name: "analyze table", - job: &TableAnalysisJob{ + job: &priorityqueue.TableAnalysisJob{ TableID: 1, TableSchema: "test_schema", TableName: "test_table", @@ -373,7 +374,7 @@ func TestStringer(t *testing.T) { }, { name: "analyze table index", - job: &TableAnalysisJob{ + job: &priorityqueue.TableAnalysisJob{ TableID: 2, TableSchema: "test_schema", TableName: "test_table", @@ -386,7 +387,7 @@ func TestStringer(t *testing.T) { }, { name: "analyze partitions", - job: &TableAnalysisJob{ + job: &priorityqueue.TableAnalysisJob{ TableID: 3, TableSchema: "test_schema", TableName: "test_table", @@ -399,7 +400,7 @@ func TestStringer(t *testing.T) { }, { name: "analyze partition indexes", - job: &TableAnalysisJob{ + job: &priorityqueue.TableAnalysisJob{ TableID: 4, TableSchema: "test_schema", TableName: "test_table", diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/main_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/main_test.go index d4c40b7725e0c..91c7de7a6817a 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/main_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/main_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package priorityqueue +package priorityqueue_test import ( "testing" diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index 380f2c0b37c1a..85a81341686e7 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -18,13 +18,13 @@ import "container/heap" // AnalysisPriorityQueue is a priority queue for TableAnalysisJobs. type AnalysisPriorityQueue struct { - inner *analysisInnerQueue + inner *AnalysisInnerQueue } // NewAnalysisPriorityQueue creates a new AnalysisPriorityQueue. func NewAnalysisPriorityQueue() *AnalysisPriorityQueue { q := &AnalysisPriorityQueue{ - inner: &analysisInnerQueue{}, + inner: &AnalysisInnerQueue{}, } heap.Init(q.inner) return q @@ -45,28 +45,29 @@ func (apq *AnalysisPriorityQueue) Len() int { return apq.inner.Len() } -// An analysisInnerQueue implements heap.Interface and holds TableAnalysisJobs. 
-type analysisInnerQueue []*TableAnalysisJob +// An AnalysisInnerQueue implements heap.Interface and holds TableAnalysisJobs. +// Exported for testing purposes. You should not use this directly. +type AnalysisInnerQueue []*TableAnalysisJob // Implement the sort.Interface methods for the priority queue. -func (aq analysisInnerQueue) Len() int { return len(aq) } -func (aq analysisInnerQueue) Less(i, j int) bool { +func (aq AnalysisInnerQueue) Len() int { return len(aq) } +func (aq AnalysisInnerQueue) Less(i, j int) bool { // We want Pop to give us the highest, not lowest, priority, so we use greater than here. return aq[i].Weight > aq[j].Weight } -func (aq analysisInnerQueue) Swap(i, j int) { +func (aq AnalysisInnerQueue) Swap(i, j int) { aq[i], aq[j] = aq[j], aq[i] } // Push adds an item to the priority queue. -func (aq *analysisInnerQueue) Push(x any) { +func (aq *AnalysisInnerQueue) Push(x any) { item := x.(*TableAnalysisJob) *aq = append(*aq, item) } // Pop removes the highest priority item from the queue. -func (aq *analysisInnerQueue) Pop() any { +func (aq *AnalysisInnerQueue) Pop() any { old := *aq n := len(old) item := old[n-1] diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go index 4acf129237635..9c5e4979e58ac 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go @@ -12,23 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -package priorityqueue +package priorityqueue_test import ( "container/heap" "testing" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" "github.com/stretchr/testify/require" ) func TestAnalysisInnerQueue(t *testing.T) { // Test data - job1 := &TableAnalysisJob{Weight: 10} - job2 := &TableAnalysisJob{Weight: 5} - job3 := &TableAnalysisJob{Weight: 15} + job1 := &priorityqueue.TableAnalysisJob{Weight: 10} + job2 := &priorityqueue.TableAnalysisJob{Weight: 5} + job3 := &priorityqueue.TableAnalysisJob{Weight: 15} // Create an empty priority queue - queue := analysisInnerQueue{} + queue := priorityqueue.AnalysisInnerQueue{} // Push items into the queue heap.Push(&queue, job1) @@ -48,23 +49,23 @@ func TestAnalysisInnerQueue(t *testing.T) { func TestPushPopAnalysisInnerQueue(t *testing.T) { // Test Push and Pop operations together - queue := analysisInnerQueue{} - heap.Push(&queue, &TableAnalysisJob{Weight: 10}) - heap.Push(&queue, &TableAnalysisJob{Weight: 5}) + queue := priorityqueue.AnalysisInnerQueue{} + heap.Push(&queue, &priorityqueue.TableAnalysisJob{Weight: 10}) + heap.Push(&queue, &priorityqueue.TableAnalysisJob{Weight: 5}) - poppedItem := heap.Pop(&queue).(*TableAnalysisJob) + poppedItem := heap.Pop(&queue).(*priorityqueue.TableAnalysisJob) require.Equal(t, float64(10), poppedItem.Weight, "Popped item should have weight 10") require.Equal(t, 1, queue.Len(), "After Pop, length of the queue should be 1") } func TestAnalysisPriorityQueue(t *testing.T) { // Test data - job1 := &TableAnalysisJob{Weight: 10} - job2 := &TableAnalysisJob{Weight: 5} - job3 := &TableAnalysisJob{Weight: 15} + job1 := &priorityqueue.TableAnalysisJob{Weight: 10} + job2 := &priorityqueue.TableAnalysisJob{Weight: 5} + job3 := &priorityqueue.TableAnalysisJob{Weight: 15} // Create a priority queue - queue := NewAnalysisPriorityQueue() + queue := priorityqueue.NewAnalysisPriorityQueue() // Push items into the queue queue.Push(job1) 
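The queue changes above boil down to one contract: AnalysisInnerQueue is a container/heap max-heap keyed on Weight, and AnalysisPriorityQueue is a thin wrapper around it. A stripped-down, self-contained version of that pattern, with the job reduced to a name and a weight:

package main

import (
    "container/heap"
    "fmt"
)

type job struct {
    table  string
    weight float64
}

// jobQueue has the same shape as priorityqueue.AnalysisInnerQueue:
// a slice that satisfies heap.Interface, ordered so Pop returns
// the highest weight first.
type jobQueue []*job

func (q jobQueue) Len() int           { return len(q) }
func (q jobQueue) Less(i, j int) bool { return q[i].weight > q[j].weight } // max-heap
func (q jobQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *jobQueue) Push(x any)        { *q = append(*q, x.(*job)) }
func (q *jobQueue) Pop() any {
    old := *q
    n := len(old)
    item := old[n-1]
    *q = old[:n-1]
    return item
}

func main() {
    q := &jobQueue{}
    heap.Init(q)
    heap.Push(q, &job{"t1", 1.0})
    heap.Push(q, &job{"t2", 0.9})
    heap.Push(q, &job{"t3", 1.7})
    for q.Len() > 0 {
        fmt.Println(heap.Pop(q).(*job).table) // t3, t1, t2
    }
}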
diff --git a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel index d600e3bf5b8e9..9245626b6b976 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel @@ -27,10 +27,10 @@ go_test( name = "refresher_test", timeout = "short", srcs = ["refresher_test.go"], - embed = [":refresher"], flaky = True, shard_count = 9, deps = [ + ":refresher", "//pkg/parser/model", "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/exec", diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index cc46fc3c202a0..6322ec96f54a9 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -47,37 +47,40 @@ type Refresher struct { statsHandle statstypes.StatsHandle sysProcTracker sessionctx.SysProcTracker - jobs *priorityqueue.AnalysisPriorityQueue + // Jobs is the priority queue of analysis jobs. + // Exported for testing purposes. + Jobs *priorityqueue.AnalysisPriorityQueue } // NewRefresher creates a new Refresher and starts the goroutine. func NewRefresher( statsHandle statstypes.StatsHandle, sysProcTracker sessionctx.SysProcTracker, -) (*Refresher, error) { +) *Refresher { r := &Refresher{ statsHandle: statsHandle, sysProcTracker: sysProcTracker, - jobs: priorityqueue.NewAnalysisPriorityQueue(), + Jobs: priorityqueue.NewAnalysisPriorityQueue(), } - return r, nil + return r } -func (r *Refresher) pickOneTableAndAnalyzeByPriority() { +// PickOneTableAndAnalyzeByPriority picks one table and analyzes it by priority. +func (r *Refresher) PickOneTableAndAnalyzeByPriority() bool { se, err := r.statsHandle.SPool().Get() if err != nil { statslogutil.StatsLogger().Error( "Get session context failed", zap.Error(err), ) - return + return false } defer r.statsHandle.SPool().Put(se) sctx := se.(sessionctx.Context) // Pick the table with the highest weight. - for r.jobs.Len() > 0 { - job := r.jobs.Pop() + for r.Jobs.Len() > 0 { + job := r.Jobs.Pop() if valid, failReason := job.IsValidToAnalyze( sctx, ); !valid { @@ -104,13 +107,18 @@ func (r *Refresher) pickOneTableAndAnalyzeByPriority() { ) } // Only analyze one table each time. - return + return true } + statslogutil.StatsLogger().Info( + "No table to analyze", + ) + return false } -func (r *Refresher) rebuildTableAnalysisJobQueue() error { +// RebuildTableAnalysisJobQueue rebuilds the priority queue of analysis jobs. +func (r *Refresher) RebuildTableAnalysisJobQueue() error { // Reset the priority queue. - r.jobs = priorityqueue.NewAnalysisPriorityQueue() + r.Jobs = priorityqueue.NewAnalysisPriorityQueue() if err := statsutil.CallWithSCtx( r.statsHandle.SPool(), @@ -169,11 +177,11 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error { return } // Push the job onto the queue. - r.jobs.Push(job) + r.Jobs.Push(job) } // No partitions, analyze the whole table. if pi == nil { - job := createTableAnalysisJob( + job := CreateTableAnalysisJob( sctx, db, tblInfo, @@ -198,7 +206,7 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error { // If the prune mode is static, we need to analyze every partition as a separate table. 
if pruneMode == variable.Static { for _, def := range pi.Definitions { - job := createTableAnalysisJob( + job := CreateTableAnalysisJob( sctx, db, tblInfo, @@ -234,7 +242,8 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error { return nil } -func createTableAnalysisJob( +// CreateTableAnalysisJob creates a TableAnalysisJob for the physical table. +func CreateTableAnalysisJob( sctx sessionctx.Context, tableSchema string, tblInfo *model.TableInfo, @@ -245,10 +254,10 @@ func createTableAnalysisJob( tableStatsVer := sctx.GetSessionVars().AnalyzeVersion statistics.CheckAnalyzeVerOnTable(tblStats, &tableStatsVer) - changePercentage := calculateChangePercentage(tblStats, autoAnalyzeRatio) + changePercentage := CalculateChangePercentage(tblStats, autoAnalyzeRatio) tableSize := calculateTableSize(tblInfo, tblStats) - lastAnalysisDuration := getTableLastAnalyzeDuration(tblStats, currentTs) - indexes := checkIndexesNeedAnalyze(tblInfo, tblStats) + lastAnalysisDuration := GetTableLastAnalyzeDuration(tblStats, currentTs) + indexes := CheckIndexesNeedAnalyze(tblInfo, tblStats) // No need to analyze. if changePercentage == 0 && len(indexes) == 0 { @@ -269,7 +278,9 @@ func createTableAnalysisJob( return job } -func calculateChangePercentage( +// CalculateChangePercentage calculates the change percentage of the table +// based on the change count and the analysis count. +func CalculateChangePercentage( tblStats *statistics.Table, autoAnalyzeRatio float64, ) float64 { @@ -307,7 +318,8 @@ func calculateTableSize( return tblCnt * colCnt } -func getTableLastAnalyzeDuration( +// GetTableLastAnalyzeDuration gets the duration since the last analysis of the table. +func GetTableLastAnalyzeDuration( tblStats *statistics.Table, currentTs uint64, ) time.Duration { @@ -345,7 +357,8 @@ func findLastAnalyzeTime( return oracle.GetTimeFromTS(maxVersion) } -func checkIndexesNeedAnalyze( +// CheckIndexesNeedAnalyze checks if the indexes of the table need to be analyzed. +func CheckIndexesNeedAnalyze( tblInfo *model.TableInfo, tblStats *statistics.Table, ) []string { @@ -380,14 +393,14 @@ func createTableAnalysisJobForPartitions( tableStatsVer := sctx.GetSessionVars().AnalyzeVersion statistics.CheckAnalyzeVerOnTable(tblStats, &tableStatsVer) - averageChangePercentage, avgSize, minLastAnalyzeDuration, partitionNames := calculateIndicatorsForPartitions( + averageChangePercentage, avgSize, minLastAnalyzeDuration, partitionNames := CalculateIndicatorsForPartitions( tblInfo, partitionStats, defs, autoAnalyzeRatio, currentTs, ) - partitionIndexes := checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable( + partitionIndexes := CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable( tblInfo, defs, partitionStats, @@ -412,12 +425,12 @@ func createTableAnalysisJobForPartitions( return job } -// calculateIndicatorsForPartitions calculates the average change percentage, +// CalculateIndicatorsForPartitions calculates the average change percentage, // average size and average last analyze duration for the partitions that meet the threshold. // Change percentage is the ratio of the number of modified rows to the total number of rows. // Size is the product of the number of rows and the number of columns. // Last analyze duration is the duration since the last analyze. 
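The function that follows, CalculateIndicatorsForPartitions, averages three signals over the partitions whose change percentage is non-zero, per its doc comment above. A reduced sketch of that aggregation with partition stats boiled down to plain numbers — the real code works on *statistics.Table and model.PartitionDefinition, so this is only an approximation:

package main

import (
    "fmt"
    "time"
)

type partStats struct {
    name              string
    changePercent     float64 // 0 means "below threshold or stats not loaded"
    rows, cols        float64
    sinceLastAnalysis time.Duration
}

// indicators mirrors the shape of CalculateIndicatorsForPartitions: only
// partitions with a non-zero change percentage contribute, and the result is
// the averages plus the list of partitions that should be analyzed.
func indicators(parts []partStats) (avgChange, avgSize float64, avgLast time.Duration, names []string) {
    var totalChange, totalSize float64
    var totalLast time.Duration
    for _, p := range parts {
        if p.changePercent == 0 {
            continue
        }
        totalChange += p.changePercent
        totalSize += p.rows * p.cols
        totalLast += p.sinceLastAnalysis
        names = append(names, p.name)
    }
    if len(names) == 0 {
        return 0, 0, 0, nil
    }
    n := float64(len(names))
    return totalChange / n, totalSize / n, totalLast / time.Duration(len(names)), names
}

func main() {
    parts := []partStats{
        {"p0", 0.8, 1000, 2, 2 * time.Hour},
        {"p1", 0, 500, 2, time.Hour}, // below threshold, ignored
        {"p2", 0.4, 3000, 2, 4 * time.Hour},
    }
    fmt.Println(indicators(parts)) // 0.6 4000 3h0m0s [p0 p2]
}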
-func calculateIndicatorsForPartitions( +func CalculateIndicatorsForPartitions( tblInfo *model.TableInfo, partitionStats map[int64]*statistics.Table, defs []model.PartitionDefinition, @@ -438,7 +451,7 @@ func calculateIndicatorsForPartitions( for _, def := range defs { tblStats := partitionStats[def.ID] - changePercent := calculateChangePercentage(tblStats, autoAnalyzeRatio) + changePercent := CalculateChangePercentage(tblStats, autoAnalyzeRatio) // No need to analyze the partition because it doesn't meet the threshold or stats are not loaded yet. if changePercent == 0 { continue @@ -447,7 +460,7 @@ func calculateIndicatorsForPartitions( totalChangePercent += changePercent // size = count * cols totalSize += float64(tblStats.RealtimeCount) * cols - lastAnalyzeDuration := getTableLastAnalyzeDuration(tblStats, currentTs) + lastAnalyzeDuration := GetTableLastAnalyzeDuration(tblStats, currentTs) totalLastAnalyzeDuration += lastAnalyzeDuration partitionNames = append(partitionNames, def.Name.O) count++ @@ -463,10 +476,10 @@ func calculateIndicatorsForPartitions( return avgChange, avgSize, avgLastAnalyzeDuration, partitionNames } -// checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable checks if the indexes of the partitioned table need to be analyzed. +// CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable checks if the indexes of the partitioned table need to be analyzed. // It returns a map from index name to the names of the partitions that need to be analyzed. // NOTE: This is only for newly added indexes. -func checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable( +func CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable( tblInfo *model.TableInfo, defs []model.PartitionDefinition, partitionStats map[int64]*statistics.Table, diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go index e924ea47bfdcd..b7d77c15f8161 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package refresher +package refresher_test import ( "math" @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -41,10 +42,9 @@ func TestPickOneTableAndAnalyzeByPriority(t *testing.T) { handle := dom.StatsHandle() sysProcTracker := dom.SysProcTracker() - r, err := NewRefresher(handle, sysProcTracker) - require.NoError(t, err) + r := refresher.NewRefresher(handle, sysProcTracker) // No jobs in the queue. - r.pickOneTableAndAnalyzeByPriority() + r.PickOneTableAndAnalyzeByPriority() // The table is not analyzed. 
is := dom.InfoSchema() tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) @@ -61,7 +61,7 @@ func TestPickOneTableAndAnalyzeByPriority(t *testing.T) { ChangePercentage: 0.5, Weight: 1, } - r.jobs.Push(job1) + r.Jobs.Push(job1) tbl2, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) require.NoError(t, err) job2 := &priorityqueue.TableAnalysisJob{ @@ -71,8 +71,8 @@ func TestPickOneTableAndAnalyzeByPriority(t *testing.T) { ChangePercentage: 0.5, Weight: 0.9, } - r.jobs.Push(job2) - r.pickOneTableAndAnalyzeByPriority() + r.Jobs.Push(job2) + r.PickOneTableAndAnalyzeByPriority() // The table is analyzed. tblStats1 = handle.GetPartitionStats(tbl1.Meta(), pid1) require.False(t, tblStats1.Pseudo) @@ -81,7 +81,7 @@ func TestPickOneTableAndAnalyzeByPriority(t *testing.T) { tblStats2 := handle.GetPartitionStats(tbl2.Meta(), pid2) require.True(t, tblStats2.Pseudo) // Do one more round. - r.pickOneTableAndAnalyzeByPriority() + r.PickOneTableAndAnalyzeByPriority() // t2 is analyzed. pid2 = tbl2.Meta().GetPartitionInfo().Definitions[0].ID tblStats2 = handle.GetPartitionStats(tbl2.Meta(), pid2) @@ -100,10 +100,9 @@ func TestPickOneTableAndAnalyzeByPriorityWithFailedAnalysis(t *testing.T) { handle := dom.StatsHandle() sysProcTracker := dom.SysProcTracker() - r, err := NewRefresher(handle, sysProcTracker) - require.NoError(t, err) + r := refresher.NewRefresher(handle, sysProcTracker) // No jobs in the queue. - r.pickOneTableAndAnalyzeByPriority() + r.PickOneTableAndAnalyzeByPriority() // The table is not analyzed. is := dom.InfoSchema() tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) @@ -120,7 +119,7 @@ func TestPickOneTableAndAnalyzeByPriorityWithFailedAnalysis(t *testing.T) { ChangePercentage: 0.5, Weight: 1, } - r.jobs.Push(job1) + r.Jobs.Push(job1) tbl2, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) require.NoError(t, err) job2 := &priorityqueue.TableAnalysisJob{ @@ -130,12 +129,12 @@ func TestPickOneTableAndAnalyzeByPriorityWithFailedAnalysis(t *testing.T) { ChangePercentage: 0.5, Weight: 0.9, } - r.jobs.Push(job2) + r.Jobs.Push(job2) // Add a failed job to t1. startTime := tk.MustQuery("select now() - interval 2 second").Rows()[0][0].(string) insertFailedJobForPartitionWithStartTime(tk, "test", "t1", "p0", startTime) - r.pickOneTableAndAnalyzeByPriority() + r.PickOneTableAndAnalyzeByPriority() // t2 is analyzed. pid2 := tbl2.Meta().GetPartitionInfo().Definitions[0].ID tblStats2 := handle.GetPartitionStats(tbl2.Meta(), pid2) @@ -199,21 +198,20 @@ func TestRebuildTableAnalysisJobQueue(t *testing.T) { require.Nil(t, handle.Update(dom.InfoSchema())) sysProcTracker := dom.SysProcTracker() - r, err := NewRefresher(handle, sysProcTracker) - require.NoError(t, err) + r := refresher.NewRefresher(handle, sysProcTracker) // Rebuild the job queue. No jobs are added. - err = r.rebuildTableAnalysisJobQueue() + err := r.RebuildTableAnalysisJobQueue() require.NoError(t, err) - require.Equal(t, 0, r.jobs.Len()) + require.Equal(t, 0, r.Jobs.Len()) // Insert more data into t1. 
tk.MustExec("insert into t1 values (4, 4), (5, 5), (6, 6)") require.Nil(t, handle.DumpStatsDeltaToKV(true)) require.Nil(t, handle.Update(dom.InfoSchema())) - err = r.rebuildTableAnalysisJobQueue() + err = r.RebuildTableAnalysisJobQueue() require.NoError(t, err) - require.Equal(t, 1, r.jobs.Len()) - job1 := r.jobs.Pop() + require.Equal(t, 1, r.Jobs.Len()) + job1 := r.Jobs.Pop() require.Equal(t, 1.2, math.Round(job1.Weight*10)/10) require.Equal(t, float64(1), job1.ChangePercentage) require.Equal(t, float64(6*2), job1.TableSize) @@ -302,7 +300,7 @@ func TestCalculateChangePercentage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := calculateChangePercentage(tt.tblStats, tt.autoAnalyzeRatio) + got := refresher.CalculateChangePercentage(tt.tblStats, tt.autoAnalyzeRatio) require.Equal(t, tt.want, got) }) } @@ -330,7 +328,7 @@ func TestGetTableLastAnalyzeDuration(t *testing.T) { currentTs := oracle.GoTimeToTS(currentTime) want := 24 * time.Hour - got := getTableLastAnalyzeDuration(tblStats, currentTs) + got := refresher.GetTableLastAnalyzeDuration(tblStats, currentTs) require.Equal(t, want, got) } @@ -343,7 +341,7 @@ func TestGetTableLastAnalyzeDurationForUnanalyzedTable(t *testing.T) { currentTs := oracle.GoTimeToTS(currentTime) want := 1800 * time.Second - got := getTableLastAnalyzeDuration(tblStats, currentTs) + got := refresher.GetTableLastAnalyzeDuration(tblStats, currentTs) require.Equal(t, want, got) } @@ -396,7 +394,7 @@ func TestCheckIndexesNeedAnalyze(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := checkIndexesNeedAnalyze(tt.tblInfo, tt.tblStats) + got := refresher.CheckIndexesNeedAnalyze(tt.tblInfo, tt.tblStats) require.Equal(t, tt.want, got) }) } @@ -643,7 +641,7 @@ func TestCalculateIndicatorsForPartitions(t *testing.T) { gotAvgSize, gotAvgLastAnalyzeDuration, gotPartitions := - calculateIndicatorsForPartitions( + refresher.CalculateIndicatorsForPartitions( tt.tblInfo, tt.partitionStats, tt.defs, @@ -714,7 +712,7 @@ func TestCheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(t *testing.T) { }, } - partitionIndexes := checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable(&tblInfo, defs, partitionStats) + partitionIndexes := refresher.CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(&tblInfo, defs, partitionStats) expected := map[string][]string{"index1": {"p0", "p1"}, "index2": {"p0"}} require.Equal(t, len(expected), len(partitionIndexes))