Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MSK Node Data source #15163

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions aws/data_source_aws_msk_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package aws

import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

// dataSourceAwsMskNode returns the schema for the aws_msk_node data source,
// which looks up a single broker node in an Amazon MSK cluster either by
// broker ID or by broker endpoint.
func dataSourceAwsMskNode() *schema.Resource {
	return &schema.Resource{
		Read: dataSourceAwsMskNodeRead,

		Schema: map[string]*schema.Schema{
			"arn": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"attached_eni_id": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"broker_endpoint": {
				Type:     schema.TypeString,
				Optional: true,
			},
			"broker_id": {
				Type:     schema.TypeInt,
				Optional: true,
			},
			"client_subnet": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"client_vpc_ip_address": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"cluster_arn": {
				Type:     schema.TypeString,
				Required: true,
			},
			"instance_type": {
				Type:     schema.TypeString,
				Computed: true,
			},
			"kafka_version": {
				Type:     schema.TypeString,
				Computed: true,
			},
		},
	}
}

// dataSourceAwsMskNodeRead lists all nodes of the MSK cluster identified by
// cluster_arn, finds the first node matching the broker_id or broker_endpoint
// search criteria, and populates the data source attributes from it.
func dataSourceAwsMskNodeRead(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).kafkaconn

	listNodesInput := &kafka.ListNodesInput{
		ClusterArn: aws.String(d.Get("cluster_arn").(string)),
	}

	// Page through ListNodes until NextToken is empty, accumulating every node.
	var nodes []*kafka.NodeInfo
	for {
		listNodesOutput, err := conn.ListNodes(listNodesInput)

		if err != nil {
			return fmt.Errorf("error listing MSK Cluster Nodes: %s", err)
		}

		if listNodesOutput == nil {
			break
		}

		nodes = append(nodes, listNodesOutput.NodeInfoList...)

		if aws.StringValue(listNodesOutput.NextToken) == "" {
			break
		}

		listNodesInput.NextToken = listNodesOutput.NextToken
	}

	if len(nodes) == 0 {
		return fmt.Errorf("error reading MSK Nodes: no results found")
	}

	// Unset optional arguments read as their zero value; guard against them so
	// an unset broker_id (0) or broker_endpoint ("") never matches anything.
	brokerID := d.Get("broker_id").(int)
	brokerEndpoint := d.Get("broker_endpoint").(string)

	// Take the first node that matches either criterion. Entries whose
	// BrokerNodeInfo is nil (non-broker entries) are skipped to avoid a nil
	// pointer dereference, and Endpoints is length-checked before indexing.
	var node *kafka.NodeInfo
	for _, candidate := range nodes {
		brokerNodeInfo := candidate.BrokerNodeInfo

		if brokerNodeInfo == nil {
			continue
		}

		if brokerID != 0 && int(aws.Float64Value(brokerNodeInfo.BrokerId)) == brokerID {
			node = candidate
			break
		}

		if brokerEndpoint != "" && len(brokerNodeInfo.Endpoints) > 0 && aws.StringValue(brokerNodeInfo.Endpoints[0]) == brokerEndpoint {
			node = candidate
			break
		}
	}

	if node == nil {
		return fmt.Errorf("error reading MSK Nodes: node not found, try adjusting search criteria")
	}

	d.Set("attached_eni_id", aws.StringValue(node.BrokerNodeInfo.AttachedENIId))
	d.Set("broker_id", int(aws.Float64Value(node.BrokerNodeInfo.BrokerId)))
	d.Set("client_subnet", aws.StringValue(node.BrokerNodeInfo.ClientSubnet))
	d.Set("client_vpc_ip_address", aws.StringValue(node.BrokerNodeInfo.ClientVpcIpAddress))

	// CurrentBrokerSoftwareInfo and Endpoints are optional in the API response;
	// only set the corresponding attributes when present.
	if node.BrokerNodeInfo.CurrentBrokerSoftwareInfo != nil {
		d.Set("kafka_version", aws.StringValue(node.BrokerNodeInfo.CurrentBrokerSoftwareInfo.KafkaVersion))
	}
	if len(node.BrokerNodeInfo.Endpoints) > 0 {
		d.Set("broker_endpoint", aws.StringValue(node.BrokerNodeInfo.Endpoints[0]))
	}

	d.Set("instance_type", aws.StringValue(node.InstanceType))
	d.Set("arn", aws.StringValue(node.NodeARN))
	d.SetId(aws.StringValue(node.NodeARN))

	return nil
}
56 changes: 56 additions & 0 deletions aws/data_source_aws_msk_node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package aws

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

