-
-
Notifications
You must be signed in to change notification settings - Fork 356
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement support for SNS Topics #354
Changes from all commits
e10b6a5
6defb24
d1a05b0
67adb58
bf21784
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package aws | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
awsconfig "github.com/aws/aws-sdk-go-v2/config" | ||
"github.com/aws/aws-sdk-go-v2/service/sns" | ||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/gruntwork-io/cloud-nuke/config" | ||
"github.com/gruntwork-io/cloud-nuke/logging" | ||
"github.com/gruntwork-io/go-commons/errors" | ||
"github.com/hashicorp/go-multierror" | ||
) | ||
|
||
func getAllSNSTopics(session *session.Session, excludeAfter time.Time, configObj config.Config) ([]*string, error) { | ||
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region))) | ||
if err != nil { | ||
return []*string{}, errors.WithStackTrace(err) | ||
} | ||
svc := sns.NewFromConfig(cfg) | ||
|
||
allSNSTopics := []*string{} | ||
|
||
paginator := sns.NewListTopicsPaginator(svc, nil) | ||
|
||
for paginator.HasMorePages() { | ||
resp, err := paginator.NextPage(context.TODO()) | ||
if err != nil { | ||
return []*string{}, errors.WithStackTrace(err) | ||
} | ||
for _, topic := range resp.Topics { | ||
allSNSTopics = append(allSNSTopics, topic.TopicArn) | ||
} | ||
} | ||
return allSNSTopics, nil | ||
} | ||
|
||
func nukeAllSNSTopics(session *session.Session, identifiers []*string) error { | ||
region := aws.StringValue(session.Config.Region) | ||
|
||
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region))) | ||
if err != nil { | ||
return errors.WithStackTrace(err) | ||
} | ||
svc := sns.NewFromConfig(cfg) | ||
|
||
if len(identifiers) == 0 { | ||
logging.Logger.Infof("No SNS Topics to nuke in region %s", region) | ||
} | ||
|
||
if len(identifiers) > 100 { | ||
logging.Logger.Errorf("Nuking too many SNS Topics (100): halting to avoid hitting AWS API rate limiting") | ||
return TooManySNSTopicsErr{} | ||
} | ||
|
||
// There is no bulk delete SNS API, so we delete the batch of SNS Topics concurrently using goroutines | ||
logging.Logger.Infof("Deleting SNS Topics in region %s", region) | ||
wg := new(sync.WaitGroup) | ||
wg.Add(len(identifiers)) | ||
errChans := make([]chan error, len(identifiers)) | ||
for i, topicArn := range identifiers { | ||
errChans[i] = make(chan error, 1) | ||
go deleteSNSTopicAsync(wg, errChans[i], svc, topicArn, region) | ||
} | ||
wg.Wait() | ||
|
||
var allErrs *multierror.Error | ||
for _, errChan := range errChans { | ||
if err := <-errChan; err != nil { | ||
allErrs = multierror.Append(allErrs, err) | ||
logging.Logger.Errorf("[Failed] %s", err) | ||
} | ||
} | ||
finalErr := allErrs.ErrorOrNil() | ||
if finalErr != nil { | ||
return errors.WithStackTrace(finalErr) | ||
} | ||
return nil | ||
} | ||
|
||
func deleteSNSTopicAsync(wg *sync.WaitGroup, errChan chan error, svc *sns.Client, topicArn *string, region string) { | ||
defer wg.Done() | ||
|
||
deleteParam := &sns.DeleteTopicInput{ | ||
TopicArn: topicArn, | ||
} | ||
|
||
logging.Logger.Infof("Deleting SNS Topic (arn=%s) in region: %s", aws.StringValue(topicArn), region) | ||
|
||
_, err := svc.DeleteTopic(context.TODO(), deleteParam) | ||
|
||
errChan <- err | ||
|
||
if err == nil { | ||
logging.Logger.Infof("[OK] Deleted SNS Topic (arn=%s) in region: %s", aws.StringValue(topicArn), region) | ||
} else { | ||
logging.Logger.Errorf("[Failed] Error deleting SNS Topic (arn=%s) in %s", aws.StringValue(topicArn), region) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we're rate limited, is this where execution will end up? I think this is fine. In the worst case we'll have to retry the delete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, that or any other unhandled cause for an error state should end up here. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package aws | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
"testing" | ||
"time" | ||
|
||
awsconfig "github.com/aws/aws-sdk-go-v2/config" | ||
"github.com/aws/aws-sdk-go-v2/service/sns" | ||
"github.com/aws/aws-sdk-go/aws" | ||
awsgo "github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/gruntwork-io/cloud-nuke/config" | ||
"github.com/gruntwork-io/cloud-nuke/util" | ||
"github.com/gruntwork-io/go-commons/errors" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
type TestSNSTopic struct { | ||
Name *string | ||
Arn *string | ||
} | ||
|
||
func createTestSNSTopic(t *testing.T, session *session.Session, name string) (*TestSNSTopic, error) { | ||
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region))) | ||
require.NoError(t, err) | ||
|
||
svc := sns.NewFromConfig(cfg) | ||
|
||
testSNSTopic := &TestSNSTopic{ | ||
Name: aws.String(name), | ||
} | ||
|
||
param := &sns.CreateTopicInput{ | ||
Name: testSNSTopic.Name, | ||
} | ||
|
||
// Do a coin-flip to choose either a FIFO or Standard SNS Topic | ||
coin := []string{ | ||
"true", | ||
"false", | ||
} | ||
rand.Seed(time.Now().UnixNano()) | ||
coinflip := coin[rand.Intn(len(coin))] | ||
param.Attributes = make(map[string]string) | ||
param.Attributes["FifoTopic"] = coinflip | ||
|
||
// If we did choose to create a fifo queue, the name must end in ".fifo" | ||
if coinflip == "true" { | ||
param.Name = aws.String(fmt.Sprintf("%s.fifo", aws.StringValue(param.Name))) | ||
} | ||
|
||
output, err := svc.CreateTopic(context.TODO(), param) | ||
if err != nil { | ||
assert.Failf(t, "Could not create test SNS Topic: %s", errors.WithStackTrace(err).Error()) | ||
} | ||
|
||
testSNSTopic.Arn = output.TopicArn | ||
|
||
return testSNSTopic, nil | ||
} | ||
|
||
func TestListSNSTopics(t *testing.T) { | ||
t.Parallel() | ||
|
||
region, err := getRandomRegion() | ||
require.NoError(t, err) | ||
session, err := session.NewSession(&awsgo.Config{ | ||
Region: awsgo.String(region), | ||
}, | ||
) | ||
if err != nil { | ||
assert.Fail(t, errors.WithStackTrace(err).Error()) | ||
} | ||
|
||
snsTopicName := "aws-nuke-test-" + util.UniqueID() | ||
testSNSTopic, createTestSNSTopicErr := createTestSNSTopic(t, session, snsTopicName) | ||
require.NoError(t, createTestSNSTopicErr) | ||
// clean up after this test | ||
defer nukeAllSNSTopics(session, []*string{testSNSTopic.Arn}) | ||
|
||
snsTopicArns, err := getAllSNSTopics(session, time.Now(), config.Config{}) | ||
if err != nil { | ||
assert.Fail(t, "Unable to fetch list of SNS Topics") | ||
} | ||
|
||
assert.Contains(t, awsgo.StringValueSlice(snsTopicArns), aws.StringValue(testSNSTopic.Arn)) | ||
} | ||
|
||
func TestNukeSNSTopicOne(t *testing.T) { | ||
t.Parallel() | ||
|
||
region, err := getRandomRegion() | ||
require.NoError(t, err) | ||
|
||
session, err := session.NewSession(&aws.Config{Region: aws.String(region)}) | ||
require.NoError(t, err) | ||
|
||
snsTopicName := "aws-nuke-test-" + util.UniqueID() | ||
|
||
testSNSTopic, createTestSNSTopicErr := createTestSNSTopic(t, session, snsTopicName) | ||
require.NoError(t, createTestSNSTopicErr) | ||
|
||
nukeErr := nukeAllSNSTopics(session, []*string{testSNSTopic.Arn}) | ||
require.NoError(t, nukeErr) | ||
|
||
// Make sure the SNS Topic was deleted | ||
snsTopicArns, err := getAllSNSTopics(session, time.Now(), config.Config{}) | ||
require.NoError(t, err) | ||
|
||
assert.NotContains(t, aws.StringValueSlice(snsTopicArns), aws.StringValue(testSNSTopic.Arn)) | ||
} | ||
|
||
func TestNukeSNSTopicMoreThanOne(t *testing.T) { | ||
t.Parallel() | ||
|
||
region, err := getRandomRegion() | ||
require.NoError(t, err) | ||
|
||
session, err := session.NewSession(&aws.Config{Region: aws.String(region)}) | ||
require.NoError(t, err) | ||
|
||
testSNSTopicName := "aws-nuke-test-" + util.UniqueID() | ||
testSNSTopicName2 := "aws-nuke-test-" + util.UniqueID() | ||
|
||
testSNSTopic, createTestErr := createTestSNSTopic(t, session, testSNSTopicName) | ||
require.NoError(t, createTestErr) | ||
testSNSTopic2, createTestErr2 := createTestSNSTopic(t, session, testSNSTopicName2) | ||
require.NoError(t, createTestErr2) | ||
|
||
nukeErr := nukeAllSNSTopics(session, []*string{testSNSTopic.Arn, testSNSTopic2.Arn}) | ||
require.NoError(t, nukeErr) | ||
|
||
// Make sure the SNS topics were deleted | ||
snsTopicArns, err := getAllSNSTopics(session, time.Now(), config.Config{}) | ||
require.NoError(t, err) | ||
|
||
assert.NotContains(t, aws.StringValueSlice(snsTopicArns), aws.StringValue(testSNSTopic.Arn)) | ||
assert.NotContains(t, aws.StringValueSlice(snsTopicArns), aws.StringValue(testSNSTopic2.Arn)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package aws | ||
|
||
import ( | ||
awsgo "github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/gruntwork-io/go-commons/errors" | ||
) | ||
|
||
type SNSTopic struct { | ||
Arns []string | ||
} | ||
|
||
func (s SNSTopic) ResourceName() string { | ||
return "snstopic" | ||
} | ||
|
||
func (s SNSTopic) ResourceIdentifiers() []string { | ||
return s.Arns | ||
} | ||
|
||
func (s SNSTopic) MaxBatchSize() int { | ||
return 50 | ||
} | ||
|
||
func (s SNSTopic) Nuke(session *session.Session, identifiers []string) error { | ||
if err := nukeAllSNSTopics(session, awsgo.StringSlice(identifiers)); err != nil { | ||
return errors.WithStackTrace(err) | ||
} | ||
return nil | ||
} | ||
|
||
// custom errors | ||
|
||
type TooManySNSTopicsErr struct{} | ||
|
||
func (err TooManySNSTopicsErr) Error() string { | ||
return "Too many SNS Topics requested at once." | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know that 100 is the limit? Should we detect rate limiting and respond to that, or is that overkill?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't know for sure, but this is the pattern for proactively trying to avoid rate limiting that I find us carrying throughout the resources. It would be nice if we had some intelligent logic to respond to rate limiting, but I'd need to think about the cleanest way to do that.