From 64ce2c0e8a7d8884621915ad9dc2850058ebc50b Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 29 Nov 2024 16:15:29 +0800 Subject: [PATCH] *: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114) (#57812) ref pingcap/tidb#45133, close pingcap/tidb#56822, close pingcap/tidb#57510 --- pkg/ddl/delete_range.go | 17 +- pkg/ddl/partition.go | 362 ++++++++----- pkg/ddl/rollingback.go | 5 + pkg/ddl/sanity_check.go | 4 +- pkg/ddl/schema_version.go | 2 +- pkg/ddl/tests/partition/BUILD.bazel | 1 + pkg/ddl/tests/partition/db_partition_test.go | 110 +++- pkg/ddl/tests/partition/multi_domain_test.go | 510 +++++++++++++----- .../tests/partition/reorg_partition_test.go | 150 ++++-- pkg/meta/model/job.go | 6 + pkg/meta/model/job_args.go | 16 +- pkg/table/tables/partition.go | 48 +- pkg/table/tables/tables.go | 41 +- pkg/tablecodec/tablecodec.go | 8 +- .../integrationtest/r/ddl/db_partition.result | 4 +- .../integrationtest/r/globalindex/misc.result | 9 + tests/integrationtest/t/globalindex/misc.test | 8 + 17 files changed, 918 insertions(+), 383 deletions(-) diff --git a/pkg/ddl/delete_range.go b/pkg/ddl/delete_range.go index f4c558e53ae82..486db3fd336d1 100644 --- a/pkg/ddl/delete_range.go +++ b/pkg/ddl/delete_range.go @@ -327,13 +327,24 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap // always delete the table range, even when it's a partitioned table where // it may contain global index regions. return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID")) - case model.ActionDropTablePartition, model.ActionReorganizePartition, - model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: + case model.ActionDropTablePartition: args, err := model.GetFinishedTablePartitionArgs(job) if err != nil { return errors.Trace(err) } - return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize/drop partition: physical table ID(s)")) + return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "drop partition: physical table ID(s)")) + case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: + // Delete dropped partitions, as well as replaced global indexes. + args, err := model.GetFinishedTablePartitionArgs(job) + if err != nil { + return errors.Trace(err) + } + for _, idx := range args.OldGlobalIndexes { + if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, idx.TableID, []int64{idx.IndexID}, ea, "reorganize partition, replaced global indexes"); err != nil { + return errors.Trace(err) + } + } + return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize partition: physical table ID(s)")) case model.ActionTruncateTablePartition: args, err := model.GetTruncateTableArgs(job) if err != nil { diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 43bcd1a785001..705438a5ba4fe 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -47,7 +47,6 @@ import ( pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/opcode" - "github.com/pingcap/tidb/pkg/parser/terror" field_types "github.com/pingcap/tidb/pkg/parser/types" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -2158,10 +2157,16 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) ( dropIndices = append(dropIndices, indexInfo) } } + var deleteIndices []model.TableIDIndexID for _, indexInfo := range dropIndices { DropIndexColumnFlag(tblInfo, indexInfo) RemoveDependentHiddenColumns(tblInfo, indexInfo) removeIndexInfo(tblInfo, indexInfo) + if indexInfo.Global { + deleteIndices = append(deleteIndices, model.TableIDIndexID{TableID: tblInfo.ID, IndexID: indexInfo.ID}) + } + // All other indexes has only been applied to new partitions, that is deleted in whole, + // including indexes. } if tblInfo.Partition != nil { tblInfo.Partition.ClearReorgIntermediateInfo() @@ -2173,12 +2178,13 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) ( } job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo) args.OldPhysicalTblIDs = physicalTableIDs + args.OldGlobalIndexes = deleteIndices job.FillFinishedArgs(args) return ver, nil } // onDropTablePartition deletes old partition meta. -// States: +// States in reverse order: // StateNone // // Old partitions are queued to be deleted (delete_range), global index up-to-date @@ -3017,36 +3023,29 @@ func getNewGlobal(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool { return idx.Global } -func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, []model.PartitionDefinition, []model.PartitionDefinition, error) { +func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, error) { schemaID := job.SchemaID tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID) if err != nil { - return nil, nil, nil, nil, nil, errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } partNames, partInfo := args.PartNames, args.PartInfo - var addingDefs, droppingDefs []model.PartitionDefinition - if tblInfo.Partition != nil { - addingDefs = tblInfo.Partition.AddingDefinitions - droppingDefs = tblInfo.Partition.DroppingDefinitions - tblInfo.Partition.NewTableID = partInfo.NewTableID - tblInfo.Partition.DDLType = partInfo.Type - tblInfo.Partition.DDLExpr = partInfo.Expr - tblInfo.Partition.DDLColumns = partInfo.Columns - } else { - tblInfo.Partition = getPartitionInfoTypeNone() - tblInfo.Partition.NewTableID = partInfo.NewTableID - tblInfo.Partition.Definitions[0].ID = tblInfo.ID - tblInfo.Partition.DDLType = partInfo.Type - tblInfo.Partition.DDLExpr = partInfo.Expr - tblInfo.Partition.DDLColumns = partInfo.Columns - } - if len(addingDefs) == 0 { - addingDefs = []model.PartitionDefinition{} - } - if len(droppingDefs) == 0 { - droppingDefs = []model.PartitionDefinition{} + if job.SchemaState == model.StateNone { + if tblInfo.Partition != nil { + tblInfo.Partition.NewTableID = partInfo.NewTableID + tblInfo.Partition.DDLType = partInfo.Type + tblInfo.Partition.DDLExpr = partInfo.Expr + tblInfo.Partition.DDLColumns = partInfo.Columns + } else { + tblInfo.Partition = getPartitionInfoTypeNone() + tblInfo.Partition.NewTableID = partInfo.NewTableID + tblInfo.Partition.Definitions[0].ID = tblInfo.ID + tblInfo.Partition.DDLType = partInfo.Type + tblInfo.Partition.DDLExpr = partInfo.Expr + tblInfo.Partition.DDLColumns = partInfo.Columns + } } - return tblInfo, partNames, partInfo, droppingDefs, addingDefs, nil + return tblInfo, partNames, partInfo, nil } // onReorganizePartition reorganized the partitioning of a table including its indexes. @@ -3068,8 +3067,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar // // job.SchemaState goes through the following SchemaState(s): // StateNone -> StateDeleteOnly -> StateWriteOnly -> StateWriteReorganization -// -> StateDeleteOrganization -> StatePublic +// -> StateDeleteOrganization -> StatePublic -> Done // There are more details embedded in the implementation, but the high level changes are: +// // StateNone -> StateDeleteOnly: // // Various checks and validations. @@ -3095,13 +3095,20 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar // and if new unique indexes are added, it also updates them with the rest of data from // the non-touched partitions. // For indexes that are to be replaced with new ones (old/new global index), -// mark the old indexes as StateDeleteReorganization and new ones as StatePublic +// mark the old indexes as StateWriteOnly and new ones as StatePublic // Finally make the table visible with the new partition definitions. // I.e. in this state clients will read from the old set of partitions, -// and will read the new set of partitions in StateDeleteReorganization. +// and next state will read the new set of partitions in StateDeleteReorganization. // // StateDeleteOrganization -> StatePublic: // +// Now we mark all replaced (old) indexes as StateDeleteOnly +// in case DeleteRange would be called directly after the DDL, +// this way there will be no orphan records inserted after DeleteRanges +// has cleaned up the old partitions and old global indexes. +// +// StatePublic -> Done: +// // Now all heavy lifting is done, and we just need to finalize and drop things, while still doing // double writes, since previous state sees the old partitions/indexes. // Remove the old indexes and old partitions from the TableInfo. @@ -3110,10 +3117,10 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar // if ALTER TABLE t PARTITION BY/REMOVE PARTITIONING: // Recreate the table with the new TableID, by DropTableOrView+CreateTableOrView // -// StatePublic: +// Done: // // Everything now looks as it should, no memory of old partitions/indexes, -// and no more double writing, since the previous state is only reading the new partitions/indexes. +// and no more double writing, since the previous state is only using the new partitions/indexes. // // Note: Special handling is also required in tables.newPartitionedTable(), // to get per partition indexes in the right state. @@ -3133,7 +3140,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver return ver, nil } - tblInfo, partNames, partInfo, _, addingDefinitions, err := getReorgPartitionInfo(jobCtx.metaMut, job, args) + tblInfo, partNames, partInfo, err := getReorgPartitionInfo(jobCtx.metaMut, job, args) if err != nil { return ver, err } @@ -3362,7 +3369,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // For available state, the new added partition should wait its replica to // be finished, otherwise the query to this partition will be blocked. count := tblInfo.TiFlashReplica.Count - needRetry, err := checkPartitionReplica(count, addingDefinitions, jobCtx) + needRetry, err := checkPartitionReplica(count, tblInfo.Partition.AddingDefinitions, jobCtx) if err != nil { return rollbackReorganizePartitionWithErr(jobCtx, job, err) } @@ -3376,7 +3383,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // When TiFlash Replica is ready, we must move them into `AvailablePartitionIDs`. // Since onUpdateFlashReplicaStatus cannot see the partitions yet (not public) - for _, d := range addingDefinitions { + for _, d := range tblInfo.Partition.AddingDefinitions { tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, d.ID) } } @@ -3491,6 +3498,37 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) case model.StateDeleteReorganization: + // Need to have one more state before completing, due to: + // - DeleteRanges could possibly start directly after DDL causing + // inserts during previous state (DeleteReorg) could insert after the cleanup + // leaving data in dropped partitions/indexes that will not be cleaned up again. + // - Updates in previous state (DeleteReorg) could have duplicate errors, if the row + // was deleted or updated in after finish (so here we need to have DeleteOnly index state! + // And we cannot rollback in this state! + + // Stop double writing to the indexes, only do Deletes! + // so that previous could do inserts, we do delete and allow second insert for + // previous state clients! + for _, index := range tblInfo.Indices { + isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID] + if !ok || isNew { + continue + } + // Old index, should not be visible any longer, + // but needs to be deleted, in case previous state clients inserts. + index.State = model.StateDeleteOnly + } + failpoint.Inject("reorgPartFail3", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + failpoint.Return(ver, errors.New("Injected error by reorgPartFail3")) + } + }) + job.SchemaState = model.StatePublic + tblInfo.Partition.DDLState = job.SchemaState + ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) + + case model.StatePublic: // Drop the droppingDefinitions and finish the DDL // This state is needed for the case where client A sees the schema // with version of StateWriteReorg and would not see updates of @@ -3515,7 +3553,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver var dropIndices []*model.IndexInfo for _, indexInfo := range tblInfo.Indices { - if indexInfo.Unique && indexInfo.State == model.StateWriteOnly { + if indexInfo.Unique && indexInfo.State == model.StateDeleteOnly { // Drop the old unique (possible global) index, see onDropIndex indexInfo.State = model.StateNone DropIndexColumnFlag(tblInfo, indexInfo) @@ -3523,17 +3561,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver dropIndices = append(dropIndices, indexInfo) } } - // TODO: verify that the indexes are dropped, - // and that StateDeleteOnly+StateDeleteReorganization is not needed. - // local indexes is not an issue, since they will be gone with the dropped + // Local indexes is not an issue, since they will be gone with the dropped // partitions, but replaced global indexes should be checked! for _, indexInfo := range dropIndices { removeIndexInfo(tblInfo, indexInfo) + if indexInfo.Global { + args.OldGlobalIndexes = append(args.OldGlobalIndexes, model.TableIDIndexID{TableID: tblInfo.ID, IndexID: indexInfo.ID}) + } } - failpoint.Inject("reorgPartFail3", func(val failpoint.Value) { + failpoint.Inject("reorgPartFail4", func(val failpoint.Value) { if val.(bool) { job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 - failpoint.Return(ver, errors.New("Injected error by reorgPartFail3")) + failpoint.Return(ver, errors.New("Injected error by reorgPartFail4")) } }) var oldTblID int64 @@ -3567,12 +3606,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // ALTER TABLE ... PARTITION BY tblInfo.Partition.ClearReorgIntermediateInfo() } - failpoint.Inject("reorgPartFail4", func(val failpoint.Value) { - if val.(bool) { - job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 - failpoint.Return(ver, errors.New("Injected error by reorgPartFail4")) - } - }) err = metaMut.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs) if err != nil { return ver, errors.Trace(err) @@ -3593,6 +3626,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver }) args.OldPhysicalTblIDs = physicalTableIDs args.NewPartitionIDs = newIDs + job.SchemaState = model.StateNone ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -3665,16 +3699,49 @@ func doPartitionReorgWork(w *worker, jobCtx *jobContext, job *model.Job, tbl tab } defer w.sessPool.Put(sctx) rh := newReorgHandler(sess.NewSession(sctx)) - indices := make([]*model.IndexInfo, 0, len(tbl.Meta().Indices)) - for _, index := range tbl.Meta().Indices { - if index.Global && index.State == model.StatePublic { - // Skip old global indexes, but rebuild all other indexes - continue + reorgTblInfo := tbl.Meta().Clone() + var elements []*meta.Element + isClustered := tbl.Meta().PKIsHandle || tbl.Meta().IsCommonHandle + if isClustered { + indices := make([]*model.IndexInfo, 0, len(tbl.Meta().Indices)) + for _, index := range tbl.Meta().Indices { + if isNew, ok := tbl.Meta().GetPartitionInfo().DDLChangedIndex[index.ID]; ok && !isNew { + // Skip old replaced indexes, but rebuild all other indexes + continue + } + indices = append(indices, index) } - indices = append(indices, index) + elements = BuildElements(tbl.Meta().Columns[0], indices) + } else { + // Non-clustered tables needs to generate new _tidb_rowid for each row, since + // there might be duplicates due to EXCHANGE PARTITION. + // That means that we can not first copy all table records and then + // recreate all indexes, since we cannot determine if a table record + // has been copied or not, since its _tidb_rowid handle has been recreated + // in the new partition. + // So we will read a batch of records from one partition at a time, + // do a BatchGet for all the record keys in the new partitions, + // to see if any of the records is already there with the same handle/_tidb_rowid + // which means they were double written and does not need to be copied. + // use AddRecord for all non-matching records. + // TODO: if there is an issue where we will retry the same batch and we have committed + // backfilled records and indexes without committing the updated reorgInfo start/end key, + // then the DDL can fail due to duplicate key. + reorgTblInfo.Indices = reorgTblInfo.Indices[:0] + for _, index := range tbl.Meta().Indices { + if isNew, ok := tbl.Meta().GetPartitionInfo().DDLChangedIndex[index.ID]; ok && !isNew { + // Skip old replaced indexes, but rebuild all other indexes + continue + } + reorgTblInfo.Indices = append(reorgTblInfo.Indices, index) + } + elements = BuildElements(tbl.Meta().Columns[0], reorgTblInfo.Indices) + } + reorgTbl, err := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, reorgTblInfo) + if err != nil { + return false, ver, errors.Trace(err) } - elements := BuildElements(tbl.Meta().Columns[0], indices) - partTbl, ok := tbl.(table.PartitionedTable) + partTbl, ok := reorgTbl.(table.PartitionedTable) if !ok { return false, ver, dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs() } @@ -3683,12 +3750,12 @@ func doPartitionReorgWork(w *worker, jobCtx *jobContext, job *model.Job, tbl tab return false, ver, errors.Trace(err) } reorgInfo, err := getReorgInfoFromPartitions(jobCtx.oldDDLCtx.jobContext(job.ID, job.ReorgMeta), jobCtx, rh, job, dbInfo, partTbl, physTblIDs, elements) - err = w.runReorgJob(reorgInfo, tbl.Meta(), func() (reorgErr error) { + err = w.runReorgJob(reorgInfo, reorgTbl.Meta(), func() (reorgErr error) { defer tidbutil.Recover(metrics.LabelDDL, "doPartitionReorgWork", func() { reorgErr = dbterror.ErrCancelledDDLJob.GenWithStack("reorganize partition for table `%v` panic", tbl.Meta().Name) }, false) - return w.reorgPartitionDataAndIndex(jobCtx.stepCtx, tbl, reorgInfo) + return w.reorgPartitionDataAndIndex(jobCtx.stepCtx, reorgTbl, reorgInfo) }) if err != nil { if dbterror.ErrPausedDDLJob.Equal(err) { @@ -3707,6 +3774,7 @@ func doPartitionReorgWork(w *worker, jobCtx *jobContext, job *model.Job, tbl tab zap.Stringer("job", job), zap.Error(err1)) } logutil.DDLLogger().Warn("reorg partition job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err)) + // TODO: Test and verify that this returns an error on the ALTER TABLE session. ver, err = rollbackReorganizePartitionWithErr(jobCtx, job, err) return false, ver, errors.Trace(err) } @@ -3715,6 +3783,7 @@ func doPartitionReorgWork(w *worker, jobCtx *jobContext, job *model.Job, tbl tab type reorgPartitionWorker struct { *backfillCtx + records int // Static allocated to limit memory allocations rowRecords []*rowRecord rowDecoder *decoder.RowDecoder @@ -3722,6 +3791,15 @@ type reorgPartitionWorker struct { writeColOffsetMap map[int64]int maxOffset int reorgedTbl table.PartitionedTable + // Only used for non-clustered tables, since we need to re-generate _tidb_rowid, + // and check if the old _tidb_rowid was already written or not. + // If the old _tidb_rowid already exists, then the row is already backfilled (double written) + // and can be skipped. Otherwise, we will insert it and generate index entries. + rows [][]types.Datum + // The original _tidb_rowids, used to check if already backfilled (double written). + oldKeys []kv.Key + // partition ids of the new rows + newPids []int64 } func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) { @@ -3774,43 +3852,66 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task } txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName) - rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) + nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { return errors.Trace(err) } taskCtx.nextKey = nextKey taskCtx.done = taskDone - warningsMap := make(map[errors.ErrorID]*terror.Error) - warningsCountMap := make(map[errors.ErrorID]int64) - for _, prr := range rowRecords { - taskCtx.scanCount++ + isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle + if !isClustered { + // non-clustered table, we need to replace the _tidb_rowid handles since + // there may be duplicates across different partitions, due to EXCHANGE PARTITION. + // Meaning we need to check here if a record was double written to the new partition, + // i.e. concurrently written by StateWriteOnly or StateWriteReorganization. + // and we should skip it. + var found map[string][]byte + if len(w.oldKeys) > 0 { + // we must check if old IDs already been written, + // i.e. double written by StateWriteOnly or StateWriteReorganization. + // The good thing is that we can then also skip the index generation for that row and we don't need to + // check if duplicate index entries was already copied either! + // TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel? + found, err = txn.BatchGet(ctx, w.oldKeys) + if err != nil { + return errors.Trace(err) + } + } + + for i := 0; i < w.records; i++ { + taskCtx.scanCount++ + if len(w.oldKeys) > 0 { + if _, ok := found[string(w.oldKeys[i])]; ok { + // Alredy filled + continue + } + tbl := w.reorgedTbl.GetPartition(w.newPids[i]) + if tbl == nil { + return dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs() + } + // TODO: is this looking up each index entry at a time or in an optimistic way and only checks + // at commit time? + // AddRecord will assign a new _tidb_rowid, since we don't provide one. + _, err = tbl.AddRecord(w.tblCtx, txn, w.rows[i]) + if err != nil { + return errors.Trace(err) + } + taskCtx.addedCount++ + } + } + return nil + } + // Clustered table, use tried implementation + for _, prr := range w.rowRecords { + taskCtx.scanCount++ err = txn.Set(prr.key, prr.vals) if err != nil { return errors.Trace(err) } taskCtx.addedCount++ - if prr.warning != nil { - if _, ok := warningsCountMap[prr.warning.ID()]; ok { - warningsCountMap[prr.warning.ID()]++ - } else { - warningsCountMap[prr.warning.ID()] = 1 - warningsMap[prr.warning.ID()] = prr.warning - } - } - // TODO: Future optimization: also write the indexes here? - // What if the transaction limit is just enough for a single row, without index? - // Hmm, how could that be in the first place? - // For now, implement the batch-txn w.addTableIndex, - // since it already exists and is in use } - - // Collect the warnings. - taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap - - // also add the index entries here? And make sure they are not added somewhere else - return nil }) logSlowOperations(time.Since(oprStartTime), "BackfillData", 3000) @@ -3818,15 +3919,24 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task return } -func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*rowRecord, kv.Key, bool, error) { +func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) (kv.Key, bool, error) { w.rowRecords = w.rowRecords[:0] + w.records = 0 + isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle + if !isClustered { + if cap(w.rows) < w.batchCnt { + w.rows = make([][]types.Datum, w.batchCnt) + } + } + w.oldKeys = w.oldKeys[:0] + w.newPids = w.newPids[:0] startTime := time.Now() // taskDone means that the added handle is out of taskRange.endHandle. taskDone := false sysTZ := w.loc - tmpRow := make([]types.Datum, w.maxOffset+1) + tmpRow := make([]types.Datum, len(w.reorgedTbl.Cols())) var lastAccessedHandle kv.Key oprStartTime := startTime err := iterateSnapshotKeys(w.jobContext, w.ddlCtx.store, taskRange.priority, w.table.RecordPrefix(), txn.StartTS(), taskRange.startKey, taskRange.endKey, @@ -3837,7 +3947,7 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo taskDone = recordKey.Cmp(taskRange.endKey) >= 0 - if taskDone || len(w.rowRecords) >= w.batchCnt { + if taskDone || w.records >= w.batchCnt { return false, nil } @@ -3846,47 +3956,52 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo return false, errors.Trace(err) } - // Set the partitioning columns and calculate which partition to write to - for colID, offset := range w.writeColOffsetMap { - d, ok := w.rowMap[colID] - if !ok { - return false, dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs() + if isClustered { + // Set all partitioning columns and calculate which partition to write to + for colID, offset := range w.writeColOffsetMap { + d, ok := w.rowMap[colID] + if !ok { + return false, dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs() + } + tmpRow[offset] = d + } + } else { + // _tidb_rowid needs to be regenerated, due to EXCHANGE PARTITION, meaning we cannot + // delay the index generation, but need to check if the current _tidb_rowid already exists + // in the new partition or not, before we write the newly generated one. + // and later in the caller of this function write both Record and all indexes + + // Set all columns and calculate which partition to write to + // We will later copy the row, so use all writable columns + for _, col := range w.reorgedTbl.WritableCols() { + d, ok := w.rowMap[col.ID] + if !ok { + return false, dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs() + } + tmpRow[col.Offset] = d } - tmpRow[offset] = d } p, err := w.reorgedTbl.GetPartitionByRow(w.exprCtx.GetEvalCtx(), tmpRow) if err != nil { return false, errors.Trace(err) } - var newKey kv.Key - if w.reorgedTbl.Meta().PKIsHandle || w.reorgedTbl.Meta().IsCommonHandle { - pid := p.GetPhysicalID() - newKey = tablecodec.EncodeTablePrefix(pid) - newKey = append(newKey, recordKey[len(newKey):]...) + if isClustered { + newKey := tablecodec.EncodeTablePrefix(p.GetPhysicalID()) + newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...) + w.rowRecords = append(w.rowRecords, &rowRecord{key: newKey, vals: rawRow}) + w.records++ } else { - // Non-clustered table / not unique _tidb_rowid for the whole table - // Generate new _tidb_rowid if exists. - // Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions! - if reserved, ok := w.tblCtx.GetReservedRowIDAlloc(); ok && reserved.Exhausted() { - // TODO: Which autoid allocator to use? - ids := uint64(max(1, w.batchCnt-len(w.rowRecords))) - // Keep using the original table's allocator - var baseRowID, maxRowID int64 - baseRowID, maxRowID, err = tables.AllocHandleIDs(w.ctx, w.tblCtx, w.reorgedTbl, ids) - if err != nil { - return false, errors.Trace(err) - } - reserved.Reset(baseRowID, maxRowID) + if cap(w.rows[w.records]) < len(tmpRow) { + w.rows[w.records] = make([]types.Datum, len(tmpRow)) } - recordID, err := tables.AllocHandle(w.ctx, w.tblCtx, w.reorgedTbl) - if err != nil { - return false, errors.Trace(err) - } - newKey = tablecodec.EncodeRecordKey(p.RecordPrefix(), recordID) + copy(w.rows[w.records], tmpRow) + w.newPids = append(w.newPids, p.GetPhysicalID()) + + oldKey := tablecodec.EncodeTablePrefix(p.GetPhysicalID()) + oldKey = append(oldKey, recordKey[tablecodec.TableSplitKeyLen:]...) + w.oldKeys = append(w.oldKeys, oldKey) + w.records++ } - w.rowRecords = append(w.rowRecords, &rowRecord{ - key: newKey, vals: rawRow, - }) w.cleanRowMap() lastAccessedHandle = recordKey @@ -3897,7 +4012,7 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo return true, nil }) - if len(w.rowRecords) == 0 { + if w.records == 0 { taskDone = true } @@ -3905,7 +4020,7 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo zap.Uint64("txnStartTS", txn.StartTS()), zap.Stringer("taskRange", &taskRange), zap.Duration("takeTime", time.Since(startTime))) - return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err) + return getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err) } func (w *reorgPartitionWorker) cleanRowMap() { @@ -3940,8 +4055,11 @@ func (w *worker) reorgPartitionDataAndIndex( // - Transactions on different TiDB nodes/domains may see different states of the table/partitions // - We cannot have multiple partition ids for a unique index entry. + isClustered := t.Meta().PKIsHandle || t.Meta().IsCommonHandle + // Copy the data from the DroppingDefinitions to the AddingDefinitions if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { + // if non-clustered table it will also create its indexes! err = w.updatePhysicalTableRow(ctx, t, reorgInfo) if err != nil { return errors.Trace(err) @@ -4001,9 +4119,13 @@ func (w *worker) reorgPartitionDataAndIndex( pi := t.Meta().GetPartitionInfo() if _, err = findNextPartitionID(reorgInfo.PhysicalTableID, pi.AddingDefinitions); err == nil { // Now build all the indexes in the new partitions - err = w.addTableIndex(ctx, t, reorgInfo) - if err != nil { - return errors.Trace(err) + // apart from non-clustered index tables, where new partitions already + // created its indexes together with the table records. + if isClustered { + err = w.addTableIndex(ctx, t, reorgInfo) + if err != nil { + return errors.Trace(err) + } } // All indexes are up-to-date for new partitions, // now we only need to add the existing non-touched partitions diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index 2960f109b669c..74ce27403f107 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -376,6 +376,11 @@ func onRollbackReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int6 job.State = model.JobStateCancelled return ver, errors.Trace(err) } + if job.SchemaState == model.StatePublic { + // We started to destroy the old indexes, so we can no longer rollback! + job.State = model.JobStateRunning + return ver, nil + } jobCtx.jobArgs = args return rollbackReorganizePartitionWithErr(jobCtx, job, dbterror.ErrCancelledDDLJob) diff --git a/pkg/ddl/sanity_check.go b/pkg/ddl/sanity_check.go index 19e4be7f82f99..9c876c5606ecf 100644 --- a/pkg/ddl/sanity_check.go +++ b/pkg/ddl/sanity_check.go @@ -46,7 +46,7 @@ func (e *executor) checkDeleteRangeCnt(job *model.Job) { panic(err) } if actualCnt != expectedCnt { - panic(fmt.Sprintf("expect delete range count %d, actual count %d", expectedCnt, actualCnt)) + panic(fmt.Sprintf("expect delete range count %d, actual count %d for job type '%s'", expectedCnt, actualCnt, job.Type.String())) } } @@ -110,7 +110,7 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) { if err != nil { return 0, errors.Trace(err) } - return len(args.OldPhysicalTblIDs), nil + return len(args.OldPhysicalTblIDs) + len(args.OldGlobalIndexes), nil case model.ActionAddIndex, model.ActionAddPrimaryKey: args, err := model.GetFinishedModifyIndexArgs(job) if err != nil { diff --git a/pkg/ddl/schema_version.go b/pkg/ddl/schema_version.go index bbd823b7953de..3b206bb60ac79 100644 --- a/pkg/ddl/schema_version.go +++ b/pkg/ddl/schema_version.go @@ -221,7 +221,7 @@ func SetSchemaDiffForReorganizePartition(diff *model.SchemaDiff, job *model.Job, func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) { diff.TableID = job.TableID diff.OldTableID = job.TableID - if job.SchemaState == model.StateDeleteReorganization { + if job.SchemaState == model.StateNone { args := jobCtx.jobArgs.(*model.TablePartitionArgs) partInfo := args.PartInfo // Final part, new table id is assigned diff --git a/pkg/ddl/tests/partition/BUILD.bazel b/pkg/ddl/tests/partition/BUILD.bazel index 8f6564a732171..7ebf8f987333b 100644 --- a/pkg/ddl/tests/partition/BUILD.bazel +++ b/pkg/ddl/tests/partition/BUILD.bazel @@ -30,6 +30,7 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/sessiontxn", + "//pkg/store/gcworker", "//pkg/store/mockstore", "//pkg/table", "//pkg/table/tables", diff --git a/pkg/ddl/tests/partition/db_partition_test.go b/pkg/ddl/tests/partition/db_partition_test.go index 18f1999063d84..7e0634440e871 100644 --- a/pkg/ddl/tests/partition/db_partition_test.go +++ b/pkg/ddl/tests/partition/db_partition_test.go @@ -1336,14 +1336,32 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) { partition p3 values less than (30) );`) tk.MustExec("alter table test_global add unique index idx_b (b) global") - tk.MustExec("insert into test_global values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);") + tk.MustExec("insert into test_global values (1, 1, 1), (2, 2, 2), (11, 11, 11), (12, 12, 12)") + doneMap := make(map[model.SchemaState]struct{}) testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunBefore", func(job *model.Job) { assert.Equal(t, model.ActionDropTablePartition, job.Type) - if job.SchemaState == model.StateDeleteOnly { - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - tk2.MustExec("insert into test_global values (9, 9, 9)") + if _, ok := doneMap[job.SchemaState]; ok { + return + } + doneMap[job.SchemaState] = struct{}{} + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + switch job.SchemaState { + case model.StatePublic: + tk2.MustExec("insert into test_global values (3, 3, 3)") + tk2.MustExec("insert into test_global values (13, 13, 13)") + case model.StateWriteOnly: + tk2.MustContainErrMsg("insert into test_global values (4, 4, 4)", "[table:1526]Table has no partition for value matching a partition being dropped, 'p1'") + tk2.MustExec("insert into test_global values (14, 14, 14)") + case model.StateDeleteOnly: + tk2.MustExec("insert into test_global values (5, 5, 5)") + tk2.MustExec("insert into test_global values (15, 15, 15)") + case model.StateDeleteReorganization: + tk2.MustExec("insert into test_global values (6, 6, 6)") + tk2.MustExec("insert into test_global values (16, 16, 16)") + default: + require.Fail(t, "invalid schema state '%s'", job.SchemaState.String()) } }) @@ -1352,7 +1370,7 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) { tk1.MustExec("alter table test_global drop partition p1") tk.MustExec("analyze table test_global") - tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("9 9 9", "11 11 11", "12 12 12")) + tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("5 5 5", "6 6 6", "11 11 11", "12 12 12", "13 13 13", "14 14 14", "15 15 15", "16 16 16")) } func TestGlobalIndexUpdateInDropPartition(t *testing.T) { @@ -3168,10 +3186,10 @@ func TestRemovePartitioningAutoIDs(t *testing.T) { tk3.MustExec(`COMMIT`) tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( "13 11 11", "14 2 2", "15 12 12", "17 16 18", - "19 18 4", "21 20 5", "23 22 6", "25 24 7", "30 29 9")) + "19 18 4", "21 20 5", "23 22 6", "25 24 7", "29 28 9")) tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( "13 11 11", "14 2 2", "15 12 12", "17 16 18", - "19 18 4", "23 22 6", "27 26 8", "32 31 10")) + "19 18 4", "23 22 6", "27 26 8", "31 30 10")) waitFor(4, "t", "write reorganization") tk3.MustExec(`BEGIN`) @@ -3181,30 +3199,66 @@ func TestRemovePartitioningAutoIDs(t *testing.T) { tk3.MustExec(`insert into t values (null, 23)`) tk2.MustExec(`COMMIT`) - /* - // Currently there is an duplicate entry issue, so it will rollback in WriteReorganization - // instead of continuing. - waitFor(4, "t", "delete reorganization") - tk2.MustExec(`BEGIN`) - tk2.MustExec(`insert into t values (null, 24)`) + waitFor(4, "t", "delete reorganization") + tk2.MustExec(`BEGIN`) + tk2.MustExec(`insert into t values (null, 24)`) - tk3.MustExec(`insert into t values (null, 25)`) - tk2.MustExec(`insert into t values (null, 26)`) - */ + tk3.MustExec(`insert into t values (null, 25)`) + tk2.MustExec(`insert into t values (null, 26)`) tk3.MustExec(`COMMIT`) + tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( + "27 26 8", + "30012 12 12", + "30013 18 4", + "30014 24 7", + "30015 16 18", + "30016 22 6", + "30017 28 9", + "30018 11 11", + "30019 2 2", + "30020 20 5", + "31 30 10", + "35 34 22", + "39 38 24", + "43 42 26")) tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( - "13 11 11", "14 2 2", "15 12 12", "17 16 18", - "19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9", - "32 31 10", "35 34 21", "38 37 22", "41 40 23")) - - //waitFor(4, "t", "public") - //tk2.MustExec(`commit`) - // TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904 - require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'") + "27 26 8", + "30012 12 12", + "30013 18 4", + "30014 24 7", + "30015 16 18", + "30016 22 6", + "30017 28 9", + "30018 11 11", + "30019 2 2", + "30020 20 5", + "31 30 10", + "33 32 21", + "35 34 22", + "37 36 23", + "41 40 25")) + + waitFor(4, "t", "public") + tk2.MustExec(`commit`) tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( - "13 11 11", "14 2 2", "15 12 12", "17 16 18", - "19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9", - "32 31 10", "35 34 21", "38 37 22", "41 40 23")) + "27 26 8", + "30012 12 12", + "30013 18 4", + "30014 24 7", + "30015 16 18", + "30016 22 6", + "30017 28 9", + "30018 11 11", + "30019 2 2", + "30020 20 5", + "31 30 10", + "33 32 21", + "35 34 22", + "37 36 23", + "39 38 24", + "41 40 25", + "43 42 26")) + require.NoError(t, <-alterChan) } func TestAlterLastIntervalPartition(t *testing.T) { diff --git a/pkg/ddl/tests/partition/multi_domain_test.go b/pkg/ddl/tests/partition/multi_domain_test.go index eb618bd1f1f4f..e435bc7cc8708 100644 --- a/pkg/ddl/tests/partition/multi_domain_test.go +++ b/pkg/ddl/tests/partition/multi_domain_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/hex" "fmt" + "math" "testing" "time" @@ -27,6 +28,7 @@ import ( pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessiontxn" + "github.com/pingcap/tidb/pkg/store/gcworker" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" @@ -55,7 +57,7 @@ func TestMultiSchemaReorganizePartitionIssue56819(t *testing.T) { } func TestMultiSchemaDropRangePartition(t *testing.T) { - createSQL := `create table t (a int primary key, b varchar(255)) partition by range (a) (partition p0 values less than (100), partition p1 values less than (200))` + createSQL := `create table t (a int primary key, b varchar(255), unique key (b) global, unique key (b,a) global, unique key (b,a)) partition by range (a) (partition p0 values less than (100), partition p1 values less than (200))` initFn := func(tkO *testkit.TestKit) { tkO.MustExec(`insert into t values (1,1),(2,2),(101,101),(102,102)`) } @@ -70,39 +72,42 @@ func TestMultiSchemaDropRangePartition(t *testing.T) { // tkO see non-readable/non-writable p0 partition, and should try to read from p1 // in case there is something written to overlapping p1 tkO.MustContainErrMsg(`insert into t values (1,1)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") - tkNO.MustContainErrMsg(`insert into t values (1,1)`, "[kv:1062]Duplicate entry '1' for key 't.PRIMARY'") - tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.PRIMARY'") - tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.PRIMARY'") + tkNO.MustContainErrMsg(`insert into t values (1,1)`, "[kv:1062]Duplicate entry '1' for key 't.") + tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.") + tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.") tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "101 101", "102 102", "2 2")) tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("101 101", "102 102")) case "delete only": // tkNO see non-readable/non-writable p0 partition, and should try to read from p1 // in case there is something written to overlapping p1 // tkO is not aware of p0. - tkO.MustExec(`insert into t values (1,2)`) - tkNO.MustContainErrMsg(`insert into t values (1,2)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") - tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.PRIMARY'") - tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.PRIMARY'") - tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 2", "101 101", "102 102")) + tkO.MustExec(`insert into t values (1,20)`) + tkNO.MustContainErrMsg(`insert into t values (1,20)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") + tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.") + tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.") + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 20", "101 101", "102 102")) // Original row should not be seen in StateWriteOnly tkNO.MustQuery(`select * from t partition (p0)`).Sort().Check(testkit.Rows()) tkNO.MustContainErrMsg(`select * from t partition (pNonExisting)`, "[table:1735]Unknown partition 'pnonexisting' in table 't'") - tkNO.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows("1 2", "101 101", "102 102")) - tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 2", "101 101", "102 102")) - tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 2", "101 101", "102 102")) - tkNO.MustQuery(`select * from t where a = 1`).Sort().Check(testkit.Rows("1 2")) - tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("1 2")) - tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("1 2")) - tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("1 2")) + tkNO.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows("1 20", "101 101", "102 102")) + tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 20", "101 101", "102 102")) + tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 20", "101 101", "102 102")) + tkNO.MustQuery(`select * from t where a = 1`).Sort().Check(testkit.Rows("1 20")) + tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("1 20")) + tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("1 20")) + tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("1 20")) - tkNO.MustQuery(`select * from t where b = 2`).Sort().Check(testkit.Rows("1 2")) + tkNO.MustQuery(`select * from t where b = 20`).Sort().Check(testkit.Rows("1 20")) // TODO: Test update and delete! // TODO: test key, hash and list partition without default partition :) tkNO.MustQuery(`show create table t`).Check(testkit.Rows("" + "t CREATE TABLE `t` (\n" + " `a` int(11) NOT NULL,\n" + " `b` varchar(255) DEFAULT NULL,\n" + - " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `b` (`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_2` (`b`,`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_3` (`b`,`a`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + "PARTITION BY RANGE (`a`)\n" + "(PARTITION `p0` VALUES LESS THAN (100),\n" + @@ -111,7 +116,10 @@ func TestMultiSchemaDropRangePartition(t *testing.T) { "t CREATE TABLE `t` (\n" + " `a` int(11) NOT NULL,\n" + " `b` varchar(255) DEFAULT NULL,\n" + - " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `b` (`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_2` (`b`,`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_3` (`b`,`a`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + "PARTITION BY RANGE (`a`)\n" + "(PARTITION `p1` VALUES LESS THAN (200))")) @@ -127,7 +135,7 @@ func TestMultiSchemaDropRangePartition(t *testing.T) { } func TestMultiSchemaDropListDefaultPartition(t *testing.T) { - createSQL := `create table t (a int primary key, b varchar(255)) partition by list (a) (partition p0 values in (1,2,3), partition p1 values in (100,101,102,DEFAULT))` + createSQL := `create table t (a int primary key, b varchar(255), unique key (b) global, unique key (b,a) global, unique key (b,a)) partition by list (a) (partition p0 values in (1,2,3), partition p1 values in (100,101,102,DEFAULT))` initFn := func(tkO *testkit.TestKit) { tkO.MustExec(`insert into t values (1,1),(2,2),(101,101),(102,102)`) } @@ -142,32 +150,32 @@ func TestMultiSchemaDropListDefaultPartition(t *testing.T) { // tkO see non-readable/non-writable p0 partition, and should try to read from p1 // in case there is something written to overlapping p1 tkO.MustContainErrMsg(`insert into t values (1,1)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") - tkNO.MustContainErrMsg(`insert into t values (1,1)`, "[kv:1062]Duplicate entry '1' for key 't.PRIMARY'") - tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.PRIMARY'") - tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.PRIMARY'") + tkNO.MustContainErrMsg(`insert into t values (1,1)`, "[kv:1062]Duplicate entry '1' for key 't.") + tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.") + tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.") tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "101 101", "102 102", "2 2")) tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("101 101", "102 102")) case "delete only": // tkNO see non-readable/non-writable p0 partition, and should try to read from p1 // in case there is something written to overlapping p1 // tkO is not aware of p0. - tkO.MustExec(`insert into t values (1,2)`) - tkNO.MustContainErrMsg(`insert into t values (1,2)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") - tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.PRIMARY'") - tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.PRIMARY'") - tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 2", "101 101", "102 102")) + tkO.MustExec(`insert into t values (1,20)`) + tkNO.MustContainErrMsg(`insert into t values (1,20)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") + tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.") + tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.") + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 20", "101 101", "102 102")) // Original row should not be seen in StateWriteOnly tkNO.MustQuery(`select * from t partition (p0)`).Sort().Check(testkit.Rows()) tkNO.MustContainErrMsg(`select * from t partition (pNonExisting)`, "[table:1735]Unknown partition 'pnonexisting' in table 't'") - tkNO.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows("1 2", "101 101", "102 102")) - tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 2", "101 101", "102 102")) - tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 2", "101 101", "102 102")) - tkNO.MustQuery(`select * from t where a = 1`).Sort().Check(testkit.Rows("1 2")) - tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("1 2")) - tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("1 2")) - tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("1 2")) + tkNO.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows("1 20", "101 101", "102 102")) + tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 20", "101 101", "102 102")) + tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 20", "101 101", "102 102")) + tkNO.MustQuery(`select * from t where a = 1`).Sort().Check(testkit.Rows("1 20")) + tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("1 20")) + tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("1 20")) + tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("1 20")) - tkNO.MustQuery(`select * from t where b = 2`).Sort().Check(testkit.Rows("1 2")) + tkNO.MustQuery(`select * from t where b = 20`).Sort().Check(testkit.Rows("1 20")) // TODO: Test update and delete! // TODO: test key, hash and list partition without default partition :) // Should we see the partition or not?!? @@ -175,7 +183,10 @@ func TestMultiSchemaDropListDefaultPartition(t *testing.T) { "t CREATE TABLE `t` (\n" + " `a` int(11) NOT NULL,\n" + " `b` varchar(255) DEFAULT NULL,\n" + - " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `b` (`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_2` (`b`,`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_3` (`b`,`a`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + "PARTITION BY LIST (`a`)\n" + "(PARTITION `p0` VALUES IN (1,2,3),\n" + @@ -184,7 +195,10 @@ func TestMultiSchemaDropListDefaultPartition(t *testing.T) { "t CREATE TABLE `t` (\n" + " `a` int(11) NOT NULL,\n" + " `b` varchar(255) DEFAULT NULL,\n" + - " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `b` (`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_2` (`b`,`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_3` (`b`,`a`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + "PARTITION BY LIST (`a`)\n" + "(PARTITION `p1` VALUES IN (100,101,102,DEFAULT))")) @@ -200,7 +214,7 @@ func TestMultiSchemaDropListDefaultPartition(t *testing.T) { } func TestMultiSchemaDropListColumnsDefaultPartition(t *testing.T) { - createSQL := `create table t (a int, b varchar(255), c varchar (255), primary key (a,b)) partition by list columns (a,b) (partition p0 values in ((1,"1"),(2,"2"),(3,"3")), partition p1 values in ((100,"100"),(101,"101"),(102,"102"),DEFAULT))` + createSQL := `create table t (a int, b varchar(255), c varchar (255), primary key (a,b), unique key (a) global, unique key (b,a) global, unique key (c) global, unique key (b,a)) partition by list columns (a,b) (partition p0 values in ((1,"1"),(2,"2"),(3,"3")), partition p1 values in ((100,"100"),(101,"101"),(102,"102"),DEFAULT))` initFn := func(tkO *testkit.TestKit) { tkO.MustExec(`insert into t values (1,1,1),(2,2,2),(101,101,101),(102,102,102)`) } @@ -215,34 +229,36 @@ func TestMultiSchemaDropListColumnsDefaultPartition(t *testing.T) { // tkO see non-readable/non-writable p0 partition, and should try to read from p1 // in case there is something written to overlapping p1 tkO.MustContainErrMsg(`insert into t values (1,1,1)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") - tkNO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1-1' for key 't.PRIMARY'") - tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101-101' for key 't.PRIMARY'") - tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101-101' for key 't.PRIMARY'") + tkNO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1' for key 't.a_2'") + tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.a_2'") + tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.a_2'") tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2")) tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("101 101 101", "102 102 102")) case "delete only": // tkNO see non-readable/non-writable p0 partition, and should try to read from p1 // in case there is something written to overlapping p1 // tkO is not aware of p0. - tkO.MustExec(`insert into t values (1,1,2)`) - tkNO.MustContainErrMsg(`insert into t values (1,1,2)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") - tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101-101' for key 't.PRIMARY'") - tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101-101' for key 't.PRIMARY'") - tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 2", "101 101 101", "102 102 102")) + tkO.MustExec(`insert into t values (3,3,3)`) + tkO.MustContainErrMsg(`insert into t values (1,1,2)`, "[kv:1062]Duplicate entry '1' for key 't.a_2") + tkNO.MustContainErrMsg(`insert into t values (3,3,3)`, "[table:1526]Table has no partition for value matching a partition being dropped, 'p0'") + tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.a_2'") + tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.a_2'") + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("101 101 101", "102 102 102", "3 3 3")) + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("101 101 101", "102 102 102", "3 3 3")) // Original row should not be seen in StateWriteOnly tkNO.MustQuery(`select * from t partition (p0)`).Sort().Check(testkit.Rows()) tkNO.MustContainErrMsg(`select * from t partition (pNonExisting)`, "[table:1735]Unknown partition 'pnonexisting' in table 't'") - tkNO.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows("1 1 2", "101 101 101", "102 102 102")) - tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 1 2", "101 101 101", "102 102 102")) - tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 1 2", "101 101 101", "102 102 102")) - tkNO.MustQuery(`select * from t where a = 1`).Sort().Check(testkit.Rows("1 1 2")) - tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("1 1 2")) - tkNO.MustQuery(`select * from t where a in (1,2,3) or b in ("1","2")`).Sort().Check(testkit.Rows("1 1 2")) - tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("1 1 2")) - tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("1 1 2")) + tkNO.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows("101 101 101", "102 102 102", "3 3 3")) + tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("101 101 101", "102 102 102", "3 3 3")) + tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("101 101 101", "102 102 102", "3 3 3")) + tkNO.MustQuery(`select * from t where a = 3`).Sort().Check(testkit.Rows("3 3 3")) + tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("3 3 3")) + tkNO.MustQuery(`select * from t where a in (1,2,3) or b in ("1","2")`).Sort().Check(testkit.Rows("3 3 3")) + tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("3 3 3")) + tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("3 3 3")) - tkNO.MustQuery(`select * from t where c = "2"`).Sort().Check(testkit.Rows("1 1 2")) - tkNO.MustQuery(`select * from t where b = "1"`).Sort().Check(testkit.Rows("1 1 2")) + tkNO.MustQuery(`select * from t where c = "2"`).Sort().Check(testkit.Rows("2 2 2")) + tkNO.MustQuery(`select * from t where b = "3"`).Sort().Check(testkit.Rows("3 3 3")) // TODO: Test update and delete! // TODO: test key, hash and list partition without default partition :) // Should we see the partition or not?!? @@ -251,7 +267,11 @@ func TestMultiSchemaDropListColumnsDefaultPartition(t *testing.T) { " `a` int(11) NOT NULL,\n" + " `b` varchar(255) NOT NULL,\n" + " `c` varchar(255) DEFAULT NULL,\n" + - " PRIMARY KEY (`a`,`b`) /*T![clustered_index] CLUSTERED */\n" + + " PRIMARY KEY (`a`,`b`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `a_2` (`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b` (`b`,`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `c` (`c`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_2` (`b`,`a`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + "PARTITION BY LIST COLUMNS(`a`,`b`)\n" + "(PARTITION `p0` VALUES IN ((1,'1'),(2,'2'),(3,'3')),\n" + @@ -261,7 +281,11 @@ func TestMultiSchemaDropListColumnsDefaultPartition(t *testing.T) { " `a` int(11) NOT NULL,\n" + " `b` varchar(255) NOT NULL,\n" + " `c` varchar(255) DEFAULT NULL,\n" + - " PRIMARY KEY (`a`,`b`) /*T![clustered_index] CLUSTERED */\n" + + " PRIMARY KEY (`a`,`b`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `a_2` (`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b` (`b`,`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `c` (`c`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `b_2` (`b`,`a`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + "PARTITION BY LIST COLUMNS(`a`,`b`)\n" + "(PARTITION `p1` VALUES IN ((100,'100'),(101,'101'),(102,'102'),DEFAULT))")) @@ -278,27 +302,8 @@ func TestMultiSchemaDropListColumnsDefaultPartition(t *testing.T) { func TestMultiSchemaReorganizePartition(t *testing.T) { createSQL := `create table t (a int primary key, b varchar(255), unique index idx_b_global (b) global) partition by range (a) (partition p1 values less than (200), partition pMax values less than (maxvalue))` - originalPartitions := make([]int64, 0, 2) - originalIndexIDs := make([]int64, 0, 1) - originalGlobalIndexIDs := make([]int64, 0, 1) - tableID := int64(0) initFn := func(tkO *testkit.TestKit) { tkO.MustExec(`insert into t values (1,1),(2,2),(101,101),(102,102),(998,998),(999,999)`) - ctx := tkO.Session() - is := domain.GetDomain(ctx).InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) - require.NoError(t, err) - tableID = tbl.Meta().ID - for _, def := range tbl.Meta().Partition.Definitions { - originalPartitions = append(originalPartitions, def.ID) - } - for _, idx := range tbl.Meta().Indices { - if idx.Global { - originalGlobalIndexIDs = append(originalGlobalIndexIDs, idx.ID) - continue - } - originalIndexIDs = append(originalIndexIDs, idx.ID) - } } alterSQL := `alter table t reorganize partition p1 into (partition p0 values less than (100), partition p1 values less than (200))` @@ -313,8 +318,8 @@ func TestMultiSchemaReorganizePartition(t *testing.T) { tkNO.MustContainErrMsg(`insert into t values (1,2)`+dbgStr, "[kv:1062]Duplicate entry") tkO.MustContainErrMsg(`insert into t values (101,101)`+dbgStr, "[kv:1062]Duplicate entry") tkNO.MustContainErrMsg(`insert into t values (101,101)`+dbgStr, "[kv:1062]Duplicate entry") - tkO.MustContainErrMsg(`insert into t values (999,999)`+dbgStr, "[kv:1062]Duplicate entry '999' for key 't.idx_b_global'") - tkNO.MustContainErrMsg(`insert into t values (999,999)`+dbgStr, "[kv:1062]Duplicate entry '999' for key 't.idx_b_global'") + tkO.MustContainErrMsg(`insert into t values (999,999)`+dbgStr, "[kv:1062]Duplicate entry '999' for key 't.") + tkNO.MustContainErrMsg(`insert into t values (999,999)`+dbgStr, "[kv:1062]Duplicate entry '999' for key 't.") tkNO.MustQuery(`select * from t where a = 1` + dbgStr).Sort().Check(testkit.Rows("1 1")) tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3` + dbgStr).Sort().Check(testkit.Rows("1 1", "2 2")) tkNO.MustQuery(`select * from t where a in (1,2,3)` + dbgStr).Sort().Check(testkit.Rows("1 1", "2 2")) @@ -336,12 +341,25 @@ func TestMultiSchemaReorganizePartition(t *testing.T) { tkO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID)) tkNO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID))) - logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID)) + logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID), zap.String("state", schemaState)) testID++ tkNO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID)) tkO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID))) + // Test for Index, specially between WriteOnly and DeleteOnly, but better to test all states. + // if tkNO (DeleteOnly) updates a row, the new index should be deleted, but not inserted. + // It will be inserted by backfill in WriteReorganize. + // If not deleted, then there would be an orphan entry in the index! + tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+100, testID)) + tkNO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100))) + tkNO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+100)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100))) + tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+99, testID-1)) + tkO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID-1)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99))) + tkO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+99)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99))) + tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID, testID)) + tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID-1, testID-1)) + switch schemaState { case model.StateDeleteOnly.String(): // tkNO sees original table/partitions as before the DDL stated @@ -387,71 +405,166 @@ func TestMultiSchemaReorganizePartition(t *testing.T) { "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (200),\n" + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + case model.StatePublic.String(): + // not tested, both tkO and tkNO sees same partitions case model.StateNone.String(): + // not tested, both tkO and tkNO sees same partitions default: - require.Failf(t, "unhandled schema state '%s'", schemaState) + require.Failf(t, "unhandled schema state", "State '%s'", schemaState) } } postFn := func(tkO *testkit.TestKit, store kv.Storage) { tkO.MustQuery(`select * from t where b = 5`).Sort().Check(testkit.Rows("5 5")) tkO.MustQuery(`select * from t where b = "5"`).Sort().Check(testkit.Rows("5 5")) tkO.MustExec(`admin check table t`) - tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "998 998", "999 999")) - // TODO: Verify that there are no KV entries for old partitions or old indexes!!! - delRange := tkO.MustQuery(`select * from mysql.gc_delete_range_done`).Rows() - s := "" - for _, row := range delRange { - if s != "" { - s += "\n" - } - for i, col := range row { - if i != 0 { - s += " " - } - s += col.(string) - } - } - logutil.BgLogger().Info("gc_delete_range_done", zap.String("rows", s)) - tkO.MustQuery(`select * from mysql.gc_delete_range`).Check(testkit.Rows()) - ctx := tkO.Session() - is := domain.GetDomain(ctx).InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) - require.NoError(t, err) - tableID = tbl.Meta().ID - // Save this for the fix of https://github.com/pingcap/tidb/issues/56822 - //GlobalLoop: - // for _, globIdx := range originalGlobalIndexIDs { - // for _, idx := range tbl.Meta().Indices { - // if idx.ID == globIdx { - // continue GlobalLoop - // } - // } - // // Global index removed - // require.False(t, HaveEntriesForTableIndex(t, tkO, tableID, globIdx), "Global index id %d for table id %d has still entries!", globIdx, tableID) - // } - LocalLoop: - for _, locIdx := range originalIndexIDs { - for _, idx := range tbl.Meta().Indices { - if idx.ID == locIdx { - continue LocalLoop - } - } - // local index removed - for _, part := range tbl.Meta().Partition.Definitions { - require.False(t, HaveEntriesForTableIndex(t, tkO, part.ID, locIdx), "Local index id %d for partition id %d has still entries!", locIdx, tableID) + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "994 994", "995 995", "998 998", "999 999")) + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `idx_b_global` (`b`) /*T![global_index] GLOBAL */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` VALUES LESS THAN (200),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) +} + +// Also tests for conversions of unique indexes +// 1 unique non-global - to become global +// 2 unique global - to become non-global +// 3 unique non-global - to stay non-global +// 4 unique global - to stay global +func TestMultiSchemaPartitionByGlobalIndex(t *testing.T) { + createSQL := `create table t (a int primary key nonclustered global, b varchar(255), c bigint, unique index idx_b_global (b) global, unique key idx_ba (b,a), unique key idx_ab (a,b) global, unique key idx_c_global (c) global, unique key idx_cab (c,a,b)) partition by key (a,b) partitions 3` + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(`insert into t values (1,1,1),(2,2,2),(101,101,101),(102,102,102)`) + } + alterSQL := `alter table t partition by key (b,a) partitions 5 update indexes (idx_ba global, idx_ab local)` + doneStateWriteReorganize := false + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + switch schemaState { + case model.StateDeleteOnly.String(): + // tkNO sees original table/partitions as before the DDL stated + // tkO uses the original table/partitions, but should also delete from the newly created + // Global Index, to replace the existing one. + tkO.MustContainErrMsg(`insert into t values (1,2,3)`, "[kv:1062]Duplicate entry '2' for key 't.idx_b") + tkNO.MustContainErrMsg(`insert into t values (1,2,3)`, "[kv:1062]Duplicate entry '2' for key 't.idx_b") + tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b") + tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b") + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2")) + tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2")) + tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2")) + tkNO.MustQuery(`select * from t where a = 1`).Sort().Check(testkit.Rows("1 1 1")) + tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("1 1 1", "2 2 2")) + tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("1 1 1", "2 2 2")) + tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("1 1 1", "2 2 2")) + + tkNO.MustQuery(`select * from t where b = 2`).Sort().Check(testkit.Rows("2 2 2")) + tkO.MustExec(`insert into t values (3,3,3)`) + tkNO.MustExec(`insert into t values (4,4,4)`) + tkNO.MustQuery(`select * from t where a = 3`).Sort().Check(testkit.Rows("3 3 3")) + tkO.MustQuery(`select * from t where a = 4`).Sort().Check(testkit.Rows("4 4 4")) + case model.StateWriteOnly.String(): + // Both tkO and tkNO uses the original table/partitions, + // but tkO should also update the newly created + // Global Index, and tkNO should only delete from it. + tkO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1' for key 't.idx_b") + tkNO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1' for key 't.idx_b") + tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b") + tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b") + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2", "3 3 3", "4 4 4")) + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2", "3 3 3", "4 4 4")) + logutil.BgLogger().Info("insert into t values (5,5,5)") + tkO.MustExec(`insert into t values (5,5,5)`) + tkNO.MustExec(`insert into t values (6,6,6)`) + tkNO.MustQuery(`select * from t where a = 5`).Sort().Check(testkit.Rows("5 5 5")) + tkO.MustQuery(`select * from t where a = 6`).Sort().Check(testkit.Rows("6 6 6")) + case model.StateWriteReorganization.String(): + // It will go through StateWriteReorg more than once. + if doneStateWriteReorganize { + break } + doneStateWriteReorganize = true + // Both tkO and tkNO uses the original table/partitions, + // and should also update the newly created Global Index. + tkO.MustExec(`insert into t values (7,7,7)`) + tkNO.MustExec(`insert into t values (8,8,8)`) + tkNO.MustQuery(`select * from t where b = 7`).Check(testkit.Rows("7 7 7")) + tkO.MustQuery(`select * from t where b = 8`).Check(testkit.Rows("8 8 8")) + case model.StateDeleteReorganization.String(): + // Both tkO now sees the new partitions, and should use the new Global Index, + // plus double write to the old one. + // tkNO uses the original table/partitions, + // and should also update the newly created Global Index. + tkO.MustExec(`insert into t values (9,9,9)`) + tkNO.MustExec(`insert into t values (10,10,10)`) + tkNO.MustQuery(`select * from t where b = 9`).Check(testkit.Rows("9 9 9")) + tkO.MustQuery(`select * from t where b = 10`).Check(testkit.Rows("10 10 10")) + // TODO: Test update and delete! + // TODO: test key, hash and list partition without default partition :) + tkNO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` bigint(20) DEFAULT NULL,\n" + + " UNIQUE KEY `idx_b_global` (`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `idx_ba` (`b`,`a`),\n" + + " UNIQUE KEY `idx_ab` (`a`,`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `idx_c_global` (`c`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `idx_cab` (`c`,`a`,`b`),\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */ /*T![global_index] GLOBAL */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY KEY (`a`,`b`) PARTITIONS 3")) + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` bigint(20) DEFAULT NULL,\n" + + " UNIQUE KEY `idx_cab` (`c`,`a`,`b`),\n" + + " UNIQUE KEY `idx_b_global` (`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `idx_ba` (`b`,`a`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `idx_ab` (`a`,`b`),\n" + + " UNIQUE KEY `idx_c_global` (`c`) /*T![global_index] GLOBAL */,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */ /*T![global_index] GLOBAL */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY KEY (`b`,`a`) PARTITIONS 5")) + case model.StatePublic.String(): + tkO.MustExec(`insert into t values (11,11,11)`) + tkNO.MustExec(`insert into t values (12,12,12)`) + case model.StateNone.String(): + tkO.MustExec(`insert into t values (13,13,13)`) + tkNO.MustExec(`insert into t values (14,14,14)`) + tkO.MustQuery(`select * from t where b = 11`).Check(testkit.Rows("11 11 11")) + default: + require.Failf(t, "unhandled schema state '%s'", schemaState) } - // TODO: Fix cleanup issues, most likely it needs one more SchemaState in onReorganizePartition - //PartitionLoop: - // for _, partID := range originalPartitions { - // for _, def := range tbl.Meta().Partition.Definitions { - // if def.ID == partID { - // continue PartitionLoop - // } - // } - // // old partitions removed - // require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID) - // } + } + postFn := func(tkO *testkit.TestKit, _ kv.Storage) { + tkO.MustQuery(`select * from t where b = 5`).Check(testkit.Rows("5 5 5")) + tkO.MustExec(`admin check table t`) + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "1 1 1", + "10 10 10", + "101 101 101", + "102 102 102", + "11 11 11", + "12 12 12", + "13 13 13", + "14 14 14", + "2 2 2", + "3 3 3", + "4 4 4", + "5 5 5", + "6 6 6", + "7 7 7", + "8 8 8", + "9 9 9")) } runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) } @@ -698,6 +811,7 @@ func TestMultiSchemaDropUniqueIndex(t *testing.T) { //} func runMultiSchemaTest(t *testing.T, createSQL, alterSQL string, initFn func(*testkit.TestKit), postFn func(*testkit.TestKit, kv.Storage), loopFn func(tO, tNO *testkit.TestKit)) { + // When debugging, increase the lease, so the schema does not auto reload :) distCtx := testkit.NewDistExecutionContextWithLease(t, 2, 15*time.Second) store := distCtx.Store domOwner := distCtx.GetDomain(0) @@ -724,7 +838,30 @@ func runMultiSchemaTest(t *testing.T, createSQL, alterSQL string, initFn func(*t tkDDLOwner.MustExec(createSQL) domOwner.Reload() domNonOwner.Reload() + + originalPartitions := make([]int64, 0, 2) + originalIndexIDs := make([]int64, 0, 1) + originalGlobalIndexIDs := make([]int64, 0, 1) + ctx := tkO.Session() + is := domain.GetDomain(ctx).InfoSchema() + tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + tableID := tbl.Meta().ID + if tbl.Meta().Partition != nil { + for _, def := range tbl.Meta().Partition.Definitions { + originalPartitions = append(originalPartitions, def.ID) + } + } + for _, idx := range tbl.Meta().Indices { + if idx.Global { + originalGlobalIndexIDs = append(originalGlobalIndexIDs, idx.ID) + continue + } + originalIndexIDs = append(originalIndexIDs, idx.ID) + } + initFn(tkO) + verStart := domNonOwner.InfoSchema().SchemaMetaVersion() hookChan := make(chan struct{}) hookFunc := func(job *model.Job) { @@ -760,6 +897,7 @@ func runMultiSchemaTest(t *testing.T, createSQL, alterSQL string, initFn func(*t domOwner.Reload() if domNonOwner.InfoSchema().SchemaMetaVersion() == domOwner.InfoSchema().SchemaMetaVersion() { // looping over reorganize data/indexes + logutil.BgLogger().Info("XXXXXXXXXXX Schema Version has not changed") hookChan <- struct{}{} continue } @@ -781,6 +919,68 @@ func runMultiSchemaTest(t *testing.T, createSQL, alterSQL string, initFn func(*t hookChan <- struct{}{} } logutil.BgLogger().Info("XXXXXXXXXXX states loop done") + // Verify that there are no KV entries for old partitions or old indexes!!! + gcWorker, err := gcworker.NewMockGCWorker(store) + require.NoError(t, err) + err = gcWorker.DeleteRanges(context.Background(), uint64(math.MaxInt64)) + require.NoError(t, err) + delRange := tkO.MustQuery(`select * from mysql.gc_delete_range_done`).Rows() + s := "" + for _, row := range delRange { + if s != "" { + s += "\n" + } + for i, col := range row { + if i != 0 { + s += " " + } + s += col.(string) + } + } + logutil.BgLogger().Info("gc_delete_range_done", zap.String("rows", s)) + tkO.MustQuery(`select * from mysql.gc_delete_range`).Check(testkit.Rows()) + ctx = tkO.Session() + is = domain.GetDomain(ctx).InfoSchema() + tbl, err = is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) + require.NoError(t, err) + newTableID := tbl.Meta().ID + if tableID != newTableID { + require.False(t, HaveEntriesForTableIndex(t, tkO, tableID, 0), "Old table id %d has still entries!", tableID) + } +GlobalLoop: + for _, globIdx := range originalGlobalIndexIDs { + for _, idx := range tbl.Meta().Indices { + if idx.ID == globIdx { + continue GlobalLoop + } + } + // Global index removed + require.False(t, HaveEntriesForTableIndex(t, tkO, tableID, globIdx), "Global index id %d for table id %d has still entries!", globIdx, tableID) + } +LocalLoop: + for _, locIdx := range originalIndexIDs { + for _, idx := range tbl.Meta().Indices { + if idx.ID == locIdx { + continue LocalLoop + } + } + // local index removed + if tbl.Meta().Partition != nil { + for _, part := range tbl.Meta().Partition.Definitions { + require.False(t, HaveEntriesForTableIndex(t, tkO, part.ID, locIdx), "Local index id %d for partition id %d has still entries!", locIdx, tableID) + } + } + } +PartitionLoop: + for _, partID := range originalPartitions { + for _, def := range tbl.Meta().Partition.Definitions { + if def.ID == partID { + continue PartitionLoop + } + } + // old partitions removed + require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID) + } if postFn != nil { postFn(tkO, store) } @@ -823,6 +1023,44 @@ func HaveEntriesForTableIndex(t *testing.T, tk *testkit.TestKit, tableID, indexI return false } +func TestMultiSchemaReorganizeNoPK(t *testing.T) { + createSQL := `create table t (c1 INT, c2 CHAR(255), c3 CHAR(255), c4 CHAR(255), c5 CHAR(255)) partition by range (c1) (partition p1 values less than (200), partition pMax values less than (maxvalue))` + i := 1 + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,repeat('%d', 25),repeat('%d', 25),repeat('%d', 25),repeat('%d', 25))`, i, 9786756453-i, 6821527184-i, 4185725186-i, 7483634197-i)) + i++ + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,repeat('%d', 25),repeat('%d', 25),repeat('%d', 25),repeat('%d', 25))`, i, 9786756453-i, 6821527184-i, 4185725186-i, 7483634197-i)) + i++ + } + alterSQL := `alter table t reorganize partition p1 into (partition p0 values less than (100), partition p1 values less than (200))` + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s',concat('O-', repeat('%d', 25)),repeat('%d', 25),repeat('%d', 25))`, i, schemaState, 6821527184-i, 4185725186-i, 7483634197-i)) + i++ + tkNO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s',concat('NO-',repeat('%d', 25)),repeat('%d', 25),repeat('%d', 25))`, i, schemaState, 6821527184-i, 4185725186-i, 7483634197-i)) + i++ + } + postFn := func(tkO *testkit.TestKit, _ kv.Storage) { + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rowsdelete reorganizationpublic O-6821527173682152717368215271736821527173682152717368215271736821527173682152717368215271736821527173682152717368215271736821527173682152717368215271736821527173682152717368215271736821527173682152717368215271736821527173682152717368215271736821527173 4185725175418572517541857251754185725175418572517541857251754185725175418572517541857251754185725175418572517541857251754185725175418572517541857251754185725175418572517541857251754185725175418572517541857251754185725175418572517541857251754185725175 7483634186748363418674836341867483634186748363418674836341867483634186748363418674836341867483634186748363418674836341867483634186748363418674836341867483634186748363418674836341867483634186748363418674836341867483634186748363418674836341867483634186", + "12 publicnonenone NO-6821527170682152717068215271706821527170682152717068215271706821527170682152717068215271706821527170682152717068215271706821527170682152717068215271706821527170682152717068215271706821527170682152717068215271706821527170682152717068215271706821527170 4185725172418572517241857251724185725172418572517241857251724185725172418572517241857251724185725172418572517241857251724185725172418572517241857251724185725172418572517241857251724185725172418572517241857251724185725172418572517241857251724185725172 7483634183748363418374836341837483634183748363418374836341837483634183748363418374836341837483634183748363418374836341837483634183748363418374836341837483634183748363418374836341837483634183748363418374836341837483634183748363418374836341837483634183", + "2 9786756451978675645197867564519786756451978675645197867564519786756451978675645197867564519786756451978675645197867564519786756451978675645197867564519786756451978675645197867564519786756451978675645197867564519786756451978675645197867564519786756451 6821527182682152718268215271826821527182682152718268215271826821527182682152718268215271826821527182682152718268215271826821527182682152718268215271826821527182682152718268215271826821527182682152718268215271826821527182682152718268215271826821527182 4185725184418572518441857251844185725184418572518441857251844185725184418572518441857251844185725184418572518441857251844185725184418572518441857251844185725184418572518441857251844185725184418572518441857251844185725184418572518441857251844185725184 7483634195748363419574836341957483634195748363419574836341957483634195748363419574836341957483634195748363419574836341957483634195748363419574836341957483634195748363419574836341957483634195748363419574836341957483634195748363419574836341957483634195", + "3 delete onlydelete onlywrite onlywrite onlywrite reorganizationwrite reorganizationdelete reorganization} + runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) +} + // TestMultiSchemaTruncatePartitionWithGlobalIndex to show behavior when // truncating a partition with a global index func TestMultiSchemaTruncatePartitionWithGlobalIndex(t *testing.T) { diff --git a/pkg/ddl/tests/partition/reorg_partition_test.go b/pkg/ddl/tests/partition/reorg_partition_test.go index 54b883c9eff53..572b7b39a4efa 100644 --- a/pkg/ddl/tests/partition/reorg_partition_test.go +++ b/pkg/ddl/tests/partition/reorg_partition_test.go @@ -185,7 +185,7 @@ func TestReorgPartitionFailures(t *testing.T) { afterResult := testkit.Rows( "1 1 1", "12 12 21", "13 13 13", "17 17 17", "18 18 18", "2 2 2", "23 23 32", "45 45 54", "5 5 5", ) - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestRemovePartitionFailures(t *testing.T) { @@ -210,7 +210,7 @@ func TestRemovePartitionFailures(t *testing.T) { `delete from t where b = 102`, } afterResult := testkit.Rows("1 1 1", "101 101 101", "2 2 2", "3 3 3", "4 4 4", "9 9 104") - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestPartitionByFailures(t *testing.T) { @@ -261,7 +261,7 @@ func TestReorganizePartitionListFailures(t *testing.T) { `delete from t where b = 3`, } afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestPartitionByListFailures(t *testing.T) { @@ -309,7 +309,7 @@ func TestAddHashPartitionFailures(t *testing.T) { `delete from t where b = 3`, } afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestCoalesceKeyPartitionFailures(t *testing.T) { @@ -332,7 +332,7 @@ func TestCoalesceKeyPartitionFailures(t *testing.T) { `delete from t where b = 3`, } afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") - testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) } func TestPartitionByNonPartitionedTable(t *testing.T) { @@ -340,7 +340,7 @@ func TestPartitionByNonPartitionedTable(t *testing.T) { alter := `alter table t partition by range (a) (partition p0 values less than (20))` beforeResult := testkit.Rows() afterResult := testkit.Rows() - testReorganizePartitionFailures(t, create, alter, nil, beforeResult, nil, afterResult, "Fail4") + testReorganizePartitionFailures(t, create, alter, nil, beforeResult, nil, afterResult) } func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterResult [][]any, skipTests ...string) { @@ -356,20 +356,24 @@ func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, b // Cancel means we set job.State = JobStateCancelled, as in no need to do more // Rollback means we do full rollback before returning error. tests := []struct { - name string - count int + name string + count int + rollForwardFrom int }{ { "Cancel", 1, + -1, }, { "Fail", 5, + 4, }, { "Rollback", 4, + -1, }, } oldWaitTimeWhenErrorOccurred := ddl.WaitTimeWhenErrorOccurred @@ -386,44 +390,59 @@ func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, b continue SUBTEST } } - tk.MustExec(createSQL) + suffixComment := ` /* ` + suffix + ` */` + tk.MustExec(createSQL + suffixComment) for _, sql := range beforeDML { - tk.MustExec(sql + ` /* ` + suffix + ` */`) + tk.MustExec(sql + suffixComment) } - tk.MustQuery(`select * from t /* ` + suffix + ` */`).Sort().Check(beforeResult) + tk.MustQuery(`select * from t ` + suffixComment).Sort().Check(beforeResult) tOrg := external.GetTableByName(t, tk, "test", "t") var idxID int64 if len(tOrg.Meta().Indices) > 0 { idxID = tOrg.Meta().Indices[0].ID } - oldCreate := tk.MustQuery(`show create table t`).Rows() + oldCreate := tk.MustQuery(`show create table t` + suffixComment).Rows() name := "github.com/pingcap/tidb/pkg/ddl/reorgPart" + suffix - testfailpoint.Enable(t, name, `return(true)`) - err := tk.ExecToErr(alterSQL) - require.Error(t, err, "failpoint reorgPart"+suffix) - require.ErrorContains(t, err, "Injected error by reorgPart"+suffix) - testfailpoint.Disable(t, name) - tk.MustQuery(`show create table t /* ` + suffix + ` */`).Check(oldCreate) + term := "return(true)" + if test.rollForwardFrom > 0 && test.rollForwardFrom <= i { + term = "10*" + term + } + testfailpoint.Enable(t, name, term) + err := tk.ExecToErr(alterSQL + suffixComment) tt := external.GetTableByName(t, tk, "test", "t") partition := tt.Meta().Partition - if partition == nil { - require.Nil(t, tOrg.Meta().Partition, suffix) + rollback := false + if test.rollForwardFrom > 0 && test.rollForwardFrom <= i { + require.NoError(t, err) } else { - require.Equal(t, len(tOrg.Meta().Partition.Definitions), len(partition.Definitions), suffix) - require.Equal(t, 0, len(partition.AddingDefinitions), suffix) - require.Equal(t, 0, len(partition.DroppingDefinitions), suffix) + rollback = true + require.Error(t, err, "failpoint reorgPart"+suffix) + // TODO: gracefully handle failures during WriteReorg also for nonclustered tables + // with unique indexes. + // Currently it can also do: + // Error "[kv:1062]Duplicate entry '7' for key 't.c'" does not contain "Injected error by reorgPartFail2" + //require.ErrorContains(t, err, "Injected error by reorgPart"+suffix) + tk.MustQuery(`show create table t` + suffixComment).Check(oldCreate) + if partition == nil { + require.Nil(t, tOrg.Meta().Partition, suffix) + } else { + require.Equal(t, len(tOrg.Meta().Partition.Definitions), len(partition.Definitions), suffix) + require.Equal(t, 0, len(partition.AddingDefinitions), suffix) + require.Equal(t, 0, len(partition.DroppingDefinitions), suffix) + } + noNewTablesAfter(t, tk, tk.Session(), tOrg, suffix) } + testfailpoint.Disable(t, name) require.Equal(t, len(tOrg.Meta().Indices), len(tt.Meta().Indices), suffix) - if idxID != 0 { + if rollback && idxID != 0 { require.Equal(t, idxID, tt.Meta().Indices[0].ID, suffix) } - noNewTablesAfter(t, tk, tk.Session(), tOrg, suffix) - tk.MustExec(`admin check table t /* ` + suffix + ` */`) + tk.MustExec(`admin check table t` + suffixComment) for _, sql := range afterDML { - tk.MustExec(sql + " /* " + suffix + " */") + tk.MustExec(sql + suffixComment) } - tk.MustQuery(`select * from t /* ` + suffix + ` */`).Sort().Check(afterResult) - tk.MustExec(`drop table t /* ` + suffix + ` */`) + tk.MustQuery(`select * from t` + suffixComment).Sort().Check(afterResult) + tk.MustExec(`drop table t` + suffixComment) // TODO: Check TiFlash replicas // TODO: Check Label rules // TODO: Check bundles @@ -461,7 +480,8 @@ func TestReorgPartitionConcurrent(t *testing.T) { (job.SchemaState == model.StateDeleteOnly || job.SchemaState == model.StateWriteOnly || job.SchemaState == model.StateWriteReorganization || - job.SchemaState == model.StateDeleteReorganization) && + job.SchemaState == model.StateDeleteReorganization || + job.SchemaState == model.StatePublic) && currState != job.SchemaState { currState = job.SchemaState <-wait @@ -557,6 +577,61 @@ func TestReorgPartitionConcurrent(t *testing.T) { "15 15 15", "16 16 16")) currTbl.Meta().Partition = currPart + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN (15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + wait <- true + + // StatePublic + wait <- true + tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ + "10 10 10", + "12 12b 12", + "14 14 14", + "15 15 15", + "16 16 16")) + publicInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + require.Equal(t, int64(1), publicInfoSchema.SchemaMetaVersion()-deleteReorgInfoSchema.SchemaMetaVersion()) + tk.MustExec(`insert into t values (17, "17", 17)`) + oldTbl, err = deleteReorgInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) + require.NoError(t, err) + partDef = oldTbl.Meta().Partition.Definitions[1] + require.Equal(t, "p1a", partDef.Name.O) + rows = getNumRowsFromPartitionDefs(t, tk, oldTbl, oldTbl.Meta().Partition.Definitions[1:2]) + require.Equal(t, 3, rows) + tk.MustQuery(`select * from t partition (p1a)`).Sort().Check(testkit.Rows("10 10 10", "12 12b 12", "14 14 14")) + currTbl, err = publicInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) + require.NoError(t, err) + currPart = currTbl.Meta().Partition + currTbl.Meta().Partition = oldTbl.Meta().Partition + tk.MustQuery(`select * from t where b = "17"`).Sort().Check(testkit.Rows("17 17 17")) + tk.MustExec(`admin check table t`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN (15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + currTbl.Meta().Partition = currPart wait <- true syncOnChanged <- true // This reads the new schema (Schema update completed) @@ -565,11 +640,12 @@ func TestReorgPartitionConcurrent(t *testing.T) { "12 12b 12", "14 14 14", "15 15 15", - "16 16 16")) + "16 16 16", + "17 17 17")) tk.MustExec(`admin check table t`) newInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() - require.Equal(t, int64(1), newInfoSchema.SchemaMetaVersion()-deleteReorgInfoSchema.SchemaMetaVersion()) - oldTbl, err = deleteReorgInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) + require.Equal(t, int64(1), newInfoSchema.SchemaMetaVersion()-publicInfoSchema.SchemaMetaVersion()) + oldTbl, err = publicInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) require.NoError(t, err) partDef = oldTbl.Meta().Partition.Definitions[1] require.Equal(t, "p1a", partDef.Name.O) @@ -587,7 +663,7 @@ func TestReorgPartitionConcurrent(t *testing.T) { " PARTITION `p1a` VALUES LESS THAN (15),\n" + " PARTITION `p1b` VALUES LESS THAN (20),\n" + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) - newTbl, err := deleteReorgInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) + newTbl, err := newInfoSchema.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) require.NoError(t, err) newPart := newTbl.Meta().Partition newTbl.Meta().Partition = oldTbl.Meta().Partition @@ -935,13 +1011,13 @@ func TestPartitionByColumnChecks(t *testing.T) { } func TestPartitionIssue56634(t *testing.T) { - testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/updateVersionAndTableInfoErrInStateDeleteReorganization", `return(1)`) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/updateVersionAndTableInfoErrInStateDeleteReorganization", `4*return(1)`) store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) - tk.MustExec("set global tidb_ddl_error_count_limit = 3") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int)") - tk.MustContainErrMsg("alter table t partition by range(a) (partition p1 values less than (20))", "[ddl:-1]DDL job rollback, error msg: Injected error in StateDeleteReorganization") // should NOT panic + // Changed, since StatePublic can no longer rollback! + tk.MustExec("alter table t partition by range(a) (partition p1 values less than (20))") } diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 3876f217bd29c..b5b509c20c46a 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -845,6 +845,12 @@ func (job *Job) IsRollbackable() bool { job.SchemaState == StateWriteOnly { return false } + case ActionReorganizePartition, ActionRemovePartitioning, ActionAlterTablePartitioning: + if job.SchemaState == StatePublic { + // We will double write until this state, here we will do DeleteOnly on indexes, + // so no-longer rollbackable. + return false + } } return true } diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 0f2b93caac40d..4669a3b9b76b3 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -405,6 +405,12 @@ func GetFinishedTruncateTableArgs(job *Job) (*TruncateTableArgs, error) { return getOrDecodeArgsV2[*TruncateTableArgs](job) } +// TableIDIndexID contains TableID+IndexID of index ranges to be deleted +type TableIDIndexID struct { + TableID int64 + IndexID int64 +} + // TablePartitionArgs is the arguments for table partition related jobs, including: // - ActionAlterTablePartitioning // - ActionRemovePartitioning @@ -420,7 +426,8 @@ type TablePartitionArgs struct { PartInfo *PartitionInfo `json:"part_info,omitempty"` // set on finished - OldPhysicalTblIDs []int64 `json:"old_physical_tbl_ids,omitempty"` + OldPhysicalTblIDs []int64 `json:"old_physical_tbl_ids,omitempty"` + OldGlobalIndexes []TableIDIndexID `json:"old_global_indexes,omitempty"` // runtime info NewPartitionIDs []int64 `json:"-"` @@ -438,7 +445,7 @@ func (a *TablePartitionArgs) getArgsV1(job *Job) []any { func (a *TablePartitionArgs) getFinishedArgsV1(job *Job) []any { intest.Assert(job.Type != ActionAddTablePartition || job.State == JobStateRollbackDone, "add table partition job should not call getFinishedArgsV1 if not rollback") - return []any{a.OldPhysicalTblIDs} + return []any{a.OldPhysicalTblIDs, a.OldGlobalIndexes} } func (a *TablePartitionArgs) decodeV1(job *Job) error { @@ -488,10 +495,11 @@ func GetTablePartitionArgs(job *Job) (*TablePartitionArgs, error) { func GetFinishedTablePartitionArgs(job *Job) (*TablePartitionArgs, error) { if job.Version == JobVersion1 { var oldPhysicalTblIDs []int64 - if err := job.decodeArgs(&oldPhysicalTblIDs); err != nil { + var oldIndexes []TableIDIndexID + if err := job.decodeArgs(&oldPhysicalTblIDs, &oldIndexes); err != nil { return nil, errors.Trace(err) } - return &TablePartitionArgs{OldPhysicalTblIDs: oldPhysicalTblIDs}, nil + return &TablePartitionArgs{OldPhysicalTblIDs: oldPhysicalTblIDs, OldGlobalIndexes: oldIndexes}, nil } return getOrDecodeArgsV2[*TablePartitionArgs](job) } diff --git a/pkg/table/tables/partition.go b/pkg/table/tables/partition.go index 0ddc1b6277238..095f06507cadc 100644 --- a/pkg/table/tables/partition.go +++ b/pkg/table/tables/partition.go @@ -196,14 +196,14 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part default: return ret, nil } - // In StateWriteReorganization we are using the 'old' partition definitions + // In WriteReorganization we are using the 'old' partition definitions // and if any new change happens in DroppingDefinitions, it needs to be done // also in AddingDefinitions (with new evaluation of the new expression) - // In StateDeleteReorganization we are using the 'new' partition definitions + // In DeleteReorganization/Public we are using the 'new' partition definitions // and if any new change happens in AddingDefinitions, it needs to be done // also in DroppingDefinitions (since session running on schema version -1) - // should also see the changes - if pi.DDLState == model.StateDeleteReorganization { + // should also see the changes. + if pi.DDLState == model.StateDeleteReorganization || pi.DDLState == model.StatePublic { // TODO: Explicitly explain the different DDL/New fields! if pi.NewTableID != 0 { ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.DDLType, pi.DDLExpr, pi.DDLColumns, pi.DroppingDefinitions) @@ -224,6 +224,7 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part if err != nil { return nil, err } + p.skipAssert = true partitions[def.ID] = p ret.doubleWritePartitions[def.ID] = nil } @@ -247,6 +248,7 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part if err != nil { return nil, err } + p.skipAssert = true partitions[def.ID] = p } } @@ -1475,18 +1477,23 @@ func (t *partitionedTable) locateReorgPartition(ctx expression.EvalContext, r [] // all partitions will be reorganized, // so we can use the number in Dropping or AddingDefinitions, // depending on current state. - num := len(pi.AddingDefinitions) - if pi.DDLState == model.StateDeleteReorganization { - num = len(pi.DroppingDefinitions) + reorgDefs := pi.AddingDefinitions + switch pi.DDLAction { + case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: + if pi.DDLState == model.StatePublic { + reorgDefs = pi.DroppingDefinitions + } + fallthrough + default: + if pi.DDLState == model.StateDeleteReorganization { + reorgDefs = pi.DroppingDefinitions + } } - idx, err := t.locatePartitionCommon(ctx, pi.DDLType, t.reorgPartitionExpr, uint64(num), columnsSet, r) + idx, err := t.locatePartitionCommon(ctx, pi.DDLType, t.reorgPartitionExpr, uint64(len(reorgDefs)), columnsSet, r) if err != nil { return 0, errors.Trace(err) } - if pi.DDLState == model.StateDeleteReorganization { - return pi.DroppingDefinitions[idx].ID, nil - } - return pi.AddingDefinitions[idx].ID, nil + return reorgDefs[idx].ID, nil } func (t *partitionedTable) locateRangeColumnPartition(ctx expression.EvalContext, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { @@ -1656,7 +1663,9 @@ func GetReorganizedPartitionedTable(t table.Table) (table.PartitionedTable, erro pi.Type = pi.DDLType pi.Expr = pi.DDLExpr pi.Columns = pi.DDLColumns - tblInfo.ID = pi.NewTableID + if pi.NewTableID != 0 { + tblInfo.ID = pi.NewTableID + } constraints, err := table.LoadCheckConstraint(tblInfo) if err != nil { @@ -1759,7 +1768,7 @@ func partitionedTableAddRecord(ctx table.MutateContext, txn kv.Transaction, t *p if err != nil { return } - if t.Meta().Partition.DDLState == model.StateDeleteOnly { + if t.Meta().Partition.DDLState == model.StateDeleteOnly || t.Meta().Partition.DDLState == model.StatePublic { return } if _, ok := t.reorganizePartitions[pid]; ok { @@ -1769,6 +1778,10 @@ func partitionedTableAddRecord(ctx table.MutateContext, txn kv.Transaction, t *p return nil, errors.Trace(err) } tbl = t.getPartition(pid) + if !tbl.Meta().PKIsHandle && !tbl.Meta().IsCommonHandle { + // Preserve the _tidb_rowid also in the new partition! + r = append(r, types.NewIntDatum(recordID.IntValue())) + } recordID, err = tbl.addRecord(ctx, txn, r, opt) if err != nil { return @@ -1893,6 +1906,7 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t sh := memBuffer.Staging() defer memBuffer.Cleanup(sh) + deleteOnly := t.Meta().Partition.DDLState == model.StateDeleteOnly || t.Meta().Partition.DDLState == model.StatePublic // The old and new data locate in different partitions. // Remove record from old partition and add record to new partition. if from != to { @@ -1938,7 +1952,7 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t return errors.Trace(err) } } - if newTo != 0 && t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly { + if newTo != 0 && !deleteOnly { _, err = t.getPartition(newTo).addRecord(ctx, txn, newData, opt.GetAddRecordOpt()) if err != nil { return errors.Trace(err) @@ -1965,7 +1979,7 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t } if newTo == newFrom { tbl = t.getPartition(newTo) - if t.Meta().Partition.DDLState == model.StateDeleteOnly { + if deleteOnly { err = tbl.RemoveRecord(ctx, txn, h, currData) } else { err = tbl.updateRecord(ctx, txn, h, currData, newData, touched, opt) @@ -1981,7 +1995,7 @@ func partitionedTableUpdateRecord(ctx table.MutateContext, txn kv.Transaction, t if err != nil { return errors.Trace(err) } - if t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly { + if !deleteOnly { tbl = t.getPartition(newTo) _, err = tbl.addRecord(ctx, txn, newData, opt.GetAddRecordOpt()) if err != nil { diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index cff833538f04b..79fedefe90b3c 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tblctx" @@ -83,6 +82,9 @@ type TableCommon struct { // recordPrefix and indexPrefix are generated using physicalTableID. recordPrefix kv.Key indexPrefix kv.Key + + // skipAssert is used for partitions that are in WriteOnly/DeleteOnly state. + skipAssert bool } // ResetColumnsCache implements testingKnob interface. @@ -408,31 +410,6 @@ func (t *TableCommon) RecordKey(h kv.Handle) kv.Key { return tablecodec.EncodeRecordKey(t.recordPrefix, h) } -// shouldAssert checks if the partition should be in consistent -// state and can have assertion. -func (t *TableCommon) shouldAssert(level variable.AssertionLevel) bool { - p := t.Meta().Partition - if p != nil { - // This disables asserting during Reorganize Partition. - switch level { - case variable.AssertionLevelFast: - // Fast option, just skip assertion for all partitions. - if p.DDLState != model.StateNone && p.DDLState != model.StatePublic { - return false - } - case variable.AssertionLevelStrict: - // Strict, only disable assertion for intermediate partitions. - // If there were an easy way to get from a TableCommon back to the partitioned table... - for i := range p.AddingDefinitions { - if t.physicalTableID == p.AddingDefinitions[i].ID { - return false - } - } - } - } - return true -} - // UpdateRecord implements table.Table UpdateRecord interface. // `touched` means which columns are really modified, used for secondary indices. // Length of `oldData` and `newData` equals to length of `t.WritableCols()`. @@ -535,10 +512,10 @@ func (t *TableCommon) updateRecord(sctx table.MutateContext, txn kv.Transaction, } }) - if t.shouldAssert(sctx.TxnAssertionLevel()) { - err = txn.SetAssertion(key, kv.SetAssertExist) - } else { + if t.skipAssert { err = txn.SetAssertion(key, kv.SetAssertUnknown) + } else { + err = txn.SetAssertion(key, kv.SetAssertExist) } if err != nil { return err @@ -1238,10 +1215,10 @@ func (t *TableCommon) removeRowData(ctx table.MutateContext, txn kv.Transaction, } } }) - if t.shouldAssert(ctx.TxnAssertionLevel()) { - err = txn.SetAssertion(key, kv.SetAssertExist) - } else { + if t.skipAssert { err = txn.SetAssertion(key, kv.SetAssertUnknown) + } else { + err = txn.SetAssertion(key, kv.SetAssertExist) } if err != nil { return err diff --git a/pkg/tablecodec/tablecodec.go b/pkg/tablecodec/tablecodec.go index ea888ccbb772d..a28c21bb177e0 100644 --- a/pkg/tablecodec/tablecodec.go +++ b/pkg/tablecodec/tablecodec.go @@ -832,7 +832,13 @@ func reEncodeHandleConsiderNewCollation(handle kv.Handle, columns []rowcodec.Col if len(restoreData) == 0 { return cHandleBytes, nil } - return decodeRestoredValuesV5(columns, cHandleBytes, restoreData) + // Remove some extra columns(ID < 0), such like `model.ExtraPhysTblID`. + // They are not belong to common handle and no need to restore data. + idx := len(columns) + for idx > 0 && columns[idx-1].ID < 0 { + idx-- + } + return decodeRestoredValuesV5(columns[:idx], cHandleBytes, restoreData) } func decodeRestoredValues(columns []rowcodec.ColInfo, restoredVal []byte) ([][]byte, error) { diff --git a/tests/integrationtest/r/ddl/db_partition.result b/tests/integrationtest/r/ddl/db_partition.result index 7c1992a7408b0..fb359b6f234d0 100644 --- a/tests/integrationtest/r/ddl/db_partition.result +++ b/tests/integrationtest/r/ddl/db_partition.result @@ -3244,7 +3244,7 @@ id store_id 1 1 select *,_tidb_rowid from t; id store_id _tidb_rowid -0 18 30257 +0 18 30002 1 1 30001 drop table t, t1; create table t (id int not null, store_id int not null ) partition by range (store_id) (partition p0 values less than (6), partition p1 values less than (11), partition p2 values less than (16), partition p3 values less than (21)); @@ -3267,7 +3267,7 @@ id store_id 1 1 select *,_tidb_rowid from t; id store_id _tidb_rowid -0 18 30257 +0 18 30002 1 1 30001 drop table t, t1; drop table if exists aaa; diff --git a/tests/integrationtest/r/globalindex/misc.result b/tests/integrationtest/r/globalindex/misc.result index 89694a3d76fa4..4a6f600e04d2a 100644 --- a/tests/integrationtest/r/globalindex/misc.result +++ b/tests/integrationtest/r/globalindex/misc.result @@ -238,3 +238,12 @@ t CREATE TABLE `t` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin PARTITION BY KEY (`b`) PARTITIONS 3 drop table t; +drop table if exists t; +create table t (a int, b varchar(255), c varchar (255), primary key (a,b), unique key (c) global) partition by list columns (a,b) (partition p0 values in ((1,"1"),(2,"2"),(3,"3")), partition p1 values in ((100,"100"),(101,"101"),(102,"102"),DEFAULT)); +insert into t values (1,1,1),(2,2,2),(101,101,101),(102,102,102); +select * from t; +a b c +1 1 1 +101 101 101 +102 102 102 +2 2 2 diff --git a/tests/integrationtest/t/globalindex/misc.test b/tests/integrationtest/t/globalindex/misc.test index cded23d78282a..b4ef7b441d554 100644 --- a/tests/integrationtest/t/globalindex/misc.test +++ b/tests/integrationtest/t/globalindex/misc.test @@ -152,3 +152,11 @@ show create table t; alter table t partition by key (b) partitions 3; show create table t; drop table t; + +# TestRestoreValuedWithGlobalIndex +drop table if exists t; +create table t (a int, b varchar(255), c varchar (255), primary key (a,b), unique key (c) global) partition by list columns (a,b) (partition p0 values in ((1,"1"),(2,"2"),(3,"3")), partition p1 values in ((100,"100"),(101,"101"),(102,"102"),DEFAULT)); +insert into t values (1,1,1),(2,2,2),(101,101,101),(102,102,102); +--sorted_result +select * from t; +