diff --git a/aws/aws.go b/aws/aws.go index 6f2f9836..ae522ee8 100644 --- a/aws/aws.go +++ b/aws/aws.go @@ -1558,7 +1558,7 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp } if IsNukeable(kinesisStreams.ResourceName(), resourceTypes) { start := time.Now() - streams, err := getAllKinesisStreams(cloudNukeSession, configObj) + streams, err := kinesisStreams.getAll(configObj) if err != nil { ge := report.GeneralError{ Error: err, diff --git a/aws/kinesis_stream.go b/aws/kinesis_stream.go index c5bfec85..48defba8 100644 --- a/aws/kinesis_stream.go +++ b/aws/kinesis_stream.go @@ -1,17 +1,14 @@ package aws import ( - "context" "sync" "github.com/gruntwork-io/cloud-nuke/telemetry" commonTelemetry "github.com/gruntwork-io/go-commons/telemetry" - awsconfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" "github.com/gruntwork-io/cloud-nuke/config" "github.com/gruntwork-io/cloud-nuke/logging" "github.com/gruntwork-io/cloud-nuke/report" @@ -19,57 +16,29 @@ import ( "github.com/hashicorp/go-multierror" ) -func getAllKinesisStreams(session *session.Session, configObj config.Config) ([]*string, error) { - cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region))) - if err != nil { - return []*string{}, errors.WithStackTrace(err) - } - svc := kinesis.NewFromConfig(cfg) - +func (ks KinesisStreams) getAll(configObj config.Config) ([]*string, error) { allStreams := []*string{} - - paginator := kinesis.NewListStreamsPaginator(svc, nil) - - for paginator.HasMorePages() { - resp, err := paginator.NextPage(context.TODO()) - if err != nil { - return []*string{}, errors.WithStackTrace(err) - } - for _, stream := range resp.StreamNames { - if shouldIncludeKinesisStream(aws.String(stream), configObj) { - allStreams = append(allStreams, aws.String(stream)) + err := ks.Client.ListStreamsPages(&kinesis.ListStreamsInput{}, func(page *kinesis.ListStreamsOutput, lastPage bool) bool { + for _, stream := range page.StreamNames { + if configObj.KinesisStream.ShouldInclude(config.ResourceValue{ + Name: stream, + }) { + allStreams = append(allStreams, stream) } } - } + return !lastPage + }) if err != nil { return nil, errors.WithStackTrace(err) } - return allStreams, nil -} - -func shouldIncludeKinesisStream(streamName *string, configObj config.Config) bool { - if streamName == nil { - return false - } - return config.ShouldInclude( - aws.StringValue(streamName), - configObj.KinesisStream.IncludeRule.NamesRegExp, - configObj.KinesisStream.ExcludeRule.NamesRegExp, - ) + return allStreams, nil } -func nukeAllKinesisStreams(session *session.Session, identifiers []*string) error { - region := aws.StringValue(session.Config.Region) - cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region))) - if err != nil { - return err - } - svc := kinesis.NewFromConfig(cfg) - +func (ks KinesisStreams) nukeAll(identifiers []*string) error { if len(identifiers) == 0 { - logging.Logger.Debugf("No Kinesis Streams to nuke in region: %s", region) + logging.Logger.Debugf("No Kinesis Streams to nuke in region: %s", ks.Region) } // NOTE: we don't need to do pagination here, because the pagination is handled by the caller to this function, @@ -83,13 +52,13 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro // There is no bulk delete Kinesis Stream API, so we delete the batch of Kinesis Streams concurrently // using go routines. - logging.Logger.Debugf("Deleting Kinesis Streams in region: %s", region) + logging.Logger.Debugf("Deleting Kinesis Streams in region: %s", ks.Region) wg := new(sync.WaitGroup) wg.Add(len(identifiers)) errChans := make([]chan error, len(identifiers)) for i, streamName := range identifiers { errChans[i] = make(chan error, 1) - go deleteKinesisStreamAsync(wg, errChans[i], svc, streamName, region) + go ks.deleteAsync(wg, errChans[i], streamName) } wg.Wait() @@ -103,7 +72,7 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro telemetry.TrackEvent(commonTelemetry.EventContext{ EventName: "Error Nuking Kinesis Stream", }, map[string]interface{}{ - "region": *session.Config.Region, + "region": ks.Region, }) allErrs = multierror.Append(allErrs, err) } @@ -116,16 +85,14 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro return nil } -func deleteKinesisStreamAsync( +func (ks KinesisStreams) deleteAsync( wg *sync.WaitGroup, errChan chan error, - svc *kinesis.Client, streamName *string, - region string, ) { defer wg.Done() input := &kinesis.DeleteStreamInput{StreamName: streamName} - _, err := svc.DeleteStream(context.TODO(), input) + _, err := ks.Client.DeleteStream(input) // Record status of this resource e := report.Entry{ @@ -139,9 +106,9 @@ func deleteKinesisStreamAsync( streamNameStr := aws.StringValue(streamName) if err == nil { - logging.Logger.Debugf("[OK] Kinesis Stream %s delete in %s", streamNameStr, region) + logging.Logger.Debugf("[OK] Kinesis Stream %s delete in %s", streamNameStr, ks.Region) } else { - logging.Logger.Debugf("[Failed] Error deleting Kinesis Stream %s in %s: %s", streamNameStr, region, err) + logging.Logger.Debugf("[Failed] Error deleting Kinesis Stream %s in %s: %s", streamNameStr, ks.Region, err) } } diff --git a/aws/kinesis_stream_test.go b/aws/kinesis_stream_test.go index a213caac..603c35dd 100644 --- a/aws/kinesis_stream_test.go +++ b/aws/kinesis_stream_test.go @@ -1,145 +1,85 @@ package aws import ( - "context" - "fmt" - "github.com/gruntwork-io/cloud-nuke/telemetry" - "strings" - "testing" - "time" - - awsconfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/kinesis" - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/gruntwork-io/cloud-nuke/config" - "github.com/gruntwork-io/cloud-nuke/util" - "github.com/stretchr/testify/assert" + "github.com/gruntwork-io/cloud-nuke/telemetry" "github.com/stretchr/testify/require" + "regexp" + "testing" ) -func TestListKinesisStreams(t *testing.T) { - telemetry.InitTelemetry("cloud-nuke", "") - t.Parallel() - - region, err := getRandomRegion() - require.NoError(t, err) - - session, err := session.NewSession(&aws.Config{Region: aws.String(region)}) - require.NoError(t, err) - - cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region))) - require.NoError(t, err) - - svc := kinesis.NewFromConfig(cfg) +type mockedKinesisClient struct { + kinesisiface.KinesisAPI + ListStreamsOutput kinesis.ListStreamsOutput + DeleteStreamOutput kinesis.DeleteStreamOutput +} - sName := createKinesisStream(t, svc) - defer deleteKinesisStream(t, svc, sName, true) +func (m mockedKinesisClient) ListStreamsPages(input *kinesis.ListStreamsInput, fn func(*kinesis.ListStreamsOutput, bool) bool) error { + fn(&m.ListStreamsOutput, true) + return nil +} - sNames, err := getAllKinesisStreams(session, config.Config{}) - require.NoError(t, err) - assert.Contains(t, aws.StringValueSlice(sNames), aws.StringValue(sName)) +func (m mockedKinesisClient) DeleteStream(input *kinesis.DeleteStreamInput) (*kinesis.DeleteStreamOutput, error) { + return &m.DeleteStreamOutput, nil } -func TestNukeKinesisStreamOne(t *testing.T) { +func TestKinesisStreams_GetAll(t *testing.T) { telemetry.InitTelemetry("cloud-nuke", "") t.Parallel() - region, err := getRandomRegion() - require.NoError(t, err) - - session, err := session.NewSession(&aws.Config{Region: aws.String(region)}) - require.NoError(t, err) - - cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region))) - require.NoError(t, err) - - svc := kinesis.NewFromConfig(cfg) - - // We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke. - sName := createKinesisStream(t, svc) - defer deleteKinesisStream(t, svc, sName, true) - identifiers := []*string{sName} + testName1 := "stream1" + testName2 := "stream2" + ks := KinesisStreams{ + Client: mockedKinesisClient{ + ListStreamsOutput: kinesis.ListStreamsOutput{ + StreamNames: []*string{aws.String(testName1), aws.String(testName2)}, + }, + }, + } - require.NoError( - t, - nukeAllKinesisStreams(session, identifiers), - ) + tests := map[string]struct { + configObj config.ResourceType + expected []string + }{ + "emptyFilter": { + configObj: config.ResourceType{}, + expected: []string{testName1, testName2}, + }, + "nameExclusionFilter": { + configObj: config.ResourceType{ + ExcludeRule: config.FilterRule{ + NamesRegExp: []config.Expression{{ + RE: *regexp.MustCompile(testName1), + }}}, + }, + expected: []string{testName2}, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + names, err := ks.getAll(config.Config{ + KinesisStream: tc.configObj, + }) + require.NoError(t, err) + require.Equal(t, tc.expected, aws.StringValueSlice(names)) + }) + } - assertKinesisStreamsDeleted(t, svc, identifiers) } -func TestNukeKinesisStreamMoreThanOne(t *testing.T) { +func TestKinesisStreams_NukeAll(t *testing.T) { telemetry.InitTelemetry("cloud-nuke", "") t.Parallel() - region, err := getRandomRegion() - require.NoError(t, err) - - session, err := session.NewSession(&aws.Config{Region: aws.String(region)}) - require.NoError(t, err) - - cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region))) - require.NoError(t, err) - - svc := kinesis.NewFromConfig(cfg) - - sNames := []*string{} - for i := 0; i < 3; i++ { - // We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke. - sName := createKinesisStream(t, svc) - defer deleteKinesisStream(t, svc, sName, true) - sNames = append(sNames, sName) + ks := KinesisStreams{ + Client: mockedKinesisClient{ + DeleteStreamOutput: kinesis.DeleteStreamOutput{}, + }, } - require.NoError( - t, - nukeAllKinesisStreams(session, sNames), - ) - - assertKinesisStreamsDeleted(t, svc, sNames) -} - -func createKinesisStream(t *testing.T, svc *kinesis.Client) *string { - uniqueID := util.UniqueID() - name := fmt.Sprintf("cloud-nuke-test-%s", strings.ToLower(uniqueID)) - - _, err := svc.CreateStream(context.TODO(), &kinesis.CreateStreamInput{ - ShardCount: aws.Int32(1), - StreamName: aws.String(name), - }) + err := ks.nukeAll([]*string{aws.String("test")}) require.NoError(t, err) - - // Add an arbitrary sleep to account for eventual consistency - time.Sleep(15 * time.Second) - return &name -} - -func deleteKinesisStream(t *testing.T, svc *kinesis.Client, name *string, checkErr bool) { - _, err := svc.DeleteStream(context.TODO(), &kinesis.DeleteStreamInput{ - StreamName: name, - }) - if checkErr { - require.NoError(t, err) - } -} - -func assertKinesisStreamsDeleted(t *testing.T, svc *kinesis.Client, identifiers []*string) { - for _, name := range identifiers { - stream, err := svc.DescribeStream(context.TODO(), &kinesis.DescribeStreamInput{ - StreamName: name, - }) - - // There is an error returned, assert it's because the Stream cannot be found because it's - // been deleted. Otherwise assert that the stream status is DELETING. - if err != nil { - if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "ResourceNotFoundException" { - t.Fatalf("Stream %s is not deleted", aws.StringValue(name)) - } - } else { - require.Equal(t, types.StreamStatusDeleting, stream.StreamDescription.StreamStatus) - } - } } diff --git a/aws/kinesis_stream_types.go b/aws/kinesis_stream_types.go index 464be599..694bb5c8 100644 --- a/aws/kinesis_stream_types.go +++ b/aws/kinesis_stream_types.go @@ -15,16 +15,16 @@ type KinesisStreams struct { } // ResourceName - The simple name of the AWS resource -func (k KinesisStreams) ResourceName() string { +func (ks KinesisStreams) ResourceName() string { return "kinesis-stream" } // ResourceIdentifiers - The names of the Kinesis Streams -func (k KinesisStreams) ResourceIdentifiers() []string { - return k.Names +func (ks KinesisStreams) ResourceIdentifiers() []string { + return ks.Names } -func (k KinesisStreams) MaxBatchSize() int { +func (ks KinesisStreams) MaxBatchSize() int { // Tentative batch size to ensure AWS doesn't throttle. Note that Kinesis Streams does not support bulk delete, so // we will be deleting this many in parallel using go routines. We pick 35 here, which is half of what the AWS web // console will do. We pick a conservative number here to avoid hitting AWS API rate limits. @@ -32,8 +32,8 @@ func (k KinesisStreams) MaxBatchSize() int { } // Nuke - nuke 'em all!!! -func (k KinesisStreams) Nuke(session *session.Session, identifiers []string) error { - if err := nukeAllKinesisStreams(session, aws.StringSlice(identifiers)); err != nil { +func (ks KinesisStreams) Nuke(session *session.Session, identifiers []string) error { + if err := ks.nukeAll(aws.StringSlice(identifiers)); err != nil { return errors.WithStackTrace(err) }