Skip to content

Commit

Permalink
planner: use TableInfo.DBID to locate schema (#57785) (#57870)
Browse files Browse the repository at this point in the history
close #57779, close #57783
  • Loading branch information
ti-chi-bot authored Dec 2, 2024
1 parent afb68c6 commit a248b96
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions pkg/planner/core/memtable_infoschema_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/set"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -110,24 +113,37 @@ func (e *InfoSchemaBaseExtractor) GetBase() *InfoSchemaBaseExtractor {
return e
}

// ListSchemas lists related schemas from predicate.
// ListSchemas lists all schemas from predicate. If no schema is specified, it lists
// all schemas in the storage.
func (e *InfoSchemaBaseExtractor) ListSchemas(is infoschema.InfoSchema) []pmodel.CIStr {
ec := e.extractableColumns
schemas := e.getSchemaObjectNames(ec.schema)
if len(schemas) == 0 {
extractedSchemas, unspecified := e.listPredicateSchemas(is)
if unspecified {
ret := is.AllSchemaNames()
slices.SortFunc(ret, func(a, b pmodel.CIStr) int {
return strings.Compare(a.L, b.L)
})
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr)
return filterSchemaObjectByRegexp(e, e.extractableColumns.schema, ret, extractStrCIStr)
}
return extractedSchemas
}

// listPredicateSchemas lists all schemas specified in predicates.
// If no schema is specified in predicates, return `unspecified` as true.
func (e *InfoSchemaBaseExtractor) listPredicateSchemas(
is infoschema.InfoSchema,
) (schemas []pmodel.CIStr, unspecified bool) {
ec := e.extractableColumns
schemas = e.getSchemaObjectNames(ec.schema)
if len(schemas) == 0 {
return nil, true
}
ret := schemas[:0]
for _, s := range schemas {
if n, ok := is.SchemaByName(s); ok {
ret = append(ret, n.Name)
}
}
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr)
return filterSchemaObjectByRegexp(e, ec.schema, ret, extractStrCIStr), false
}

// ListSchemasAndTables lists related tables and their corresponding schemas from predicate.
Expand All @@ -137,7 +153,6 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
is infoschema.InfoSchema,
) ([]pmodel.CIStr, []*model.TableInfo, error) {
ec := e.extractableColumns
schemas := e.ListSchemas(is)
var tableNames []pmodel.CIStr
if ec.table != "" {
tableNames = e.getSchemaObjectNames(ec.table)
Expand All @@ -150,7 +165,7 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
findTablesByID(is, tableIDs, tableNames, tableMap)
tableSlice := maps.Values(tableMap)
tableSlice = filterSchemaObjectByRegexp(e, ec.table, tableSlice, extractStrTableInfo)
return findSchemasForTables(ctx, is, schemas, tableSlice)
return findSchemasForTables(e, is, tableSlice)
}
}
if ec.partitionID != "" {
Expand All @@ -160,13 +175,15 @@ func (e *InfoSchemaBaseExtractor) ListSchemasAndTables(
findTablesByPartID(is, partIDs, tableNames, tableMap)
tableSlice := maps.Values(tableMap)
tableSlice = filterSchemaObjectByRegexp(e, ec.table, tableSlice, extractStrTableInfo)
return findSchemasForTables(ctx, is, schemas, tableSlice)
return findSchemasForTables(e, is, tableSlice)
}
}
if len(tableNames) > 0 {
tableNames = filterSchemaObjectByRegexp(e, ec.table, tableNames, extractStrCIStr)
schemas := e.ListSchemas(is)
return findTableAndSchemaByName(ctx, is, schemas, tableNames)
}
schemas := e.ListSchemas(is)
return listTablesForEachSchema(ctx, e, is, schemas)
}

Expand Down Expand Up @@ -723,23 +740,28 @@ func listTablesForEachSchema(
// returns a schema slice and a table slice that has the same length.
// Note that input arg "tableSlice" will be changed in place.
func findSchemasForTables(
ctx context.Context,
e *InfoSchemaBaseExtractor,
is infoschema.InfoSchema,
schemas []pmodel.CIStr,
tableSlice []*model.TableInfo,
) ([]pmodel.CIStr, []*model.TableInfo, error) {
schemas, unspecified := e.listPredicateSchemas(is)
schemaSlice := make([]pmodel.CIStr, 0, len(tableSlice))
for i, tbl := range tableSlice {
dbInfo, ok := is.SchemaByID(tbl.DBID)
intest.Assert(ok)
if !ok {
logutil.BgLogger().Warn("schema not found for table info",
zap.Int64("tableID", tbl.ID), zap.Int64("dbID", tbl.DBID))
continue
}
if unspecified { // all schemas should be included.
schemaSlice = append(schemaSlice, dbInfo.Name)
continue
}

found := false
for _, s := range schemas {
isTbl, err := is.TableByName(ctx, s, tbl.Name)
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrTableNotExists) {
continue
}
return nil, nil, errors.Trace(err)
}
if isTbl.Meta().ID == tbl.ID {
if s.L == dbInfo.Name.L {
schemaSlice = append(schemaSlice, s)
found = true
break
Expand Down

0 comments on commit a248b96

Please sign in to comment.