Skip to content

Commit

Permalink
refactor kinesis streams (#535)
Browse files Browse the repository at this point in the history
  • Loading branch information
james03160927 authored Aug 1, 2023
1 parent d41ecee commit c7f9700
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 179 deletions.
2 changes: 1 addition & 1 deletion aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
}
if IsNukeable(kinesisStreams.ResourceName(), resourceTypes) {
start := time.Now()
streams, err := getAllKinesisStreams(cloudNukeSession, configObj)
streams, err := kinesisStreams.getAll(configObj)
if err != nil {
ge := report.GeneralError{
Error: err,
Expand Down
73 changes: 20 additions & 53 deletions aws/kinesis_stream.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,44 @@
package aws

import (
"context"
"sync"

"github.com/gruntwork-io/cloud-nuke/telemetry"
commonTelemetry "github.com/gruntwork-io/go-commons/telemetry"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/logging"
"github.com/gruntwork-io/cloud-nuke/report"
"github.com/gruntwork-io/go-commons/errors"
"github.com/hashicorp/go-multierror"
)

func getAllKinesisStreams(session *session.Session, configObj config.Config) ([]*string, error) {
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
if err != nil {
return []*string{}, errors.WithStackTrace(err)
}
svc := kinesis.NewFromConfig(cfg)

func (ks KinesisStreams) getAll(configObj config.Config) ([]*string, error) {
allStreams := []*string{}

paginator := kinesis.NewListStreamsPaginator(svc, nil)

for paginator.HasMorePages() {
resp, err := paginator.NextPage(context.TODO())
if err != nil {
return []*string{}, errors.WithStackTrace(err)
}
for _, stream := range resp.StreamNames {
if shouldIncludeKinesisStream(aws.String(stream), configObj) {
allStreams = append(allStreams, aws.String(stream))
err := ks.Client.ListStreamsPages(&kinesis.ListStreamsInput{}, func(page *kinesis.ListStreamsOutput, lastPage bool) bool {
for _, stream := range page.StreamNames {
if configObj.KinesisStream.ShouldInclude(config.ResourceValue{
Name: stream,
}) {
allStreams = append(allStreams, stream)
}
}
}

return !lastPage
})
if err != nil {
return nil, errors.WithStackTrace(err)
}
return allStreams, nil
}

func shouldIncludeKinesisStream(streamName *string, configObj config.Config) bool {
if streamName == nil {
return false
}

return config.ShouldInclude(
aws.StringValue(streamName),
configObj.KinesisStream.IncludeRule.NamesRegExp,
configObj.KinesisStream.ExcludeRule.NamesRegExp,
)
return allStreams, nil
}

func nukeAllKinesisStreams(session *session.Session, identifiers []*string) error {
region := aws.StringValue(session.Config.Region)
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
if err != nil {
return err
}
svc := kinesis.NewFromConfig(cfg)

func (ks KinesisStreams) nukeAll(identifiers []*string) error {
if len(identifiers) == 0 {
logging.Logger.Debugf("No Kinesis Streams to nuke in region: %s", region)
logging.Logger.Debugf("No Kinesis Streams to nuke in region: %s", ks.Region)
}

// NOTE: we don't need to do pagination here, because the pagination is handled by the caller to this function,
Expand All @@ -83,13 +52,13 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro

// There is no bulk delete Kinesis Stream API, so we delete the batch of Kinesis Streams concurrently
// using go routines.
logging.Logger.Debugf("Deleting Kinesis Streams in region: %s", region)
logging.Logger.Debugf("Deleting Kinesis Streams in region: %s", ks.Region)
wg := new(sync.WaitGroup)
wg.Add(len(identifiers))
errChans := make([]chan error, len(identifiers))
for i, streamName := range identifiers {
errChans[i] = make(chan error, 1)
go deleteKinesisStreamAsync(wg, errChans[i], svc, streamName, region)
go ks.deleteAsync(wg, errChans[i], streamName)
}
wg.Wait()

Expand All @@ -103,7 +72,7 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro
telemetry.TrackEvent(commonTelemetry.EventContext{
EventName: "Error Nuking Kinesis Stream",
}, map[string]interface{}{
"region": *session.Config.Region,
"region": ks.Region,
})
allErrs = multierror.Append(allErrs, err)
}
Expand All @@ -116,16 +85,14 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro
return nil
}

func deleteKinesisStreamAsync(
func (ks KinesisStreams) deleteAsync(
wg *sync.WaitGroup,
errChan chan error,
svc *kinesis.Client,
streamName *string,
region string,
) {
defer wg.Done()
input := &kinesis.DeleteStreamInput{StreamName: streamName}
_, err := svc.DeleteStream(context.TODO(), input)
_, err := ks.Client.DeleteStream(input)

// Record status of this resource
e := report.Entry{
Expand All @@ -139,9 +106,9 @@ func deleteKinesisStreamAsync(

streamNameStr := aws.StringValue(streamName)
if err == nil {
logging.Logger.Debugf("[OK] Kinesis Stream %s delete in %s", streamNameStr, region)
logging.Logger.Debugf("[OK] Kinesis Stream %s delete in %s", streamNameStr, ks.Region)
} else {
logging.Logger.Debugf("[Failed] Error deleting Kinesis Stream %s in %s: %s", streamNameStr, region, err)
logging.Logger.Debugf("[Failed] Error deleting Kinesis Stream %s in %s: %s", streamNameStr, ks.Region, err)
}
}

Expand Down
178 changes: 59 additions & 119 deletions aws/kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -1,145 +1,85 @@
package aws

import (
"context"
"fmt"
"github.com/gruntwork-io/cloud-nuke/telemetry"
"strings"
"testing"
"time"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/util"
"github.com/stretchr/testify/assert"
"github.com/gruntwork-io/cloud-nuke/telemetry"
"github.com/stretchr/testify/require"
"regexp"
"testing"
)

func TestListKinesisStreams(t *testing.T) {
telemetry.InitTelemetry("cloud-nuke", "")
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)

cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
require.NoError(t, err)

svc := kinesis.NewFromConfig(cfg)
type mockedKinesisClient struct {
kinesisiface.KinesisAPI
ListStreamsOutput kinesis.ListStreamsOutput
DeleteStreamOutput kinesis.DeleteStreamOutput
}

sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)
func (m mockedKinesisClient) ListStreamsPages(input *kinesis.ListStreamsInput, fn func(*kinesis.ListStreamsOutput, bool) bool) error {
fn(&m.ListStreamsOutput, true)
return nil
}

sNames, err := getAllKinesisStreams(session, config.Config{})
require.NoError(t, err)
assert.Contains(t, aws.StringValueSlice(sNames), aws.StringValue(sName))
func (m mockedKinesisClient) DeleteStream(input *kinesis.DeleteStreamInput) (*kinesis.DeleteStreamOutput, error) {
return &m.DeleteStreamOutput, nil
}

func TestNukeKinesisStreamOne(t *testing.T) {
func TestKinesisStreams_GetAll(t *testing.T) {
telemetry.InitTelemetry("cloud-nuke", "")
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)

cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
require.NoError(t, err)

svc := kinesis.NewFromConfig(cfg)

// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)
identifiers := []*string{sName}
testName1 := "stream1"
testName2 := "stream2"
ks := KinesisStreams{
Client: mockedKinesisClient{
ListStreamsOutput: kinesis.ListStreamsOutput{
StreamNames: []*string{aws.String(testName1), aws.String(testName2)},
},
},
}

require.NoError(
t,
nukeAllKinesisStreams(session, identifiers),
)
tests := map[string]struct {
configObj config.ResourceType
expected []string
}{
"emptyFilter": {
configObj: config.ResourceType{},
expected: []string{testName1, testName2},
},
"nameExclusionFilter": {
configObj: config.ResourceType{
ExcludeRule: config.FilterRule{
NamesRegExp: []config.Expression{{
RE: *regexp.MustCompile(testName1),
}}},
},
expected: []string{testName2},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
names, err := ks.getAll(config.Config{
KinesisStream: tc.configObj,
})
require.NoError(t, err)
require.Equal(t, tc.expected, aws.StringValueSlice(names))
})
}

assertKinesisStreamsDeleted(t, svc, identifiers)
}

func TestNukeKinesisStreamMoreThanOne(t *testing.T) {
func TestKinesisStreams_NukeAll(t *testing.T) {
telemetry.InitTelemetry("cloud-nuke", "")
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)

cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
require.NoError(t, err)

svc := kinesis.NewFromConfig(cfg)

sNames := []*string{}
for i := 0; i < 3; i++ {
// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)
sNames = append(sNames, sName)
ks := KinesisStreams{
Client: mockedKinesisClient{
DeleteStreamOutput: kinesis.DeleteStreamOutput{},
},
}

require.NoError(
t,
nukeAllKinesisStreams(session, sNames),
)

assertKinesisStreamsDeleted(t, svc, sNames)
}

func createKinesisStream(t *testing.T, svc *kinesis.Client) *string {
uniqueID := util.UniqueID()
name := fmt.Sprintf("cloud-nuke-test-%s", strings.ToLower(uniqueID))

_, err := svc.CreateStream(context.TODO(), &kinesis.CreateStreamInput{
ShardCount: aws.Int32(1),
StreamName: aws.String(name),
})
err := ks.nukeAll([]*string{aws.String("test")})
require.NoError(t, err)

// Add an arbitrary sleep to account for eventual consistency
time.Sleep(15 * time.Second)
return &name
}

func deleteKinesisStream(t *testing.T, svc *kinesis.Client, name *string, checkErr bool) {
_, err := svc.DeleteStream(context.TODO(), &kinesis.DeleteStreamInput{
StreamName: name,
})
if checkErr {
require.NoError(t, err)
}
}

func assertKinesisStreamsDeleted(t *testing.T, svc *kinesis.Client, identifiers []*string) {
for _, name := range identifiers {
stream, err := svc.DescribeStream(context.TODO(), &kinesis.DescribeStreamInput{
StreamName: name,
})

// There is an error returned, assert it's because the Stream cannot be found because it's
// been deleted. Otherwise assert that the stream status is DELETING.
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "ResourceNotFoundException" {
t.Fatalf("Stream %s is not deleted", aws.StringValue(name))
}
} else {
require.Equal(t, types.StreamStatusDeleting, stream.StreamDescription.StreamStatus)
}
}
}
12 changes: 6 additions & 6 deletions aws/kinesis_stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ type KinesisStreams struct {
}

// ResourceName - The simple name of the AWS resource
func (k KinesisStreams) ResourceName() string {
func (ks KinesisStreams) ResourceName() string {
return "kinesis-stream"
}

// ResourceIdentifiers - The names of the Kinesis Streams
func (k KinesisStreams) ResourceIdentifiers() []string {
return k.Names
func (ks KinesisStreams) ResourceIdentifiers() []string {
return ks.Names
}

func (k KinesisStreams) MaxBatchSize() int {
func (ks KinesisStreams) MaxBatchSize() int {
// Tentative batch size to ensure AWS doesn't throttle. Note that Kinesis Streams does not support bulk delete, so
// we will be deleting this many in parallel using go routines. We pick 35 here, which is half of what the AWS web
// console will do. We pick a conservative number here to avoid hitting AWS API rate limits.
return 35
}

// Nuke - nuke 'em all!!!
func (k KinesisStreams) Nuke(session *session.Session, identifiers []string) error {
if err := nukeAllKinesisStreams(session, aws.StringSlice(identifiers)); err != nil {
func (ks KinesisStreams) Nuke(session *session.Session, identifiers []string) error {
if err := ks.nukeAll(aws.StringSlice(identifiers)); err != nil {
return errors.WithStackTrace(err)
}

Expand Down

0 comments on commit c7f9700

Please sign in to comment.