Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ebs br: provide fsr warmup to tikv data volumes #47272

Merged
merged 18 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 147 additions & 1 deletion br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,152 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) {
log.Info("delete snapshot end", zap.Int("need-to-del", len(snapIDMap)), zap.Int32("deleted", deletedCnt.Load()))
}

// EnableDataFSR enables FSR for data volume snapshots
func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string) (map[string][]*string, error) {
snapshotsIDsMap := fetchTargetSnapshots(meta, targetAZ)

if len(snapshotsIDsMap) == 0 {
return snapshotsIDsMap, errors.Errorf("empty backup meta")
}

eg, _ := errgroup.WithContext(context.Background())

for availableZone := range snapshotsIDsMap {
targetAZ := availableZone
eg.Go(func() error {
log.Info("enable fsr for snapshots", zap.String("available zone", targetAZ))
resp, err := e.ec2.EnableFastSnapshotRestores(&ec2.EnableFastSnapshotRestoresInput{
AvailabilityZones: []*string{&targetAZ},
SourceSnapshotIds: snapshotsIDsMap[targetAZ],
})

if err != nil {
return errors.Trace(err)
}

if len(resp.Unsuccessful) > 0 {
log.Warn("not all snapshots enabled FSR")
return errors.Errorf("Some snapshot fails to enable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
}

return e.waitDataFSREnabled(snapshotsIDsMap[targetAZ], targetAZ)
})
}
return snapshotsIDsMap, eg.Wait()
}

// waitDataFSREnabled waits FSR for data volume snapshots are all enabled
func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error {
// Create a map to store the strings as keys
pendingSnapshots := make(map[string]struct{})

// Populate the map with the strings from the array
for _, str := range snapShotIDs {
pendingSnapshots[*str] = struct{}{}
}

log.Info("starts check fsr pending snapshots", zap.Any("snapshots", pendingSnapshots), zap.String("available zone", targetAZ))
for {
if len(pendingSnapshots) == 0 {
log.Info("all snapshots fsr enablement is finished", zap.String("available zone", targetAZ))
return nil
}

// check pending snapshots every 1 minute
time.Sleep(1 * time.Minute)
log.Info("check snapshots not fsr enabled", zap.Int("count", len(pendingSnapshots)))
input := &ec2.DescribeFastSnapshotRestoresInput{
Filters: []*ec2.Filter{
{
BornChanger marked this conversation as resolved.
Show resolved Hide resolved
Name: aws.String("state"),
Values: []*string{aws.String("disabled"), aws.String("disabling"), aws.String("enabling"), aws.String("optimizing")},
},
{
Name: aws.String("availability-zone"),
Values: []*string{aws.String(targetAZ)},
},
},
}

result, err := e.ec2.DescribeFastSnapshotRestores(input)
if err != nil {
return errors.Trace(err)
}

uncompletedSnapshots := make(map[string]struct{})
for _, fastRestore := range result.FastSnapshotRestores {
_, found := pendingSnapshots[*fastRestore.SnapshotId]
if found {
// Detect some conflict states
if strings.EqualFold(*fastRestore.State, "disabled") || strings.EqualFold(*fastRestore.State, "disabling") {
log.Error("detect conflict status", zap.String("snapshot", *fastRestore.SnapshotId), zap.String("status", *fastRestore.State))
return errors.Errorf("status of snapshot %s is %s ", *fastRestore.SnapshotId, *fastRestore.State)
}
uncompletedSnapshots[*fastRestore.SnapshotId] = struct{}{}
}
}
pendingSnapshots = uncompletedSnapshots
}
}

// DisableDataFSR disables FSR for data volume snapshots
func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error {
if len(snapshotsIDsMap) == 0 {
return nil
}

eg, _ := errgroup.WithContext(context.Background())

for availableZone := range snapshotsIDsMap {
targetAZ := availableZone
eg.Go(func() error {
resp, err := e.ec2.DisableFastSnapshotRestores(&ec2.DisableFastSnapshotRestoresInput{
AvailabilityZones: []*string{&targetAZ},
SourceSnapshotIds: snapshotsIDsMap[targetAZ],
})

if err != nil {
return errors.Trace(err)
}

if len(resp.Unsuccessful) > 0 {
log.Warn("not all snapshots disabled FSR", zap.String("available zone", targetAZ))
return errors.Errorf("Some snapshot fails to disable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
}

log.Info("Disable FSR issued", zap.String("available zone", targetAZ))

return nil
})
}
return eg.Wait()
}

func fetchTargetSnapshots(meta *config.EBSBasedBRMeta, specifiedAZ string) map[string][]*string {
var sourceSnapshotIDs = make(map[string][]*string)

if len(meta.TiKVComponent.Stores) == 0 {
return sourceSnapshotIDs
}

for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
for j := range store.Volumes {
oldVol := store.Volumes[j]
// Handle data volume snapshots only
if strings.Compare(oldVol.Type, "storage.data-dir") == 0 {
if specifiedAZ != "" {
sourceSnapshotIDs[specifiedAZ] = append(sourceSnapshotIDs[specifiedAZ], &oldVol.SnapshotID)
} else {
sourceSnapshotIDs[oldVol.VolumeAZ] = append(sourceSnapshotIDs[oldVol.VolumeAZ], &oldVol.SnapshotID)
}
}
}
}

return sourceSnapshotIDs
}

// CreateVolumes create volumes from snapshots
// if err happens in the middle, return half-done result
// returned map: store id -> old volume id -> new volume id
Expand Down Expand Up @@ -377,7 +523,7 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
for len(pendingVolumes) > 0 {
// check every 5 seconds
time.Sleep(5 * time.Second)
log.Info("check pending snapshots", zap.Int("count", len(pendingVolumes)))
log.Info("check pending volumes", zap.Int("count", len(pendingVolumes)))
resp, err := e.ec2.DescribeVolumes(&ec2.DescribeVolumesInput{
VolumeIds: pendingVolumes,
})
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/config/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ func (c *EBSBasedBRMeta) GetStoreCount() uint64 {
return uint64(len(c.TiKVComponent.Stores))
}

func (c *EBSBasedBRMeta) GetTiKVVolumeCount() uint64 {
if c.TiKVComponent == nil || len(c.TiKVComponent.Stores) == 0 {
return 0
}
// Assume TiKV nodes are symmetric
return uint64(len(c.TiKVComponent.Stores[0].Volumes))
}

func (c *EBSBasedBRMeta) String() string {
cfg, err := json.Marshal(c)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const (
flagDryRun = "dry-run"
// TODO used for local test, should be removed later
flagSkipAWS = "skip-aws"
flagUseFSR = "use-fsr"
flagCloudAPIConcurrency = "cloud-api-concurrency"
flagWithSysTable = "with-sys-table"
flagOperatorPausedGCAndSchedulers = "operator-paused-gc-and-scheduler"
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
"batch size for ddl to create a batch of tables once.")
flags.Bool(flagWithSysTable, false, "whether restore system privilege tables on default setting")
flags.StringArrayP(FlagResetSysUsers, "", []string{"cloud_admin", "root"}, "whether reset these users after restoration")
flags.Bool(flagUseFSR, false, "whether enable FSR for AWS snapshots")
_ = flags.MarkHidden(FlagResetSysUsers)
_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
_ = flags.MarkHidden(FlagMergeRegionKeyCount)
Expand Down Expand Up @@ -218,6 +219,7 @@ type RestoreConfig struct {
VolumeThroughput int64 `json:"volume-throughput" toml:"volume-throughput"`
ProgressFile string `json:"progress-file" toml:"progress-file"`
TargetAZ string `json:"target-az" toml:"target-az"`
UseFSR bool `json:"use-fsr" toml:"use-fsr"`
}

// DefineRestoreFlags defines common flags for the restore tidb command.
Expand Down Expand Up @@ -391,6 +393,11 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

cfg.UseFSR, err = flags.GetBool(flagUseFSR)
if err != nil {
return errors.Trace(err)
}

// iops: gp3 [3,000-16,000]; io1/io2 [100-32,000]
// throughput: gp3 [125, 1000]; io1/io2 cannot set throughput
// io1 and io2 volumes support up to 64,000 IOPS only on Instances built on the Nitro System.
Expand Down
10 changes: 2 additions & 8 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,17 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto

//TODO: restore volume type into origin type
//ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) by backupmeta
// this is used for cloud restoration

err = client.Init(g, mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
defer client.Close()
log.Info("start to clear system user for cloud")
err = client.ClearSystemUsers(ctx, cfg.ResetSysUsers)

if err != nil {
return errors.Trace(err)
}

// since we cannot reset tiflash automaticlly. so we should start it manually
if err = client.ResetTiFlashReplicas(ctx, g, mgr.GetStorage()); err != nil {
return errors.Trace(err)
}

progress.Close()
summary.CollectDuration("restore duration", time.Since(startAll))
summary.SetSuccessStatus(true)
Expand Down
22 changes: 19 additions & 3 deletions br/pkg/task/restore_ebs_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ func (h *restoreEBSMetaHelper) restore() error {
return errors.Trace(err)
}

storeCount := h.metaInfo.GetStoreCount()
progress := h.g.StartProgress(ctx, h.cmdName, int64(storeCount), !h.cfg.LogProgress)
volumeCount := h.metaInfo.GetStoreCount() * h.metaInfo.GetTiKVVolumeCount()
progress := h.g.StartProgress(ctx, h.cmdName, int64(volumeCount), !h.cfg.LogProgress)
defer progress.Close()
go progressFileWriterRoutine(ctx, progress, int64(storeCount), h.cfg.ProgressFile)
go progressFileWriterRoutine(ctx, progress, int64(volumeCount), h.cfg.ProgressFile)

resolvedTs = h.metaInfo.ClusterInfo.ResolvedTS
if totalSize, err = h.doRestore(ctx, progress); err != nil {
Expand Down Expand Up @@ -226,6 +226,8 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
volumeIDMap = make(map[string]string)
err error
totalSize int64
// a map whose key is available zone, and value is the snapshot id array
snapshotsIDsMap = make(map[string][]*string)
)
ec2Session, err = aws.NewEC2Session(h.cfg.CloudAPIConcurrency, h.cfg.S3.Region)
if err != nil {
Expand All @@ -236,7 +238,21 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
log.Error("failed to create all volumes, cleaning up created volume")
ec2Session.DeleteVolumes(volumeIDMap)
}

if h.cfg.UseFSR {
err = ec2Session.DisableDataFSR(snapshotsIDsMap)
BornChanger marked this conversation as resolved.
Show resolved Hide resolved
log.Error("disable fsr failed", zap.Error(err))
}
}()

// Turn on FSR for TiKV data snapshots
if h.cfg.UseFSR {
snapshotsIDsMap, err = ec2Session.EnableDataFSR(h.metaInfo, h.cfg.TargetAZ)
if err != nil {
return nil, 0, errors.Trace(err)
}
}
BornChanger marked this conversation as resolved.
Show resolved Hide resolved

volumeIDMap, err = ec2Session.CreateVolumes(h.metaInfo,
string(h.cfg.VolumeType), h.cfg.VolumeIOPS, h.cfg.VolumeThroughput, h.cfg.TargetAZ)
if err != nil {
Expand Down
Loading