*: Reorganize partition one extra state #56974

Merged: 40 commits, Nov 26, 2024
Changes from all commits (40 commits)
6df378f
Better index management during rollback of partitioning DDLs
mjonss Oct 22, 2024
27978e0
Linting
mjonss Oct 22, 2024
8046319
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 22, 2024
10f5f61
Reworked index handling in Reorganize Partition.
mjonss Oct 23, 2024
cc4f5a8
Fixed assertion issues.
mjonss Oct 23, 2024
ee82920
reverted non-needed changes.
mjonss Oct 23, 2024
c45e3e9
Reverted non-needed changes.
mjonss Oct 23, 2024
16d1628
Simplified index management during partitioned table initialization.
mjonss Oct 24, 2024
a45bba6
Improved test and fixed more issues.
mjonss Oct 24, 2024
f68c29d
minor simplification and fixed failpoint leading to panic
mjonss Oct 24, 2024
ff3d980
Added test for verifying cleanup.
mjonss Oct 24, 2024
91fb5f9
Disabled test due to cleanup not fixed.
mjonss Oct 24, 2024
bf5defc
Simplified new partitions index states.
mjonss Oct 24, 2024
6103faf
Skipping global index entries changes.
mjonss Oct 24, 2024
f803d7b
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 28, 2024
526e781
Added extra state in REORGANIZE PARTITION
mjonss Oct 28, 2024
1d92581
Simplified shouldAssert to a skipAssert flag instead.
mjonss Oct 28, 2024
e2df898
Fixed schema version diffs
mjonss Oct 29, 2024
8224d5d
Fixed test failures and updated tests.
mjonss Oct 29, 2024
08fab0b
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 29, 2024
5112320
Merge branch 'reorg-part-fix-56634' into reorg-partition-one-extra-state
mjonss Oct 29, 2024
677ac4a
Linting
mjonss Oct 30, 2024
e3125e2
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 30, 2024
e8b54eb
Merge branch 'reorg-part-fix-56634' into reorg-partition-one-extra-state
mjonss Oct 30, 2024
28f21e4
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 30, 2024
b49c525
Merge branch 'reorg-part-fix-56634' into reorg-partition-one-extra-state
mjonss Oct 30, 2024
d4347d9
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Nov 1, 2024
f21b86e
Removed limitation of global index cannot have all partitioning columns
mjonss Nov 1, 2024
07df994
Merge branch 'reorg-part-fix-56634' into reorg-partition-one-extra-state
mjonss Nov 1, 2024
03d049e
review comments addressed, simplified if statement.
mjonss Nov 4, 2024
4f1c046
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Nov 5, 2024
83355db
Merge branch 'reorg-part-fix-56634' into reorg-partition-one-extra-state
mjonss Nov 5, 2024
05b13fb
Updated comments and removed dead code.
mjonss Nov 5, 2024
6599fe2
Merge remote-tracking branch 'pingcap/master' into reorg-partition-on…
mjonss Nov 15, 2024
332f235
Fixed test case
mjonss Nov 15, 2024
3c1d49e
Updated test
mjonss Nov 19, 2024
ff8e6d7
Merge remote-tracking branch 'pingcap/master' into reorg-partition-on…
mjonss Nov 19, 2024
f486ac5
Enabled test, which this PR fixes
mjonss Nov 20, 2024
e4f718f
Removed line that should have been for failpoint
mjonss Nov 20, 2024
d9ca928
Merge remote-tracking branch 'pingcap/master' into reorg-partition-on…
mjonss Nov 20, 2024
109 changes: 68 additions & 41 deletions pkg/ddl/partition.go
@@ -2986,36 +2986,29 @@ func getNewGlobal(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool {
return idx.Global
}

func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, []model.PartitionDefinition, []model.PartitionDefinition, error) {
func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
partNames, partInfo := args.PartNames, args.PartInfo
var addingDefs, droppingDefs []model.PartitionDefinition
if tblInfo.Partition != nil {
addingDefs = tblInfo.Partition.AddingDefinitions
droppingDefs = tblInfo.Partition.DroppingDefinitions
tblInfo.Partition.NewTableID = partInfo.NewTableID
tblInfo.Partition.DDLType = partInfo.Type
tblInfo.Partition.DDLExpr = partInfo.Expr
tblInfo.Partition.DDLColumns = partInfo.Columns
} else {
tblInfo.Partition = getPartitionInfoTypeNone()
tblInfo.Partition.NewTableID = partInfo.NewTableID
tblInfo.Partition.Definitions[0].ID = tblInfo.ID
tblInfo.Partition.DDLType = partInfo.Type
tblInfo.Partition.DDLExpr = partInfo.Expr
tblInfo.Partition.DDLColumns = partInfo.Columns
}
if len(addingDefs) == 0 {
addingDefs = []model.PartitionDefinition{}
}
if len(droppingDefs) == 0 {
droppingDefs = []model.PartitionDefinition{}
if job.SchemaState == model.StateNone {
if tblInfo.Partition != nil {
tblInfo.Partition.NewTableID = partInfo.NewTableID
tblInfo.Partition.DDLType = partInfo.Type
tblInfo.Partition.DDLExpr = partInfo.Expr
tblInfo.Partition.DDLColumns = partInfo.Columns
} else {
tblInfo.Partition = getPartitionInfoTypeNone()
tblInfo.Partition.NewTableID = partInfo.NewTableID
tblInfo.Partition.Definitions[0].ID = tblInfo.ID
tblInfo.Partition.DDLType = partInfo.Type
tblInfo.Partition.DDLExpr = partInfo.Expr
tblInfo.Partition.DDLColumns = partInfo.Columns
}
}
return tblInfo, partNames, partInfo, droppingDefs, addingDefs, nil
return tblInfo, partNames, partInfo, nil
}

// onReorganizePartition reorganizes the partitioning of a table including its indexes.
@@ -3037,8 +3030,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
//
// job.SchemaState goes through the following SchemaState(s):
// StateNone -> StateDeleteOnly -> StateWriteOnly -> StateWriteReorganization
// -> StateDeleteOrganization -> StatePublic
// -> StateDeleteReorganization -> StatePublic -> Done
// There are more details embedded in the implementation, but the high level changes are:
//
// StateNone -> StateDeleteOnly:
//
// Various checks and validations.
@@ -3064,13 +3058,20 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
// and if new unique indexes are added, it also updates them with the rest of data from
// the non-touched partitions.
// For indexes that are to be replaced with new ones (old/new global index),
// mark the old indexes as StateDeleteReorganization and new ones as StatePublic
// mark the old indexes as StateWriteOnly and new ones as StatePublic
// Finally make the table visible with the new partition definitions.
// I.e. in this state clients will read from the old set of partitions,
// and will read the new set of partitions in StateDeleteReorganization.
// and in the next state (StateDeleteReorganization) they will read the new set of partitions.
//
// StateDeleteReorganization -> StatePublic:
//
// Now we mark all replaced (old) indexes as StateDeleteOnly,
// in case DeleteRange is called directly after the DDL;
// this way no orphan records can be inserted after DeleteRanges
// has cleaned up the old partitions and old global indexes.
//
// StatePublic -> Done:
//
// Now all heavy lifting is done, and we just need to finalize and drop things, while still doing
// double writes, since previous state sees the old partitions/indexes.
// Remove the old indexes and old partitions from the TableInfo.
@@ -3079,10 +3080,10 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
// if ALTER TABLE t PARTITION BY/REMOVE PARTITIONING:
// Recreate the table with the new TableID, by DropTableOrView+CreateTableOrView
//
// StatePublic:
// Done:
//
// Everything now looks as it should, no memory of old partitions/indexes,
// and no more double writing, since the previous state is only reading the new partitions/indexes.
// and no more double writing, since the previous state is only using the new partitions/indexes.
//
// Note: Special handling is also required in tables.newPartitionedTable(),
// to get per partition indexes in the right state.
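To make the progression above easier to follow, here is a minimal sketch of the schema-state sequence, including the extra state this PR introduces. It is illustrative only: the helper name nextReorgPartitionState is hypothetical, the import path is assumed from this PR's tree, and only the model.State* constants are the ones the worker actually uses.

package ddl

import "github.com/pingcap/tidb/pkg/meta/model" // path assumed

// nextReorgPartitionState illustrates the order of job.SchemaState transitions
// for REORGANIZE PARTITION described in the comment above (hypothetical helper).
func nextReorgPartitionState(s model.SchemaState) model.SchemaState {
    switch s {
    case model.StateNone:
        return model.StateDeleteOnly // checks done, new partitions created as delete-only
    case model.StateDeleteOnly:
        return model.StateWriteOnly // new partitions now also receive writes (double write)
    case model.StateWriteOnly:
        return model.StateWriteReorganization // backfill rows and indexes into the new partitions
    case model.StateWriteReorganization:
        return model.StateDeleteReorganization // clients start reading the new partitions
    case model.StateDeleteReorganization:
        return model.StatePublic // extra state: replaced (old) indexes become delete-only
    default: // model.StatePublic
        return model.StateNone // done: old partitions/indexes dropped, delete-ranges queued
    }
}

The extra hop means that by the time the delete-ranges are queued, every client is already on a schema version in which the old indexes are delete-only, so nothing can be written into those ranges after they have been cleaned up.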
@@ -3102,7 +3103,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, nil
}

tblInfo, partNames, partInfo, _, addingDefinitions, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
tblInfo, partNames, partInfo, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
if err != nil {
return ver, err
}
@@ -3331,7 +3332,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// For available state, the new added partition should wait its replica to
// be finished, otherwise the query to this partition will be blocked.
count := tblInfo.TiFlashReplica.Count
needRetry, err := checkPartitionReplica(count, addingDefinitions, jobCtx)
needRetry, err := checkPartitionReplica(count, tblInfo.Partition.AddingDefinitions, jobCtx)
if err != nil {
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
@@ -3345,7 +3346,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver

// When TiFlash Replica is ready, we must move them into `AvailablePartitionIDs`.
// Since onUpdateFlashReplicaStatus cannot see the partitions yet (not public)
for _, d := range addingDefinitions {
for _, d := range tblInfo.Partition.AddingDefinitions {
tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, d.ID)
}
}
@@ -3460,6 +3461,37 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)

case model.StateDeleteReorganization:
// Need one more state before completing, because:
// - DeleteRanges could start directly after the DDL finishes, so inserts from
// clients still on the previous state (DeleteReorg) could land after that cleanup,
// leaving data in dropped partitions/indexes that would never be cleaned up again.
// - Updates from clients on the previous state (DeleteReorg) could hit duplicate-key
// errors if the row was deleted or updated after the DDL finished, so the old
// indexes must be in DeleteOnly state here. We cannot roll back in this state!

// Stop double writing to the old indexes, only apply deletes,
// so that clients on the previous state can still do inserts; we only delete,
// which also allows a second insert of the same value from those clients.
for _, index := range tblInfo.Indices {
isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID]
if !ok || isNew {
continue
}
// Old index: no longer visible to readers,
// but deletes must still be applied to it, in case clients on the previous state insert.
index.State = model.StateDeleteOnly
}
failpoint.Inject("reorgPartFail3", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
failpoint.Return(ver, errors.New("Injected error by reorgPartFail3"))
}
})
job.SchemaState = model.StatePublic
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)

case model.StatePublic:
// Drop the droppingDefinitions and finish the DDL
// This state is needed for the case where client A sees the schema
// with version of StateWriteReorg and would not see updates of
@@ -3484,7 +3516,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique && indexInfo.State == model.StateWriteOnly {
if indexInfo.Unique && indexInfo.State == model.StateDeleteOnly {
// Drop the old unique (possible global) index, see onDropIndex
indexInfo.State = model.StateNone
DropIndexColumnFlag(tblInfo, indexInfo)
@@ -3499,10 +3531,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
for _, indexInfo := range dropIndices {
removeIndexInfo(tblInfo, indexInfo)
}
failpoint.Inject("reorgPartFail3", func(val failpoint.Value) {
failpoint.Inject("reorgPartFail4", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
failpoint.Return(ver, errors.New("Injected error by reorgPartFail3"))
failpoint.Return(ver, errors.New("Injected error by reorgPartFail4"))
}
})
var oldTblID int64
Expand Down Expand Up @@ -3536,12 +3568,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// ALTER TABLE ... PARTITION BY
tblInfo.Partition.ClearReorgIntermediateInfo()
}
failpoint.Inject("reorgPartFail4", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
failpoint.Return(ver, errors.New("Injected error by reorgPartFail4"))
}
})
err = metaMut.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs)
if err != nil {
return ver, errors.Trace(err)
@@ -3562,6 +3588,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
})
args.OldPhysicalTblIDs = physicalTableIDs
args.NewPartitionIDs = newIDs
job.SchemaState = model.StateNone
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
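The delete-only handling above (marking the replaced indexes StateDeleteOnly one state before the final cleanup) relies on the usual delete-only index semantics: while an index is in StateDeleteOnly, DML still removes entries from it but never adds new ones. A minimal sketch of that rule for an updated row, using a hypothetical helper that is not part of this PR:

package ddl

import "github.com/pingcap/tidb/pkg/meta/model" // path assumed

// maintainOldIndexOnUpdate illustrates how an UPDATE is applied to one of the
// replaced (old) indexes depending on its state (hypothetical helper).
func maintainOldIndexOnUpdate(state model.SchemaState, deleteOldEntry, insertNewEntry func() error) error {
    // The stale entry is always removed, so nothing is left behind for
    // DeleteRange to miss once the old index ranges are cleaned up.
    if err := deleteOldEntry(); err != nil {
        return err
    }
    if state == model.StateDeleteOnly {
        // Delete-only: never write a new entry; an entry written now could become
        // an orphan record after the old index has already been cleaned up.
        return nil
    }
    // WriteOnly / WriteReorganization / Public: keep double writing.
    return insertNewEntry()
}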
5 changes: 5 additions & 0 deletions pkg/ddl/rollingback.go
@@ -341,6 +341,11 @@ func onRollbackReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int6
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if job.SchemaState == model.StatePublic {
// We have started to destroy the old indexes, so we can no longer roll back!
job.State = model.JobStateRunning
return ver, nil
}
jobCtx.jobArgs = args

return rollbackReorganizePartitionWithErr(jobCtx, job, dbterror.ErrCancelledDDLJob)
2 changes: 1 addition & 1 deletion pkg/ddl/schema_version.go
@@ -221,7 +221,7 @@ func SetSchemaDiffForReorganizePartition(diff *model.SchemaDiff, job *model.Job,
func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
diff.TableID = job.TableID
diff.OldTableID = job.TableID
if job.SchemaState == model.StateDeleteReorganization {
if job.SchemaState == model.StateNone {
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
partInfo := args.PartInfo
// Final part, new table id is assigned
43 changes: 29 additions & 14 deletions pkg/ddl/tests/partition/multi_domain_test.go
@@ -336,12 +336,25 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
tkO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID))
tkNO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID)))

logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID))
logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID), zap.String("state", schemaState))

testID++
tkNO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID))
tkO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID)))

// Test the index, especially between WriteOnly and DeleteOnly, but better to test all states.
// If tkNO (DeleteOnly) updates a row, the new index entry should be deleted, but not inserted.
// It will be inserted by the backfill in WriteReorganize.
// If it is not deleted, there would be an orphan entry in the index!
tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+100, testID))
tkNO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100)))
tkNO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+100)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100)))
tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+99, testID-1))
tkO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID-1)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99)))
tkO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+99)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99)))
tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID, testID))
tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID-1, testID-1))

switch schemaState {
case model.StateDeleteOnly.String():
// tkNO sees original table/partitions as before the DDL started
@@ -387,16 +400,19 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (200),\n" +
" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
case model.StatePublic.String():
// not tested, both tkO and tkNO see the same partitions
case model.StateNone.String():
// not tested, both tkO and tkNO see the same partitions
default:
require.Failf(t, "unhandled schema state '%s'", schemaState)
require.Failf(t, "unhandled schema state", "State '%s'", schemaState)
}
}
postFn := func(tkO *testkit.TestKit, store kv.Storage) {
tkO.MustQuery(`select * from t where b = 5`).Sort().Check(testkit.Rows("5 5"))
tkO.MustQuery(`select * from t where b = "5"`).Sort().Check(testkit.Rows("5 5"))
tkO.MustExec(`admin check table t`)
tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "998 998", "999 999"))
tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "994 994", "995 995", "998 998", "999 999"))
// TODO: Verify that there are no KV entries for old partitions or old indexes!!!
delRange := tkO.MustQuery(`select * from mysql.gc_delete_range_done`).Rows()
s := ""
@@ -441,17 +457,16 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
require.False(t, HaveEntriesForTableIndex(t, tkO, part.ID, locIdx), "Local index id %d for partition id %d has still entries!", locIdx, tableID)
}
}
// TODO: Fix cleanup issues, most likely it needs one more SchemaState in onReorganizePartition
//PartitionLoop:
// for _, partID := range originalPartitions {
// for _, def := range tbl.Meta().Partition.Definitions {
// if def.ID == partID {
// continue PartitionLoop
// }
// }
// // old partitions removed
// require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID)
// }
PartitionLoop:
for _, partID := range originalPartitions {
for _, def := range tbl.Meta().Partition.Definitions {
if def.ID == partID {
continue PartitionLoop
}
}
// old partitions removed
require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID)
}
}
runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn)
}
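For orientation, the DDL exercised by this test has the following general shape; the exact createSQL and alterSQL strings are defined elsewhere in the file, so the column types, index name, and partition bounds below are only an illustrative guess, and tk stands for a *testkit.TestKit session.

// Illustrative only: not the exact statements used by TestMultiSchemaReorganizePartition.
tk.MustExec(`create table t (
    a int primary key,
    b varchar(255),
    unique key idx_b (b) global)
partition by range (a) (
    partition p0 values less than (100),
    partition p1M values less than (maxvalue))`)
tk.MustExec(`alter table t reorganize partition p1M into (
    partition p1 values less than (200),
    partition pMax values less than (maxvalue))`)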