Skip to content

Commit

Permalink
parser: turn some functions into member functions of model.Job (#34175)
Browse files Browse the repository at this point in the history
ref #31716
  • Loading branch information
hawkingrei authored Apr 24, 2022
1 parent 161e835 commit ffa4ba9
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 59 deletions.
2 changes: 1 addition & 1 deletion ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestCancel(t *testing.T) {

hookFunc := func(job *model.Job) {
if job.SchemaState == allTestCase[i].cancelState && !cancel {
if job.SchemaState == model.StateWriteReorganization && ddl.MayNeedReorg(job) && job.RowCount == 0 {
if job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 {
return
}
rs := tkCancel.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
Expand Down
18 changes: 1 addition & 17 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,29 +561,13 @@ func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) {
}
}

// MayNeedReorg indicates that this job may need to reorganize the data.
func MayNeedReorg(job *model.Job) bool {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
return true
case model.ActionModifyColumn:
if len(job.CtxVars) > 0 {
needReorg, ok := job.CtxVars[0].(bool)
return ok && needReorg
}
return false
default:
return false
}
}

func (d *ddl) asyncNotifyWorker(job *model.Job) {
// If the workers don't run, we needn't notify workers.
if !RunWorker {
return
}
var worker *worker
if MayNeedReorg(job) {
if job.MayNeedReorg() {
worker = d.workers[addIdxWorker]
} else {
worker = d.workers[generalWorker]
Expand Down
7 changes: 3 additions & 4 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
pumpcli "github.com/pingcap/tidb/tidb-binlog/pump_client"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/resourcegrouptag"
Expand Down Expand Up @@ -240,7 +239,7 @@ func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
// Jobs in the same queue are ordered. If we want to find a job's dependency-job, we need to look for
// it from the other queue. So if the job is "ActionAddIndex" job, we need find its dependency-job from DefaultJobList.
jobListKey := meta.DefaultJobListKey
if !MayNeedReorg(curJob) {
if !curJob.MayNeedReorg() {
jobListKey = meta.AddIndexJobListKey
}
jobs, err := t.GetAllDDLJobsInQueue(jobListKey)
Expand Down Expand Up @@ -306,7 +305,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
return errors.Trace(err)
}
jobListKey := meta.DefaultJobListKey
if MayNeedReorg(job) {
if job.MayNeedReorg() {
jobListKey = meta.AddIndexJobListKey
}
failpoint.Inject("MockModifyJobArg", func(val failpoint.Value) {
Expand Down Expand Up @@ -800,7 +799,7 @@ func (w *worker) countForError(err error, job *model.Job) error {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && job.IsRollbackable() {
logutil.Logger(w.logCtx).Warn("[ddl] DDL job error count exceed the limit, cancelling it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
job.State = model.JobStateCancelling
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func rollingbackDropIndexes(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
switch indexInfo.State {
case model.StateWriteOnly, model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone:
// We can not rollback now, so just continue to drop index.
// Normally won't fetch here, because there is a check when canceling DDL jobs. See function: IsJobRollbackable.
// Normally won't fetch here, because there is a check when canceling DDL jobs. See function: IsRollbackable.
job.State = model.JobStateRunning
return ver, nil
case model.StatePublic:
Expand Down
50 changes: 50 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,56 @@ func (job *Job) NotStarted() bool {
return job.State == JobStateNone || job.State == JobStateQueueing
}

// MayNeedReorg indicates that this job may need to reorganize the data.
func (job *Job) MayNeedReorg() bool {
switch job.Type {
case ActionAddIndex, ActionAddPrimaryKey:
return true
case ActionModifyColumn:
if len(job.CtxVars) > 0 {
needReorg, ok := job.CtxVars[0].(bool)
return ok && needReorg
}
return false
default:
return false
}
}

// IsRollbackable checks whether the job can be rollback.
func (job *Job) IsRollbackable() bool {
switch job.Type {
case ActionDropIndex, ActionDropPrimaryKey, ActionDropIndexes:
// We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization or StateWriteOnly, otherwise there will be an inconsistent issue between record and index.
// In WriteOnly state, we can rollback for normal index but can't rollback for expression index(need to drop hidden column). Since we can't
// know the type of index here, we consider all indices except primary index as non-rollbackable.
// TODO: distinguish normal index and expression index so that we can rollback `DropIndex` for normal index in WriteOnly state.
// TODO: make DropPrimaryKey rollbackable in WriteOnly, it need to deal with some tests.
if job.SchemaState == StateDeleteOnly ||
job.SchemaState == StateDeleteReorganization ||
job.SchemaState == StateWriteOnly {
return false
}
case ActionDropSchema, ActionDropTable, ActionDropSequence:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == StateWriteOnly ||
job.SchemaState == StateDeleteOnly {
return false
}
case ActionAddTablePartition:
return job.SchemaState == StateNone || job.SchemaState == StateReplicaOnly
case ActionDropColumn, ActionDropColumns, ActionDropTablePartition,
ActionRebaseAutoID, ActionShardRowID,
ActionTruncateTable, ActionAddForeignKey,
ActionDropForeignKey, ActionRenameTable,
ActionModifyTableCharsetAndCollate, ActionTruncateTablePartition,
ActionModifySchemaCharsetAndCollate, ActionRepairTable,
ActionModifyTableAutoIdCache, ActionModifySchemaDefaultPlacement:
return job.SchemaState == StateNone
}
return true
}

// JobState is for job state.
type JobState byte

Expand Down
36 changes: 1 addition & 35 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,40 +89,6 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
return info, nil
}

// IsJobRollbackable checks whether the job can be rollback.
func IsJobRollbackable(job *model.Job) bool {
switch job.Type {
case model.ActionDropIndex, model.ActionDropPrimaryKey, model.ActionDropIndexes:
// We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization or StateWriteOnly, otherwise there will be an inconsistent issue between record and index.
// In WriteOnly state, we can rollback for normal index but can't rollback for expression index(need to drop hidden column). Since we can't
// know the type of index here, we consider all indices except primary index as non-rollbackable.
// TODO: distinguish normal index and expression index so that we can rollback `DropIndex` for normal index in WriteOnly state.
// TODO: make DropPrimaryKey rollbackable in WriteOnly, it need to deal with some tests.
if job.SchemaState == model.StateDeleteOnly ||
job.SchemaState == model.StateDeleteReorganization ||
job.SchemaState == model.StateWriteOnly {
return false
}
case model.ActionDropSchema, model.ActionDropTable, model.ActionDropSequence:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
job.SchemaState == model.StateDeleteOnly {
return false
}
case model.ActionAddTablePartition:
return job.SchemaState == model.StateNone || job.SchemaState == model.StateReplicaOnly
case model.ActionDropColumn, model.ActionDropColumns, model.ActionDropTablePartition,
model.ActionRebaseAutoID, model.ActionShardRowID,
model.ActionTruncateTable, model.ActionAddForeignKey,
model.ActionDropForeignKey, model.ActionRenameTable,
model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition,
model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable,
model.ActionModifyTableAutoIdCache, model.ActionModifySchemaDefaultPlacement:
return job.SchemaState == model.StateNone
}
return true
}

// CancelJobs cancels the DDL jobs.
func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if len(ids) == 0 {
Expand Down Expand Up @@ -162,7 +128,7 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
continue
}
if !IsJobRollbackable(job) {
if !job.IsRollbackable() {
errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion util/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func TestIsJobRollbackable(t *testing.T) {
for _, ca := range cases {
job.Type = ca.tp
job.SchemaState = ca.state
re := IsJobRollbackable(job)
re := job.IsRollbackable()
require.Equal(t, ca.result, re)
}
}
Expand Down

0 comments on commit ffa4ba9

Please sign in to comment.