Skip to content

Commit

Permalink
Allow creating consumer without topic
Browse files Browse the repository at this point in the history
Creating a consumer without a topic first and subscribing later is a
valid scenario and should be allowed. This also makes it possible to
create a test fixture for consumers, removing much duplication from
tests.
  • Loading branch information
Mátyás Kuti committed Jan 31, 2024
1 parent 27eb530 commit 94f390c
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 31 deletions.
5 changes: 3 additions & 2 deletions karapace/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
class KafkaConsumer(_KafkaConfigMixin, Consumer):
def __init__(
self,
topic: str,
bootstrap_servers: Iterable[str] | str,
topic: str | None = None,
verify_connection: bool = True,
**params: Unpack[KafkaClientParams],
) -> None:
Expand All @@ -32,7 +32,8 @@ def __init__(

super().__init__(bootstrap_servers, verify_connection, **params)

self.subscribe([topic])
if topic is not None:
self.subscribe([topic])

@staticmethod
def _create_group_id() -> str:
Expand Down
4 changes: 2 additions & 2 deletions karapace/kafka_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ def kafka_admin_from_config(config: Config) -> KafkaAdminClient:
@contextlib.contextmanager
def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaConsumer]:
consumer = KafkaConsumer(
topic,
enable_auto_commit=False,
bootstrap_servers=config["bootstrap_uri"],
topic=topic,
enable_auto_commit=False,
client_id=config["client_id"],
security_protocol=config["security_protocol"],
ssl_cafile=config["ssl_cafile"],
Expand Down
4 changes: 2 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ def _create_consumer_from_config(config: Config) -> KafkaConsumer:
# Group not set on purpose, all consumers read the same data
session_timeout_ms = config["session_timeout_ms"]
return KafkaConsumer(
config["topic_name"],
enable_auto_commit=False,
bootstrap_servers=config["bootstrap_uri"],
topic=config["topic_name"],
enable_auto_commit=False,
client_id=config["client_id"],
fetch_max_wait_ms=50,
security_protocol=config["security_protocol"],
Expand Down
14 changes: 14 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from karapace.client import Client
from karapace.config import Config, set_config_defaults, write_config
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka.producer import KafkaProducer
from karapace.kafka_rest_apis import KafkaRest
from pathlib import Path
Expand Down Expand Up @@ -215,6 +216,19 @@ def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaAdminClient]:
yield KafkaAdminClient(bootstrap_servers=kafka_servers.bootstrap_servers)


@pytest.fixture(scope="function", name="consumer")
def fixture_consumer(
    kafka_servers: KafkaServers,
) -> Iterator[KafkaConsumer]:
    """Yield a ``KafkaConsumer`` connected to the test cluster, with no topic.

    The consumer is created without subscribing to any topic — tests
    subscribe or query metadata themselves as needed. A fresh consumer is
    built per test (``scope="function"``) and closed during teardown.
    """
    consumer = KafkaConsumer(
        bootstrap_servers=kafka_servers.bootstrap_servers,
        # Start reading from the beginning of any partition once assigned.
        auto_offset_reset="earliest",
        # Tests control offsets explicitly; no background auto-commit.
        enable_auto_commit=False,
    )
    yield consumer
    # Teardown: release the consumer's network/client resources.
    consumer.close()


@pytest.fixture(scope="function", name="rest_async")
async def fixture_rest_async(
request: SubRequest,
Expand Down
29 changes: 4 additions & 25 deletions tests/integration/kafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,10 @@


class TestPartitionsForTopic:
def test_partitions_for_returns_empty_for_unknown_topic(self, kafka_servers: KafkaServers) -> None:
consumer = KafkaConsumer(
bootstrap_servers=kafka_servers.bootstrap_servers,
topic="nonexistent",
)

    def test_partitions_for_returns_empty_for_unknown_topic(self, consumer: KafkaConsumer) -> None:
        """``partitions_for_topic`` returns an empty mapping for a nonexistent topic."""
        assert consumer.partitions_for_topic("nonexistent") == {}

def test_partitions_for(self, kafka_servers: KafkaServers, new_topic: NewTopic) -> None:
consumer = KafkaConsumer(
bootstrap_servers=kafka_servers.bootstrap_servers,
topic=new_topic.topic,
)

def test_partitions_for(self, consumer: KafkaConsumer, new_topic: NewTopic) -> None:
partitions = consumer.partitions_for_topic(new_topic.topic)

assert len(partitions) == 1
Expand All @@ -47,29 +37,18 @@ def test_get_watermark_offsets_unkown_topic(self, kafka_servers: KafkaServers) -
with pytest.raises(UnknownTopicOrPartitionError):
_, _ = consumer.get_watermark_offsets(TopicPartition("nonexistent", 0))

def test_get_watermark_offsets_empty_topic(self, kafka_servers: KafkaServers, new_topic: NewTopic) -> None:
consumer = KafkaConsumer(
bootstrap_servers=kafka_servers.bootstrap_servers,
topic=new_topic.topic,
auto_offset_reset="earliest",
)

    def test_get_watermark_offsets_empty_topic(self, consumer: KafkaConsumer, new_topic: NewTopic) -> None:
        """Both watermark offsets are 0 for partition 0 of a freshly created, empty topic."""
        beginning, end = consumer.get_watermark_offsets(TopicPartition(new_topic.topic, 0))

        assert beginning == 0
        assert end == 0

def test_get_watermark_offsets_topic_with_one_message(
self,
kafka_servers: KafkaServers,
consumer: KafkaConsumer,
producer: KafkaProducer,
new_topic: NewTopic,
) -> None:
consumer = KafkaConsumer(
bootstrap_servers=kafka_servers.bootstrap_servers,
topic=new_topic.topic,
auto_offset_reset="earliest",
)
producer.send(new_topic.topic)
producer.flush()

Expand Down

0 comments on commit 94f390c

Please sign in to comment.