Skip to content

Commit

Permalink
Configure Kafka clients based on REST auth method
Browse files Browse the repository at this point in the history
In the previous commit the handling of the authentication header was
added to differentiate between basic and OAuth (Bearer token) based
authentication. With this change, based on the established
configuration, Kafka clients are configured differently.
  • Loading branch information
Mátyás Kuti committed Oct 4, 2023
1 parent 1e21729 commit c5f8623
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 10 deletions.
2 changes: 2 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Config(TypedDict):
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
sasl_oauth_token: str | None
topic_name: str
metadata_max_age_ms: int
admin_metadata_max_age: int
Expand Down Expand Up @@ -131,6 +132,7 @@ class ConfigDefaults(Config, total=False):
"sasl_mechanism": None,
"sasl_plain_username": None,
SASL_PLAIN_PASSWORD: None,
"sasl_oauth_token": None,
"topic_name": DEFAULT_SCHEMA_TOPIC,
"metadata_max_age_ms": 60000,
"admin_metadata_max_age": 5,
Expand Down
10 changes: 3 additions & 7 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.auth_utils import get_auth_config_from_header
from karapace.kafka_rest_apis.auth_utils import get_auth_config_from_header, get_kafka_client_auth_parameters_from_config
from karapace.kafka_rest_apis.consumer_manager import ConsumerManager
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache
Expand Down Expand Up @@ -461,9 +461,7 @@ async def _maybe_create_async_producer(self) -> AIOKafkaProducer:
metadata_max_age_ms=self.config["metadata_max_age_ms"],
security_protocol=self.config["security_protocol"],
ssl_context=ssl_context,
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
**get_kafka_client_auth_parameters_from_config(self.config),
)

try:
Expand Down Expand Up @@ -616,13 +614,11 @@ def init_admin_client(self):
ssl_cafile=self.config["ssl_cafile"],
ssl_certfile=self.config["ssl_certfile"],
ssl_keyfile=self.config["ssl_keyfile"],
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
api_version=(1, 0, 0),
metadata_max_age_ms=self.config["metadata_max_age_ms"],
connections_max_idle_ms=self.config["connections_max_idle_ms"],
kafka_client=KarapaceKafkaClient,
**get_kafka_client_auth_parameters_from_config(self.config),
)
break
except: # pylint: disable=bare-except
Expand Down
29 changes: 29 additions & 0 deletions karapace/kafka_rest_apis/auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,32 @@ def get_auth_config_from_header(
}

raise_unauthorized()


@dataclasses.dataclass
class SimpleOauthTokenProvider(AbstractTokenProvider):
_token: str

async def token(self) -> str:
return self._token


class SASLOauthParams(TypedDict):
sasl_mechanism: str
sasl_oauth_token_provider: AbstractTokenProvider


def get_kafka_client_auth_parameters_from_config(
config: Config,
) -> Union[SASLPlainConfig, SASLOauthParams]:
if config["sasl_mechanism"] == "OAUTHBEARER":
return {
"sasl_mechanism": config["sasl_mechanism"],
"sasl_oauth_token_provider": SimpleOauthTokenProvider(config["sasl_oauth_token"]),
}

return {
"sasl_mechanism": config["sasl_mechanism"],
"sasl_plain_username": config["sasl_plain_username"],
"sasl_plain_password": config["sasl_plain_password"],
}
5 changes: 2 additions & 3 deletions karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from kafka.errors import GroupAuthorizationFailedError, IllegalStateError, KafkaConfigurationError, KafkaError
from kafka.structs import TopicPartition
from karapace.config import Config, create_client_ssl_context
from karapace.kafka_rest_apis.auth_utils import get_kafka_client_auth_parameters_from_config
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.karapace import empty_response, KarapaceBase
from karapace.serialization import DeserializationError, InvalidMessageHeader, InvalidPayload, SchemaRegistrySerializer
Expand Down Expand Up @@ -205,9 +206,6 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
client_id=internal_name,
security_protocol=self.config["security_protocol"],
ssl_context=ssl_context,
sasl_mechanism=self.config["sasl_mechanism"],
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
group_id=group_name,
fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values
fetch_max_bytes=self.config["consumer_request_max_bytes"],
Expand All @@ -218,6 +216,7 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
enable_auto_commit=request_data["auto.commit.enable"],
auto_offset_reset=request_data["auto.offset.reset"],
session_timeout_ms=session_timeout_ms,
**get_kafka_client_auth_parameters_from_config(self.config),
)
await c.start()
return c
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/test_auth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,31 @@ def test_get_auth_config_from_header(
config = set_config_defaults(config_override)
auth_config = get_auth_config_from_header(auth_header, config)
assert auth_config == expected_auth_config


async def test_simple_oauth_token_provider_returns_configured_token() -> None:
token_provider = SimpleOauthTokenProvider("TOKEN")
assert await token_provider.token() == "TOKEN"


def test_get_client_auth_parameters_from_config_sasl_plain() -> None:
config = set_config_defaults(
{"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"}
)

client_auth_params = get_kafka_client_auth_parameters_from_config(config)

assert client_auth_params == {
"sasl_mechanism": "PLAIN",
"sasl_plain_username": "username",
"sasl_plain_password": "password",
}


async def test_get_client_auth_parameters_from_config_oauth() -> None:
config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": "TOKEN"})

client_auth_params = get_kafka_client_auth_parameters_from_config(config)

assert client_auth_params["sasl_mechanism"] == "OAUTHBEARER"
assert await client_auth_params["sasl_oauth_token_provider"].token() == "TOKEN"

0 comments on commit c5f8623

Please sign in to comment.