diff --git a/.changelog/22414.txt b/.changelog/22414.txt
new file mode 100644
index 00000000000..b4b1083dd78
--- /dev/null
+++ b/.changelog/22414.txt
@@ -0,0 +1,7 @@
+```release-note:new-resource
+aws_mskconnect_worker_configuration
+```
+
+```release-note:new-data-source
+aws_mskconnect_worker_configuration
+```
diff --git a/internal/provider/provider.go b/internal/provider/provider.go
index b94d11a2652..d7d85bba15e 100644
--- a/internal/provider/provider.go
+++ b/internal/provider/provider.go
@@ -577,7 +577,8 @@
 			"aws_msk_configuration": kafka.DataSourceConfiguration(),
 			"aws_msk_kafka_version": kafka.DataSourceVersion(),
 
-			"aws_mskconnect_custom_plugin": kafkaconnect.DataSourceCustomPlugin(),
+			"aws_mskconnect_custom_plugin":        kafkaconnect.DataSourceCustomPlugin(),
+			"aws_mskconnect_worker_configuration": kafkaconnect.DataSourceWorkerConfiguration(),
 
 			"aws_kinesis_stream":          kinesis.DataSourceStream(),
 			"aws_kinesis_stream_consumer": kinesis.DataSourceStreamConsumer(),
@@ -1337,7 +1338,8 @@
 			"aws_msk_configuration":            kafka.ResourceConfiguration(),
 			"aws_msk_scram_secret_association": kafka.ResourceScramSecretAssociation(),
 
-			"aws_mskconnect_custom_plugin": kafkaconnect.ResourceCustomPlugin(),
+			"aws_mskconnect_custom_plugin":        kafkaconnect.ResourceCustomPlugin(),
+			"aws_mskconnect_worker_configuration": kafkaconnect.ResourceWorkerConfiguration(),
 
 			"aws_kinesis_stream":          kinesis.ResourceStream(),
 			"aws_kinesis_stream_consumer": kinesis.ResourceStreamConsumer(),
diff --git a/internal/service/kafkaconnect/find.go b/internal/service/kafkaconnect/find.go
index d3e67e40a7a..1e5d62457f1 100644
--- a/internal/service/kafkaconnect/find.go
+++ b/internal/service/kafkaconnect/find.go
@@ -32,3 +32,28 @@ func FindCustomPluginByARN(conn *kafkaconnect.KafkaConnect, arn string) (*kafkac
 
 	return output, nil
 }
