From a8b8d7726f81e488c7fe169c371f98d6cb3885d2 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Sun, 18 Apr 2021 12:04:07 +0300 Subject: [PATCH 01/17] feat(kafka): aws_msk_config and aws_msk_cluster modules have been added --- aws_msk_cluster.py | 827 +++++++++++++++++++++++++++++++++++++++++++++ aws_msk_config.py | 295 ++++++++++++++++ 2 files changed, 1122 insertions(+) create mode 100644 aws_msk_cluster.py create mode 100644 aws_msk_config.py diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py new file mode 100644 index 00000000000..f4d9a5db84f --- /dev/null +++ b/aws_msk_cluster.py @@ -0,0 +1,827 @@ +#!/usr/bin/python +# Copyright: (c) 2021, Daniil Kupchenko (@oukooveu) +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + + +DOCUMENTATION = r""" +--- +module: aws_msk_cluster +short_description: Manage Amazon MSK clusters. +version_added: "1.5.0" +requirements: + - botocore >= 1.17.42 + - boto3 >= 1.17.9 +description: + - Create, delete and modify Amazon MSK (Managed Streaming for Apache Kafka) clusters. +author: + - Daniil Kupchenko (@oukooveu) +options: + state: + description: Create (present) or delete (absent) cluster. + choices: ['present', 'absent'] + type: str + default: 'present' + name: + description: The name of the cluster. + required: true + type: str + version: + description: + - The version of Apache Kafka. + - This version should exist in given configuration. + - This parameter is required when I(state=present). + type: str + configuration_arn: + description: + - ARN of the configuration to use. + - This parameter is required when I(state=present). + type: str + configuration_revision: + description: + - The revision of the configuration to use. + - This parameter is required when I(state=present). + type: int + nodes: + description: The number of broker nodes in the cluster. Should be greater or equal to two. 
+ type: int + default: 3 + instance_type: + description: + - The type of Amazon EC2 instances to use for Kafka brokers. + - Update operation requires boto3 version >= 1.16.58 + choices: + - kafka.t3.small + - kafka.m5.large + - kafka.m5.xlarge + - kafka.m5.2xlarge + - kafka.m5.4xlarge + default: kafka.t3.small + type: str + ebs_volume_size: + description: The size in GiB of the EBS volume for the data drive on each broker node. + type: int + default: 100 + subnets: + description: + - The list of subnets to connect to in the client virtual private cloud (VPC). + AWS creates elastic network interfaces inside these subnets. Client applications use + elastic network interfaces to produce and consume data. + Client subnets can't be in Availability Zone us-east-1e. + - This parameter is required when I(state=present). + type: list + elements: str + security_groups: + description: + - The AWS security groups to associate with the elastic network interfaces in order to specify + who can connect to and communicate with the Amazon MSK cluster. + If you don't specify a security group, Amazon MSK uses the default security group associated with the VPC. + type: list + elements: str + encryption: + description: + - Includes all encryption-related information. + - Effective only for new cluster and can not be updated. + type: dict + suboptions: + kms_key_id: + description: + - The ARN of the AWS KMS key for encrypting data at rest. If you don't specify a KMS key, MSK creates one for you and uses it. + default: Null + type: str + in_transit: + description: The details for encryption in transit. + type: dict + suboptions: + in_cluster: + description: + - When set to true, it indicates that data communication among the broker nodes of the cluster is encrypted. + When set to false, the communication happens in plaintext. + type: bool + default: True + client_broker: + description: + - Indicates the encryption setting for data in transit between clients and brokers. 
The following are the possible values. + TLS means that client-broker communication is enabled with TLS only. + TLS_PLAINTEXT means that client-broker communication is enabled for both TLS-encrypted, as well as plaintext data. + PLAINTEXT means that client-broker communication is enabled in plaintext only. + choices: + - TLS + - TLS_PLAINTEXT + - PLAINTEXT + type: str + default: TLS + authentication: + description: + - Includes all client authentication related information. + - Effective only for new cluster and can not be updated. + type: dict + suboptions: + tls_ca_arn: + description: List of ACM Certificate Authority ARNs. + type: list + elements: str + sasl_scram: + description: SASL/SCRAM authentication is enabled or not. + type: bool + default: False + enhanced_monitoring: + description: Specifies the level of monitoring for the MSK cluster. + choices: + - DEFAULT + - PER_BROKER + - PER_TOPIC_PER_BROKER + - PER_TOPIC_PER_PARTITION + default: DEFAULT + type: str + open_monitoring: + description: The settings for open monitoring. + type: dict + suboptions: + jmx_exporter: + description: Indicates whether you want to enable or disable the JMX Exporter. + type: bool + default: False + node_exporter: + description: Indicates whether you want to enable or disable the Node Exporter. + type: bool + default: False + logging: + description: Logging configuration. + type: dict + suboptions: + cloudwatch: + description: Details of the CloudWatch Logs destination for broker logs. + type: dict + suboptions: + enabled: + description: Specifies whether broker logs get sent to the specified CloudWatch Logs destination. + type: bool + default: False + log_group: + description: The CloudWatch log group that is the destination for broker logs. + type: str + required: False + firehose: + description: Details of the Kinesis Data Firehose delivery stream that is the destination for broker logs. 
+ type: dict + suboptions: + enabled: + description: Specifies whether broker logs get send to the specified Kinesis Data Firehose delivery stream. + type: bool + default: False + delivery_stream: + description: The Kinesis Data Firehose delivery stream that is the destination for broker logs. + type: str + required: False + s3: + description: Details of the Amazon S3 destination for broker logs. + type: dict + suboptions: + enabled: + description: Specifies whether broker logs get sent to the specified Amazon S3 destination. + type: bool + default: False + bucket: + description: The name of the S3 bucket that is the destination for broker logs. + type: str + required: False + prefix: + description: The S3 prefix that is the destination for broker logs. + type: str + required: False + wait: + description: Whether to wait for the cluster to be available or deleted. + type: bool + default: false + wait_timeout: + description: How long to wait, seconds. Cluster creation can take up to 20-30 minutes. + type: int + default: 3600 + tags: + description: Tag dictionary to apply to the cluster. + type: dict + purge_tags: + description: Remove tags not listed in I(tags) when tags is specified. + default: true + type: bool +extends_documentation_fragment: + - amazon.aws.aws + - amazon.aws.ec2 +notes: + - All operations are time consuming, for example create takes 20-30 minutes, + update kafka version -- more than one hour, update configuration -- 10-15 minutes; + - Cluster's brokers get evenly distributed over a number of availability zones + that's equal to the number of subnets. +""" + +EXAMPLES = r""" +# Note: These examples do not set authentication details, see the AWS Guide for details. 
+ +- aws_msk_cluster: + name: kafka-cluster + state: present + version: 2.6.1 + nodes: 6 + ebs_volume_size: "{{ aws_msk_options.ebs_volume_size }}" + subnets: + - subnet-e3b48ce7c25861eeb + - subnet-2990c8b25b07ddd43 + - subnet-d9fbeaf46c54bfab6 + wait: true + wait_timeout: 1800 + configuration_arn: arn:aws:kafka:us-east-1:000000000001:configuration/kafka-cluster-configuration/aaaaaaaa-bbbb-4444-3333-ccccccccc-1 + configuration_revision: 1 + +- aws_msk_cluster: + name: kafka-cluster + state: absent +""" + +RETURN = r""" +# These are examples of possible return values, and in general should use other names for return values. + +bootstrap_broker_string: + description: A list of brokers that a client application can use to bootstrap. + type: complex + contains: + plain: + description: A string containing one or more hostname:port pairs. + type: str + tls: + description: A string containing one or more DNS names (or IP) and TLS port pairs. + type: str + returned: I(state=present) and cluster state is I(ACTIVE) +cluster_info: + description: Description of the MSK cluster. + type: dict + returned: I(state=present) +response: + description: The response from actual API call. 
+ type: dict + returned: always + sample: {} +""" + +import time + +try: + import botocore +except ImportError: + pass # handled by AnsibleAWSModule + +from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ( + camel_dict_to_snake_dict, + compare_aws_tags, + AWSRetry, +) + + +@AWSRetry.backoff(tries=5, delay=5) +def list_clusters_with_backoff(client, cluster_name): + paginator = client.get_paginator("list_clusters") + return paginator.paginate(ClusterNameFilter=cluster_name).build_full_result() + + +@AWSRetry.backoff(tries=5, delay=5) +def list_nodes_with_backoff(client, cluster_arn): + paginator = client.get_paginator("list_nodes") + return paginator.paginate(ClusterArn=cluster_arn).build_full_result() + + +def find_cluster_by_name(client, module, cluster_name): + cluster_list = list_clusters_with_backoff(client, cluster_name).get("ClusterInfoList", []) + if cluster_list: + if len(cluster_list) != 1: + module.fail_json(msg="Found more than one cluster with name '{0}'".format(cluster_name)) + return cluster_list[0] + return {} + + +def get_cluster_state(client, module, arn): + try: + response = client.describe_cluster(ClusterArn=arn) + except client.exceptions.NotFoundException: + return "DELETED" + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws(e, "Failed to get kafka cluster state") + return response["ClusterInfo"]["State"] + + +def get_cluster_version(client, module, arn): + try: + response = client.describe_cluster(ClusterArn=arn) + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws(e, "Failed to get kafka cluster version") + return response["ClusterInfo"]["CurrentVersion"] + + +def wait_for_cluster_state(client, module, arn, state="ACTIVE"): + start = time.time() + timeout = int(module.params.get("wait_timeout")) + check_interval = 60 
+ while True: + current_state = get_cluster_state(client, module, arn) + if current_state == state: + return + if time.time() - start > timeout: + module.fail_json( + msg="Timeout waiting for cluster {0} (desired state is '{1}')".format( + current_state, state + ) + ) + time.sleep(check_interval) + + +def prepare_create_options(module): + """ + Return data structure for cluster create operation + """ + + c_params = { + "ClusterName": module.params["name"], + "KafkaVersion": module.params["version"], + "ConfigurationInfo": { + "Arn": module.params["configuration_arn"], + "Revision": module.params["configuration_revision"], + }, + "NumberOfBrokerNodes": module.params["nodes"], + "BrokerNodeGroupInfo": { + "ClientSubnets": module.params["subnets"], + "InstanceType": module.params["instance_type"], + } + } + + if module.params["security_groups"] and len(module.params["security_groups"]) != 0: + c_params["BrokerNodeGroupInfo"]["SecurityGroups"] = module.params.get("security_groups") + + if module.params["ebs_volume_size"]: + c_params["BrokerNodeGroupInfo"]["StorageInfo"] = { + "EbsStorageInfo": { + "VolumeSize": module.params.get("ebs_volume_size") + } + } + + if module.params["encryption"]: + c_params["EncryptionInfo"] = {} + if module.params["encryption"].get("kms_key_id"): + c_params["EncryptionInfo"]["EncryptionAtRest"] = { + "DataVolumeKMSKeyId": module.params["encryption"]["kms_key_id"] + } + c_params["EncryptionInfo"]["EncryptionInTransit"] = { + "ClientBroker": module.params["encryption"]["in_transit"].get("client_broker", "TLS"), + "InCluster": module.params["encryption"]["in_transit"].get("in_cluster", True) + } + + if module.params["authentication"]: + c_params["ClientAuthentication"] = {} + if module.params["authentication"].get("sasl_scram"): + c_params["ClientAuthentication"]["Sasl"] = { + "Scram": module.params["authentication"]["sasl_scram"] + } + if module.params["authentication"].get("tls_ca_arn"): + c_params["ClientAuthentication"]["Tls"] = { + 
"CertificateAuthorityArnList": module.params["authentication"]["tls_ca_arn"] + } + + c_params.update(prepare_enhanced_monitoring_options(module)) + c_params.update(prepare_open_monitoring_options(module)) + c_params.update(prepare_logging_options(module)) + + return c_params + + +def prepare_enhanced_monitoring_options(module): + m_params = {} + m_params["EnhancedMonitoring"] = module.params["enhanced_monitoring"] or "DEFAULT" + return m_params + + +def prepare_open_monitoring_options(module): + m_params = {} + open_monitoring = module.params["open_monitoring"] or {} + m_params["OpenMonitoring"] = { + "Prometheus": { + "JmxExporter": { + "EnabledInBroker": open_monitoring.get("jmx_exporter", False) + }, + "NodeExporter": { + "EnabledInBroker": open_monitoring.get("node_exporter", False) + } + } + } + return m_params + + +def prepare_logging_options(module): + l_params = {} + logging = module.params["logging"] or {} + if logging.get("cloudwatch"): + l_params["CloudWatchLogs"] = { + "Enabled": module.params["logging"]["cloudwatch"].get("enabled"), + "LogGroup": module.params["logging"]["cloudwatch"].get("log_group") + } + else: + l_params["CloudWatchLogs"] = { + "Enabled": False + } + if logging.get("firehose"): + l_params["Firehose"] = { + "Enabled": module.params["logging"]["firehose"].get("enabled"), + "DeliveryStream": module.params["logging"]["firehose"].get("delivery_stream") + } + else: + l_params["Firehose"] = { + "Enabled": False + } + if logging.get("s3"): + l_params["S3"] = { + "Enabled": module.params["logging"]["s3"].get("enabled"), + "Bucket": module.params["logging"]["s3"].get("bucket"), + "Prefix": module.params["logging"]["s3"].get("prefix") + } + else: + l_params["S3"] = { + "Enabled": False + } + return { + "LoggingInfo": { + "BrokerLogs": l_params + } + } + + +def create_or_update_cluster(client, module): + """ + Create new or update existing cluster + """ + + changed = False + response = {} + + cluster = find_cluster_by_name(client, module, 
module.params["name"]) + + if not cluster: + + changed = True + + if module.check_mode: + return True, {} + + create_params = prepare_create_options(module) + + try: + response = client.create_cluster(**create_params) + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws(e, "Failed to create kafka cluster") + + if module.params.get("wait"): + wait_for_cluster_state(client, module, arn=response["ClusterArn"], state="ACTIVE") + + else: + + response["ClusterArn"] = cluster["ClusterArn"] + response["changes"] = {} + + # prepare available update methods definitions with current/target values and options + msk_cluster_changes = { + "broker_count": { + "current_value": cluster["NumberOfBrokerNodes"], + "target_value": module.params.get("nodes"), + "update_params": { + "TargetNumberOfBrokerNodes": module.params.get("nodes") + } + }, + "broker_storage": { + "current_value": cluster["BrokerNodeGroupInfo"]["StorageInfo"]["EbsStorageInfo"]["VolumeSize"], + "target_value": module.params.get("ebs_volume_size"), + "update_params": { + "TargetBrokerEBSVolumeInfo": [ + {"KafkaBrokerNodeId": "All", "VolumeSizeGB": module.params.get("ebs_volume_size")} + ] + } + }, + "broker_type": { + "boto3_version": "1.16.58", + "current_value": cluster["BrokerNodeGroupInfo"]["InstanceType"], + "target_value": module.params.get("instance_type"), + "update_params": { + "TargetInstanceType": module.params.get("instance_type") + } + }, + "cluster_configuration": { + "current_value": { + "arn": cluster["CurrentBrokerSoftwareInfo"]["ConfigurationArn"], + "revision": cluster["CurrentBrokerSoftwareInfo"]["ConfigurationRevision"], + }, + "target_value": { + "arn": module.params.get("configuration_arn"), + "revision": module.params.get("configuration_revision"), + }, + "update_params": { + "ConfigurationInfo": { + "Arn": module.params.get("configuration_arn"), + "Revision": module.params.get("configuration_revision") + } + } + }, + 
"cluster_kafka_version": { + "current_value": cluster["CurrentBrokerSoftwareInfo"]["KafkaVersion"], + "target_value": module.params.get("version"), + "update_params": { + "TargetKafkaVersion": module.params.get("version") + } + }, + "enhanced_monitoring": { + "current_value": cluster["EnhancedMonitoring"], + "target_value": module.params.get("enhanced_monitoring"), + "update_method": "update_monitoring", + "update_params": prepare_enhanced_monitoring_options(module) + }, + "open_monitoring": { + "current_value": { + "OpenMonitoring": cluster["OpenMonitoring"] + }, + "target_value": prepare_open_monitoring_options(module), + "update_method": "update_monitoring", + "update_params": prepare_open_monitoring_options(module) + }, + "logging": { + "current_value": { + "LoggingInfo": cluster["LoggingInfo"] + }, + "target_value": prepare_logging_options(module), + "update_method": "update_monitoring", + "update_params": prepare_logging_options(module) + } + } + + for method, options in msk_cluster_changes.items(): + + if 'boto3_version' in options: + if not module.boto3_at_least(options["boto3_version"]): + continue + + try: + update_method = getattr(client, options.get("update_method", "update_" + method)) + except AttributeError as e: + module.fail_json_aws(e, "There is no update method 'update_{0}'".format(method)) + + if options["current_value"] != options["target_value"]: + changed = True + if module.check_mode: + return True, {} + + # need to get cluster version and check for the state because + # there can be several updates requested but only one in time can be performed + version = get_cluster_version(client, module, cluster["ClusterArn"]) + state = get_cluster_state(client, module, cluster["ClusterArn"]) + if state != "ACTIVE": + if module.params["wait"]: + wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="ACTIVE") + else: + module.fail_json( + msg="Cluster can be updated only in active state, current state is '{0}'. 
check cluster state or use wait option".format( + state + ) + ) + try: + response["changes"][method] = update_method( + ClusterArn=cluster["ClusterArn"], + CurrentVersion=version, + **options["update_params"] + ) + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws( + e, "Failed to update cluster via 'update_{0}'".format(method) + ) + + if module.params["wait"]: + wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="ACTIVE") + + changed |= update_cluster_tags(client, module, response["ClusterArn"]) + + return changed, response + + +def update_cluster_tags(client, module, arn): + new_tags = module.params.get('tags') + if new_tags is None: + return False + purge_tags = module.params.get('purge_tags') + + try: + existing_tags = client.list_tags_for_resource(ResourceArn=arn)['Tags'] + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg="Unable to retrieve tags for cluster '{0}'".format(arn)) + + tags_to_add, tags_to_remove = compare_aws_tags(existing_tags, new_tags, purge_tags=purge_tags) + + if not module.check_mode: + try: + if tags_to_remove: + client.untag_resource(ResourceArn=arn, TagKeys=tags_to_remove) + if tags_to_add: + client.tag_resource(ResourceArn=arn, Tags=tags_to_add) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg="Unable to set tags for cluster '{0}'".format(arn)) + + changed = bool(tags_to_add) or bool(tags_to_remove) + return changed + + +def delete_cluster(client, module): + + cluster = find_cluster_by_name(client, module, module.params["name"]) + + if module.check_mode: + if cluster: + return True, cluster + else: + return False, {} + + if not cluster: + return False, {} + + try: + response = client.delete_cluster( + ClusterArn=cluster["ClusterArn"], + CurrentVersion=cluster["CurrentVersion"], + ) + except (botocore.exceptions.BotoCoreError, 
botocore.exceptions.ClientError) as e: + module.fail_json_aws(e, "Failed to delete kafka cluster") + + if module.params["wait"]: + wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="DELETED") + + response["bootstrap_broker_string"] = {} + + return True, response + + +def main(): + + module_args = dict( + name=dict(type="str", required=True), + state=dict(type="str", choices=["present", "absent"], default="present"), + version=dict(type="str"), + configuration_arn=dict(type="str"), + configuration_revision=dict(type="int"), + nodes=dict(type="int", default=3), + instance_type=dict( + choices=[ + "kafka.t3.small", + "kafka.m5.large", + "kafka.m5.xlarge", + "kafka.m5.2xlarge", + "kafka.m5.4xlarge", + ], + default="kafka.t3.small", + ), + ebs_volume_size=dict(type="int", default=100), + subnets=dict(type="list", elements="str"), + security_groups=dict(type="list", elements="str", required=False), + encryption=dict( + type="dict", + options=dict( + kms_key_id=dict(type="str", required=False), + in_transit=dict( + type="dict", + options=dict( + in_cluster=dict(type="bool", default=True), + client_broker=dict( + choices=["TLS", "TLS_PLAINTEXT", "PLAINTEXT"], + default="TLS" + ), + ), + ), + ), + ), + authentication=dict( + type="dict", + options=dict( + tls_ca_arn=dict(type="list", elements="str", required=False), + sasl_scram=dict(type="bool", default=False), + ), + ), + enhanced_monitoring=dict( + choices=[ + "DEFAULT", + "PER_BROKER", + "PER_TOPIC_PER_BROKER", + "PER_TOPIC_PER_PARTITION", + ], + default="DEFAULT", + required=False, + ), + open_monitoring=dict( + type="dict", + options=dict( + jmx_exporter=dict(type="bool", default=False), + node_exporter=dict(type="bool", default=False), + ), + ), + logging=dict( + type="dict", + options=dict( + cloudwatch=dict( + type="dict", + options=dict( + enabled=dict(type="bool", default=False), + log_group=dict(type="str", required=False), + ), + ), + firehose=dict( + type="dict", + options=dict( + 
enabled=dict(type="bool", default=False), + delivery_stream=dict(type="str", required=False), + ), + ), + s3=dict( + type="dict", + options=dict( + enabled=dict(type="bool", default=False), + bucket=dict(type="str", required=False), + prefix=dict(type="str", required=False), + ), + ), + ), + ), + wait=dict(type="bool", default=False), + wait_timeout=dict(type="int", default=3600), + tags=dict(type='dict'), + purge_tags=dict(type='bool', default=True), + ) + + module = AnsibleAWSModule( + argument_spec=module_args, + required_if=[['state', 'present', ['version', 'configuration_arn', 'configuration_revision', 'subnets']]], + supports_check_mode=True + ) + + client = module.client("kafka") + + if module.params["state"] == "present": + if len(module.params["subnets"]) < 2: + module.fail_json( + msg="At least two client subnets should be provided" + ) + if int(module.params["nodes"]) % int(len(module.params["subnets"])) != 0: + module.fail_json( + msg="The number of broker nodes must be a multiple of availability zones in the subnets parameter" + ) + changed, response = create_or_update_cluster(client, module) + elif module.params["state"] == "absent": + changed, response = delete_cluster(client, module) + + cluster_info = {} + bootstrap_broker_string = {} + if response.get("ClusterArn") and module.params["state"] == "present": + try: + cluster_info = client.describe_cluster(ClusterArn=response["ClusterArn"])[ + "ClusterInfo" + ] + if cluster_info.get("State") == "ACTIVE": + brokers = client.get_bootstrap_brokers(ClusterArn=response["ClusterArn"]) + if brokers.get("BootstrapBrokerString"): + bootstrap_broker_string["plain"] = brokers["BootstrapBrokerString"] + if brokers.get("BootstrapBrokerStringTls"): + bootstrap_broker_string["tls"] = brokers["BootstrapBrokerStringTls"] + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws( + e, + "Can not obtain information about cluster {0}".format( + 
response["ClusterArn"] + ), + ) + + module.exit_json( + changed=changed, + bootstrap_broker_string=bootstrap_broker_string, + cluster_info=camel_dict_to_snake_dict(cluster_info), + response=camel_dict_to_snake_dict(response), + ) + + +if __name__ == "__main__": + main() diff --git a/aws_msk_config.py b/aws_msk_config.py new file mode 100644 index 00000000000..5b3cbd9e492 --- /dev/null +++ b/aws_msk_config.py @@ -0,0 +1,295 @@ +#!/usr/bin/python +# Copyright: (c) 2021, Daniil Kupchenko (@oukooveu) +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + + +DOCUMENTATION = r""" +--- +module: aws_msk_config +short_description: Manage Amazon MSK cluster configurations. +version_added: "1.5.0" +requirements: + - botocore >= 1.17.42 + - boto3 >= 1.17.9 +description: + - Create, delete and modify Amazon MSK (Managed Streaming for Apache Kafka) cluster configurations. +author: + - Daniil Kupchenko (@oukooveu) +options: + state: + description: Create (present) or delete (absent) cluster configuration. + choices: ['present', 'absent'] + default: 'present' + type: str + name: + description: The name of the configuration. + required: true + type: str + description: + description: The description of the configuration. + type: str + config: + description: Contents of the server.properties file. + type: dict + aliases: ['configuration'] + kafka_versions: + description: + - The versions of Apache Kafka with which you can use this MSK configuration. + - Required when I(state=present). + type: list + elements: str +extends_documentation_fragment: + - amazon.aws.aws + - amazon.aws.ec2 +""" + +EXAMPLES = r""" +# Note: These examples do not set authentication details, see the AWS Guide for details. 
+ +- aws_msk_config: + name: kafka-cluster-configuration + state: present + kafka_versions: + - 2.6.0 + - 2.6.1 + config: + auto.create.topics.enable=false + num.partitions=1 + default.replication.factor=3 + zookeeper.session.timeout.ms=18000 + +- aws_msk_config: + name: kafka-cluster-configuration + state: absent +""" + +RETURN = r""" +# These are examples of possible return values, and in general should use other names for return values. + +arn: + description: The Amazon Resource Name (ARN) of the configuration. + type: str + returned: I(state=present) + sample: "arn:aws:kafka:::configuration//" +revision: + description: The revision number. + type: int + returned: I(state=present) + sample: 1 +server_properties: + description: Contents of the server.properties file. + type: str + returned: I(state=present) + sample: "default.replication.factor=3\nnum.io.threads=8\nzookeeper.session.timeout.ms=18000" +response: + description: The response from actual API call. + type: dict + returned: always + sample: {} +""" + +try: + import botocore +except ImportError: + pass # handled by AnsibleAWSModule + +from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ( + camel_dict_to_snake_dict, + AWSRetry, +) + + +BOTOCORE_MIN_VERSION = "1.17.42" + + +def dict_to_prop(d): + """convert dictionary to multi-line properties""" + if len(d) == 0: + return "" + return "\n".join("{0}={1}".format(k, v) for k, v in d.items()) + + +def prop_to_dict(p): + """convert properties to dictionary""" + if len(p) == 0: + return {} + return { + k.strip(): v.strip() for k, v in (i.split("=") for i in p.decode().split("\n")) + } + + +@AWSRetry.backoff(tries=5, delay=5) +def get_configurations_with_backoff(client): + paginator = client.get_paginator("list_configurations") + return paginator.paginate().build_full_result() + + +def find_active_config(client, module): + """ + looking for configuration by 
name + status is not returned for list_configurations in botocore 1.17.42 + delete_configuration method was added in botocore 1.17.48 + """ + + name = module.params["name"] + + try: + all_configs = get_configurations_with_backoff(client)["Configurations"] + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg="failed to obtain kafka configurations") + + active_configs = list( + item + for item in all_configs + if item["Name"] == name and item["State"] == "ACTIVE" + ) + + if active_configs: + if len(active_configs) == 1: + return active_configs[0] + else: + module.fail_json_aws( + msg="found more than one active config with name '{0}'".format(name) + ) + + return None + + +def create_config(client, module): + """create new or update existing configuration""" + + config = find_active_config(client, module) + + # create new configuration + if not config: + + if module.check_mode: + return True, {} + + try: + response = client.create_configuration( + Name=module.params.get("name"), + Description=module.params.get("description"), + KafkaVersions=module.params.get("kafka_versions"), + ServerProperties=dict_to_prop(module.params.get("config")).encode(), + ) + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws(e, "failed to create kafka configuration") + + # update existing configuration (creates new revision) + else: + # it's required because 'config' doesn't contain 'ServerProperties' + response = client.describe_configuration_revision( + Arn=config["Arn"], Revision=config["LatestRevision"]["Revision"] + ) + + # compare configurations (description and properties) and update if required + prop_module = {str(k): str(v) for k, v in module.params.get("config").items()} + if prop_to_dict(response.get("ServerProperties", "")) == prop_module: + if response.get("Description", "") == module.params.get("description"): + return False, response + + if 
module.check_mode: + return True, {} + + try: + response = client.update_configuration( + Arn=config["Arn"], + Description=module.params.get("description"), + ServerProperties=dict_to_prop(module.params.get("config")).encode(), + ) + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws(e, "failed to update kafka configuration") + + arn = response["Arn"] + revision = response["LatestRevision"]["Revision"] + + result = client.describe_configuration_revision(Arn=arn, Revision=revision) + + return True, result + + +def delete_config(client, module): + """delete configuration""" + + config = find_active_config(client, module) + + if module.check_mode: + if config: + return True, config + else: + return False, {} + + if config: + try: + response = client.delete_configuration(Arn=config["Arn"]) + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws(e, "failed to delete the kafka configuration") + return True, response + + return False, {} + + +def main(): + + module_args = dict( + name=dict(type="str", required=True), + description=dict(type="str", default=""), + state=dict(choices=["present", "absent"], default="present"), + config=dict(type="dict", aliases=["configuration"], default={}), + kafka_versions=dict(type="list", elements="str"), + ) + + module = AnsibleAWSModule(argument_spec=module_args, supports_check_mode=True) + + if not module.botocore_at_least(BOTOCORE_MIN_VERSION): + module.fail_json( + msg="aws_msk_config module requires botocore >= {0}".format( + BOTOCORE_MIN_VERSION + ) + ) + + client = module.client("kafka") + + if module.params["state"] == "present": + changed, response = create_config(client, module) + + elif module.params["state"] == "absent": + changed, response = delete_config(client, module) + + # return some useless staff in check mode if configuration doesn't exists + # can be useful when these options are referenced by 
other modules during check mode run + if module.check_mode and not response.get("Arn"): + arn = "arn:aws:kafka:region:account:configuration/name/id" + revision = 1 + server_properties = "" + else: + arn = response.get("Arn") + revision = response.get("Revision") + server_properties = response.get("ServerProperties", "") + + module.exit_json( + changed=changed, + arn=arn, + revision=revision, + server_properties=server_properties, + response=camel_dict_to_snake_dict(response), + ) + + +if __name__ == "__main__": + main() From c19378446434393b5de1d4681d3b948fcfc2bb66 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Sun, 18 Apr 2021 14:15:20 +0300 Subject: [PATCH 02/17] fix: fix for python 2.6 related errors --- aws_msk_config.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/aws_msk_config.py b/aws_msk_config.py index 5b3cbd9e492..1e5f253501b 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -118,9 +118,15 @@ def prop_to_dict(p): """convert properties to dictionary""" if len(p) == 0: return {} - return { - k.strip(): v.strip() for k, v in (i.split("=") for i in p.decode().split("\n")) - } + r_dict = {} + for s in p.decode().split("\n"): + kv = s.split("=") + r_dict[kv[0].strip()] = kv[1].strip() + return r_dict + # python >= 2.7 is required: + # return { + # k.strip(): v.strip() for k, v in (i.split("=") for i in p.decode().split("\n")) + # } @AWSRetry.backoff(tries=5, delay=5) @@ -192,7 +198,10 @@ def create_config(client, module): ) # compare configurations (description and properties) and update if required - prop_module = {str(k): str(v) for k, v in module.params.get("config").items()} + # prop_module = {str(k): str(v) for k, v in module.params.get("config").items()} + prop_module = {} + for k, v in module.params.get("config").items(): + prop_module[str(k)] = str(v) if prop_to_dict(response.get("ServerProperties", "")) == prop_module: if response.get("Description", "") == module.params.get("description"): return 
False, response From 02791792ec07528ea0c76a66cf6f3ee53265723a Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Mon, 19 Apr 2021 21:58:56 +0300 Subject: [PATCH 03/17] chore(doc): formatting Co-authored-by: Mark Chappell --- aws_msk_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_msk_config.py b/aws_msk_config.py index 1e5f253501b..259517395ad 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -21,7 +21,7 @@ - Daniil Kupchenko (@oukooveu) options: state: - description: Create (present) or delete (absent) cluster configuration. + description: Create (C(present)) or delete (C(absent)) cluster configuration. choices: ['present', 'absent'] default: 'present' type: str From 76e24149950ff6f1a26263a690b00fbece30ab1b Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Mon, 19 Apr 2021 22:02:21 +0300 Subject: [PATCH 04/17] fix: misprint in config sample Co-authored-by: Mark Chappell --- aws_msk_config.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aws_msk_config.py b/aws_msk_config.py index 259517395ad..2fbb66262d2 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -57,10 +57,10 @@ - 2.6.0 - 2.6.1 config: - auto.create.topics.enable=false - num.partitions=1 - default.replication.factor=3 - zookeeper.session.timeout.ms=18000 + auto.create.topics.enable: false + num.partitions: 1 + default.replication.factor: 3 + zookeeper.session.timeout.ms: 18000 - aws_msk_config: name: kafka-cluster-configuration From add677330d5c0ab0f85ba189a7fcac8de401c2fa Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Mon, 19 Apr 2021 22:08:17 +0300 Subject: [PATCH 05/17] chore: jittered_backoff instead of backoff --- aws_msk_cluster.py | 4 ++-- aws_msk_config.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index f4d9a5db84f..cfecbc9b16a 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -281,13 +281,13 @@ ) -@AWSRetry.backoff(tries=5, delay=5) 
+@AWSRetry.jittered_backoff(retries=5, delay=5) def list_clusters_with_backoff(client, cluster_name): paginator = client.get_paginator("list_clusters") return paginator.paginate(ClusterNameFilter=cluster_name).build_full_result() -@AWSRetry.backoff(tries=5, delay=5) +@AWSRetry.jittered_backoff(retries=5, delay=5) def list_nodes_with_backoff(client, cluster_arn): paginator = client.get_paginator("list_nodes") return paginator.paginate(ClusterArn=cluster_arn).build_full_result() diff --git a/aws_msk_config.py b/aws_msk_config.py index 2fbb66262d2..e2ce939268b 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -129,7 +129,7 @@ def prop_to_dict(p): # } -@AWSRetry.backoff(tries=5, delay=5) +@AWSRetry.jittered_backoff(retries=5, delay=5) def get_configurations_with_backoff(client): paginator = client.get_paginator("list_configurations") return paginator.paginate().build_full_result() From f991ce76d39b7ac942363f3a6e82e05066717ee7 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Mon, 19 Apr 2021 22:11:44 +0300 Subject: [PATCH 06/17] chore: retry decorator for client invocation --- aws_msk_cluster.py | 2 +- aws_msk_config.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index cfecbc9b16a..a94405fb64d 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -776,7 +776,7 @@ def main(): supports_check_mode=True ) - client = module.client("kafka") + client = module.client("kafka", retry_decorator=AWSRetry.jittered_backoff()) if module.params["state"] == "present": if len(module.params["subnets"]) < 2: diff --git a/aws_msk_config.py b/aws_msk_config.py index e2ce939268b..5952a970f8c 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -272,7 +272,7 @@ def main(): ) ) - client = module.client("kafka") + client = module.client("kafka", retry_decorator=AWSRetry.jittered_backoff()) if module.params["state"] == "present": changed, response = create_config(client, module) From 
e2b6ccea65eb3d223d29adb8a8f0730ee7995ab9 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Mon, 19 Apr 2021 22:20:03 +0300 Subject: [PATCH 07/17] chore: aws_retry for aws calls --- aws_msk_cluster.py | 16 ++++++++-------- aws_msk_config.py | 8 +++++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index a94405fb64d..d47ce2145ae 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -304,7 +304,7 @@ def find_cluster_by_name(client, module, cluster_name): def get_cluster_state(client, module, arn): try: - response = client.describe_cluster(ClusterArn=arn) + response = client.describe_cluster(ClusterArn=arn, aws_retry=True) except client.exceptions.NotFoundException: return "DELETED" except ( @@ -317,7 +317,7 @@ def get_cluster_state(client, module, arn): def get_cluster_version(client, module, arn): try: - response = client.describe_cluster(ClusterArn=arn) + response = client.describe_cluster(ClusterArn=arn, aws_retry=True) except ( botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError, @@ -481,7 +481,7 @@ def create_or_update_cluster(client, module): create_params = prepare_create_options(module) try: - response = client.create_cluster(**create_params) + response = client.create_cluster(**create_params, aws_retry=True) except ( botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError, @@ -627,7 +627,7 @@ def update_cluster_tags(client, module, arn): purge_tags = module.params.get('purge_tags') try: - existing_tags = client.list_tags_for_resource(ResourceArn=arn)['Tags'] + existing_tags = client.list_tags_for_resource(ResourceArn=arn, aws_retry=True)['Tags'] except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: module.fail_json_aws(e, msg="Unable to retrieve tags for cluster '{0}'".format(arn)) @@ -636,9 +636,9 @@ def update_cluster_tags(client, module, arn): if not module.check_mode: try: if tags_to_remove: - client.untag_resource(ResourceArn=arn, 
TagKeys=tags_to_remove) + client.untag_resource(ResourceArn=arn, TagKeys=tags_to_remove, aws_retry=True) if tags_to_add: - client.tag_resource(ResourceArn=arn, Tags=tags_to_add) + client.tag_resource(ResourceArn=arn, Tags=tags_to_add, aws_retry=True) except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: module.fail_json_aws(e, msg="Unable to set tags for cluster '{0}'".format(arn)) @@ -795,11 +795,11 @@ def main(): bootstrap_broker_string = {} if response.get("ClusterArn") and module.params["state"] == "present": try: - cluster_info = client.describe_cluster(ClusterArn=response["ClusterArn"])[ + cluster_info = client.describe_cluster(ClusterArn=response["ClusterArn"], aws_retry=True)[ "ClusterInfo" ] if cluster_info.get("State") == "ACTIVE": - brokers = client.get_bootstrap_brokers(ClusterArn=response["ClusterArn"]) + brokers = client.get_bootstrap_brokers(ClusterArn=response["ClusterArn"], aws_retry=True) if brokers.get("BootstrapBrokerString"): bootstrap_broker_string["plain"] = brokers["BootstrapBrokerString"] if brokers.get("BootstrapBrokerStringTls"): diff --git a/aws_msk_config.py b/aws_msk_config.py index 5952a970f8c..5cbe5727c41 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -183,6 +183,7 @@ def create_config(client, module): Description=module.params.get("description"), KafkaVersions=module.params.get("kafka_versions"), ServerProperties=dict_to_prop(module.params.get("config")).encode(), + aws_retry=True ) except ( botocore.exceptions.BotoCoreError, @@ -194,7 +195,7 @@ def create_config(client, module): else: # it's required because 'config' doesn't contain 'ServerProperties' response = client.describe_configuration_revision( - Arn=config["Arn"], Revision=config["LatestRevision"]["Revision"] + Arn=config["Arn"], Revision=config["LatestRevision"]["Revision"], aws_retry=True ) # compare configurations (description and properties) and update if required @@ -214,6 +215,7 @@ def create_config(client, module): 
Arn=config["Arn"], Description=module.params.get("description"), ServerProperties=dict_to_prop(module.params.get("config")).encode(), + aws_retry=True ) except ( botocore.exceptions.BotoCoreError, @@ -224,7 +226,7 @@ def create_config(client, module): arn = response["Arn"] revision = response["LatestRevision"]["Revision"] - result = client.describe_configuration_revision(Arn=arn, Revision=revision) + result = client.describe_configuration_revision(Arn=arn, Revision=revision, aws_retry=True) return True, result @@ -242,7 +244,7 @@ def delete_config(client, module): if config: try: - response = client.delete_configuration(Arn=config["Arn"]) + response = client.delete_configuration(Arn=config["Arn"], aws_retry=True) except ( botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError, From 835e4085b857504966cc116b6f35be8ca3e9650f Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Mon, 19 Apr 2021 22:37:13 +0300 Subject: [PATCH 08/17] fix: missing try/except for api call --- aws_msk_config.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/aws_msk_config.py b/aws_msk_config.py index 5cbe5727c41..8c9da121287 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -166,6 +166,16 @@ def find_active_config(client, module): return None +def get_configuration_revision(client, module, arn, revision): + try: + return client.describe_configuration_revision(Arn=arn, Revision=revision, aws_retry=True) + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws(e, "failed to describe kafka configuration revision") + + def create_config(client, module): """create new or update existing configuration""" @@ -194,9 +204,7 @@ def create_config(client, module): # update existing configuration (creates new revision) else: # it's required because 'config' doesn't contain 'ServerProperties' - response = client.describe_configuration_revision( - Arn=config["Arn"], 
Revision=config["LatestRevision"]["Revision"], aws_retry=True - ) + response = get_configuration_revision(client, module, arn=config["Arn"], revision=config["LatestRevision"]["Revision"]) # compare configurations (description and properties) and update if required # prop_module = {str(k): str(v) for k, v in module.params.get("config").items()} @@ -226,7 +234,7 @@ def create_config(client, module): arn = response["Arn"] revision = response["LatestRevision"]["Revision"] - result = client.describe_configuration_revision(Arn=arn, Revision=revision, aws_retry=True) + result = get_configuration_revision(client, module, arn=arn, revision=revision) return True, result From 88f92625b336c0d9967e374a40536475f8cf4ffc Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Mon, 19 Apr 2021 23:07:39 +0300 Subject: [PATCH 09/17] chore: dedicated function for changes comparison --- aws_msk_config.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/aws_msk_config.py b/aws_msk_config.py index 8c9da121287..aa4d425f73c 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -176,6 +176,21 @@ def get_configuration_revision(client, module, arn, revision): module.fail_json_aws(e, "failed to describe kafka configuration revision") +def is_configuration_changed(module, current): + """ + compare configuration's description and properties + python 2.7+ version: + prop_module = {str(k): str(v) for k, v in module.params.get("config").items()} + """ + prop_module = {} + for k, v in module.params.get("config").items(): + prop_module[str(k)] = str(v) + if prop_to_dict(current.get("ServerProperties", "")) == prop_module: + if current.get("Description", "") == module.params.get("description"): + return False + return True + + def create_config(client, module): """create new or update existing configuration""" @@ -206,14 +221,8 @@ def create_config(client, module): # it's required because 'config' doesn't contain 'ServerProperties' response = 
get_configuration_revision(client, module, arn=config["Arn"], revision=config["LatestRevision"]["Revision"]) - # compare configurations (description and properties) and update if required - # prop_module = {str(k): str(v) for k, v in module.params.get("config").items()} - prop_module = {} - for k, v in module.params.get("config").items(): - prop_module[str(k)] = str(v) - if prop_to_dict(response.get("ServerProperties", "")) == prop_module: - if response.get("Description", "") == module.params.get("description"): - return False, response + if not is_configuration_changed(module, response): + return False, response if module.check_mode: return True, {} From a55ef6bb05eeaa4960e6792cb3187465dbf80b24 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Tue, 20 Apr 2021 00:56:32 +0300 Subject: [PATCH 10/17] fix: parameters order --- aws_msk_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index d47ce2145ae..6efdde656c4 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -481,7 +481,7 @@ def create_or_update_cluster(client, module): create_params = prepare_create_options(module) try: - response = client.create_cluster(**create_params, aws_retry=True) + response = client.create_cluster(aws_retry=True, **create_params) except ( botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError, From e060bd310bc412a33b66d181ba8da4c490692c00 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Wed, 12 May 2021 10:51:22 +0300 Subject: [PATCH 11/17] chore: bump version_added --- aws_msk_cluster.py | 2 +- aws_msk_config.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index 6efdde656c4..0ce40761831 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -11,7 +11,7 @@ --- module: aws_msk_cluster short_description: Manage Amazon MSK clusters. 
-version_added: "1.5.0" +version_added: "2.0.0" requirements: - botocore >= 1.17.42 - boto3 >= 1.17.9 diff --git a/aws_msk_config.py b/aws_msk_config.py index aa4d425f73c..c02769152a5 100644 --- a/aws_msk_config.py +++ b/aws_msk_config.py @@ -11,7 +11,7 @@ --- module: aws_msk_config short_description: Manage Amazon MSK cluster configurations. -version_added: "1.5.0" +version_added: "2.0.0" requirements: - botocore >= 1.17.42 - boto3 >= 1.17.9 From 4f845bc5c8baee88300fb117ad634a7bb9300195 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Tue, 22 Jun 2021 14:24:46 +0300 Subject: [PATCH 12/17] chore(doc): highlighting in description Co-authored-by: Mark Chappell --- aws_msk_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index 0ce40761831..acc6d0f8c26 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -21,7 +21,7 @@ - Daniil Kupchenko (@oukooveu) options: state: - description: Create (present) or delete (absent) cluster. + description: Create (C(present)) or delete (C(absent)) cluster. choices: ['present', 'absent'] type: str default: 'present' From 00ad611d2abaa3f0637f25e57390ff80f89f6d2e Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Tue, 22 Jun 2021 14:25:59 +0300 Subject: [PATCH 13/17] chore(doc): dedicated note about us-east-1e zone Co-authored-by: Mark Chappell --- aws_msk_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index acc6d0f8c26..e092db97ac8 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -70,7 +70,7 @@ - The list of subnets to connect to in the client virtual private cloud (VPC). AWS creates elastic network interfaces inside these subnets. Client applications use elastic network interfaces to produce and consume data. - Client subnets can't be in Availability Zone us-east-1e. + - Client subnets can't be in Availability Zone us-east-1e. 
- This parameter is required when I(state=present). type: list elements: str From 41e82b908a33e38762cfe82c690a76f07f4ee775 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Tue, 22 Jun 2021 14:26:32 +0300 Subject: [PATCH 14/17] chore(doc): reformatting Co-authored-by: Mark Chappell --- aws_msk_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index e092db97ac8..e5e62bf4b5a 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -198,7 +198,7 @@ type: bool default: false wait_timeout: - description: How long to wait, seconds. Cluster creation can take up to 20-30 minutes. + description: How many seconds to wait. Cluster creation can take up to 20-30 minutes. type: int default: 3600 tags: From 6adf6216c9afdc5be043ecffdbed66ed6304b995 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Tue, 22 Jun 2021 14:27:23 +0300 Subject: [PATCH 15/17] chore(doc): wait_for_cluster_state explanation Co-authored-by: Mark Chappell --- aws_msk_cluster.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index e5e62bf4b5a..81aae367b2a 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -327,6 +327,7 @@ def get_cluster_version(client, module, arn): def wait_for_cluster_state(client, module, arn, state="ACTIVE"): + # As of 2021-06 boto3 doesn't offer any built in waiters start = time.time() timeout = int(module.params.get("wait_timeout")) check_interval = 60 From ec6377233dd686ed2e2be6a90f05b30fb7cd7644 Mon Sep 17 00:00:00 2001 From: Daniil Kupchenko Date: Tue, 22 Jun 2021 14:32:01 +0300 Subject: [PATCH 16/17] chore: catch exception for find_cluster_by_name --- aws_msk_cluster.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py index 81aae367b2a..3b6794e689d 100644 --- a/aws_msk_cluster.py +++ b/aws_msk_cluster.py @@ -294,7 +294,13 @@ def list_nodes_with_backoff(client, cluster_arn): def 
find_cluster_by_name(client, module, cluster_name):
-    cluster_list = list_clusters_with_backoff(client, cluster_name).get("ClusterInfoList", [])
+    try:
+        cluster_list = list_clusters_with_backoff(client, cluster_name).get("ClusterInfoList", [])
+    except (
+        botocore.exceptions.BotoCoreError,
+        botocore.exceptions.ClientError,
+    ) as e:
+        module.fail_json_aws(e, "Failed to find kafka cluster by name")
     if cluster_list:
         if len(cluster_list) != 1:
             module.fail_json(msg="Found more than one cluster with name '{0}'".format(cluster_name))

From e1019160f9fd966a12cb0c9c4a53508b48e2ae5e Mon Sep 17 00:00:00 2001
From: Daniil Kupchenko
Date: Sun, 27 Jun 2021 00:51:25 +0300
Subject: [PATCH 17/17] fix: cluster name should not exceed 64 characters limit

---
 aws_msk_cluster.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/aws_msk_cluster.py b/aws_msk_cluster.py
index 3b6794e689d..7f85c00a59b 100644
--- a/aws_msk_cluster.py
+++ b/aws_msk_cluster.py
@@ -794,6 +794,10 @@ def main():
             module.fail_json(
                 msg="The number of broker nodes must be a multiple of availability zones in the subnets parameter"
             )
+        if len(module.params["name"]) > 64:
+            module.fail_json(
+                msg='Cluster name "{0}" exceeds 64 character limit'.format(module.params["name"])
+            )
         changed, response = create_or_update_cluster(client, module)
     elif module.params["state"] == "absent":
         changed, response = delete_cluster(client, module)