Skip to content

Commit

Permalink
ddl: use job args to pass runtime info for truncate/drop/recover tabl…
Browse files Browse the repository at this point in the history
…e/partition (#56632)

ref #54436
  • Loading branch information
D3Hunter authored Oct 14, 2024
1 parent 10647c9 commit 87f42ea
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 62 deletions.
7 changes: 1 addition & 6 deletions pkg/ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,10 @@ func checkTableHasForeignKeyReferred(is infoschemactx.MetaOnlyInfoSchema, schema
return nil
}

func checkDropTableHasForeignKeyReferredInOwner(infoCache *infoschema.InfoCache, job *model.Job) error {
func checkDropTableHasForeignKeyReferredInOwner(infoCache *infoschema.InfoCache, job *model.Job, args *model.DropTableArgs) error {
if !variable.EnableForeignKey.Load() {
return nil
}
args, err := model.GetDropTableArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return errors.Trace(err)
}
objectIdents, fkCheck := args.Identifiers, args.FKCheck
referredFK, err := checkTableHasForeignKeyReferredInOwner(infoCache, job.SchemaName, job.TableName, objectIdents, fkCheck)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2236,6 +2236,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
jobCtx.jobArgs = args
partNames := args.PartNames
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
Expand Down Expand Up @@ -2378,7 +2379,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []any{physicalTableIDs} // TODO remove it.
args.OldPhysicalTblIDs = physicalTableIDs
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -2391,7 +2392,6 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
)
asyncNotifyEvent(jobCtx, dropPartitionEvent, job)
// A background job will be created to delete old partition data.
args.OldPhysicalTblIDs = physicalTableIDs
job.FillFinishedArgs(args)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState)
Expand Down Expand Up @@ -2424,6 +2424,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
jobCtx.jobArgs = args
oldIDs, newIDs := args.OldPartitionIDs, args.NewPartitionIDs
if len(oldIDs) != len(newIDs) {
job.State = model.JobStateCancelled
Expand Down Expand Up @@ -2471,7 +2472,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i

preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions)

job.CtxVars = []any{oldIDs, newIDs}
args.ShouldUpdateAffectedPartitions = true
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -2613,7 +2614,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions)

// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []any{oldIDs, newIDs}
args.ShouldUpdateAffectedPartitions = true
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down
71 changes: 41 additions & 30 deletions pkg/ddl/schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,15 @@ func SetSchemaDiffForCreateTables(diff *model.SchemaDiff, job *model.Job) error
}

// SetSchemaDiffForTruncateTable set SchemaDiff for ActionTruncateTable.
func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job) error {
func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) error {
// Truncate table has two table ID, should be handled differently.
args, err := model.GetTruncateTableArgs(job)
if err != nil {
return errors.Trace(err)
}
args := jobCtx.jobArgs.(*model.TruncateTableArgs)
diff.TableID = args.NewTableID
diff.OldTableID = job.TableID

// affects are used to update placement rule cache
if job.Version == model.JobVersion1 {
if len(job.CtxVars) > 0 {
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
} else {
if len(args.OldPartIDsWithPolicy) > 0 {
diff.AffectedOpts = buildPlacementAffects(args.OldPartIDsWithPolicy, args.NewPartIDsWithPolicy)
}
if len(args.OldPartIDsWithPolicy) > 0 {
diff.AffectedOpts = buildPlacementAffects(args.OldPartIDsWithPolicy, args.NewPartIDsWithPolicy)
}
return nil
}
Expand Down Expand Up @@ -170,23 +159,41 @@ func SetSchemaDiffForExchangeTablePartition(diff *model.SchemaDiff, job *model.J
}

// SetSchemaDiffForTruncateTablePartition set SchemaDiff for ActionTruncateTablePartition.
func SetSchemaDiffForTruncateTablePartition(diff *model.SchemaDiff, job *model.Job) {
func SetSchemaDiffForTruncateTablePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
args := jobCtx.jobArgs.(*model.TruncateTableArgs)
if args.ShouldUpdateAffectedPartitions {
diff.AffectedOpts = buildPlacementAffects(args.OldPartitionIDs, args.NewPartitionIDs)
}
}

// SetSchemaDiffForDropTable set SchemaDiff for ActionDropTablePartition, ActionRecoverTable, ActionDropTable.
func SetSchemaDiffForDropTable(diff *model.SchemaDiff, job *model.Job) {
// SetSchemaDiffForDropTable set SchemaDiff for ActionDropTable.
func SetSchemaDiffForDropTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
// affects are used to update placement rule cache
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
if oldIDs, ok := job.CtxVars[0].([]int64); ok {
diff.AffectedOpts = buildPlacementAffects(oldIDs, oldIDs)
}
args := jobCtx.jobArgs.(*model.DropTableArgs)
if len(args.OldPartitionIDs) > 0 {
diff.AffectedOpts = buildPlacementAffects(args.OldPartitionIDs, args.OldPartitionIDs)
}
}

// SetSchemaDiffForDropTablePartition set SchemaDiff for ActionDropTablePartition.
func SetSchemaDiffForDropTablePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
// affects are used to update placement rule cache
diff.TableID = job.TableID
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
if len(args.OldPhysicalTblIDs) > 0 {
diff.AffectedOpts = buildPlacementAffects(args.OldPhysicalTblIDs, args.OldPhysicalTblIDs)
}
}

// SetSchemaDiffForRecoverTable set SchemaDiff for ActionRecoverTable.
func SetSchemaDiffForRecoverTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
// affects are used to update placement rule cache
diff.TableID = job.TableID
args := jobCtx.jobArgs.(*model.RecoverArgs)
if len(args.AffectedPhysicalIDs) > 0 {
diff.AffectedOpts = buildPlacementAffects(args.AffectedPhysicalIDs, args.AffectedPhysicalIDs)
}
}

