Skip to content

Commit

Permalink
*: Reorganize partition one extra state (pingcap#56974)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss committed Nov 29, 2024
1 parent f07d030 commit ac46b59
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 141 deletions.
109 changes: 68 additions & 41 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3017,36 +3017,29 @@ func getNewGlobal(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool {
return idx.Global
}

func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, []model.PartitionDefinition, []model.PartitionDefinition, error) {
func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
partNames, partInfo := args.PartNames, args.PartInfo
var addingDefs, droppingDefs []model.PartitionDefinition
if tblInfo.Partition != nil {
addingDefs = tblInfo.Partition.AddingDefinitions
droppingDefs = tblInfo.Partition.DroppingDefinitions
tblInfo.Partition.NewTableID = partInfo.NewTableID
tblInfo.Partition.DDLType = partInfo.Type
tblInfo.Partition.DDLExpr = partInfo.Expr
tblInfo.Partition.DDLColumns = partInfo.Columns
} else {
tblInfo.Partition = getPartitionInfoTypeNone()
tblInfo.Partition.NewTableID = partInfo.NewTableID
tblInfo.Partition.Definitions[0].ID = tblInfo.ID
tblInfo.Partition.DDLType = partInfo.Type
tblInfo.Partition.DDLExpr = partInfo.Expr
tblInfo.Partition.DDLColumns = partInfo.Columns
}
if len(addingDefs) == 0 {
addingDefs = []model.PartitionDefinition{}
}
if len(droppingDefs) == 0 {
droppingDefs = []model.PartitionDefinition{}
if job.SchemaState == model.StateNone {
if tblInfo.Partition != nil {
tblInfo.Partition.NewTableID = partInfo.NewTableID
tblInfo.Partition.DDLType = partInfo.Type
tblInfo.Partition.DDLExpr = partInfo.Expr
tblInfo.Partition.DDLColumns = partInfo.Columns
} else {
tblInfo.Partition = getPartitionInfoTypeNone()
tblInfo.Partition.NewTableID = partInfo.NewTableID
tblInfo.Partition.Definitions[0].ID = tblInfo.ID
tblInfo.Partition.DDLType = partInfo.Type
tblInfo.Partition.DDLExpr = partInfo.Expr
tblInfo.Partition.DDLColumns = partInfo.Columns
}
}
return tblInfo, partNames, partInfo, droppingDefs, addingDefs, nil
return tblInfo, partNames, partInfo, nil
}

// onReorganizePartition reorganized the partitioning of a table including its indexes.
Expand All @@ -3068,8 +3061,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
//
// job.SchemaState goes through the following SchemaState(s):
// StateNone -> StateDeleteOnly -> StateWriteOnly -> StateWriteReorganization
// -> StateDeleteOrganization -> StatePublic
// -> StateDeleteOrganization -> StatePublic -> Done
// There are more details embedded in the implementation, but the high level changes are:
//
// StateNone -> StateDeleteOnly:
//
// Various checks and validations.
Expand All @@ -3095,13 +3089,20 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
// and if new unique indexes are added, it also updates them with the rest of data from
// the non-touched partitions.
// For indexes that are to be replaced with new ones (old/new global index),
// mark the old indexes as StateDeleteReorganization and new ones as StatePublic
// mark the old indexes as StateWriteOnly and new ones as StatePublic
// Finally make the table visible with the new partition definitions.
// I.e. in this state clients will read from the old set of partitions,
// and will read the new set of partitions in StateDeleteReorganization.
// and next state will read the new set of partitions in StateDeleteReorganization.
//
// StateDeleteOrganization -> StatePublic:
//
// Now we mark all replaced (old) indexes as StateDeleteOnly
// in case DeleteRange would be called directly after the DDL,
// this way there will be no orphan records inserted after DeleteRanges
// has cleaned up the old partitions and old global indexes.
//
// StatePublic -> Done:
//
// Now all heavy lifting is done, and we just need to finalize and drop things, while still doing
// double writes, since previous state sees the old partitions/indexes.
// Remove the old indexes and old partitions from the TableInfo.
Expand All @@ -3110,10 +3111,10 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
// if ALTER TABLE t PARTITION BY/REMOVE PARTITIONING:
// Recreate the table with the new TableID, by DropTableOrView+CreateTableOrView
//
// StatePublic:
// Done:
//
// Everything now looks as it should, no memory of old partitions/indexes,
// and no more double writing, since the previous state is only reading the new partitions/indexes.
// and no more double writing, since the previous state is only using the new partitions/indexes.
//
// Note: Special handling is also required in tables.newPartitionedTable(),
// to get per partition indexes in the right state.
Expand All @@ -3133,7 +3134,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, nil
}

tblInfo, partNames, partInfo, _, addingDefinitions, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
tblInfo, partNames, partInfo, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
if err != nil {
return ver, err
}
Expand Down Expand Up @@ -3362,7 +3363,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// For available state, the new added partition should wait its replica to
// be finished, otherwise the query to this partition will be blocked.
count := tblInfo.TiFlashReplica.Count
needRetry, err := checkPartitionReplica(count, addingDefinitions, jobCtx)
needRetry, err := checkPartitionReplica(count, tblInfo.Partition.AddingDefinitions, jobCtx)
if err != nil {
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
Expand All @@ -3376,7 +3377,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver

// When TiFlash Replica is ready, we must move them into `AvailablePartitionIDs`.
// Since onUpdateFlashReplicaStatus cannot see the partitions yet (not public)
for _, d := range addingDefinitions {
for _, d := range tblInfo.Partition.AddingDefinitions {
tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, d.ID)
}
}
Expand Down Expand Up @@ -3491,6 +3492,37 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)

