Skip to content

Commit

Permalink
feat: add kinesis stream support
Browse files Browse the repository at this point in the history
  • Loading branch information
Taliesin Millhouse committed Jun 10, 2022
1 parent a696846 commit 34406db
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 12 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ The currently supported functionality includes:
- Deleting all IAM OpenID Connect Providers
- Deleting all Customer managed keys from Key Management Service in an AWS account
- Deleting all CloudWatch Log Groups in an AWS Account
- Deleting all Kinesis Streams in an AWS account

### BEWARE!

Expand Down Expand Up @@ -219,6 +220,9 @@ The following resources support the Config file:
- EKS Clusters
- Resource type: `ekscluster`
- Config key: `EKSCluster`
- Kinesis Streams
- Resource type: `kinesis-stream`
- Config key: `KinesisStream`

#### Example

Expand Down Expand Up @@ -327,6 +331,7 @@ To find out what options are supported in the config file today, consult this
| eip | none | ✅ | none | none |
| ec2 | none | ✅ | none | none |
| eks | none | ✅ | none | none |
| kinesis-stream | none | ✅ | none | none |
| acmpca | none | none | none | none |
| iam role | none | none | none | none |
| ... (more to come) | none | none | none | none |
Expand Down
35 changes: 23 additions & 12 deletions aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func getRandomRegionWithExclusions(regionsToExclude []string) (string, error) {
rand.Seed(time.Now().UnixNano())

// exclude from "allRegions"
var exclusions = make(map[string]string)
exclusions := make(map[string]string)
for _, region := range regionsToExclude {
exclusions[region] = region
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp

count := 1
totalRegions := len(targetRegions)
var resourcesCache = map[string]map[string][]*string{}
resourcesCache := map[string]map[string][]*string{}

for _, region := range targetRegions {
// The "global" region case is handled outside this loop
Expand All @@ -228,9 +228,9 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
logging.Logger.Infof("Checking region [%d/%d]: %s", count, totalRegions, region)

session, err := session.NewSession(&awsgo.Config{
Region: awsgo.String(region)},
Region: awsgo.String(region),
},
)

if err != nil {
return nil, errors.WithStackTrace(err)
}
Expand Down Expand Up @@ -517,7 +517,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
dbInstances := DBInstances{}
if IsNukeable(dbInstances.ResourceName(), resourceTypes) {
instanceNames, err := getAllRdsInstances(session, excludeAfter)

if err != nil {
return nil, errors.WithStackTrace(err)
}
Expand All @@ -535,7 +534,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
dbClusters := DBClusters{}
if IsNukeable(dbClusters.ResourceName(), resourceTypes) {
clustersNames, err := getAllRdsClusters(session, excludeAfter)

if err != nil {
return nil, errors.WithStackTrace(err)
}
Expand All @@ -551,7 +549,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
lambdaFunctions := LambdaFunctions{}
if IsNukeable(lambdaFunctions.ResourceName(), resourceTypes) {
lambdaFunctionNames, err := getAllLambdaFunctions(session, excludeAfter, configObj, lambdaFunctions.MaxBatchSize())

if err != nil {
return nil, errors.WithStackTrace(err)
}
Expand Down Expand Up @@ -663,7 +660,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
DynamoDB := DynamoDB{}
if IsNukeable(DynamoDB.ResourceName(), resourceTypes) {
tablenames, err := getAllDynamoTables(session, excludeAfter, configObj, DynamoDB)

if err != nil {
return nil, errors.WithStackTrace(err)
}
Expand Down Expand Up @@ -721,6 +717,20 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
}
// End KMS Customer managed keys

// Kinesis Streams
kinesisStreams := KinesisStreams{}
if IsNukeable(kinesisStreams.ResourceName(), resourceTypes) {
streams, err := getAllKinesisStreams(session, configObj)
if err != nil {
return nil, errors.WithStackTrace(err)
}
if len(streams) > 0 {
kinesisStreams.Names = awsgo.StringValueSlice(streams)
resourcesInRegion.Resources = append(resourcesInRegion.Resources, kinesisStreams)
}
}
// End Kinesis Streams

if len(resourcesInRegion.Resources) > 0 {
account.Resources[region] = resourcesInRegion
}
Expand All @@ -735,7 +745,8 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
// As there is no actual region named global we have to pick a valid one just to create the session
sessionRegion := defaultRegion
session, err := session.NewSession(&awsgo.Config{
Region: awsgo.String(sessionRegion)},
Region: awsgo.String(sessionRegion),
},
)
if err != nil {
return nil, errors.WithStackTrace(err)
Expand All @@ -747,7 +758,6 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
iamUsers := IAMUsers{}
if IsNukeable(iamUsers.ResourceName(), resourceTypes) {
userNames, err := getAllIamUsers(session, excludeAfter, configObj)

if err != nil {
return nil, errors.WithStackTrace(err)
}
Expand Down Expand Up @@ -816,6 +826,7 @@ func ListResourceTypes() []string {
OIDCProviders{}.ResourceName(),
KmsCustomerKeys{}.ResourceName(),
CloudWatchLogGroups{}.ResourceName(),
KinesisStreams{}.ResourceName(),
}
sort.Strings(resourceTypes)
return resourceTypes
Expand Down Expand Up @@ -881,9 +892,9 @@ func NukeAllResources(account *AwsAccountResources, regions []string) error {
}

session, err := session.NewSession(&awsgo.Config{
Region: awsgo.String(sessionRegion)},
Region: awsgo.String(sessionRegion),
},
)

if err != nil {
return errors.WithStackTrace(err)
}
Expand Down
122 changes: 122 additions & 0 deletions aws/kinesis_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package aws

import (
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/logging"
"github.com/gruntwork-io/go-commons/errors"
"github.com/hashicorp/go-multierror"
)

// getAllKinesisStreams returns the names of all Kinesis Streams in the
// session's region that pass the config file include/exclude filters.
func getAllKinesisStreams(session *session.Session, configObj config.Config) ([]*string, error) {
	svc := kinesis.New(session)

	streamNames := []*string{}
	pageHandler := func(page *kinesis.ListStreamsOutput, lastPage bool) bool {
		for _, name := range page.StreamNames {
			if shouldIncludeKinesisStream(name, configObj) {
				streamNames = append(streamNames, name)
			}
		}
		// Keep paginating until the SDK reports the final page.
		return !lastPage
	}

	if err := svc.ListStreamsPages(&kinesis.ListStreamsInput{}, pageHandler); err != nil {
		return nil, errors.WithStackTrace(err)
	}
	return streamNames, nil
}

// shouldIncludeKinesisStream reports whether the given stream name passes the
// KinesisStream include/exclude name filters from the config file. A nil name
// is never included.
func shouldIncludeKinesisStream(streamName *string, configObj config.Config) bool {
	if streamName == nil {
		return false
	}

	includeRules := configObj.KinesisStream.IncludeRule.NamesRegExp
	excludeRules := configObj.KinesisStream.ExcludeRule.NamesRegExp
	return config.ShouldInclude(aws.StringValue(streamName), includeRules, excludeRules)
}

// nukeAllKinesisStreams deletes the given batch of Kinesis Streams concurrently.
// It returns TooManyStreamsErr when the batch exceeds the concurrency guard, and
// otherwise aggregates any per-stream delete errors into a single error.
func nukeAllKinesisStreams(session *session.Session, identifiers []*string) error {
	region := aws.StringValue(session.Config.Region)
	svc := kinesis.New(session)

	if len(identifiers) == 0 {
		logging.Logger.Infof("No Kinesis Streams to nuke in region: %s", region)
		// Nothing to do — return early so we don't also log the "Deleting"
		// message below for an empty batch.
		return nil
	}

	// NOTE: we don't need to do pagination here, because the pagination is handled by the caller to this function,
	// based on KinesisStream.MaxBatchSize, however we add a guard here to warn users when the batching fails and
	// has a chance of throttling AWS. Since we concurrently make one call for each identifier, we pick 100 for the
	// limit here because many APIs in AWS have a limit of 100 requests per second.
	if len(identifiers) > 100 {
		logging.Logger.Errorf("Nuking too many Kinesis Streams at once (100): halting to avoid hitting AWS API rate limiting")
		return TooManyStreamsErr{}
	}

	// There is no bulk delete Kinesis Stream API, so we delete the batch of Kinesis Streams concurrently
	// using go routines.
	logging.Logger.Infof("Deleting Kinesis Streams in region: %s", region)
	wg := new(sync.WaitGroup)
	wg.Add(len(identifiers))
	errChans := make([]chan error, len(identifiers))
	for i, streamName := range identifiers {
		// Buffered channel so the goroutine never blocks on send.
		errChans[i] = make(chan error, 1)
		go deleteKinesisStreamAsync(wg, errChans[i], svc, streamName, region)
	}
	wg.Wait()

	// Collect all the errors from the async delete calls into a single error struct.
	// NOTE: We ignore OperationAbortedException which is thrown when there is an eventual consistency issue, where
	// cloud-nuke picks up a Stream that is already requested to be deleted.
	var allErrs *multierror.Error
	for _, errChan := range errChans {
		if err := <-errChan; err != nil {
			if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "OperationAbortedException" {
				allErrs = multierror.Append(allErrs, err)
			}
		}
	}
	finalErr := allErrs.ErrorOrNil()
	if finalErr != nil {
		return errors.WithStackTrace(finalErr)
	}
	return nil
}

// deleteKinesisStreamAsync deletes a single Kinesis Stream and reports the
// result on errChan. It is intended to run in its own goroutine; wg is marked
// done when the function returns.
func deleteKinesisStreamAsync(
	wg *sync.WaitGroup,
	errChan chan error,
	svc *kinesis.Kinesis,
	streamName *string,
	region string,
) {
	defer wg.Done()

	_, err := svc.DeleteStream(&kinesis.DeleteStreamInput{StreamName: streamName})
	errChan <- err

	name := aws.StringValue(streamName)
	if err != nil {
		logging.Logger.Errorf("[Failed] Error deleting Kinesis Stream %s in %s: %s", name, region, err)
		return
	}
	logging.Logger.Infof("[OK] Kinesis Stream %s delete in %s", name, region)
}

// Custom errors

// TooManyStreamsErr is returned when a single nuke batch contains more streams
// than the concurrency guard allows, to avoid throttling the AWS API.
type TooManyStreamsErr struct{}

func (err TooManyStreamsErr) Error() string {
	// Go convention: error strings are lowercase with no trailing punctuation.
	return "too many streams requested at once"
}
121 changes: 121 additions & 0 deletions aws/kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package aws

import (
"fmt"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestListKinesisStreams verifies that a freshly created stream is returned
// by getAllKinesisStreams with an empty config (no filters).
func TestListKinesisStreams(t *testing.T) {
	t.Parallel()

	region, err := getRandomRegion()
	require.NoError(t, err)

	awsSession, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	require.NoError(t, err)
	svc := kinesis.New(awsSession)

	streamName := createKinesisStream(t, svc)
	defer deleteKinesisStream(t, svc, streamName, true)

	foundNames, err := getAllKinesisStreams(awsSession, config.Config{})
	require.NoError(t, err)
	assert.Contains(t, aws.StringValueSlice(foundNames), aws.StringValue(streamName))
}

// TestNukeKinesisStreamOne verifies that nukeAllKinesisStreams deletes a
// single stream.
func TestNukeKinesisStreamOne(t *testing.T) {
	t.Parallel()

	region, err := getRandomRegion()
	require.NoError(t, err)

	awsSession, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	require.NoError(t, err)
	svc := kinesis.New(awsSession)

	streamName := createKinesisStream(t, svc)
	// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
	defer deleteKinesisStream(t, svc, streamName, true)

	identifiers := []*string{streamName}
	require.NoError(t, nukeAllKinesisStreams(awsSession, identifiers))

	assertKinesisStreamsDeleted(t, svc, identifiers)
}

// TestNukeKinesisStreamMoreThanOne verifies that nukeAllKinesisStreams deletes
// a batch of streams concurrently.
func TestNukeKinesisStreamMoreThanOne(t *testing.T) {
	t.Parallel()

	region, err := getRandomRegion()
	require.NoError(t, err)

	awsSession, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	require.NoError(t, err)
	svc := kinesis.New(awsSession)

	streamNames := []*string{}
	for i := 0; i < 3; i++ {
		name := createKinesisStream(t, svc)
		// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
		defer deleteKinesisStream(t, svc, name, true)
		streamNames = append(streamNames, name)
	}

	require.NoError(t, nukeAllKinesisStreams(awsSession, streamNames))

	assertKinesisStreamsDeleted(t, svc, streamNames)
}

// createKinesisStream creates a single-shard Kinesis Stream with a unique test
// name and returns the name after waiting for the stream to settle.
func createKinesisStream(t *testing.T, svc *kinesis.Kinesis) *string {
	streamName := fmt.Sprintf("cloud-nuke-test-%s", strings.ToLower(util.UniqueID()))

	input := &kinesis.CreateStreamInput{
		ShardCount: aws.Int64(1),
		StreamName: aws.String(streamName),
	}
	_, err := svc.CreateStream(input)
	require.NoError(t, err)

	// Add an arbitrary sleep to account for eventual consistency
	time.Sleep(15 * time.Second)
	return &streamName
}

// deleteKinesisStream removes the named stream, failing the test on error only
// when checkErr is true. Used as a cleanup stop gap in case nuke has a bug.
func deleteKinesisStream(t *testing.T, svc *kinesis.Kinesis, name *string, checkErr bool) {
	input := &kinesis.DeleteStreamInput{StreamName: name}
	_, err := svc.DeleteStream(input)
	if checkErr {
		require.NoError(t, err)
	}
}

// assertKinesisStreamsDeleted fails the test unless DescribeStream errors for
// every listed stream; a ResourceNotFoundException is treated as "deleted".
func assertKinesisStreamsDeleted(t *testing.T, svc *kinesis.Kinesis, identifiers []*string) {
	for _, name := range identifiers {
		_, err := svc.DescribeStream(&kinesis.DescribeStreamInput{
			StreamName: name,
		})

		// A deleted stream must produce an error from DescribeStream.
		// NOTE(review): DeleteStream is asynchronous, so a just-deleted stream
		// may still be describable (status DELETING) — confirm this assertion
		// does not flake without a waiter.
		require.NotNil(t, err)
		// Any AWS error other than ResourceNotFoundException means the stream
		// still exists. NOTE(review): an error that is not an awserr.Error
		// falls through without failing the test — confirm this is intended.
		if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "ResourceNotFoundException" {
			t.Fatalf("Stream %s is not deleted", aws.StringValue(name))
		}
	}
}
Loading

0 comments on commit 34406db

Please sign in to comment.