Skip to content

Commit

Permalink
ddl: support drop sequence in TiDB (#14442)
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored and sre-bot committed Jan 20, 2020
1 parent 5c5dd7b commit b25c824
Show file tree
Hide file tree
Showing 13 changed files with 189 additions and 16 deletions.
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ type DDL interface {
UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error
RepairTable(ctx sessionctx.Context, table *ast.TableName, createStmt *ast.CreateTableStmt) error
CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStmt) error
DropSequence(ctx sessionctx.Context, tableIdent ast.Ident, ifExists bool) (err error)

// GetLease returns current schema lease time.
GetLease() time.Duration
Expand Down
31 changes: 31 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3334,6 +3334,9 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
if tb.Meta().IsView() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}
if tb.Meta().IsSequence() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}

job := &model.Job{
SchemaID: schema.ID,
Expand Down Expand Up @@ -4364,3 +4367,31 @@ func (d *ddl) CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStm
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) DropSequence(ctx sessionctx.Context, ti ast.Ident, ifExists bool) (err error) {
schema, tbl, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

if !tbl.Meta().IsSequence() {
err = ErrWrongObject.GenWithStackByArgs(ti.Schema, ti.Name, "SEQUENCE")
if ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
return err
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tbl.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropSequence,
BinlogInfo: &model.HistoryInfo{},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onRepairTable(d, t, job)
case model.ActionCreateView:
ver, err = onCreateView(d, t, job)
case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
ver, err = onDropTableOrView(t, job)
case model.ActionDropTablePartition:
ver, err = onDropTablePartition(t, job)
Expand Down
2 changes: 1 addition & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
ver, err = rollingbackDropColumn(t, job)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
ver, err = rollingbackDropIndex(t, job)
case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
err = rollingbackDropTableOrView(t, job)
case model.ActionDropTablePartition:
ver, err = rollingbackDropTablePartition(t, job)
Expand Down
69 changes: 69 additions & 0 deletions ddl/sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/testkit"
)
Expand Down Expand Up @@ -84,3 +86,70 @@ func (s *testSequenceSuite) TestCreateSequence(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1142]CREATE command denied to user 'localhost'@'myuser' for table 'my_seq'")
}

func (s *testSequenceSuite) TestDropSequence(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use test")
s.tk.MustExec("drop sequence if exists seq")

// Test sequence is unknown.
s.tk.MustGetErrCode("drop sequence seq", mysql.ErrUnknownSequence)

// Test non-existed sequence can't drop successfully.
s.tk.MustExec("create sequence seq")
_, err := s.tk.Exec("drop sequence seq, seq2")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:4139]Unknown SEQUENCE: 'test.seq2'")

// Test the specified object is not sequence.
s.tk.MustExec("create table seq3 (a int)")
_, err = s.tk.Exec("drop sequence seq3")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, ddl.ErrWrongObject), IsTrue)

// Test schema is not exist.
_, err = s.tk.Exec("drop sequence unknown.seq")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:4139]Unknown SEQUENCE: 'unknown.seq'")

// Test drop sequence successfully.
s.tk.MustExec("create sequence seq")
_, err = s.tk.Exec("drop sequence seq")
c.Assert(err, IsNil)
_, err = s.tk.Exec("drop sequence seq")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:4139]Unknown SEQUENCE: 'test.seq'")

// Test drop table when the object is a sequence.
s.tk.MustExec("create sequence seq")
_, err = s.tk.Exec("drop table seq")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[schema:1051]Unknown table 'test.seq'")

// Test drop view when the object is a sequence.
_, err = s.tk.Exec("drop view seq")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, ddl.ErrWrongObject), IsTrue)
s.tk.MustExec("drop sequence seq")

// Test drop privilege.
s.tk.MustExec("drop user if exists myuser@localhost")
s.tk.MustExec("create user myuser@localhost")
s.tk.MustExec("flush privileges")

tk1 := testkit.NewTestKit(c, s.store)
se, err := session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(se.Auth(&auth.UserIdentity{Username: "myuser", Hostname: "localhost"}, nil, nil), IsTrue)
tk1.Se = se

// grant the myuser the access to database test.
s.tk.MustExec("create sequence my_seq")
s.tk.MustExec("grant select on test.* to 'myuser'@'localhost'")
s.tk.MustExec("flush privileges")

