Skip to content

Commit

Permalink
Replace admin client with confluent-kafka one
Browse files Browse the repository at this point in the history
This change replaces all Kafka admin client usages with a new
implementation, based on confluent-kafka-python's `AdminClient`, aiming
to keep the same interface as much as possible for the time being.

The goal is not to completely remove kafka-python as a dependency, but
rather to take a step towards that. The kafka-python library is still widely
used in the codebase, e.g. for error handling (even in the new
`KafkaAdminClient`).

The new `KafkaAdminClient` takes the place of all previously used admin
clients, and is thus no longer specific to the REST proxy. Hence it has been
moved to a new module as `karapace.kafka_admin.KafkaAdminClient`, which also
consolidates all references to the `confluent_kafka` lib. The test suite
has also been moved and expanded.

Wherever possible the same interface was kept, only adding new methods
to simplify client code (to a minimal extent). This is most notable in
tests and the backups functionality. The new admin client aims to hide
away confluent-kafka-python's Future-based async client approach by
providing wrapper methods that resolve these futures.

Typing issues from `confluent_kafka` are ignored for now; typing stubs
have deliberately not been added.

Resources:
* confluent-kafka-python documentation: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#
* librdkafka configuration documentation: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
  • Loading branch information
Mátyás Kuti committed Nov 15, 2023
1 parent df45aae commit 79c6198
Show file tree
Hide file tree
Showing 24 changed files with 633 additions and 469 deletions.
35 changes: 15 additions & 20 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from enum import Enum
from functools import partial
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import PartitionMetadata, TopicPartition
Expand All @@ -33,6 +32,7 @@
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
from karapace.config import Config
from karapace.kafka_admin import KafkaAdminClient
from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
from karapace.key_format import KeyFormatter
from karapace.utils import assert_never
Expand Down Expand Up @@ -186,7 +186,7 @@ def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[Pa


@contextlib.contextmanager
def _admin(config: Config) -> KafkaAdminClient:
def _admin(config: Config) -> Iterator[KafkaAdminClient]:
"""Creates an automatically closing Kafka admin client.
:param config: for the client.
Expand All @@ -201,10 +201,7 @@ def _admin(config: Config) -> KafkaAdminClient:
retry=retry_if_exception_type(KafkaError),
)(kafka_admin_from_config)(config)

try:
yield admin
finally:
admin.close()
yield admin


@retry(
Expand All @@ -222,26 +219,24 @@ def _maybe_create_topic(
topic_configs: Mapping[str, str],
) -> bool:
"""Returns True if topic creation was successful, False if topic already exists"""
topic = NewTopic(
name=name,
num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
replication_factor=replication_factor,
topic_configs=topic_configs,
)

with _admin(config) as admin:
try:
admin.create_topics([topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
admin.new_topic(
name,
num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
replication_factor=replication_factor,
config=dict(topic_configs),
)
except TopicAlreadyExistsError:
LOG.debug("Topic %r already exists", topic.name)
LOG.debug("Topic %r already exists", name)
return False

LOG.info(
"Created topic %r (partition count: %s, replication factor: %s, config: %s)",
topic.name,
topic.num_partitions,
topic.replication_factor,
topic.topic_configs,
name,
constants.SCHEMA_TOPIC_NUM_PARTITIONS,
replication_factor,
topic_configs,
)
return True

Expand Down Expand Up @@ -520,7 +515,7 @@ def create_backup(
topic_configurations = get_topic_configurations(
admin=admin,
topic_name=topic_name,
config_source_filter={ConfigSource.TOPIC_CONFIG},
config_source_filter={ConfigSource.DYNAMIC_TOPIC_CONFIG},
)

# Note: It's expected that we at some point want to introduce handling of
Expand Down
43 changes: 10 additions & 33 deletions karapace/backup/topic_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,12 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from enum import Enum
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType
from kafka.errors import for_code
from kafka.protocol.admin import DescribeConfigsRequest
from typing import Container, Dict, Final
from __future__ import annotations

from karapace.kafka_admin import ConfigSource, KafkaAdminClient
from typing import Container, Final

class ConfigSource(int, Enum):
UNKNOWN = 0
TOPIC_CONFIG = 1
DYNAMIC_BROKER_CONFIG = 2
DYNAMIC_DEFAULT_BROKER_CONFIG = 3
STATIC_BROKER_CONFIG = 4
DEFAULT_CONFIG = 5
DYNAMIC_BROKER_LOGGER_CONFIG = 6


ALL_CONFIG_SOURCES: Final = {item.value for item in ConfigSource.__members__.values()}
ALL_CONFIG_SOURCES: Final = ConfigSource


DEFAULT_CONFIGS: Final = [
Expand All @@ -35,7 +22,7 @@ def get_topic_configurations(
admin: KafkaAdminClient,
topic_name: str,
config_source_filter: Container[ConfigSource] = (),
) -> Dict[str, str]:
) -> dict[str, str]:
"""Get configurations of the specified topic. The following configurations will be retrieved by default:
- `cleanup.policy`
- `min.insync.replicas`
Expand All @@ -47,18 +34,8 @@ def get_topic_configurations(
:param config_source_filter: returns all the configurations that match the sources specified,
plus the default configurations. If empty, returns only the default configurations.
"""
if admin._matching_api_version(DescribeConfigsRequest) == 0: # pylint: disable=protected-access
raise NotImplementedError("Broker version is not supported")
req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic_name)]
cfgs = admin.describe_configs(req_cfgs)
assert len(cfgs) == 1
assert len(cfgs[0].resources) == 1
err, _, _, _, config_values = cfgs[0].resources[0]
if err != 0:
raise for_code(err)
topic_config = {}
for cv in config_values:
name, val, _, config_source, _, _ = cv
if name in DEFAULT_CONFIGS or (config_source in config_source_filter):
topic_config[name] = val
return topic_config
return admin.get_topic_config(
topic_name,
config_name_filter=DEFAULT_CONFIGS,
config_source_filter=config_source_filter,
)
2 changes: 1 addition & 1 deletion karapace/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

SCHEMA_TOPIC_NUM_PARTITIONS: Final = 1
API_VERSION_AUTO_TIMEOUT_MS: Final = 30000
TOPIC_CREATION_TIMEOUT_MS: Final = 20000
TOPIC_CREATION_TIMEOUT_S: Final = 20
DEFAULT_SCHEMA_TOPIC: Final = "_schemas"
DEFAULT_PRODUCER_MAX_REQUEST: Final = 1048576
DEFAULT_AIOHTTP_CLIENT_MAX_SIZE: Final = 1048576
Expand Down
Loading

0 comments on commit 79c6198

Please sign in to comment.