Skip to content

Commit

Permalink
Add Kraft to Kafka containers
Browse files Browse the repository at this point in the history
  • Loading branch information
jfmlima committed Jun 20, 2024
1 parent 090bd0d commit 7562f56
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 11 deletions.
30 changes: 30 additions & 0 deletions core/testcontainers/core/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Callable

Check warning on line 1 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L1

Added line #L1 was not covered by tests

from packaging.version import Version

Check warning on line 3 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L3

Added line #L3 was not covered by tests


class ComparableVersion:
def __init__(self, version):
self.version = Version(version)

Check warning on line 8 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L6-L8

Added lines #L6 - L8 were not covered by tests

def __lt__(self, other: str):

Check warning on line 10 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L10

Added line #L10 was not covered by tests
return self._apply_op(other, lambda x, y: x < y)

def __le__(self, other: str):

Check warning on line 13 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L13

Added line #L13 was not covered by tests
return self._apply_op(other, lambda x, y: x <= y)

def __eq__(self, other: str):

Check warning on line 16 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L16

Added line #L16 was not covered by tests
return self._apply_op(other, lambda x, y: x == y)

def __ne__(self, other: str):

Check warning on line 19 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L19

Added line #L19 was not covered by tests
return self._apply_op(other, lambda x, y: x != y)

def __gt__(self, other: str):

Check warning on line 22 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L22

Added line #L22 was not covered by tests
return self._apply_op(other, lambda x, y: x > y)

def __ge__(self, other: str):

Check warning on line 25 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L25

Added line #L25 was not covered by tests
return self._apply_op(other, lambda x, y: x >= y)

def _apply_op(self, other: str, op: Callable[[Version, Version], bool]):
other = Version(other)
return op(self.version, other)

Check warning on line 30 in core/testcontainers/core/version.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/version.py#L28-L30

Added lines #L28 - L30 were not covered by tests
112 changes: 101 additions & 11 deletions modules/kafka/testcontainers/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import base64
import tarfile
import time
import uuid
from io import BytesIO
from textwrap import dedent

from testcontainers.core.container import DockerContainer
from testcontainers.core.utils import raise_for_deprecated_parameter
from testcontainers.core.version import ComparableVersion
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka._redpanda import RedpandaContainer
from typing_extensions import Self

__all__ = [
"KafkaContainer",
Expand All @@ -29,15 +32,26 @@ class KafkaContainer(DockerContainer):
"""

TC_START_SCRIPT = "/tc-start.sh"
MIN_KRAFT_TAG = "7.0.0"

def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093, **kwargs) -> None:
def __init__(
self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093, **kwargs
) -> None:
raise_for_deprecated_parameter(kwargs, "port_to_expose", "port")
super().__init__(image, **kwargs)
self.port = port
self.kraft_enabled = False
self.wait_for = r".*\[KafkaServer id=\d+\] started.*"
self.boot_command = ""
self.cluster_id = self._random_uuid()
self.listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092"
self.security_protocol_map = "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"

self.with_exposed_ports(self.port)
listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092"
self.with_env("KAFKA_LISTENERS", listeners)
self.with_env("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
self.with_env("KAFKA_LISTENERS", self.listeners)
self.with_env(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", self.security_protocol_map
)
self.with_env("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")

self.with_env("KAFKA_BROKER_ID", "1")
Expand All @@ -46,6 +60,85 @@ def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093,
self.with_env("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "10000000")
self.with_env("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")

def with_kraft(self) -> Self:
self._verify_min_kraft_version()
self.kraft_enabled = True
return self

def _verify_min_kraft_version(self):
actual_version = self.image.split(":")[-1]

if ComparableVersion(actual_version) < self.MIN_KRAFT_TAG:
raise ValueError(
f"Provided Confluent Platform's version {actual_version} "
f"is not supported in Kraft mode"
f" (must be {self.MIN_KRAFT_TAG} or above)"
)

def with_cluster_id(self, cluster_id: str) -> Self:
self.cluster_id = cluster_id
return self

@classmethod
def _random_uuid(cls):
uuid_value = uuid.uuid4()
uuid_bytes = uuid_value.bytes
base64_encoded_uuid = base64.b64encode(uuid_bytes)

return base64_encoded_uuid.decode()

def configure(self):
if self.kraft_enabled:
self._configure_kraft()
else:
self._configure_zookeeper()

def _configure_kraft(self) -> None:
self.wait_for = r".*Kafka Server started.*"

self.with_env("CLUSTER_ID", self.cluster_id)
self.with_env("KAFKA_NODE_ID", 1)
self.with_env(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
f"{self.security_protocol_map},CONTROLLER:PLAINTEXT",
)
self.with_env(
"KAFKA_LISTENERS",
f"{self.listeners},CONTROLLER://0.0.0.0:9094",
)
self.with_env("KAFKA_PROCESS_ROLES", "broker,controller")

network_alias = self._get_network_alias()
controller_quorum_voters = f"1@{network_alias}:9094"
self.with_env("KAFKA_CONTROLLER_QUORUM_VOTERS", controller_quorum_voters)
self.with_env("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")

self.boot_command = f"""
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t {self.cluster_id} -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
"""

def _get_network_alias(self):
if self._network:
return next(
iter(
self._network_aliases
or [self._network.name or self._kwargs.get("network", [])]
),
None,
)

return "localhost"

def _configure_zookeeper(self) -> None:
self.boot_command = """
echo 'clientPort=2181' > zookeeper.properties
echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
zookeeper-server-start zookeeper.properties &
export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
"""

def get_bootstrap_server(self) -> str:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.port)
Expand All @@ -59,11 +152,7 @@ def tc_start(self) -> None:
dedent(
f"""
#!/bin/bash
echo 'clientPort=2181' > zookeeper.properties
echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
zookeeper-server-start zookeeper.properties &
export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
{self.boot_command}
export KAFKA_ADVERTISED_LISTENERS={listeners}
. /etc/confluent/docker/bash-config
/etc/confluent/docker/configure
Expand All @@ -78,10 +167,11 @@ def tc_start(self) -> None:
def start(self, timeout=30) -> "KafkaContainer":
script = KafkaContainer.TC_START_SCRIPT
command = f'sh -c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
self.configure()
self.with_command(command)
super().start()
self.tc_start()
wait_for_logs(self, r".*\[KafkaServer id=\d+\] started.*", timeout=timeout)
wait_for_logs(self, self.wait_for, timeout=timeout)
return self

def create_file(self, content: bytes, path: str) -> None:
Expand Down
6 changes: 6 additions & 0 deletions modules/kafka/tests/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ def test_kafka_producer_consumer():
produce_and_consume_kafka_message(container)


def test_kafka_with_kraft_producer_consumer():
with KafkaContainer().with_kraft() as container:
assert container.kraft_enabled
produce_and_consume_kafka_message(container)


def test_kafka_producer_consumer_custom_port():
with KafkaContainer(port=9888) as container:
assert container.port == 9888
Expand Down

0 comments on commit 7562f56

Please sign in to comment.