case model.StateDeleteReorganization:
// Need to have one more state before completing, due to:
// - DeleteRanges could possibly start directly after DDL causing
// inserts during previous state (DeleteReorg) could insert after the cleanup
// leaving data in dropped partitions/indexes that will not be cleaned up again.
// - Updates in previous state (DeleteReorg) could have duplicate errors, if the row
// was deleted or updated in after finish (so here we need to have DeleteOnly index state!
// And we cannot rollback in this state!

// Stop double writing to the indexes, only do Deletes!
// so that previous could do inserts, we do delete and allow second insert for
// previous state clients!
for _, index := range tblInfo.Indices {
isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID]
if !ok || isNew {
continue
}
// Old index, should not be visible any longer,
// but needs to be deleted, in case previous state clients inserts.
index.State = model.StateDeleteOnly
}
failpoint.Inject("reorgPartFail3", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
failpoint.Return(ver, errors.New("Injected error by reorgPartFail3"))
}
})
job.SchemaState = model.StatePublic
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)

case model.StatePublic:
// Drop the droppingDefinitions and finish the DDL
// This state is needed for the case where client A sees the schema
// with version of StateWriteReorg and would not see updates of
Expand All @@ -3515,7 +3547,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique && indexInfo.State == model.StateWriteOnly {
if indexInfo.Unique && indexInfo.State == model.StateDeleteOnly {
// Drop the old unique (possible global) index, see onDropIndex
indexInfo.State = model.StateNone
DropIndexColumnFlag(tblInfo, indexInfo)
Expand All @@ -3530,10 +3562,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
for _, indexInfo := range dropIndices {
removeIndexInfo(tblInfo, indexInfo)
}
failpoint.Inject("reorgPartFail3", func(val failpoint.Value) {
failpoint.Inject("reorgPartFail4", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
failpoint.Return(ver, errors.New("Injected error by reorgPartFail3"))
failpoint.Return(ver, errors.New("Injected error by reorgPartFail4"))
}
})
var oldTblID int64
Expand Down Expand Up @@ -3567,12 +3599,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// ALTER TABLE ... PARTITION BY
tblInfo.Partition.ClearReorgIntermediateInfo()
}
failpoint.Inject("reorgPartFail4", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
failpoint.Return(ver, errors.New("Injected error by reorgPartFail4"))
}
})
err = metaMut.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -3593,6 +3619,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
})
args.OldPhysicalTblIDs = physicalTableIDs
args.NewPartitionIDs = newIDs
job.SchemaState = model.StateNone
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,11 @@ func onRollbackReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int6
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if job.SchemaState == model.StatePublic {
// We started to destroy the old indexes, so we can no longer rollback!
job.State = model.JobStateRunning
return ver, nil
}
jobCtx.jobArgs = args

