Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix the covert job to rollingback job (#23903) #24080

Merged
merged 3 commits into from
Apr 16, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
cherry pick #23903 to release-5.0
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
AilinKid authored and ti-srebot committed Apr 16, 2021
commit efa9d9cae63a9648f1c0e4bc8985ee7555bc1229
67 changes: 40 additions & 27 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
@@ -17,11 +17,13 @@ import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
@@ -40,8 +42,11 @@ func updateColsNull2NotNull(tblInfo *model.TableInfo, indexInfo *model.IndexInfo
}

func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) {
job.State = model.JobStateRollingback

failpoint.Inject("mockConvertAddIdxJob2RollbackJobError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(0, errors.New("mock convert add index job to rollback job error"))
}
})
if indexInfo.Primary {
nullCols, err := getNullColInfos(tblInfo, indexInfo)
if err != nil {
@@ -68,7 +73,7 @@ func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.T
if err1 != nil {
return ver, errors.Trace(err1)
}

job.State = model.JobStateRollingback
return ver, errors.Trace(err)
}

@@ -139,7 +144,6 @@ func rollingbackModifyColumn(t *meta.Meta, job *model.Job) (ver int64, err error
}

func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
job.State = model.JobStateRollingback
tblInfo, columnInfo, col, _, _, err := checkAddColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
@@ -158,11 +162,12 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
if err != nil {
return ver, errors.Trace(err)
}

job.State = model.JobStateRollingback
return ver, errCancelledDDLJob
}

func rollingbackAddColumns(t *meta.Meta, job *model.Job) (ver int64, err error) {
job.State = model.JobStateRollingback
tblInfo, columnInfos, _, _, _, _, err := checkAddColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
@@ -186,11 +191,12 @@ func rollingbackAddColumns(t *meta.Meta, job *model.Job) (ver int64, err error)
if err != nil {
return ver, errors.Trace(err)
}
job.State = model.JobStateRollingback
return ver, errCancelledDDLJob
}

func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, colInfo, idxInfos, err := checkDropColumn(t, job)
_, colInfo, idxInfos, err := checkDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
@@ -213,7 +219,6 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error)
// StatePublic means when the job is not running yet.
if colInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}
// In the state of drop column `write only -> delete only -> reorganization`,
@@ -223,7 +228,7 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error)
}

func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, colInfos, _, idxInfos, err := checkDropColumns(t, job)
_, colInfos, _, idxInfos, err := checkDropColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
@@ -246,7 +251,6 @@ func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error)
// StatePublic means when the job is not running yet.
if colInfos[0].State == model.StatePublic {
job.State = model.JobStateCancelled
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}
// In the state of drop columns `write only -> delete only -> reorganization`,
@@ -256,33 +260,23 @@ func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error)
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, indexInfo, err := checkDropIndex(t, job)
_, indexInfo, err := checkDropIndex(t, job)
if err != nil {
return ver, errors.Trace(err)
}

originalState := indexInfo.State
switch indexInfo.State {
case model.StateWriteOnly, model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone:
// We can not rollback now, so just continue to drop index.
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
job.State = model.JobStateRunning
return ver, nil
case model.StatePublic:
job.State = model.JobStateRollbackDone
indexInfo.State = model.StatePublic
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
default:
return ver, ErrInvalidDDLState.GenWithStackByArgs("index", indexInfo.State)
}

job.SchemaState = indexInfo.State
job.Args = []interface{}{indexInfo.Name}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}

func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) {
@@ -300,7 +294,6 @@ func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isP
}

