Skip to content

Commit

Permalink
Merge pull request #552 from gnufied/make-controller-expansion-idempo…
Browse files Browse the repository at this point in the history
…tent

Make EBS controllerexpansion idempotent
  • Loading branch information
k8s-ci-robot authored Sep 23, 2020
2 parents e078ae9 + 0f5ece6 commit e95bb1d
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 36 deletions.
114 changes: 85 additions & 29 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ var (
VolumeTypeST1,
VolumeTypeStandard,
}

volumeModificationDuration = 1 * time.Second
volumeModificationWaitFactor = 1.7
volumeModificationWaitSteps = 10
)

// AWS provisioning limits.
Expand Down Expand Up @@ -114,6 +118,9 @@ var (

// ErrInvalidMaxResults is returned when a MaxResults pagination parameter is between 1 and 4
ErrInvalidMaxResults = errors.New("MaxResults parameter must be 0 or greater than or equal to 5")

// VolumeNotBeingModified is returned if volume being described is not being modified
VolumeNotBeingModified = fmt.Errorf("volume is not being modified")
)

// Disk represents a EBS volume
Expand Down Expand Up @@ -651,7 +658,6 @@ func (c *cloud) ec2SnapshotResponseToStruct(ec2Snapshot *ec2.Snapshot) *Snapshot
func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string

for {
response, err := c.ec2.DescribeVolumesWithContext(ctx, request)
if err != nil {
Expand Down Expand Up @@ -799,13 +805,6 @@ func isAWSError(err error, code string) bool {
return false
}

// isAWSErrorIncorrectModification returns a boolean indicating whether the given error
// is an AWS IncorrectModificationState error. This error means that a modification action
// on an EBS volume cannot occur because the volume is currently being modified.
func isAWSErrorIncorrectModification(err error) bool {
return isAWSError(err, "IncorrectModificationState")
}

// isAWSErrorInstanceNotFound returns a boolean indicating whether the
// given error is an AWS InvalidInstanceID.NotFound error. This error is
// reported when the specified instance doesn't exist.
Expand Down Expand Up @@ -834,6 +833,12 @@ func isAWSErrorInvalidAttachmentNotFound(err error) bool {
return isAWSError(err, "InvalidAttachment.NotFound")
}

// isAWSErrorModificationNotFound returns a boolean indicating whether the given
// error is an AWS InvalidVolumeModification.NotFound error
func isAWSErrorModificationNotFound(err error) bool {
return isAWSError(err, "InvalidVolumeModification.NotFound")
}

// isAWSErrorSnapshotNotFound returns a boolean indicating whether the
// given error is an AWS InvalidSnapshot.NotFound error. This error is
// reported when the specified snapshot doesn't exist.
Expand All @@ -858,8 +863,33 @@ func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes in
newSizeGiB := util.RoundUpGiB(newSizeBytes)
oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)

if latestMod != nil && modFetchError == nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
_, err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && modFetchError != VolumeNotBeingModified {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %v", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).Infof("Volume %q's current size (%d GiB) is greater or equal to the new size (%d GiB)", volumeID, oldSizeGiB, newSizeGiB)
klog.V(5).Infof("Volume %q current size (%d GiB) is greater or equal to the new size (%d GiB)", volumeID, oldSizeGiB, newSizeGiB)
_, err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && err != VolumeNotBeingModified {
return oldSizeGiB, err
}
return oldSizeGiB, nil
}

Expand All @@ -868,38 +898,54 @@ func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes in
Size: aws.Int64(newSizeGiB),
}

var mod *ec2.VolumeModification
klog.Infof("expanding volume %q to size %d", volumeID, newSizeGiB)
response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
if err != nil {
if !isAWSErrorIncorrectModification(err) {
return 0, fmt.Errorf("could not modify AWS volume %q: %v", volumeID, err)
}
return 0, fmt.Errorf("could not modify AWS volume %q: %v", volumeID, err)
}

m, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil {
return 0, err
}
mod = m
mod := response.VolumeModification

state := aws.StringValue(mod.ModificationState)
if volumeModificationDone(state) {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

if mod == nil {
mod = response.VolumeModification
_, err = c.waitForVolumeSize(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

state := aws.StringValue(mod.ModificationState)
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
return aws.Int64Value(mod.TargetSize), nil
// Checks for desired size on volume by also verifying volume size by describing volume.
// This is to get around potential eventual consistency problems with describing volume modifications
// objects and ensuring that we read two different objects to verify volume state.
func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
}

return c.waitForVolumeSize(ctx, volumeID)
// AWS resizes in chunks of GiB (not GB)
oldSizeGiB := aws.Int64Value(volume.Size)
if oldSizeGiB >= newSizeGiB {
return oldSizeGiB, nil
}
return oldSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, newSizeGiB)
}