return rollbackReorganizePartitionWithErr(jobCtx, job, dbterror.ErrCancelledDDLJob)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func SetSchemaDiffForReorganizePartition(diff *model.SchemaDiff, job *model.Job,
func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
diff.TableID = job.TableID
diff.OldTableID = job.TableID
if job.SchemaState == model.StateDeleteReorganization {
if job.SchemaState == model.StateNone {
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
partInfo := args.PartInfo
// Final part, new table id is assigned
Expand Down
43 changes: 29 additions & 14 deletions pkg/ddl/tests/partition/multi_domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,25 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
tkO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID))
tkNO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID)))

logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID))
logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID), zap.String("state", schemaState))

testID++
tkNO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID))
tkO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID)))

// Test for Index, specially between WriteOnly and DeleteOnly, but better to test all states.
// if tkNO (DeleteOnly) updates a row, the new index should be deleted, but not inserted.
// It will be inserted by backfill in WriteReorganize.
// If not deleted, then there would be an orphan entry in the index!
tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+100, testID))
tkNO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100)))
tkNO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+100)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100)))
tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+99, testID-1))
tkO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID-1)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99)))
tkO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+99)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99)))
tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID, testID))
tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID-1, testID-1))

switch schemaState {
case model.StateDeleteOnly.String():
// tkNO sees original table/partitions as before the DDL stated
Expand Down Expand Up @@ -387,16 +400,19 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (200),\n" +
" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
case model.StatePublic.String():
// not tested, both tkO and tkNO sees same partitions
case model.StateNone.String():
// not tested, both tkO and tkNO sees same partitions
default:
require.Failf(t, "unhandled schema state '%s'", schemaState)
require.Failf(t, "unhandled schema state", "State '%s'", schemaState)
}
}
postFn := func(tkO *testkit.TestKit, store kv.Storage) {
tkO.MustQuery(`select * from t where b = 5`).Sort().Check(testkit.Rows("5 5"))
tkO.MustQuery(`select * from t where b = "5"`).Sort().Check(testkit.Rows("5 5"))
tkO.MustExec(`admin check table t`)
tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "998 998", "999 999"))
tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "994 994", "995 995", "998 998", "999 999"))
// TODO: Verify that there are no KV entries for old partitions or old indexes!!!
delRange := tkO.MustQuery(`select * from mysql.gc_delete_range_done`).Rows()
s := ""
Expand Down Expand Up @@ -441,17 +457,16 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
require.False(t, HaveEntriesForTableIndex(t, tkO, part.ID, locIdx), "Local index id %d for partition id %d has still entries!", locIdx, tableID)
}
}
// TODO: Fix cleanup issues, most likely it needs one more SchemaState in onReorganizePartition
//PartitionLoop:
// for _, partID := range originalPartitions {
// for _, def := range tbl.Meta().Partition.Definitions {
// if def.ID == partID {
// continue PartitionLoop
// }
// }
// // old partitions removed
// require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID)
// }
PartitionLoop:
for _, partID := range originalPartitions {
for _, def := range tbl.Meta().Partition.Definitions {
if def.ID == partID {
continue PartitionLoop
}
}
// old partitions removed
require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID)
}
}
runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn)
}
Expand Down
Loading

0 comments on commit ac46b59

Please sign in to comment.