diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index a30f76c46aafb..9b757ed91fde4 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -123,6 +123,8 @@ type Engine struct { config backend.LocalEngineConfig tableInfo *checkpoints.TidbTableInfo + dupDetectOpt dupDetectOpt + // total size of SST files waiting to be ingested pendingFileSize atomic.Int64 @@ -981,7 +983,7 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter { zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)), zap.Int64("tableID", e.tableInfo.ID), zap.Stringer("engineUUID", e.UUID)) - return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger) + return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt) } func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) { diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index e2cb3a447cfbb..29ed50b743773 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/pebble" sst "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/logutil" "go.uber.org/multierr" @@ -82,6 +83,11 @@ type dupDetectIter struct { writeBatch *pebble.Batch writeBatchSize int64 logger log.Logger + option dupDetectOpt +} + +type dupDetectOpt struct { + reportErrOnDup bool } func (d *dupDetectIter) Seek(key []byte) bool { @@ -149,6 +155,14 @@ func (d *dupDetectIter) Next() bool { d.curVal = append(d.curVal[:0], d.iter.Value()...) return true } + if d.option.reportErrOnDup { + dupKey := make([]byte, len(d.curKey)) + dupVal := make([]byte, len(d.iter.Value())) + copy(dupKey, d.curKey) + copy(dupVal, d.curVal) + d.err = common.ErrFoundDuplicateKeys.FastGenByArgs(dupKey, dupVal) + return false + } if !recordFirst { d.record(d.curRawKey, d.curKey, d.curVal) recordFirst = true @@ -192,7 +206,7 @@ func (d *dupDetectIter) OpType() sst.Pair_OP { var _ Iter = &dupDetectIter{} func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter, - opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger) *dupDetectIter { + opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt dupDetectOpt) *dupDetectIter { newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter} if len(opts.LowerBound) > 0 { newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64) @@ -206,6 +220,7 @@ func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter, keyAdapter: keyAdapter, writeBatch: dupDB.NewBatch(), logger: logger, + option: dupOpt, } } diff --git a/br/pkg/lightning/backend/local/iterator_test.go b/br/pkg/lightning/backend/local/iterator_test.go index 3abb6fbc3b06c..c183963443bae 100644 --- a/br/pkg/lightning/backend/local/iterator_test.go +++ b/br/pkg/lightning/backend/local/iterator_test.go @@ -122,7 +122,7 @@ func TestDupDetectIterator(t *testing.T) { dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{}) require.NoError(t, err) var iter Iter - iter = newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L()) + iter = newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), dupDetectOpt{}) sort.Slice(pairs, func(i, j int) bool { key1 := keyAdapter.Encode(nil, pairs[i].Key, pairs[i].RowID) key2 := keyAdapter.Encode(nil, pairs[j].Key, pairs[j].RowID) @@ -217,7 +217,7 @@ func TestDupDetectIterSeek(t *testing.T) { dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{}) require.NoError(t, err) - iter := newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L()) + iter := newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), dupDetectOpt{}) require.True(t, iter.Seek([]byte{1, 2, 3, 1})) require.Equal(t, pairs[1].Val, iter.Value()) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 6ceb269f80248..4f8ca6bf3117a 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -370,6 +370,7 @@ type local struct { checkTiKVAvaliable bool duplicateDetection bool + duplicateDetectOpt dupDetectOpt duplicateDB *pebble.DB keyAdapter KeyAdapter errorMgr *errormanager.ErrorManager @@ -500,6 +501,7 @@ func NewLocalBackend( engineMemCacheSize: int(cfg.TikvImporter.EngineMemCacheSize), localWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize), duplicateDetection: duplicateDetection, + duplicateDetectOpt: dupDetectOpt{duplicateDetection && cfg.TikvImporter.DuplicateResolution == config.DupeResAlgErr}, checkTiKVAvaliable: cfg.App.CheckRequirements, duplicateDB: duplicateDB, keyAdapter: keyAdapter, @@ -804,6 +806,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e config: engineCfg, tableInfo: cfg.TableInfo, duplicateDetection: local.duplicateDetection, + dupDetectOpt: local.duplicateDetectOpt, duplicateDB: local.duplicateDB, errorMgr: local.errorMgr, keyAdapter: local.keyAdapter, @@ -854,6 +857,7 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, tableInfo: cfg.TableInfo, keyAdapter: local.keyAdapter, duplicateDetection: local.duplicateDetection, + dupDetectOpt: local.duplicateDetectOpt, duplicateDB: local.duplicateDB, errorMgr: local.errorMgr, logger: log.FromContext(ctx), diff --git a/br/pkg/lightning/common/errors.go b/br/pkg/lightning/common/errors.go index 1b23ff99fc1b4..14645217636a4 100644 --- a/br/pkg/lightning/common/errors.go +++ b/br/pkg/lightning/common/errors.go @@ -97,6 +97,7 @@ var ( ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus")) ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming")) ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows")) + ErrFoundDuplicateKeys = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey")) ) type withStack struct { diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 7888cd315c357..45d5f1fa334a4 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -384,6 +384,10 @@ const ( // DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the // duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows. DupeResAlgRemove + + // DupeResAlgErr reports an error and stops the import process. + // Note: this value is only used for internal. + DupeResAlgErr ) func (dra *DuplicateResolutionAlgorithm) UnmarshalTOML(v interface{}) error { diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 6f91a3b4deffc..e218ca7b014bc 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2043,19 +2043,21 @@ func (rc *Client) RestoreKVFiles( log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId)) skipFile += len(files) } else { - rc.workerPool.ApplyOnErrorGroup(eg, func() error { + rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) { fileStart := time.Now() defer func() { onProgress(int64(len(files))) updateStats(uint64(kvCount), size) summary.CollectInt("File", len(files)) - filenames := make([]string, 0, len(files)) - for _, f := range files { - filenames = append(filenames, f.Path+", ") + if err == nil { + filenames := make([]string, 0, len(files)) + for _, f := range files { + filenames = append(filenames, f.Path+", ") + } + log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size), + zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames)) } - log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size), - zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames)) }() return rc.fileImporter.ImportKVFiles(ectx, files, rule, rc.shiftStartTS, rc.startTS, rc.restoreTS, supportBatch) @@ -2063,14 +2065,14 @@ func (rc *Client) RestoreKVFiles( } } - if supportBatch { - err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) - } else { - err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc) - } - if err != nil { + rc.workerPool.ApplyOnErrorGroup(eg, func() error { + if supportBatch { + err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) + } else { + err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc) + } return errors.Trace(err) - } + }) log.Info("total skip files due to table id not matched", zap.Int("count", skipFile)) if skipFile > 0 { diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 903b721f0a644..8a5cd0425e221 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -393,7 +393,8 @@ func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() { if cfg.PitrBatchSize == 0 { cfg.PitrBatchSize = defaultPiTRBatchSize } - + // another goroutine is used to iterate the backup file + cfg.PitrConcurrency += 1 log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency))) cfg.Config.Concurrency = cfg.PitrConcurrency } diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index e9641c3e2a4c8..747a7c25c0f12 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -53,6 +53,7 @@ go_library( ":ddl_friend", ], deps = [ + "//br/pkg/lightning/common", "//br/pkg/utils", "//config", "//ddl/ingest", diff --git a/ddl/index.go b/ddl/index.go index 49c6ca888cc4b..221005f58c211 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/ingest" @@ -38,6 +39,7 @@ import ( "github.com/pingcap/tidb/parser/charset" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" @@ -836,8 +838,11 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } err = bc.FinishImport(indexInfo.ID, indexInfo.Unique, tbl) if err != nil { - if kv.ErrKeyExists.Equal(err) { + if kv.ErrKeyExists.Equal(err) || common.ErrFoundDuplicateKeys.Equal(err) { logutil.BgLogger().Warn("[ddl] import index duplicate key, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) + if common.ErrFoundDuplicateKeys.Equal(err) { + err = convertToKeyExistsErr(err, indexInfo, tbl.Meta()) + } ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) } else { logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err)) @@ -879,6 +884,22 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } } +func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error { + tErr, ok := errors.Cause(originErr).(*terror.Error) + if !ok { + return originErr + } + if len(tErr.Args()) != 2 { + return originErr + } + key, keyIsByte := tErr.Args()[0].([]byte) + value, valIsByte := tErr.Args()[1].([]byte) + if !keyIsByte || !valIsByte { + return originErr + } + return genKeyExistsErr(key, value, idxInfo, tblInfo) +} + func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} @@ -1246,7 +1267,7 @@ func newAddIndexWorker(decodeColMap map[int64]decoder.Column, id int, t table.Ph if err != nil { return nil, errors.Trace(err) } - lwCtx, err = ei.NewWriterCtx(id) + lwCtx, err = ei.NewWriterCtx(id, indexInfo.Unique) if err != nil { return nil, err } @@ -1485,17 +1506,26 @@ func (w *addIndexWorker) checkHandleExists(key kv.Key, value []byte, handle kv.H if hasBeenBackFilled { return nil } + return genKeyExistsErr(key, value, idxInfo, tblInfo) +} + +func genKeyExistsErr(key, value []byte, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error { + idxColLen := len(idxInfo.Columns) + indexName := fmt.Sprintf("%s.%s", tblInfo.Name.String(), idxInfo.Name.String()) colInfos := tables.BuildRowcodecColInfoForIndexColumns(idxInfo, tblInfo) values, err := tablecodec.DecodeIndexKV(key, value, idxColLen, tablecodec.HandleNotNeeded, colInfos) if err != nil { - return err + logutil.BgLogger().Warn("decode index key value failed", zap.String("index", indexName), + zap.String("key", hex.EncodeToString(key)), zap.String("value", hex.EncodeToString(value)), zap.Error(err)) + return kv.ErrKeyExists.FastGenByArgs(key, indexName) } - indexName := fmt.Sprintf("%s.%s", w.index.TableMeta().Name.String(), w.index.Meta().Name.String()) valueStr := make([]string, 0, idxColLen) for i, val := range values[:idxColLen] { d, err := tablecodec.DecodeColumnValue(val, colInfos[i].Ft, time.Local) if err != nil { - return kv.ErrKeyExists.FastGenByArgs(key.String(), indexName) + logutil.BgLogger().Warn("decode column value failed", zap.String("index", indexName), + zap.String("key", hex.EncodeToString(key)), zap.String("value", hex.EncodeToString(value)), zap.Error(err)) + return kv.ErrKeyExists.FastGenByArgs(key, indexName) } str, err := d.ToString() if err != nil { diff --git a/ddl/ingest/config.go b/ddl/ingest/config.go index e9c1458b1ab0a..7fd251a361939 100644 --- a/ddl/ingest/config.go +++ b/ddl/ingest/config.go @@ -47,7 +47,7 @@ func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config adjustImportMemory(memRoot, cfg) cfg.Checkpoint.Enable = true if unique { - cfg.TikvImporter.DuplicateResolution = config.DupeResAlgRecord + cfg.TikvImporter.DuplicateResolution = config.DupeResAlgErr } else { cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone } diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index c7ed29a71d017..0c9409bf7657e 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -17,6 +17,7 @@ package ingest import ( "context" "strconv" + "sync/atomic" "github.com/google/uuid" "github.com/pingcap/tidb/br/pkg/lightning/backend" @@ -41,6 +42,7 @@ type engineInfo struct { writerCache generic.SyncMap[int, *backend.LocalEngineWriter] memRoot MemRoot diskRoot DiskRoot + rowSeq atomic.Int64 } // NewEngineInfo create a new EngineInfo struct. @@ -144,17 +146,18 @@ func (ei *engineInfo) ImportAndClean() error { // WriterContext is used to keep a lightning local writer for each backfill worker. type WriterContext struct { ctx context.Context + rowSeq func() int64 lWrite *backend.LocalEngineWriter } -func (ei *engineInfo) NewWriterCtx(id int) (*WriterContext, error) { +func (ei *engineInfo) NewWriterCtx(id int, unique bool) (*WriterContext, error) { ei.memRoot.RefreshConsumption() ok := ei.memRoot.CheckConsume(StructSizeWriterCtx) if !ok { return nil, genEngineAllocMemFailedErr(ei.memRoot, ei.jobID, ei.indexID) } - wCtx, err := ei.newWriterContext(id) + wCtx, err := ei.newWriterContext(id, unique) if err != nil { logutil.BgLogger().Error(LitErrCreateContextFail, zap.Error(err), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID), @@ -175,7 +178,7 @@ func (ei *engineInfo) NewWriterCtx(id int) (*WriterContext, error) { // If local writer not exist, then create new one and store it into engine info writer cache. // note: operate ei.writeCache map is not thread safe please make sure there is sync mechanism to // make sure the safe. -func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) { +func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContext, error) { lWrite, exist := ei.writerCache.Load(workerID) if !exist { var err error @@ -186,10 +189,16 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) { // Cache the local writer. ei.writerCache.Store(workerID, lWrite) } - return &WriterContext{ + wc := &WriterContext{ ctx: ei.ctx, lWrite: lWrite, - }, nil + } + if unique { + wc.rowSeq = func() int64 { + return ei.rowSeq.Add(1) + } + } + return wc, nil } func (ei *engineInfo) closeWriters() error { @@ -213,6 +222,9 @@ func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error { kvs := make([]common.KvPair, 1) kvs[0].Key = key kvs[0].Val = idxVal + if wCtx.rowSeq != nil { + kvs[0].RowID = wCtx.rowSeq() + } row := kv.MakeRowsFromKvPairs(kvs) return wCtx.lWrite.WriteRows(wCtx.ctx, nil, row) } diff --git a/errors.toml b/errors.toml index e2708289f132c..f6f484984f8b5 100644 --- a/errors.toml +++ b/errors.toml @@ -526,6 +526,11 @@ error = ''' encode kv error in file %s at offset %d ''' +["Lightning:Restore:ErrFoundDuplicateKey"] +error = ''' +found duplicate key '%s', value '%s' +''' + ["Lightning:Restore:ErrInvalidMetaStatus"] error = ''' invalid meta status: '%s' diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 6843ef6149749..07b54089395da 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -379,6 +379,32 @@ func TestAddIndexIngestPanicOnCopRead(t *testing.T) { require.True(t, strings.Contains(jobTp, "txn-merge"), jobTp) } +func TestAddIndexIngestUniqueKey(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + + tk.MustExec("create table t (a int primary key, b int);") + tk.MustExec("insert into t values (1, 1), (10000, 1);") + tk.MustExec("split table t by (5000);") + tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'") + + tk.MustExec("drop table t;") + tk.MustExec("create table t (a varchar(255) primary key, b int);") + tk.MustExec("insert into t values ('a', 1), ('z', 1);") + tk.MustExec("split table t by ('m');") + tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'") + + tk.MustExec("drop table t;") + tk.MustExec("create table t (a varchar(255) primary key, b int, c char(5));") + tk.MustExec("insert into t values ('a', 1, 'c1'), ('d', 2, 'c1'), ('x', 1, 'c2'), ('z', 1, 'c1');") + tk.MustExec("split table t by ('m');") + tk.MustGetErrMsg("alter table t add unique index idx(b, c);", "[kv:1062]Duplicate entry '1-c1' for key 't.idx'") +} + func TestAddIndexIngestCancel(t *testing.T) { store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) tk := testkit.NewTestKit(t, store)