+
+// FindWorkerConfigurationByARN returns the MSK Connect worker configuration corresponding to the specified ARN, or a resource.NotFoundError if it no longer exists.
+func FindWorkerConfigurationByARN(conn *kafkaconnect.KafkaConnect, arn string) (*kafkaconnect.DescribeWorkerConfigurationOutput, error) {
+	input := &kafkaconnect.DescribeWorkerConfigurationInput{
+		WorkerConfigurationArn: aws.String(arn),
+	}
+
+	output, err := conn.DescribeWorkerConfiguration(input)
+	if tfawserr.ErrCodeEquals(err, kafkaconnect.ErrCodeNotFoundException) {
+		return nil, &resource.NotFoundError{
+			LastError:   err,
+			LastRequest: input,
+		}
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	if output == nil {
+		return nil, tfresource.NewEmptyResultError(input)
+	}
+
+	return output, nil
+}
diff --git a/internal/service/kafkaconnect/worker_configuration.go b/internal/service/kafkaconnect/worker_configuration.go
new file mode 100644
index 00000000000..51379374554
--- /dev/null
+++ b/internal/service/kafkaconnect/worker_configuration.go
@@ -0,0 +1,133 @@
+package kafkaconnect
+
+import (
+	"encoding/base64"
+	"fmt"
+	"log"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/kafkaconnect"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+	"github.com/hashicorp/terraform-provider-aws/internal/conns"
+	"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
+	"github.com/hashicorp/terraform-provider-aws/internal/verify"
+)
+
+// ResourceWorkerConfiguration returns the aws_mskconnect_worker_configuration resource.
+// The MSK Connect API has no delete operation for worker configurations, so Delete is a no-op.
+func ResourceWorkerConfiguration() *schema.Resource {
+	return &schema.Resource{
+		Create: resourceWorkerConfigurationCreate,
+		Read:   resourceWorkerConfigurationRead,
+		Delete: schema.Noop,
+
+		Importer: &schema.ResourceImporter{
+			State: schema.ImportStatePassthrough,
+		},
+
+		Schema: map[string]*schema.Schema{
+			"arn": {
+				Type:     schema.TypeString,
+				Computed: true,
+			},
+			"description": {
+				Type:     schema.TypeString,
+				Optional: true,
+				ForceNew: true,
+			},
+			"latest_revision": {
+				Type:     schema.TypeInt,
+				Computed: true,
+			},
+			"name": {
+				Type:     schema.TypeString,
+				Required: true,
+				ForceNew: true,
+			},
+			"properties_file_content": {
+				Type:     schema.TypeString,
+				ForceNew: true,
+				Required: true,
+				// Normalize to decoded plain text so that base64-encoded and
+				// plain-text configurations with identical contents do not diff.
+				StateFunc: func(v interface{}) string {
+					switch v := v.(type) {
+					case string:
+						return decodePropertiesFileContent(v)
+					default:
+						return ""
+					}
+				},
+			},
+		},
+	}
+}
+
+// resourceWorkerConfigurationCreate creates the worker configuration and reads it back into state.
+func resourceWorkerConfigurationCreate(d *schema.ResourceData, meta interface{}) error {
+	conn := meta.(*conns.AWSClient).KafkaConnectConn
+
+	name := d.Get("name").(string)
+	properties := d.Get("properties_file_content").(string)
+
+	// The API requires the properties file content to be base64 encoded.
+	input := &kafkaconnect.CreateWorkerConfigurationInput{
+		Name:                  aws.String(name),
+		PropertiesFileContent: aws.String(verify.Base64Encode([]byte(properties))),
+	}
+
+	if v, ok := d.GetOk("description"); ok {
+		input.Description = aws.String(v.(string))
+	}
+
+	log.Print("[DEBUG] Creating MSK Connect Worker Configuration")
+	output, err := conn.CreateWorkerConfiguration(input)
+	if err != nil {
+		return fmt.Errorf("error creating MSK Connect Worker Configuration (%s): %w", name, err)
+	}
+
+	d.SetId(aws.StringValue(output.WorkerConfigurationArn))
+
+	return resourceWorkerConfigurationRead(d, meta)
+}
+
+// resourceWorkerConfigurationRead refreshes state from the API, removing the resource from state if it is gone.
+func resourceWorkerConfigurationRead(d *schema.ResourceData, meta interface{}) error {
+	conn := meta.(*conns.AWSClient).KafkaConnectConn
+
+	config, err := FindWorkerConfigurationByARN(conn, d.Id())
+
+	if !d.IsNewResource() && tfresource.NotFound(err) {
+		log.Printf("[WARN] MSK Connect Worker Configuration (%s) not found, removing from state", d.Id())
+		d.SetId("")
+		return nil
+	}
+
+	if err != nil {
+		return fmt.Errorf("error reading MSK Connect Worker Configuration (%s): %w", d.Id(), err)
+	}
+
+	d.Set("arn", config.WorkerConfigurationArn)
+	d.Set("name", config.Name)
+	d.Set("description", config.Description)
+
+	if config.LatestRevision != nil {
+		d.Set("latest_revision", config.LatestRevision.Revision)
+		d.Set("properties_file_content", decodePropertiesFileContent(aws.StringValue(config.LatestRevision.PropertiesFileContent)))
+	} else {
+		d.Set("latest_revision", nil)
+		d.Set("properties_file_content", nil)
+	}
+
+	return nil
+}
+
+// decodePropertiesFileContent base64-decodes content, returning it unchanged if it is not valid base64.
+func decodePropertiesFileContent(content string) string {
+	result, err := base64.StdEncoding.DecodeString(content)
+	if err != nil {
+		return content
+	}
+
+	return string(result)
+}
diff --git a/internal/service/kafkaconnect/worker_configuration_data_source.go b/internal/service/kafkaconnect/worker_configuration_data_source.go
new file mode 100644
index 00000000000..ea98239a2c7
--- /dev/null
+++ b/internal/service/kafkaconnect/worker_configuration_data_source.go
@@ -0,0 +1,104 @@
+package kafkaconnect
+
+import (
+	"fmt"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/kafkaconnect"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+	"github.com/hashicorp/terraform-provider-aws/internal/conns"
+)
+
+// DataSourceWorkerConfiguration returns the aws_mskconnect_worker_configuration data source.
+func DataSourceWorkerConfiguration() *schema.Resource {
+	return &schema.Resource{
+		Read: dataSourceWorkerConfigurationRead,
+
+		Schema: map[string]*schema.Schema{
+			"arn": {
+				Type:     schema.TypeString,
+				Computed: true,
+			},
+			"description": {
+				Type:     schema.TypeString,
+				Computed: true,
+			},
+			"latest_revision": {
+				Type:     schema.TypeInt,
+				Computed: true,
+			},
+			"name": {
+				Type:     schema.TypeString,
+				Required: true,
+			},
+			"properties_file_content": {
+				Type:     schema.TypeString,
+				Computed: true,
+			},
+		},
+	}
+}
+
+// dataSourceWorkerConfigurationRead finds the worker configuration by name via pagination.
+func dataSourceWorkerConfigurationRead(d *schema.ResourceData, meta interface{}) error {
+	conn := meta.(*conns.AWSClient).KafkaConnectConn
+
+	configName := d.Get("name").(string)
+
+	input := &kafkaconnect.ListWorkerConfigurationsInput{}
+
+	var config *kafkaconnect.WorkerConfigurationSummary
+
+	err := conn.ListWorkerConfigurationsPages(input, func(page *kafkaconnect.ListWorkerConfigurationsOutput, lastPage bool) bool {
+		if page == nil {
+			return !lastPage
+		}
+
+		for _, configSummary := range page.WorkerConfigurations {
+			if aws.StringValue(configSummary.Name) == configName {
+				config = configSummary
+
+				return false
+			}
+		}
+
+		return !lastPage
+	})
+
+	if err != nil {
+		return fmt.Errorf("error listing MSK Connect Worker Configurations: %w", err)
+	}
+
+	if config == nil {
+		return fmt.Errorf("error reading MSK Connect Worker Configuration (%s): no results found", configName)
+	}
+
+	describeInput := &kafkaconnect.DescribeWorkerConfigurationInput{
+		WorkerConfigurationArn: config.WorkerConfigurationArn,
+	}
+
+	describeOutput, err := conn.DescribeWorkerConfiguration(describeInput)
+
+	if err != nil {
+		return fmt.Errorf("error reading MSK Connect Worker Configuration (%s): %w", configName, err)
+	}
+
+	d.SetId(aws.StringValue(config.Name))
+	d.Set("arn", config.WorkerConfigurationArn)
+	d.Set("description", config.Description)
+	d.Set("name", config.Name)
+
+	if config.LatestRevision != nil {
+		d.Set("latest_revision", config.LatestRevision.Revision)
+	} else {
+		d.Set("latest_revision", nil)
+	}
+
+	if describeOutput.LatestRevision != nil {
+		d.Set("properties_file_content", decodePropertiesFileContent(aws.StringValue(describeOutput.LatestRevision.PropertiesFileContent)))
+	} else {
+		d.Set("properties_file_content", nil)
+	}
+
+	return nil
+}
diff --git a/internal/service/kafkaconnect/worker_configuration_data_source_test.go b/internal/service/kafkaconnect/worker_configuration_data_source_test.go
new file mode 100644
index 00000000000..5ca3121ff31
--- /dev/null
+++ b/internal/service/kafkaconnect/worker_configuration_data_source_test.go
@@ -0,0 +1,52 @@
+package kafkaconnect_test
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/aws/aws-sdk-go/service/kafkaconnect"
+	sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
+	"github.com/hashicorp/terraform-provider-aws/internal/acctest"
+)
+
+func TestAccKafkaConnectWorkerConfigurationDataSource_basic(t *testing.T) {
+	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+
+	propertiesFileContent := "key.converter=hello\nvalue.converter=world"
+
+	resourceName := "aws_mskconnect_worker_configuration.test"
+	dataSourceName := "data.aws_mskconnect_worker_configuration.test"
+
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:     func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(kafkaconnect.EndpointsID, t) },
+		ErrorCheck:   acctest.ErrorCheck(t, kafkaconnect.EndpointsID),
+		CheckDestroy: nil,
+		Providers:    acctest.Providers,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccWorkerConfigurationDataSourceConfigBasic(rName, propertiesFileContent),
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttrPair(resourceName, "arn", dataSourceName, "arn"),
+					resource.TestCheckResourceAttrPair(resourceName, "description", dataSourceName, "description"),
+					resource.TestCheckResourceAttrPair(resourceName, "latest_revision", dataSourceName, "latest_revision"),
+					resource.TestCheckResourceAttrPair(resourceName, "name", dataSourceName, "name"),
+					resource.TestCheckResourceAttrPair(resourceName, "properties_file_content", dataSourceName, "properties_file_content"),
+				),
+			},
+		},
+	})
+}
+
+func testAccWorkerConfigurationDataSourceConfigBasic(name, content string) string {
+	return fmt.Sprintf(`
+resource "aws_mskconnect_worker_configuration" "test" {
+  name                    = %[1]q
+  properties_file_content = %[2]q
+}
+
+data "aws_mskconnect_worker_configuration" "test" {
+  name = aws_mskconnect_worker_configuration.test.name
+}
+`, name, content)
+}
diff --git a/internal/service/kafkaconnect/worker_configuration_test.go b/internal/service/kafkaconnect/worker_configuration_test.go
new file mode 100644
index 00000000000..e000b83bf65
--- /dev/null
+++ b/internal/service/kafkaconnect/worker_configuration_test.go
@@ -0,0 +1,159 @@
+package kafkaconnect_test
+
+import (
+	"encoding/base64"
+	"fmt"
+	"testing"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/kafkaconnect"
+	sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
+	"github.com/hashicorp/terraform-provider-aws/internal/acctest"
+	"github.com/hashicorp/terraform-provider-aws/internal/conns"
+)
+
+func TestAccKafkaConnectWorkerConfiguration_basic(t *testing.T) {
+	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+
+	propertiesFileContent := "key.converter=hello\nvalue.converter=world"
+
+	resourceName := "aws_mskconnect_worker_configuration.test"
+
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:     func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(kafkaconnect.EndpointsID, t) },
+		ErrorCheck:   acctest.ErrorCheck(t, kafkaconnect.EndpointsID),
+		CheckDestroy: nil,
+		Providers:    acctest.Providers,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccWorkerConfigurationBasic(rName, propertiesFileContent),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckWorkerConfigurationExists(resourceName),
+					resource.TestCheckResourceAttrSet(resourceName, "arn"),
+					resource.TestCheckResourceAttrSet(resourceName, "latest_revision"),
+					resource.TestCheckResourceAttr(resourceName, "name", rName),
+					resource.TestCheckResourceAttr(resourceName, "properties_file_content", propertiesFileContent),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+		},
+	})
+}
+
+func TestAccKafkaConnectWorkerConfiguration_description(t *testing.T) {
+	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+	rDescription := sdkacctest.RandString(20)
+
+	propertiesFileContent := "key.converter=hello\nvalue.converter=world"
+
+	resourceName := "aws_mskconnect_worker_configuration.test"
+
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:     func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(kafkaconnect.EndpointsID, t) },
+		ErrorCheck:   acctest.ErrorCheck(t, kafkaconnect.EndpointsID),
+		CheckDestroy: nil,
+		Providers:    acctest.Providers,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccWorkerConfigurationDescription(rName, propertiesFileContent, rDescription),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckWorkerConfigurationExists(resourceName),
+					resource.TestCheckResourceAttr(resourceName, "description", rDescription),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+		},
+	})
+}
+
+func TestAccKafkaConnectWorkerConfiguration_properties_file_content(t *testing.T) {
+	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
+
+	propertiesFileContent := "key.converter=hello\nvalue.converter=world"
+	propertiesFileContentBase64 := base64.StdEncoding.EncodeToString([]byte(propertiesFileContent))
+
+	resourceName := "aws_mskconnect_worker_configuration.test"
+
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:     func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(kafkaconnect.EndpointsID, t) },
+		ErrorCheck:   acctest.ErrorCheck(t, kafkaconnect.EndpointsID),
+		CheckDestroy: nil,
+		Providers:    acctest.Providers,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccWorkerConfigurationBasic(rName, propertiesFileContent),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckWorkerConfigurationExists(resourceName),
+					resource.TestCheckResourceAttr(resourceName, "properties_file_content", propertiesFileContent),
+				),
+			},
+			{
+				ResourceName:      resourceName,
+				ImportState:       true,
+				ImportStateVerify: true,
+			},
+			{
+				Config: testAccWorkerConfigurationBasic(rName, propertiesFileContentBase64),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckWorkerConfigurationExists(resourceName),
+					resource.TestCheckResourceAttr(resourceName, "properties_file_content", propertiesFileContent),
+				),
+			},
+		},
+	})
+}
+
+func testAccCheckWorkerConfigurationExists(name string) resource.TestCheckFunc {
+	return func(s *terraform.State) error {
+		rs, ok := s.RootModule().Resources[name]
+		if !ok {
+			return fmt.Errorf("Not found: %s", name)
+		}
+
+		if rs.Primary.ID == "" {
+			return fmt.Errorf("No MSK Worker Configuration ID is set")
+		}
+
+		conn := acctest.Provider.Meta().(*conns.AWSClient).KafkaConnectConn
+
+		params := &kafkaconnect.DescribeWorkerConfigurationInput{
+			WorkerConfigurationArn: aws.String(rs.Primary.ID),
+		}
+
+		_, err := conn.DescribeWorkerConfiguration(params)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}
+}
+
+func testAccWorkerConfigurationBasic(name, content string) string {
+	return fmt.Sprintf(`
+resource "aws_mskconnect_worker_configuration" "test" {
+  name                    = %[1]q
+  properties_file_content = %[2]q
+}
+`, name, content)
+}
+
+func testAccWorkerConfigurationDescription(name, content, description string) string {
+	return fmt.Sprintf(`
+resource "aws_mskconnect_worker_configuration" "test" {
+  name                    = %[1]q
+  properties_file_content = %[2]q
+  description             = %[3]q
+}
+`, name, content, description)
+}
diff --git a/website/docs/d/mskconnect_worker_configuration.html.markdown b/website/docs/d/mskconnect_worker_configuration.html.markdown
new file mode 100644
index 00000000000..92b4d000a9a
--- /dev/null
+++ b/website/docs/d/mskconnect_worker_configuration.html.markdown
@@ -0,0 +1,34 @@
+---
+subcategory: "Kafka Connect (MSK Connect)"
+layout: "aws"
+page_title: "AWS: aws_mskconnect_worker_configuration"
+description: |-
+  Get information on an Amazon MSK Connect worker configuration.
+---
+
+# Data Source: aws_mskconnect_worker_configuration
+
+Get information on an Amazon MSK Connect Worker Configuration.
+
+## Example Usage
+
+```terraform
+data "aws_mskconnect_worker_configuration" "example" {
+  name = "example"
+}
+```
+
+## Argument Reference
+
+The following arguments are supported:
+
+* `name` - (Required) Name of the worker configuration.
+
+## Attribute Reference
+
+In addition to all arguments above, the following attributes are exported:
+
+* `arn` - the Amazon Resource Name (ARN) of the worker configuration.
+* `description` - a summary description of the worker configuration.
+* `latest_revision` - an ID of the latest successfully created revision of the worker configuration.
+* `properties_file_content` - contents of connect-distributed.properties file.
diff --git a/website/docs/r/mskconnect_worker_configuration.html.markdown b/website/docs/r/mskconnect_worker_configuration.html.markdown
new file mode 100644
index 00000000000..2b68c1d4fae
--- /dev/null
+++ b/website/docs/r/mskconnect_worker_configuration.html.markdown
@@ -0,0 +1,51 @@
+---
+subcategory: "Kafka Connect (MSK Connect)"
+layout: "aws"
+page_title: "AWS: aws_mskconnect_worker_configuration"
+description: |-
+  Provides an Amazon MSK Connect worker configuration resource.
+---
+
+# Resource: aws_mskconnect_worker_configuration
+
+Provides an Amazon MSK Connect Worker Configuration Resource.
+
+## Example Usage
+
+### Basic configuration
+
+```terraform
+resource "aws_mskconnect_worker_configuration" "example" {
+  name                    = "example"
+  properties_file_content = <