Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshot_backup: deny executing tidb-lightning import while running snapshot_backup (#47001) #47341

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3061,8 +3061,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:dLoYgMFgzUaS6fAAPdjA7oGDM0LdCIm+qhgb3PzrDps=",
version = "v0.0.0-20230726063044-73d6d7f3756b",
sum = "h1:tBKPWWqgWEBs04BV4UN7RhtUkZDs0oz+WyMbtRDVtL8=",
version = "v0.0.0-20230928035022-1bdcc25ed63c",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down
2 changes: 1 addition & 1 deletion br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
NewBackupCommand(),
NewRestoreCommand(),
NewStreamCommand(),
newOpeartorCommand(),
newOperatorCommand(),
)
// Outputs cmd.Print to stdout.
rootCmd.SetOut(os.Stdout)
Expand Down
19 changes: 12 additions & 7 deletions br/cmd/br/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/spf13/cobra"
)

func newOpeartorCommand() *cobra.Command {
func newOperatorCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "operator <subcommand>",
Short: "utilities for operators like tidb-operator.",
Expand All @@ -26,24 +26,29 @@ func newOpeartorCommand() *cobra.Command {
},
Hidden: true,
}
cmd.AddCommand(newPauseGcCommand())
cmd.AddCommand(newPrepareForSnapshotBackupCommand(
"pause-gc-and-schedulers",
"(Will be replaced with `prepare-for-snapshot-backup`) pause gc, schedulers and importing until the program exits."))
cmd.AddCommand(newPrepareForSnapshotBackupCommand(
"prepare-for-snapshot-backup",
"pause gc, schedulers and importing until the program exits, for snapshot backup."))
return cmd
}

func newPauseGcCommand() *cobra.Command {
func newPrepareForSnapshotBackupCommand(use string, short string) *cobra.Command {
cmd := &cobra.Command{
Use: "pause-gc-and-schedulers",
Short: "pause gc and schedulers to the ts until the program exits.",
Use: use,
Short: short,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.PauseGcConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.PauseGCAndScheduler(ctx, &cfg)
return operator.AdaptEnvForSnapshotBackup(ctx, &cfg)
},
}
operator.DefineFlagsForPauseGcConfig(cmd.Flags())
operator.DefineFlagsForPrepareSnapBackup(cmd.Flags())
return cmd
}
2 changes: 2 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,6 @@ var (
ErrKVDownloadFailed = errors.Normalize("download sst failed", errors.RFCCodeText("BR:KV:ErrKVDownloadFailed"))
// ErrKVIngestFailed indicates a generic, retryable ingest error.
ErrKVIngestFailed = errors.Normalize("ingest sst failed", errors.RFCCodeText("BR:KV:ErrKVIngestFailed"))

ErrPossibleInconsistency = errors.Normalize("the cluster state might be inconsistent", errors.RFCCodeText("BR:KV:ErrPossibleInconsistency"))
)
18 changes: 18 additions & 0 deletions br/pkg/task/backup_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -144,6 +145,16 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
if e != nil {
return errors.Trace(err)
}
denyLightning := utils.NewSuspendImporting("backup_ebs_command", mgr.StoreManager)
_, err := denyLightning.DenyAllStores(ctx, utils.DefaultBRGCSafePointTTL)
if err != nil {
return errors.Annotate(err, "lightning from running")
}
go func() {
if err := denyLightning.Keeper(ctx, utils.DefaultBRGCSafePointTTL); err != nil {
log.Warn("cannot keep deny importing, the backup archive may not be useable if there were importing.", logutil.ShortError(err))
}
}()
defer func() {
if ctx.Err() != nil {
log.Warn("context canceled, doing clean work with background context")
Expand All @@ -155,6 +166,13 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
if restoreE := restoreFunc(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
res, err := denyLightning.AllowAllStores(ctx)
if err != nil {
log.Warn("failed to restore importing, you may need to wait until you are able to start importing", zap.Duration("wait_for", utils.DefaultBRGCSafePointTTL))
}
if err := denyLightning.ConsistentWithPrev(res); err != nil {
log.Warn("lightning hasn't been denied, the backup archive may not be usable.", logutil.ShortError(err))
}
}()
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ go_library(
"//br/pkg/pdutil",
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
],
Expand Down
124 changes: 104 additions & 20 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (
"context"
"crypto/tls"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/keepalive"
)

func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) {
Expand All @@ -34,59 +37,140 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error)
return mgr, nil
}

func cleanUpWith(f func(ctx context.Context)) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) {
_ = cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil })
}

func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error {
ctx, cancel := context.WithTimeout(context.Background(), cx.cfg.TTL)
defer cancel()
f(ctx)
return f(ctx)
}

// PauseGCAndScheduler blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config.
type AdaptEnvForSnapshotBackupContext struct {
context.Context

pdMgr *pdutil.PdController
kvMgr *utils.StoreManager
cfg PauseGcConfig

rdGrp sync.WaitGroup
runGrp *errgroup.Group
}

func (cx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) {
logutil.CL(cx).Info("Stage ready.", append(notes, zap.String("component", name))...)
cx.rdGrp.Done()
}

func hintAllReady() {
// Hacking: some version of operators using the follow two logs to check whether we are ready...
log.Info("Schedulers are paused.")
log.Info("GC is paused.")
log.Info("All ready.")
}

