Skip to content

Commit

Permalink
Merge branch 'master' into add_historical_stats_test_2
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jan 19, 2023
2 parents 804fcdd + bbd1995 commit f3a19bb
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 29 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type Engine struct {
config backend.LocalEngineConfig
tableInfo *checkpoints.TidbTableInfo

dupDetectOpt dupDetectOpt

// total size of SST files waiting to be ingested
pendingFileSize atomic.Int64

Expand Down Expand Up @@ -981,7 +983,7 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
zap.String("table", common.UniqueTable(e.tableInfo.DB, e.tableInfo.Name)),
zap.Int64("tableID", e.tableInfo.ID),
zap.Stringer("engineUUID", e.UUID))
return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger)
return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
}

func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/pebble"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"go.uber.org/multierr"
Expand Down Expand Up @@ -82,6 +83,11 @@ type dupDetectIter struct {
writeBatch *pebble.Batch
writeBatchSize int64
logger log.Logger
option dupDetectOpt
}

type dupDetectOpt struct {
reportErrOnDup bool
}

func (d *dupDetectIter) Seek(key []byte) bool {
Expand Down Expand Up @@ -149,6 +155,14 @@ func (d *dupDetectIter) Next() bool {
d.curVal = append(d.curVal[:0], d.iter.Value()...)
return true
}
if d.option.reportErrOnDup {
dupKey := make([]byte, len(d.curKey))
dupVal := make([]byte, len(d.iter.Value()))
copy(dupKey, d.curKey)
copy(dupVal, d.curVal)
d.err = common.ErrFoundDuplicateKeys.FastGenByArgs(dupKey, dupVal)
return false
}
if !recordFirst {
d.record(d.curRawKey, d.curKey, d.curVal)
recordFirst = true
Expand Down Expand Up @@ -192,7 +206,7 @@ func (d *dupDetectIter) OpType() sst.Pair_OP {
var _ Iter = &dupDetectIter{}

func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter,
opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger) *dupDetectIter {
opts *pebble.IterOptions, dupDB *pebble.DB, logger log.Logger, dupOpt dupDetectOpt) *dupDetectIter {
newOpts := &pebble.IterOptions{TableFilter: opts.TableFilter}
if len(opts.LowerBound) > 0 {
newOpts.LowerBound = keyAdapter.Encode(nil, opts.LowerBound, math.MinInt64)
Expand All @@ -206,6 +220,7 @@ func newDupDetectIter(ctx context.Context, db *pebble.DB, keyAdapter KeyAdapter,
keyAdapter: keyAdapter,
writeBatch: dupDB.NewBatch(),
logger: logger,
option: dupOpt,
}
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestDupDetectIterator(t *testing.T) {
dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
require.NoError(t, err)
var iter Iter
iter = newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L())
iter = newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), dupDetectOpt{})
sort.Slice(pairs, func(i, j int) bool {
key1 := keyAdapter.Encode(nil, pairs[i].Key, pairs[i].RowID)
key2 := keyAdapter.Encode(nil, pairs[j].Key, pairs[j].RowID)
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestDupDetectIterSeek(t *testing.T) {

dupDB, err := pebble.Open(filepath.Join(storeDir, "duplicates"), &pebble.Options{})
require.NoError(t, err)
iter := newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L())
iter := newDupDetectIter(context.Background(), db, keyAdapter, &pebble.IterOptions{}, dupDB, log.L(), dupDetectOpt{})

require.True(t, iter.Seek([]byte{1, 2, 3, 1}))
require.Equal(t, pairs[1].Val, iter.Value())
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ type local struct {

checkTiKVAvaliable bool
duplicateDetection bool
duplicateDetectOpt dupDetectOpt
duplicateDB *pebble.DB
keyAdapter KeyAdapter
errorMgr *errormanager.ErrorManager
Expand Down Expand Up @@ -500,6 +501,7 @@ func NewLocalBackend(
engineMemCacheSize: int(cfg.TikvImporter.EngineMemCacheSize),
localWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize),
duplicateDetection: duplicateDetection,
duplicateDetectOpt: dupDetectOpt{duplicateDetection && cfg.TikvImporter.DuplicateResolution == config.DupeResAlgErr},
checkTiKVAvaliable: cfg.App.CheckRequirements,
duplicateDB: duplicateDB,
keyAdapter: keyAdapter,
Expand Down Expand Up @@ -804,6 +806,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e
config: engineCfg,
tableInfo: cfg.TableInfo,
duplicateDetection: local.duplicateDetection,
dupDetectOpt: local.duplicateDetectOpt,
duplicateDB: local.duplicateDB,
errorMgr: local.errorMgr,
keyAdapter: local.keyAdapter,
Expand Down Expand Up @@ -854,6 +857,7 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
tableInfo: cfg.TableInfo,
keyAdapter: local.keyAdapter,
duplicateDetection: local.duplicateDetection,
dupDetectOpt: local.duplicateDetectOpt,
duplicateDB: local.duplicateDB,
errorMgr: local.errorMgr,
logger: log.FromContext(ctx),
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var (
ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows"))
ErrFoundDuplicateKeys = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey"))
)

type withStack struct {
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ const (
// DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
DupeResAlgRemove

// DupeResAlgErr reports an error and stops the import process.
// Note: this value is only used for internal.
DupeResAlgErr
)

func (dra *DuplicateResolutionAlgorithm) UnmarshalTOML(v interface{}) error {
Expand Down
28 changes: 15 additions & 13 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2043,34 +2043,36 @@ func (rc *Client) RestoreKVFiles(
log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId))
skipFile += len(files)
} else {
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) {
fileStart := time.Now()
defer func() {
onProgress(int64(len(files)))
updateStats(uint64(kvCount), size)
summary.CollectInt("File", len(files))

filenames := make([]string, 0, len(files))
for _, f := range files {
filenames = append(filenames, f.Path+", ")
if err == nil {
filenames := make([]string, 0, len(files))
for _, f := range files {
filenames = append(filenames, f.Path+", ")
}
log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size),
zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames))
}
log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size),
zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames))
}()

return rc.fileImporter.ImportKVFiles(ectx, files, rule, rc.shiftStartTS, rc.startTS, rc.restoreTS, supportBatch)
})
}
}

if supportBatch {
err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc)
} else {
err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc)
}
if err != nil {
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
if supportBatch {
err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc)
} else {
err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc)
}
return errors.Trace(err)
}
})

log.Info("total skip files due to table id not matched", zap.Int("count", skipFile))
if skipFile > 0 {
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
if cfg.PitrBatchSize == 0 {
cfg.PitrBatchSize = defaultPiTRBatchSize
}

// another goroutine is used to iterate the backup file
cfg.PitrConcurrency += 1
log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency)))
cfg.Config.Concurrency = cfg.PitrConcurrency
}
Expand Down
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_library(
":ddl_friend",
],
deps = [
"//br/pkg/lightning/common",
"//br/pkg/utils",
"//config",
"//ddl/ingest",
Expand Down
40 changes: 35 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/ingest"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -836,8 +838,11 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
}
err = bc.FinishImport(indexInfo.ID, indexInfo.Unique, tbl)
if err != nil {
if kv.ErrKeyExists.Equal(err) {
if kv.ErrKeyExists.Equal(err) || common.ErrFoundDuplicateKeys.Equal(err) {
logutil.BgLogger().Warn("[ddl] import index duplicate key, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, indexInfo, tbl.Meta())
}
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
} else {
logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err))
Expand Down Expand Up @@ -879,6 +884,22 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
}
}

func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error {
tErr, ok := errors.Cause(originErr).(*terror.Error)
if !ok {
return originErr
}
if len(tErr.Args()) != 2 {
return originErr
}
key, keyIsByte := tErr.Args()[0].([]byte)
value, valIsByte := tErr.Args()[1].([]byte)
if !keyIsByte || !valIsByte {
return originErr
}
return genKeyExistsErr(key, value, idxInfo, tblInfo)
}

func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
Expand Down Expand Up @@ -1246,7 +1267,7 @@ func newAddIndexWorker(decodeColMap map[int64]decoder.Column, id int, t table.Ph
if err != nil {
return nil, errors.Trace(err)
}
lwCtx, err = ei.NewWriterCtx(id)
lwCtx, err = ei.NewWriterCtx(id, indexInfo.Unique)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1485,17 +1506,26 @@ func (w *addIndexWorker) checkHandleExists(key kv.Key, value []byte, handle kv.H
if hasBeenBackFilled {
return nil
}
return genKeyExistsErr(key, value, idxInfo, tblInfo)
}

func genKeyExistsErr(key, value []byte, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error {
idxColLen := len(idxInfo.Columns)
indexName := fmt.Sprintf("%s.%s", tblInfo.Name.String(), idxInfo.Name.String())
colInfos := tables.BuildRowcodecColInfoForIndexColumns(idxInfo, tblInfo)
values, err := tablecodec.DecodeIndexKV(key, value, idxColLen, tablecodec.HandleNotNeeded, colInfos)
if err != nil {
return err
logutil.BgLogger().Warn("decode index key value failed", zap.String("index", indexName),
zap.String("key", hex.EncodeToString(key)), zap.String("value", hex.EncodeToString(value)), zap.Error(err))
return kv.ErrKeyExists.FastGenByArgs(key, indexName)
}
indexName := fmt.Sprintf("%s.%s", w.index.TableMeta().Name.String(), w.index.Meta().Name.String())
valueStr := make([]string, 0, idxColLen)
for i, val := range values[:idxColLen] {
d, err := tablecodec.DecodeColumnValue(val, colInfos[i].Ft, time.Local)
if err != nil {
return kv.ErrKeyExists.FastGenByArgs(key.String(), indexName)
logutil.BgLogger().Warn("decode column value failed", zap.String("index", indexName),
zap.String("key", hex.EncodeToString(key)), zap.String("value", hex.EncodeToString(value)), zap.Error(err))
return kv.ErrKeyExists.FastGenByArgs(key, indexName)
}
str, err := d.ToString()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config
adjustImportMemory(memRoot, cfg)
cfg.Checkpoint.Enable = true
if unique {
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgRecord
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgErr
} else {
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
}
Expand Down
22 changes: 17 additions & 5 deletions ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ingest
import (
"context"
"strconv"
"sync/atomic"

"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
Expand All @@ -41,6 +42,7 @@ type engineInfo struct {
writerCache generic.SyncMap[int, *backend.LocalEngineWriter]
memRoot MemRoot
diskRoot DiskRoot
rowSeq atomic.Int64
}

// NewEngineInfo create a new EngineInfo struct.
Expand Down Expand Up @@ -144,17 +146,18 @@ func (ei *engineInfo) ImportAndClean() error {
// WriterContext is used to keep a lightning local writer for each backfill worker.
type WriterContext struct {
ctx context.Context
rowSeq func() int64
lWrite *backend.LocalEngineWriter
}

func (ei *engineInfo) NewWriterCtx(id int) (*WriterContext, error) {
func (ei *engineInfo) NewWriterCtx(id int, unique bool) (*WriterContext, error) {
ei.memRoot.RefreshConsumption()
ok := ei.memRoot.CheckConsume(StructSizeWriterCtx)
if !ok {
return nil, genEngineAllocMemFailedErr(ei.memRoot, ei.jobID, ei.indexID)
}

wCtx, err := ei.newWriterContext(id)
wCtx, err := ei.newWriterContext(id, unique)
if err != nil {
logutil.BgLogger().Error(LitErrCreateContextFail, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID),
Expand All @@ -175,7 +178,7 @@ func (ei *engineInfo) NewWriterCtx(id int) (*WriterContext, error) {
// If local writer not exist, then create new one and store it into engine info writer cache.
// note: operate ei.writeCache map is not thread safe please make sure there is sync mechanism to
// make sure the safe.
func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) {
func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContext, error) {
lWrite, exist := ei.writerCache.Load(workerID)
if !exist {
var err error
Expand All @@ -186,10 +189,16 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) {
// Cache the local writer.
ei.writerCache.Store(workerID, lWrite)
}
return &WriterContext{
wc := &WriterContext{
ctx: ei.ctx,
lWrite: lWrite,
}, nil
}
if unique {
wc.rowSeq = func() int64 {
return ei.rowSeq.Add(1)
}
}
return wc, nil
}

func (ei *engineInfo) closeWriters() error {
Expand All @@ -213,6 +222,9 @@ func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error {
kvs := make([]common.KvPair, 1)
kvs[0].Key = key
kvs[0].Val = idxVal
if wCtx.rowSeq != nil {
kvs[0].RowID = wCtx.rowSeq()
}
row := kv.MakeRowsFromKvPairs(kvs)
return wCtx.lWrite.WriteRows(wCtx.ctx, nil, row)
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ error = '''
encode kv error in file %s at offset %d
'''

["Lightning:Restore:ErrFoundDuplicateKey"]
error = '''
found duplicate key '%s', value '%s'
'''

["Lightning:Restore:ErrInvalidMetaStatus"]
error = '''
invalid meta status: '%s'
Expand Down
Loading

0 comments on commit f3a19bb

Please sign in to comment.