Skip to content

Commit

Permalink
ddl: only use one schema version for the non-revertible step (#36262)
Browse files Browse the repository at this point in the history
ref #14766
  • Loading branch information
tangenta authored Jul 18, 2022
1 parent 67be460 commit 8b30e52
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 18 deletions.
4 changes: 2 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
if err != nil {
if ifNotExists && infoschema.ErrColumnExists.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
job.State = model.JobStateDone
return ver, nil
}
return ver, errors.Trace(err)
Expand Down Expand Up @@ -230,7 +230,7 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
// Convert the "not exists" error to a warning.
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.State = model.JobStateDone
return ver, nil
}
return ver, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func checkMultiSchemaSpecs(_sctx sessionctx.Context, specs []*ast.DatabaseOption
for _, spec := range specs {
if spec.Tp == ast.DatabaseSetTiFlashReplica {
if hasSetTiFlashReplica {
return dbterror.ErrRunMultiSchemaChanges
return dbterror.ErrRunMultiSchemaChanges.FastGenByArgs(model.ActionSetTiFlashReplica.String())
}
hasSetTiFlashReplica = true
}
Expand Down
8 changes: 4 additions & 4 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,10 +783,10 @@ func TestAlterDatabaseErrorGrammar(t *testing.T) {
defer tear()

tk := testkit.NewTestKit(t, store)
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change for set tiflash replica")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change for set tiflash replica")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change for set tiflash replica")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change for set tiflash replica")
}

func TestAlterDatabaseBasic(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.State = model.JobStateDone
return ver, nil
}
return ver, errors.Trace(err)
Expand Down
34 changes: 27 additions & 7 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
if err != nil {
return ver, err
}
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
return ver, nil
}
// The last rollback/cancelling sub-job is done.
Expand All @@ -95,7 +95,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
handleRevertibleException(job, sub, proxyJob.Error)
return ver, err
}
Expand All @@ -106,16 +106,23 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
if err != nil {
return ver, err
}
var schemaVersionGenerated = false
subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs))
// Step the sub-jobs to the non-revertible states all at once.
// We only generate 1 schema version for these sub-job.
for i, sub := range job.MultiSchemaInfo.SubJobs {
if sub.IsFinished() {
continue
}
subJobs[i] = *sub
proxyJob := sub.ToProxyJob(job)
if schemaVersionGenerated {
proxyJob.MultiSchemaInfo.SkipVersion = true
} else {
schemaVersionGenerated = true
}
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
if err != nil || proxyJob.Error != nil {
for j := i - 1; j >= 0; j-- {
job.MultiSchemaInfo.SubJobs[j] = &subJobs[j]
Expand All @@ -137,11 +144,10 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
return ver, err
}
job.State = model.JobStateDone
return ver, err
return finishMultiSchemaJob(job, t)
}

func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror.Error) {
Expand Down Expand Up @@ -252,7 +258,7 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
info.AlterIndexes = append(info.AlterIndexes, idxName)
case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate:
default:
return dbterror.ErrRunMultiSchemaChanges
return dbterror.ErrRunMultiSchemaChanges.FastGenByArgs(job.Type.String())
}
return nil
}
Expand Down Expand Up @@ -358,3 +364,17 @@ func rollingBackMultiSchemaChange(job *model.Job) error {
job.State = model.JobStateRollingback
return dbterror.ErrCancelledDDLJob
}

func finishMultiSchemaJob(job *model.Job, t *meta.Meta) (ver int64, err error) {
for _, sub := range job.MultiSchemaInfo.SubJobs {
if ver < sub.SchemaVer {
ver = sub.SchemaVer
}
}
tblInfo, err := t.GetTable(job.SchemaID, job.TableID)
if err != nil {
return ver, err
}
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
return ver, err
}
11 changes: 11 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,17 @@ func TestMultiSchemaChangeNoSubJobs(t *testing.T) {
require.Equal(t, "create table", rs[0][3])
}

func TestMultiSchemaChangeUnsupportedType(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

tk.MustExec("create table t (a int, b int);")
tk.MustGetErrMsg("alter table t add column c int, auto_id_cache = 1;",
"[ddl:8200]Unsupported multi schema change for modify auto id cache")
}

type cancelOnceHook struct {
store kv.Storage
triggered bool
Expand Down
2 changes: 1 addition & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ func updateVersionAndTableInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo
default:
}
})
if shouldUpdateVer {
if shouldUpdateVer && (job.MultiSchemaInfo == nil || !job.MultiSchemaInfo.SkipVersion) {
ver, err = updateSchemaVersion(d, t, job)
if err != nil {
return 0, errors.Trace(err)
Expand Down
7 changes: 6 additions & 1 deletion parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ type MultiSchemaInfo struct {
SubJobs []*SubJob `json:"sub_jobs"`
Revertible bool `json:"revertible"`

// SkipVersion is used to control whether generating a new schema version for a sub-job.
SkipVersion bool `json:"-"`

AddColumns []CIStr `json:"-"`
DropColumns []CIStr `json:"-"`
ModifyColumns []CIStr `json:"-"`
Expand Down Expand Up @@ -289,6 +292,7 @@ type SubJob struct {
RowCount int64 `json:"row_count"`
Warning *terror.Error `json:"warning"`
CtxVars []interface{} `json:"-"`
SchemaVer int64 `json:"schema_version"`
}

// IsNormal returns true if the sub-job is normally running.
Expand Down Expand Up @@ -342,14 +346,15 @@ func (sub *SubJob) ToProxyJob(parentJob *Job) Job {
}

// FromProxyJob converts a proxy job to a sub-job.
func (sub *SubJob) FromProxyJob(proxyJob *Job) {
func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) {
sub.Revertible = proxyJob.MultiSchemaInfo.Revertible
sub.SchemaState = proxyJob.SchemaState
sub.SnapshotVer = proxyJob.SnapshotVer
sub.Args = proxyJob.Args
sub.State = proxyJob.State
sub.Warning = proxyJob.Warning
sub.RowCount = proxyJob.RowCount
sub.SchemaVer = ver
}

// Job is for a DDL operation.
Expand Down
2 changes: 1 addition & 1 deletion util/dbterror/ddl_terror.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
// ErrCancelledDDLJob means the DDL job is cancelled.
ErrCancelledDDLJob = ClassDDL.NewStd(mysql.ErrCancelledDDLJob)
// ErrRunMultiSchemaChanges means we run multi schema changes.
ErrRunMultiSchemaChanges = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "multi schema change"), nil))
ErrRunMultiSchemaChanges = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "multi schema change for %s"), nil))
// ErrOperateSameColumn means we change the same columns multiple times in a DDL.
ErrOperateSameColumn = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "operate same column '%s'"), nil))
// ErrOperateSameIndex means we change the same indexes multiple times in a DDL.
Expand Down

0 comments on commit 8b30e52

Please sign in to comment.