Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: remove totalFlashbackRegions since task completion region already contains all regions (#40504) #41433

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 15 additions & 25 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package restore
import (
"context"
"io"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -80,29 +79,27 @@ func NewStoreMeta(storeId uint64) StoreMeta {

// for test
type Recovery struct {
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
totalFlashbackRegions uint64
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
}

func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery {
totalStores := len(allStores)
var StoreMetas = make([]StoreMeta, totalStores)
var regionRecovers = make(map[uint64][]*recovpb.RecoverRegionRequest, totalStores)
return Recovery{
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency,
totalFlashbackRegions: 0}
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency}
}

func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
Expand Down Expand Up @@ -332,14 +329,8 @@ func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolve

// flashback the region data to version resolveTS
func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error) {
var completedRegions atomic.Uint64

// only know the total progress of tikv, progress is total state of the whole restore flow.
ratio := int(recovery.totalFlashbackRegions) / len(recovery.allStores)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, commitTS-1, commitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}

Expand All @@ -354,13 +345,12 @@ func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint
return errors.Trace(err)
}

recovery.progress.IncBy(int64(completedRegions.Load()) / int64(ratio))

log.Info("region flashback complete",
zap.Uint64("resolveTS", resolveTS),
zap.Uint64("commitTS", commitTS),
zap.Int("regions", runner.CompletedRegions()))

recovery.progress.Inc()
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
}

log.Debug("total tikv", zap.Int("total", numBackupStore), zap.String("progress file", cfg.ProgressFile))
// progress = read meta + send recovery + iterate tikv + flashback.
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*4), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*4), cfg.ProgressFile)
// progress = read meta + send recovery + iterate tikv + (1 * prepareflashback + 1 * flashback)
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*3+2), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*3+2), cfg.ProgressFile)

// restore tikv data from a snapshot volume
var totalRegions int
Expand Down