From c5f8623e610d2826d00d46559b33e1ecfce79f67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?= Date: Tue, 3 Oct 2023 17:02:10 +0200 Subject: [PATCH] Configure Kafka clients based on REST auth method In the previous commit the handling of the authentication header was added to differentiate between basic and OAuth (Bearer token) based authentication. With this change, based on the established configuration, Kafka clients are configured differently. --- karapace/config.py | 2 ++ karapace/kafka_rest_apis/__init__.py | 10 ++----- karapace/kafka_rest_apis/auth_utils.py | 29 ++++++++++++++++++++ karapace/kafka_rest_apis/consumer_manager.py | 5 ++-- tests/unit/test_auth_utils.py | 28 +++++++++++++++++++ 5 files changed, 64 insertions(+), 10 deletions(-) diff --git a/karapace/config.py b/karapace/config.py index f426dfcc5..4358b3bfe 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -62,6 +62,7 @@ class Config(TypedDict): sasl_mechanism: str | None sasl_plain_username: str | None sasl_plain_password: str | None + sasl_oauth_token: str | None topic_name: str metadata_max_age_ms: int admin_metadata_max_age: int @@ -131,6 +132,7 @@ class ConfigDefaults(Config, total=False): "sasl_mechanism": None, "sasl_plain_username": None, SASL_PLAIN_PASSWORD: None, + "sasl_oauth_token": None, "topic_name": DEFAULT_SCHEMA_TOPIC, "metadata_max_age_ms": 60000, "admin_metadata_max_age": 5, diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 369c93a45..4ecd57bdd 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -15,7 +15,7 @@ from karapace.config import Config, create_client_ssl_context from karapace.errors import InvalidSchema from karapace.kafka_rest_apis.admin import KafkaRestAdminClient -from karapace.kafka_rest_apis.auth_utils import get_auth_config_from_header +from karapace.kafka_rest_apis.auth_utils import get_auth_config_from_header, get_kafka_client_auth_parameters_from_config from karapace.kafka_rest_apis.consumer_manager import ConsumerManager from karapace.kafka_rest_apis.error_codes import RESTErrorCodes from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache @@ -461,9 +461,7 @@ async def _maybe_create_async_producer(self) -> AIOKafkaProducer: metadata_max_age_ms=self.config["metadata_max_age_ms"], security_protocol=self.config["security_protocol"], ssl_context=ssl_context, - sasl_mechanism=self.config["sasl_mechanism"], - sasl_plain_username=self.config["sasl_plain_username"], - sasl_plain_password=self.config["sasl_plain_password"], + **get_kafka_client_auth_parameters_from_config(self.config), ) try: @@ -616,13 +614,11 @@ def init_admin_client(self): ssl_cafile=self.config["ssl_cafile"], ssl_certfile=self.config["ssl_certfile"], ssl_keyfile=self.config["ssl_keyfile"], - sasl_mechanism=self.config["sasl_mechanism"], - sasl_plain_username=self.config["sasl_plain_username"], - sasl_plain_password=self.config["sasl_plain_password"], api_version=(1, 0, 0), metadata_max_age_ms=self.config["metadata_max_age_ms"], connections_max_idle_ms=self.config["connections_max_idle_ms"], kafka_client=KarapaceKafkaClient, + **get_kafka_client_auth_parameters_from_config(self.config), ) break except: # pylint: disable=bare-except diff --git a/karapace/kafka_rest_apis/auth_utils.py b/karapace/kafka_rest_apis/auth_utils.py index 0f646178b..ebe1fc6a7 100644 --- a/karapace/kafka_rest_apis/auth_utils.py +++ b/karapace/kafka_rest_apis/auth_utils.py @@ -65,3 +65,32 @@ def get_auth_config_from_header( } raise_unauthorized() + + +@dataclasses.dataclass +class SimpleOauthTokenProvider(AbstractTokenProvider): + _token: str + + async def token(self) -> str: + return self._token + + +class SASLOauthParams(TypedDict): + sasl_mechanism: str + sasl_oauth_token_provider: AbstractTokenProvider + + +def get_kafka_client_auth_parameters_from_config( + config: Config, +) -> Union[SASLPlainConfig, SASLOauthParams]: + if config["sasl_mechanism"] == "OAUTHBEARER": + return { + "sasl_mechanism": config["sasl_mechanism"], + "sasl_oauth_token_provider": SimpleOauthTokenProvider(config["sasl_oauth_token"]), + } + + return { + "sasl_mechanism": config["sasl_mechanism"], + "sasl_plain_username": config["sasl_plain_username"], + "sasl_plain_password": config["sasl_plain_password"], + } diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index 72a1bea9a..c5fe4ab03 100644 --- a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -10,6 +10,7 @@ from kafka.errors import GroupAuthorizationFailedError, IllegalStateError, KafkaConfigurationError, KafkaError from kafka.structs import TopicPartition from karapace.config import Config, create_client_ssl_context +from karapace.kafka_rest_apis.auth_utils import get_kafka_client_auth_parameters_from_config from karapace.kafka_rest_apis.error_codes import RESTErrorCodes from karapace.karapace import empty_response, KarapaceBase from karapace.serialization import DeserializationError, InvalidMessageHeader, InvalidPayload, SchemaRegistrySerializer @@ -205,9 +206,6 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name client_id=internal_name, security_protocol=self.config["security_protocol"], ssl_context=ssl_context, - sasl_mechanism=self.config["sasl_mechanism"], - sasl_plain_username=self.config["sasl_plain_username"], - sasl_plain_password=self.config["sasl_plain_password"], group_id=group_name, fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values fetch_max_bytes=self.config["consumer_request_max_bytes"], @@ -218,6 +216,7 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name enable_auto_commit=request_data["auto.commit.enable"], auto_offset_reset=request_data["auto.offset.reset"], session_timeout_ms=session_timeout_ms, + **get_kafka_client_auth_parameters_from_config(self.config), ) await c.start() return c diff --git a/tests/unit/test_auth_utils.py b/tests/unit/test_auth_utils.py index 3c6b78278..b31bb0054 100644 --- a/tests/unit/test_auth_utils.py +++ b/tests/unit/test_auth_utils.py @@ -65,3 +65,31 @@ def test_get_auth_config_from_header( config = set_config_defaults(config_override) auth_config = get_auth_config_from_header(auth_header, config) assert auth_config == expected_auth_config + + +async def test_simple_oauth_token_provider_returns_configured_token() -> None: + token_provider = SimpleOauthTokenProvider("TOKEN") + assert await token_provider.token() == "TOKEN" + + +def test_get_client_auth_parameters_from_config_sasl_plain() -> None: + config = set_config_defaults( + {"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"} + ) + + client_auth_params = get_kafka_client_auth_parameters_from_config(config) + + assert client_auth_params == { + "sasl_mechanism": "PLAIN", + "sasl_plain_username": "username", + "sasl_plain_password": "password", + } + + +async def test_get_client_auth_parameters_from_config_oauth() -> None: + config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "TOKEN"}) + + client_auth_params = get_kafka_client_auth_parameters_from_config(config) + + assert client_auth_params["sasl_mechanism"] == "OAUTHBEARER" + assert await client_auth_params["sasl_oauth_token_provider"].token() == "TOKEN"