diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go
index a2b1e6997aa35..1ff5868803489 100644
--- a/statistics/cmsketch.go
+++ b/statistics/cmsketch.go
@@ -462,6 +462,45 @@ func DecodeCMSketchAndTopN(data []byte, topNRows []chunk.Row) (*CMSketch, *TopN,
 	return cm, topN, nil
 }
 
+// DecodeTopN decodes a TopN from the given chunk rows.
+func DecodeTopN(topNRows []chunk.Row) (*TopN, error) {
+	pbTopN := make([]*tipb.CMSketchTopN, 0, len(topNRows))
+	for _, row := range topNRows {
+		data := make([]byte, len(row.GetBytes(0)))
+		copy(data, row.GetBytes(0))
+		pbTopN = append(pbTopN, &tipb.CMSketchTopN{
+			Data:  data,
+			Count: row.GetUint64(1),
+		})
+	}
+	return TopNFromProto(pbTopN), nil
+}
+
+// DecodeCMSketch decodes a CMSketch from the given byte slice.
+func DecodeCMSketch(data []byte) (*CMSketch, error) {
+	if len(data) == 0 {
+		return nil, nil
+	}
+	protoSketch := &tipb.CMSketch{}
+	err := protoSketch.Unmarshal(data)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if len(protoSketch.Rows) == 0 {
+		return nil, nil
+	}
+	c := NewCMSketch(int32(len(protoSketch.Rows)), int32(len(protoSketch.Rows[0].Counters)))
+	for i, row := range protoSketch.Rows {
+		c.count = 0
+		for j, counter := range row.Counters {
+			c.table[i][j] = counter
+			c.count = c.count + uint64(counter)
+		}
+	}
+	c.defaultValue = protoSketch.DefaultValue
+	return c, nil
+}
+
 // TotalCount returns the total count in the sketch, it is only used for test.
 func (c *CMSketch) TotalCount() uint64 {
 	if c == nil {
diff --git a/statistics/handle/globalstats/BUILD.bazel b/statistics/handle/globalstats/BUILD.bazel
index 1e618c237131c..56a311629a2c7 100644
--- a/statistics/handle/globalstats/BUILD.bazel
+++ b/statistics/handle/globalstats/BUILD.bazel
@@ -4,6 +4,7 @@ go_library(
     name = "globalstats",
     srcs = [
         "global_stats.go",
+        "global_stats_async.go",
        "merge_worker.go",
        "topn.go",
    ],
@@ -14,13 +15,18 @@ go_library(
        "//parser/ast",
        "//parser/model",
        "//sessionctx",
+        "//sessionctx/stmtctx",
        "//statistics",
+        "//statistics/handle/storage",
+        "//statistics/handle/util",
        "//table",
        "//types",
        "//util/hack",
        "//util/logutil",
        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
        "@com_github_tiancaiamao_gp//:gp",
+        "@org_golang_x_sync//errgroup",
        "@org_uber_go_zap//:zap",
    ],
)
diff --git a/statistics/handle/globalstats/global_stats.go b/statistics/handle/globalstats/global_stats.go
index 6ebf5d33e55ce..cde00f7e33e2b 100644
--- a/statistics/handle/globalstats/global_stats.go
+++ b/statistics/handle/globalstats/global_stats.go
@@ -15,8 +15,6 @@
 package globalstats
 
 import (
-	"fmt"
-
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/parser/ast"
@@ -24,7 +22,6 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/table"
-	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/tiancaiamao/gp"
 	"go.uber.org/zap"
@@ -66,9 +63,8 @@ func newGlobalStats(histCount int) *GlobalStats {
 }
 
 type (
-	getTableByPhysicalIDFunc    func(is infoschema.InfoSchema, physicalID int64) (table.Table, bool)
-	loadTablePartitionStatsFunc func(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error)
-	// GlobalStatusHandler is used to handle the global-level stats.
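+	// getTableByPhysicalIDFunc looks a table up by its physical (partition) ID.
+	// callWithSCtxFunc runs f with a session context borrowed from the session
+	// pool; in practice this is (*Handle).callWithSCtx (see handle.go below).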
+ getTableByPhysicalIDFunc func(is infoschema.InfoSchema, tableID int64) (table.Table, bool) + callWithSCtxFunc func(f func(sctx sessionctx.Context) error, flags ...int) (err error) ) // MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo. @@ -80,188 +76,18 @@ func MergePartitionStats2GlobalStats( globalTableInfo *model.TableInfo, isIndex bool, histIDs []int64, - allPartitionStats map[int64]*statistics.Table, getTableByPhysicalIDFn getTableByPhysicalIDFunc, - loadTablePartitionStatsFn loadTablePartitionStatsFunc, + callWithSCtxFunc callWithSCtxFunc, ) (globalStats *GlobalStats, err error) { - externalCache := false - if allPartitionStats != nil { - externalCache = true - } - - partitionNum := len(globalTableInfo.Partition.Definitions) - if len(histIDs) == 0 { - for _, col := range globalTableInfo.Columns { - // The virtual generated column stats can not be merged to the global stats. - if col.IsVirtualGenerated() { - continue - } - histIDs = append(histIDs, col.ID) - } - } - - // Initialized the globalStats. - globalStats = newGlobalStats(len(histIDs)) - - // Slice Dimensions Explanation - // First dimension: Column or Index Stats - // Second dimension: Partition Tables - // Because all topN and histograms need to be collected before they can be merged. - // So we should store all the partition-level stats first, and merge them together. - allHg := make([][]*statistics.Histogram, globalStats.Num) - allCms := make([][]*statistics.CMSketch, globalStats.Num) - allTopN := make([][]*statistics.TopN, globalStats.Num) - allFms := make([][]*statistics.FMSketch, globalStats.Num) - for i := 0; i < globalStats.Num; i++ { - allHg[i] = make([]*statistics.Histogram, 0, partitionNum) - allCms[i] = make([]*statistics.CMSketch, 0, partitionNum) - allTopN[i] = make([]*statistics.TopN, 0, partitionNum) - allFms[i] = make([]*statistics.FMSketch, 0, partitionNum) - } - - skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats - for _, def := range globalTableInfo.Partition.Definitions { - partitionID := def.ID - partitionTable, ok := getTableByPhysicalIDFn(is, partitionID) - if !ok { - err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) - return - } - tableInfo := partitionTable.Meta() - var partitionStats *statistics.Table - var okLoad bool - if allPartitionStats != nil { - partitionStats, okLoad = allPartitionStats[partitionID] - } else { - okLoad = false - } - // If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats - if !okLoad { - var err1 error - partitionStats, err1 = loadTablePartitionStatsFn(tableInfo, &def) - if err1 != nil { - if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err1) { - globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", def.Name.L)) - continue - } - err = err1 - return - } - if externalCache { - allPartitionStats[partitionID] = partitionStats - } - } - - for i := 0; i < globalStats.Num; i++ { - // GetStatsInfo will return the copy of the statsInfo, so we don't need to worry about the data race. - // partitionStats will be released after the for loop. 
- hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex, externalCache) - skipPartition := false - if !analyzed { - var missingPart string - if !isIndex { - missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i])) - } else { - missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i])) - } - if !skipMissingPartitionStats { - err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart)) - return - } - globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart) - skipPartition = true - } - - // Partition stats is not empty but column stats(hist, topN) is missing. - if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) { - var missingPart string - if !isIndex { - missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i])) - } else { - missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i])) - } - if !skipMissingPartitionStats { - err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart)) - return - } - globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topN") - skipPartition = true - } - - if i == 0 { - // In a partition, we will only update globalStats.Count once. - globalStats.Count += partitionStats.RealtimeCount - globalStats.ModifyCount += partitionStats.ModifyCount - } - - if !skipPartition { - allHg[i] = append(allHg[i], hg) - allCms[i] = append(allCms[i], cms) - allTopN[i] = append(allTopN[i], topN) - allFms[i] = append(allFms[i], fms) - } - } + worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, globalTableInfo, histIDs, is, getTableByPhysicalIDFn, callWithSCtxFunc) + if err != nil { + return nil, errors.Trace(err) } - - // After collect all the statistics from the partition-level stats, - // we should merge them together. - for i := 0; i < globalStats.Num; i++ { - if len(allHg[i]) == 0 { - // If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0` - // correctly. It can avoid unexpected behaviors such as nil pointer panic. - continue - } - // FMSketch use many memory, so we first deal with it and then destroy it. - // Merge FMSketch. - globalStats.Fms[i] = allFms[i][0] - for j := 1; j < len(allFms[i]); j++ { - globalStats.Fms[i].MergeFMSketch(allFms[i][j]) - allFms[i][j].DestroyAndPutToPool() - } - - // Update the global NDV. - globalStatsNDV := globalStats.Fms[i].NDV() - if globalStatsNDV > globalStats.Count { - globalStatsNDV = globalStats.Count - } - globalStats.Fms[i].DestroyAndPutToPool() - - // Merge CMSketch. - globalStats.Cms[i] = allCms[i][0] - for j := 1; j < len(allCms[i]); j++ { - err = globalStats.Cms[i].MergeCMSketch(allCms[i][j]) - if err != nil { - return - } - } - - // Merge topN. - // Note: We need to merge TopN before merging the histogram. - // Because after merging TopN, some numbers will be left. - // These remaining topN numbers will be used as a separate bucket for later histogram merging. 
- var poppedTopN []statistics.TopNMeta - wrapper := NewStatsWrapper(allHg[i], allTopN[i]) - globalStats.TopN[i], poppedTopN, allHg[i], err = mergeGlobalStatsTopN(gpool, sc, wrapper, - sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex) - if err != nil { - return - } - - // Merge histogram. - globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], poppedTopN, - int64(opts[ast.AnalyzeOptNumBuckets]), isIndex) - if err != nil { - return - } - - // NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them. - for j := range globalStats.Hg[i].Buckets { - globalStats.Hg[i].Buckets[j].NDV = 0 - } - - globalStats.Hg[i].NDV = globalStatsNDV + err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex) + if err != nil { + return nil, errors.Trace(err) } - return + return worker.Result(), nil } // MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. @@ -270,26 +96,30 @@ func MergePartitionStats2GlobalStatsByTableID( gpool *gp.Pool, opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, - physicalID int64, + tableID int64, isIndex bool, histIDs []int64, - allPartitionStats map[int64]*statistics.Table, getTableByPhysicalIDFn getTableByPhysicalIDFunc, - loadTablePartitionStatsFn loadTablePartitionStatsFunc, + callWithSCtxFunc callWithSCtxFunc, ) (globalStats *GlobalStats, err error) { // Get the partition table IDs. - globalTable, ok := getTableByPhysicalIDFn(is, physicalID) + globalTable, ok := getTableByPhysicalIDFn(is, tableID) if !ok { - err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID) + err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", tableID) return } globalTableInfo := globalTable.Meta() - globalStats, err = MergePartitionStats2GlobalStats(sc, gpool, opts, is, globalTableInfo, isIndex, histIDs, allPartitionStats, getTableByPhysicalIDFn, loadTablePartitionStatsFn) + + worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, globalTableInfo, histIDs, is, getTableByPhysicalIDFn, callWithSCtxFunc) if err != nil { - return + return nil, errors.Trace(err) } - + err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex) + if err != nil { + return nil, errors.Trace(err) + } + globalStats = worker.Result() if len(globalStats.MissingPartitionStats) > 0 { var item string if !isIndex { diff --git a/statistics/handle/globalstats/global_stats_async.go b/statistics/handle/globalstats/global_stats_async.go new file mode 100644 index 0000000000000..bf4e1cfed57f5 --- /dev/null +++ b/statistics/handle/globalstats/global_stats_async.go @@ -0,0 +1,501 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package globalstats
+
+import (
+	"context"
+	stderrors "errors"
+	"fmt"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/statistics"
+	"github.com/pingcap/tidb/statistics/handle/storage"
+	"github.com/pingcap/tidb/statistics/handle/util"
+	"github.com/pingcap/tidb/types"
+	"github.com/tiancaiamao/gp"
+	"golang.org/x/sync/errgroup"
+)
+
+type mergeItem[T any] struct {
+	item T
+	idx  int
+}
+
+type skipItem struct {
+	histID      int64
+	partitionID int64
+}
+
+// toSQLIndex converts isIndex from bool to the int representation used for the
+// is_index column in the stats tables (0 for a column, 1 for an index).
+func toSQLIndex(isIndex bool) int {
+	index := 0
+	if isIndex {
+		index = 1
+	}
+	return index
+}
+
+// AsyncMergePartitionStats2GlobalStats is used to merge partition stats into global stats.
+// It divides the merge task into two parts:
+// - IOWorker: loads the FMSketch, CMSketch, histogram, and TopN from storage and sends them to the CPUWorker.
+// - CPUWorker: merges the stats received from the IOWorker and generates the global stats.
+//
+// ┌────────────────────────┐          ┌───────────────────────┐
+// │                        │          │                       │
+// │                        │          │                       │
+// │                        │          │                       │
+// │        IOWorker        │          │       CPUWorker       │
+// │                        │  ────►   │                       │
+// │                        │          │                       │
+// │                        │          │                       │
+// │                        │          │                       │
+// └────────────────────────┘          └───────────────────────┘
+type AsyncMergePartitionStats2GlobalStats struct {
+	is                  infoschema.InfoSchema
+	globalStats         *GlobalStats
+	pool                util.SessionPool
+	cmsketch            chan mergeItem[*statistics.CMSketch]
+	fmsketch            chan mergeItem[*statistics.FMSketch]
+	histogramAndTopn    chan mergeItem[*StatsWrapper]
+	gpool               *gp.Pool
+	allPartitionStats   map[int64]*statistics.Table
+	PartitionDefinition map[int64]model.PartitionDefinition
+	tableInfo           map[int64]*model.TableInfo
+	// skipPartition marks the (partition ID, histogram ID) pairs whose stats are
+	// missing and therefore must be skipped during the merge.
+	skipPartition             map[skipItem]struct{}
+	getTableByPhysicalIDFn    getTableByPhysicalIDFunc
+	callWithSCtxFunc          callWithSCtxFunc
+	exitWhenErrChan           chan struct{}
+	globalTableInfo           *model.TableInfo
+	histIDs                   []int64
+	globalStatsNDV            []int64
+	partitionIDs              []int64
+	partitionNum              int
+	skipMissingPartitionStats bool
+}
+
+// NewAsyncMergePartitionStats2GlobalStats creates a new AsyncMergePartitionStats2GlobalStats.
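+//
+// A minimal usage sketch (illustrative only; error handling is elided and the
+// gpool, is, opts, and sctx values are assumed to be prepared by the caller;
+// passing nil histIDs makes prepare fall back to all non-virtual columns):
+//
+//	worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, tblInfo, nil, is, getTableFn, callWithSCtxFn)
+//	// handle err ...
+//	err = worker.MergePartitionStats2GlobalStats(sctx, opts, false)
+//	// handle err ...
+//	globalStats := worker.Result()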
+func NewAsyncMergePartitionStats2GlobalStats( + gpool *gp.Pool, + globalTableInfo *model.TableInfo, + histIDs []int64, + is infoschema.InfoSchema, + getTableByPhysicalIDFn getTableByPhysicalIDFunc, + callWithSCtxFunc callWithSCtxFunc) (*AsyncMergePartitionStats2GlobalStats, error) { + partitionNum := len(globalTableInfo.Partition.Definitions) + return &AsyncMergePartitionStats2GlobalStats{ + callWithSCtxFunc: callWithSCtxFunc, + cmsketch: make(chan mergeItem[*statistics.CMSketch], 5), + fmsketch: make(chan mergeItem[*statistics.FMSketch], 5), + histogramAndTopn: make(chan mergeItem[*StatsWrapper], 5), + PartitionDefinition: make(map[int64]model.PartitionDefinition), + tableInfo: make(map[int64]*model.TableInfo), + partitionIDs: make([]int64, 0, partitionNum), + exitWhenErrChan: make(chan struct{}), + skipPartition: make(map[skipItem]struct{}), + gpool: gpool, + allPartitionStats: make(map[int64]*statistics.Table), + globalTableInfo: globalTableInfo, + getTableByPhysicalIDFn: getTableByPhysicalIDFn, + histIDs: histIDs, + is: is, + partitionNum: partitionNum, + }, nil +} + +func (a *AsyncMergePartitionStats2GlobalStats) prepare(sctx sessionctx.Context, isIndex bool) (err error) { + if len(a.histIDs) == 0 { + for _, col := range a.globalTableInfo.Columns { + // The virtual generated column stats can not be merged to the global stats. + if col.IsVirtualGenerated() { + continue + } + a.histIDs = append(a.histIDs, col.ID) + } + } + a.globalStats = newGlobalStats(len(a.histIDs)) + a.globalStats.Num = len(a.histIDs) + a.globalStatsNDV = make([]int64, 0, a.globalStats.Num) + // get all partition stats + for _, def := range a.globalTableInfo.Partition.Definitions { + partitionID := def.ID + a.partitionIDs = append(a.partitionIDs, partitionID) + a.PartitionDefinition[partitionID] = def + partitionTable, ok := a.getTableByPhysicalIDFn(a.is, partitionID) + if !ok { + return errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) + } + tableInfo := partitionTable.Meta() + a.tableInfo[partitionID] = tableInfo + realtimeCount, modifyCount, isNull, err := storage.StatsMetaCountAndModifyCount(sctx, partitionID) + if err != nil { + return err + } + if !isNull { + // In a partition, we will only update globalStats.Count once. 
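+			// That is guaranteed structurally here: the stats meta of each
+			// partition is read exactly once, before any per-column work starts.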
+			a.globalStats.Count += realtimeCount
+			a.globalStats.ModifyCount += modifyCount
+		}
+		err1 := skipPartition(sctx, partitionID, isIndex)
+		if err1 != nil {
+			// A whole-partition miss is not tied to any column, so pass 0 as idx.
+			err := a.dealWithSkipPartition(partitionID, isIndex, 0, err1)
+			if err != nil {
+				return err
+			}
+			if types.ErrPartitionStatsMissing.Equal(err1) {
+				continue
+			}
+		}
+		for idx, hist := range a.histIDs {
+			err1 := skipColumnPartition(sctx, partitionID, isIndex, hist)
+			if err1 != nil {
+				err := a.dealWithSkipPartition(partitionID, isIndex, idx, err1)
+				if err != nil {
+					return err
+				}
+				if types.ErrPartitionStatsMissing.Equal(err1) {
+					break
+				}
+			}
+		}
+	}
+	return nil
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) dealWithSkipPartition(partitionID int64, isIndex bool, idx int, err error) error {
+	switch {
+	case types.ErrPartitionStatsMissing.Equal(err):
+		return a.dealErrPartitionStatsMissing(partitionID)
+	case types.ErrPartitionColumnStatsMissing.Equal(err):
+		return a.dealErrPartitionColumnStatsMissing(isIndex, partitionID, idx)
+	default:
+		return err
+	}
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionStatsMissing(partitionID int64) error {
+	missingPart := fmt.Sprintf("partition `%s`", a.PartitionDefinition[partitionID].Name.L)
+	a.globalStats.MissingPartitionStats = append(a.globalStats.MissingPartitionStats, missingPart)
+	for _, histID := range a.histIDs {
+		a.skipPartition[skipItem{
+			histID:      histID,
+			partitionID: partitionID,
+		}] = struct{}{}
+	}
+	return nil
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionColumnStatsMissing(isIndex bool, partitionID int64, idx int) error {
+	var missingPart string
+	if isIndex {
+		missingPart = fmt.Sprintf("partition `%s` index `%s`", a.PartitionDefinition[partitionID].Name.L, a.tableInfo[partitionID].FindIndexNameByID(a.histIDs[idx]))
+	} else {
+		missingPart = fmt.Sprintf("partition `%s` column `%s`", a.PartitionDefinition[partitionID].Name.L, a.tableInfo[partitionID].FindColumnNameByID(a.histIDs[idx]))
+	}
+	if !a.skipMissingPartitionStats {
+		return types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", a.tableInfo[partitionID].Name.L, missingPart))
+	}
+	a.globalStats.MissingPartitionStats = append(a.globalStats.MissingPartitionStats, missingPart)
+	a.skipPartition[skipItem{
+		histID:      a.histIDs[idx],
+		partitionID: partitionID,
+	}] = struct{}{}
+	return nil
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, isIndex bool) (err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			close(a.exitWhenErrChan)
+			err = errors.New(fmt.Sprint(r))
+		}
+	}()
+	err = a.loadFmsketch(sctx, isIndex)
+	if err != nil {
+		close(a.exitWhenErrChan)
+		return err
+	}
+	close(a.fmsketch)
+	err = a.loadCMsketch(sctx, isIndex)
+	if err != nil {
+		close(a.exitWhenErrChan)
+		return err
+	}
+	close(a.cmsketch)
+	err = a.loadHistogramAndTopN(sctx, a.globalTableInfo, isIndex)
+	if err != nil {
+		close(a.exitWhenErrChan)
+		return err
+	}
+	close(a.histogramAndTopn)
+	return nil
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			close(a.exitWhenErrChan)
+			err = errors.New(fmt.Sprint(r))
+		}
+	}()
+	a.dealFMSketch()
+	select {
+	case <-a.exitWhenErrChan:
+		return nil
+	default:
+		for i := 0; i < a.globalStats.Num; i++ {
+			// Update the global NDV.
+			globalStatsNDV := a.globalStats.Fms[i].NDV()
+			if globalStatsNDV > a.globalStats.Count {
+				globalStatsNDV = a.globalStats.Count
+			}
+			a.globalStatsNDV = append(a.globalStatsNDV, globalStatsNDV)
+			a.globalStats.Fms[i].DestroyAndPutToPool()
+		}
+	}
+	err = a.dealCMSketch()
+	if err != nil {
+		return err
+	}
+	err = a.dealHistogramAndTopN(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Result returns the global stats.
+func (a *AsyncMergePartitionStats2GlobalStats) Result() *GlobalStats {
+	return a.globalStats
+}
+
+// MergePartitionStats2GlobalStats merges partition stats to global stats.
+func (a *AsyncMergePartitionStats2GlobalStats) MergePartitionStats2GlobalStats(
+	sctx sessionctx.Context,
+	opts map[ast.AnalyzeOptionType]uint64,
+	isIndex bool,
+) error {
+	a.skipMissingPartitionStats = sctx.GetSessionVars().SkipMissingPartitionStats
+	tz := sctx.GetSessionVars().StmtCtx.TimeZone
+	analyzeVersion := sctx.GetSessionVars().AnalyzeVersion
+	stmtCtx := sctx.GetSessionVars().StmtCtx
+	return a.callWithSCtxFunc(
+		func(sctx sessionctx.Context) error {
+			err := a.prepare(sctx, isIndex)
+			if err != nil {
+				return err
+			}
+			ctx := context.Background()
+			metawg, _ := errgroup.WithContext(ctx)
+			mergeWg, _ := errgroup.WithContext(ctx)
+			metawg.Go(func() error {
+				return a.ioWorker(sctx, isIndex)
+			})
+			mergeWg.Go(func() error {
+				return a.cpuWorker(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion)
+			})
+			err = metawg.Wait()
+			if err != nil {
+				if err1 := mergeWg.Wait(); err1 != nil {
+					err = stderrors.Join(err, err1)
+				}
+				return err
+			}
+			return mergeWg.Wait()
+		},
+	)
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) loadFmsketch(sctx sessionctx.Context, isIndex bool) error {
+	for i := 0; i < a.globalStats.Num; i++ {
+		// Load the FMSketch of each partition from storage.
+		for _, partitionID := range a.partitionIDs {
+			_, ok := a.skipPartition[skipItem{
+				histID:      a.histIDs[i],
+				partitionID: partitionID,
+			}]
+			if ok {
+				continue
+			}
+			fmsketch, err := storage.FMSketchFromStorage(sctx, partitionID, int64(toSQLIndex(isIndex)), a.histIDs[i])
+			if err != nil {
+				return err
+			}
+			select {
+			case a.fmsketch <- mergeItem[*statistics.FMSketch]{
+				fmsketch, i,
+			}:
+			case <-a.exitWhenErrChan:
+				return nil
+			}
+		}
+	}
+	return nil
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Context, isIndex bool) error {
+	failpoint.Inject("PanicInIOWorker", nil)
+	for i := 0; i < a.globalStats.Num; i++ {
+		for _, partitionID := range a.partitionIDs {
+			_, ok := a.skipPartition[skipItem{
+				histID:      a.histIDs[i],
+				partitionID: partitionID,
+			}]
+			if ok {
+				continue
+			}
+			cmsketch, err := storage.CMSketchFromStorage(sctx, partitionID, toSQLIndex(isIndex), a.histIDs[i])
+			if err != nil {
+				return err
+			}
+			select {
+			case a.cmsketch <- mergeItem[*statistics.CMSketch]{
+				cmsketch, i,
+			}:
+			case <-a.exitWhenErrChan:
+				return nil
+			}
+		}
+	}
+	return nil
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx sessionctx.Context, tableInfo *model.TableInfo, isIndex bool) error {
+	for i := 0; i < a.globalStats.Num; i++ {
+		hists := make([]*statistics.Histogram, 0, a.partitionNum)
+		topn := make([]*statistics.TopN, 0, a.partitionNum)
+		for _, partitionID := range a.partitionIDs {
+			_, ok := a.skipPartition[skipItem{
+				histID:      a.histIDs[i],
+				partitionID: partitionID,
+			}]
+			if ok {
+				continue
+			}
+			h, err := storage.LoadHistogram(sctx, partitionID,
toSQLIndex(isIndex), a.histIDs[i], tableInfo) + if err != nil { + return err + } + t, err := storage.TopNFromStorage(sctx, partitionID, toSQLIndex(isIndex), a.histIDs[i]) + if err != nil { + return err + } + hists = append(hists, h) + topn = append(topn, t) + } + select { + case a.histogramAndTopn <- mergeItem[*StatsWrapper]{ + NewStatsWrapper(hists, topn), i, + }: + case <-a.exitWhenErrChan: + return nil + } + } + return nil +} + +func (a *AsyncMergePartitionStats2GlobalStats) dealFMSketch() { + failpoint.Inject("PanicInCPUWorker", nil) + for { + select { + case fms, ok := <-a.fmsketch: + if !ok { + return + } + if a.globalStats.Fms[fms.idx] == nil { + a.globalStats.Fms[fms.idx] = fms.item + } else { + a.globalStats.Fms[fms.idx].MergeFMSketch(fms.item) + } + case <-a.exitWhenErrChan: + return + } + } +} + +func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error { + for { + select { + case cms, ok := <-a.cmsketch: + if !ok { + return nil + } + if a.globalStats.Cms[cms.idx] == nil { + a.globalStats.Cms[cms.idx] = cms.item + } else { + err := a.globalStats.Cms[cms.idx].MergeCMSketch(cms.item) + if err != nil { + return err + } + } + case <-a.exitWhenErrChan: + return nil + } + } +} + +func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) { + for { + select { + case item, ok := <-a.histogramAndTopn: + if !ok { + return nil + } + var err error + var poppedTopN []statistics.TopNMeta + var allhg []*statistics.Histogram + wrapper := item.item + a.globalStats.TopN[item.idx], poppedTopN, allhg, err = mergeGlobalStatsTopN(a.gpool, sctx, wrapper, + tz, analyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex) + if err != nil { + return err + } + + // Merge histogram. + a.globalStats.Hg[item.idx], err = statistics.MergePartitionHist2GlobalHist(stmtCtx, allhg, poppedTopN, + int64(opts[ast.AnalyzeOptNumBuckets]), isIndex) + if err != nil { + return err + } + + // NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them. + for j := range a.globalStats.Hg[item.idx].Buckets { + a.globalStats.Hg[item.idx].Buckets[j].NDV = 0 + } + a.globalStats.Hg[item.idx].NDV = a.globalStatsNDV[item.idx] + case <-a.exitWhenErrChan: + return nil + } + } +} + +func skipPartition(sctx sessionctx.Context, partitionID int64, isIndex bool) error { + return storage.CheckSkipPartition(sctx, partitionID, toSQLIndex(isIndex)) +} + +func skipColumnPartition(sctx sessionctx.Context, partitionID int64, isIndex bool, histsID int64) error { + return storage.CheckSkipColumnPartiion(sctx, partitionID, toSQLIndex(isIndex), histsID) +} diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 7017105583969..6ba2eb9fb1c0d 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -120,7 +120,6 @@ func (h *Handle) Clear() { // NewHandle creates a Handle for update stats. 
func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool util.SessionPool, tracker sessionctx.SysProcTracker, autoAnalyzeProcIDGetter func() uint64) (*Handle, error) { cfg := config.GetGlobalConfig() - handle := &Handle{ gpool: gp.New(math.MaxInt16, time.Minute), ddlEventCh: make(chan *ddlUtil.Event, 1000), @@ -247,15 +246,14 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { } // MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. -func (h *Handle) MergePartitionStats2GlobalStatsByTableID( - sc sessionctx.Context, +func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, physicalID int64, isIndex bool, histIDs []int64, - allPartitionStats map[int64]*statistics.Table, + _ map[int64]*statistics.Table, ) (globalStats *globalstats.GlobalStats, err error) { - return globalstats.MergePartitionStats2GlobalStatsByTableID(sc, h.gpool, opts, is, physicalID, isIndex, histIDs, allPartitionStats, h.TableInfoByID, h.loadTablePartitionStats) + return globalstats.MergePartitionStats2GlobalStatsByTableID(sc, h.gpool, opts, is, physicalID, isIndex, histIDs, h.TableInfoByID, h.callWithSCtx) } func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) { @@ -280,10 +278,11 @@ func (h *Handle) mergePartitionStats2GlobalStats( globalTableInfo *model.TableInfo, isIndex bool, histIDs []int64, - allPartitionStats map[int64]*statistics.Table, + _ map[int64]*statistics.Table, ) (gstats *globalstats.GlobalStats, err error) { err = h.callWithSCtx(func(sctx sessionctx.Context) error { - gstats, err = globalstats.MergePartitionStats2GlobalStats(sctx, h.gpool, opts, is, globalTableInfo, isIndex, histIDs, allPartitionStats, h.TableInfoByID, h.loadTablePartitionStats) + gstats, err = globalstats.MergePartitionStats2GlobalStats(sctx, h.gpool, opts, is, globalTableInfo, isIndex, + histIDs, h.TableInfoByID, h.callWithSCtx) return err }) return diff --git a/statistics/handle/handletest/globalstats/BUILD.bazel b/statistics/handle/handletest/globalstats/BUILD.bazel index 6cb0ab3204c84..9a8e5cd524fbc 100644 --- a/statistics/handle/handletest/globalstats/BUILD.bazel +++ b/statistics/handle/handletest/globalstats/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 12, + shard_count = 14, deps = [ "//config", "//parser/model", diff --git a/statistics/handle/handletest/globalstats/globalstats_test.go b/statistics/handle/handletest/globalstats/globalstats_test.go index c0f2160d0e69e..35b51502502d7 100644 --- a/statistics/handle/handletest/globalstats/globalstats_test.go +++ b/statistics/handle/handletest/globalstats/globalstats_test.go @@ -59,6 +59,33 @@ func TestShowGlobalStats(t *testing.T) { require.Len(t, tk.MustQuery("show stats_healthy where partition_name='global'").Rows(), 1) } +func simpleTest(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int, key(a)) partition by hash(a) partitions 10") + tk.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (8), (10), (20), (30)") + tk.MustExec("analyze table t with 0 topn, 1 buckets") +} + +func TestGlobalStatsPanicInIOWorker(t *testing.T) { + fpName := "github.com/pingcap/tidb/statistics/handle/globalstats/PanicInIOWorker" + require.NoError(t, failpoint.Enable(fpName, "panic(\"inject 
panic\")")) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsPanicInCPUWorker(t *testing.T) { + fpName := "github.com/pingcap/tidb/statistics/handle/globalstats/PanicInCPUWorker" + require.NoError(t, failpoint.Enable(fpName, "panic(\"inject panic\")")) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + func TestBuildGlobalLevelStats(t *testing.T) { store := testkit.CreateMockStore(t) testKit := testkit.NewTestKit(t, store) diff --git a/statistics/handle/storage/read.go b/statistics/handle/storage/read.go index ff7a94142b3cb..b03be31699fc2 100644 --- a/statistics/handle/storage/read.go +++ b/statistics/handle/storage/read.go @@ -118,6 +118,24 @@ func CMSketchAndTopNFromStorage(sctx sessionctx.Context, tblID int64, isIndex, h return statistics.DecodeCMSketchAndTopN(rows[0].GetBytes(0), topNRows) } +// CMSketchFromStorage reads CMSketch from storage +func CMSketchFromStorage(sctx sessionctx.Context, tblID int64, isIndex int, histID int64) (_ *statistics.CMSketch, err error) { + rows, _, err := util.ExecRows(sctx, "select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID) + if err != nil || len(rows) == 0 { + return nil, err + } + return statistics.DecodeCMSketch(rows[0].GetBytes(0)) +} + +// TopNFromStorage reads TopN from storage +func TopNFromStorage(sctx sessionctx.Context, tblID int64, isIndex int, histID int64) (_ *statistics.TopN, err error) { + rows, _, err := util.ExecRows(sctx, "select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID) + if err != nil || len(rows) == 0 { + return nil, err + } + return statistics.DecodeTopN(rows) +} + // FMSketchFromStorage reads FMSketch from storage func FMSketchFromStorage(sctx sessionctx.Context, tblID int64, isIndex, histID int64) (_ *statistics.FMSketch, err error) { rows, _, err := util.ExecRows(sctx, "select value from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID) @@ -127,8 +145,32 @@ func FMSketchFromStorage(sctx sessionctx.Context, tblID int64, isIndex, histID i return statistics.DecodeFMSketch(rows[0].GetBytes(0)) } +// CheckSkipPartition checks if we can skip loading the partition. +func CheckSkipPartition(sctx sessionctx.Context, tblID int64, isIndex int) error { + rows, _, err := util.ExecRows(sctx, "select distinct_count from mysql.stats_histograms where table_id =%? and is_index = %?", tblID, isIndex) + if err != nil { + return err + } + if len(rows) == 0 { + return types.ErrPartitionStatsMissing + } + return nil +} + +// CheckSkipColumnPartiion checks if we can skip loading the partition. +func CheckSkipColumnPartiion(sctx sessionctx.Context, tblID int64, isIndex int, histsID int64) error { + rows, _, err := util.ExecRows(sctx, "select distinct_count from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histsID) + if err != nil { + return err + } + if len(rows) == 0 { + return types.ErrPartitionColumnStatsMissing + } + return nil +} + // ExtendedStatsFromStorage reads extended stats from storage. 
-func ExtendedStatsFromStorage(sctx sessionctx.Context, table *statistics.Table, physicalID int64, loadAll bool) (*statistics.Table, error) { +func ExtendedStatsFromStorage(sctx sessionctx.Context, table *statistics.Table, tableID int64, loadAll bool) (*statistics.Table, error) { failpoint.Inject("injectExtStatsLoadErr", func() { failpoint.Return(nil, errors.New("gofail extendedStatsFromStorage error")) }) @@ -139,7 +181,7 @@ func ExtendedStatsFromStorage(sctx sessionctx.Context, table *statistics.Table, table.ExtendedStats = statistics.NewExtendedStatsColl() } rows, _, err := util.ExecRows(sctx, "select name, status, type, column_ids, stats, version from mysql.stats_extended where table_id = %? and status in (%?, %?, %?) and version > %?", - physicalID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsDeleted, lastVersion) + tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsDeleted, lastVersion) if err != nil || len(rows) == 0 { return table, nil } @@ -365,12 +407,12 @@ func columnStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *stati } // TableStatsFromStorage loads table stats info from storage. -func TableStatsFromStorage(sctx sessionctx.Context, snapshot uint64, tableInfo *model.TableInfo, physicalID int64, loadAll bool, lease time.Duration, table *statistics.Table) (_ *statistics.Table, err error) { +func TableStatsFromStorage(sctx sessionctx.Context, snapshot uint64, tableInfo *model.TableInfo, tableID int64, loadAll bool, lease time.Duration, table *statistics.Table) (_ *statistics.Table, err error) { // If table stats is pseudo, we also need to copy it, since we will use the column stats when // the average error rate of it is small. if table == nil || snapshot > 0 { histColl := statistics.HistColl{ - PhysicalID: physicalID, + PhysicalID: tableID, HavePhysicalID: true, Columns: make(map[int64]*statistics.Column, len(tableInfo.Columns)), Indices: make(map[int64]*statistics.Index, len(tableInfo.Indices)), @@ -384,14 +426,14 @@ func TableStatsFromStorage(sctx sessionctx.Context, snapshot uint64, tableInfo * } table.Pseudo = false - realtimeCount, modidyCount, isNull, err := StatsMetaCountAndModifyCount(sctx, physicalID) + realtimeCount, modidyCount, isNull, err := StatsMetaCountAndModifyCount(sctx, tableID) if err != nil || isNull { return nil, err } table.ModifyCount = modidyCount table.RealtimeCount = realtimeCount - rows, _, err := util.ExecRows(sctx, "select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", physicalID) + rows, _, err := util.ExecRows(sctx, "select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", tableID) // Check deleted table. if err != nil || len(rows) == 0 { return nil, nil @@ -406,7 +448,34 @@ func TableStatsFromStorage(sctx sessionctx.Context, snapshot uint64, tableInfo * return nil, err } } - return ExtendedStatsFromStorage(sctx, table, physicalID, loadAll) + return ExtendedStatsFromStorage(sctx, table, tableID, loadAll) +} + +// LoadHistogram will load histogram from storage. 
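+// For a column (isIndex == 0) the value type is taken from the table schema and
+// tot_col_size/correlation are read from the stored row; for an index the
+// histogram is decoded with a blob field type, where those two fields do not apply.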
+func LoadHistogram(sctx sessionctx.Context, tableID int64, isIndex int, histID int64, tableInfo *model.TableInfo) (*statistics.Histogram, error) { + row, _, err := util.ExecRows(sctx, "select distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, histID) + if err != nil || len(row) == 0 { + return nil, err + } + distinct := row[0].GetInt64(0) + histVer := row[0].GetUint64(1) + nullCount := row[0].GetInt64(2) + var totColSize int64 + var corr float64 + var tp types.FieldType + if isIndex == 0 { + totColSize = row[0].GetInt64(3) + corr = row[0].GetFloat64(6) + for _, colInfo := range tableInfo.Columns { + if histID != colInfo.ID { + continue + } + tp = colInfo.FieldType + break + } + return HistogramFromStorage(sctx, tableID, histID, &tp, distinct, isIndex, histVer, nullCount, totColSize, corr) + } + return HistogramFromStorage(sctx, tableID, histID, types.NewFieldType(mysql.TypeBlob), distinct, isIndex, histVer, nullCount, 0, 0) } // LoadNeededHistograms will load histograms for those needed columns/indices.
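
The coordination pattern the new worker relies on — bounded channels between an
IO-bound producer and a CPU-bound consumer, an exit channel closed on failure so
the other side can bail out, and the errors of both goroutines joined — can be
studied in isolation. Below is a minimal, self-contained sketch of that pattern;
all names are illustrative and nothing in it is part of this change (it uses a
zero-value errgroup.Group where the change uses errgroup.WithContext):

	package main

	import (
		"errors"
		"fmt"

		"golang.org/x/sync/errgroup"
	)

	type item struct {
		idx   int
		value uint64
	}

	type pipeline struct {
		ch   chan item     // bounded, like the worker's sketch channels (cap 5)
		exit chan struct{} // closed by the consumer on error
		sums []uint64
	}

	// ioWorker stands in for loading sketches from storage: it produces items
	// and stops early if the consumer has signalled an error.
	func (p *pipeline) ioWorker(n int) error {
		defer close(p.ch) // tells the consumer that production is done
		for i := 0; i < n; i++ {
			select {
			case p.ch <- item{idx: i % len(p.sums), value: uint64(i)}:
			case <-p.exit:
				return nil
			}
		}
		return nil
	}

	// cpuWorker stands in for merging: it drains the channel and closes the
	// exit channel on failure so the producer does not block forever.
	func (p *pipeline) cpuWorker() error {
		for it := range p.ch {
			if it.value > 1_000_000 { // placeholder for a real merge error
				close(p.exit)
				return errors.New("merge failed")
			}
			p.sums[it.idx] += it.value
		}
		return nil
	}

	func main() {
		p := &pipeline{
			ch:   make(chan item, 5),
			exit: make(chan struct{}),
			sums: make([]uint64, 3),
		}
		var ioWg, cpuWg errgroup.Group
		ioWg.Go(func() error { return p.ioWorker(100) })
		cpuWg.Go(func() error { return p.cpuWorker() })
		err := ioWg.Wait()
		if err != nil {
			// Mirror the change: join the producer and consumer errors.
			if err1 := cpuWg.Wait(); err1 != nil {
				err = errors.Join(err, err1)
			}
		} else {
			err = cpuWg.Wait()
		}
		fmt.Println(p.sums, err)
	}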