From 34406db65e4d1cc1ccb658e592322c1d67a86e12 Mon Sep 17 00:00:00 2001 From: Taliesin Millhouse Date: Thu, 26 May 2022 16:10:13 +1000 Subject: [PATCH] feat: add kinesis stream support --- README.md | 5 ++ aws/aws.go | 35 +++++++---- aws/kinesis_stream.go | 122 ++++++++++++++++++++++++++++++++++++ aws/kinesis_stream_test.go | 121 +++++++++++++++++++++++++++++++++++ aws/kinesis_stream_types.go | 38 +++++++++++ config/config.go | 1 + config/config_test.go | 1 + 7 files changed, 311 insertions(+), 12 deletions(-) create mode 100644 aws/kinesis_stream.go create mode 100644 aws/kinesis_stream_test.go create mode 100644 aws/kinesis_stream_types.go diff --git a/README.md b/README.md index 5db246525..988ea2629 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ The currently supported functionality includes: - Deleting all IAM OpenID Connect Providers - Deleting all Customer managed keys from Key Management Service in an AWS account - Deleting all CloudWatch Log Groups in an AWS Account +- Deleting all Kinesis Streams in an AWS account ### BEWARE! @@ -219,6 +220,9 @@ The following resources support the Config file: - EKS Clusters - Resource type: `ekscluster` - Config key: `EKSCluster` +- Kinesis Streams + - Resource type: `kinesis-stream` + - Config key: `KinesisStream` #### Example @@ -327,6 +331,7 @@ To find out what we options are supported in the config file today, consult this | eip | none | ✅ | none | none | | ec2 | none | ✅ | none | none | | eks | none | ✅ | none | none | +| kinesis-stream | none | ✅ | none | none | | acmpca | none | none | none | none | | iam role | none | none | none | none | | ... (more to come) | none | none | none | none | diff --git a/aws/aws.go b/aws/aws.go index 8e498c9c2..d363f6890 100644 --- a/aws/aws.go +++ b/aws/aws.go @@ -115,7 +115,7 @@ func getRandomRegionWithExclusions(regionsToExclude []string) (string, error) { rand.Seed(time.Now().UnixNano()) // exclude from "allRegions" - var exclusions = make(map[string]string) + exclusions := make(map[string]string) for _, region := range regionsToExclude { exclusions[region] = region } @@ -217,7 +217,7 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp count := 1 totalRegions := len(targetRegions) - var resourcesCache = map[string]map[string][]*string{} + resourcesCache := map[string]map[string][]*string{} for _, region := range targetRegions { // The "global" region case is handled outside this loop @@ -228,9 +228,9 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp logging.Logger.Infof("Checking region [%d/%d]: %s", count, totalRegions, region) session, err := session.NewSession(&awsgo.Config{ - Region: awsgo.String(region)}, + Region: awsgo.String(region), + }, ) - if err != nil { return nil, errors.WithStackTrace(err) } @@ -517,7 +517,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp dbInstances := DBInstances{} if IsNukeable(dbInstances.ResourceName(), resourceTypes) { instanceNames, err := getAllRdsInstances(session, excludeAfter) - if err != nil { return nil, errors.WithStackTrace(err) } @@ -535,7 +534,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp dbClusters := DBClusters{} if IsNukeable(dbClusters.ResourceName(), resourceTypes) { clustersNames, err := getAllRdsClusters(session, excludeAfter) - if err != nil { return nil, errors.WithStackTrace(err) } @@ -551,7 +549,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp lambdaFunctions := LambdaFunctions{} if IsNukeable(lambdaFunctions.ResourceName(), resourceTypes) { lambdaFunctionNames, err := getAllLambdaFunctions(session, excludeAfter, configObj, lambdaFunctions.MaxBatchSize()) - if err != nil { return nil, errors.WithStackTrace(err) } @@ -663,7 +660,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp DynamoDB := DynamoDB{} if IsNukeable(DynamoDB.ResourceName(), resourceTypes) { tablenames, err := getAllDynamoTables(session, excludeAfter, configObj, DynamoDB) - if err != nil { return nil, errors.WithStackTrace(err) } @@ -721,6 +717,20 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp } // End KMS Customer managed keys + // Kinesis Streams + kinesisStreams := KinesisStreams{} + if IsNukeable(kinesisStreams.ResourceName(), resourceTypes) { + streams, err := getAllKinesisStreams(session, configObj) + if err != nil { + return nil, errors.WithStackTrace(err) + } + if len(streams) > 0 { + kinesisStreams.Names = awsgo.StringValueSlice(streams) + resourcesInRegion.Resources = append(resourcesInRegion.Resources, kinesisStreams) + } + } + // End Kinesis Streams + if len(resourcesInRegion.Resources) > 0 { account.Resources[region] = resourcesInRegion } @@ -735,7 +745,8 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp // As there is no actual region named global we have to pick a valid one just to create the session sessionRegion := defaultRegion session, err := session.NewSession(&awsgo.Config{ - Region: awsgo.String(sessionRegion)}, + Region: awsgo.String(sessionRegion), + }, ) if err != nil { return nil, errors.WithStackTrace(err) @@ -747,7 +758,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp iamUsers := IAMUsers{} if IsNukeable(iamUsers.ResourceName(), resourceTypes) { userNames, err := getAllIamUsers(session, excludeAfter, configObj) - if err != nil { return nil, errors.WithStackTrace(err) } @@ -816,6 +826,7 @@ func ListResourceTypes() []string { OIDCProviders{}.ResourceName(), KmsCustomerKeys{}.ResourceName(), CloudWatchLogGroups{}.ResourceName(), + KinesisStreams{}.ResourceName(), } sort.Strings(resourceTypes) return resourceTypes @@ -881,9 +892,9 @@ func NukeAllResources(account *AwsAccountResources, regions []string) error { } session, err := session.NewSession(&awsgo.Config{ - Region: awsgo.String(sessionRegion)}, + Region: awsgo.String(sessionRegion), + }, ) - if err != nil { return errors.WithStackTrace(err) } diff --git a/aws/kinesis_stream.go b/aws/kinesis_stream.go new file mode 100644 index 000000000..a8c6c3829 --- /dev/null +++ b/aws/kinesis_stream.go @@ -0,0 +1,122 @@ +package aws + +import ( + "sync" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/gruntwork-io/cloud-nuke/config" + "github.com/gruntwork-io/cloud-nuke/logging" + "github.com/gruntwork-io/go-commons/errors" + "github.com/hashicorp/go-multierror" +) + +func getAllKinesisStreams(session *session.Session, configObj config.Config) ([]*string, error) { + svc := kinesis.New(session) + + allStreams := []*string{} + err := svc.ListStreamsPages( + &kinesis.ListStreamsInput{}, + func(page *kinesis.ListStreamsOutput, lastPage bool) bool { + for _, streamName := range page.StreamNames { + if shouldIncludeKinesisStream(streamName, configObj) { + allStreams = append(allStreams, streamName) + } + } + return !lastPage + }, + ) + if err != nil { + return nil, errors.WithStackTrace(err) + } + return allStreams, nil +} + +func shouldIncludeKinesisStream(streamName *string, configObj config.Config) bool { + if streamName == nil { + return false + } + + return config.ShouldInclude( + aws.StringValue(streamName), + configObj.KinesisStream.IncludeRule.NamesRegExp, + configObj.KinesisStream.ExcludeRule.NamesRegExp, + ) +} + +func nukeAllKinesisStreams(session *session.Session, identifiers []*string) error { + region := aws.StringValue(session.Config.Region) + svc := kinesis.New(session) + + if len(identifiers) == 0 { + logging.Logger.Infof("No Kinesis Streams to nuke in region: %s", region) + } + + // NOTE: we don't need to do pagination here, because the pagination is handled by the caller to this function, + // based on KinesisStream.MaxBatchSize, however we add a guard here to warn users when the batching fails and + // has a chance of throttling AWS. Since we concurrently make one call for each identifier, we pick 100 for the + // limit here because many APIs in AWS have a limit of 100 requests per second. + if len(identifiers) > 100 { + logging.Logger.Errorf("Nuking too many Kinesis Streams at once (100): halting to avoid hitting AWS API rate limiting") + return TooManyStreamsErr{} + } + + // There is no bulk delete Kinesis Stream API, so we delete the batch of Kinesis Streams concurrently + // using go routines. + logging.Logger.Infof("Deleting Kinesis Streams in region: %s", region) + wg := new(sync.WaitGroup) + wg.Add(len(identifiers)) + errChans := make([]chan error, len(identifiers)) + for i, streamName := range identifiers { + errChans[i] = make(chan error, 1) + go deleteKinesisStreamAsync(wg, errChans[i], svc, streamName, region) + } + wg.Wait() + + // Collect all the errors from the async delete calls into a single error struct. + // NOTE: We ignore OperationAbortedException which is thrown when there is an eventual consistency issue, where + // cloud-nuke picks up a Stream that is already requested to be deleted. + var allErrs *multierror.Error + for _, errChan := range errChans { + if err := <-errChan; err != nil { + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "OperationAbortedException" { + allErrs = multierror.Append(allErrs, err) + } + } + } + finalErr := allErrs.ErrorOrNil() + if finalErr != nil { + return errors.WithStackTrace(finalErr) + } + return nil +} + +func deleteKinesisStreamAsync( + wg *sync.WaitGroup, + errChan chan error, + svc *kinesis.Kinesis, + streamName *string, + region string, +) { + defer wg.Done() + input := &kinesis.DeleteStreamInput{StreamName: streamName} + _, err := svc.DeleteStream(input) + errChan <- err + + streamNameStr := aws.StringValue(streamName) + if err == nil { + logging.Logger.Infof("[OK] Kinesis Stream %s delete in %s", streamNameStr, region) + } else { + logging.Logger.Errorf("[Failed] Error deleting Kinesis Stream %s in %s: %s", streamNameStr, region, err) + } +} + +// Custom errors + +type TooManyStreamsErr struct{} + +func (err TooManyStreamsErr) Error() string { + return "Too many Streams requested at once." +} diff --git a/aws/kinesis_stream_test.go b/aws/kinesis_stream_test.go new file mode 100644 index 000000000..b19067ff9 --- /dev/null +++ b/aws/kinesis_stream_test.go @@ -0,0 +1,121 @@ +package aws + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/gruntwork-io/cloud-nuke/config" + "github.com/gruntwork-io/cloud-nuke/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestListKinesisStreams(t *testing.T) { + t.Parallel() + + region, err := getRandomRegion() + require.NoError(t, err) + + session, err := session.NewSession(&aws.Config{Region: aws.String(region)}) + require.NoError(t, err) + svc := kinesis.New(session) + + sName := createKinesisStream(t, svc) + defer deleteKinesisStream(t, svc, sName, true) + + sNames, err := getAllKinesisStreams(session, config.Config{}) + require.NoError(t, err) + assert.Contains(t, aws.StringValueSlice(sNames), aws.StringValue(sName)) +} + +func TestNukeKinesisStreamOne(t *testing.T) { + t.Parallel() + + region, err := getRandomRegion() + require.NoError(t, err) + + session, err := session.NewSession(&aws.Config{Region: aws.String(region)}) + require.NoError(t, err) + svc := kinesis.New(session) + + // We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke. + sName := createKinesisStream(t, svc) + defer deleteKinesisStream(t, svc, sName, true) + identifiers := []*string{sName} + + require.NoError( + t, + nukeAllKinesisStreams(session, identifiers), + ) + + assertKinesisStreamsDeleted(t, svc, identifiers) +} + +func TestNukeKinesisStreamMoreThanOne(t *testing.T) { + t.Parallel() + + region, err := getRandomRegion() + require.NoError(t, err) + + session, err := session.NewSession(&aws.Config{Region: aws.String(region)}) + require.NoError(t, err) + svc := kinesis.New(session) + + sNames := []*string{} + for i := 0; i < 3; i++ { + // We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke. + sName := createKinesisStream(t, svc) + defer deleteKinesisStream(t, svc, sName, true) + sNames = append(sNames, sName) + } + + require.NoError( + t, + nukeAllKinesisStreams(session, sNames), + ) + + assertKinesisStreamsDeleted(t, svc, sNames) +} + +func createKinesisStream(t *testing.T, svc *kinesis.Kinesis) *string { + uniqueID := util.UniqueID() + name := fmt.Sprintf("cloud-nuke-test-%s", strings.ToLower(uniqueID)) + + _, err := svc.CreateStream(&kinesis.CreateStreamInput{ + ShardCount: aws.Int64(1), + StreamName: aws.String(name), + }) + require.NoError(t, err) + + // Add an arbitrary sleep to account for eventual consistency + time.Sleep(15 * time.Second) + return &name +} + +func deleteKinesisStream(t *testing.T, svc *kinesis.Kinesis, name *string, checkErr bool) { + _, err := svc.DeleteStream(&kinesis.DeleteStreamInput{ + StreamName: name, + }) + if checkErr { + require.NoError(t, err) + } +} + +func assertKinesisStreamsDeleted(t *testing.T, svc *kinesis.Kinesis, identifiers []*string) { + for _, name := range identifiers { + _, err := svc.DescribeStream(&kinesis.DescribeStreamInput{ + StreamName: name, + }) + + require.NotNil(t, err) + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "ResourceNotFoundException" { + t.Fatalf("Stream %s is not deleted", aws.StringValue(name)) + } + } +} diff --git a/aws/kinesis_stream_types.go b/aws/kinesis_stream_types.go new file mode 100644 index 000000000..7c61946ac --- /dev/null +++ b/aws/kinesis_stream_types.go @@ -0,0 +1,38 @@ +package aws + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/gruntwork-io/go-commons/errors" +) + +// KinesisStreams - represents all Kinesis streams +type KinesisStreams struct { + Names []string +} + +// ResourceName - The simple name of the AWS resource +func (k KinesisStreams) ResourceName() string { + return "kinesis-stream" +} + +// ResourceIdentifiers - The names of the Kinesis Streams +func (k KinesisStreams) ResourceIdentifiers() []string { + return k.Names +} + +func (k KinesisStreams) MaxBatchSize() int { + // Tentative batch size to ensure AWS doesn't throttle. Note that Kinesis Streams does not support bulk delete, so + // we will be deleting this many in parallel using go routines. We pick 35 here, which is half of what the AWS web + // console will do. We pick a conservative number here to avoid hitting AWS API rate limits. + return 35 +} + +// Nuke - nuke 'em all!!! +func (k KinesisStreams) Nuke(session *session.Session, identifiers []string) error { + if err := nukeAllKinesisStreams(session, aws.StringSlice(identifiers)); err != nil { + return errors.WithStackTrace(err) + } + + return nil +} diff --git a/config/config.go b/config/config.go index a52d77c0c..3ce9c55ff 100644 --- a/config/config.go +++ b/config/config.go @@ -33,6 +33,7 @@ type Config struct { CloudWatchLogGroup ResourceType `yaml:"CloudWatchLogGroup"` KMSCustomerKeys ResourceType `yaml:"KMSCustomerKeys"` EKSCluster ResourceType `yaml:"EKSCluster"` + KinesisStream ResourceType `yaml:"KinesisStream"` } type ResourceType struct { diff --git a/config/config_test.go b/config/config_test.go index c6927b270..321f92a19 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -34,6 +34,7 @@ func emptyConfig() *Config { ResourceType{FilterRule{}, FilterRule{}}, ResourceType{FilterRule{}, FilterRule{}}, ResourceType{FilterRule{}, FilterRule{}}, + ResourceType{FilterRule{}, FilterRule{}}, } }