diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index 310de50149eeb..75d8c62518686 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -230,10 +230,9 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
 	builder.Request.TaskID = sv.StmtCtx.TaskID
 	builder.Request.Priority = builder.getKVPriority(sv)
 	builder.Request.ReplicaRead = sv.GetReplicaRead()
-	if sv.SnapshotInfoschema != nil {
-		builder.Request.SchemaVar = infoschema.GetInfoSchemaBySessionVars(sv).SchemaMetaVersion()
-	} else {
-		builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
+	// in tests, the infoschema may be nil
+	if is, ok := sv.GetInfoSchema().(infoschema.InfoSchema); ok {
+		builder.Request.SchemaVar = is.SchemaMetaVersion()
 	}
 	builder.txnScope = sv.TxnCtx.TxnScope
 	builder.IsStaleness = sv.TxnCtx.IsStaleness
diff --git a/executor/adapter.go b/executor/adapter.go
index 066a1c81aeb97..8808fa5b614a8 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -268,7 +268,7 @@ func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool {
 // RebuildPlan rebuilds current execute statement plan.
 // It returns the current information schema version that 'a' is using.
 func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
-	is := infoschema.GetInfoSchema(a.Ctx)
+	is := a.Ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	a.InfoSchema = is
 	if err := plannercore.Preprocess(a.Ctx, a.StmtNode, is, plannercore.InTxnRetry); err != nil {
 		return 0, err
diff --git a/executor/analyze.go b/executor/analyze.go
index 923e8e12b27a6..201e48926bc73 100644
--- a/executor/analyze.go
+++ b/executor/analyze.go
@@ -170,7 +170,11 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
 	}
 	if needGlobalStats {
 		for globalStatsID, info := range globalStatsMap {
+<<<<<<< HEAD
 			globalStats, err := statsHandle.MergePartitionStats2GlobalStats(e.ctx, e.opts, infoschema.GetInfoSchema(e.ctx), globalStatsID.tableID, info.isIndex, info.idxID)
+=======
+			globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(e.ctx, e.opts, e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema), globalStatsID.tableID, info.isIndex, info.idxID)
+>>>>>>> 5e9e0e6e3... *: consitent get infoschema (#24230)
 			if err != nil {
 				if types.ErrPartitionStatsMissing.Equal(err) {
 					// When we find some partition-level stats are missing, we need to report warning.
@@ -188,7 +192,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
 			}
 		}
 	}
-	return statsHandle.Update(infoschema.GetInfoSchema(e.ctx))
+	return statsHandle.Update(e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema))
 }
 
 func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) {
diff --git a/executor/analyze_test.go b/executor/analyze_test.go
index 30d5509f84c23..a63af80062f42 100644
--- a/executor/analyze_test.go
+++ b/executor/analyze_test.go
@@ -68,7 +68,7 @@ PARTITION BY RANGE ( a ) (
 	}
 	tk.MustExec("analyze table t")
 
-	is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
+	is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
 	c.Assert(err, IsNil)
 	pi := table.Meta().GetPartitionInfo()
@@ -95,7 +95,7 @@ PARTITION BY RANGE ( a ) (
 		tk.MustExec(fmt.Sprintf(`insert into t values (%d, %d, "hello")`, i, i))
 	}
 	tk.MustExec("alter table t analyze partition p0")
-	is = infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
+	is = tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	table, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
 	c.Assert(err, IsNil)
 	pi = table.Meta().GetPartitionInfo()
@@ -175,7 +175,7 @@ func (s *testSuite1) TestAnalyzeParameters(c *C) {
 	tk.MustExec("set @@tidb_enable_fast_analyze = 1")
 	tk.MustExec("analyze table t with 30 samples")
 
-	is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
+	is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
 	c.Assert(err, IsNil)
 	tableInfo := table.Meta()
@@ -226,7 +226,7 @@ func (s *testSuite1) TestAnalyzeTooLongColumns(c *C) {
 	tk.MustExec(fmt.Sprintf("insert into t values ('%s')", value))
 
 	tk.MustExec("analyze table t")
-	is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
+	is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
 	c.Assert(err, IsNil)
 	tableInfo := table.Meta()
@@ -258,7 +258,7 @@ func (s *testSuite1) TestAnalyzeIndexExtractTopN(c *C) {
 	tk.MustExec("set @@session.tidb_analyze_version=2")
 	tk.MustExec("analyze table t with 10 cmsketch width")
 
-	is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
+	is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
 	c.Assert(err, IsNil)
 	tableInfo := table.Meta()
@@ -434,7 +434,7 @@ func (s *testFastAnalyze) TestFastAnalyze(c *C) {
 	}
 	tk.MustExec("analyze table t with 5 buckets, 6 samples")
 
-	is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
+	is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
 	c.Assert(err, IsNil)
 	tableInfo := table.Meta()
diff --git a/executor/builder.go b/executor/builder.go
index d3793cac94f22..89f3970efe06a 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -3418,7 +3418,7 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
 		SetKeepOrder(e.keepOrder).
 		SetStreaming(e.streaming).
 		SetFromSessionVars(e.ctx.GetSessionVars()).
-		SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
+		SetFromInfoSchema(e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)).
 		Build()
 	if err != nil {
 		return nil, err
diff --git a/executor/compiler.go b/executor/compiler.go
index bb0f5274a159e..bb00bfe14602d 100644
--- a/executor/compiler.go
+++ b/executor/compiler.go
@@ -53,7 +53,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
 		ctx = opentracing.ContextWithSpan(ctx, span1)
 	}
 
-	infoSchema := infoschema.GetInfoSchema(c.Ctx)
+	infoSchema := c.Ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	if err := plannercore.Preprocess(c.Ctx, stmtNode, infoSchema); err != nil {
 		return nil, err
 	}
diff --git a/executor/coprocessor.go b/executor/coprocessor.go
index 25959e5454655..490b981add461 100644
--- a/executor/coprocessor.go
+++ b/executor/coprocessor.go
@@ -159,7 +159,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
 		return nil, errors.Trace(err)
 	}
 	h.dagReq = dagReq
-	is := h.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
+	is := h.sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	// Build physical plan.
 	bp := core.NewPBPlanBuilder(h.sctx, is, req.Ranges)
 	plan, err := bp.Build(dagReq.Executors)
diff --git a/executor/distsql.go b/executor/distsql.go
index 316697230f2db..fa2e7793c01db 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -277,16 +277,19 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
 	e.memTracker = memory.NewTracker(e.id, -1)
 	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
 	var builder distsql.RequestBuilder
-	kvReq, err := builder.SetKeyRanges(kvRanges).
+	builder.SetKeyRanges(kvRanges).
 		SetDAGRequest(e.dagPB).
 		SetStartTS(e.startTS).
 		SetDesc(e.desc).
 		SetKeepOrder(e.keepOrder).
 		SetStreaming(e.streaming).
 		SetFromSessionVars(e.ctx.GetSessionVars()).
-		SetMemTracker(e.memTracker).
-		SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
-		Build()
+		SetMemTracker(e.memTracker)
+	// in tests, the infoschema may be nil
+	if is, ok := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema); ok {
+		builder.SetFromInfoSchema(is)
+	}
+	kvReq, err := builder.Build()
 	if err != nil {
 		e.feedback.Invalidate()
 		return err
@@ -510,9 +513,70 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
 		if err != nil {
 			e.feedback.Invalidate()
 		}
+<<<<<<< HEAD
 		cancel()
 		if err := result.Close(); err != nil {
 			logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
+=======
+		var builder distsql.RequestBuilder
+		builder.SetDAGRequest(e.dagPB).
+			SetStartTS(e.startTS).
+			SetDesc(e.desc).
+			SetKeepOrder(e.keepOrder).
+			SetStreaming(e.indexStreaming).
+			SetFromSessionVars(e.ctx.GetSessionVars()).
+			SetMemTracker(tracker)
+		// in tests, the infoschema may be nil
+		if is, ok := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema); ok {
+			builder.SetFromInfoSchema(is)
+		}
+
+		for partTblIdx, kvRange := range kvRanges {
+			// check if executor is closed
+			finished := false
+			select {
+			case <-e.finished:
+				finished = true
+			default:
+			}
+			if finished {
+				break
+			}
+
+			// init kvReq, result and worker for this partition
+			kvReq, err := builder.SetKeyRanges(kvRange).Build()
+			if err != nil {
+				worker.syncErr(err)
+				break
+			}
+			result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans), idxID)
+			if err != nil {
+				worker.syncErr(err)
+				break
+			}
+			worker.batchSize = initBatchSize
+			if worker.batchSize > worker.maxBatchSize {
+				worker.batchSize = worker.maxBatchSize
+			}
+			if e.partitionTableMode {
+				worker.partitionTable = e.prunedPartitions[partTblIdx]
+			}
+
+			// fetch data from this partition
+			ctx1, cancel := context.WithCancel(ctx)
+			_, fetchErr := worker.fetchHandles(ctx1, result)
+			if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
+				e.feedback.Invalidate()
+			}
+			cancel()
+			if err := result.Close(); err != nil {
+				logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
+			}
+			e.ctx.StoreQueryFeedback(e.feedback)
+			if fetchErr != nil {
+				break // if any error occurs, exit after releasing all resources
+			}
+>>>>>>> 5e9e0e6e3... *: consitent get infoschema (#24230)
 		}
 		e.ctx.StoreQueryFeedback(e.feedback)
 		close(workCh)
diff --git a/executor/executor_test.go b/executor/executor_test.go
index e556a6dd94933..d6b0d305ea783 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -2335,7 +2335,7 @@ func (s *testSuiteP2) TestIsPointGet(c *C) {
 		"select * from help_topic where help_topic_id=1":    true,
 		"select * from help_topic where help_category_id=1": false,
 	}
-	infoSchema := infoschema.GetInfoSchema(ctx)
+	infoSchema := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 
 	for sqlStr, result := range tests {
 		stmtNode, err := s.ParseOneStmt(sqlStr, "", "")
@@ -2367,7 +2367,7 @@ func (s *testSuiteP2) TestClusteredIndexIsPointGet(c *C) {
 		"select * from t where a='x' and c='x'":         true,
 		"select * from t where a='x' and c='x' and b=1": false,
 	}
-	infoSchema := infoschema.GetInfoSchema(ctx)
+	infoSchema := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	for sqlStr, result := range tests {
 		stmtNode, err := s.ParseOneStmt(sqlStr, "", "")
 		c.Check(err, IsNil)
diff --git a/executor/grant.go b/executor/grant.go
index 06cb3811481fd..e46534f0e6791 100644
--- a/executor/grant.go
+++ b/executor/grant.go
@@ -72,7 +72,7 @@ func (e *GrantExec) Next(ctx context.Context, req *chunk.Chunk) error {
 	// Make sure the table exist.
 	if e.Level.Level == ast.GrantLevelTable {
 		dbNameStr := model.NewCIStr(dbName)
-		schema := infoschema.GetInfoSchema(e.ctx)
+		schema := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 		tbl, err := schema.TableByName(dbNameStr, model.NewCIStr(e.Level.TableName))
 		if err != nil {
 			return err
diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go
index bc3109179c7c2..01e8871cef6b4 100644
--- a/executor/index_merge_reader.go
+++ b/executor/index_merge_reader.go
@@ -240,7 +240,71 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
 		var err error
 		util.WithRecovery(
 			func() {
+<<<<<<< HEAD
 				_, err = worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols)
+=======
+				var builder distsql.RequestBuilder
+				builder.SetDAGRequest(e.dagPBs[workID]).
+					SetStartTS(e.startTS).
+					SetDesc(e.descs[workID]).
+					SetKeepOrder(false).
+					SetStreaming(e.partialStreamings[workID]).
+					SetFromSessionVars(e.ctx.GetSessionVars()).
+					SetMemTracker(e.memTracker).
+					SetFromInfoSchema(e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema))
+
+				worker := &partialIndexWorker{
+					stats:        e.stats,
+					idxID:        e.getPartitalPlanID(workID),
+					sc:           e.ctx,
+					batchSize:    e.maxChunkSize,
+					maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
+					maxChunkSize: e.maxChunkSize,
+				}
+
+				for parTblIdx, keyRange := range keyRanges {
+					// check if this executor is closed
+					select {
+					case <-e.finished:
+						break
+					default:
+					}
+
+					// init kvReq and worker for this partition
+					kvReq, err := builder.SetKeyRanges(keyRange).Build()
+					if err != nil {
+						worker.syncErr(e.resultCh, err)
+						return
+					}
+					result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID))
+					if err != nil {
+						worker.syncErr(e.resultCh, err)
+						return
+					}
+					worker.batchSize = e.maxChunkSize
+					if worker.batchSize > worker.maxBatchSize {
+						worker.batchSize = worker.maxBatchSize
+					}
+					if e.partitionTableMode {
+						worker.partition = e.prunedPartitions[parTblIdx]
+					}
+
+					// fetch all data from this partition
+					ctx1, cancel := context.WithCancel(ctx)
+					_, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols)
+					if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
+						e.feedbacks[workID].Invalidate()
+					}
+					if err := result.Close(); err != nil {
+						logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err))
+					}
+					cancel()
+					e.ctx.StoreQueryFeedback(e.feedbacks[workID])
+					if fetchErr != nil {
+						break
+					}
+				}
+>>>>>>> 5e9e0e6e3... *: consitent get infoschema (#24230)
 			},
 			e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialIndexWorker"),
 		)
diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go
index fdd0e843b556f..10fa0e536d6e1 100644
--- a/executor/infoschema_reader.go
+++ b/executor/infoschema_reader.go
@@ -77,7 +77,7 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
 
 	// Cache the ret full rows in schemataRetriever
 	if !e.initialized {
-		is := infoschema.GetInfoSchema(sctx)
+		is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 		dbs := is.AllSchemas()
 		sort.Sort(infoschema.SchemasSorter(dbs))
 		var err error
@@ -293,7 +293,7 @@ func (c *statsCache) get(ctx sessionctx.Context) (map[int64]uint64, map[tableHis
 }
 
 func getAutoIncrementID(ctx sessionctx.Context, schema *model.DBInfo, tblInfo *model.TableInfo) (int64, error) {
-	is := infoschema.GetInfoSchema(ctx)
+	is := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	tbl, err := is.TableByName(schema.Name, tblInfo.Name)
 	if err != nil {
 		return 0, err
@@ -581,7 +581,7 @@ func (e *hugeMemTableRetriever) setDataForColumns(ctx context.Context, sctx sess
 }
 
 func (e *hugeMemTableRetriever) dataForColumnsInTable(ctx context.Context, sctx sessionctx.Context, schema *model.DBInfo, tbl *model.TableInfo) {
-	if err := tryFillViewColumnType(ctx, sctx, infoschema.GetInfoSchema(sctx), schema.Name, tbl); err != nil {
+	if err := tryFillViewColumnType(ctx, sctx, sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema), schema.Name, tbl); err != nil {
 		sctx.GetSessionVars().StmtCtx.AppendWarning(err)
 		return
 	}
@@ -1307,7 +1307,7 @@ func (e *memtableRetriever) setDataForTiKVRegionStatus(ctx sessionctx.Context) e
 	if err != nil {
 		return err
 	}
-	allSchemas := ctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema).AllSchemas()
+	allSchemas := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema).AllSchemas()
 	tableInfos := tikvHelper.GetRegionsTableInfo(regionsInfo, allSchemas)
 	for _, region := range regionsInfo.Regions {
 		tableList := tableInfos[region.ID]
@@ -1419,7 +1419,7 @@ func (e *memtableRetriever) setDataForTiDBHotRegions(ctx sessionctx.Context) err
 	if !ok {
 		return errors.New("Information about hot region can be gotten only when the storage is TiKV")
 	}
-	allSchemas := ctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema).AllSchemas()
+	allSchemas := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema).AllSchemas()
 	tikvHelper := &helper.Helper{
 		Store:       tikvStore,
 		RegionCache: tikvStore.GetRegionCache(),
@@ -1568,7 +1568,7 @@ type initialTable struct {
 }
 
 func (e *tableStorageStatsRetriever) initialize(sctx sessionctx.Context) error {
-	is := infoschema.GetInfoSchema(sctx)
+	is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	var databases []string
 	schemas := e.extractor.TableSchema
 	tables := e.extractor.TableName
@@ -1848,7 +1848,7 @@ func (e *memtableRetriever) setDataForStatementsSummary(ctx sessionctx.Context,
 
 func (e *memtableRetriever) setDataForPlacementPolicy(ctx sessionctx.Context) error {
 	checker := privilege.GetPrivilegeManager(ctx)
-	is := infoschema.GetInfoSchema(ctx)
+	is := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	var rows [][]types.Datum
 	for _, bundle := range is.RuleBundles() {
 		id, err := placement.ObjectIDFromGroupID(bundle.ID)
@@ -1985,7 +1985,7 @@ func (e *hugeMemTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
 	}
 
 	if !e.initialized {
-		is := infoschema.GetInfoSchema(sctx)
+		is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 		dbs := is.AllSchemas()
 		sort.Sort(infoschema.SchemasSorter(dbs))
 		e.dbs = dbs
diff --git a/executor/load_stats.go b/executor/load_stats.go
index 83fbb3ad188f7..984f649e1291f 100644
--- a/executor/load_stats.go
+++ b/executor/load_stats.go
@@ -86,5 +86,5 @@ func (e *LoadStatsInfo) Update(data []byte) error {
 	if h == nil {
 		return errors.New("Load Stats: handle is nil")
 	}
-	return h.LoadStatsFromJSON(infoschema.GetInfoSchema(e.Ctx), jsonTbl)
+	return h.LoadStatsFromJSON(e.Ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema), jsonTbl)
 }
diff --git a/executor/metrics_reader_test.go b/executor/metrics_reader_test.go
index 19000b4faee5d..662c3b917ce9c 100644
--- a/executor/metrics_reader_test.go
+++ b/executor/metrics_reader_test.go
@@ -62,7 +62,7 @@ func (s *testSuite7) TestStmtLabel(c *C) {
 	for _, tt := range tests {
 		stmtNode, err := parser.New().ParseOneStmt(tt.sql, "", "")
 		c.Check(err, IsNil)
-		is := infoschema.GetInfoSchema(tk.Se)
+		is := tk.Se.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 		err = plannercore.Preprocess(tk.Se.(sessionctx.Context), stmtNode, is)
 		c.Assert(err, IsNil)
 		_, _, err = planner.Optimize(context.TODO(), tk.Se, stmtNode, is)
diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go
index 5abbfe2336757..31e3278d4eb29 100644
--- a/executor/partition_table_test.go
+++ b/executor/partition_table_test.go
@@ -204,7 +204,7 @@ func (s *partitionTableSuite) TestPartitionInfoDisable(c *C) {
 		PARTITION p202010 VALUES LESS THAN ("2020-11-01"),
 		PARTITION p202011 VALUES LESS THAN ("2020-12-01")
 	)`)
-	is := infoschema.GetInfoSchema(tk.Se)
+	is := tk.Se.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t_info_null"))
 	c.Assert(err, IsNil)
 
diff --git a/executor/point_get.go b/executor/point_get.go
index 704f3b6ba5fc2..01c8ab9198b3a 100644
--- a/executor/point_get.go
+++ b/executor/point_get.go
@@ -397,7 +397,7 @@ func (e *PointGetExecutor) verifyTxnScope() error {
 	var tblID int64
 	var tblName string
 	var partName string
-	is := infoschema.GetInfoSchema(e.ctx)
+	is := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	if e.partInfo != nil {
 		tblID = e.partInfo.ID
 		tblInfo, _, partInfo := is.FindTableByPartitionID(tblID)
diff --git a/executor/prepared.go b/executor/prepared.go
index c5fdd5c1bf404..448ee3b7fdc66 100644
--- a/executor/prepared.go
+++ b/executor/prepared.go
@@ -320,7 +320,7 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
 		return nil, false, false, err
 	}
 	execStmt.BinaryArgs = args
-	is := infoschema.GetInfoSchema(sctx)
+	is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
 	if err != nil {
 		return nil, false, false, err
diff --git a/executor/simple.go b/executor/simple.go
index f0d3135d21e6e..6bfe4ca80d37f 100644
--- a/executor/simple.go
+++ b/executor/simple.go
@@ -1367,7 +1367,7 @@ func (e *SimpleExec) executeDropStats(s *ast.DropStatsStmt) (err error) {
 	if err := h.DeleteTableStatsFromKV(statsIDs); err != nil {
 		return err
 	}
-	return h.Update(infoschema.GetInfoSchema(e.ctx))
+	return h.Update(e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema))
 }
 
 func (e *SimpleExec) autoNewTxn() bool {
diff --git a/executor/table_reader.go b/executor/table_reader.go
index dea1d128559a5..5ca29092c8e4e 100644
--- a/executor/table_reader.go
+++ b/executor/table_reader.go
@@ -221,7 +221,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
 	} else {
 		reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
 	}
-	kvReq, err := reqBuilder.
+	reqBuilder.
 		SetDAGRequest(e.dagPB).
 		SetStartTS(e.startTS).
 		SetDesc(e.desc).
@@ -230,9 +230,12 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
 		SetFromSessionVars(e.ctx.GetSessionVars()).
 		SetMemTracker(e.memTracker).
 		SetStoreType(e.storeType).
-		SetAllowBatchCop(e.batchCop).
-		SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
-		Build()
+		SetAllowBatchCop(e.batchCop)
+	// in tests, the infoschema may be nil
+	if is, ok := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema); ok {
+		reqBuilder.SetFromInfoSchema(is)
+	}
+	kvReq, err := reqBuilder.Build()
 	if err != nil {
 		return nil, err
 	}
diff --git a/expression/builtin_info.go b/expression/builtin_info.go
index 6a41b20ef75af..fda57a884f1d8 100644
--- a/expression/builtin_info.go
+++ b/expression/builtin_info.go
@@ -847,7 +847,7 @@ func (b *builtinNextValSig) evalInt(row chunk.Row) (int64, bool, error) {
 		db = b.ctx.GetSessionVars().CurrentDB
 	}
 	// Check the tableName valid.
-	sequence, err := b.ctx.GetSessionVars().TxnCtx.InfoSchema.(util.SequenceSchema).SequenceByName(model.NewCIStr(db), model.NewCIStr(seq))
+	sequence, err := b.ctx.GetSessionVars().GetInfoSchema().(util.SequenceSchema).SequenceByName(model.NewCIStr(db), model.NewCIStr(seq))
 	if err != nil {
 		return 0, false, err
 	}
@@ -903,7 +903,7 @@ func (b *builtinLastValSig) evalInt(row chunk.Row) (int64, bool, error) {
 		db = b.ctx.GetSessionVars().CurrentDB
 	}
 	// Check the tableName valid.
-	sequence, err := b.ctx.GetSessionVars().TxnCtx.InfoSchema.(util.SequenceSchema).SequenceByName(model.NewCIStr(db), model.NewCIStr(seq))
+	sequence, err := b.ctx.GetSessionVars().GetInfoSchema().(util.SequenceSchema).SequenceByName(model.NewCIStr(db), model.NewCIStr(seq))
 	if err != nil {
 		return 0, false, err
 	}
@@ -953,7 +953,7 @@ func (b *builtinSetValSig) evalInt(row chunk.Row) (int64, bool, error) {
 		db = b.ctx.GetSessionVars().CurrentDB
 	}
 	// Check the tableName valid.
-	sequence, err := b.ctx.GetSessionVars().TxnCtx.InfoSchema.(util.SequenceSchema).SequenceByName(model.NewCIStr(db), model.NewCIStr(seq))
+	sequence, err := b.ctx.GetSessionVars().GetInfoSchema().(util.SequenceSchema).SequenceByName(model.NewCIStr(db), model.NewCIStr(seq))
 	if err != nil {
 		return 0, false, err
 	}
diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go
index 4fcbdc042de85..ac8afd14605f1 100644
--- a/infoschema/infoschema.go
+++ b/infoschema/infoschema.go
@@ -24,12 +24,8 @@ import (
 	"github.com/pingcap/tidb/ddl/placement"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta/autoid"
-	"github.com/pingcap/tidb/sessionctx"
-	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/util"
-	"github.com/pingcap/tidb/util/logutil"
-	"go.uber.org/zap"
 )
 
 // InfoSchema is the interface used to retrieve the schema information.
@@ -386,28 +382,6 @@ func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) {
 	return false, ""
 }
 
-// GetInfoSchema gets TxnCtx InfoSchema if snapshot schema is not set,
-// Otherwise, snapshot schema is returned.
-func GetInfoSchema(ctx sessionctx.Context) InfoSchema {
-	return GetInfoSchemaBySessionVars(ctx.GetSessionVars())
-}
-
-// GetInfoSchemaBySessionVars gets TxnCtx InfoSchema if snapshot schema is not set,
-// Otherwise, snapshot schema is returned.
-func GetInfoSchemaBySessionVars(sessVar *variable.SessionVars) InfoSchema {
-	var is InfoSchema
-	if snap := sessVar.SnapshotInfoschema; snap != nil {
-		is = snap.(InfoSchema)
-		logutil.BgLogger().Info("use snapshot schema", zap.Uint64("conn", sessVar.ConnectionID), zap.Int64("schemaVersion", is.SchemaMetaVersion()))
-	} else {
-		if sessVar.TxnCtx == nil || sessVar.TxnCtx.InfoSchema == nil {
-			return nil
-		}
-		is = sessVar.TxnCtx.InfoSchema.(InfoSchema)
-	}
-	return is
-}
-
 func (is *infoSchema) BundleByName(name string) (*placement.Bundle, bool) {
 	is.ruleBundleMutex.RLock()
 	defer is.ruleBundleMutex.RUnlock()
diff --git a/infoschema/tables.go b/infoschema/tables.go
index 9d48b67cf0189..882a893e0fd3f 100644
--- a/infoschema/tables.go
+++ b/infoschema/tables.go
@@ -1736,7 +1736,7 @@ func (s SchemasSorter) Less(i, j int) bool {
 }
 
 func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column) (fullRows [][]types.Datum, err error) {
-	is := GetInfoSchema(ctx)
+	is := ctx.GetSessionVars().GetInfoSchema().(InfoSchema)
 	dbs := is.AllSchemas()
 	sort.Sort(SchemasSorter(dbs))
 	switch it.meta.Name.O {
diff --git a/planner/core/cacheable_checker_test.go b/planner/core/cacheable_checker_test.go
index eb33790dfd74f..fb9d05d528ec0 100644
--- a/planner/core/cacheable_checker_test.go
+++ b/planner/core/cacheable_checker_test.go
@@ -42,7 +42,7 @@ func (s *testCacheableSuite) TestCacheable(c *C) {
 	tk.MustExec("create table t2(a int, b int) partition by hash(a) partitions 11")
 	tk.MustExec("create table t3(a int, b int)")
 	tbl := &ast.TableName{Schema: model.NewCIStr("test"), Name: model.NewCIStr("t3")}
-	is := infoschema.GetInfoSchema(tk.Se)
+	is := tk.Se.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	// test non-SelectStmt/-InsertStmt/-DeleteStmt/-UpdateStmt/-SetOprStmt
 	var stmt ast.Node = &ast.ShowStmt{}
 	c.Assert(core.Cacheable(stmt, is), IsFalse)
diff --git a/planner/core/explain.go b/planner/core/explain.go
index e3e7e4e06d0b3..913b4a88b5dac 100644
--- a/planner/core/explain.go
+++ b/planner/core/explain.go
@@ -306,7 +306,7 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) string {
 		return ""
 	}
 
-	is := infoschema.GetInfoSchema(sctx)
+	is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	tmp, ok := is.TableByID(ts.Table.ID)
 	if !ok {
 		return "partition table not found" + strconv.FormatInt(ts.Table.ID, 10)
@@ -366,7 +366,7 @@ func (p *PhysicalIndexReader) accessObject(sctx sessionctx.Context) string {
 	}
 
 	var buffer bytes.Buffer
-	is := infoschema.GetInfoSchema(sctx)
+	is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	tmp, ok := is.TableByID(ts.Table.ID)
 	if !ok {
 		fmt.Fprintf(&buffer, "partition table not found: %d", ts.Table.ID)
@@ -394,7 +394,7 @@ func (p *PhysicalIndexLookUpReader) accessObject(sctx sessionctx.Context) string
 	}
 
 	var buffer bytes.Buffer
-	is := infoschema.GetInfoSchema(sctx)
+	is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	tmp, ok := is.TableByID(ts.Table.ID)
 	if !ok {
 		fmt.Fprintf(&buffer, "partition table not found: %d", ts.Table.ID)
@@ -417,7 +417,7 @@ func (p *PhysicalIndexMergeReader) accessObject(sctx sessionctx.Context) string
 		return ""
 	}
 
-	is := infoschema.GetInfoSchema(sctx)
+	is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	tmp, ok := is.TableByID(ts.Table.ID)
 	if !ok {
 		return "partition table not found" + strconv.FormatInt(ts.Table.ID, 10)
diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go
index f10134bb849c2..7e53d56f42811 100644
--- a/planner/core/expression_rewriter.go
+++ b/planner/core/expression_rewriter.go
@@ -63,8 +63,9 @@ func evalAstExpr(sctx sessionctx.Context, expr ast.ExprNode) (types.Datum, error
 // rewriteAstExpr rewrites ast expression directly.
 func rewriteAstExpr(sctx sessionctx.Context, expr ast.ExprNode, schema *expression.Schema, names types.NameSlice) (expression.Expression, error) {
 	var is infoschema.InfoSchema
-	if sctx.GetSessionVars().TxnCtx.InfoSchema != nil {
-		is = sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
+	// in tests, the infoschema may be nil
+	if s, ok := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema); ok {
+		is = s
 	}
 	b, savedBlockNames := NewPlanBuilder(sctx, is, &hint.BlockHintProcessor{})
 	fakePlan := LogicalTableDual{}.Init(sctx, 0)
diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go
index 4c85cb06582a8..08ee210f17966 100644
--- a/planner/core/integration_test.go
+++ b/planner/core/integration_test.go
@@ -1110,7 +1110,7 @@ func (s *testIntegrationSuite) TestPartitionPruningForEQ(c *C) {
 	tk.MustExec("drop table if exists t")
 	tk.MustExec("create table t(a datetime, b int) partition by range(weekday(a)) (partition p0 values less than(10), partition p1 values less than (100))")
 
-	is := infoschema.GetInfoSchema(tk.Se)
+	is := tk.Se.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
 	c.Assert(err, IsNil)
 	pt := tbl.(table.PartitionedTable)
diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go
index 6f3d524ca7490..b16a0be69c0c8 100644
--- a/planner/core/point_get_plan.go
+++ b/planner/core/point_get_plan.go
@@ -976,7 +976,7 @@ func checkFastPlanPrivilege(ctx sessionctx.Context, dbName, tableName string, ch
 		})
 	}
 
-	infoSchema := infoschema.GetInfoSchema(ctx)
+	infoSchema := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	return CheckTableLock(ctx, infoSchema, visitInfos)
 }
 
@@ -1282,7 +1282,7 @@ func buildPointUpdatePlan(ctx sessionctx.Context, pointPlan PhysicalPlan, dbName
 		VirtualAssignmentsOffset: len(orderedList),
 	}.Init(ctx)
 	updatePlan.names = pointPlan.OutputNames()
-	is := infoschema.GetInfoSchema(ctx)
+	is := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	t, _ := is.TableByID(tbl.ID)
 	updatePlan.tblID2Table = map[int64]table.Table{
 		tbl.ID: t,
@@ -1458,7 +1458,7 @@ func getHashPartitionColumnName(ctx sessionctx.Context, tbl *model.TableInfo) *a
 	if pi.Type != model.PartitionTypeHash {
 		return nil
 	}
-	is := infoschema.GetInfoSchema(ctx)
+	is := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	table, ok := is.TableByID(tbl.ID)
 	if !ok {
 		return nil
diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go
index 9a5967d438506..720dc32609463 100644
--- a/planner/core/prepare_test.go
+++ b/planner/core/prepare_test.go
@@ -195,7 +195,7 @@ func (s *testPlanSerialSuite) TestPrepareCacheDeferredFunction(c *C) {
 	for i := 0; i < 2; i++ {
 		stmt, err := s.ParseOneStmt(sql1, "", "")
 		c.Check(err, IsNil)
-		is := tk.Se.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
+		is := tk.Se.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 		builder, _ := core.NewPlanBuilder(tk.Se, is, &hint.BlockHintProcessor{})
 		p, err := builder.Build(ctx, stmt)
 		c.Check(err, IsNil)
diff --git a/session/session.go b/session/session.go
index 96f0538acaf61..e99f26901743d 100644
--- a/session/session.go
+++ b/session/session.go
@@ -401,7 +401,7 @@ func (s *session) StoreIndexUsage(tblID int64, idxID int64, rowsSelected int64)
 
 // FieldList returns fields list of a table.
 func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) {
-	is := infoschema.GetInfoSchema(s)
+	is := s.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	dbName := model.NewCIStr(s.GetSessionVars().CurrentDB)
 	tName := model.NewCIStr(tableName)
 	pm := privilege.GetPrivilegeManager(s)
@@ -1602,7 +1602,7 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
 	// So we have to call PrepareTxnCtx here.
 	s.PrepareTxnCtx(ctx)
 	s.PrepareTSFuture(ctx)
-	prepareExec := executor.NewPrepareExec(s, infoschema.GetInfoSchema(s), sql)
+	prepareExec := executor.NewPrepareExec(s, s.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema), sql)
 	err = prepareExec.Next(ctx, nil)
 	if err != nil {
 		return
@@ -1643,7 +1643,7 @@ func (s *session) cachedPlanExec(ctx context.Context,
 	if prepareStmt.ForUpdateRead {
 		is = domain.GetDomain(s).InfoSchema()
 	} else {
-		is = infoschema.GetInfoSchema(s)
+		is = s.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	}
 	execAst := &ast.ExecuteStmt{ExecID: stmtID}
 	if err := executor.ResetContextOfStmt(s, execAst); err != nil {
@@ -1723,7 +1723,7 @@ func (s *session) IsCachedExecOk(ctx context.Context, preparedStmt *plannercore.
 		return false, nil
 	}
 	// check schema version
-	is := infoschema.GetInfoSchema(s)
+	is := s.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	if prepared.SchemaVersion != is.SchemaMetaVersion() {
 		prepared.CachedPlan = nil
 		return false, nil
@@ -2820,7 +2820,7 @@ func (s *session) checkPlacementPolicyBeforeCommit() error {
 		txnScope = oracle.GlobalTxnScope
 	}
 	if txnScope != oracle.GlobalTxnScope {
-		is := infoschema.GetInfoSchema(s)
+		is := s.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 		deltaMap := s.GetSessionVars().TxnCtx.TableDeltaMap
 		for physicalTableID := range deltaMap {
 			var tableName string
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 82546fd553d65..69e2933204d9a 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -54,6 +54,7 @@ import (
 	"github.com/pingcap/tidb/util/timeutil"
 	"github.com/twmb/murmur3"
 	atomic2 "go.uber.org/atomic"
+	"go.uber.org/zap"
 )
 
 // PreparedStmtCount is exported for test.
@@ -862,6 +863,23 @@ func (s *SessionVars) BuildParserConfig() parser.ParserConfig {
 	}
 }
 
+// GetInfoSchema returns the snapshot infoschema if a snapshot schema is set.
+// Otherwise, the transaction infoschema is returned.
+// It returns nil if there is no available infoschema.
+func (s *SessionVars) GetInfoSchema() interface{} {
+	type IS interface {
+		SchemaMetaVersion() int64
+	}
+	if snap, ok := s.SnapshotInfoschema.(IS); ok {
+		logutil.BgLogger().Info("use snapshot schema", zap.Uint64("conn", s.ConnectionID), zap.Int64("schemaVersion", snap.SchemaMetaVersion()))
+		return snap
+	}
+	if s.TxnCtx != nil && s.TxnCtx.InfoSchema != nil {
+		return s.TxnCtx.InfoSchema
+	}
+	return nil
+}
+
 // PartitionPruneMode presents the prune mode used.
 type PartitionPruneMode string
diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go
index 4004fd1145c49..0fca7d607d5d2 100644
--- a/statistics/handle/ddl.go
+++ b/statistics/handle/ddl.go
@@ -52,8 +52,81 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
 			}
 		}
 	}
+<<<<<<< HEAD
 	if pruneMode == variable.Dynamic {
 		// TODO: need trigger full analyze
+=======
+	}
+	return nil
+}
+
+// analyzeOptionDefault saves the default values of NumBuckets and NumTopN.
+// These values will be used in dynamic mode when we drop table partition and then need to merge global-stats.
+// These values originally came from the analyzeOptionDefault structure in the planner/core/planbuilder.go file.
+var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{
+	ast.AnalyzeOptNumBuckets: 256,
+	ast.AnalyzeOptNumTopN:    20,
+}
+
+// updateGlobalStats will trigger the merge of global-stats when we drop table partition
+func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
+	// We need to merge the partition-level stats to global-stats when we drop table partition in dynamic mode.
+	tableID := tblInfo.ID
+	is := h.mu.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
+	globalStats, err := h.TableStatsFromStorage(tblInfo, tableID, true, 0)
+	if err != nil {
+		return err
+	}
+	// If we do not currently have global-stats, no new global-stats will be generated.
+	if globalStats == nil {
+		return nil
+	}
+	opts := make(map[ast.AnalyzeOptionType]uint64, len(analyzeOptionDefault))
+	for key, val := range analyzeOptionDefault {
+		opts[key] = val
+	}
+	// Use current global-stats related information to construct the opts for `MergePartitionStats2GlobalStats` function.
+	globalColStatsTopNNum, globalColStatsBucketNum := 0, 0
+	for colID := range globalStats.Columns {
+		globalColStatsTopN := globalStats.Columns[colID].TopN
+		if globalColStatsTopN != nil && len(globalColStatsTopN.TopN) > globalColStatsTopNNum {
+			globalColStatsTopNNum = len(globalColStatsTopN.TopN)
+		}
+		globalColStats := globalStats.Columns[colID]
+		if globalColStats != nil && len(globalColStats.Buckets) > globalColStatsBucketNum {
+			globalColStatsBucketNum = len(globalColStats.Buckets)
+		}
+	}
+	if globalColStatsTopNNum != 0 {
+		opts[ast.AnalyzeOptNumTopN] = uint64(globalColStatsTopNNum)
+	}
+	if globalColStatsBucketNum != 0 {
+		opts[ast.AnalyzeOptNumBuckets] = uint64(globalColStatsBucketNum)
+	}
+	// Generate the new column global-stats
+	newColGlobalStats, err := h.mergePartitionStats2GlobalStats(h.mu.ctx, opts, is, tblInfo, 0, 0)
+	if err != nil {
+		return err
+	}
+	for i := 0; i < newColGlobalStats.Num; i++ {
+		hg, cms, topN, fms := newColGlobalStats.Hg[i], newColGlobalStats.Cms[i], newColGlobalStats.TopN[i], newColGlobalStats.Fms[i]
+		err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, 0, hg, cms, topN, fms, 2, 1)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Generate the new index global-stats
+	globalIdxStatsTopNNum, globalIdxStatsBucketNum := 0, 0
+	for idx := range tblInfo.Indices {
+		globalIdxStatsTopN := globalStats.Indices[int64(idx)].TopN
+		if globalIdxStatsTopN != nil && len(globalIdxStatsTopN.TopN) > globalIdxStatsTopNNum {
+			globalIdxStatsTopNNum = len(globalIdxStatsTopN.TopN)
+		}
+		globalIdxStats := globalStats.Indices[int64(idx)]
+		if globalIdxStats != nil && len(globalIdxStats.Buckets) > globalIdxStatsBucketNum {
+			globalIdxStatsBucketNum = len(globalIdxStats.Buckets)
+>>>>>>> 5e9e0e6e3... *: consitent get infoschema (#24230)
 		}
 	case model.ActionDropTablePartition:
 		pruneMode := h.CurrentPruneMode()
diff --git a/statistics/handle/update.go b/statistics/handle/update.go
index c65f0885877f6..6f472fc61fdc7 100644
--- a/statistics/handle/update.go
+++ b/statistics/handle/update.go
@@ -480,7 +480,7 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
 	affectedRows := h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows()
 
 	// if it's a partitioned table and its global-stats exists, update its count and modify_count as well.
-	is := infoschema.GetInfoSchema(h.mu.ctx)
+	is := h.mu.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
 	if is == nil {
 		return false, errors.New("cannot get the information schema")
 	}
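For reviewers, a minimal self-contained sketch of the lookup behavior this patch centralizes in `SessionVars.GetInfoSchema()`: the snapshot schema, if set, wins over the transaction schema, and callers use a two-value type assertion where the infoschema may legitimately be absent (for example in tests). The `sessionVars`, `schema`, and `InfoSchema` names below are toy stand-ins for illustration only, not TiDB APIs.

```go
package main

import "fmt"

// InfoSchema stands in for infoschema.InfoSchema in this sketch.
type InfoSchema interface {
	SchemaMetaVersion() int64
}

type schema struct{ version int64 }

func (s schema) SchemaMetaVersion() int64 { return s.version }

// sessionVars mirrors the lookup order of SessionVars.GetInfoSchema():
// a snapshot schema, if set, wins over the transaction schema; otherwise nil.
type sessionVars struct {
	snapshotInfoschema interface{}
	txnInfoschema      interface{}
}

func (s *sessionVars) GetInfoSchema() interface{} {
	if snap, ok := s.snapshotInfoschema.(InfoSchema); ok {
		return snap
	}
	if s.txnInfoschema != nil {
		return s.txnInfoschema
	}
	return nil
}

func main() {
	sv := &sessionVars{txnInfoschema: schema{version: 7}}

	// Strict call sites assert directly; this panics if no infoschema is attached.
	is := sv.GetInfoSchema().(InfoSchema)
	fmt.Println("txn schema version:", is.SchemaMetaVersion())

	// Call sites that may run without an infoschema (e.g. in tests) use the
	// two-value assertion and simply skip the dependent step when it is nil.
	empty := &sessionVars{}
	if is, ok := empty.GetInfoSchema().(InfoSchema); ok {
		fmt.Println("unexpected:", is.SchemaMetaVersion())
	} else {
		fmt.Println("no infoschema attached; skipping")
	}
}
```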