// waitForVolumeSize waits for a volume modification to finish and return its size.
func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) (int64, error) {
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 20,
Duration: volumeModificationDuration,
Factor: volumeModificationWaitFactor,
Steps: volumeModificationWaitSteps,
}

var modVolSizeGiB int64
Expand All @@ -910,7 +956,7 @@ func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) (int64,
}

state := aws.StringValue(m.ModificationState)
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
if volumeModificationDone(state) {
modVolSizeGiB = aws.Int64Value(m.TargetSize)
return true, nil
}
Expand All @@ -934,12 +980,15 @@ func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string
}
mod, err := c.ec2.DescribeVolumesModificationsWithContext(ctx, request)
if err != nil {
if isAWSErrorModificationNotFound(err) {
return nil, VolumeNotBeingModified
}
return nil, fmt.Errorf("error describing modifications in volume %q: %v", volumeID, err)
}

volumeMods := mod.VolumesModifications
if len(volumeMods) == 0 {
return nil, fmt.Errorf("could not find any modifications for volume %q", volumeID)
return nil, VolumeNotBeingModified
}

return volumeMods[len(volumeMods)-1], nil
Expand All @@ -961,3 +1010,10 @@ func (c *cloud) randomAvailabilityZone(ctx context.Context) (string, error) {

return zones[0], nil
}

func volumeModificationDone(state string) bool {
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
return true
}
return false
}
54 changes: 47 additions & 7 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func TestResizeDisk(t *testing.T) {
VolumeModification: &ec2.VolumeModification{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateOptimizing),
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
reqSizeGiB: 2,
Expand Down Expand Up @@ -659,6 +659,26 @@ func TestResizeDisk(t *testing.T) {
reqSizeGiB: 2,
expErr: nil,
},
{
name: "success: with previous expansion",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(2),
AvailabilityZone: aws.String(defaultZone),
},
descModVolume: &ec2.DescribeVolumesModificationsOutput{
VolumesModifications: []*ec2.VolumeModification{
{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
},
reqSizeGiB: 2,
expErr: nil,
},
{
name: "fail: volume doesn't exist",
volumeID: "vol-test",
Expand All @@ -667,46 +687,66 @@ func TestResizeDisk(t *testing.T) {
expErr: fmt.Errorf("ResizeDisk generic error"),
},
{
name: "success: there is a resizing in progress",
name: "failure: volume in modifying state",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
AvailabilityZone: aws.String(defaultZone),
},
modifiedVolumeError: awserr.New("IncorrectModificationState", "", nil),
descModVolume: &ec2.DescribeVolumesModificationsOutput{
VolumesModifications: []*ec2.VolumeModification{
{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
ModificationState: aws.String(ec2.VolumeModificationStateModifying),
},
},
},
reqSizeGiB: 2,
expErr: nil,
expErr: fmt.Errorf("ResizeDisk generic error"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockEC2 := mocks.NewMockEC2(mockCtrl)
// reduce number of steps to reduce test time
volumeModificationWaitSteps = 3
c := newCloud(mockEC2)

ctx := context.Background()
if tc.existingVolume != nil || tc.existingVolumeError != nil {
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{tc.existingVolume},
}, tc.existingVolumeError).AnyTimes()
Volumes: []*ec2.Volume{
tc.existingVolume,
},
}, tc.existingVolumeError)

if tc.expErr == nil && aws.Int64Value(tc.existingVolume.Size) != tc.reqSizeGiB {
resizedVolume := &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(tc.reqSizeGiB),
AvailabilityZone: aws.String(defaultZone),
}
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{
resizedVolume,
},
}, tc.existingVolumeError)
}
}
if tc.modifiedVolume != nil || tc.modifiedVolumeError != nil {
mockEC2.EXPECT().ModifyVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(tc.modifiedVolume, tc.modifiedVolumeError).AnyTimes()
}
if tc.descModVolume != nil {
mockEC2.EXPECT().DescribeVolumesModificationsWithContext(gomock.Eq(ctx), gomock.Any()).Return(tc.descModVolume, nil).AnyTimes()
} else {
emptyOutput := &ec2.DescribeVolumesModificationsOutput{}
mockEC2.EXPECT().DescribeVolumesModificationsWithContext(gomock.Eq(ctx), gomock.Any()).Return(emptyOutput, nil).AnyTimes()
}

newSize, err := c.ResizeDisk(ctx, tc.volumeID, util.GiBToBytes(tc.reqSizeGiB))
Expand Down

0 comments on commit e95bb1d

Please sign in to comment.