diff --git a/pkg/ddl/foreign_key.go b/pkg/ddl/foreign_key.go index 5d951c065f0e9..62ce58555f497 100644 --- a/pkg/ddl/foreign_key.go +++ b/pkg/ddl/foreign_key.go @@ -404,15 +404,10 @@ func checkTableHasForeignKeyReferred(is infoschemactx.MetaOnlyInfoSchema, schema return nil } -func checkDropTableHasForeignKeyReferredInOwner(infoCache *infoschema.InfoCache, job *model.Job) error { +func checkDropTableHasForeignKeyReferredInOwner(infoCache *infoschema.InfoCache, job *model.Job, args *model.DropTableArgs) error { if !variable.EnableForeignKey.Load() { return nil } - args, err := model.GetDropTableArgs(job) - if err != nil { - job.State = model.JobStateCancelled - return errors.Trace(err) - } objectIdents, fkCheck := args.Identifiers, args.FKCheck referredFK, err := checkTableHasForeignKeyReferredInOwner(infoCache, job.SchemaName, job.TableName, objectIdents, fkCheck) if err != nil { diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index edd1a2e7d5f6f..8c6784977c6fc 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -2236,6 +2236,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i job.State = model.JobStateCancelled return ver, errors.Trace(err) } + jobCtx.jobArgs = args partNames := args.PartNames metaMut := jobCtx.metaMut tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID) @@ -2378,7 +2379,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i tblInfo.Partition.DDLState = model.StateNone tblInfo.Partition.DDLAction = model.ActionNone // used by ApplyDiff in updateSchemaVersion - job.CtxVars = []any{physicalTableIDs} // TODO remove it. + args.OldPhysicalTblIDs = physicalTableIDs ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -2391,7 +2392,6 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i ) asyncNotifyEvent(jobCtx, dropPartitionEvent, job) // A background job will be created to delete old partition data. - args.OldPhysicalTblIDs = physicalTableIDs job.FillFinishedArgs(args) default: err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState) @@ -2424,6 +2424,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i job.State = model.JobStateCancelled return ver, errors.Trace(err) } + jobCtx.jobArgs = args oldIDs, newIDs := args.OldPartitionIDs, args.NewPartitionIDs if len(oldIDs) != len(newIDs) { job.State = model.JobStateCancelled @@ -2471,7 +2472,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions) - job.CtxVars = []any{oldIDs, newIDs} + args.ShouldUpdateAffectedPartitions = true ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -2613,7 +2614,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions) // used by ApplyDiff in updateSchemaVersion - job.CtxVars = []any{oldIDs, newIDs} + args.ShouldUpdateAffectedPartitions = true ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) diff --git a/pkg/ddl/schema_version.go b/pkg/ddl/schema_version.go index b2f36e29d9e7e..390e795ae9f87 100644 --- a/pkg/ddl/schema_version.go +++ b/pkg/ddl/schema_version.go @@ -48,26 +48,15 @@ func SetSchemaDiffForCreateTables(diff *model.SchemaDiff, job *model.Job) error } // SetSchemaDiffForTruncateTable set SchemaDiff for ActionTruncateTable. -func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job) error { +func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) error { // Truncate table has two table ID, should be handled differently. - args, err := model.GetTruncateTableArgs(job) - if err != nil { - return errors.Trace(err) - } + args := jobCtx.jobArgs.(*model.TruncateTableArgs) diff.TableID = args.NewTableID diff.OldTableID = job.TableID // affects are used to update placement rule cache - if job.Version == model.JobVersion1 { - if len(job.CtxVars) > 0 { - oldIDs := job.CtxVars[0].([]int64) - newIDs := job.CtxVars[1].([]int64) - diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) - } - } else { - if len(args.OldPartIDsWithPolicy) > 0 { - diff.AffectedOpts = buildPlacementAffects(args.OldPartIDsWithPolicy, args.NewPartIDsWithPolicy) - } + if len(args.OldPartIDsWithPolicy) > 0 { + diff.AffectedOpts = buildPlacementAffects(args.OldPartIDsWithPolicy, args.NewPartIDsWithPolicy) } return nil } @@ -170,23 +159,41 @@ func SetSchemaDiffForExchangeTablePartition(diff *model.SchemaDiff, job *model.J } // SetSchemaDiffForTruncateTablePartition set SchemaDiff for ActionTruncateTablePartition. -func SetSchemaDiffForTruncateTablePartition(diff *model.SchemaDiff, job *model.Job) { +func SetSchemaDiffForTruncateTablePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) { diff.TableID = job.TableID - if len(job.CtxVars) > 0 { - oldIDs := job.CtxVars[0].([]int64) - newIDs := job.CtxVars[1].([]int64) - diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + args := jobCtx.jobArgs.(*model.TruncateTableArgs) + if args.ShouldUpdateAffectedPartitions { + diff.AffectedOpts = buildPlacementAffects(args.OldPartitionIDs, args.NewPartitionIDs) } } -// SetSchemaDiffForDropTable set SchemaDiff for ActionDropTablePartition, ActionRecoverTable, ActionDropTable. -func SetSchemaDiffForDropTable(diff *model.SchemaDiff, job *model.Job) { +// SetSchemaDiffForDropTable set SchemaDiff for ActionDropTable. +func SetSchemaDiffForDropTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) { // affects are used to update placement rule cache diff.TableID = job.TableID - if len(job.CtxVars) > 0 { - if oldIDs, ok := job.CtxVars[0].([]int64); ok { - diff.AffectedOpts = buildPlacementAffects(oldIDs, oldIDs) - } + args := jobCtx.jobArgs.(*model.DropTableArgs) + if len(args.OldPartitionIDs) > 0 { + diff.AffectedOpts = buildPlacementAffects(args.OldPartitionIDs, args.OldPartitionIDs) + } +} + +// SetSchemaDiffForDropTablePartition set SchemaDiff for ActionDropTablePartition. +func SetSchemaDiffForDropTablePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) { + // affects are used to update placement rule cache + diff.TableID = job.TableID + args := jobCtx.jobArgs.(*model.TablePartitionArgs) + if len(args.OldPhysicalTblIDs) > 0 { + diff.AffectedOpts = buildPlacementAffects(args.OldPhysicalTblIDs, args.OldPhysicalTblIDs) + } +} + +// SetSchemaDiffForRecoverTable set SchemaDiff for ActionRecoverTable. +func SetSchemaDiffForRecoverTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) { + // affects are used to update placement rule cache + diff.TableID = job.TableID + args := jobCtx.jobArgs.(*model.RecoverArgs) + if len(args.AffectedPhysicalIDs) > 0 { + diff.AffectedOpts = buildPlacementAffects(args.AffectedPhysicalIDs, args.AffectedPhysicalIDs) } } @@ -330,7 +337,7 @@ func updateSchemaVersion(jobCtx *jobContext, job *model.Job, multiInfos ...schem case model.ActionCreateTables: err = SetSchemaDiffForCreateTables(diff, job) case model.ActionTruncateTable: - err = SetSchemaDiffForTruncateTable(diff, job) + err = SetSchemaDiffForTruncateTable(diff, job, jobCtx) case model.ActionCreateView: err = SetSchemaDiffForCreateView(diff, job) case model.ActionRenameTable: @@ -340,9 +347,13 @@ func updateSchemaVersion(jobCtx *jobContext, job *model.Job, multiInfos ...schem case model.ActionExchangeTablePartition: err = SetSchemaDiffForExchangeTablePartition(diff, job, multiInfos...) case model.ActionTruncateTablePartition: - SetSchemaDiffForTruncateTablePartition(diff, job) - case model.ActionDropTablePartition, model.ActionRecoverTable, model.ActionDropTable: - SetSchemaDiffForDropTable(diff, job) + SetSchemaDiffForTruncateTablePartition(diff, job, jobCtx) + case model.ActionDropTablePartition: + SetSchemaDiffForDropTablePartition(diff, job, jobCtx) + case model.ActionRecoverTable: + SetSchemaDiffForRecoverTable(diff, job, jobCtx) + case model.ActionDropTable: + SetSchemaDiffForDropTable(diff, job, jobCtx) case model.ActionReorganizePartition: SetSchemaDiffForReorganizePartition(diff, job) case model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 48852dc44e279..564cb4d818062 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -58,6 +58,12 @@ func repairTableOrViewWithCheck(t *meta.Mutator, job *model.Job, schemaID int64, } func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error) { + args, err := model.GetDropTableArgs(job) + if err != nil { + job.State = model.JobStateCancelled + return 0, errors.Trace(err) + } + jobCtx.jobArgs = args tblInfo, err := checkTableExistAndCancelNonExistJob(jobCtx.metaMut, job, job.SchemaID) if err != nil { return ver, errors.Trace(err) @@ -68,7 +74,7 @@ func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error) case model.StatePublic: // public -> write only if job.Type == model.ActionDropTable { - err = checkDropTableHasForeignKeyReferredInOwner(jobCtx.infoCache, job) + err = checkDropTableHasForeignKeyReferredInOwner(jobCtx.infoCache, job, args) if err != nil { return ver, err } @@ -89,8 +95,8 @@ func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error) tblInfo.State = model.StateNone oldIDs := getPartitionIDs(tblInfo) ruleIDs := append(getPartitionRuleIDs(job.SchemaName, tblInfo), fmt.Sprintf(label.TableIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L)) - job.CtxVars = []any{oldIDs} + args.OldPartitionIDs = oldIDs ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != tblInfo.State) if err != nil { return ver, errors.Trace(err) @@ -143,6 +149,7 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64, job.State = model.JobStateCancelled return ver, errors.Trace(err) } + jobCtx.jobArgs = args recoverInfo := args.RecoverTableInfos()[0] schemaID := recoverInfo.SchemaID @@ -228,6 +235,15 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64, tableInfo := tblInfo.Clone() tableInfo.State = model.StatePublic tableInfo.UpdateTS = metaMut.StartTS + + var tids []int64 + if recoverInfo.TableInfo.GetPartitionInfo() != nil { + tids = getPartitionIDs(recoverInfo.TableInfo) + tids = append(tids, recoverInfo.TableInfo.ID) + } else { + tids = []int64{recoverInfo.TableInfo.ID} + } + args.AffectedPhysicalIDs = tids ver, err = updateVersionAndTableInfo(jobCtx, job, tableInfo, true) if err != nil { return ver, errors.Trace(err) @@ -243,13 +259,6 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64, } func (w *worker) recoverTable(t *meta.Mutator, job *model.Job, recoverInfo *model.RecoverTableInfo) (ver int64, err error) { - var tids []int64 - if recoverInfo.TableInfo.GetPartitionInfo() != nil { - tids = getPartitionIDs(recoverInfo.TableInfo) - tids = append(tids, recoverInfo.TableInfo.ID) - } else { - tids = []int64{recoverInfo.TableInfo.ID} - } tableRuleID, partRuleIDs, oldRuleIDs, oldRules, err := getOldLabelRules(recoverInfo.TableInfo, recoverInfo.OldSchemaName, recoverInfo.OldTableName) if err != nil { job.State = model.JobStateCancelled @@ -288,8 +297,6 @@ func (w *worker) recoverTable(t *meta.Mutator, job *model.Job, recoverInfo *mode return ver, errors.Wrapf(err, "failed to update the label rule to PD") } - // TODO(joechenrh): tid is used in SerSchemaDiffForDropTable, remove this after refactor done. - job.CtxVars = []any{tids} return ver, nil } @@ -433,6 +440,7 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64, job.State = model.JobStateCancelled return ver, errors.Trace(err) } + jobCtx.jobArgs = args metaMut := jobCtx.metaMut tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, schemaID) if err != nil { @@ -498,12 +506,8 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64, newIDs = append(newIDs, newID) } } - if job.Version == model.JobVersion1 { - job.CtxVars = []any{oldIDs, newIDs} - } else { - args.OldPartIDsWithPolicy = oldIDs - args.NewPartIDsWithPolicy = newIDs - } + args.OldPartIDsWithPolicy = oldIDs + args.NewPartIDsWithPolicy = newIDs } tableRuleID, partRuleIDs, _, oldRules, err := getOldLabelRules(tblInfo, job.SchemaName, tblInfo.Name.L) diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index fffb9286876f3..59596a7354c82 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -283,7 +283,8 @@ func GetBatchCreateTableArgs(job *Job) (*BatchCreateTableArgs, error) { // when dropping multiple objects, each object will have a separate job type DropTableArgs struct { // below fields are only for drop table. - // when dropping multiple tables, the Identifiers is the same. + // when dropping multiple tables, the Identifiers is the same, but each drop-table + // runs in a separate job. Identifiers []ast.Ident `json:"identifiers,omitempty"` FKCheck bool `json:"fk_check,omitempty"` @@ -306,8 +307,10 @@ func (a *DropTableArgs) getFinishedArgsV1(*Job) []any { } func (a *DropTableArgs) decodeV1(job *Job) error { - intest.Assert(job.Type == ActionDropTable, "only drop table job can call GetDropTableArgs") - return job.DecodeArgs(&a.Identifiers, &a.FKCheck) + if job.Type == ActionDropTable { + return job.DecodeArgs(&a.Identifiers, &a.FKCheck) + } + return nil } // GetDropTableArgs gets the drop-table args. @@ -343,8 +346,9 @@ type TruncateTableArgs struct { OldPartitionIDs []int64 `json:"old_partition_ids,omitempty"` // context vars - NewPartIDsWithPolicy []int64 `json:"-"` - OldPartIDsWithPolicy []int64 `json:"-"` + NewPartIDsWithPolicy []int64 `json:"-"` + OldPartIDsWithPolicy []int64 `json:"-"` + ShouldUpdateAffectedPartitions bool `json:"-"` } func (a *TruncateTableArgs) getArgsV1(job *Job) []any { @@ -1103,6 +1107,9 @@ func GetAlterTableAttributesArgs(job *Job) (*AlterTableAttributesArgs, error) { type RecoverArgs struct { RecoverInfo *RecoverSchemaInfo `json:"recover_info,omitempty"` CheckFlag int64 `json:"check_flag,omitempty"` + + // used during runtime + AffectedPhysicalIDs []int64 `json:"-"` } func (a *RecoverArgs) getArgsV1(job *Job) []any {