Skip to content

Commit

Permalink
cherry pick pingcap#24230 to release-5.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
xhebox authored and ti-srebot committed May 10, 2021
1 parent 1f4105e commit 85737b5
Show file tree
Hide file tree
Showing 32 changed files with 288 additions and 88 deletions.
7 changes: 3 additions & 4 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,9 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.GetReplicaRead()
if sv.SnapshotInfoschema != nil {
builder.Request.SchemaVar = infoschema.GetInfoSchemaBySessionVars(sv).SchemaMetaVersion()
} else {
builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
// in tests, it may be null
if is, ok := sv.GetInfoSchema().(infoschema.InfoSchema); ok {
builder.Request.SchemaVar = is.SchemaMetaVersion()
}
builder.txnScope = sv.TxnCtx.TxnScope
builder.IsStaleness = sv.TxnCtx.IsStaleness
Expand Down
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool {
// RebuildPlan rebuilds current execute statement plan.
// It returns the current information schema version that 'a' is using.
func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
is := infoschema.GetInfoSchema(a.Ctx)
is := a.Ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
a.InfoSchema = is
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, is, plannercore.InTxnRetry); err != nil {
return 0, err
Expand Down
6 changes: 5 additions & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
if needGlobalStats {
for globalStatsID, info := range globalStatsMap {
globalStats, err := statsHandle.MergePartitionStats2GlobalStats(e.ctx, e.opts, e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema), globalStatsID.tableID, info.isIndex, info.idxID)
if err != nil {
if types.ErrPartitionStatsMissing.Equal(err) {
// When we find some partition-level stats are missing, we need to report warning.
Expand All @@ -188,7 +192,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
}
}
return statsHandle.Update(infoschema.GetInfoSchema(e.ctx))
return statsHandle.Update(e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema))
}

func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) {
Expand Down
12 changes: 6 additions & 6 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ PARTITION BY RANGE ( a ) (
}
tk.MustExec("analyze table t")

is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
pi := table.Meta().GetPartitionInfo()
Expand All @@ -95,7 +95,7 @@ PARTITION BY RANGE ( a ) (
tk.MustExec(fmt.Sprintf(`insert into t values (%d, %d, "hello")`, i, i))
}
tk.MustExec("alter table t analyze partition p0")
is = infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
is = tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
table, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
pi = table.Meta().GetPartitionInfo()
Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *testSuite1) TestAnalyzeParameters(c *C) {

tk.MustExec("set @@tidb_enable_fast_analyze = 1")
tk.MustExec("analyze table t with 30 samples")
is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := table.Meta()
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s *testSuite1) TestAnalyzeTooLongColumns(c *C) {
tk.MustExec(fmt.Sprintf("insert into t values ('%s')", value))

tk.MustExec("analyze table t")
is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := table.Meta()
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *testSuite1) TestAnalyzeIndexExtractTopN(c *C) {
tk.MustExec("set @@session.tidb_analyze_version=2")
tk.MustExec("analyze table t with 10 cmsketch width")

is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := table.Meta()
Expand Down Expand Up @@ -434,7 +434,7 @@ func (s *testFastAnalyze) TestFastAnalyze(c *C) {
}
tk.MustExec("analyze table t with 5 buckets, 6 samples")

is := infoschema.GetInfoSchema(tk.Se.(sessionctx.Context))
is := tk.Se.(sessionctx.Context).GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := table.Meta()
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3418,7 +3418,7 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
SetFromInfoSchema(e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)).
Build()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
ctx = opentracing.ContextWithSpan(ctx, span1)
}

infoSchema := infoschema.GetInfoSchema(c.Ctx)
infoSchema := c.Ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
if err := plannercore.Preprocess(c.Ctx, stmtNode, infoSchema); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
return nil, errors.Trace(err)
}
h.dagReq = dagReq
is := h.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
is := h.sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
// Build physical plan.
bp := core.NewPBPlanBuilder(h.sctx, is, req.Ranges)
plan, err := bp.Build(dagReq.Executors)
Expand Down
72 changes: 68 additions & 4 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,19 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
SetMemTracker(e.memTracker)
// for tests, infoschema may be null
if is, ok := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema); ok {
builder.SetFromInfoSchema(is)
}
kvReq, err := builder.Build()
if err != nil {
e.feedback.Invalidate()
return err
Expand Down Expand Up @@ -510,9 +513,70 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
if err != nil {
e.feedback.Invalidate()
}
cancel()
if err := result.Close(); err != nil {
logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
}
e.ctx.StoreQueryFeedback(e.feedback)
close(workCh)
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2335,7 +2335,7 @@ func (s *testSuiteP2) TestIsPointGet(c *C) {
"select * from help_topic where help_topic_id=1": true,
"select * from help_topic where help_category_id=1": false,
}
infoSchema := infoschema.GetInfoSchema(ctx)
infoSchema := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)

