snap_restore: added retry for recovery (#46094) #46225

Merged

96 changes: 89 additions & 7 deletions br/pkg/restore/data.go
@@ -27,6 +27,75 @@ import (
"google.golang.org/grpc/backoff"
)

type RecoveryStage int

const (
StageUnknown RecoveryStage = iota
StageCollectingMeta
StageMakingRecoveryPlan
StageResetPDAllocateID
StageRecovering
StageFlashback
)

func (s RecoveryStage) String() string {
switch s {
case StageCollectingMeta:
return "collecting meta"
case StageMakingRecoveryPlan:
return "making recovery plan"
case StageResetPDAllocateID:
return "resetting PD allocate ID"
case StageRecovering:
return "recovering"
case StageFlashback:
return "flashback"
default:
return "unknown"
}
}

type recoveryError struct {
error
atStage RecoveryStage
}

func FailedAt(err error) RecoveryStage {
if rerr, ok := err.(recoveryError); ok {
return rerr.atStage
}
return StageUnknown
}

type recoveryBackoffer struct {
state utils.RetryState
}

func newRecoveryBackoffer() *recoveryBackoffer {
return &recoveryBackoffer{
state: utils.InitialRetryState(16, 30*time.Second, 4*time.Minute),
}
}

func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
s := FailedAt(err)
switch s {
case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
return bo.state.ExponentialBackoff()
case StageFlashback:
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
bo.state.GiveUp()
return 0
}
log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
return bo.state.ExponentialBackoff()
}

func (bo *recoveryBackoffer) Attempt() int {
return bo.state.Attempt()
}

// RecoverData recovers the TiKV cluster
// 1. read all meta data from the TiKVs
// 2. make the recovery plan, then recover the max allocate ID first
@@ -35,39 +104,52 @@ import (
// 5. prepare the flashback
// 6. flashback to resolveTS
func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
// Roughly handle the case that some TiKVs are rebooted while the plan is being made.
// Generally, retrying the whole procedure is fine for most cases, but perhaps we can do finer-grained retry:
// say, we may reuse the recovery plan, and there is probably no need to rebase the PD allocation ID once it has been done.
return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
})
}

func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()

var recovery = NewRecovery(allStores, mgr, progress, concurrency)
if err := recovery.ReadRegionMeta(ctx); err != nil {
return 0, errors.Trace(err)
return 0, recoveryError{error: err, atStage: StageCollectingMeta}
}

totalRegions := recovery.GetTotalRegions()

if err := recovery.MakeRecoveryPlan(); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan}
}

log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID))
if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID}
}

// Once a TiKV shuts down and then reboots, it may be left with no leader because of the recovery mode.
// This watcher will retrigger `RecoverRegions` for those stores.
recovery.SpawnTiKVShutDownWatchers(ctx)
if err := recovery.RecoverRegions(ctx); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageRecovering}
}

if err := recovery.WaitApply(ctx); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageRecovering}
}

if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageFlashback}
}

if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageFlashback}
}

return totalRegions, nil
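The data.go changes above tag every failure with the stage it occurred at (via recoveryError and FailedAt), and the backoffer uses that tag to retry the pre-flashback stages while giving up once flashback has started. Below is a minimal, self-contained sketch of that policy; the stage names echo RecoveryStage, but the backoff bookkeeping is a simplified stand-in for utils.RetryState, not the br package's real implementation:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative stand-ins: the stage constants echo RecoveryStage above, but
// the backoff bookkeeping is a simplified substitute for utils.RetryState.
type stage int

const (
	stageUnknown stage = iota
	stageRecovering
	stageFlashback
)

type stagedError struct {
	error
	at stage
}

func failedAt(err error) stage {
	if se, ok := err.(stagedError); ok {
		return se.at
	}
	return stageUnknown
}

type stageAwareBackoffer struct {
	attemptsLeft int
	delay        time.Duration
}

// NextBackoff retries failures from earlier stages with a growing delay but
// gives up as soon as a failure is tagged with the flashback stage, which is
// the policy recoveryBackoffer implements via utils.RetryState.GiveUp.
func (b *stageAwareBackoffer) NextBackoff(err error) time.Duration {
	if failedAt(err) == stageFlashback {
		b.attemptsLeft = 0 // flashback failures are not retried
		return 0
	}
	b.attemptsLeft--
	d := b.delay
	b.delay *= 2
	return d
}

func (b *stageAwareBackoffer) Attempt() int { return b.attemptsLeft }

func main() {
	bo := &stageAwareBackoffer{attemptsLeft: 3, delay: time.Second}
	fmt.Println(bo.NextBackoff(stagedError{errors.New("tikv rebooted"), stageRecovering}))   // 1s: retryable stage
	fmt.Println(bo.NextBackoff(stagedError{errors.New("flashback failed"), stageFlashback})) // 0s: gave up
	fmt.Println(bo.Attempt())                                                                // 0: the retry loop stops here
}
```

The policy mirrors recoveryBackoffer.NextBackoff: the earlier stages back off exponentially, while a flashback-stage failure exhausts the retry budget immediately.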
4 changes: 4 additions & 0 deletions br/pkg/utils/backoff.go
@@ -72,6 +72,10 @@ func (rs *RetryState) ExponentialBackoff() time.Duration {
return backoff
}

func (rs *RetryState) GiveUp() {
rs.retryTimes = rs.maxRetry
}

// InitialRetryState make the initial state for retrying.
func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState {
return RetryState{
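GiveUp works by exhausting the remaining retry budget: once retryTimes equals maxRetry, the next Attempt() call reports no attempts left, so the retry loop stops and the accumulated error is returned. A rough sketch of that interaction, assuming Attempt() returns maxRetry - retryTimes (RetryState internals beyond this hunk are not shown in the diff):

```go
package main

import "fmt"

// Simplified stand-in for utils.RetryState, only to illustrate GiveUp's
// effect; the field names and Attempt's formula are assumptions based on
// what this diff shows.
type retryState struct {
	maxRetry   int
	retryTimes int
}

func (rs *retryState) Attempt() int { return rs.maxRetry - rs.retryTimes }
func (rs *retryState) GiveUp()      { rs.retryTimes = rs.maxRetry }

func main() {
	rs := &retryState{maxRetry: 16}
	fmt.Println(rs.Attempt()) // 16: full retry budget
	rs.GiveUp()
	fmt.Println(rs.Attempt()) // 0: `for backoffer.Attempt() > 0` in WithRetryV2 exits
}
```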
40 changes: 29 additions & 11 deletions br/pkg/utils/retry.go
@@ -34,6 +34,8 @@ var retryableServerError = []string{
// RetryableFunc presents a retryable operation.
type RetryableFunc func() error

type RetryableFuncV2[T any] func(context.Context) (T, error)

// Backoffer implements a backoff policy for retrying operations.
type Backoffer interface {
// NextBackoff returns a duration to wait before retrying again
@@ -51,21 +53,37 @@ func WithRetry(
retryableFunc RetryableFunc,
backoffer Backoffer,
) error {
_, err := WithRetryV2[struct{}](ctx, backoffer, func(ctx context.Context) (struct{}, error) {
innerErr := retryableFunc()
return struct{}{}, innerErr
})
return err
}

// WithRetryV2 retries a given operation with a backoff policy.
//
// Returns the value produced by `retryableFunc` if it succeeded at least once. Otherwise, returns a
// multierr containing all errors encountered.
// Compared with `WithRetry`, this function reorders the arguments and supports capturing the return value.
func WithRetryV2[T any](
ctx context.Context,
backoffer Backoffer,
fn RetryableFuncV2[T],
) (T, error) {
var allErrors error
for backoffer.Attempt() > 0 {
err := retryableFunc()
if err != nil {
allErrors = multierr.Append(allErrors, err)
select {
case <-ctx.Done():
return allErrors // nolint:wrapcheck
case <-time.After(backoffer.NextBackoff(err)):
}
} else {
return nil
res, err := fn(ctx)
if err == nil {
return res, nil
}
allErrors = multierr.Append(allErrors, err)
select {
case <-ctx.Done():
return *new(T), allErrors
case <-time.After(backoffer.NextBackoff(err)):
}
}
return allErrors // nolint:wrapcheck
return *new(T), allErrors // nolint:wrapcheck
}

// MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
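For callers, the new generic helper takes a Backoffer (NextBackoff and Attempt) and a function returning (T, error), and hands back the first successful value or the accumulated errors. The sketch below is illustrative usage, not br code: the constant-delay backoffer is invented, and the retry loop is a trimmed local copy (without multierr aggregation) so the example compiles on its own:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// backoffer matches the two methods WithRetryV2 needs from utils.Backoffer.
type backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

// constBackoffer is an invented example policy: a fixed delay and a fixed
// number of attempts.
type constBackoffer struct {
	delay    time.Duration
	attempts int
}

func (b *constBackoffer) NextBackoff(err error) time.Duration {
	b.attempts--
	return b.delay
}

func (b *constBackoffer) Attempt() int { return b.attempts }

// withRetryV2 is a trimmed local copy of the generic loop from retry.go so
// the example compiles on its own; the real helper also aggregates every
// error with multierr.
func withRetryV2[T any](ctx context.Context, bo backoffer, fn func(context.Context) (T, error)) (T, error) {
	var lastErr error
	for bo.Attempt() > 0 {
		res, err := fn(ctx)
		if err == nil {
			return res, nil
		}
		lastErr = err
		select {
		case <-ctx.Done():
			return *new(T), lastErr
		case <-time.After(bo.NextBackoff(err)):
		}
	}
	return *new(T), lastErr
}

func main() {
	calls := 0
	n, err := withRetryV2(context.Background(),
		&constBackoffer{delay: 10 * time.Millisecond, attempts: 3},
		func(ctx context.Context) (int, error) {
			calls++
			if calls < 3 {
				return 0, errors.New("transient failure")
			}
			return 42, nil // succeeds on the third try
		})
	fmt.Println(n, err, calls) // 42 <nil> 3
}
```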