From 3dfeb69507611606b91f125a3c7fff24be5d807e Mon Sep 17 00:00:00 2001
From: Yiding Cui
Date: Wed, 27 Nov 2024 23:25:23 +0800
Subject: [PATCH] This is an automated cherry-pick of #57723

Signed-off-by: ti-chi-bot
---
 pkg/statistics/BUILD.bazel                    |  113 ++
 pkg/statistics/column.go                      |  272 +++
 pkg/statistics/handle/storage/read.go         |  814 +++++++++++
 .../handle/syncload/stats_syncload.go         |  626 ++++++++++
 pkg/statistics/integration_test.go            |  614 ++++++++++
 pkg/statistics/table.go                       | 1066 +++++++++++++++++
 6 files changed, 3505 insertions(+)
 create mode 100644 pkg/statistics/BUILD.bazel
 create mode 100644 pkg/statistics/column.go
 create mode 100644 pkg/statistics/handle/storage/read.go
 create mode 100644 pkg/statistics/handle/syncload/stats_syncload.go
 create mode 100644 pkg/statistics/integration_test.go
 create mode 100644 pkg/statistics/table.go

diff --git a/pkg/statistics/BUILD.bazel b/pkg/statistics/BUILD.bazel
new file mode 100644
index 0000000000000..cad48db7936fe
--- /dev/null
+++ b/pkg/statistics/BUILD.bazel
@@ -0,0 +1,113 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "statistics",
+    srcs = [
+        "analyze.go",
+        "analyze_jobs.go",
+        "builder.go",
+        "builder_ext_stats.go",
+        "cmsketch.go",
+        "cmsketch_util.go",
+        "column.go",
+        "debugtrace.go",
+        "estimate.go",
+        "fmsketch.go",
+        "histogram.go",
+        "index.go",
+        "row_sampler.go",
+        "sample.go",
+        "scalar.go",
+        "table.go",
+    ],
+    importpath = "github.com/pingcap/tidb/pkg/statistics",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/expression",
+        "//pkg/kv",
+        "//pkg/meta/model",
+        "//pkg/parser/ast",
+        "//pkg/parser/charset",
+        "//pkg/parser/mysql",
+        "//pkg/parser/terror",
+        "//pkg/planner/core/resolve",
+        "//pkg/planner/planctx",
+        "//pkg/planner/util/debugtrace",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
+        "//pkg/sessionctx/variable",
+        "//pkg/statistics/asyncload",
+        "//pkg/statistics/handle/logutil",
+        "//pkg/tablecodec",
+        "//pkg/types",
+        "//pkg/util/chunk",
+        "//pkg/util/codec",
+        "//pkg/util/collate",
+        "//pkg/util/context",
+        "//pkg/util/dbterror",
+        "//pkg/util/fastrand",
+        "//pkg/util/hack",
+        "//pkg/util/intest",
+        "//pkg/util/logutil",
+        "//pkg/util/memory",
+        "//pkg/util/ranger",
+        "//pkg/util/sqlexec",
+        "@com_github_dolthub_swiss//:swiss",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_pingcap_tipb//go-tipb",
+        "@com_github_twmb_murmur3//:murmur3",
+        "@org_golang_x_exp//maps",
+        "@org_uber_go_atomic//:atomic",
+        "@org_uber_go_zap//:zap",
+    ],
+)
+
+go_test(
+    name = "statistics_test",
+    timeout = "short",
+    srcs = [
+        "bench_daily_test.go",
+        "builder_test.go",
+        "cmsketch_test.go",
+        "fmsketch_test.go",
+        "histogram_bench_test.go",
+        "histogram_test.go",
+        "integration_test.go",
+        "main_test.go",
+        "sample_test.go",
+        "scalar_test.go",
+        "statistics_test.go",
+    ],
+    data = glob(["testdata/**"]),
+    embed = [":statistics"],
+    flaky = True,
+    shard_count = 38,
+    deps = [
+        "//pkg/config",
+        "//pkg/meta/model",
+        "//pkg/parser/model",
+        "//pkg/parser/mysql",
+        "//pkg/planner/core/resolve",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
+        "//pkg/testkit",
+        "//pkg/testkit/analyzehelper",
+        "//pkg/testkit/testdata",
+        "//pkg/testkit/testmain",
+        "//pkg/testkit/testsetup",
+        "//pkg/types",
+        "//pkg/util/benchdaily",
+        "//pkg/util/chunk",
+        "//pkg/util/codec",
+        "//pkg/util/collate",
+        "//pkg/util/memory",
+        "//pkg/util/mock",
+        "//pkg/util/ranger",
+        "//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/pkg/statistics/column.go b/pkg/statistics/column.go new file mode 100644 index 0000000000000..5c6c66e239926 --- /dev/null +++ b/pkg/statistics/column.go @@ -0,0 +1,272 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/planner/planctx" + "github.com/pingcap/tidb/pkg/planner/util/debugtrace" + "github.com/pingcap/tidb/pkg/statistics/asyncload" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" +) + +// Column represents a column histogram. +type Column struct { + LastAnalyzePos types.Datum + CMSketch *CMSketch + TopN *TopN + FMSketch *FMSketch + Info *model.ColumnInfo + Histogram + + // StatsLoadedStatus indicates the status of column statistics + StatsLoadedStatus + // PhysicalID is the physical table id, + // or it could possibly be -1, which means "stats not available". + // The -1 case could happen in a pseudo stats table, and in this case, this stats should not trigger stats loading. + PhysicalID int64 + Flag int64 + StatsVer int64 // StatsVer is the version of the current stats, used to maintain compatibility + + IsHandle bool +} + +// Copy copies the column. +func (c *Column) Copy() *Column { + if c == nil { + return nil + } + nc := &Column{ + PhysicalID: c.PhysicalID, + Flag: c.Flag, + StatsVer: c.StatsVer, + IsHandle: c.IsHandle, + } + c.LastAnalyzePos.Copy(&nc.LastAnalyzePos) + if c.CMSketch != nil { + nc.CMSketch = c.CMSketch.Copy() + } + if c.TopN != nil { + nc.TopN = c.TopN.Copy() + } + if c.FMSketch != nil { + nc.FMSketch = c.FMSketch.Copy() + } + if c.Info != nil { + nc.Info = c.Info.Clone() + } + nc.Histogram = *c.Histogram.Copy() + nc.StatsLoadedStatus = c.StatsLoadedStatus.Copy() + return nc +} + +func (c *Column) String() string { + return c.Histogram.ToString(0) +} + +// TotalRowCount returns the total count of this column. +func (c *Column) TotalRowCount() float64 { + if c.StatsVer >= Version2 { + return c.Histogram.TotalRowCount() + float64(c.TopN.TotalCount()) + } + return c.Histogram.TotalRowCount() +} + +// NotNullCount returns the count of this column which is not null. +func (c *Column) NotNullCount() float64 { + if c.StatsVer >= Version2 { + return c.Histogram.NotNullCount() + float64(c.TopN.TotalCount()) + } + return c.Histogram.NotNullCount() +} + +// GetIncreaseFactor get the increase factor to adjust the final estimated count when the table is modified. 
+func (c *Column) GetIncreaseFactor(realtimeRowCount int64) float64 {
+	columnCount := c.TotalRowCount()
+	if columnCount == 0 {
+		// avoid dividing by 0
+		return 1.0
+	}
+	return float64(realtimeRowCount) / columnCount
+}
+
+// MemoryUsage returns the total memory usage of Histogram, CMSketch, FMSketch in Column.
+// We ignore the size of other metadata in Column.
+func (c *Column) MemoryUsage() CacheItemMemoryUsage {
+	var sum int64
+	columnMemUsage := &ColumnMemUsage{
+		ColumnID: c.Info.ID,
+	}
+	histogramMemUsage := c.Histogram.MemoryUsage()
+	columnMemUsage.HistogramMemUsage = histogramMemUsage
+	sum = histogramMemUsage
+	if c.CMSketch != nil {
+		cmSketchMemUsage := c.CMSketch.MemoryUsage()
+		columnMemUsage.CMSketchMemUsage = cmSketchMemUsage
+		sum += cmSketchMemUsage
+	}
+	if c.TopN != nil {
+		topnMemUsage := c.TopN.MemoryUsage()
+		columnMemUsage.TopNMemUsage = topnMemUsage
+		sum += topnMemUsage
+	}
+	if c.FMSketch != nil {
+		fmSketchMemUsage := c.FMSketch.MemoryUsage()
+		columnMemUsage.FMSketchMemUsage = fmSketchMemUsage
+		sum += fmSketchMemUsage
+	}
+	columnMemUsage.TotalMemUsage = sum
+	return columnMemUsage
+}
+
+// ColumnStatsIsInvalid checks if this column is invalid.
+// If this column has a histogram that has not been loaded yet,
+// we mark it as needing its histogram.
+func ColumnStatsIsInvalid(colStats *Column, sctx planctx.PlanContext, histColl *HistColl, cid int64) (res bool) {
+	var totalCount float64
+	var ndv int64
+	var inValidForCollPseudo, essentialLoaded bool
+	if sctx.GetSessionVars().StmtCtx.EnableOptimizerDebugTrace {
+		debugtrace.EnterContextCommon(sctx)
+		defer func() {
+			debugtrace.RecordAnyValuesWithNames(sctx,
+				"IsInvalid", res,
+				"InValidForCollPseudo", inValidForCollPseudo,
+				"TotalCount", totalCount,
+				"NDV", ndv,
+				"EssentialLoaded", essentialLoaded,
+			)
+			debugtrace.LeaveContextCommon(sctx)
+		}()
+	}
+	if sctx != nil {
+		stmtctx := sctx.GetSessionVars().StmtCtx
+		if (colStats == nil || !colStats.IsStatsInitialized() || colStats.IsLoadNeeded()) &&
+			stmtctx != nil &&
+			!histColl.CanNotTriggerLoad {
+			asyncload.AsyncLoadHistogramNeededItems.Insert(model.TableItemID{
+				TableID:          histColl.PhysicalID,
+				ID:               cid,
+				IsIndex:          false,
+				IsSyncLoadFailed: sctx.GetSessionVars().StmtCtx.StatsLoad.Timeout > 0,
+			}, true)
+		}
+	}
+	if histColl.Pseudo {
+		inValidForCollPseudo = true
+		return true
+	}
+	if colStats == nil {
+		totalCount = -1
+		ndv = -1
+		essentialLoaded = false
+		return true
+	}
+	// In some cases, some of the column's statistics may have been evicted.
+	// For example: the CMSketch of the column might be evicted while the histogram and the TopN still exist.
+	// In this case, we still consider the column valid because the remaining statistics can be used for optimization.
+	totalCount = colStats.TotalRowCount()
+	essentialLoaded = colStats.IsEssentialStatsLoaded()
+	ndv = colStats.Histogram.NDV
+	return totalCount == 0 || (!essentialLoaded && ndv > 0)
+}
+
+// ItemID implements TableCacheItem
+func (c *Column) ItemID() int64 {
+	return c.Info.ID
+}
+
+// DropUnnecessaryData drops the unnecessary data for the column.
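+// Only the histogram metadata survives: the buckets, the TopN, and (for stats
+// version 1) the CMSketch are released, and the column is marked allEvicted so
+// later readers know the details must be reloaded before use.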
+func (c *Column) DropUnnecessaryData() {
+	if c.StatsVer < Version2 {
+		c.CMSketch = nil
+	}
+	c.TopN = nil
+	c.Histogram.Bounds = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, 0)
+	c.Histogram.Buckets = make([]Bucket, 0)
+	c.Histogram.Scalars = make([]scalar, 0)
+	c.evictedStatus = AllEvicted
+}
+
+// IsAllEvicted indicates whether all the stats are evicted.
+func (c *Column) IsAllEvicted() bool {
+	return c == nil || (c.statsInitialized && c.evictedStatus >= AllEvicted)
+}
+
+// GetEvictedStatus indicates the evicted status
+func (c *Column) GetEvictedStatus() int {
+	return c.evictedStatus
+}
+
+// IsStatsInitialized indicates whether stats is initialized
+func (c *Column) IsStatsInitialized() bool {
+	return c.statsInitialized
+}
+
+// GetStatsVer indicates the stats version
+func (c *Column) GetStatsVer() int64 {
+	return c.StatsVer
+}
+
+// IsCMSExist indicates whether CMSketch exists
+func (c *Column) IsCMSExist() bool {
+	return c.CMSketch != nil
+}
+
+// StatusToString gets the string info of StatsLoadedStatus
+func (s StatsLoadedStatus) StatusToString() string {
+	if !s.statsInitialized {
+		return "unInitialized"
+	}
+	switch s.evictedStatus {
+	case AllLoaded:
+		return "allLoaded"
+	case AllEvicted:
+		return "allEvicted"
+	}
+	return "unknown"
+}
+
+// IsAnalyzed indicates whether the column is analyzed.
+// The set of IsAnalyzed columns is a subset of the set of StatsAvailable columns.
+func (c *Column) IsAnalyzed() bool {
+	return c.GetStatsVer() != Version0
+}
+
+// StatsAvailable indicates whether the column stats are collected.
+// Note:
+//  1. The function merely talks about whether the stats are collected, regardless of the stats loaded status.
+//  2. The function is used to decide StatsLoadedStatus.statsInitialized when reading the column stats from storage.
+//  3. There are two cases that StatsAvailable is true:
+//     a. IsAnalyzed is true.
+//     b. The column is newly-added/modified and its stats are generated according to the default value.
+func (c *Column) StatsAvailable() bool {
+	// Typically, when the column is analyzed, StatsVer is set to Version1/Version2, so we check IsAnalyzed().
+	// However, when we add/modify a column, its stats are generated according to the default value without setting
+	// StatsVer, so we check NDV > 0 || NullCount > 0 for the case.
+	return c.IsAnalyzed() || c.NDV > 0 || c.NullCount > 0
+}
+
+// EmptyColumn creates an empty column object. It may be used for pseudo estimation or to stop loading non-existent stats.
+func EmptyColumn(tid int64, pkIsHandle bool, colInfo *model.ColumnInfo) *Column {
+	return &Column{
+		PhysicalID: tid,
+		Info:       colInfo,
+		Histogram:  *NewHistogram(colInfo.ID, 0, 0, 0, &colInfo.FieldType, 0, 0),
+		IsHandle:   pkIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
+	}
+}

diff --git a/pkg/statistics/handle/storage/read.go b/pkg/statistics/handle/storage/read.go
new file mode 100644
index 0000000000000..79d1468c52f77
--- /dev/null
+++ b/pkg/statistics/handle/storage/read.go
@@ -0,0 +1,814 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"context"
+	"encoding/json"
+	"strconv"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/config"
+	"github.com/pingcap/tidb/pkg/infoschema"
+	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/parser/ast"
+	"github.com/pingcap/tidb/pkg/parser/mysql"
+	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/statistics"
+	"github.com/pingcap/tidb/pkg/statistics/asyncload"
+	statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
+	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
+	"github.com/pingcap/tidb/pkg/statistics/handle/util"
+	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util/chunk"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"github.com/pingcap/tidb/pkg/util/memory"
+	"github.com/pingcap/tidb/pkg/util/sqlexec"
+	"go.uber.org/zap"
+)
+
+// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
+func StatsMetaCountAndModifyCount(
+	ctx context.Context,
+	sctx sessionctx.Context,
+	tableID int64,
+) (count, modifyCount int64, isNull bool, err error) {
+	return statsMetaCountAndModifyCount(ctx, sctx, tableID, false)
+}
+
+// StatsMetaCountAndModifyCountForUpdate reads count and modify_count for the given table from mysql.stats_meta with lock.
+func StatsMetaCountAndModifyCountForUpdate(
+	ctx context.Context,
+	sctx sessionctx.Context,
+	tableID int64,
+) (count, modifyCount int64, isNull bool, err error) {
+	return statsMetaCountAndModifyCount(ctx, sctx, tableID, true)
+}
+
+func statsMetaCountAndModifyCount(
+	ctx context.Context,
+	sctx sessionctx.Context,
+	tableID int64,
+	forUpdate bool,
+) (count, modifyCount int64, isNull bool, err error) {
+	sql := "select count, modify_count from mysql.stats_meta where table_id = %?"
+	if forUpdate {
+		sql += " for update"
+	}
+	rows, _, err := util.ExecRowsWithCtx(ctx, sctx, sql, tableID)
+	if err != nil {
+		return 0, 0, false, err
+	}
+	if len(rows) == 0 {
+		return 0, 0, true, nil
+	}
+	count = int64(rows[0].GetUint64(0))
+	modifyCount = rows[0].GetInt64(1)
+	return count, modifyCount, false, nil
+}
+
+// HistMetaFromStorageWithHighPriority reads the meta info of the histogram from the storage.
+func HistMetaFromStorageWithHighPriority(sctx sessionctx.Context, item *model.TableItemID, possibleColInfo *model.ColumnInfo) (*statistics.Histogram, *types.Datum, int64, int64, error) {
+	isIndex := 0
+	var tp *types.FieldType
+	if item.IsIndex {
+		isIndex = 1
+		tp = types.NewFieldType(mysql.TypeBlob)
+	} else {
+		tp = &possibleColInfo.FieldType
+	}
+	rows, _, err := util.ExecRows(sctx,
+		"select high_priority distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?",
+		item.TableID,
+		item.ID,
+		isIndex,
+	)
+	if err != nil {
+		return nil, nil, 0, 0, err
+	}
+	if len(rows) == 0 {
+		return nil, nil, 0, 0, nil
+	}
+	hist := statistics.NewHistogram(item.ID, rows[0].GetInt64(0), rows[0].GetInt64(2), rows[0].GetUint64(1), tp, chunk.InitialCapacity, rows[0].GetInt64(3))
+	hist.Correlation = rows[0].GetFloat64(5)
+	lastPos := rows[0].GetDatum(7, types.NewFieldType(mysql.TypeBlob))
+	return hist, &lastPos, rows[0].GetInt64(4), rows[0].GetInt64(6), nil
+}
+
+// HistogramFromStorageWithPriority wraps HistogramFromStorage with the given kv.Priority.
+// Sync load and async load will use high priority to get the data.
+func HistogramFromStorageWithPriority(
+	sctx sessionctx.Context,
+	tableID int64,
+	colID int64,
+	tp *types.FieldType,
+	distinct int64,
+	isIndex int,
+	ver uint64,
+	nullCount int64,
+	totColSize int64,
+	corr float64,
+	priority int,
+) (*statistics.Histogram, error) {
+	selectPrefix := "select "
+	switch priority {
+	case kv.PriorityHigh:
+		selectPrefix += "high_priority "
+	case kv.PriorityLow:
+		selectPrefix += "low_priority "
+	}
+	rows, fields, err := util.ExecRows(sctx, selectPrefix+"count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %? order by bucket_id", tableID, isIndex, colID)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	bucketSize := len(rows)
+	hg := statistics.NewHistogram(colID, distinct, nullCount, ver, tp, bucketSize, totColSize)
+	hg.Correlation = corr
+	totalCount := int64(0)
+	for i := 0; i < bucketSize; i++ {
+		count := rows[i].GetInt64(0)
+		repeats := rows[i].GetInt64(1)
+		var upperBound, lowerBound types.Datum
+		if isIndex == 1 {
+			lowerBound = rows[i].GetDatum(2, &fields[2].Column.FieldType)
+			upperBound = rows[i].GetDatum(3, &fields[3].Column.FieldType)
+		} else {
+			d := rows[i].GetDatum(2, &fields[2].Column.FieldType)
+			// For new collation data, when storing the bounds of the histogram, we store the collate key instead of the
+			// original value.
+			// But there's additional conversion logic for new collation data, and the collate key might be longer than
+			// the FieldType.flen.
+			// If we use the original FieldType here, there might be errors like "Invalid utf8mb4 character string"
+			// or "Data too long".
+			// So we change it to TypeBlob to bypass that logic here.
+			if tp.EvalType() == types.ETString && tp.GetType() != mysql.TypeEnum && tp.GetType() != mysql.TypeSet {
+				tp = types.NewFieldType(mysql.TypeBlob)
+			}
+			lowerBound, err = d.ConvertTo(statistics.UTCWithAllowInvalidDateCtx, tp)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			d = rows[i].GetDatum(3, &fields[3].Column.FieldType)
+			upperBound, err = d.ConvertTo(statistics.UTCWithAllowInvalidDateCtx, tp)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		}
+		totalCount += count
+		hg.AppendBucketWithNDV(&lowerBound, &upperBound, totalCount, repeats, rows[i].GetInt64(4))
+	}
+	hg.PreCalculateScalar()
+	return hg, nil
+}
+
+// CMSketchAndTopNFromStorageWithHighPriority reads CMSketch and TopN from storage.
+func CMSketchAndTopNFromStorageWithHighPriority(sctx sessionctx.Context, tblID int64, isIndex, histID, statsVer int64) (_ *statistics.CMSketch, _ *statistics.TopN, err error) {
+	topNRows, _, err := util.ExecRows(sctx, "select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
+	if err != nil {
+		return nil, nil, err
+	}
+	// If the stats version is higher than 1, don't read the Count-Min Sketch.
+	if statsVer > statistics.Version1 {
+		return statistics.DecodeCMSketchAndTopN(nil, topNRows)
+	}
+	rows, _, err := util.ExecRows(sctx, "select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
+	if err != nil {
+		return nil, nil, err
+	}
+	if len(rows) == 0 {
+		return statistics.DecodeCMSketchAndTopN(nil, topNRows)
+	}
+	return statistics.DecodeCMSketchAndTopN(rows[0].GetBytes(0), topNRows)
+}
+
+// CMSketchFromStorage reads CMSketch from storage
+func CMSketchFromStorage(sctx sessionctx.Context, tblID int64, isIndex int, histID int64) (_ *statistics.CMSketch, err error) {
+	rows, _, err := util.ExecRows(sctx, "select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
+	if err != nil || len(rows) == 0 {
+		return nil, err
+	}
+	return statistics.DecodeCMSketch(rows[0].GetBytes(0))
+}
+
+// TopNFromStorage reads TopN from storage
+func TopNFromStorage(sctx sessionctx.Context, tblID int64, isIndex int, histID int64) (_ *statistics.TopN, err error) {
+	rows, _, err := util.ExecRows(sctx, "select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
+	if err != nil || len(rows) == 0 {
+		return nil, err
+	}
+	return statistics.DecodeTopN(rows), nil
+}
+
+// FMSketchFromStorage reads FMSketch from storage
+func FMSketchFromStorage(sctx sessionctx.Context, tblID int64, isIndex, histID int64) (_ *statistics.FMSketch, err error) {
+	rows, _, err := util.ExecRows(sctx, "select value from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
+	if err != nil || len(rows) == 0 {
+		return nil, err
+	}
+	return statistics.DecodeFMSketch(rows[0].GetBytes(0))
+}
+
+// CheckSkipPartition checks if we can skip loading the partition.
+func CheckSkipPartition(sctx sessionctx.Context, tblID int64, isIndex int) error {
+	rows, _, err := util.ExecRows(sctx, "select distinct_count from mysql.stats_histograms where table_id = %? and is_index = %?", tblID, isIndex)
+	if err != nil {
+		return err
+	}
+	if len(rows) == 0 {
+		return types.ErrPartitionStatsMissing
+	}
+	return nil
+}
+
+// CheckSkipColumnPartiion checks if we can skip loading the partition.
+func CheckSkipColumnPartiion(sctx sessionctx.Context, tblID int64, isIndex int, histsID int64) error {
+	rows, _, err := util.ExecRows(sctx, "select distinct_count from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histsID)
+	if err != nil {
+		return err
+	}
+	if len(rows) == 0 {
+		return types.ErrPartitionColumnStatsMissing
+	}
+	return nil
+}
+
+// ExtendedStatsFromStorage reads extended stats from storage.
+func ExtendedStatsFromStorage(sctx sessionctx.Context, table *statistics.Table, tableID int64, loadAll bool) (*statistics.Table, error) {
+	failpoint.Inject("injectExtStatsLoadErr", func() {
+		failpoint.Return(nil, errors.New("gofail extendedStatsFromStorage error"))
+	})
+	lastVersion := uint64(0)
+	if table.ExtendedStats != nil && !loadAll {
+		lastVersion = table.ExtendedStats.LastUpdateVersion
+	} else {
+		table.ExtendedStats = statistics.NewExtendedStatsColl()
+	}
+	rows, _, err := util.ExecRows(sctx,
+		"select name, status, type, column_ids, stats, version from mysql.stats_extended where table_id = %? and status in (%?, %?, %?) and version > %?",
+		tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsDeleted, lastVersion)
+	if err != nil || len(rows) == 0 {
+		return table, nil
+	}
+	for _, row := range rows {
+		lastVersion = max(lastVersion, row.GetUint64(5))
+		name := row.GetString(0)
+		status := uint8(row.GetInt64(1))
+		if status == statistics.ExtendedStatsDeleted || status == statistics.ExtendedStatsInited {
+			delete(table.ExtendedStats.Stats, name)
+		} else {
+			item := &statistics.ExtendedStatsItem{
+				Tp: uint8(row.GetInt64(2)),
+			}
+			colIDs := row.GetString(3)
+			err := json.Unmarshal([]byte(colIDs), &item.ColIDs)
+			if err != nil {
+				statslogutil.StatsLogger().Error("decode column IDs failed", zap.String("column_ids", colIDs), zap.Error(err))
+				return nil, err
+			}
+			statsStr := row.GetString(4)
+			if item.Tp == ast.StatsTypeCardinality || item.Tp == ast.StatsTypeCorrelation {
+				if statsStr != "" {
+					item.ScalarVals, err = strconv.ParseFloat(statsStr, 64)
+					if err != nil {
+						statslogutil.StatsLogger().Error("parse scalar stats failed", zap.String("stats", statsStr), zap.Error(err))
+						return nil, err
+					}
+				}
+			} else {
+				item.StringVals = statsStr
+			}
+			table.ExtendedStats.Stats[name] = item
+		}
+	}
+	table.ExtendedStats.LastUpdateVersion = lastVersion
+	return table, nil
+}
+
+func indexStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool, lease time.Duration, tracker *memory.Tracker) error {
+	histID := row.GetInt64(2)
+	distinct := row.GetInt64(3)
+	histVer := row.GetUint64(4)
+	nullCount := row.GetInt64(5)
+	statsVer := row.GetInt64(7)
+	idx := table.GetIdx(histID)
+	flag := row.GetInt64(8)
+	lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob))
+
+	for _, idxInfo := range tableInfo.Indices {
+		if histID != idxInfo.ID {
+			continue
+		}
+		table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, statsVer != statistics.Version0)
+		// All the objects in the table share the same stats version.
+		// Update it here.
+		if statsVer != statistics.Version0 {
+			table.StatsVer = int(statsVer)
+			table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer)
+		}
+		// We will not load buckets, topn and cmsketch if:
+		// 1. lease > 0, and:
+		// 2. the index doesn't have any of buckets, topn, cmsketch in memory before, and:
+		// 3. loadAll is false, and:
+		// 4. lite-init-stats is true (remove this condition when lite-init-stats is GA).
+		notNeedLoad := lease > 0 &&
+			(idx == nil || ((!idx.IsStatsInitialized() || idx.IsAllEvicted()) && idx.LastUpdateVersion < histVer)) &&
+			!loadAll &&
+			config.GetGlobalConfig().Performance.LiteInitStats
+		if notNeedLoad {
+			// If we don't have this index in memory, skip it.
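+			// Otherwise, rebuild it as a histogram-meta-only skeleton and mark
+			// it allEvicted, so the full stats can be loaded on demand later.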
+			if idx == nil {
+				return nil
+			}
+			idx = &statistics.Index{
+				Histogram:  *statistics.NewHistogram(histID, distinct, nullCount, histVer, types.NewFieldType(mysql.TypeBlob), 0, 0),
+				StatsVer:   statsVer,
+				Info:       idxInfo,
+				Flag:       flag,
+				PhysicalID: table.PhysicalID,
+			}
+			if idx.IsAnalyzed() {
+				idx.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+			lastAnalyzePos.Copy(&idx.LastAnalyzePos)
+			break
+		}
+		if idx == nil || idx.LastUpdateVersion < histVer || loadAll {
+			hg, err := HistogramFromStorageWithPriority(sctx, table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0, 0, kv.PriorityNormal)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, table.PhysicalID, 1, idxInfo.ID, statsVer)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			var fmSketch *statistics.FMSketch
+			if loadAll {
+				// FMSketch is only used when merging partition stats into global stats. When merging partition stats into global stats,
+				// we load all the statistics, i.e., loadAll is true.
+				fmSketch, err = FMSketchFromStorage(sctx, table.PhysicalID, 1, histID)
+				if err != nil {
+					return errors.Trace(err)
+				}
+			}
+			idx = &statistics.Index{
+				Histogram:  *hg,
+				CMSketch:   cms,
+				TopN:       topN,
+				FMSketch:   fmSketch,
+				Info:       idxInfo,
+				StatsVer:   statsVer,
+				Flag:       flag,
+				PhysicalID: table.PhysicalID,
+			}
+			if statsVer != statistics.Version0 {
+				idx.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			}
+			lastAnalyzePos.Copy(&idx.LastAnalyzePos)
+		}
+		break
+	}
+	if idx != nil {
+		if tracker != nil {
+			tracker.Consume(idx.MemoryUsage().TotalMemoryUsage())
+		}
+		table.SetIdx(histID, idx)
+	} else {
+		logutil.BgLogger().Debug("we cannot find this index in the table info; it may have been deleted", zap.Int64("indexID", histID), zap.String("table", tableInfo.Name.O))
+	}
+	return nil
+}
+
+func columnStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool, lease time.Duration, tracker *memory.Tracker) error {
+	histID := row.GetInt64(2)
+	distinct := row.GetInt64(3)
+	histVer := row.GetUint64(4)
+	nullCount := row.GetInt64(5)
+	totColSize := row.GetInt64(6)
+	statsVer := row.GetInt64(7)
+	correlation := row.GetFloat64(9)
+	lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob))
+	col := table.GetCol(histID)
+	flag := row.GetInt64(8)
+
+	for _, colInfo := range tableInfo.Columns {
+		if histID != colInfo.ID {
+			continue
+		}
+		table.ColAndIdxExistenceMap.InsertCol(histID, statsVer != statistics.Version0 || distinct > 0 || nullCount > 0)
+		// All the objects in the table share the same stats version.
+		// Update it here.
+		if statsVer != statistics.Version0 {
+			table.StatsVer = int(statsVer)
+			table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer)
+		}
+		isHandle := tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag())
+		// We will not load buckets, topn and cmsketch if:
+		// 1. lease > 0, and:
+		// 2. this column is not the handle, or lite-init-stats is true (remove this condition when lite-init-stats is GA), and:
+		// 3. the column doesn't have any of buckets, topn, cmsketch in memory before, and:
+		// 4. loadAll is false.
+		//
+		// Here is the explanation of the condition `!col.IsStatsInitialized() || col.IsAllEvicted()`.
+		// For one column:
+		// 1. If there are no stats for it in the storage (i.e., analyze has never been executed before), its stats status
+		//    would be `!col.IsStatsInitialized()`. In this case we should go the `notNeedLoad` path.
+		// 2. If there are stats for it in the storage but its stats status is `col.IsAllEvicted()`, there are two
+		//    sub-cases. One is that the column stats have never been used/needed by the optimizer, so they have
+		//    never been loaded. The other is that the column stats were loaded and then evicted. For both sub-cases,
+		//    we should go the `notNeedLoad` path.
+		// 3. If some parts (Histogram/TopN/CMSketch) of the stats for it currently exist in TiDB memory, we choose to load all of
+		//    its new stats once we find the stats version is updated.
+		notNeedLoad := lease > 0 &&
+			(!isHandle || config.GetGlobalConfig().Performance.LiteInitStats) &&
+			(col == nil || ((!col.IsStatsInitialized() || col.IsAllEvicted()) && col.LastUpdateVersion < histVer)) &&
+			!loadAll
+		if notNeedLoad {
+			// If we don't have the column in memory currently, just skip it.
+			if col == nil {
+				return nil
+			}
+			col = &statistics.Column{
+				PhysicalID: table.PhysicalID,
+				Histogram:  *statistics.NewHistogram(histID, distinct, nullCount, histVer, &colInfo.FieldType, 0, totColSize),
+				Info:       colInfo,
+				IsHandle:   tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
+				Flag:       flag,
+				StatsVer:   statsVer,
+			}
+			if col.StatsAvailable() {
+				col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+			lastAnalyzePos.Copy(&col.LastAnalyzePos)
+			col.Histogram.Correlation = correlation
+			break
+		}
+		if col == nil || col.LastUpdateVersion < histVer || loadAll {
+			hg, err := HistogramFromStorageWithPriority(sctx, table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize, correlation, kv.PriorityNormal)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, table.PhysicalID, 0, colInfo.ID, statsVer)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			var fmSketch *statistics.FMSketch
+			if loadAll {
+				// FMSketch is only used when merging partition stats into global stats. When merging partition stats into global stats,
+				// we load all the statistics, i.e., loadAll is true.
+				fmSketch, err = FMSketchFromStorage(sctx, table.PhysicalID, 0, histID)
+				if err != nil {
+					return errors.Trace(err)
+				}
+			}
+			col = &statistics.Column{
+				PhysicalID: table.PhysicalID,
+				Histogram:  *hg,
+				Info:       colInfo,
+				CMSketch:   cms,
+				TopN:       topN,
+				FMSketch:   fmSketch,
+				IsHandle:   tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
+				Flag:       flag,
+				StatsVer:   statsVer,
+			}
+			if col.StatsAvailable() {
+				col.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			}
+			lastAnalyzePos.Copy(&col.LastAnalyzePos)
+			break
+		}
+		if col.TotColSize != totColSize {
+			newCol := *col
+			newCol.TotColSize = totColSize
+			col = &newCol
+		}
+		break
+	}
+	if col != nil {
+		if tracker != nil {
+			tracker.Consume(col.MemoryUsage().TotalMemoryUsage())
+		}
+		table.SetCol(col.ID, col)
+	} else {
+		// If we didn't find the column in tableInfo, we won't load its histogram.
+		// But don't worry: the schema will be refreshed at the next lease, and then we will load the same table
+		// again, which avoids the error.
+		logutil.BgLogger().Debug("we cannot find this column in the table info now; it may have been deleted", zap.Int64("colID", histID), zap.String("table", tableInfo.Name.O))
+	}
+	return nil
+}
+
+// TableStatsFromStorage loads table stats info from storage.
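+// It first refreshes the realtime count and modify_count from mysql.stats_meta,
+// then iterates over the table's rows in mysql.stats_histograms, dispatching
+// each one to the index or column loader above, and finally loads the extended
+// stats.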
+func TableStatsFromStorage(sctx sessionctx.Context, snapshot uint64, tableInfo *model.TableInfo, tableID int64, loadAll bool, lease time.Duration, table *statistics.Table) (_ *statistics.Table, err error) {
+	tracker := memory.NewTracker(memory.LabelForAnalyzeMemory, -1)
+	tracker.AttachTo(sctx.GetSessionVars().MemTracker)
+	defer tracker.Detach()
+	// If the table stats is pseudo, we also need to copy it, since we will use its column stats when
+	// their average error rate is small.
+	if table == nil || snapshot > 0 {
+		histColl := *statistics.NewHistColl(tableID, true, 0, 0, 4, 4)
+		table = &statistics.Table{
+			HistColl:              histColl,
+			ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(len(tableInfo.Columns), len(tableInfo.Indices)),
+		}
+	} else {
+		// We copy it before writing to avoid race.
+		table = table.Copy()
+	}
+	table.Pseudo = false
+
+	realtimeCount, modifyCount, isNull, err := StatsMetaCountAndModifyCount(util.StatsCtx, sctx, tableID)
+	if err != nil || isNull {
+		return nil, err
+	}
+	table.ModifyCount = modifyCount
+	table.RealtimeCount = realtimeCount
+
+	rows, _, err := util.ExecRows(sctx, "select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", tableID)
+	// Check deleted table.
+	if err != nil || len(rows) == 0 {
+		return nil, nil
+	}
+	for _, row := range rows {
+		if err := sctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
+			return nil, err
+		}
+		if row.GetInt64(1) > 0 {
+			err = indexStatsFromStorage(sctx, row, table, tableInfo, loadAll, lease, tracker)
+		} else {
+			err = columnStatsFromStorage(sctx, row, table, tableInfo, loadAll, lease, tracker)
+		}
+		if err != nil {
+			return nil, err
+		}
+	}
+	table.ColAndIdxExistenceMap.SetChecked()
+	return ExtendedStatsFromStorage(sctx, table, tableID, loadAll)
+}
+
+// LoadHistogram will load histogram from storage.
+func LoadHistogram(sctx sessionctx.Context, tableID int64, isIndex int, histID int64, tableInfo *model.TableInfo) (*statistics.Histogram, error) {
+	row, _, err := util.ExecRows(sctx, "select distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, histID)
+	if err != nil || len(row) == 0 {
+		return nil, err
+	}
+	distinct := row[0].GetInt64(0)
+	histVer := row[0].GetUint64(1)
+	nullCount := row[0].GetInt64(2)
+	var totColSize int64
+	var corr float64
+	var tp types.FieldType
+	if isIndex == 0 {
+		totColSize = row[0].GetInt64(3)
+		corr = row[0].GetFloat64(6)
+		for _, colInfo := range tableInfo.Columns {
+			if histID != colInfo.ID {
+				continue
+			}
+			tp = colInfo.FieldType
+			break
+		}
+		return HistogramFromStorageWithPriority(sctx, tableID, histID, &tp, distinct, isIndex, histVer, nullCount, totColSize, corr, kv.PriorityNormal)
+	}
+	return HistogramFromStorageWithPriority(sctx, tableID, histID, types.NewFieldType(mysql.TypeBlob), distinct, isIndex, histVer, nullCount, 0, 0, kv.PriorityNormal)
+}
+
+// LoadNeededHistograms will load histograms for those needed columns/indices.
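+// Items are drained from the asyncload queue; a column item may ask for a
+// meta-only or a full load, while an index item is always fully loaded.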
+func LoadNeededHistograms(sctx sessionctx.Context, is infoschema.InfoSchema, statsHandle statstypes.StatsHandle, loadFMSketch bool) (err error) {
+	items := asyncload.AsyncLoadHistogramNeededItems.AllItems()
+	for _, item := range items {
+		if !item.IsIndex {
+			err = loadNeededColumnHistograms(sctx, statsHandle, item.TableItemID, loadFMSketch, item.FullLoad)
+		} else {
+			// Index is always full load.
+			err = loadNeededIndexHistograms(sctx, is, statsHandle, item.TableItemID, loadFMSketch)
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// CleanFakeItemsForShowHistInFlights cleans the invalid inserted items.
+func CleanFakeItemsForShowHistInFlights(statsCache statstypes.StatsCache) int {
+	items := asyncload.AsyncLoadHistogramNeededItems.AllItems()
+	reallyNeeded := 0
+	for _, item := range items {
+		tbl, ok := statsCache.Get(item.TableID)
+		if !ok {
+			asyncload.AsyncLoadHistogramNeededItems.Delete(item.TableItemID)
+			continue
+		}
+		loadNeeded := false
+		if item.IsIndex {
+			_, loadNeeded = tbl.IndexIsLoadNeeded(item.ID)
+		} else {
+			var analyzed bool
+			_, loadNeeded, analyzed = tbl.ColumnIsLoadNeeded(item.ID, item.FullLoad)
+			loadNeeded = loadNeeded && analyzed
+		}
+		if !loadNeeded {
+			asyncload.AsyncLoadHistogramNeededItems.Delete(item.TableItemID)
+			continue
+		}
+		reallyNeeded++
+	}
+	return reallyNeeded
+}
+
+func loadNeededColumnHistograms(sctx sessionctx.Context, statsHandle statstypes.StatsHandle, col model.TableItemID, loadFMSketch bool, fullLoad bool) (err error) {
+	statsTbl, ok := statsHandle.Get(col.TableID)
+	if !ok {
+		return nil
+	}
+	// Currently we cannot init the column info in the ColAndIdxExistenceMap when lite-init-stats is disabled,
+	// so we have to get the column info from the domain.
+	is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+	tbl, ok := statsHandle.TableInfoByID(is, col.TableID)
+	if !ok {
+		return nil
+	}
+	tblInfo := tbl.Meta()
+	colInfo := tblInfo.GetColumnByID(col.ID)
+	if colInfo == nil {
+		asyncload.AsyncLoadHistogramNeededItems.Delete(col)
+		return nil
+	}
+
+	_, loadNeeded, analyzed := statsTbl.ColumnIsLoadNeeded(col.ID, true)
+	if !loadNeeded || !analyzed {
+		// If this column has not been analyzed yet and we don't have it in memory,
+		// we create a fake one for pseudo estimation; otherwise, the sync/async
+		// load would be triggered again even though the column has never been analyzed.
+		if loadNeeded && !analyzed {
+			fakeCol := statistics.EmptyColumn(tblInfo.ID, tblInfo.PKIsHandle, colInfo)
+			statsTbl.SetCol(col.ID, fakeCol)
+			statsHandle.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
+		}
+		asyncload.AsyncLoadHistogramNeededItems.Delete(col)
+		return nil
+	}
+
+	hg, _, statsVer, _, err := HistMetaFromStorageWithHighPriority(sctx, &col, colInfo)
+	if hg == nil || err != nil {
+		asyncload.AsyncLoadHistogramNeededItems.Delete(col)
+		return err
+	}
+	var (
+		cms  *statistics.CMSketch
+		topN *statistics.TopN
+		fms  *statistics.FMSketch
+	)
+	if fullLoad {
+		hg, err = HistogramFromStorageWithPriority(sctx, col.TableID, col.ID, &colInfo.FieldType, hg.NDV, 0, hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation, kv.PriorityHigh)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		cms, topN, err = CMSketchAndTopNFromStorageWithHighPriority(sctx, col.TableID, 0, col.ID, statsVer)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if loadFMSketch {
+			fms, err = FMSketchFromStorage(sctx, col.TableID, 0, col.ID)
+			if err != nil {
+				return errors.Trace(err)
+			}
+		}
+	}
+
+	colHist := &statistics.Column{
+		PhysicalID: col.TableID,
+		Histogram:  *hg,
+		Info:       colInfo,
+		CMSketch:   cms,
+		TopN:       topN,
+		FMSketch:   fms,
+		IsHandle:   tblInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
+		StatsVer:   statsVer,
+	}
+	// Reload the latest stats cache; otherwise the `updateStatsCache` may fail with high probability, because functions
+	// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
+	statsTbl, ok = statsHandle.Get(col.TableID)
+	if !ok {
+		return nil
+	}
+	statsTbl = statsTbl.Copy()
+	if colHist.StatsAvailable() {
+		if fullLoad {
+			colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+		} else {
+			colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+		}
+		if statsVer != statistics.Version0 {
+			statsTbl.LastAnalyzeVersion = max(statsTbl.LastAnalyzeVersion, colHist.LastUpdateVersion)
+			statsTbl.StatsVer = int(statsVer)
+		}
+	}
+	statsTbl.SetCol(col.ID, colHist)
+	statsHandle.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
+	asyncload.AsyncLoadHistogramNeededItems.Delete(col)
+	if col.IsSyncLoadFailed {
+		logutil.BgLogger().Warn("the histogram for the column should have already been loaded by sync load, but it was not found",
+			zap.Int64("table_id", colHist.PhysicalID),
+			zap.Int64("column_id", colHist.Info.ID),
+			zap.String("column_name", colHist.Info.Name.O))
+	}
+	return nil
+}
+
+func loadNeededIndexHistograms(sctx sessionctx.Context, is infoschema.InfoSchema, statsHandle statstypes.StatsHandle, idx model.TableItemID, loadFMSketch bool) (err error) {
+	tbl, ok := statsHandle.Get(idx.TableID)
+	if !ok {
+		return nil
+	}
+	_, loadNeeded := tbl.IndexIsLoadNeeded(idx.ID)
+	if !loadNeeded {
+		asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
+		return nil
+	}
+	hgMeta, lastAnalyzePos, statsVer, flag, err := HistMetaFromStorageWithHighPriority(sctx, &idx, nil)
+	if hgMeta == nil || err != nil {
+		asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
+		return err
+	}
+	tblInfo, ok := statsHandle.TableInfoByID(is, idx.TableID)
+	if !ok {
+		return nil
+	}
+	idxInfo := tblInfo.Meta().FindIndexByID(idx.ID)
+	hg, err := HistogramFromStorageWithPriority(sctx, idx.TableID, idx.ID, types.NewFieldType(mysql.TypeBlob), hgMeta.NDV, 1, hgMeta.LastUpdateVersion, hgMeta.NullCount, hgMeta.TotColSize, hgMeta.Correlation, kv.PriorityHigh)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, idx.TableID, 1, idx.ID, statsVer)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	var fms *statistics.FMSketch
+	if loadFMSketch {
+		fms, err = FMSketchFromStorage(sctx, idx.TableID, 1, idx.ID)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+	idxHist := &statistics.Index{
+		Histogram:         *hg,
+		CMSketch:          cms,
+		TopN:              topN,
+		FMSketch:          fms,
+		Info:              idxInfo,
+		StatsVer:          statsVer,
+		Flag:              flag,
+		PhysicalID:        idx.TableID,
+		StatsLoadedStatus: statistics.NewStatsFullLoadStatus(),
+	}
+	lastAnalyzePos.Copy(&idxHist.LastAnalyzePos)
+
+	tbl, ok = statsHandle.Get(idx.TableID)
+	if !ok {
+		return nil
+	}
+	tbl = tbl.Copy()
+	if idxHist.StatsVer != statistics.Version0 {
+		tbl.StatsVer = int(idxHist.StatsVer)
+		tbl.LastAnalyzeVersion = max(tbl.LastAnalyzeVersion, idxHist.LastUpdateVersion)
+	}
+	tbl.SetIdx(idx.ID, idxHist)
+	statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
+	if idx.IsSyncLoadFailed {
+		logutil.BgLogger().Warn("the histogram for the index should have already been loaded by sync load, but it was not found",
+			zap.Int64("table_id", idx.TableID),
+			zap.Int64("index_id", idxHist.Info.ID),
+			zap.String("index_name", idxHist.Info.Name.O))
+	}
+	asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
+	return nil
+}
+
+// StatsMetaByTableIDFromStorage gets the stats meta of a table from storage.
+func StatsMetaByTableIDFromStorage(sctx sessionctx.Context, tableID int64, snapshot uint64) (version uint64, modifyCount, count int64, err error) {
+	var rows []chunk.Row
+	if snapshot == 0 {
+		rows, _, err = util.ExecRows(sctx,
+			"SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID)
+	} else {
+		rows, _, err = util.ExecWithOpts(sctx,
+			[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionWithSnapshot(snapshot), sqlexec.ExecOptionUseCurSession},
+			"SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID)
+	}
+	if err != nil || len(rows) == 0 {
+		return
+	}
+	version = rows[0].GetUint64(0)
+	modifyCount = rows[0].GetInt64(1)
+	count = rows[0].GetInt64(2)
+	return
+}

diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go
new file mode 100644
index 0000000000000..202db41eaad83
--- /dev/null
+++ b/pkg/statistics/handle/syncload/stats_syncload.go
@@ -0,0 +1,626 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
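+
+// Package syncload implements synchronous statistics loading: a query that
+// misses needed column or index stats enqueues load tasks and blocks, up to a
+// timeout, while worker goroutines fill the stats cache.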
+
+package syncload
+
+import (
+	stderrors "errors"
+	"math/rand"
+	"runtime"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/config"
+	"github.com/pingcap/tidb/pkg/infoschema"
+	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/metrics"
+	"github.com/pingcap/tidb/pkg/parser/mysql"
+	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/pkg/sessionctx/variable"
+	"github.com/pingcap/tidb/pkg/statistics"
+	"github.com/pingcap/tidb/pkg/statistics/handle/storage"
+	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
+	"github.com/pingcap/tidb/pkg/table"
+	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/intest"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"go.uber.org/zap"
+	"golang.org/x/sync/singleflight"
+)
+
+// RetryCount is the max retry count for a sync load task.
+const RetryCount = 2
+
+// GetSyncLoadConcurrencyByCPU returns the concurrency of sync load by CPU.
+func GetSyncLoadConcurrencyByCPU() int {
+	core := runtime.GOMAXPROCS(0)
+	if core <= 8 {
+		return 5
+	} else if core <= 16 {
+		return 6
+	} else if core <= 32 {
+		return 8
+	}
+	return 10
+}
+
+type statsSyncLoad struct {
+	statsHandle statstypes.StatsHandle
+	is          infoschema.InfoSchema
+	StatsLoad   statstypes.StatsLoad
+}
+
+var globalStatsSyncLoadSingleFlight singleflight.Group
+
+// NewStatsSyncLoad creates a new StatsSyncLoad.
+func NewStatsSyncLoad(is infoschema.InfoSchema, statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad {
+	s := &statsSyncLoad{statsHandle: statsHandle, is: is}
+	cfg := config.GetGlobalConfig()
+	s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	return s
+}
+
+type statsWrapper struct {
+	colInfo *model.ColumnInfo
+	idxInfo *model.IndexInfo
+	col     *statistics.Column
+	idx     *statistics.Index
+}
+
+// SendLoadRequests sends load requests for the needed columns/indices.
+func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error {
+	remainedItems := s.removeHistLoadedColumns(neededHistItems)
+
+	failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) {
+		if sc.OptimizeTracer != nil {
+			count := val.(int)
+			if len(remainedItems) != count {
+				panic("remained items count wrong")
+			}
+		}
+	})
+	if len(remainedItems) <= 0 {
+		return nil
+	}
+	sc.StatsLoad.Timeout = timeout
+	sc.StatsLoad.NeededItems = remainedItems
+	sc.StatsLoad.ResultCh = make([]<-chan singleflight.Result, 0, len(remainedItems))
+	for _, item := range remainedItems {
+		localItem := item
+		resultCh := globalStatsSyncLoadSingleFlight.DoChan(localItem.Key(), func() (any, error) {
+			timer := time.NewTimer(timeout)
+			defer timer.Stop()
+			task := &statstypes.NeededItemTask{
+				Item:      localItem,
+				ToTimeout: time.Now().Local().Add(timeout),
+				ResultCh:  make(chan stmtctx.StatsLoadResult, 1),
+			}
+			select {
+			case s.StatsLoad.NeededItemsCh <- task:
+				metrics.SyncLoadDedupCounter.Inc()
+				select {
+				case <-timer.C:
+					return nil, errors.New("sync load took too long to return")
+				case result, ok := <-task.ResultCh:
+					intest.Assert(ok, "task.ResultCh cannot be closed")
+					return result, nil
+				}
+			case <-timer.C:
+				return nil, errors.New("sync load stats channel is full and timed out sending task to channel")
+			}
+		})
+		sc.StatsLoad.ResultCh = append(sc.StatsLoad.ResultCh, resultCh)
+	}
+	sc.StatsLoad.LoadStartTime = time.Now()
+	return nil
+}
+
+// SyncWaitStatsLoad waits for the loading of the needed columns/indices and returns an error if the wait times out.
+func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
+	if len(sc.StatsLoad.NeededItems) <= 0 {
+		return nil
+	}
+	var errorMsgs []string
+	defer func() {
+		if len(errorMsgs) > 0 {
+			logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
+				zap.Strings("errors", errorMsgs))
+		}
+		sc.StatsLoad.NeededItems = nil
+	}()
+	resultCheckMap := map[model.TableItemID]struct{}{}
+	for _, col := range sc.StatsLoad.NeededItems {
+		resultCheckMap[col.TableItemID] = struct{}{}
+	}
+	timer := time.NewTimer(sc.StatsLoad.Timeout)
+	defer timer.Stop()
+	for _, resultCh := range sc.StatsLoad.ResultCh {
+		select {
+		case result, ok := <-resultCh:
+			metrics.SyncLoadCounter.Inc()
+			if !ok {
+				return errors.New("sync load stats channel closed unexpectedly")
+			}
+			// This error comes from statsSyncLoad.SendLoadRequests, which starts the task and sends it to the
+			// worker; it is not a stats loading error.
+			if result.Err != nil {
+				errorMsgs = append(errorMsgs, result.Err.Error())
+			} else {
+				val := result.Val.(stmtctx.StatsLoadResult)
+				// This error comes from the stats loading itself.
+				if val.HasError() {
+					errorMsgs = append(errorMsgs, val.ErrorMsg())
+				}
+				delete(resultCheckMap, val.Item)
+			}
+		case <-timer.C:
+			metrics.SyncLoadCounter.Inc()
+			metrics.SyncLoadTimeoutCounter.Inc()
+			return errors.New("sync load stats timeout")
+		}
+	}
+	if len(resultCheckMap) == 0 {
+		metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
+		return nil
+	}
+	return nil
+}
+
+// removeHistLoadedColumns removes the items whose histograms are already loaded, based on neededItems and the stats cache.
+func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.StatsLoadItem) []model.StatsLoadItem {
+	remainedItems := make([]model.StatsLoadItem, 0, len(neededItems))
+	for _, item := range neededItems {
+		tbl, ok := s.statsHandle.Get(item.TableID)
+		if !ok {
+			continue
+		}
+		if item.IsIndex {
+			_, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
+			if loadNeeded {
+				remainedItems = append(remainedItems, item)
+			}
+			continue
+		}
+		_, loadNeeded, _ := tbl.ColumnIsLoadNeeded(item.ID, item.FullLoad)
+		if loadNeeded {
+			remainedItems = append(remainedItems, item)
+		}
+	}
+	return remainedItems
+}
+
+// AppendNeededItem appends a needed column/index to the channel; it is only used for tests.
+func (s *statsSyncLoad) AppendNeededItem(task *statstypes.NeededItemTask, timeout time.Duration) error {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case s.StatsLoad.NeededItemsCh <- task:
+	case <-timer.C:
+		return errors.New("Channel is full and timeout writing to channel")
+	}
+	return nil
+}
+
+var errExit = errors.New("Stop loading since domain is closed")
+
+// SubLoadWorker loads hist data for each column/index task.
+func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
+	defer func() {
+		exitWg.Done()
+		logutil.BgLogger().Info("SubLoadWorker exited.")
+	}()
+	// If the last task was not successfully handled in the last round due to an error or panic, pass it to this round to retry.
+	var lastTask *statstypes.NeededItemTask
+	for {
+		task, err := s.HandleOneTask(sctx, lastTask, exit)
+		lastTask = task
+		if err != nil {
+			switch err {
+			case errExit:
+				return
+			default:
+				// Sleep a random duration to avoid the thundering herd effect
+				// (everyone retrying a large number of requests simultaneously when a problem occurs).
+				r := rand.Intn(500)
+				time.Sleep(s.statsHandle.Lease()/10 + time.Duration(r)*time.Microsecond)
+				continue
+			}
+		}
+	}
+}
+
+// HandleOneTask handles the last task if it is not nil; otherwise it takes a new task from the channel. It returns the current task if it fails somewhere.
+//   - If the task is handled successfully, return nil, nil.
+//   - If the task times out, return the task and nil. The caller should retry the timed-out task without sleep.
+//   - If the task fails, return the task and the error. The caller should retry the failed task with sleep.
+func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
+	defer func() {
+		// recover for each task, worker keeps working
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack"))
+			err = errors.Errorf("stats loading panicked: %v", r)
+		}
+	}()
+	if lastTask == nil {
+		task, err = s.drainColTask(sctx, exit)
+		if err != nil {
+			if err != errExit {
+				logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
+			}
+			return task, err
+		}
+	} else {
+		task = lastTask
+	}
+	result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
+	err = s.handleOneItemTask(task)
+	if err == nil {
+		task.ResultCh <- result
+		return nil, nil
+	}
+	if !isValidForRetry(task) {
+		result.Error = err
+		task.ResultCh <- result
+		return nil, nil
+	}
+	return task, err
+}
+
+func isValidForRetry(task *statstypes.NeededItemTask) bool {
+	task.Retry++
+	return task.Retry <= RetryCount
+}
+
+func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err error) {
+	se, err := s.statsHandle.SPool().Get()
+	if err != nil {
+		return err
+	}
+	sctx := se.(sessionctx.Context)
+	sctx.GetSessionVars().StmtCtx.Priority = mysql.HighPriority
+	defer func() {
+		// recover for each task, worker keeps working
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
+			err = errors.Errorf("stats loading panicked: %v", r)
+		}
+		if err == nil { // only recycle the session when no error occurred
+			sctx.GetSessionVars().StmtCtx.Priority = mysql.NoPriority
+			s.statsHandle.SPool().Put(se)
+		}
+	}()
+	var skipTypes map[string]struct{}
+	val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes)
+	if err != nil {
+		logutil.BgLogger().Warn("failed to get global variable", zap.Error(err))
+	} else {
+		skipTypes = variable.ParseAnalyzeSkipColumnTypes(val)
+	}
+
+	item := task.Item.TableItemID
+	tbl, ok := s.statsHandle.Get(item.TableID)
+
+	if !ok {
+		return nil
+	}
+	is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+	tblInfo, ok := s.statsHandle.TableInfoByID(is, item.TableID)
+	if !ok {
+		return nil
+	}
+	isPkIsHandle := tblInfo.Meta().PKIsHandle
+	wrapper := &statsWrapper{}
+	if item.IsIndex {
+		index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
+		if !loadNeeded {
+			return nil
+		}
+		if index != nil {
+			wrapper.idxInfo = index.Info
+		} else {
+			wrapper.idxInfo = tblInfo.Meta().FindIndexByID(item.ID)
+		}
+	} else {
+		col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
+		if !loadNeeded {
+			return nil
+		}
+		if col != nil {
+			wrapper.colInfo = col.Info
+		} else {
+			// Currently we cannot init the column info in the ColAndIdxExistenceMap when lite-init-stats is disabled,
+			// so we have to get the column info from the domain.
+			wrapper.colInfo = tblInfo.Meta().GetColumnByID(item.ID)
+		}
+		if skipTypes != nil {
+			_, skip := skipTypes[types.TypeToStr(wrapper.colInfo.FieldType.GetType(), wrapper.colInfo.FieldType.GetCharset())]
+			if skip {
+				return nil
+			}
+		}
+
+		// If this column has not been analyzed yet and we don't have it in memory,
+		// we create a fake one for pseudo estimation; otherwise, the sync/async
+		// load would be triggered again even though the column has never been analyzed.
+ if loadNeeded && !analyzed { + wrapper.col = statistics.EmptyColumn(item.TableID, isPkIsHandle, wrapper.colInfo) + s.updateCachedItem(tblInfo, item, wrapper.col, wrapper.idx, task.Item.FullLoad) + return nil + } + } + failpoint.Inject("handleOneItemTaskPanic", nil) + t := time.Now() + needUpdate := false + wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, isPkIsHandle, task.Item.FullLoad) + if stderrors.Is(err, errGetHistMeta) { + return nil + } + if err != nil { + return err + } + if item.IsIndex { + if wrapper.idxInfo != nil { + needUpdate = true + } + } else { + if wrapper.colInfo != nil { + needUpdate = true + } + } + metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) + if needUpdate { + s.updateCachedItem(tblInfo, item, wrapper.col, wrapper.idx, task.Item.FullLoad) + } + return nil +} + +var errGetHistMeta = errors.New("fail to get hist meta") + +// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously +func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper, isPkIsHandle bool, fullLoad bool) (*statsWrapper, error) { + failpoint.Inject("mockReadStatsForOnePanic", nil) + failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil, errors.New("gofail ReadStatsForOne error")) + } + }) + loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch + var hg *statistics.Histogram + var err error + isIndexFlag := int64(0) + hg, lastAnalyzePos, statsVer, flag, err := storage.HistMetaFromStorageWithHighPriority(sctx, &item, w.colInfo) + if err != nil { + return nil, err + } + if hg == nil { + logutil.BgLogger().Warn("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID), + zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex)) + return nil, errGetHistMeta + } + if item.IsIndex { + isIndexFlag = 1 + } + var cms *statistics.CMSketch + var topN *statistics.TopN + var fms *statistics.FMSketch + if fullLoad { + if item.IsIndex { + hg, err = storage.HistogramFromStorageWithPriority(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation, kv.PriorityHigh) + if err != nil { + return nil, errors.Trace(err) + } + } else { + hg, err = storage.HistogramFromStorageWithPriority(sctx, item.TableID, item.ID, &w.colInfo.FieldType, hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation, kv.PriorityHigh) + if err != nil { + return nil, errors.Trace(err) + } + } + cms, topN, err = storage.CMSketchAndTopNFromStorageWithHighPriority(sctx, item.TableID, isIndexFlag, item.ID, statsVer) + if err != nil { + return nil, errors.Trace(err) + } + if loadFMSketch { + fms, err = storage.FMSketchFromStorage(sctx, item.TableID, isIndexFlag, item.ID) + if err != nil { + return nil, errors.Trace(err) + } + } + } + if item.IsIndex { + idxHist := &statistics.Index{ + Histogram: *hg, + CMSketch: cms, + TopN: topN, + FMSketch: fms, + Info: w.idxInfo, + StatsVer: statsVer, + Flag: flag, + PhysicalID: item.TableID, + } + if statsVer != statistics.Version0 { + if fullLoad { + idxHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() + } else { + idxHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus() + } + } + lastAnalyzePos.Copy(&idxHist.LastAnalyzePos) + w.idx = idxHist + } else { + colHist := &statistics.Column{ + PhysicalID: 
item.TableID,
+			Histogram:  *hg,
+			Info:       w.colInfo,
+			CMSketch:   cms,
+			TopN:       topN,
+			FMSketch:   fms,
+			IsHandle:   isPkIsHandle && mysql.HasPriKeyFlag(w.colInfo.GetFlag()),
+			StatsVer:   statsVer,
+		}
+		if colHist.StatsAvailable() {
+			if fullLoad {
+				colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			} else {
+				colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+		}
+		w.col = colHist
+	}
+	return w, nil
+}
+
+// drainColTask blocks until a column task is available, and returns either the task or an error.
+func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) {
+	// Select NeededItemsCh first; if there is no task, then select TimeoutItemsCh.
+	for {
+		select {
+		case <-exit:
+			return nil, errExit
+		case task, ok := <-s.StatsLoad.NeededItemsCh:
+			if !ok {
+				return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed")
+			}
+			// If the task has already timed out, no SQL statement is sync-waiting for it,
+			// so do not handle it just now; put it into another channel with lower priority.
+			if time.Now().After(task.ToTimeout) {
+				task.ToTimeout = task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
+				continue
+			}
+			return task, nil
+		case task, ok := <-s.StatsLoad.TimeoutItemsCh:
+			select {
+			case <-exit:
+				return nil, errExit
+			case task0, ok0 := <-s.StatsLoad.NeededItemsCh:
+				if !ok0 {
+					return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed")
+				}
+				// Send the task back to TimeoutItemsCh and return the task drained from NeededItemsCh.
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
+				return task0, nil
+			default:
+				if !ok {
+					return nil, errors.New("drainColTask: cannot read from TimeoutItemsCh, maybe the chan is closed")
+				}
+				// NeededItemsCh is empty now; handle the task from TimeoutItemsCh.
+				return task, nil
+			}
+		}
+	}
+}
+
+// writeToTimeoutChan writes in a nonblocking way; if the channel queue is full, it is ok to drop the task.
+func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask) {
+	select {
+	case taskCh <- task:
+	default:
+	}
+}
+
+// writeToChanWithTimeout writes a task to a channel and blocks until it succeeds or the timeout fires.
+func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask, timeout time.Duration) error {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case taskCh <- task:
+	case <-timer.C:
+		return errors.New("Channel is full and timeout writing to channel")
+	}
+	return nil
+}
+
+// writeToResultChan writes safely with panic recovery so that a single failed write does not have a wider impact.
+func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
+	defer func() {
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
+		}
+	}()
+	select {
+	case resultCh <- rs:
+	default:
+	}
+}
+
+// updateCachedItem updates the column/index hist to the global statsCache.
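+// It follows a copy-on-write pattern: the cached Table is copied before any
+// mutation, so concurrent readers of the stats cache never observe a
+// partially updated Table.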
+func (s *statsSyncLoad) updateCachedItem(tblInfo table.Table, item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
+	// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
+	// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
+	tbl, ok := s.statsHandle.Get(item.TableID)
+	if !ok {
+		return false
+	}
+	if !tbl.ColAndIdxExistenceMap.Checked() {
+		tbl = tbl.Copy()
+		for _, col := range tbl.HistColl.GetColSlice() {
+			if tblInfo.Meta().FindColumnByID(col.ID) == nil {
+				tbl.HistColl.DelCol(col.ID)
+				tbl.ColAndIdxExistenceMap.DeleteColAnalyzed(col.ID)
+			}
+		}
+		for _, idx := range tbl.HistColl.GetIdxSlice() {
+			if tblInfo.Meta().FindIndexByID(idx.ID) == nil {
+				tbl.HistColl.DelIdx(idx.ID)
+				tbl.ColAndIdxExistenceMap.DeleteIdxAnalyzed(idx.ID)
+			}
+		}
+		tbl.ColAndIdxExistenceMap.SetChecked()
+	}
+	if !item.IsIndex && colHist != nil {
+		c := tbl.GetCol(item.ID)
+		// Skip the update if:
+		// - the stats are already fully loaded, or
+		// - the stats are meta-loaded and we only need the meta.
+		if c != nil && (c.IsFullLoad() || !fullLoaded) {
+			return false
+		}
+		tbl = tbl.Copy()
+		tbl.SetCol(item.ID, colHist)
+
+		// Refresh the map for the possible change, to ensure that the map information is not missing.
+		// The unconditional insert also covers the case where the column is analyzed.
+		tbl.ColAndIdxExistenceMap.InsertCol(item.ID, colHist.StatsAvailable())
+		// All the objects share the same stats version. Update it here.
+		if colHist.StatsVer != statistics.Version0 {
+			tbl.StatsVer = int(colHist.StatsVer)
+		}
+	} else if item.IsIndex && idxHist != nil {
+		index := tbl.GetIdx(item.ID)
+		// Skip the update if:
+		// - the stats are already fully loaded, or
+		// - the stats are meta-loaded and we only need the meta.
+		if index != nil && (index.IsFullLoad() || !fullLoaded) {
+			return true
+		}
+		tbl = tbl.Copy()
+		tbl.SetIdx(item.ID, idxHist)
+		// If the index is analyzed we refresh the map for the possible change.
+		if idxHist.IsAnalyzed() {
+			tbl.ColAndIdxExistenceMap.InsertIndex(item.ID, true)
+			// All the objects share the same stats version. Update it here.
+			tbl.StatsVer = int(idxHist.StatsVer)
+		}
+	}
+	s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
+	return true
+}
diff --git a/pkg/statistics/integration_test.go b/pkg/statistics/integration_test.go
new file mode 100644
index 0000000000000..6f498c125ec60
--- /dev/null
+++ b/pkg/statistics/integration_test.go
@@ -0,0 +1,614 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package statistics_test + +import ( + "context" + "fmt" + "math" + "strconv" + "strings" + "testing" + "time" + + "github.com/pingcap/failpoint" + metamodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/analyzehelper" + "github.com/pingcap/tidb/pkg/testkit/testdata" + "github.com/stretchr/testify/require" +) + +func TestChangeVerTo2Behavior(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + originalVal1 := tk.MustQuery("select @@tidb_persist_analyze_options").Rows()[0][0].(string) + defer func() { + tk.MustExec(fmt.Sprintf("set global tidb_persist_analyze_options = %v", originalVal1)) + }() + tk.MustExec("set global tidb_persist_analyze_options=false") + + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, index idx(a))") + tk.MustExec("set @@session.tidb_analyze_version = 1") + tk.MustExec("insert into t values(1, 1), (1, 2), (1, 3)") + analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a", "b") + tk.MustExec("analyze table t") + is := dom.InfoSchema() + tblT, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + h := dom.StatsHandle() + require.NoError(t, h.Update(context.Background(), is)) + statsTblT := h.GetTableStats(tblT.Meta()) + // Analyze table with version 1 success, all statistics are version 1. + statsTblT.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.Equal(t, int64(1), col.GetStatsVer()) + return false + }) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(1), idx.GetStatsVer()) + return false + }) + tk.MustExec("set @@session.tidb_analyze_version = 2") + tk.MustExec("analyze table t index idx") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 The analyze version from the session is not compatible with the existing statistics of the table. Use the existing version instead")) + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(1), idx.GetStatsVer()) + return false + }) + tk.MustExec("analyze table t index") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 The analyze version from the session is not compatible with the existing statistics of the table. 
Use the existing version instead")) + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(1), idx.GetStatsVer()) + return false + }) + tk.MustExec("analyze table t ") + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.Equal(t, int64(2), col.GetStatsVer()) + return false + }) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(2), idx.GetStatsVer()) + return false + }) + tk.MustExec("set @@session.tidb_analyze_version = 1") + tk.MustExec("analyze table t index idx") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 The analyze version from the session is not compatible with the existing statistics of the table. Use the existing version instead", + "Warning 1105 The version 2 would collect all statistics not only the selected indexes")) + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(2), idx.GetStatsVer()) + return false + }) + tk.MustExec("analyze table t index") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 The analyze version from the session is not compatible with the existing statistics of the table. Use the existing version instead", + "Warning 1105 The version 2 would collect all statistics not only the selected indexes")) + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(2), idx.GetStatsVer()) + return false + }) + tk.MustExec("analyze table t ") + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.Equal(t, int64(1), col.GetStatsVer()) + return false + }) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(1), idx.GetStatsVer()) + return false + }) +} + +func TestChangeVerTo2BehaviorWithPersistedOptions(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + originalVal1 := tk.MustQuery("select @@tidb_persist_analyze_options").Rows()[0][0].(string) + defer func() { + tk.MustExec(fmt.Sprintf("set global tidb_persist_analyze_options = %v", originalVal1)) + }() + tk.MustExec("set global tidb_persist_analyze_options=true") + + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, index idx(a))") + tk.MustExec("set @@session.tidb_analyze_version = 1") + tk.MustExec("insert into t values(1, 1), (1, 2), (1, 3)") + analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a", "b") + tk.MustExec("analyze table t") + is := dom.InfoSchema() + tblT, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + h := dom.StatsHandle() + require.NoError(t, h.Update(context.Background(), is)) + statsTblT := h.GetTableStats(tblT.Meta()) + // Analyze table with version 1 success, all statistics are version 1. 
+ statsTblT.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.Equal(t, int64(1), col.GetStatsVer()) + return false + }) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(1), idx.GetStatsVer()) + return false + }) + tk.MustExec("set @@session.tidb_analyze_version = 2") + tk.MustExec("analyze table t index idx") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 The analyze version from the session is not compatible with the existing statistics of the table. Use the existing version instead")) + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(1), idx.GetStatsVer()) + return false + }) + tk.MustExec("analyze table t index") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 The analyze version from the session is not compatible with the existing statistics of the table. Use the existing version instead")) + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(1), idx.GetStatsVer()) + return false + }) + tk.MustExec("analyze table t ") + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.Equal(t, int64(2), col.GetStatsVer()) + return false + }) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(2), idx.GetStatsVer()) + return false + }) + tk.MustExec("set @@session.tidb_analyze_version = 1") + tk.MustExec("analyze table t index idx") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 The analyze version from the session is not compatible with the existing statistics of the table. Use the existing version instead", + "Warning 1105 The version 2 would collect all statistics not only the selected indexes", + "Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t, reason to use this rate is \"use min(1, 110000/3) as the sample-rate=1\"")) // since fallback to ver2 path, should do samplerate adjustment + require.NoError(t, h.Update(context.Background(), is)) + statsTblT = h.GetTableStats(tblT.Meta()) + statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.Equal(t, int64(2), idx.GetStatsVer()) + return false + }) + tk.MustExec("analyze table t index") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 The analyze version from the session is not compatible with the existing statistics of the table. 
Use the existing version instead",
+		"Warning 1105 The version 2 would collect all statistics not only the selected indexes",
+		"Note 1105 Analyze use auto adjusted sample rate 1.000000 for table test.t, reason to use this rate is \"use min(1, 110000/3) as the sample-rate=1\""))
+	require.NoError(t, h.Update(context.Background(), is))
+	statsTblT = h.GetTableStats(tblT.Meta())
+	statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool {
+		require.Equal(t, int64(2), idx.GetStatsVer())
+		return false
+	})
+	tk.MustExec("analyze table t ")
+	require.NoError(t, h.Update(context.Background(), is))
+	statsTblT = h.GetTableStats(tblT.Meta())
+	statsTblT.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool {
+		require.Equal(t, int64(1), col.GetStatsVer())
+		return false
+	})
+	statsTblT.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool {
+		require.Equal(t, int64(1), idx.GetStatsVer())
+		return false
+	})
+}
+
+func TestExpBackoffEstimation(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec(`set @@tidb_enable_non_prepared_plan_cache=0`) // estRows won't be updated if we hit the cache.
+	tk.MustExec("set tidb_cost_model_version=2")
+	tk.MustExec("create table exp_backoff(a int, b int, c int, d int, index idx(a, b, c, d))")
+	tk.MustExec("insert into exp_backoff values(1, 1, 1, 1), (1, 1, 1, 2), (1, 1, 2, 3), (1, 2, 2, 4), (1, 2, 3, 5)")
+	tk.MustExec("set @@session.tidb_analyze_version=2")
+	tk.MustExec("analyze table exp_backoff")
+	var (
+		input  []string
+		output [][]string
+	)
+	integrationSuiteData := statistics.GetIntegrationSuiteData()
+	integrationSuiteData.LoadTestCases(t, &input, &output)
+	inputLen := len(input)
+	// The test cases are:
+	// Query a = 1, b = 1, c = 1, and d >= 3 and d <= 5 separately. We get row counts 5, 3, 2, 3.
+	// Then query a = 1 and b = 1 and c = 1 and d >= 3 and d <= 5. Its result should follow the exponential
+	// backoff: with the sorted selectivities 2/5, 3/5, 3/5, 5/5, the estimate is
+	// 2/5 * (3/5)^{1/2} * (3/5)^{1/4} * 1^{1/8} * 5 ≈ 0.4 * 0.7746 * 0.8801 * 1 * 5 = 1.3634.
+	for i := 0; i < inputLen-1; i++ {
+		testdata.OnRecord(func() {
+			output[i] = testdata.ConvertRowsToStrings(tk.MustQuery(input[i]).Rows())
+		})
+		tk.MustQuery(input[i]).Check(testkit.Rows(output[i]...))
+	}
+
+	// The last case is that no column is loaded and we get no stats at all.
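+	// The cleanEstResults failpoint enabled below simulates that situation by
+	// (as its name suggests) clearing the collected estimation results, so the
+	// final query is planned as if no column stats were available.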
+ require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cardinality/cleanEstResults", `return(true)`)) + testdata.OnRecord(func() { + output[inputLen-1] = testdata.ConvertRowsToStrings(tk.MustQuery(input[inputLen-1]).Rows()) + }) + tk.MustQuery(input[inputLen-1]).Check(testkit.Rows(output[inputLen-1]...)) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cardinality/cleanEstResults")) +} + +func TestNULLOnFullSampling(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("set @@session.tidb_analyze_version = 2;") + tk.MustExec("create table t(a int, index idx(a))") + tk.MustExec("insert into t values(1), (1), (1), (2), (2), (3), (4), (null), (null), (null)") + var ( + input []string + output [][]string + ) + tk.MustExec("analyze table t with 2 topn") + is := dom.InfoSchema() + tblT, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + h := dom.StatsHandle() + require.NoError(t, h.Update(context.Background(), is)) + statsTblT := h.GetTableStats(tblT.Meta()) + // Check the null count is 3. + statsTblT.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.Equal(t, int64(3), col.NullCount) + return false + }) + integrationSuiteData := statistics.GetIntegrationSuiteData() + integrationSuiteData.LoadTestCases(t, &input, &output) + // Check the topn and buckets contains no null values. + for i := 0; i < len(input); i++ { + testdata.OnRecord(func() { + output[i] = testdata.ConvertRowsToStrings(tk.MustQuery(input[i]).Rows()) + }) + tk.MustQuery(input[i]).Check(testkit.Rows(output[i]...)) + } +} + +func TestAnalyzeSnapshot(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("set @@session.tidb_analyze_version = 2;") + tk.MustExec("create table t(a int, index(a))") + tk.MustExec("insert into t values(1), (1), (1)") + tk.MustExec("analyze table t") + rows := tk.MustQuery("select count, snapshot, version from mysql.stats_meta").Rows() + require.Len(t, rows, 1) + require.Equal(t, "3", rows[0][0]) + s1Str := rows[0][1].(string) + s1, err := strconv.ParseUint(s1Str, 10, 64) + require.NoError(t, err) + require.True(t, s1 < math.MaxUint64) + + // TestHistogramsWithSameTxnTS + v1 := rows[0][2].(string) + rows = tk.MustQuery("select version from mysql.stats_histograms").Rows() + require.Len(t, rows, 2) + v2 := rows[0][0].(string) + require.Equal(t, v1, v2) + v3 := rows[1][0].(string) + require.Equal(t, v2, v3) + + tk.MustExec("insert into t values(1), (1), (1)") + tk.MustExec("analyze table t") + rows = tk.MustQuery("select count, snapshot from mysql.stats_meta").Rows() + require.Len(t, rows, 1) + require.Equal(t, "6", rows[0][0]) + s2Str := rows[0][1].(string) + s2, err := strconv.ParseUint(s2Str, 10, 64) + require.NoError(t, err) + require.True(t, s2 < math.MaxUint64) + require.True(t, s2 > s1) +} + +func TestOutdatedStatsCheck(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + + oriStart := tk.MustQuery("select @@tidb_auto_analyze_start_time").Rows()[0][0].(string) + oriEnd := tk.MustQuery("select @@tidb_auto_analyze_end_time").Rows()[0][0].(string) + statistics.AutoAnalyzeMinCnt = 0 + defer func() { + statistics.AutoAnalyzeMinCnt = 1000 + tk.MustExec(fmt.Sprintf("set global 
tidb_auto_analyze_start_time='%v'", oriStart)) + tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_end_time='%v'", oriEnd)) + }() + tk.MustExec("set global tidb_auto_analyze_start_time='00:00 +0000'") + tk.MustExec("set global tidb_auto_analyze_end_time='23:59 +0000'") + tk.MustExec("set session tidb_enable_pseudo_for_outdated_stats=1") + + h := dom.StatsHandle() + tk.MustExec("use test") + tk.MustExec("create table t (a int)") + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + tk.MustExec("insert into t values (1)" + strings.Repeat(", (1)", 19)) // 20 rows + analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a") + require.NoError(t, h.DumpStatsDeltaToKV(true)) + is := dom.InfoSchema() + require.NoError(t, h.Update(context.Background(), is)) + // To pass the stats.Pseudo check in autoAnalyzeTable + tk.MustExec("analyze table t") + tk.MustExec("explain select * from t where a = 1") + require.NoError(t, h.LoadNeededHistograms(dom.InfoSchema())) + + getStatsHealthy := func() int { + rows := tk.MustQuery("show stats_healthy where db_name = 'test' and table_name = 't'").Rows() + require.Len(t, rows, 1) + healthy, err := strconv.Atoi(rows[0][3].(string)) + require.NoError(t, err) + return healthy + } + + tk.MustExec("insert into t values (1)" + strings.Repeat(", (1)", 13)) // 34 rows + require.NoError(t, h.DumpStatsDeltaToKV(true)) + require.NoError(t, h.Update(context.Background(), is)) + require.Equal(t, getStatsHealthy(), 30) + require.False(t, hasPseudoStats(tk.MustQuery("explain select * from t where a = 1").Rows())) + tk.MustExec("insert into t values (1)") // 35 rows + require.NoError(t, h.DumpStatsDeltaToKV(true)) + require.NoError(t, h.Update(context.Background(), is)) + require.Equal(t, getStatsHealthy(), 25) + require.True(t, hasPseudoStats(tk.MustQuery("explain select * from t where a = 1").Rows())) + + tk.MustExec("analyze table t") + + tk.MustExec("delete from t limit 24") // 11 rows + require.NoError(t, h.DumpStatsDeltaToKV(true)) + require.NoError(t, h.Update(context.Background(), is)) + require.Equal(t, getStatsHealthy(), 31) + require.False(t, hasPseudoStats(tk.MustQuery("explain select * from t where a = 1").Rows())) + + tk.MustExec("delete from t limit 1") // 10 rows + require.NoError(t, h.DumpStatsDeltaToKV(true)) + require.NoError(t, h.Update(context.Background(), is)) + require.Equal(t, getStatsHealthy(), 28) + require.True(t, hasPseudoStats(tk.MustQuery("explain select * from t where a = 1").Rows())) +} + +func hasPseudoStats(rows [][]any) bool { + for i := range rows { + if strings.Contains(rows[i][4].(string), "stats:pseudo") { + return true + } + } + return false +} + +func TestShowHistogramsLoadStatus(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + h := dom.StatsHandle() + origLease := h.Lease() + h.SetLease(time.Second) + defer func() { h.SetLease(origLease) }() + tk.MustExec("use test") + tk.MustExec("create table t(a int primary key, b int, c int, index idx(b, c))") + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + tk.MustExec("insert into t values (1,2,3), (4,5,6)") + require.NoError(t, h.DumpStatsDeltaToKV(true)) + tk.MustExec("analyze table t") + require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) + rows := tk.MustQuery("show stats_histograms where db_name = 'test' and table_name = 't'").Rows() + for _, row := range rows { + require.Equal(t, "allEvicted", row[10].(string)) + } +} + +func TestSingleColumnIndexNDV(t *testing.T) { + store, dom := 
testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + h := dom.StatsHandle() + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, c varchar(20), d varchar(20), index idx_a(a), index idx_b(b), index idx_c(c), index idx_d(d))") + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + tk.MustExec("insert into t values (1, 1, 'xxx', 'zzz'), (2, 2, 'yyy', 'zzz'), (1, 3, null, 'zzz')") + for i := 0; i < 5; i++ { + tk.MustExec("insert into t select * from t") + } + tk.MustExec("analyze table t") + rows := tk.MustQuery("show stats_histograms where db_name = 'test' and table_name = 't'").Sort().Rows() + expectedResults := [][]string{ + {"a", "2", "0"}, {"b", "3", "0"}, {"c", "2", "32"}, {"d", "1", "0"}, + {"idx_a", "2", "0"}, {"idx_b", "3", "0"}, {"idx_c", "2", "32"}, {"idx_d", "1", "0"}, + } + for i, row := range rows { + require.Equal(t, expectedResults[i][0], row[3]) // column_name + require.Equal(t, expectedResults[i][1], row[6]) // distinct_count + require.Equal(t, expectedResults[i][2], row[7]) // null_count + } +} + +func TestColumnStatsLazyLoad(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + h := dom.StatsHandle() + originLease := h.Lease() + defer h.SetLease(originLease) + // Set `Lease` to `Millisecond` to enable column stats lazy load. + h.SetLease(time.Millisecond) + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values (1,2), (3,4), (5,6), (7,8)") + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a", "b") + tk.MustExec("analyze table t") + is := dom.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblInfo := tbl.Meta() + c1 := tblInfo.Columns[0] + c2 := tblInfo.Columns[1] + require.True(t, h.GetTableStats(tblInfo).GetCol(c1.ID).IsAllEvicted()) + require.True(t, h.GetTableStats(tblInfo).GetCol(c2.ID).IsAllEvicted()) + tk.MustExec("analyze table t") + require.True(t, h.GetTableStats(tblInfo).GetCol(c1.ID).IsAllEvicted()) + require.True(t, h.GetTableStats(tblInfo).GetCol(c2.ID).IsAllEvicted()) +} + +func TestUpdateNotLoadIndexFMSketch(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + h := dom.StatsHandle() + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (10),partition p1 values less than maxvalue)") + tk.MustExec("insert into t values (1,2), (3,4), (5,6), (7,8)") + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + tk.MustExec("analyze table t") + is := dom.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblInfo := tbl.Meta() + idxInfo := tblInfo.Indices[0] + p0 := tblInfo.Partition.Definitions[0] + p1 := tblInfo.Partition.Definitions[1] + require.Nil(t, h.GetPartitionStats(tblInfo, p0.ID).GetIdx(idxInfo.ID).FMSketch) + require.Nil(t, h.GetPartitionStats(tblInfo, p1.ID).GetIdx(idxInfo.ID).FMSketch) + h.Clear() + require.NoError(t, h.Update(context.Background(), is)) + require.Nil(t, h.GetPartitionStats(tblInfo, p0.ID).GetIdx(idxInfo.ID).FMSketch) + require.Nil(t, h.GetPartitionStats(tblInfo, p1.ID).GetIdx(idxInfo.ID).FMSketch) +} + +func TestIssue44369(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + h := 
dom.StatsHandle() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, index iab(a,b));") + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + tk.MustExec("insert into t value(1,1);") + require.NoError(t, h.DumpStatsDeltaToKV(true)) + tk.MustExec("analyze table t;") + is := dom.InfoSchema() + require.NoError(t, h.Update(context.Background(), is)) + tk.MustExec("alter table t rename column b to bb;") + tk.MustExec("select * from t where a = 10 and bb > 20;") +} + +func TestTableLastAnalyzeVersion(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + h := dom.StatsHandle() + tk := testkit.NewTestKit(t, store) + + // Only create table should not set the last_analyze_version + tk.MustExec("use test") + tk.MustExec("create table t(a int);") + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + is := dom.InfoSchema() + require.NoError(t, h.Update(context.Background(), is)) + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + statsTbl, found := h.Get(tbl.Meta().ID) + require.True(t, found) + require.Equal(t, uint64(0), statsTbl.LastAnalyzeVersion) + + // Only alter table should not set the last_analyze_version + tk.MustExec("alter table t add column b int default 0") + is = dom.InfoSchema() + tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + require.NoError(t, h.Update(context.Background(), is)) + statsTbl, found = h.Get(tbl.Meta().ID) + require.True(t, found) + require.Equal(t, uint64(0), statsTbl.LastAnalyzeVersion) + tk.MustExec("alter table t add index idx(a)") + is = dom.InfoSchema() + tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + e := <-h.DDLEventCh() + require.Equal(t, metamodel.ActionAddIndex, e.GetType()) + require.Equal(t, 0, len(h.DDLEventCh())) + require.NoError(t, err) + require.NoError(t, h.Update(context.Background(), is)) + statsTbl, found = h.Get(tbl.Meta().ID) + require.True(t, found) + require.Equal(t, uint64(0), statsTbl.LastAnalyzeVersion) + + // INSERT and updating the modify_count should not set the last_analyze_version + tk.MustExec("insert into t values(1, 1)") + require.NoError(t, h.DumpStatsDeltaToKV(true)) + require.NoError(t, h.Update(context.Background(), is)) + statsTbl, found = h.Get(tbl.Meta().ID) + require.True(t, found) + require.Equal(t, uint64(0), statsTbl.LastAnalyzeVersion) + + // After analyze, last_analyze_version is set. 
+ tk.MustExec("analyze table t") + require.NoError(t, h.Update(context.Background(), is)) + statsTbl, found = h.Get(tbl.Meta().ID) + require.True(t, found) + require.NotEqual(t, uint64(0), statsTbl.LastAnalyzeVersion) +} + +func TestGlobalIndexWithAnalyzeVersion1AndHistoricalStats(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("set tidb_analyze_version = 1") + tk.MustExec("set global tidb_enable_historical_stats = true") + defer tk.MustExec("set global tidb_enable_historical_stats = default") + + tk.MustExec("use test") + tk.MustExec(`CREATE TABLE t ( a int, b int, c int default 0) + PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20), + PARTITION p2 VALUES LESS THAN (30), + PARTITION p3 VALUES LESS THAN (40))`) + tk.MustExec("ALTER TABLE t ADD UNIQUE INDEX idx(b) GLOBAL") + tk.MustExec("INSERT INTO t(a, b) values(1, 1), (2, 2), (3, 3), (15, 15), (25, 25), (35, 35)") + + tblID := dom.MustGetTableID(t, "test", "t") + + for i := 0; i < 10; i++ { + tk.MustExec("analyze table t") + } + // Each analyze will only generate one record + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id=%d", tblID)).Equal(testkit.Rows("10")) +} + +func TestLastAnalyzeVersionNotChangedWithAsyncStatsLoad(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("set @@tidb_stats_load_sync_wait = 0;") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int);") + require.NoError(t, dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh())) + require.NoError(t, dom.StatsHandle().Update(context.Background(), dom.InfoSchema())) + tk.MustExec("insert into t values (1, 1);") + err := dom.StatsHandle().DumpStatsDeltaToKV(true) + require.NoError(t, err) + tk.MustExec("alter table t add column c int default 1;") + dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh()) + tk.MustExec("select * from t where a = 1 or b = 1 or c = 1;") + require.NoError(t, dom.StatsHandle().LoadNeededHistograms(dom.InfoSchema())) + result := tk.MustQuery("show stats_meta where table_name = 't'") + require.Len(t, result.Rows(), 1) + // The last analyze time. + require.Equal(t, "", result.Rows()[0][6]) +} diff --git a/pkg/statistics/table.go b/pkg/statistics/table.go new file mode 100644 index 0000000000000..804e115056c7b --- /dev/null +++ b/pkg/statistics/table.go @@ -0,0 +1,1066 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package statistics + +import ( + "cmp" + "fmt" + "slices" + "strings" + + "github.com/pingcap/tidb/pkg/expression" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/planner/planctx" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/ranger" + "go.uber.org/atomic" + "golang.org/x/exp/maps" +) + +const ( + // PseudoVersion means the pseudo statistics version is 0. + PseudoVersion uint64 = 0 + + // PseudoRowCount export for other pkg to use. + // When we haven't analyzed a table, we use pseudo statistics to estimate costs. + // It has row count 10000, equal condition selects 1/1000 of total rows, less condition selects 1/3 of total rows, + // between condition selects 1/40 of total rows. + PseudoRowCount = 10000 +) + +// AutoAnalyzeMinCnt means if the count of table is less than this value, we don't need to do auto analyze. +// Exported for testing. +var AutoAnalyzeMinCnt int64 = 1000 + +var ( + // Below functions are used to solve cycle import problem. + // Note: all functions below will be removed after finishing moving all estimation functions into the cardinality package. + + // GetRowCountByIndexRanges is a function type to get row count by index ranges. + GetRowCountByIndexRanges func(sctx planctx.PlanContext, coll *HistColl, idxID int64, indexRanges []*ranger.Range) (result float64, err error) + + // GetRowCountByIntColumnRanges is a function type to get row count by int column ranges. + GetRowCountByIntColumnRanges func(sctx planctx.PlanContext, coll *HistColl, colID int64, intRanges []*ranger.Range) (result float64, err error) + + // GetRowCountByColumnRanges is a function type to get row count by column ranges. + GetRowCountByColumnRanges func(sctx planctx.PlanContext, coll *HistColl, colID int64, colRanges []*ranger.Range) (result float64, err error) +) + +// Table represents statistics for a table. +type Table struct { + ExtendedStats *ExtendedStatsColl + + ColAndIdxExistenceMap *ColAndIdxExistenceMap + HistColl + Version uint64 + // It's the timestamp of the last analyze time. + // We used it in auto-analyze to determine if this table has been analyzed. + // The source of this field comes from two parts: + // 1. Initialized by snapshot when loading stats_meta. + // 2. Updated by the analysis time of a specific column or index when loading the histogram of the column or index. + LastAnalyzeVersion uint64 + // TblInfoUpdateTS is the UpdateTS of the TableInfo used when filling this struct. + // It is the schema version of the corresponding table. It is used to skip redundant + // loading of stats, i.e, if the cached stats is already update-to-date with mysql.stats_xxx tables, + // and the schema of the table does not change, we don't need to load the stats for this + // table again. + TblInfoUpdateTS uint64 + + IsPkIsHandle bool +} + +// ColAndIdxExistenceMap is the meta map for statistics.Table. +// It can tell whether a column/index really has its statistics. So we won't send useless kv request when we do online stats loading. +type ColAndIdxExistenceMap struct { + checked bool + colAnalyzed map[int64]bool + idxAnalyzed map[int64]bool +} + +// DeleteColAnalyzed deletes the column with the given id. +func (m *ColAndIdxExistenceMap) DeleteColAnalyzed(id int64) { + delete(m.colAnalyzed, id) +} + +// DeleteIdxAnalyzed deletes the index with the given id. 
+func (m *ColAndIdxExistenceMap) DeleteIdxAnalyzed(id int64) {
+	delete(m.idxAnalyzed, id)
+}
+
+// Checked returns whether the map has been checked.
+func (m *ColAndIdxExistenceMap) Checked() bool {
+	return m.checked
+}
+
+// SetChecked marks the map as checked.
+func (m *ColAndIdxExistenceMap) SetChecked() {
+	m.checked = true
+}
+
+// HasAnalyzed checks whether a column/index exists in the map and has analyzed statistics.
+// TODO: the map should only keep the analyzed cols.
+// There are three possible statuses for a column/index's statistics:
+// 1. We don't have this column/index.
+// 2. We have it, but it hasn't been analyzed yet.
+// 3. We have it and its statistics.
+//
+// To distinguish the three statuses, a TRUE return value from HasAnalyzed represents status 3,
+// and a missing entry in the map represents status 1.
+func (m *ColAndIdxExistenceMap) HasAnalyzed(id int64, isIndex bool) bool {
+	if isIndex {
+		analyzed, ok := m.idxAnalyzed[id]
+		return ok && analyzed
+	}
+	analyzed, ok := m.colAnalyzed[id]
+	return ok && analyzed
+}
+
+// InsertCol inserts a column's analyzed status into the map.
+func (m *ColAndIdxExistenceMap) InsertCol(id int64, analyzed bool) {
+	m.colAnalyzed[id] = analyzed
+}
+
+// InsertIndex inserts an index's analyzed status into the map.
+func (m *ColAndIdxExistenceMap) InsertIndex(id int64, analyzed bool) {
+	m.idxAnalyzed[id] = analyzed
+}
+
+// IsEmpty checks whether the map is empty.
+func (m *ColAndIdxExistenceMap) IsEmpty() bool {
+	return len(m.colAnalyzed)+len(m.idxAnalyzed) == 0
+}
+
+// ColNum returns the number of columns in the map.
+func (m *ColAndIdxExistenceMap) ColNum() int {
+	return len(m.colAnalyzed)
+}
+
+// Clone deeply copies the map.
+func (m *ColAndIdxExistenceMap) Clone() *ColAndIdxExistenceMap {
+	mm := NewColAndIndexExistenceMap(len(m.colAnalyzed), len(m.idxAnalyzed))
+	mm.colAnalyzed = maps.Clone(m.colAnalyzed)
+	mm.idxAnalyzed = maps.Clone(m.idxAnalyzed)
+	return mm
+}
+
+const (
+	defaultColCap = 16
+	defaultIdxCap = 4
+)
+
+// NewColAndIndexExistenceMapWithoutSize returns a new object with the default capacity.
+func NewColAndIndexExistenceMapWithoutSize() *ColAndIdxExistenceMap {
+	return &ColAndIdxExistenceMap{
+		colAnalyzed: make(map[int64]bool, defaultColCap),
+		idxAnalyzed: make(map[int64]bool, defaultIdxCap),
+	}
+}
+
+// NewColAndIndexExistenceMap returns a new object with the given capacity.
+func NewColAndIndexExistenceMap(colCap, idxCap int) *ColAndIdxExistenceMap {
+	return &ColAndIdxExistenceMap{
+		colAnalyzed: make(map[int64]bool, colCap),
+		idxAnalyzed: make(map[int64]bool, idxCap),
+	}
+}
+
+// ColAndIdxExistenceMapIsEqual is used in testing to check whether the two maps are equal.
+func ColAndIdxExistenceMapIsEqual(m1, m2 *ColAndIdxExistenceMap) bool {
+	return maps.Equal(m1.colAnalyzed, m2.colAnalyzed) && maps.Equal(m1.idxAnalyzed, m2.idxAnalyzed)
+}
+
+// ExtendedStatsItem is the cached item of a mysql.stats_extended record.
+type ExtendedStatsItem struct {
+	StringVals string
+	ColIDs     []int64
+	ScalarVals float64
+	Tp         uint8
+}
+
+// ExtendedStatsColl is a collection of cached items for mysql.stats_extended records.
+type ExtendedStatsColl struct {
+	Stats             map[string]*ExtendedStatsItem
+	LastUpdateVersion uint64
+}
+
+// NewExtendedStatsColl allocates an ExtendedStatsColl struct.
+func NewExtendedStatsColl() *ExtendedStatsColl {
+	return &ExtendedStatsColl{Stats: make(map[string]*ExtendedStatsItem)}
+}
+
+const (
+	// ExtendedStatsInited is the status for extended stats which are just registered but have not been analyzed yet.
+ ExtendedStatsInited uint8 = iota + // ExtendedStatsAnalyzed is the status for extended stats which have been collected in analyze. + ExtendedStatsAnalyzed + // ExtendedStatsDeleted is the status for extended stats which were dropped. These "deleted" records would be removed from storage by GCStats(). + ExtendedStatsDeleted +) + +// HistColl is a collection of histograms. It collects enough information for plan to calculate the selectivity. +type HistColl struct { + // Note that when used in a query, Column use UniqueID as the key while Indices use the index ID in the + // metadata. (See GenerateHistCollFromColumnInfo() for details) + columns map[int64]*Column + indices map[int64]*Index + PhysicalID int64 + // TODO: add AnalyzeCount here + RealtimeCount int64 // RealtimeCount is the current table row count, maintained by applying stats delta based on AnalyzeCount. + ModifyCount int64 // Total modify count in a table. + + // The version of the statistics, refer to Version0, Version1, Version2 and so on. + StatsVer int + // HavePhysicalID is true means this HistColl is from single table and have its ID's information. + // The physical id is used when try to load column stats from storage. + HavePhysicalID bool + Pseudo bool + + /* + Fields below are only used in a query, like for estimation, and they will be useless when stored in + the stats cache. (See GenerateHistCollFromColumnInfo() for details) + */ + + CanNotTriggerLoad bool + // Idx2ColUniqueIDs maps the index id to its column UniqueIDs. It's used to calculate the selectivity in planner. + Idx2ColUniqueIDs map[int64][]int64 + // ColUniqueID2IdxIDs maps the column UniqueID to a list index ids whose first column is it. + // It's used to calculate the selectivity in planner. + ColUniqueID2IdxIDs map[int64][]int64 + // UniqueID2colInfoID maps the column UniqueID to its ID in the metadata. + UniqueID2colInfoID map[int64]int64 + // MVIdx2Columns maps the index id to its columns by expression.Column. + // For normal index, the column id is enough, as we already have in Idx2ColUniqueIDs. But currently, mv index needs more + // information to match the filter against the mv index columns, and we need this map to provide this information. + MVIdx2Columns map[int64][]*expression.Column +} + +// NewHistColl creates a new HistColl. +func NewHistColl(id int64, havePhysicalID bool, realtimeCnt, modifyCnt int64, colNum, idxNum int) *HistColl { + return &HistColl{ + columns: make(map[int64]*Column, colNum), + indices: make(map[int64]*Index, idxNum), + PhysicalID: id, + HavePhysicalID: havePhysicalID, + RealtimeCount: realtimeCnt, + ModifyCount: modifyCnt, + Idx2ColUniqueIDs: make(map[int64][]int64), + ColUniqueID2IdxIDs: make(map[int64][]int64), + UniqueID2colInfoID: make(map[int64]int64), + MVIdx2Columns: make(map[int64][]*expression.Column), + } +} + +// NewHistCollWithColsAndIdxs creates a new HistColl with given columns and indices. +func NewHistCollWithColsAndIdxs(id int64, havePhysicalID bool, realtimeCnt, modifyCnt int64, cols map[int64]*Column, idxs map[int64]*Index) *HistColl { + return &HistColl{ + columns: cols, + indices: idxs, + PhysicalID: id, + HavePhysicalID: havePhysicalID, + RealtimeCount: realtimeCnt, + ModifyCount: modifyCnt, + Idx2ColUniqueIDs: make(map[int64][]int64), + ColUniqueID2IdxIDs: make(map[int64][]int64), + UniqueID2colInfoID: make(map[int64]int64), + MVIdx2Columns: make(map[int64][]*expression.Column), + } +} + +// SetCol sets the column with the given id. 
+func (coll *HistColl) SetCol(id int64, col *Column) { + coll.columns[id] = col +} + +// SetIdx sets the index with the given id. +func (coll *HistColl) SetIdx(id int64, idx *Index) { + coll.indices[id] = idx +} + +// GetCol gets the column with the given id. +func (coll *HistColl) GetCol(id int64) *Column { + return coll.columns[id] +} + +// GetIdx gets the index with the given id. +func (coll *HistColl) GetIdx(id int64) *Index { + return coll.indices[id] +} + +// ForEachColumnImmutable iterates all columns in the HistColl. +// The bool return value of f is used to control the iteration. If f returns true, the iteration will be stopped. +// Warning: Don't change the content when calling this function. +func (coll *HistColl) ForEachColumnImmutable(f func(int64, *Column) bool) { + for id, col := range coll.columns { + if f(id, col) { + return + } + } +} + +// ForEachIndexImmutable iterates all columns in the HistColl. +// The bool return value of f is used to control the iteration. If f returns true, the iteration will be stopped. +// WARNING: Don't change the content when calling this function. +func (coll *HistColl) ForEachIndexImmutable(f func(int64, *Index) bool) { + for id, idx := range coll.indices { + if f(id, idx) { + return + } + } +} + +// ColNum returns the number of columns in the HistColl. +func (coll *HistColl) ColNum() int { + return len(coll.columns) +} + +// IdxNum returns the number of indices in the HistColl. +func (coll *HistColl) IdxNum() int { + return len(coll.indices) +} + +// DelCol deletes the column with the given id. +func (coll *HistColl) DelCol(id int64) { + delete(coll.columns, id) +} + +// DelIdx deletes the index with the given id. +func (coll *HistColl) DelIdx(id int64) { + delete(coll.indices, id) +} + +// StableOrderColSlice returns a slice of columns in stable order. +func (coll *HistColl) StableOrderColSlice() []*Column { + cols := make([]*Column, 0, len(coll.columns)) + for _, col := range coll.columns { + cols = append(cols, col) + } + slices.SortFunc(cols, func(c1, c2 *Column) int { + return cmp.Compare(c1.ID, c2.ID) + }) + return cols +} + +// GetColSlice returns a slice of columns without order. +func (coll *HistColl) GetColSlice() []*Column { + cols := make([]*Column, 0, len(coll.columns)) + for _, col := range coll.columns { + cols = append(cols, col) + } + return cols +} + +// StableOrderIdxSlice returns a slice of indices in stable order. +func (coll *HistColl) StableOrderIdxSlice() []*Index { + idxs := make([]*Index, 0, len(coll.indices)) + for _, idx := range coll.indices { + idxs = append(idxs, idx) + } + slices.SortFunc(idxs, func(i1, i2 *Index) int { + return cmp.Compare(i1.ID, i2.ID) + }) + return idxs +} + +// GetIdxSlice returns a slice of indices without order. +func (coll *HistColl) GetIdxSlice() []*Index { + idxs := make([]*Index, 0, len(coll.indices)) + for _, idx := range coll.indices { + idxs = append(idxs, idx) + } + return idxs +} + +// SetAllIndexFullLoadForBootstrap sets all indices' stats loaded status to full load for bootstrap. +func (coll *HistColl) SetAllIndexFullLoadForBootstrap() { + for _, idx := range coll.indices { + idx.StatsLoadedStatus = NewStatsFullLoadStatus() + } +} + +// CalcPreScalar calculates the pre-calculated scalar for all columns and indices. 
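+// The bucket counts arrive as per-bucket values, so the loops below first
+// convert them to cumulative counts (prefix sums) before the scalar values
+// used by estimation are pre-calculated.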
+func (coll *HistColl) CalcPreScalar() { + for _, idx := range coll.indices { + for i := 1; i < idx.Len(); i++ { + idx.Buckets[i].Count += idx.Buckets[i-1].Count + } + idx.PreCalculateScalar() + } + for _, col := range coll.columns { + for i := 1; i < col.Len(); i++ { + col.Buckets[i].Count += col.Buckets[i-1].Count + } + col.PreCalculateScalar() + } +} + +// DropEvicted will drop the unnecessary data for all columns and indices. It's triggerred by stats cache. +func (coll *HistColl) DropEvicted() { + for _, col := range coll.columns { + if !col.IsStatsInitialized() || col.GetEvictedStatus() == AllEvicted { + continue + } + col.DropUnnecessaryData() + } + for _, idx := range coll.indices { + if !idx.IsStatsInitialized() || idx.GetEvictedStatus() == AllEvicted { + continue + } + idx.DropUnnecessaryData() + } +} + +// TableMemoryUsage records tbl memory usage +type TableMemoryUsage struct { + ColumnsMemUsage map[int64]CacheItemMemoryUsage + IndicesMemUsage map[int64]CacheItemMemoryUsage + TableID int64 + TotalMemUsage int64 +} + +// TotalIdxTrackingMemUsage returns total indices' tracking memory usage +func (t *TableMemoryUsage) TotalIdxTrackingMemUsage() (sum int64) { + for _, idx := range t.IndicesMemUsage { + sum += idx.TrackingMemUsage() + } + return sum +} + +// TotalColTrackingMemUsage returns total columns' tracking memory usage +func (t *TableMemoryUsage) TotalColTrackingMemUsage() (sum int64) { + for _, col := range t.ColumnsMemUsage { + sum += col.TrackingMemUsage() + } + return sum +} + +// TotalTrackingMemUsage return total tracking memory usage +func (t *TableMemoryUsage) TotalTrackingMemUsage() int64 { + return t.TotalIdxTrackingMemUsage() + t.TotalColTrackingMemUsage() +} + +// TableCacheItem indicates the unit item stored in statsCache, eg: Column/Index +type TableCacheItem interface { + ItemID() int64 + MemoryUsage() CacheItemMemoryUsage + IsAllEvicted() bool + GetEvictedStatus() int + + DropUnnecessaryData() + IsStatsInitialized() bool + GetStatsVer() int64 +} + +// CacheItemMemoryUsage indicates the memory usage of TableCacheItem +type CacheItemMemoryUsage interface { + ItemID() int64 + TotalMemoryUsage() int64 + TrackingMemUsage() int64 + HistMemUsage() int64 + TopnMemUsage() int64 + CMSMemUsage() int64 +} + +// ColumnMemUsage records column memory usage +type ColumnMemUsage struct { + ColumnID int64 + HistogramMemUsage int64 + CMSketchMemUsage int64 + FMSketchMemUsage int64 + TopNMemUsage int64 + TotalMemUsage int64 +} + +// TotalMemoryUsage implements CacheItemMemoryUsage +func (c *ColumnMemUsage) TotalMemoryUsage() int64 { + return c.TotalMemUsage +} + +// ItemID implements CacheItemMemoryUsage +func (c *ColumnMemUsage) ItemID() int64 { + return c.ColumnID +} + +// TrackingMemUsage implements CacheItemMemoryUsage +func (c *ColumnMemUsage) TrackingMemUsage() int64 { + return c.CMSketchMemUsage + c.TopNMemUsage + c.HistogramMemUsage +} + +// HistMemUsage implements CacheItemMemoryUsage +func (c *ColumnMemUsage) HistMemUsage() int64 { + return c.HistogramMemUsage +} + +// TopnMemUsage implements CacheItemMemoryUsage +func (c *ColumnMemUsage) TopnMemUsage() int64 { + return c.TopNMemUsage +} + +// CMSMemUsage implements CacheItemMemoryUsage +func (c *ColumnMemUsage) CMSMemUsage() int64 { + return c.CMSketchMemUsage +} + +// IndexMemUsage records index memory usage +type IndexMemUsage struct { + IndexID int64 + HistogramMemUsage int64 + CMSketchMemUsage int64 + TopNMemUsage int64 + TotalMemUsage int64 +} + +// TotalMemoryUsage implements CacheItemMemoryUsage +func (c 
*IndexMemUsage) TotalMemoryUsage() int64 { + return c.TotalMemUsage +} + +// ItemID implements CacheItemMemoryUsage +func (c *IndexMemUsage) ItemID() int64 { + return c.IndexID +} + +// TrackingMemUsage implements CacheItemMemoryUsage +func (c *IndexMemUsage) TrackingMemUsage() int64 { + return c.CMSketchMemUsage + c.TopNMemUsage + c.HistogramMemUsage +} + +// HistMemUsage implements CacheItemMemoryUsage +func (c *IndexMemUsage) HistMemUsage() int64 { + return c.HistogramMemUsage +} + +// TopnMemUsage implements CacheItemMemoryUsage +func (c *IndexMemUsage) TopnMemUsage() int64 { + return c.TopNMemUsage +} + +// CMSMemUsage implements CacheItemMemoryUsage +func (c *IndexMemUsage) CMSMemUsage() int64 { + return c.CMSketchMemUsage +} + +// MemoryUsage returns the total memory usage of this Table. +// it will only calc the size of Columns and Indices stats data of table. +// We ignore the size of other metadata in Table +func (t *Table) MemoryUsage() *TableMemoryUsage { + tMemUsage := &TableMemoryUsage{ + TableID: t.PhysicalID, + ColumnsMemUsage: make(map[int64]CacheItemMemoryUsage), + IndicesMemUsage: make(map[int64]CacheItemMemoryUsage), + } + for _, col := range t.columns { + if col != nil { + colMemUsage := col.MemoryUsage() + tMemUsage.ColumnsMemUsage[colMemUsage.ItemID()] = colMemUsage + tMemUsage.TotalMemUsage += colMemUsage.TotalMemoryUsage() + } + } + for _, index := range t.indices { + if index != nil { + idxMemUsage := index.MemoryUsage() + tMemUsage.IndicesMemUsage[idxMemUsage.ItemID()] = idxMemUsage + tMemUsage.TotalMemUsage += idxMemUsage.TotalMemoryUsage() + } + } + return tMemUsage +} + +// Copy copies the current table. +func (t *Table) Copy() *Table { + newHistColl := HistColl{ + PhysicalID: t.PhysicalID, + HavePhysicalID: t.HavePhysicalID, + RealtimeCount: t.RealtimeCount, + columns: make(map[int64]*Column, len(t.columns)), + indices: make(map[int64]*Index, len(t.indices)), + Pseudo: t.Pseudo, + ModifyCount: t.ModifyCount, + StatsVer: t.StatsVer, + } + for id, col := range t.columns { + newHistColl.columns[id] = col.Copy() + } + for id, idx := range t.indices { + newHistColl.indices[id] = idx.Copy() + } + nt := &Table{ + HistColl: newHistColl, + Version: t.Version, + TblInfoUpdateTS: t.TblInfoUpdateTS, + LastAnalyzeVersion: t.LastAnalyzeVersion, + } + if t.ExtendedStats != nil { + newExtStatsColl := &ExtendedStatsColl{ + Stats: make(map[string]*ExtendedStatsItem), + LastUpdateVersion: t.ExtendedStats.LastUpdateVersion, + } + for name, item := range t.ExtendedStats.Stats { + newExtStatsColl.Stats[name] = item + } + nt.ExtendedStats = newExtStatsColl + } + if t.ColAndIdxExistenceMap != nil { + nt.ColAndIdxExistenceMap = t.ColAndIdxExistenceMap.Clone() + } + return nt +} + +// ShallowCopy copies the current table. +// It's different from Copy(). Only the struct Table (and also the embedded HistColl) is copied here. +// The internal containers, like t.Columns and t.Indices, and the stats, like TopN and Histogram are not copied. 
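+// Because those containers are shared with the original, the shallow copy
+// must be treated as read-only with respect to them.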
+func (t *Table) ShallowCopy() *Table { + newHistColl := HistColl{ + PhysicalID: t.PhysicalID, + HavePhysicalID: t.HavePhysicalID, + RealtimeCount: t.RealtimeCount, + columns: t.columns, + indices: t.indices, + Pseudo: t.Pseudo, + ModifyCount: t.ModifyCount, + StatsVer: t.StatsVer, + } + nt := &Table{ + HistColl: newHistColl, + Version: t.Version, + TblInfoUpdateTS: t.TblInfoUpdateTS, + ExtendedStats: t.ExtendedStats, + ColAndIdxExistenceMap: t.ColAndIdxExistenceMap, + LastAnalyzeVersion: t.LastAnalyzeVersion, + } + return nt +} + +// String implements Stringer interface. +func (t *Table) String() string { + strs := make([]string, 0, len(t.columns)+1) + strs = append(strs, fmt.Sprintf("Table:%d RealtimeCount:%d", t.PhysicalID, t.RealtimeCount)) + cols := make([]*Column, 0, len(t.columns)) + for _, col := range t.columns { + cols = append(cols, col) + } + slices.SortFunc(cols, func(i, j *Column) int { return cmp.Compare(i.ID, j.ID) }) + for _, col := range cols { + strs = append(strs, col.String()) + } + idxs := make([]*Index, 0, len(t.indices)) + for _, idx := range t.indices { + idxs = append(idxs, idx) + } + slices.SortFunc(idxs, func(i, j *Index) int { return cmp.Compare(i.ID, j.ID) }) + for _, idx := range idxs { + strs = append(strs, idx.String()) + } + // TODO: concat content of ExtendedStatsColl + return strings.Join(strs, "\n") +} + +// IndexStartWithColumn finds the first index whose first column is the given column. +func (t *Table) IndexStartWithColumn(colName string) *Index { + for _, index := range t.indices { + if index.Info.Columns[0].Name.L == colName { + return index + } + } + return nil +} + +// ColumnByName finds the statistics.Column for the given column. +func (t *Table) ColumnByName(colName string) *Column { + for _, c := range t.columns { + if c.Info.Name.L == colName { + return c + } + } + return nil +} + +// GetStatsInfo returns their statistics according to the ID of the column or index, including histogram, CMSketch, TopN and FMSketch. +// +// needCopy: In order to protect the item in the cache from being damaged, we need to copy the item. +func (t *Table) GetStatsInfo(id int64, isIndex bool, needCopy bool) (*Histogram, *CMSketch, *TopN, *FMSketch, bool) { + if isIndex { + if idxStatsInfo, ok := t.indices[id]; ok { + if needCopy { + return idxStatsInfo.Histogram.Copy(), + idxStatsInfo.CMSketch.Copy(), idxStatsInfo.TopN.Copy(), idxStatsInfo.FMSketch.Copy(), true + } + return &idxStatsInfo.Histogram, + idxStatsInfo.CMSketch, idxStatsInfo.TopN, idxStatsInfo.FMSketch, true + } + // newly added index which is not analyzed yet + return nil, nil, nil, nil, false + } + if colStatsInfo, ok := t.columns[id]; ok { + if needCopy { + return colStatsInfo.Histogram.Copy(), colStatsInfo.CMSketch.Copy(), + colStatsInfo.TopN.Copy(), colStatsInfo.FMSketch.Copy(), true + } + return &colStatsInfo.Histogram, colStatsInfo.CMSketch, + colStatsInfo.TopN, colStatsInfo.FMSketch, true + } + // newly added column which is not analyzed yet + return nil, nil, nil, nil, false +} + +// IsAnalyzed checks whether the table is analyzed or not by checking its last analyze's timestamp value. +// A valid timestamp must be greater than 0. +func (t *Table) IsAnalyzed() bool { + return t.LastAnalyzeVersion > 0 +} + +// IsEligibleForAnalysis checks whether the table is eligible for analysis. +func (t *Table) IsEligibleForAnalysis() bool { + // 1. If the statistics are either not loaded or are classified as pseudo, there is no need for analyze. 
+
+// IsEligibleForAnalysis checks whether the table is eligible for analysis.
+func (t *Table) IsEligibleForAnalysis() bool {
+	// 1. If the statistics are either not loaded or are classified as pseudo, there is no need to analyze.
+	// Pseudo statistics can be created by the optimizer, so we need to double check it.
+	// 2. If the table is too small, we don't want to waste time analyzing it.
+	// Leave the opportunity to bigger tables.
+	if t == nil || t.Pseudo || t.RealtimeCount < AutoAnalyzeMinCnt {
+		return false
+	}
+
+	return true
+}
+
+// GetAnalyzeRowCount tries to get the row count of a column or an index if possible.
+// This method is useful because this row count doesn't consider the modify count.
+func (coll *HistColl) GetAnalyzeRowCount() float64 {
+	ids := maps.Keys(coll.columns)
+	slices.Sort(ids)
+	for _, id := range ids {
+		col := coll.columns[id]
+		if col != nil && col.IsFullLoad() {
+			return col.TotalRowCount()
+		}
+	}
+	ids = maps.Keys(coll.indices)
+	slices.Sort(ids)
+	for _, id := range ids {
+		idx := coll.indices[id]
+		if idx == nil {
+			continue
+		}
+		if idx.Info != nil && idx.Info.MVIndex {
+			continue
+		}
+		if idx.IsFullLoad() {
+			return idx.TotalRowCount()
+		}
+	}
+	return -1
+}
+
+// GetScaledRealtimeAndModifyCnt scales the RealtimeCount and ModifyCount for some special indexes whose total row
+// count is different from the total row count of the table. Currently, only MV indexes fall into this case.
+// Because we will use the RealtimeCount and ModifyCount during the estimation for ranges on this index (like the upper
+// bound for the out-of-range estimation logic and the IncreaseFactor logic), we can't directly use the RealtimeCount and
+// ModifyCount of the table. Instead, we should scale them before use.
+// For example, if the table analyze row count is 1000, the realtime row count is 1500, and the mv index total count is 5000,
+// then when calculating the IncreaseFactor, it should be 1500/1000 = 1.5 for normal columns/indexes, and we should use the
+// same 1.5 for the mv index. But obviously, using 1500/5000 would be wrong; the correct calculation is 7500/5000 = 1.5.
+// So we add this function to get this 7500.
+func (coll *HistColl) GetScaledRealtimeAndModifyCnt(idxStats *Index) (realtimeCnt, modifyCnt int64) {
+	// In theory, we can apply this scale logic on all indexes. But currently, we only apply it on the mv index to avoid
+	// any unexpected changes caused by factors like precision difference.
+	if idxStats == nil || idxStats.Info == nil || !idxStats.Info.MVIndex || !idxStats.IsFullLoad() {
+		return coll.RealtimeCount, coll.ModifyCount
+	}
+	analyzeRowCount := coll.GetAnalyzeRowCount()
+	if analyzeRowCount <= 0 {
+		return coll.RealtimeCount, coll.ModifyCount
+	}
+	idxTotalRowCount := idxStats.TotalRowCount()
+	if idxTotalRowCount <= 0 {
+		return coll.RealtimeCount, coll.ModifyCount
+	}
+	scale := idxTotalRowCount / analyzeRowCount
+	return int64(float64(coll.RealtimeCount) * scale), int64(float64(coll.ModifyCount) * scale)
+}
+
+// GetStatsHealthy calculates the stats healthy value if the table stats are not pseudo.
+// If the table stats are pseudo, it returns (0, false); otherwise it returns (healthy, true).
+func (t *Table) GetStatsHealthy() (int64, bool) {
+	if t == nil || t.Pseudo {
+		return 0, false
+	}
+	if !t.IsAnalyzed() {
+		return 0, true
+	}
+	var healthy int64
+	count := float64(t.RealtimeCount)
+	if histCount := t.GetAnalyzeRowCount(); histCount > 0 {
+		count = histCount
+	}
+	if float64(t.ModifyCount) < count {
+		healthy = int64((1.0 - float64(t.ModifyCount)/count) * 100.0)
+	} else if t.ModifyCount == 0 {
+		healthy = 100
+	}
+	return healthy, true
+}
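+
+// A worked sketch of the healthy formula above (the numbers are illustrative
+// assumptions): with an analyzed row count of 1000 and ModifyCount = 300,
+//
+//	healthy := int64((1.0 - 300.0/1000.0) * 100.0) // 70
+//
+// and once ModifyCount reaches the row count (while being non-zero), healthy stays 0.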
+
+// ColumnIsLoadNeeded checks whether the column needs to trigger the async/sync load.
+// The Column should be visible in the table and really have analyzed statistics in the storage.
+// Also, if the stats have already been loaded into the memory, we don't need to load them again.
+// We return the Column together with the checking result, to avoid accessing the map multiple times.
+// The first bool is whether we need to load it into memory. The second bool is whether this column has stats in the system table or not.
+func (t *Table) ColumnIsLoadNeeded(id int64, fullLoad bool) (*Column, bool, bool) {
+	if t.Pseudo {
+		return nil, false, false
+	}
+	// When we use non-lite init stats, it cannot initialize the stats for common columns,
+	// so we need to force the stats to be loaded.
+	col, ok := t.columns[id]
+	if !ok {
+		return nil, true, true
+	}
+	hasAnalyzed := t.ColAndIdxExistenceMap.HasAnalyzed(id, false)
+
+	// If it's not analyzed yet.
+	// The real check condition: !ok && !hasAnalyzed.
+	// After this check, we will always have ok && hasAnalyzed.
+	if !hasAnalyzed {
+		return nil, false, false
+	}
+
+	// Restore the condition from the simplified form:
+	// 1. ok && hasAnalyzed && fullLoad && !col.IsFullLoad => need load
+	// 2. ok && hasAnalyzed && !fullLoad && !col.statsInitialized => need load
+	if (fullLoad && !col.IsFullLoad()) || (!fullLoad && !col.statsInitialized) {
+		return col, true, true
+	}
+
+	// Otherwise, we don't need to load it.
+	return col, false, true
+}
+
+// IndexIsLoadNeeded checks whether the index needs to trigger the async/sync load.
+// The Index should be visible in the table and really have analyzed statistics in the storage.
+// Also, if the stats have already been loaded into the memory, we don't need to load them again.
+// We return the Index together with the checking result, to avoid accessing the map multiple times.
+func (t *Table) IndexIsLoadNeeded(id int64) (*Index, bool) {
+	idx, ok := t.indices[id]
+	// If the index is not in the memory, and we have its stats in the storage, we need to trigger the load.
+	if !ok && (t.ColAndIdxExistenceMap.HasAnalyzed(id, true) || !t.ColAndIdxExistenceMap.Checked()) {
+		return nil, true
+	}
+	// If the index is in the memory, we check its loading status through its own methods.
+	if ok && idx.IsAnalyzed() && !idx.IsFullLoad() {
+		return idx, true
+	}
+	return idx, false
+}
+
+// RatioOfPseudoEstimate means if modifyCount / statsTblCount is greater than this ratio, we think the stats are invalid
+// and use pseudo estimation.
+var RatioOfPseudoEstimate = atomic.NewFloat64(0.7)
+
+// IsInitialized returns true if any column/index stats of the table is initialized.
+func (t *Table) IsInitialized() bool {
+	for _, col := range t.columns {
+		if col != nil && col.IsStatsInitialized() {
+			return true
+		}
+	}
+	for _, idx := range t.indices {
+		if idx != nil && idx.IsStatsInitialized() {
+			return true
+		}
+	}
+	return false
+}
+
+// IsOutdated returns true if the table stats is outdated.
+func (t *Table) IsOutdated() bool {
+	rowcount := t.GetAnalyzeRowCount()
+	if rowcount < 0 {
+		rowcount = float64(t.RealtimeCount)
+	}
+	if rowcount > 0 && float64(t.ModifyCount)/rowcount > RatioOfPseudoEstimate.Load() {
+		return true
+	}
+	return false
+}
+
+// ReleaseAndPutToPool releases the data structures of the Table and puts them back to the pool.
+func (t *Table) ReleaseAndPutToPool() {
+	for _, col := range t.columns {
+		col.FMSketch.DestroyAndPutToPool()
+	}
+	maps.Clear(t.columns)
+	for _, idx := range t.indices {
+		idx.FMSketch.DestroyAndPutToPool()
+	}
+	maps.Clear(t.indices)
+}
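+
+// An illustrative sketch of how a caller might drive the two load checks above
+// (tbl, colID and idxID are assumptions for the example):
+//
+//	if col, loadNeeded, hasStats := tbl.ColumnIsLoadNeeded(colID, true); loadNeeded && hasStats {
+//		_ = col // enqueue an async/sync load task for this column here
+//	}
+//	if idx, loadNeeded := tbl.IndexIsLoadNeeded(idxID); loadNeeded {
+//		_ = idx // enqueue the load task for this index as well
+//	}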
+
+// ID2UniqueID generates a new HistColl whose `columns` map is keyed by the UniqueID of the given columns.
+func (coll *HistColl) ID2UniqueID(columns []*expression.Column) *HistColl {
+	cols := make(map[int64]*Column)
+	for _, col := range columns {
+		colHist, ok := coll.columns[col.ID]
+		if ok {
+			cols[col.UniqueID] = colHist
+		}
+	}
+	newColl := &HistColl{
+		PhysicalID:     coll.PhysicalID,
+		HavePhysicalID: coll.HavePhysicalID,
+		Pseudo:         coll.Pseudo,
+		RealtimeCount:  coll.RealtimeCount,
+		ModifyCount:    coll.ModifyCount,
+		columns:        cols,
+	}
+	return newColl
+}
+
+// GenerateHistCollFromColumnInfo generates a new HistColl whose ColUniqueID2IdxIDs and Idx2ColUniqueIDs are built from the given parameters.
+func (coll *HistColl) GenerateHistCollFromColumnInfo(tblInfo *model.TableInfo, columns []*expression.Column) *HistColl {
+	newColHistMap := make(map[int64]*Column)
+	colInfoID2UniqueID := make(map[int64]int64, len(columns))
+	uniqueID2colInfoID := make(map[int64]int64, len(columns))
+	idxID2idxInfo := make(map[int64]*model.IndexInfo)
+	for _, col := range columns {
+		colInfoID2UniqueID[col.ID] = col.UniqueID
+		uniqueID2colInfoID[col.UniqueID] = col.ID
+	}
+	for id, colHist := range coll.columns {
+		uniqueID, ok := colInfoID2UniqueID[id]
+		// Collect the statistics by the given columns.
+		if ok {
+			newColHistMap[uniqueID] = colHist
+		}
+	}
+	for _, idxInfo := range tblInfo.Indices {
+		idxID2idxInfo[idxInfo.ID] = idxInfo
+	}
+	newIdxHistMap := make(map[int64]*Index)
+	idx2Columns := make(map[int64][]int64)
+	colID2IdxIDs := make(map[int64][]int64)
+	mvIdx2Columns := make(map[int64][]*expression.Column)
+	for id, idxHist := range coll.indices {
+		idxInfo := idxID2idxInfo[id]
+		if idxInfo == nil {
+			continue
+		}
+		ids := make([]int64, 0, len(idxInfo.Columns))
+		for _, idxCol := range idxInfo.Columns {
+			uniqueID, ok := colInfoID2UniqueID[tblInfo.Columns[idxCol.Offset].ID]
+			if !ok {
+				break
+			}
+			ids = append(ids, uniqueID)
+		}
+		// If the length of the id list is 0, this index won't be used in this query.
+		if len(ids) == 0 {
+			continue
+		}
+		colID2IdxIDs[ids[0]] = append(colID2IdxIDs[ids[0]], idxHist.ID)
+		newIdxHistMap[idxHist.ID] = idxHist
+		idx2Columns[idxHist.ID] = ids
+		if idxInfo.MVIndex {
+			cols, ok := PrepareCols4MVIndex(tblInfo, idxInfo, columns, true)
+			if ok {
+				mvIdx2Columns[id] = cols
+			}
+		}
+	}
+	for _, idxIDs := range colID2IdxIDs {
+		slices.Sort(idxIDs)
+	}
+	newColl := &HistColl{
+		PhysicalID:         coll.PhysicalID,
+		HavePhysicalID:     coll.HavePhysicalID,
+		Pseudo:             coll.Pseudo,
+		RealtimeCount:      coll.RealtimeCount,
+		ModifyCount:        coll.ModifyCount,
+		columns:            newColHistMap,
+		indices:            newIdxHistMap,
+		ColUniqueID2IdxIDs: colID2IdxIDs,
+		Idx2ColUniqueIDs:   idx2Columns,
+		UniqueID2colInfoID: uniqueID2colInfoID,
+		MVIdx2Columns:      mvIdx2Columns,
+	}
+	return newColl
+}
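+
+// A minimal sketch of remapping stats onto the optimizer's columns (tblInfo and
+// exprCols are assumptions standing in for the planner's schema objects):
+//
+//	histColl := tbl.HistColl.GenerateHistCollFromColumnInfo(tblInfo, exprCols)
+//	// histColl now keys its columns by expression.Column.UniqueID, and
+//	// ColUniqueID2IdxIDs lists, for each leading column, the indexes starting with it.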
+
+// PseudoTable creates pseudo table statistics.
+// Usually, we don't want to trigger stats loading for a pseudo table.
+// But there are exceptional cases. In such cases, we should pass allowTriggerLoading as true.
+// Such a case could happen in getStatsTable().
+func PseudoTable(tblInfo *model.TableInfo, allowTriggerLoading bool, allowFillHistMeta bool) *Table {
+	pseudoHistColl := HistColl{
+		RealtimeCount:     PseudoRowCount,
+		PhysicalID:        tblInfo.ID,
+		HavePhysicalID:    true,
+		columns:           make(map[int64]*Column, 2),
+		indices:           make(map[int64]*Index, 2),
+		Pseudo:            true,
+		CanNotTriggerLoad: !allowTriggerLoading,
+	}
+	t := &Table{
+		HistColl:              pseudoHistColl,
+		ColAndIdxExistenceMap: NewColAndIndexExistenceMap(len(tblInfo.Columns), len(tblInfo.Indices)),
+	}
+	for _, col := range tblInfo.Columns {
+		// The column is public to use. We also need to check that the column is not hidden,
+		// since a hidden column means it's used by an expression index.
+		// We don't collect stats for the hidden column and we won't use the hidden column to estimate.
+		// Thus we don't create pseudo stats for it.
+		if col.State == model.StatePublic && !col.Hidden {
+			t.ColAndIdxExistenceMap.InsertCol(col.ID, false)
+			if allowFillHistMeta {
+				t.columns[col.ID] = &Column{
+					PhysicalID: tblInfo.ID,
+					Info:       col,
+					IsHandle:   tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.GetFlag()),
+					Histogram:  *NewHistogram(col.ID, 0, 0, 0, &col.FieldType, 0, 0),
+				}
+			}
+		}
+	}
+	for _, idx := range tblInfo.Indices {
+		if idx.State == model.StatePublic {
+			t.ColAndIdxExistenceMap.InsertIndex(idx.ID, false)
+			if allowFillHistMeta {
+				t.indices[idx.ID] = &Index{
+					PhysicalID: tblInfo.ID,
+					Info:       idx,
+					Histogram:  *NewHistogram(idx.ID, 0, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0),
+				}
+			}
+		}
+	}
+	return t
+}
+
+// CheckAnalyzeVerOnTable checks whether the given version is the one from the tbl.
+// If not, it will return false and set the version to the tbl's.
+// We use this check to make sure all the statistics of the table are of the same version.
+func CheckAnalyzeVerOnTable(tbl *Table, version *int) bool {
+	if tbl.StatsVer != Version0 && tbl.StatsVer != *version {
+		*version = tbl.StatsVer
+		return false
+	}
+	return true
+}
+
+// PrepareCols4MVIndex helps to identify the columns of an MV index. We need this information for estimation.
+// This logic is shared between the estimation logic and the access path generation logic. We'd like to put the MV index
+// related functions together in the planner/core package, so we use this trick here to avoid the import cycle.
+var PrepareCols4MVIndex func(
+	tableInfo *model.TableInfo,
+	mvIndex *model.IndexInfo,
+	tblCols []*expression.Column,
+	checkOnly1ArrayTypeCol bool,
+) (idxCols []*expression.Column, ok bool)
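+
+// An illustrative sketch of the two helpers above (tblInfo and tbl are
+// assumptions standing in for a real *model.TableInfo and a cached *Table):
+//
+//	pseudo := PseudoTable(tblInfo, false, true)
+//	_ = pseudo.IsEligibleForAnalysis() // false: pseudo stats are never analyzed
+//	version := 2                       // assume we expect stats version 2
+//	if !CheckAnalyzeVerOnTable(tbl, &version) {
+//		// version has been reset to tbl's own StatsVer
+//	}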