Skip to content

Commit

Permalink
pkg/lightning: refine config for duplicate-resolution and pause GC …
Browse files Browse the repository at this point in the history
…when `duplicate-resolution` is enabled (pingcap#29249) (pingcap#29425)
  • Loading branch information
ti-srebot authored Nov 4, 2021
1 parent 64319c0 commit 9a7bcca
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 70 deletions.
14 changes: 7 additions & 7 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ func NewLocalBackend(
}

var duplicateDB *pebble.DB
if cfg.TikvImporter.DuplicateDetection {
if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
duplicateDB, err = openDuplicateDB(localFile)
if err != nil {
return backend.MakeBackend(nil), errors.Annotate(err, "open duplicate db failed")
Expand Down Expand Up @@ -970,7 +970,7 @@ func NewLocalBackend(

engineMemCacheSize: int(cfg.TikvImporter.EngineMemCacheSize),
localWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize),
duplicateDetection: cfg.TikvImporter.DuplicateDetection,
duplicateDetection: cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone,
checkTiKVAvaliable: cfg.App.CheckRequirements,
duplicateDB: duplicateDB,
errorMgr: errorMgr,
Expand Down Expand Up @@ -2139,13 +2139,13 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
}()

switch algorithm {
case config.DupeResAlgUnsafeNoop:
logger.Warn("[resolve-dupe] skipping resolution since algorithm is 'unsafe-noop'. this table will become inconsistent!")
case config.DupeResAlgRecord, config.DupeResAlgNone:
logger.Warn("[resolve-dupe] skipping resolution due to selected algorithm. this table will become inconsistent!", zap.Stringer("algorithm", algorithm))
return nil
case config.DupeResAlgKeepAnyOne:
panic("keep-any-one is not yet supported")
case config.DupeResAlgDelete:
case config.DupeResAlgRemove:
break
default:
panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm))
}

