Implement OAuth
yzhan289 committed Mar 27, 2023
1 parent 9613334 commit 154d085
Showing 4 changed files with 84 additions and 47 deletions.
@@ -53,8 +53,21 @@ def __get_authentication_config(self):
"sasl.kerberos.keytab": self.config._sasl_kerberos_keytab,
"sasl.kerberos.principal": self.config._sasl_kerberos_principal,
"sasl.kerberos.service.name": self.config._sasl_kerberos_service_name,
# "sasl.oauthbearer.client.id": self.config._sasl_oauth_token_provider.get("client_id"),
# "sasl.oauthbearer.token.endpoint.url": self.config._sasl_oauth_token_provider.get("url"),
# "sasl.oauthbearer.client.secret": self.config._sasl_oauth_token_provider.get("client_secret"),
}

if self.config._sasl_mechanism == "OAUTHBEARER":
extras_parameters["sasl.oauthbearer.method"] = "oidc"
extras_parameters["sasl.oauthbearer.client.id"] = self.config._sasl_oauth_token_provider.get("client_id")
extras_parameters["sasl.oauthbearer.token.endpoint.url"] = self.config._sasl_oauth_token_provider.get("url")
extras_parameters["sasl.oauthbearer.client.secret"] = self.config._sasl_oauth_token_provider.get("client_secret")

for key, value in extras_parameters.items():
# Do not add the value if it's not specified
if value:
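For context, the options added above correspond to librdkafka's OIDC OAUTHBEARER settings. A minimal sketch of how an equivalent configuration might be passed to confluent-kafka's AdminClient; the broker address, endpoint URL, client id, and secret are placeholders, not values from this commit:

# Hypothetical example; all values are placeholders.
from confluent_kafka.admin import AdminClient

conf = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "OAUTHBEARER",
    # With the "oidc" method, librdkafka fetches the token from the endpoint itself
    "sasl.oauthbearer.method": "oidc",
    "sasl.oauthbearer.client.id": "my-client-id",
    "sasl.oauthbearer.client.secret": "my-client-secret",
    "sasl.oauthbearer.token.endpoint.url": "https://idp.example.com/oauth2/token",
}

client = AdminClient(conf)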
13 changes: 13 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
@@ -67,3 +67,16 @@ def validate_config(self):

if isinstance(self._kafka_version, str):
self._kafka_version = tuple(map(int, self._kafka_version.split(".")))

if self._sasl_mechanism == "OAUTHBEARER":
if self._sasl_oauth_token_provider is None:
raise ConfigurationError("sasl_oauth_token_provider required for OAUTHBEARER sasl")

if self._sasl_oauth_token_provider.get("url") is None:
raise ConfigurationError("The `url` setting of `auth_token` reader is required")

elif self._sasl_oauth_token_provider.get("client_id") is None:
raise ConfigurationError("The `client_id` setting of `auth_token` reader is required")

elif self._sasl_oauth_token_provider.get("client_secret") is None:
raise ConfigurationError("The `client_secret` setting of `auth_token` reader is required")
48 changes: 1 addition & 47 deletions kafka_consumer/tests/test_integration.py
@@ -5,9 +5,8 @@
from contextlib import nullcontext as does_not_raise

import pytest
from tests.common import LEGACY_CLIENT, assert_check_kafka, metrics
from tests.common import assert_check_kafka, metrics

from datadog_checks.base import ConfigurationError
from datadog_checks.dev.utils import get_metadata_metrics

pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]
@@ -174,48 +173,3 @@ def test_config(

assert exception_msg in caplog.text
aggregator.assert_metrics_using_metadata(get_metadata_metrics())


@pytest.mark.parametrize(
'sasl_oauth_token_provider, expected_exception',
[
pytest.param(
{},
pytest.raises(AssertionError, match="sasl_oauth_token_provider required for OAUTHBEARER sasl"),
id="No sasl_oauth_token_provider",
),
pytest.param(
{'sasl_oauth_token_provider': {}},
pytest.raises(ConfigurationError, match="The `url` setting of `auth_token` reader is required"),
id="Empty sasl_oauth_token_provider, url missing",
),
pytest.param(
{'sasl_oauth_token_provider': {'url': 'http://fake.url'}},
pytest.raises(ConfigurationError, match="The `client_id` setting of `auth_token` reader is required"),
id="client_id missing",
),
pytest.param(
{'sasl_oauth_token_provider': {'url': 'http://fake.url', 'client_id': 'id'}},
pytest.raises(ConfigurationError, match="The `client_secret` setting of `auth_token` reader is required"),
id="client_secret missing",
),
pytest.param(
{'sasl_oauth_token_provider': {'url': 'http://fake.url', 'client_id': 'id', 'client_secret': 'secret'}},
pytest.raises(Exception, match="NoBrokersAvailable"), # Mock the expected response after library migration
id="valid config",
),
],
)
@pytest.mark.skipif(not LEGACY_CLIENT, reason='not implemented yet with confluent-kafka')
def test_oauth_config(sasl_oauth_token_provider, check, expected_exception, kafka_instance):
kafka_instance.update(
{
'monitor_unlisted_consumer_groups': True,
'security_protocol': 'SASL_PLAINTEXT',
'sasl_mechanism': 'OAUTHBEARER',
}
)
kafka_instance.update(sasl_oauth_token_provider)

with expected_exception:
check(kafka_instance).check(kafka_instance)
57 changes: 57 additions & 0 deletions kafka_consumer/tests/test_unit.py
@@ -2,6 +2,7 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import logging
from contextlib import nullcontext as does_not_raise

import mock
import pytest
@@ -106,6 +107,62 @@ def test_invalid_connect_str(dd_run_check, check, aggregator, caplog, kafka_inst
aggregator.assert_metrics_using_metadata(get_metadata_metrics())


@pytest.mark.parametrize(
'sasl_oauth_token_provider, expected_exception, mocked_admin_client',
[
pytest.param(
{},
pytest.raises(Exception, match="sasl_oauth_token_provider required for OAUTHBEARER sasl"),
None,
id="No sasl_oauth_token_provider",
),
pytest.param(
{'sasl_oauth_token_provider': {}},
pytest.raises(Exception, match="The `url` setting of `auth_token` reader is required"),
None,
id="Empty sasl_oauth_token_provider, url missing",
),
pytest.param(
{'sasl_oauth_token_provider': {'url': 'http://fake.url'}},
pytest.raises(Exception, match="The `client_id` setting of `auth_token` reader is required"),
None,
id="client_id missing",
),
pytest.param(
{'sasl_oauth_token_provider': {'url': 'http://fake.url', 'client_id': 'id'}},
pytest.raises(Exception, match="The `client_secret` setting of `auth_token` reader is required"),
None,
id="client_secret missing",
),
pytest.param(
{'sasl_oauth_token_provider': {'url': 'http://fake.url', 'client_id': 'id', 'client_secret': 'secret'}},
does_not_raise(),
mock.MagicMock(),
id="valid config",
),
],
)
@pytest.mark.skipif(LEGACY_CLIENT, reason='kafka_python does not have confluent_kafka_client.AdminClient')
def test_oauth_config(
sasl_oauth_token_provider, expected_exception, mocked_admin_client, check, dd_run_check, kafka_instance
):
kafka_instance.update(
{
'monitor_unlisted_consumer_groups': True,
'security_protocol': 'SASL_PLAINTEXT',
'sasl_mechanism': 'OAUTHBEARER',
}
)
kafka_instance.update(sasl_oauth_token_provider)

with expected_exception:
with mock.patch(
'datadog_checks.kafka_consumer.client.confluent_kafka_client.AdminClient',
return_value=mocked_admin_client,
):
dd_run_check(check(kafka_instance))


# TODO: After these tests are finished and the revamp is complete,
# the tests should be refactored to use parametrization instead of separate tests
@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.GenericKafkaClient")
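Aside: the does_not_raise alias for contextlib.nullcontext used in the new unit test lets the "no exception expected" case be parametrized alongside pytest.raises cases. A standalone sketch of the pattern, with a made-up divide() function purely for illustration:

# Illustration only; divide() is hypothetical and not part of this commit.
from contextlib import nullcontext as does_not_raise

import pytest

def divide(a, b):
    return a / b

@pytest.mark.parametrize(
    "a, b, expectation",
    [
        (6, 3, does_not_raise()),
        (1, 0, pytest.raises(ZeroDivisionError)),
    ],
)
def test_divide(a, b, expectation):
    # The context manager either swallows nothing (success case)
    # or asserts that the expected exception is raised.
    with expectation:
        divide(a, b)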