for sqlStr, result := range tests {
stmtNode, err := s.ParseOneStmt(sqlStr, "", "")
Expand Down Expand Up @@ -2367,7 +2367,7 @@ func (s *testSuiteP2) TestClusteredIndexIsPointGet(c *C) {
"select * from t where a='x' and c='x'": true,
"select * from t where a='x' and c='x' and b=1": false,
}
infoSchema := infoschema.GetInfoSchema(ctx)
infoSchema := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
for sqlStr, result := range tests {
stmtNode, err := s.ParseOneStmt(sqlStr, "", "")
c.Check(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion executor/grant.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (e *GrantExec) Next(ctx context.Context, req *chunk.Chunk) error {
// Make sure the table exist.
if e.Level.Level == ast.GrantLevelTable {
dbNameStr := model.NewCIStr(dbName)
schema := infoschema.GetInfoSchema(e.ctx)
schema := e.ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
tbl, err := schema.TableByName(dbNameStr, model.NewCIStr(e.Level.TableName))
if err != nil {
return err
Expand Down
64 changes: 64 additions & 0 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,71 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
var err error
util.WithRecovery(
func() {
_, err = worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols)
},
e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialIndexWorker"),
)
Expand Down
16 changes: 8 additions & 8 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex

// Cache the ret full rows in schemataRetriever
if !e.initialized {
is := infoschema.GetInfoSchema(sctx)
is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
dbs := is.AllSchemas()
sort.Sort(infoschema.SchemasSorter(dbs))
var err error
Expand Down Expand Up @@ -293,7 +293,7 @@ func (c *statsCache) get(ctx sessionctx.Context) (map[int64]uint64, map[tableHis
}

func getAutoIncrementID(ctx sessionctx.Context, schema *model.DBInfo, tblInfo *model.TableInfo) (int64, error) {
is := infoschema.GetInfoSchema(ctx)
is := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
tbl, err := is.TableByName(schema.Name, tblInfo.Name)
if err != nil {
return 0, err
Expand Down Expand Up @@ -581,7 +581,7 @@ func (e *hugeMemTableRetriever) setDataForColumns(ctx context.Context, sctx sess
}

func (e *hugeMemTableRetriever) dataForColumnsInTable(ctx context.Context, sctx sessionctx.Context, schema *model.DBInfo, tbl *model.TableInfo) {
if err := tryFillViewColumnType(ctx, sctx, infoschema.GetInfoSchema(sctx), schema.Name, tbl); err != nil {
if err := tryFillViewColumnType(ctx, sctx, sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema), schema.Name, tbl); err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(err)
return
}
Expand Down Expand Up @@ -1307,7 +1307,7 @@ func (e *memtableRetriever) setDataForTiKVRegionStatus(ctx sessionctx.Context) e
if err != nil {
return err
}
allSchemas := ctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema).AllSchemas()
allSchemas := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema).AllSchemas()
tableInfos := tikvHelper.GetRegionsTableInfo(regionsInfo, allSchemas)
for _, region := range regionsInfo.Regions {
tableList := tableInfos[region.ID]
Expand Down Expand Up @@ -1419,7 +1419,7 @@ func (e *memtableRetriever) setDataForTiDBHotRegions(ctx sessionctx.Context) err
if !ok {
return errors.New("Information about hot region can be gotten only when the storage is TiKV")
}
allSchemas := ctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema).AllSchemas()
allSchemas := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema).AllSchemas()
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
Expand Down Expand Up @@ -1568,7 +1568,7 @@ type initialTable struct {
}

func (e *tableStorageStatsRetriever) initialize(sctx sessionctx.Context) error {
is := infoschema.GetInfoSchema(sctx)
is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
var databases []string
schemas := e.extractor.TableSchema
tables := e.extractor.TableName
Expand Down Expand Up @@ -1848,7 +1848,7 @@ func (e *memtableRetriever) setDataForStatementsSummary(ctx sessionctx.Context,

func (e *memtableRetriever) setDataForPlacementPolicy(ctx sessionctx.Context) error {
checker := privilege.GetPrivilegeManager(ctx)
is := infoschema.GetInfoSchema(ctx)
is := ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
var rows [][]types.Datum
for _, bundle := range is.RuleBundles() {
id, err := placement.ObjectIDFromGroupID(bundle.ID)
Expand Down Expand Up @@ -1985,7 +1985,7 @@ func (e *hugeMemTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
}

if !e.initialized {
is := infoschema.GetInfoSchema(sctx)
is := sctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema)
dbs := is.AllSchemas()
sort.Sort(infoschema.SchemasSorter(dbs))
e.dbs = dbs
Expand Down
2 changes: 1 addition & 1 deletion executor/load_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,5 @@ func (e *LoadStatsInfo) Update(data []byte) error {
if h == nil {
return errors.New("Load Stats: handle is nil")
}
return h.LoadStatsFromJSON(infoschema.GetInfoSchema(e.Ctx), jsonTbl)
return h.LoadStatsFromJSON(e.Ctx.GetSessionVars().GetInfoSchema().(infoschema.InfoSchema), jsonTbl)
}
Loading

0 comments on commit 85737b5

Please sign in to comment.