Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kinesis stream support #304

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The currently supported functionality includes:
- Inspecting and deleting all GuardDuty Detectors in an AWS Account
- Inspecting and deleting all Macie member accounts in an AWS account - as long as those accounts were created by Invitation - and not via AWS Organizations
- Inspecting and deleting all SageMaker Notebook Instances in an AWS account
- Inspecting and deleting all Kinesis Streams in an AWS account

### BEWARE!

Expand Down Expand Up @@ -358,6 +359,9 @@ The following resources support the Config file:
Notes:
* no configuration options for KMS customer keys, since keys are created with auto-generated identifier

- Kinesis Streams
- Resource type: `kinesis-stream`
- Config key: `KinesisStream`

#### Example

Expand Down Expand Up @@ -466,6 +470,7 @@ To find out what we options are supported in the config file today, consult this
| eip | none | ✅ | none | none |
| ec2 | none | ✅ | none | none |
| eks | none | ✅ | none | none |
| kinesis-stream | none | ✅ | none | none |
| acmpca | none | none | none | none |
| iam role | none | none | none | none |
| sagemaker-notebook-instances| none| ✅ | none | none |
Expand Down
15 changes: 15 additions & 0 deletions aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,20 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
}
// End SageMaker Notebook Instances

// Kinesis Streams
kinesisStreams := KinesisStreams{}
if IsNukeable(kinesisStreams.ResourceName(), resourceTypes) {
streams, err := getAllKinesisStreams(cloudNukeSession, configObj)
if err != nil {
return nil, errors.WithStackTrace(err)
}
if len(streams) > 0 {
kinesisStreams.Names = awsgo.StringValueSlice(streams)
resourcesInRegion.Resources = append(resourcesInRegion.Resources, kinesisStreams)
}
}
// End Kinesis Streams

if len(resourcesInRegion.Resources) > 0 {
account.Resources[region] = resourcesInRegion
}
Expand Down Expand Up @@ -861,6 +875,7 @@ func ListResourceTypes() []string {
GuardDuty{}.ResourceName(),
MacieMember{}.ResourceName(),
SageMakerNotebookInstances{}.ResourceName(),
KinesisStreams{}.ResourceName(),
}
sort.Strings(resourceTypes)
return resourceTypes
Expand Down
122 changes: 122 additions & 0 deletions aws/kinesis_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package aws

import (
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/logging"
"github.com/gruntwork-io/go-commons/errors"
"github.com/hashicorp/go-multierror"
)

func getAllKinesisStreams(session *session.Session, configObj config.Config) ([]*string, error) {
svc := kinesis.New(session)

allStreams := []*string{}
err := svc.ListStreamsPages(
&kinesis.ListStreamsInput{},
func(page *kinesis.ListStreamsOutput, lastPage bool) bool {
for _, streamName := range page.StreamNames {
if shouldIncludeKinesisStream(streamName, configObj) {
allStreams = append(allStreams, streamName)
}
}
return !lastPage
},
)
if err != nil {
return nil, errors.WithStackTrace(err)
}
return allStreams, nil
}

func shouldIncludeKinesisStream(streamName *string, configObj config.Config) bool {
if streamName == nil {
return false
}

return config.ShouldInclude(
aws.StringValue(streamName),
configObj.KinesisStream.IncludeRule.NamesRegExp,
configObj.KinesisStream.ExcludeRule.NamesRegExp,
)
}

func nukeAllKinesisStreams(session *session.Session, identifiers []*string) error {
region := aws.StringValue(session.Config.Region)
svc := kinesis.New(session)

if len(identifiers) == 0 {
logging.Logger.Infof("No Kinesis Streams to nuke in region: %s", region)
}

// NOTE: we don't need to do pagination here, because the pagination is handled by the caller to this function,
// based on KinesisStream.MaxBatchSize, however we add a guard here to warn users when the batching fails and
// has a chance of throttling AWS. Since we concurrently make one call for each identifier, we pick 100 for the
// limit here because many APIs in AWS have a limit of 100 requests per second.
if len(identifiers) > 100 {
logging.Logger.Errorf("Nuking too many Kinesis Streams at once (100): halting to avoid hitting AWS API rate limiting")
return TooManyStreamsErr{}
}

// There is no bulk delete Kinesis Stream API, so we delete the batch of Kinesis Streams concurrently
// using go routines.
logging.Logger.Infof("Deleting Kinesis Streams in region: %s", region)
wg := new(sync.WaitGroup)
wg.Add(len(identifiers))
errChans := make([]chan error, len(identifiers))
for i, streamName := range identifiers {
errChans[i] = make(chan error, 1)
go deleteKinesisStreamAsync(wg, errChans[i], svc, streamName, region)
}
wg.Wait()

// Collect all the errors from the async delete calls into a single error struct.
// NOTE: We ignore OperationAbortedException which is thrown when there is an eventual consistency issue, where
// cloud-nuke picks up a Stream that is already requested to be deleted.
var allErrs *multierror.Error
for _, errChan := range errChans {
if err := <-errChan; err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "OperationAbortedException" {
allErrs = multierror.Append(allErrs, err)
}
}
}
finalErr := allErrs.ErrorOrNil()
if finalErr != nil {
return errors.WithStackTrace(finalErr)
}
return nil
}

func deleteKinesisStreamAsync(
wg *sync.WaitGroup,
errChan chan error,
svc *kinesis.Kinesis,
streamName *string,
region string,
) {
defer wg.Done()
input := &kinesis.DeleteStreamInput{StreamName: streamName}
_, err := svc.DeleteStream(input)
errChan <- err

streamNameStr := aws.StringValue(streamName)
if err == nil {
logging.Logger.Infof("[OK] Kinesis Stream %s delete in %s", streamNameStr, region)
} else {
logging.Logger.Errorf("[Failed] Error deleting Kinesis Stream %s in %s: %s", streamNameStr, region, err)
}
}

// Custom errors

type TooManyStreamsErr struct{}

func (err TooManyStreamsErr) Error() string {
return "Too many Streams requested at once."
}
126 changes: 126 additions & 0 deletions aws/kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package aws

import (
"fmt"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestListKinesisStreams(t *testing.T) {
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)
svc := kinesis.New(session)

sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)

sNames, err := getAllKinesisStreams(session, config.Config{})
require.NoError(t, err)
assert.Contains(t, aws.StringValueSlice(sNames), aws.StringValue(sName))
}

func TestNukeKinesisStreamOne(t *testing.T) {
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)
svc := kinesis.New(session)

// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)
identifiers := []*string{sName}

require.NoError(
t,
nukeAllKinesisStreams(session, identifiers),
)

assertKinesisStreamsDeleted(t, svc, identifiers)
}

func TestNukeKinesisStreamMoreThanOne(t *testing.T) {
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
require.NoError(t, err)
svc := kinesis.New(session)

sNames := []*string{}
for i := 0; i < 3; i++ {
// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
sName := createKinesisStream(t, svc)
defer deleteKinesisStream(t, svc, sName, true)
sNames = append(sNames, sName)
}

require.NoError(
t,
nukeAllKinesisStreams(session, sNames),
)

assertKinesisStreamsDeleted(t, svc, sNames)
}

func createKinesisStream(t *testing.T, svc *kinesis.Kinesis) *string {
uniqueID := util.UniqueID()
name := fmt.Sprintf("cloud-nuke-test-%s", strings.ToLower(uniqueID))

_, err := svc.CreateStream(&kinesis.CreateStreamInput{
ShardCount: aws.Int64(1),
StreamName: aws.String(name),
})
require.NoError(t, err)

// Add an arbitrary sleep to account for eventual consistency
time.Sleep(15 * time.Second)
return &name
}

func deleteKinesisStream(t *testing.T, svc *kinesis.Kinesis, name *string, checkErr bool) {
_, err := svc.DeleteStream(&kinesis.DeleteStreamInput{
StreamName: name,
})
if checkErr {
require.NoError(t, err)
}
}

func assertKinesisStreamsDeleted(t *testing.T, svc *kinesis.Kinesis, identifiers []*string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function assertKinesisStreamsDeleted is not invoked in any other place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good catch! Added the missing calls 👍

for _, name := range identifiers {
stream, err := svc.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: name,
})

// There is an error returned, assert it's because the Stream cannot be found because it's
// been deleted. Otherwise assert that the stream status is DELETING.
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() != "ResourceNotFoundException" {
t.Fatalf("Stream %s is not deleted", aws.StringValue(name))
}
} else {
require.Equal(t, "DELETING", *stream.StreamDescription.StreamStatus)
}
}
}
38 changes: 38 additions & 0 deletions aws/kinesis_stream_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/gruntwork-io/go-commons/errors"
)

// KinesisStreams - represents all Kinesis streams
type KinesisStreams struct {
Names []string
}

// ResourceName - The simple name of the AWS resource
func (k KinesisStreams) ResourceName() string {
return "kinesis-stream"
}

// ResourceIdentifiers - The names of the Kinesis Streams
func (k KinesisStreams) ResourceIdentifiers() []string {
return k.Names
}

func (k KinesisStreams) MaxBatchSize() int {
// Tentative batch size to ensure AWS doesn't throttle. Note that Kinesis Streams does not support bulk delete, so
// we will be deleting this many in parallel using go routines. We pick 35 here, which is half of what the AWS web
// console will do. We pick a conservative number here to avoid hitting AWS API rate limits.
return 35
}

// Nuke - nuke 'em all!!!
func (k KinesisStreams) Nuke(session *session.Session, identifiers []string) error {
if err := nukeAllKinesisStreams(session, aws.StringSlice(identifiers)); err != nil {
return errors.WithStackTrace(err)
}

return nil
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
KMSCustomerKeys ResourceType `yaml:"KMSCustomerKeys"`
EKSCluster ResourceType `yaml:"EKSCluster"`
SageMakerNotebook ResourceType `yaml:"SageMakerNotebook"`
KinesisStream ResourceType `yaml:"KinesisStream"`
}

type ResourceType struct {
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func emptyConfig() *Config {
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
}
}

Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ require (
github.com/hashicorp/go-multierror v1.1.0
github.com/pquerna/otp v1.3.0
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/testify v1.7.1
github.com/urfave/cli v1.22.4
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading