diff --git a/kafka_consumer/tests/test_integration.py b/kafka_consumer/tests/test_integration.py index 2793a361bd5f13..201250042b3154 100644 --- a/kafka_consumer/tests/test_integration.py +++ b/kafka_consumer/tests/test_integration.py @@ -5,8 +5,9 @@ from contextlib import nullcontext as does_not_raise import pytest -from tests.common import assert_check_kafka, metrics +from tests.common import LEGACY_CLIENT, assert_check_kafka, metrics +from datadog_checks.base import ConfigurationError from datadog_checks.dev.utils import get_metadata_metrics pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] @@ -168,3 +169,48 @@ def test_config( assert exception_msg in caplog.text aggregator.assert_metrics_using_metadata(get_metadata_metrics()) + + +@pytest.mark.parametrize( + 'sasl_oauth_token_provider, expected_exception', + [ + pytest.param( + {}, + pytest.raises(AssertionError, match="sasl_oauth_token_provider required for OAUTHBEARER sasl"), + id="No sasl_oauth_token_provider", + ), + pytest.param( + {'sasl_oauth_token_provider': {}}, + pytest.raises(ConfigurationError, match="The `url` setting of `auth_token` reader is required"), + id="Empty sasl_oauth_token_provider, url missing", + ), + pytest.param( + {'sasl_oauth_token_provider': {'url': 'http://fake.url'}}, + pytest.raises(ConfigurationError, match="The `client_id` setting of `auth_token` reader is required"), + id="client_id missing", + ), + pytest.param( + {'sasl_oauth_token_provider': {'url': 'http://fake.url', 'client_id': 'id'}}, + pytest.raises(ConfigurationError, match="The `client_secret` setting of `auth_token` reader is required"), + id="client_secret missing", + ), + pytest.param( + {'sasl_oauth_token_provider': {'url': 'http://fake.url', 'client_id': 'id', 'client_secret': 'secret'}}, + pytest.raises(Exception, match="NoBrokersAvailable"), # Mock the expected response after library migration + id="valid config", + ), + ], +) +@pytest.mark.skipif(not LEGACY_CLIENT, reason='not implemented yet with confluent-kafka') +def test_oauth_config(sasl_oauth_token_provider, check, expected_exception, kafka_instance): + kafka_instance.update( + { + 'monitor_unlisted_consumer_groups': True, + 'security_protocol': 'SASL_PLAINTEXT', + 'sasl_mechanism': 'OAUTHBEARER', + } + ) + kafka_instance.update(sasl_oauth_token_provider) + + with expected_exception: + check(kafka_instance).check(kafka_instance) diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 77cb49b701ac4b..5addafc7af1f4a 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -5,9 +5,8 @@ import mock import pytest -from tests.common import KAFKA_CONNECT_STR, LEGACY_CLIENT, metrics +from tests.common import LEGACY_CLIENT, metrics -from datadog_checks.base import ConfigurationError from datadog_checks.dev.utils import get_metadata_metrics from datadog_checks.kafka_consumer import KafkaCheck from datadog_checks.kafka_consumer.client.kafka_python_client import OAuthTokenProvider @@ -39,51 +38,6 @@ def test_tls_config_ok(check, kafka_instance_tls): assert kafka_consumer_check.client._tls_context.tls_cert is not None -@pytest.mark.parametrize( - 'sasl_oauth_token_provider, expected_exception', - [ - pytest.param( - {}, - pytest.raises(AssertionError, match="sasl_oauth_token_provider required for OAUTHBEARER sasl"), - id="No sasl_oauth_token_provider", - ), - pytest.param( - {'sasl_oauth_token_provider': {}}, - pytest.raises(ConfigurationError, match="The `url` setting of `auth_token` reader is required"), - id="Empty sasl_oauth_token_provider, url missing", - ), - pytest.param( - {'sasl_oauth_token_provider': {'url': 'http://fake.url'}}, - pytest.raises(ConfigurationError, match="The `client_id` setting of `auth_token` reader is required"), - id="client_id missing", - ), - pytest.param( - {'sasl_oauth_token_provider': {'url': 'http://fake.url', 'client_id': 'id'}}, - pytest.raises(ConfigurationError, match="The `client_secret` setting of `auth_token` reader is required"), - id="client_secret missing", - ), - pytest.param( - {'sasl_oauth_token_provider': {'url': 'http://fake.url', 'client_id': 'id', 'client_secret': 'secret'}}, - pytest.raises(Exception, match="NoBrokersAvailable"), # Mock the expected response after library migration - id="valid config", - ), - ], -) -@pytest.mark.skipif(not LEGACY_CLIENT, reason='not implemented yet with confluent-kafka') -def test_oauth_config(sasl_oauth_token_provider, check, expected_exception, kafka_instance): - kafka_instance.update( - { - 'monitor_unlisted_consumer_groups': True, - 'security_protocol': 'SASL_PLAINTEXT', - 'sasl_mechanism': 'OAUTHBEARER', - } - ) - kafka_instance.update(sasl_oauth_token_provider) - - with expected_exception: - check(kafka_instance).check(kafka_instance) - - @pytest.mark.skip(reason='Add a test that not only check the parameter but also run the check') def test_oauth_token_client_config(check, kafka_instance): kafka_instance['kafka_client_api_version'] = "3.3.2"