// TestAccAWSMskNodes_ClientIp verifies that the aws_msk_node data source,
// when a broker is looked up by endpoint, populates all computed attributes.
func TestAccAWSMskNodes_ClientIp(t *testing.T) {
	clusterName := acctest.RandomWithPrefix("tf-acc-test")
	const dataSourceName = "data.aws_msk_node.test"

	// Every attribute the data source computes should be set on the result.
	attrs := []string{
		"attached_eni_id",
		"broker_id",
		"client_subnet",
		"client_vpc_ip_address",
		"kafka_version",
		"broker_endpoint",
		"instance_type",
		"arn",
	}

	checks := make([]resource.TestCheckFunc, 0, len(attrs))
	for _, attr := range attrs {
		checks = append(checks, resource.TestCheckResourceAttrSet(dataSourceName, attr))
	}

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:  func() { testAccPreCheck(t) },
		Providers: testAccProviders,
		Steps: []resource.TestStep{
			{
				Config: testAccMskNodeDataSourceClientVPC(clusterName),
				Check:  resource.ComposeTestCheckFunc(checks...),
			},
		},
	})
}

// testAccMskNodeDataSourceClientVPC returns an acceptance-test configuration
// that creates an MSK cluster named rName on top of the shared base config and
// looks up one of its brokers by TLS bootstrap endpoint via aws_msk_node.
func testAccMskNodeDataSourceClientVPC(rName string) string {
	clusterAndDataSource := fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = "2.2.1"
number_of_broker_nodes = 3
broker_node_group_info {
client_subnets = [aws_subnet.example_subnet_az1.id, aws_subnet.example_subnet_az2.id, aws_subnet.example_subnet_az3.id]
ebs_volume_size = 10
instance_type = "kafka.t3.small"
security_groups = [aws_security_group.example_sg.id]
}
}

data "aws_msk_node" "test" {
cluster_arn = aws_msk_cluster.test.arn
broker_endpoint = trimsuffix(split(",", aws_msk_cluster.test.bootstrap_brokers_tls)[0], ":9094")
}
`, rName)

	return testAccMskClusterBaseConfig() + clusterAndDataSource
}
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func Provider() *schema.Provider {
"aws_lex_slot_type": dataSourceAwsLexSlotType(),
"aws_mq_broker": dataSourceAwsMqBroker(),
"aws_msk_cluster": dataSourceAwsMskCluster(),
"aws_msk_node": dataSourceAwsMskNode(),
"aws_msk_configuration": dataSourceAwsMskConfiguration(),
"aws_nat_gateway": dataSourceAwsNatGateway(),
"aws_neptune_orderable_db_instance": dataSourceAwsNeptuneOrderableDbInstance(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ The following arguments are supported:

In addition to the argument above, the following attributes are exported:

* `repository_endpoint` - The URL of the returned endpoint.
* `repository_endpoint` - The URL of the returned endpoint.
40 changes: 40 additions & 0 deletions website/docs/d/msk_node.html.markdown
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
---
subcategory: "Managed Streaming for Kafka (MSK)"
layout: "aws"
page_title: "AWS: aws_msk_node"
description: |-
Get information on an Amazon MSK Cluster node
---

# Data Source: aws_msk_node

Get information on an Amazon MSK Cluster node.

## Example Usage

```hcl
data "aws_msk_node" "example" {
cluster_arn = "arn:aws:kafka:us-east-1:123456789012:cluster/example/11111111-2222-3333-4444-555555555555-1"
broker_endpoint = "b-1.kafka-primary.0000.00.kafka.us-east-1.amazonaws.com"
}
```

## Argument Reference

The following arguments are supported:

* `cluster_arn` - (Required) ARN of the cluster.
* `broker_endpoint` - (Optional) Broker endpoint of the node.
* `broker_id` - (Optional) Broker ID of the node.

## Attribute Reference

In addition to all arguments above, the following attributes are exported:

* `arn` - Amazon Resource Name (ARN) of the MSK cluster node.
* `attached_eni_id` - The attached elastic network interface of the broker.
* `client_subnet` - The client subnet to which this broker node belongs.
* `client_vpc_ip_address` - The virtual private cloud (VPC) IP address of the client.
* `cluster_arn` - Amazon Resource Name (ARN) of the MSK cluster.
* `instance_type` - The instance type.
* `kafka_version` - The version of Apache Kafka.