This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

restore: update and restore GCLifeTime once when parallel (#220)
* restore: update and restore GCLifeTime once when parallel

* Update lightning/restore/restore.go

Co-Authored-By: amyangfei <[email protected]>

* Update lightning/restore/restore.go

Co-Authored-By: amyangfei <[email protected]>

* Update lightning/restore/restore.go

Co-Authored-By: amyangfei <[email protected]>

* restore: fix bug in reviewing

* restore: call ObtainGCLifeTime closer to use

* restore: improve readability

* restore: reduce struct copy and pointer deref

* restore: fix bug in reviewing

* Update lightning/restore/restore.go

Co-Authored-By: kennytm <[email protected]>

* restore: improve readability

* restore: refine mock expectation order

* restore: adjust detail
lance6716 committed Aug 7, 2019
1 parent ae59251 commit e1376e0
Showing 2 changed files with 196 additions and 52 deletions.
121 changes: 84 additions & 37 deletions lightning/restore/restore.go
@@ -79,8 +79,6 @@ var (
 // DeliverPauser is a shared pauser to pause progress to (*chunkRestore).encodeLoop
 var DeliverPauser = common.NewPauser()
 
-var gcLifeTimeKey struct{}
-
 func init() {
     cfg := tidbcfg.GetGlobalConfig()
     cfg.Log.SlowThreshold = 3000
@@ -476,6 +474,66 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
     }
 }
 
+type gcLifeTimeManager struct {
+    runningJobsLock sync.Mutex
+    runningJobs     int
+    oriGCLifeTime   string
+}
+
+func newGCLifeTimeManager() *gcLifeTimeManager {
+    // Default values of the three fields are enough to initialize this struct
+    return &gcLifeTimeManager{}
+}
+
+// Pre- and post-condition:
+// if m.runningJobs == 0, GC life time has not been increased.
+// if m.runningJobs > 0, GC life time has been increased.
+// m.runningJobs won't be negative (overflow) since index concurrency is relatively small.
+func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error {
+    m.runningJobsLock.Lock()
+    defer m.runningJobsLock.Unlock()
+
+    if m.runningJobs == 0 {
+        oriGCLifeTime, err := ObtainGCLifeTime(ctx, db)
+        if err != nil {
+            return err
+        }
+        m.oriGCLifeTime = oriGCLifeTime
+        err = increaseGCLifeTime(ctx, db)
+        if err != nil {
+            return err
+        }
+    }
+    m.runningJobs += 1
+    return nil
+}
+
+// Pre- and post-condition:
+// if m.runningJobs == 0, an attempt has been made to restore the GC life time; if it fails, a warning is printed.
+// if m.runningJobs > 0, GC life time has not been restored.
+// m.runningJobs won't go negative since each removeOneJob follows a successful addOneJob.
+func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) {
+    m.runningJobsLock.Lock()
+    defer m.runningJobsLock.Unlock()
+
+    m.runningJobs -= 1
+    if m.runningJobs == 0 {
+        err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime)
+        if err != nil {
+            query := fmt.Sprintf(
+                "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'",
+                m.oriGCLifeTime,
+            )
+            log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed",
+                zap.String("query", query),
+                log.ShortError(err),
+            )
+        }
+    }
+}
+
+var gcLifeTimeKey struct{}
+
 func (rc *RestoreController) restoreTables(ctx context.Context) error {
     logTask := log.L().Begin(zap.InfoLevel, "restore all tables data")
 
@@ -492,11 +550,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
     }
     taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
     defer close(taskCh)
-    oriGCLifeTime, err := ObtainGCLifeTime(ctx, rc.tidbMgr.db)
-    if err != nil {
-        return err
-    }
-    ctx2 := context.WithValue(ctx, &gcLifeTimeKey, oriGCLifeTime)
+
+    manager := newGCLifeTimeManager()
+    ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager)
     for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
         go func() {
             for task := range taskCh {
@@ -548,7 +604,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
     wg.Wait()
     stopPeriodicActions <- struct{}{}
 
-    err = restoreErr.Get()
+    err := restoreErr.Get()
     logTask.End(zap.ErrorLevel, err)
     return err
 }
@@ -1332,21 +1388,18 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBStore
 // DoChecksum do checksum for tables.
 // table should be in <db>.<table>, format. e.g. foo.bar
 func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
-    ori, err := increaseGCLifeTime(ctx, db)
-    if err != nil {
-        return nil, errors.Trace(err)
+    var err error
+    manager, ok := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager)
+    if !ok {
+        return nil, errors.New("No gcLifeTimeManager found in context, check context initialization")
     }
+
+    if err = manager.addOneJob(ctx, db); err != nil {
+        return nil, err
+    }
+
     // set it back finally
-    defer func() {
-        err := UpdateGCLifeTime(ctx, db, ori)
-        if err != nil {
-            query := fmt.Sprintf("UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", ori)
-            log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed",
-                zap.String("query", query),
-                log.ShortError(err),
-            )
-        }
-    }()
+    defer manager.removeOneJob(ctx, db)
 
     task := log.With(zap.String("table", table)).Begin(zap.InfoLevel, "remote checksum")
 
@@ -1370,25 +1423,19 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
     return &cs, nil
 }
 
-func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) {
+func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) {
     // checksum command usually takes a long time to execute,
     // so here need to increase the gcLifeTime for single transaction.
-    // try to get gcLifeTime from context first.
-    gcLifeTime, ok := ctx.Value(&gcLifeTimeKey).(string)
-    if !ok {
-        oriGCLifeTime, err = ObtainGCLifeTime(ctx, db)
-        if err != nil {
-            return "", err
-        }
-    } else {
-        oriGCLifeTime = gcLifeTime
-    }
 
+    // try to get gcLifeTimeManager from context first.
+    // DoChecksum has assured this getting action succeeds.
+    manager, _ := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager)
+
     var increaseGCLifeTime bool
-    if oriGCLifeTime != "" {
-        ori, err := time.ParseDuration(oriGCLifeTime)
+    if manager.oriGCLifeTime != "" {
+        ori, err := time.ParseDuration(manager.oriGCLifeTime)
         if err != nil {
-            return "", errors.Trace(err)
+            return errors.Trace(err)
         }
         if ori < defaultGCLifeTime {
             increaseGCLifeTime = true
@@ -1400,13 +1447,13 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) {
     if increaseGCLifeTime {
         err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String())
         if err != nil {
-            return "", errors.Trace(err)
+            return err
         }
     }
 
     failpoint.Inject("IncreaseGCUpdateDuration", nil)
 
-    return oriGCLifeTime, nil
+    return nil
 }
 
 ////////////////////////////////////////////////////////////////
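
Taken together, the restore.go changes replace the per-checksum obtain/raise/restore of tikv_gc_life_time with a reference-counted manager: the first concurrent checksum job saves and raises the value, and only the last one to finish restores it. Below is a minimal standalone sketch of that pattern; the fakeStore type and the duration strings are illustrative assumptions, whereas the real code runs the ObtainGCLifeTime/UpdateGCLifeTime SQL helpers against mysql.tidb.

package main

import (
    "fmt"
    "sync"
)

// fakeStore stands in for the database holding tikv_gc_life_time.
type fakeStore struct{ gcLifeTime string }

type gcLifeTimeManager struct {
    mu          sync.Mutex
    runningJobs int
    oriValue    string
}

// addOneJob saves and raises the GC life time only for the first running job.
func (m *gcLifeTimeManager) addOneJob(s *fakeStore) {
    m.mu.Lock()
    defer m.mu.Unlock()
    if m.runningJobs == 0 {
        m.oriValue = s.gcLifeTime // remember the original value once
        s.gcLifeTime = "100h0m0s" // raise it for the whole batch
    }
    m.runningJobs++
}

// removeOneJob restores the saved value only when the last job finishes.
func (m *gcLifeTimeManager) removeOneJob(s *fakeStore) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.runningJobs--
    if m.runningJobs == 0 {
        s.gcLifeTime = m.oriValue
    }
}

func main() {
    store := &fakeStore{gcLifeTime: "10m0s"}
    m := &gcLifeTimeManager{}

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            m.addOneJob(store)
            defer m.removeOneJob(store)
            // ... one checksum job would run here ...
        }()
    }
    wg.Wait()
    fmt.Println(store.gcLifeTime) // prints "10m0s": restored after the last job finishes
}

N parallel jobs therefore cause exactly one raise and one restore of the GC life time, instead of one pair per checksum as before.
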
(Diff for the second changed file, 112 additions and 15 deletions, did not render on this page.)
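
One of the squashed commits above is "restore: refine mock expectation order". With DATA-DOG/go-sqlmock, a common way to mock *sql.DB in Go tests, expectations are matched in the order they are declared by default, so a test that previously expected an obtain/raise/restore round-trip per checksum has to be reordered once those statements run only once per batch. The following is a generic, hedged illustration of ordered expectations; the SQL text and the flow are assumptions, not this repository's test code.

package example

import (
    "context"
    "testing"

    "github.com/DATA-DOG/go-sqlmock"
)

func TestOrderedExpectations(t *testing.T) {
    db, mock, err := sqlmock.New()
    if err != nil {
        t.Fatal(err)
    }
    defer db.Close()

    // go-sqlmock verifies expectations in declaration order by default, so the
    // expected sequence must mirror the new code path: obtain the GC life time
    // once, raise it once, then restore it once after all jobs finish.
    mock.ExpectQuery("SELECT VARIABLE_VALUE FROM mysql\\.tidb").
        WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m0s"))
    mock.ExpectExec("UPDATE mysql\\.tidb SET VARIABLE_VALUE").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectExec("UPDATE mysql\\.tidb SET VARIABLE_VALUE").
        WillReturnResult(sqlmock.NewResult(0, 1))

    // Exercise the mocked connection in the same order.
    ctx := context.Background()
    var v string
    if err := db.QueryRowContext(ctx, "SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'").Scan(&v); err != nil {
        t.Fatal(err)
    }
    if _, err := db.ExecContext(ctx, "UPDATE mysql.tidb SET VARIABLE_VALUE = '100h' WHERE VARIABLE_NAME = 'tikv_gc_life_time'"); err != nil {
        t.Fatal(err)
    }
    if _, err := db.ExecContext(ctx, "UPDATE mysql.tidb SET VARIABLE_VALUE = '"+v+"' WHERE VARIABLE_NAME = 'tikv_gc_life_time'"); err != nil {
        t.Fatal(err)
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Error(err)
    }
}
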
