Skip to content

Commit

Permalink
Add Amazon Managed Streaming for Apache Kafka (#574)
Browse files Browse the repository at this point in the history
* add msk

* add test to discover and delete msk cluster

* mock tests for msk

* remove test helpers for msk

* go mod tidy

* Refactor the code to reflect the latest structure

---------

Co-authored-by: robpickerill <[email protected]>
  • Loading branch information
james03160927 and robpickerill authored Aug 30, 2023
1 parent 6887713 commit 6902144
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 65 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ of the file that are supported are listed here.
| lc | LaunchConfiguration | ✅ (Launch Configuration Name) | ✅ (Created Time) | ❌ |
| lt | LaunchTemplate | ✅ (Launch Template Name) | ✅ (Created Time) | ❌ |
| macie-member | MacieMember | ❌ | ✅ (Creation Time) | ❌ |
| msk-cluster | MskCluster | ✅ (Cluster Name) | ✅ (Creation Time) | ❌ |
| nat-gateway | NatGateway | ✅ (EC2 Name Tag) | ✅ (Creation Time) | ❌ |
| oidcprovider | OIDCProvider | ✅ (Provider URL) | ✅ (Creation Time) | ❌ |
| opensearchdomain | OpenSearchDomain | ✅ (Domain Name) | ✅ (First Seen Tag Time) | ❌ |
Expand Down
64 changes: 64 additions & 0 deletions aws/resources/msk_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package resources

import (
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/logging"
"github.com/gruntwork-io/cloud-nuke/report"
)

// getAll pages through every MSK cluster returned by ListClustersV2 and
// collects the ARNs of the clusters that shouldInclude accepts for configObj.
//
// If the listing call fails, the error is returned unchanged together with a
// nil slice.
func (m MSKCluster) getAll(configObj config.Config) ([]*string, error) {
	var arns []*string

	// collect is invoked by the SDK once per page of results.
	collect := func(page *kafka.ListClustersV2Output, lastPage bool) bool {
		for _, info := range page.ClusterInfoList {
			if !m.shouldInclude(info, configObj) {
				continue
			}
			arns = append(arns, info.ClusterArn)
		}
		// Keep paging until the SDK reports the final page.
		return !lastPage
	}

	if err := m.Client.ListClustersV2Pages(&kafka.ListClustersV2Input{}, collect); err != nil {
		return nil, err
	}

	return arns, nil
}

// shouldInclude reports whether cluster is a candidate for deletion.
//
// A cluster is rejected when it is nil, when it is already being deleted, or
// when it is still being created. Otherwise the decision is delegated to the
// MSKCluster filter rules in configObj, matched against the cluster name and
// creation time.
//
// The SDK models State as a *string, so it is only dereferenced after a nil
// check. A cluster with no reported state is treated as neither deleting nor
// creating and falls through to the config filter.
func (m MSKCluster) shouldInclude(cluster *kafka.Cluster, configObj config.Config) bool {
	if cluster == nil {
		return false
	}

	var state string
	if cluster.State != nil {
		state = *cluster.State
	}

	switch state {
	case kafka.ClusterStateDeleting:
		// Already on its way out; nothing to do.
		return false
	case kafka.ClusterStateCreating:
		// if cluster is still creating, skip it as it will only throw an error when attempting to delete it
		// BadRequestException: You can't delete cluster in CREATING state.
		return false
	}

	return configObj.MSKCluster.ShouldInclude(config.ResourceValue{
		Name: cluster.ClusterName,
		Time: cluster.CreationTime,
	})
}

// nukeAll issues one DeleteCluster call per ARN in identifiers.
//
// Deletion is best-effort: a failed delete is logged and recorded in the
// report for that ARN, and processing continues with the next one. The
// function itself always returns nil.
func (m MSKCluster) nukeAll(identifiers []string) error {
	for i := range identifiers {
		arn := identifiers[i]

		_, deleteErr := m.Client.DeleteCluster(&kafka.DeleteClusterInput{ClusterArn: &arn})
		if deleteErr != nil {
			logging.Logger.Errorf("[Failed] %s", deleteErr)
		}

		// Record the outcome (success or failure) of this resource.
		report.Record(report.Entry{
			Identifier:   arn,
			ResourceType: "MSKCluster",
			Error:        deleteErr,
		})
	}

	return nil
}
232 changes: 232 additions & 0 deletions aws/resources/msk_cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package resources

import (
"fmt"
"regexp"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/aws/aws-sdk-go/service/kafka/kafkaiface"
"github.com/gruntwork-io/cloud-nuke/config"
)

// mockMSKClient is a test double for the MSK (Kafka) API client.
//
// It embeds kafkaiface.KafkaAPI so that it satisfies the full interface while
// the tests only provide the methods they need; the embedded interface is left
// nil, so calling any method that is not overridden on the mock will panic.
type mockMSKClient struct {
kafkaiface.KafkaAPI
// listClustersV2PagesFn is the stub invoked by ListClustersV2Pages.
listClustersV2PagesFn func(input *kafka.ListClustersV2Input, callback func(*kafka.ListClustersV2Output, bool) bool) error
// deleteClusterFn is a stub with the same signature as DeleteCluster,
// supplied by tests that exercise deletion.
deleteClusterFn func(input *kafka.DeleteClusterInput) (*kafka.DeleteClusterOutput, error)
}

// ListClustersV2Pages forwards the call, arguments untouched, to the
// listClustersV2PagesFn stub configured on the mock and returns its result.
func (m mockMSKClient) ListClustersV2Pages(input *kafka.ListClustersV2Input, callback func(*kafka.ListClustersV2Output, bool) bool) error {
	listFn := m.listClustersV2PagesFn
	return listFn(input, callback)
}

// DeleteCluster delegates to the deleteClusterFn stub when the test has
// provided one, so tests can observe the request or inject a failure.
//
// When no stub is configured it keeps the previous behavior of reporting
// success with a nil output and a nil error.
func (m mockMSKClient) DeleteCluster(input *kafka.DeleteClusterInput) (*kafka.DeleteClusterOutput, error) {
	if m.deleteClusterFn == nil {
		return nil, nil
	}
	return m.deleteClusterFn(input)
}

// TestListMSKClustersSingle verifies that getAll returns the ARN of the one
// active cluster served by the mocked ListClustersV2Pages call.
func TestListMSKClustersSingle(t *testing.T) {
	const wantArn = "arn:aws:kafka:us-east-1:123456789012:cluster/test-cluster-1/1a2b3c4d-5e6f-7g8h-9i0j-1k2l3m4n5o6p"

	client := mockMSKClient{
		listClustersV2PagesFn: func(input *kafka.ListClustersV2Input, callback func(*kafka.ListClustersV2Output, bool) bool) error {
			// Serve a single (and final) page holding one active cluster.
			page := &kafka.ListClustersV2Output{
				ClusterInfoList: []*kafka.Cluster{
					{
						ClusterArn:   aws.String(wantArn),
						ClusterName:  aws.String("test-cluster-1"),
						CreationTime: aws.Time(time.Now()),
						State:        aws.String(kafka.ClusterStateActive),
					},
				},
			}
			callback(page, true)
			return nil
		},
	}

	msk := MSKCluster{Client: &client}

	got, err := msk.getAll(config.Config{})
	if err != nil {
		t.Fatalf("Unable to list MSK Clusters: %v", err)
	}

	if count := len(got); count != 1 {
		t.Fatalf("Expected 1 cluster, got %d", count)
	}

	if gotArn := *got[0]; gotArn != wantArn {
		t.Fatalf("Unexpected cluster ID: %s", gotArn)
	}
}

// TestListMSKClustersMultiple verifies that getAll returns all three active
// clusters served by the mocked listing call, in the order they were listed.
func TestListMSKClustersMultiple(t *testing.T) {
	const (
		clusterCount = 3
		arnPrefix    = "arn:aws:kafka:us-east-1:123456789012:cluster/test-cluster-"
		arnSuffix    = "/1a2b3c4d-5e6f-7g8h-9i0j-1k2l3m4n5o6p"
	)

	client := mockMSKClient{
		listClustersV2PagesFn: func(input *kafka.ListClustersV2Input, callback func(*kafka.ListClustersV2Output, bool) bool) error {
			// Build test-cluster-1 .. test-cluster-3, all in the ACTIVE state.
			clusters := make([]*kafka.Cluster, 0, clusterCount)
			for n := 1; n <= clusterCount; n++ {
				clusters = append(clusters, &kafka.Cluster{
					ClusterArn:   aws.String(fmt.Sprintf("%s%d%s", arnPrefix, n, arnSuffix)),
					ClusterName:  aws.String(fmt.Sprintf("test-cluster-%d", n)),
					CreationTime: aws.Time(time.Now()),
					State:        aws.String(kafka.ClusterStateActive),
				})
			}

			callback(&kafka.ListClustersV2Output{ClusterInfoList: clusters}, true)
			return nil
		},
	}

	msk := MSKCluster{Client: &client}

	got, err := msk.getAll(config.Config{})
	if err != nil {
		t.Fatalf("Unable to list MSK Clusters: %v", err)
	}

	if count := len(got); count != 3 {
		t.Fatalf("Expected 3 clusters, got %d", count)
	}

	for idx, arn := range got {
		wantPrefix := fmt.Sprintf("arn:aws:kafka:us-east-1:123456789012:cluster/test-cluster-%d", idx+1)
		if !strings.HasPrefix(*arn, wantPrefix) {
			t.Fatalf("Unexpected cluster ID: %s", *arn)
		}
	}
}

// TestGetAllMSKError verifies that getAll surfaces an error returned by the
// underlying ListClustersV2Pages call.
func TestGetAllMSKError(t *testing.T) {
	failingClient := mockMSKClient{
		listClustersV2PagesFn: func(input *kafka.ListClustersV2Input, callback func(*kafka.ListClustersV2Output, bool) bool) error {
			return fmt.Errorf("Error listing MSK Clusters")
		},
	}

	msk := MSKCluster{Client: &failingClient}

	if _, err := msk.getAll(config.Config{}); err == nil {
		t.Fatalf("Expected error listing MSK Clusters")
	}
}

// TestShouldIncludeMSKCluster is a table-driven test of shouldInclude covering
// the lifecycle-state checks (deleting, creating, active) and the name-based
// include/exclude rules from the config.
func TestShouldIncludeMSKCluster(t *testing.T) {
	name := "test-cluster"
	created := time.Now()

	// clusterIn builds a fixture that differs between cases only by its state.
	clusterIn := func(state string) kafka.Cluster {
		return kafka.Cluster{
			ClusterName:  &name,
			State:        aws.String(state),
			CreationTime: &created,
		}
	}

	// nameRule builds a filter rule matching the fixture's cluster name.
	nameRule := func() config.FilterRule {
		return config.FilterRule{
			NamesRegExp: []config.Expression{
				{RE: *regexp.MustCompile("test-cluster")},
			},
		}
	}

	type testCase struct {
		cluster   kafka.Cluster
		configObj config.Config
		expected  bool
	}

	cases := map[string]testCase{
		"cluster is in deleting state": {
			cluster:   clusterIn(kafka.ClusterStateDeleting),
			configObj: config.Config{},
			expected:  false,
		},
		"cluster is in creating state": {
			cluster:   clusterIn(kafka.ClusterStateCreating),
			configObj: config.Config{},
			expected:  false,
		},
		"cluster is in active state": {
			cluster:   clusterIn(kafka.ClusterStateActive),
			configObj: config.Config{},
			expected:  true,
		},
		"cluster excluded by name": {
			cluster: clusterIn(kafka.ClusterStateActive),
			configObj: config.Config{
				MSKCluster: config.ResourceType{ExcludeRule: nameRule()},
			},
			expected: false,
		},
		"cluster included by name": {
			cluster: clusterIn(kafka.ClusterStateActive),
			configObj: config.Config{
				MSKCluster: config.ResourceType{IncludeRule: nameRule()},
			},
			expected: true,
		},
	}

	for label, tc := range cases {
		t.Run(label, func(t *testing.T) {
			msk := MSKCluster{}
			if got := msk.shouldInclude(&tc.cluster, tc.configObj); got != tc.expected {
				t.Fatalf("Expected %v, got %v", tc.expected, got)
			}
		})
	}
}

// TestNukeMSKCluster verifies that Nuke returns no error when invoked with an
// empty list of identifiers.
func TestNukeMSKCluster(t *testing.T) {
	client := mockMSKClient{
		deleteClusterFn: func(input *kafka.DeleteClusterInput) (*kafka.DeleteClusterOutput, error) {
			return nil, nil
		},
	}

	msk := MSKCluster{Client: &client}

	if err := msk.Nuke(nil, []string{}); err != nil {
		t.Fatalf("Unable to nuke MSK Clusters: %v", err)
	}
}
57 changes: 57 additions & 0 deletions aws/resources/msk_cluster_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package resources

import (
awsgo "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/aws/aws-sdk-go/service/kafka/kafkaiface"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/go-commons/errors"
)

// MSKCluster - represents all AWS Managed Streaming for Kafka clusters that should be deleted.
type MSKCluster struct {
// Client is the Kafka (MSK) API client used to list and delete clusters.
Client kafkaiface.KafkaAPI
// Region is not read by any method in this file.
// NOTE(review): presumably the AWS region the clusters live in — confirm against callers.
Region string
// ClusterArns holds the cluster ARNs reported by ResourceIdentifiers.
ClusterArns []string
}

// Init creates a Kafka (MSK) API client from the given session and stores it
// in Client.
//
// The receiver must be a pointer: with a value receiver the assignment would
// land on a copy of the struct and the caller's Client would remain nil.
func (msk *MSKCluster) Init(session *session.Session) {
	msk.Client = kafka.New(session)
}

// ResourceName - the simple name of the aws resource ("msk-cluster").
func (msk MSKCluster) ResourceName() string {
	const resourceName = "msk-cluster"
	return resourceName
}

// ResourceIdentifiers - the ARNs of the AWS Managed Streaming for Kafka clusters
// currently stored in ClusterArns. The slice is returned as-is, without copying.
func (msk MSKCluster) ResourceIdentifiers() []string {
return msk.ClusterArns
}

// MaxBatchSize - the maximum number of MSK clusters to handle in one batch.
func (msk MSKCluster) MaxBatchSize() int {
	// Tentative, conservative batch size chosen to keep AWS from throttling
	// us. Clusters are deleted with one DeleteCluster API call each, so the
	// batch size bounds how many calls are issued per batch.
	const maxBatchSize = 10
	return maxBatchSize
}

// GetAndSetIdentifiers looks up the ARNs of all MSK clusters that match
// configObj, stores them in ClusterArns, and returns them.
//
// The receiver must be a pointer so that the stored ARNs are visible to later
// calls such as ResourceIdentifiers; with a value receiver the assignment
// would be made on a copy and discarded. If the lookup fails, ClusterArns is
// left untouched and the error is returned with a nil slice.
func (msk *MSKCluster) GetAndSetIdentifiers(configObj config.Config) ([]string, error) {
	identifiers, err := msk.getAll(configObj)
	if err != nil {
		return nil, err
	}

	msk.ClusterArns = awsgo.StringValueSlice(identifiers)
	return msk.ClusterArns, nil
}

// Nuke - delete every MSK cluster named in identifiers. The session argument
// is ignored; deletion goes through the Client already set on the receiver.
// Any error from nukeAll is returned wrapped with a stack trace.
func (msk MSKCluster) Nuke(_ *session.Session, identifiers []string) error {
	err := msk.nukeAll(identifiers)
	if err == nil {
		return nil
	}

	return errors.WithStackTrace(err)
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Config struct {
LaunchConfiguration ResourceType `yaml:"LaunchConfiguration"`
LaunchTemplate ResourceType `yaml:"LaunchTemplate"`
MacieMember ResourceType `yaml:"MacieMember"`
MSKCluster ResourceType `yaml:"MSKCluster"`
NatGateway ResourceType `yaml:"NatGateway"`
OIDCProvider ResourceType `yaml:"OIDCProvider"`
OpenSearchDomain ResourceType `yaml:"OpenSearchDomain"`
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func emptyConfig() *Config {
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
}
}

Expand Down
Loading

0 comments on commit 6902144

Please sign in to comment.