diff --git a/br/pkg/aws/BUILD.bazel b/br/pkg/aws/BUILD.bazel index 2b70183655569..3290cdb864759 100644 --- a/br/pkg/aws/BUILD.bazel +++ b/br/pkg/aws/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "@com_github_aws_aws_sdk_go//aws", "@com_github_aws_aws_sdk_go//aws/awserr", "@com_github_aws_aws_sdk_go//aws/session", + "@com_github_aws_aws_sdk_go//service/cloudwatch", "@com_github_aws_aws_sdk_go//service/ec2", "@com_github_aws_aws_sdk_go//service/ec2/ec2iface", "@com_github_pingcap_errors//:errors", diff --git a/br/pkg/aws/ebs.go b/br/pkg/aws/ebs.go index b53248cb41c01..2e0cfa36f6e65 100644 --- a/br/pkg/aws/ebs.go +++ b/br/pkg/aws/ebs.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/pingcap/errors" @@ -31,7 +32,8 @@ const ( ) type EC2Session struct { - ec2 ec2iface.EC2API + ec2 ec2iface.EC2API + cloudwatchClient *cloudwatch.CloudWatch // aws operation concurrency concurrency uint } @@ -51,7 +53,8 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) { return nil, errors.Trace(err) } ec2Session := ec2.New(sess) - return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil + cloudwatchClient := cloudwatch.New(sess) + return &EC2Session{ec2: ec2Session, cloudwatchClient: cloudwatchClient, concurrency: concurrency}, nil } // CreateSnapshots is the mainly steps to control the data volume snapshots. @@ -324,8 +327,63 @@ func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string) return snapshotsIDsMap, eg.Wait() } -// waitDataFSREnabled waits FSR for data volume snapshots are all enabled +// waitDataFSREnabled waits FSR for data volume snapshots are all enabled and also have enough credit balance func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error { + // Record current time + start := time.Now() + + // get the maximum size of volumes, in GiB + var maxVolumeSize int64 = 0 + resp, err := e.ec2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{SnapshotIds: snapShotIDs}) + if err != nil { + return errors.Trace(err) + } + if len(resp.Snapshots) <= 0 { + return errors.Errorf("specified snapshot [%s] is not found", *snapShotIDs[0]) + } + + for _, s := range resp.Snapshots { + if *s.VolumeSize > maxVolumeSize { + maxVolumeSize = *s.VolumeSize + } + } + + // Calculate the time in minutes to fill 1.0 credit according to + // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-fast-snapshot-restore.html#volume-creation-credits + // 5 minutes more is just for safe + fillElapsedTime := 60.0/(min(10, 1024.0/(float64)(maxVolumeSize))) + 5 + + // We have to sleep for at least fillElapsedTime minutes in order to make credits are filled to 1.0 + // Let's heartbeat every 5 minutes + for time.Since(start) <= time.Duration(fillElapsedTime)*time.Minute { + log.Info("FSR enablement is ongoing, going to sleep for 5 minutes...") + time.Sleep(5 * time.Minute) + } + + // Wait that all snapshot has enough fsr credit balance, it's very likely true since we have wait for long enough + log.Info("Start check and wait all snapshots have enough fsr credit balance") + + startIdx := 0 + retryCount := 0 + for startIdx < len(snapShotIDs) { + creditBalance, _ := e.getFSRCreditBalance(snapShotIDs[startIdx], targetAZ) + if creditBalance != nil && *creditBalance >= 1.0 { + startIdx++ + retryCount = 0 + } else { + if creditBalance == nil { + // For invalid calling, retry 3 times + if retryCount >= 3 { + return errors.Errorf("cloudwatch metrics for %s operation failed after retrying", *snapShotIDs[startIdx]) + } + retryCount++ + } + // Retry for both invalid calling and not enough fsr credit + // Cloudwatch by default flushes every 5 seconds. So, 20 seconds wait should be enough + time.Sleep(20 * time.Second) + } + } + // Create a map to store the strings as keys pendingSnapshots := make(map[string]struct{}) @@ -378,6 +436,51 @@ func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) } } +// getFSRCreditBalance is used to get maximum fsr credit balance of snapshot for last 5 minutes +func (e *EC2Session) getFSRCreditBalance(snapshotID *string, targetAZ string) (*float64, error) { + // Set the time range to query for metrics + startTime := time.Now().Add(-5 * time.Minute) + endTime := time.Now() + + // Prepare the input for the GetMetricStatisticsWithContext API call + input := &cloudwatch.GetMetricStatisticsInput{ + StartTime: aws.Time(startTime), + EndTime: aws.Time(endTime), + Namespace: aws.String("AWS/EBS"), + MetricName: aws.String("FastSnapshotRestoreCreditsBalance"), + Dimensions: []*cloudwatch.Dimension{ + { + Name: aws.String("SnapshotId"), + Value: snapshotID, + }, + { + Name: aws.String("AvailabilityZone"), + Value: aws.String(targetAZ), + }, + }, + Period: aws.Int64(300), + Statistics: []*string{aws.String("Maximum")}, + } + + log.Info("metrics input", zap.Any("input", input)) + + // Call cloudwatchClient API to retrieve the FastSnapshotRestoreCreditsBalance metric data + resp, err := e.cloudwatchClient.GetMetricStatisticsWithContext(context.Background(), input) + if err != nil { + log.Error("GetMetricStatisticsWithContext failed", zap.Error(err)) + return nil, errors.Trace(err) + } + + // parse the response + if len(resp.Datapoints) == 0 { + log.Warn("No result for metric FastSnapshotRestoreCreditsBalance returned", zap.Stringp("snapshot", snapshotID)) + return nil, nil + } + result := resp.Datapoints[0] + log.Info("credit balance", zap.Stringp("snapshot", snapshotID), zap.Float64p("credit", result.Maximum)) + return result.Maximum, nil +} + // DisableDataFSR disables FSR for data volume snapshots func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error { if len(snapshotsIDsMap) == 0 { @@ -528,7 +631,7 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin return newVolumeIDMap, eg.Wait() } -func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress) (int64, error) { +func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress, fsrEnabledRequired bool) (int64, error) { pendingVolumes := make([]*string, 0, len(volumeIDMap)) for oldVolID := range volumeIDMap { newVolumeID := volumeIDMap[oldVolID] @@ -548,7 +651,11 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress return 0, errors.Trace(err) } - createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(resp) + createdVolumeSize, unfinishedVolumes, err := e.HandleDescribeVolumesResponse(resp, fsrEnabledRequired) + if err != nil { + return 0, errors.Trace(err) + } + progress.IncBy(int64(len(pendingVolumes) - len(unfinishedVolumes))) totalVolumeSize += createdVolumeSize pendingVolumes = unfinishedVolumes @@ -591,12 +698,16 @@ func ec2Tag(key, val string) *ec2.Tag { return &ec2.Tag{Key: &key, Value: &val} } -func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput) (int64, []*string) { +func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput, fsrEnabledRequired bool) (int64, []*string, error) { totalVolumeSize := int64(0) var unfinishedVolumes []*string for _, volume := range resp.Volumes { if *volume.State == ec2.VolumeStateAvailable { + if fsrEnabledRequired && volume.FastRestored != nil && !*volume.FastRestored { + log.Error("snapshot fsr is not enabled for the volume", zap.String("volume", *volume.SnapshotId)) + return 0, nil, errors.Errorf("Snapshot [%s] of volume [%s] is not fsr enabled", *volume.SnapshotId, *volume.VolumeId) + } log.Info("volume is available", zap.String("id", *volume.VolumeId)) totalVolumeSize += *volume.Size } else { @@ -605,5 +716,5 @@ func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutp } } - return totalVolumeSize, unfinishedVolumes + return totalVolumeSize, unfinishedVolumes, nil } diff --git a/br/pkg/aws/ebs_test.go b/br/pkg/aws/ebs_test.go index d7f3be2a4a4a1..e55ea68c86e04 100644 --- a/br/pkg/aws/ebs_test.go +++ b/br/pkg/aws/ebs_test.go @@ -72,7 +72,7 @@ func TestHandleDescribeVolumesResponse(t *testing.T) { } e := &EC2Session{} - createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(curentVolumesStates) + createdVolumeSize, unfinishedVolumes, _ := e.HandleDescribeVolumesResponse(curentVolumesStates, false) require.Equal(t, int64(4), createdVolumeSize) require.Equal(t, 1, len(unfinishedVolumes)) } diff --git a/br/pkg/task/restore_ebs_meta.go b/br/pkg/task/restore_ebs_meta.go index 8c87aa669ef5d..cbb5d509f6133 100644 --- a/br/pkg/task/restore_ebs_meta.go +++ b/br/pkg/task/restore_ebs_meta.go @@ -260,7 +260,7 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin if err != nil { return nil, 0, errors.Trace(err) } - totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress) + totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress, h.cfg.UseFSR) if err != nil { return nil, 0, errors.Trace(err) }