Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor kinesis streams #535

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
}
if IsNukeable(kinesisStreams.ResourceName(), resourceTypes) {
start := time.Now()
streams, err := getAllKinesisStreams(cloudNukeSession, configObj)
streams, err := kinesisStreams.getAll(configObj)
if err != nil {
ge := report.GeneralError{
Error: err,
Expand Down
73 changes: 20 additions & 53 deletions aws/kinesis_stream.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,44 @@
package aws

import (
"context"
"sync"

"github.com/gruntwork-io/cloud-nuke/telemetry"
commonTelemetry "github.com/gruntwork-io/go-commons/telemetry"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/logging"
"github.com/gruntwork-io/cloud-nuke/report"
"github.com/gruntwork-io/go-commons/errors"
"github.com/hashicorp/go-multierror"
)

func getAllKinesisStreams(session *session.Session, configObj config.Config) ([]*string, error) {
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
if err != nil {
return []*string{}, errors.WithStackTrace(err)
}
svc := kinesis.NewFromConfig(cfg)

func (ks KinesisStreams) getAll(configObj config.Config) ([]*string, error) {
allStreams := []*string{}

paginator := kinesis.NewListStreamsPaginator(svc, nil)

for paginator.HasMorePages() {
resp, err := paginator.NextPage(context.TODO())
if err != nil {
return []*string{}, errors.WithStackTrace(err)
}
for _, stream := range resp.StreamNames {
if shouldIncludeKinesisStream(aws.String(stream), configObj) {
allStreams = append(allStreams, aws.String(stream))
err := ks.Client.ListStreamsPages(&kinesis.ListStreamsInput{}, func(page *kinesis.ListStreamsOutput, lastPage bool) bool {
for _, stream := range page.StreamNames {
if configObj.KinesisStream.ShouldInclude(config.ResourceValue{
Name: stream,
}) {
allStreams = append(allStreams, stream)
}
}
}

return !lastPage
})
if err != nil {
return nil, errors.WithStackTrace(err)
}
return allStreams, nil
}

func shouldIncludeKinesisStream(streamName *string, configObj config.Config) bool {
if streamName == nil {
return false
}

return config.ShouldInclude(
aws.StringValue(streamName),
configObj.KinesisStream.IncludeRule.NamesRegExp,
configObj.KinesisStream.ExcludeRule.NamesRegExp,
)
return allStreams, nil
}

func nukeAllKinesisStreams(session *session.Session, identifiers []*string) error {
region := aws.StringValue(session.Config.Region)
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
if err != nil {
return err
}
svc := kinesis.NewFromConfig(cfg)

func (ks KinesisStreams) nukeAll(identifiers []*string) error {
if len(identifiers) == 0 {
logging.Logger.Debugf("No Kinesis Streams to nuke in region: %s", region)
logging.Logger.Debugf("No Kinesis Streams to nuke in region: %s", ks.Region)
}

// NOTE: we don't need to do pagination here, because the pagination is handled by the caller to this function,
Expand All @@ -83,13 +52,13 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro

// There is no bulk delete Kinesis Stream API, so we delete the batch of Kinesis Streams concurrently
// using go routines.
logging.Logger.Debugf("Deleting Kinesis Streams in region: %s", region)
logging.Logger.Debugf("Deleting Kinesis Streams in region: %s", ks.Region)
wg := new(sync.WaitGroup)
wg.Add(len(identifiers))
errChans := make([]chan error, len(identifiers))
for i, streamName := range identifiers {
errChans[i] = make(chan error, 1)
go deleteKinesisStreamAsync(wg, errChans[i], svc, streamName, region)
go ks.deleteAsync(wg, errChans[i], streamName)
}
wg.Wait()

Expand All @@ -103,7 +72,7 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro
telemetry.TrackEvent(commonTelemetry.EventContext{
EventName: "Error Nuking Kinesis Stream",
}, map[string]interface{}{
"region": *session.Config.Region,
"region": ks.Region,
})
allErrs = multierror.Append(allErrs, err)
}
Expand All @@ -116,16 +85,14 @@ func nukeAllKinesisStreams(session *session.Session, identifiers []*string) erro
return nil
}

func deleteKinesisStreamAsync(
func (ks KinesisStreams) deleteAsync(
wg *sync.WaitGroup,
errChan chan error,
svc *kinesis.Client,
streamName *string,
region string,
) {
defer wg.Done()
input := &kinesis.DeleteStreamInput{StreamName: streamName}
_, err := svc.DeleteStream(context.TODO(), input)
_, err := ks.Client.DeleteStream(input)

// Record status of this resource
e := report.Entry{
Expand All @@ -139,9 +106,9 @@ func deleteKinesisStreamAsync(

streamNameStr := aws.StringValue(streamName)
if err == nil {
logging.Logger.Debugf("[OK] Kinesis Stream %s delete in %s", streamNameStr, region)
logging.Logger.Debugf("[OK] Kinesis Stream %s delete in %s", streamNameStr, ks.Region)
} else {
logging.Logger.Debugf("[Failed] Error deleting Kinesis Stream %s in %s: %s", streamNameStr, region, err)
logging.Logger.Debugf("[Failed] Error deleting Kinesis Stream %s in %s: %s", streamNameStr, ks.Region, err)
}
}

Expand Down
178 changes: 59 additions & 119 deletions aws/kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -1,145 +1,85 @@
package aws

import (
"context"
"fmt"
"github.com/gruntwork-io/cloud-nuke/telemetry"
"strings"
"testing"
"time"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/util"
"github.com/stretchr/testify/assert"
"github.com/gruntwork-io/cloud-nuke/telemetry"
"github.com/stretchr/testify/require"
"regexp"
"testing"
)

func TestListKinesisStreams(t *testing.T) {
telemetry.InitTelemetry("cloud-nuke", "")
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)

cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
require.NoError(t, err)

svc := kinesis.NewFromConfig(cfg)
type mockedKinesisClient struct {
kinesisiface.KinesisAPI
ListStreamsOutput kinesis.ListStreamsOutput
DeleteStreamOutput kinesis.DeleteStreamOutput
}

sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)
func (m mockedKinesisClient) ListStreamsPages(input *kinesis.ListStreamsInput, fn func(*kinesis.ListStreamsOutput, bool) bool) error {
fn(&m.ListStreamsOutput, true)
return nil
}

sNames, err := getAllKinesisStreams(session, config.Config{})
require.NoError(t, err)
assert.Contains(t, aws.StringValueSlice(sNames), aws.StringValue(sName))
func (m mockedKinesisClient) DeleteStream(input *kinesis.DeleteStreamInput) (*kinesis.DeleteStreamOutput, error) {
return &m.DeleteStreamOutput, nil
}

func TestNukeKinesisStreamOne(t *testing.T) {
func TestKinesisStreams_GetAll(t *testing.T) {
telemetry.InitTelemetry("cloud-nuke", "")
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)

cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
require.NoError(t, err)

svc := kinesis.NewFromConfig(cfg)

// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)
identifiers := []*string{sName}
testName1 := "stream1"
testName2 := "stream2"
ks := KinesisStreams{
Client: mockedKinesisClient{
ListStreamsOutput: kinesis.ListStreamsOutput{
StreamNames: []*string{aws.String(testName1), aws.String(testName2)},
},
},
}

require.NoError(
t,
nukeAllKinesisStreams(session, identifiers),
)
tests := map[string]struct {
configObj config.ResourceType
expected []string
}{
"emptyFilter": {
configObj: config.ResourceType{},
expected: []string{testName1, testName2},
},
"nameExclusionFilter": {
configObj: config.ResourceType{
ExcludeRule: config.FilterRule{
NamesRegExp: []config.Expression{{
RE: *regexp.MustCompile(testName1),
}}},
},
expected: []string{testName2},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
names, err := ks.getAll(config.Config{
KinesisStream: tc.configObj,
})
require.NoError(t, err)
require.Equal(t, tc.expected, aws.StringValueSlice(names))
})
}

assertKinesisStreamsDeleted(t, svc, identifiers)
}

func TestNukeKinesisStreamMoreThanOne(t *testing.T) {
func TestKinesisStreams_NukeAll(t *testing.T) {
telemetry.InitTelemetry("cloud-nuke", "")
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)

cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(aws.StringValue(session.Config.Region)))
require.NoError(t, err)

svc := kinesis.NewFromConfig(cfg)

sNames := []*string{}
for i := 0; i < 3; i++ {
// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)
sNames = append(sNames, sName)
ks := KinesisStreams{
Client: mockedKinesisClient{
DeleteStreamOutput: kinesis.DeleteStreamOutput{},
},
}

require.NoError(
t,
nukeAllKinesisStreams(session, sNames),
)

assertKinesisStreamsDeleted(t, svc, sNames)
}

func createKinesisStream(t *testing.T, svc *kinesis.Client) *string {
uniqueID := util.UniqueID()
name := fmt.Sprintf("cloud-nuke-test-%s", strings.ToLower(uniqueID))

_, err := svc.CreateStream(context.TODO(), &kinesis.CreateStreamInput{
ShardCount: aws.Int32(1),
StreamName: aws.String(name),
})
err := ks.nukeAll([]*string{aws.String("test")})
require.NoError(t, err)

// Add an arbitrary sleep to account for eventual consistency
time.Sleep(15 * time.Second)
return &name
}

func deleteKinesisStream(t *testing.T, svc *kinesis.Client, name *string, checkErr bool) {
_, err := svc.DeleteStream(context.TODO(), &kinesis.DeleteStreamInput{
StreamName: name,
})
if checkErr {
require.NoError(t, err)
}
}

func assertKinesisStreamsDeleted(t *testing.T, svc *kinesis.Client, identifiers []*string) {
for _, name := range identifiers {
stream, err := svc.DescribeStream(context.TODO(), &kinesis.DescribeStreamInput{
StreamName: name,
})

// There is an error returned, assert it's because the Stream cannot be found because it's
// been deleted. Otherwise assert that the stream status is DELETING.
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "ResourceNotFoundException" {
t.Fatalf("Stream %s is not deleted", aws.StringValue(name))
}
} else {
require.Equal(t, types.StreamStatusDeleting, stream.StreamDescription.StreamStatus)
}
}
}
12 changes: 6 additions & 6 deletions aws/kinesis_stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ type KinesisStreams struct {
}

// ResourceName - The simple name of the AWS resource
func (k KinesisStreams) ResourceName() string {
func (ks KinesisStreams) ResourceName() string {
return "kinesis-stream"
}

// ResourceIdentifiers - The names of the Kinesis Streams
func (k KinesisStreams) ResourceIdentifiers() []string {
return k.Names
func (ks KinesisStreams) ResourceIdentifiers() []string {
return ks.Names
}

func (k KinesisStreams) MaxBatchSize() int {
func (ks KinesisStreams) MaxBatchSize() int {
// Tentative batch size to ensure AWS doesn't throttle. Note that Kinesis Streams does not support bulk delete, so
// we will be deleting this many in parallel using go routines. We pick 35 here, which is half of what the AWS web
// console will do. We pick a conservative number here to avoid hitting AWS API rate limits.
return 35
}

// Nuke - nuke 'em all!!!
func (k KinesisStreams) Nuke(session *session.Session, identifiers []string) error {
if err := nukeAllKinesisStreams(session, aws.StringSlice(identifiers)); err != nil {
func (ks KinesisStreams) Nuke(session *session.Session, identifiers []string) error {
if err := ks.nukeAll(aws.StringSlice(identifiers)); err != nil {
return errors.WithStackTrace(err)
}

Expand Down