diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 7c39b0499719b..356e675d275b7 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -215,6 +215,7 @@ type DDL interface { ctx sessionctx.Context, schema model.CIStr, info *model.TableInfo, + involvingRef []model.InvolvingSchemaInfo, cs ...CreateTableWithInfoConfigurier) error // BatchCreateTableWithInfo is like CreateTableWithInfo, but can handle multiple tables. diff --git a/pkg/ddl/ddl_api.go b/pkg/ddl/ddl_api.go index 9e0e03cca58b7..f0e7dca192a79 100644 --- a/pkg/ddl/ddl_api.go +++ b/pkg/ddl/ddl_api.go @@ -229,6 +229,12 @@ func (d *ddl) CreateSchemaWithInfo( }}, SQLMode: ctx.GetSessionVars().SQLMode, } + if ref := dbInfo.PlacementPolicyRef; ref != nil { + job.InvolvingSchemaInfo = append(job.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ + Policy: ref.Name.L, + Mode: model.SharedInvolving, + }) + } err = d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) @@ -308,6 +314,12 @@ func (d *ddl) ModifySchemaDefaultPlacement(ctx sessionctx.Context, stmt *ast.Alt }}, SQLMode: ctx.GetSessionVars().SQLMode, } + if placementPolicyRef != nil { + job.InvolvingSchemaInfo = append(job.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ + Policy: placementPolicyRef.Name.L, + Mode: model.SharedInvolving, + }) + } err = d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) return errors.Trace(err) @@ -525,16 +537,31 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place return err } + var involvingSchemaInfo []model.InvolvingSchemaInfo + if placementPolicyRef != nil { + involvingSchemaInfo = []model.InvolvingSchemaInfo{ + { + Database: schema.Name.L, + Table: tblInfo.Name.L, + }, + { + Policy: placementPolicyRef.Name.L, + Mode: model.SharedInvolving, + }, + } + } + job := &model.Job{ - SchemaID: schema.ID, - TableID: tblInfo.ID, - SchemaName: schema.Name.L, - TableName: tblInfo.Name.L, - Type: model.ActionAlterTablePlacement, - BinlogInfo: &model.HistoryInfo{}, - CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - Args: []any{placementPolicyRef}, - SQLMode: ctx.GetSessionVars().SQLMode, + SchemaID: schema.ID, + TableID: tblInfo.ID, + SchemaName: schema.Name.L, + TableName: tblInfo.Name.L, + Type: model.ActionAlterTablePlacement, + BinlogInfo: &model.HistoryInfo{}, + CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, + Args: []any{placementPolicyRef}, + InvolvingSchemaInfo: involvingSchemaInfo, + SQLMode: ctx.GetSessionVars().SQLMode, } err = d.DoDDLJob(ctx, job) @@ -580,10 +607,10 @@ func checkMultiSchemaSpecs(_ sessionctx.Context, specs []*ast.DatabaseOption) er func (d *ddl) AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (err error) { // Resolve target charset and collation from options. 
var ( - toCharset, toCollate string - isAlterCharsetAndCollate, isAlterPlacement, isTiFlashReplica bool - placementPolicyRef *model.PolicyRefInfo - tiflashReplica *ast.TiFlashReplicaSpec + toCharset, toCollate string + isAlterCharsetAndCollate bool + placementPolicyRef *model.PolicyRefInfo + tiflashReplica *ast.TiFlashReplicaSpec ) err = checkMultiSchemaSpecs(sctx, stmt.Options) @@ -614,10 +641,8 @@ func (d *ddl) AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) isAlterCharsetAndCollate = true case ast.DatabaseOptionPlacementPolicy: placementPolicyRef = &model.PolicyRefInfo{Name: model.NewCIStr(val.Value)} - isAlterPlacement = true case ast.DatabaseSetTiFlashReplica: tiflashReplica = val.TiFlashReplica - isTiFlashReplica = true } } @@ -626,12 +651,12 @@ func (d *ddl) AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) return err } } - if isAlterPlacement { + if placementPolicyRef != nil { if err = d.ModifySchemaDefaultPlacement(sctx, stmt, placementPolicyRef); err != nil { return err } } - if isTiFlashReplica { + if tiflashReplica != nil { if err = d.ModifySchemaSetTiFlashReplica(sctx, stmt, tiflashReplica); err != nil { return err } @@ -2708,7 +2733,10 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) } - var referTbl table.Table + var ( + referTbl table.Table + involvingRef []model.InvolvingSchemaInfo + ) if s.ReferTable != nil { referIdent := ast.Ident{Schema: s.ReferTable.Schema, Name: s.ReferTable.Name} _, ok := is.SchemaByName(referIdent.Schema) @@ -2719,6 +2747,11 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e if err != nil { return infoschema.ErrTableNotExists.GenWithStackByArgs(referIdent.Schema, referIdent.Name) } + involvingRef = append(involvingRef, model.InvolvingSchemaInfo{ + Database: s.ReferTable.Schema.L, + Table: s.ReferTable.Name.L, + Mode: model.SharedInvolving, + }) } // build tableInfo @@ -2744,7 +2777,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e onExist = OnExistIgnore } - return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, onExist) + return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, involvingRef, onExist) } func setTemporaryType(_ sessionctx.Context, tbInfo *model.TableInfo, s *ast.CreateTableStmt) error { @@ -2771,6 +2804,7 @@ func (d *ddl) createTableWithInfoJob( ctx sessionctx.Context, dbName model.CIStr, tbInfo *model.TableInfo, + involvingRef []model.InvolvingSchemaInfo, onExist OnExist, retainID bool, ) (job *model.Job, err error) { @@ -2836,20 +2870,52 @@ func (d *ddl) createTableWithInfoJob( args = append(args, ctx.GetSessionVars().ForeignKeyChecks) } + var involvingSchemas []model.InvolvingSchemaInfo + sharedInvolvingFromTableInfo := getSharedInvolvingSchemaInfo(tbInfo) + + if sum := len(involvingRef) + len(sharedInvolvingFromTableInfo); sum > 0 { + involvingSchemas = make([]model.InvolvingSchemaInfo, 0, sum+1) + involvingSchemas = append(involvingSchemas, model.InvolvingSchemaInfo{ + Database: schema.Name.L, + Table: tbInfo.Name.L, + }) + involvingSchemas = append(involvingSchemas, involvingRef...) + involvingSchemas = append(involvingSchemas, sharedInvolvingFromTableInfo...) 
+ } + job = &model.Job{ - SchemaID: schema.ID, - TableID: tbInfo.ID, - SchemaName: schema.Name.L, - TableName: tbInfo.Name.L, - Type: actionType, - BinlogInfo: &model.HistoryInfo{}, - Args: args, - CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - SQLMode: ctx.GetSessionVars().SQLMode, + SchemaID: schema.ID, + TableID: tbInfo.ID, + SchemaName: schema.Name.L, + TableName: tbInfo.Name.L, + Type: actionType, + BinlogInfo: &model.HistoryInfo{}, + Args: args, + CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, + InvolvingSchemaInfo: involvingSchemas, + SQLMode: ctx.GetSessionVars().SQLMode, } return job, nil } +func getSharedInvolvingSchemaInfo(info *model.TableInfo) []model.InvolvingSchemaInfo { + ret := make([]model.InvolvingSchemaInfo, 0, len(info.ForeignKeys)+1) + for _, fk := range info.ForeignKeys { + ret = append(ret, model.InvolvingSchemaInfo{ + Database: fk.RefSchema.L, + Table: fk.RefTable.L, + Mode: model.SharedInvolving, + }) + } + if ref := info.PlacementPolicyRef; ref != nil { + ret = append(ret, model.InvolvingSchemaInfo{ + Policy: ref.Name.L, + Mode: model.SharedInvolving, + }) + } + return ret +} + func (d *ddl) createTableWithInfoPost( ctx sessionctx.Context, tbInfo *model.TableInfo, @@ -2894,11 +2960,14 @@ func (d *ddl) CreateTableWithInfo( ctx sessionctx.Context, dbName model.CIStr, tbInfo *model.TableInfo, + involvingRef []model.InvolvingSchemaInfo, cs ...CreateTableWithInfoConfigurier, ) (err error) { c := GetCreateTableWithInfoConfig(cs) - job, err := d.createTableWithInfoJob(ctx, dbName, tbInfo, c.OnExist, !c.ShouldAllocTableID(tbInfo)) + job, err := d.createTableWithInfoJob( + ctx, dbName, tbInfo, involvingRef, c.OnExist, !c.ShouldAllocTableID(tbInfo), + ) if err != nil { return err } @@ -2984,7 +3053,7 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, } } - job, err := d.createTableWithInfoJob(ctx, dbName, info, c.OnExist, true) + job, err := d.createTableWithInfoJob(ctx, dbName, info, nil, c.OnExist, true) if err != nil { return errors.Trace(err) } @@ -3006,11 +3075,13 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, return errors.Trace(fmt.Errorf("except table info")) } args = append(args, info) - jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo, - model.InvolvingSchemaInfo{ + if sharedInv := getSharedInvolvingSchemaInfo(info); len(sharedInv) > 0 { + jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ Database: dbName.L, Table: info.Name.L, }) + jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo, sharedInv...) 
+ } } if len(args) == 0 { return nil } @@ -3147,8 +3218,7 @@ func (d *ddl) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy *mode BinlogInfo: &model.HistoryInfo{}, Args: []any{policy, onExist == OnExistReplace}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ - Database: model.InvolvingNone, - Table: model.InvolvingNone, + Policy: policy.Name.L, }}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, @@ -3273,7 +3343,7 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (er } func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err error) { - viewInfo, err := BuildViewInfo(ctx, s) + viewInfo, err := BuildViewInfo(s) if err != nil { return err } @@ -3308,11 +3378,11 @@ func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err err onExist = OnExistReplace } - return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, onExist) + return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, nil, onExist) } // BuildViewInfo builds a ViewInfo structure from an ast.CreateViewStmt. -func BuildViewInfo(_ sessionctx.Context, s *ast.CreateViewStmt) (*model.ViewInfo, error) { +func BuildViewInfo(s *ast.CreateViewStmt) (*model.ViewInfo, error) { // Always use `format.RestoreNameBackQuotes` to restore the `SELECT` statement, regardless of whether the `ANSI_QUOTES` SQL mode is enabled. restoreFlag := format.RestoreStringSingleQuotes | format.RestoreKeyWordUppercase | format.RestoreNameBackQuotes var sb strings.Builder @@ -7994,7 +8064,18 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode BinlogInfo: &model.HistoryInfo{}, Args: []any{fkInfo, fkCheck}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - SQLMode: ctx.GetSessionVars().SQLMode, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{ + { + Database: schema.Name.L, + Table: t.Meta().Name.L, + }, + { + Database: fkInfo.RefSchema.L, + Table: fkInfo.RefTable.L, + Mode: model.SharedInvolving, + }, + }, + SQLMode: ctx.GetSessionVars().SQLMode, } err = d.DoDDLJob(ctx, job) @@ -8677,7 +8758,7 @@ func (d *ddl) CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStm onExist = OnExistIgnore } - return d.CreateTableWithInfo(ctx, ident.Schema, tbInfo, onExist) + return d.CreateTableWithInfo(ctx, ident.Schema, tbInfo, nil, onExist) } func (d *ddl) AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error { @@ -8896,16 +8977,31 @@ func (d *ddl) AlterTablePartitionPlacement(ctx sessionctx.Context, tableIdent as return errors.Trace(err) } + var involveSchemaInfo []model.InvolvingSchemaInfo + if policyRefInfo != nil { + involveSchemaInfo = []model.InvolvingSchemaInfo{ + { + Database: schema.Name.L, + Table: tblInfo.Name.L, + }, + { + Policy: policyRefInfo.Name.L, + Mode: model.SharedInvolving, + }, + } + } + job := &model.Job{ - SchemaID: schema.ID, - TableID: tblInfo.ID, - SchemaName: schema.Name.L, - TableName: tblInfo.Name.L, - Type: model.ActionAlterTablePartitionPlacement, - BinlogInfo: &model.HistoryInfo{}, - Args: []any{partitionID, policyRefInfo}, - CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - SQLMode: ctx.GetSessionVars().SQLMode, + SchemaID: schema.ID, + TableID: tblInfo.ID, + SchemaName: schema.Name.L, + TableName: tblInfo.Name.L, + Type: model.ActionAlterTablePartitionPlacement, + BinlogInfo: &model.HistoryInfo{}, + Args: []any{partitionID, policyRefInfo}, + CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, + InvolvingSchemaInfo: involveSchemaInfo, + SQLMode:
ctx.GetSessionVars().SQLMode, } err = d.DoDDLJob(ctx, job) @@ -9068,8 +9164,7 @@ func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceG CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, Args: []any{groupInfo, false}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ - Database: model.InvolvingNone, - Table: model.InvolvingNone, + ResourceGroup: groupInfo.Name.L, }}, SQLMode: ctx.GetSessionVars().SQLMode, } @@ -9120,8 +9215,7 @@ func (d *ddl) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGr CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, Args: []any{groupName}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ - Database: model.InvolvingNone, - Table: model.InvolvingNone, + ResourceGroup: groupName.L, }}, SQLMode: ctx.GetSessionVars().SQLMode, } @@ -9178,8 +9272,7 @@ func (d *ddl) AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResource CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, Args: []any{newGroupInfo}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ - Database: model.InvolvingNone, - Table: model.InvolvingNone, + ResourceGroup: newGroupInfo.Name.L, }}, SQLMode: ctx.GetSessionVars().SQLMode, } @@ -9244,8 +9337,7 @@ func (d *ddl) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacemen CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, Args: []any{policyName}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ - Database: model.InvolvingNone, - Table: model.InvolvingNone, + Policy: policyName.L, }}, SQLMode: ctx.GetSessionVars().SQLMode, } @@ -9284,8 +9376,7 @@ func (d *ddl) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacem CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, Args: []any{newPolicyInfo}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ - Database: model.InvolvingNone, - Table: model.InvolvingNone, + Policy: newPolicyInfo.Name.L, }}, SQLMode: ctx.GetSessionVars().SQLMode, } diff --git a/pkg/ddl/ddl_running_jobs.go b/pkg/ddl/ddl_running_jobs.go index b04d07d25d532..5c85ff5311878 100644 --- a/pkg/ddl/ddl_running_jobs.go +++ b/pkg/ddl/ddl_running_jobs.go @@ -37,27 +37,170 @@ type runningJobs struct { ids map[int64]struct{} idsStrGetter func() string + + exclusive *objects + shared *objects + // to implement the fair lock semantics, we need to save the pending exclusive + // object requests to block future shared object requests. + pending *objects +} + +// objects uses maps to count the number of involved objects. When a count drops +// to zero, the entry is deleted from the map to keep the map small. +type objects struct { // database -> table -> count // // if the job is only related to a database, the table-level entry key is // model.InvolvingAll. When removing a job, runningJobs will make sure no // zero-length map exists at the table level.
- schemas map[string]map[string]struct{} + schemas map[string]map[string]int + placementPolicies map[string]int + resourceGroups map[string]int +} + +func newObjects() *objects { + return &objects{ + schemas: make(map[string]map[string]int), + placementPolicies: make(map[string]int), + resourceGroups: make(map[string]int), + } +} + +func (o *objects) empty() bool { + return len(o.schemas) == 0 && len(o.placementPolicies) == 0 && len(o.resourceGroups) == 0 } -// TODO(lance6716): support jobs involving objects that are exclusive (ALTER -// PLACEMENT POLICY vs ALTER PLACEMENT POLICY) and shared (ALTER TABLE PLACEMENT -// POLICY vs ALTER TABLE PLACEMENT POLICY) func newRunningJobs() *runningJobs { return &runningJobs{ ids: make(map[int64]struct{}), idsStrGetter: func() string { return "" }, - schemas: make(map[string]map[string]struct{}), + exclusive: newObjects(), + shared: newObjects(), + pending: newObjects(), } } -// add should only add the argument that passed the last checkRunnable. -func (j *runningJobs) add(jobID int64, involves []model.InvolvingSchemaInfo) { +// checkRunnable checks whether the job can be run. If the caller finds a +// runnable job and decides to run it, it must call addRunning before the next +// checkRunnable invocation. Otherwise, it should call addPending before the +// next checkRunnable invocation. +func (j *runningJobs) checkRunnable(jobID int64, involves []model.InvolvingSchemaInfo) bool { + j.mu.RLock() + defer j.mu.RUnlock() + + if _, ok := j.ids[jobID]; ok { + // should not happen + if intest.InTest { + panic(fmt.Sprintf("job %d is already running", jobID)) + } + return false + } + // Currently flashback cluster is the only DDL that involves ALL schemas. + if _, ok := j.exclusive.schemas[model.InvolvingAll]; ok { + return false + } + + if j.exclusive.empty() && j.shared.empty() && j.pending.empty() { + return true + } + + for _, info := range involves { + intest.Assert( + !(info.Database == model.InvolvingNone && info.Table != model.InvolvingNone), + "job %d is invalid. While database is empty, involved table name is not empty: %s", + jobID, info.Table, + ) + intest.Assert( + !(info.Database != model.InvolvingNone && info.Table == model.InvolvingNone), + "job %d is invalid. While table is empty, involved database name is not empty: %s", + jobID, info.Database, + ) + + if info.Database == model.InvolvingAll && info.Table == model.InvolvingAll && + info.Mode == model.ExclusiveInvolving { + // Check the case of involving all databases and tables; currently the only + // such case is FLASHBACK CLUSTER. Because runningJobs is not empty at this + // point, we can return false. + return false + } + + var toCheck []*objects + switch info.Mode { + case model.ExclusiveInvolving: + // Exclusive objects conflict with running exclusive and shared objects. And + // because shared objects can be concurrently removed by removeRunning in + // another goroutine, we also check pending objects. + toCheck = []*objects{j.exclusive, j.shared, j.pending} + case model.SharedInvolving: + // Shared objects conflict with running exclusive objects and pending exclusive + // objects.
+ toCheck = []*objects{j.exclusive, j.pending} + default: + panic(fmt.Sprintf("unknown involving mode: %d", info.Mode)) + } + + for _, checkingObj := range toCheck { + if info.Database != model.InvolvingNone { + if hasSchemaConflict(info.Database, info.Table, checkingObj.schemas) { + return false + } + // model.InvolvingSchemaInfo acts like an enum type + intest.Assert( + info.Policy == "" && info.ResourceGroup == "", + "InvolvingSchemaInfo should act like an enum type: %#v", + info, + ) + continue + } + + if info.Policy != "" { + if _, ok := checkingObj.placementPolicies[info.Policy]; ok { + return false + } + intest.Assert( + info.ResourceGroup == "", + "InvolvingSchemaInfo should act like an enum type: %#v", + info, + ) + continue + } + intest.Assert( + info.ResourceGroup != "", + "InvolvingSchemaInfo should act like an enum type: %#v", + info, + ) + if _, ok := checkingObj.resourceGroups[info.ResourceGroup]; ok { + return false + } + } + } + return true +} + +func hasSchemaConflict( + requestDatabase, requestTable string, + schemas map[string]map[string]int, +) bool { + tbls, ok := schemas[requestDatabase] + if !ok { + return false + } + if requestTable == model.InvolvingAll { + // We rely on the invariant that no zero-length map exists at the table level, + // so if the database-level entry exists, it must conflict with InvolvingAll. + return true + } + if _, ok2 := tbls[model.InvolvingAll]; ok2 { + return true + } + if _, ok2 := tbls[requestTable]; ok2 { + return true + } + return false +} + +// addRunning should only be called with arguments that passed the last +// checkRunnable. The added jobs can be removed by removeRunning. +func (j *runningJobs) addRunning(jobID int64, involves []model.InvolvingSchemaInfo) { j.mu.Lock() defer j.mu.Unlock() @@ -65,29 +208,33 @@ func (j *runningJobs) add(jobID int64, involves []model.InvolvingSchemaInfo) { j.updateIDsStrGetter() for _, info := range involves { - // DDL jobs related to placement policies and resource groups - if info.Database == model.InvolvingNone { - // should not happen - if intest.InTest { - if info.Table != model.InvolvingNone { - panic(fmt.Sprintf( - "job %d is invalid, involved table name is not empty: %s", - jobID, info.Table, - )) - } - } - continue + var toAdd *objects + switch info.Mode { + case model.ExclusiveInvolving: + toAdd = j.exclusive + case model.SharedInvolving: + toAdd = j.shared + default: + panic(fmt.Sprintf("unknown involving mode: %d", info.Mode)) } - if _, ok := j.schemas[info.Database]; !ok { - j.schemas[info.Database] = make(map[string]struct{}) + if info.Database != model.InvolvingNone { + if _, ok := toAdd.schemas[info.Database]; !ok { + toAdd.schemas[info.Database] = make(map[string]int) + } + toAdd.schemas[info.Database][info.Table]++ + } + if info.Policy != model.InvolvingNone { + toAdd.placementPolicies[info.Policy]++ + } + if info.ResourceGroup != model.InvolvingNone { + toAdd.resourceGroups[info.ResourceGroup]++ } - j.schemas[info.Database][info.Table] = struct{}{} } } -// remove can be concurrently called with add and checkRunnable. -func (j *runningJobs) remove(jobID int64, involves []model.InvolvingSchemaInfo) { +// removeRunning can be called concurrently with addRunning and checkRunnable.
+func (j *runningJobs) removeRunning(jobID int64, involves []model.InvolvingSchemaInfo) { j.mu.Lock() defer j.mu.Unlock() @@ -100,15 +247,80 @@ func (j *runningJobs) remove(jobID int64, involves []model.InvolvingSchemaInfo) j.updateIDsStrGetter() for _, info := range involves { - if db, ok := j.schemas[info.Database]; ok { - delete(db, info.Table) + var toRemove *objects + switch info.Mode { + case model.ExclusiveInvolving: + toRemove = j.exclusive + case model.SharedInvolving: + toRemove = j.shared + default: + panic(fmt.Sprintf("unknown involving mode: %d", info.Mode)) + } + + if info.Database != model.InvolvingNone { + if db, ok := toRemove.schemas[info.Database]; ok { + if info.Table != model.InvolvingNone { + db[info.Table]-- + if db[info.Table] == 0 { + delete(db, info.Table) + } + } + } + if len(toRemove.schemas[info.Database]) == 0 { + delete(toRemove.schemas, info.Database) + } + } + + if len(info.Policy) > 0 { + toRemove.placementPolicies[info.Policy]-- + if toRemove.placementPolicies[info.Policy] == 0 { + delete(toRemove.placementPolicies, info.Policy) + } + } + + if len(info.ResourceGroup) > 0 { + toRemove.resourceGroups[info.ResourceGroup]-- + if toRemove.resourceGroups[info.ResourceGroup] == 0 { + delete(toRemove.resourceGroups, info.ResourceGroup) + } + } + } +} + +// addPending is used to record the exclusive objects of jobs that cannot run, +// to block subsequent jobs whose shared objects intersect with the pending +// jobs'. This gives us "fair lock" semantics. +// +// The pending records can be cleared by resetAllPending. +func (j *runningJobs) addPending(involves []model.InvolvingSchemaInfo) { + j.mu.Lock() + defer j.mu.Unlock() + + for _, info := range involves { + if info.Database != model.InvolvingNone { + if _, ok := j.pending.schemas[info.Database]; !ok { + j.pending.schemas[info.Database] = make(map[string]int) + } + j.pending.schemas[info.Database][info.Table]++ } - if len(j.schemas[info.Database]) == 0 { - delete(j.schemas, info.Database) + if info.Policy != model.InvolvingNone { + j.pending.placementPolicies[info.Policy]++ + } + if info.ResourceGroup != model.InvolvingNone { + j.pending.resourceGroups[info.ResourceGroup]++ } } } +// resetAllPending should be called when the caller finishes a round of getting +// runnable DDL jobs. +func (j *runningJobs) resetAllPending() { + j.mu.Lock() + defer j.mu.Unlock() + + j.pending = newObjects() +} + func (j *runningJobs) updateIDsStrGetter() { var ( once sync.Once @@ -136,47 +348,3 @@ func (j *runningJobs) allIDs() string { defer j.mu.RUnlock() return j.idsStrGetter() } - -// checkRunnable checks whether the job can be run. If the caller found a -// runnable job and decides to add it, it must add before next checkRunnable -// invocation. -func (j *runningJobs) checkRunnable(jobID int64, involves []model.InvolvingSchemaInfo) bool { - j.mu.RLock() - defer j.mu.RUnlock() - - if _, ok := j.ids[jobID]; ok { - // should not happen - if intest.InTest { - panic(fmt.Sprintf("job %d is already running", jobID)) - } - return false - } - // Currently flashback cluster is the only DDL that involves ALL schemas.
- if _, ok := j.schemas[model.InvolvingAll]; ok { - return false - } - - for _, info := range involves { - if info.Database == model.InvolvingAll { - if len(j.schemas) != 0 { - return false - } - continue - } - - tbls, ok := j.schemas[info.Database] - if !ok { - continue - } - if info.Table == model.InvolvingAll { - return false - } - if _, ok2 := tbls[model.InvolvingAll]; ok2 { - return false - } - if _, ok2 := tbls[info.Table]; ok2 { - return false - } - } - return true -} diff --git a/pkg/ddl/ddl_running_jobs_test.go b/pkg/ddl/ddl_running_jobs_test.go index 8638ebd73ea6e..dce4fb630fef7 100644 --- a/pkg/ddl/ddl_running_jobs_test.go +++ b/pkg/ddl/ddl_running_jobs_test.go @@ -29,7 +29,7 @@ import ( ) func mkJob(id int64, schemaTableNames ...string) (int64, []model.InvolvingSchemaInfo) { - schemaInfos := make([]model.InvolvingSchemaInfo, len(schemaTableNames)) + schemaInfos := make([]model.InvolvingSchemaInfo, 0, len(schemaTableNames)) for _, schemaTableName := range schemaTableNames { ss := strings.Split(schemaTableName, ".") schemaInfos = append(schemaInfos, model.InvolvingSchemaInfo{ @@ -41,32 +41,42 @@ func mkJob(id int64, schemaTableNames ...string) (int64, []model.InvolvingSchema } func checkInvariants(t *testing.T, j *runningJobs) { - // check table-level entry should not have zero length - for _, tables := range j.schemas { - require.Greater(t, len(tables), 0) + for _, checkingObj := range []*objects{j.exclusive, j.shared, j.pending} { + for _, tables := range checkingObj.schemas { + // check table-level entry should not have zero length + require.Greater(t, len(tables), 0) + for _, v := range tables { + require.Greater(t, v, 0) + } + } + for _, v := range checkingObj.placementPolicies { + require.Greater(t, v, 0) + } + for _, v := range checkingObj.resourceGroups { + require.Greater(t, v, 0) + } } } -func TestRunningJobs(t *testing.T) { - orderedAllIDs := func(ids string) string { - if ids == "" { - return "" - } +func orderedAllIDs(ids string) string { + if ids == "" { + return "" + } - ss := strings.Split(ids, ",") - ssid := make([]int, len(ss)) - for i := range ss { - id, err := strconv.Atoi(ss[i]) - require.NoError(t, err) - ssid[i] = id - } - sort.Ints(ssid) - for i := range ssid { - ss[i] = strconv.Itoa(ssid[i]) - } - return strings.Join(ss, ",") + ss := strings.Split(ids, ",") + ssid := make([]int, len(ss)) + for i := range ss { + id, _ := strconv.Atoi(ss[i]) + ssid[i] = id + } + sort.Ints(ssid) + for i := range ssid { + ss[i] = strconv.Itoa(ssid[i]) } + return strings.Join(ss, ",") +} +func TestRunningJobs(t *testing.T) { j := newRunningJobs() require.Equal(t, "", j.allIDs()) checkInvariants(t, j) @@ -77,11 +87,11 @@ func TestRunningJobs(t *testing.T) { jobID1, involves1 := mkJob(1, "db1.t1", "db1.t2") runnable = j.checkRunnable(jobID1, involves1) require.True(t, runnable) - j.add(jobID1, involves1) + j.addRunning(jobID1, involves1) jobID2, involves2 := mkJob(2, "db2.t3") runnable = j.checkRunnable(jobID2, involves2) require.True(t, runnable) - j.add(jobID2, involves2) + j.addRunning(jobID2, involves2) require.Equal(t, "1,2", orderedAllIDs(j.allIDs())) checkInvariants(t, j) @@ -97,20 +107,20 @@ func TestRunningJobs(t *testing.T) { jobID3, involves3 := mkJob(3, "db1.*") runnable = j.checkRunnable(jobID3, involves3) require.False(t, runnable) - j.remove(jobID1, involves1) + j.removeRunning(jobID1, involves1) runnable = j.checkRunnable(jobID3, involves3) require.True(t, runnable) - j.add(jobID3, involves3) + j.addRunning(jobID3, involves3) require.Equal(t, "2,3", 
orderedAllIDs(j.allIDs())) checkInvariants(t, j) runnable = j.checkRunnable(mkJob(0, "db1.t100")) require.False(t, runnable) - jobID4, involves4 := mkJob(4, "db4.t100") + jobID4, involves4 := mkJob(4, "db4.t100", "db2.t6") runnable = j.checkRunnable(jobID4, involves4) require.True(t, runnable) - j.add(jobID4, involves4) + j.addRunning(jobID4, involves4) require.Equal(t, "2,3,4", orderedAllIDs(j.allIDs())) checkInvariants(t, j) @@ -118,18 +128,233 @@ func TestRunningJobs(t *testing.T) { runnable = j.checkRunnable(jobID5, involves5) require.False(t, runnable) - j.remove(jobID2, involves2) - j.remove(jobID3, involves3) - j.remove(jobID4, involves4) + j.removeRunning(jobID2, involves2) + j.removeRunning(jobID3, involves3) + j.removeRunning(jobID4, involves4) require.Equal(t, "", orderedAllIDs(j.allIDs())) checkInvariants(t, j) runnable = j.checkRunnable(jobID5, involves5) require.True(t, runnable) - j.add(jobID5, involves5) + j.addRunning(jobID5, involves5) require.Equal(t, "5", orderedAllIDs(j.allIDs())) checkInvariants(t, j) runnable = j.checkRunnable(mkJob(0, "db1.t1")) require.False(t, runnable) } + +func TestSchemaPolicyAndResourceGroup(t *testing.T) { + j := newRunningJobs() + + jobID1, involves1 := mkJob(1, "db1.t1", "db1.t2") + runnable := j.checkRunnable(jobID1, involves1) + require.True(t, runnable) + j.addRunning(jobID1, involves1) + + failedInvolves := []model.InvolvingSchemaInfo{ + {Policy: "p0"}, + {Database: "db1", Table: model.InvolvingAll}, + } + runnable = j.checkRunnable(0, failedInvolves) + require.False(t, runnable) + + failedInvolves = []model.InvolvingSchemaInfo{ + {Database: model.InvolvingAll, Table: model.InvolvingAll}, + {ResourceGroup: "g0"}, + } + runnable = j.checkRunnable(0, failedInvolves) + require.False(t, runnable) + + jobID2 := int64(2) + involves2 := []model.InvolvingSchemaInfo{ + {Database: "db2", Table: model.InvolvingAll}, + {Policy: "p0"}, + {ResourceGroup: "g0"}, + } + runnable = j.checkRunnable(jobID2, involves2) + require.True(t, runnable) + j.addRunning(jobID2, involves2) + + jobID3 := int64(3) + involves3 := []model.InvolvingSchemaInfo{ + {Policy: "p1"}, + {ResourceGroup: "g1"}, + } + runnable = j.checkRunnable(jobID3, involves3) + require.True(t, runnable) + j.addRunning(jobID3, involves3) + require.Equal(t, "1,2,3", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + + failedInvolves = []model.InvolvingSchemaInfo{ + {ResourceGroup: "g0"}, + } + runnable = j.checkRunnable(0, failedInvolves) + require.False(t, runnable) + + j.removeRunning(jobID2, involves2) + require.Equal(t, "1,3", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + + jobID4 := int64(4) + involves4 := []model.InvolvingSchemaInfo{ + {Policy: "p0"}, + {Database: "db3", Table: "t3"}, + } + runnable = j.checkRunnable(jobID4, involves4) + require.True(t, runnable) + j.addRunning(jobID4, involves4) + require.Equal(t, "1,3,4", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + + failedInvolves = []model.InvolvingSchemaInfo{ + {Database: "db3", Table: "t3"}, + } + runnable = j.checkRunnable(0, failedInvolves) + require.False(t, runnable) + failedInvolves = []model.InvolvingSchemaInfo{ + {Policy: "p1"}, + } + runnable = j.checkRunnable(0, failedInvolves) + require.False(t, runnable) +} + +func TestExclusiveShared(t *testing.T) { + j := newRunningJobs() + + jobID1, involves1 := mkJob(1, "db1.t1", "db1.t2") + runnable := j.checkRunnable(jobID1, involves1) + require.True(t, runnable) + j.addRunning(jobID1, involves1) + + failedInvolves := []model.InvolvingSchemaInfo{ + {Database: "db2", 
Table: model.InvolvingAll}, + {Database: "db1", Table: "t1", Mode: model.SharedInvolving}, + } + runnable = j.checkRunnable(0, failedInvolves) + require.False(t, runnable) + + jobID2 := int64(2) + involves2 := []model.InvolvingSchemaInfo{ + {Database: "db3", Table: model.InvolvingAll}, + {Database: "db2", Table: "t2", Mode: model.SharedInvolving}, + } + runnable = j.checkRunnable(jobID2, involves2) + require.True(t, runnable) + j.addRunning(jobID2, involves2) + + jobID3 := int64(3) + involves3 := []model.InvolvingSchemaInfo{ + {Database: "db4", Table: model.InvolvingAll}, + {Database: "db2", Table: "t2", Mode: model.SharedInvolving}, + } + runnable = j.checkRunnable(jobID3, involves3) + require.True(t, runnable) + j.addRunning(jobID3, involves3) + require.Equal(t, "1,2,3", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + + pendingInvolves := []model.InvolvingSchemaInfo{ + {Database: "db2", Table: "t2"}, + } + runnable = j.checkRunnable(0, pendingInvolves) + require.False(t, runnable) + j.addPending(pendingInvolves) + require.Equal(t, "1,2,3", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + + // because there's a pending job on db2.t2, the next job touching db2.t2 should be blocked + jobID4 := int64(4) + involves4 := []model.InvolvingSchemaInfo{ + {Database: "db100", Table: model.InvolvingAll}, + {Database: "db2", Table: "t2", Mode: model.SharedInvolving}, + } + runnable = j.checkRunnable(jobID4, involves4) + require.False(t, runnable) + require.Equal(t, "1,2,3", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + + // mimic that all running jobs are finished and the next round of getting jobs starts + j.resetAllPending() + j.removeRunning(jobID1, involves1) + j.removeRunning(jobID2, involves2) + j.removeRunning(jobID3, involves3) + checkInvariants(t, j) + + runnable = j.checkRunnable(0, pendingInvolves) + require.True(t, runnable) + + // new test round + + jobID5 := int64(5) + involves5 := []model.InvolvingSchemaInfo{ + {Policy: "p1", Mode: model.SharedInvolving}, + {Policy: "p2", Mode: model.SharedInvolving}, + } + runnable = j.checkRunnable(jobID5, involves5) + require.True(t, runnable) + j.addRunning(jobID5, involves5) + + jobID6 := int64(6) + involves6 := []model.InvolvingSchemaInfo{ + {Policy: "p1", Mode: model.SharedInvolving}, + {ResourceGroup: "g1", Mode: model.SharedInvolving}, + } + runnable = j.checkRunnable(jobID6, involves6) + require.True(t, runnable) + j.addRunning(jobID6, involves6) + + require.Equal(t, "5,6", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + + pendingInvolves = []model.InvolvingSchemaInfo{ + {Policy: "p1"}, + {ResourceGroup: "g2"}, + } + runnable = j.checkRunnable(0, pendingInvolves) + require.False(t, runnable) + j.addPending(pendingInvolves) + + secondPendingInvolves := []model.InvolvingSchemaInfo{ + {ResourceGroup: "g2"}, + {ResourceGroup: "g3"}, + } + runnable = j.checkRunnable(0, secondPendingInvolves) + require.False(t, runnable) + j.addPending(secondPendingInvolves) + + // we have two shared p1 objects; test the case where one of them finishes and another round starts.
+ + j.removeRunning(jobID6, involves6) + require.Equal(t, "5", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + j.resetAllPending() + + runnable = j.checkRunnable(0, pendingInvolves) + require.False(t, runnable) + j.addPending(pendingInvolves) + runnable = j.checkRunnable(0, secondPendingInvolves) + require.False(t, runnable) + j.addPending(secondPendingInvolves) + + // all shared p1 objects are removed + + j.removeRunning(jobID5, involves5) + require.Equal(t, "", orderedAllIDs(j.allIDs())) + checkInvariants(t, j) + + // no p1 in exclusive or shared, but p1 exists in pending, so this job cannot run + thirdPendingInvolves := []model.InvolvingSchemaInfo{ + {Policy: "p1"}, + } + runnable = j.checkRunnable(0, thirdPendingInvolves) + require.False(t, runnable) + j.addPending(thirdPendingInvolves) + + // now another round starts, so the first pending job can run + + j.resetAllPending() + runnable = j.checkRunnable(0, pendingInvolves) + require.True(t, runnable) +} diff --git a/pkg/ddl/foreign_key_test.go b/pkg/ddl/foreign_key_test.go index 3ace5eb81c093..24e0459ae231c 100644 --- a/pkg/ddl/foreign_key_test.go +++ b/pkg/ddl/foreign_key_test.go @@ -213,9 +213,6 @@ func TestForeignKey(t *testing.T) { } func TestTruncateOrDropTableWithForeignKeyReferred2(t *testing.T) { - // TODO(lance6716): enable this test case after introduce exclusive and shared objects on runningJobs. - t.Skip("this test implicitly depends on DDL job dependency on FK and " + - "TestTruncateOrDropTableWithForeignKeyReferred. Will enable it soon.") store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease) d := dom.DDL() tk := testkit.NewTestKit(t, store) @@ -312,9 +309,6 @@ func TestDropIndexNeededInForeignKey2(t *testing.T) { } func TestDropDatabaseWithForeignKeyReferred2(t *testing.T) { - // TODO(lance6716): enable this test case after introduce exclusive and shared objects on runningJobs. - t.Skip("this test implicitly depends on DDL job dependency on FK and " + - "TestDropDatabaseWithForeignKeyReferred. Will enable it soon.") store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease) d := dom.DDL() tk := testkit.NewTestKit(t, store) diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 71392ae6dcf4b..4c5c49f309a42 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -187,7 +187,10 @@ func (s *jobScheduler) close() { } } +// getJob reads tidb_ddl_job and returns the first runnable DDL job. func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error) { + defer s.runningJobs.resetAllPending() + not := "not" label := "get_job_general" if tp == jobTypeReorg { @@ -230,7 +233,9 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error) return &job, nil } - if !s.runningJobs.checkRunnable(job.ID, job.GetInvolvingSchemaInfo()) { + involving := job.GetInvolvingSchemaInfo() + if !s.runningJobs.checkRunnable(job.ID, involving) { + s.runningJobs.addPending(involving) continue } @@ -239,6 +244,7 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error) "[ddl] handle ddl job failed: mark job is processing meet error", zap.Error(err), zap.Stringer("job", &job)) + s.runningJobs.addPending(involving) return nil, errors.Trace(err) } return &job, nil @@ -523,12 +529,12 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
failpoint.InjectCall("beforeDelivery2Worker", job) injectFailPointForGetJob(job) jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo() - s.runningJobs.add(jobID, involvedSchemaInfos) + s.runningJobs.addRunning(jobID, involvedSchemaInfos) metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc() s.wg.RunWithLog(func() { defer func() { failpoint.InjectCall("afterDelivery2Worker", job) - s.runningJobs.remove(jobID, involvedSchemaInfos) + s.runningJobs.removeRunning(jobID, involvedSchemaInfos) asyncNotify(s.ddlJobNotifyCh) metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() if wk.ctx.Err() != nil && ingest.LitBackCtxMgr != nil { diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index 5c8badf738311..9ab293daf4ce9 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -15,7 +15,10 @@ package ddl import ( + "fmt" + "github.com/pingcap/errors" + ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" @@ -23,6 +26,8 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/intest" + "go.uber.org/zap" ) func (d *ddl) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info *model.MultiSchemaInfo) error { @@ -35,18 +40,86 @@ func (d *ddl) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info *mode return errors.Trace(err) } + logFn := ddllogutil.DDLLogger().Warn + if intest.InTest { + logFn = ddllogutil.DDLLogger().Fatal + } + + var involvingSchemaInfo []model.InvolvingSchemaInfo + for _, j := range subJobs { + switch j.Type { + case model.ActionAlterTablePlacement: + ref, ok := j.Args[0].(*model.PolicyRefInfo) + if !ok { + logFn("unexpected type of policy reference info", + zap.Any("args[0]", j.Args[0]), + zap.String("type", fmt.Sprintf("%T", j.Args[0]))) + continue + } + if ref == nil { + continue + } + involvingSchemaInfo = append(involvingSchemaInfo, model.InvolvingSchemaInfo{ + Policy: ref.Name.L, + Mode: model.SharedInvolving, + }) + case model.ActionAddForeignKey: + ref, ok := j.Args[0].(*model.FKInfo) + if !ok { + logFn("unexpected type of foreign key info", + zap.Any("args[0]", j.Args[0]), + zap.String("type", fmt.Sprintf("%T", j.Args[0]))) + continue + } + involvingSchemaInfo = append(involvingSchemaInfo, model.InvolvingSchemaInfo{ + Database: ref.RefSchema.L, + Table: ref.RefTable.L, + Mode: model.SharedInvolving, + }) + case model.ActionAlterTablePartitionPlacement: + if len(j.Args) < 2 { + logFn("unexpected number of arguments for partition placement", + zap.Int("len(args)", len(j.Args)), + zap.Any("args", j.Args)) + continue + } + ref, ok := j.Args[1].(*model.PolicyRefInfo) + if !ok { + logFn("unexpected type of policy reference info", + zap.Any("args[0]", j.Args[0]), + zap.String("type", fmt.Sprintf("%T", j.Args[0]))) + continue + } + if ref == nil { + continue + } + involvingSchemaInfo = append(involvingSchemaInfo, model.InvolvingSchemaInfo{ + Policy: ref.Name.L, + Mode: model.SharedInvolving, + }) + } + } + + if len(involvingSchemaInfo) > 0 { + involvingSchemaInfo = append(involvingSchemaInfo, model.InvolvingSchemaInfo{ + Database: schema.Name.L, + Table: t.Meta().Name.L, + }) + } + job := &model.Job{ - SchemaID: schema.ID, - TableID: t.Meta().ID, - SchemaName: schema.Name.L, - TableName: t.Meta().Name.L, - Type: model.ActionMultiSchemaChange, - 
BinlogInfo: &model.HistoryInfo{}, - Args: nil, - MultiSchemaInfo: info, - ReorgMeta: nil, - CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - SQLMode: ctx.GetSessionVars().SQLMode, + SchemaID: schema.ID, + TableID: t.Meta().ID, + SchemaName: schema.Name.L, + TableName: t.Meta().Name.L, + Type: model.ActionMultiSchemaChange, + BinlogInfo: &model.HistoryInfo{}, + Args: nil, + MultiSchemaInfo: info, + ReorgMeta: nil, + CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, + InvolvingSchemaInfo: involvingSchemaInfo, + SQLMode: ctx.GetSessionVars().SQLMode, } if containsDistTaskSubJob(subJobs) { job.ReorgMeta, err = newReorgMetaFromVariables(job, ctx) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index f6c4f6e9078d2..69808f725e870 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -770,7 +770,7 @@ func TestCreateTableWithInfoPlacement(t *testing.T) { tk.MustExec("drop placement policy p1") tk.MustExec("create placement policy p1 followers=2") tk.Session().SetValue(sessionctx.QueryString, "skip") - require.Nil(t, dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test2"), tbl, ddl.OnExistError)) + require.Nil(t, dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test2"), tbl, nil, ddl.OnExistError)) tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" + " `a` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) @@ -791,7 +791,7 @@ func TestCreateTableWithInfoPlacement(t *testing.T) { tbl2.Name = model.NewCIStr("t3") tbl2.PlacementPolicyRef.Name = model.NewCIStr("pxx") tk.Session().SetValue(sessionctx.QueryString, "skip") - err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test2"), tbl2, ddl.OnExistError) + err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test2"), tbl2, nil, ddl.OnExistError) require.Equal(t, "[schema:8239]Unknown placement policy 'pxx'", err.Error()) } diff --git a/pkg/ddl/placement_sql_test.go b/pkg/ddl/placement_sql_test.go index f8432c14e421b..31a40afea7784 100644 --- a/pkg/ddl/placement_sql_test.go +++ b/pkg/ddl/placement_sql_test.go @@ -534,7 +534,7 @@ func TestPlacementMode(t *testing.T) { require.NotNil(t, tbl.PlacementPolicyRef) tbl.Name = model.NewCIStr("t2") tk.Session().SetValue(sessionctx.QueryString, "skip") - err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test"), tbl, ddl.OnExistError) + err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test"), tbl, nil, ddl.OnExistError) require.NoError(t, err) tk.MustQuery("show create table t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n" + " `id` int(11) DEFAULT NULL\n" + @@ -548,7 +548,7 @@ func TestPlacementMode(t *testing.T) { tbl.Name = model.NewCIStr("t2") tbl.PlacementPolicyRef.Name = model.NewCIStr("pxx") tk.Session().SetValue(sessionctx.QueryString, "skip") - err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test"), tbl, ddl.OnExistError) + err = dom.DDL().CreateTableWithInfo(tk.Session(), model.NewCIStr("test"), tbl, nil, ddl.OnExistError) require.NoError(t, err) tk.MustQuery("show create table t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n" + " `id` int(11) DEFAULT NULL\n" + diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index 67bee1b5adab7..afd1de6f38679 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -463,7 +463,7 @@ func (d *Checker) CreateSchemaWithInfo(ctx sessionctx.Context, info *model.DBInf } 
// CreateTableWithInfo implements the DDL interface. -func (*Checker) CreateTableWithInfo(_ sessionctx.Context, _ model.CIStr, _ *model.TableInfo, _ ...ddl.CreateTableWithInfoConfigurier) error { +func (*Checker) CreateTableWithInfo(_ sessionctx.Context, _ model.CIStr, _ *model.TableInfo, _ []model.InvolvingSchemaInfo, _ ...ddl.CreateTableWithInfoConfigurier) error { //TODO implement me panic("implement me") } diff --git a/pkg/ddl/schematracker/dm_tracker.go b/pkg/ddl/schematracker/dm_tracker.go index e4079b29fb5d5..34d5ff3dc5d52 100644 --- a/pkg/ddl/schematracker/dm_tracker.go +++ b/pkg/ddl/schematracker/dm_tracker.go @@ -230,7 +230,7 @@ func (d SchemaTracker) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStm onExist = ddl.OnExistIgnore } - return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, onExist) + return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, nil, onExist) } // CreateTableWithInfo implements the DDL interface. @@ -238,6 +238,7 @@ func (d SchemaTracker) CreateTableWithInfo( _ sessionctx.Context, dbName model.CIStr, info *model.TableInfo, + _ []model.InvolvingSchemaInfo, cs ...ddl.CreateTableWithInfoConfigurier, ) error { c := ddl.GetCreateTableWithInfoConfig(cs) @@ -264,7 +265,7 @@ func (d SchemaTracker) CreateTableWithInfo( // CreateView implements the DDL interface. func (d SchemaTracker) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) error { - viewInfo, err := ddl.BuildViewInfo(ctx, s) + viewInfo, err := ddl.BuildViewInfo(s) if err != nil { return err } @@ -290,7 +291,7 @@ func (d SchemaTracker) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) onExist = ddl.OnExistReplace } - return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, onExist) + return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, nil, onExist) } // DropTable implements the DDL interface. @@ -1189,7 +1190,7 @@ func (SchemaTracker) AlterResourceGroup(_ sessionctx.Context, _ *ast.AlterResour // BatchCreateTableWithInfo implements the DDL interface, it will call CreateTableWithInfo for each table. func (d SchemaTracker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { for _, tableInfo := range info { - if err := d.CreateTableWithInfo(ctx, schema, tableInfo, cs...); err != nil { + if err := d.CreateTableWithInfo(ctx, schema, tableInfo, nil, cs...); err != nil { return err } } diff --git a/pkg/executor/brie_utils.go b/pkg/executor/brie_utils.go index 81ee8251add6b..e7511b755ba79 100644 --- a/pkg/executor/brie_utils.go +++ b/pkg/executor/brie_utils.go @@ -109,7 +109,7 @@ func BRIECreateTable( table = table.Clone() - return d.CreateTableWithInfo(sctx, dbName, table, append(cs, ddl.OnExistIgnore)...) + return d.CreateTableWithInfo(sctx, dbName, table, nil, append(cs, ddl.OnExistIgnore)...) } // BRIECreateTables creates the tables with OnExistIgnore option in batch diff --git a/pkg/parser/model/ddl.go b/pkg/parser/model/ddl.go index 335a50059554a..bad69f041779a 100644 --- a/pkg/parser/model/ddl.go +++ b/pkg/parser/model/ddl.go @@ -585,15 +585,39 @@ type Job struct { SQLMode mysql.SQLMode `json:"sql_mode"` } -// InvolvingSchemaInfo returns the schema info involved in the job. -// The value should be stored in lower case. +// InvolvingSchemaInfo describes the schema info involved in the job. The value +// should be stored in lower case. Exactly one of the three member groups +// (Database&Table, Policy, ResourceGroup) should be set in an +// InvolvingSchemaInfo.
type InvolvingSchemaInfo struct { - Database string `json:"database"` - Table string `json:"table"` + Database string `json:"database,omitempty"` + Table string `json:"table,omitempty"` + Policy string `json:"policy,omitempty"` + ResourceGroup string `json:"resource_group,omitempty"` + Mode InvolvingSchemaInfoMode `json:"mode,omitempty"` } +// InvolvingSchemaInfoMode is used by InvolvingSchemaInfo.Mode. +type InvolvingSchemaInfoMode int + +// ExclusiveInvolving and SharedInvolving act like exclusive and shared locks +// when calculating DDL job dependencies. We also implement fair-lock semantics: +// if jobs A/B/C arrive in order, job B (requesting object 0 exclusively) is +// waiting for the running job A (holding object 0 in shared mode), and job C +// (requesting object 0 in shared mode) arrives, then job C is also blocked +// until job B finishes, even though jobs A and C have no dependency on each +// other. +const ( + // ExclusiveInvolving is the default value to keep compatibility with old + // versions. + ExclusiveInvolving InvolvingSchemaInfoMode = iota + SharedInvolving +) + const ( - // InvolvingAll means all schemas/tables are affected. + // InvolvingAll means all schemas/tables are affected. It's used in the + // InvolvingSchemaInfo.Database/Table fields. When both Database and Table + // are InvolvingAll, it also means all placement policies and resource groups + // are affected. Currently the only such case is FLASHBACK CLUSTER. InvolvingAll = "*" // InvolvingNone means no schema/table is affected. InvolvingNone = "" @@ -1034,7 +1058,7 @@ func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo { } table := job.TableName // for schema related DDL, such as 'drop schema xxx' - if table == "" { + if len(job.SchemaName) > 0 && table == "" { table = InvolvingAll } return []InvolvingSchemaInfo{
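
The fair-lock behavior documented on InvolvingSchemaInfoMode is easiest to see end to end. Below is a test-style sketch against the unexported runningJobs API introduced in this patch; the job IDs, the policy name, and the test name are invented, and it assumes it lives in package ddl with the test files' usual imports (testing, the parser model package, and testify's require):

```go
// A sketch of the fair-lock guarantee: a pending exclusive request blocks
// later shared requests on the same object, even when those shared requests
// would be compatible with the currently running jobs.
func TestFairLockSketch(t *testing.T) {
	j := newRunningJobs()

	// Job A runs first and holds policy p1 in shared mode.
	involvesA := []model.InvolvingSchemaInfo{{Policy: "p1", Mode: model.SharedInvolving}}
	require.True(t, j.checkRunnable(1, involvesA))
	j.addRunning(1, involvesA)

	// Job B requests p1 exclusively, conflicts with A, and is recorded as pending.
	involvesB := []model.InvolvingSchemaInfo{{Policy: "p1"}}
	require.False(t, j.checkRunnable(2, involvesB))
	j.addPending(involvesB)

	// Job C requests p1 in shared mode. It is compatible with A, but the
	// pending exclusive request from B blocks it: this is the fairness rule.
	involvesC := []model.InvolvingSchemaInfo{{Policy: "p1", Mode: model.SharedInvolving}}
	require.False(t, j.checkRunnable(3, involvesC))

	// The scheduler clears the pending set at the end of each getJob round;
	// once A finishes, B becomes runnable.
	j.resetAllPending()
	j.removeRunning(1, involvesA)
	require.True(t, j.checkRunnable(2, involvesB))
}
```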
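For reference, this is how the CreateTable path composes a job's involving list: the job owns its own table exclusively (the default mode) and only shares whatever the definition references, mirroring getSharedInvolvingSchemaInfo above. A standalone sketch with invented database, table, and policy names:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/parser/model"
)

func main() {
	// A hypothetical db1.child that references db2.parent through a foreign
	// key and is attached to placement policy p1.
	tbl := &model.TableInfo{
		Name: model.NewCIStr("child"),
		ForeignKeys: []*model.FKInfo{{
			RefSchema: model.NewCIStr("db2"),
			RefTable:  model.NewCIStr("parent"),
		}},
		PlacementPolicyRef: &model.PolicyRefInfo{Name: model.NewCIStr("p1")},
	}

	// Mirrors createTableWithInfoJob: the created table is exclusive, the
	// referenced objects are shared, so another DDL that merely reads
	// db2.parent or p1 can still be scheduled concurrently.
	involving := []model.InvolvingSchemaInfo{{Database: "db1", Table: tbl.Name.L}}
	for _, fk := range tbl.ForeignKeys {
		involving = append(involving, model.InvolvingSchemaInfo{
			Database: fk.RefSchema.L,
			Table:    fk.RefTable.L,
			Mode:     model.SharedInvolving,
		})
	}
	if ref := tbl.PlacementPolicyRef; ref != nil {
		involving = append(involving, model.InvolvingSchemaInfo{
			Policy: ref.Name.L,
			Mode:   model.SharedInvolving,
		})
	}
	fmt.Printf("%+v\n", involving)
}
```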
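One compatibility note worth calling out: because ExclusiveInvolving is the zero value and every field of InvolvingSchemaInfo is tagged omitempty, exclusive entries marshal exactly as they did before this change, so job records written by old versions decode to the right mode. A standalone check:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/pingcap/tidb/pkg/parser/model"
)

func main() {
	infos := []model.InvolvingSchemaInfo{
		// Exclusive is the zero value of Mode, so no "mode" key is emitted and
		// JSON written by old versions round-trips to ExclusiveInvolving.
		{Database: "db1", Table: "t1"},
		{Policy: "p1", Mode: model.SharedInvolving},
	}
	for _, info := range infos {
		b, err := json.Marshal(info)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(b))
	}
	// Prints:
	// {"database":"db1","table":"t1"}
	// {"policy":"p1","mode":1}
}
```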