Skip to content

Commit

Permalink
ddl: DROP TABLE/VIEW/SEQUENCE now use XXXStmt as parameter (#35741)
Browse files Browse the repository at this point in the history
ref #35665
  • Loading branch information
lance6716 authored Jun 29, 2022
1 parent ed5e63a commit 0d69344
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 189 deletions.
6 changes: 3 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ type DDL interface {
DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
columnNames []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error
DropIndex(ctx sessionctx.Context, tableIdent ast.Ident, indexName model.CIStr, ifExists bool) error
Expand All @@ -122,7 +122,7 @@ type DDL interface {
UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error
RepairTable(ctx sessionctx.Context, table *ast.TableName, createStmt *ast.CreateTableStmt) error
CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStmt) error
DropSequence(ctx sessionctx.Context, tableIdent ast.Ident, ifExists bool) (err error)
DropSequence(ctx sessionctx.Context, stmt *ast.DropSequenceStmt) (err error)
AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
Expand Down
237 changes: 156 additions & 81 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5326,71 +5326,172 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
return errors.Trace(err)
}

// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}
// If one drop those tables by mistake, it's difficult to recover.
// In the worst case, the whole TiDB cluster fails to bootstrap, so we prevent user from dropping them.
var systemTables = map[string]struct{}{
"tidb": {},
"gc_delete_range": {},
"gc_delete_range_done": {},
}

if tb.Meta().IsView() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}
if tb.Meta().IsSequence() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
func isSystemTable(schema, table string) bool {
if schema != "mysql" {
return false
}
if tb.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
return dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Table")
if _, ok := systemTables[table]; ok {
return true
}
return false
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: schema.State,
TableName: tb.Meta().Name.L,
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{},
}
type objectType int

const (
tableObject objectType = iota
viewObject
sequenceObject
)

// dropTableObject provides common logic to DROP TABLE/VIEW/SEQUENCE.
func (d *ddl) dropTableObject(
ctx sessionctx.Context,
objects []*ast.TableName,
ifExists bool,
tableObjectType objectType,
) error {
var (
notExistTables []string
sessVars = ctx.GetSessionVars()
is = d.GetInfoSchemaWithInterceptor(ctx)
dropExistErr *terror.Error
jobType model.ActionType
)

switch tableObjectType {
case tableObject:
dropExistErr = infoschema.ErrTableDropExists
jobType = model.ActionDropTable
case viewObject:
dropExistErr = infoschema.ErrTableDropExists
jobType = model.ActionDropView
case sequenceObject:
dropExistErr = infoschema.ErrSequenceDropExists
jobType = model.ActionDropSequence
}

for _, tn := range objects {
fullti := ast.Ident{Schema: tn.Schema, Name: tn.Name}
schema, ok := is.SchemaByName(tn.Schema)
if !ok {
// TODO: we should return special error for table not exist, checking "not exist" is not enough,
// because some other errors may contain this error string too.
notExistTables = append(notExistTables, fullti.String())
continue
}
tableInfo, err := is.TableByName(tn.Schema, tn.Name)
if err != nil && infoschema.ErrTableNotExists.Equal(err) {
notExistTables = append(notExistTables, fullti.String())
continue
} else if err != nil {
return err
}

// prechecks before build DDL job

// Protect important system table from been dropped by a mistake.
// I can hardly find a case that a user really need to do this.
if isSystemTable(tn.Schema.L, tn.Name.L) {
return errors.Errorf("Drop tidb system table '%s.%s' is forbidden", tn.Schema.L, tn.Name.L)
}
switch tableObjectType {
case tableObject:
if !tableInfo.Meta().IsBaseTable() {
notExistTables = append(notExistTables, fullti.String())
continue
}

tempTableType := tableInfo.Meta().TempTableType
if config.CheckTableBeforeDrop && tempTableType == model.TempTableNone {
logutil.BgLogger().Warn("admin check table before drop",
zap.String("database", fullti.Schema.O),
zap.String("table", fullti.Name.O),
)
exec := ctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(context.TODO(), nil, "admin check table %n.%n", fullti.Schema.O, fullti.Name.O)
if err != nil {
return err
}
}

if tableInfo.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
return dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Table")
}
case viewObject:
if !tableInfo.Meta().IsView() {
return dbterror.ErrWrongObject.GenWithStackByArgs(fullti.Schema, fullti.Name, "VIEW")
}
case sequenceObject:
if !tableInfo.Meta().IsSequence() {
err = dbterror.ErrWrongObject.GenWithStackByArgs(fullti.Schema, fullti.Name, "SEQUENCE")
if ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
continue
}
return err
}
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tableInfo.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: schema.State,
TableName: tableInfo.Meta().Name.L,
Type: jobType,
BinlogInfo: &model.HistoryInfo{},
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
notExistTables = append(notExistTables, fullti.String())
continue
} else if err != nil {
return errors.Trace(err)
}

// unlock table after drop
if tableObjectType != tableObject {
continue
}
if !config.TableLockEnabled() {
continue
}
if ok, _ := ctx.CheckTableLocked(tableInfo.Meta().ID); ok {
ctx.ReleaseTableLockByTableIDs([]int64{tableInfo.Meta().ID})
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
if err != nil {
return errors.Trace(err)
}
if !config.TableLockEnabled() {
return nil
if len(notExistTables) > 0 && !ifExists {
return dropExistErr.GenWithStackByArgs(strings.Join(notExistTables, ","))
}
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
ctx.ReleaseTableLockByTableIDs([]int64{tb.Meta().ID})
// We need add warning when use if exists.
if len(notExistTables) > 0 && ifExists {
for _, table := range notExistTables {
sessVars.StmtCtx.AppendNote(dropExistErr.GenWithStackByArgs(table))
}
}
return nil
}

// DropView will proceed even if some view in the list does not exists.
func (d *ddl) DropView(ctx sessionctx.Context, ti ast.Ident) (err error) {
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

if !tb.Meta().IsView() {
return dbterror.ErrWrongObject.GenWithStackByArgs(ti.Schema, ti.Name, "VIEW")
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: tb.Meta().State,
TableName: tb.Meta().Name.L,
Type: model.ActionDropView,
BinlogInfo: &model.HistoryInfo{},
}
// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
return d.dropTableObject(ctx, stmt.Tables, stmt.IfExists, tableObject)
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
// DropView will proceed even if some view in the list does not exists.
func (d *ddl) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
return d.dropTableObject(ctx, stmt.Tables, stmt.IfExists, viewObject)
}

func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
Expand Down Expand Up @@ -6676,34 +6777,8 @@ func (d *ddl) AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt)
return errors.Trace(err)
}

func (d *ddl) DropSequence(ctx sessionctx.Context, ti ast.Ident, ifExists bool) (err error) {
schema, tbl, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

if !tbl.Meta().IsSequence() {
err = dbterror.ErrWrongObject.GenWithStackByArgs(ti.Schema, ti.Name, "SEQUENCE")
if ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
return err
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tbl.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: tbl.Meta().State,
TableName: tbl.Meta().Name.L,
Type: model.ActionDropSequence,
BinlogInfo: &model.HistoryInfo{},
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
func (d *ddl) DropSequence(ctx sessionctx.Context, stmt *ast.DropSequenceStmt) (err error) {
return d.dropTableObject(ctx, stmt.Sequences, stmt.IfExists, sequenceObject)
}

func (d *ddl) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, indexName model.CIStr, visibility ast.IndexVisibility) error {
Expand Down
Loading

0 comments on commit 0d69344

Please sign in to comment.