func convertAddTablePartitionJob2RollbackJob(t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
job.State = model.JobStateRollingback
addingDefinitions := tblInfo.Partition.AddingDefinitions
partNames := make([]string, 0, len(addingDefinitions))
for _, pd := range addingDefinitions {
@@ -311,6 +304,7 @@ func convertAddTablePartitionJob2RollbackJob(t *meta.Meta, job *model.Job, other
if err != nil {
return ver, errors.Trace(err)
}
job.State = model.JobStateRollingback
return ver, errors.Trace(otherwiseErr)
}

@@ -447,12 +441,31 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
if job.Error == nil {
job.Error = toTError(err)
}
if !job.Error.Equal(errCancelledDDLJob) {
job.Error = terror.GetErrClass(job.Error).Synthesize(terror.ErrCode(job.Error.Code()),
fmt.Sprintf("DDL job rollback, error msg: %s", terror.ToSQLError(job.Error).Message))
}
job.ErrorCount++

if errCancelledDDLJob.Equal(err) {
// The job is normally cancelled.
if !job.Error.Equal(errCancelledDDLJob) {
job.Error = terror.GetErrClass(job.Error).Synthesize(terror.ErrCode(job.Error.Code()),
fmt.Sprintf("DDL job rollback, error msg: %s", terror.ToSQLError(job.Error).Message))
}
} else {
// A job canceling meet other error.
//
// Once `convertJob2RollbackJob` meets an error, the job state can't be set as `JobStateRollingback` since
// job state and args may not be correctly overwritten. The job will be fetched to run with the cancelling
// state again. So we should check the error count here.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
errorCount := variable.GetDDLErrorCountLimit()
if job.ErrorCount > errorCount {
logutil.Logger(w.logCtx).Warn("[ddl] rollback DDL job error count exceed the limit, cancelled it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", errorCount))
job.Error = toTError(errors.Errorf("rollback DDL job error count exceed the limit %d, cancelled it now", errorCount))
job.State = model.JobStateCancelled
}
}

if job.State != model.JobStateRollingback && job.State != model.JobStateCancelled {
logutil.Logger(w.logCtx).Error("[ddl] run DDL job failed", zap.String("job", job.String()), zap.Error(err))
} else {
101 changes: 101 additions & 0 deletions ddl/rollingback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
"context"
"strconv"

. "github.com/pingcap/check"
errors2 "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testkit"
)

var _ = SerialSuites(&testRollingBackSuite{&testDBSuite{}})

type testRollingBackSuite struct{ *testDBSuite }

// TestCancelJobMeetError is used to test canceling ddl job failure when convert ddl job to a rollingback job.
func (s *testRollingBackSuite) TestCancelAddIndexJobError(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk1 := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk1.MustExec("use test")

tk.MustExec("create table t_cancel_add_index (a int)")
tk.MustExec("insert into t_cancel_add_index values(1),(2),(3)")
tk.MustExec("set @@global.tidb_ddl_error_count_limit=3")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockConvertAddIdxJob2RollbackJobError", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockConvertAddIdxJob2RollbackJobError"), IsNil)
}()

tbl := testGetTableByName(c, tk.Se, "test", "t_cancel_add_index")
c.Assert(tbl, NotNil)

d := s.dom.DDL()
hook := &ddl.TestDDLCallback{Do: s.dom}
var (
checkErr error
jobID int64
res sqlexec.RecordSet
)
hook.OnJobUpdatedExported = func(job *model.Job) {
if job.TableID != tbl.Meta().ID {
return
}
if job.Type != model.ActionAddIndex {
return
}
if job.SchemaState == model.StateDeleteOnly {
jobID = job.ID
res, checkErr = tk1.Exec("admin cancel ddl jobs " + strconv.Itoa(int(job.ID)))
// drain the result set here, otherwise the cancel action won't take effect immediately.
chk := res.NewChunk()
if err := res.Next(context.Background(), chk); err != nil {
checkErr = err
return
}
if err := res.Close(); err != nil {
checkErr = err
}
}
}
d.(ddl.DDLForTest).SetHook(hook)

// This will hang on stateDeleteOnly, and the job will be canceled.
_, err := tk.Exec("alter table t_cancel_add_index add index idx(a)")
c.Assert(err, NotNil)
c.Assert(checkErr, IsNil)
c.Assert(err.Error(), Equals, "[ddl:-1]rollback DDL job error count exceed the limit 3, cancelled it now")

// Verification of the history job state.
var job *model.Job
err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(jobID)
return errors2.Trace(err1)
})
c.Assert(err, IsNil)
c.Assert(job.ErrorCount, Equals, int64(4))
c.Assert(job.Error.Error(), Equals, "[ddl:-1]rollback DDL job error count exceed the limit 3, cancelled it now")
}