*: add variable for priority queue (#51447)
ref #50132
Rustin170506 authored Mar 1, 2024
1 parent bd17c5b commit 6dd47fe
Showing 18 changed files with 245 additions and 174 deletions.
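
In short: the commit adds a new global system variable, tidb_enable_auto_analyze_priority_queue (TypeBool, default OFF), that routes auto analyze through the new priority-queue scheduler instead of the legacy scan. A minimal usage sketch from any MySQL client, assuming a TiDB build that includes this commit:

```sql
-- Opt in to priority-queue based auto analyze (global scope only).
SET GLOBAL tidb_enable_auto_analyze_priority_queue = ON;

-- Check the current value.
SELECT @@global.tidb_enable_auto_analyze_priority_queue;

-- Revert to the legacy auto-analyze behavior (the default).
SET GLOBAL tidb_enable_auto_analyze_priority_queue = OFF;
```
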
12 changes: 11 additions & 1 deletion pkg/sessionctx/variable/sysvar.go
@@ -975,14 +975,24 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBEnableAutoAnalyze, Value: BoolToOnOff(DefTiDBEnableAutoAnalyze), Type: TypeBool,
{
Scope: ScopeGlobal, Name: TiDBEnableAutoAnalyze, Value: BoolToOnOff(DefTiDBEnableAutoAnalyze), Type: TypeBool,
GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
return BoolToOnOff(RunAutoAnalyze.Load()), nil
},
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
RunAutoAnalyze.Store(TiDBOptOn(val))
return nil
},
}, {
Scope: ScopeGlobal, Name: TiDBEnableAutoAnalyzePriorityQueue, Value: BoolToOnOff(DefTiDBEnableAutoAnalyzePriorityQueue), Type: TypeBool,
GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
return BoolToOnOff(EnableAutoAnalyzePriorityQueue.Load()), nil
},
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
EnableAutoAnalyzePriorityQueue.Store(TiDBOptOn(val))
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBGOGCTunerThreshold, Value: strconv.FormatFloat(DefTiDBGOGCTunerThreshold, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64,
GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
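
The hunk above follows the file's existing pattern for process-wide switches: the sysvar's GetGlobal/SetGlobal callbacks mirror an atomic boolean, so hot paths read the atomic instead of going through the variable cache. A self-contained sketch of that pattern, assuming the go.uber.org/atomic package implied by atomic.NewBool, with a rough stand-in for TiDBOptOn (illustrative only, not the TiDB implementation):

```go
package main

import (
	"fmt"
	"strings"

	"go.uber.org/atomic"
)

// Process-wide switch; default OFF, matching DefTiDBEnableAutoAnalyzePriorityQueue.
var enablePriorityQueue = atomic.NewBool(false)

// setGlobal plays the role of the SetGlobal callback: parse the ON/OFF
// string and publish it through the atomic. The strings.EqualFold check
// is a rough stand-in for variable.TiDBOptOn.
func setGlobal(val string) {
	enablePriorityQueue.Store(strings.EqualFold(val, "ON") || val == "1")
}

// getGlobal plays the role of the GetGlobal callback: report the live value.
func getGlobal() string {
	if enablePriorityQueue.Load() {
		return "ON"
	}
	return "OFF"
}

func main() {
	fmt.Println(getGlobal()) // OFF
	setGlobal("ON")
	fmt.Println(getGlobal()) // ON
}
```
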
30 changes: 17 additions & 13 deletions pkg/sessionctx/variable/tidb_vars.go
@@ -995,6 +995,8 @@ const (
TiDBMemQuotaAnalyze = "tidb_mem_quota_analyze"
// TiDBEnableAutoAnalyze determines whether TiDB executes automatic analysis.
TiDBEnableAutoAnalyze = "tidb_enable_auto_analyze"
// TiDBEnableAutoAnalyzePriorityQueue determines whether TiDB executes automatic analysis with priority queue.
TiDBEnableAutoAnalyzePriorityQueue = "tidb_enable_auto_analyze_priority_queue"
// TiDBMemOOMAction indicates what operation TiDB performs when a single SQL statement exceeds
// the memory quota specified by tidb_mem_quota_query and cannot be spilled to disk.
TiDBMemOOMAction = "tidb_mem_oom_action"
@@ -1357,6 +1359,7 @@ const (
DefTiDBBatchDMLIgnoreError = false
DefTiDBMemQuotaAnalyze = -1
DefTiDBEnableAutoAnalyze = true
DefTiDBEnableAutoAnalyzePriorityQueue = false
DefTiDBMemOOMAction = "CANCEL"
DefTiDBMaxAutoAnalyzeTime = 12 * 60 * 60
DefTiDBEnablePrepPlanCache = true
@@ -1478,19 +1481,20 @@ const (

// Process global variables.
var (
ProcessGeneralLog = atomic.NewBool(false)
RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze)
GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays))
QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen)
EnablePProfSQLCPU = atomic.NewBool(false)
EnableBatchDML = atomic.NewBool(false)
EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM)
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency
ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit
ddlReorgRowFormat int64 = DefTiDBRowFormatV2
maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount
ProcessGeneralLog = atomic.NewBool(false)
RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze)
EnableAutoAnalyzePriorityQueue = atomic.NewBool(DefTiDBEnableAutoAnalyzePriorityQueue)
GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays))
QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen)
EnablePProfSQLCPU = atomic.NewBool(false)
EnableBatchDML = atomic.NewBool(false)
EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM)
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency
ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit
ddlReorgRowFormat int64 = DefTiDBRowFormatV2
maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount
// DDLSlowOprThreshold is the threshold for ddl slow operations, unit is millisecond.
DDLSlowOprThreshold = config.GetGlobalConfig().Instance.DDLSlowOprThreshold
ForcePriority = int32(DefTiDBForcePriority)
3 changes: 2 additions & 1 deletion pkg/statistics/handle/autoanalyze/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/autoanalyze/refresher",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
@@ -33,7 +34,7 @@ go_test(
timeout = "short",
srcs = ["autoanalyze_test.go"],
flaky = True,
shard_count = 11,
shard_count = 12,
deps = [
":autoanalyze",
"//pkg/domain/infosync",
10 changes: 10 additions & 0 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
@@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
@@ -290,6 +291,15 @@ func HandleAutoAnalyze(
)
}
}()
if variable.EnableAutoAnalyzePriorityQueue.Load() {
r := refresher.NewRefresher(statsHandle, sysProcTracker)
err := r.RebuildTableAnalysisJobQueue()
if err != nil {
statslogutil.StatsLogger().Error("rebuild table analysis job queue failed", zap.Error(err))
return false
}
return r.PickOneTableAndAnalyzeByPriority()
}

parameters := exec.GetAutoAnalyzeParameters(sctx)
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
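
Condensed, the new branch in HandleAutoAnalyze does three things when the switch is on: rebuild the job queue, log and bail out on failure, then analyze the single highest-priority table. A sketch of that control flow against a hypothetical interface (the real Refresher lives in pkg/statistics/handle/autoanalyze/refresher and is constructed from the stats handle and the sysproc tracker):

```go
package sketch

// refresherLike is a hypothetical stand-in for refresher.Refresher,
// reduced to the two methods the diff calls.
type refresherLike interface {
	RebuildTableAnalysisJobQueue() error
	PickOneTableAndAnalyzeByPriority() bool
}

// runAutoAnalyzeOnce reports whether a table was analyzed this round.
func runAutoAnalyzeOnce(r refresherLike, priorityQueueOn bool, legacyPath func() bool) bool {
	if priorityQueueOn {
		if err := r.RebuildTableAnalysisJobQueue(); err != nil {
			// Matches the diff: a rebuild failure is reported as
			// "nothing analyzed"; there is no fallback to the legacy path.
			return false
		}
		return r.PickOneTableAndAnalyzeByPriority()
	}
	return legacyPath()
}
```

Note that in this version the queue is rebuilt on every HandleAutoAnalyze tick, which keeps the change simple at the cost of recomputing the queue each round.
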
22 changes: 22 additions & 0 deletions pkg/statistics/handle/autoanalyze/autoanalyze_test.go
@@ -41,6 +41,28 @@ import (
"go.uber.org/mock/gomock"
)

func TestEnableAutoAnalyzePriorityQueue(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int)")
tk.MustExec("insert into t values (1)")
// Enable auto analyze priority queue.
tk.MustExec("SET GLOBAL tidb_enable_auto_analyze_priority_queue=ON")
require.True(t, variable.EnableAutoAnalyzePriorityQueue.Load())
h := dom.StatsHandle()
err := h.HandleDDLEvent(<-h.DDLEventCh())
require.NoError(t, err)
require.NoError(t, h.DumpStatsDeltaToKV(true))
is := dom.InfoSchema()
require.NoError(t, h.Update(is))
exec.AutoAnalyzeMinCnt = 0
defer func() {
exec.AutoAnalyzeMinCnt = 1000
}()
require.True(t, dom.StatsHandle().HandleAutoAnalyze())
}

func TestAutoAnalyzeLockedTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
@@ -31,10 +31,10 @@ go_test(
"main_test.go",
"queue_test.go",
],
embed = [":priorityqueue"],
flaky = True,
shard_count = 17,
deps = [
":priorityqueue",
"//pkg/parser/model",
"//pkg/session",
"//pkg/sessionctx",
14 changes: 8 additions & 6 deletions pkg/statistics/handle/autoanalyze/priorityqueue/calculator.go
@@ -18,9 +18,9 @@ import "math"

const (
// EventNone represents no special event.
eventNone = 0.0
EventNone = 0.0
// EventNewIndex represents a special event for newly added indexes.
eventNewIndex = 2.0
EventNewIndex = 2.0
)

// TODO: make these configurable.
@@ -58,13 +58,15 @@ func (pc *PriorityCalculator) CalculateWeight(job *TableAnalysisJob) float64 {
return changeRatioWeight*math.Log10(1+changeRatio) +
sizeWeight*(1-math.Log10(1+job.TableSize)) +
analysisInterval*math.Log10(1+math.Sqrt(job.LastAnalysisDuration.Seconds())) +
pc.getSpecialEvent(job)
pc.GetSpecialEvent(job)
}

func (*PriorityCalculator) getSpecialEvent(job *TableAnalysisJob) float64 {
// GetSpecialEvent returns the special event weight.
// Exported for testing purposes.
func (*PriorityCalculator) GetSpecialEvent(job *TableAnalysisJob) float64 {
if job.HasNewlyAddedIndex() {
return eventNewIndex
return EventNewIndex
}

return eventNone
return EventNone
}
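
For intuition, here is the CalculateWeight formula evaluated on two contrasting tables. The changeRatioWeight, sizeWeight and analysisInterval coefficients are defined just above this hunk but elided from the excerpt, so the 0.5/0.3/0.2 values below are illustrative assumptions, not TiDB's real ones; EventNewIndex = 2.0 comes from the diff:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Illustrative coefficients; the real constants are elided from the hunk above.
const (
	changeRatioWeight = 0.5
	sizeWeight        = 0.3
	analysisInterval  = 0.2
)

// weight mirrors the formula in CalculateWeight.
func weight(changeRatio, tableSize float64, lastAnalysis time.Duration, specialEvent float64) float64 {
	return changeRatioWeight*math.Log10(1+changeRatio) +
		sizeWeight*(1-math.Log10(1+tableSize)) +
		analysisInterval*math.Log10(1+math.Sqrt(lastAnalysis.Seconds())) +
		specialEvent
}

func main() {
	// Small, heavily changed table, last analyzed a day ago, with a newly
	// added index (EventNewIndex = 2.0): high priority.
	fmt.Printf("%.2f\n", weight(5.0, 1e3, 24*time.Hour, 2.0)) // ≈ 2.28

	// Huge, barely changed table analyzed an hour ago: low priority.
	fmt.Printf("%.2f\n", weight(0.1, 1e8, time.Hour, 0.0)) // ≈ -1.72
}
```
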
25 changes: 13 additions & 12 deletions pkg/statistics/handle/autoanalyze/priorityqueue/calculator_test.go
@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package priorityqueue
package priorityqueue_test

import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
"github.com/stretchr/testify/require"
)

@@ -29,8 +30,8 @@ type testData struct {
}

func TestCalculateWeight(t *testing.T) {
// Note: all group are sorted by weight in ascending order.
pc := NewPriorityCalculator()
// Note: all groups are sorted by weight in ascending order.
pc := priorityqueue.NewPriorityCalculator()
// Only focus on change percentage. Bigger change percentage, higher weight.
changePercentageGroup := []testData{
{
@@ -106,10 +107,10 @@

// testWeightCalculation is a helper function to test the weight calculation.
// It will check if the weight is increasing for each test data group.
func testWeightCalculation(t *testing.T, pc *PriorityCalculator, group []testData) {
func testWeightCalculation(t *testing.T, pc *priorityqueue.PriorityCalculator, group []testData) {
prevWeight := -1.0
for _, tc := range group {
job := &TableAnalysisJob{
job := &priorityqueue.TableAnalysisJob{
ChangePercentage: tc.ChangePercentage,
TableSize: tc.TableSize,
LastAnalysisDuration: tc.LastAnalysisDuration,
@@ -122,23 +123,23 @@ func testWeightCalculation(t *testing.T, pc *priorityqueue.PriorityCalculator, group []testData) {
}

func TestGetSpecialEvent(t *testing.T) {
pc := NewPriorityCalculator()
pc := priorityqueue.NewPriorityCalculator()

jobWithIndex := &TableAnalysisJob{
jobWithIndex := &priorityqueue.TableAnalysisJob{
PartitionIndexes: map[string][]string{
"index1": {"p1", "p2"},
},
}
require.Equal(t, eventNewIndex, pc.getSpecialEvent(jobWithIndex))
require.Equal(t, priorityqueue.EventNewIndex, pc.GetSpecialEvent(jobWithIndex))

jobWithIndex = &TableAnalysisJob{
jobWithIndex = &priorityqueue.TableAnalysisJob{
Indexes: []string{"index1"},
}
require.Equal(t, eventNewIndex, pc.getSpecialEvent(jobWithIndex))
require.Equal(t, priorityqueue.EventNewIndex, pc.GetSpecialEvent(jobWithIndex))

jobWithoutIndex := &TableAnalysisJob{
jobWithoutIndex := &priorityqueue.TableAnalysisJob{
PartitionIndexes: map[string][]string{},
Indexes: []string{},
}
require.Equal(t, eventNone, pc.getSpecialEvent(jobWithoutIndex))
require.Equal(t, priorityqueue.EventNone, pc.GetSpecialEvent(jobWithoutIndex))
}
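
The move from package priorityqueue to priorityqueue_test is Go's external-test-package pattern: the test now compiles as a separate package and can touch only exported identifiers, which is why eventNone, eventNewIndex and getSpecialEvent were exported above, and why the BUILD.bazel hunk swaps embed = [":priorityqueue"] for an ordinary deps entry. A minimal illustration with hypothetical paths:

```go
// calc/calc.go
package calc

// EventNone is exported so the external test package can reference it.
const EventNone = 0.0

// Weight is a trivial exported function under test.
func Weight() float64 { return EventNone }
```

```go
// calc/calc_test.go
package calc_test // external test package: sees only calc's exported API

import (
	"testing"

	"example.com/calc" // hypothetical module path
)

func TestWeight(t *testing.T) {
	if got := calc.Weight(); got != calc.EventNone {
		t.Fatalf("Weight() = %v, want %v", got, calc.EventNone)
	}
}
```
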
22 changes: 11 additions & 11 deletions pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
@@ -21,8 +21,8 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/util"
)

// noRecord is used to indicate that there is no related record in mysql.analyze_jobs.
const noRecord = -1
// NoRecord is used to indicate that there is no related record in mysql.analyze_jobs.
const NoRecord = -1

// justFailed is used to indicate that the last analysis has just failed.
const justFailed = 0
@@ -80,9 +80,9 @@ const lastFailedDurationQueryForPartition = `
JOIN mysql.analyze_jobs aj ON aj.id = latest_failures.max_id;
`

// getAverageAnalysisDuration returns the average duration of the last 5 successful analyses for each specified partition.
// GetAverageAnalysisDuration returns the average duration of the last 5 successful analyses for each specified partition.
// If there are no successful analyses, it returns 0.
func getAverageAnalysisDuration(
func GetAverageAnalysisDuration(
sctx sessionctx.Context,
schema, tableName string,
partitionNames ...string,
@@ -99,25 +99,25 @@

rows, _, err := util.ExecRows(sctx, query, params...)
if err != nil {
return noRecord, err
return NoRecord, err
}

// NOTE: if there are no successful analyses, we return 0.
if len(rows) == 0 || rows[0].IsNull(0) {
return noRecord, nil
return NoRecord, nil
}
avgDuration := rows[0].GetMyDecimal(0)
duration, err := avgDuration.ToFloat64()
if err != nil {
return noRecord, err
return NoRecord, err
}

return time.Duration(duration) * time.Second, nil
}

// getLastFailedAnalysisDuration returns the duration since the last failed analysis.
// GetLastFailedAnalysisDuration returns the duration since the last failed analysis.
// If there is no failed analysis, it returns 0.
func getLastFailedAnalysisDuration(
func GetLastFailedAnalysisDuration(
sctx sessionctx.Context,
schema, tableName string,
partitionNames ...string,
Expand All @@ -134,12 +134,12 @@ func getLastFailedAnalysisDuration(

rows, _, err := util.ExecRows(sctx, query, params...)
if err != nil {
return noRecord, err
return NoRecord, err
}

// NOTE: if there are no failed analyses, we return 0.
if len(rows) == 0 || rows[0].IsNull(0) {
return noRecord, nil
return NoRecord, nil
}
lastFailedDuration := rows[0].GetUint64(0)
if lastFailedDuration == 0 {
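
Both helpers read history from mysql.analyze_jobs and return NoRecord (-1) when the query fails or matches nothing. The SQL constants themselves are elided from this hunk, so the sketch below is only a plausible shape of the average-duration query for an unpartitioned table, assuming the usual start_time/end_time/state columns of mysql.analyze_jobs; it is not the query TiDB ships:

```sql
-- Hypothetical reconstruction of the query behind GetAverageAnalysisDuration:
-- average wall-clock time of the last 5 successful analyses of one table.
SELECT AVG(TIMESTAMPDIFF(SECOND, start_time, end_time))
FROM (
    SELECT start_time, end_time
    FROM mysql.analyze_jobs
    WHERE table_schema = ? AND table_name = ? AND state = 'finished'
    ORDER BY id DESC
    LIMIT 5
) AS recent_success;
```
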
(diffs for the remaining 9 changed files not loaded)
