post-process: support running table analyze after all tables are finished (pingcap#509)

* batch split with limit

* batch split with limit

* update

* add log with split region failed

* set batch split size to 2048

* add delay to retry split region

* set outer loop retry split regions to a bigger value

* update

* add retry for region scatter

* update br

* wait some time before retry scatter region

* add start/end key to log if scan region failed

* update br

* fix session

* work around a panic

* fix unit test

* support analyze at last

* fix

* fix

* fix

* better naming and add some comments

Co-authored-by: lance6716 <[email protected]>
glorv and lance6716 authored Dec 22, 2020
1 parent c2b6dde commit 7622f91
Showing 3 changed files with 71 additions and 21 deletions.
1 change: 1 addition & 0 deletions lightning/config/config.go
@@ -208,6 +208,7 @@ type PostRestore struct {
Compact bool `toml:"compact" json:"compact"`
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Analyze PostOpLevel `toml:"analyze" json:"analyze"`
AnalyzeAtLast bool `toml:"analyze-at-last" json:"analyze-at-last"`
}

type CSVConfig struct {
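
The new analyze-at-last key is wired to the AnalyzeAtLast field through the struct tags above. Below is a rough, self-contained sketch of how such a config could be decoded; the [post-restore] section name matches tidb-lightning.toml further down, but the use of BurntSushi/toml and the simplified string-typed Analyze field are assumptions for illustration, not lightning's actual config loader.

package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// PostRestore mirrors only the fields relevant to this change; the real
// Analyze field is a PostOpLevel, simplified to a string here.
type PostRestore struct {
	Analyze       string `toml:"analyze"`
	AnalyzeAtLast bool   `toml:"analyze-at-last"`
}

type Config struct {
	PostRestore PostRestore `toml:"post-restore"`
}

func main() {
	data := `
[post-restore]
analyze = "required"
analyze-at-last = true
`
	var cfg Config
	if _, err := toml.Decode(data, &cfg); err != nil {
		panic(err)
	}
	// With analyze-at-last enabled, per-table analyze is deferred to the end of the import.
	fmt.Println(cfg.PostRestore.AnalyzeAtLast) // true
}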
89 changes: 68 additions & 21 deletions lightning/restore/restore.go
@@ -683,16 +683,24 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}
}

type task struct {
tr *TableRestore
cp *TableCheckpoint
}

totalTables := 0
for _, dbMeta := range rc.dbMetas {
totalTables += len(dbMeta.Tables)
}
postProcessTaskChan := make(chan task, totalTables)

var wg sync.WaitGroup
var restoreErr common.OnceError

stopPeriodicActions := make(chan struct{})
go rc.runPeriodicActions(ctx, stopPeriodicActions)
defer close(stopPeriodicActions)

type task struct {
tr *TableRestore
cp *TableCheckpoint
}
taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
defer close(taskCh)

Expand All @@ -706,12 +714,15 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
for task := range taskCh {
tableLogTask := task.tr.logger.Begin(zap.InfoLevel, "restore table")
web.BroadcastTableCheckpoint(task.tr.tableName, task.cp)
err := task.tr.restoreTable(ctx2, rc, task.cp)
needPostProcess, err := task.tr.restoreTable(ctx2, rc, task.cp)
err = errors.Annotatef(err, "restore table %s failed", task.tr.tableName)
tableLogTask.End(zap.ErrorLevel, err)
web.BroadcastError(task.tr.tableName, err)
metric.RecordTableCount("completed", err)
restoreErr.Set(err)
if needPostProcess {
postProcessTaskChan <- task
}
wg.Done()
}
}()
@@ -822,7 +833,32 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}

wg.Wait()
close(stopPeriodicActions)
// if the context is done, return directly
select {
case <-ctx.Done():
err = restoreErr.Get()
if err == nil {
err = ctx.Err()
}
logTask.End(zap.ErrorLevel, err)
return err
default:
}

close(postProcessTaskChan)
// otherwise, run all the remaining tasks in the post-process task channel
for i := 0; i < rc.cfg.App.TableConcurrency; i++ {
wg.Add(1)
go func() {
for task := range postProcessTaskChan {
// force all the remaining post-process tasks to be executed
_, err := task.tr.postProcess(ctx, rc, task.cp, true)
restoreErr.Set(err)
}
wg.Done()
}()
}
wg.Wait()

err = restoreErr.Get()
logTask.End(zap.ErrorLevel, err)
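
The restoreTables changes above boil down to a hand-off pattern: each restore worker that defers a table's analyze pushes it into postProcessTaskChan (buffered to the total table count, so sends never block), and once every table has finished the channel is closed and a pool of goroutines drains it with forceAnalyze set to true. A minimal standalone sketch of that pattern, using placeholder types instead of lightning's TableRestore/TableCheckpoint, might look like this:

package main

import (
	"fmt"
	"sync"
)

// task is a stand-in for lightning's (TableRestore, TableCheckpoint) pair.
type task struct{ name string }

func main() {
	tables := []task{{"db.a"}, {"db.b"}, {"db.c"}}

	// Buffer with the total table count so restore workers never block
	// when handing a table over for deferred post-processing.
	postProcessCh := make(chan task, len(tables))

	var wg sync.WaitGroup
	for _, t := range tables {
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			fmt.Println("restore", t.name)
			// Pretend this table deferred its analyze phase.
			postProcessCh <- t
		}(t)
	}
	wg.Wait()

	// All tables restored: stop accepting tasks, then drain the remainder
	// with a small worker pool (mirrors forceAnalyze = true).
	close(postProcessCh)
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range postProcessCh {
				fmt.Println("analyze", t.name)
			}
		}()
	}
	wg.Wait()
}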
@@ -833,12 +869,12 @@ func (t *TableRestore) restoreTable(
ctx context.Context,
rc *RestoreController,
cp *TableCheckpoint,
) error {
) (bool, error) {
// 1. Load the table info.

select {
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
default:
}

@@ -850,10 +886,10 @@
)
} else if cp.Status < CheckpointStatusAllWritten {
if err := t.populateChunks(ctx, rc, cp); err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}
if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, t.tableName, cp.Engines); err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}
web.BroadcastTableCheckpoint(t.tableName, cp)

@@ -876,11 +912,11 @@ func (t *TableRestore) restoreTable(
// 2. Restore engines (if still needed)
err := t.restoreEngines(ctx, rc, cp)
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}

// 3. Post-process
return errors.Trace(t.postProcess(ctx, rc, cp))
// 3. Post-process. With the last parameter set to false, the analyze phase can be deferred and executed later
return t.postProcess(ctx, rc, cp, false /* force-analyze */)
}

func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController, cp *TableCheckpoint) error {
@@ -1181,12 +1217,16 @@ func (t *TableRestore) importEngine(
return nil
}

func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, cp *TableCheckpoint) error {
// postProcess executes rebase-auto-id/checksum/analyze according to the task config.
//
// If the parameter forceAnalyze is true, postProcess runs analyze even if the analyze-at-last config is true.
// If the analyze phase is deferred because of the analyze-at-last config, the first return value will be true.
func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, cp *TableCheckpoint, forceAnalyze bool) (bool, error) {
// there is no data in this table, so there is no need to do post-process
// this is important for tables that are just the dump tables of views
// because at this stage, the table has already been deleted and replaced by the related view
if len(cp.Engines) == 1 {
return nil
return false, nil
}

// 3. alter table set auto_increment
@@ -1203,15 +1243,16 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c
rc.alterTableLock.Unlock()
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusAlteredAutoInc)
if err != nil {
return err
return false, err
}
cp.Status = CheckpointStatusAlteredAutoInc
}

// the tidb backend doesn't need checksum & analyze
if !rc.backend.ShouldPostProcess() {
t.logger.Debug("skip checksum & analyze, not supported by this backend")
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusAnalyzeSkipped)
return nil
return false, nil
}

// 4. do table checksum
@@ -1238,17 +1279,20 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c
}
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusChecksummed)
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}
}
cp.Status = CheckpointStatusChecksummed
}

// 5. do table analyze
finished := true
if cp.Status < CheckpointStatusAnalyzed {
if rc.cfg.PostRestore.Analyze == config.OpLevelOff {
t.logger.Info("skip analyze")
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusAnalyzeSkipped)
} else {
cp.Status = CheckpointStatusAnalyzed
} else if forceAnalyze || !rc.cfg.PostRestore.AnalyzeAtLast {
err := t.analyzeTable(ctx, rc.tidbGlue.GetSQLExecutor())
// with post restore level 'optional', we will skip the analyze error
if rc.cfg.PostRestore.Analyze == config.OpLevelOptional {
@@ -1259,12 +1303,15 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c
}
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusAnalyzed)
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}
cp.Status = CheckpointStatusAnalyzed
} else {
finished = false
}
}

return nil
return !finished, nil
}

// do full compaction for the whole data.
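
Condensed, the analyze decision postProcess now makes is: skip entirely when analyze is off, run right away when forceAnalyze is set or analyze-at-last is disabled, and otherwise report the table as unfinished so the final pass picks it up. A hedged sketch of that decision, as a free-standing helper rather than lightning's actual API:

// shouldAnalyzeNow condenses the analyze branch added to postProcess; the
// function and parameter names are illustrative, not part of lightning.
func shouldAnalyzeNow(analyzeOff, analyzeAtLast, forceAnalyze bool) (runNow, deferred bool) {
	if analyzeOff {
		return false, false // skipped for good, nothing left for a later pass
	}
	if forceAnalyze || !analyzeAtLast {
		return true, false // analyze immediately after this table finishes
	}
	return false, true // hand the table to the final post-process pass
}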
2 changes: 2 additions & 0 deletions tidb-lightning.toml
@@ -245,6 +245,8 @@ level-1-compact = false
# if set true, compact will do full compaction to tikv data.
# if this setting is missing, the default value is false.
compact = false
# if set to true, lightning will analyze all tables together after all of them have been imported
analyze-at-last = false

# cron performs some periodic actions in background
[cron]
