Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snap_backup: make snapshot backup exit once importing task detected #46854

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
ErrBackupNoLeader = errors.Normalize("backup no leader", errors.RFCCodeText("BR:Backup:ErrBackupNoLeader"))
ErrBackupGCSafepointExceeded = errors.Normalize("backup GC safepoint exceeded", errors.RFCCodeText("BR:Backup:ErrBackupGCSafepointExceeded"))
ErrBackupConflicting = errors.Normalize("backup is conflicting with some tasks still running in the cluster.", errors.RFCCodeText("BR:Backup:Conflicting"))

ErrRestoreModeMismatch = errors.Normalize("restore mode mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreModeMismatch"))
ErrRestoreRangeMismatch = errors.Normalize("restore range mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreRangeMismatch"))
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ func (rc *Controller) checkSourceSchema(ctx context.Context) error {
return rc.doPreCheckOnItem(ctx, precheck.CheckSourceSchemaValid)
}

func (rc *Controller) checkCDCPiTR(ctx context.Context) error {
func (rc *Controller) checkConflictTask(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB {
return nil
}
return rc.doPreCheckOnItem(ctx, precheck.CheckTargetUsingCDCPITR)
return rc.doPreCheckOnItem(ctx, precheck.CheckTargetConflictTaskRunning)
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -2157,7 +2157,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
}
}
// even if checkpoint exists, we still need to make sure CDC/PiTR task is not running.
if err := rc.checkCDCPiTR(ctx); err != nil {
if err := rc.checkConflictTask(ctx); err != nil {
return common.ErrCheckCDCPiTR.Wrap(err).GenWithStackByArgs()
}
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID precheck.CheckItemID) (p
return NewLocalDiskPlacementCheckItem(b.cfg), nil
case precheck.CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
case precheck.CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
case precheck.CheckTargetConflictTaskRunning:
return NewConflictedTaskCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
26 changes: 18 additions & 8 deletions br/pkg/lightning/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,28 +744,28 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
return msgs, nil
}

// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// ConflictedTaskCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// caller override the Instruction message.
type CDCPITRCheckItem struct {
type ConflictedTaskCheckItem struct {
cfg *config.Config
Instruction string
leaderAddrGetter func() string
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) precheck.Checker {
return &CDCPITRCheckItem{
// NewConflictedTaskCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewConflictedTaskCheckItem(cfg *config.Config, leaderAddrGetter func() string) precheck.Checker {
return &ConflictedTaskCheckItem{
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
leaderAddrGetter: leaderAddrGetter,
}
}

// GetCheckItemID implements Checker interface.
func (*CDCPITRCheckItem) GetCheckItemID() precheck.CheckItemID {
return precheck.CheckTargetUsingCDCPITR
func (*ConflictedTaskCheckItem) GetCheckItemID() precheck.CheckItemID {
return precheck.CheckTargetConflictTaskRunning
}

func dialEtcdWithCfg(
Expand Down Expand Up @@ -794,7 +794,7 @@ func dialEtcdWithCfg(
}

// Check implements Checker interface.
func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, error) {
func (ci *ConflictedTaskCheckItem) Check(ctx context.Context) (*precheck.CheckResult, error) {
theResult := &precheck.CheckResult{
Item: ci.GetCheckItemID(),
Severity: precheck.Critical,
Expand Down Expand Up @@ -840,6 +840,16 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e
errorMsg = append(errorMsg, nameSet.MessageToUser())
}

otasks, err := utils.GetExportTasksFrom(ctx, ci.etcdCli)
if err != nil {
return nil, errors.Annotate(err, "failed to get import tasks from etcd")
}
// NOTE: perhaps filter out by the type of tasks in the future.
// However for now we don't have the ability of extracting task type from a record of task.
if !otasks.Empty() {
errorMsg = append(errorMsg, otasks.MessageToUser())
}

if len(errorMsg) > 0 {
errorMsg = append(errorMsg, ci.Instruction)
theResult.Passed = false
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg, nil)
checker := ci.(*CDCPITRCheckItem)
ci := NewConflictedTaskCheckItem(cfg, nil)
checker := ci.(*ConflictedTaskCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
s.Require().NoError(err)
Expand Down
52 changes: 26 additions & 26 deletions br/pkg/lightning/precheck/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,37 @@ type CheckItemID string

// CheckItemID constants
const (
CheckLargeDataFile CheckItemID = "CHECK_LARGE_DATA_FILES"
CheckSourcePermission CheckItemID = "CHECK_SOURCE_PERMISSION"
CheckTargetTableEmpty CheckItemID = "CHECK_TARGET_TABLE_EMPTY"
CheckSourceSchemaValid CheckItemID = "CHECK_SOURCE_SCHEMA_VALID"
CheckCheckpoints CheckItemID = "CHECK_CHECKPOINTS"
CheckCSVHeader CheckItemID = "CHECK_CSV_HEADER"
CheckTargetClusterSize CheckItemID = "CHECK_TARGET_CLUSTER_SIZE"
CheckTargetClusterEmptyRegion CheckItemID = "CHECK_TARGET_CLUSTER_EMPTY_REGION"
CheckTargetClusterRegionDist CheckItemID = "CHECK_TARGET_CLUSTER_REGION_DISTRIBUTION"
CheckTargetClusterVersion CheckItemID = "CHECK_TARGET_CLUSTER_VERSION"
CheckLocalDiskPlacement CheckItemID = "CHECK_LOCAL_DISK_PLACEMENT"
CheckLocalTempKVDir CheckItemID = "CHECK_LOCAL_TEMP_KV_DIR"
CheckTargetUsingCDCPITR CheckItemID = "CHECK_TARGET_USING_CDC_PITR"
CheckLargeDataFile CheckItemID = "CHECK_LARGE_DATA_FILES"
CheckSourcePermission CheckItemID = "CHECK_SOURCE_PERMISSION"
CheckTargetTableEmpty CheckItemID = "CHECK_TARGET_TABLE_EMPTY"
CheckSourceSchemaValid CheckItemID = "CHECK_SOURCE_SCHEMA_VALID"
CheckCheckpoints CheckItemID = "CHECK_CHECKPOINTS"
CheckCSVHeader CheckItemID = "CHECK_CSV_HEADER"
CheckTargetClusterSize CheckItemID = "CHECK_TARGET_CLUSTER_SIZE"
CheckTargetClusterEmptyRegion CheckItemID = "CHECK_TARGET_CLUSTER_EMPTY_REGION"
CheckTargetClusterRegionDist CheckItemID = "CHECK_TARGET_CLUSTER_REGION_DISTRIBUTION"
CheckTargetClusterVersion CheckItemID = "CHECK_TARGET_CLUSTER_VERSION"
CheckLocalDiskPlacement CheckItemID = "CHECK_LOCAL_DISK_PLACEMENT"
CheckLocalTempKVDir CheckItemID = "CHECK_LOCAL_TEMP_KV_DIR"
CheckTargetConflictTaskRunning CheckItemID = "CHECK_TARGET_CONFLICT_TASK_RUNNING"
)

var (
// CheckItemIDToDisplayName is a map from CheckItemID to its display name
checkItemIDToDisplayName = map[CheckItemID]string{
CheckLargeDataFile: "Large data file",
CheckSourcePermission: "Source permission",
CheckTargetTableEmpty: "Target table empty",
CheckSourceSchemaValid: "Source schema valid",
CheckCheckpoints: "Checkpoints",
CheckCSVHeader: "CSV header",
CheckTargetClusterSize: "Target cluster size",
CheckTargetClusterEmptyRegion: "Target cluster empty region",
CheckTargetClusterRegionDist: "Target cluster region dist",
CheckTargetClusterVersion: "Target cluster version",
CheckLocalDiskPlacement: "Local disk placement",
CheckLocalTempKVDir: "Local temp KV dir",
CheckTargetUsingCDCPITR: "Target using CDC/PITR",
CheckLargeDataFile: "Large data file",
CheckSourcePermission: "Source permission",
CheckTargetTableEmpty: "Target table empty",
CheckSourceSchemaValid: "Source schema valid",
CheckCheckpoints: "Checkpoints",
CheckCSVHeader: "CSV header",
CheckTargetClusterSize: "Target cluster size",
CheckTargetClusterEmptyRegion: "Target cluster empty region",
CheckTargetClusterRegionDist: "Target cluster region dist",
CheckTargetClusterVersion: "Target cluster version",
CheckLocalDiskPlacement: "Local disk placement",
CheckLocalTempKVDir: "Local temp KV dir",
CheckTargetConflictTaskRunning: "Target is running a task that isn't compatible with importing",
}
)

Expand Down
45 changes: 44 additions & 1 deletion br/pkg/task/backup_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
brpb "github.com/pingcap/kvproto/pkg/brpb"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
Expand All @@ -39,6 +41,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"

berrors "github.com/pingcap/tidb/br/pkg/errors"
)

const (
Expand Down Expand Up @@ -67,6 +71,38 @@ func DefineBackupEBSFlags(flags *pflag.FlagSet) {
_ = flags.MarkHidden(flagOperatorPausedGCAndSchedulers)
}

// checkConflictingTasksAndRegisterSelf checks whether there are some running importing tasks in the cluster.
// This is necessary because the Ingest raft command relies on the external context beyond the raft log.
// Those context might be lost due to the time order of taking snapshot.
func checkConflictingTasksAndRegisterSelf(ctx context.Context, cfg Config) (func(), error) {
etcdCli, err := dialEtcdWithCfg(ctx, cfg)
if err != nil {
return nil, errors.Annotate(err, "dial etcd failed")
}
defer func() {
_ = etcdCli.Close()
}()
hasImportTask, err := utils.GetImportTasksFrom(ctx, etcdCli)
if err != nil {
return nil, errors.Annotate(err, "get import tasks from etcd failed")
}
if !hasImportTask.Empty() {
return nil, errors.Annotate(berrors.ErrBackupConflicting, hasImportTask.MessageToUser())
}
rg := utils.NewTaskRegister(etcdCli, utils.RegisterSnapshotBackup, uuid.New().String())
if err := rg.RegisterTask(ctx); err != nil {
return nil, errors.Annotate(err, "failed to register the task")
}
cleanUp := func() {
cx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := rg.Close(cx); err != nil {
log.Warn("Failed to deregister the task.", logutil.ShortError(err))
}
}
return cleanUp, nil
}

// RunBackupEBS starts a backup task to backup volume vai EBS snapshot.
func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
cfg.Adjust()
Expand All @@ -87,9 +123,15 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
ctx, cancel := context.WithCancel(c)
defer cancel()

cleanUp, err := checkConflictingTasksAndRegisterSelf(ctx, cfg.Config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking here is improper. We'd better to keep move it to the task of suspend gc and schedule phase. Then we can release as soon as possible.

if err != nil {
return err
}
defer cleanUp()

// receive the volume info from TiDB deployment tools.
backupInfo := &config.EBSBasedBRMeta{}
err := backupInfo.ConfigFromFile(cfg.VolumeFile)
err = backupInfo.ConfigFromFile(cfg.VolumeFile)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -115,6 +157,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
return errors.Trace(err)
}
defer mgr.Close()

client := backup.NewBackupClient(ctx, mgr)

opts := storage.ExternalStorageOptions{
Expand Down
28 changes: 26 additions & 2 deletions br/pkg/utils/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
RegisterRestore RegisterTaskType = iota
RegisterLightning
RegisterImportInto
RegisterSnapshotBackup
)

func (tp RegisterTaskType) String() string {
Expand All @@ -45,6 +46,8 @@ func (tp RegisterTaskType) String() string {
return "lightning"
case RegisterImportInto:
return "import-into"
case RegisterSnapshotBackup:
return "snap_backup"
}
return "default"
}
Expand All @@ -54,6 +57,7 @@ const (
// RegisterImportTaskPrefix is the prefix of the key for task register
// todo: remove "/import" suffix, it's confusing to have a key like "/tidb/brie/import/restore/restore-xxx"
RegisterImportTaskPrefix = "/tidb/brie/import"
RegisterExportTaskPrefix = "/tidb/brie/export"

RegisterRetryInternal = 10 * time.Second
defaultTaskRegisterTTL = 3 * time.Minute // 3 minutes
Expand Down Expand Up @@ -88,13 +92,24 @@ type taskRegister struct {
cancel context.CancelFunc
}

func prefixForTask(tp RegisterTaskType) string {
switch tp {
case RegisterRestore, RegisterLightning, RegisterImportInto:
return RegisterImportTaskPrefix
case RegisterSnapshotBackup:
return RegisterExportTaskPrefix
default:
panic(fmt.Sprintf("unknown task %s", tp))
}
}

// NewTaskRegisterWithTTL build a TaskRegister with key format {RegisterTaskPrefix}/{RegisterTaskType}/{taskName}
func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp RegisterTaskType, taskName string) TaskRegister {
return &taskRegister{
client: client,
ttl: ttl,
secondTTL: int64(ttl / time.Second),
key: path.Join(RegisterImportTaskPrefix, tp.String(), taskName),
key: path.Join(prefixForTask(tp), tp.String(), taskName),

curLeaseID: clientv3.NoLease,
}
Expand Down Expand Up @@ -307,7 +322,16 @@ func (list RegisterTasksList) Empty() bool {

// GetImportTasksFrom try to get all the import tasks with prefix `RegisterTaskPrefix`
func GetImportTasksFrom(ctx context.Context, client *clientv3.Client) (RegisterTasksList, error) {
resp, err := client.KV.Get(ctx, RegisterImportTaskPrefix, clientv3.WithPrefix())
return getTasksFromWithPrefix(ctx, client, RegisterImportTaskPrefix)
}

// GetExportTasksFrom try to get all the import tasks with prefix `RegisterTaskPrefix`
func GetExportTasksFrom(ctx context.Context, client *clientv3.Client) (RegisterTasksList, error) {
return getTasksFromWithPrefix(ctx, client, RegisterExportTaskPrefix)
}

func getTasksFromWithPrefix(ctx context.Context, client *clientv3.Client, prefix string) (RegisterTasksList, error) {
resp, err := client.KV.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return RegisterTasksList{}, errors.Trace(err)
}
Expand Down