Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ebs br: make sure fsr credit is full filled #48627

Merged
merged 14 commits into from
Nov 21, 2023
1 change: 1 addition & 0 deletions br/pkg/aws/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//aws/awserr",
"@com_github_aws_aws_sdk_go//aws/session",
"@com_github_aws_aws_sdk_go//service/cloudwatch",
"@com_github_aws_aws_sdk_go//service/ec2",
"@com_github_aws_aws_sdk_go//service/ec2/ec2iface",
"@com_github_pingcap_errors//:errors",
Expand Down
125 changes: 118 additions & 7 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/pingcap/errors"
Expand All @@ -31,7 +32,8 @@ const (
)

type EC2Session struct {
ec2 ec2iface.EC2API
ec2 ec2iface.EC2API
cloudwatchClient *cloudwatch.CloudWatch
// aws operation concurrency
concurrency uint
}
Expand All @@ -51,7 +53,8 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
return nil, errors.Trace(err)
}
ec2Session := ec2.New(sess)
return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil
cloudwatchClient := cloudwatch.New(sess)
return &EC2Session{ec2: ec2Session, cloudwatchClient: cloudwatchClient, concurrency: concurrency}, nil
}

// CreateSnapshots is the mainly steps to control the data volume snapshots.
Expand Down Expand Up @@ -324,8 +327,64 @@ func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string)
return snapshotsIDsMap, eg.Wait()
}

