
Commit

Merge pull request #757 from Aiven-Open/matyaskuti/confluent_kafka_for_admin_client

Use confluent-kafka-python for Kafka admin client instead of kafka-python
eliax1996 authored Nov 20, 2023
2 parents 327407d + 79c6198 commit b0d6b29
Showing 26 changed files with 651 additions and 472 deletions.
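The new wrapper module that the diffs below import from (karapace/kafka_admin.py) is not among the files shown in this excerpt. The following sketch is illustrative only: it shows, under the assumption that the wrapper is built on confluent-kafka-python's AdminClient, how the new_topic and get_topic_config calls used below could be implemented. The class layout, signatures, and the error translation are assumptions, not the merged code.

# Illustrative sketch only -- the merged wrapper lives in karapace/kafka_admin.py,
# which is not shown in this diff. Signatures mirror the call sites below
# (new_topic, get_topic_config, ConfigSource); everything else is an assumption.
from __future__ import annotations

from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, ConfigResource, ConfigSource, NewTopic
from kafka.errors import TopicAlreadyExistsError
from typing import Container


class KafkaAdminClient:
    def __init__(self, bootstrap_servers: str) -> None:
        self._admin = AdminClient({"bootstrap.servers": bootstrap_servers})

    def new_topic(
        self,
        name: str,
        *,
        num_partitions: int = 1,
        replication_factor: int = 1,
        config: dict[str, str] | None = None,
    ) -> None:
        """Create a topic, translating the confluent-kafka error into the exception api.py catches."""
        futures = self._admin.create_topics(
            [NewTopic(name, num_partitions=num_partitions, replication_factor=replication_factor, config=config or {})],
            operation_timeout=20,  # seconds, cf. TOPIC_CREATION_TIMEOUT_S below
        )
        try:
            futures[name].result()
        except KafkaException as exc:
            if exc.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
                # Assumption: the real wrapper surfaces an error compatible with
                # kafka.errors.TopicAlreadyExistsError, since api.py still catches it.
                raise TopicAlreadyExistsError() from exc
            raise

    def get_topic_config(
        self,
        name: str,
        *,
        config_name_filter: Container[str] = (),
        config_source_filter: Container[ConfigSource] = (),
    ) -> dict[str, str]:
        """Return config entries whose name or source matches the given filters."""
        resource = ConfigResource(ConfigResource.Type.TOPIC, name)
        entries = self._admin.describe_configs([resource])[resource].result()
        return {
            entry.name: entry.value
            for entry in entries.values()
            if entry.name in config_name_filter or entry.source in config_source_filter
        }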
35 changes: 15 additions & 20 deletions karapace/backup/api.py
@@ -24,7 +24,6 @@
 from enum import Enum
 from functools import partial
 from kafka import KafkaConsumer, KafkaProducer
-from kafka.admin import KafkaAdminClient, NewTopic
 from kafka.consumer.fetcher import ConsumerRecord
 from kafka.errors import KafkaError, TopicAlreadyExistsError
 from kafka.structs import PartitionMetadata, TopicPartition
@@ -33,6 +32,7 @@
 from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
 from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
 from karapace.config import Config
+from karapace.kafka_admin import KafkaAdminClient
 from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
 from karapace.key_format import KeyFormatter
 from karapace.utils import assert_never
@@ -186,7 +186,7 @@ def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[Pa


 @contextlib.contextmanager
-def _admin(config: Config) -> KafkaAdminClient:
+def _admin(config: Config) -> Iterator[KafkaAdminClient]:
     """Creates an automatically closing Kafka admin client.

     :param config: for the client.
@@ -201,10 +201,7 @@ def _admin(config: Config) -> KafkaAdminClient:
         retry=retry_if_exception_type(KafkaError),
     )(kafka_admin_from_config)(config)

-    try:
-        yield admin
-    finally:
-        admin.close()
+    yield admin


 @retry(
@@ -222,26 +219,24 @@ def _maybe_create_topic(
     topic_configs: Mapping[str, str],
 ) -> bool:
     """Returns True if topic creation was successful, False if topic already exists"""
-    topic = NewTopic(
-        name=name,
-        num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
-        replication_factor=replication_factor,
-        topic_configs=topic_configs,
-    )
-
     with _admin(config) as admin:
         try:
-            admin.create_topics([topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
+            admin.new_topic(
+                name,
+                num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
+                replication_factor=replication_factor,
+                config=dict(topic_configs),
+            )
         except TopicAlreadyExistsError:
-            LOG.debug("Topic %r already exists", topic.name)
+            LOG.debug("Topic %r already exists", name)
             return False

     LOG.info(
         "Created topic %r (partition count: %s, replication factor: %s, config: %s)",
-        topic.name,
-        topic.num_partitions,
-        topic.replication_factor,
-        topic.topic_configs,
+        name,
+        constants.SCHEMA_TOPIC_NUM_PARTITIONS,
+        replication_factor,
+        topic_configs,
     )
     return True

@@ -520,7 +515,7 @@ def create_backup(
         topic_configurations = get_topic_configurations(
             admin=admin,
             topic_name=topic_name,
-            config_source_filter={ConfigSource.TOPIC_CONFIG},
+            config_source_filter={ConfigSource.DYNAMIC_TOPIC_CONFIG},
         )

         # Note: It's expected that we at some point want to introduce handling of
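A side note on the _admin change above: annotating the function as Iterator[KafkaAdminClient] is the conventional typing for a @contextlib.contextmanager generator (the function yields the client, and the decorator turns it into a context manager), and the explicit close() is gone, presumably because the confluent-kafka based client needs no manual teardown. A generic sketch of the pattern, with a hypothetical stand-in resource:

import contextlib
from typing import Iterator


@contextlib.contextmanager
def managed_client() -> Iterator[str]:
    # The generator yields the value that `with ... as client:` binds.
    # Annotating the function as Iterator[T] (not T) is what typing expects
    # for generator-based context managers, matching the _admin change above.
    client = "connected"  # hypothetical stand-in for a Kafka admin client
    yield client
    # No explicit cleanup step here, mirroring the new _admin body; any real
    # teardown would go after the yield (or in a finally block).


with managed_client() as client:
    print(client)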
43 changes: 10 additions & 33 deletions karapace/backup/topic_configurations.py
@@ -2,25 +2,12 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from enum import Enum
-from kafka import KafkaAdminClient
-from kafka.admin import ConfigResource, ConfigResourceType
-from kafka.errors import for_code
-from kafka.protocol.admin import DescribeConfigsRequest
-from typing import Container, Dict, Final
+from __future__ import annotations

+from karapace.kafka_admin import ConfigSource, KafkaAdminClient
+from typing import Container, Final

-class ConfigSource(int, Enum):
-    UNKNOWN = 0
-    TOPIC_CONFIG = 1
-    DYNAMIC_BROKER_CONFIG = 2
-    DYNAMIC_DEFAULT_BROKER_CONFIG = 3
-    STATIC_BROKER_CONFIG = 4
-    DEFAULT_CONFIG = 5
-    DYNAMIC_BROKER_LOGGER_CONFIG = 6
-
-
-ALL_CONFIG_SOURCES: Final = {item.value for item in ConfigSource.__members__.values()}
+ALL_CONFIG_SOURCES: Final = ConfigSource


 DEFAULT_CONFIGS: Final = [
@@ -35,7 +22,7 @@ def get_topic_configurations(
     admin: KafkaAdminClient,
     topic_name: str,
     config_source_filter: Container[ConfigSource] = (),
-) -> Dict[str, str]:
+) -> dict[str, str]:
     """Get configurations of the specified topic. The following configurations will be retrieved by default:
     - `cleanup.policy`
     - `min.insync.replicas`
@@ -47,18 +34,8 @@ def get_topic_configurations(
     :param config_source_filter: returns all the configurations that match the sources specified,
         plus the default configurations. If empty, returns only the default configurations.
     """
-    if admin._matching_api_version(DescribeConfigsRequest) == 0:  # pylint: disable=protected-access
-        raise NotImplementedError("Broker version is not supported")
-    req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic_name)]
-    cfgs = admin.describe_configs(req_cfgs)
-    assert len(cfgs) == 1
-    assert len(cfgs[0].resources) == 1
-    err, _, _, _, config_values = cfgs[0].resources[0]
-    if err != 0:
-        raise for_code(err)
-    topic_config = {}
-    for cv in config_values:
-        name, val, _, config_source, _, _ = cv
-        if name in DEFAULT_CONFIGS or (config_source in config_source_filter):
-            topic_config[name] = val
-    return topic_config
+    return admin.get_topic_config(
+        topic_name,
+        config_name_filter=DEFAULT_CONFIGS,
+        config_source_filter=config_source_filter,
+    )
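A short, hedged usage sketch of the slimmed-down helper, matching how create_backup calls it in the api.py diff above; the Config construction is elided and "_schemas" stands in for the schemas topic name:

# Sketch only: Config setup is elided; "_schemas" mirrors DEFAULT_SCHEMA_TOPIC.
from karapace.backup.topic_configurations import get_topic_configurations
from karapace.config import Config
from karapace.kafka_admin import ConfigSource
from karapace.kafka_utils import kafka_admin_from_config


def backup_topic_config(config: Config) -> dict[str, str]:
    admin = kafka_admin_from_config(config)
    # Default configs (cleanup.policy, min.insync.replicas, ...) plus anything
    # set explicitly on the topic itself (DYNAMIC_TOPIC_CONFIG), as in create_backup.
    return get_topic_configurations(
        admin=admin,
        topic_name="_schemas",
        config_source_filter={ConfigSource.DYNAMIC_TOPIC_CONFIG},
    )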
2 changes: 1 addition & 1 deletion karapace/constants.py
@@ -8,7 +8,7 @@

 SCHEMA_TOPIC_NUM_PARTITIONS: Final = 1
 API_VERSION_AUTO_TIMEOUT_MS: Final = 30000
-TOPIC_CREATION_TIMEOUT_MS: Final = 20000
+TOPIC_CREATION_TIMEOUT_S: Final = 20
 DEFAULT_SCHEMA_TOPIC: Final = "_schemas"
 DEFAULT_PRODUCER_MAX_REQUEST: Final = 1048576
 DEFAULT_AIOHTTP_CLIENT_MAX_SIZE: Final = 1048576
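The unit change in constants.py tracks the client switch: kafka-python's create_topics takes a timeout_ms argument (as the removed call in api.py did), whereas confluent-kafka's AdminClient.create_topics takes operation_timeout in seconds. A minimal illustration with a plain confluent-kafka AdminClient; the broker address and topic settings are placeholders:

from confluent_kafka.admin import AdminClient, NewTopic

TOPIC_CREATION_TIMEOUT_S = 20  # seconds, matching the new constant

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder address
futures = admin.create_topics(
    [NewTopic("_schemas", num_partitions=1, replication_factor=1)],
    operation_timeout=TOPIC_CREATION_TIMEOUT_S,  # seconds, not milliseconds
)
futures["_schemas"].result()  # block until the broker confirms creation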