// TODO: reuse the *kv.SessionOptions from NewEncoder for picking the correct time zone.
Expand Down
75 changes: 35 additions & 40 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,26 +358,22 @@ func (cfg *MaxError) UnmarshalTOML(v interface{}) error {
type DuplicateResolutionAlgorithm int

const (
// DupeResAlgUnsafeNoop does not perform resolution. This is unsafe as the
// table will be left in an inconsistent state.
DupeResAlgUnsafeNoop DuplicateResolutionAlgorithm = iota

// DupeResAlgDelete deletes all information related to the duplicated rows.
// Users need to analyze the lightning_task_info.conflict_error_v1 table to
// add back the correct rows.
DupeResAlgDelete

// DupeResAlgKeepAnyOne keeps a single row from the any transitive set of
// duplicated rows. The choice is arbitrary.
// This algorithm is not implemented yet.
DupeResAlgKeepAnyOne
// DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v1` table on the target TiDB.
DupeResAlgRecord DuplicateResolutionAlgorithm = iota

// DupeResAlgNone doesn't detect duplicate.
DupeResAlgNone

// DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
DupeResAlgRemove
)

func (dra *DuplicateResolutionAlgorithm) UnmarshalTOML(v interface{}) error {
if val, ok := v.(string); ok {
return dra.FromStringValue(val)
}
return errors.Errorf("invalid duplicate-resolution '%v', please choose valid option between ['unsafe-noop', 'delete']", v)
return errors.Errorf("invalid duplicate-resolution '%v', please choose valid option between ['record', 'none', 'remove']", v)
}

func (dra DuplicateResolutionAlgorithm) MarshalText() ([]byte, error) {
Expand All @@ -386,12 +382,14 @@ func (dra DuplicateResolutionAlgorithm) MarshalText() ([]byte, error) {

func (dra *DuplicateResolutionAlgorithm) FromStringValue(s string) error {
switch strings.ToLower(s) {
case "unsafe-noop":
*dra = DupeResAlgUnsafeNoop
case "delete":
*dra = DupeResAlgDelete
case "record":
*dra = DupeResAlgRecord
case "none":
*dra = DupeResAlgNone
case "remove":
*dra = DupeResAlgRemove
default:
return errors.Errorf("invalid duplicate-resolution '%s', please choose valid option between ['unsafe-noop', 'delete']", s)
return errors.Errorf("invalid duplicate-resolution '%s', please choose valid option between ['record', 'none', 'remove']", s)
}
return nil
}
Expand All @@ -406,12 +404,12 @@ func (dra *DuplicateResolutionAlgorithm) UnmarshalJSON(data []byte) error {

func (dra DuplicateResolutionAlgorithm) String() string {
switch dra {
case DupeResAlgUnsafeNoop:
return "unsafe-noop"
case DupeResAlgDelete:
return "delete"
case DupeResAlgKeepAnyOne:
return "keep-any-one"
case DupeResAlgRecord:
return "record"
case DupeResAlgNone:
return "none"
case DupeResAlgRemove:
return "remove"
default:
panic(fmt.Sprintf("invalid duplicate-resolution type '%d'", dra))
}
Expand Down Expand Up @@ -511,17 +509,15 @@ type FileRouteRule struct {
}

type TikvImporter struct {
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateDetection bool `toml:"duplicate-detection" json:"duplicate-detection"`

Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
Expand Down Expand Up @@ -646,7 +642,7 @@ func NewConfig() *Config {
SendKVPairs: 32768,
RegionSplitSize: 0,
DiskQuota: ByteSize(math.MaxInt64),
DuplicateResolution: DupeResAlgDelete,
DuplicateResolution: DupeResAlgRecord,
},
PostRestore: PostRestore{
Checksum: OpLevelRequired,
Expand Down Expand Up @@ -803,7 +799,6 @@ func (cfg *Config) Adjust(ctx context.Context) error {
cfg.PostRestore.Checksum = OpLevelOff
cfg.PostRestore.Analyze = OpLevelOff
cfg.PostRestore.Compact = false
cfg.TikvImporter.DuplicateDetection = false
case BackendImporter, BackendLocal:
// RegionConcurrency > NumCPU is meaningless.
cpuCount := runtime.NumCPU()
Expand All @@ -827,8 +822,8 @@ func (cfg *Config) Adjust(ctx context.Context) error {
if err := cfg.CheckAndAdjustForLocalBackend(); err != nil {
return err
}
} else if cfg.TikvImporter.DuplicateDetection {
return errors.Errorf("invalid config: unsupported backend (%s) for duplicate-detection", cfg.TikvImporter.Backend)
} else {
cfg.TikvImporter.DuplicateResolution = DupeResAlgNone
}

if cfg.TikvImporter.Backend == BackendTiDB {
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,20 @@ func (s *configTestSuite) TestDurationMarshalJSON(c *C) {
c.Assert(string(result), Equals, `"13m20s"`)
}

func (s *configTestSuite) TestDuplicateResolutionAlgorithm(c *C) {
var dra config.DuplicateResolutionAlgorithm
dra.FromStringValue("record")
c.Assert(dra, Equals, config.DupeResAlgRecord)
dra.FromStringValue("none")
c.Assert(dra, Equals, config.DupeResAlgNone)
dra.FromStringValue("remove")
c.Assert(dra, Equals, config.DupeResAlgRemove)

c.Assert(config.DupeResAlgRecord.String(), Equals, "record")
c.Assert(config.DupeResAlgNone.String(), Equals, "none")
c.Assert(config.DupeResAlgRemove.String(), Equals, "remove")
}

func (s *configTestSuite) TestLoadConfig(c *C) {
cfg, err := config.LoadGlobalConfig([]string{"-tidb-port", "sss"}, nil)
c.Assert(err, ErrorMatches, `invalid value "sss" for flag -tidb-port: parse error`)
Expand Down
114 changes: 114 additions & 0 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ import (
"sync"
"time"

"github.com/coreos/go-semver/semver"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/importer"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -54,6 +57,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/collate"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -113,6 +117,11 @@ const (
compactionUpperThreshold = 32 * units.GiB
)

var (
minTiKVVersionForDuplicateResolution = *semver.New("5.2.0")
maxTiKVVersionForDuplicateResolution = version.NextMajorVersion()
)

// DeliverPauser is a shared pauser to pause progress to (*chunkRestore).encodeLoop
var DeliverPauser = common.NewPauser()

Expand Down Expand Up @@ -346,6 +355,17 @@ func NewRestoreControllerWithPauser(
maxOpenFiles = math.MaxInt32
}

if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
if err := tikv.CheckTiKVVersion(ctx, tls, cfg.TiDB.PdAddr, minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
if berrors.Is(err, berrors.ErrVersionMismatch) {
log.L().Warn("TiKV version doesn't support duplicate resolution. The resolution algorithm will fall back to 'none'", zap.Error(err))
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
} else {
return nil, errors.Annotate(err, "check TiKV version for duplicate resolution failed")
}
}
}

backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr)
if err != nil {
return nil, errors.Annotate(err, "build local backend failed")
Expand Down Expand Up @@ -1236,7 +1256,101 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s

var checksumManagerKey struct{}

const (
pauseGCTTLForDupeRes = time.Hour
pauseGCIntervalForDupeRes = time.Minute
)

func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}
defer pdCli.Close()

serviceID := "lightning-duplicate-resolution-" + uuid.New().String()
ttl := int64(pauseGCTTLForDupeRes / time.Second)

var (
safePoint uint64
paused bool
)
// Try to get the minimum safe point across all services as our GC safe point.
for i := 0; i < 10; i++ {
if i > 0 {
time.Sleep(time.Second * 3)
}
minSafePoint, err := pdCli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, 1)
if err != nil {
return nil, errors.Trace(err)
}
newMinSafePoint, err := pdCli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, minSafePoint)
if err != nil {
return nil, errors.Trace(err)
}
if newMinSafePoint <= minSafePoint {
safePoint = minSafePoint
paused = true
break
}
log.L().Warn(
"Failed to register GC safe point because the current minimum safe point is newer"+
" than what we assume, will retry newMinSafePoint next time",
zap.Uint64("minSafePoint", minSafePoint),
zap.Uint64("newMinSafePoint", newMinSafePoint),
)
}
if !paused {
return nil, errors.New("failed to pause GC for duplicate resolution after all retries")
}

exitCh := make(chan struct{})
go func(safePoint uint64) {
defer close(exitCh)
ticker := time.NewTicker(pauseGCIntervalForDupeRes)
defer ticker.Stop()
for {
select {
case <-ticker.C:
minSafePoint, err := pdCli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, safePoint)
if err != nil {
log.L().Warn("Failed to register GC safe point", zap.Error(err))
continue
}
if minSafePoint > safePoint {
log.L().Warn("The current minimum safe point is newer than what we hold, duplicate records are at"+
"risk of being GC and not detectable",
zap.Uint64("safePoint", safePoint),
zap.Uint64("minSafePoint", minSafePoint),
)
safePoint = minSafePoint
}
case <-ctx.Done():
if _, err := pdCli.UpdateServiceGCSafePoint(ctx, serviceID, 0, safePoint); err != nil {
log.L().Warn("Failed to reset safe point ttl to zero", zap.Error(err))
}
return
}
}
}(safePoint)
return exitCh, nil
}

func (rc *Controller) restoreTables(ctx context.Context) error {
if rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
subCtx, cancel := context.WithCancel(ctx)
exitCh, err := rc.keepPauseGCForDupeRes(subCtx)
if err != nil {
cancel()
return errors.Trace(err)
}
defer func() {
cancel()
<-exitCh
}()
}

logTask := log.L().Begin(zap.InfoLevel, "restore all tables data")
if rc.tableWorkers == nil {
rc.tableWorkers = worker.NewPool(ctx, rc.cfg.App.TableConcurrency, "table")
Expand Down
12 changes: 4 additions & 8 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,14 +716,12 @@ func (tr *TableRestore) postProcess(

// 4.5. do duplicate detection.
hasDupe := false
if rc.cfg.TikvImporter.DuplicateDetection {
if rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
var err error
hasLocalDupe, err := rc.backend.CollectLocalDuplicateRows(ctx, tr.encTable, tr.tableName)
if err != nil {
tr.logger.Error("collect local duplicate keys failed", log.ShortError(err))
if rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgUnsafeNoop {
return false, err
}
return false, err
} else {
hasDupe = hasLocalDupe
}
Expand All @@ -738,13 +736,11 @@ func (tr *TableRestore) postProcess(
return true, nil
}

if needRemoteDupe && rc.cfg.TikvImporter.DuplicateDetection {
if needRemoteDupe && rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
hasRemoteDupe, e := rc.backend.CollectRemoteDuplicateRows(ctx, tr.encTable, tr.tableName)
if e != nil {
tr.logger.Error("collect remote duplicate keys failed", log.ShortError(e))
if rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgUnsafeNoop {
return false, e
}
return false, e
} else {
hasDupe = hasDupe || hasRemoteDupe
}
Expand Down
3 changes: 1 addition & 2 deletions br/tests/lightning_duplicate_detection/config1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ task-info-schema-name = 'lightning_task_info'

[tikv-importer]
backend = "local"
duplicate-detection = true
duplicate-resolution = 'unsafe-noop'
duplicate-resolution = 'record'

[checkpoint]
enable = true
Expand Down
3 changes: 1 addition & 2 deletions br/tests/lightning_duplicate_detection/config2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ task-info-schema-name = 'lightning_task_info'

[tikv-importer]
backend = "local"
duplicate-detection = true
duplicate-resolution = 'unsafe-noop'
duplicate-resolution = 'record'

[checkpoint]
enable = true
Expand Down
Loading

0 comments on commit 9a7bcca

Please sign in to comment.