Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws_msk_cluster: support Cluster expansion and Open Monitoring #11451

Merged
merged 4 commits into from
Feb 21, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 208 additions & 2 deletions aws/resource_aws_msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func resourceAwsMskCluster() *schema.Resource {
Type: schema.TypeString,
Optional: true,
Default: kafka.EnhancedMonitoringDefault,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{
kafka.EnhancedMonitoringDefault,
kafka.EnhancedMonitoringPerBroker,
Expand All @@ -205,7 +204,54 @@ func resourceAwsMskCluster() *schema.Resource {
"number_of_broker_nodes": {
Type: schema.TypeInt,
Required: true,
ForceNew: true,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that MSK allows you to increase, but not decrease, this value — probably because Kafka requires partition reassignment when the cluster's make-up changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. The same applies to the broker storage size (you can expand it but you cannot decrease it). However, no special validation rules have been applied in that case, so I assume that the same can be done here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a CustomizeDiff function is added then we can ForceNew if the number of broker nodes is decreased.
A similar example:
https://github.com/terraform-providers/terraform-provider-aws/blob/4acecb05959a9e9b07bea9afb87d760ea887da07/aws/resource_aws_elasticsearch_domain.go#L31-L54

},
"open_monitoring": {
Type: schema.TypeList,
Optional: true,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"prometheus": {
Type: schema.TypeList,
Optional: true,
marcoreni marked this conversation as resolved.
Show resolved Hide resolved
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"jmx_exporter": {
Type: schema.TypeList,
Optional: true,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enabled_in_broker": {
Type: schema.TypeBool,
Required: true,
},
},
},
},
"node_exporter": {
Type: schema.TypeList,
Optional: true,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enabled_in_broker": {
Type: schema.TypeBool,
Required: true,
},
},
},
},
},
},
},
},
},
},
"tags": tagsSchema(),
"zookeeper_connect_string": {
Expand All @@ -228,6 +274,7 @@ func resourceAwsMskClusterCreate(d *schema.ResourceData, meta interface{}) error
EnhancedMonitoring: aws.String(d.Get("enhanced_monitoring").(string)),
KafkaVersion: aws.String(d.Get("kafka_version").(string)),
NumberOfBrokerNodes: aws.Int64(int64(d.Get("number_of_broker_nodes").(int))),
OpenMonitoring: expandMskOpenMonitoring(d.Get("open_monitoring").([]interface{})),
Tags: keyvaluetags.New(d.Get("tags").(map[string]interface{})).IgnoreAws().KafkaTags(),
}

Expand Down Expand Up @@ -343,6 +390,10 @@ func resourceAwsMskClusterRead(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("error setting tags: %s", err)
}

if err := d.Set("open_monitoring", flattenMskOpenMonitoring(cluster.OpenMonitoring)); err != nil {
return fmt.Errorf("error setting open_monitoring: %s", err)
}

d.Set("zookeeper_connect_string", aws.StringValue(cluster.ZookeeperConnectString))

return nil
Expand Down Expand Up @@ -380,6 +431,55 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
}
}

if d.HasChange("number_of_broker_nodes") {
input := &kafka.UpdateBrokerCountInput{
ClusterArn: aws.String(d.Id()),
CurrentVersion: aws.String(d.Get("current_version").(string)),
TargetNumberOfBrokerNodes: aws.Int64(int64(d.Get("number_of_broker_nodes").(int))),
}

output, err := conn.UpdateBrokerCount(input)

if err != nil {
return fmt.Errorf("error updating MSK Cluster (%s) broker count: %s", d.Id(), err)
}

if output == nil {
return fmt.Errorf("error updating MSK Cluster (%s) broker count: empty response", d.Id())
}

clusterOperationARN := aws.StringValue(output.ClusterOperationArn)

if err := waitForMskClusterOperation(conn, clusterOperationARN); err != nil {
return fmt.Errorf("error waiting for MSK Cluster (%s) operation (%s): %s", d.Id(), clusterOperationARN, err)
}
}

if d.HasChange("enhanced_monitoring") || d.HasChange("open_monitoring") {
input := &kafka.UpdateMonitoringInput{
ClusterArn: aws.String(d.Id()),
CurrentVersion: aws.String(d.Get("current_version").(string)),
EnhancedMonitoring: aws.String(d.Get("enhanced_monitoring").(string)),
OpenMonitoring: expandMskOpenMonitoring(d.Get("open_monitoring").([]interface{})),
}

output, err := conn.UpdateMonitoring(input)

if err != nil {
return fmt.Errorf("error updating MSK Cluster (%s) monitoring: %s", d.Id(), err)
}

if output == nil {
return fmt.Errorf("error updating MSK Cluster (%s) monitoring: empty response", d.Id())
}

clusterOperationARN := aws.StringValue(output.ClusterOperationArn)

if err := waitForMskClusterOperation(conn, clusterOperationARN); err != nil {
return fmt.Errorf("error waiting for MSK Cluster (%s) operation (%s): %s", d.Id(), clusterOperationARN, err)
}
}