// AdaptEnvForSnapshotBackup blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config.
// This function will block until the context being canceled.
func PauseGCAndScheduler(ctx context.Context, cfg *PauseGcConfig) error {
func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
mgr, err := dialPD(ctx, &cfg.Config)
if err != nil {
return err
return errors.Annotate(err, "failed to dial PD")
}

var tconf *tls.Config
if cfg.TLS.IsEnabled() {
tconf, err = cfg.TLS.ToTLSConfig()
if err != nil {
return errors.Annotate(err, "invalid tls config")
}
}
kvMgr := utils.NewStoreManager(mgr.GetPDClient(), keepalive.ClientParameters{
Time: cfg.Config.GRPCKeepaliveTime,
Timeout: cfg.Config.GRPCKeepaliveTimeout,
}, tconf)
eg, ectx := errgroup.WithContext(ctx)
cx := &AdaptEnvForSnapshotBackupContext{
Context: logutil.ContextWithField(ectx, zap.String("tag", "br_operator")),
pdMgr: mgr,
kvMgr: kvMgr,
cfg: *cfg,
rdGrp: sync.WaitGroup{},
runGrp: eg,
}
cx.rdGrp.Add(3)

eg.Go(func() error { return pauseGCKeeper(ectx, cfg, mgr) })
eg.Go(func() error { return pauseSchedulerKeeper(ectx, mgr) })
eg.Go(func() error { return pauseGCKeeper(cx) })
eg.Go(func() error { return pauseSchedulerKeeper(cx) })
eg.Go(func() error { return pauseImporting(cx) })
go func() {
cx.rdGrp.Wait()
hintAllReady()
}()

return eg.Wait()
}

func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdController) error {
func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error {
denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr)
if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil {
return errors.Trace(err)
}
cx.ReadyL("pause_lightning")
cx.runGrp.Go(func() error {
err := denyLightning.Keeper(cx, cx.cfg.TTL)
if errors.Cause(err) != context.Canceled {
logutil.CL(cx).Warn("keeper encounters error.", logutil.ShortError(err))
}
return cx.cleanUpWithErr(func(ctx context.Context) error {
for {
if ctx.Err() != nil {
return errors.Annotate(ctx.Err(), "cleaning up timed out")
}
res, err := denyLightning.AllowAllStores(ctx)
if err != nil {
logutil.CL(ctx).Warn("Failed to restore lightning, will retry.", logutil.ShortError(err))
// Retry for 10 times.
time.Sleep(cx.cfg.TTL / 10)
continue
}
return denyLightning.ConsistentWithPrev(res)
}
})
})
return nil
}

func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: int64(cfg.TTL.Seconds()),
BackupTS: cfg.SafePoint,
TTL: int64(ctx.cfg.TTL.Seconds()),
BackupTS: ctx.cfg.SafePoint,
}
if sp.BackupTS == 0 {
rts, err := ctl.GetMinResolvedTS(ctx)
rts, err := ctx.pdMgr.GetMinResolvedTS(ctx)
if err != nil {
return err
}
log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
logutil.CL(ctx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
sp.BackupTS = rts
}
err := utils.StartServiceSafePointKeeper(ctx, ctl.GetPDClient(), sp)
err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp)
if err != nil {
return err
}
log.Info("GC is paused.", zap.Object("safepoint", sp))
ctx.ReadyL("pause_gc", zap.Object("safepoint", sp))
// Note: in fact we can directly return here.
// But the name `keeper` implies once the function exits,
// the GC should be resume, so let's block here.
<-ctx.Done()
return nil
}

func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error {
undo, err := ctl.RemoveAllPDSchedulers(ctx)
func pauseSchedulerKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
undo, err := ctx.pdMgr.RemoveAllPDSchedulers(ctx)
if undo != nil {
defer cleanUpWith(func(ctx context.Context) {
defer ctx.cleanUpWith(func(ctx context.Context) {
if err := undo(ctx); err != nil {
log.Warn("failed to restore pd scheduler.", logutil.ShortError(err))
}
Expand All @@ -95,7 +179,7 @@ func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error {
if err != nil {
return err
}
log.Info("Schedulers are paused.")
ctx.ReadyL("pause_scheduler")
// Wait until the context canceled.
// So we can properly do the clean up work.
<-ctx.Done()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type PauseGcConfig struct {
TTL time.Duration `json:"ttl" yaml:"ttl"`
}

func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) {
func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) {
_ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.")
_ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
}
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"schema.go",
"sensitive.go",
"store_manager.go",
"suspend_importing.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/utils",
Expand All @@ -36,13 +37,15 @@ go_library(
"//parser/types",
"//sessionctx",
"//util",
"//util/engine",
"//util/sqlexec",
"@com_github_cheggaaa_pb_v3//:pb",
"@com_github_cznic_mathutil//:mathutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
Expand Down Expand Up @@ -80,6 +83,7 @@ go_test(
"safe_point_test.go",
"schema_test.go",
"sensitive_test.go",
"suspend_importing_test.go",
],
embed = [":utils"],
flaky = True,
Expand All @@ -103,11 +107,15 @@ go_test(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_multierr//:multierr",
],
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/store_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64)
if addr == "" {
addr = store.GetAddress()
}
log.Info("StoreManager: dialing to store.", zap.String("address", addr), zap.Uint64("store-id", storeID))
conn, err := grpc.DialContext(
ctx,
addr,
Expand Down
Loading