Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka V1 Support #8381

Merged
merged 15 commits into from
Dec 13, 2024
1 change: 1 addition & 0 deletions moto/backend_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
("iotdata", re.compile("https?://data\\.iot\\.(.+)\\.amazonaws.com")),
("iotdata", re.compile("https?://data-ats\\.iot\\.(.+)\\.amazonaws.com")),
("ivs", re.compile("https?://ivs\\.(.+)\\.amazonaws\\.com")),
("kafka", re.compile("https?://kafka\\.(.+)\\.amazonaws\\.com")),
("kinesis", re.compile("https?://kinesis\\.(.+)\\.amazonaws\\.com")),
("kinesis", re.compile("https?://(.+)\\.control-kinesis\\.(.+)\\.amazonaws\\.com")),
("kinesis", re.compile("https?://(.+)\\.data-kinesis\\.(.+)\\.amazonaws\\.com")),
Expand Down
4 changes: 4 additions & 0 deletions moto/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from moto.iot.models import IoTBackend
from moto.iotdata.models import IoTDataPlaneBackend
from moto.ivs.models import IVSBackend
from moto.kafka.models import KafkaBackend
from moto.kinesis.models import KinesisBackend
from moto.kinesisvideo.models import KinesisVideoBackend
from moto.kinesisvideoarchivedmedia.models import KinesisVideoArchivedMediaBackend
Expand Down Expand Up @@ -258,6 +259,7 @@ def get_service_from_url(url: str) -> Optional[str]:
"Literal['iot']",
"Literal['iot-data']",
"Literal['ivs']",
"Literal['kafka']",
"Literal['kinesis']",
"Literal['kinesisvideo']",
"Literal['kinesis-video-archived-media']",
Expand Down Expand Up @@ -528,6 +530,8 @@ def get_backend(name: "Literal['iot-data']") -> "BackendDict[IoTDataPlaneBackend
@overload
def get_backend(name: "Literal['ivs']") -> "BackendDict[IVSBackend]": ...
@overload
def get_backend(name: "Literal['kafka']") -> "BackendDict[KafkaBackend]": ...
@overload
def get_backend(name: "Literal['kinesis']") -> "BackendDict[KinesisBackend]": ...
@overload
def get_backend(
Expand Down
1 change: 1 addition & 0 deletions moto/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .models import kafka_backends # noqa: F401
1 change: 1 addition & 0 deletions moto/kafka/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Exceptions raised by the kafka service."""
332 changes: 332 additions & 0 deletions moto/kafka/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
"""KafkaBackend class with methods for supported APIs."""

import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.utilities.utils import get_partition

from ..utilities.tagging_service import TaggingService


class FakeKafkaCluster(BaseModel):
def __init__(
self,
cluster_name: str,
account_id: str,
region_name: str,
cluster_type: str,
tags: Optional[Dict[str, str]] = None,
broker_node_group_info: Optional[Dict[str, Any]] = None,
kafka_version: Optional[str] = None,
number_of_broker_nodes: Optional[int] = None,
configuration_info: Optional[Dict[str, Any]] = None,
serverless_config: Optional[Dict[str, Any]] = None,
encryption_info: Optional[Dict[str, Any]] = None,
enhanced_monitoring: str = "DEFAULT",
open_monitoring: Optional[Dict[str, Any]] = None,
logging_info: Optional[Dict[str, Any]] = None,
storage_mode: str = "LOCAL",
current_version: str = "1.0",
client_authentication: Optional[Dict[str, Any]] = None,
state: str = "CREATING",
active_operation_arn: Optional[str] = None,
zookeeper_connect_string: Optional[str] = None,
zookeeper_connect_string_tls: Optional[str] = None,
):
# General attributes
self.cluster_id = str(uuid.uuid4())
self.cluster_name = cluster_name
self.account_id = account_id
self.region_name = region_name
self.cluster_type = cluster_type
self.tags = tags or {}
self.state = state
self.creation_time = datetime.now().isoformat()
self.current_version = current_version
self.active_operation_arn = active_operation_arn
self.arn = self._generate_arn()

# Attributes specific to PROVISIONED clusters
self.broker_node_group_info = broker_node_group_info
self.kafka_version = kafka_version
self.number_of_broker_nodes = number_of_broker_nodes
self.configuration_info = configuration_info
self.encryption_info = encryption_info
self.enhanced_monitoring = enhanced_monitoring
self.open_monitoring = open_monitoring
self.logging_info = logging_info
self.storage_mode = storage_mode
self.client_authentication = client_authentication
self.zookeeper_connect_string = zookeeper_connect_string
self.zookeeper_connect_string_tls = zookeeper_connect_string_tls

# Attributes specific to SERVERLESS clusters
self.serverless_config = serverless_config

def _generate_arn(self) -> str:
resource_type = (
"cluster" if self.cluster_type == "PROVISIONED" else "serverless-cluster"
)
partition = get_partition(self.region_name)
return f"arn:{partition}:kafka:{self.region_name}:{self.account_id}:{resource_type}/{self.cluster_id}"


class KafkaBackend(BaseBackend):
"""Implementation of Kafka APIs."""

def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.clusters: Dict[str, FakeKafkaCluster] = {}
self.tagger = TaggingService()

def create_cluster_v2(
self,
cluster_name: str,
tags: Optional[Dict[str, str]],
provisioned: Optional[Dict[str, Any]],
serverless: Optional[Dict[str, Any]],
) -> Tuple[str, str, str, str]:
if provisioned:
cluster_type = "PROVISIONED"
broker_node_group_info = provisioned.get("brokerNodeGroupInfo")
kafka_version = provisioned.get("kafkaVersion", "default-kafka-version")
number_of_broker_nodes = int(provisioned.get("numberOfBrokerNodes", 1))
storage_mode = provisioned.get("storageMode", "LOCAL")
serverless_config = None
elif serverless:
cluster_type = "SERVERLESS"
broker_node_group_info = None
kafka_version = None
number_of_broker_nodes = None
storage_mode = None
serverless_config = serverless

new_cluster = FakeKafkaCluster(
cluster_name=cluster_name,
account_id=self.account_id,
region_name=self.region_name,
cluster_type=cluster_type,
broker_node_group_info=broker_node_group_info,
kafka_version=kafka_version,
number_of_broker_nodes=number_of_broker_nodes,
serverless_config=serverless_config,
tags=tags,
state="CREATING",
storage_mode=storage_mode if storage_mode else "LOCAL",
current_version="1.0",
)

self.clusters[new_cluster.arn] = new_cluster

if tags:
self.tag_resource(new_cluster.arn, tags)

return (
new_cluster.arn,
new_cluster.cluster_name,
new_cluster.state,
new_cluster.cluster_type,
)

def describe_cluster_v2(self, cluster_arn: str) -> Dict[str, Any]:
cluster = self.clusters[cluster_arn]

cluster_info: Dict[str, Any] = {
"activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation",
"clusterArn": cluster.arn,
"clusterName": cluster.cluster_name,
"clusterType": cluster.cluster_type,
"creationTime": cluster.creation_time,
"currentVersion": cluster.current_version,
"state": cluster.state,
"stateInfo": {
"code": "string",
"message": "Cluster state details.",
},
"tags": self.list_tags_for_resource(cluster.arn),
}

if cluster.cluster_type == "PROVISIONED":
cluster_info.update(
{
"provisioned": {
"brokerNodeGroupInfo": cluster.broker_node_group_info or {},
"clientAuthentication": cluster.client_authentication or {},
"currentBrokerSoftwareInfo": {
"configurationArn": (cluster.configuration_info or {}).get(
"arn", "string"
),
"configurationRevision": (
cluster.configuration_info or {}
).get("revision", 1),
"kafkaVersion": cluster.kafka_version,
},
"encryptionInfo": cluster.encryption_info or {},
"enhancedMonitoring": cluster.enhanced_monitoring,
"openMonitoring": cluster.open_monitoring or {},
"loggingInfo": cluster.logging_info or {},
"numberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
"zookeeperConnectString": cluster.zookeeper_connect_string
or "zookeeper.example.com:2181",
"zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
or "zookeeper.example.com:2181",
"storageMode": cluster.storage_mode,
"customerActionStatus": "NONE",
}
}
)

elif cluster.cluster_type == "SERVERLESS":
cluster_info.update(
{
"serverless": {
"vpcConfigs": cluster.serverless_config.get("vpcConfigs", [])
if cluster.serverless_config
else [],
"clientAuthentication": cluster.serverless_config.get(
"clientAuthentication", {}
)
if cluster.serverless_config
else {},
}
}
)

return cluster_info

def list_clusters_v2(
self,
cluster_name_filter: Optional[str],
cluster_type_filter: Optional[str],
max_results: Optional[int],
next_token: Optional[str],
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
cluster_info_list = [
{
"clusterArn": cluster.arn,
"clusterName": cluster.cluster_name,
"clusterType": cluster.cluster_type,
"state": cluster.state,
"creationTime": cluster.creation_time,
}
for cluster in self.clusters.values()
]

return cluster_info_list, None

def create_cluster(
self,
broker_node_group_info: Dict[str, Any],
client_authentication: Optional[Dict[str, Any]],
cluster_name: str,
configuration_info: Optional[Dict[str, Any]] = None,
encryption_info: Optional[Dict[str, Any]] = None,
enhanced_monitoring: str = "DEFAULT",
open_monitoring: Optional[Dict[str, Any]] = None,
kafka_version: str = "2.8.1",
logging_info: Optional[Dict[str, Any]] = None,
number_of_broker_nodes: int = 1,
tags: Optional[Dict[str, str]] = None,
storage_mode: str = "LOCAL",
) -> Tuple[str, str, str]:
new_cluster = FakeKafkaCluster(
cluster_name=cluster_name,
account_id=self.account_id,
region_name=self.region_name,
cluster_type="PROVISIONED",
broker_node_group_info=broker_node_group_info,
client_authentication=client_authentication,
kafka_version=kafka_version,
number_of_broker_nodes=number_of_broker_nodes,
configuration_info=configuration_info,
encryption_info=encryption_info,
enhanced_monitoring=enhanced_monitoring,
open_monitoring=open_monitoring,
logging_info=logging_info,
storage_mode=storage_mode,
)

self.clusters[new_cluster.arn] = new_cluster

if tags:
self.tag_resource(new_cluster.arn, tags)

return new_cluster.arn, new_cluster.cluster_name, new_cluster.state

def describe_cluster(self, cluster_arn: str) -> Dict[str, Any]:
cluster = self.clusters[cluster_arn]

return {
"activeOperationArn": "arn:aws:kafka:region:account-id:operation/active-operation",
"brokerNodeGroupInfo": cluster.broker_node_group_info or {},
"clientAuthentication": cluster.client_authentication or {},
"clusterArn": cluster.arn,
"clusterName": cluster.cluster_name,
"creationTime": cluster.creation_time,
"currentBrokerSoftwareInfo": {
"configurationArn": (cluster.configuration_info or {}).get(
"arn", "string"
),
"configurationRevision": (cluster.configuration_info or {}).get(
"revision", 1
),
"kafkaVersion": cluster.kafka_version,
},
"currentVersion": cluster.current_version,
"encryptionInfo": cluster.encryption_info or {},
"enhancedMonitoring": cluster.enhanced_monitoring,
"openMonitoring": cluster.open_monitoring or {},
"loggingInfo": cluster.logging_info or {},
"numberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
"state": cluster.state,
"stateInfo": {
"code": "string",
"message": "Cluster state details.",
},
"tags": self.list_tags_for_resource(cluster.arn),
"zookeeperConnectString": cluster.zookeeper_connect_string
or "zookeeper.example.com:2181",
"zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
or "zookeeper.example.com:2181",
"storageMode": cluster.storage_mode,
"customerActionStatus": "NONE",
}

def list_clusters(
self,
cluster_name_filter: Optional[str],
max_results: Optional[int],
next_token: Optional[str],
) -> List[Dict[str, Any]]:
cluster_info_list = [
{
"clusterArn": cluster.arn,
"clusterName": cluster.cluster_name,
"state": cluster.state,
"creationTime": cluster.creation_time,
"clusterType": cluster.cluster_type,
}
for cluster_arn, cluster in self.clusters.items()
]

return cluster_info_list

def delete_cluster(self, cluster_arn: str, current_version: str) -> Tuple[str, str]:
cluster = self.clusters.pop(cluster_arn)
return cluster_arn, cluster.state

def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
return self.tagger.get_tag_dict_for_resource(resource_arn)

def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
tags_list = [{"Key": k, "Value": v} for k, v in tags.items()]
self.tagger.tag_resource(resource_arn, tags_list)

def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
self.tagger.untag_resource_using_names(resource_arn, tag_keys)


kafka_backends = BackendDict(KafkaBackend, "kafka")
Loading
Loading