if d.HasChange("configuration_info") {
input := &kafka.UpdateClusterConfigurationInput{
ClusterArn: aws.String(d.Id()),
Expand Down Expand Up @@ -516,6 +616,63 @@ func expandMskClusterTls(l []interface{}) *kafka.Tls {
return tls
}

// expandMskOpenMonitoring converts the Terraform "open_monitoring"
// configuration block (a single-element list) into the Kafka API
// OpenMonitoringInfo structure. It returns nil when the block is
// absent or empty, so the API request omits open monitoring entirely.
func expandMskOpenMonitoring(l []interface{}) *kafka.OpenMonitoringInfo {
	if len(l) == 0 || l[0] == nil {
		return nil
	}

	tfMap := l[0].(map[string]interface{})

	return &kafka.OpenMonitoringInfo{
		Prometheus: expandMskOpenMonitoringPrometheus(tfMap["prometheus"].([]interface{})),
	}
}

// expandMskOpenMonitoringPrometheus converts the nested "prometheus"
// configuration block into the Kafka API PrometheusInfo structure.
// A nil return indicates the block was not configured.
func expandMskOpenMonitoringPrometheus(l []interface{}) *kafka.PrometheusInfo {
	if len(l) == 0 || l[0] == nil {
		return nil
	}

	tfMap := l[0].(map[string]interface{})

	return &kafka.PrometheusInfo{
		JmxExporter:  expandMskOpenMonitoringPrometheusJmxExporter(tfMap["jmx_exporter"].([]interface{})),
		NodeExporter: expandMskOpenMonitoringPrometheusNodeExporter(tfMap["node_exporter"].([]interface{})),
	}
}

// expandMskOpenMonitoringPrometheusJmxExporter converts the nested
// "jmx_exporter" configuration block into the Kafka API
// JmxExporterInfo structure, or nil if the block is not present.
func expandMskOpenMonitoringPrometheusJmxExporter(l []interface{}) *kafka.JmxExporterInfo {
	if len(l) == 0 || l[0] == nil {
		return nil
	}

	tfMap := l[0].(map[string]interface{})

	return &kafka.JmxExporterInfo{
		EnabledInBroker: aws.Bool(tfMap["enabled_in_broker"].(bool)),
	}
}

// expandMskOpenMonitoringPrometheusNodeExporter converts the nested
// "node_exporter" configuration block into the Kafka API
// NodeExporterInfo structure, or nil if the block is not present.
func expandMskOpenMonitoringPrometheusNodeExporter(l []interface{}) *kafka.NodeExporterInfo {
	if len(l) == 0 || l[0] == nil {
		return nil
	}

	tfMap := l[0].(map[string]interface{})

	return &kafka.NodeExporterInfo{
		EnabledInBroker: aws.Bool(tfMap["enabled_in_broker"].(bool)),
	}
}

func flattenMskBrokerNodeGroupInfo(b *kafka.BrokerNodeGroupInfo) []map[string]interface{} {

if b == nil {
Expand Down Expand Up @@ -599,6 +756,55 @@ func flattenMskTls(tls *kafka.Tls) []map[string]interface{} {
return []map[string]interface{}{m}
}

// flattenMskOpenMonitoring converts the Kafka API OpenMonitoring
// structure returned by DescribeCluster into the single-element list
// form used by the Terraform "open_monitoring" attribute. A nil input
// flattens to an empty list.
func flattenMskOpenMonitoring(e *kafka.OpenMonitoring) []map[string]interface{} {
	if e == nil {
		return []map[string]interface{}{}
	}

	return []map[string]interface{}{
		{
			"prometheus": flattenMskOpenMonitoringPrometheus(e.Prometheus),
		},
	}
}

// flattenMskOpenMonitoringPrometheus converts the Kafka API Prometheus
// structure into the single-element list form used by the nested
// "prometheus" attribute. A nil input flattens to an empty list.
func flattenMskOpenMonitoringPrometheus(e *kafka.Prometheus) []map[string]interface{} {
	if e == nil {
		return []map[string]interface{}{}
	}

	return []map[string]interface{}{
		{
			"jmx_exporter":  flattenMskOpenMonitoringPrometheusJmxExporter(e.JmxExporter),
			"node_exporter": flattenMskOpenMonitoringPrometheusNodeExporter(e.NodeExporter),
		},
	}
}

// flattenMskOpenMonitoringPrometheusJmxExporter converts the Kafka API
// JmxExporter structure into the single-element list form used by the
// nested "jmx_exporter" attribute. A nil input flattens to an empty list.
func flattenMskOpenMonitoringPrometheusJmxExporter(e *kafka.JmxExporter) []map[string]interface{} {
	if e == nil {
		return []map[string]interface{}{}
	}

	return []map[string]interface{}{
		{
			"enabled_in_broker": aws.BoolValue(e.EnabledInBroker),
		},
	}
}

// flattenMskOpenMonitoringPrometheusNodeExporter converts the Kafka API
// NodeExporter structure into the single-element list form used by the
// nested "node_exporter" attribute. A nil input flattens to an empty list.
func flattenMskOpenMonitoringPrometheusNodeExporter(e *kafka.NodeExporter) []map[string]interface{} {
	if e == nil {
		return []map[string]interface{}{}
	}

	return []map[string]interface{}{
		{
			"enabled_in_broker": aws.BoolValue(e.EnabledInBroker),
		},
	}
}

func resourceAwsMskClusterDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kafkaconn

Expand Down
Loading