Skip to content

Commit

Permalink
Verify both pending volume modifications and volume size
Browse files Browse the repository at this point in the history
Before reporting a volume expansion request as successful, we should
verify both the volume size reported via DescribeVolumes and any pending
volume modification requests.
  • Loading branch information
gnufied committed Sep 9, 2020
1 parent 1c782eb commit a98b32a
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 28 deletions.
109 changes: 85 additions & 24 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
VolumeTypeST1,
VolumeTypeStandard,
}
VolumeNotBeingModified = fmt.Errorf("volume is not being modified")
)

// AWS provisioning limits.
Expand Down Expand Up @@ -648,7 +649,6 @@ func (c *cloud) ec2SnapshotResponseToStruct(ec2Snapshot *ec2.Snapshot) *Snapshot
func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string

for {
response, err := c.ec2.DescribeVolumesWithContext(ctx, request)
if err != nil {
Expand Down Expand Up @@ -831,6 +831,12 @@ func isAWSErrorInvalidAttachmentNotFound(err error) bool {
return isAWSError(err, "InvalidAttachment.NotFound")
}

// isAWSErrorModificationNotFound reports whether err is the AWS
// InvalidVolumeModification.NotFound error. The matching itself is
// delegated to isAWSError using that error code.
func isAWSErrorModificationNotFound(err error) bool {
	const modificationNotFoundCode = "InvalidVolumeModification.NotFound"
	return isAWSError(err, modificationNotFoundCode)
}

// isAWSErrorSnapshotNotFound returns a boolean indicating whether the
// given error is an AWS InvalidSnapshot.NotFound error. This error is
// reported when the specified snapshot doesn't exist.
Expand All @@ -855,40 +861,60 @@ func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes in
newSizeGiB := util.RoundUpGiB(newSizeBytes)
oldSizeGiB := aws.Int64Value(volume.Size)

if oldSizeGiB >= newSizeGiB {
klog.V(5).Infof("Volume %q's current size (%d GiB) is greater or equal to the new size (%d GiB)", volumeID, oldSizeGiB, newSizeGiB)
return oldSizeGiB, nil
}
latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)

latestMod, err := c.getLatestVolumeModification(ctx, volumeID)
// if there are no errors fetching modifications
if err == nil && latestMod != nil {
if latestMod != nil && modFetchError == nil {
state := aws.StringValue(latestMod.ModificationState)
targetSize := aws.Int64Value(latestMod.TargetSize)
if (state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing) && targetSize >= newSizeGiB {
return targetSize, nil
if state == ec2.VolumeModificationStateModifying {
return oldSizeGiB, fmt.Errorf("volume %q is still being expanded to size %d", volumeID, newSizeGiB)
}
}

if state == ec2.VolumeModificationStateModifying {
return oldSizeGiB, fmt.Errorf("volume %q is still being expanded to size %d", volumeID, targetSize)
// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && modFetchError != VolumeNotBeingModified {
klog.Errorf("error fetching volume modifications for %q: %v", volumeID, modFetchError)
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %v", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).Infof("Volume %q current size (%d GiB) is greater or equal to the new size (%d GiB)", volumeID, oldSizeGiB, newSizeGiB)
modificationDone := false
if latestMod != nil {
state := aws.StringValue(latestMod.ModificationState)
targetSize := aws.Int64Value(latestMod.TargetSize)
if volumeModificationDone(state) && (targetSize >= newSizeGiB) {
modificationDone = true
}
}
if modFetchError == VolumeNotBeingModified {
modificationDone = true
}

if modificationDone {
return oldSizeGiB, nil
}
return oldSizeGiB, fmt.Errorf("volume %q is still being expanded", volumeID)
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
Size: aws.Int64(newSizeGiB),
}

klog.Infof("expanding volume %q to size %d", volumeID, newSizeGiB)
var mod *ec2.VolumeModification
response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
if err != nil {
if !isAWSErrorIncorrectModification(err) {
return 0, fmt.Errorf("could not modify AWS volume %q: %v", volumeID, err)
}

m, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil {
return 0, err
m, modFetchError := c.getLatestVolumeModification(ctx, volumeID)
if modFetchError != nil {
return 0, modFetchError
}
mod = m
}
Expand All @@ -898,20 +924,45 @@ func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes in
}

state := aws.StringValue(mod.ModificationState)
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
return aws.Int64Value(mod.TargetSize), nil
if volumeModificationDone(state) {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

return c.waitForVolumeSize(ctx, volumeID)
_, err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

// checkDesiredSize describes the volume identified by volumeID and compares
// the size it reports against newSizeGiB.
//
// It exists so that a resize is confirmed by reading the volume object itself
// in addition to the volume modification object, which works around potential
// eventual-consistency problems when describing volume modifications.
//
// Returns:
//   - (0, err) when describing the volume fails;
//   - (current size in GiB, nil) when the reported size is at least newSizeGiB;
//   - (current size in GiB, error) when the reported size is still smaller
//     than newSizeGiB.
func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
	describeInput := &ec2.DescribeVolumesInput{
		VolumeIds: []*string{aws.String(volumeID)},
	}

	vol, err := c.getVolume(ctx, describeInput)
	if err != nil {
		return 0, err
	}

	// AWS resizes in chunks of GiB (not GB)
	currentSizeGiB := aws.Int64Value(vol.Size)
	if currentSizeGiB < newSizeGiB {
		// The volume does not yet report the requested size.
		return currentSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, newSizeGiB)
	}
	return currentSizeGiB, nil
}

// waitForVolumeSize waits for a volume modification to finish and return its size.
func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) (int64, error) {
// the default context is 10s and hence we should reduce this to more reasonable value and let external-resizer retry
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 3,
Factor: 1.7,
Steps: 10,
}

var modVolSizeGiB int64
Expand All @@ -922,7 +973,7 @@ func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) (int64,
}

state := aws.StringValue(m.ModificationState)
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
if volumeModificationDone(state) {
modVolSizeGiB = aws.Int64Value(m.TargetSize)
return true, nil
}
Expand All @@ -946,12 +997,15 @@ func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string
}
mod, err := c.ec2.DescribeVolumesModificationsWithContext(ctx, request)
if err != nil {
if isAWSErrorModificationNotFound(err) {
return nil, VolumeNotBeingModified
}
return nil, fmt.Errorf("error describing modifications in volume %q: %v", volumeID, err)
}

volumeMods := mod.VolumesModifications
if len(volumeMods) == 0 {
return nil, fmt.Errorf("could not find any modifications for volume %q", volumeID)
return nil, VolumeNotBeingModified
}

return volumeMods[len(volumeMods)-1], nil
Expand All @@ -973,3 +1027,10 @@ func (c *cloud) randomAvailabilityZone(ctx context.Context) (string, error) {

return zones[0], nil
}

// volumeModificationDone reports whether state is one of the volume
// modification states this package treats as finished: completed or
// optimizing. Any other state yields false.
func volumeModificationDone(state string) bool {
	return state == ec2.VolumeModificationStateCompleted ||
		state == ec2.VolumeModificationStateOptimizing
}
44 changes: 40 additions & 4 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func TestResizeDisk(t *testing.T) {
VolumeModification: &ec2.VolumeModification{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateOptimizing),
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
reqSizeGiB: 2,
Expand Down Expand Up @@ -664,7 +664,7 @@ func TestResizeDisk(t *testing.T) {
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
Size: aws.Int64(2),
AvailabilityZone: aws.String(defaultZone),
},
descModVolume: &ec2.DescribeVolumesModificationsOutput{
Expand Down Expand Up @@ -707,6 +707,26 @@ func TestResizeDisk(t *testing.T) {
reqSizeGiB: 2,
expErr: nil,
},
{
name: "failure: volume in modifying state",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
AvailabilityZone: aws.String(defaultZone),
},
descModVolume: &ec2.DescribeVolumesModificationsOutput{
VolumesModifications: []*ec2.VolumeModification{
{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateModifying),
},
},
},
reqSizeGiB: 2,
expErr: fmt.Errorf("ResizeDisk generic error"),
},
}

for _, tc := range testCases {
Expand All @@ -719,8 +739,24 @@ func TestResizeDisk(t *testing.T) {
if tc.existingVolume != nil || tc.existingVolumeError != nil {
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{tc.existingVolume},
}, tc.existingVolumeError).AnyTimes()
Volumes: []*ec2.Volume{
tc.existingVolume,
},
}, tc.existingVolumeError)

if tc.expErr == nil && aws.Int64Value(tc.existingVolume.Size) != tc.reqSizeGiB {
resizedVolume := &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(tc.reqSizeGiB),
AvailabilityZone: aws.String(defaultZone),
}
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{
resizedVolume,
},
}, tc.existingVolumeError)
}
}
if tc.modifiedVolume != nil || tc.modifiedVolumeError != nil {
mockEC2.EXPECT().ModifyVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(tc.modifiedVolume, tc.modifiedVolumeError).AnyTimes()
Expand Down

0 comments on commit a98b32a

Please sign in to comment.