From ffa4ba91e3c7d40932e5f6009566b72b8a41c9c4 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Sun, 24 Apr 2022 14:42:49 +0800 Subject: [PATCH] parser: turn some functions into member functions of model.Job (#34175) ref pingcap/tidb#31716 --- ddl/cancel_test.go | 2 +- ddl/ddl.go | 18 +-------------- ddl/ddl_worker.go | 7 +++--- ddl/rollingback.go | 2 +- parser/model/ddl.go | 50 ++++++++++++++++++++++++++++++++++++++++ util/admin/admin.go | 36 +---------------------------- util/admin/admin_test.go | 2 +- 7 files changed, 58 insertions(+), 59 deletions(-) diff --git a/ddl/cancel_test.go b/ddl/cancel_test.go index df2dc7bad97b8..fddd7fb220d85 100644 --- a/ddl/cancel_test.go +++ b/ddl/cancel_test.go @@ -198,7 +198,7 @@ func TestCancel(t *testing.T) { hookFunc := func(job *model.Job) { if job.SchemaState == allTestCase[i].cancelState && !cancel { - if job.SchemaState == model.StateWriteReorganization && ddl.MayNeedReorg(job) && job.RowCount == 0 { + if job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 { return } rs := tkCancel.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID)) diff --git a/ddl/ddl.go b/ddl/ddl.go index ed17af0ecede4..c825e6935cf87 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -561,29 +561,13 @@ func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) { } } -// MayNeedReorg indicates that this job may need to reorganize the data. -func MayNeedReorg(job *model.Job) bool { - switch job.Type { - case model.ActionAddIndex, model.ActionAddPrimaryKey: - return true - case model.ActionModifyColumn: - if len(job.CtxVars) > 0 { - needReorg, ok := job.CtxVars[0].(bool) - return ok && needReorg - } - return false - default: - return false - } -} - func (d *ddl) asyncNotifyWorker(job *model.Job) { // If the workers don't run, we needn't notify workers. if !RunWorker { return } var worker *worker - if MayNeedReorg(job) { + if job.MayNeedReorg() { worker = d.workers[addIdxWorker] } else { worker = d.workers[generalWorker] diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 82c707a6333b8..57058eb38a205 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" pumpcli "github.com/pingcap/tidb/tidb-binlog/pump_client" tidbutil "github.com/pingcap/tidb/util" - "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/resourcegrouptag" @@ -240,7 +239,7 @@ func buildJobDependence(t *meta.Meta, curJob *model.Job) error { // Jobs in the same queue are ordered. If we want to find a job's dependency-job, we need to look for // it from the other queue. So if the job is "ActionAddIndex" job, we need find its dependency-job from DefaultJobList. jobListKey := meta.DefaultJobListKey - if !MayNeedReorg(curJob) { + if !curJob.MayNeedReorg() { jobListKey = meta.AddIndexJobListKey } jobs, err := t.GetAllDDLJobsInQueue(jobListKey) @@ -306,7 +305,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { return errors.Trace(err) } jobListKey := meta.DefaultJobListKey - if MayNeedReorg(job) { + if job.MayNeedReorg() { jobListKey = meta.AddIndexJobListKey } failpoint.Inject("MockModifyJobArg", func(val failpoint.Value) { @@ -800,7 +799,7 @@ func (w *worker) countForError(err error, job *model.Job) error { logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1)) } // Check error limit to avoid falling into an infinite loop. - if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) { + if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && job.IsRollbackable() { logutil.Logger(w.logCtx).Warn("[ddl] DDL job error count exceed the limit, cancelling it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit())) job.State = model.JobStateCancelling } diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 7b176b75a2d08..83304b2bb7ef7 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -303,7 +303,7 @@ func rollingbackDropIndexes(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, switch indexInfo.State { case model.StateWriteOnly, model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone: // We can not rollback now, so just continue to drop index. - // Normally won't fetch here, because there is a check when canceling DDL jobs. See function: IsJobRollbackable. + // Normally won't fetch here, because there is a check when canceling DDL jobs. See function: IsRollbackable. job.State = model.JobStateRunning return ver, nil case model.StatePublic: diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 6b6ecda4a3453..2c5d7b33f5bf2 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -511,6 +511,56 @@ func (job *Job) NotStarted() bool { return job.State == JobStateNone || job.State == JobStateQueueing } +// MayNeedReorg indicates that this job may need to reorganize the data. +func (job *Job) MayNeedReorg() bool { + switch job.Type { + case ActionAddIndex, ActionAddPrimaryKey: + return true + case ActionModifyColumn: + if len(job.CtxVars) > 0 { + needReorg, ok := job.CtxVars[0].(bool) + return ok && needReorg + } + return false + default: + return false + } +} + +// IsRollbackable checks whether the job can be rollback. +func (job *Job) IsRollbackable() bool { + switch job.Type { + case ActionDropIndex, ActionDropPrimaryKey, ActionDropIndexes: + // We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization or StateWriteOnly, otherwise there will be an inconsistent issue between record and index. + // In WriteOnly state, we can rollback for normal index but can't rollback for expression index(need to drop hidden column). Since we can't + // know the type of index here, we consider all indices except primary index as non-rollbackable. + // TODO: distinguish normal index and expression index so that we can rollback `DropIndex` for normal index in WriteOnly state. + // TODO: make DropPrimaryKey rollbackable in WriteOnly, it need to deal with some tests. + if job.SchemaState == StateDeleteOnly || + job.SchemaState == StateDeleteReorganization || + job.SchemaState == StateWriteOnly { + return false + } + case ActionDropSchema, ActionDropTable, ActionDropSequence: + // To simplify the rollback logic, cannot be canceled in the following states. + if job.SchemaState == StateWriteOnly || + job.SchemaState == StateDeleteOnly { + return false + } + case ActionAddTablePartition: + return job.SchemaState == StateNone || job.SchemaState == StateReplicaOnly + case ActionDropColumn, ActionDropColumns, ActionDropTablePartition, + ActionRebaseAutoID, ActionShardRowID, + ActionTruncateTable, ActionAddForeignKey, + ActionDropForeignKey, ActionRenameTable, + ActionModifyTableCharsetAndCollate, ActionTruncateTablePartition, + ActionModifySchemaCharsetAndCollate, ActionRepairTable, + ActionModifyTableAutoIdCache, ActionModifySchemaDefaultPlacement: + return job.SchemaState == StateNone + } + return true +} + // JobState is for job state. type JobState byte diff --git a/util/admin/admin.go b/util/admin/admin.go index 8bb29e142f190..99309122d579a 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -89,40 +89,6 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) { return info, nil } -// IsJobRollbackable checks whether the job can be rollback. -func IsJobRollbackable(job *model.Job) bool { - switch job.Type { - case model.ActionDropIndex, model.ActionDropPrimaryKey, model.ActionDropIndexes: - // We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization or StateWriteOnly, otherwise there will be an inconsistent issue between record and index. - // In WriteOnly state, we can rollback for normal index but can't rollback for expression index(need to drop hidden column). Since we can't - // know the type of index here, we consider all indices except primary index as non-rollbackable. - // TODO: distinguish normal index and expression index so that we can rollback `DropIndex` for normal index in WriteOnly state. - // TODO: make DropPrimaryKey rollbackable in WriteOnly, it need to deal with some tests. - if job.SchemaState == model.StateDeleteOnly || - job.SchemaState == model.StateDeleteReorganization || - job.SchemaState == model.StateWriteOnly { - return false - } - case model.ActionDropSchema, model.ActionDropTable, model.ActionDropSequence: - // To simplify the rollback logic, cannot be canceled in the following states. - if job.SchemaState == model.StateWriteOnly || - job.SchemaState == model.StateDeleteOnly { - return false - } - case model.ActionAddTablePartition: - return job.SchemaState == model.StateNone || job.SchemaState == model.StateReplicaOnly - case model.ActionDropColumn, model.ActionDropColumns, model.ActionDropTablePartition, - model.ActionRebaseAutoID, model.ActionShardRowID, - model.ActionTruncateTable, model.ActionAddForeignKey, - model.ActionDropForeignKey, model.ActionRenameTable, - model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition, - model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable, - model.ActionModifyTableAutoIdCache, model.ActionModifySchemaDefaultPlacement: - return job.SchemaState == model.StateNone - } - return true -} - // CancelJobs cancels the DDL jobs. func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) { if len(ids) == 0 { @@ -162,7 +128,7 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) { if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() { continue } - if !IsJobRollbackable(job) { + if !job.IsRollbackable() { errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID) continue } diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index de8d63188fa45..4e8b453297bb6 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -346,7 +346,7 @@ func TestIsJobRollbackable(t *testing.T) { for _, ca := range cases { job.Type = ca.tp job.SchemaState = ca.state - re := IsJobRollbackable(job) + re := job.IsRollbackable() require.Equal(t, ca.result, re) } }