Expand Down Expand Up @@ -330,7 +337,7 @@ func updateSchemaVersion(jobCtx *jobContext, job *model.Job, multiInfos ...schem
case model.ActionCreateTables:
err = SetSchemaDiffForCreateTables(diff, job)
case model.ActionTruncateTable:
err = SetSchemaDiffForTruncateTable(diff, job)
err = SetSchemaDiffForTruncateTable(diff, job, jobCtx)
case model.ActionCreateView:
err = SetSchemaDiffForCreateView(diff, job)
case model.ActionRenameTable:
Expand All @@ -340,9 +347,13 @@ func updateSchemaVersion(jobCtx *jobContext, job *model.Job, multiInfos ...schem
case model.ActionExchangeTablePartition:
err = SetSchemaDiffForExchangeTablePartition(diff, job, multiInfos...)
case model.ActionTruncateTablePartition:
SetSchemaDiffForTruncateTablePartition(diff, job)
case model.ActionDropTablePartition, model.ActionRecoverTable, model.ActionDropTable:
SetSchemaDiffForDropTable(diff, job)
SetSchemaDiffForTruncateTablePartition(diff, job, jobCtx)
case model.ActionDropTablePartition:
SetSchemaDiffForDropTablePartition(diff, job, jobCtx)
case model.ActionRecoverTable:
SetSchemaDiffForRecoverTable(diff, job, jobCtx)
case model.ActionDropTable:
SetSchemaDiffForDropTable(diff, job, jobCtx)
case model.ActionReorganizePartition:
SetSchemaDiffForReorganizePartition(diff, job)
case model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
Expand Down
38 changes: 21 additions & 17 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func repairTableOrViewWithCheck(t *meta.Mutator, job *model.Job, schemaID int64,
}

func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args, err := model.GetDropTableArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
}
jobCtx.jobArgs = args
tblInfo, err := checkTableExistAndCancelNonExistJob(jobCtx.metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -68,7 +74,7 @@ func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error)
case model.StatePublic:
// public -> write only
if job.Type == model.ActionDropTable {
err = checkDropTableHasForeignKeyReferredInOwner(jobCtx.infoCache, job)
err = checkDropTableHasForeignKeyReferredInOwner(jobCtx.infoCache, job, args)
if err != nil {
return ver, err
}
Expand All @@ -89,8 +95,8 @@ func onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error)
tblInfo.State = model.StateNone
oldIDs := getPartitionIDs(tblInfo)
ruleIDs := append(getPartitionRuleIDs(job.SchemaName, tblInfo), fmt.Sprintf(label.TableIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L))
job.CtxVars = []any{oldIDs}

args.OldPartitionIDs = oldIDs
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != tblInfo.State)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -143,6 +149,7 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64,
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
jobCtx.jobArgs = args
recoverInfo := args.RecoverTableInfos()[0]

schemaID := recoverInfo.SchemaID
Expand Down Expand Up @@ -228,6 +235,15 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64,
tableInfo := tblInfo.Clone()
tableInfo.State = model.StatePublic
tableInfo.UpdateTS = metaMut.StartTS

var tids []int64
if recoverInfo.TableInfo.GetPartitionInfo() != nil {
tids = getPartitionIDs(recoverInfo.TableInfo)
tids = append(tids, recoverInfo.TableInfo.ID)
} else {
tids = []int64{recoverInfo.TableInfo.ID}
}
args.AffectedPhysicalIDs = tids
ver, err = updateVersionAndTableInfo(jobCtx, job, tableInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -243,13 +259,6 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64,
}

func (w *worker) recoverTable(t *meta.Mutator, job *model.Job, recoverInfo *model.RecoverTableInfo) (ver int64, err error) {
var tids []int64
if recoverInfo.TableInfo.GetPartitionInfo() != nil {
tids = getPartitionIDs(recoverInfo.TableInfo)
tids = append(tids, recoverInfo.TableInfo.ID)
} else {
tids = []int64{recoverInfo.TableInfo.ID}
}
tableRuleID, partRuleIDs, oldRuleIDs, oldRules, err := getOldLabelRules(recoverInfo.TableInfo, recoverInfo.OldSchemaName, recoverInfo.OldTableName)
if err != nil {
job.State = model.JobStateCancelled
Expand Down Expand Up @@ -288,8 +297,6 @@ func (w *worker) recoverTable(t *meta.Mutator, job *model.Job, recoverInfo *mode
return ver, errors.Wrapf(err, "failed to update the label rule to PD")
}

// TODO(joechenrh): tid is used in SerSchemaDiffForDropTable, remove this after refactor done.
job.CtxVars = []any{tids}
return ver, nil
}

Expand Down Expand Up @@ -433,6 +440,7 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64,
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
jobCtx.jobArgs = args
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, schemaID)
if err != nil {
Expand Down Expand Up @@ -498,12 +506,8 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64,
newIDs = append(newIDs, newID)
}
}
if job.Version == model.JobVersion1 {
job.CtxVars = []any{oldIDs, newIDs}
} else {
args.OldPartIDsWithPolicy = oldIDs
args.NewPartIDsWithPolicy = newIDs
}
args.OldPartIDsWithPolicy = oldIDs
args.NewPartIDsWithPolicy = newIDs
}

tableRuleID, partRuleIDs, _, oldRules, err := getOldLabelRules(tblInfo, job.SchemaName, tblInfo.Name.L)
Expand Down
17 changes: 12 additions & 5 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ func GetBatchCreateTableArgs(job *Job) (*BatchCreateTableArgs, error) {
// when dropping multiple objects, each object will have a separate job
type DropTableArgs struct {
// below fields are only for drop table.
// when dropping multiple tables, the Identifiers is the same.
// when dropping multiple tables, the Identifiers is the same, but each drop-table
// runs in a separate job.
Identifiers []ast.Ident `json:"identifiers,omitempty"`
FKCheck bool `json:"fk_check,omitempty"`

Expand All @@ -306,8 +307,10 @@ func (a *DropTableArgs) getFinishedArgsV1(*Job) []any {
}

func (a *DropTableArgs) decodeV1(job *Job) error {
intest.Assert(job.Type == ActionDropTable, "only drop table job can call GetDropTableArgs")
return job.DecodeArgs(&a.Identifiers, &a.FKCheck)
if job.Type == ActionDropTable {
return job.DecodeArgs(&a.Identifiers, &a.FKCheck)
}
return nil
}

// GetDropTableArgs gets the drop-table args.
Expand Down Expand Up @@ -343,8 +346,9 @@ type TruncateTableArgs struct {
OldPartitionIDs []int64 `json:"old_partition_ids,omitempty"`

// context vars
NewPartIDsWithPolicy []int64 `json:"-"`
OldPartIDsWithPolicy []int64 `json:"-"`
NewPartIDsWithPolicy []int64 `json:"-"`
OldPartIDsWithPolicy []int64 `json:"-"`
ShouldUpdateAffectedPartitions bool `json:"-"`
}

func (a *TruncateTableArgs) getArgsV1(job *Job) []any {
Expand Down Expand Up @@ -1103,6 +1107,9 @@ func GetAlterTableAttributesArgs(job *Job) (*AlterTableAttributesArgs, error) {
type RecoverArgs struct {
RecoverInfo *RecoverSchemaInfo `json:"recover_info,omitempty"`
CheckFlag int64 `json:"check_flag,omitempty"`

// used during runtime
AffectedPhysicalIDs []int64 `json:"-"`
}

func (a *RecoverArgs) getArgsV1(job *Job) []any {
Expand Down

0 comments on commit 87f42ea

Please sign in to comment.