tk1.MustExec("use test")
_, err = tk1.Exec("drop sequence my_seq")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1142]DROP command denied to user 'localhost'@'myuser' for table 'my_seq'")
}
10 changes: 8 additions & 2 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,14 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
if err = t.DropTableOrView(job.SchemaID, job.TableID, true); err != nil {
break
if tblInfo.IsSequence() {
if err = t.DropSequence(job.SchemaID, job.TableID, true); err != nil {
break
}
} else {
if err = t.DropTableOrView(job.SchemaID, job.TableID, true); err != nil {
break
}
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
Expand Down
50 changes: 41 additions & 9 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
case *ast.DropDatabaseStmt:
err = e.executeDropDatabase(x)
case *ast.DropTableStmt:
err = e.executeDropTableOrView(x)
if x.IsView {
err = e.executeDropView(x)
} else {
err = e.executeDropTable(x)
}
case *ast.RecoverTableStmt:
err = e.executeRecoverTable(x)
case *ast.FlashBackTableStmt:
Expand All @@ -115,6 +119,8 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
err = e.executeRepairTable(x)
case *ast.CreateSequenceStmt:
err = e.executeCreateSequence(x)
case *ast.DropSequenceStmt:
err = e.executeDropSequence(x)

}
if err != nil {
Expand Down Expand Up @@ -251,9 +257,30 @@ func isSystemTable(schema, table string) bool {
return false
}

func (e *DDLExec) executeDropTableOrView(s *ast.DropTableStmt) error {
type objectType int

const (
tableObject objectType = iota
viewObject
sequenceObject
)

func (e *DDLExec) executeDropTable(s *ast.DropTableStmt) error {
return e.dropTableObject(s.Tables, tableObject, s.IfExists)
}

func (e *DDLExec) executeDropView(s *ast.DropTableStmt) error {
return e.dropTableObject(s.Tables, viewObject, s.IfExists)
}

func (e *DDLExec) executeDropSequence(s *ast.DropSequenceStmt) error {
return e.dropTableObject(s.Sequences, sequenceObject, s.IfExists)
}

// dropTableObject actually applies to `tableObject`, `viewObject` and `sequenceObject`.
func (e *DDLExec) dropTableObject(objects []*ast.TableName, obt objectType, ifExists bool) error {
var notExistTables []string
for _, tn := range s.Tables {
for _, tn := range objects {
fullti := ast.Ident{Schema: tn.Schema, Name: tn.Name}
_, ok := e.is.SchemaByName(tn.Schema)
if !ok {
Expand All @@ -276,7 +303,7 @@ func (e *DDLExec) executeDropTableOrView(s *ast.DropTableStmt) error {
return errors.Errorf("Drop tidb system table '%s.%s' is forbidden", tn.Schema.L, tn.Name.L)
}

if config.CheckTableBeforeDrop {
if obt == tableObject && config.CheckTableBeforeDrop {
logutil.BgLogger().Warn("admin check table before drop",
zap.String("database", fullti.Schema.O),
zap.String("table", fullti.Name.O),
Expand All @@ -287,19 +314,24 @@ func (e *DDLExec) executeDropTableOrView(s *ast.DropTableStmt) error {
return err
}
}

if s.IsView {
err = domain.GetDomain(e.ctx).DDL().DropView(e.ctx, fullti)
} else {
switch obt {
case tableObject:
err = domain.GetDomain(e.ctx).DDL().DropTable(e.ctx, fullti)
case viewObject:
err = domain.GetDomain(e.ctx).DDL().DropView(e.ctx, fullti)
case sequenceObject:
err = domain.GetDomain(e.ctx).DDL().DropSequence(e.ctx, fullti, ifExists)
}
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
notExistTables = append(notExistTables, fullti.String())
} else if err != nil {
return err
}
}
if len(notExistTables) > 0 && !s.IfExists {
if len(notExistTables) > 0 && !ifExists {
if obt == sequenceObject {
return infoschema.ErrSequenceDropExists.GenWithStackByArgs(strings.Join(notExistTables, ","))
}
return infoschema.ErrTableDropExists.GenWithStackByArgs(strings.Join(notExistTables, ","))
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
switch diff.Type {
case model.ActionCreateTable, model.ActionCreateSequence, model.ActionRecoverTable, model.ActionRepairTable:
newTableID = diff.TableID
case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
oldTableID = diff.TableID
case model.ActionTruncateTable:
oldTableID = diff.OldTableID
Expand Down
3 changes: 3 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var (
ErrTableExists = terror.ClassSchema.New(mysql.ErrTableExists, mysql.MySQLErrName[mysql.ErrTableExists])
// ErrTableDropExists returns for dropping a non-existent table.
ErrTableDropExists = terror.ClassSchema.New(mysql.ErrBadTable, mysql.MySQLErrName[mysql.ErrBadTable])
// ErrSequenceDropExists returns for dropping a non-exist sequence.
ErrSequenceDropExists = terror.ClassSchema.New(mysql.ErrUnknownSequence, mysql.MySQLErrName[mysql.ErrUnknownSequence])
// ErrColumnNotExists returns for column not exists.
ErrColumnNotExists = terror.ClassSchema.New(mysql.ErrBadField, mysql.MySQLErrName[mysql.ErrBadField])
// ErrColumnExists returns for column already exists.
Expand Down Expand Up @@ -342,6 +344,7 @@ func init() {
mysql.ErrBadUser: mysql.ErrBadUser,
mysql.ErrUserAlreadyExists: mysql.ErrUserAlreadyExists,
mysql.ErrTableLocked: mysql.ErrTableLocked,
mysql.ErrUnknownSequence: mysql.ErrUnknownSequence,
}
terror.ErrClassToMySQLCodes[terror.ClassSchema] = schemaMySQLErrCodes

Expand Down
11 changes: 11 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,17 @@ func (m *Meta) DropDatabase(dbID int64) error {
return nil
}

// DropSequence drops sequence in database.
// Sequence is made of table struct and kv value pair.
func (m *Meta) DropSequence(dbID int64, tblID int64, delAutoID bool) error {
err := m.DropTableOrView(dbID, tblID, delAutoID)
if err != nil {
return err
}
err = m.txn.HDel(m.dbKey(dbID), m.sequenceKey(tblID))
return errors.Trace(err)
}

// DropTableOrView drops table in database.
// If delAutoID is true, it will delete the auto_increment id key-value of the table.
// For rename table, we do not need to rename auto_increment id key-value.
Expand Down
9 changes: 9 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2569,6 +2569,15 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, tableVal.Schema.L,
tableVal.Name.L, "", authErr)
}
case *ast.DropSequenceStmt:
for _, sequence := range v.Sequences {
if b.ctx.GetSessionVars().User != nil {
authErr = ErrTableaccessDenied.GenWithStackByArgs("DROP", b.ctx.GetSessionVars().User.Hostname,
b.ctx.GetSessionVars().User.Username, sequence.Name.L)
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, sequence.Schema.L,
sequence.Name.L, "", authErr)
}
case *ast.TruncateTableStmt:
if b.ctx.GetSessionVars().User != nil {
authErr = ErrTableaccessDenied.GenWithStackByArgs("DROP", b.ctx.GetSessionVars().User.Hostname,
Expand Down
13 changes: 12 additions & 1 deletion planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
case *ast.CreateSequenceStmt:
p.flag |= inCreateOrDropTable
p.resolveCreateSequenceStmt(node)
case *ast.DropSequenceStmt:
p.flag |= inCreateOrDropTable
p.checkDropSequenceGrammar(node)
default:
p.flag &= ^parentIsJoin
}
Expand Down Expand Up @@ -453,8 +456,16 @@ func (p *preprocessor) checkCreateViewGrammar(stmt *ast.CreateViewStmt) {
}
}

func (p *preprocessor) checkDropSequenceGrammar(stmt *ast.DropSequenceStmt) {
p.checkDropTableNames(stmt.Sequences)
}

func (p *preprocessor) checkDropTableGrammar(stmt *ast.DropTableStmt) {
for _, t := range stmt.Tables {
p.checkDropTableNames(stmt.Tables)
}

func (p *preprocessor) checkDropTableNames(tables []*ast.TableName) {
for _, t := range tables {
if isIncorrectName(t.Name.String()) {
p.err = ddl.ErrWrongTableName.GenWithStackByArgs(t.Name.String())
return
Expand Down
2 changes: 1 addition & 1 deletion util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func IsJobRollbackable(job *model.Job) bool {
job.SchemaState == model.StateWriteOnly {
return false
}
case model.ActionDropSchema, model.ActionDropTable:
case model.ActionDropSchema, model.ActionDropTable, model.ActionDropSequence:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
job.SchemaState == model.StateDeleteOnly {
Expand Down

0 comments on commit b25c824

Please sign in to comment.