diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 43bcd1a785001..f7be3609ea440 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -3017,36 +3017,29 @@ func getNewGlobal(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool { return idx.Global } -func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, []model.PartitionDefinition, []model.PartitionDefinition, error) { +func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, error) { schemaID := job.SchemaID tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID) if err != nil { - return nil, nil, nil, nil, nil, errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } partNames, partInfo := args.PartNames, args.PartInfo - var addingDefs, droppingDefs []model.PartitionDefinition - if tblInfo.Partition != nil { - addingDefs = tblInfo.Partition.AddingDefinitions - droppingDefs = tblInfo.Partition.DroppingDefinitions - tblInfo.Partition.NewTableID = partInfo.NewTableID - tblInfo.Partition.DDLType = partInfo.Type - tblInfo.Partition.DDLExpr = partInfo.Expr - tblInfo.Partition.DDLColumns = partInfo.Columns - } else { - tblInfo.Partition = getPartitionInfoTypeNone() - tblInfo.Partition.NewTableID = partInfo.NewTableID - tblInfo.Partition.Definitions[0].ID = tblInfo.ID - tblInfo.Partition.DDLType = partInfo.Type - tblInfo.Partition.DDLExpr = partInfo.Expr - tblInfo.Partition.DDLColumns = partInfo.Columns - } - if len(addingDefs) == 0 { - addingDefs = []model.PartitionDefinition{} - } - if len(droppingDefs) == 0 { - droppingDefs = []model.PartitionDefinition{} + if job.SchemaState == model.StateNone { + if tblInfo.Partition != nil { + tblInfo.Partition.NewTableID = partInfo.NewTableID + tblInfo.Partition.DDLType = partInfo.Type + tblInfo.Partition.DDLExpr = partInfo.Expr + tblInfo.Partition.DDLColumns = partInfo.Columns + } else { + tblInfo.Partition = getPartitionInfoTypeNone() + tblInfo.Partition.NewTableID = partInfo.NewTableID + tblInfo.Partition.Definitions[0].ID = tblInfo.ID + tblInfo.Partition.DDLType = partInfo.Type + tblInfo.Partition.DDLExpr = partInfo.Expr + tblInfo.Partition.DDLColumns = partInfo.Columns + } } - return tblInfo, partNames, partInfo, droppingDefs, addingDefs, nil + return tblInfo, partNames, partInfo, nil } // onReorganizePartition reorganized the partitioning of a table including its indexes. @@ -3068,8 +3061,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar // // job.SchemaState goes through the following SchemaState(s): // StateNone -> StateDeleteOnly -> StateWriteOnly -> StateWriteReorganization -// -> StateDeleteOrganization -> StatePublic +// -> StateDeleteOrganization -> StatePublic -> Done // There are more details embedded in the implementation, but the high level changes are: +// // StateNone -> StateDeleteOnly: // // Various checks and validations. @@ -3095,13 +3089,20 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar // and if new unique indexes are added, it also updates them with the rest of data from // the non-touched partitions. // For indexes that are to be replaced with new ones (old/new global index), -// mark the old indexes as StateDeleteReorganization and new ones as StatePublic +// mark the old indexes as StateWriteOnly and new ones as StatePublic // Finally make the table visible with the new partition definitions. // I.e. in this state clients will read from the old set of partitions, -// and will read the new set of partitions in StateDeleteReorganization. +// and next state will read the new set of partitions in StateDeleteReorganization. // // StateDeleteOrganization -> StatePublic: // +// Now we mark all replaced (old) indexes as StateDeleteOnly +// in case DeleteRange would be called directly after the DDL, +// this way there will be no orphan records inserted after DeleteRanges +// has cleaned up the old partitions and old global indexes. +// +// StatePublic -> Done: +// // Now all heavy lifting is done, and we just need to finalize and drop things, while still doing // double writes, since previous state sees the old partitions/indexes. // Remove the old indexes and old partitions from the TableInfo. @@ -3110,10 +3111,10 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar // if ALTER TABLE t PARTITION BY/REMOVE PARTITIONING: // Recreate the table with the new TableID, by DropTableOrView+CreateTableOrView // -// StatePublic: +// Done: // // Everything now looks as it should, no memory of old partitions/indexes, -// and no more double writing, since the previous state is only reading the new partitions/indexes. +// and no more double writing, since the previous state is only using the new partitions/indexes. // // Note: Special handling is also required in tables.newPartitionedTable(), // to get per partition indexes in the right state. @@ -3133,7 +3134,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver return ver, nil } - tblInfo, partNames, partInfo, _, addingDefinitions, err := getReorgPartitionInfo(jobCtx.metaMut, job, args) + tblInfo, partNames, partInfo, err := getReorgPartitionInfo(jobCtx.metaMut, job, args) if err != nil { return ver, err } @@ -3362,7 +3363,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // For available state, the new added partition should wait its replica to // be finished, otherwise the query to this partition will be blocked. count := tblInfo.TiFlashReplica.Count - needRetry, err := checkPartitionReplica(count, addingDefinitions, jobCtx) + needRetry, err := checkPartitionReplica(count, tblInfo.Partition.AddingDefinitions, jobCtx) if err != nil { return rollbackReorganizePartitionWithErr(jobCtx, job, err) } @@ -3376,7 +3377,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // When TiFlash Replica is ready, we must move them into `AvailablePartitionIDs`. // Since onUpdateFlashReplicaStatus cannot see the partitions yet (not public) - for _, d := range addingDefinitions { + for _, d := range tblInfo.Partition.AddingDefinitions { tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, d.ID) } } @@ -3491,6 +3492,37 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) case model.StateDeleteReorganization: + // Need to have one more state before completing, due to: + // - DeleteRanges could possibly start directly after DDL causing + // inserts during previous state (DeleteReorg) could insert after the cleanup + // leaving data in dropped partitions/indexes that will not be cleaned up again. + // - Updates in previous state (DeleteReorg) could have duplicate errors, if the row + // was deleted or updated in after finish (so here we need to have DeleteOnly index state! + // And we cannot rollback in this state! + + // Stop double writing to the indexes, only do Deletes! + // so that previous could do inserts, we do delete and allow second insert for + // previous state clients! + for _, index := range tblInfo.Indices { + isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID] + if !ok || isNew { + continue + } + // Old index, should not be visible any longer, + // but needs to be deleted, in case previous state clients inserts. + index.State = model.StateDeleteOnly + } + failpoint.Inject("reorgPartFail3", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + failpoint.Return(ver, errors.New("Injected error by reorgPartFail3")) + } + }) + job.SchemaState = model.StatePublic + tblInfo.Partition.DDLState = job.SchemaState + ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) + + case model.StatePublic: // Drop the droppingDefinitions and finish the DDL // This state is needed for the case where client A sees the schema // with version of StateWriteReorg and would not see updates of @@ -3515,7 +3547,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver var dropIndices []*model.IndexInfo for _, indexInfo := range tblInfo.Indices { - if indexInfo.Unique && indexInfo.State == model.StateWriteOnly { + if indexInfo.Unique && indexInfo.State == model.StateDeleteOnly { // Drop the old unique (possible global) index, see onDropIndex indexInfo.State = model.StateNone DropIndexColumnFlag(tblInfo, indexInfo) @@ -3530,10 +3562,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver for _, indexInfo := range dropIndices { removeIndexInfo(tblInfo, indexInfo) } - failpoint.Inject("reorgPartFail3", func(val failpoint.Value) { + failpoint.Inject("reorgPartFail4", func(val failpoint.Value) { if val.(bool) { job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 - failpoint.Return(ver, errors.New("Injected error by reorgPartFail3")) + failpoint.Return(ver, errors.New("Injected error by reorgPartFail4")) } }) var oldTblID int64 @@ -3567,12 +3599,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // ALTER TABLE ... PARTITION BY tblInfo.Partition.ClearReorgIntermediateInfo() } - failpoint.Inject("reorgPartFail4", func(val failpoint.Value) { - if val.(bool) { - job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 - failpoint.Return(ver, errors.New("Injected error by reorgPartFail4")) - } - }) err = metaMut.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs) if err != nil { return ver, errors.Trace(err) @@ -3593,6 +3619,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver }) args.OldPhysicalTblIDs = physicalTableIDs args.NewPartitionIDs = newIDs + job.SchemaState = model.StateNone ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index 2960f109b669c..74ce27403f107 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -376,6 +376,11 @@ func onRollbackReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int6 job.State = model.JobStateCancelled return ver, errors.Trace(err) } + if job.SchemaState == model.StatePublic { + // We started to destroy the old indexes, so we can no longer rollback! + job.State = model.JobStateRunning + return ver, nil + } jobCtx.jobArgs = args return rollbackReorganizePartitionWithErr(jobCtx, job, dbterror.ErrCancelledDDLJob) diff --git a/pkg/ddl/schema_version.go b/pkg/ddl/schema_version.go index bbd823b7953de..3b206bb60ac79 100644 --- a/pkg/ddl/schema_version.go +++ b/pkg/ddl/schema_version.go @@ -221,7 +221,7 @@ func SetSchemaDiffForReorganizePartition(diff *model.SchemaDiff, job *model.Job, func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) { diff.TableID = job.TableID diff.OldTableID = job.TableID - if job.SchemaState == model.StateDeleteReorganization { + if job.SchemaState == model.StateNone { args := jobCtx.jobArgs.(*model.TablePartitionArgs) partInfo := args.PartInfo // Final part, new table id is assigned diff --git a/pkg/ddl/tests/partition/multi_domain_test.go b/pkg/ddl/tests/partition/multi_domain_test.go index eb618bd1f1f4f..b71571dac29b4 100644 --- a/pkg/ddl/tests/partition/multi_domain_test.go +++ b/pkg/ddl/tests/partition/multi_domain_test.go @@ -336,12 +336,25 @@ func TestMultiSchemaReorganizePartition(t *testing.T) { tkO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID)) tkNO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID))) - logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID)) + logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID), zap.String("state", schemaState)) testID++ tkNO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID)) tkO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID))) + // Test for Index, specially between WriteOnly and DeleteOnly, but better to test all states. + // if tkNO (DeleteOnly) updates a row, the new index should be deleted, but not inserted. + // It will be inserted by backfill in WriteReorganize. + // If not deleted, then there would be an orphan entry in the index! + tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+100, testID)) + tkNO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100))) + tkNO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+100)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100))) + tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+99, testID-1)) + tkO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID-1)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99))) + tkO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+99)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99))) + tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID, testID)) + tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID-1, testID-1)) + switch schemaState { case model.StateDeleteOnly.String(): // tkNO sees original table/partitions as before the DDL stated @@ -387,16 +400,19 @@ func TestMultiSchemaReorganizePartition(t *testing.T) { "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (200),\n" + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + case model.StatePublic.String(): + // not tested, both tkO and tkNO sees same partitions case model.StateNone.String(): + // not tested, both tkO and tkNO sees same partitions default: - require.Failf(t, "unhandled schema state '%s'", schemaState) + require.Failf(t, "unhandled schema state", "State '%s'", schemaState) } } postFn := func(tkO *testkit.TestKit, store kv.Storage) { tkO.MustQuery(`select * from t where b = 5`).Sort().Check(testkit.Rows("5 5")) tkO.MustQuery(`select * from t where b = "5"`).Sort().Check(testkit.Rows("5 5")) tkO.MustExec(`admin check table t`) - tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "998 998", "999 999")) + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "994 994", "995 995", "998 998", "999 999")) // TODO: Verify that there are no KV entries for old partitions or old indexes!!! delRange := tkO.MustQuery(`select * from mysql.gc_delete_range_done`).Rows() s := "" @@ -441,17 +457,16 @@ func TestMultiSchemaReorganizePartition(t *testing.T) { require.False(t, HaveEntriesForTableIndex(t, tkO, part.ID, locIdx), "Local index id %d for partition id %d has still entries!", locIdx, tableID) } } - // TODO: Fix cleanup issues, most likely it needs one more SchemaState in onReorganizePartition - //PartitionLoop: - // for _, partID := range originalPartitions { - // for _, def := range tbl.Meta().Partition.Definitions { - // if def.ID == partID { - // continue PartitionLoop - // } - // } - // // old partitions removed - // require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID) - // } + PartitionLoop: + for _, partID := range originalPartitions { + for _, def := range tbl.Meta().Partition.Definitions { + if def.ID == partID { + continue PartitionLoop + } + } + // old partitions removed + require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID) + } } runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) } diff --git a/pkg/ddl/tests/partition/reorg_partition_test.go b/pkg/ddl/tests/partition/reorg_partition_test.go index 54b883c9eff53..e50db1d2f4b5c 100644 --- a/pkg/ddl/tests/partition/reorg_partition_test.go +++ b/pkg/ddl/tests/partition/reorg_partition_test.go @@ -185,7 +185,7 @@ func TestReorgPartitionFailures(t *testing.T) { afterResult := testkit.Rows( "1 1 1", "12 12 21", "13 13 13", "17 17 17", "18 18 18", "2 2 2", "23 23 32", "45 45 54", "5 5 5", ) - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestRemovePartitionFailures(t *testing.T) { @@ -210,7 +210,7 @@ func TestRemovePartitionFailures(t *testing.T) { `delete from t where b = 102`, } afterResult := testkit.Rows("1 1 1", "101 101 101", "2 2 2", "3 3 3", "4 4 4", "9 9 104") - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestPartitionByFailures(t *testing.T) { @@ -261,7 +261,7 @@ func TestReorganizePartitionListFailures(t *testing.T) { `delete from t where b = 3`, } afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestPartitionByListFailures(t *testing.T) { @@ -309,7 +309,7 @@ func TestAddHashPartitionFailures(t *testing.T) { `delete from t where b = 3`, } afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestCoalesceKeyPartitionFailures(t *testing.T) { @@ -332,7 +332,7 @@ func TestCoalesceKeyPartitionFailures(t *testing.T) { `delete from t where b = 3`, } afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestPartitionByNonPartitionedTable(t *testing.T) { @@ -340,7 +340,7 @@ func TestPartitionByNonPartitionedTable(t *testing.T) { alter := `alter table t partition by range (a) (partition p0 values less than (20))` beforeResult := testkit.Rows() afterResult := testkit.Rows() - testReorganizePartitionFailures(t, create, alter, nil, beforeResult, nil, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, nil, beforeResult, nil, afterResult) } func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterResult [][]any, skipTests ...string) { @@ -356,20 +356,24 @@ func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, b // Cancel means we set job.State = JobStateCancelled, as in no need to do more // Rollback means we do full rollback before returning error. tests := []struct { - name string - count int + name string + count int + rollForwardFrom int }{ { "Cancel", 1, + -1, }, { "Fail", 5, + 4, }, { "Rollback", 4, + -1, }, } oldWaitTimeWhenErrorOccurred := ddl.WaitTimeWhenErrorOccurred @@ -386,44 +390,55 @@ func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, b continue SUBTEST } } - tk.MustExec(createSQL) + suffixComment := ` /* ` + suffix + ` */` + tk.MustExec(createSQL + suffixComment) for _, sql := range beforeDML { - tk.MustExec(sql + ` /* ` + suffix + ` */`) + tk.MustExec(sql + suffixComment) } - tk.MustQuery(`select * from t /* ` + suffix + ` */`).Sort().Check(beforeResult) + tk.MustQuery(`select * from t ` + suffixComment).Sort().Check(beforeResult) tOrg := external.GetTableByName(t, tk, "test", "t") var idxID int64 if len(tOrg.Meta().Indices) > 0 { idxID = tOrg.Meta().Indices[0].ID } - oldCreate := tk.MustQuery(`show create table t`).Rows() + oldCreate := tk.MustQuery(`show create table t` + suffixComment).Rows() name := "github.com/pingcap/tidb/pkg/ddl/reorgPart" + suffix - testfailpoint.Enable(t, name, `return(true)`) - err := tk.ExecToErr(alterSQL) - require.Error(t, err, "failpoint reorgPart"+suffix) - require.ErrorContains(t, err, "Injected error by reorgPart"+suffix) - testfailpoint.Disable(t, name) - tk.MustQuery(`show create table t /* ` + suffix + ` */`).Check(oldCreate) + term := "return(true)" + if test.rollForwardFrom > 0 && test.rollForwardFrom <= i { + term = "10*" + term + } + testfailpoint.Enable(t, name, term) + err := tk.ExecToErr(alterSQL + suffixComment) tt := external.GetTableByName(t, tk, "test", "t") partition := tt.Meta().Partition - if partition == nil { - require.Nil(t, tOrg.Meta().Partition, suffix) + rollback := false + if test.rollForwardFrom > 0 && test.rollForwardFrom <= i { + require.NoError(t, err) } else { - require.Equal(t, len(tOrg.Meta().Partition.Definitions), len(partition.Definitions), suffix) - require.Equal(t, 0, len(partition.AddingDefinitions), suffix) - require.Equal(t, 0, len(partition.DroppingDefinitions), suffix) + rollback = true + require.Error(t, err, "failpoint reorgPart"+suffix) + require.ErrorContains(t, err, "Injected error by reorgPart"+suffix) + tk.MustQuery(`show create table t` + suffixComment).Check(oldCreate) + if partition == nil { + require.Nil(t, tOrg.Meta().Partition, suffix) + } else { + require.Equal(t, len(tOrg.Meta().Partition.Definitions), len(partition.Definitions), suffix) + require.Equal(t, 0, len(partition.AddingDefinitions), suffix) + require.Equal(t, 0, len(partition.DroppingDefinitions), suffix) + } + noNewTablesAfter(t, tk, tk.Session(), tOrg, suffix) } + testfailpoint.Disable(t, name) require.Equal(t, len(tOrg.Meta().Indices), len(tt.Meta().Indices), suffix) - if idxID != 0 { + if rollback && idxID != 0 { require.Equal(t, idxID, tt.Meta().Indices[0].ID, suffix) } - noNewTablesAfter(t, tk, tk.Session(), tOrg, suffix) - tk.MustExec(`admin check table t /* ` + suffix + ` */`) + tk.MustExec(`admin check table t` + suffixComment) for _, sql := range afterDML { - tk.MustExec(sql + " /* " + suffix + " */") + tk.MustExec(sql + suffixComment) } - tk.MustQuery(`select * from t /* ` + suffix + ` */`).Sort().Check(afterResult) - tk.MustExec(`drop table t /* ` + suffix + ` */`) + tk.MustQuery(`select * from t` + suffixComment).Sort().Check(afterResult) + tk.MustExec(`drop table t` + suffixComment) // TODO: Check TiFlash replicas // TODO: Check Label rules // TODO: Check bundles @@ -461,7 +476,8 @@ func TestReorgPartitionConcurrent(t *testing.T) { (job.SchemaState == model.StateDeleteOnly || job.SchemaState == model.StateWriteOnly || job.SchemaState == model.StateWriteReorganization || - job.SchemaState == model.StateDeleteReorganization) && + job.SchemaState == model.StateDeleteReorganization || + job.SchemaState == model.StatePublic) && currState != job.SchemaState { currState = job.SchemaState <-wait @@ -557,6 +573,61 @@ func TestReorgPartitionConcurrent(t *testing.T) { "15 15 15", "16 16 16")) currTbl.Meta().Partition = currPart + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN (15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + wait <- true + + // StatePublic + wait <- true + tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ + "10 10 10", + "12 12b 12", + "14 14 14", + "15 15 15", + "16 16 16")) + publicInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + require.Equal(t, int64(1), publicInfoSchema.SchemaMetaVersion()-deleteReorgInfoSchema.SchemaMetaVersion()) + tk.MustExec(`insert into t values (17, "17", 17)`) + oldTbl, err = deleteReorgInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) + require.NoError(t, err) + partDef = oldTbl.Meta().Partition.Definitions[1] + require.Equal(t, "p1a", partDef.Name.O) + rows = getNumRowsFromPartitionDefs(t, tk, oldTbl, oldTbl.Meta().Partition.Definitions[1:2]) + require.Equal(t, 3, rows) + tk.MustQuery(`select * from t partition (p1a)`).Sort().Check(testkit.Rows("10 10 10", "12 12b 12", "14 14 14")) + currTbl, err = publicInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) + require.NoError(t, err) + currPart = currTbl.Meta().Partition + currTbl.Meta().Partition = oldTbl.Meta().Partition + tk.MustQuery(`select * from t where b = "17"`).Sort().Check(testkit.Rows("17 17 17")) + tk.MustExec(`admin check table t`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN (15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + currTbl.Meta().Partition = currPart wait <- true syncOnChanged <- true // This reads the new schema (Schema update completed) @@ -565,11 +636,12 @@ func TestReorgPartitionConcurrent(t *testing.T) { "12 12b 12", "14 14 14", "15 15 15", - "16 16 16")) + "16 16 16", + "17 17 17")) tk.MustExec(`admin check table t`) newInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() - require.Equal(t, int64(1), newInfoSchema.SchemaMetaVersion()-deleteReorgInfoSchema.SchemaMetaVersion()) - oldTbl, err = deleteReorgInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) + require.Equal(t, int64(1), newInfoSchema.SchemaMetaVersion()-publicInfoSchema.SchemaMetaVersion()) + oldTbl, err = publicInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) require.NoError(t, err) partDef = oldTbl.Meta().Partition.Definitions[1] require.Equal(t, "p1a", partDef.Name.O) @@ -587,7 +659,7 @@ func TestReorgPartitionConcurrent(t *testing.T) { " PARTITION `p1a` VALUES LESS THAN (15),\n" + " PARTITION `p1b` VALUES LESS THAN (20),\n" + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) - newTbl, err := deleteReorgInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) + newTbl, err := newInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) require.NoError(t, err) newPart := newTbl.Meta().Partition newTbl.Meta().Partition = oldTbl.Meta().Partition @@ -935,13 +1007,13 @@ func TestPartitionByColumnChecks(t *testing.T) { } func TestPartitionIssue56634(t *testing.T) { - testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/updateVersionAndTableInfoErrInStateDeleteReorganization", `return(1)`) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/updateVersionAndTableInfoErrInStateDeleteReorganization", `4*return(1)`) store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) - tk.MustExec("set global tidb_ddl_error_count_limit = 3") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int)") - tk.MustContainErrMsg("alter table t partition by range(a) (partition p1 values less than (20))", "[ddl:-1]DDL job rollback, error msg: Injected error in StateDeleteReorganization") // should NOT panic + // Changed, since StatePublic can no longer rollback! + tk.MustExec("alter table t partition by range(a) (partition p1 values less than (20))") } diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 3876f217bd29c..b5b509c20c46a 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -845,6 +845,12 @@ func (job *Job) IsRollbackable() bool { job.SchemaState == StateWriteOnly { return false } + case ActionReorganizePartition, ActionRemovePartitioning, ActionAlterTablePartitioning: + if job.SchemaState == StatePublic { + // We will double write until this state, here we will do DeleteOnly on indexes, + // so no-longer rollbackable. + return false + } } return true } diff --git a/pkg/table/tables/partition.go b/pkg/table/tables/partition.go index 0ddc1b6277238..7353e4f70166b 100644 --- a/pkg/table/tables/partition.go +++ b/pkg/table/tables/partition.go @@ -196,14 +196,14 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part default: return ret, nil } - // In StateWriteReorganization we are using the 'old' partition definitions + // In WriteReorganization we are using the 'old' partition definitions // and if any new change happens in DroppingDefinitions, it needs to be done // also in AddingDefinitions (with new evaluation of the new expression) - // In StateDeleteReorganization we are using the 'new' partition definitions + // In DeleteReorganization/Public we are using the 'new' partition definitions // and if any new change happens in AddingDefinitions, it needs to be done // also in DroppingDefinitions (since session running on schema version -1) - // should also see the changes - if pi.DDLState == model.StateDeleteReorganization { + // should also see the changes. + if pi.DDLState == model.StateDeleteReorganization || pi.DDLState == model.StatePublic { // TODO: Explicitly explain the different DDL/New fields! if pi.NewTableID != 0 { ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.DDLType, pi.DDLExpr, pi.DDLColumns, pi.DroppingDefinitions) @@ -224,6 +224,7 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part if err != nil { return nil, err } + p.skipAssert = true partitions[def.ID] = p ret.doubleWritePartitions[def.ID] = nil } @@ -247,6 +248,7 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part if err != nil { return nil, err } + p.skipAssert = true partitions[def.ID] = p } } @@ -1475,18 +1477,23 @@ func (t *partitionedTable) locateReorgPartition(ctx expression.EvalContext, r [] // all partitions will be reorganized, // so we can use the number in Dropping or AddingDefinitions, // depending on current state. - num := len(pi.AddingDefinitions) - if pi.DDLState == model.StateDeleteReorganization { - num = len(pi.DroppingDefinitions) + reorgDefs := pi.AddingDefinitions + switch pi.DDLAction { + case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: + if pi.DDLState == model.StatePublic { + reorgDefs = pi.DroppingDefinitions + } + fallthrough + default: + if pi.DDLState == model.StateDeleteReorganization { + reorgDefs = pi.DroppingDefinitions + } } - idx, err := t.locatePartitionCommon(ctx, pi.DDLType, t.reorgPartitionExpr, uint64(num), columnsSet, r) + idx, err := t.locatePartitionCommon(ctx, pi.DDLType, t.reorgPartitionExpr, uint64(len(reorgDefs)), columnsSet, r) if err != nil { return 0, errors.Trace(err) } - if pi.DDLState == model.StateDeleteReorganization { - return pi.DroppingDefinitions[idx].ID, nil - } - return pi.AddingDefinitions[idx].ID, nil + return reorgDefs[idx].ID, nil } func (t *partitionedTable) locateRangeColumnPartition(ctx expression.EvalContext, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { @@ -1759,7 +1766,7 @@ func partitionedTableAddRecord(ctx table.MutateContext, txn kv.Transaction, t *p if err != nil { return } - if t.Meta().Partition.DDLState == model.StateDeleteOnly { + if t.Meta().Partition.DDLState == model.StateDeleteOnly || t.Meta().Partition.DDLState == model.StatePublic { return } if _, ok := t.reorganizePartitions[pid]; ok { @@ -1893,6 +1900,7 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t sh := memBuffer.Staging() defer memBuffer.Cleanup(sh) + deleteOnly := t.Meta().Partition.DDLState == model.StateDeleteOnly || t.Meta().Partition.DDLState == model.StatePublic // The old and new data locate in different partitions. // Remove record from old partition and add record to new partition. if from != to { @@ -1938,7 +1946,7 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t return errors.Trace(err) } } - if newTo != 0 && t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly { + if newTo != 0 && !deleteOnly { _, err = t.getPartition(newTo).addRecord(ctx, txn, newData, opt.GetAddRecordOpt()) if err != nil { return errors.Trace(err) @@ -1965,7 +1973,7 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t } if newTo == newFrom { tbl = t.getPartition(newTo) - if t.Meta().Partition.DDLState == model.StateDeleteOnly { + if deleteOnly { err = tbl.RemoveRecord(ctx, txn, h, currData) } else { err = tbl.updateRecord(ctx, txn, h, currData, newData, touched, opt) @@ -1981,7 +1989,7 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t if err != nil { return errors.Trace(err) } - if t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly { + if !deleteOnly { tbl = t.getPartition(newTo) _, err = tbl.addRecord(ctx, txn, newData, opt.GetAddRecordOpt()) if err != nil { diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index cff833538f04b..79fedefe90b3c 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tblctx" @@ -83,6 +82,9 @@ type TableCommon struct { // recordPrefix and indexPrefix are generated using physicalTableID. recordPrefix kv.Key indexPrefix kv.Key + + // skipAssert is used for partitions that are in WriteOnly/DeleteOnly state. + skipAssert bool } // ResetColumnsCache implements testingKnob interface. @@ -408,31 +410,6 @@ func (t *TableCommon) RecordKey(h kv.Handle) kv.Key { return tablecodec.EncodeRecordKey(t.recordPrefix, h) } -// shouldAssert checks if the partition should be in consistent -// state and can have assertion. -func (t *TableCommon) shouldAssert(level variable.AssertionLevel) bool { - p := t.Meta().Partition - if p != nil { - // This disables asserting during Reorganize Partition. - switch level { - case variable.AssertionLevelFast: - // Fast option, just skip assertion for all partitions. - if p.DDLState != model.StateNone && p.DDLState != model.StatePublic { - return false - } - case variable.AssertionLevelStrict: - // Strict, only disable assertion for intermediate partitions. - // If there were an easy way to get from a TableCommon back to the partitioned table... - for i := range p.AddingDefinitions { - if t.physicalTableID == p.AddingDefinitions[i].ID { - return false - } - } - } - } - return true -} - // UpdateRecord implements table.Table UpdateRecord interface. // `touched` means which columns are really modified, used for secondary indices. // Length of `oldData` and `newData` equals to length of `t.WritableCols()`. @@ -535,10 +512,10 @@ func (t *TableCommon) updateRecord(sctx table.MutateContext, txn kv.Transaction, } }) - if t.shouldAssert(sctx.TxnAssertionLevel()) { - err = txn.SetAssertion(key, kv.SetAssertExist) - } else { + if t.skipAssert { err = txn.SetAssertion(key, kv.SetAssertUnknown) + } else { + err = txn.SetAssertion(key, kv.SetAssertExist) } if err != nil { return err @@ -1238,10 +1215,10 @@ func (t *TableCommon) removeRowData(ctx table.MutateContext, txn kv.Transaction, } } }) - if t.shouldAssert(ctx.TxnAssertionLevel()) { - err = txn.SetAssertion(key, kv.SetAssertExist) - } else { + if t.skipAssert { err = txn.SetAssertion(key, kv.SetAssertUnknown) + } else { + err = txn.SetAssertion(key, kv.SetAssertExist) } if err != nil { return err