From ade54b5345593e545d62925d5d1175d6f2018899 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Fri, 20 Oct 2023 10:48:30 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #47778 Signed-off-by: ti-chi-bot --- pkg/statistics/handle/storage/json.go | 344 ++++++++++ .../handle/storage/stats_read_writer.go | 617 ++++++++++++++++++ pkg/statistics/handle/util/util.go | 296 +++++++++ 3 files changed, 1257 insertions(+) create mode 100644 pkg/statistics/handle/storage/json.go create mode 100644 pkg/statistics/handle/storage/stats_read_writer.go create mode 100644 pkg/statistics/handle/util/util.go diff --git a/pkg/statistics/handle/storage/json.go b/pkg/statistics/handle/storage/json.go new file mode 100644 index 0000000000000..a174d38aaa566 --- /dev/null +++ b/pkg/statistics/handle/storage/json.go @@ -0,0 +1,344 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "bytes" + "encoding/json" + "io" + "sync/atomic" + "time" + + "github.com/klauspost/compress/gzip" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + compressutil "github.com/pingcap/tidb/pkg/util/compress" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/memory" + "go.uber.org/zap" +) + +func dumpJSONExtendedStats(statsColl *statistics.ExtendedStatsColl) []*util.JSONExtendedStats { + if statsColl == nil || len(statsColl.Stats) == 0 { + return nil + } + stats := make([]*util.JSONExtendedStats, 0, len(statsColl.Stats)) + for name, item := range statsColl.Stats { + js := &util.JSONExtendedStats{ + StatsName: name, + ColIDs: item.ColIDs, + Tp: item.Tp, + ScalarVals: item.ScalarVals, + StringVals: item.StringVals, + } + stats = append(stats, js) + } + return stats +} + +func extendedStatsFromJSON(statsColl []*util.JSONExtendedStats) *statistics.ExtendedStatsColl { + if len(statsColl) == 0 { + return nil + } + stats := statistics.NewExtendedStatsColl() + for _, js := range statsColl { + item := &statistics.ExtendedStatsItem{ + ColIDs: js.ColIDs, + Tp: js.Tp, + ScalarVals: js.ScalarVals, + StringVals: js.StringVals, + } + stats.Stats[js.StatsName] = item + } + return stats +} + +func dumpJSONCol(hist *statistics.Histogram, cmsketch *statistics.CMSketch, topn *statistics.TopN, fmsketch *statistics.FMSketch, statsVer *int64) *util.JSONColumn { + jsonCol := &util.JSONColumn{ + Histogram: statistics.HistogramToProto(hist), + NullCount: hist.NullCount, + TotColSize: hist.TotColSize, + LastUpdateVersion: hist.LastUpdateVersion, + Correlation: hist.Correlation, + StatsVer: statsVer, + } + if cmsketch != nil || topn != nil { + jsonCol.CMSketch = statistics.CMSketchToProto(cmsketch, topn) + } + if fmsketch != nil { + jsonCol.FMSketch = 
statistics.FMSketchToProto(fmsketch) + } + return jsonCol +} + +// GenJSONTableFromStats generate jsonTable from tableInfo and stats +func GenJSONTableFromStats(sctx sessionctx.Context, dbName string, tableInfo *model.TableInfo, tbl *statistics.Table) (*util.JSONTable, error) { + tracker := memory.NewTracker(memory.LabelForAnalyzeMemory, -1) + tracker.AttachTo(sctx.GetSessionVars().MemTracker) + defer tracker.Detach() + jsonTbl := &util.JSONTable{ + DatabaseName: dbName, + TableName: tableInfo.Name.L, + Columns: make(map[string]*util.JSONColumn, len(tbl.Columns)), + Indices: make(map[string]*util.JSONColumn, len(tbl.Indices)), + Count: tbl.RealtimeCount, + ModifyCount: tbl.ModifyCount, + Version: tbl.Version, + } + for _, col := range tbl.Columns { + sc := stmtctx.NewStmtCtxWithTimeZone(time.UTC) + hist, err := col.ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return nil, errors.Trace(err) + } + proto := dumpJSONCol(hist, col.CMSketch, col.TopN, col.FMSketch, &col.StatsVer) + tracker.Consume(proto.TotalMemoryUsage()) + if atomic.LoadUint32(&sctx.GetSessionVars().Killed) == 1 { + return nil, errors.Trace(statistics.ErrQueryInterrupted) + } + jsonTbl.Columns[col.Info.Name.L] = proto + col.FMSketch.DestroyAndPutToPool() + } + for _, idx := range tbl.Indices { + proto := dumpJSONCol(&idx.Histogram, idx.CMSketch, idx.TopN, nil, &idx.StatsVer) + tracker.Consume(proto.TotalMemoryUsage()) + if atomic.LoadUint32(&sctx.GetSessionVars().Killed) == 1 { + return nil, errors.Trace(statistics.ErrQueryInterrupted) + } + jsonTbl.Indices[idx.Info.Name.L] = proto + } + jsonTbl.ExtStats = dumpJSONExtendedStats(tbl.ExtendedStats) + return jsonTbl, nil +} + +// TableStatsFromJSON loads statistic from JSONTable and return the Table of statistic. +func TableStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *util.JSONTable) (*statistics.Table, error) { + newHistColl := statistics.HistColl{ + PhysicalID: physicalID, + HavePhysicalID: true, + RealtimeCount: jsonTbl.Count, + ModifyCount: jsonTbl.ModifyCount, + Columns: make(map[int64]*statistics.Column, len(jsonTbl.Columns)), + Indices: make(map[int64]*statistics.Index, len(jsonTbl.Indices)), + } + tbl := &statistics.Table{ + HistColl: newHistColl, + } + for id, jsonIdx := range jsonTbl.Indices { + for _, idxInfo := range tableInfo.Indices { + if idxInfo.Name.L != id { + continue + } + hist := statistics.HistogramFromProto(jsonIdx.Histogram) + hist.ID, hist.NullCount, hist.LastUpdateVersion, hist.Correlation = idxInfo.ID, jsonIdx.NullCount, jsonIdx.LastUpdateVersion, jsonIdx.Correlation + cm, topN := statistics.CMSketchAndTopNFromProto(jsonIdx.CMSketch) + statsVer := int64(statistics.Version0) + if jsonIdx.StatsVer != nil { + statsVer = *jsonIdx.StatsVer + } else if jsonIdx.Histogram.Ndv > 0 || jsonIdx.NullCount > 0 { + // If the statistics are collected without setting stats version(which happens in v4.0 and earlier versions), + // we set it to 1. 
+ statsVer = int64(statistics.Version1) + } + idx := &statistics.Index{ + Histogram: *hist, + CMSketch: cm, + TopN: topN, + Info: idxInfo, + StatsVer: statsVer, + PhysicalID: physicalID, + StatsLoadedStatus: statistics.NewStatsFullLoadStatus(), + } + tbl.Indices[idx.ID] = idx + } + } + + for id, jsonCol := range jsonTbl.Columns { + for _, colInfo := range tableInfo.Columns { + if colInfo.Name.L != id { + continue + } + hist := statistics.HistogramFromProto(jsonCol.Histogram) + sc := stmtctx.NewStmtCtxWithTimeZone(time.UTC) + tmpFT := colInfo.FieldType + // For new collation data, when storing the bounds of the histogram, we store the collate key instead of the + // original value. + // But there's additional conversion logic for new collation data, and the collate key might be longer than + // the FieldType.flen. + // If we use the original FieldType here, there might be errors like "Invalid utf8mb4 character string" + // or "Data too long". + // So we change it to TypeBlob to bypass those logics here. + if colInfo.FieldType.EvalType() == types.ETString && colInfo.FieldType.GetType() != mysql.TypeEnum && colInfo.FieldType.GetType() != mysql.TypeSet { + tmpFT = *types.NewFieldType(mysql.TypeBlob) + } + hist, err := hist.ConvertTo(sc, &tmpFT) + if err != nil { + return nil, errors.Trace(err) + } + cm, topN := statistics.CMSketchAndTopNFromProto(jsonCol.CMSketch) + fms := statistics.FMSketchFromProto(jsonCol.FMSketch) + hist.ID, hist.NullCount, hist.LastUpdateVersion, hist.TotColSize, hist.Correlation = colInfo.ID, jsonCol.NullCount, jsonCol.LastUpdateVersion, jsonCol.TotColSize, jsonCol.Correlation + statsVer := int64(statistics.Version0) + if jsonCol.StatsVer != nil { + statsVer = *jsonCol.StatsVer + } else if jsonCol.Histogram.Ndv > 0 || jsonCol.NullCount > 0 { + // If the statistics are collected without setting stats version(which happens in v4.0 and earlier versions), + // we set it to 1. + statsVer = int64(statistics.Version1) + } + col := &statistics.Column{ + PhysicalID: physicalID, + Histogram: *hist, + CMSketch: cm, + TopN: topN, + FMSketch: fms, + Info: colInfo, + IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()), + StatsVer: statsVer, + StatsLoadedStatus: statistics.NewStatsFullLoadStatus(), + } + tbl.Columns[col.ID] = col + } + } + tbl.ExtendedStats = extendedStatsFromJSON(jsonTbl.ExtStats) + return tbl, nil +} + +// JSONTableToBlocks convert JSONTable to json, then compresses it to blocks by gzip. 
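Before the implementation that follows, here is a standalone round-trip sketch of the same chunk-and-compress idea, using only the standard library (the payload stands in for a marshaled JSONTable; the block size and contents are made up for illustration):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// splitGzip compresses data and slices the result into blockSize-sized chunks,
// the last chunk holding whatever remains.
func splitGzip(data []byte, blockSize int) ([][]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // flush the gzip footer
		return nil, err
	}
	raw := buf.Bytes()
	var blocks [][]byte
	for len(raw) > blockSize {
		blocks = append(blocks, raw[:blockSize])
		raw = raw[blockSize:]
	}
	return append(blocks, raw), nil
}

// joinGunzip concatenates the blocks and decompresses them back.
func joinGunzip(blocks [][]byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(bytes.Join(blocks, nil)))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	payload := []byte(`{"table_name":"t","count":100}`)
	blocks, _ := splitGzip(payload, 16)
	back, _ := joinGunzip(blocks)
	fmt.Println(len(blocks), string(back)) // round-trips to the original JSON
}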
+func JSONTableToBlocks(jsTable *util.JSONTable, blockSize int) ([][]byte, error) { + data, err := json.Marshal(jsTable) + if err != nil { + return nil, errors.Trace(err) + } + var gzippedData bytes.Buffer + gzipWriter := compressutil.GzipWriterPool.Get().(*gzip.Writer) + defer compressutil.GzipWriterPool.Put(gzipWriter) + gzipWriter.Reset(&gzippedData) + if _, err := gzipWriter.Write(data); err != nil { + return nil, errors.Trace(err) + } + if err := gzipWriter.Close(); err != nil { + return nil, errors.Trace(err) + } + blocksNum := gzippedData.Len() / blockSize + if gzippedData.Len()%blockSize != 0 { + blocksNum = blocksNum + 1 + } + blocks := make([][]byte, blocksNum) + for i := 0; i < blocksNum-1; i++ { + blocks[i] = gzippedData.Bytes()[blockSize*i : blockSize*(i+1)] + } + blocks[blocksNum-1] = gzippedData.Bytes()[blockSize*(blocksNum-1):] + return blocks, nil +} + +// BlocksToJSONTable convert gzip-compressed blocks to JSONTable +func BlocksToJSONTable(blocks [][]byte) (*util.JSONTable, error) { + if len(blocks) == 0 { + return nil, errors.New("Block empty error") + } + data := blocks[0] + for i := 1; i < len(blocks); i++ { + data = append(data, blocks[i]...) + } + gzippedData := bytes.NewReader(data) + gzipReader := compressutil.GzipReaderPool.Get().(*gzip.Reader) + if err := gzipReader.Reset(gzippedData); err != nil { + compressutil.GzipReaderPool.Put(gzipReader) + return nil, err + } + defer func() { + compressutil.GzipReaderPool.Put(gzipReader) + }() + if err := gzipReader.Close(); err != nil { + return nil, err + } + jsonStr, err := io.ReadAll(gzipReader) + if err != nil { + return nil, errors.Trace(err) + } + jsonTbl := util.JSONTable{} + err = json.Unmarshal(jsonStr, &jsonTbl) + if err != nil { + return nil, errors.Trace(err) + } + return &jsonTbl, nil +} + +// TableHistoricalStatsToJSON converts the historical stats of a table to JSONTable. +func TableHistoricalStatsToJSON(sctx sessionctx.Context, physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) { + if _, err := util.Exec(sctx, "begin"); err != nil { + return nil, false, err + } + defer func() { + err = util.FinishTransaction(sctx, err) + }() + + // get meta version + rows, _, err := util.ExecRows(sctx, "select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot) + if err != nil { + return nil, false, errors.AddStack(err) + } + if len(rows) < 1 { + logutil.BgLogger().Warn("failed to get records of stats_meta_history", + zap.Int64("table-id", physicalID), + zap.Uint64("snapshotTS", snapshot)) + return nil, false, nil + } + statsMetaVersion := rows[0].GetInt64(0) + // get stats meta + rows, _, err = util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta_history where table_id = %? and version = %?", physicalID, statsMetaVersion) + if err != nil { + return nil, false, errors.AddStack(err) + } + modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) + + // get stats version + rows, _, err = util.ExecRows(sctx, "select distinct version from mysql.stats_history where table_id = %? and version <= %? 
order by version desc limit 1", physicalID, snapshot) + if err != nil { + return nil, false, errors.AddStack(err) + } + if len(rows) < 1 { + logutil.BgLogger().Warn("failed to get record of stats_history", + zap.Int64("table-id", physicalID), + zap.Uint64("snapshotTS", snapshot)) + return nil, false, nil + } + statsVersion := rows[0].GetInt64(0) + + // get stats + rows, _, err = util.ExecRows(sctx, "select stats_data from mysql.stats_history where table_id = %? and version = %? order by seq_no", physicalID, statsVersion) + if err != nil { + return nil, false, errors.AddStack(err) + } + blocks := make([][]byte, 0) + for _, row := range rows { + blocks = append(blocks, row.GetBytes(0)) + } + jsonTbl, err := BlocksToJSONTable(blocks) + if err != nil { + return nil, false, errors.AddStack(err) + } + jsonTbl.Count = count + jsonTbl.ModifyCount = modifyCount + jsonTbl.IsHistoricalStats = true + return jsonTbl, true, nil +} diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go new file mode 100644 index 0000000000000..b35879bce7c4e --- /dev/null +++ b/pkg/statistics/handle/storage/stats_read_writer.go @@ -0,0 +1,617 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "fmt" + "runtime" + "sync" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics" + handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/sqlexec" +) + +// statsReadWriter implements the util.StatsReadWriter interface. +type statsReadWriter struct { + statsHandler util.StatsHandle +} + +// NewStatsReadWriter creates a new StatsReadWriter. +func NewStatsReadWriter(statsHandler util.StatsHandle) util.StatsReadWriter { + return &statsReadWriter{statsHandler: statsHandler} +} + +// InsertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value. +// This operation also updates version. 
+func (s *statsReadWriter) InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false) + } + }() + + return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + startTS, err := util.GetStartTS(sctx) + if err != nil { + return errors.Trace(err) + } + + // First of all, we update the version. + _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID) + if err != nil { + return err + } + statsVer = startTS + // If we didn't update anything by last SQL, it means the stats of this table does not exist. + if sctx.GetSessionVars().StmtCtx.AffectedRows() > 0 { + // By this step we can get the count of this table, then we can sure the count and repeats of bucket. + var rs sqlexec.RecordSet + rs, err = util.Exec(sctx, "select count from mysql.stats_meta where table_id = %?", physicalID) + if err != nil { + return err + } + defer terror.Call(rs.Close) + req := rs.NewChunk(nil) + err = rs.Next(context.Background(), req) + if err != nil { + return err + } + count := req.GetRow(0).GetInt64(0) + for _, colInfo := range colInfos { + value := types.NewDatum(colInfo.GetOriginDefaultValue()) + value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, &colInfo.FieldType) + if err != nil { + return err + } + if value.IsNull() { + // If the adding column has default value null, all the existing rows have null value on the newly added column. + if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil { + return err + } + } else { + // If this stats exists, we insert histogram meta first, the distinct_count will always be one. + if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil { + return err + } + value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return err + } + // There must be only one bucket for this new column and the value is the default value. + if _, err := util.Exec(sctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil { + return err + } + } + } + } + return nil + }, util.FlagWrapTxn) +} + +// InsertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the +// new columns and indices which belong to this table. 
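To make the seeding logic of InsertColStats2KV above concrete: when a column is added with a non-null default, every existing row carries that default, so the function writes a single histogram bucket whose count and repeats both equal the table's row count, with distinct_count fixed at 1. A toy sketch of the values involved (the bucket type here is illustrative, not TiDB's):

package main

import "fmt"

type bucket struct {
	lower, upper string
	count        int64
	repeats      int64
}

// seedBucket mirrors what InsertColStats2KV writes for a newly added column
// with a non-null default: one bucket spanning only the default value.
func seedBucket(defaultVal string, rowCount int64) bucket {
	return bucket{lower: defaultVal, upper: defaultVal, count: rowCount, repeats: rowCount}
}

func main() {
	fmt.Printf("%+v\n", seedBucket("2023-10-20", 1000))
}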
+func (s *statsReadWriter) InsertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false) + } + }() + + return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + startTS, err := util.GetStartTS(sctx) + if err != nil { + return errors.Trace(err) + } + if _, err := util.Exec(sctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil { + return err + } + statsVer = startTS + for _, col := range info.Columns { + if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil { + return err + } + } + for _, idx := range info.Indices { + if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil { + return err + } + } + return nil + }, util.FlagWrapTxn) +} + +// ChangeGlobalStatsID changes the table ID in global-stats to the new table ID. +func (s *statsReadWriter) ChangeGlobalStatsID(from, to int64) (err error) { + return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} { + _, err = util.Exec(sctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from) + if err != nil { + return err + } + } + return nil + }, util.FlagWrapTxn) +} + +// ResetTableStats2KVForDrop resets the count to 0. +func (s *statsReadWriter) ResetTableStats2KVForDrop(physicalID int64) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false) + } + }() + + return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + startTS, err := util.GetStartTS(sctx) + if err != nil { + return errors.Trace(err) + } + if _, err := util.Exec(sctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil { + return err + } + return nil + }, util.FlagWrapTxn) +} + +// UpdateStatsVersion will set statistics version to the newest TS, +// then tidb-server will reload automatic. +func (s *statsReadWriter) UpdateStatsVersion() error { + return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + return UpdateStatsVersion(sctx) + }, util.FlagWrapTxn) +} + +// SaveTableStatsToStorage saves the stats of a table to storage. +func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = SaveTableStatsToStorage(sctx, results, analyzeSnapshot) + return err + }) + if err == nil && statsVer != 0 { + tableID := results.TableID.GetStatisticsID() + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true) + } + return err +} + +// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta. 
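The stats_meta row that the next function reads is also visible to any MySQL-protocol client. A hypothetical database/sql snippet (it assumes a reachable TiDB on 127.0.0.1:4000, the go-sql-driver/mysql driver, and a made-up table ID):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Assumed DSN; adjust user/host/port for a real cluster.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/mysql")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var count, modifyCount int64
	tableID := int64(100) // hypothetical physical table ID
	row := db.QueryRow("select count, modify_count from mysql.stats_meta where table_id = ?", tableID)
	if err := row.Scan(&count, &modifyCount); err != nil {
		panic(err)
	}
	fmt.Println(count, modifyCount)
}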
+func (s *statsReadWriter) StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) { + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + count, modifyCount, _, err = StatsMetaCountAndModifyCount(sctx, tableID) + return err + }, util.FlagWrapTxn) + return +} + +// TableStatsFromStorage loads table stats info from storage. +func (s *statsReadWriter) TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) { + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + var ok bool + statsTbl, ok = s.statsHandler.Get(physicalID) + if !ok { + statsTbl = nil + } + statsTbl, err = TableStatsFromStorage(sctx, snapshot, tableInfo, physicalID, loadAll, s.statsHandler.Lease(), statsTbl) + return err + }, util.FlagWrapTxn) + return +} + +// SaveStatsToStorage saves the stats to storage. +// If count is negative, both count and modify count would not be used and not be written to the table. Unless, corresponding +// fields in the stats_meta table will be updated. +// TODO: refactor to reduce the number of parameters +func (s *statsReadWriter) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, + cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = SaveStatsToStorage(sctx, tableID, + count, modifyCount, isIndex, hg, cms, topN, statsVersion, isAnalyzed, updateAnalyzeTime) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false) + } + return +} + +// saveMetaToStorage saves stats meta to the storage. +func (s *statsReadWriter) saveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = SaveMetaToStorage(sctx, tableID, count, modifyCount) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false) + } + return +} + +// InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. +func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = InsertExtendedStats(sctx, s.statsHandler, statsName, colIDs, tp, tableID, ifNotExists) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) + } + return +} + +// MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. 
+func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = MarkExtendedStatsDeleted(sctx, s.statsHandler, statsName, tableID, ifExists) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) + } + return +} + +// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended. +func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = SaveExtendedStatsToStorage(sctx, tableID, extStats, isLoad) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) + } + return +} + +// LoadNeededHistograms will load histograms for those needed columns/indices. +func (s *statsReadWriter) LoadNeededHistograms() (err error) { + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch + return LoadNeededHistograms(sctx, s.statsHandler, loadFMSketch) + }, util.FlagWrapTxn) + return err +} + +// ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended. +func (s *statsReadWriter) ReloadExtendedStatistics() error { + return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + tables := make([]*statistics.Table, 0, s.statsHandler.Len()) + for _, tbl := range s.statsHandler.Values() { + t, err := ExtendedStatsFromStorage(sctx, tbl.Copy(), tbl.PhysicalID, true) + if err != nil { + return err + } + tables = append(tables, t) + } + s.statsHandler.UpdateStatsCache(tables, nil) + return nil + }, util.FlagWrapTxn) +} + +// DumpStatsToJSON dumps statistic to json. +func (s *statsReadWriter) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, + historyStatsExec sqlexec.RestrictedSQLExecutor, dumpPartitionStats bool) (*util.JSONTable, error) { + var snapshot uint64 + if historyStatsExec != nil { + sctx := historyStatsExec.(sessionctx.Context) + snapshot = sctx.GetSessionVars().SnapshotTS + } + return s.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot, dumpPartitionStats) +} + +// DumpHistoricalStatsBySnapshot dumped json tables from mysql.stats_meta_history and mysql.stats_history. +// As implemented in getTableHistoricalStatsToJSONWithFallback, if historical stats are nonexistent, it will fall back +// to the latest stats, and these table names (and partition names) will be returned in fallbackTbls. 
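The dump routines below emit one JSON child per partition, plus the table-level stats stored under the reserved key util.TiDBGlobalStats ("global"). A toy sketch of that output shape (jsonTable here is a simplified stand-in for util.JSONTable):

package main

import "fmt"

type jsonTable struct {
	TableName  string
	Partitions map[string]*jsonTable
}

const globalStats = "global" // mirrors util.TiDBGlobalStats

// assemble mirrors the dump shape: one child entry per partition, plus the
// table-level (global) stats under the reserved "global" key.
func assemble(table string, parts []string) *jsonTable {
	jt := &jsonTable{TableName: table, Partitions: map[string]*jsonTable{}}
	for _, p := range parts {
		jt.Partitions[p] = &jsonTable{TableName: table + "/" + p}
	}
	jt.Partitions[globalStats] = &jsonTable{TableName: table}
	return jt
}

func main() {
	jt := assemble("t", []string{"p0", "p1"})
	fmt.Println(len(jt.Partitions)) // 3: p0, p1, and the global entry
}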
+func (s *statsReadWriter) DumpHistoricalStatsBySnapshot( + dbName string, + tableInfo *model.TableInfo, + snapshot uint64, +) ( + jt *util.JSONTable, + fallbackTbls []string, + err error, +) { + historicalStatsEnabled, err := s.statsHandler.CheckHistoricalStatsEnable() + if err != nil { + return nil, nil, errors.Errorf("check %v failed: %v", variable.TiDBEnableHistoricalStats, err) + } + if !historicalStatsEnabled { + return nil, nil, errors.Errorf("%v should be enabled", variable.TiDBEnableHistoricalStats) + } + + defer func() { + if err == nil { + handle_metrics.DumpHistoricalStatsSuccessCounter.Inc() + } else { + handle_metrics.DumpHistoricalStatsFailedCounter.Inc() + } + }() + pi := tableInfo.GetPartitionInfo() + if pi == nil { + jt, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, tableInfo.ID, snapshot) + if fallback { + fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s", dbName, tableInfo.Name.O)) + } + return jt, fallbackTbls, err + } + jsonTbl := &util.JSONTable{ + DatabaseName: dbName, + TableName: tableInfo.Name.L, + Partitions: make(map[string]*util.JSONTable, len(pi.Definitions)), + } + for _, def := range pi.Definitions { + tbl, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, def.ID, snapshot) + if err != nil { + return nil, nil, errors.Trace(err) + } + if fallback { + fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s %s", dbName, tableInfo.Name.O, def.Name.O)) + } + jsonTbl.Partitions[def.Name.L] = tbl + } + tbl, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, tableInfo.ID, snapshot) + if err != nil { + return nil, nil, err + } + if fallback { + fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s global", dbName, tableInfo.Name.O)) + } + // dump its global-stats if existed + if tbl != nil { + jsonTbl.Partitions[util.TiDBGlobalStats] = tbl + } + return jsonTbl, fallbackTbls, nil +} + +// DumpStatsToJSONBySnapshot dumps statistic to json. +func (s *statsReadWriter) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*util.JSONTable, error) { + pruneMode, err := s.statsHandler.GetCurrentPruneMode() + if err != nil { + return nil, err + } + isDynamicMode := variable.PartitionPruneMode(pruneMode) == variable.Dynamic + pi := tableInfo.GetPartitionInfo() + if pi == nil { + return s.TableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot) + } + jsonTbl := &util.JSONTable{ + DatabaseName: dbName, + TableName: tableInfo.Name.L, + Partitions: make(map[string]*util.JSONTable, len(pi.Definitions)), + } + // dump partition stats only if in static mode or enable dumpPartitionStats flag in dynamic mode + if !isDynamicMode || dumpPartitionStats { + for _, def := range pi.Definitions { + tbl, err := s.TableStatsToJSON(dbName, tableInfo, def.ID, snapshot) + if err != nil { + return nil, errors.Trace(err) + } + if tbl == nil { + continue + } + jsonTbl.Partitions[def.Name.L] = tbl + } + } + // dump its global-stats if existed + tbl, err := s.TableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot) + if err != nil { + return nil, errors.Trace(err) + } + if tbl != nil { + jsonTbl.Partitions[util.TiDBGlobalStats] = tbl + } + return jsonTbl, nil +} + +// getTableHistoricalStatsToJSONWithFallback try to get table historical stats, if not exist, directly fallback to the +// latest stats, and the second return value would be true. 
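The fallback rule implemented by the function below can be isolated into a few lines: falling back to the latest stats counts as a fallback only when a snapshot was actually requested. A standalone sketch with hypothetical fetch functions:

package main

import "fmt"

type statsJSON struct{}

// fetchHistorical is a hypothetical stand-in: ok=false means no historical
// stats exist at or before the snapshot.
func fetchHistorical(snapshot uint64) (*statsJSON, bool) { return nil, false }

func fetchLatest() *statsJSON { return &statsJSON{} }

// getWithFallback mirrors getTableHistoricalStatsToJSONWithFallback: the
// second result is true only when a snapshot was requested but we had to
// fall back to the latest stats.
func getWithFallback(snapshot uint64) (*statsJSON, bool) {
	if jt, ok := fetchHistorical(snapshot); ok {
		return jt, false
	}
	return fetchLatest(), snapshot != 0
}

func main() {
	_, fellBack := getWithFallback(0)
	fmt.Println(fellBack) // false: snapshot 0 means "latest" was requested anyway
}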
+func (s *statsReadWriter) getTableHistoricalStatsToJSONWithFallback( + dbName string, + tableInfo *model.TableInfo, + physicalID int64, + snapshot uint64, +) ( + *util.JSONTable, + bool, + error, +) { + jt, exist, err := s.tableHistoricalStatsToJSON(physicalID, snapshot) + if err != nil { + return nil, false, err + } + if !exist { + jt, err = s.TableStatsToJSON(dbName, tableInfo, physicalID, 0) + fallback := true + if snapshot == 0 { + fallback = false + } + return jt, fallback, err + } + return jt, false, nil +} + +func (s *statsReadWriter) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) { + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + jt, exist, err = TableHistoricalStatsToJSON(sctx, physicalID, snapshot) + return err + }, util.FlagWrapTxn) + return +} + +// TableStatsToJSON dumps statistic to json. +func (s *statsReadWriter) TableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*util.JSONTable, error) { + tbl, err := s.TableStatsFromStorage(tableInfo, physicalID, true, snapshot) + if err != nil || tbl == nil { + return nil, err + } + var jsonTbl *util.JSONTable + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + tbl.Version, tbl.ModifyCount, tbl.RealtimeCount, err = StatsMetaByTableIDFromStorage(sctx, physicalID, snapshot) + if err != nil { + return err + } + jsonTbl, err = GenJSONTableFromStats(sctx, dbName, tableInfo, tbl) + return err + }) + if err != nil { + return nil, err + } + return jsonTbl, nil +} + +// TestLoadStatsErr is only for test. +type TestLoadStatsErr struct{} + +// LoadStatsFromJSON will load statistic from JSONTable, and save it to the storage. +// In final, it will also udpate the stats cache. +func (s *statsReadWriter) LoadStatsFromJSON(ctx context.Context, is infoschema.InfoSchema, + jsonTbl *util.JSONTable, concurrencyForPartition uint8) error { + if err := s.LoadStatsFromJSONNoUpdate(ctx, is, jsonTbl, concurrencyForPartition); err != nil { + return errors.Trace(err) + } + return errors.Trace(s.statsHandler.Update(is)) +} + +// LoadStatsFromJSONNoUpdate will load statistic from JSONTable, and save it to the storage. 
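The loader below fans partition work out to a bounded set of goroutines draining a closed channel, and keeps only the first error via an atomic pointer. A standalone sketch of that concurrency pattern (loadOne is a hypothetical stand-in for loading one partition):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// loadOne is a hypothetical stand-in for loading one partition's stats.
func loadOne(id int) error {
	if id == 3 {
		return fmt.Errorf("partition %d failed", id)
	}
	return nil
}

func main() {
	taskCh := make(chan int, 8)
	for id := 0; id < 8; id++ {
		taskCh <- id
	}
	close(taskCh) // workers drain the channel and then exit

	var wg sync.WaitGroup
	firstErr := new(atomic.Pointer[error]) // keeps only the first error, like the loader below
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range taskCh {
				if err := loadOne(id); err != nil {
					firstErr.CompareAndSwap(nil, &err)
					return
				}
				if firstErr.Load() != nil {
					return // another worker already failed; stop early
				}
			}
		}()
	}
	wg.Wait()
	if p := firstErr.Load(); p != nil {
		fmt.Println("load aborted:", *p)
	}
}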
+func (s *statsReadWriter) LoadStatsFromJSONNoUpdate(ctx context.Context, is infoschema.InfoSchema, + jsonTbl *util.JSONTable, concurrencyForPartition uint8) error { + nCPU := uint8(runtime.GOMAXPROCS(0)) + if concurrencyForPartition == 0 { + concurrencyForPartition = nCPU / 2 // default + } + if concurrencyForPartition > nCPU { + concurrencyForPartition = nCPU // for safety + } + + table, err := is.TableByName(model.NewCIStr(jsonTbl.DatabaseName), model.NewCIStr(jsonTbl.TableName)) + if err != nil { + return errors.Trace(err) + } + tableInfo := table.Meta() + pi := tableInfo.GetPartitionInfo() + if pi == nil || jsonTbl.Partitions == nil { + err := s.loadStatsFromJSON(tableInfo, tableInfo.ID, jsonTbl) + if err != nil { + return errors.Trace(err) + } + } else { + // load partition statistics concurrently + taskCh := make(chan model.PartitionDefinition, len(pi.Definitions)) + for _, def := range pi.Definitions { + taskCh <- def + } + close(taskCh) + var wg sync.WaitGroup + e := new(atomic.Pointer[error]) + for i := 0; i < int(concurrencyForPartition); i++ { + wg.Add(1) + s.statsHandler.GPool().Go(func() { + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("%v", r) + e.CompareAndSwap(nil, &err) + } + wg.Done() + }() + + for def := range taskCh { + tbl := jsonTbl.Partitions[def.Name.L] + if tbl == nil { + continue + } + + loadFunc := s.loadStatsFromJSON + if intest.InTest && ctx.Value(TestLoadStatsErr{}) != nil { + loadFunc = ctx.Value(TestLoadStatsErr{}).(func(*model.TableInfo, int64, *util.JSONTable) error) + } + + err := loadFunc(tableInfo, def.ID, tbl) + if err != nil { + e.CompareAndSwap(nil, &err) + return + } + if e.Load() != nil { + return + } + } + }) + } + wg.Wait() + if e.Load() != nil { + return *e.Load() + } + + // load global-stats if existed + if globalStats, ok := jsonTbl.Partitions[util.TiDBGlobalStats]; ok { + if err := s.loadStatsFromJSON(tableInfo, tableInfo.ID, globalStats); err != nil { + return errors.Trace(err) + } + } + } + return nil +} + +func (s *statsReadWriter) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *util.JSONTable) error { + tbl, err := TableStatsFromJSON(tableInfo, physicalID, jsonTbl) + if err != nil { + return errors.Trace(err) + } + + for _, col := range tbl.Columns { + // loadStatsFromJSON doesn't support partition table now. + // The table level count and modify_count would be overridden by the SaveMetaToStorage below, so we don't need + // to care about them here. + err = s.SaveStatsToStorage(tbl.PhysicalID, tbl.RealtimeCount, 0, 0, &col.Histogram, col.CMSketch, col.TopN, int(col.GetStatsVer()), 1, false, util.StatsMetaHistorySourceLoadStats) + if err != nil { + return errors.Trace(err) + } + } + for _, idx := range tbl.Indices { + // loadStatsFromJSON doesn't support partition table now. + // The table level count and modify_count would be overridden by the SaveMetaToStorage below, so we don't need + // to care about them here. 
+ err = s.SaveStatsToStorage(tbl.PhysicalID, tbl.RealtimeCount, 0, 1, &idx.Histogram, idx.CMSketch, idx.TopN, int(idx.GetStatsVer()), 1, false, util.StatsMetaHistorySourceLoadStats) + if err != nil { + return errors.Trace(err) + } + } + err = s.SaveExtendedStatsToStorage(tbl.PhysicalID, tbl.ExtendedStats, true) + if err != nil { + return errors.Trace(err) + } + return s.saveMetaToStorage(tbl.PhysicalID, tbl.RealtimeCount, tbl.ModifyCount, util.StatsMetaHistorySourceLoadStats) +} diff --git a/pkg/statistics/handle/util/util.go b/pkg/statistics/handle/util/util.go new file mode 100644 index 0000000000000..5ed0da604c73c --- /dev/null +++ b/pkg/statistics/handle/util/util.go @@ -0,0 +1,296 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "strconv" + "time" + + "github.com/ngaut/pools" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "github.com/pingcap/tidb/pkg/util/sqlexec/mock" + "github.com/pingcap/tipb/go-tipb" + "github.com/tikv/client-go/v2/oracle" +) + +const ( + // StatsMetaHistorySourceAnalyze indicates stats history meta source from analyze + StatsMetaHistorySourceAnalyze = "analyze" + // StatsMetaHistorySourceLoadStats indicates stats history meta source from load stats + StatsMetaHistorySourceLoadStats = "load stats" + // StatsMetaHistorySourceFlushStats indicates stats history meta source from flush stats + StatsMetaHistorySourceFlushStats = "flush stats" + // StatsMetaHistorySourceSchemaChange indicates stats history meta source from schema change + StatsMetaHistorySourceSchemaChange = "schema change" + // StatsMetaHistorySourceExtendedStats indicates stats history meta source from extended stats + StatsMetaHistorySourceExtendedStats = "extended stats" + + // TiDBGlobalStats represents the global-stats for a partitioned table. + TiDBGlobalStats = "global" +) + +var ( + // UseCurrentSessionOpt to make sure the sql is executed in current session. + UseCurrentSessionOpt = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession} + + // StatsCtx is used to mark the request is from stats module. + StatsCtx = kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) +) + +// SessionPool is used to recycle sessionctx. +type SessionPool interface { + Get() (pools.Resource, error) + Put(pools.Resource) +} + +// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`. 
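FinishTransaction below commits on a nil error and rolls back otherwise, preserving the caller's error. The same shape expressed against database/sql, as a sketch rather than TiDB's actual session machinery:

package main

import "database/sql"

// finishTx mirrors util.FinishTransaction for a database/sql transaction:
// commit when the operation succeeded, otherwise roll back and keep the
// original error.
func finishTx(tx *sql.Tx, err error) error {
	if err == nil {
		return tx.Commit()
	}
	_ = tx.Rollback() // best effort; the caller's error wins
	return err
}

// withTx runs f inside a transaction, analogous to how util.WrapTxn wraps a
// callback for a sessionctx.
func withTx(db *sql.DB, f func(tx *sql.Tx) error) (err error) {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer func() { err = finishTx(tx, err) }()
	return f(tx)
}

func main() {}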
+func FinishTransaction(sctx sessionctx.Context, err error) error { + if err == nil { + _, _, err = ExecRows(sctx, "commit") + } else { + _, _, err1 := ExecRows(sctx, "rollback") + terror.Log(errors.Trace(err1)) + } + return errors.Trace(err) +} + +var ( + // FlagWrapTxn indicates whether to wrap a transaction. + FlagWrapTxn = 0 +) + +// CallWithSCtx allocates a sctx from the pool and call the f(). +func CallWithSCtx(pool SessionPool, f func(sctx sessionctx.Context) error, flags ...int) (err error) { + se, err := pool.Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + pool.Put(se) + } + }() + sctx := se.(sessionctx.Context) + if err := UpdateSCtxVarsForStats(sctx); err != nil { // update stats variables automatically + return err + } + + wrapTxn := false + for _, flag := range flags { + if flag == FlagWrapTxn { + wrapTxn = true + } + } + if wrapTxn { + err = WrapTxn(sctx, f) + } else { + err = f(sctx) + } + return err +} + +// UpdateSCtxVarsForStats updates all necessary variables that may affect the behavior of statistics. +func UpdateSCtxVarsForStats(sctx sessionctx.Context) error { + // analyzer version + verInString, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion) + if err != nil { + return err + } + ver, err := strconv.ParseInt(verInString, 10, 64) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzeVersion = int(ver) + + // enable historical stats + val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats) + if err != nil { + return err + } + sctx.GetSessionVars().EnableHistoricalStats = variable.TiDBOptOn(val) + + // partition mode + pruneMode, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBPartitionPruneMode) + if err != nil { + return err + } + sctx.GetSessionVars().PartitionPruneMode.Store(pruneMode) + + // enable analyze snapshot + analyzeSnapshot, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAnalyzeSnapshot) + if err != nil { + return err + } + sctx.GetSessionVars().EnableAnalyzeSnapshot = variable.TiDBOptOn(analyzeSnapshot) + + // enable skip column types + val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzeSkipColumnTypes = variable.ParseAnalyzeSkipColumnTypes(val) + + // skip missing partition stats + val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats) + if err != nil { + return err + } + sctx.GetSessionVars().SkipMissingPartitionStats = variable.TiDBOptOn(val) + verInString, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency) + if err != nil { + return err + } + ver, err = strconv.ParseInt(verInString, 10, 64) + if err != nil { + return err + } + sctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver) + return nil +} + +// WrapTxn uses a transaction here can let different SQLs in this operation have the same data visibility. +func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) { + // TODO: check whether this sctx is already in a txn + if _, _, err := ExecRows(sctx, "begin"); err != nil { + return err + } + defer func() { + err = FinishTransaction(sctx, err) + }() + err = f(sctx) + return +} + +// GetStartTS gets the start ts from current transaction. 
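Looking back at CallWithSCtx above: a session that returned an error is deliberately dropped rather than put back, so a possibly-poisoned session is never reused. A standalone sketch of that recycling rule with a toy pool:

package main

import (
	"errors"
	"fmt"
)

// pool is a hypothetical stand-in for the handle's session pool.
type pool struct{ free []int }

func (p *pool) Get() (int, error) {
	if len(p.free) == 0 {
		return 0, errors.New("pool exhausted")
	}
	s := p.free[len(p.free)-1]
	p.free = p.free[:len(p.free)-1]
	return s, nil
}

func (p *pool) Put(s int) { p.free = append(p.free, s) }

// callWith mirrors CallWithSCtx's recycling rule: only a session that saw no
// error goes back into the pool.
func callWith(p *pool, f func(s int) error) (err error) {
	s, err := p.Get()
	if err != nil {
		return err
	}
	defer func() {
		if err == nil { // only recycle when no error
			p.Put(s)
		}
	}()
	return f(s)
}

func main() {
	p := &pool{free: []int{1, 2}}
	_ = callWith(p, func(s int) error { return nil })
	fmt.Println(len(p.free)) // 2: the session was recycled
}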
+func GetStartTS(sctx sessionctx.Context) (uint64, error) { + txn, err := sctx.Txn(true) + if err != nil { + return 0, err + } + return txn.StartTS(), nil +} + +// Exec is a helper function to execute sql and return RecordSet. +func Exec(sctx sessionctx.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) { + sqlExec, ok := sctx.(sqlexec.SQLExecutor) + if !ok { + return nil, errors.Errorf("invalid sql executor") + } + // TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor + return sqlExec.ExecuteInternal(StatsCtx, sql, args...) +} + +// ExecRows is a helper function to execute sql and return rows and fields. +func ExecRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { + if intest.InTest { + if v := sctx.Value(mock.MockRestrictedSQLExecutorKey{}); v != nil { + return v.(*mock.MockRestrictedSQLExecutor).ExecRestrictedSQL(StatsCtx, + UseCurrentSessionOpt, sql, args...) + } + } + + sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + return nil, nil, errors.Errorf("invalid sql executor") + } + return sqlExec.ExecRestrictedSQL(StatsCtx, UseCurrentSessionOpt, sql, args...) +} + +// ExecWithOpts is a helper function to execute sql and return rows and fields. +func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { + sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + return nil, nil, errors.Errorf("invalid sql executor") + } + return sqlExec.ExecRestrictedSQL(StatsCtx, opts, sql, args...) +} + +// DurationToTS converts duration to timestamp. +func DurationToTS(d time.Duration) uint64 { + return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0) +} + +// GetFullTableName returns the full table name. +func GetFullTableName(is infoschema.InfoSchema, tblInfo *model.TableInfo) string { + for _, schema := range is.AllSchemas() { + if t, err := is.TableByName(schema.Name, tblInfo.Name); err == nil { + if t.Meta().ID == tblInfo.ID { + return schema.Name.O + "." + tblInfo.Name.O + } + } + } + return strconv.FormatInt(tblInfo.ID, 10) +} + +// JSONTable is used for dumping statistics. +type JSONTable struct { + Columns map[string]*JSONColumn `json:"columns"` + Indices map[string]*JSONColumn `json:"indices"` + Partitions map[string]*JSONTable `json:"partitions"` + DatabaseName string `json:"database_name"` + TableName string `json:"table_name"` + ExtStats []*JSONExtendedStats `json:"ext_stats"` + Count int64 `json:"count"` + ModifyCount int64 `json:"modify_count"` + Version uint64 `json:"version"` + IsHistoricalStats bool `json:"is_historical_stats"` +} + +// JSONExtendedStats is used for dumping extended statistics. +type JSONExtendedStats struct { + StatsName string `json:"stats_name"` + StringVals string `json:"string_vals"` + ColIDs []int64 `json:"cols"` + ScalarVals float64 `json:"scalar_vals"` + Tp uint8 `json:"type"` +} + +// JSONColumn is used for dumping statistics. +type JSONColumn struct { + Histogram *tipb.Histogram `json:"histogram"` + CMSketch *tipb.CMSketch `json:"cm_sketch"` + FMSketch *tipb.FMSketch `json:"fm_sketch"` + // StatsVer is a pointer here since the old version json file would not contain version information. 
+ StatsVer *int64 `json:"stats_ver"` + NullCount int64 `json:"null_count"` + TotColSize int64 `json:"tot_col_size"` + LastUpdateVersion uint64 `json:"last_update_version"` + Correlation float64 `json:"correlation"` +} + +// TotalMemoryUsage returns the total memory usage of this column. +func (col *JSONColumn) TotalMemoryUsage() (size int64) { + if col.Histogram != nil { + size += int64(col.Histogram.Size()) + } + if col.CMSketch != nil { + size += int64(col.CMSketch.Size()) + } + if col.FMSketch != nil { + size += int64(col.FMSketch.Size()) + } + return size +} From 884a144a6b9675a9b3f0f39709ea9322701287e4 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 6 Nov 2023 16:36:12 +0800 Subject: [PATCH 2/3] update Signed-off-by: Weizhen Wang --- pkg/statistics/handle/storage/json.go | 344 ---------- .../handle/storage/stats_read_writer.go | 617 ------------------ pkg/statistics/handle/util/util.go | 296 --------- 3 files changed, 1257 deletions(-) delete mode 100644 pkg/statistics/handle/storage/json.go delete mode 100644 pkg/statistics/handle/storage/stats_read_writer.go delete mode 100644 pkg/statistics/handle/util/util.go diff --git a/pkg/statistics/handle/storage/json.go b/pkg/statistics/handle/storage/json.go deleted file mode 100644 index a174d38aaa566..0000000000000 --- a/pkg/statistics/handle/storage/json.go +++ /dev/null @@ -1,344 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package storage - -import ( - "bytes" - "encoding/json" - "io" - "sync/atomic" - "time" - - "github.com/klauspost/compress/gzip" - "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" - "github.com/pingcap/tidb/pkg/statistics" - "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/types" - compressutil "github.com/pingcap/tidb/pkg/util/compress" - "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/memory" - "go.uber.org/zap" -) - -func dumpJSONExtendedStats(statsColl *statistics.ExtendedStatsColl) []*util.JSONExtendedStats { - if statsColl == nil || len(statsColl.Stats) == 0 { - return nil - } - stats := make([]*util.JSONExtendedStats, 0, len(statsColl.Stats)) - for name, item := range statsColl.Stats { - js := &util.JSONExtendedStats{ - StatsName: name, - ColIDs: item.ColIDs, - Tp: item.Tp, - ScalarVals: item.ScalarVals, - StringVals: item.StringVals, - } - stats = append(stats, js) - } - return stats -} - -func extendedStatsFromJSON(statsColl []*util.JSONExtendedStats) *statistics.ExtendedStatsColl { - if len(statsColl) == 0 { - return nil - } - stats := statistics.NewExtendedStatsColl() - for _, js := range statsColl { - item := &statistics.ExtendedStatsItem{ - ColIDs: js.ColIDs, - Tp: js.Tp, - ScalarVals: js.ScalarVals, - StringVals: js.StringVals, - } - stats.Stats[js.StatsName] = item - } - return stats -} - -func dumpJSONCol(hist *statistics.Histogram, cmsketch *statistics.CMSketch, topn *statistics.TopN, fmsketch *statistics.FMSketch, statsVer *int64) *util.JSONColumn { - jsonCol := &util.JSONColumn{ - Histogram: statistics.HistogramToProto(hist), - NullCount: hist.NullCount, - TotColSize: hist.TotColSize, - LastUpdateVersion: hist.LastUpdateVersion, - Correlation: hist.Correlation, - StatsVer: statsVer, - } - if cmsketch != nil || topn != nil { - jsonCol.CMSketch = statistics.CMSketchToProto(cmsketch, topn) - } - if fmsketch != nil { - jsonCol.FMSketch = statistics.FMSketchToProto(fmsketch) - } - return jsonCol -} - -// GenJSONTableFromStats generate jsonTable from tableInfo and stats -func GenJSONTableFromStats(sctx sessionctx.Context, dbName string, tableInfo *model.TableInfo, tbl *statistics.Table) (*util.JSONTable, error) { - tracker := memory.NewTracker(memory.LabelForAnalyzeMemory, -1) - tracker.AttachTo(sctx.GetSessionVars().MemTracker) - defer tracker.Detach() - jsonTbl := &util.JSONTable{ - DatabaseName: dbName, - TableName: tableInfo.Name.L, - Columns: make(map[string]*util.JSONColumn, len(tbl.Columns)), - Indices: make(map[string]*util.JSONColumn, len(tbl.Indices)), - Count: tbl.RealtimeCount, - ModifyCount: tbl.ModifyCount, - Version: tbl.Version, - } - for _, col := range tbl.Columns { - sc := stmtctx.NewStmtCtxWithTimeZone(time.UTC) - hist, err := col.ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) - if err != nil { - return nil, errors.Trace(err) - } - proto := dumpJSONCol(hist, col.CMSketch, col.TopN, col.FMSketch, &col.StatsVer) - tracker.Consume(proto.TotalMemoryUsage()) - if atomic.LoadUint32(&sctx.GetSessionVars().Killed) == 1 { - return nil, errors.Trace(statistics.ErrQueryInterrupted) - } - jsonTbl.Columns[col.Info.Name.L] = proto - col.FMSketch.DestroyAndPutToPool() - } - for _, idx := range tbl.Indices { - proto := dumpJSONCol(&idx.Histogram, idx.CMSketch, idx.TopN, nil, &idx.StatsVer) - tracker.Consume(proto.TotalMemoryUsage()) - if 
atomic.LoadUint32(&sctx.GetSessionVars().Killed) == 1 { - return nil, errors.Trace(statistics.ErrQueryInterrupted) - } - jsonTbl.Indices[idx.Info.Name.L] = proto - } - jsonTbl.ExtStats = dumpJSONExtendedStats(tbl.ExtendedStats) - return jsonTbl, nil -} - -// TableStatsFromJSON loads statistic from JSONTable and return the Table of statistic. -func TableStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *util.JSONTable) (*statistics.Table, error) { - newHistColl := statistics.HistColl{ - PhysicalID: physicalID, - HavePhysicalID: true, - RealtimeCount: jsonTbl.Count, - ModifyCount: jsonTbl.ModifyCount, - Columns: make(map[int64]*statistics.Column, len(jsonTbl.Columns)), - Indices: make(map[int64]*statistics.Index, len(jsonTbl.Indices)), - } - tbl := &statistics.Table{ - HistColl: newHistColl, - } - for id, jsonIdx := range jsonTbl.Indices { - for _, idxInfo := range tableInfo.Indices { - if idxInfo.Name.L != id { - continue - } - hist := statistics.HistogramFromProto(jsonIdx.Histogram) - hist.ID, hist.NullCount, hist.LastUpdateVersion, hist.Correlation = idxInfo.ID, jsonIdx.NullCount, jsonIdx.LastUpdateVersion, jsonIdx.Correlation - cm, topN := statistics.CMSketchAndTopNFromProto(jsonIdx.CMSketch) - statsVer := int64(statistics.Version0) - if jsonIdx.StatsVer != nil { - statsVer = *jsonIdx.StatsVer - } else if jsonIdx.Histogram.Ndv > 0 || jsonIdx.NullCount > 0 { - // If the statistics are collected without setting stats version(which happens in v4.0 and earlier versions), - // we set it to 1. - statsVer = int64(statistics.Version1) - } - idx := &statistics.Index{ - Histogram: *hist, - CMSketch: cm, - TopN: topN, - Info: idxInfo, - StatsVer: statsVer, - PhysicalID: physicalID, - StatsLoadedStatus: statistics.NewStatsFullLoadStatus(), - } - tbl.Indices[idx.ID] = idx - } - } - - for id, jsonCol := range jsonTbl.Columns { - for _, colInfo := range tableInfo.Columns { - if colInfo.Name.L != id { - continue - } - hist := statistics.HistogramFromProto(jsonCol.Histogram) - sc := stmtctx.NewStmtCtxWithTimeZone(time.UTC) - tmpFT := colInfo.FieldType - // For new collation data, when storing the bounds of the histogram, we store the collate key instead of the - // original value. - // But there's additional conversion logic for new collation data, and the collate key might be longer than - // the FieldType.flen. - // If we use the original FieldType here, there might be errors like "Invalid utf8mb4 character string" - // or "Data too long". - // So we change it to TypeBlob to bypass those logics here. - if colInfo.FieldType.EvalType() == types.ETString && colInfo.FieldType.GetType() != mysql.TypeEnum && colInfo.FieldType.GetType() != mysql.TypeSet { - tmpFT = *types.NewFieldType(mysql.TypeBlob) - } - hist, err := hist.ConvertTo(sc, &tmpFT) - if err != nil { - return nil, errors.Trace(err) - } - cm, topN := statistics.CMSketchAndTopNFromProto(jsonCol.CMSketch) - fms := statistics.FMSketchFromProto(jsonCol.FMSketch) - hist.ID, hist.NullCount, hist.LastUpdateVersion, hist.TotColSize, hist.Correlation = colInfo.ID, jsonCol.NullCount, jsonCol.LastUpdateVersion, jsonCol.TotColSize, jsonCol.Correlation - statsVer := int64(statistics.Version0) - if jsonCol.StatsVer != nil { - statsVer = *jsonCol.StatsVer - } else if jsonCol.Histogram.Ndv > 0 || jsonCol.NullCount > 0 { - // If the statistics are collected without setting stats version(which happens in v4.0 and earlier versions), - // we set it to 1. 
- statsVer = int64(statistics.Version1) - } - col := &statistics.Column{ - PhysicalID: physicalID, - Histogram: *hist, - CMSketch: cm, - TopN: topN, - FMSketch: fms, - Info: colInfo, - IsHandle: tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()), - StatsVer: statsVer, - StatsLoadedStatus: statistics.NewStatsFullLoadStatus(), - } - tbl.Columns[col.ID] = col - } - } - tbl.ExtendedStats = extendedStatsFromJSON(jsonTbl.ExtStats) - return tbl, nil -} - -// JSONTableToBlocks convert JSONTable to json, then compresses it to blocks by gzip. -func JSONTableToBlocks(jsTable *util.JSONTable, blockSize int) ([][]byte, error) { - data, err := json.Marshal(jsTable) - if err != nil { - return nil, errors.Trace(err) - } - var gzippedData bytes.Buffer - gzipWriter := compressutil.GzipWriterPool.Get().(*gzip.Writer) - defer compressutil.GzipWriterPool.Put(gzipWriter) - gzipWriter.Reset(&gzippedData) - if _, err := gzipWriter.Write(data); err != nil { - return nil, errors.Trace(err) - } - if err := gzipWriter.Close(); err != nil { - return nil, errors.Trace(err) - } - blocksNum := gzippedData.Len() / blockSize - if gzippedData.Len()%blockSize != 0 { - blocksNum = blocksNum + 1 - } - blocks := make([][]byte, blocksNum) - for i := 0; i < blocksNum-1; i++ { - blocks[i] = gzippedData.Bytes()[blockSize*i : blockSize*(i+1)] - } - blocks[blocksNum-1] = gzippedData.Bytes()[blockSize*(blocksNum-1):] - return blocks, nil -} - -// BlocksToJSONTable convert gzip-compressed blocks to JSONTable -func BlocksToJSONTable(blocks [][]byte) (*util.JSONTable, error) { - if len(blocks) == 0 { - return nil, errors.New("Block empty error") - } - data := blocks[0] - for i := 1; i < len(blocks); i++ { - data = append(data, blocks[i]...) - } - gzippedData := bytes.NewReader(data) - gzipReader := compressutil.GzipReaderPool.Get().(*gzip.Reader) - if err := gzipReader.Reset(gzippedData); err != nil { - compressutil.GzipReaderPool.Put(gzipReader) - return nil, err - } - defer func() { - compressutil.GzipReaderPool.Put(gzipReader) - }() - if err := gzipReader.Close(); err != nil { - return nil, err - } - jsonStr, err := io.ReadAll(gzipReader) - if err != nil { - return nil, errors.Trace(err) - } - jsonTbl := util.JSONTable{} - err = json.Unmarshal(jsonStr, &jsonTbl) - if err != nil { - return nil, errors.Trace(err) - } - return &jsonTbl, nil -} - -// TableHistoricalStatsToJSON converts the historical stats of a table to JSONTable. -func TableHistoricalStatsToJSON(sctx sessionctx.Context, physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) { - if _, err := util.Exec(sctx, "begin"); err != nil { - return nil, false, err - } - defer func() { - err = util.FinishTransaction(sctx, err) - }() - - // get meta version - rows, _, err := util.ExecRows(sctx, "select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot) - if err != nil { - return nil, false, errors.AddStack(err) - } - if len(rows) < 1 { - logutil.BgLogger().Warn("failed to get records of stats_meta_history", - zap.Int64("table-id", physicalID), - zap.Uint64("snapshotTS", snapshot)) - return nil, false, nil - } - statsMetaVersion := rows[0].GetInt64(0) - // get stats meta - rows, _, err = util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta_history where table_id = %? 
-
-// TableHistoricalStatsToJSON converts the historical stats of a table to JSONTable.
-func TableHistoricalStatsToJSON(sctx sessionctx.Context, physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) {
-	if _, err := util.Exec(sctx, "begin"); err != nil {
-		return nil, false, err
-	}
-	defer func() {
-		err = util.FinishTransaction(sctx, err)
-	}()
-
-	// get meta version
-	rows, _, err := util.ExecRows(sctx, "select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
-	if err != nil {
-		return nil, false, errors.AddStack(err)
-	}
-	if len(rows) < 1 {
-		logutil.BgLogger().Warn("failed to get records of stats_meta_history",
-			zap.Int64("table-id", physicalID),
-			zap.Uint64("snapshotTS", snapshot))
-		return nil, false, nil
-	}
-	statsMetaVersion := rows[0].GetInt64(0)
-	// get stats meta
-	rows, _, err = util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta_history where table_id = %? and version = %?", physicalID, statsMetaVersion)
-	if err != nil {
-		return nil, false, errors.AddStack(err)
-	}
-	modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
-
-	// get stats version
-	rows, _, err = util.ExecRows(sctx, "select distinct version from mysql.stats_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
-	if err != nil {
-		return nil, false, errors.AddStack(err)
-	}
-	if len(rows) < 1 {
-		logutil.BgLogger().Warn("failed to get record of stats_history",
-			zap.Int64("table-id", physicalID),
-			zap.Uint64("snapshotTS", snapshot))
-		return nil, false, nil
-	}
-	statsVersion := rows[0].GetInt64(0)
-
-	// get stats
-	rows, _, err = util.ExecRows(sctx, "select stats_data from mysql.stats_history where table_id = %? and version = %? order by seq_no", physicalID, statsVersion)
-	if err != nil {
-		return nil, false, errors.AddStack(err)
-	}
-	blocks := make([][]byte, 0)
-	for _, row := range rows {
-		blocks = append(blocks, row.GetBytes(0))
-	}
-	jsonTbl, err := BlocksToJSONTable(blocks)
-	if err != nil {
-		return nil, false, errors.AddStack(err)
-	}
-	jsonTbl.Count = count
-	jsonTbl.ModifyCount = modifyCount
-	jsonTbl.IsHistoricalStats = true
-	return jsonTbl, true, nil
-}
diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go
deleted file mode 100644
index b35879bce7c4e..0000000000000
--- a/pkg/statistics/handle/storage/stats_read_writer.go
+++ /dev/null
@@ -1,617 +0,0 @@
-// Copyright 2023 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
-	"context"
-	"fmt"
-	"runtime"
-	"sync"
-	"sync/atomic"
-
-	"github.com/pingcap/errors"
-	"github.com/pingcap/tidb/pkg/config"
-	"github.com/pingcap/tidb/pkg/infoschema"
-	"github.com/pingcap/tidb/pkg/parser/model"
-	"github.com/pingcap/tidb/pkg/parser/mysql"
-	"github.com/pingcap/tidb/pkg/parser/terror"
-	"github.com/pingcap/tidb/pkg/sessionctx"
-	"github.com/pingcap/tidb/pkg/sessionctx/variable"
-	"github.com/pingcap/tidb/pkg/statistics"
-	handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics"
-	"github.com/pingcap/tidb/pkg/statistics/handle/util"
-	"github.com/pingcap/tidb/pkg/types"
-	"github.com/pingcap/tidb/pkg/util/intest"
-	"github.com/pingcap/tidb/pkg/util/sqlexec"
-)
-
-// statsReadWriter implements the util.StatsReadWriter interface.
-type statsReadWriter struct {
-	statsHandler util.StatsHandle
-}
-
-// NewStatsReadWriter creates a new StatsReadWriter.
-func NewStatsReadWriter(statsHandler util.StatsHandle) util.StatsReadWriter {
-	return &statsReadWriter{statsHandler: statsHandler}
-}
-
-// InsertColStats2KV inserts a record into stats_histograms with distinct_count 1 and inserts a bucket into stats_buckets with the default value.
-// This operation also updates the version.
-func (s *statsReadWriter) InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) {
-	statsVer := uint64(0)
-	defer func() {
-		if err == nil && statsVer != 0 {
-			s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
-		}
-	}()
-
-	return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		startTS, err := util.GetStartTS(sctx)
-		if err != nil {
-			return errors.Trace(err)
-		}
-
-		// First of all, we update the version.
-		_, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
-		if err != nil {
-			return err
-		}
-		statsVer = startTS
-		// If the last SQL didn't update anything, the stats of this table do not exist, so there is nothing more to do.
-		if sctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
-			// This step gets the row count of this table, so we can determine the count and repeats of the bucket.
-			var rs sqlexec.RecordSet
-			rs, err = util.Exec(sctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
-			if err != nil {
-				return err
-			}
-			defer terror.Call(rs.Close)
-			req := rs.NewChunk(nil)
-			err = rs.Next(context.Background(), req)
-			if err != nil {
-				return err
-			}
-			count := req.GetRow(0).GetInt64(0)
-			for _, colInfo := range colInfos {
-				value := types.NewDatum(colInfo.GetOriginDefaultValue())
-				value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, &colInfo.FieldType)
-				if err != nil {
-					return err
-				}
-				if value.IsNull() {
-					// If the added column has a null default value, all existing rows have a null value in the newly added column.
-					if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
-						return err
-					}
-				} else {
-					// Otherwise, we insert the histogram meta first; the distinct_count will always be one.
-					if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
-						return err
-					}
-					value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob))
-					if err != nil {
-						return err
-					}
-					// There must be only one bucket for this new column and the value is the default value.
-					if _, err := util.Exec(sctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
-						return err
-					}
-				}
-			}
-		}
-		return nil
-	}, util.FlagWrapTxn)
-}
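
The arithmetic behind the non-null branch above is easy to miss in the SQL: the new column contributes tot_col_size = len(default) * rowCount, and its single bucket covers every existing row. A small illustrative sketch of just those derived values (the row types here are hypothetical, not TiDB's schema types):

package main

import "fmt"

type histRow struct{ distinctCount, totColSize int64 }
type bucketRow struct {
	count, repeats int64
	lower, upper   []byte
}

// newColumnStatsRows mirrors the values written for a column added with a
// non-null default: one histogram row and one bucket spanning all rows.
func newColumnStatsRows(defaultVal []byte, rowCount int64) (histRow, bucketRow) {
	h := histRow{distinctCount: 1, totColSize: int64(len(defaultVal)) * rowCount}
	b := bucketRow{count: rowCount, repeats: rowCount, lower: defaultVal, upper: defaultVal}
	return h, b
}

func main() {
	h, b := newColumnStatsRows([]byte("active"), 10000)
	fmt.Printf("hist: %+v, bucket: count=%d repeats=%d\n", h, b.count, b.repeats)
}
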
-
-// InsertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the
-// new columns and indices which belong to this table.
-func (s *statsReadWriter) InsertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
-	statsVer := uint64(0)
-	defer func() {
-		if err == nil && statsVer != 0 {
-			s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
-		}
-	}()
-
-	return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		startTS, err := util.GetStartTS(sctx)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if _, err := util.Exec(sctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
-			return err
-		}
-		statsVer = startTS
-		for _, col := range info.Columns {
-			if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
-				return err
-			}
-		}
-		for _, idx := range info.Indices {
-			if _, err := util.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
-				return err
-			}
-		}
-		return nil
-	}, util.FlagWrapTxn)
-}
-
-// ChangeGlobalStatsID changes the table ID in global-stats to the new table ID.
-func (s *statsReadWriter) ChangeGlobalStatsID(from, to int64) (err error) {
-	return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} {
-			_, err = util.Exec(sctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from)
-			if err != nil {
-				return err
-			}
-		}
-		return nil
-	}, util.FlagWrapTxn)
-}
-
-// ResetTableStats2KVForDrop resets the count to 0.
-func (s *statsReadWriter) ResetTableStats2KVForDrop(physicalID int64) (err error) {
-	statsVer := uint64(0)
-	defer func() {
-		if err == nil && statsVer != 0 {
-			s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
-		}
-	}()
-
-	return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		startTS, err := util.GetStartTS(sctx)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if _, err := util.Exec(sctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil {
-			return err
-		}
-		return nil
-	}, util.FlagWrapTxn)
-}
-
-// UpdateStatsVersion will set the statistics version to the newest TS,
-// then tidb-server will reload it automatically.
-func (s *statsReadWriter) UpdateStatsVersion() error {
-	return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		return UpdateStatsVersion(sctx)
-	}, util.FlagWrapTxn)
-}
-
-// SaveTableStatsToStorage saves the stats of a table to storage.
-func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) {
-	var statsVer uint64
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		statsVer, err = SaveTableStatsToStorage(sctx, results, analyzeSnapshot)
-		return err
-	})
-	if err == nil && statsVer != 0 {
-		tableID := results.TableID.GetStatisticsID()
-		s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true)
-	}
-	return err
-}
-
-// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
-func (s *statsReadWriter) StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) {
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		count, modifyCount, _, err = StatsMetaCountAndModifyCount(sctx, tableID)
-		return err
-	}, util.FlagWrapTxn)
-	return
-}
-
-// TableStatsFromStorage loads table stats info from storage.
-func (s *statsReadWriter) TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) {
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		var ok bool
-		statsTbl, ok = s.statsHandler.Get(physicalID)
-		if !ok {
-			statsTbl = nil
-		}
-		statsTbl, err = TableStatsFromStorage(sctx, snapshot, tableInfo, physicalID, loadAll, s.statsHandler.Lease(), statsTbl)
-		return err
-	}, util.FlagWrapTxn)
-	return
-}
-
-// SaveStatsToStorage saves the stats to storage.
-// If count is negative, neither count nor modify count will be written to the table. Otherwise, the corresponding
-// fields in the stats_meta table will be updated.
-// TODO: refactor to reduce the number of parameters
-func (s *statsReadWriter) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram,
-	cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) {
-	var statsVer uint64
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		statsVer, err = SaveStatsToStorage(sctx, tableID,
-			count, modifyCount, isIndex, hg, cms, topN, statsVersion, isAnalyzed, updateAnalyzeTime)
-		return err
-	})
-	if err == nil && statsVer != 0 {
-		s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
-	}
-	return
-}
-
-// saveMetaToStorage saves stats meta to the storage.
-func (s *statsReadWriter) saveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) {
-	var statsVer uint64
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		statsVer, err = SaveMetaToStorage(sctx, tableID, count, modifyCount)
-		return err
-	})
-	if err == nil && statsVer != 0 {
-		s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
-	}
-	return
-}
-
-// InsertExtendedStats inserts a record into mysql.stats_extended and updates the version in mysql.stats_meta.
-func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) {
-	var statsVer uint64
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		statsVer, err = InsertExtendedStats(sctx, s.statsHandler, statsName, colIDs, tp, tableID, ifNotExists)
-		return err
-	})
-	if err == nil && statsVer != 0 {
-		s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
-	}
-	return
-}
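
The write paths above all share one shape: run the storage write through a pooled session, capture the stats version it produced, and record historical meta only on success. A stripped-down sketch of that pattern, with the helper names being mine rather than TiDB's:

package main

import "fmt"

// withHistoricalMeta runs saveFn (standing in for any SaveXXXToStorage helper
// that reports the new stats version) and calls recordMeta (standing in for
// RecordHistoricalStatsMeta) only when the write really happened.
func withHistoricalMeta(saveFn func() (uint64, error), recordMeta func(uint64)) error {
	statsVer, err := saveFn()
	if err == nil && statsVer != 0 {
		recordMeta(statsVer)
	}
	return err
}

func main() {
	err := withHistoricalMeta(
		func() (uint64, error) { return 42, nil },
		func(v uint64) { fmt.Println("recorded version", v) },
	)
	fmt.Println("err:", err)
}
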
-
-// MarkExtendedStatsDeleted updates the status of mysql.stats_extended to `deleted` and the version of mysql.stats_meta.
-func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) {
-	var statsVer uint64
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		statsVer, err = MarkExtendedStatsDeleted(sctx, s.statsHandler, statsName, tableID, ifExists)
-		return err
-	})
-	if err == nil && statsVer != 0 {
-		s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
-	}
-	return
-}
-
-// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended.
-func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) {
-	var statsVer uint64
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		statsVer, err = SaveExtendedStatsToStorage(sctx, tableID, extStats, isLoad)
-		return err
-	})
-	if err == nil && statsVer != 0 {
-		s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
-	}
-	return
-}
-
-// LoadNeededHistograms will load histograms for those needed columns/indices.
-func (s *statsReadWriter) LoadNeededHistograms() (err error) {
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
-		return LoadNeededHistograms(sctx, s.statsHandler, loadFMSketch)
-	}, util.FlagWrapTxn)
-	return err
-}
-
-// ReloadExtendedStatistics drops the cache for extended statistics and reloads data from mysql.stats_extended.
-func (s *statsReadWriter) ReloadExtendedStatistics() error {
-	return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		tables := make([]*statistics.Table, 0, s.statsHandler.Len())
-		for _, tbl := range s.statsHandler.Values() {
-			t, err := ExtendedStatsFromStorage(sctx, tbl.Copy(), tbl.PhysicalID, true)
-			if err != nil {
-				return err
-			}
-			tables = append(tables, t)
-		}
-		s.statsHandler.UpdateStatsCache(tables, nil)
-		return nil
-	}, util.FlagWrapTxn)
-}
-
-// DumpStatsToJSON dumps statistics to JSON.
-func (s *statsReadWriter) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo,
-	historyStatsExec sqlexec.RestrictedSQLExecutor, dumpPartitionStats bool) (*util.JSONTable, error) {
-	var snapshot uint64
-	if historyStatsExec != nil {
-		sctx := historyStatsExec.(sessionctx.Context)
-		snapshot = sctx.GetSessionVars().SnapshotTS
-	}
-	return s.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot, dumpPartitionStats)
-}
-
-// DumpHistoricalStatsBySnapshot dumps JSON tables from mysql.stats_meta_history and mysql.stats_history.
-// As implemented in getTableHistoricalStatsToJSONWithFallback, if historical stats are nonexistent, it will fall back
-// to the latest stats, and these table names (and partition names) will be returned in fallbackTbls.
-func (s *statsReadWriter) DumpHistoricalStatsBySnapshot(
-	dbName string,
-	tableInfo *model.TableInfo,
-	snapshot uint64,
-) (
-	jt *util.JSONTable,
-	fallbackTbls []string,
-	err error,
-) {
-	historicalStatsEnabled, err := s.statsHandler.CheckHistoricalStatsEnable()
-	if err != nil {
-		return nil, nil, errors.Errorf("check %v failed: %v", variable.TiDBEnableHistoricalStats, err)
-	}
-	if !historicalStatsEnabled {
-		return nil, nil, errors.Errorf("%v should be enabled", variable.TiDBEnableHistoricalStats)
-	}
-
-	defer func() {
-		if err == nil {
-			handle_metrics.DumpHistoricalStatsSuccessCounter.Inc()
-		} else {
-			handle_metrics.DumpHistoricalStatsFailedCounter.Inc()
-		}
-	}()
-	pi := tableInfo.GetPartitionInfo()
-	if pi == nil {
-		jt, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, tableInfo.ID, snapshot)
-		if fallback {
-			fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s", dbName, tableInfo.Name.O))
-		}
-		return jt, fallbackTbls, err
-	}
-	jsonTbl := &util.JSONTable{
-		DatabaseName: dbName,
-		TableName:    tableInfo.Name.L,
-		Partitions:   make(map[string]*util.JSONTable, len(pi.Definitions)),
-	}
-	for _, def := range pi.Definitions {
-		tbl, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, def.ID, snapshot)
-		if err != nil {
-			return nil, nil, errors.Trace(err)
-		}
-		if fallback {
-			fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s %s", dbName, tableInfo.Name.O, def.Name.O))
-		}
-		jsonTbl.Partitions[def.Name.L] = tbl
-	}
-	tbl, fallback, err := s.getTableHistoricalStatsToJSONWithFallback(dbName, tableInfo, tableInfo.ID, snapshot)
-	if err != nil {
-		return nil, nil, err
-	}
-	if fallback {
-		fallbackTbls = append(fallbackTbls, fmt.Sprintf("%s.%s global", dbName, tableInfo.Name.O))
-	}
-	// dump its global-stats if it exists
-	if tbl != nil {
-		jsonTbl.Partitions[util.TiDBGlobalStats] = tbl
-	}
-	return jsonTbl, fallbackTbls, nil
-}
-
-// DumpStatsToJSONBySnapshot dumps statistics to JSON.
-func (s *statsReadWriter) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*util.JSONTable, error) {
-	pruneMode, err := s.statsHandler.GetCurrentPruneMode()
-	if err != nil {
-		return nil, err
-	}
-	isDynamicMode := variable.PartitionPruneMode(pruneMode) == variable.Dynamic
-	pi := tableInfo.GetPartitionInfo()
-	if pi == nil {
-		return s.TableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot)
-	}
-	jsonTbl := &util.JSONTable{
-		DatabaseName: dbName,
-		TableName:    tableInfo.Name.L,
-		Partitions:   make(map[string]*util.JSONTable, len(pi.Definitions)),
-	}
-	// dump partition stats only in static mode, or when the dumpPartitionStats flag is enabled in dynamic mode
-	if !isDynamicMode || dumpPartitionStats {
-		for _, def := range pi.Definitions {
-			tbl, err := s.TableStatsToJSON(dbName, tableInfo, def.ID, snapshot)
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-			if tbl == nil {
-				continue
-			}
-			jsonTbl.Partitions[def.Name.L] = tbl
-		}
-	}
-	// dump its global-stats if it exists
-	tbl, err := s.TableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	if tbl != nil {
-		jsonTbl.Partitions[util.TiDBGlobalStats] = tbl
-	}
-	return jsonTbl, nil
-}
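
The partition/global dispatch in DumpStatsToJSONBySnapshot reduces to one small decision: partition-level stats are dumped unless the table is in dynamic prune mode with partition dumping disabled, while global stats are always attempted. A self-contained sketch of just that guard and its truth table:

package main

import "fmt"

// shouldDumpPartitions mirrors the guard above: in static prune mode every
// partition is dumped; in dynamic mode only when explicitly requested.
func shouldDumpPartitions(isDynamicMode, dumpPartitionStats bool) bool {
	return !isDynamicMode || dumpPartitionStats
}

func main() {
	for _, c := range []struct{ dynamic, flag bool }{
		{false, false}, {false, true}, {true, false}, {true, true},
	} {
		fmt.Printf("dynamic=%v flag=%v -> dump partitions: %v\n",
			c.dynamic, c.flag, shouldDumpPartitions(c.dynamic, c.flag))
	}
}
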
-
-// getTableHistoricalStatsToJSONWithFallback tries to get the table's historical stats; if they do not exist, it falls
-// back directly to the latest stats, and the second return value will be true.
-func (s *statsReadWriter) getTableHistoricalStatsToJSONWithFallback(
-	dbName string,
-	tableInfo *model.TableInfo,
-	physicalID int64,
-	snapshot uint64,
-) (
-	*util.JSONTable,
-	bool,
-	error,
-) {
-	jt, exist, err := s.tableHistoricalStatsToJSON(physicalID, snapshot)
-	if err != nil {
-		return nil, false, err
-	}
-	if !exist {
-		jt, err = s.TableStatsToJSON(dbName, tableInfo, physicalID, 0)
-		fallback := true
-		if snapshot == 0 {
-			fallback = false
-		}
-		return jt, fallback, err
-	}
-	return jt, false, nil
-}
-
-func (s *statsReadWriter) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (jt *util.JSONTable, exist bool, err error) {
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		jt, exist, err = TableHistoricalStatsToJSON(sctx, physicalID, snapshot)
-		return err
-	}, util.FlagWrapTxn)
-	return
-}
-
-// TableStatsToJSON dumps statistics to JSON.
-func (s *statsReadWriter) TableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*util.JSONTable, error) {
-	tbl, err := s.TableStatsFromStorage(tableInfo, physicalID, true, snapshot)
-	if err != nil || tbl == nil {
-		return nil, err
-	}
-	var jsonTbl *util.JSONTable
-	err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
-		tbl.Version, tbl.ModifyCount, tbl.RealtimeCount, err = StatsMetaByTableIDFromStorage(sctx, physicalID, snapshot)
-		if err != nil {
-			return err
-		}
-		jsonTbl, err = GenJSONTableFromStats(sctx, dbName, tableInfo, tbl)
-		return err
-	})
-	if err != nil {
-		return nil, err
-	}
-	return jsonTbl, nil
-}
-
-// TestLoadStatsErr is only for test.
-type TestLoadStatsErr struct{}
-
-// LoadStatsFromJSON will load statistics from the JSONTable and save them to storage.
-// Finally, it will also update the stats cache.
-func (s *statsReadWriter) LoadStatsFromJSON(ctx context.Context, is infoschema.InfoSchema,
-	jsonTbl *util.JSONTable, concurrencyForPartition uint8) error {
-	if err := s.LoadStatsFromJSONNoUpdate(ctx, is, jsonTbl, concurrencyForPartition); err != nil {
-		return errors.Trace(err)
-	}
-	return errors.Trace(s.statsHandler.Update(is))
-}
-
-// LoadStatsFromJSONNoUpdate will load statistics from the JSONTable and save them to storage.
-func (s *statsReadWriter) LoadStatsFromJSONNoUpdate(ctx context.Context, is infoschema.InfoSchema,
-	jsonTbl *util.JSONTable, concurrencyForPartition uint8) error {
-	nCPU := uint8(runtime.GOMAXPROCS(0))
-	if concurrencyForPartition == 0 {
-		concurrencyForPartition = nCPU / 2 // default
-	}
-	if concurrencyForPartition > nCPU {
-		concurrencyForPartition = nCPU // for safety
-	}
-
-	table, err := is.TableByName(model.NewCIStr(jsonTbl.DatabaseName), model.NewCIStr(jsonTbl.TableName))
-	if err != nil {
-		return errors.Trace(err)
-	}
-	tableInfo := table.Meta()
-	pi := tableInfo.GetPartitionInfo()
-	if pi == nil || jsonTbl.Partitions == nil {
-		err := s.loadStatsFromJSON(tableInfo, tableInfo.ID, jsonTbl)
-		if err != nil {
-			return errors.Trace(err)
-		}
-	} else {
-		// load partition statistics concurrently
-		taskCh := make(chan model.PartitionDefinition, len(pi.Definitions))
-		for _, def := range pi.Definitions {
-			taskCh <- def
-		}
-		close(taskCh)
-		var wg sync.WaitGroup
-		e := new(atomic.Pointer[error])
-		for i := 0; i < int(concurrencyForPartition); i++ {
-			wg.Add(1)
-			s.statsHandler.GPool().Go(func() {
-				defer func() {
-					if r := recover(); r != nil {
-						err := fmt.Errorf("%v", r)
-						e.CompareAndSwap(nil, &err)
-					}
-					wg.Done()
-				}()
-
-				for def := range taskCh {
-					tbl := jsonTbl.Partitions[def.Name.L]
-					if tbl == nil {
-						continue
-					}
-
-					loadFunc := s.loadStatsFromJSON
-					if intest.InTest && ctx.Value(TestLoadStatsErr{}) != nil {
-						loadFunc = ctx.Value(TestLoadStatsErr{}).(func(*model.TableInfo, int64, *util.JSONTable) error)
-					}
-
-					err := loadFunc(tableInfo, def.ID, tbl)
-					if err != nil {
-						e.CompareAndSwap(nil, &err)
-						return
-					}
-					if e.Load() != nil {
-						return
-					}
-				}
-			})
-		}
-		wg.Wait()
-		if e.Load() != nil {
-			return *e.Load()
-		}
-
-		// load global-stats if it exists
		if globalStats, ok := jsonTbl.Partitions[util.TiDBGlobalStats]; ok {
-			if err := s.loadStatsFromJSON(tableInfo, tableInfo.ID, globalStats); err != nil {
-				return errors.Trace(err)
-			}
-		}
-	}
-	return nil
-}
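
The concurrent partition load above is a worker-pool idiom: a buffered channel of partition tasks, a clamped worker count, and a first-error slot shared through an atomic pointer. A minimal generic sketch of the same idiom, with task payloads simplified to strings (all names here are illustrative):

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// loadConcurrently drains tasks with a bounded number of workers and stops
// early once any worker records an error, like the partition loop above.
func loadConcurrently(tasks []string, workers int, load func(string) error) error {
	if n := runtime.GOMAXPROCS(0); workers <= 0 || workers > n {
		workers = n // clamp to the CPU count, as the patch does
	}
	taskCh := make(chan string, len(tasks))
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh)

	firstErr := new(atomic.Pointer[error])
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskCh {
				if firstErr.Load() != nil {
					return // another worker already failed
				}
				if err := load(t); err != nil {
					firstErr.CompareAndSwap(nil, &err)
					return
				}
			}
		}()
	}
	wg.Wait()
	if p := firstErr.Load(); p != nil {
		return *p
	}
	return nil
}

func main() {
	err := loadConcurrently([]string{"p0", "p1", "p2"}, 2, func(p string) error {
		fmt.Println("loading", p)
		return nil
	})
	fmt.Println("err:", err)
}
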
-
-func (s *statsReadWriter) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl *util.JSONTable) error {
-	tbl, err := TableStatsFromJSON(tableInfo, physicalID, jsonTbl)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	for _, col := range tbl.Columns {
-		// loadStatsFromJSON doesn't support partitioned tables yet.
-		// The table-level count and modify_count will be overridden by the saveMetaToStorage call below, so we don't
-		// need to care about them here.
-		err = s.SaveStatsToStorage(tbl.PhysicalID, tbl.RealtimeCount, 0, 0, &col.Histogram, col.CMSketch, col.TopN, int(col.GetStatsVer()), 1, false, util.StatsMetaHistorySourceLoadStats)
-		if err != nil {
-			return errors.Trace(err)
-		}
-	}
-	for _, idx := range tbl.Indices {
-		// loadStatsFromJSON doesn't support partitioned tables yet.
-		// The table-level count and modify_count will be overridden by the saveMetaToStorage call below, so we don't
-		// need to care about them here.
-		err = s.SaveStatsToStorage(tbl.PhysicalID, tbl.RealtimeCount, 0, 1, &idx.Histogram, idx.CMSketch, idx.TopN, int(idx.GetStatsVer()), 1, false, util.StatsMetaHistorySourceLoadStats)
-		if err != nil {
-			return errors.Trace(err)
-		}
-	}
-	err = s.SaveExtendedStatsToStorage(tbl.PhysicalID, tbl.ExtendedStats, true)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	return s.saveMetaToStorage(tbl.PhysicalID, tbl.RealtimeCount, tbl.ModifyCount, util.StatsMetaHistorySourceLoadStats)
-}
diff --git a/pkg/statistics/handle/util/util.go b/pkg/statistics/handle/util/util.go
deleted file mode 100644
index 5ed0da604c73c..0000000000000
--- a/pkg/statistics/handle/util/util.go
+++ /dev/null
@@ -1,296 +0,0 @@
-// Copyright 2023 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package util
-
-import (
-	"context"
-	"strconv"
-	"time"
-
-	"github.com/ngaut/pools"
-	"github.com/pingcap/errors"
-	"github.com/pingcap/tidb/pkg/infoschema"
-	"github.com/pingcap/tidb/pkg/kv"
-	"github.com/pingcap/tidb/pkg/parser/ast"
-	"github.com/pingcap/tidb/pkg/parser/model"
-	"github.com/pingcap/tidb/pkg/parser/terror"
-	"github.com/pingcap/tidb/pkg/sessionctx"
-	"github.com/pingcap/tidb/pkg/sessionctx/variable"
-	"github.com/pingcap/tidb/pkg/util/chunk"
-	"github.com/pingcap/tidb/pkg/util/intest"
-	"github.com/pingcap/tidb/pkg/util/sqlexec"
-	"github.com/pingcap/tidb/pkg/util/sqlexec/mock"
-	"github.com/pingcap/tipb/go-tipb"
-	"github.com/tikv/client-go/v2/oracle"
-)
-
-const (
-	// StatsMetaHistorySourceAnalyze indicates stats history meta source from analyze
-	StatsMetaHistorySourceAnalyze = "analyze"
-	// StatsMetaHistorySourceLoadStats indicates stats history meta source from load stats
-	StatsMetaHistorySourceLoadStats = "load stats"
-	// StatsMetaHistorySourceFlushStats indicates stats history meta source from flush stats
-	StatsMetaHistorySourceFlushStats = "flush stats"
-	// StatsMetaHistorySourceSchemaChange indicates stats history meta source from schema change
-	StatsMetaHistorySourceSchemaChange = "schema change"
-	// StatsMetaHistorySourceExtendedStats indicates stats history meta source from extended stats
-	StatsMetaHistorySourceExtendedStats = "extended stats"
-
-	// TiDBGlobalStats represents the global-stats for a partitioned table.
-	TiDBGlobalStats = "global"
-)
-
-var (
-	// UseCurrentSessionOpt to make sure the sql is executed in current session.
-	UseCurrentSessionOpt = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}
-
-	// StatsCtx is used to mark the request is from stats module.
-	StatsCtx = kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
-)
-
-// SessionPool is used to recycle sessionctx.
-type SessionPool interface {
-	Get() (pools.Resource, error)
-	Put(pools.Resource)
-}
-
-// FinishTransaction will execute `commit` when error is nil, otherwise `rollback`.
-func FinishTransaction(sctx sessionctx.Context, err error) error {
-	if err == nil {
-		_, _, err = ExecRows(sctx, "commit")
-	} else {
-		_, _, err1 := ExecRows(sctx, "rollback")
-		terror.Log(errors.Trace(err1))
-	}
-	return errors.Trace(err)
-}
-
-var (
-	// FlagWrapTxn indicates whether to wrap a transaction.
-	FlagWrapTxn = 0
-)
-
-// CallWithSCtx allocates a sctx from the pool and calls f().
-func CallWithSCtx(pool SessionPool, f func(sctx sessionctx.Context) error, flags ...int) (err error) {
-	se, err := pool.Get()
-	if err != nil {
-		return err
-	}
-	defer func() {
-		if err == nil { // only recycle when no error
-			pool.Put(se)
-		}
-	}()
-	sctx := se.(sessionctx.Context)
-	if err := UpdateSCtxVarsForStats(sctx); err != nil { // update stats variables automatically
-		return err
-	}
-
-	wrapTxn := false
-	for _, flag := range flags {
-		if flag == FlagWrapTxn {
-			wrapTxn = true
-		}
-	}
-	if wrapTxn {
-		err = WrapTxn(sctx, f)
-	} else {
-		err = f(sctx)
-	}
-	return err
-}
-
-// UpdateSCtxVarsForStats updates all necessary variables that may affect the behavior of statistics.
-func UpdateSCtxVarsForStats(sctx sessionctx.Context) error {
-	// analyzer version
-	verInString, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion)
-	if err != nil {
-		return err
-	}
-	ver, err := strconv.ParseInt(verInString, 10, 64)
-	if err != nil {
-		return err
-	}
-	sctx.GetSessionVars().AnalyzeVersion = int(ver)
-
-	// enable historical stats
-	val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats)
-	if err != nil {
-		return err
-	}
-	sctx.GetSessionVars().EnableHistoricalStats = variable.TiDBOptOn(val)
-
-	// partition mode
-	pruneMode, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBPartitionPruneMode)
-	if err != nil {
-		return err
-	}
-	sctx.GetSessionVars().PartitionPruneMode.Store(pruneMode)
-
-	// enable analyze snapshot
-	analyzeSnapshot, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAnalyzeSnapshot)
-	if err != nil {
-		return err
-	}
-	sctx.GetSessionVars().EnableAnalyzeSnapshot = variable.TiDBOptOn(analyzeSnapshot)
-
-	// enable skip column types
-	val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes)
-	if err != nil {
-		return err
-	}
-	sctx.GetSessionVars().AnalyzeSkipColumnTypes = variable.ParseAnalyzeSkipColumnTypes(val)
-
-	// skip missing partition stats
-	val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats)
-	if err != nil {
-		return err
-	}
-	sctx.GetSessionVars().SkipMissingPartitionStats = variable.TiDBOptOn(val)
-	verInString, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency)
-	if err != nil {
-		return err
-	}
-	ver, err = strconv.ParseInt(verInString, 10, 64)
-	if err != nil {
-		return err
-	}
-	sctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver)
-	return nil
-}
-
-// WrapTxn wraps f() in a transaction so that the SQL statements in this operation share the same data visibility.
-func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) {
-	// TODO: check whether this sctx is already in a txn
-	if _, _, err := ExecRows(sctx, "begin"); err != nil {
-		return err
-	}
-	defer func() {
-		err = FinishTransaction(sctx, err)
-	}()
-	err = f(sctx)
-	return
-}
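
FinishTransaction plus WrapTxn form a small commit-or-rollback idiom that is easy to reuse outside TiDB; a standalone sketch of the same shape against database/sql (the helper names here are mine, not TiDB's):

package txnutil

import (
	"database/sql"
	"errors"
)

// finish commits when err is nil and rolls back otherwise, mirroring
// FinishTransaction above.
func finish(tx *sql.Tx, err error) error {
	if err == nil {
		return tx.Commit()
	}
	return errors.Join(err, tx.Rollback())
}

// withTxn wraps f in a transaction, mirroring WrapTxn: the deferred finish
// picks up f's error through the named return value.
func withTxn(db *sql.DB, f func(*sql.Tx) error) (err error) {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer func() { err = finish(tx, err) }()
	return f(tx)
}
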
-
-// GetStartTS gets the start ts from the current transaction.
-func GetStartTS(sctx sessionctx.Context) (uint64, error) {
-	txn, err := sctx.Txn(true)
-	if err != nil {
-		return 0, err
-	}
-	return txn.StartTS(), nil
-}
-
-// Exec is a helper function to execute SQL and return a RecordSet.
-func Exec(sctx sessionctx.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) {
-	sqlExec, ok := sctx.(sqlexec.SQLExecutor)
-	if !ok {
-		return nil, errors.Errorf("invalid sql executor")
-	}
-	// TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor
-	return sqlExec.ExecuteInternal(StatsCtx, sql, args...)
-}
-
-// ExecRows is a helper function to execute SQL and return rows and fields.
-func ExecRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
-	if intest.InTest {
-		if v := sctx.Value(mock.MockRestrictedSQLExecutorKey{}); v != nil {
-			return v.(*mock.MockRestrictedSQLExecutor).ExecRestrictedSQL(StatsCtx,
-				UseCurrentSessionOpt, sql, args...)
-		}
-	}
-
-	sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
-	if !ok {
-		return nil, nil, errors.Errorf("invalid sql executor")
-	}
-	return sqlExec.ExecRestrictedSQL(StatsCtx, UseCurrentSessionOpt, sql, args...)
-}
-
-// ExecWithOpts is a helper function to execute SQL and return rows and fields.
-func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
-	sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
-	if !ok {
-		return nil, nil, errors.Errorf("invalid sql executor")
-	}
-	return sqlExec.ExecRestrictedSQL(StatsCtx, opts, sql, args...)
-}
-
-// DurationToTS converts duration to timestamp.
-func DurationToTS(d time.Duration) uint64 {
-	return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0)
-}
-
-// GetFullTableName returns the full table name.
-func GetFullTableName(is infoschema.InfoSchema, tblInfo *model.TableInfo) string {
-	for _, schema := range is.AllSchemas() {
-		if t, err := is.TableByName(schema.Name, tblInfo.Name); err == nil {
-			if t.Meta().ID == tblInfo.ID {
-				return schema.Name.O + "." + tblInfo.Name.O
-			}
-		}
-	}
-	return strconv.FormatInt(tblInfo.ID, 10)
-}
-
-// JSONTable is used for dumping statistics.
-type JSONTable struct {
-	Columns           map[string]*JSONColumn `json:"columns"`
-	Indices           map[string]*JSONColumn `json:"indices"`
-	Partitions        map[string]*JSONTable  `json:"partitions"`
-	DatabaseName      string                 `json:"database_name"`
-	TableName         string                 `json:"table_name"`
-	ExtStats          []*JSONExtendedStats   `json:"ext_stats"`
-	Count             int64                  `json:"count"`
-	ModifyCount       int64                  `json:"modify_count"`
-	Version           uint64                 `json:"version"`
-	IsHistoricalStats bool                   `json:"is_historical_stats"`
-}
-
-// JSONExtendedStats is used for dumping extended statistics.
-type JSONExtendedStats struct {
-	StatsName  string  `json:"stats_name"`
-	StringVals string  `json:"string_vals"`
-	ColIDs     []int64 `json:"cols"`
-	ScalarVals float64 `json:"scalar_vals"`
-	Tp         uint8   `json:"type"`
-}
-
-// JSONColumn is used for dumping statistics.
-type JSONColumn struct {
-	Histogram *tipb.Histogram `json:"histogram"`
-	CMSketch  *tipb.CMSketch  `json:"cm_sketch"`
-	FMSketch  *tipb.FMSketch  `json:"fm_sketch"`
-	// StatsVer is a pointer here since old-version JSON files do not contain version information.
-	StatsVer          *int64  `json:"stats_ver"`
-	NullCount         int64   `json:"null_count"`
-	TotColSize        int64   `json:"tot_col_size"`
-	LastUpdateVersion uint64  `json:"last_update_version"`
-	Correlation       float64 `json:"correlation"`
-}
-
-// TotalMemoryUsage returns the total memory usage of this column.
-func (col *JSONColumn) TotalMemoryUsage() (size int64) {
-	if col.Histogram != nil {
-		size += int64(col.Histogram.Size())
-	}
-	if col.CMSketch != nil {
-		size += int64(col.CMSketch.Size())
-	}
-	if col.FMSketch != nil {
-		size += int64(col.FMSketch.Size())
-	}
-	return size
-}

From ed4be154870d57b4089b218de284c56eca7f480b Mon Sep 17 00:00:00 2001
From: Weizhen Wang
Date: Mon, 6 Nov 2023 16:47:16 +0800
Subject: [PATCH 3/3] update

Signed-off-by: Weizhen Wang
---
 statistics/handle/dump.go | 39 +++++++++++++++++++++++++++++++++++----
 1 file changed, 35 insertions(+), 4 deletions(-)

diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go
index 8448f1a2f0fc4..acc090215150a 100644
--- a/statistics/handle/dump.go
+++ b/statistics/handle/dump.go
@@ -19,6 +19,7 @@ import (
 	"compress/gzip"
 	"encoding/json"
 	"io"
+	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -32,6 +33,7 @@ import (
 	handle_metrics "github.com/pingcap/tidb/statistics/handle/metrics"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tidb/util/sqlexec"
 	"github.com/pingcap/tipb/go-tipb"
 	"go.uber.org/zap"
@@ -106,6 +108,20 @@ type jsonColumn struct {
 	StatsVer *int64 `json:"stats_ver"`
 }
 
+// TotalMemoryUsage returns the total memory usage of this column.
+func (col *jsonColumn) TotalMemoryUsage() (size int64) {
+	if col.Histogram != nil {
+		size += int64(col.Histogram.Size())
+	}
+	if col.CMSketch != nil {
+		size += int64(col.CMSketch.Size())
+	}
+	if col.FMSketch != nil {
+		size += int64(col.FMSketch.Size())
+	}
+	return size
+}
+
 func dumpJSONCol(hist *statistics.Histogram, CMSketch *statistics.CMSketch, topn *statistics.TopN, FMSketch *statistics.FMSketch, statsVer *int64) *jsonColumn {
 	jsonCol := &jsonColumn{
 		Histogram: statistics.HistogramToProto(hist),
@@ -218,7 +234,10 @@ func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.Table
 }
 
 // GenJSONTableFromStats generates jsonTable from tableInfo and stats
-func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *statistics.Table) (*JSONTable, error) {
+func GenJSONTableFromStats(sctx sessionctx.Context, dbName string, tableInfo *model.TableInfo, tbl *statistics.Table) (*JSONTable, error) {
+	tracker := memory.NewTracker(memory.LabelForAnalyzeMemory, -1)
+	tracker.AttachTo(sctx.GetSessionVars().MemTracker)
+	defer tracker.Detach()
 	jsonTbl := &JSONTable{
 		DatabaseName: dbName,
 		TableName:    tableInfo.Name.L,
@@ -234,11 +253,21 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		jsonTbl.Columns[col.Info.Name.L] = dumpJSONCol(hist, col.CMSketch, col.TopN, col.FMSketch, &col.StatsVer)
+		proto := dumpJSONCol(hist, col.CMSketch, col.TopN, col.FMSketch, &col.StatsVer)
+		tracker.Consume(proto.TotalMemoryUsage())
+		if atomic.LoadUint32(&sctx.GetSessionVars().Killed) == 1 {
+			return nil, errors.Trace(statistics.ErrQueryInterrupted)
+		}
+		jsonTbl.Columns[col.Info.Name.L] = proto
 	}
 	for _, idx := range tbl.Indices {
-		jsonTbl.Indices[idx.Info.Name.L] = dumpJSONCol(&idx.Histogram, idx.CMSketch, idx.TopN, nil, &idx.StatsVer)
+		proto := dumpJSONCol(&idx.Histogram, idx.CMSketch, idx.TopN, nil, &idx.StatsVer)
+		tracker.Consume(proto.TotalMemoryUsage())
+		if atomic.LoadUint32(&sctx.GetSessionVars().Killed) == 1 {
+			return nil, errors.Trace(statistics.ErrQueryInterrupted)
+		}
+		jsonTbl.Indices[idx.Info.Name.L] = proto
 	}
 	jsonTbl.ExtStats = dumpJSONExtendedStats(tbl.ExtendedStats)
 	return jsonTbl, nil
@@ -328,7 +357,9 @@ func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, phy
 	if err != nil {
 		return nil, err
 	}
-	jsonTbl, err := GenJSONTableFromStats(dbName, tableInfo, tbl)
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	jsonTbl, err := GenJSONTableFromStats(h.mu.ctx, dbName, tableInfo, tbl)
 	if err != nil {
 		return nil, err
 	}
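
The pattern PATCH 3/3 introduces — account for each serialized column, then poll a kill flag between items — generalizes beyond TiDB's memory.Tracker. A minimal sketch of the same loop shape with a plain byte counter and an atomic flag (all names here are illustrative, not TiDB's API):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errInterrupted = errors.New("query interrupted")

// dumpAll serializes items while tracking rough memory use and honoring a
// kill flag between items, like the column/index loops in the patch.
func dumpAll(items [][]byte, killed *atomic.Uint32) (out [][]byte, used int64, err error) {
	for _, it := range items {
		used += int64(len(it)) // stand-in for tracker.Consume(proto.TotalMemoryUsage())
		if killed.Load() == 1 { // stand-in for checking sessionVars.Killed
			return nil, used, errInterrupted
		}
		out = append(out, it)
	}
	return out, used, nil
}

func main() {
	var killed atomic.Uint32
	out, used, err := dumpAll([][]byte{[]byte("col-a"), []byte("idx-b")}, &killed)
	fmt.Println(len(out), used, err)
}
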