From dec0f075ef228b4734aabb43c72da9dd0fab62d0 Mon Sep 17 00:00:00 2001
From: jamarcelin
Date: Sat, 7 Dec 2024 19:44:28 -0500
Subject: [PATCH 01/14] feat: kafka

---
 moto/backend_index.py                   |   1 +
 moto/backends.py                        |   4 +
 moto/kafka/__init__.py                  |   1 +
 moto/kafka/exceptions.py                |   3 +
 moto/kafka/models.py                    | 329 ++++++++++++++++++++++++
 moto/kafka/responses.py                 | 146 +++++++++++
 moto/kafka/urls.py                      |  14 +
 moto/resourcegroupstaggingapi/models.py |  61 ++++-
 tests/test_kafka/__init__.py            |   0
 tests/test_kafka/test_kafka.py          | 146 +++++++++++
 10 files changed, 691 insertions(+), 14 deletions(-)
 create mode 100644 moto/kafka/__init__.py
 create mode 100644 moto/kafka/exceptions.py
 create mode 100644 moto/kafka/models.py
 create mode 100644 moto/kafka/responses.py
 create mode 100644 moto/kafka/urls.py
 create mode 100644 tests/test_kafka/__init__.py
 create mode 100644 tests/test_kafka/test_kafka.py

diff --git a/moto/backend_index.py b/moto/backend_index.py
index 781aeeb2affc..8b4452cb755a 100644
--- a/moto/backend_index.py
+++ b/moto/backend_index.py
@@ -96,6 +96,7 @@
     ("iotdata", re.compile("https?://data\\.iot\\.(.+)\\.amazonaws.com")),
     ("iotdata", re.compile("https?://data-ats\\.iot\\.(.+)\\.amazonaws.com")),
     ("ivs", re.compile("https?://ivs\\.(.+)\\.amazonaws\\.com")),
+    ("kafka", re.compile("https?://kafka\\.(.+)\\.amazonaws\\.com")),
     ("kinesis", re.compile("https?://kinesis\\.(.+)\\.amazonaws\\.com")),
     ("kinesis", re.compile("https?://(.+)\\.control-kinesis\\.(.+)\\.amazonaws\\.com")),
     ("kinesis", re.compile("https?://(.+)\\.data-kinesis\\.(.+)\\.amazonaws\\.com")),
diff --git a/moto/backends.py b/moto/backends.py
index 4142a8ad659d..e81b7d53dae5 100644
--- a/moto/backends.py
+++ b/moto/backends.py
@@ -80,6 +80,7 @@
     from moto.iot.models import IoTBackend
     from moto.iotdata.models import IoTDataPlaneBackend
     from moto.ivs.models import IVSBackend
+    from moto.kafka.models import KafkaBackend
     from moto.kinesis.models import KinesisBackend
     from moto.kinesisvideo.models import KinesisVideoBackend
     from moto.kinesisvideoarchivedmedia.models import KinesisVideoArchivedMediaBackend
@@ -258,6 +259,7 @@ def get_service_from_url(url: str) -> Optional[str]:
     "Literal['iot']",
     "Literal['iot-data']",
     "Literal['ivs']",
+    "Literal['kafka']",
     "Literal['kinesis']",
     "Literal['kinesisvideo']",
     "Literal['kinesis-video-archived-media']",
@@ -528,6 +530,8 @@ def get_backend(name: "Literal['iot-data']") -> "BackendDict[IoTDataPlaneBackend]": ...
 @overload
 def get_backend(name: "Literal['ivs']") -> "BackendDict[IVSBackend]": ...
 @overload
+def get_backend(name: "Literal['kafka']") -> "BackendDict[KafkaBackend]": ...
+@overload
 def get_backend(name: "Literal['kinesis']") -> "BackendDict[KinesisBackend]": ...
 @overload
 def get_backend(
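With the service registered in backend_index.py and backends.py above, "kafka" resolves through moto's normal backend lookup. A minimal sketch of peeking at the backend from a test, assuming moto's default account ID (the region and cluster name are arbitrary):

    import boto3
    from moto import mock_aws
    from moto.backends import get_backend

    @mock_aws
    def peek_at_kafka_backend() -> None:
        boto3.client("kafka", region_name="us-east-1").create_cluster_v2(
            ClusterName="demo", Serverless={"VpcConfigs": []}
        )
        # BackendDict is keyed by account ID, then by region.
        backend = get_backend("kafka")["123456789012"]["us-east-1"]
        assert len(backend.clusters) == 1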
diff --git a/moto/kafka/__init__.py b/moto/kafka/__init__.py
new file mode 100644
index 000000000000..420b9c1f2511
--- /dev/null
+++ b/moto/kafka/__init__.py
@@ -0,0 +1 @@
+from .models import kafka_backends #noqa: F401
\ No newline at end of file
diff --git a/moto/kafka/exceptions.py b/moto/kafka/exceptions.py
new file mode 100644
index 000000000000..a8a17a04326f
--- /dev/null
+++ b/moto/kafka/exceptions.py
@@ -0,0 +1,3 @@
+"""Exceptions raised by the kafka service."""
+from moto.core.exceptions import JsonRESTError
+
diff --git a/moto/kafka/models.py b/moto/kafka/models.py
new file mode 100644
index 000000000000..73aad77acb0a
--- /dev/null
+++ b/moto/kafka/models.py
@@ -0,0 +1,329 @@
+"""KafkaBackend class with methods for supported APIs."""
+
+import uuid
+from moto.core.base_backend import BaseBackend, BackendDict
+from moto.core.common_models import BaseModel
+from moto.utilities.utils import get_partition
+from typing import Any, Dict, List, Optional, Tuple
+from ..utilities.tagging_service import TaggingService
+from datetime import datetime
+
+
+class FakeKafkaCluster(BaseModel):
+    def __init__(
+        self,
+        cluster_name: str,
+        account_id: str,
+        region_name: str,
+        cluster_type: str,
+        tags: dict = None,
+        broker_node_group_info: dict = None,
+        kafka_version: str = None,
+        number_of_broker_nodes: int = None,
+        configuration_info=None,
+        serverless_config: dict = None,
+        encryption_info: dict = None,
+        enhanced_monitoring: str = "DEFAULT",
+        open_monitoring: dict = None,
+        logging_info: dict = None,
+        storage_mode: str = "LOCAL",
+        current_version: str = "1.0",
+        client_authentication: dict = None,
+        state: str = "CREATING",
+        active_operation_arn: str = None,
+        zookeeper_connect_string: str = None,
+        zookeeper_connect_string_tls: str = None,
+    ):
+        # General attributes
+        self.cluster_id = str(uuid.uuid4())
+        self.cluster_name = cluster_name
+        self.account_id = account_id
+        self.region_name = region_name
+        self.cluster_type = cluster_type
+        self.tags = tags or {}
+        self.state = state
+        self.creation_time = datetime.now().isoformat()
+        self.current_version = current_version
+        self.active_operation_arn = active_operation_arn
+        self.arn = self._generate_arn()
+
+        # Attributes specific to PROVISIONED clusters
+        self.broker_node_group_info = broker_node_group_info
+        self.kafka_version = kafka_version
+        self.number_of_broker_nodes = number_of_broker_nodes
+        self.configuration_info = configuration_info
+        self.encryption_info = encryption_info
+        self.enhanced_monitoring = enhanced_monitoring
+        self.open_monitoring = open_monitoring
+        self.logging_info = logging_info
+        self.storage_mode = storage_mode
+        self.client_authentication = client_authentication
+        self.zookeeper_connect_string = zookeeper_connect_string
+        self.zookeeper_connect_string_tls = zookeeper_connect_string_tls
+
+        # Attributes specific to SERVERLESS clusters
+        self.serverless_config = serverless_config
+
+    def _generate_arn(self) -> str:
+        resource_type = "cluster" if self.cluster_type == "PROVISIONED" else "serverless-cluster"
+        partition = get_partition(self.region_name)
+        return f"arn:{partition}:kafka:{self.region_name}:{self.account_id}:{resource_type}/{self.cluster_id}"
+
+    def to_dict(self) -> dict:
+        cluster_info = {
+            "ClusterName": self.cluster_name,
+            "ClusterArn": self.arn,
+            "ClusterType": self.cluster_type,
+            "State": self.state,
+            "CreationTime": self.creation_time,
+            "CurrentVersion": self.current_version,
+            "Tags": self.tags,
+            "ActiveOperationArn": self.active_operation_arn,
+        }
+
+        if self.cluster_type == "PROVISIONED":
+            cluster_info["Provisioned"] = {
+                "BrokerNodeGroupInfo": self.broker_node_group_info,
+                "KafkaVersion": self.kafka_version,
+                "NumberOfBrokerNodes": self.number_of_broker_nodes,
+                "EncryptionInfo": self.encryption_info,
+                "EnhancedMonitoring": self.enhanced_monitoring,
+                "OpenMonitoring": self.open_monitoring,
+                "LoggingInfo": self.logging_info,
+                "StorageMode": self.storage_mode,
+                "ClientAuthentication": self.client_authentication,
+                "ZookeeperConnectString": self.zookeeper_connect_string,
+                "ZookeeperConnectStringTls": self.zookeeper_connect_string_tls,
+            }
+
+        elif self.cluster_type == "SERVERLESS":
+            cluster_info["Serverless"] = {
+                "VpcConfigs": self.serverless_config.get("VpcConfigs", []),
+                "ClientAuthentication": self.serverless_config.get("ClientAuthentication", {}),
+            }
+
+        return cluster_info
+
+
+class KafkaBackend(BaseBackend):
+    """Implementation of Kafka APIs."""
+
+    def __init__(self, region_name, account_id):
+        super().__init__(region_name, account_id)
+        self.clusters: Dict[str, FakeKafkaCluster] = {}
+        self.tagger = TaggingService()
+
+    def create_cluster_v2(self, cluster_name, tags, provisioned, serverless):
+        if provisioned:
+            cluster_type = "PROVISIONED"
+            broker_node_group_info = provisioned.get("BrokerNodeGroupInfo")
+            kafka_version = provisioned.get(
+                "KafkaVersion", "default-kafka-version")
+            number_of_broker_nodes = provisioned.get("NumberOfBrokerNodes", 1)
+            storage_mode = provisioned.get("StorageMode", "LOCAL")
+            serverless_config = None
+        elif serverless:
+            cluster_type = "SERVERLESS"
+            broker_node_group_info = None
+            kafka_version = None
+            number_of_broker_nodes = None
+            storage_mode = None
+            serverless_config = serverless
+
+        new_cluster = FakeKafkaCluster(
+            cluster_name=cluster_name,
+            account_id=self.account_id,
+            region_name=self.region_name,
+            cluster_type=cluster_type,
+            broker_node_group_info=broker_node_group_info,
+            kafka_version=kafka_version,
+            number_of_broker_nodes=number_of_broker_nodes,
+            serverless_config=serverless_config,
+            tags=tags,
+            state="CREATING",
+            storage_mode=storage_mode,
+            current_version="1.0",
+        )
+
+        self.clusters[new_cluster.arn] = new_cluster
+
+        if tags:
+            tags_list = [{"Key": k, "Value": v} for k, v in tags.items()]
+            self.tagger.tag_resource(new_cluster.arn, tags_list)
+
+        return new_cluster.arn, new_cluster.cluster_name, new_cluster.state, new_cluster.cluster_type
+
+    def describe_cluster_v2(self, cluster_arn):
+
+        cluster = self.clusters.get(cluster_arn)
+
+        cluster_info = {
+            "ClusterInfo": {
+                "ActiveOperationArn": cluster.active_operation_arn or "arn:aws:kafka:region:account-id:operation/active-operation",
+                "ClusterArn": cluster.arn,
+                "ClusterName": cluster.cluster_name,
+                "ClusterType": cluster.cluster_type,
+                "CreationTime": cluster.creation_time,
+                "CurrentVersion": cluster.current_version,
+                "State": cluster.state,
+                "StateInfo": {
+                    "Code": "string",
+                    "Message": "Cluster state details.",
+                },
+                "Tags": self.list_tags_for_resource(cluster.arn),
+            }
+        }
+
+        if cluster.cluster_type == "PROVISIONED":
+            cluster_info["ClusterInfo"].update({
+                "BrokerNodeGroupInfo": cluster.broker_node_group_info or {},
+                "ClientAuthentication": cluster.client_authentication or {},
+                "CurrentBrokerSoftwareInfo": {
+                    "ConfigurationArn": (cluster.configuration_info or {}).get("Arn", "string"),
+                    "ConfigurationRevision": (cluster.configuration_info or {}).get("Revision", 1),
+                    "KafkaVersion": cluster.kafka_version,
+                },
+                "EncryptionInfo": cluster.encryption_info or {},
+                "EnhancedMonitoring": cluster.enhanced_monitoring,
+                "OpenMonitoring": cluster.open_monitoring or {},
+                "LoggingInfo": cluster.logging_info or {},
+                "NumberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
+                "ZookeeperConnectString": cluster.zookeeper_connect_string or "zookeeper.example.com:2181",
+                "ZookeeperConnectStringTls": cluster.zookeeper_connect_string_tls or "zookeeper.example.com:2181",
+                "StorageMode": cluster.storage_mode,
+                "CustomerActionStatus": "NONE",
+            })
+
+        elif cluster.cluster_type == "SERVERLESS":
+            cluster_info["ClusterInfo"].update({
+                "Serverless": {
+                    "VpcConfigs": cluster.serverless_config.get("VpcConfigs", []),
+                    "ClientAuthentication": cluster.serverless_config.get("ClientAuthentication", {}),
+                }
+            })
+
+        return cluster_info
+
+    def list_clusters_v2(self, cluster_name_filter, cluster_type_filter, max_results, next_token):
+        cluster_info_list = [
+            {
+                "ClusterArn": cluster.arn,
+                "ClusterName": cluster.cluster_name,
+                "ClusterType": cluster.cluster_type,
+                "State": cluster.state,
+                "CreationTime": cluster.creation_time,
+            }
+            for cluster in self.clusters.values()
+        ]
+
+        return cluster_info_list, None
+
+    def create_cluster(
+        self,
+        broker_node_group_info,
+        client_authentication,
+        cluster_name,
+        configuration_info=None,
+        encryption_info=None,
+        enhanced_monitoring="DEFAULT",
+        open_monitoring=None,
+        kafka_version="2.8.1",
+        logging_info=None,
+        number_of_broker_nodes=1,
+        tags=None,
+        storage_mode="LOCAL",
+    ):
+        new_cluster = FakeKafkaCluster(
+            cluster_name=cluster_name,
+            account_id=self.account_id,
+            region_name=self.region_name,
+            cluster_type="PROVISIONED",
+            broker_node_group_info=broker_node_group_info,
+            client_authentication=client_authentication,
+            kafka_version=kafka_version,
+            number_of_broker_nodes=number_of_broker_nodes,
+            configuration_info=configuration_info,
+            encryption_info=encryption_info,
+            enhanced_monitoring=enhanced_monitoring,
+            open_monitoring=open_monitoring,
+            logging_info=logging_info,
+            storage_mode=storage_mode,
+        )
+
+        self.clusters[new_cluster.arn] = new_cluster
+
+        if tags:
+            tags_list = [{"Key": k, "Value": v} for k, v in tags.items()]
+            self.tagger.tag_resource(new_cluster.arn, tags_list)
+
+        return new_cluster.arn, new_cluster.cluster_name, new_cluster.state
+
+    def describe_cluster(self, cluster_arn):
+        cluster = self.clusters.get(cluster_arn)
+
+        return {
+            "ClusterInfo": {
+                "ActiveOperationArn": cluster.active_operation_arn or "arn:aws:kafka:region:account-id:operation/active-operation",
+                "BrokerNodeGroupInfo": cluster.broker_node_group_info or {},
+                "ClientAuthentication": cluster.client_authentication or {},
+                "ClusterArn": cluster.arn,
+                "ClusterName": cluster.cluster_name,
+                "CreationTime": cluster.creation_time,
+                "CurrentBrokerSoftwareInfo": {
+                    "ConfigurationArn": (cluster.configuration_info or {}).get("Arn", "string"),
+                    "ConfigurationRevision": (cluster.configuration_info or {}).get("Revision", 1),
+                    "KafkaVersion": cluster.kafka_version,
+                },
+                "CurrentVersion": cluster.current_version,
+                "EncryptionInfo": cluster.encryption_info or {},
+                "EnhancedMonitoring": cluster.enhanced_monitoring,
+                "OpenMonitoring": cluster.open_monitoring or {},
+                "LoggingInfo": cluster.logging_info or {},
+                "NumberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
+                "State": cluster.state,
+                "StateInfo": {
+                    "Code": "string",
+                    "Message": "Cluster state details.",
+                },
+                "Tags": self.list_tags_for_resource(cluster.arn),
+                "ZookeeperConnectString": cluster.zookeeper_connect_string or "zookeeper.example.com:2181",
+                "ZookeeperConnectStringTls": cluster.zookeeper_connect_string_tls or "zookeeper.example.com:2181",
+                "StorageMode": cluster.storage_mode,
+                "CustomerActionStatus": "NONE",
+            }
+        }
+
+    def list_clusters(self, cluster_name_filter, max_results, next_token) -> List[Dict[str, Any]]:
+
+        cluster_info_list = [
+            {
+                "ClusterArn": cluster.arn,
+                "ClusterName": cluster.cluster_name,
+                "State": cluster.state,
+                "CreationTime": cluster.creation_time,
+                "ClusterType": cluster.cluster_type,
+            }
+            for cluster_arn, cluster in self.clusters.items()
+        ]
+
+        return cluster_info_list, None
+
+    def delete_cluster(self, cluster_arn, current_version):
+        cluster = self.clusters.pop(cluster_arn)
+        return cluster_arn, cluster.state
+
+    def list_tags_for_resource(self, resource_arn) -> List[Dict[str, str]]:
+        tags = self.tagger.get_tag_dict_for_resource(resource_arn)
+        Tags = []
+        for key, value in tags.items():
+            Tags.append({"Key": key, "Value": value})
+        return Tags
+
+    def tag_resource(self, resource_arn, tags):
+        self.tagger.tag_resource(resource_arn, tags)
+
+    def untag_resource(self, resource_arn, tag_keys):
+        self.tagger.untag_resource_using_names(resource_arn, tag_keys)
+
+
+kafka_backends = BackendDict(KafkaBackend, "kafka")
diff --git a/moto/kafka/responses.py b/moto/kafka/responses.py
new file mode 100644
index 000000000000..0f7c96a12555
--- /dev/null
+++ b/moto/kafka/responses.py
@@ -0,0 +1,146 @@
+"""Handles incoming kafka requests, invokes methods, returns responses."""
+import json
+
+from moto.core.responses import BaseResponse
+from .models import KafkaBackend, kafka_backends
+
+
+class KafkaResponse(BaseResponse):
+    """Handler for Kafka requests and responses."""
+
+    def __init__(self):
+        super().__init__(service_name="kafka")
+
+    @property
+    def kafka_backend(self) -> KafkaBackend:
+        """Return backend instance specific for this region."""
+        return kafka_backends[self.current_account][self.region]
+
+    def create_cluster_v2(self):
+        params = self._get_params()
+        cluster_name = self._get_param("clusterName")
+        tags = self._get_param("tags")
+        provisioned = self._get_param("provisioned")
+        serverless = self._get_param("serverless")
+        cluster_arn, cluster_name, state, cluster_type = self.kafka_backend.create_cluster_v2(
+            cluster_name=cluster_name,
+            tags=tags,
+            provisioned=provisioned,
+            serverless=serverless,
+        )
+        return json.dumps(dict(clusterArn=cluster_arn, clusterName=cluster_name, state=state, clusterType=cluster_type))
+
+    def describe_cluster_v2(self):
+        params = self._get_params()
+        cluster_arn = self._get_param("clusterArn")
+        cluster_info = self.kafka_backend.describe_cluster_v2(
+            cluster_arn=cluster_arn,
+        )
+        return json.dumps(dict(clusterInfo=cluster_info))
+
+    def list_clusters_v2(self):
+        params = self._get_params()
+        cluster_name_filter = self._get_param("clusterNameFilter")
+        cluster_type_filter = self._get_param("clusterTypeFilter")
+        max_results = self._get_param("maxResults")
+        next_token = self._get_param("nextToken")
+        cluster_info_list, next_token = self.kafka_backend.list_clusters_v2(
+            cluster_name_filter=cluster_name_filter,
+            cluster_type_filter=cluster_type_filter,
+            max_results=max_results,
+            next_token=next_token,
+        )
+        return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token))
+
+    def list_tags_for_resource(self):
+        params = self._get_params()
+        resource_arn = self._get_param("resourceArn")
+        tags = self.kafka_backend.list_tags_for_resource(
+            resource_arn=resource_arn,
+        )
+        return json.dumps(dict(tags=tags))
+
+    def tag_resource(self):
+        params = self._get_params()
+        resource_arn = self._get_param("resourceArn")
+        tags = self._get_param("tags")
+        self.kafka_backend.tag_resource(
+            resource_arn=resource_arn,
+            tags=tags,
+        )
+        return json.dumps(dict())
+
+    def untag_resource(self):
+        params = self._get_params()
+        resource_arn = self._get_param("resourceArn")
+        tag_keys = self._get_param("tagKeys")
+        self.kafka_backend.untag_resource(
+            resource_arn=resource_arn,
+            tag_keys=tag_keys,
+        )
+        return json.dumps(dict())
+
+    def create_cluster(self):
+        params = self._get_params()
+        broker_node_group_info = self._get_param("brokerNodeGroupInfo")
+        client_authentication = self._get_param("clientAuthentication")
+        cluster_name = self._get_param("clusterName")
+        configuration_info = self._get_param("configurationInfo")
+        encryption_info = self._get_param("encryptionInfo")
+        enhanced_monitoring = self._get_param("enhancedMonitoring")
+        open_monitoring = self._get_param("openMonitoring")
+        kafka_version = self._get_param("kafkaVersion")
+        logging_info = self._get_param("loggingInfo")
+        number_of_broker_nodes = self._get_param("numberOfBrokerNodes")
+        tags = self._get_param("tags")
+        storage_mode = self._get_param("storageMode")
+        cluster_arn, cluster_name, state = self.kafka_backend.create_cluster(
+            broker_node_group_info=broker_node_group_info,
+            client_authentication=client_authentication,
+            cluster_name=cluster_name,
+            configuration_info=configuration_info,
+            encryption_info=encryption_info,
+            enhanced_monitoring=enhanced_monitoring,
+            open_monitoring=open_monitoring,
+            kafka_version=kafka_version,
+            logging_info=logging_info,
+            number_of_broker_nodes=number_of_broker_nodes,
+            tags=tags,
+            storage_mode=storage_mode,
+        )
+        return json.dumps(dict(clusterArn=cluster_arn, clusterName=cluster_name, state=state))
+
+    def describe_cluster(self):
+        params = self._get_params()
+        cluster_arn = self._get_param("clusterArn")
+        cluster_info = self.kafka_backend.describe_cluster(
+            cluster_arn=cluster_arn,
+        )
+        return json.dumps(dict(clusterInfo=cluster_info))
+
+    def delete_cluster(self):
+        params = self._get_params()
+        cluster_arn = self._get_param("clusterArn")
+        current_version = self._get_param("currentVersion")
+        cluster_arn, state = self.kafka_backend.delete_cluster(
+            cluster_arn=cluster_arn,
+            current_version=current_version,
+        )
+        return json.dumps(dict(clusterArn=cluster_arn, state=state))
+
+    def list_clusters(self):
+        params = self._get_params()
+        cluster_name_filter = self._get_param("clusterNameFilter")
+        max_results = self._get_param("maxResults")
+        next_token = self._get_param("nextToken")
+
+        cluster_info_list, next_token = self.kafka_backend.list_clusters(
+            cluster_name_filter=cluster_name_filter,
+            max_results=max_results,
+            next_token=next_token,
+        )
+
+        # Correctly Printing Here
+        print(json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token)))
+
+        return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token))
diff --git a/moto/kafka/urls.py b/moto/kafka/urls.py
new file mode 100644
index 000000000000..c8e0d76abab4
--- /dev/null
+++ b/moto/kafka/urls.py
@@ -0,0 +1,14 @@
+"""kafka base URL and path."""
+from .responses import KafkaResponse
+
+url_bases = [
+    r"https?://kafka\.(.+)\.amazonaws\.com",
+]
+
+url_paths = {
+    "{0}/api/v2/clusters$": KafkaResponse.dispatch,
+    "{0}/api/v2/clusters/(?P<clusterArn>[^/]+)$": KafkaResponse.dispatch,
+    "{0}/v1/tags/(?P<resourceArn>[^/]+)$": KafkaResponse.dispatch,
+    "{0}/v1/clusters$": KafkaResponse.dispatch,
+    "{0}/v1/clusters/(?P<clusterArn>[^/]+)$": KafkaResponse.dispatch,
+}
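One subtlety the path patterns above rely on: the cluster ARN travels as a single path segment, which only works because the client percent-encodes it (slashes included), so `[^/]+` captures the whole ARN. A self-contained illustration of that round trip (the ARN value and the group name are just for the demo):

    import re
    from urllib.parse import quote, unquote

    pattern = re.compile(r"/api/v2/clusters/(?P<clusterArn>[^/]+)$")
    arn = "arn:aws:kafka:us-east-1:123456789012:cluster/demo/abcd-1234"
    path = "/api/v2/clusters/" + quote(arn, safe="")  # slashes become %2F
    match = pattern.search(path)
    assert match is not None
    assert unquote(match.group("clusterArn")) == arn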
diff --git a/moto/resourcegroupstaggingapi/models.py b/moto/resourcegroupstaggingapi/models.py
index 19dc9339a8f7..eeb720676446 100644
--- a/moto/resourcegroupstaggingapi/models.py
+++ b/moto/resourcegroupstaggingapi/models.py
@@ -14,6 +14,7 @@
 from moto.emr.models import ElasticMapReduceBackend, emr_backends
 from moto.glacier.models import GlacierBackend, glacier_backends
 from moto.glue.models import GlueBackend, glue_backends
+from moto.kafka.models import KafkaBackend, kafka_backends
 from moto.kinesis.models import KinesisBackend, kinesis_backends
 from moto.kms.models import KmsBackend, kms_backends
 from moto.logs.models import LogsBackend, logs_backends
@@ -148,6 +149,10 @@ def workspacesweb_backends(self) -> Optional[WorkSpacesWebBackend]:
             return workspacesweb_backends[self.account_id][self.region_name]
         return None
 
+    @property
+    def kafka_backend(self) -> KafkaBackend:
+        return kafka_backends[self.account_id][self.region_name]
+
     @property
     def sagemaker_backend(self) -> SageMakerModelBackend:
         return sagemaker_backends[self.account_id][self.region_name]
@@ -166,17 +171,20 @@ def _get_resources_generator(
         values = tag_filter_dict.get("Values", [])
         if len(values) == 0:
             # Check key matches
-            filters.append(lambda t, v, key=tag_filter_dict["Key"]: t == key)
+            filters.append(
+                lambda t, v, key=tag_filter_dict["Key"]: t == key)
         elif len(values) == 1:
             # Check it's exactly the same as key, value
             filters.append(
-                lambda t, v, key=tag_filter_dict["Key"], value=values[0]: t == key  # type: ignore
+                # type: ignore
+                lambda t, v, key=tag_filter_dict["Key"], value=values[0]: t == key
                 and v == value
             )
         else:
             # Check key matches and value is one of the provided values
             filters.append(
-                lambda t, v, key=tag_filter_dict["Key"], vl=values: t == key  # type: ignore
+                # type: ignore
+                lambda t, v, key=tag_filter_dict["Key"], vl=values: t == key
                 and v in vl
             )
@@ -235,7 +243,8 @@ def format_tag_keys(
             or "s3:bucket" in resource_type_filters
         ):
             for bucket in self.s3_backend.buckets.values():
-                tags = self.s3_backend.tagger.list_tags_for_resource(bucket.arn)["Tags"]
+                tags = self.s3_backend.tagger.list_tags_for_resource(bucket.arn)[
+                    "Tags"]
                 if not tags or not tag_filter(
                     tags
                 ):  # Skip if no tags, or invalid filter
@@ -268,7 +277,8 @@ def format_tag_keys(
 
         if not resource_type_filters or "ecs:cluster" in resource_type_filters:
             for cluster in self.ecs_backend.clusters.values():
-                tags = format_tag_keys(cluster.tags, ["key", "value"])  # type: ignore[arg-type]
+                # type: ignore[arg-type]
+                tags = format_tag_keys(cluster.tags, ["key", "value"])
                 if not tag_filter(tags):
                     continue
                 yield {"ResourceARN": f"{cluster.arn}", "Tags": tags}
@@ -322,7 +332,8 @@ def format_tag_keys(
                 or resource_type in resource_type_filters
             ):
                 for resource in resources:
-                    tags = format_tags(self.ec2_backend.tags.get(resource.id, {}))
+                    tags = format_tags(
+                        self.ec2_backend.tags.get(resource.id, {}))
                     if not tags or not tag_filter(tags):
                         continue
                     yield {
@@ -337,7 +348,8 @@ def format_tag_keys(
             or "elasticfilesystem:access-point" in resource_type_filters
         ):
             for ap in self.efs_backend.access_points.values():
-                tags = self.efs_backend.list_tags_for_resource(ap.access_point_id)
+                tags = self.efs_backend.list_tags_for_resource(
+                    ap.access_point_id)
                 if not tag_filter(tags):
                     continue
                 yield {"ResourceARN": f"{ap.access_point_arn}", "Tags": tags}
@@ -349,7 +361,8 @@ def format_tag_keys(
             or "elasticfilesystem:file-system" in resource_type_filters
         ):
             for fs in self.efs_backend.file_systems_by_id.values():
-                tags = self.efs_backend.list_tags_for_resource(fs.file_system_id)
+                tags = self.efs_backend.list_tags_for_resource(
+                    fs.file_system_id)
                 if not tag_filter(tags):
                     continue
                 yield {"ResourceARN": f"{fs.file_system_arn}", "Tags": tags}
@@ -436,7 +449,8 @@ def format_tag_keys(
         if not resource_type_filters or "kms" in resource_type_filters:
             for kms_key in self.kms_backend.list_keys():
                 tags = format_tag_keys(
-                    self.kms_backend.list_resource_tags(kms_key.id).get("Tags", []),
+                    self.kms_backend.list_resource_tags(
+                        kms_key.id).get("Tags", []),
                     ["TagKey", "TagValue"],
                 )
                 if not tag_filter(tags):  # Skip if no tags, or invalid filter
@@ -586,6 +600,20 @@ def format_tag_keys(
                     "Tags": tags,
                 }
 
+        # Kafka (MSK)
+        if self.kafka_backend and (
+            not resource_type_filters or "kafka" in resource_type_filters
+        ):
+            for cluster in self.kafka_backend.clusters.values():
+                tags = self.kafka_backend.list_tags_for_resource(cluster.arn)[
+                    "Tags"]
+                if not tags or not tag_filter(tags):
+                    continue
+                yield {
+                    "ResourceARN": cluster.arn,
+                    "Tags": tags,
+                }
+
         # Workspaces Web
         if self.workspacesweb_backends and (
             not resource_type_filters or "workspaces-web" in resource_type_filters
@@ -700,7 +728,8 @@ def get_ec2_keys(res_id: str) -> List[Dict[str, str]]:
         # EC2 Instance, resource type ec2:instance
         for reservation in self.ec2_backend.reservations.values():
             for instance in reservation.instances:
-                for key in get_ec2_keys(instance.id):  # type: ignore[assignment]
+                # type: ignore[assignment]
+                for key in get_ec2_keys(instance.id):
                     yield key
 
         # EC2 NetworkInterface, resource type ec2:network-interface
@@ -760,7 +789,8 @@ def get_ec2_values(res_id: str) -> List[Dict[str, str]]:
         # EC2 Instance, resource type ec2:instance
         for reservation in self.ec2_backend.reservations.values():
             for instance in reservation.instances:
-                for value in get_ec2_values(instance.id):  # type: ignore[assignment]
+                # type: ignore[assignment]
+                for value in get_ec2_values(instance.id):
                     yield value
 
         # EC2 NetworkInterface, resource type ec2:network-interface
@@ -778,7 +808,8 @@ def get_ec2_values(res_id: str) -> List[Dict[str, str]]:
 
         # EC2 Snapshot, resource type ec2:snapshot
         for snapshot in self.ec2_backend.snapshots.values():
-            for value in get_ec2_values(snapshot.id):  # type: ignore[assignment]
+            # type: ignore[assignment]
+            for value in get_ec2_values(snapshot.id):
                 yield value
 
         # TODO EC2 SpotInstanceRequest
@@ -982,12 +1013,14 @@ def tag_resources(
             ):
                 resource_id = arn.split("/")[-1]
                 self.workspacesweb_backends.create_tags(  # type: ignore[union-attr]
-                    resource_id, TaggingService.convert_dict_to_tags_input(tags)
+                    resource_id, TaggingService.convert_dict_to_tags_input(
+                        tags)
                 )
             elif arn.startswith(f"arn:{get_partition(self.region_name)}:workspaces:"):
                 resource_id = arn.split("/")[-1]
                 self.workspaces_backend.create_tags(  # type: ignore[union-attr]
-                    resource_id, TaggingService.convert_dict_to_tags_input(tags)
+                    resource_id, TaggingService.convert_dict_to_tags_input(
+                        tags)
                 )
             elif arn.startswith(f"arn:{get_partition(self.region_name)}:logs:"):
                 self.logs_backend.tag_resource(arn, tags)
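The intended effect of the resourcegroupstaggingapi hook-up, sketched from the client side; this mirrors what the generator above is meant to support rather than a test shipped in this patch, and the region, subnet, and tag values are arbitrary:

    import boto3
    from moto import mock_aws

    @mock_aws
    def tagged_kafka_arns() -> list:
        kafka = boto3.client("kafka", region_name="us-east-1")
        kafka.create_cluster(
            ClusterName="demo",
            BrokerNodeGroupInfo={
                "InstanceType": "kafka.m5.large",
                "ClientSubnets": ["subnet-0123456789abcdef0"],
            },
            KafkaVersion="2.8.1",
            NumberOfBrokerNodes=1,
            Tags={"env": "test"},
        )
        rgta = boto3.client("resourcegroupstaggingapi", region_name="us-east-1")
        mappings = rgta.get_resources(ResourceTypeFilters=["kafka"])
        return [m["ResourceARN"] for m in mappings["ResourceTagMappingList"]]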
diff --git a/tests/test_kafka/__init__.py b/tests/test_kafka/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/test_kafka/test_kafka.py b/tests/test_kafka/test_kafka.py
new file mode 100644
index 000000000000..48fb1b623272
--- /dev/null
+++ b/tests/test_kafka/test_kafka.py
@@ -0,0 +1,146 @@
+"""Unit tests for kafka-supported APIs."""
+import boto3
+
+from moto import mock_aws
+
+# See our Development Tips on writing tests for hints on how to write good tests:
+# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
+
+
+FAKE_TAGS = {
+    "TestKey": "TestValue",
+    "TestKey2": "TestValue2"
+}
+
+
+@mock_aws
+def test_create_cluster_v2():
+    client = boto3.client("kafka", region_name="ap-southeast-1")
+    cluster_name = "TestServerlessCluster"
+
+    response = client.create_cluster_v2(
+        ClusterName=cluster_name,
+        Serverless={
+            "VpcConfigs": [
+                {
+                    "SubnetIds": ["subnet-0123456789abcdef0"],
+                    "SecurityGroupIds": ["sg-0123456789abcdef0"],
+                }
+            ]
+        },
+        Tags=FAKE_TAGS,
+    )
+
+    assert response["ClusterArn"].startswith("arn:aws:kafka")
+    assert response["ClusterName"] == cluster_name
+    assert response["State"] == "CREATING"
+
+    clusters = client.list_clusters_v2()
+    assert len(clusters["ClusterInfoList"]) == 1
+    assert clusters["ClusterInfoList"][0]["ClusterName"] == cluster_name
+    assert clusters["ClusterInfoList"][0]["ClusterType"] == "SERVERLESS"
+
+    resp = client.describe_cluster_v2(ClusterArn=response["ClusterArn"])
+    cluster_info = resp["ClusterInfo"]
+
+    assert cluster_info["ClusterName"] == cluster_name
+    assert cluster_info["State"] == "CREATING"
+    assert cluster_info["ClusterType"] == "SERVERLESS"
+    assert cluster_info["Serverless"]["VpcConfigs"][0]["SubnetIds"] == [
+        "subnet-0123456789abcdef0"
+    ]
+    assert cluster_info["Serverless"]["VpcConfigs"][0]["SecurityGroupIds"] == [
+        "sg-0123456789abcdef0"
+    ]
+    assert cluster_info["Tags"] == FAKE_TAGS
+
+
+@mock_aws
+def test_list_tags_for_resource():
+    client = boto3.client("kafka", region_name="us-east-2")
+    create_resp = client.create_cluster(
+        ClusterName="TestCluster",
+        BrokerNodeGroupInfo={
+            "InstanceType": "kafka.m5.large",
+            "ClientSubnets": ["subnet-0123456789abcdef0"],
+            "SecurityGroups": ["sg-0123456789abcdef0"],
+        },
+        KafkaVersion="2.8.1",
+        NumberOfBrokerNodes=3,
+        Tags=FAKE_TAGS,
+    )
+
+    TempTags = {"TestKey3": "TestValue3"}
+
+    client.tag_resource(
+        ResourceArn=create_resp["ClusterArn"],
+        Tags=TempTags,
+    )
+
+    tags = client.list_tags_for_resource(ResourceArn=create_resp["ClusterArn"])
+    assert tags["Tags"] == {**FAKE_TAGS, **TempTags}
+
+    client.untag_resource(
+        ResourceArn=create_resp["ClusterArn"],
+        TagKeys=list(TempTags.keys()),
+    )
+
+    tags = client.list_tags_for_resource(ResourceArn=create_resp["ClusterArn"])
+    assert tags["Tags"] == FAKE_TAGS
+
+
+@ mock_aws
+def test_create_cluster():
+    client = boto3.client("kafka", region_name="eu-west-1")
+    cluster_name = "TestCluster"
+    response = client.create_cluster(
+        ClusterName=cluster_name,
+        BrokerNodeGroupInfo={
+            "InstanceType": "kafka.m5.large",
+            "ClientSubnets": ["subnet-0123456789abcdef0"],
+            "SecurityGroups": ["sg-0123456789abcdef0"],
+        },
+        KafkaVersion="2.8.1",
+        NumberOfBrokerNodes=3,
+        Tags=FAKE_TAGS,
+    )
+
+    assert response["ClusterArn"].startswith("arn:aws:kafka")
+    assert response["ClusterName"] == cluster_name
+    assert response["State"] == "CREATING"
+
+    clusters = client.list_clusters()
+    assert len(clusters["ClusterInfoList"]) == 1
+    assert clusters["ClusterInfoList"][0]["ClusterName"] == cluster_name
+
+    resp = client.describe_cluster(ClusterArn=response["ClusterArn"])
+    assert resp["ClusterInfo"]["ClusterName"] == cluster_name
+    assert resp["ClusterInfo"]["State"] == "CREATING"
+    assert resp["ClusterInfo"]["KafkaVersion"] == "2.8.1"
+    assert resp["ClusterInfo"]["NumberOfBrokerNodes"] == 3
+    assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["InstanceType"] == "kafka.m5.large"
+    assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["ClientSubnets"] == [
+        "subnet-0123456789abcdef0"]
+    assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["SecurityGroups"] == [
+        "sg-0123456789abcdef0"]
+    assert resp["ClusterInfo"]["Tags"] == FAKE_TAGS
+
+
+@ mock_aws
+def test_delete_cluster():
+    client = boto3.client("kafka", region_name="us-east-2")
+    create_resp = client.create_cluster(
+        ClusterName="TestCluster",
+        BrokerNodeGroupInfo={
+            "InstanceType": "kafka.m5.large",
+            "ClientSubnets": ["subnet-0123456789abcdef0"],
+            "SecurityGroups": ["sg-0123456789abcdef0"],
+        },
+        KafkaVersion="2.8.1",
+        NumberOfBrokerNodes=3,
+        Tags=FAKE_TAGS,
+    )
+
+    client.delete_cluster(ClusterArn=create_resp["ClusterArn"])
+    clusters = client.list_clusters()
+    assert len(clusters["ClusterInfoList"]) == 0
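Beyond the @mock_aws decorator used in the tests above, the same mock is reachable over HTTP through moto's standalone server, which is a quick way to poke at the URL wiring end to end. A rough sketch (port and dummy credentials are arbitrary):

    import boto3
    from moto.server import ThreadedMotoServer

    server = ThreadedMotoServer(port=5000)
    server.start()
    client = boto3.client(
        "kafka",
        region_name="us-east-1",
        endpoint_url="http://127.0.0.1:5000",
        aws_access_key_id="testing",
        aws_secret_access_key="testing",
    )
    print(client.list_clusters())
    server.stop()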
"subnet-0123456789abcdef0"] + assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["SecurityGroups"] == [ + "sg-0123456789abcdef0"] + assert resp["ClusterInfo"]["Tags"] == FAKE_TAGS + + +@ mock_aws +def test_delete_cluster(): + client = boto3.client("kafka", region_name="us-east-2") + create_resp = client.create_cluster( + ClusterName="TestCluster", + BrokerNodeGroupInfo={ + "InstanceType": "kafka.m5.large", + "ClientSubnets": ["subnet-0123456789abcdef0"], + "SecurityGroups": ["sg-0123456789abcdef0"], + }, + KafkaVersion="2.8.1", + NumberOfBrokerNodes=3, + Tags=FAKE_TAGS, + ) + + client.delete_cluster(ClusterArn=create_resp["ClusterArn"]) + clusters = client.list_clusters() + assert len(clusters["ClusterInfoList"]) == 0 From 80710c7458b0596d1b522d6d17af5a7358d9a792 Mon Sep 17 00:00:00 2001 From: jamarcelin Date: Sat, 7 Dec 2024 19:45:48 -0500 Subject: [PATCH 02/14] fix: linting --- moto/kafka/__init__.py | 2 +- moto/kafka/exceptions.py | 2 - moto/kafka/models.py | 114 +++++++++++++++--------- moto/kafka/responses.py | 39 ++++---- moto/kafka/urls.py | 1 + moto/resourcegroupstaggingapi/models.py | 27 ++---- tests/test_kafka/test_kafka.py | 20 +++-- 7 files changed, 116 insertions(+), 89 deletions(-) diff --git a/moto/kafka/__init__.py b/moto/kafka/__init__.py index 420b9c1f2511..5788123554dc 100644 --- a/moto/kafka/__init__.py +++ b/moto/kafka/__init__.py @@ -1 +1 @@ -from .models import kafka_backends #noqa: F401 \ No newline at end of file +from .models import kafka_backends # noqa: F401 diff --git a/moto/kafka/exceptions.py b/moto/kafka/exceptions.py index a8a17a04326f..9dc38515a9b8 100644 --- a/moto/kafka/exceptions.py +++ b/moto/kafka/exceptions.py @@ -1,3 +1 @@ """Exceptions raised by the kafka service.""" -from moto.core.exceptions import JsonRESTError - diff --git a/moto/kafka/models.py b/moto/kafka/models.py index 73aad77acb0a..5cd36741d3ea 100644 --- a/moto/kafka/models.py +++ b/moto/kafka/models.py @@ -1,12 +1,14 @@ """KafkaBackend class with methods for supported APIs.""" import uuid -from moto.core.base_backend import BaseBackend, BackendDict +from datetime import datetime +from typing import Any, Dict, List + +from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.utilities.utils import get_partition -from typing import Any, Dict, List, Optional, Tuple + from ..utilities.tagging_service import TaggingService -from datetime import datetime class FakeKafkaCluster(BaseModel): @@ -65,7 +67,9 @@ def __init__( self.serverless_config = serverless_config def _generate_arn(self) -> str: - resource_type = "cluster" if self.cluster_type == "PROVISIONED" else "serverless-cluster" + resource_type = ( + "cluster" if self.cluster_type == "PROVISIONED" else "serverless-cluster" + ) partition = get_partition(self.region_name) return f"arn:{partition}:kafka:{self.region_name}:{self.account_id}:{resource_type}/{self.cluster_id}" @@ -99,7 +103,9 @@ def to_dict(self) -> dict: elif self.cluster_type == "SERVERLESS": cluster_info["Serverless"] = { "VpcConfigs": self.serverless_config.get("VpcConfigs", []), - "ClientAuthentication": self.serverless_config.get("ClientAuthentication", {}), + "ClientAuthentication": self.serverless_config.get( + "ClientAuthentication", {} + ), } return cluster_info @@ -117,8 +123,7 @@ def create_cluster_v2(self, cluster_name, tags, provisioned, serverless): if provisioned: cluster_type = "PROVISIONED" broker_node_group_info = provisioned.get("BrokerNodeGroupInfo") - kafka_version = provisioned.get( - 
"KafkaVersion", "default-kafka-version") + kafka_version = provisioned.get("KafkaVersion", "default-kafka-version") number_of_broker_nodes = provisioned.get("NumberOfBrokerNodes", 1) storage_mode = provisioned.get("StorageMode", "LOCAL") serverless_config = None @@ -151,15 +156,20 @@ def create_cluster_v2(self, cluster_name, tags, provisioned, serverless): tags_list = [{"Key": k, "Value": v} for k, v in tags.items()] self.tagger.tag_resource(new_cluster.arn, tags_list) - return new_cluster.arn, new_cluster.cluster_name, new_cluster.state, new_cluster.cluster_type + return ( + new_cluster.arn, + new_cluster.cluster_name, + new_cluster.state, + new_cluster.cluster_type, + ) def describe_cluster_v2(self, cluster_arn): - cluster = self.clusters.get(cluster_arn) cluster_info = { "ClusterInfo": { - "ActiveOperationArn": cluster.active_operation_arn or "arn:aws:kafka:region:account-id:operation/active-operation", + "ActiveOperationArn": cluster.active_operation_arn + or "arn:aws:kafka:region:account-id:operation/active-operation", "ClusterArn": cluster.arn, "ClusterName": cluster.cluster_name, "ClusterType": cluster.cluster_type, @@ -175,36 +185,50 @@ def describe_cluster_v2(self, cluster_arn): } if cluster.cluster_type == "PROVISIONED": - cluster_info["ClusterInfo"].update({ - "BrokerNodeGroupInfo": cluster.broker_node_group_info or {}, - "ClientAuthentication": cluster.client_authentication or {}, - "CurrentBrokerSoftwareInfo": { - "ConfigurationArn": (cluster.configuration_info or {}).get("Arn", "string"), - "ConfigurationRevision": (cluster.configuration_info or {}).get("Revision", 1), - "KafkaVersion": cluster.kafka_version, - }, - "EncryptionInfo": cluster.encryption_info or {}, - "EnhancedMonitoring": cluster.enhanced_monitoring, - "OpenMonitoring": cluster.open_monitoring or {}, - "LoggingInfo": cluster.logging_info or {}, - "NumberOfBrokerNodes": cluster.number_of_broker_nodes or 0, - "ZookeeperConnectString": cluster.zookeeper_connect_string or "zookeeper.example.com:2181", - "ZookeeperConnectStringTls": cluster.zookeeper_connect_string_tls or "zookeeper.example.com:2181", - "StorageMode": cluster.storage_mode, - "CustomerActionStatus": "NONE", - }) + cluster_info["ClusterInfo"].update( + { + "BrokerNodeGroupInfo": cluster.broker_node_group_info or {}, + "ClientAuthentication": cluster.client_authentication or {}, + "CurrentBrokerSoftwareInfo": { + "ConfigurationArn": (cluster.configuration_info or {}).get( + "Arn", "string" + ), + "ConfigurationRevision": (cluster.configuration_info or {}).get( + "Revision", 1 + ), + "KafkaVersion": cluster.kafka_version, + }, + "EncryptionInfo": cluster.encryption_info or {}, + "EnhancedMonitoring": cluster.enhanced_monitoring, + "OpenMonitoring": cluster.open_monitoring or {}, + "LoggingInfo": cluster.logging_info or {}, + "NumberOfBrokerNodes": cluster.number_of_broker_nodes or 0, + "ZookeeperConnectString": cluster.zookeeper_connect_string + or "zookeeper.example.com:2181", + "ZookeeperConnectStringTls": cluster.zookeeper_connect_string_tls + or "zookeeper.example.com:2181", + "StorageMode": cluster.storage_mode, + "CustomerActionStatus": "NONE", + } + ) elif cluster.cluster_type == "SERVERLESS": - cluster_info["ClusterInfo"].update({ - "Serverless": { - "VpcConfigs": cluster.serverless_config.get("VpcConfigs", []), - "ClientAuthentication": cluster.serverless_config.get("ClientAuthentication", {}), + cluster_info["ClusterInfo"].update( + { + "Serverless": { + "VpcConfigs": cluster.serverless_config.get("VpcConfigs", []), + "ClientAuthentication": 
cluster.serverless_config.get( + "ClientAuthentication", {} + ), + } } - }) + ) return cluster_info - def list_clusters_v2(self, cluster_name_filter, cluster_type_filter, max_results, next_token): + def list_clusters_v2( + self, cluster_name_filter, cluster_type_filter, max_results, next_token + ): cluster_info_list = [ { "ClusterArn": cluster.arn, @@ -263,15 +287,20 @@ def describe_cluster(self, cluster_arn): return { "ClusterInfo": { - "ActiveOperationArn": cluster.active_operation_arn or "arn:aws:kafka:region:account-id:operation/active-operation", + "ActiveOperationArn": cluster.active_operation_arn + or "arn:aws:kafka:region:account-id:operation/active-operation", "BrokerNodeGroupInfo": cluster.broker_node_group_info or {}, "ClientAuthentication": cluster.client_authentication or {}, "ClusterArn": cluster.arn, "ClusterName": cluster.cluster_name, "CreationTime": cluster.creation_time, "CurrentBrokerSoftwareInfo": { - "ConfigurationArn": (cluster.configuration_info or {}).get("Arn", "string"), - "ConfigurationRevision": (cluster.configuration_info or {}).get("Revision", 1), + "ConfigurationArn": (cluster.configuration_info or {}).get( + "Arn", "string" + ), + "ConfigurationRevision": (cluster.configuration_info or {}).get( + "Revision", 1 + ), "KafkaVersion": cluster.kafka_version, }, "CurrentVersion": cluster.current_version, @@ -286,15 +315,18 @@ def describe_cluster(self, cluster_arn): "Message": "Cluster state details.", }, "Tags": self.list_tags_for_resource(cluster.arn), - "ZookeeperConnectString": cluster.zookeeper_connect_string or "zookeeper.example.com:2181", - "ZookeeperConnectStringTls": cluster.zookeeper_connect_string_tls or "zookeeper.example.com:2181", + "ZookeeperConnectString": cluster.zookeeper_connect_string + or "zookeeper.example.com:2181", + "ZookeeperConnectStringTls": cluster.zookeeper_connect_string_tls + or "zookeeper.example.com:2181", "StorageMode": cluster.storage_mode, "CustomerActionStatus": "NONE", } } - def list_clusters(self, cluster_name_filter, max_results, next_token) -> List[Dict[str, Any]]: - + def list_clusters( + self, cluster_name_filter, max_results, next_token + ) -> List[Dict[str, Any]]: cluster_info_list = [ { "ClusterArn": cluster.arn, diff --git a/moto/kafka/responses.py b/moto/kafka/responses.py index 0f7c96a12555..55aa586312f9 100644 --- a/moto/kafka/responses.py +++ b/moto/kafka/responses.py @@ -1,7 +1,9 @@ """Handles incoming kafka requests, invokes methods, returns responses.""" + import json from moto.core.responses import BaseResponse + from .models import KafkaBackend, kafka_backends @@ -17,21 +19,28 @@ def kafka_backend(self) -> KafkaBackend: return kafka_backends[self.current_account][self.region] def create_cluster_v2(self): - params = self._get_params() cluster_name = self._get_param("clusterName") tags = self._get_param("tags") provisioned = self._get_param("provisioned") serverless = self._get_param("serverless") - cluster_arn, cluster_name, state, cluster_type = self.kafka_backend.create_cluster_v2( - cluster_name=cluster_name, - tags=tags, - provisioned=provisioned, - serverless=serverless, + cluster_arn, cluster_name, state, cluster_type = ( + self.kafka_backend.create_cluster_v2( + cluster_name=cluster_name, + tags=tags, + provisioned=provisioned, + serverless=serverless, + ) + ) + return json.dumps( + dict( + clusterArn=cluster_arn, + clusterName=cluster_name, + state=state, + clusterType=cluster_type, + ) ) - return json.dumps(dict(clusterArn=cluster_arn, clusterName=cluster_name, state=state, 
clusterType=cluster_type)) def describe_cluster_v2(self): - params = self._get_params() cluster_arn = self._get_param("clusterArn") cluster_info = self.kafka_backend.describe_cluster_v2( cluster_arn=cluster_arn, @@ -39,7 +48,6 @@ def describe_cluster_v2(self): return json.dumps(dict(clusterInfo=cluster_info)) def list_clusters_v2(self): - params = self._get_params() cluster_name_filter = self._get_param("clusterNameFilter") cluster_type_filter = self._get_param("clusterTypeFilter") max_results = self._get_param("maxResults") @@ -53,7 +61,6 @@ def list_clusters_v2(self): return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token)) def list_tags_for_resource(self): - params = self._get_params() resource_arn = self._get_param("resourceArn") tags = self.kafka_backend.list_tags_for_resource( resource_arn=resource_arn, @@ -61,7 +68,6 @@ def list_tags_for_resource(self): return json.dumps(dict(tags=tags)) def tag_resource(self): - params = self._get_params() resource_arn = self._get_param("resourceArn") tags = self._get_param("tags") self.kafka_backend.tag_resource( @@ -71,7 +77,6 @@ def tag_resource(self): return json.dumps(dict()) def untag_resource(self): - params = self._get_params() resource_arn = self._get_param("resourceArn") tag_keys = self._get_param("tagKeys") self.kafka_backend.untag_resource( @@ -81,7 +86,6 @@ def untag_resource(self): return json.dumps(dict()) def create_cluster(self): - params = self._get_params() broker_node_group_info = self._get_param("brokerNodeGroupInfo") client_authentication = self._get_param("clientAuthentication") cluster_name = self._get_param("clusterName") @@ -108,10 +112,11 @@ def create_cluster(self): tags=tags, storage_mode=storage_mode, ) - return json.dumps(dict(clusterArn=cluster_arn, clusterName=cluster_name, state=state)) + return json.dumps( + dict(clusterArn=cluster_arn, clusterName=cluster_name, state=state) + ) def describe_cluster(self): - params = self._get_params() cluster_arn = self._get_param("clusterArn") cluster_info = self.kafka_backend.describe_cluster( cluster_arn=cluster_arn, @@ -119,7 +124,6 @@ def describe_cluster(self): return json.dumps(dict(clusterInfo=cluster_info)) def delete_cluster(self): - params = self._get_params() cluster_arn = self._get_param("clusterArn") current_version = self._get_param("currentVersion") cluster_arn, state = self.kafka_backend.delete_cluster( @@ -129,7 +133,6 @@ def delete_cluster(self): return json.dumps(dict(clusterArn=cluster_arn, state=state)) def list_clusters(self): - params = self._get_params() cluster_name_filter = self._get_param("clusterNameFilter") max_results = self._get_param("maxResults") next_token = self._get_param("nextToken") @@ -141,6 +144,6 @@ def list_clusters(self): ) # Correctly Printing Here - print(json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token))) + # print(json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token))) return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token)) diff --git a/moto/kafka/urls.py b/moto/kafka/urls.py index c8e0d76abab4..233c6616eab5 100644 --- a/moto/kafka/urls.py +++ b/moto/kafka/urls.py @@ -1,4 +1,5 @@ """kafka base URL and path.""" + from .responses import KafkaResponse url_bases = [ diff --git a/moto/resourcegroupstaggingapi/models.py b/moto/resourcegroupstaggingapi/models.py index eeb720676446..c6185a0e9f43 100644 --- a/moto/resourcegroupstaggingapi/models.py +++ b/moto/resourcegroupstaggingapi/models.py @@ -171,8 +171,7 @@ def _get_resources_generator( values = 
tag_filter_dict.get("Values", []) if len(values) == 0: # Check key matches - filters.append( - lambda t, v, key=tag_filter_dict["Key"]: t == key) + filters.append(lambda t, v, key=tag_filter_dict["Key"]: t == key) elif len(values) == 1: # Check it's exactly the same as key, value filters.append( @@ -243,8 +242,7 @@ def format_tag_keys( or "s3:bucket" in resource_type_filters ): for bucket in self.s3_backend.buckets.values(): - tags = self.s3_backend.tagger.list_tags_for_resource(bucket.arn)[ - "Tags"] + tags = self.s3_backend.tagger.list_tags_for_resource(bucket.arn)["Tags"] if not tags or not tag_filter( tags ): # Skip if no tags, or invalid filter @@ -332,8 +330,7 @@ def format_tag_keys( or resource_type in resource_type_filters ): for resource in resources: - tags = format_tags( - self.ec2_backend.tags.get(resource.id, {})) + tags = format_tags(self.ec2_backend.tags.get(resource.id, {})) if not tags or not tag_filter(tags): continue yield { @@ -348,8 +345,7 @@ def format_tag_keys( or "elasticfilesystem:access-point" in resource_type_filters ): for ap in self.efs_backend.access_points.values(): - tags = self.efs_backend.list_tags_for_resource( - ap.access_point_id) + tags = self.efs_backend.list_tags_for_resource(ap.access_point_id) if not tag_filter(tags): continue yield {"ResourceARN": f"{ap.access_point_arn}", "Tags": tags} @@ -361,8 +357,7 @@ def format_tag_keys( or "elasticfilesystem:file-system" in resource_type_filters ): for fs in self.efs_backend.file_systems_by_id.values(): - tags = self.efs_backend.list_tags_for_resource( - fs.file_system_id) + tags = self.efs_backend.list_tags_for_resource(fs.file_system_id) if not tag_filter(tags): continue yield {"ResourceARN": f"{fs.file_system_arn}", "Tags": tags} @@ -449,8 +444,7 @@ def format_tag_keys( if not resource_type_filters or "kms" in resource_type_filters: for kms_key in self.kms_backend.list_keys(): tags = format_tag_keys( - self.kms_backend.list_resource_tags( - kms_key.id).get("Tags", []), + self.kms_backend.list_resource_tags(kms_key.id).get("Tags", []), ["TagKey", "TagValue"], ) if not tag_filter(tags): # Skip if no tags, or invalid filter @@ -605,8 +599,7 @@ def format_tag_keys( not resource_type_filters or "kafka" in resource_type_filters ): for cluster in self.kafka_backend.clusters.values(): - tags = self.kafka_backend.list_tags_for_resource(cluster.arn)[ - "Tags"] + tags = self.kafka_backend.list_tags_for_resource(cluster.arn)["Tags"] if not tags or not tag_filter(tags): continue yield { @@ -1013,14 +1006,12 @@ def tag_resources( ): resource_id = arn.split("/")[-1] self.workspacesweb_backends.create_tags( # type: ignore[union-attr] - resource_id, TaggingService.convert_dict_to_tags_input( - tags) + resource_id, TaggingService.convert_dict_to_tags_input(tags) ) elif arn.startswith(f"arn:{get_partition(self.region_name)}:workspaces:"): resource_id = arn.split("/")[-1] self.workspaces_backend.create_tags( # type: ignore[union-attr] - resource_id, TaggingService.convert_dict_to_tags_input( - tags) + resource_id, TaggingService.convert_dict_to_tags_input(tags) ) elif arn.startswith(f"arn:{get_partition(self.region_name)}:logs:"): self.logs_backend.tag_resource(arn, tags) diff --git a/tests/test_kafka/test_kafka.py b/tests/test_kafka/test_kafka.py index 48fb1b623272..94d0af97f63c 100644 --- a/tests/test_kafka/test_kafka.py +++ b/tests/test_kafka/test_kafka.py @@ -1,4 +1,5 @@ """Unit tests for kafka-supported APIs.""" + import boto3 from moto import mock_aws @@ -7,10 +8,7 @@ # 
http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html -FAKE_TAGS = { - "TestKey": "TestValue", - "TestKey2": "TestValue2" -} +FAKE_TAGS = {"TestKey": "TestValue", "TestKey2": "TestValue2"} @mock_aws @@ -89,7 +87,7 @@ def test_list_tags_for_resource(): assert tags["Tags"] == FAKE_TAGS -@ mock_aws +@mock_aws def test_create_cluster(): client = boto3.client("kafka", region_name="eu-west-1") cluster_name = "TestCluster" @@ -118,15 +116,19 @@ def test_create_cluster(): assert resp["ClusterInfo"]["State"] == "CREATING" assert resp["ClusterInfo"]["KafkaVersion"] == "2.8.1" assert resp["ClusterInfo"]["NumberOfBrokerNodes"] == 3 - assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["InstanceType"] == "kafka.m5.large" + assert ( + resp["ClusterInfo"]["BrokerNodeGroupInfo"]["InstanceType"] == "kafka.m5.large" + ) assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["ClientSubnets"] == [ - "subnet-0123456789abcdef0"] + "subnet-0123456789abcdef0" + ] assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["SecurityGroups"] == [ - "sg-0123456789abcdef0"] + "sg-0123456789abcdef0" + ] assert resp["ClusterInfo"]["Tags"] == FAKE_TAGS -@ mock_aws +@mock_aws def test_delete_cluster(): client = boto3.client("kafka", region_name="us-east-2") create_resp = client.create_cluster( From e29a08d6d66178d20d9981640e915698bba90247 Mon Sep 17 00:00:00 2001 From: jamarcelin Date: Sun, 8 Dec 2024 22:00:13 -0500 Subject: [PATCH 03/14] fix: fixed tests --- moto/kafka/models.py | 182 ++++++++++++++++----------------- moto/kafka/responses.py | 15 +-- tests/test_kafka/test_kafka.py | 5 +- 3 files changed, 97 insertions(+), 105 deletions(-) diff --git a/moto/kafka/models.py b/moto/kafka/models.py index 5cd36741d3ea..7665ef42e1fd 100644 --- a/moto/kafka/models.py +++ b/moto/kafka/models.py @@ -123,7 +123,8 @@ def create_cluster_v2(self, cluster_name, tags, provisioned, serverless): if provisioned: cluster_type = "PROVISIONED" broker_node_group_info = provisioned.get("BrokerNodeGroupInfo") - kafka_version = provisioned.get("KafkaVersion", "default-kafka-version") + kafka_version = provisioned.get( + "kafkaVersion", "default-kafka-version") number_of_broker_nodes = provisioned.get("NumberOfBrokerNodes", 1) storage_mode = provisioned.get("StorageMode", "LOCAL") serverless_config = None @@ -153,8 +154,7 @@ def create_cluster_v2(self, cluster_name, tags, provisioned, serverless): self.clusters[new_cluster.arn] = new_cluster if tags: - tags_list = [{"Key": k, "Value": v} for k, v in tags.items()] - self.tagger.tag_resource(new_cluster.arn, tags_list) + self.tag_resource(new_cluster.arn, tags) return ( new_cluster.arn, @@ -164,61 +164,58 @@ def create_cluster_v2(self, cluster_name, tags, provisioned, serverless): ) def describe_cluster_v2(self, cluster_arn): - cluster = self.clusters.get(cluster_arn) + cluster = self.clusters[cluster_arn] cluster_info = { - "ClusterInfo": { - "ActiveOperationArn": cluster.active_operation_arn - or "arn:aws:kafka:region:account-id:operation/active-operation", - "ClusterArn": cluster.arn, - "ClusterName": cluster.cluster_name, - "ClusterType": cluster.cluster_type, - "CreationTime": cluster.creation_time, - "CurrentVersion": cluster.current_version, - "State": cluster.state, - "StateInfo": { - "Code": "string", - "Message": "Cluster state details.", - }, - "Tags": self.list_tags_for_resource(cluster.arn), - } + "activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation", + "clusterArn": cluster.arn, + "clusterName": cluster.cluster_name, + "clusterType": 
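The key-casing rewrite in the next patch hinges on a wire-format detail: the MSK REST API serializes response JSON with camelCase keys (clusterArn, state, ...), and it is botocore that renames them to the PascalCase keys the tests assert on (ClusterArn, State, ...), so the mock has to emit camelCase for responses to survive parsing. A toy version of the renaming, just to show the shape (not the real botocore machinery):

    import json

    wire = json.loads('{"clusterArn": "arn:aws:kafka:...", "state": "CREATING"}')
    parsed = {key[0].upper() + key[1:]: value for key, value in wire.items()}
    assert parsed["State"] == "CREATING"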
From e29a08d6d66178d20d9981640e915698bba90247 Mon Sep 17 00:00:00 2001
From: jamarcelin
Date: Sun, 8 Dec 2024 22:00:13 -0500
Subject: [PATCH 03/14] fix: fixed tests

---
 moto/kafka/models.py           | 182 ++++++++++++++++-----------------
 moto/kafka/responses.py        |  15 +--
 tests/test_kafka/test_kafka.py |   5 +-
 3 files changed, 97 insertions(+), 105 deletions(-)

diff --git a/moto/kafka/models.py b/moto/kafka/models.py
index 5cd36741d3ea..7665ef42e1fd 100644
--- a/moto/kafka/models.py
+++ b/moto/kafka/models.py
@@ -123,7 +123,8 @@ def create_cluster_v2(self, cluster_name, tags, provisioned, serverless):
         if provisioned:
             cluster_type = "PROVISIONED"
             broker_node_group_info = provisioned.get("BrokerNodeGroupInfo")
-            kafka_version = provisioned.get("KafkaVersion", "default-kafka-version")
+            kafka_version = provisioned.get(
+                "kafkaVersion", "default-kafka-version")
             number_of_broker_nodes = provisioned.get("NumberOfBrokerNodes", 1)
             storage_mode = provisioned.get("StorageMode", "LOCAL")
             serverless_config = None
@@ -153,8 +154,7 @@ def create_cluster_v2(self, cluster_name, tags, provisioned, serverless):
         self.clusters[new_cluster.arn] = new_cluster
 
         if tags:
-            tags_list = [{"Key": k, "Value": v} for k, v in tags.items()]
-            self.tagger.tag_resource(new_cluster.arn, tags_list)
+            self.tag_resource(new_cluster.arn, tags)
 
         return (
             new_cluster.arn,
@@ -164,61 +164,58 @@ def create_cluster_v2(self, cluster_name, tags, provisioned, serverless):
         )
 
     def describe_cluster_v2(self, cluster_arn):
-        cluster = self.clusters.get(cluster_arn)
+        cluster = self.clusters[cluster_arn]
 
         cluster_info = {
-            "ClusterInfo": {
-                "ActiveOperationArn": cluster.active_operation_arn
-                or "arn:aws:kafka:region:account-id:operation/active-operation",
-                "ClusterArn": cluster.arn,
-                "ClusterName": cluster.cluster_name,
-                "ClusterType": cluster.cluster_type,
-                "CreationTime": cluster.creation_time,
-                "CurrentVersion": cluster.current_version,
-                "State": cluster.state,
-                "StateInfo": {
-                    "Code": "string",
-                    "Message": "Cluster state details.",
-                },
-                "Tags": self.list_tags_for_resource(cluster.arn),
-            }
+            "activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation",
+            "clusterArn": cluster.arn,
+            "clusterName": cluster.cluster_name,
+            "clusterType": cluster.cluster_type,
+            "creationTime": cluster.creation_time,
+            "currentVersion": cluster.current_version,
+            "state": cluster.state,
+            "stateInfo": {
+                "code": "string",
+                "message": "Cluster state details.",
+            },
+            "tags": self.list_tags_for_resource(cluster.arn),
         }
 
         if cluster.cluster_type == "PROVISIONED":
-            cluster_info["ClusterInfo"].update(
+            cluster_info.update(
                 {
-                    "BrokerNodeGroupInfo": cluster.broker_node_group_info or {},
-                    "ClientAuthentication": cluster.client_authentication or {},
-                    "CurrentBrokerSoftwareInfo": {
-                        "ConfigurationArn": (cluster.configuration_info or {}).get(
-                            "Arn", "string"
+                    "brokerNodeGroupInfo": cluster.broker_node_group_info or {},
+                    "clientAuthentication": cluster.client_authentication or {},
+                    "currentBrokerSoftwareInfo": {
+                        "configurationArn": (cluster.configuration_info or {}).get(
+                            "arn", "string"
                         ),
-                        "ConfigurationRevision": (cluster.configuration_info or {}).get(
+                        "configurationRevision": (cluster.configuration_info or {}).get(
                             "Revision", 1
                         ),
-                        "KafkaVersion": cluster.kafka_version,
+                        "kafkaVersion": cluster.kafka_version,
                     },
-                    "EncryptionInfo": cluster.encryption_info or {},
-                    "EnhancedMonitoring": cluster.enhanced_monitoring,
-                    "OpenMonitoring": cluster.open_monitoring or {},
-                    "LoggingInfo": cluster.logging_info or {},
-                    "NumberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
-                    "ZookeeperConnectString": cluster.zookeeper_connect_string
+                    "encryptionInfo": cluster.encryption_info or {},
+                    "enhancedMonitoring": cluster.enhanced_monitoring,
+                    "openMonitoring": cluster.open_monitoring or {},
+                    "loggingInfo": cluster.logging_info or {},
+                    "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
+                    "zookeeperConnectString": cluster.zookeeper_connect_string
                     or "zookeeper.example.com:2181",
-                    "ZookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
+                    "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
                     or "zookeeper.example.com:2181",
-                    "StorageMode": cluster.storage_mode,
-                    "CustomerActionStatus": "NONE",
+                    "storageMode": cluster.storage_mode,
+                    "customerActionStatus": "NONE",
                 }
             )
 
         elif cluster.cluster_type == "SERVERLESS":
-            cluster_info["ClusterInfo"].update(
+            cluster_info.update(
                 {
-                    "Serverless": {
-                        "VpcConfigs": cluster.serverless_config.get("VpcConfigs", []),
-                        "ClientAuthentication": cluster.serverless_config.get(
-                            "ClientAuthentication", {}
+                    "serverless": {
+                        "vpcConfigs": cluster.serverless_config.get("vpcConfigs", []),
+                        "clientAuthentication": cluster.serverless_config.get(
+                            "clientAuthentication", {}
                         ),
                     }
                 }
@@ -231,11 +228,11 @@ def list_clusters_v2(
     ):
         cluster_info_list = [
             {
-                "ClusterArn": cluster.arn,
-                "ClusterName": cluster.cluster_name,
-                "ClusterType": cluster.cluster_type,
-                "State": cluster.state,
-                "CreationTime": cluster.creation_time,
+                "clusterArn": cluster.arn,
+                "clusterName": cluster.cluster_name,
+                "clusterType": cluster.cluster_type,
+                "state": cluster.state,
+                "creationTime": cluster.creation_time,
             }
             for cluster in self.clusters.values()
         ]
@@ -277,51 +274,47 @@ def create_cluster(
         self.clusters[new_cluster.arn] = new_cluster
 
         if tags:
-            tags_list = [{"Key": k, "Value": v} for k, v in tags.items()]
-            self.tagger.tag_resource(new_cluster.arn, tags_list)
+            self.tag_resource(new_cluster.arn, tags)
 
         return new_cluster.arn, new_cluster.cluster_name, new_cluster.state
 
     def describe_cluster(self, cluster_arn):
-        cluster = self.clusters.get(cluster_arn)
+        cluster = self.clusters[cluster_arn]
 
         return {
-            "ClusterInfo": {
-                "ActiveOperationArn": cluster.active_operation_arn
-                or "arn:aws:kafka:region:account-id:operation/active-operation",
-                "BrokerNodeGroupInfo": cluster.broker_node_group_info or {},
-                "ClientAuthentication": cluster.client_authentication or {},
-                "ClusterArn": cluster.arn,
-                "ClusterName": cluster.cluster_name,
-                "CreationTime": cluster.creation_time,
-                "CurrentBrokerSoftwareInfo": {
-                    "ConfigurationArn": (cluster.configuration_info or {}).get(
-                        "Arn", "string"
-                    ),
-                    "ConfigurationRevision": (cluster.configuration_info or {}).get(
-                        "Revision", 1
-                    ),
-                    "KafkaVersion": cluster.kafka_version,
-                },
-                "CurrentVersion": cluster.current_version,
-                "EncryptionInfo": cluster.encryption_info or {},
-                "EnhancedMonitoring": cluster.enhanced_monitoring,
-                "OpenMonitoring": cluster.open_monitoring or {},
-                "LoggingInfo": cluster.logging_info or {},
-                "NumberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
-                "State": cluster.state,
-                "StateInfo": {
-                    "Code": "string",
-                    "Message": "Cluster state details.",
-                },
-                "Tags": self.list_tags_for_resource(cluster.arn),
-                "ZookeeperConnectString": cluster.zookeeper_connect_string
-                or "zookeeper.example.com:2181",
-                "ZookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
-                or "zookeeper.example.com:2181",
-                "StorageMode": cluster.storage_mode,
-                "CustomerActionStatus": "NONE",
-            }
+            "activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation",
+            "brokerNodeGroupInfo": cluster.broker_node_group_info or {},
+            "clientAuthentication": cluster.client_authentication or {},
+            "clusterArn": cluster.arn,
+            "clusterName": cluster.cluster_name,
+            "creationTime": cluster.creation_time,
+            "currentBrokerSoftwareInfo": {
+                "configurationArn": (cluster.configuration_info or {}).get(
+                    "arn", "string"
+                ),
+                "configurationRevision": (cluster.configuration_info or {}).get(
+                    "Revision", 1
+                ),
+                "kafkaVersion": cluster.kafka_version,
+            },
+            "currentVersion": cluster.current_version,
+            "encryptionInfo": cluster.encryption_info or {},
+            "enhancedMonitoring": cluster.enhanced_monitoring,
+            "openMonitoring": cluster.open_monitoring or {},
+            "loggingInfo": cluster.logging_info or {},
+            "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
+            "state": cluster.state,
+            "stateInfo": {
+                "code": "string",
+                "message": "Cluster state details.",
+            },
+            "tags": self.list_tags_for_resource(cluster.arn),
+            "zookeeperConnectString": cluster.zookeeper_connect_string
+            or "zookeeper.example.com:2181",
+            "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
+            or "zookeeper.example.com:2181",
+            "storageMode": cluster.storage_mode,
+            "customerActionStatus": "NONE",
         }
 
     def list_clusters(
@@ -329,11 +322,11 @@ def list_clusters(
     ) -> List[Dict[str, Any]]:
         cluster_info_list = [
             {
-                "ClusterArn": cluster.arn,
-                "ClusterName": cluster.cluster_name,
-                "State": cluster.state,
-                "CreationTime": cluster.creation_time,
-                "ClusterType": cluster.cluster_type,
+                "clusterArn": cluster.arn,
+                "clusterName": cluster.cluster_name,
+                "state": cluster.state,
+                "creationTime": cluster.creation_time,
+                "clusterType": cluster.cluster_type,
             }
             for cluster_arn, cluster in self.clusters.items()
         ]
@@ -345,14 +338,11 @@ def delete_cluster(self, cluster_arn, current_version):
         return cluster_arn, cluster.state
 
     def list_tags_for_resource(self, resource_arn) -> List[Dict[str, str]]:
-        tags = self.tagger.get_tag_dict_for_resource(resource_arn)
-        Tags = []
-        for key, value in tags.items():
-            Tags.append({"Key": key, "Value": value})
-        return Tags
+        return self.tagger.get_tag_dict_for_resource(resource_arn)
 
     def tag_resource(self, resource_arn, tags):
-        self.tagger.tag_resource(resource_arn, tags)
+        tags_list = [{"Key": k, "Value": v} for k, v in tags.items()]
+        self.tagger.tag_resource(resource_arn, tags_list)
 
     def untag_resource(self, resource_arn, tag_keys):
         self.tagger.untag_resource_using_names(resource_arn, tag_keys)
diff --git a/moto/kafka/responses.py b/moto/kafka/responses.py
index 55aa586312f9..5014404ddb66 100644
--- a/moto/kafka/responses.py
+++ b/moto/kafka/responses.py
@@ -1,6 +1,7 @@
 """Handles incoming kafka requests, invokes methods, returns responses."""
 
 import json
+from urllib.parse import unquote
 
 from moto.core.responses import BaseResponse
 
@@ -41,7 +42,7 @@ def create_cluster_v2(self):
         )
 
     def describe_cluster_v2(self):
-        cluster_arn = self._get_param("clusterArn")
+        cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1])
         cluster_info = self.kafka_backend.describe_cluster_v2(
             cluster_arn=cluster_arn,
         )
@@ -61,14 +62,14 @@ def list_clusters_v2(self):
         return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token))
 
     def list_tags_for_resource(self):
-        resource_arn = self._get_param("resourceArn")
+        resource_arn = unquote(self.parsed_url.path.split("/tags/")[-1])
         tags = self.kafka_backend.list_tags_for_resource(
             resource_arn=resource_arn,
         )
         return json.dumps(dict(tags=tags))
 
     def tag_resource(self):
-        resource_arn = self._get_param("resourceArn")
+        resource_arn = unquote(self._get_param("resourceArn"))
         tags = self._get_param("tags")
         self.kafka_backend.tag_resource(
             resource_arn=resource_arn,
@@ -77,8 +78,8 @@ def tag_resource(self):
         return json.dumps(dict())
 
     def untag_resource(self):
-        resource_arn = self._get_param("resourceArn")
-        tag_keys = self._get_param("tagKeys")
+        resource_arn = unquote(self._get_param("resourceArn"))
+        tag_keys = self.__dict__["data"]["tagKeys"]
         self.kafka_backend.untag_resource(
             resource_arn=resource_arn,
             tag_keys=tag_keys,
@@ -117,14 +118,14 @@ def create_cluster(self):
         )
 
     def describe_cluster(self):
-        cluster_arn = self._get_param("clusterArn")
+        cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1])
         cluster_info = self.kafka_backend.describe_cluster(
             cluster_arn=cluster_arn,
         )
         return json.dumps(dict(clusterInfo=cluster_info))
 
     def delete_cluster(self):
-        cluster_arn = self._get_param("clusterArn")
+        cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1])
         current_version = self._get_param("currentVersion")
         cluster_arn, state = self.kafka_backend.delete_cluster(
             cluster_arn=cluster_arn,
diff --git a/tests/test_kafka/test_kafka.py b/tests/test_kafka/test_kafka.py
index 94d0af97f63c..f689e89934b1 100644
--- a/tests/test_kafka/test_kafka.py
+++ b/tests/test_kafka/test_kafka.py
@@ -80,10 +80,11 @@ def test_list_tags_for_resource():
 
     client.untag_resource(
         ResourceArn=create_resp["ClusterArn"],
-        TagKeys=list(TempTags.keys()),
+        TagKeys=["TestKey3"],
     )
 
     tags = client.list_tags_for_resource(ResourceArn=create_resp["ClusterArn"])
+
     assert tags["Tags"] == FAKE_TAGS
 
 
@@ -114,7 +115,7 @@ def test_create_cluster():
     resp = client.describe_cluster(ClusterArn=response["ClusterArn"])
     assert resp["ClusterInfo"]["ClusterName"] == cluster_name
     assert resp["ClusterInfo"]["State"] == "CREATING"
-    assert resp["ClusterInfo"]["KafkaVersion"] == "2.8.1"
+    assert resp["ClusterInfo"]["CurrentBrokerSoftwareInfo"]["KafkaVersion"] == "2.8.1"
     assert resp["ClusterInfo"]["NumberOfBrokerNodes"] == 3
     assert (
         resp["ClusterInfo"]["BrokerNodeGroupInfo"]["InstanceType"] == "kafka.m5.large"
From: jamarcelin Date: Sun, 8 Dec 2024 22:55:04 -0500 Subject: [PATCH 04/14] linting --- moto/kafka/models.py | 86 +++++++++++++++++++++++------------------ moto/kafka/responses.py | 25 ++++++------ 2 files changed, 59 insertions(+), 52 deletions(-) diff --git a/moto/kafka/models.py b/moto/kafka/models.py index 7665ef42e1fd..334d175c9302 100644 --- a/moto/kafka/models.py +++ b/moto/kafka/models.py @@ -2,7 +2,7 @@ import uuid from datetime import datetime -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, Tuple from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel @@ -18,23 +18,23 @@ def __init__( account_id: str, region_name: str, cluster_type: str, - tags: dict = None, - broker_node_group_info: dict = None, - kafka_version: str = None, - number_of_broker_nodes: int = None, - configuration_info=None, - serverless_config: dict = None, - encryption_info: dict = None, + tags: Optional[Dict[str, str]] = None, + broker_node_group_info: Optional[Dict[str, Any]] = None, + kafka_version: Optional[str] = None, + number_of_broker_nodes: Optional[int] = None, + configuration_info: Optional[Dict[str, Any]] = None, + serverless_config: Optional[Dict[str, Any]] = None, + encryption_info: Optional[Dict[str, Any]] = None, enhanced_monitoring: str = "DEFAULT", - open_monitoring: dict = None, - logging_info: dict = None, + open_monitoring: Optional[Dict[str, Any]] = None, + logging_info: Optional[Dict[str, Any]] = None, storage_mode: str = "LOCAL", current_version: str = "1.0", - client_authentication: dict = None, + client_authentication: Optional[Dict[str, Any]] = None, state: str = "CREATING", - active_operation_arn: str = None, - zookeeper_connect_string: str = None, - zookeeper_connect_string_tls: str = None, + active_operation_arn: Optional[str] = None, + zookeeper_connect_string: Optional[str] = None, + zookeeper_connect_string_tls: Optional[str] = None, ): # General attributes self.cluster_id = str(uuid.uuid4()) @@ -73,7 +73,7 @@ def _generate_arn(self) -> str: partition = get_partition(self.region_name) return f"arn:{partition}:kafka:{self.region_name}:{self.account_id}:{resource_type}/{self.cluster_id}" - def to_dict(self) -> dict: + def to_dict(self) -> Dict[str, Any]: cluster_info = { "ClusterName": self.cluster_name, "ClusterArn": self.arn, @@ -102,10 +102,10 @@ def to_dict(self) -> dict: elif self.cluster_type == "SERVERLESS": cluster_info["Serverless"] = { - "VpcConfigs": self.serverless_config.get("VpcConfigs", []), + "VpcConfigs": self.serverless_config.get("VpcConfigs", []) if self.serverless_config else [], "ClientAuthentication": self.serverless_config.get( "ClientAuthentication", {} - ), + ) if self.serverless_config else {}, } return cluster_info @@ -114,12 +114,18 @@ def to_dict(self) -> dict: class KafkaBackend(BaseBackend): """Implementation of Kafka APIs.""" - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.clusters: Dict[str, FakeKafkaCluster] = {} self.tagger = TaggingService() - def create_cluster_v2(self, cluster_name, tags, provisioned, serverless): + def create_cluster_v2( + self, + cluster_name: str, + tags: Optional[Dict[str, str]], + provisioned: Optional[Dict[str, Optional[str | Any]]], + serverless: Optional[Dict[str, Optional[str | Any]]], + ): if provisioned: cluster_type = "PROVISIONED" broker_node_group_info = provisioned.get("BrokerNodeGroupInfo") @@ -163,7 +169,7 @@ def 
create_cluster_v2(self, cluster_name, tags, provisioned, serverless): new_cluster.cluster_type, ) - def describe_cluster_v2(self, cluster_arn): + def describe_cluster_v2(self, cluster_arn: str): cluster = self.clusters[cluster_arn] cluster_info = { @@ -224,7 +230,11 @@ def describe_cluster_v2(self, cluster_arn): return cluster_info def list_clusters_v2( - self, cluster_name_filter, cluster_type_filter, max_results, next_token + self, + cluster_name_filter: Optional[str], + cluster_type_filter: Optional[str], + max_results: Optional[int], + next_token: Optional[str], ): cluster_info_list = [ { @@ -241,18 +251,18 @@ def list_clusters_v2( def create_cluster( self, - broker_node_group_info, - client_authentication, - cluster_name, - configuration_info=None, - encryption_info=None, - enhanced_monitoring="DEFAULT", - open_monitoring=None, - kafka_version="2.8.1", - logging_info=None, - number_of_broker_nodes=1, - tags=None, - storage_mode="LOCAL", + broker_node_group_info: Dict[str, Any], + client_authentication: Optional[Dict[str, Any]], + cluster_name: str, + configuration_info: Optional[Dict[str, Any]] = None, + encryption_info: Optional[Dict[str, Any]] = None, + enhanced_monitoring: str = "DEFAULT", + open_monitoring: Optional[Dict[str, Any]] = None, + kafka_version: str = "2.8.1", + logging_info: Optional[Dict[str, Any]] = None, + number_of_broker_nodes: int = 1, + tags: Optional[Dict[str, str]] = None, + storage_mode: str = "LOCAL", ): new_cluster = FakeKafkaCluster( cluster_name=cluster_name, @@ -278,7 +288,7 @@ def create_cluster( return new_cluster.arn, new_cluster.cluster_name, new_cluster.state - def describe_cluster(self, cluster_arn): + def describe_cluster(self, cluster_arn: str): cluster = self.clusters[cluster_arn] return { @@ -333,18 +343,18 @@ def list_clusters( return cluster_info_list, None - def delete_cluster(self, cluster_arn, current_version): + def delete_cluster(self, cluster_arn: str, current_version: str): cluster = self.clusters.pop(cluster_arn) return cluster_arn, cluster.state - def list_tags_for_resource(self, resource_arn) -> List[Dict[str, str]]: + def list_tags_for_resource(self, resource_arn: str) -> List[Dict[str, str]]: return self.tagger.get_tag_dict_for_resource(resource_arn) - def tag_resource(self, resource_arn, tags): + def tag_resource(self, resource_arn: str, tags: Dict[str, str]): tags_list = [{"Key": k, "Value": v} for k, v in tags.items()] self.tagger.tag_resource(resource_arn, tags_list) - def untag_resource(self, resource_arn, tag_keys): + def untag_resource(self, resource_arn: str, tag_keys: List[str]): self.tagger.untag_resource_using_names(resource_arn, tag_keys) diff --git a/moto/kafka/responses.py b/moto/kafka/responses.py index 5014404ddb66..2b1b133132eb 100644 --- a/moto/kafka/responses.py +++ b/moto/kafka/responses.py @@ -11,7 +11,7 @@ class KafkaResponse(BaseResponse): """Handler for Kafka requests and responses.""" - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="kafka") @property @@ -19,7 +19,7 @@ def kafka_backend(self) -> KafkaBackend: """Return backend instance specific for this region.""" return kafka_backends[self.current_account][self.region] - def create_cluster_v2(self): + def create_cluster_v2(self) -> str: cluster_name = self._get_param("clusterName") tags = self._get_param("tags") provisioned = self._get_param("provisioned") @@ -41,14 +41,14 @@ def create_cluster_v2(self): ) ) - def describe_cluster_v2(self): + def describe_cluster_v2(self) -> str: cluster_arn = 
unquote(self.parsed_url.path.split("/clusters/")[-1]) cluster_info = self.kafka_backend.describe_cluster_v2( cluster_arn=cluster_arn, ) return json.dumps(dict(clusterInfo=cluster_info)) - def list_clusters_v2(self): + def list_clusters_v2(self) -> str: cluster_name_filter = self._get_param("clusterNameFilter") cluster_type_filter = self._get_param("clusterTypeFilter") max_results = self._get_param("maxResults") @@ -61,14 +61,14 @@ def list_clusters_v2(self): ) return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token)) - def list_tags_for_resource(self): + def list_tags_for_resource(self) -> str: resource_arn = unquote(self.parsed_url.path.split("/tags/")[-1]) tags = self.kafka_backend.list_tags_for_resource( resource_arn=resource_arn, ) return json.dumps(dict(tags=tags)) - def tag_resource(self): + def tag_resource(self) -> str: resource_arn = unquote(self._get_param("resourceArn")) tags = self._get_param("tags") self.kafka_backend.tag_resource( @@ -77,7 +77,7 @@ def tag_resource(self): ) return json.dumps(dict()) - def untag_resource(self): + def untag_resource(self) -> str: resource_arn = unquote(self._get_param("resourceArn")) tag_keys = self.__dict__["data"]["tagKeys"] self.kafka_backend.untag_resource( @@ -86,7 +86,7 @@ def untag_resource(self): ) return json.dumps(dict()) - def create_cluster(self): + def create_cluster(self) -> str: broker_node_group_info = self._get_param("brokerNodeGroupInfo") client_authentication = self._get_param("clientAuthentication") cluster_name = self._get_param("clusterName") @@ -117,14 +117,14 @@ def create_cluster(self): dict(clusterArn=cluster_arn, clusterName=cluster_name, state=state) ) - def describe_cluster(self): + def describe_cluster(self) -> str: cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1]) cluster_info = self.kafka_backend.describe_cluster( cluster_arn=cluster_arn, ) return json.dumps(dict(clusterInfo=cluster_info)) - def delete_cluster(self): + def delete_cluster(self) -> str: cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1]) current_version = self._get_param("currentVersion") cluster_arn, state = self.kafka_backend.delete_cluster( @@ -133,7 +133,7 @@ def delete_cluster(self): ) return json.dumps(dict(clusterArn=cluster_arn, state=state)) - def list_clusters(self): + def list_clusters(self) -> str: cluster_name_filter = self._get_param("clusterNameFilter") max_results = self._get_param("maxResults") next_token = self._get_param("nextToken") @@ -144,7 +144,4 @@ def list_clusters(self): next_token=next_token, ) - # Correctly Printing Here - # print(json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token))) - return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token)) From 693fa3732a9ebbefcc4494835c8c1570d4b19256 Mon Sep 17 00:00:00 2001 From: jamarcelin Date: Sun, 8 Dec 2024 22:59:23 -0500 Subject: [PATCH 05/14] linting --- moto/resourcegroupstaggingapi/models.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/moto/resourcegroupstaggingapi/models.py b/moto/resourcegroupstaggingapi/models.py index 59e6d3a10228..dfd2d992a270 100644 --- a/moto/resourcegroupstaggingapi/models.py +++ b/moto/resourcegroupstaggingapi/models.py @@ -181,14 +181,12 @@ def _get_resources_generator( elif len(values) == 1: # Check it's exactly the same as key, value filters.append( - # type: ignore - lambda t, v, key=tag_filter_dict["Key"], value=values[0]: t == key + lambda t, v, key=tag_filter_dict["Key"], value=values[0]: t == key # type: ignore and v 
== value ) else: # Check key matches and value is one of the provided values filters.append( - # type: ignore lambda t, v, key=tag_filter_dict["Key"], vl=values: t == key and v in vl ) From 6cbf69465c3dc44dbf8233f35963db1d5cf3d654 Mon Sep 17 00:00:00 2001 From: jamarcelin Date: Sun, 8 Dec 2024 22:59:42 -0500 Subject: [PATCH 06/14] linting --- moto/resourcegroupstaggingapi/models.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/moto/resourcegroupstaggingapi/models.py b/moto/resourcegroupstaggingapi/models.py index dfd2d992a270..d63ed619ee83 100644 --- a/moto/resourcegroupstaggingapi/models.py +++ b/moto/resourcegroupstaggingapi/models.py @@ -187,7 +187,7 @@ def _get_resources_generator( else: # Check key matches and value is one of the provided values filters.append( - lambda t, v, key=tag_filter_dict["Key"], vl=values: t == key + lambda t, v, key=tag_filter_dict["Key"], vl=values: t == key # type: ignore and v in vl ) @@ -279,8 +279,7 @@ def format_tag_keys( if not resource_type_filters or "ecs:cluster" in resource_type_filters: for cluster in self.ecs_backend.clusters.values(): - # type: ignore[arg-type] - tags = format_tag_keys(cluster.tags, ["key", "value"]) + tags = format_tag_keys(cluster.tags, ["key", "value"]) # type: ignore[arg-type] if not tag_filter(tags): continue yield {"ResourceARN": f"{cluster.arn}", "Tags": tags} @@ -743,8 +742,7 @@ def get_ec2_keys(res_id: str) -> List[Dict[str, str]]: # EC2 Instance, resource type ec2:instance for reservation in self.ec2_backend.reservations.values(): for instance in reservation.instances: - # type: ignore[assignment] - for key in get_ec2_keys(instance.id): + for key in get_ec2_keys(instance.id): # type: ignore[assignment] yield key # EC2 NetworkInterface, resource type ec2:network-interface @@ -804,8 +802,7 @@ def get_ec2_values(res_id: str) -> List[Dict[str, str]]: # EC2 Instance, resource type ec2:instance for reservation in self.ec2_backend.reservations.values(): for instance in reservation.instances: - # type: ignore[assignment] - for value in get_ec2_values(instance.id): + for value in get_ec2_values(instance.id): # type: ignore[assignment] yield value # EC2 NetworkInterface, resource type ec2:network-interface @@ -823,8 +820,7 @@ def get_ec2_values(res_id: str) -> List[Dict[str, str]]: # EC2 Snapshot, resource type ec2:snapshot for snapshot in self.ec2_backend.snapshots.values(): - # type: ignore[assignment] - for value in get_ec2_values(snapshot.id): + for value in get_ec2_values(snapshot.id): # type: ignore[assignment] yield value # TODO EC2 SpotInstanceRequest From a8a6a1687b2d262cd4ccda396b4cca8421aba666 Mon Sep 17 00:00:00 2001 From: jamarcelin Date: Sun, 8 Dec 2024 23:50:08 -0500 Subject: [PATCH 07/14] linting --- moto/kafka/models.py | 29 ++++++++++++++----------- moto/resourcegroupstaggingapi/models.py | 9 +++++--- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/moto/kafka/models.py b/moto/kafka/models.py index 334d175c9302..e00951ed3e8c 100644 --- a/moto/kafka/models.py +++ b/moto/kafka/models.py @@ -125,7 +125,7 @@ def create_cluster_v2( tags: Optional[Dict[str, str]], provisioned: Optional[Dict[str, Optional[str | Any]]], serverless: Optional[Dict[str, Optional[str | Any]]], - ): + ) -> Tuple[str, str, str, str]: if provisioned: cluster_type = "PROVISIONED" broker_node_group_info = provisioned.get("BrokerNodeGroupInfo") @@ -153,7 +153,7 @@ def create_cluster_v2( serverless_config=serverless_config, tags=tags, state="CREATING", - storage_mode=storage_mode, + 
storage_mode=storage_mode if storage_mode else "LOCAL", current_version="1.0", ) @@ -169,7 +169,7 @@ def create_cluster_v2( new_cluster.cluster_type, ) - def describe_cluster_v2(self, cluster_arn: str): + def describe_cluster_v2(self, cluster_arn: str) -> Dict[str, Any]: cluster = self.clusters[cluster_arn] cluster_info = { @@ -219,10 +219,10 @@ def describe_cluster_v2(self, cluster_arn: str): cluster_info.update( { "serverless": { - "vpcConfigs": cluster.serverless_config.get("vpcConfigs", []), + "vpcConfigs": cluster.serverless_config.get("vpcConfigs", []) if cluster.serverless_config else [], "clientAuthentication": cluster.serverless_config.get( "clientAuthentication", {} - ), + ) if cluster.serverless_config else {}, } } ) @@ -235,7 +235,7 @@ def list_clusters_v2( cluster_type_filter: Optional[str], max_results: Optional[int], next_token: Optional[str], - ): + ) -> Tuple[List[Dict[str, Any]], Optional[str]]: cluster_info_list = [ { "clusterArn": cluster.arn, @@ -263,7 +263,7 @@ def create_cluster( number_of_broker_nodes: int = 1, tags: Optional[Dict[str, str]] = None, storage_mode: str = "LOCAL", - ): + ) -> Tuple[str, str, str]: new_cluster = FakeKafkaCluster( cluster_name=cluster_name, account_id=self.account_id, @@ -288,7 +288,7 @@ def create_cluster( return new_cluster.arn, new_cluster.cluster_name, new_cluster.state - def describe_cluster(self, cluster_arn: str): + def describe_cluster(self, cluster_arn: str) -> Dict[str, Any]: cluster = self.clusters[cluster_arn] return { @@ -328,7 +328,10 @@ def describe_cluster(self, cluster_arn: str): } def list_clusters( - self, cluster_name_filter, max_results, next_token + self, + cluster_name_filter: Optional[str], + max_results: Optional[int], + next_token: Optional[str], ) -> List[Dict[str, Any]]: cluster_info_list = [ { @@ -343,18 +346,18 @@ def list_clusters( return cluster_info_list, None - def delete_cluster(self, cluster_arn: str, current_version: str): + def delete_cluster(self, cluster_arn: str, current_version: str) -> Tuple[str, str]: cluster = self.clusters.pop(cluster_arn) return cluster_arn, cluster.state - def list_tags_for_resource(self, resource_arn: str) -> List[Dict[str, str]]: + def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]: return self.tagger.get_tag_dict_for_resource(resource_arn) - def tag_resource(self, resource_arn: str, tags: Dict[str, str]): + def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None: tags_list = [{"Key": k, "Value": v} for k, v in tags.items()] self.tagger.tag_resource(resource_arn, tags_list) - def untag_resource(self, resource_arn: str, tag_keys: List[str]): + def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: self.tagger.untag_resource_using_names(resource_arn, tag_keys) diff --git a/moto/resourcegroupstaggingapi/models.py b/moto/resourcegroupstaggingapi/models.py index d63ed619ee83..f056af7d16be 100644 --- a/moto/resourcegroupstaggingapi/models.py +++ b/moto/resourcegroupstaggingapi/models.py @@ -619,12 +619,15 @@ def format_tag_keys( if self.kafka_backend and ( not resource_type_filters or "kafka" in resource_type_filters ): - for cluster in self.kafka_backend.clusters.values(): - tags = self.kafka_backend.list_tags_for_resource(cluster.arn)["Tags"] + for msk_cluster in self.kafka_backend.clusters.values(): + tag_dict = self.kafka_backend.list_tags_for_resource(msk_cluster.arn) + tags = [{"Key": key, "Value": value} for key, value in tag_dict.items()] + if not tags or not tag_filter(tags): continue + yield { - "ResourceARN": 
cluster.arn, + "ResourceARN": msk_cluster.arn, "Tags": tags, } From 929846f866796ae2f63312af820e66cff23d1648 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 9 Dec 2024 18:07:33 -0100 Subject: [PATCH 08/14] Linting/Typing --- moto/kafka/models.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/moto/kafka/models.py b/moto/kafka/models.py index e00951ed3e8c..5d468a31af3b 100644 --- a/moto/kafka/models.py +++ b/moto/kafka/models.py @@ -102,10 +102,14 @@ def to_dict(self) -> Dict[str, Any]: elif self.cluster_type == "SERVERLESS": cluster_info["Serverless"] = { - "VpcConfigs": self.serverless_config.get("VpcConfigs", []) if self.serverless_config else [], + "VpcConfigs": self.serverless_config.get("VpcConfigs", []) + if self.serverless_config + else [], "ClientAuthentication": self.serverless_config.get( "ClientAuthentication", {} - ) if self.serverless_config else {}, + ) + if self.serverless_config + else {}, } return cluster_info @@ -123,15 +127,14 @@ def create_cluster_v2( self, cluster_name: str, tags: Optional[Dict[str, str]], - provisioned: Optional[Dict[str, Optional[str | Any]]], - serverless: Optional[Dict[str, Optional[str | Any]]], + provisioned: Optional[Dict[str, Any]], + serverless: Optional[Dict[str, Any]], ) -> Tuple[str, str, str, str]: if provisioned: cluster_type = "PROVISIONED" broker_node_group_info = provisioned.get("BrokerNodeGroupInfo") - kafka_version = provisioned.get( - "kafkaVersion", "default-kafka-version") - number_of_broker_nodes = provisioned.get("NumberOfBrokerNodes", 1) + kafka_version = provisioned.get("kafkaVersion", "default-kafka-version") + number_of_broker_nodes = int(provisioned.get("NumberOfBrokerNodes", 1)) storage_mode = provisioned.get("StorageMode", "LOCAL") serverless_config = None elif serverless: @@ -172,7 +175,7 @@ def create_cluster_v2( def describe_cluster_v2(self, cluster_arn: str) -> Dict[str, Any]: cluster = self.clusters[cluster_arn] - cluster_info = { + cluster_info: Dict[str, Any] = { "activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation", "clusterArn": cluster.arn, "clusterName": cluster.cluster_name, @@ -219,10 +222,14 @@ def describe_cluster_v2(self, cluster_arn: str) -> Dict[str, Any]: cluster_info.update( { "serverless": { - "vpcConfigs": cluster.serverless_config.get("vpcConfigs", []) if cluster.serverless_config else [], + "vpcConfigs": cluster.serverless_config.get("vpcConfigs", []) + if cluster.serverless_config + else [], "clientAuthentication": cluster.serverless_config.get( "clientAuthentication", {} - ) if cluster.serverless_config else {}, + ) + if cluster.serverless_config + else {}, } } ) @@ -344,7 +351,7 @@ def list_clusters( for cluster_arn, cluster in self.clusters.items() ] - return cluster_info_list, None + return cluster_info_list def delete_cluster(self, cluster_arn: str, current_version: str) -> Tuple[str, str]: cluster = self.clusters.pop(cluster_arn) From 211db4b4305145ffb3da443708ded3b51571d34d Mon Sep 17 00:00:00 2001 From: jamarcelin Date: Mon, 9 Dec 2024 18:02:01 -0500 Subject: [PATCH 09/14] fixed test --- moto/kafka/responses.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moto/kafka/responses.py b/moto/kafka/responses.py index 2b1b133132eb..302561822d53 100644 --- a/moto/kafka/responses.py +++ b/moto/kafka/responses.py @@ -138,7 +138,7 @@ def list_clusters(self) -> str: max_results = self._get_param("maxResults") next_token = self._get_param("nextToken") - cluster_info_list, next_token = 
self.kafka_backend.list_clusters(
+        cluster_info_list = self.kafka_backend.list_clusters(
             cluster_name_filter=cluster_name_filter,
             max_results=max_results,
             next_token=next_token,

From a700c75e67e1e14861610a42a5c64bebbeb5a986 Mon Sep 17 00:00:00 2001
From: jamarcelin
Date: Mon, 9 Dec 2024 18:08:40 -0500
Subject: [PATCH 10/14] run test

---
 moto/kafka/responses.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/moto/kafka/responses.py b/moto/kafka/responses.py
index 302561822d53..b899ae703d84 100644
--- a/moto/kafka/responses.py
+++ b/moto/kafka/responses.py
@@ -4,7 +4,6 @@
 from urllib.parse import unquote
 
 from moto.core.responses import BaseResponse
-
 from .models import KafkaBackend, kafka_backends
 

From fd52c78e7f6d4acbf6dbdabea8134617c41e13d5 Mon Sep 17 00:00:00 2001
From: jamarcelin
Date: Mon, 9 Dec 2024 18:24:04 -0500
Subject: [PATCH 11/14] linting

---
 moto/kafka/responses.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/moto/kafka/responses.py b/moto/kafka/responses.py
index b899ae703d84..302561822d53 100644
--- a/moto/kafka/responses.py
+++ b/moto/kafka/responses.py
@@ -4,6 +4,7 @@
 from urllib.parse import unquote
 
 from moto.core.responses import BaseResponse
+
 from .models import KafkaBackend, kafka_backends
 

From eff07df975e528199e044e6abda96fb76c1c77ba Mon Sep 17 00:00:00 2001
From: Brian Pandola
Date: Tue, 10 Dec 2024 01:47:57 -0800
Subject: [PATCH 12/14] Fix urls

ARNs contain a forward slash
---
 moto/kafka/urls.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/moto/kafka/urls.py b/moto/kafka/urls.py
index 233c6616eab5..a379b50e1db0 100644
--- a/moto/kafka/urls.py
+++ b/moto/kafka/urls.py
@@ -8,8 +8,8 @@
 url_paths = {
     "{0}/api/v2/clusters$": KafkaResponse.dispatch,
-    "{0}/api/v2/clusters/(?P<clusterArn>[^/]+)$": KafkaResponse.dispatch,
-    "{0}/v1/tags/(?P<resourceArn>[^/]+)$": KafkaResponse.dispatch,
+    "{0}/api/v2/clusters/(?P<clusterArn>.+)$": KafkaResponse.dispatch,
+    "{0}/v1/tags/(?P<resourceArn>.+)$": KafkaResponse.dispatch,
     "{0}/v1/clusters$": KafkaResponse.dispatch,
-    "{0}/v1/clusters/(?P<clusterArn>[^/]+)$": KafkaResponse.dispatch,
+    "{0}/v1/clusters/(?P<clusterArn>.+)$": KafkaResponse.dispatch,
 }

From 47acc61a0694ab075a0d1e60b4150a2a4c9290e4 Mon Sep 17 00:00:00 2001
From: jamarcelin
Date: Wed, 11 Dec 2024 15:09:33 -0500
Subject: [PATCH 13/14] coverage

---
 moto/kafka/models.py | 56 ++++++++-------
 tests/test_kafka/test_kafka.py | 127 ++++++++++++++++++++++++++++-----
 2 files changed, 138 insertions(+), 45 deletions(-)

diff --git a/moto/kafka/models.py b/moto/kafka/models.py
index 5d468a31af3b..32e184ab047e 100644
--- a/moto/kafka/models.py
+++ b/moto/kafka/models.py
@@ -102,7 +102,7 @@ def to_dict(self) -> Dict[str, Any]:
         elif self.cluster_type == "SERVERLESS":
             cluster_info["Serverless"] = {
-                "VpcConfigs": self.serverless_config.get("VpcConfigs", [])
+                "VpcConfigs": self.serverless_config.get("vpcConfigs", [])
                 if self.serverless_config
                 else [],
                 "ClientAuthentication": self.serverless_config.get(
                     "ClientAuthentication", {}
@@ -132,10 +132,10 @@ def create_cluster_v2(
     ) -> Tuple[str, str, str, str]:
         if provisioned:
             cluster_type = "PROVISIONED"
-            broker_node_group_info = provisioned.get("BrokerNodeGroupInfo")
+            broker_node_group_info = provisioned.get("brokerNodeGroupInfo")
             kafka_version = provisioned.get("kafkaVersion", "default-kafka-version")
-            number_of_broker_nodes = int(provisioned.get("NumberOfBrokerNodes", 1))
-            storage_mode = provisioned.get("StorageMode", "LOCAL")
+            number_of_broker_nodes = int(provisioned.get("numberOfBrokerNodes", 1))
+            storage_mode = provisioned.get("storageMode", "LOCAL")
             serverless_config = None
         elif serverless:
cluster_type = "SERVERLESS" @@ -193,28 +193,30 @@ def describe_cluster_v2(self, cluster_arn: str) -> Dict[str, Any]: if cluster.cluster_type == "PROVISIONED": cluster_info.update( { - "brokerNodeGroupInfo": cluster.broker_node_group_info or {}, - "clientAuthentication": cluster.client_authentication or {}, - "currentBrokerSoftwareInfo": { - "configurationArn": (cluster.configuration_info or {}).get( - "arn", "string" - ), - "configurationRevision": (cluster.configuration_info or {}).get( - "Revision", 1 - ), - "kafkaVersion": cluster.kafka_version, - }, - "encryptionInfo": cluster.encryption_info or {}, - "enhancedMonitoring": cluster.enhanced_monitoring, - "openMonitoring": cluster.open_monitoring or {}, - "loggingInfo": cluster.logging_info or {}, - "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0, - "zookeeperConnectString": cluster.zookeeper_connect_string - or "zookeeper.example.com:2181", - "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls - or "zookeeper.example.com:2181", - "storageMode": cluster.storage_mode, - "customerActionStatus": "NONE", + "provisioned": { + "brokerNodeGroupInfo": cluster.broker_node_group_info or {}, + "clientAuthentication": cluster.client_authentication or {}, + "currentBrokerSoftwareInfo": { + "configurationArn": (cluster.configuration_info or {}).get( + "arn", "string" + ), + "configurationRevision": ( + cluster.configuration_info or {} + ).get("revision", 1), + "kafkaVersion": cluster.kafka_version, + }, + "encryptionInfo": cluster.encryption_info or {}, + "enhancedMonitoring": cluster.enhanced_monitoring, + "openMonitoring": cluster.open_monitoring or {}, + "loggingInfo": cluster.logging_info or {}, + "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0, + "zookeeperConnectString": cluster.zookeeper_connect_string + or "zookeeper.example.com:2181", + "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls + or "zookeeper.example.com:2181", + "storageMode": cluster.storage_mode, + "customerActionStatus": "NONE", + } } ) @@ -310,7 +312,7 @@ def describe_cluster(self, cluster_arn: str) -> Dict[str, Any]: "arn", "string" ), "configurationRevision": (cluster.configuration_info or {}).get( - "Revision", 1 + "revision", 1 ), "kafkaVersion": cluster.kafka_version, }, diff --git a/tests/test_kafka/test_kafka.py b/tests/test_kafka/test_kafka.py index f689e89934b1..a88860c5d8c4 100644 --- a/tests/test_kafka/test_kafka.py +++ b/tests/test_kafka/test_kafka.py @@ -3,6 +3,7 @@ import boto3 from moto import mock_aws +from moto.kafka.models import FakeKafkaCluster # See our Development Tips on writing tests for hints on how to write good tests: # http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html @@ -14,10 +15,11 @@ @mock_aws def test_create_cluster_v2(): client = boto3.client("kafka", region_name="ap-southeast-1") - cluster_name = "TestServerlessCluster" + s_cluster_name = "TestServerlessCluster" + p_cluster_name = "TestProvisionedCluster" - response = client.create_cluster_v2( - ClusterName=cluster_name, + s_response = client.create_cluster_v2( + ClusterName=s_cluster_name, Serverless={ "VpcConfigs": [ { @@ -29,28 +31,58 @@ def test_create_cluster_v2(): Tags=FAKE_TAGS, ) - assert response["ClusterArn"].startswith("arn:aws:kafka") - assert response["ClusterName"] == cluster_name - assert response["State"] == "CREATING" + p_response = client.create_cluster_v2( + ClusterName=p_cluster_name, + Provisioned={ + "BrokerNodeGroupInfo": { + "InstanceType": "kafka.m5.large", + "ClientSubnets": 
["subnet-0123456789abcdef0"], + "SecurityGroups": ["sg-0123456789abcdef0"], + }, + "KafkaVersion": "2.8.1", + "NumberOfBrokerNodes": 3, + }, + Tags=FAKE_TAGS, + ) - clusters = client.list_clusters_v2() - assert len(clusters["ClusterInfoList"]) == 1 - assert clusters["ClusterInfoList"][0]["ClusterName"] == cluster_name - assert clusters["ClusterInfoList"][0]["ClusterType"] == "SERVERLESS" + assert s_response["ClusterArn"].startswith("arn:aws:kafka") + assert s_response["ClusterName"] == s_cluster_name + assert s_response["State"] == "CREATING" - resp = client.describe_cluster_v2(ClusterArn=response["ClusterArn"]) - cluster_info = resp["ClusterInfo"] + assert p_response["ClusterArn"].startswith("arn:aws:kafka") + assert p_response["ClusterName"] == p_cluster_name + assert p_response["State"] == "CREATING" - assert cluster_info["ClusterName"] == cluster_name - assert cluster_info["State"] == "CREATING" - assert cluster_info["ClusterType"] == "SERVERLESS" - assert cluster_info["Serverless"]["VpcConfigs"][0]["SubnetIds"] == [ + clusters = client.list_clusters_v2() + assert len(clusters["ClusterInfoList"]) == 2 + assert clusters["ClusterInfoList"][0]["ClusterName"] == s_cluster_name + assert clusters["ClusterInfoList"][0]["ClusterType"] == "SERVERLESS" + assert clusters["ClusterInfoList"][1]["ClusterName"] == p_cluster_name + assert clusters["ClusterInfoList"][1]["ClusterType"] == "PROVISIONED" + + s_resp = client.describe_cluster_v2(ClusterArn=s_response["ClusterArn"]) + s_cluster_info = s_resp["ClusterInfo"] + p_resp = client.describe_cluster_v2(ClusterArn=p_response["ClusterArn"]) + p_cluster_info = p_resp["ClusterInfo"] + + assert s_cluster_info["ClusterName"] == s_cluster_name + assert s_cluster_info["State"] == "CREATING" + assert s_cluster_info["ClusterType"] == "SERVERLESS" + assert s_cluster_info["Serverless"]["VpcConfigs"][0]["SubnetIds"] == [ "subnet-0123456789abcdef0" ] - assert cluster_info["Serverless"]["VpcConfigs"][0]["SecurityGroupIds"] == [ + assert s_cluster_info["Serverless"]["VpcConfigs"][0]["SecurityGroupIds"] == [ "sg-0123456789abcdef0" ] - assert cluster_info["Tags"] == FAKE_TAGS + assert s_cluster_info["Tags"] == FAKE_TAGS + + assert p_cluster_info["ClusterName"] == p_cluster_name + assert p_cluster_info["State"] == "CREATING" + assert p_cluster_info["ClusterType"] == "PROVISIONED" + assert ( + p_cluster_info["Provisioned"]["BrokerNodeGroupInfo"]["InstanceType"] + == "kafka.m5.large" + ) @mock_aws @@ -147,3 +179,62 @@ def test_delete_cluster(): client.delete_cluster(ClusterArn=create_resp["ClusterArn"]) clusters = client.list_clusters() assert len(clusters["ClusterInfoList"]) == 0 + + +@mock_aws +def test_to_dict(): + provCluster = FakeKafkaCluster( + cluster_name="TestCluster", + account_id="123456789012", + region_name="us-east-1", + cluster_type="PROVISIONED", + broker_node_group_info={ + "InstanceType": "kafka.m5.large", + "ClientSubnets": ["subnet-0123456789abcdef0"], + "SecurityGroups": ["sg-0123456789abcdef0"], + }, + kafka_version="2.8.1", + number_of_broker_nodes=3, + tags={"Key1": "Value1"}, + enhanced_monitoring="DEFAULT", + state="ACTIVE", + client_authentication=None, + encryption_info=None, + logging_info=None, + open_monitoring=None, + storage_mode="LOCAL", + current_version="1.0", + configuration_info=None, + ) + + provClusterDict = provCluster.to_dict() + + assert provClusterDict["ClusterName"] == "TestCluster" + assert provClusterDict["State"] == "ACTIVE" + assert provClusterDict["ClusterType"] == "PROVISIONED" + + serverlessCluster = FakeKafkaCluster( 
+ cluster_name="TestCluster", + account_id="123456789012", + region_name="us-east-1", + cluster_type="SERVERLESS", + broker_node_group_info=None, + kafka_version="2.8.1", + number_of_broker_nodes=None, + tags={"Key1": "Value1"}, + enhanced_monitoring="DEFAULT", + state="ACTIVE", + client_authentication=None, + encryption_info=None, + logging_info=None, + open_monitoring=None, + storage_mode="LOCAL", + current_version="1.0", + configuration_info=None, + ) + + serverlessClusterDict = serverlessCluster.to_dict() + + assert serverlessClusterDict["ClusterName"] == "TestCluster" + assert serverlessClusterDict["State"] == "ACTIVE" + assert serverlessClusterDict["ClusterType"] == "SERVERLESS" From 5a3e308b0fc9a8561214e27d2e0b09bdbb1d30f4 Mon Sep 17 00:00:00 2001 From: jamarcelin Date: Fri, 13 Dec 2024 13:56:23 -0500 Subject: [PATCH 14/14] removed to_dict --- moto/kafka/models.py | 41 ----------------------- tests/test_kafka/test_kafka.py | 60 ---------------------------------- 2 files changed, 101 deletions(-) diff --git a/moto/kafka/models.py b/moto/kafka/models.py index 32e184ab047e..8308f5a1b2b0 100644 --- a/moto/kafka/models.py +++ b/moto/kafka/models.py @@ -73,47 +73,6 @@ def _generate_arn(self) -> str: partition = get_partition(self.region_name) return f"arn:{partition}:kafka:{self.region_name}:{self.account_id}:{resource_type}/{self.cluster_id}" - def to_dict(self) -> Dict[str, Any]: - cluster_info = { - "ClusterName": self.cluster_name, - "ClusterArn": self.arn, - "ClusterType": self.cluster_type, - "State": self.state, - "CreationTime": self.creation_time, - "CurrentVersion": self.current_version, - "Tags": self.tags, - "ActiveOperationArn": self.active_operation_arn, - } - - if self.cluster_type == "PROVISIONED": - cluster_info["Provisioned"] = { - "BrokerNodeGroupInfo": self.broker_node_group_info, - "KafkaVersion": self.kafka_version, - "NumberOfBrokerNodes": self.number_of_broker_nodes, - "EncryptionInfo": self.encryption_info, - "EnhancedMonitoring": self.enhanced_monitoring, - "OpenMonitoring": self.open_monitoring, - "LoggingInfo": self.logging_info, - "StorageMode": self.storage_mode, - "ClientAuthentication": self.client_authentication, - "ZookeeperConnectString": self.zookeeper_connect_string, - "ZookeeperConnectStringTls": self.zookeeper_connect_string_tls, - } - - elif self.cluster_type == "SERVERLESS": - cluster_info["Serverless"] = { - "VpcConfigs": self.serverless_config.get("vpcConfigs", []) - if self.serverless_config - else [], - "ClientAuthentication": self.serverless_config.get( - "ClientAuthentication", {} - ) - if self.serverless_config - else {}, - } - - return cluster_info - class KafkaBackend(BaseBackend): """Implementation of Kafka APIs.""" diff --git a/tests/test_kafka/test_kafka.py b/tests/test_kafka/test_kafka.py index a88860c5d8c4..d7c39d615222 100644 --- a/tests/test_kafka/test_kafka.py +++ b/tests/test_kafka/test_kafka.py @@ -3,7 +3,6 @@ import boto3 from moto import mock_aws -from moto.kafka.models import FakeKafkaCluster # See our Development Tips on writing tests for hints on how to write good tests: # http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html @@ -179,62 +178,3 @@ def test_delete_cluster(): client.delete_cluster(ClusterArn=create_resp["ClusterArn"]) clusters = client.list_clusters() assert len(clusters["ClusterInfoList"]) == 0 - - -@mock_aws -def test_to_dict(): - provCluster = FakeKafkaCluster( - cluster_name="TestCluster", - account_id="123456789012", - region_name="us-east-1", - cluster_type="PROVISIONED", - 
broker_node_group_info={ - "InstanceType": "kafka.m5.large", - "ClientSubnets": ["subnet-0123456789abcdef0"], - "SecurityGroups": ["sg-0123456789abcdef0"], - }, - kafka_version="2.8.1", - number_of_broker_nodes=3, - tags={"Key1": "Value1"}, - enhanced_monitoring="DEFAULT", - state="ACTIVE", - client_authentication=None, - encryption_info=None, - logging_info=None, - open_monitoring=None, - storage_mode="LOCAL", - current_version="1.0", - configuration_info=None, - ) - - provClusterDict = provCluster.to_dict() - - assert provClusterDict["ClusterName"] == "TestCluster" - assert provClusterDict["State"] == "ACTIVE" - assert provClusterDict["ClusterType"] == "PROVISIONED" - - serverlessCluster = FakeKafkaCluster( - cluster_name="TestCluster", - account_id="123456789012", - region_name="us-east-1", - cluster_type="SERVERLESS", - broker_node_group_info=None, - kafka_version="2.8.1", - number_of_broker_nodes=None, - tags={"Key1": "Value1"}, - enhanced_monitoring="DEFAULT", - state="ACTIVE", - client_authentication=None, - encryption_info=None, - logging_info=None, - open_monitoring=None, - storage_mode="LOCAL", - current_version="1.0", - configuration_info=None, - ) - - serverlessClusterDict = serverlessCluster.to_dict() - - assert serverlessClusterDict["ClusterName"] == "TestCluster" - assert serverlessClusterDict["State"] == "ACTIVE" - assert serverlessClusterDict["ClusterType"] == "SERVERLESS"
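
A closing note on PATCH 12, whose one-line rationale is easy to miss: MSK cluster ARNs embed forward slashes (arn:...:cluster/<name>/<uuid>), so a URL pattern that captures a single path segment with [^/]+ can never match the full ARN that boto3 appends to /v1/clusters/ and /v1/tags/. The sketch below is illustrative only and not part of the series: it matches against a bare path rather than the full URL moto dispatches on, and the ARN value and group name are invented.

    # Standalone sketch: why PATCH 12 widens [^/]+ to .+ for ARN path parameters.
    import re
    from urllib.parse import quote, unquote

    # Invented but structurally realistic MSK cluster ARN -- note the slashes.
    arn = "arn:aws:kafka:us-east-1:123456789012:cluster/TestCluster/abcd1234-ef56-7890-abcd-1234567890ab-1"
    path = "/v1/clusters/" + arn

    narrow = re.compile(r"/v1/clusters/(?P<clusterArn>[^/]+)$")  # pre-PATCH-12 shape
    wide = re.compile(r"/v1/clusters/(?P<clusterArn>.+)$")       # post-PATCH-12 shape

    assert narrow.search(path) is None                  # gives up at the first '/' in the ARN
    assert wide.search(path).group("clusterArn") == arn  # captures the whole ARN

    # Clients may percent-encode the ARN in the request path, which is why the
    # response handlers unquote() whatever they split off parsed_url.path.
    assert unquote(quote(arn, safe="")) == arn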