Skip to content

Commit

Permalink
cherry pick pingcap#23204 to release-5.0-rc
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
AilinKid authored and ti-srebot committed Mar 13, 2021
1 parent 0651e38 commit f556790
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 22 deletions.
176 changes: 176 additions & 0 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@
package ddl_test

import (
"context"
"errors"
"time"

. "github.com/pingcap/check"
errors2 "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
parser_mysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
mysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -1602,3 +1606,175 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFlenErrorMsg(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:8200]Unsupported modify column: length 255 is less than origin 512, and tidb_enable_change_column_type is false")
}
<<<<<<< HEAD
=======

// Close issue #22395
// Background:
// Since the changing column is implemented as adding a new column and substitute the old one when it finished.
// The added column with NOT-NULL option will be fetched with error when it's origin default value is not set.
// It's good because the insert / update logic will cast the related column to changing column rather than use
// origin default value directly.
func (s *testColumnTypeChangeSuite) TestChangingColOriginDefaultValue(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
// Enable column change variable.
tk.Se.GetSessionVars().EnableChangeColumnType = true
defer func() {
tk.Se.GetSessionVars().EnableChangeColumnType = false
}()

tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1, 1)")
tk.MustExec("insert into t values(2, 2)")

tbl := testGetTableByName(c, tk.Se, "test", "t")
originalHook := s.dom.DDL().GetHook()
hook := &ddl.TestDDLCallback{Do: s.dom}
var (
once bool
checkErr error
)
hook.OnJobRunBeforeExported = func(job *model.Job) {
if checkErr != nil {
return
}
if tbl.Meta().ID != job.TableID {
return
}
if job.SchemaState == model.StateWriteOnly || job.SchemaState == model.StateWriteReorganization {
if !once {
once = true
tbl := testGetTableByName(c, tk1.Se, "test", "t")
if len(tbl.WritableCols()) != 3 {
checkErr = errors.New("assert the writable column number error")
return
}
if tbl.WritableCols()[2].OriginDefaultValue.(string) != "0" {
checkErr = errors.New("assert the write only column origin default value error")
return
}
}
// For writable column:
// Insert/ Update should set the column with the casted-related column value.
_, err := tk1.Exec("insert into t values(3, 3)")
if err != nil {
checkErr = err
return
}
if job.SchemaState == model.StateWriteOnly {
// The casted value will be inserted into changing column too.
_, err := tk1.Exec("update t set b = -1 where a = 1")
if err != nil {
checkErr = err
return
}
} else {
// The casted value will be inserted into changing column too.
_, err := tk1.Exec("update t set b = -2 where a = 2")
if err != nil {
checkErr = err
return
}
}
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
tk.MustExec("alter table t modify column b tinyint NOT NULL")
s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)
// Since getReorgInfo will stagnate StateWriteReorganization for a ddl round, so insert should exec 3 times.
tk.MustQuery("select * from t order by a").Check(testkit.Rows("1 -1", "2 -2", "3 3", "3 3", "3 3"))
tk.MustExec("drop table if exists t")
}

// Close issue #22820
func (s *testColumnTypeChangeSuite) TestChangingAttributeOfColumnWithFK(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

prepare := func() {
tk.MustExec("drop table if exists users")
tk.MustExec("drop table if exists orders")
tk.MustExec("CREATE TABLE users (id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, doc JSON);")
tk.MustExec("CREATE TABLE orders (id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, user_id INT NOT NULL, doc JSON, FOREIGN KEY fk_user_id (user_id) REFERENCES users(id));")
}

prepare()
// For column with FK, alter action can be performed for changing null/not null, default value, comment and so on, but column type.
tk.MustExec("alter table orders modify user_id int null;")
tbl := testGetTableByName(c, tk.Se, "test", "orders")
c.Assert(parser_mysql.HasNotNullFlag(tbl.Meta().Columns[1].Flag), Equals, false)

prepare()
tk.MustExec("alter table orders change user_id user_id2 int null")
tbl = testGetTableByName(c, tk.Se, "test", "orders")
c.Assert(tbl.Meta().Columns[1].Name.L, Equals, "user_id2")
c.Assert(parser_mysql.HasNotNullFlag(tbl.Meta().Columns[1].Flag), Equals, false)

prepare()
tk.MustExec("alter table orders modify user_id int default -1 comment \"haha\"")
tbl = testGetTableByName(c, tk.Se, "test", "orders")
c.Assert(tbl.Meta().Columns[1].Comment, Equals, "haha")
c.Assert(tbl.Meta().Columns[1].DefaultValue.(string), Equals, "-1")

prepare()
tk.MustGetErrCode("alter table orders modify user_id bigint", mysql.ErrFKIncompatibleColumns)

tk.MustExec("drop table if exists orders, users")
}

