From 44df2b02a65255659405f7fa31f1f24259d1b1bb Mon Sep 17 00:00:00 2001 From: jamarcelin <56656001+jamarcelin@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:55:21 -0500 Subject: [PATCH] Kafka V1 Support (#8381) --- moto/backend_index.py | 1 + moto/backends.py | 4 + moto/kafka/__init__.py | 1 + moto/kafka/exceptions.py | 1 + moto/kafka/models.py | 332 ++++++++++++++++++++++++ moto/kafka/responses.py | 147 +++++++++++ moto/kafka/urls.py | 15 ++ moto/resourcegroupstaggingapi/models.py | 21 ++ tests/test_kafka/__init__.py | 0 tests/test_kafka/test_kafka.py | 180 +++++++++++++ 10 files changed, 702 insertions(+) create mode 100644 moto/kafka/__init__.py create mode 100644 moto/kafka/exceptions.py create mode 100644 moto/kafka/models.py create mode 100644 moto/kafka/responses.py create mode 100644 moto/kafka/urls.py create mode 100644 tests/test_kafka/__init__.py create mode 100644 tests/test_kafka/test_kafka.py diff --git a/moto/backend_index.py b/moto/backend_index.py index 781aeeb2affc..8b4452cb755a 100644 --- a/moto/backend_index.py +++ b/moto/backend_index.py @@ -96,6 +96,7 @@ ("iotdata", re.compile("https?://data\\.iot\\.(.+)\\.amazonaws.com")), ("iotdata", re.compile("https?://data-ats\\.iot\\.(.+)\\.amazonaws.com")), ("ivs", re.compile("https?://ivs\\.(.+)\\.amazonaws\\.com")), + ("kafka", re.compile("https?://kafka\\.(.+)\\.amazonaws\\.com")), ("kinesis", re.compile("https?://kinesis\\.(.+)\\.amazonaws\\.com")), ("kinesis", re.compile("https?://(.+)\\.control-kinesis\\.(.+)\\.amazonaws\\.com")), ("kinesis", re.compile("https?://(.+)\\.data-kinesis\\.(.+)\\.amazonaws\\.com")), diff --git a/moto/backends.py b/moto/backends.py index 4142a8ad659d..e81b7d53dae5 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -80,6 +80,7 @@ from moto.iot.models import IoTBackend from moto.iotdata.models import IoTDataPlaneBackend from moto.ivs.models import IVSBackend + from moto.kafka.models import KafkaBackend from moto.kinesis.models import KinesisBackend from moto.kinesisvideo.models import KinesisVideoBackend from moto.kinesisvideoarchivedmedia.models import KinesisVideoArchivedMediaBackend @@ -258,6 +259,7 @@ def get_service_from_url(url: str) -> Optional[str]: "Literal['iot']", "Literal['iot-data']", "Literal['ivs']", + "Literal['kafka']", "Literal['kinesis']", "Literal['kinesisvideo']", "Literal['kinesis-video-archived-media']", @@ -528,6 +530,8 @@ def get_backend(name: "Literal['iot-data']") -> "BackendDict[IoTDataPlaneBackend @overload def get_backend(name: "Literal['ivs']") -> "BackendDict[IVSBackend]": ... @overload +def get_backend(name: "Literal['kafka']") -> "BackendDict[KafkaBackend]": ... +@overload def get_backend(name: "Literal['kinesis']") -> "BackendDict[KinesisBackend]": ... @overload def get_backend( diff --git a/moto/kafka/__init__.py b/moto/kafka/__init__.py new file mode 100644 index 000000000000..5788123554dc --- /dev/null +++ b/moto/kafka/__init__.py @@ -0,0 +1 @@ +from .models import kafka_backends # noqa: F401 diff --git a/moto/kafka/exceptions.py b/moto/kafka/exceptions.py new file mode 100644 index 000000000000..9dc38515a9b8 --- /dev/null +++ b/moto/kafka/exceptions.py @@ -0,0 +1 @@ +"""Exceptions raised by the kafka service.""" diff --git a/moto/kafka/models.py b/moto/kafka/models.py new file mode 100644 index 000000000000..8308f5a1b2b0 --- /dev/null +++ b/moto/kafka/models.py @@ -0,0 +1,332 @@ +"""KafkaBackend class with methods for supported APIs.""" + +import uuid +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple + +from moto.core.base_backend import BackendDict, BaseBackend +from moto.core.common_models import BaseModel +from moto.utilities.utils import get_partition + +from ..utilities.tagging_service import TaggingService + + +class FakeKafkaCluster(BaseModel): + def __init__( + self, + cluster_name: str, + account_id: str, + region_name: str, + cluster_type: str, + tags: Optional[Dict[str, str]] = None, + broker_node_group_info: Optional[Dict[str, Any]] = None, + kafka_version: Optional[str] = None, + number_of_broker_nodes: Optional[int] = None, + configuration_info: Optional[Dict[str, Any]] = None, + serverless_config: Optional[Dict[str, Any]] = None, + encryption_info: Optional[Dict[str, Any]] = None, + enhanced_monitoring: str = "DEFAULT", + open_monitoring: Optional[Dict[str, Any]] = None, + logging_info: Optional[Dict[str, Any]] = None, + storage_mode: str = "LOCAL", + current_version: str = "1.0", + client_authentication: Optional[Dict[str, Any]] = None, + state: str = "CREATING", + active_operation_arn: Optional[str] = None, + zookeeper_connect_string: Optional[str] = None, + zookeeper_connect_string_tls: Optional[str] = None, + ): + # General attributes + self.cluster_id = str(uuid.uuid4()) + self.cluster_name = cluster_name + self.account_id = account_id + self.region_name = region_name + self.cluster_type = cluster_type + self.tags = tags or {} + self.state = state + self.creation_time = datetime.now().isoformat() + self.current_version = current_version + self.active_operation_arn = active_operation_arn + self.arn = self._generate_arn() + + # Attributes specific to PROVISIONED clusters + self.broker_node_group_info = broker_node_group_info + self.kafka_version = kafka_version + self.number_of_broker_nodes = number_of_broker_nodes + self.configuration_info = configuration_info + self.encryption_info = encryption_info + self.enhanced_monitoring = enhanced_monitoring + self.open_monitoring = open_monitoring + self.logging_info = logging_info + self.storage_mode = storage_mode + self.client_authentication = client_authentication + self.zookeeper_connect_string = zookeeper_connect_string + self.zookeeper_connect_string_tls = zookeeper_connect_string_tls + + # Attributes specific to SERVERLESS clusters + self.serverless_config = serverless_config + + def _generate_arn(self) -> str: + resource_type = ( + "cluster" if self.cluster_type == "PROVISIONED" else "serverless-cluster" + ) + partition = get_partition(self.region_name) + return f"arn:{partition}:kafka:{self.region_name}:{self.account_id}:{resource_type}/{self.cluster_id}" + + +class KafkaBackend(BaseBackend): + """Implementation of Kafka APIs.""" + + def __init__(self, region_name: str, account_id: str): + super().__init__(region_name, account_id) + self.clusters: Dict[str, FakeKafkaCluster] = {} + self.tagger = TaggingService() + + def create_cluster_v2( + self, + cluster_name: str, + tags: Optional[Dict[str, str]], + provisioned: Optional[Dict[str, Any]], + serverless: Optional[Dict[str, Any]], + ) -> Tuple[str, str, str, str]: + if provisioned: + cluster_type = "PROVISIONED" + broker_node_group_info = provisioned.get("brokerNodeGroupInfo") + kafka_version = provisioned.get("kafkaVersion", "default-kafka-version") + number_of_broker_nodes = int(provisioned.get("numberOfBrokerNodes", 1)) + storage_mode = provisioned.get("storageMode", "LOCAL") + serverless_config = None + elif serverless: + cluster_type = "SERVERLESS" + broker_node_group_info = None + kafka_version = None + number_of_broker_nodes = None + storage_mode = None + serverless_config = serverless + + new_cluster = FakeKafkaCluster( + cluster_name=cluster_name, + account_id=self.account_id, + region_name=self.region_name, + cluster_type=cluster_type, + broker_node_group_info=broker_node_group_info, + kafka_version=kafka_version, + number_of_broker_nodes=number_of_broker_nodes, + serverless_config=serverless_config, + tags=tags, + state="CREATING", + storage_mode=storage_mode if storage_mode else "LOCAL", + current_version="1.0", + ) + + self.clusters[new_cluster.arn] = new_cluster + + if tags: + self.tag_resource(new_cluster.arn, tags) + + return ( + new_cluster.arn, + new_cluster.cluster_name, + new_cluster.state, + new_cluster.cluster_type, + ) + + def describe_cluster_v2(self, cluster_arn: str) -> Dict[str, Any]: + cluster = self.clusters[cluster_arn] + + cluster_info: Dict[str, Any] = { + "activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation", + "clusterArn": cluster.arn, + "clusterName": cluster.cluster_name, + "clusterType": cluster.cluster_type, + "creationTime": cluster.creation_time, + "currentVersion": cluster.current_version, + "state": cluster.state, + "stateInfo": { + "code": "string", + "message": "Cluster state details.", + }, + "tags": self.list_tags_for_resource(cluster.arn), + } + + if cluster.cluster_type == "PROVISIONED": + cluster_info.update( + { + "provisioned": { + "brokerNodeGroupInfo": cluster.broker_node_group_info or {}, + "clientAuthentication": cluster.client_authentication or {}, + "currentBrokerSoftwareInfo": { + "configurationArn": (cluster.configuration_info or {}).get( + "arn", "string" + ), + "configurationRevision": ( + cluster.configuration_info or {} + ).get("revision", 1), + "kafkaVersion": cluster.kafka_version, + }, + "encryptionInfo": cluster.encryption_info or {}, + "enhancedMonitoring": cluster.enhanced_monitoring, + "openMonitoring": cluster.open_monitoring or {}, + "loggingInfo": cluster.logging_info or {}, + "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0, + "zookeeperConnectString": cluster.zookeeper_connect_string + or "zookeeper.example.com:2181", + "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls + or "zookeeper.example.com:2181", + "storageMode": cluster.storage_mode, + "customerActionStatus": "NONE", + } + } + ) + + elif cluster.cluster_type == "SERVERLESS": + cluster_info.update( + { + "serverless": { + "vpcConfigs": cluster.serverless_config.get("vpcConfigs", []) + if cluster.serverless_config + else [], + "clientAuthentication": cluster.serverless_config.get( + "clientAuthentication", {} + ) + if cluster.serverless_config + else {}, + } + } + ) + + return cluster_info + + def list_clusters_v2( + self, + cluster_name_filter: Optional[str], + cluster_type_filter: Optional[str], + max_results: Optional[int], + next_token: Optional[str], + ) -> Tuple[List[Dict[str, Any]], Optional[str]]: + cluster_info_list = [ + { + "clusterArn": cluster.arn, + "clusterName": cluster.cluster_name, + "clusterType": cluster.cluster_type, + "state": cluster.state, + "creationTime": cluster.creation_time, + } + for cluster in self.clusters.values() + ] + + return cluster_info_list, None + + def create_cluster( + self, + broker_node_group_info: Dict[str, Any], + client_authentication: Optional[Dict[str, Any]], + cluster_name: str, + configuration_info: Optional[Dict[str, Any]] = None, + encryption_info: Optional[Dict[str, Any]] = None, + enhanced_monitoring: str = "DEFAULT", + open_monitoring: Optional[Dict[str, Any]] = None, + kafka_version: str = "2.8.1", + logging_info: Optional[Dict[str, Any]] = None, + number_of_broker_nodes: int = 1, + tags: Optional[Dict[str, str]] = None, + storage_mode: str = "LOCAL", + ) -> Tuple[str, str, str]: + new_cluster = FakeKafkaCluster( + cluster_name=cluster_name, + account_id=self.account_id, + region_name=self.region_name, + cluster_type="PROVISIONED", + broker_node_group_info=broker_node_group_info, + client_authentication=client_authentication, + kafka_version=kafka_version, + number_of_broker_nodes=number_of_broker_nodes, + configuration_info=configuration_info, + encryption_info=encryption_info, + enhanced_monitoring=enhanced_monitoring, + open_monitoring=open_monitoring, + logging_info=logging_info, + storage_mode=storage_mode, + ) + + self.clusters[new_cluster.arn] = new_cluster + + if tags: + self.tag_resource(new_cluster.arn, tags) + + return new_cluster.arn, new_cluster.cluster_name, new_cluster.state + + def describe_cluster(self, cluster_arn: str) -> Dict[str, Any]: + cluster = self.clusters[cluster_arn] + + return { + "activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation", + "brokerNodeGroupInfo": cluster.broker_node_group_info or {}, + "clientAuthentication": cluster.client_authentication or {}, + "clusterArn": cluster.arn, + "clusterName": cluster.cluster_name, + "creationTime": cluster.creation_time, + "currentBrokerSoftwareInfo": { + "configurationArn": (cluster.configuration_info or {}).get( + "arn", "string" + ), + "configurationRevision": (cluster.configuration_info or {}).get( + "revision", 1 + ), + "kafkaVersion": cluster.kafka_version, + }, + "currentVersion": cluster.current_version, + "encryptionInfo": cluster.encryption_info or {}, + "enhancedMonitoring": cluster.enhanced_monitoring, + "openMonitoring": cluster.open_monitoring or {}, + "loggingInfo": cluster.logging_info or {}, + "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0, + "state": cluster.state, + "stateInfo": { + "code": "string", + "message": "Cluster state details.", + }, + "tags": self.list_tags_for_resource(cluster.arn), + "zookeeperConnectString": cluster.zookeeper_connect_string + or "zookeeper.example.com:2181", + "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls + or "zookeeper.example.com:2181", + "storageMode": cluster.storage_mode, + "customerActionStatus": "NONE", + } + + def list_clusters( + self, + cluster_name_filter: Optional[str], + max_results: Optional[int], + next_token: Optional[str], + ) -> List[Dict[str, Any]]: + cluster_info_list = [ + { + "clusterArn": cluster.arn, + "clusterName": cluster.cluster_name, + "state": cluster.state, + "creationTime": cluster.creation_time, + "clusterType": cluster.cluster_type, + } + for cluster_arn, cluster in self.clusters.items() + ] + + return cluster_info_list + + def delete_cluster(self, cluster_arn: str, current_version: str) -> Tuple[str, str]: + cluster = self.clusters.pop(cluster_arn) + return cluster_arn, cluster.state + + def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]: + return self.tagger.get_tag_dict_for_resource(resource_arn) + + def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None: + tags_list = [{"Key": k, "Value": v} for k, v in tags.items()] + self.tagger.tag_resource(resource_arn, tags_list) + + def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: + self.tagger.untag_resource_using_names(resource_arn, tag_keys) + + +kafka_backends = BackendDict(KafkaBackend, "kafka") diff --git a/moto/kafka/responses.py b/moto/kafka/responses.py new file mode 100644 index 000000000000..302561822d53 --- /dev/null +++ b/moto/kafka/responses.py @@ -0,0 +1,147 @@ +"""Handles incoming kafka requests, invokes methods, returns responses.""" + +import json +from urllib.parse import unquote + +from moto.core.responses import BaseResponse + +from .models import KafkaBackend, kafka_backends + + +class KafkaResponse(BaseResponse): + """Handler for Kafka requests and responses.""" + + def __init__(self) -> None: + super().__init__(service_name="kafka") + + @property + def kafka_backend(self) -> KafkaBackend: + """Return backend instance specific for this region.""" + return kafka_backends[self.current_account][self.region] + + def create_cluster_v2(self) -> str: + cluster_name = self._get_param("clusterName") + tags = self._get_param("tags") + provisioned = self._get_param("provisioned") + serverless = self._get_param("serverless") + cluster_arn, cluster_name, state, cluster_type = ( + self.kafka_backend.create_cluster_v2( + cluster_name=cluster_name, + tags=tags, + provisioned=provisioned, + serverless=serverless, + ) + ) + return json.dumps( + dict( + clusterArn=cluster_arn, + clusterName=cluster_name, + state=state, + clusterType=cluster_type, + ) + ) + + def describe_cluster_v2(self) -> str: + cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1]) + cluster_info = self.kafka_backend.describe_cluster_v2( + cluster_arn=cluster_arn, + ) + return json.dumps(dict(clusterInfo=cluster_info)) + + def list_clusters_v2(self) -> str: + cluster_name_filter = self._get_param("clusterNameFilter") + cluster_type_filter = self._get_param("clusterTypeFilter") + max_results = self._get_param("maxResults") + next_token = self._get_param("nextToken") + cluster_info_list, next_token = self.kafka_backend.list_clusters_v2( + cluster_name_filter=cluster_name_filter, + cluster_type_filter=cluster_type_filter, + max_results=max_results, + next_token=next_token, + ) + return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token)) + + def list_tags_for_resource(self) -> str: + resource_arn = unquote(self.parsed_url.path.split("/tags/")[-1]) + tags = self.kafka_backend.list_tags_for_resource( + resource_arn=resource_arn, + ) + return json.dumps(dict(tags=tags)) + + def tag_resource(self) -> str: + resource_arn = unquote(self._get_param("resourceArn")) + tags = self._get_param("tags") + self.kafka_backend.tag_resource( + resource_arn=resource_arn, + tags=tags, + ) + return json.dumps(dict()) + + def untag_resource(self) -> str: + resource_arn = unquote(self._get_param("resourceArn")) + tag_keys = self.__dict__["data"]["tagKeys"] + self.kafka_backend.untag_resource( + resource_arn=resource_arn, + tag_keys=tag_keys, + ) + return json.dumps(dict()) + + def create_cluster(self) -> str: + broker_node_group_info = self._get_param("brokerNodeGroupInfo") + client_authentication = self._get_param("clientAuthentication") + cluster_name = self._get_param("clusterName") + configuration_info = self._get_param("configurationInfo") + encryption_info = self._get_param("encryptionInfo") + enhanced_monitoring = self._get_param("enhancedMonitoring") + open_monitoring = self._get_param("openMonitoring") + kafka_version = self._get_param("kafkaVersion") + logging_info = self._get_param("loggingInfo") + number_of_broker_nodes = self._get_param("numberOfBrokerNodes") + tags = self._get_param("tags") + storage_mode = self._get_param("storageMode") + cluster_arn, cluster_name, state = self.kafka_backend.create_cluster( + broker_node_group_info=broker_node_group_info, + client_authentication=client_authentication, + cluster_name=cluster_name, + configuration_info=configuration_info, + encryption_info=encryption_info, + enhanced_monitoring=enhanced_monitoring, + open_monitoring=open_monitoring, + kafka_version=kafka_version, + logging_info=logging_info, + number_of_broker_nodes=number_of_broker_nodes, + tags=tags, + storage_mode=storage_mode, + ) + return json.dumps( + dict(clusterArn=cluster_arn, clusterName=cluster_name, state=state) + ) + + def describe_cluster(self) -> str: + cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1]) + cluster_info = self.kafka_backend.describe_cluster( + cluster_arn=cluster_arn, + ) + return json.dumps(dict(clusterInfo=cluster_info)) + + def delete_cluster(self) -> str: + cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1]) + current_version = self._get_param("currentVersion") + cluster_arn, state = self.kafka_backend.delete_cluster( + cluster_arn=cluster_arn, + current_version=current_version, + ) + return json.dumps(dict(clusterArn=cluster_arn, state=state)) + + def list_clusters(self) -> str: + cluster_name_filter = self._get_param("clusterNameFilter") + max_results = self._get_param("maxResults") + next_token = self._get_param("nextToken") + + cluster_info_list = self.kafka_backend.list_clusters( + cluster_name_filter=cluster_name_filter, + max_results=max_results, + next_token=next_token, + ) + + return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token)) diff --git a/moto/kafka/urls.py b/moto/kafka/urls.py new file mode 100644 index 000000000000..a379b50e1db0 --- /dev/null +++ b/moto/kafka/urls.py @@ -0,0 +1,15 @@ +"""kafka base URL and path.""" + +from .responses import KafkaResponse + +url_bases = [ + r"https?://kafka\.(.+)\.amazonaws\.com", +] + +url_paths = { + "{0}/api/v2/clusters$": KafkaResponse.dispatch, + "{0}/api/v2/clusters/(?P.+)$": KafkaResponse.dispatch, + "{0}/v1/tags/(?P.+)$": KafkaResponse.dispatch, + "{0}/v1/clusters$": KafkaResponse.dispatch, + "{0}/v1/clusters/(?P.+)$": KafkaResponse.dispatch, +} diff --git a/moto/resourcegroupstaggingapi/models.py b/moto/resourcegroupstaggingapi/models.py index d48a314dd03a..f056af7d16be 100644 --- a/moto/resourcegroupstaggingapi/models.py +++ b/moto/resourcegroupstaggingapi/models.py @@ -14,6 +14,7 @@ from moto.emr.models import ElasticMapReduceBackend, emr_backends from moto.glacier.models import GlacierBackend, glacier_backends from moto.glue.models import GlueBackend, glue_backends +from moto.kafka.models import KafkaBackend, kafka_backends from moto.kinesis.models import KinesisBackend, kinesis_backends from moto.kms.models import KmsBackend, kms_backends from moto.logs.models import LogsBackend, logs_backends @@ -154,6 +155,10 @@ def workspacesweb_backends(self) -> Optional[WorkSpacesWebBackend]: return workspacesweb_backends[self.account_id][self.region_name] return None + @property + def kafka_backend(self) -> KafkaBackend: + return kafka_backends[self.account_id][self.region_name] + @property def sagemaker_backend(self) -> SageMakerModelBackend: return sagemaker_backends[self.account_id][self.region_name] @@ -610,6 +615,22 @@ def format_tag_keys( "Tags": tags, } + # Kafka (MSK) + if self.kafka_backend and ( + not resource_type_filters or "kafka" in resource_type_filters + ): + for msk_cluster in self.kafka_backend.clusters.values(): + tag_dict = self.kafka_backend.list_tags_for_resource(msk_cluster.arn) + tags = [{"Key": key, "Value": value} for key, value in tag_dict.items()] + + if not tags or not tag_filter(tags): + continue + + yield { + "ResourceARN": msk_cluster.arn, + "Tags": tags, + } + # Workspaces Web if self.workspacesweb_backends and ( not resource_type_filters or "workspaces-web" in resource_type_filters diff --git a/tests/test_kafka/__init__.py b/tests/test_kafka/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/test_kafka/test_kafka.py b/tests/test_kafka/test_kafka.py new file mode 100644 index 000000000000..d7c39d615222 --- /dev/null +++ b/tests/test_kafka/test_kafka.py @@ -0,0 +1,180 @@ +"""Unit tests for kafka-supported APIs.""" + +import boto3 + +from moto import mock_aws + +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html + + +FAKE_TAGS = {"TestKey": "TestValue", "TestKey2": "TestValue2"} + + +@mock_aws +def test_create_cluster_v2(): + client = boto3.client("kafka", region_name="ap-southeast-1") + s_cluster_name = "TestServerlessCluster" + p_cluster_name = "TestProvisionedCluster" + + s_response = client.create_cluster_v2( + ClusterName=s_cluster_name, + Serverless={ + "VpcConfigs": [ + { + "SubnetIds": ["subnet-0123456789abcdef0"], + "SecurityGroupIds": ["sg-0123456789abcdef0"], + } + ] + }, + Tags=FAKE_TAGS, + ) + + p_response = client.create_cluster_v2( + ClusterName=p_cluster_name, + Provisioned={ + "BrokerNodeGroupInfo": { + "InstanceType": "kafka.m5.large", + "ClientSubnets": ["subnet-0123456789abcdef0"], + "SecurityGroups": ["sg-0123456789abcdef0"], + }, + "KafkaVersion": "2.8.1", + "NumberOfBrokerNodes": 3, + }, + Tags=FAKE_TAGS, + ) + + assert s_response["ClusterArn"].startswith("arn:aws:kafka") + assert s_response["ClusterName"] == s_cluster_name + assert s_response["State"] == "CREATING" + + assert p_response["ClusterArn"].startswith("arn:aws:kafka") + assert p_response["ClusterName"] == p_cluster_name + assert p_response["State"] == "CREATING" + + clusters = client.list_clusters_v2() + assert len(clusters["ClusterInfoList"]) == 2 + assert clusters["ClusterInfoList"][0]["ClusterName"] == s_cluster_name + assert clusters["ClusterInfoList"][0]["ClusterType"] == "SERVERLESS" + assert clusters["ClusterInfoList"][1]["ClusterName"] == p_cluster_name + assert clusters["ClusterInfoList"][1]["ClusterType"] == "PROVISIONED" + + s_resp = client.describe_cluster_v2(ClusterArn=s_response["ClusterArn"]) + s_cluster_info = s_resp["ClusterInfo"] + p_resp = client.describe_cluster_v2(ClusterArn=p_response["ClusterArn"]) + p_cluster_info = p_resp["ClusterInfo"] + + assert s_cluster_info["ClusterName"] == s_cluster_name + assert s_cluster_info["State"] == "CREATING" + assert s_cluster_info["ClusterType"] == "SERVERLESS" + assert s_cluster_info["Serverless"]["VpcConfigs"][0]["SubnetIds"] == [ + "subnet-0123456789abcdef0" + ] + assert s_cluster_info["Serverless"]["VpcConfigs"][0]["SecurityGroupIds"] == [ + "sg-0123456789abcdef0" + ] + assert s_cluster_info["Tags"] == FAKE_TAGS + + assert p_cluster_info["ClusterName"] == p_cluster_name + assert p_cluster_info["State"] == "CREATING" + assert p_cluster_info["ClusterType"] == "PROVISIONED" + assert ( + p_cluster_info["Provisioned"]["BrokerNodeGroupInfo"]["InstanceType"] + == "kafka.m5.large" + ) + + +@mock_aws +def test_list_tags_for_resource(): + client = boto3.client("kafka", region_name="us-east-2") + create_resp = client.create_cluster( + ClusterName="TestCluster", + BrokerNodeGroupInfo={ + "InstanceType": "kafka.m5.large", + "ClientSubnets": ["subnet-0123456789abcdef0"], + "SecurityGroups": ["sg-0123456789abcdef0"], + }, + KafkaVersion="2.8.1", + NumberOfBrokerNodes=3, + Tags=FAKE_TAGS, + ) + + TempTags = {"TestKey3": "TestValue3"} + + client.tag_resource( + ResourceArn=create_resp["ClusterArn"], + Tags=TempTags, + ) + + tags = client.list_tags_for_resource(ResourceArn=create_resp["ClusterArn"]) + assert tags["Tags"] == {**FAKE_TAGS, **TempTags} + + client.untag_resource( + ResourceArn=create_resp["ClusterArn"], + TagKeys=["TestKey3"], + ) + + tags = client.list_tags_for_resource(ResourceArn=create_resp["ClusterArn"]) + + assert tags["Tags"] == FAKE_TAGS + + +@mock_aws +def test_create_cluster(): + client = boto3.client("kafka", region_name="eu-west-1") + cluster_name = "TestCluster" + response = client.create_cluster( + ClusterName=cluster_name, + BrokerNodeGroupInfo={ + "InstanceType": "kafka.m5.large", + "ClientSubnets": ["subnet-0123456789abcdef0"], + "SecurityGroups": ["sg-0123456789abcdef0"], + }, + KafkaVersion="2.8.1", + NumberOfBrokerNodes=3, + Tags=FAKE_TAGS, + ) + + assert response["ClusterArn"].startswith("arn:aws:kafka") + assert response["ClusterName"] == cluster_name + assert response["State"] == "CREATING" + + clusters = client.list_clusters() + assert len(clusters["ClusterInfoList"]) == 1 + assert clusters["ClusterInfoList"][0]["ClusterName"] == cluster_name + + resp = client.describe_cluster(ClusterArn=response["ClusterArn"]) + assert resp["ClusterInfo"]["ClusterName"] == cluster_name + assert resp["ClusterInfo"]["State"] == "CREATING" + assert resp["ClusterInfo"]["CurrentBrokerSoftwareInfo"]["KafkaVersion"] == "2.8.1" + assert resp["ClusterInfo"]["NumberOfBrokerNodes"] == 3 + assert ( + resp["ClusterInfo"]["BrokerNodeGroupInfo"]["InstanceType"] == "kafka.m5.large" + ) + assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["ClientSubnets"] == [ + "subnet-0123456789abcdef0" + ] + assert resp["ClusterInfo"]["BrokerNodeGroupInfo"]["SecurityGroups"] == [ + "sg-0123456789abcdef0" + ] + assert resp["ClusterInfo"]["Tags"] == FAKE_TAGS + + +@mock_aws +def test_delete_cluster(): + client = boto3.client("kafka", region_name="us-east-2") + create_resp = client.create_cluster( + ClusterName="TestCluster", + BrokerNodeGroupInfo={ + "InstanceType": "kafka.m5.large", + "ClientSubnets": ["subnet-0123456789abcdef0"], + "SecurityGroups": ["sg-0123456789abcdef0"], + }, + KafkaVersion="2.8.1", + NumberOfBrokerNodes=3, + Tags=FAKE_TAGS, + ) + + client.delete_cluster(ClusterArn=create_resp["ClusterArn"]) + clusters = client.list_clusters() + assert len(clusters["ClusterInfoList"]) == 0