From 936228e9449a257ba42fc0c44bbd20818f8fba5e Mon Sep 17 00:00:00 2001 From: BornChanger <97348524+BornChanger@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:59:40 +0800 Subject: [PATCH] This is an automated cherry-pick of #48627 Signed-off-by: ti-chi-bot --- br/pkg/aws/BUILD.bazel | 1 + br/pkg/aws/ebs.go | 288 +++++++++++++++++++++++++++++++- br/pkg/aws/ebs_test.go | 2 +- br/pkg/task/restore_ebs_meta.go | 2 +- 4 files changed, 285 insertions(+), 8 deletions(-) diff --git a/br/pkg/aws/BUILD.bazel b/br/pkg/aws/BUILD.bazel index 2b70183655569..3290cdb864759 100644 --- a/br/pkg/aws/BUILD.bazel +++ b/br/pkg/aws/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "@com_github_aws_aws_sdk_go//aws", "@com_github_aws_aws_sdk_go//aws/awserr", "@com_github_aws_aws_sdk_go//aws/session", + "@com_github_aws_aws_sdk_go//service/cloudwatch", "@com_github_aws_aws_sdk_go//service/ec2", "@com_github_aws_aws_sdk_go//service/ec2/ec2iface", "@com_github_pingcap_errors//:errors", diff --git a/br/pkg/aws/ebs.go b/br/pkg/aws/ebs.go index ddea6b358f556..61b86083ee8c4 100644 --- a/br/pkg/aws/ebs.go +++ b/br/pkg/aws/ebs.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/pingcap/errors" @@ -30,7 +31,8 @@ const ( ) type EC2Session struct { - ec2 ec2iface.EC2API + ec2 ec2iface.EC2API + cloudwatchClient *cloudwatch.CloudWatch // aws operation concurrency concurrency uint } @@ -50,7 +52,8 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) { return nil, errors.Trace(err) } ec2Session := ec2.New(sess) - return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil + cloudwatchClient := cloudwatch.New(sess) + return &EC2Session{ec2: ec2Session, cloudwatchClient: cloudwatchClient, concurrency: concurrency}, nil } // CreateSnapshots is the mainly steps to control the data volume snapshots. @@ -281,6 +284,271 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) { log.Info("delete snapshot end", zap.Int("need-to-del", len(snapIDMap)), zap.Int32("deleted", deletedCnt.Load())) } +<<<<<<< HEAD +======= +// EnableDataFSR enables FSR for data volume snapshots +func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string) (map[string][]*string, error) { + snapshotsIDsMap := fetchTargetSnapshots(meta, targetAZ) + + if len(snapshotsIDsMap) == 0 { + return snapshotsIDsMap, errors.Errorf("empty backup meta") + } + + eg, _ := errgroup.WithContext(context.Background()) + + for availableZone := range snapshotsIDsMap { + targetAZ := availableZone + // We have to control the batch size to avoid the error of "parameter SourceSnapshotIds must be less than or equal to 10" + for i := 0; i < len(snapshotsIDsMap[targetAZ]); i += FsrApiSnapshotsThreshold { + start := i + end := i + FsrApiSnapshotsThreshold + if end > len(snapshotsIDsMap[targetAZ]) { + end = len(snapshotsIDsMap[targetAZ]) + } + eg.Go(func() error { + log.Info("enable fsr for snapshots", zap.String("available zone", targetAZ), zap.Any("snapshots", snapshotsIDsMap[targetAZ][start:end])) + resp, err := e.ec2.EnableFastSnapshotRestores(&ec2.EnableFastSnapshotRestoresInput{ + AvailabilityZones: []*string{&targetAZ}, + SourceSnapshotIds: snapshotsIDsMap[targetAZ][start:end], + }) + + if err != nil { + return errors.Trace(err) + } + + if len(resp.Unsuccessful) > 0 { + log.Warn("not all snapshots enabled FSR") + return errors.Errorf("Some snapshot fails to enable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors) + } + + return e.waitDataFSREnabled(snapshotsIDsMap[targetAZ][start:end], targetAZ) + }) + } + } + return snapshotsIDsMap, eg.Wait() +} + +// waitDataFSREnabled waits FSR for data volume snapshots are all enabled and also have enough credit balance +func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error { + // Record current time + start := time.Now() + + // get the maximum size of volumes, in GiB + var maxVolumeSize int64 = 0 + resp, err := e.ec2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{SnapshotIds: snapShotIDs}) + if err != nil { + return errors.Trace(err) + } + if len(resp.Snapshots) <= 0 { + return errors.Errorf("specified snapshot [%s] is not found", *snapShotIDs[0]) + } + + for _, s := range resp.Snapshots { + if *s.VolumeSize > maxVolumeSize { + maxVolumeSize = *s.VolumeSize + } + } + + // Calculate the time in minutes to fill 1.0 credit according to + // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-fast-snapshot-restore.html#volume-creation-credits + // 5 minutes more is just for safe + fillElapsedTime := 60.0/(min(10, 1024.0/(float64)(maxVolumeSize))) + 5 + + // We have to sleep for at least fillElapsedTime minutes in order to make credits are filled to 1.0 + // Let's heartbeat every 5 minutes + for time.Since(start) <= time.Duration(fillElapsedTime)*time.Minute { + log.Info("FSR enablement is ongoing, going to sleep for 5 minutes...") + time.Sleep(5 * time.Minute) + } + + // Wait that all snapshot has enough fsr credit balance, it's very likely true since we have wait for long enough + log.Info("Start check and wait all snapshots have enough fsr credit balance") + + startIdx := 0 + retryCount := 0 + for startIdx < len(snapShotIDs) { + creditBalance, _ := e.getFSRCreditBalance(snapShotIDs[startIdx], targetAZ) + if creditBalance != nil && *creditBalance >= 1.0 { + startIdx++ + retryCount = 0 + } else { + if creditBalance == nil { + // For invalid calling, retry 3 times + if retryCount >= 3 { + return errors.Errorf("cloudwatch metrics for %s operation failed after retrying", *snapShotIDs[startIdx]) + } + retryCount++ + } + // Retry for both invalid calling and not enough fsr credit + // Cloudwatch by default flushes every 5 seconds. So, 20 seconds wait should be enough + time.Sleep(20 * time.Second) + } + } + + // Create a map to store the strings as keys + pendingSnapshots := make(map[string]struct{}) + + // Populate the map with the strings from the array + for _, str := range snapShotIDs { + pendingSnapshots[*str] = struct{}{} + } + + log.Info("starts check fsr pending snapshots", zap.Any("snapshots", pendingSnapshots), zap.String("available zone", targetAZ)) + for { + if len(pendingSnapshots) == 0 { + log.Info("all snapshots in current batch fsr enablement is finished", zap.String("available zone", targetAZ), zap.Any("snapshots", snapShotIDs)) + return nil + } + + // check pending snapshots every 1 minute + time.Sleep(1 * time.Minute) + log.Info("check snapshots not fsr enabled", zap.Int("count", len(pendingSnapshots))) + input := &ec2.DescribeFastSnapshotRestoresInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("state"), + Values: []*string{aws.String("disabled"), aws.String("disabling"), aws.String("enabling"), aws.String("optimizing")}, + }, + { + Name: aws.String("availability-zone"), + Values: []*string{aws.String(targetAZ)}, + }, + }, + } + + result, err := e.ec2.DescribeFastSnapshotRestores(input) + if err != nil { + return errors.Trace(err) + } + + uncompletedSnapshots := make(map[string]struct{}) + for _, fastRestore := range result.FastSnapshotRestores { + _, found := pendingSnapshots[*fastRestore.SnapshotId] + if found { + // Detect some conflict states + if strings.EqualFold(*fastRestore.State, "disabled") || strings.EqualFold(*fastRestore.State, "disabling") { + log.Error("detect conflict status", zap.String("snapshot", *fastRestore.SnapshotId), zap.String("status", *fastRestore.State)) + return errors.Errorf("status of snapshot %s is %s ", *fastRestore.SnapshotId, *fastRestore.State) + } + uncompletedSnapshots[*fastRestore.SnapshotId] = struct{}{} + } + } + pendingSnapshots = uncompletedSnapshots + } +} + +// getFSRCreditBalance is used to get maximum fsr credit balance of snapshot for last 5 minutes +func (e *EC2Session) getFSRCreditBalance(snapshotID *string, targetAZ string) (*float64, error) { + // Set the time range to query for metrics + startTime := time.Now().Add(-5 * time.Minute) + endTime := time.Now() + + // Prepare the input for the GetMetricStatisticsWithContext API call + input := &cloudwatch.GetMetricStatisticsInput{ + StartTime: aws.Time(startTime), + EndTime: aws.Time(endTime), + Namespace: aws.String("AWS/EBS"), + MetricName: aws.String("FastSnapshotRestoreCreditsBalance"), + Dimensions: []*cloudwatch.Dimension{ + { + Name: aws.String("SnapshotId"), + Value: snapshotID, + }, + { + Name: aws.String("AvailabilityZone"), + Value: aws.String(targetAZ), + }, + }, + Period: aws.Int64(300), + Statistics: []*string{aws.String("Maximum")}, + } + + log.Info("metrics input", zap.Any("input", input)) + + // Call cloudwatchClient API to retrieve the FastSnapshotRestoreCreditsBalance metric data + resp, err := e.cloudwatchClient.GetMetricStatisticsWithContext(context.Background(), input) + if err != nil { + log.Error("GetMetricStatisticsWithContext failed", zap.Error(err)) + return nil, errors.Trace(err) + } + + // parse the response + if len(resp.Datapoints) == 0 { + log.Warn("No result for metric FastSnapshotRestoreCreditsBalance returned", zap.Stringp("snapshot", snapshotID)) + return nil, nil + } + result := resp.Datapoints[0] + log.Info("credit balance", zap.Stringp("snapshot", snapshotID), zap.Float64p("credit", result.Maximum)) + return result.Maximum, nil +} + +// DisableDataFSR disables FSR for data volume snapshots +func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error { + if len(snapshotsIDsMap) == 0 { + return nil + } + + eg, _ := errgroup.WithContext(context.Background()) + + for availableZone := range snapshotsIDsMap { + targetAZ := availableZone + // We have to control the batch size to avoid the error of "parameter SourceSnapshotIds must be less than or equal to 10" + for i := 0; i < len(snapshotsIDsMap[targetAZ]); i += FsrApiSnapshotsThreshold { + start := i + end := i + FsrApiSnapshotsThreshold + if end > len(snapshotsIDsMap[targetAZ]) { + end = len(snapshotsIDsMap[targetAZ]) + } + eg.Go(func() error { + resp, err := e.ec2.DisableFastSnapshotRestores(&ec2.DisableFastSnapshotRestoresInput{ + AvailabilityZones: []*string{&targetAZ}, + SourceSnapshotIds: snapshotsIDsMap[targetAZ][start:end], + }) + + if err != nil { + return errors.Trace(err) + } + + if len(resp.Unsuccessful) > 0 { + log.Warn("not all snapshots disabled FSR", zap.String("available zone", targetAZ)) + return errors.Errorf("Some snapshot fails to disable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors) + } + + log.Info("Disable FSR issued", zap.String("available zone", targetAZ), zap.Any("snapshots", snapshotsIDsMap[targetAZ][start:end])) + + return nil + }) + } + } + return eg.Wait() +} + +func fetchTargetSnapshots(meta *config.EBSBasedBRMeta, specifiedAZ string) map[string][]*string { + var sourceSnapshotIDs = make(map[string][]*string) + + if len(meta.TiKVComponent.Stores) == 0 { + return sourceSnapshotIDs + } + + for i := range meta.TiKVComponent.Stores { + store := meta.TiKVComponent.Stores[i] + for j := range store.Volumes { + oldVol := store.Volumes[j] + // Handle data volume snapshots only + if strings.Compare(oldVol.Type, "storage.data-dir") == 0 { + if specifiedAZ != "" { + sourceSnapshotIDs[specifiedAZ] = append(sourceSnapshotIDs[specifiedAZ], &oldVol.SnapshotID) + } else { + sourceSnapshotIDs[oldVol.VolumeAZ] = append(sourceSnapshotIDs[oldVol.VolumeAZ], &oldVol.SnapshotID) + } + } + } + } + + return sourceSnapshotIDs +} + +>>>>>>> 711e95ff460 (ebs br: make sure fsr credit is full filled (#48627)) // CreateVolumes create volumes from snapshots // if err happens in the middle, return half-done result // returned map: store id -> old volume id -> new volume id @@ -365,7 +633,7 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin return newVolumeIDMap, eg.Wait() } -func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress) (int64, error) { +func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress, fsrEnabledRequired bool) (int64, error) { pendingVolumes := make([]*string, 0, len(volumeIDMap)) for oldVolID := range volumeIDMap { newVolumeID := volumeIDMap[oldVolID] @@ -385,7 +653,11 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress return 0, errors.Trace(err) } - createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(resp) + createdVolumeSize, unfinishedVolumes, err := e.HandleDescribeVolumesResponse(resp, fsrEnabledRequired) + if err != nil { + return 0, errors.Trace(err) + } + progress.IncBy(int64(len(pendingVolumes) - len(unfinishedVolumes))) totalVolumeSize += createdVolumeSize pendingVolumes = unfinishedVolumes @@ -428,12 +700,16 @@ func ec2Tag(key, val string) *ec2.Tag { return &ec2.Tag{Key: &key, Value: &val} } -func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput) (int64, []*string) { +func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput, fsrEnabledRequired bool) (int64, []*string, error) { totalVolumeSize := int64(0) var unfinishedVolumes []*string for _, volume := range resp.Volumes { if *volume.State == ec2.VolumeStateAvailable { + if fsrEnabledRequired && volume.FastRestored != nil && !*volume.FastRestored { + log.Error("snapshot fsr is not enabled for the volume", zap.String("volume", *volume.SnapshotId)) + return 0, nil, errors.Errorf("Snapshot [%s] of volume [%s] is not fsr enabled", *volume.SnapshotId, *volume.VolumeId) + } log.Info("volume is available", zap.String("id", *volume.VolumeId)) totalVolumeSize += *volume.Size } else { @@ -442,5 +718,5 @@ func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutp } } - return totalVolumeSize, unfinishedVolumes + return totalVolumeSize, unfinishedVolumes, nil } diff --git a/br/pkg/aws/ebs_test.go b/br/pkg/aws/ebs_test.go index d7f3be2a4a4a1..e55ea68c86e04 100644 --- a/br/pkg/aws/ebs_test.go +++ b/br/pkg/aws/ebs_test.go @@ -72,7 +72,7 @@ func TestHandleDescribeVolumesResponse(t *testing.T) { } e := &EC2Session{} - createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(curentVolumesStates) + createdVolumeSize, unfinishedVolumes, _ := e.HandleDescribeVolumesResponse(curentVolumesStates, false) require.Equal(t, int64(4), createdVolumeSize) require.Equal(t, 1, len(unfinishedVolumes)) } diff --git a/br/pkg/task/restore_ebs_meta.go b/br/pkg/task/restore_ebs_meta.go index 53286505b5b9c..37af06b888e7b 100644 --- a/br/pkg/task/restore_ebs_meta.go +++ b/br/pkg/task/restore_ebs_meta.go @@ -242,7 +242,7 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin if err != nil { return nil, 0, errors.Trace(err) } - totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress) + totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress, h.cfg.UseFSR) if err != nil { return nil, 0, errors.Trace(err) }