From b0015eefbe84e2ee419a901bde88d062119e85c8 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 5 Jan 2022 13:18:09 +0800 Subject: [PATCH 01/19] tmpo --- br/pkg/lightning/backend/kv/sql2kv.go | 3 +-- br/pkg/lightning/backend/local/local.go | 1 + br/pkg/lightning/checkpoints/tidb.go | 1 + ddl/index.go | 15 ++++++++++++--- ddl/sst/index.go | 4 +++- 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index fd18aed57d225..c416f7ccd0c9c 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -288,13 +288,12 @@ type KvPairs struct { memBuf *kvMemBuf } -func NewKvPairs(kvs []common.KvPair)*KvPairs{ +func NewKvPairs(kvs []common.KvPair) *KvPairs { return &KvPairs{ pairs: kvs, } } - // MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is // mainly used for testing only. The resulting Rows instance should only be used // for the importer backend. 
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 88c3c2df5dd09..04c1977263a9a 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1310,6 +1310,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e }) engine := e.(*File) engine.db = db + engine.TS = cfg.TableInfo.TSO engine.sstIngester = dbSSTIngester{e: engine} if err = engine.loadEngineMeta(); err != nil { return errors.Trace(err) diff --git a/br/pkg/lightning/checkpoints/tidb.go b/br/pkg/lightning/checkpoints/tidb.go index d68bd68fd1595..4241f1097726a 100644 --- a/br/pkg/lightning/checkpoints/tidb.go +++ b/br/pkg/lightning/checkpoints/tidb.go @@ -25,6 +25,7 @@ type TidbDBInfo struct { type TidbTableInfo struct { ID int64 + TSO uint64 DB string Name string Core *model.TableInfo diff --git a/ddl/index.go b/ddl/index.go index 805aa3c3f4c25..a2bc340d7a50f 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -17,11 +17,12 @@ package ddl import ( "bytes" "context" - "github.com/pingcap/tidb/ddl/sst" "strings" "sync/atomic" "time" + "github.com/pingcap/tidb/ddl/sst" + tableutil "github.com/pingcap/tidb/table/tables/util" "github.com/pingcap/errors" @@ -552,8 +553,16 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.SnapshotVer = 0 job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: + currentVer, err := d.store.CurrentVersion(kv.GlobalTxnScope) + currentTS := job.StartTS + if err == nil { + currentTS = currentVer.Ver + } // TODO: optimize index ddl. 
- sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, job.StartTS}) + err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, currentTS}) + if err != nil { + return ver, errors.Trace(err) + } // reorganization -> public tbl, err := getTable(d.store, schemaID, tblInfo) if err != nil { @@ -592,7 +601,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo return ver, errors.Trace(err) } // TODO: optimize index ddl. - err = sst.FinishIndexOp(w.ctx, job.StartTS) + err = sst.FinishIndexOp(w.ctx, currentTS) if err != nil { logutil.BgLogger().Error("FinishIndexOp err" + err.Error()) } diff --git a/ddl/sst/index.go b/ddl/sst/index.go index 18c3cc4407e28..b67dd630160e4 100644 --- a/ddl/sst/index.go +++ b/ddl/sst/index.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "fmt" + "github.com/twmb/murmur3" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" @@ -12,7 +14,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/log" tidbcfg "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/util/logutil" - "github.com/twmb/murmur3" ) func InitIndexOptimize() { @@ -48,6 +49,7 @@ func PrepareIndexOp(ctx context.Context, ddl DDLInfo) error { } cpt := checkpoints.TidbTableInfo{ genNextTblId(), + ddl.StartTs, ddl.Schema, ddl.Table.Name.String(), ddl.Table, From 6d8b736f72e89aaab9face1313e2dbc0b0da174b Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 5 Jan 2022 16:52:00 +0800 Subject: [PATCH 02/19] update code --- ddl/index.go | 14 ++------------ ddl/sst/engine.go | 20 ++++++-------------- ddl/sst/index.go | 44 ++------------------------------------------ 3 files changed, 10 insertions(+), 68 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 2fbb5cebe4a37..25fb8e34dd21c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -21,8 +21,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/tidb/ddl/sst" - tableutil 
"github.com/pingcap/tidb/table/tables/util" "github.com/pingcap/errors" @@ -562,7 +560,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } // TODO: optimize index ddl. if *sst.IndexDDLLightning { - err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, job.StartTS}) + err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, currentTS}) if err != nil { return ver, errors.Trace(fmt.Errorf("PrepareIndexOp err:%w", err)) } @@ -595,7 +593,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo if err != nil { logutil.BgLogger().Error("FinishIndexOp err1" + err.Error()) } else { - err = sst.FinishIndexOp(w.ctx, job.StartTS, ctx.(sqlexec.RestrictedSQLExecutor)) + err = sst.FinishIndexOp(w.ctx, currentTS, ctx.(sqlexec.RestrictedSQLExecutor)) if err != nil { logutil.BgLogger().Error("FinishIndexOp err2" + err.Error()) } @@ -617,13 +615,6 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo w.reorgCtx.cleanNotifyReorgCancel() return ver, errors.Trace(err) } - if *sst.IndexDDLLightning { - // TODO: optimize index ddl. - err = sst.FinishIndexOp(w.ctx, currentTS) - if err != nil { - logutil.BgLogger().Error("FinishIndexOp err" + err.Error()) - } - } // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. 
w.reorgCtx.cleanNotifyReorgCancel() @@ -1383,7 +1374,6 @@ func (w *addIndexWorker) backfillDataInTxnByRead(handleRange reorgBackfillTask) taskCtx.addedCount++ } errInTxn = sst.FlushKeyValSync(context.TODO(), w.jobStartTs, w.wc) - w.wc.Reset() if errInTxn != nil { sst.LogError("FlushKeyValSync %d paris err:%s.", len(w.wc.Fetch()), errInTxn.Error()) } diff --git a/ddl/sst/engine.go b/ddl/sst/engine.go index e799ff81f779a..de13bc327b0a1 100644 --- a/ddl/sst/engine.go +++ b/ddl/sst/engine.go @@ -2,19 +2,21 @@ package sst import ( "context" - "github.com/pingcap/tidb/parser/model" "sync" "sync/atomic" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/common" ) type engineInfo struct { - *backend.OpenedEngine - writer *backend.LocalEngineWriter - cfg *backend.EngineConfig + OpenedEngine *backend.OpenedEngine + writer *backend.LocalEngineWriter + cfg *backend.EngineConfig // TODO: use channel later; ref int32 kvs []common.KvPair @@ -27,16 +29,6 @@ func (ei *engineInfo) ResetCache() { ei.size = 0 } -func (ei *engineInfo) pushKV(k, v []byte) { - klen := len(k) - dlen := klen + len(v) - ei.size = ei.size + dlen - buf := make([]byte, dlen) - copy(buf[:klen], k) - copy(buf[klen:], v) - ei.kvs = append(ei.kvs, common.KvPair{Key: buf[:klen], Val: buf[klen:]}) -} - func (ec *engineCache) put(startTs uint64, cfg *backend.EngineConfig, en *backend.OpenedEngine, tbl *model.TableInfo) { ec.mtx.Lock() ec.cache[startTs] = &engineInfo{ diff --git a/ddl/sst/index.go b/ddl/sst/index.go index 15c9d33639ead..3784c6c1537e7 100644 --- a/ddl/sst/index.go +++ b/ddl/sst/index.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/util/sqlexec" @@ -72,47 +73,6 @@ func PrepareIndexOp(ctx context.Context, ddl DDLInfo) error { return nil } -func IndexOperator(ctx context.Context, startTs 
uint64, k, v []byte) error { - LogFatal("IndexOperator logic error") - LogDebug("IndexOperator '%x','%x'", k, v) - ei, err := ec.getEngineInfo(startTs) - if err != nil { - return err - } - defer ec.releaseRef(startTs) - // - - ei.pushKV(k, v) - kvSize := ei.size - if kvSize < flush_size { - return nil - } - // - return flushKvs(ctx, ei) -} - -func flushKvs(ctx context.Context, ei *engineInfo) error { - if len(ei.kvs) <= 0 { - return nil - } - if ctx == nil { - // this may be nil,and used in WriteRows; - ctx = context.TODO() - } - LogInfo("flushKvs (%d)", len(ei.kvs)) - lw, err := ei.getWriter() - if err != nil { - return fmt.Errorf("IndexOperator.getWriter err:%w", err) - } - - err = lw.WriteRows(ctx, nil, kv.NewKvPairs(ei.kvs)) - if err != nil { - return fmt.Errorf("IndexOperator.WriteRows err:%w", err) - } - ei.ResetCache() - return nil -} - func FlushKeyValSync(ctx context.Context, startTs uint64, cache *WorkerKVCache) error { ec.mtx.RLock() ei, ok := ec.cache[startTs] @@ -129,6 +89,7 @@ func FlushKeyValSync(ctx context.Context, startTs uint64, cache *WorkerKVCache) return fmt.Errorf("IndexOperator.WriteRows err:%w", err) } ei.size += cache.Size() + cache.Reset() return nil } @@ -169,7 +130,6 @@ func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedS return err } defer ec.releaseRef(startTs) - flushKvs(ctx, ei) // LogInfo("FinishIndexOp %d;kvs=%d.", startTs, ei.size) // From 0bb72963c2bc4f910c5cc92068ebd32d3488ab98 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 6 Jan 2022 16:48:46 +0800 Subject: [PATCH 03/19] support duplicate dectect for unique index --- br/pkg/lightning/backend/local/duplicate.go | 6 +++ br/pkg/lightning/backend/local/iterator.go | 7 ++++ br/pkg/lightning/backend/local/local.go | 4 ++ br/pkg/lightning/config/config.go | 9 +++- ddl/index.go | 6 ++- ddl/sst/common.go | 13 ++++-- ddl/sst/engine.go | 4 +- ddl/sst/index.go | 46 +++++++++++++++++---- 8 files changed, 78 insertions(+), 17 deletions(-) diff --git 
a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index fefc914fe523c..42232125b3638 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -75,6 +75,7 @@ type DuplicateManager struct { keyAdapter KeyAdapter remoteWorkerPool *utilpool.WorkerPool opts *kv.SessionOptions + duplicateAbort bool } type pendingIndexHandles struct { @@ -192,6 +193,7 @@ func NewDuplicateManager(local *local, ts uint64, opts *kv.SessionOptions) (*Dup regionConcurrency: local.tcpConcurrency, splitCli: local.splitCli, tikvCli: local.tikvCli, + duplicateAbort: local.duplicateAbort, keyAdapter: duplicateKeyAdapter{}, ts: ts, connPool: common.NewGRPCConns(), @@ -327,6 +329,10 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, } hasErr = true } + if manager.duplicateAbort && len(resp.Pairs) > 0 { + hasDupe.Store(true) + return errors.Errorf("found duplicate key %s", resp.Pairs[0]) + } if hasErr || resp.GetKeyError() != nil { r, err := manager.splitCli.GetRegionByID(ctx, region.Region.GetId()) diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index 8a04260a9ab0d..4ce39dff56282 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -19,6 +19,7 @@ import ( "context" "github.com/cockroachdb/pebble" + "github.com/pingcap/errors" sst "github.com/pingcap/kvproto/pkg/import_sstpb" "go.uber.org/multierr" "go.uber.org/zap" @@ -60,6 +61,7 @@ type duplicateIter struct { writeBatch *pebble.Batch writeBatchSize int64 logger log.Logger + duplicateAbort bool } func (d *duplicateIter) Seek(key []byte) bool { @@ -124,6 +126,11 @@ func (d *duplicateIter) Next() bool { d.curVal = append(d.curVal[:0], d.iter.Value()...) 
return true } + if d.duplicateAbort { + d.err = errors.Errorf("found duplicate key %s", d.curKey) + return false + } + d.logger.Debug("[detect-dupe] local duplicate key detected", logutil.Key("key", d.curKey), logutil.Key("prevValue", d.curVal), diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 04c1977263a9a..7fda7fb6234a6 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -205,6 +205,7 @@ type File struct { keyAdapter KeyAdapter duplicateDetection bool + duplicateAbort bool duplicateDB *pebble.DB errorMgr *errormanager.ErrorManager } @@ -825,6 +826,7 @@ type local struct { checkTiKVAvaliable bool duplicateDetection bool + duplicateAbort bool duplicateDB *pebble.DB errorMgr *errormanager.ErrorManager } @@ -976,6 +978,7 @@ func NewLocalBackend( engineMemCacheSize: int(cfg.TikvImporter.EngineMemCacheSize), localWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize), duplicateDetection: cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone, + duplicateAbort: cfg.TikvImporter.DuplicateResolution == config.DupeResAlgAbort, checkTiKVAvaliable: cfg.App.CheckRequirements, duplicateDB: duplicateDB, errorMgr: errorMgr, @@ -1304,6 +1307,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e config: engineCfg, tableInfo: cfg.TableInfo, duplicateDetection: local.duplicateDetection, + duplicateAbort: local.duplicateAbort, duplicateDB: local.duplicateDB, errorMgr: local.errorMgr, keyAdapter: keyAdapter, diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 8f2e6f2dfa9ac..f8bbbc2e69cb9 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -35,12 +35,13 @@ import ( "github.com/pingcap/errors" filter "github.com/pingcap/tidb-tools/pkg/table-filter" router "github.com/pingcap/tidb-tools/pkg/table-router" + "go.uber.org/atomic" + "go.uber.org/zap" + 
"github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" tidbcfg "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser/mysql" - "go.uber.org/atomic" - "go.uber.org/zap" ) const ( @@ -367,6 +368,10 @@ const ( // DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the // duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows. DupeResAlgRemove + + // DupeResAlgAbort makes duplicate detection fail with an error as soon as a duplicate key is found, instead of + // recording or removing the duplicated rows. + DupeResAlgAbort ) func (dra *DuplicateResolutionAlgorithm) UnmarshalTOML(v interface{}) error { diff --git a/ddl/index.go b/ddl/index.go index 25fb8e34dd21c..19ea9715039a2 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -560,7 +560,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } // TODO: optimize index ddl. 
if *sst.IndexDDLLightning { - err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, currentTS}) + err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, currentTS, indexInfo.Unique}) if err != nil { return ver, errors.Trace(fmt.Errorf("PrepareIndexOp err:%w", err)) } @@ -592,10 +592,12 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo ctx, err := w.sessPool.get() if err != nil { logutil.BgLogger().Error("FinishIndexOp err1" + err.Error()) + return ver, errors.Trace(err) } else { - err = sst.FinishIndexOp(w.ctx, currentTS, ctx.(sqlexec.RestrictedSQLExecutor)) + err = sst.FinishIndexOp(w.ctx, currentTS, ctx.(sqlexec.RestrictedSQLExecutor), tbl, indexInfo.Unique) if err != nil { logutil.BgLogger().Error("FinishIndexOp err2" + err.Error()) + return ver, errors.Trace(err) } } } diff --git a/ddl/sst/common.go b/ddl/sst/common.go index 918c383d46200..9a2913f84600c 100644 --- a/ddl/sst/common.go +++ b/ddl/sst/common.go @@ -70,6 +70,7 @@ type DDLInfo struct { Schema string Table *model.TableInfo StartTs uint64 + Unique bool } func genNextTblId() int64 { @@ -115,7 +116,7 @@ func (_ glue_) Record(string, uint64) { } -func generateLightningConfig(info ClusterInfo) *config.Config { +func generateLightningConfig(info ClusterInfo, unique bool) *config.Config { cfg := config.Config{} cfg.DefaultVarsForImporterAndLocalBackend() name, err := ioutil.TempDir(*sortkv, "lightning") @@ -128,15 +129,19 @@ func generateLightningConfig(info ClusterInfo) *config.Config { // cfg.TikvImporter.RangeConcurrency = 32 cfg.Checkpoint.Enable = false cfg.TikvImporter.SortedKVDir = name - cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone + if unique { + cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone + } else { + cfg.TikvImporter.DuplicateResolution = config.DupeResAlgAbort + } cfg.TiDB.PdAddr = info.PdAddr cfg.TiDB.Host = "127.0.0.1" cfg.TiDB.StatusPort = int(info.Status) return &cfg } -func createLocalBackend(ctx 
context.Context, info ClusterInfo) (backend.Backend, error) { - cfg := generateLightningConfig(info) +func createLocalBackend(ctx context.Context, info ClusterInfo, unique bool) (backend.Backend, error) { + cfg := generateLightningConfig(info, unique) tls, err := cfg.ToTLS() if err != nil { return backend.Backend{}, err diff --git a/ddl/sst/engine.go b/ddl/sst/engine.go index e8f29d80112eb..23890a7e7b5c3 100644 --- a/ddl/sst/engine.go +++ b/ddl/sst/engine.go @@ -14,6 +14,7 @@ import ( ) type engineInfo struct { + backend backend.Backend OpenedEngine *backend.OpenedEngine writer *backend.LocalEngineWriter cfg *backend.EngineConfig @@ -29,9 +30,10 @@ func (ei *engineInfo) ResetCache() { ei.size = 0 } -func (ec *engineCache) put(startTs uint64, cfg *backend.EngineConfig, en *backend.OpenedEngine, tbl *model.TableInfo) { +func (ec *engineCache) put(startTs uint64, cfg *backend.EngineConfig, be backend.Backend, en *backend.OpenedEngine, tbl *model.TableInfo) { ec.mtx.Lock() ec.cache[startTs] = &engineInfo{ + be, en, nil, cfg, diff --git a/ddl/sst/index.go b/ddl/sst/index.go index 8ce7d71cc6749..9d41913169b5e 100644 --- a/ddl/sst/index.go +++ b/ddl/sst/index.go @@ -6,7 +6,12 @@ import ( "encoding/json" "fmt" + "github.com/pingcap/errors" + "go.uber.org/zap" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/sqlexec" "github.com/twmb/murmur3" @@ -46,7 +51,7 @@ func PrepareIndexOp(ctx context.Context, ddl DDLInfo) error { LogInfo("PrepareIndexOp %+v", ddl) // err == ErrNotFound info := cluster - be, err := createLocalBackend(ctx, info) + be, err := createLocalBackend(ctx, info, ddl.Unique) if err != nil { LogFatal("PrepareIndexOp.createLocalBackend err:%s.", err.Error()) return fmt.Errorf("PrepareIndexOp.createLocalBackend err:%w", err) @@ -67,9 +72,9 @@ func PrepareIndexOp(ctx context.Context, ddl DDLInfo) error { h.Write(b[:]) en, err := be.OpenEngine(ctx, 
&cfg, ddl.Table.Name.String(), int32(h.Sum32())) if err != nil { - return fmt.Errorf("PrepareIndexOp.OpenEngine err:%w", err) + return errors.Errorf("PrepareIndexOp.OpenEngine err:%v", err) } - ec.put(ddl.StartTs, &cfg, en, ddl.Table) + ec.put(ddl.StartTs, &cfg, be, en, ddl.Table) return nil } @@ -124,12 +129,15 @@ func fetchTableRegionSizeStats(tblId int64, exec sqlexec.RestrictedSQLExecutor) return ret, nil } -func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedSQLExecutor) error { +func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedSQLExecutor, tbl table.Table, unique bool) error { ei, err := ec.getEngineInfo(startTs) if err != nil { return err } - defer ec.releaseRef(startTs) + defer func() { + ec.releaseRef(startTs) + ec.ReleaseEngine(startTs) + }() // LogInfo("FinishIndexOp %d;kvs=%d.", startTs, ei.size) // @@ -147,19 +155,41 @@ func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedS // closeEngine, err1 := indexEngine.Close(ctx, cfg) if err1 != nil { - return fmt.Errorf("engine.Close err:%w", err1) + return errors.Errorf("engine.Close err:%v", err1) } // use default value first; err = closeEngine.Import(ctx, int64(config.SplitRegionSize)) if err != nil { - return fmt.Errorf("engine.Import err:%w", err) + return errors.Errorf("engine.Import err:%v", err) } err = closeEngine.Cleanup(ctx) if err != nil { - return fmt.Errorf("engine.Cleanup err:%w", err) + return errors.Errorf("engine.Cleanup err:%v", err) + } + if unique { + hasDupe, err := ei.backend.CollectRemoteDuplicateRows(ctx, tbl, ei.tbl.Name.O, &kv.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + SysVars: defaultImportantVariables, + }) + if hasDupe { + return errors.Errorf("unique index conflicts detected: %v", err) + } else if err != nil { + logutil.BgLogger().Error("fail to detect unique index conflicts, unknown index status", zap.Error(err)) + return errors.Errorf("fail to detect unique index conflicts, unknown 
index status %v", err) + } } // should release before ReleaseEngine ec.releaseRef(startTs) ec.ReleaseEngine(startTs) return nil } + +var defaultImportantVariables = map[string]string{ + "max_allowed_packet": "67108864", + "div_precision_increment": "4", + "time_zone": "SYSTEM", + "lc_time_names": "en_US", + "default_week_format": "0", + "block_encryption_mode": "aes-128-ecb", + "group_concat_max_len": "1024", +} From 778a03a942a6eeaebbeabffbaa18339d823237ea Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Jan 2022 17:36:34 +0800 Subject: [PATCH 04/19] block batch check for lightning --- ddl/index.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 19ea9715039a2..60c41a99513f9 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1342,10 +1342,11 @@ func (w *addIndexWorker) backfillDataInTxnByRead(handleRange reorgBackfillTask) taskCtx.nextKey = nextKey taskCtx.done = taskDone - err = w.batchCheckUniqueKey(txn, idxRecords) - if err != nil { - return taskCtx, errors.Trace(err) - } + // lightning has its own uk detect + //err = w.batchCheckUniqueKey(txn, idxRecords) + //if err != nil { + // return taskCtx, errors.Trace(err) + //} for _, idxRecord := range idxRecords { taskCtx.scanCount++ From b925b902253bab10e64bb7b4b19aa3a30d2a2792 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Jan 2022 18:06:22 +0800 Subject: [PATCH 05/19] abort retry --- br/pkg/lightning/backend/local/duplicate.go | 2 +- br/pkg/lightning/backend/local/iterator.go | 4 ++-- ddl/index.go | 4 ++-- ddl/sst/index.go | 12 ++++++------ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index 42232125b3638..45ee6230fc0ca 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -331,7 +331,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, } if 
manager.duplicateAbort && len(resp.Pairs) > 0 { hasDupe.Store(true) - return errors.Errorf("found duplicate key %s", resp.Pairs[0]) + return tidbkv.ErrKeyExists.FastGenByArgs(resp.Pairs[0].String(), "unknown") } if hasErr || resp.GetKeyError() != nil { diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index 4ce39dff56282..8d3e656f5e06f 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -19,7 +19,6 @@ import ( "context" "github.com/cockroachdb/pebble" - "github.com/pingcap/errors" sst "github.com/pingcap/kvproto/pkg/import_sstpb" "go.uber.org/multierr" "go.uber.org/zap" @@ -28,6 +27,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/logutil" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/codec" ) @@ -127,7 +127,7 @@ func (d *duplicateIter) Next() bool { return true } if d.duplicateAbort { - d.err = errors.Errorf("found duplicate key %s", d.curKey) + d.err = tidbkv.ErrKeyExists.FastGenByArgs(d.curKey, "unknown") return false } diff --git a/ddl/index.go b/ddl/index.go index 60c41a99513f9..221f72c1f5a67 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1378,12 +1378,12 @@ func (w *addIndexWorker) backfillDataInTxnByRead(handleRange reorgBackfillTask) } errInTxn = sst.FlushKeyValSync(context.TODO(), w.jobStartTs, w.wc) if errInTxn != nil { - sst.LogError("FlushKeyValSync %d paris err:%s.", len(w.wc.Fetch()), errInTxn.Error()) + sst.LogError("FlushKeyValSync %d paris err: %v.", len(w.wc.Fetch()), errInTxn.Error()) } // sst.LogInfo("handleRange=%s(%s -> %s); %x -> %x.", // handleRange.String(), handleRange.startKey, handleRange.endKey, minKey, maxKey) logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000) - return taskCtx, errInTxn + return taskCtx, errors.Trace(errInTxn) } // BackfillDataInTxn will backfill table index in a 
transaction, lock corresponding rowKey, if the value of rowKey is changed, diff --git a/ddl/sst/index.go b/ddl/sst/index.go index 9d41913169b5e..d842fd098e46b 100644 --- a/ddl/sst/index.go +++ b/ddl/sst/index.go @@ -54,7 +54,7 @@ func PrepareIndexOp(ctx context.Context, ddl DDLInfo) error { be, err := createLocalBackend(ctx, info, ddl.Unique) if err != nil { LogFatal("PrepareIndexOp.createLocalBackend err:%s.", err.Error()) - return fmt.Errorf("PrepareIndexOp.createLocalBackend err:%w", err) + return errors.Errorf("PrepareIndexOp.createLocalBackend err:%w", err) } cpt := checkpoints.TidbTableInfo{ genNextTblId(), @@ -87,11 +87,11 @@ func FlushKeyValSync(ctx context.Context, startTs uint64, cache *WorkerKVCache) } lw, err := ei.getWriter() if err != nil { - return fmt.Errorf("IndexOperator.getWriter err:%w", err) + return errors.Errorf("IndexOperator.getWriter err:%v", err) } err = lw.WriteRows(ctx, nil, kv.NewKvPairs(cache.Fetch())) if err != nil { - return fmt.Errorf("IndexOperator.WriteRows err:%w", err) + return errors.Errorf("IndexOperator.WriteRows err:%v", err) } ei.size += cache.Size() cache.Reset() @@ -103,11 +103,11 @@ func fetchTableRegionSizeStats(tblId int64, exec sqlexec.RestrictedSQLExecutor) query := "SELECT REGION_ID, APPROXIMATE_SIZE FROM information_schema.TIKV_REGION_STATUS WHERE TABLE_ID = %?" 
sn, err := exec.ParseWithParams(context.TODO(), query, tblId) if err != nil { - return nil, fmt.Errorf("ParseWithParams err:%w", err) + return nil, errors.Errorf("ParseWithParams err: %v", err) } rows, _, err := exec.ExecRestrictedStmt(context.TODO(), sn) if err != nil { - return nil, fmt.Errorf("ExecRestrictedStmt err:%w", err) + return nil, errors.Errorf("ExecRestrictedStmt err: %v", err) } // parse values; ret = make(map[uint64]int64, len(rows)) @@ -117,7 +117,7 @@ func fetchTableRegionSizeStats(tblId int64, exec sqlexec.RestrictedSQLExecutor) ) for idx, row := range rows { if 2 != row.Len() { - return nil, fmt.Errorf("row %d has %d fields", idx, row.Len()) + return nil, errors.Errorf("row %d has %d fields", idx, row.Len()) } regionID = row.GetUint64(0) size = row.GetInt64(1) From adb33ce64d92351e58f21e35652a89fda0f13af0 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Jan 2022 22:13:48 +0800 Subject: [PATCH 06/19] add abort duplicate --- br/pkg/lightning/backend/local/iterator.go | 13 +++++++------ ddl/index.go | 3 +-- ddl/sst/index.go | 14 +++++++------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index 8d3e656f5e06f..b265c9d4e889f 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -190,12 +190,13 @@ func newDuplicateIter(ctx context.Context, engineFile *File, opts *pebble.IterOp zap.Int64("tableID", engineFile.tableInfo.ID), zap.Stringer("engineUUID", engineFile.UUID)) return &duplicateIter{ - ctx: ctx, - iter: engineFile.db.NewIter(newOpts), - engineFile: engineFile, - keyAdapter: engineFile.keyAdapter, - writeBatch: engineFile.duplicateDB.NewBatch(), - logger: logger, + ctx: ctx, + iter: engineFile.db.NewIter(newOpts), + engineFile: engineFile, + keyAdapter: engineFile.keyAdapter, + writeBatch: engineFile.duplicateDB.NewBatch(), + logger: logger, + duplicateAbort: 
engineFile.duplicateAbort, } } diff --git a/ddl/index.go b/ddl/index.go index 221f72c1f5a67..e97ca22dbc86c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -16,7 +16,6 @@ package ddl import ( "context" - "fmt" "strings" "sync/atomic" "time" @@ -562,7 +561,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo if *sst.IndexDDLLightning { err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, currentTS, indexInfo.Unique}) if err != nil { - return ver, errors.Trace(fmt.Errorf("PrepareIndexOp err:%w", err)) + return ver, errors.Annotate(err, "PrepareIndexOp err") } } diff --git a/ddl/sst/index.go b/ddl/sst/index.go index d842fd098e46b..ca9e85dc1574d 100644 --- a/ddl/sst/index.go +++ b/ddl/sst/index.go @@ -87,11 +87,11 @@ func FlushKeyValSync(ctx context.Context, startTs uint64, cache *WorkerKVCache) } lw, err := ei.getWriter() if err != nil { - return errors.Errorf("IndexOperator.getWriter err:%v", err) + return errors.Annotate(err, "IndexOperator.getWriter err") } err = lw.WriteRows(ctx, nil, kv.NewKvPairs(cache.Fetch())) if err != nil { - return errors.Errorf("IndexOperator.WriteRows err:%v", err) + return errors.Annotate(err, "IndexOperator.WriteRows err") } ei.size += cache.Size() cache.Reset() @@ -155,16 +155,16 @@ func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedS // closeEngine, err1 := indexEngine.Close(ctx, cfg) if err1 != nil { - return errors.Errorf("engine.Close err:%v", err1) + return errors.Annotate(err, "engine.Close err") } // use default value first; err = closeEngine.Import(ctx, int64(config.SplitRegionSize)) if err != nil { - return errors.Errorf("engine.Import err:%v", err) + return errors.Annotate(err, "engine.Import err") } err = closeEngine.Cleanup(ctx) if err != nil { - return errors.Errorf("engine.Cleanup err:%v", err) + return errors.Annotate(err, "engine.Cleanup err:%v") } if unique { hasDupe, err := ei.backend.CollectRemoteDuplicateRows(ctx, tbl, ei.tbl.Name.O, 
&kv.SessionOptions{ @@ -172,10 +172,10 @@ func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedS SysVars: defaultImportantVariables, }) if hasDupe { - return errors.Errorf("unique index conflicts detected: %v", err) + return errors.Annotate(err, "unique index conflicts detected") } else if err != nil { logutil.BgLogger().Error("fail to detect unique index conflicts, unknown index status", zap.Error(err)) - return errors.Errorf("fail to detect unique index conflicts, unknown index status %v", err) + return errors.Annotate(err, "fail to detect unique index conflicts, unknown index status") } } // should release before ReleaseEngine From 8ad1b49f9f9fd76ca8498f02f1cdf14aa6cb0a48 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Jan 2022 22:31:54 +0800 Subject: [PATCH 07/19] add log --- ddl/ddl_worker.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index ee61c88d6ed18..eb09bb0e22d21 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -26,6 +26,9 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -41,8 +44,6 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/topsql" - "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" ) var ( @@ -774,8 +775,14 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onSetDefaultValue(t, job) case model.ActionAddIndex: ver, err = w.onCreateIndex(d, t, job, false) + if err != nil { + logutil.BgLogger().Error("[ddl] fail to run add index job", zap.Error(err)) + } case model.ActionAddPrimaryKey: ver, err = w.onCreateIndex(d, t, job, true) + if err != nil { + logutil.BgLogger().Error("[ddl] 
fail to run add index job", zap.Error(err)) + } case model.ActionDropIndex, model.ActionDropPrimaryKey: ver, err = onDropIndex(t, job) case model.ActionDropIndexes: From b268db5084c838932a27e4add2078bdaecbe33ec Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Jan 2022 23:13:34 +0800 Subject: [PATCH 08/19] try fix --- ddl/index.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index e97ca22dbc86c..2f0c023fce393 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -552,14 +552,9 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.SnapshotVer = 0 job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: - currentVer, err := d.store.CurrentVersion(kv.GlobalTxnScope) - currentTS := job.StartTS - if err == nil { - currentTS = currentVer.Ver - } // TODO: optimize index ddl. if *sst.IndexDDLLightning { - err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, currentTS, indexInfo.Unique}) + err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, job.StartTS, indexInfo.Unique}) if err != nil { return ver, errors.Annotate(err, "PrepareIndexOp err") } @@ -593,7 +588,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo logutil.BgLogger().Error("FinishIndexOp err1" + err.Error()) return ver, errors.Trace(err) } else { - err = sst.FinishIndexOp(w.ctx, currentTS, ctx.(sqlexec.RestrictedSQLExecutor), tbl, indexInfo.Unique) + err = sst.FinishIndexOp(w.ctx, job.StartTS, ctx.(sqlexec.RestrictedSQLExecutor), tbl, indexInfo.Unique) if err != nil { logutil.BgLogger().Error("FinishIndexOp err2" + err.Error()) return ver, errors.Trace(err) From f7cdb0ec244f458faad0e7d9d2d57416d94005a2 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Jan 2022 23:21:44 +0800 Subject: [PATCH 09/19] fix bug --- ddl/sst/common.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/sst/common.go 
b/ddl/sst/common.go index 9a2913f84600c..91fe62878184e 100644 --- a/ddl/sst/common.go +++ b/ddl/sst/common.go @@ -130,9 +130,9 @@ func generateLightningConfig(info ClusterInfo, unique bool) *config.Config { cfg.Checkpoint.Enable = false cfg.TikvImporter.SortedKVDir = name if unique { - cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone - } else { cfg.TikvImporter.DuplicateResolution = config.DupeResAlgAbort + } else { + cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone } cfg.TiDB.PdAddr = info.PdAddr cfg.TiDB.Host = "127.0.0.1" From 1e316784bed3f5c1bcf51ab8f97ff0406a46e90d Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 7 Jan 2022 23:49:15 +0800 Subject: [PATCH 10/19] fix bug --- br/pkg/lightning/backend/local/iterator.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index b265c9d4e889f..a074e68e4494f 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -126,15 +126,14 @@ func (d *duplicateIter) Next() bool { d.curVal = append(d.curVal[:0], d.iter.Value()...) 
return true } + d.logger.Info("[detect-dupe] local duplicate key detected", + logutil.Key("key", d.curKey), + logutil.Key("prevValue", d.curVal), + logutil.Key("value", d.iter.Value())) if d.duplicateAbort { d.err = tidbkv.ErrKeyExists.FastGenByArgs(d.curKey, "unknown") return false } - - d.logger.Debug("[detect-dupe] local duplicate key detected", - logutil.Key("key", d.curKey), - logutil.Key("prevValue", d.curVal), - logutil.Key("value", d.iter.Value())) if !recordFirst { d.record(d.curRawKey, d.curVal) recordFirst = true From 11c1df6f31b096b76fbb370f88c8004977488dcb Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 00:02:24 +0800 Subject: [PATCH 11/19] fix bug --- br/pkg/lightning/backend/local/iterator.go | 2 ++ br/pkg/lightning/backend/local/local.go | 1 + 2 files changed, 3 insertions(+) diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index a074e68e4494f..0bd9ebb643aaf 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -120,6 +120,7 @@ func (d *duplicateIter) Next() bool { if d.err != nil { return false } + d.logger.Debug("[detect-debug] current key", zap.ByteString("curKey", d.curKey)) if !bytes.Equal(d.nextKey, d.curKey) { d.curKey, d.nextKey = d.nextKey, d.curKey[:0] d.curRawKey = append(d.curRawKey[:0], d.iter.Key()...) 
@@ -205,6 +206,7 @@ func newKeyIter(ctx context.Context, engineFile *File, opts *pebble.IterOptions) newOpts.LowerBound = normalIterStartKey opts = &newOpts } + log.L().Info("duplicate detectStatus", zap.Bool("detect", engineFile.duplicateDetection), zap.Bool("abort", engineFile.duplicateAbort)) if !engineFile.duplicateDetection { return pebbleIter{Iterator: engineFile.db.NewIter(opts)} } diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index b0f96027e5921..8a0567c15ff8e 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1357,6 +1357,7 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, sstMetasChan: make(chan metaOrFlush), tableInfo: cfg.TableInfo, duplicateDetection: local.duplicateDetection, + duplicateAbort: local.duplicateAbort, duplicateDB: local.duplicateDB, errorMgr: local.errorMgr, } From cac9969990ef969bbd7f42a40b540ccdec494ed9 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 00:51:51 +0800 Subject: [PATCH 12/19] add log --- br/pkg/lightning/backend/local/local.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 8a0567c15ff8e..7c9d64f2e5cf8 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1537,6 +1537,8 @@ func (local *local) WriteToTiKV( } count++ totalCount++ + log.L().Info("[debug-iter] iterator count", zap.Int("count", count), zap.Int64("totalCount", totalCount), + zap.ByteString("key", iter.Key()), zap.ByteString("value", iter.Value())) if count >= local.batchWriteKVPairs { for i := range clients { From a9014d1aee2207a55b921339f938705ee9089e08 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 01:31:16 +0800 Subject: [PATCH 13/19] add log --- ddl/backfilling.go | 7 ++++--- ddl/index.go | 5 +++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git 
a/ddl/backfilling.go b/ddl/backfilling.go index 07fce74cde947..c26228f479bdb 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -24,6 +24,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/tikv/client-go/v2/tikv" + "go.uber.org/zap" + ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -39,8 +42,6 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" decoder "github.com/pingcap/tidb/util/rowDecoder" - "github.com/tikv/client-go/v2/tikv" - "go.uber.org/zap" ) type backfillWorkerType byte @@ -262,7 +263,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount)) w.ddlWorker.reorgCtx.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount) - if num := result.scanCount - lastLogCount; num >= 30000 { + if num := result.scanCount - lastLogCount; num >= 0 { lastLogCount = result.scanCount logutil.BgLogger().Info("[ddl] backfill worker back fill index", zap.Int("workerID", w.id), diff --git a/ddl/index.go b/ddl/index.go index 2f0c023fce393..eb933edfb8d7c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/tidb/br/pkg/lightning/log" tableutil "github.com/pingcap/tidb/table/tables/util" "github.com/pingcap/errors" @@ -1349,6 +1350,8 @@ func (w *addIndexWorker) backfillDataInTxnByRead(handleRange reorgBackfillTask) if idxRecord.skip { continue } + log.L().Info("[debug-fetch] idxRecord info", zap.Int("scanCount", taskCtx.scanCount), zap.Int("addedCount", taskCtx.addedCount), + zap.ByteString("key", idxRecord.key)) // TODO: check if need lock. 
// Lock the row key to notify us that someone delete or update the row, @@ -1370,6 +1373,8 @@ func (w *addIndexWorker) backfillDataInTxnByRead(handleRange reorgBackfillTask) } taskCtx.addedCount++ } + + log.L().Info("[debug-fetch] finish idxRecord info", zap.Int("scanCount", taskCtx.scanCount), zap.Int("addedCount", taskCtx.addedCount)) errInTxn = sst.FlushKeyValSync(context.TODO(), w.jobStartTs, w.wc) if errInTxn != nil { sst.LogError("FlushKeyValSync %d paris err: %v.", len(w.wc.Fetch()), errInTxn.Error()) From ed31f6a8dc85f8b32d29c38d5654d43d659f5d76 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 02:07:22 +0800 Subject: [PATCH 14/19] fix --- ddl/sst/kvbuf.go | 8 +++++--- table/tables/index.go | 6 ++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ddl/sst/kvbuf.go b/ddl/sst/kvbuf.go index 21d578f282945..29258045e001b 100644 --- a/ddl/sst/kvbuf.go +++ b/ddl/sst/kvbuf.go @@ -2,6 +2,7 @@ package sst import ( "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/kv" ) // woker is addIndexWorker. one worker one cache. 
@@ -41,10 +42,11 @@ func (wc *WorkerKVCache) Fetch() []common.KvPair { return wc.pairs } -func (wc *WorkerKVCache) PushKeyValue(k, v []byte) { +func (wc *WorkerKVCache) PushKeyValue(k, v []byte, h kv.Handle) { p := common.KvPair{ - Key: wc.cloneBytes(k), - Val: wc.cloneBytes(v), + Key: wc.cloneBytes(k), + Val: wc.cloneBytes(v), + RowID: h.IntValue(), } wc.pairs = append(wc.pairs, p) } diff --git a/table/tables/index.go b/table/tables/index.go index a51a964bd990c..468dbf1a52b63 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -16,13 +16,15 @@ package tables import ( "context" - "github.com/pingcap/tidb/ddl/sst" "io" "sync" "time" + "github.com/pingcap/tidb/ddl/sst" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" @@ -197,7 +199,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue // TODO: optimize index ddl if *sst.IndexDDLLightning && c.jobStartTs > 0 { // err = sst.IndexOperator(ctx, c.jobStartTs, key, idxVal) - c.wc.PushKeyValue(key, idxVal) + c.wc.PushKeyValue(key, idxVal, h) return nil, nil } if !distinct || skipCheck || opt.Untouched { From 8655a01b77074a9de598367ce0ec3162dbcfd86b Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 02:19:16 +0800 Subject: [PATCH 15/19] fix --- ddl/index.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index eb933edfb8d7c..5acda41a4d12f 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -16,6 +16,7 @@ package ddl import ( "context" + "strconv" "strings" "sync/atomic" "time" @@ -601,7 +602,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo // if timeout, we should return, check for the owner and re-wait job done. 
return ver, nil } - if kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) || errCantDecodeRecord.Equal(err) { + if kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) || errCantDecodeRecord.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) { logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err) if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { @@ -1364,7 +1365,7 @@ func (w *addIndexWorker) backfillDataInTxnByRead(handleRange reorgBackfillTask) // Create the index. handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData) if err != nil { - if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) { + if (kv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10))) && idxRecord.handle.Equal(handle) { // Index already exists, skip it. 
continue } From 63456fda96879d9ac41f62c77f3b46d0a7b214c0 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 02:42:08 +0800 Subject: [PATCH 16/19] add failfast --- br/pkg/lightning/backend/local/local.go | 8 +++++--- br/pkg/utils/utildb/retry.go | 4 ++++ ddl/index.go | 4 ++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 7c9d64f2e5cf8..0980f44720e1c 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -26,6 +26,7 @@ import ( "os" "path/filepath" "sort" + "strconv" "strings" "sync" "time" @@ -74,6 +75,7 @@ import ( "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/utils/utilmath" "github.com/pingcap/tidb/br/pkg/version" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" @@ -1800,7 +1802,7 @@ WriteAndIngest: err = local.writeAndIngestPairs(ctx, engineFile, region, pairStart, end, regionSplitSize, regionSplitKeys) local.ingestConcurrency.Recycle(w) if err != nil { - if common.IsContextCanceledError(err) { + if common.IsContextCanceledError(err) || tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) { return err } _, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{}) @@ -1847,7 +1849,7 @@ loopWrite: var rangeStats rangeStats metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engineFile, region, start, end, regionSplitSize, regionSplitKeys) if err != nil { - if common.IsContextCanceledError(err) { + if common.IsContextCanceledError(err) || tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) { return err } @@ -1985,7 +1987,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File backOffTime := time.Second for i := 0; i 
< maxRetryTimes; i++ { err = local.writeAndIngestByRange(ctx, engineFile, startKey, endKey, regionSplitSize, regionSplitKeys) - if err == nil || common.IsContextCanceledError(err) { + if err == nil || common.IsContextCanceledError(err) || tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) { return } log.L().Warn("write and ingest by range failed", diff --git a/br/pkg/utils/utildb/retry.go b/br/pkg/utils/utildb/retry.go index cf6a4f77e5b37..09a9031acab18 100644 --- a/br/pkg/utils/utildb/retry.go +++ b/br/pkg/utils/utildb/retry.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc/status" tmysql "github.com/pingcap/tidb/errno" + tidbkv "github.com/pingcap/tidb/kv" ) var retryableServerError = []string{ @@ -111,6 +112,9 @@ func isSingleRetryableError(err error) bool { case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows: return false } + if tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), "1062") { + return false + } switch nerr := err.(type) { case net.Error: diff --git a/ddl/index.go b/ddl/index.go index 5acda41a4d12f..5f8a04f81e68c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -588,12 +588,12 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo ctx, err := w.sessPool.get() if err != nil { logutil.BgLogger().Error("FinishIndexOp err1" + err.Error()) - return ver, errors.Trace(err) + err = errors.Trace(err) } else { err = sst.FinishIndexOp(w.ctx, job.StartTS, ctx.(sqlexec.RestrictedSQLExecutor), tbl, indexInfo.Unique) if err != nil { logutil.BgLogger().Error("FinishIndexOp err2" + err.Error()) - return ver, errors.Trace(err) + err = errors.Trace(err) } } } From 248e75487fb862ad77dc039f2624c13dfab66035 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 02:52:57 +0800 Subject: [PATCH 17/19] fix bug --- br/pkg/lightning/backend/local/iterator.go | 6 +++--- br/pkg/lightning/backend/local/local.go | 4 ++-- ddl/backfilling.go | 2 +- 
ddl/index.go | 5 +++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/br/pkg/lightning/backend/local/iterator.go b/br/pkg/lightning/backend/local/iterator.go index 0bd9ebb643aaf..557a4ed4f4b03 100644 --- a/br/pkg/lightning/backend/local/iterator.go +++ b/br/pkg/lightning/backend/local/iterator.go @@ -120,14 +120,14 @@ func (d *duplicateIter) Next() bool { if d.err != nil { return false } - d.logger.Debug("[detect-debug] current key", zap.ByteString("curKey", d.curKey)) + //d.logger.Debug("[detect-debug] current key", zap.ByteString("curKey", d.curKey)) if !bytes.Equal(d.nextKey, d.curKey) { d.curKey, d.nextKey = d.nextKey, d.curKey[:0] d.curRawKey = append(d.curRawKey[:0], d.iter.Key()...) d.curVal = append(d.curVal[:0], d.iter.Value()...) return true } - d.logger.Info("[detect-dupe] local duplicate key detected", + d.logger.Debug("[detect-dupe] local duplicate key detected", logutil.Key("key", d.curKey), logutil.Key("prevValue", d.curVal), logutil.Key("value", d.iter.Value())) @@ -206,7 +206,7 @@ func newKeyIter(ctx context.Context, engineFile *File, opts *pebble.IterOptions) newOpts.LowerBound = normalIterStartKey opts = &newOpts } - log.L().Info("duplicate detectStatus", zap.Bool("detect", engineFile.duplicateDetection), zap.Bool("abort", engineFile.duplicateAbort)) + //log.L().Info("duplicate detectStatus", zap.Bool("detect", engineFile.duplicateDetection), zap.Bool("abort", engineFile.duplicateAbort)) if !engineFile.duplicateDetection { return pebbleIter{Iterator: engineFile.db.NewIter(opts)} } diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 0980f44720e1c..bc9963264ce0f 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1539,8 +1539,8 @@ func (local *local) WriteToTiKV( } count++ totalCount++ - log.L().Info("[debug-iter] iterator count", zap.Int("count", count), zap.Int64("totalCount", totalCount), - zap.ByteString("key", iter.Key()), 
zap.ByteString("value", iter.Value())) + //log.L().Info("[debug-iter] iterator count", zap.Int("count", count), zap.Int64("totalCount", totalCount), + // zap.ByteString("key", iter.Key()), zap.ByteString("value", iter.Value())) if count >= local.batchWriteKVPairs { for i := range clients { diff --git a/ddl/backfilling.go b/ddl/backfilling.go index c26228f479bdb..479921e11aa03 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -263,7 +263,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount)) w.ddlWorker.reorgCtx.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount) - if num := result.scanCount - lastLogCount; num >= 0 { + if num := result.scanCount - lastLogCount; num >= 30000 { lastLogCount = result.scanCount logutil.BgLogger().Info("[ddl] backfill worker back fill index", zap.Int("workerID", w.id), diff --git a/ddl/index.go b/ddl/index.go index 5f8a04f81e68c..eee14bdc259e4 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -585,7 +585,8 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo }) // TODO: optimize index ddl. 
if err == nil && *sst.IndexDDLLightning { - ctx, err := w.sessPool.get() + var ctx sessionctx.Context + ctx, err = w.sessPool.get() if err != nil { logutil.BgLogger().Error("FinishIndexOp err1" + err.Error()) err = errors.Trace(err) @@ -1375,7 +1376,7 @@ func (w *addIndexWorker) backfillDataInTxnByRead(handleRange reorgBackfillTask) taskCtx.addedCount++ } - log.L().Info("[debug-fetch] finish idxRecord info", zap.Int("scanCount", taskCtx.scanCount), zap.Int("addedCount", taskCtx.addedCount)) + //log.L().Info("[debug-fetch] finish idxRecord info", zap.Int("scanCount", taskCtx.scanCount), zap.Int("addedCount", taskCtx.addedCount)) errInTxn = sst.FlushKeyValSync(context.TODO(), w.jobStartTs, w.wc) if errInTxn != nil { sst.LogError("FlushKeyValSync %d paris err: %v.", len(w.wc.Fetch()), errInTxn.Error()) From 76d673ae81d3e1807ad6d7f604428ac22583c016 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 10:21:14 +0800 Subject: [PATCH 18/19] fix bug --- br/pkg/lightning/backend/local/local.go | 4 +++- ddl/index.go | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index bc9963264ce0f..a08640eee814d 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1987,8 +1987,10 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File backOffTime := time.Second for i := 0; i < maxRetryTimes; i++ { err = local.writeAndIngestByRange(ctx, engineFile, startKey, endKey, regionSplitSize, regionSplitKeys) - if err == nil || common.IsContextCanceledError(err) || tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) { + if err == nil || common.IsContextCanceledError(err) { return + } else if tidbkv.ErrKeyExists.Equal(err) || strings.Contains(err.Error(), strconv.FormatInt(mysql.ErrDupEntry, 10)) { + break } log.L().Warn("write and ingest by range failed", 
zap.Int("retry time", i+1), log.ShortError(err)) diff --git a/ddl/index.go b/ddl/index.go index eee14bdc259e4..b1c1e22f036e3 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -21,7 +21,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/tidb/br/pkg/lightning/log" tableutil "github.com/pingcap/tidb/table/tables/util" "github.com/pingcap/errors" @@ -1352,8 +1351,8 @@ func (w *addIndexWorker) backfillDataInTxnByRead(handleRange reorgBackfillTask) if idxRecord.skip { continue } - log.L().Info("[debug-fetch] idxRecord info", zap.Int("scanCount", taskCtx.scanCount), zap.Int("addedCount", taskCtx.addedCount), - zap.ByteString("key", idxRecord.key)) + //log.L().Info("[debug-fetch] idxRecord info", zap.Int("scanCount", taskCtx.scanCount), zap.Int("addedCount", taskCtx.addedCount), + // zap.ByteString("key", idxRecord.key)) // TODO: check if need lock. // Lock the row key to notify us that someone delete or update the row, From 7b944a976a1aa87aa74d4b2697c12159e61334e9 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 8 Jan 2022 12:32:36 +0800 Subject: [PATCH 19/19] fix bug --- ddl/sst/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/sst/index.go b/ddl/sst/index.go index ca9e85dc1574d..12145d2d59c07 100644 --- a/ddl/sst/index.go +++ b/ddl/sst/index.go @@ -164,7 +164,7 @@ func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedS } err = closeEngine.Cleanup(ctx) if err != nil { - return errors.Annotate(err, "engine.Cleanup err:%v") + return errors.Annotate(err, "engine.Cleanup err") } if unique { hasDupe, err := ei.backend.CollectRemoteDuplicateRows(ctx, tbl, ei.tbl.Name.O, &kv.SessionOptions{