// waitDataFSREnabled waits FSR for data volume snapshots are all enabled
// waitDataFSREnabled waits FSR for data volume snapshots are all enabled and also have enough credit balance
func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error {
// Record current time
start := time.Now()

// get the maximum size of volumes, in GiB
var maxVolumeSize int64 = 0
resp, err := e.ec2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{SnapshotIds: snapShotIDs})
if err != nil {
return errors.Trace(err)
}
if len(resp.Snapshots) <= 0 {
return errors.Errorf("specified snapshot [%s] is not found", *snapShotIDs[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe need check the length of snapShotIDs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller has checked the empty set.

}

for _, s := range resp.Snapshots {
if *s.VolumeSize > maxVolumeSize {
maxVolumeSize = *s.VolumeSize
}
}

// Calculate the time in minutes to fill 1.0 credit according to
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-fast-snapshot-restore.html#volume-creation-credits
// 5 minutes more is just for safe
fillElapsedTime := 60.0/(min(10, 1024.0/(float64)(maxVolumeSize))) + 5

// We have to sleep for at least fillElapsedTime minutes in order to make credits are filled to 1.0
// Let's heartbeat every 5 minutes
for time.Since(start) <= time.Duration(fillElapsedTime)*time.Minute {
log.Info("FSR enablement is ongoing, going to sleep for 5 minutes...")
time.Sleep(5 * time.Minute)
}

// Wait that all snapshot has enough fsr credit balance, it's very likely true since we have wait for long enough
log.Info("Start check and wait all snapshots have enough fsr credit balance")

startIdx := 0
retryCount := 0
for startIdx < len(snapShotIDs) {
creditBalance, err := e.getFSRCreditBalance(snapShotIDs[startIdx], targetAZ)
if err != nil {
return errors.Trace(err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should retry if the api fails. Similar to retry below for creditBalance == 0 (from "No result for metric"). Maybe for that, you can return an error as well. And consolidate the retry logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error here comes from a cloudwatch api service, it's an AWS service issue and retrying might not solve the problem.

Copy link
Contributor

@YuJuncen YuJuncen Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AWS go SDK should have an internal retry of about 20 seconds. Is this enough? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AWS go SDK should have an internal retry of about 20 seconds. Is this enough? 🤔

Make sense. Checking from https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Alarms.html, the default cloudwatch interval is 5 seconds.


if creditBalance >= 1.0 {
startIdx++
retryCount = 0
} else {
if creditBalance == 0 {
if retryCount >= 3 {
return errors.Errorf("cloudwatch metrics for %s operation failed after retrying", *snapShotIDs[startIdx])
}
retryCount++
}
time.Sleep(1 * time.Minute)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocking comment. Now that you are checking the actual fsrCreditBalance, why do you still want to keep the time based heuristic in 351-360.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible cloud watch data is not synced up yet. So, I issued 3 times retrying at 1 minute intervals.


// Create a map to store the strings as keys
pendingSnapshots := make(map[string]struct{})

Copy link
Contributor

@nkg- nkg- Nov 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit question for

Values: []*string{aws.String("disabled"), aws.String("disabling"), aws.String("enabling"), aws.String("optimizing")},
.

Why not check for "enabled" and remove those snapshots from the pending list. There should be much fewer enabled snapshots. Also seems more robust, since you don't have the check for all other types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid there could some conflict fsr operation to those snapshots in case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean. What conflicts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, fsr status for snapshots is disabled. I am jus to check if there some concurrent operation which is to disable FSR.

Expand Down Expand Up @@ -378,6 +437,50 @@ func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string)
}
}

// getFSRCreditBalance is used to get maximum fsr credit balance of snapshot for last 5 minutes
func (e *EC2Session) getFSRCreditBalance(snapshotID *string, targetAZ string) (float64, error) {
// Set the time range to query for metrics
startTime := time.Now().Add(-5 * time.Minute)
endTime := time.Now()

// Prepare the input for the GetMetricStatisticsWithContext API call
input := &cloudwatch.GetMetricStatisticsInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
Namespace: aws.String("AWS/EBS"),
MetricName: aws.String("FastSnapshotRestoreCreditsBalance"),
Dimensions: []*cloudwatch.Dimension{
{
Name: aws.String("SnapshotId"),
Value: snapshotID,
},
{
Name: aws.String("AvailabilityZone"),
Value: aws.String(targetAZ),
},
},
Period: aws.Int64(300),
Statistics: []*string{aws.String("Maximum")},
}

log.Info("metrics input", zap.Any("input", input))

// Call cloudwatchClient API to retrieve the FastSnapshotRestoreCreditsBalance metric data
resp, err := e.cloudwatchClient.GetMetricStatisticsWithContext(context.Background(), input)
if err != nil {
return 0, errors.Trace(err)
}

// parse the response
if len(resp.Datapoints) == 0 {
log.Warn("No result for metric FastSnapshotRestoreCreditsBalance returned", zap.Stringp("snapshot", snapshotID))
return 0, nil
Copy link
Contributor

@YuJuncen YuJuncen Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will prefer returning a NaN here and retry if the result is Nan. Zero should be a meaningful result of this function. Once AWS changes their algorithm of fulfilling credits (say, refresh the credit every 30 minutes.), we may fail in that case.
Returning an error and retry will also be fine I guess.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed are made accordinngly.

}
result := resp.Datapoints[0]
log.Info("credit balance", zap.Stringp("snapshot", snapshotID), zap.Float64p("credit", result.Maximum))
return *result.Maximum, nil
}

// DisableDataFSR disables FSR for data volume snapshots
func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error {
if len(snapshotsIDsMap) == 0 {
Expand Down Expand Up @@ -528,7 +631,7 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
return newVolumeIDMap, eg.Wait()
}

func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress) (int64, error) {
func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress glue.Progress, fsrEnabledRequired bool) (int64, error) {
pendingVolumes := make([]*string, 0, len(volumeIDMap))
for oldVolID := range volumeIDMap {
newVolumeID := volumeIDMap[oldVolID]
Expand All @@ -548,7 +651,11 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
return 0, errors.Trace(err)
}

createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(resp)
createdVolumeSize, unfinishedVolumes, err := e.HandleDescribeVolumesResponse(resp, fsrEnabledRequired)
if err != nil {
return 0, errors.Trace(err)
}

progress.IncBy(int64(len(pendingVolumes) - len(unfinishedVolumes)))
totalVolumeSize += createdVolumeSize
pendingVolumes = unfinishedVolumes
Expand Down Expand Up @@ -591,12 +698,16 @@ func ec2Tag(key, val string) *ec2.Tag {
return &ec2.Tag{Key: &key, Value: &val}
}

func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput) (int64, []*string) {
func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput, fsrEnabledRequired bool) (int64, []*string, error) {
totalVolumeSize := int64(0)

var unfinishedVolumes []*string
for _, volume := range resp.Volumes {
if *volume.State == ec2.VolumeStateAvailable {
if fsrEnabledRequired && !*volume.FastRestored {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Makes sense to check the FSR status of the restored volume as well

log.Error("snapshot fsr is not enabled for the volume", zap.String("volume", *volume.SnapshotId))
return 0, nil, errors.Errorf("Snapshot [%s] of volume [%s] is not fsr enabled", *volume.SnapshotId, *volume.VolumeId)
}
log.Info("volume is available", zap.String("id", *volume.VolumeId))
totalVolumeSize += *volume.Size
} else {
Expand All @@ -605,5 +716,5 @@ func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutp
}
}

return totalVolumeSize, unfinishedVolumes
return totalVolumeSize, unfinishedVolumes, nil
}
2 changes: 1 addition & 1 deletion br/pkg/aws/ebs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestHandleDescribeVolumesResponse(t *testing.T) {
}

e := &EC2Session{}
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(curentVolumesStates)
createdVolumeSize, unfinishedVolumes, _ := e.HandleDescribeVolumesResponse(curentVolumesStates, false)
require.Equal(t, int64(4), createdVolumeSize)
require.Equal(t, 1, len(unfinishedVolumes))
}
2 changes: 1 addition & 1 deletion br/pkg/task/restore_ebs_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
if err != nil {
return nil, 0, errors.Trace(err)
}
totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress)
totalSize, err = ec2Session.WaitVolumesCreated(volumeIDMap, progress, h.cfg.UseFSR)
if err != nil {
return nil, 0, errors.Trace(err)
}
Expand Down
Loading