// Close issue #23202
func (s *testColumnTypeChangeSuite) TestDDLExitWhenCancelMeetPanic(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1,1),(2,2)")
tk.MustExec("alter table t add index(b)")
tk.MustExec("set @@global.tidb_ddl_error_count_limit=3")

failpoint.Enable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit", `return(true)`)
defer func() {
failpoint.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit")
}()

originalHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)

hook := &ddl.TestDDLCallback{Do: s.dom}
var jobID int64
hook.OnJobRunBeforeExported = func(job *model.Job) {
if jobID != 0 {
return
}
if job.Type == model.ActionDropIndex {
jobID = job.ID
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)

// when it panics in write-reorg state, the job will be pulled up as a cancelling job. Since drop-index with
// write-reorg can't be cancelled, so it will be converted to running state and try again (dead loop).
_, err := tk.Exec("alter table t drop index b")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled")
c.Assert(jobID > 0, Equals, true)

// Verification of the history job state.
var job *model.Job
err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(jobID)
return errors2.Trace(err1)
})
c.Assert(err, IsNil)
c.Assert(job.ErrorCount, Equals, int64(4))
c.Assert(job.Error.Error(), Equals, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled")
}
>>>>>>> 46f5f2177... ddl: fix ddl hang over when it meets panic in cancelling path (#23204)
69 changes: 47 additions & 22 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,12 +585,55 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {
return t
}

// countForPanic records the error count for DDL job.
func (w *worker) countForPanic(job *model.Job) {
// If run DDL job panic, just cancel the DDL jobs.
job.State = model.JobStateCancelling
job.ErrorCount++

// Load global DDL variables.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
errorCount := variable.GetDDLErrorCountLimit()

if job.ErrorCount > errorCount {
msg := fmt.Sprintf("panic in handling DDL logic and error count beyond the limitation %d, cancelled", errorCount)
logutil.Logger(w.logCtx).Warn(msg)
job.Error = toTError(errors.New(msg))
job.State = model.JobStateCancelled
}
}

// countForError records the error count for DDL job.
func (w *worker) countForError(err error, job *model.Job) error {
job.Error = toTError(err)
job.ErrorCount++

// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
if job.State == model.JobStateCancelled {
logutil.Logger(w.logCtx).Info("[ddl] DDL job is cancelled normally", zap.Error(err))
return nil
}
logutil.Logger(w.logCtx).Error("[ddl] run DDL job error", zap.Error(err))

// Load global DDL variables.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
logutil.Logger(w.logCtx).Warn("[ddl] DDL job error count exceed the limit, cancelling it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
job.State = model.JobStateCancelling
}
return err
}

// runDDLJob runs a DDL job. It returns the current schema version in this transaction and the error.
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
defer tidbutil.Recover(metrics.LabelDDLWorker, fmt.Sprintf("%s runDDLJob", w),
func() {
// If run DDL job panic, just cancel the DDL jobs.
job.State = model.JobStateCancelling
w.countForPanic(job)
}, false)

// Mock for run ddl job panic.
Expand Down Expand Up @@ -707,27 +750,9 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
err = errInvalidDDLJob.GenWithStack("invalid ddl job type: %v", job.Type)
}

// Save errors in job, so that others can know errors happened.
// Save errors in job if any, so that others can know errors happened.
if err != nil {
job.Error = toTError(err)
job.ErrorCount++

// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
if job.State == model.JobStateCancelled {
logutil.Logger(w.logCtx).Info("[ddl] DDL job is cancelled normally", zap.Error(err))
return ver, nil
}
logutil.Logger(w.logCtx).Error("[ddl] run DDL job error", zap.Error(err))

// Load global ddl variables.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
logutil.Logger(w.logCtx).Warn("[ddl] DDL job error count exceed the limit, cancelling it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
job.State = model.JobStateCancelling
}
err = w.countForError(err, job)
}
return
}
Expand Down
6 changes: 6 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,12 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)

failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
panic("panic test in cancelling add index")
}
})

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]

ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != model.StateNone)
Expand Down

0 comments on commit f556790

Please sign in to comment.