ebs_br: allow temporary TiKV unreachable during starting snapshot backup (#49154)

close #49152, close #49153
YuJuncen authored Jan 15, 2024
1 parent 186f08c commit ac71239
Showing 11 changed files with 397 additions and 36 deletions.
19 changes: 15 additions & 4 deletions br/pkg/pdutil/pd.go
@@ -243,6 +243,8 @@ type PdController struct {

// control the pause schedulers goroutine
schedulerPauseCh chan struct{}
// control the ttl of pausing schedulers
SchedulerPauseTTL time.Duration
}

// NewPdController creates a new PdController.
@@ -447,17 +449,19 @@ func (p *PdController) getStoreInfoWith(
func (p *PdController) doPauseSchedulers(ctx context.Context,
schedulers []string, post pdHTTPRequest) ([]string, error) {
// Pause this scheduler for the configured TTL (300 seconds by default).
body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout.Seconds())})
body, err := json.Marshal(pauseSchedulerBody{Delay: int64(p.ttlOfPausing().Seconds())})
if err != nil {
return nil, errors.Trace(err)
}
// PauseSchedulers removes PD schedulers temporarily.
removedSchedulers := make([]string, 0, len(schedulers))
for _, scheduler := range schedulers {
for _, addr := range p.getAllPDAddrs() {
_, err = post(ctx, addr, pdhttp.SchedulerByName(scheduler), p.cli, http.MethodPost, body)
var resp []byte
resp, err = post(ctx, addr, pdhttp.SchedulerByName(scheduler), p.cli, http.MethodPost, body)
if err == nil {
removedSchedulers = append(removedSchedulers, scheduler)
log.Info("Paused scheduler.", zap.String("response", string(resp)), zap.String("on", addr))
break
}
}
@@ -492,7 +496,7 @@ func (p *PdController) pauseSchedulersAndConfigWith(
}

go func() {
tick := time.NewTicker(pauseTimeout / 3)
tick := time.NewTicker(p.ttlOfPausing() / 3)
defer tick.Stop()

for {
@@ -637,7 +641,7 @@ func (p *PdController) doUpdatePDScheduleConfig(

func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error {
// Pause these configs for the configured TTL (300 seconds by default).
return p.doUpdatePDScheduleConfig(ctx, cfg, post, pdhttp.ConfigWithTTLSeconds(pauseTimeout.Seconds()))
return p.doUpdatePDScheduleConfig(ctx, cfg, post, pdhttp.ConfigWithTTLSeconds(p.ttlOfPausing().Seconds()))
}

func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg ClusterConfig,
@@ -1069,6 +1073,13 @@ func (p *PdController) Close() {
}
}

func (p *PdController) ttlOfPausing() time.Duration {
if p.SchedulerPauseTTL > 0 {
return p.SchedulerPauseTTL
}
return pauseTimeout
}

// FetchPDVersion gets the PD version.
func FetchPDVersion(ctx context.Context, pdHTTPCli pdhttp.Client) (*semver.Version, error) {
ver, err := pdHTTPCli.GetPDVersion(ctx)
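The new SchedulerPauseTTL field makes the scheduler-pause duration injectable, while ttlOfPausing() keeps the historical default when the field is left unset. Below is a minimal standalone sketch of that fallback pattern; the 300-second constant stands in for the package-level pauseTimeout and is an assumption based on the comments in this file.

    package main

    import (
        "fmt"
        "time"
    )

    // Stand-in for the package-level default used by br/pkg/pdutil.
    const pauseTimeout = 300 * time.Second

    type PdController struct {
        // SchedulerPauseTTL, when positive, overrides the default pause TTL.
        SchedulerPauseTTL time.Duration
    }

    func (p *PdController) ttlOfPausing() time.Duration {
        if p.SchedulerPauseTTL > 0 {
            return p.SchedulerPauseTTL
        }
        return pauseTimeout
    }

    func main() {
        ctl := &PdController{}
        fmt.Println(ctl.ttlOfPausing()) // 5m0s: the default

        ctl.SchedulerPauseTTL = 2 * time.Minute // what the operator command now sets from its TTL
        fmt.Println(ctl.ttlOfPausing())         // 2m0s: the override wins
    }

Keeping the field exported, rather than adding a constructor parameter, lets callers such as AdaptEnvForSnapshotBackup opt in without touching existing NewPdController call sites.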
2 changes: 2 additions & 0 deletions br/pkg/task/operator/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/pdutil",
"//br/pkg/task",
@@ -18,6 +19,7 @@
"@com_github_spf13_pflag//:pflag",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)
110 changes: 80 additions & 30 deletions br/pkg/task/operator/cmd.go
@@ -5,16 +5,21 @@ package operator
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"os"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/keepalive"
@@ -38,13 +43,16 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error)
}

func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) {
_ = cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil })
cx.cleanUpWithRetErr(nil, func(ctx context.Context) error { f(ctx); return nil })
}

func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error {
func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithRetErr(errOut *error, f func(ctx context.Context) error) {
ctx, cancel := context.WithTimeout(context.Background(), cx.cfg.TTL)
defer cancel()
return f(ctx)
err := f(ctx)
if errOut != nil {
*errOut = multierr.Combine(*errOut, err)
}
}

type AdaptEnvForSnapshotBackupContext struct {
@@ -58,6 +66,18 @@ type AdaptEnvForSnapshotBackupContext struct {
runGrp *errgroup.Group
}

func (cx *AdaptEnvForSnapshotBackupContext) Close() {
cx.pdMgr.Close()
cx.kvMgr.Close()
}

func (cx *AdaptEnvForSnapshotBackupContext) GetBackOffer(operation string) utils.Backoffer {
state := utils.InitialRetryState(64, 1*time.Second, 10*time.Second)
bo := utils.GiveUpRetryOn(&state, berrors.ErrPossibleInconsistency)
bo = utils.VerboseRetry(bo, logutil.CL(cx).With(zap.String("operation", operation)))
return bo
}

func (cx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) {
logutil.CL(cx).Info("Stage ready.", append(notes, zap.String("component", name))...)
cx.rdGrp.Done()
@@ -77,6 +97,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
if err != nil {
return errors.Annotate(err, "failed to dial PD")
}
mgr.SchedulerPauseTTL = cfg.TTL
var tconf *tls.Config
if cfg.TLS.IsEnabled() {
tconf, err = cfg.TLS.ToTLSConfig()
@@ -97,73 +118,102 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
rdGrp: sync.WaitGroup{},
runGrp: eg,
}
defer cx.Close()

cx.rdGrp.Add(3)

eg.Go(func() error { return pauseGCKeeper(cx) })
eg.Go(func() error { return pauseSchedulerKeeper(cx) })
eg.Go(func() error { return pauseImporting(cx) })
go func() {
cx.rdGrp.Wait()
if cfg.OnAllReady != nil {
cfg.OnAllReady()
}
hintAllReady()
}()
defer func() {
if cfg.OnExit != nil {
cfg.OnExit()
}
}()

return eg.Wait()
}

func getCallerName() string {
name, err := os.Hostname()
if err != nil {
name = fmt.Sprintf("UNKNOWN-%d", rand.Int63())
}
return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid())
}

func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error {
denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr)
if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil {
suspendLightning := utils.NewSuspendImporting(getCallerName(), cx.kvMgr)
_, err := utils.WithRetryV2(cx, cx.GetBackOffer("suspend_lightning"), func(_ context.Context) (map[uint64]bool, error) {
return suspendLightning.DenyAllStores(cx, cx.cfg.TTL)
})
if err != nil {
return errors.Trace(err)
}
cx.ReadyL("pause_lightning")
cx.runGrp.Go(func() error {
err := denyLightning.Keeper(cx, cx.cfg.TTL)
cx.runGrp.Go(func() (err error) {
defer cx.cleanUpWithRetErr(&err, func(ctx context.Context) error {
if ctx.Err() != nil {
return errors.Annotate(ctx.Err(), "cleaning up timed out")
}
res, err := utils.WithRetryV2(ctx, cx.GetBackOffer("restore_lightning"),
func(ctx context.Context) (map[uint64]bool, error) { return suspendLightning.AllowAllStores(ctx) })
if err != nil {
return errors.Annotatef(err, "failed to allow all stores")
}
return suspendLightning.ConsistentWithPrev(res)
})

err = suspendLightning.Keeper(cx, cx.cfg.TTL)
if errors.Cause(err) != context.Canceled {
logutil.CL(cx).Warn("keeper encounters error.", logutil.ShortError(err))
return err
}
return cx.cleanUpWithErr(func(ctx context.Context) error {
for {
if ctx.Err() != nil {
return errors.Annotate(ctx.Err(), "cleaning up timed out")
}
res, err := denyLightning.AllowAllStores(ctx)
if err != nil {
logutil.CL(ctx).Warn("Failed to restore lightning, will retry.", logutil.ShortError(err))
// Retry for 10 times.
time.Sleep(cx.cfg.TTL / 10)
continue
}
return denyLightning.ConsistentWithPrev(res)
}
})
// The keeper normally exits with context.Canceled; clear it before returning.
err = nil
return
})
return nil
}

func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: int64(ctx.cfg.TTL.Seconds()),
BackupTS: ctx.cfg.SafePoint,
TTL: int64(cx.cfg.TTL.Seconds()),
BackupTS: cx.cfg.SafePoint,
}
if sp.BackupTS == 0 {
rts, err := ctx.pdMgr.GetMinResolvedTS(ctx)
rts, err := cx.pdMgr.GetMinResolvedTS(cx)
if err != nil {
return err
}
logutil.CL(ctx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
logutil.CL(cx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
sp.BackupTS = rts
}
err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp)
err = utils.StartServiceSafePointKeeper(cx, cx.pdMgr.GetPDClient(), sp)
if err != nil {
return err
}
ctx.ReadyL("pause_gc", zap.Object("safepoint", sp))
cx.ReadyL("pause_gc", zap.Object("safepoint", sp))
defer cx.cleanUpWithRetErr(&err, func(ctx context.Context) error {
cancelSP := utils.BRServiceSafePoint{
ID: sp.ID,
TTL: 0,
}
return utils.UpdateServiceSafePoint(ctx, cx.pdMgr.GetPDClient(), cancelSP)
})
// Note: we could return directly here, but the name `keeper`
// implies that GC should be resumed once the function exits,
// so block until the context is done.
<-ctx.Done()
<-cx.Done()
return nil
}

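Both keepers now funnel their teardown through cleanUpWithRetErr, which runs the cleanup under a fresh timeout context (the request context is usually already canceled at that point) and merges any cleanup failure into the caller's named return with multierr.Combine, so a failed AllowAllStores or safepoint removal is no longer silently dropped. A minimal sketch of the pattern follows; the TTL and error messages are illustrative only.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "go.uber.org/multierr"
    )

    // cleanUpWithRetErr mirrors the helper in cmd.go: run f under its own
    // timeout and fold its error into the named return value, if any.
    func cleanUpWithRetErr(errOut *error, ttl time.Duration, f func(ctx context.Context) error) {
        ctx, cancel := context.WithTimeout(context.Background(), ttl)
        defer cancel()
        err := f(ctx)
        if errOut != nil {
            *errOut = multierr.Combine(*errOut, err)
        }
    }

    func keeper() (err error) {
        defer cleanUpWithRetErr(&err, time.Second, func(ctx context.Context) error {
            return errors.New("restoring lightning imports failed") // e.g. an AllowAllStores error
        })
        return errors.New("keeper exited early")
    }

    func main() {
        // Both the keeper error and the cleanup error surface in the combined result.
        fmt.Println(keeper())
    }

pauseGCKeeper reuses the same helper on exit to drop the service safepoint by re-registering it with a TTL of 0.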
5 changes: 4 additions & 1 deletion br/pkg/task/operator/config.go
@@ -14,10 +14,13 @@ type PauseGcConfig struct {

SafePoint uint64 `json:"safepoint" yaml:"safepoint"`
TTL time.Duration `json:"ttl" yaml:"ttl"`

OnAllReady func() `json:"-" yaml:"-"`
OnExit func() `json:"-" yaml:"-"`
}

func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) {
_ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.")
_ = f.DurationP("ttl", "i", 2*time.Minute, "The time-to-live of the safepoint.")
_ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
}

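The new OnAllReady/OnExit hooks let a caller that embeds this command, rather than running the CLI, learn when all pausing stages are ready and when the paused environment is released. A hedged sketch of how an embedder might wire them, assuming only the operator package API shown in this diff; the connection and TLS fields the config also needs are omitted, so this illustrates the hook wiring rather than a working cluster setup.

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/pingcap/tidb/br/pkg/task/operator"
    )

    func main() {
        ready := make(chan struct{})
        cfg := operator.PauseGcConfig{
            TTL:        2 * time.Minute, // matches the new --ttl default
            OnAllReady: func() { close(ready) },
            OnExit:     func() { log.Println("pausing environment released") },
        }
        go func() {
            if err := operator.AdaptEnvForSnapshotBackup(context.Background(), &cfg); err != nil {
                log.Printf("adapt env exited: %v", err)
            }
        }()
        <-ready
        // GC, schedulers, and lightning imports are paused here; safe to start the volume snapshot.
    }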
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
@@ -91,7 +91,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 36,
shard_count = 37,
deps = [
"//br/pkg/errors",
"//pkg/kv",
76 changes: 76 additions & 0 deletions br/pkg/utils/retry.go
@@ -4,11 +4,13 @@ package utils

import (
"context"
stderrs "errors"
"fmt"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
@@ -334,3 +336,77 @@ func (r *RetryWithBackoffer) RequestBackOff(ms int) {
func (r *RetryWithBackoffer) Inner() *tikv.Backoffer {
return r.bo
}

type verboseBackoffer struct {
inner Backoffer
logger *zap.Logger
groupID uuid.UUID
}

func (v *verboseBackoffer) NextBackoff(err error) time.Duration {
nextBackoff := v.inner.NextBackoff(err)
v.logger.Warn("Encountered err, retrying.",
zap.Stringer("nextBackoff", nextBackoff),
zap.String("err", err.Error()),
zap.Stringer("gid", v.groupID))
return nextBackoff
}

// Attempt returns the remaining number of attempts.
func (v *verboseBackoffer) Attempt() int {
attempt := v.inner.Attempt()
if attempt > 0 {
v.logger.Debug("Retry attempt hint.", zap.Int("attempt", attempt), zap.Stringer("gid", v.groupID))
} else {
v.logger.Warn("Retry limit exceeded.", zap.Stringer("gid", v.groupID))
}
return attempt
}

func VerboseRetry(bo Backoffer, logger *zap.Logger) Backoffer {
if logger == nil {
logger = log.L()
}
vlog := &verboseBackoffer{
inner: bo,
logger: logger,
groupID: uuid.New(),
}
return vlog
}

type failedOnErr struct {
inner Backoffer
failed bool
failedOn []error
}

// NextBackoff returns a duration to wait before retrying again
func (f *failedOnErr) NextBackoff(err error) time.Duration {
for _, fatalErr := range f.failedOn {
if stderrs.Is(errors.Cause(err), fatalErr) {
f.failed = true
return 0
}
}
if !f.failed {
return f.inner.NextBackoff(err)
}
return 0
}

// Attempt returns the remaining number of attempts.
func (f *failedOnErr) Attempt() int {
if f.failed {
return 0
}
return f.inner.Attempt()
}

func GiveUpRetryOn(bo Backoffer, errs ...error) Backoffer {
return &failedOnErr{
inner: bo,
failed: false,
failedOn: errs,
}
}
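VerboseRetry and GiveUpRetryOn are plain decorators over the existing Backoffer interface: the former logs every backoff decision under a per-group UUID so related attempts can be correlated, the latter stops retrying once the error's cause matches one of the listed fatal errors. GetBackOffer in cmd.go stacks them on top of the usual retry state; the sketch below mirrors that composition. The InitialRetryState and WithRetryV2 signatures are inferred from their call sites in this diff, so treat them as assumptions.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "github.com/pingcap/log"
        berrors "github.com/pingcap/tidb/br/pkg/errors"
        "github.com/pingcap/tidb/br/pkg/utils"
        "go.uber.org/zap"
    )

    func main() {
        // Up to 64 attempts with 1s..10s backoff, as in GetBackOffer.
        state := utils.InitialRetryState(64, time.Second, 10*time.Second)
        bo := utils.GiveUpRetryOn(&state, berrors.ErrPossibleInconsistency)
        bo = utils.VerboseRetry(bo, log.L().With(zap.String("operation", "demo")))

        attempts := 0
        res, err := utils.WithRetryV2(context.Background(), bo, func(_ context.Context) (map[uint64]bool, error) {
            attempts++
            if attempts < 3 {
                // Transient failures (e.g. a TiKV store briefly unreachable) are retried.
                return nil, errors.New("store temporarily unreachable")
            }
            return map[uint64]bool{1: true}, nil
        })
        fmt.Println(res, err, attempts)
    }

Once a fatal error is seen, failedOnErr reports a zero backoff and zero remaining attempts, which is how the surrounding retry loop is told to give up immediately instead of burning through the remaining attempts.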
