Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: make the analysis job executable #50499

Merged
merged 15 commits into from
Jan 19, 2024
7 changes: 6 additions & 1 deletion pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
],
)
Expand All @@ -20,13 +23,15 @@ go_test(
timeout = "short",
srcs = [
"interval_test.go",
"job_test.go",
"main_test.go",
"queue_test.go",
],
embed = [":priorityqueue"],
flaky = True,
shard_count = 4,
shard_count = 10,
deps = [
"//pkg/parser/model",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/testkit",
Expand Down
152 changes: 150 additions & 2 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,156 @@

package priorityqueue

import (
"strings"

"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
)

// TableAnalysisJob defines the structure for table analysis job information.
type TableAnalysisJob struct {
// TODO: add more information about the job.
Weight float64
// Only set when partitions's indexes need to be analyzed.
PartitionIndexes map[string][]string
TableSchema string
TableName string
// Only set when table's indexes need to be analyzed.
Indexes []string
// Only set when table's partitions need to be analyzed.
Partitions []string
TableID int64
TableStatsVer int
ChangePercentage float64
Weight float64
}

// Execute executes the analyze statement.
func (j *TableAnalysisJob) Execute(
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
) error {
return statsutil.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error {
j.analyze(sctx, statsHandle, sysProcTracker)
return nil
})
}

func (j *TableAnalysisJob) analyze(sctx sessionctx.Context, statsHandle statstypes.StatsHandle, sysProcTracker sessionctx.SysProcTracker) {
switch {
case len(j.PartitionIndexes) > 0:
j.analyzePartitionIndexes(sctx, statsHandle, sysProcTracker)
case len(j.Partitions) > 0:
j.analyzePartitions(sctx, statsHandle, sysProcTracker)
case len(j.Indexes) > 0:
j.analyzeIndexes(sctx, statsHandle, sysProcTracker)
default:
j.analyzeTable(sctx, statsHandle, sysProcTracker)
}
}

func (j *TableAnalysisJob) analyzeTable(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
) {
sql, params := j.genSQLForAnalyzeTable()
exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...)
}

func (j *TableAnalysisJob) analyzeIndexes(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
) {
for _, index := range j.Indexes {
sql, params := j.genSQLForAnalyzeIndex(index)
exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...)
}
}

// analyzePartitions performs analysis on the specified partitions.
// This function uses a batch mode for efficiency. After analyzing the partitions,
// it's necessary to merge their statistics. By analyzing them in batches,
// we can reduce the overhead of this merging process.
func (j *TableAnalysisJob) analyzePartitions(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
) {
analyzePartitionBatchSize := int(variable.AutoAnalyzePartitionBatchSize.Load())
needAnalyzePartitionNames := make([]interface{}, 0, len(j.Partitions))
for _, partition := range j.Partitions {
needAnalyzePartitionNames = append(needAnalyzePartitionNames, partition)
}
for i := 0; i < len(needAnalyzePartitionNames); i += analyzePartitionBatchSize {
start := i
end := start + analyzePartitionBatchSize
if end >= len(needAnalyzePartitionNames) {
end = len(needAnalyzePartitionNames)
}

sql := getPartitionSQL("analyze table %n.%n partition", "", end-start)
params := append([]interface{}{j.TableSchema, j.TableName}, needAnalyzePartitionNames[start:end]...)
exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...)
}
}

// analyzePartitionIndexes performs analysis on the specified partition indexes.
func (j *TableAnalysisJob) analyzePartitionIndexes(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
) {
analyzePartitionBatchSize := int(variable.AutoAnalyzePartitionBatchSize.Load())

for indexName, partitionNames := range j.PartitionIndexes {
needAnalyzePartitionNames := make([]interface{}, 0, len(partitionNames))
for _, partition := range partitionNames {
needAnalyzePartitionNames = append(needAnalyzePartitionNames, partition)
}
for i := 0; i < len(needAnalyzePartitionNames); i += analyzePartitionBatchSize {
start := i
end := start + analyzePartitionBatchSize
if end >= len(needAnalyzePartitionNames) {
end = len(needAnalyzePartitionNames)
}

sql := getPartitionSQL("analyze table %n.%n partition", " index %n", end-start)
params := append([]interface{}{j.TableSchema, j.TableName}, needAnalyzePartitionNames[start:end]...)
params = append(params, indexName)
exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...)
}
}
}

func getPartitionSQL(prefix, suffix string, numPartitions int) string {
var sqlBuilder strings.Builder
sqlBuilder.WriteString(prefix)
for i := 0; i < numPartitions; i++ {
if i != 0 {
sqlBuilder.WriteString(",")
}
sqlBuilder.WriteString(" %n")
}
sqlBuilder.WriteString(suffix)
return sqlBuilder.String()
}

// genSQLForAnalyzeTable generates the SQL for analyzing the specified table.
func (j *TableAnalysisJob) genSQLForAnalyzeTable() (string, []interface{}) {
sql := "analyze table %n.%n"
params := []interface{}{j.TableSchema, j.TableName}

return sql, params
}

// genSQLForAnalyzeIndex generates the SQL for analyzing the specified index.
func (j *TableAnalysisJob) genSQLForAnalyzeIndex(index string) (string, []interface{}) {
sql := "analyze table %n.%n index %n"
params := []interface{}{j.TableSchema, j.TableName, index}

return sql, params
}
Loading