diff --git a/pkg/planner/core/rule_collect_plan_stats.go b/pkg/planner/core/rule_collect_plan_stats.go
new file mode 100644
index 0000000000000..eee774baefed6
--- /dev/null
+++ b/pkg/planner/core/rule_collect_plan_stats.go
@@ -0,0 +1,310 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+	"context"
+	"time"
+
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/domain"
+	"github.com/pingcap/tidb/pkg/infoschema"
+	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/pingcap/tidb/pkg/planner/util"
+	"github.com/pingcap/tidb/pkg/sessionctx/variable"
+	"github.com/pingcap/tidb/pkg/statistics"
+	"github.com/pingcap/tidb/pkg/table"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"go.uber.org/zap"
+)
+
+type collectPredicateColumnsPoint struct{}
+
+func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan, _ *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
+	planChanged := false
+	if plan.SCtx().GetSessionVars().InRestrictedSQL {
+		return plan, planChanged, nil
+	}
+	predicateNeeded := variable.EnableColumnTracking.Load()
+	syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait.Load()
+	histNeeded := syncWait > 0
+	predicateColumns, histNeededColumns, visitedPhysTblIDs := CollectColumnStatsUsage(plan, predicateNeeded, histNeeded)
+	if len(predicateColumns) > 0 {
+		plan.SCtx().UpdateColStatsUsage(predicateColumns)
+	}
+
+	// Prepare the table metadata to avoid repeatedly fetching from the infoSchema below, and trigger extra sync/async
+	// stats loading for the determinate mode.
+	is := plan.SCtx().GetInfoSchema().(infoschema.InfoSchema)
+	tblID2Tbl := make(map[int64]table.Table)
+	visitedPhysTblIDs.ForEach(func(physicalTblID int) {
+		tbl, _ := infoschema.FindTableByTblOrPartID(is, int64(physicalTblID))
+		if tbl == nil {
+			return
+		}
+		tblID2Tbl[int64(physicalTblID)] = tbl
+	})
+
+	// Collect the virtual columns that depend on the already-needed columns.
+	// Note that we use dependingVirtualCols only to collect needed index stats, not to trigger stats loading on
+	// the virtual columns themselves. That's because virtual columns themselves don't have statistics, while
+	// expression indexes, which are indexes on virtual columns, do, so we don't waste resources on the virtual
+	// columns here.
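+	// Illustrative example (hypothetical schema, not part of this change): given
+	//   create table t(a int, b int, key idx_ab1((a + b)));
+	// the expression index idx_ab1 is backed by a hidden virtual column on (a + b). If a or b is
+	// hist-needed, that virtual column is collected below so that the stats of idx_ab1 can be
+	// sync-loaded, even though the virtual column itself has no statistics.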
+	dependingVirtualCols := CollectDependingVirtualCols(tblID2Tbl, histNeededColumns)
+
+	histNeededIndices := collectSyncIndices(plan.SCtx(), append(histNeededColumns, dependingVirtualCols...), tblID2Tbl)
+	histNeededItems := collectHistNeededItems(histNeededColumns, histNeededIndices)
+	if histNeeded && len(histNeededItems) > 0 {
+		err := RequestLoadStats(plan.SCtx(), histNeededItems, syncWait)
+		return plan, planChanged, err
+	}
+	return plan, planChanged, nil
+}
+
+func (collectPredicateColumnsPoint) name() string {
+	return "collect_predicate_columns_point"
+}
+
+type syncWaitStatsLoadPoint struct{}
+
+func (syncWaitStatsLoadPoint) optimize(_ context.Context, plan LogicalPlan, _ *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
+	planChanged := false
+	if plan.SCtx().GetSessionVars().InRestrictedSQL {
+		return plan, planChanged, nil
+	}
+	if plan.SCtx().GetSessionVars().StmtCtx.IsSyncStatsFailed {
+		return plan, planChanged, nil
+	}
+	err := SyncWaitStatsLoad(plan)
+	return plan, planChanged, err
+}
+
+func (syncWaitStatsLoadPoint) name() string {
+	return "sync_wait_stats_load_point"
+}
+
+// RequestLoadStats sends load requests for column/index stats to the stats handle.
+func RequestLoadStats(ctx PlanContext, neededHistItems []model.StatsLoadItem, syncWait int64) error {
+	maxExecutionTime := ctx.GetSessionVars().GetMaxExecutionTime()
+	if maxExecutionTime > 0 && maxExecutionTime < uint64(syncWait) {
+		syncWait = int64(maxExecutionTime)
+	}
+	failpoint.Inject("assertSyncWaitFailed", func(val failpoint.Value) {
+		if val.(bool) {
+			if syncWait != 1 {
+				panic("syncWait should be 1(ms)")
+			}
+		}
+	})
+	var timeout = time.Duration(syncWait) * time.Millisecond
+	stmtCtx := ctx.GetSessionVars().StmtCtx
+	err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout)
+	if err != nil {
+		stmtCtx.IsSyncStatsFailed = true
+		logutil.BgLogger().Warn("RequestLoadStats failed", zap.Error(err))
+		if variable.StatsLoadPseudoTimeout.Load() {
+			stmtCtx.AppendWarning(err)
+			return nil
+		}
+		return err
+	}
+	return nil
+}
+
+// SyncWaitStatsLoad waits synchronously for the requested stats to load, until the timeout fires.
+func SyncWaitStatsLoad(plan LogicalPlan) error {
+	stmtCtx := plan.SCtx().GetSessionVars().StmtCtx
+	if len(stmtCtx.StatsLoad.NeededItems) <= 0 {
+		return nil
+	}
+	err := domain.GetDomain(plan.SCtx()).StatsHandle().SyncWaitStatsLoad(stmtCtx)
+	if err != nil {
+		stmtCtx.IsSyncStatsFailed = true
+		if variable.StatsLoadPseudoTimeout.Load() {
+			logutil.BgLogger().Warn("SyncWaitStatsLoad failed", zap.Error(err))
+			stmtCtx.AppendWarning(err)
+			return nil
+		}
+		logutil.BgLogger().Error("SyncWaitStatsLoad failed", zap.Error(err))
+		return err
+	}
+	return nil
+}
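+
+// An overview of the sync-load flow driven by the two optimizer points above (a sketch; the exact
+// rule ordering lives in the logical optimizer):
+//  1. collectPredicateColumnsPoint -> RequestLoadStats: enqueues load tasks for the needed
+//     columns/indices without blocking.
+//  2. syncWaitStatsLoadPoint -> SyncWaitStatsLoad: blocks until all requested items are loaded or
+//     the timeout fires.
+// On failure, StmtCtx.IsSyncStatsFailed is set; if variable.StatsLoadPseudoTimeout is enabled, the
+// error is downgraded to a warning and optimization continues with whatever stats are available.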
+
+// CollectDependingVirtualCols collects the virtual columns that depend on the needed columns, and returns them in a new slice.
+//
+// Why do we need this?
+// It's mainly for stats sync loading.
+// Currently, virtual columns themselves don't have statistics. But expression indexes, which are indexes on virtual
+// columns, have statistics. We need to collect needed virtual columns, then needed expression index stats can be
+// collected for sync loading.
+// In normal cases, if a virtual column can be used, which means related statistics may be needed, the corresponding
+// expressions in the query must have already been replaced with the virtual column before here. So we just need to
+// treat them like normal columns in stats sync loading: we extract the Column from the expressions, and the virtual
+// columns we want will be there.
+// However, in some cases (currently the MV index case), the expressions are not replaced with the virtual columns
+// before here. Instead, we match the expressions in the query against the expressions behind the virtual columns
+// after here, when building the access paths. This means we are unable to know which virtual columns will be needed
+// just by extracting the Column from the expressions here. So we need to manually collect the virtual columns that
+// may be needed.
+//
+// Note 1: As long as a virtual column depends on the needed columns, it will be collected. This could collect some
+// virtual columns that are not actually needed.
+// That's OK because it's how sync loading is expected to work. Sync loading only needs to ensure that all actually
+// needed stats are triggered to load; other parts of sync loading work the same way.
+// If we wanted to collect only the virtual columns that are actually needed, the checking logic here would have to
+// mirror the logic for generating the access paths exactly, which would make the logic here very complicated.
+//
+// Note 2: Only direct dependencies are considered here.
+// If a virtual column depends on another virtual column, and the latter depends on the needed columns, the former
+// will not be collected.
+// For example: create table t(a int, b int, c int as (a+b), d int as (c+1)); if a is needed, then c will be
+// collected, but d will not.
+// That's because currently it's impossible for statistics of indirectly depending columns to actually be needed.
+// If we ever need to check indirect dependencies, the logic here can easily be extended.
+func CollectDependingVirtualCols(tblID2Tbl map[int64]table.Table, neededItems []model.StatsLoadItem) []model.StatsLoadItem {
+	generatedCols := make([]model.StatsLoadItem, 0)
+
+	// Group the needed items by table ID.
+	tblID2neededColIDs := make(map[int64][]int64, len(tblID2Tbl))
+	for _, item := range neededItems {
+		if item.IsIndex {
+			continue
+		}
+		tblID2neededColIDs[item.TableID] = append(tblID2neededColIDs[item.TableID], item.ID)
+	}
+
+	// Process them table by table.
+	for tblID, colIDs := range tblID2neededColIDs {
+		tbl := tblID2Tbl[tblID]
+		if tbl == nil {
+			continue
+		}
+		// Collect the needed columns of this table into a set for faster lookup.
+		colNameSet := make(map[string]struct{}, len(colIDs))
+		for _, colID := range colIDs {
+			name := tbl.Meta().FindColumnNameByID(colID)
+			if name == "" {
+				continue
+			}
+			colNameSet[name] = struct{}{}
+		}
+		// Iterate the columns of this table and collect the virtual columns that depend on the needed columns.
+		for _, col := range tbl.Cols() {
+			// Only handle virtual columns.
+			if !col.IsVirtualGenerated() {
+				continue
+			}
+			// If this column is already needed, skip it.
+			if _, ok := colNameSet[col.Name.L]; ok {
+				continue
+			}
+			// If a needed column is depended on by this virtual column, we consider the virtual column needed too.
+			for depCol := range col.Dependences {
+				if _, ok := colNameSet[depCol]; ok {
+					generatedCols = append(generatedCols, model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tblID, ID: col.ID, IsIndex: false}, FullLoad: true})
+					break
+				}
+			}
+		}
+	}
+	return generatedCols
+}
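+
+// Illustrative example (hypothetical, for the function above): with
+//   create table t(a int, b int, c int as (a+b));
+// if neededItems contains column a, CollectDependingVirtualCols returns a StatsLoadItem for c with
+// FullLoad set, so that collectSyncIndices below can pick up any expression index on c.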
+
+// collectSyncIndices collects the indices that satisfy both of the following conditions:
+//  1. the index contains any one of histNeededColumns, e.g. if histNeededColumns contains
+//     columns A and B, and idx_a is composed of column A, then idx_a should be collected;
+//  2. the stats of the index do not meet IsFullLoad, which means they were evicted previously.
+func collectSyncIndices(ctx PlanContext,
+	histNeededColumns []model.StatsLoadItem,
+	tblID2Tbl map[int64]table.Table,
+) map[model.TableItemID]struct{} {
+	histNeededIndices := make(map[model.TableItemID]struct{})
+	stats := domain.GetDomain(ctx).StatsHandle()
+	for _, column := range histNeededColumns {
+		if column.IsIndex {
+			continue
+		}
+		tbl := tblID2Tbl[column.TableID]
+		if tbl == nil {
+			continue
+		}
+		colName := tbl.Meta().FindColumnNameByID(column.ID)
+		if colName == "" {
+			continue
+		}
+		for _, idx := range tbl.Indices() {
+			if idx.Meta().State != model.StatePublic {
+				continue
+			}
+			idxCol := idx.Meta().FindColumnByName(colName)
+			idxID := idx.Meta().ID
+			if idxCol != nil {
+				tblStats := stats.GetTableStats(tbl.Meta())
+				if tblStats == nil || tblStats.Pseudo {
+					continue
+				}
+				_, loadNeeded := tblStats.IndexIsLoadNeeded(idxID)
+				if !loadNeeded {
+					continue
+				}
+				histNeededIndices[model.TableItemID{TableID: column.TableID, ID: idxID, IsIndex: true}] = struct{}{}
+			}
+		}
+	}
+	return histNeededIndices
+}
+
+func collectHistNeededItems(histNeededColumns []model.StatsLoadItem, histNeededIndices map[model.TableItemID]struct{}) (histNeededItems []model.StatsLoadItem) {
+	histNeededItems = make([]model.StatsLoadItem, 0, len(histNeededColumns)+len(histNeededIndices))
+	for idx := range histNeededIndices {
+		histNeededItems = append(histNeededItems, model.StatsLoadItem{TableItemID: idx, FullLoad: true})
+	}
+	histNeededItems = append(histNeededItems, histNeededColumns...)
+	return
+}
+
+func recordTableRuntimeStats(sctx PlanContext, tbls map[int64]struct{}) {
+	tblStats := sctx.GetSessionVars().StmtCtx.TableStats
+	if tblStats == nil {
+		tblStats = map[int64]any{}
+	}
+	for tblID := range tbls {
+		tblJSONStats, skip, err := recordSingleTableRuntimeStats(sctx, tblID)
+		if err != nil {
+			logutil.BgLogger().Warn("record table json stats failed", zap.Int64("tblID", tblID), zap.Error(err))
+		}
+		if tblJSONStats == nil && !skip {
+			logutil.BgLogger().Warn("record table json stats failed due to empty", zap.Int64("tblID", tblID))
+		}
+		tblStats[tblID] = tblJSONStats
+	}
+	sctx.GetSessionVars().StmtCtx.TableStats = tblStats
+}
+
+func recordSingleTableRuntimeStats(sctx PlanContext, tblID int64) (stats *statistics.Table, skip bool, err error) {
+	dom := domain.GetDomain(sctx)
+	statsHandle := dom.StatsHandle()
+	is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+	tbl, ok := is.TableByID(tblID)
+	if !ok {
+		return nil, false, nil
+	}
+	tableInfo := tbl.Meta()
+	stats = statsHandle.GetTableStats(tableInfo)
+	// Skip the warning if the table is a temporary table, because temporary tables don't have stats.
+	skip = tableInfo.TempTableType != model.TempTableNone
+	return stats, skip, nil
+}
diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go
new file mode 100644
index 0000000000000..ec89ed7b52361
--- /dev/null
+++ b/pkg/statistics/handle/syncload/stats_syncload.go
@@ -0,0 +1,518 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncload
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/config"
+	"github.com/pingcap/tidb/pkg/metrics"
+	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/pingcap/tidb/pkg/parser/mysql"
+	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/pkg/statistics"
+	"github.com/pingcap/tidb/pkg/statistics/handle/storage"
+	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
+	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"go.uber.org/zap"
+)
+
+type statsSyncLoad struct {
+	statsHandle statstypes.StatsHandle
+	StatsLoad   statstypes.StatsLoad
+}
+
+// NewStatsSyncLoad creates a new StatsSyncLoad.
+func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad {
+	s := &statsSyncLoad{statsHandle: statsHandle}
+	cfg := config.GetGlobalConfig()
+	s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	return s
+}
+
+type statsWrapper struct {
+	colInfo *model.ColumnInfo
+	idxInfo *model.IndexInfo
+	col     *statistics.Column
+	idx     *statistics.Index
+}
+
+// SendLoadRequests sends load requests for the needed columns/indices.
+func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error {
+	remainedItems := s.removeHistLoadedColumns(neededHistItems)
+
+	failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) {
+		if sc.OptimizeTracer != nil {
+			count := val.(int)
+			if len(remainedItems) != count {
+				panic("remained items count wrong")
+			}
+		}
+	})
+
+	if len(remainedItems) <= 0 {
+		return nil
+	}
+	sc.StatsLoad.Timeout = timeout
+	sc.StatsLoad.NeededItems = remainedItems
+	sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
+	tasks := make([]*statstypes.NeededItemTask, 0)
+	for _, item := range remainedItems {
+		task := &statstypes.NeededItemTask{
+			Item:      item,
+			ToTimeout: time.Now().Add(timeout),
+			ResultCh:  sc.StatsLoad.ResultCh,
+		}
+		tasks = append(tasks, task)
+	}
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	for _, task := range tasks {
+		select {
+		case s.StatsLoad.NeededItemsCh <- task:
+			continue
+		case <-timer.C:
+			return errors.New("sync load stats channel is full and timeout sending task to channel")
+		}
+	}
+	sc.StatsLoad.LoadStartTime = time.Now()
+	return nil
+}
+
+// SyncWaitStatsLoad waits synchronously for the needed items to load, and returns an error on timeout.
+func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
+	if len(sc.StatsLoad.NeededItems) <= 0 {
+		return nil
+	}
+	var errorMsgs []string
+	defer func() {
+		if len(errorMsgs) > 0 {
+			logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
+				zap.Strings("errors", errorMsgs))
+		}
+		sc.StatsLoad.NeededItems = nil
+	}()
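+	// resultCheckMap tracks the items we are still waiting for; each received result removes its
+	// item, and the wait finishes once the map is empty or the timer fires.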
+	resultCheckMap := map[model.TableItemID]struct{}{}
+	for _, item := range sc.StatsLoad.NeededItems {
+		resultCheckMap[item.TableItemID] = struct{}{}
+	}
+	metrics.SyncLoadCounter.Inc()
+	timer := time.NewTimer(sc.StatsLoad.Timeout)
+	defer timer.Stop()
+	for {
+		select {
+		case result, ok := <-sc.StatsLoad.ResultCh:
+			if !ok {
+				return errors.New("sync load stats channel closed unexpectedly")
+			}
+			if result.HasError() {
+				errorMsgs = append(errorMsgs, result.ErrorMsg())
+			}
+			delete(resultCheckMap, result.Item)
+			if len(resultCheckMap) == 0 {
+				metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
+				return nil
+			}
+		case <-timer.C:
+			metrics.SyncLoadTimeoutCounter.Inc()
+			return errors.New("sync load stats timeout")
+		}
+	}
+}
+
+// removeHistLoadedColumns removes the items whose histograms are already loaded, based on neededItems and the stats cache.
+func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.StatsLoadItem) []model.StatsLoadItem {
+	remainedItems := make([]model.StatsLoadItem, 0, len(neededItems))
+	for _, item := range neededItems {
+		tbl, ok := s.statsHandle.Get(item.TableID)
+		if !ok {
+			continue
+		}
+		if item.IsIndex {
+			_, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
+			if loadNeeded {
+				remainedItems = append(remainedItems, item)
+			}
+			continue
+		}
+		_, loadNeeded := tbl.ColumnIsLoadNeeded(item.ID, item.FullLoad)
+		if loadNeeded {
+			remainedItems = append(remainedItems, item)
+		}
+	}
+	return remainedItems
+}
+
+// AppendNeededItem appends a needed column/index task to the channel. It is only used for tests.
+func (s *statsSyncLoad) AppendNeededItem(task *statstypes.NeededItemTask, timeout time.Duration) error {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case s.StatsLoad.NeededItemsCh <- task:
+	case <-timer.C:
+		return errors.New("Channel is full and timeout writing to channel")
+	}
+	return nil
+}
+
+var errExit = errors.New("Stop loading since domain is closed")
+
+// SubLoadWorker loads hist data for each column/index task.
+func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
+	defer func() {
+		exitWg.Done()
+		logutil.BgLogger().Info("SubLoadWorker exited.")
+	}()
+	// If the last task was not successfully handled in the last round due to an error or panic,
+	// pass it to this round so it can be retried.
+	var lastTask *statstypes.NeededItemTask
+	for {
+		task, err := s.HandleOneTask(sctx, lastTask, exit)
+		lastTask = task
+		if err != nil {
+			switch err {
+			case errExit:
+				return
+			default:
+				// Sleep with jitter to avoid the thundering herd effect, i.e. all workers retrying
+				// a large number of requests simultaneously when a problem occurs.
+				r := rand.Intn(500)
+				time.Sleep(s.statsHandle.Lease()/10 + time.Duration(r)*time.Microsecond)
+				continue
+			}
+		}
+	}
+}
+
+// HandleOneTask handles the last task if it is not nil; otherwise it drains a new task from the
+// channel. It returns the current task if handling fails somewhere.
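+// Tasks are deduplicated by item key via singleflight, so concurrent sessions waiting on the same
+// column/index share a single storage read.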
+func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
+	defer func() {
+		// Recover for each task so the worker keeps working.
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("stats loading panicked", zap.Any("error", r), zap.Stack("stack"))
+			err = errors.Errorf("stats loading panicked: %v", r)
+		}
+	}()
+	if lastTask == nil {
+		task, err = s.drainColTask(sctx, exit)
+		if err != nil {
+			if err != errExit {
+				logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
+			}
+			return task, err
+		}
+	} else {
+		task = lastTask
+	}
+	resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
+		return s.handleOneItemTask(sctx, task)
+	})
+	timeout := time.Until(task.ToTimeout)
+	select {
+	case result := <-resultChan:
+		if result.Err == nil {
+			slr := result.Val.(*stmtctx.StatsLoadResult)
+			if slr.Error != nil {
+				return task, slr.Error
+			}
+			task.ResultCh <- *slr
+			return nil, nil
+		}
+		return task, result.Err
+	case <-time.After(timeout):
+		// The deadline has passed; move the timeout checkpoint forward by one sync-wait interval
+		// and hand the task back so the next round can retry it with the new deadline.
+		task.ToTimeout = task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Millisecond)
+		return task, nil
+	}
+}
+
+func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
+	defer func() {
+		// Recover for each task so the worker keeps working.
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("handleOneItemTask panicked", zap.Any("recover", r), zap.Stack("stack"))
+			err = errors.Errorf("stats loading panicked: %v", r)
+		}
+	}()
+	result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
+	item := result.Item
+	tbl, ok := s.statsHandle.Get(item.TableID)
+	if !ok {
+		return result, nil
+	}
+	wrapper := &statsWrapper{}
+	if item.IsIndex {
+		index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
+		if !loadNeeded {
+			return result, nil
+		}
+		if index != nil {
+			wrapper.idxInfo = index.Info
+		} else {
+			wrapper.idxInfo = tbl.ColAndIdxExistenceMap.GetIndex(item.ID)
+		}
+	} else {
+		col, loadNeeded := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
+		if !loadNeeded {
+			return result, nil
+		}
+		if col != nil {
+			wrapper.colInfo = col.Info
+		} else {
+			wrapper.colInfo = tbl.ColAndIdxExistenceMap.GetCol(item.ID)
+		}
+	}
+	t := time.Now()
+	needUpdate := false
+	wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
+	if err != nil {
+		result.Error = err
+		return result, err
+	}
+	if item.IsIndex {
+		if wrapper.idxInfo != nil {
+			needUpdate = true
+		}
+	} else {
+		if wrapper.colInfo != nil {
+			needUpdate = true
+		}
+	}
+	metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
+	if needUpdate {
+		s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
+	}
+	// Always return a non-nil result so HandleOneTask can deliver it to the waiters; returning nil
+	// here would make the type assertion above yield a nil *StatsLoadResult and panic.
+	return result, nil
+}
+
+// readStatsForOneItem reads the histogram for one column/index.
+// TODO: load data via kv-get asynchronously.
+func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper, isPkIsHandle bool, fullLoad bool) (*statsWrapper, error) {
+	failpoint.Inject("mockReadStatsForOnePanic", nil)
+	failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) {
+		if val.(bool) {
+			failpoint.Return(nil, errors.New("gofail ReadStatsForOne error"))
+		}
+	})
+	loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
+	var hg *statistics.Histogram
+	var err error
+	isIndexFlag := int64(0)
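+	// Read the histogram meta first; the histogram buckets, CMSketch/TopN and (optionally) the
+	// FMSketch are only read from storage when a full load is requested.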
+	hg, lastAnalyzePos, statsVer, flag, err := storage.HistMetaFromStorage(sctx, &item, w.colInfo)
+	if err != nil {
+		return nil, err
+	}
+	if hg == nil {
+		logutil.BgLogger().Error("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID),
+			zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex))
+		return nil, errors.Trace(fmt.Errorf("fail to get hist meta for this histogram, table_id:%v, hist_id:%v, is_index:%v", item.TableID, item.ID, item.IsIndex))
+	}
+	if item.IsIndex {
+		isIndexFlag = 1
+	}
+	var cms *statistics.CMSketch
+	var topN *statistics.TopN
+	var fms *statistics.FMSketch
+	if fullLoad {
+		if item.IsIndex {
+			hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		} else {
+			hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, &w.colInfo.FieldType, hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		}
+		cms, topN, err = storage.CMSketchAndTopNFromStorage(sctx, item.TableID, isIndexFlag, item.ID)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		if loadFMSketch {
+			fms, err = storage.FMSketchFromStorage(sctx, item.TableID, isIndexFlag, item.ID)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		}
+	}
+	if item.IsIndex {
+		idxHist := &statistics.Index{
+			Histogram:  *hg,
+			CMSketch:   cms,
+			TopN:       topN,
+			FMSketch:   fms,
+			Info:       w.idxInfo,
+			StatsVer:   statsVer,
+			Flag:       flag,
+			PhysicalID: item.TableID,
+		}
+		if statsVer != statistics.Version0 {
+			if fullLoad {
+				idxHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			} else {
+				idxHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+		}
+		lastAnalyzePos.Copy(&idxHist.LastAnalyzePos)
+		w.idx = idxHist
+	} else {
+		colHist := &statistics.Column{
+			PhysicalID: item.TableID,
+			Histogram:  *hg,
+			Info:       w.colInfo,
+			CMSketch:   cms,
+			TopN:       topN,
+			FMSketch:   fms,
+			IsHandle:   isPkIsHandle && mysql.HasPriKeyFlag(w.colInfo.GetFlag()),
+			StatsVer:   statsVer,
+		}
+		if colHist.StatsAvailable() {
+			if fullLoad {
+				colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			} else {
+				colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+		}
+		w.col = colHist
+	}
+	return w, nil
+}
+
+// drainColTask blocks until a task can be returned; either a task or an error is returned.
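+// NeededItemsCh (fresh tasks) always takes priority over TimeoutItemsCh (tasks whose waiters have
+// likely given up); a timed-out task is only handled when no fresh task is pending.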
+func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) {
+	// Select NeededItemsCh first; if it has no task, then select TimeoutItemsCh.
+	for {
+		select {
+		case <-exit:
+			return nil, errExit
+		case task, ok := <-s.StatsLoad.NeededItemsCh:
+			if !ok {
+				return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed")
+			}
+			// If the task has already timed out, no SQL statement is sync-waiting for it anymore,
+			// so don't handle it right now; extend its deadline and put it into the channel with
+			// lower priority.
+			if time.Now().After(task.ToTimeout) {
+				task.ToTimeout = task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Millisecond)
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
+				continue
+			}
+			return task, nil
+		case task, ok := <-s.StatsLoad.TimeoutItemsCh:
+			select {
+			case <-exit:
+				return nil, errExit
+			case task0, ok0 := <-s.StatsLoad.NeededItemsCh:
+				if !ok0 {
+					return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed")
+				}
+				// Send the task back to TimeoutItemsCh and return the task drained from NeededItemsCh.
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
+				return task0, nil
+			default:
+				if !ok {
+					return nil, errors.New("drainColTask: cannot read from TimeoutItemsCh, maybe the chan is closed")
+				}
+				// NeededItemsCh is empty now; handle the task from TimeoutItemsCh.
+				return task, nil
+			}
+		}
+	}
+}
+
+// writeToTimeoutChan writes in a nonblocking way; if the channel queue is full, it is OK to drop the task.
+func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask) {
+	select {
+	case taskCh <- task:
+	default:
+	}
+}
+
+// writeToChanWithTimeout writes a task to a channel, blocking until the timeout fires.
+func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *statstypes.NeededItemTask, task *statstypes.NeededItemTask, timeout time.Duration) error {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case taskCh <- task:
+	case <-timer.C:
+		return errors.New("Channel is full and timeout writing to channel")
+	}
+	return nil
+}
+
+// writeToResultChan writes safely with panic recovery, so a single failed write does not have a big impact.
+func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
+	defer func() {
+		if r := recover(); r != nil {
+			logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
+		}
+	}()
+	select {
+	case resultCh <- rs:
+	default:
+	}
+}
+
+// updateCachedItem updates the column/index hist in the global stats cache.
+func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
+	// Reload the latest stats cache; otherwise `updateStatsCache` may fail with high probability, because functions
+	// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
+	tbl, ok := s.statsHandle.Get(item.TableID)
+	if !ok {
+		return true
+	}
+	if !item.IsIndex && colHist != nil {
+		c, ok := tbl.Columns[item.ID]
+		// Skip if:
+		// - the stats are fully loaded, or
+		// - the stats are meta-loaded and we only need the meta.
+		if ok && (c.IsFullLoad() || !fullLoaded) {
+			return true
+		}
+		tbl = tbl.Copy()
+		tbl.Columns[item.ID] = colHist
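+		// Note: tbl.Copy() gives us a private copy to mutate; the shared cache entry is only
+		// replaced via UpdateStatsCache below, so readers never observe a half-updated table.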
+		// If the column is analyzed, we refresh the map for the possible change.
+		if colHist.StatsAvailable() {
+			tbl.ColAndIdxExistenceMap.InsertCol(item.ID, colHist.Info, true)
+		}
+		// All the objects share the same stats version. Update it here.
+		if colHist.StatsVer != statistics.Version0 {
+			tbl.StatsVer = statistics.Version0
+		}
+	} else if item.IsIndex && idxHist != nil {
+		index, ok := tbl.Indices[item.ID]
+		// Skip if:
+		// - the stats are fully loaded, or
+		// - the stats are meta-loaded and we only need the meta.
+		if ok && (index.IsFullLoad() || !fullLoaded) {
+			return true
+		}
+		tbl = tbl.Copy()
+		tbl.Indices[item.ID] = idxHist
+		// If the index is analyzed, we refresh the map for the possible change.
+		if idxHist.IsAnalyzed() {
+			tbl.ColAndIdxExistenceMap.InsertIndex(item.ID, idxHist.Info, true)
+			// All the objects share the same stats version. Update it here.
+			tbl.StatsVer = statistics.Version0
+		}
+	}
+	s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
+	return true
+}
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index cc820f2b34d03..c925f52d08e64 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -1196,7 +1196,7 @@ type SessionVars struct {
 	ReadConsistency ReadConsistencyLevel
 
 	// StatsLoadSyncWait indicates how long to wait for stats load before timeout.
-	StatsLoadSyncWait int64
+	StatsLoadSyncWait atomic.Int64
 
 	// SysdateIsNow indicates whether Sysdate is an alias of Now function
 	SysdateIsNow bool
@@ -1710,7 +1710,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
 		TMPTableSize:              DefTiDBTmpTableMaxSize,
 		MPPStoreFailTTL:           DefTiDBMPPStoreFailTTL,
 		Rng:                       mathutil.NewWithTime(),
-		StatsLoadSyncWait:         StatsLoadSyncWait.Load(),
 		EnableLegacyInstanceScope: DefEnableLegacyInstanceScope,
 		RemoveOrderbyInSubquery:   DefTiDBRemoveOrderbyInSubquery,
 		EnableSkewDistinctAgg:     DefTiDBSkewDistinctAgg,
@@ -1762,6 +1761,7 @@ func NewSessionVars(hctx HookContext) *SessionVars {
 	vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1)
 	vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery)
 	vars.MemTracker.IsRootTrackerOfSess = true
+	vars.StatsLoadSyncWait.Store(StatsLoadSyncWait.Load())
 
 	for _, engine := range config.GetGlobalConfig().IsolationRead.Engines {
 		switch engine {
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 500ddfb02b19d..ed91f6deaf1ca 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -1943,7 +1943,7 @@ var defaultSysVars = []*SysVar{
 	}},
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBStatsLoadSyncWait, Value: strconv.Itoa(DefTiDBStatsLoadSyncWait), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32,
 		SetSession: func(s *SessionVars, val string) error {
-			s.StatsLoadSyncWait = TidbOptInt64(val, DefTiDBStatsLoadSyncWait)
+			s.StatsLoadSyncWait.Store(TidbOptInt64(val, DefTiDBStatsLoadSyncWait))
 			return nil
 		},
 		GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {