Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new resource aws_msk_cluster_policy #32848

Merged
merged 11 commits into from
Aug 11, 2023
3 changes: 3 additions & 0 deletions .changelog/32848.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:new-resource
aws_msk_cluster_policy
```
164 changes: 164 additions & 0 deletions internal/service/kafka/cluster_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package kafka

import (
"context"
"log"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kafka"
"github.com/aws/aws-sdk-go-v2/service/kafka/types"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/structure"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/errs"
"github.com/hashicorp/terraform-provider-aws/internal/errs/sdkdiag"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

// @SDKResource("aws_msk_cluster_policy", name="Cluster Policy")
func ResourceClusterPolicy() *schema.Resource {
return &schema.Resource{
CreateWithoutTimeout: resourceClusterPolicyPut,
ReadWithoutTimeout: resourceClusterPolicyRead,
UpdateWithoutTimeout: resourceClusterPolicyPut,
DeleteWithoutTimeout: resourceClusterPolicyDelete,

Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},

Schema: map[string]*schema.Schema{
"cluster_arn": {
Type: schema.TypeString,
Required: true,
},
"current_version": {
Type: schema.TypeString,
Computed: true,
},
"policy": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validation.StringIsJSON,
DiffSuppressFunc: verify.SuppressEquivalentPolicyDiffs,
DiffSuppressOnRefresh: true,
StateFunc: func(v interface{}) string {
json, _ := structure.NormalizeJsonString(v)
return json
},
},
},
}
}

func resourceClusterPolicyPut(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).KafkaClient(ctx)

policy, err := structure.NormalizeJsonString(d.Get("policy").(string))
if err != nil {
return sdkdiag.AppendFromErr(diags, err)
}

clusterARN := d.Get("cluster_arn").(string)
in := &kafka.PutClusterPolicyInput{
ClusterArn: aws.String(clusterARN),
Policy: aws.String(policy),
}

_, err = conn.PutClusterPolicy(ctx, in)

if err != nil {
return sdkdiag.AppendErrorf(diags, "setting MSK Cluster Policy (%s): %s", clusterARN, err)
}

if d.IsNewResource() {
d.SetId(clusterARN)
}

return append(diags, resourceClusterPolicyRead(ctx, d, meta)...)
}

func resourceClusterPolicyRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).KafkaClient(ctx)

output, err := FindClusterPolicyByARN(ctx, conn, d.Id())

if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] MSK Cluster Policy (%s) not found, removing from state", d.Id())
d.SetId("")
return diags
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "reading MSK Cluster Policy (%s): %s", d.Id(), err)
}

d.Set("cluster_arn", d.Id())
d.Set("current_version", output.CurrentVersion)
if output.Policy != nil {
policyToSet, err := verify.PolicyToSet(d.Get("policy").(string), aws.ToString(output.Policy))
if err != nil {
return sdkdiag.AppendFromErr(diags, err)
}

d.Set("policy", policyToSet)
} else {
d.Set("policy", nil)
}

return diags
}

func resourceClusterPolicyDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).KafkaClient(ctx)

log.Printf("[INFO] Deleting MSK Cluster Policy: %s", d.Id())
_, err := conn.DeleteClusterPolicy(ctx, &kafka.DeleteClusterPolicyInput{
ClusterArn: aws.String(d.Id()),
})

if errs.IsA[*types.NotFoundException](err) {
return diags
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "deleting MSK Cluster Policy (%s): %s", d.Id(), err)
}

return diags
}

func FindClusterPolicyByARN(ctx context.Context, conn *kafka.Client, id string) (*kafka.GetClusterPolicyOutput, error) {
in := &kafka.GetClusterPolicyInput{
ClusterArn: aws.String(id),
}

out, err := conn.GetClusterPolicy(ctx, in)

if errs.IsA[*types.NotFoundException](err) {
return nil, &retry.NotFoundError{
LastError: err,
LastRequest: in,
}
}

if err != nil {
return nil, err
}

if out == nil {
return nil, tfresource.NewEmptyResultError(in)
}

return out, nil
}
160 changes: 160 additions & 0 deletions internal/service/kafka/cluster_policy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package kafka_test

import (
"context"
"fmt"
"regexp"
"testing"

"github.com/aws/aws-sdk-go-v2/service/kafka"
sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/terraform"
"github.com/hashicorp/terraform-provider-aws/internal/acctest"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
tfkafka "github.com/hashicorp/terraform-provider-aws/internal/service/kafka"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/names"
)

func TestAccKafkaClusterPolicy_basic(t *testing.T) {
ctx := acctest.Context(t)
var clusterpolicy kafka.GetClusterPolicyOutput
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_msk_cluster_policy.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() {
acctest.PreCheck(ctx, t)
acctest.PreCheckPartitionHasService(t, names.Kafka)
testAccPreCheck(ctx, t)
},
ErrorCheck: acctest.ErrorCheck(t, names.Kafka),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckClusterPolicyDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccClusterPolicyConfig_basic(rName),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterPolicyExists(ctx, resourceName, &clusterpolicy),
resource.TestCheckResourceAttrSet(resourceName, "current_version"),
resource.TestMatchResourceAttr(resourceName, "policy", regexp.MustCompile(`"kafka:CreateVpcConnection","kafka:GetBootstrapBrokers"`)),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccKafkaClusterPolicy_disappears(t *testing.T) {
ctx := acctest.Context(t)
var clusterpolicy kafka.GetClusterPolicyOutput
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_msk_cluster_policy.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() {
acctest.PreCheck(ctx, t)
acctest.PreCheckPartitionHasService(t, names.Kafka)
testAccPreCheck(ctx, t)
},
ErrorCheck: acctest.ErrorCheck(t, names.Kafka),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckClusterPolicyDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccClusterPolicyConfig_basic(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckClusterPolicyExists(ctx, resourceName, &clusterpolicy),
acctest.CheckResourceDisappears(ctx, acctest.Provider, tfkafka.ResourceClusterPolicy(), resourceName),
),
ExpectNonEmptyPlan: true,
},
},
})
}

func testAccCheckClusterPolicyDestroy(ctx context.Context) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := acctest.Provider.Meta().(*conns.AWSClient).KafkaClient(ctx)

for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_msk_cluster_policy" {
continue
}

_, err := tfkafka.FindClusterPolicyByARN(ctx, conn, rs.Primary.ID)

if tfresource.NotFound(err) {
continue
}

if err != nil {
return err
}

return fmt.Errorf("MSK Cluster Policy %s still exists", rs.Primary.ID)
}

return nil
}
}

func testAccCheckClusterPolicyExists(ctx context.Context, n string, v *kafka.GetClusterPolicyOutput) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}

conn := acctest.Provider.Meta().(*conns.AWSClient).KafkaClient(ctx)

output, err := tfkafka.FindClusterPolicyByARN(ctx, conn, rs.Primary.ID)

if err != nil {
return err
}

*v = *output

return nil
}
}

func testAccClusterPolicyConfig_basic(rName string) string {
return acctest.ConfigCompose(testAccVPCConnectionConfig_basic(rName), `
data "aws_caller_identity" "current" {}
data "aws_partition" "current" {}

resource "aws_msk_cluster_policy" "test" {
cluster_arn = aws_msk_cluster.test.arn

policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Sid = "testMskClusterPolicy"
Effect = "Allow"
Principal = {
"AWS" = "arn:${data.aws_partition.current.partition}:iam::${data.aws_caller_identity.current.account_id}:root"
}
Action = [
"kafka:Describe*",
"kafka:Get*",
"kafka:CreateVpcConnection",
"kafka:GetBootstrapBrokers",
]
Resource = aws_msk_cluster.test.arn
}]
})

depends_on = [aws_msk_vpc_connection.test]
}
`)
}
5 changes: 5 additions & 0 deletions internal/service/kafka/service_package_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading