Skip to content

Commit

Permalink
ddl: fix placement issues (#44153)
Browse files Browse the repository at this point in the history
close #44031, close #44116
  • Loading branch information
lcwangchao authored May 24, 2023
1 parent e189cd5 commit 6807897
Show file tree
Hide file tree
Showing 3 changed files with 370 additions and 142 deletions.
66 changes: 66 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,25 @@ func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDe
return bundles, nil
}

// When drop/truncate a partition, we should still keep the dropped partition's placement settings to avoid unnecessary region schedules.
// When a partition is not configured with a placement policy directly, its rule is in the table's placement group which will be deleted after
// partition truncated/dropped. So it is necessary to create a standalone placement group with partition id after it.
func droppedPartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, dropPartitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
partitions := make([]model.PartitionDefinition, 0, len(dropPartitions))
for _, def := range dropPartitions {
def = def.Clone()
if def.PlacementPolicyRef == nil {
def.PlacementPolicyRef = tblInfo.PlacementPolicyRef
}

if def.PlacementPolicyRef != nil {
partitions = append(partitions, def)
}
}

return placement.NewPartitionListBundles(t, partitions)
}

// updatePartitionInfo merge `addingDefinitions` into `Definitions` in the tableInfo.
func updatePartitionInfo(tblInfo *model.TableInfo) {
parInfo := &model.PartitionInfo{}
Expand Down Expand Up @@ -1728,6 +1747,32 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
return ver, err
}

var bundles []*placement.Bundle
// create placement groups for each dropped partition to keep the data's placement before GC
// These placements groups will be deleted after GC
bundles, err = droppedPartitionBundles(t, tblInfo, tblInfo.Partition.DroppingDefinitions)
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}

var tableBundle *placement.Bundle
// Recompute table bundle to remove dropped partitions rules from its group
tableBundle, err = placement.NewTableBundle(t, tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if tableBundle != nil {
bundles = append(bundles, tableBundle)
}

if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, err
}

job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState)
case model.StateDeleteOnly:
Expand Down Expand Up @@ -1831,11 +1876,13 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
return ver, errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned)
}

oldPartitions := make([]model.PartitionDefinition, 0, len(oldIDs))
newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs))
for _, oldID := range oldIDs {
for i := 0; i < len(pi.Definitions); i++ {
def := &pi.Definitions[i]
if def.ID == oldID {
oldPartitions = append(oldPartitions, def.Clone())
pid, err1 := t.GenGlobalID()
if err1 != nil {
return ver, errors.Trace(err1)
Expand Down Expand Up @@ -1883,6 +1930,25 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
return ver, errors.Trace(err)
}

tableBundle, err := placement.NewTableBundle(t, tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if tableBundle != nil {
bundles = append(bundles, tableBundle)
}

// create placement groups for each dropped partition to keep the data's placement before GC
// These placements groups will be deleted after GC
keepDroppedBundles, err := droppedPartitionBundles(t, tblInfo, oldPartitions)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
bundles = append(bundles, keepDroppedBundles...)

err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
Expand Down
Loading

0 comments on commit 6807897

Please sign in to comment.