Skip to content

Commit

Permalink
Merge pull request #22414 from rtim75/f-aws_mskconnect_worker_configuration
Browse files Browse the repository at this point in the history

Add aws_mskconnect_worker_configuration resource
  • Loading branch information
ewbankkit authored Jan 6, 2022
2 parents 63d5a9c + 7302b2b commit 5902887
Show file tree
Hide file tree
Showing 9 changed files with 558 additions and 2 deletions.
7 changes: 7 additions & 0 deletions .changelog/22414.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:new-resource
aws_mskconnect_worker_configuration
```

```release-note:new-data-source
aws_mskconnect_worker_configuration
```
6 changes: 4 additions & 2 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,8 @@ func Provider() *schema.Provider {
"aws_msk_configuration": kafka.DataSourceConfiguration(),
"aws_msk_kafka_version": kafka.DataSourceVersion(),

"aws_mskconnect_custom_plugin": kafkaconnect.DataSourceCustomPlugin(),
"aws_mskconnect_custom_plugin": kafkaconnect.DataSourceCustomPlugin(),
"aws_mskconnect_worker_configuration": kafkaconnect.DataSourceWorkerConfiguration(),

"aws_kinesis_stream": kinesis.DataSourceStream(),
"aws_kinesis_stream_consumer": kinesis.DataSourceStreamConsumer(),
Expand Down Expand Up @@ -1337,7 +1338,8 @@ func Provider() *schema.Provider {
"aws_msk_configuration": kafka.ResourceConfiguration(),
"aws_msk_scram_secret_association": kafka.ResourceScramSecretAssociation(),

"aws_mskconnect_custom_plugin": kafkaconnect.ResourceCustomPlugin(),
"aws_mskconnect_custom_plugin": kafkaconnect.ResourceCustomPlugin(),
"aws_mskconnect_worker_configuration": kafkaconnect.ResourceWorkerConfiguration(),

"aws_kinesis_stream": kinesis.ResourceStream(),
"aws_kinesis_stream_consumer": kinesis.ResourceStreamConsumer(),
Expand Down
24 changes: 24 additions & 0 deletions internal/service/kafkaconnect/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,27 @@ func FindCustomPluginByARN(conn *kafkaconnect.KafkaConnect, arn string) (*kafkac

return output, nil
}

// FindWorkerConfigurationByARN returns the MSK Connect worker configuration
// identified by arn. It maps the service's NotFoundException onto a
// *resource.NotFoundError (so callers can use tfresource.NotFound) and treats
// a nil API response as an empty-result error.
func FindWorkerConfigurationByARN(conn *kafkaconnect.KafkaConnect, arn string) (*kafkaconnect.DescribeWorkerConfigurationOutput, error) {
	input := &kafkaconnect.DescribeWorkerConfigurationInput{
		WorkerConfigurationArn: aws.String(arn),
	}

	output, err := conn.DescribeWorkerConfiguration(input)

	switch {
	case tfawserr.ErrCodeEquals(err, kafkaconnect.ErrCodeNotFoundException):
		return nil, &resource.NotFoundError{
			LastError:   err,
			LastRequest: input,
		}
	case err != nil:
		return nil, err
	case output == nil:
		return nil, tfresource.NewEmptyResultError(input)
	}

	return output, nil
}
125 changes: 125 additions & 0 deletions internal/service/kafkaconnect/worker_configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package kafkaconnect

import (
"encoding/base64"
"fmt"
"log"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafkaconnect"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

// ResourceWorkerConfiguration returns the schema for the
// aws_mskconnect_worker_configuration resource. All arguments are ForceNew
// and Delete is schema.Noop, so destroy only removes the entry from state.
func ResourceWorkerConfiguration() *schema.Resource {
	return &schema.Resource{
		Create: resourceWorkerConfigurationCreate,
		Read:   resourceWorkerConfigurationRead,
		Delete: schema.Noop,

		Importer: &schema.ResourceImporter{
			State: schema.ImportStatePassthrough,
		},

		Schema: map[string]*schema.Schema{
			// Arguments.
			"name": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
			},
			"description": {
				Type:     schema.TypeString,
				Optional: true,
				ForceNew: true,
			},
			"properties_file_content": {
				Type:     schema.TypeString,
				Required: true,
				ForceNew: true,
				// Store the decoded form so state is consistent whether the
				// user supplied plain text or base64-encoded properties.
				StateFunc: func(v interface{}) string {
					if s, ok := v.(string); ok {
						return decodePropertiesFileContent(s)
					}
					return ""
				},
			},

			// Computed attributes.
			"arn": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"latest_revision": {
				Type:     schema.TypeInt,
				Computed: true,
			},
		},
	}
}

// resourceWorkerConfigurationCreate creates the worker configuration
// (base64-encoding the properties file content, as the API requires), stores
// the returned ARN as the resource ID, and refreshes state via Read.
func resourceWorkerConfigurationCreate(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*conns.AWSClient).KafkaConnectConn

	name := d.Get("name").(string)

	input := &kafkaconnect.CreateWorkerConfigurationInput{
		Name:                  aws.String(name),
		PropertiesFileContent: aws.String(verify.Base64Encode([]byte(d.Get("properties_file_content").(string)))),
	}

	if description, ok := d.GetOk("description"); ok {
		input.Description = aws.String(description.(string))
	}

	log.Print("[DEBUG] Creating MSK Connect Worker Configuration")

	output, err := conn.CreateWorkerConfiguration(input)

	if err != nil {
		return fmt.Errorf("error creating MSK Connect Worker Configuration (%s): %w", name, err)
	}

	d.SetId(aws.StringValue(output.WorkerConfigurationArn))

	return resourceWorkerConfigurationRead(d, meta)
}

// resourceWorkerConfigurationRead refreshes state from the API. If the
// configuration has disappeared and the resource is not newly created, it is
// removed from state; a NotFound on a fresh resource is surfaced as an error.
func resourceWorkerConfigurationRead(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*conns.AWSClient).KafkaConnectConn

	config, err := FindWorkerConfigurationByARN(conn, d.Id())

	if !d.IsNewResource() && tfresource.NotFound(err) {
		log.Printf("[WARN] MSK Connect Worker Configuration (%s) not found, removing from state", d.Id())
		d.SetId("")
		return nil
	}

	if err != nil {
		return fmt.Errorf("error reading MSK Connect Worker Configuration (%s): %w", d.Id(), err)
	}

	d.Set("arn", config.WorkerConfigurationArn)
	d.Set("description", config.Description)
	d.Set("name", config.Name)

	// Revision-dependent attributes come from the latest revision when one exists.
	if rev := config.LatestRevision; rev != nil {
		d.Set("latest_revision", rev.Revision)
		d.Set("properties_file_content", decodePropertiesFileContent(aws.StringValue(rev.PropertiesFileContent)))
	} else {
		d.Set("latest_revision", nil)
		d.Set("properties_file_content", nil)
	}

	return nil
}

// decodePropertiesFileContent returns the standard-base64 decoding of
// content, or content unchanged when it is not valid base64. This lets
// callers handle either encoded or plain-text properties transparently.
func decodePropertiesFileContent(content string) string {
	if decoded, err := base64.StdEncoding.DecodeString(content); err == nil {
		return string(decoded)
	}

	return content
}
102 changes: 102 additions & 0 deletions internal/service/kafkaconnect/worker_configuration_data_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package kafkaconnect

import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafkaconnect"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
)

// DataSourceWorkerConfiguration returns the schema for the
// aws_mskconnect_worker_configuration data source, which looks up a worker
// configuration by name.
func DataSourceWorkerConfiguration() *schema.Resource {
	return &schema.Resource{
		Read: dataSourceWorkerConfigurationRead,

		Schema: map[string]*schema.Schema{
			// Lookup key.
			"name": {
				Type:     schema.TypeString,
				Required: true,
			},

			// Attributes read back from the API.
			"arn": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"description": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"latest_revision": {
				Type:     schema.TypeInt,
				Computed: true,
			},
			"properties_file_content": {
				Type:     schema.TypeString,
				Computed: true,
			},
		},
	}
}

// dataSourceWorkerConfigurationRead locates the worker configuration whose
// name matches the "name" argument by paging through
// ListWorkerConfigurations, then calls DescribeWorkerConfiguration on its ARN
// (presumably because the list summary omits the properties file content —
// the code only reads that field from the Describe response).
func dataSourceWorkerConfigurationRead(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*conns.AWSClient).KafkaConnectConn

	// Assert to string up front (consistent with resourceWorkerConfigurationCreate)
	// so the comparison below is string == string rather than string == interface{}.
	configName := d.Get("name").(string)

	input := &kafkaconnect.ListWorkerConfigurationsInput{}

	var config *kafkaconnect.WorkerConfigurationSummary

	err := conn.ListWorkerConfigurationsPages(input, func(page *kafkaconnect.ListWorkerConfigurationsOutput, lastPage bool) bool {
		if page == nil {
			return !lastPage
		}

		for _, configSummary := range page.WorkerConfigurations {
			if aws.StringValue(configSummary.Name) == configName {
				config = configSummary

				// Match found: stop paginating.
				return false
			}
		}

		return !lastPage
	})

	if err != nil {
		return fmt.Errorf("error listing MSK Connect Worker Configurations: %w", err)
	}

	if config == nil {
		return fmt.Errorf("error reading MSK Connect Worker Configuration (%s): no results found", configName)
	}

	describeInput := &kafkaconnect.DescribeWorkerConfigurationInput{
		WorkerConfigurationArn: config.WorkerConfigurationArn,
	}

	describeOutput, err := conn.DescribeWorkerConfiguration(describeInput)

	if err != nil {
		return fmt.Errorf("error reading MSK Connect Worker Configuration (%s): %w", configName, err)
	}

	d.SetId(aws.StringValue(config.Name))
	d.Set("arn", config.WorkerConfigurationArn)
	d.Set("description", config.Description)
	d.Set("name", config.Name)

	if config.LatestRevision != nil {
		d.Set("latest_revision", config.LatestRevision.Revision)
	} else {
		d.Set("latest_revision", nil)
	}

	if describeOutput.LatestRevision != nil {
		d.Set("properties_file_content", decodePropertiesFileContent(aws.StringValue(describeOutput.LatestRevision.PropertiesFileContent)))
	} else {
		d.Set("properties_file_content", nil)
	}

	return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package kafkaconnect_test

import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/service/kafkaconnect"
sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-provider-aws/internal/acctest"
)

// TestAccKafkaConnectWorkerConfigurationDataSource_basic verifies that every
// attribute exposed by the data source mirrors the corresponding resource
// attribute for a freshly created worker configuration.
func TestAccKafkaConnectWorkerConfigurationDataSource_basic(t *testing.T) {
	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
	propertiesFileContent := "key.converter=hello\nvalue.converter=world"

	resourceName := "aws_mskconnect_worker_configuration.test"
	dataSourceName := "data.aws_mskconnect_worker_configuration.test"

	// Build one attribute-pair check per mirrored attribute.
	attrs := []string{"arn", "description", "latest_revision", "name", "properties_file_content"}
	checks := make([]resource.TestCheckFunc, 0, len(attrs))
	for _, attr := range attrs {
		checks = append(checks, resource.TestCheckResourceAttrPair(resourceName, attr, dataSourceName, attr))
	}

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:     func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(kafkaconnect.EndpointsID, t) },
		ErrorCheck:   acctest.ErrorCheck(t, kafkaconnect.EndpointsID),
		CheckDestroy: nil,
		Providers:    acctest.Providers,
		Steps: []resource.TestStep{
			{
				Config: testAccWorkerConfigurationDataSourceConfigBasic(rName, propertiesFileContent),
				Check:  resource.ComposeTestCheckFunc(checks...),
			},
		},
	})
}

// testAccWorkerConfigurationDataSourceConfigBasic returns HCL that creates a
// worker configuration resource and reads it back through the data source,
// interpolating the given name and properties file content as quoted strings.
func testAccWorkerConfigurationDataSourceConfigBasic(rName, propertiesFileContent string) string {
	return fmt.Sprintf(`
resource "aws_mskconnect_worker_configuration" "test" {
  name = %[1]q
  properties_file_content = %[2]q
}
data "aws_mskconnect_worker_configuration" "test" {
  name = aws_mskconnect_worker_configuration.test.name
}
`, rName, propertiesFileContent)
}
Loading

0 comments on commit 5902887

Please sign in to comment.