From d73b7cf4d65ee4cfa4ef77f7e787b45361e33c21 Mon Sep 17 00:00:00 2001
From: Florent Clarret
Date: Fri, 17 Feb 2023 18:04:28 +0000
Subject: [PATCH] Refactor the tests (#13997)

---
 kafka_consumer/tests/common.py                |  7 +-
 kafka_consumer/tests/conftest.py              | 95 +++++++++----------
 .../tests/python_client/test_integration.py   | 51 ++++------
 .../tests/python_client/test_unit.py          | 45 ++++-----
 4 files changed, 92 insertions(+), 106 deletions(-)

diff --git a/kafka_consumer/tests/common.py b/kafka_consumer/tests/common.py
index e5347ed5af08d..9d7621498e7bd 100644
--- a/kafka_consumer/tests/common.py
+++ b/kafka_consumer/tests/common.py
@@ -5,6 +5,7 @@
 import socket
 
 from datadog_checks.dev import get_docker_hostname
+from datadog_checks.dev.utils import get_metadata_metrics
 
 HERE = os.path.dirname(os.path.abspath(__file__))
 HOST = get_docker_hostname()
@@ -26,12 +27,14 @@ def assert_check_kafka(aggregator, consumer_groups):
             for partition in partitions:
                 tags = [f"topic:{topic}", f"partition:{partition}"] + ['optional:tag1']
                 for mname in BROKER_METRICS:
-                    aggregator.assert_metric(mname, tags=tags, at_least=1)
+                    aggregator.assert_metric(mname, tags=tags, count=1)
+
                 for mname in CONSUMER_METRICS:
                     aggregator.assert_metric(
                         mname,
                         tags=tags + [f"consumer_group:{name}"],
-                        at_least=1,
+                        count=1,
                     )
 
     aggregator.assert_all_metrics_covered()
+    aggregator.assert_metrics_using_metadata(get_metadata_metrics())
diff --git a/kafka_consumer/tests/conftest.py b/kafka_consumer/tests/conftest.py
index 3b05006cca840..ba47b4689fc67 100644
--- a/kafka_consumer/tests/conftest.py
+++ b/kafka_consumer/tests/conftest.py
@@ -1,6 +1,7 @@
 # (C) Datadog, Inc. 2018-present
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
+import copy
 import os
 import time
 
@@ -10,36 +11,34 @@
 from packaging.version import parse as parse_version
 
 from datadog_checks.dev import WaitFor, docker_run
+from datadog_checks.kafka_consumer import KafkaCheck
 
 from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, TOPICS
 from .runners import KConsumer, Producer
 
+# Dummy TLS certs
+CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate')
+cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
+private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')
 
-def find_topics():
-    consumer = KafkaConsumer(bootstrap_servers=KAFKA_CONNECT_STR, request_timeout_ms=1000)
-    topics = consumer.topics()
-
-    # We expect to find 2 topics: `marvel` and `dc`
-    return len(topics) == 2
-
-
-def initialize_topics():
-    consumer = KConsumer(TOPICS)
-
-    with Producer():
-        with consumer:
-            time.sleep(5)
-
+E2E_METADATA = {
+    'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
+    'start_commands': [
+        'apt-get update',
+        'apt-get install -y build-essential',
+    ],
+}
 
-@pytest.fixture(scope='session')
-def mock_local_kafka_hosts_dns():
-    mapping = {'kafka1': ('127.0.0.1', 9092), 'kafka2': ('127.0.0.1', 9093)}
-    with mock_local(mapping):
-        yield
+INSTANCE = {
+    'kafka_connect_str': KAFKA_CONNECT_STR,
+    'tags': ['optional:tag1'],
+    'consumer_groups': {'my_consumer': {'marvel': [0]}},
+    'broker_requests_batch_size': 1,
+}
 
 
 @pytest.fixture(scope='session')
-def dd_environment(mock_local_kafka_hosts_dns, e2e_instance):
+def dd_environment(mock_local_kafka_hosts_dns):
     """
     Start a kafka cluster and wait for it to be up and running.
""" @@ -55,37 +54,22 @@ def dd_environment(mock_local_kafka_hosts_dns, e2e_instance): }, ): yield { - 'instances': [e2e_instance], + 'instances': [INSTANCE], 'init_config': {'kafka_timeout': 30}, }, E2E_METADATA -E2E_METADATA = { - 'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')], - 'start_commands': [ - 'apt-get update', - 'apt-get install -y build-essential', - ], -} +@pytest.fixture +def check(): + return lambda instance, init_config=None: KafkaCheck('kafka_consumer', init_config or {}, [instance]) -@pytest.fixture(scope='session') +@pytest.fixture def kafka_instance(): - return { - 'kafka_connect_str': KAFKA_CONNECT_STR, - 'tags': ['optional:tag1'], - 'consumer_groups': {'my_consumer': {'marvel': [0]}}, - 'broker_requests_batch_size': 1, - } - - -# Dummy TLS certs -CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate') -cert = os.path.join(CERTIFICATE_DIR, 'cert.cert') -private_key = os.path.join(CERTIFICATE_DIR, 'server.pem') + return copy.deepcopy(INSTANCE) -@pytest.fixture(scope='session') +@pytest.fixture def kafka_instance_tls(): return { 'kafka_connect_str': KAFKA_CONNECT_STR, @@ -100,12 +84,27 @@ def kafka_instance_tls(): } -@pytest.fixture(scope='session') -def e2e_instance(kafka_instance): - return kafka_instance - - def _get_bootstrap_server_flag(): if KAFKA_VERSION != 'latest' and parse_version(KAFKA_VERSION) < parse_version('3.0'): return '--zookeeper zookeeper:2181' return '--bootstrap-server kafka1:19092' + + +def find_topics(): + consumer = KafkaConsumer(bootstrap_servers=KAFKA_CONNECT_STR, request_timeout_ms=1000) + return consumer.topics() == set(TOPICS) + + +def initialize_topics(): + consumer = KConsumer(TOPICS) + + with Producer(): + with consumer: + time.sleep(5) + + +@pytest.fixture(scope='session') +def mock_local_kafka_hosts_dns(): + mapping = {'kafka1': ('127.0.0.1', 9092), 'kafka2': ('127.0.0.1', 9093)} + with mock_local(mapping): + yield diff --git a/kafka_consumer/tests/python_client/test_integration.py b/kafka_consumer/tests/python_client/test_integration.py index 3b706e15bfeb3..9f7ea820cdc07 100644 --- a/kafka_consumer/tests/python_client/test_integration.py +++ b/kafka_consumer/tests/python_client/test_integration.py @@ -3,74 +3,63 @@ # Licensed under a 3-clause BSD style license (see LICENSE) import pytest -from datadog_checks.kafka_consumer import KafkaCheck +from datadog_checks.dev.utils import get_metadata_metrics from ..common import KAFKA_CONNECT_STR, assert_check_kafka -pytestmark = [pytest.mark.integration] +pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] -@pytest.mark.usefixtures('dd_environment') -def test_check_kafka(aggregator, kafka_instance, dd_run_check): +def test_check_kafka(aggregator, check, kafka_instance, dd_run_check): """ Testing Kafka_consumer check. """ - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) + dd_run_check(check(kafka_instance)) assert_check_kafka(aggregator, kafka_instance['consumer_groups']) -@pytest.mark.usefixtures('dd_environment') -def test_can_send_event(aggregator, kafka_instance, dd_run_check): +def test_can_send_event(aggregator, check, kafka_instance, dd_run_check): """ Testing Kafka_consumer check. 
""" - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) + kafka_consumer_check = check(kafka_instance) kafka_consumer_check.send_event("test", "test", [], "test", "test") aggregator.assert_event("test", exact_match=False, count=1) -@pytest.mark.usefixtures('dd_environment') -def test_check_kafka_metrics_limit(aggregator, kafka_instance, dd_run_check): +def test_check_kafka_metrics_limit(aggregator, check, kafka_instance, dd_run_check): """ Testing Kafka_consumer check. """ - kafka_consumer_check = KafkaCheck('kafka_consumer', {'max_partition_contexts': 1}, [kafka_instance]) - dd_run_check(kafka_consumer_check) + dd_run_check(check(kafka_instance, {'max_partition_contexts': 1})) assert len(aggregator._metrics) == 1 -@pytest.mark.usefixtures('dd_environment') -def test_consumer_config_error(caplog, dd_run_check): +def test_consumer_config_error(caplog, check, dd_run_check): instance = {'kafka_connect_str': KAFKA_CONNECT_STR, 'tags': ['optional:tag1']} - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance]) + kafka_consumer_check = check(instance) dd_run_check(kafka_consumer_check, extract_message=True) assert 'monitor_unlisted_consumer_groups is False' in caplog.text -@pytest.mark.usefixtures('dd_environment') -def test_no_topics(aggregator, kafka_instance, dd_run_check): +def test_no_topics(aggregator, check, kafka_instance, dd_run_check): kafka_instance['consumer_groups'] = {'my_consumer': {}} - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) + dd_run_check(check(kafka_instance)) assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) -@pytest.mark.usefixtures('dd_environment') -def test_no_partitions(aggregator, kafka_instance, dd_run_check): +def test_no_partitions(aggregator, check, kafka_instance, dd_run_check): kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}} - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) + dd_run_check(check(kafka_instance)) assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) -@pytest.mark.usefixtures('dd_environment') -def test_version_metadata(datadog_agent, kafka_instance, dd_run_check): - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) +def test_version_metadata(datadog_agent, check, kafka_instance, dd_run_check): + kafka_consumer_check = check(kafka_instance) kafka_consumer_check.check_id = 'test:123' kafka_client = kafka_consumer_check.client.create_kafka_admin_client() @@ -91,19 +80,19 @@ def test_version_metadata(datadog_agent, kafka_instance, dd_run_check): pytest.param(False, 2, ['topic:marvel'], id="Disabled"), ], ) -@pytest.mark.usefixtures('dd_environment') -def test_monitor_broker_highwatermarks(dd_run_check, aggregator, is_enabled, metric_count, topic_tags): +def test_monitor_broker_highwatermarks(dd_run_check, check, aggregator, is_enabled, metric_count, topic_tags): instance = { 'kafka_connect_str': KAFKA_CONNECT_STR, 'consumer_groups': {'my_consumer': {'marvel': None}}, 'monitor_all_broker_highwatermarks': is_enabled, } - check = KafkaCheck('kafka_consumer', {}, [instance]) - dd_run_check(check) + dd_run_check(check(instance)) # After refactor and library migration, write unit tests to assert expected metric values aggregator.assert_metric('kafka.broker_offset', count=metric_count) + for tag in topic_tags: aggregator.assert_metric_has_tag('kafka.broker_offset', tag, count=2) 
     aggregator.assert_metric_has_tag_prefix('kafka.broker_offset', 'partition', count=metric_count)
+    aggregator.assert_metrics_using_metadata(get_metadata_metrics())
diff --git a/kafka_consumer/tests/python_client/test_unit.py b/kafka_consumer/tests/python_client/test_unit.py
index 6665e0e6c7bf1..71252fb9c725c 100644
--- a/kafka_consumer/tests/python_client/test_unit.py
+++ b/kafka_consumer/tests/python_client/test_unit.py
@@ -8,7 +8,7 @@
 import pytest
 
 from datadog_checks.base import ConfigurationError
-from datadog_checks.kafka_consumer import KafkaCheck
+from datadog_checks.dev.utils import get_metadata_metrics
 from datadog_checks.kafka_consumer.client.kafka_python_client import OAuthTokenProvider
 
 from ..common import KAFKA_CONNECT_STR, metrics
@@ -16,19 +16,18 @@
 pytestmark = [pytest.mark.unit]
 
 
-def test_gssapi(kafka_instance, dd_run_check):
+def test_gssapi(kafka_instance, dd_run_check, check):
     instance = copy.deepcopy(kafka_instance)
     instance['sasl_mechanism'] = 'GSSAPI'
     instance['security_protocol'] = 'SASL_PLAINTEXT'
     instance['sasl_kerberos_service_name'] = 'kafka'
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance])
     # assert the check doesn't fail with:
     # Exception: Could not find main GSSAPI shared library.
     with pytest.raises(Exception, match='check_version'):
-        dd_run_check(kafka_consumer_check)
+        dd_run_check(check(instance))
 
 
-def test_tls_config_ok(kafka_instance_tls):
+def test_tls_config_ok(check, kafka_instance_tls):
     with mock.patch('datadog_checks.base.utils.tls.ssl') as ssl:
         with mock.patch('kafka.KafkaAdminClient') as kafka_admin_client:
 
@@ -39,7 +38,7 @@ def test_tls_config_ok(kafka_instance_tls):
             tls_context = mock.MagicMock()
             ssl.SSLContext.return_value = tls_context
 
-            kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance_tls])
+            kafka_consumer_check = check(kafka_instance_tls)
             kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client)
 
             assert tls_context.check_hostname is True
@@ -78,7 +77,7 @@
         ),
     ],
 )
-def test_oauth_config(sasl_oauth_token_provider, expected_exception):
+def test_oauth_config(sasl_oauth_token_provider, check, expected_exception):
     instance = {
         'kafka_connect_str': KAFKA_CONNECT_STR,
         'monitor_unlisted_consumer_groups': True,
@@ -86,26 +85,24 @@
         'sasl_mechanism': 'OAUTHBEARER',
     }
     instance.update(sasl_oauth_token_provider)
-    check = KafkaCheck('kafka_consumer', {}, [instance])
 
     with expected_exception:
-        check.check(instance)
+        check(instance).check(instance)
 
 
-def test_oauth_token_client_config(kafka_instance):
-    instance = copy.deepcopy(kafka_instance)
-    instance['kafka_client_api_version'] = "3.3.2"
-    instance['security_protocol'] = "SASL_PLAINTEXT"
-    instance['sasl_mechanism'] = "OAUTHBEARER"
-    instance['sasl_oauth_token_provider'] = {
+def test_oauth_token_client_config(check, kafka_instance):
+    kafka_instance['kafka_client_api_version'] = "3.3.2"
+    kafka_instance['security_protocol'] = "SASL_PLAINTEXT"
+    kafka_instance['sasl_mechanism'] = "OAUTHBEARER"
+    kafka_instance['sasl_oauth_token_provider'] = {
         "url": "http://fake.url",
         "client_id": "id",
         "client_secret": "secret",
     }
 
     with mock.patch('kafka.KafkaAdminClient') as kafka_admin_client:
-        kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance])
-        _ = kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client)
+        kafka_consumer_check = check(kafka_instance)
+        kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client)
 
         params = kafka_admin_client.call_args_list[0].kwargs
 
         assert params['security_protocol'] == 'SASL_PLAINTEXT'
@@ -124,12 +121,11 @@
         ),
     ],
 )
-def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance):
+def test_tls_config_legacy(extra_config, expected_http_kwargs, check, kafka_instance):
     instance = kafka_instance
     instance.update(extra_config)
 
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance])
-
+    kafka_consumer_check = check(instance)
     kafka_consumer_check.get_tls_context()
     actual_options = {
         k: v for k, v in kafka_consumer_check._tls_context_wrapper.config.items() if k in expected_http_kwargs
@@ -137,8 +133,6 @@
     assert expected_http_kwargs == actual_options
 
 
-# Get rid of dd_environment when we refactor and fix expected behaviors in the check
-@pytest.mark.usefixtures('dd_environment')
 @pytest.mark.parametrize(
     'instance, expected_exception, metric_count',
     [
@@ -208,10 +202,11 @@
         ),
     ],
 )
-def test_config(dd_run_check, instance, aggregator, expected_exception, metric_count):
-    check = KafkaCheck('kafka_consumer', {}, [instance])
+def test_config(check, instance, aggregator, expected_exception, metric_count):
    with expected_exception:
-        check.check(instance)
+        check(instance).check(instance)
 
     for m in metrics:
         aggregator.assert_metric(m, count=metric_count)
+
+    aggregator.assert_metrics_using_metadata(get_metadata_metrics())
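
For illustration only (not part of the patch): a minimal sketch of how the refactored fixtures compose in a new integration test. The test name `test_custom_consumer_group` is hypothetical; `check`, `kafka_instance`, `aggregator`, and `dd_run_check` are the fixtures used throughout this patch, and `assert_check_kafka` is the helper from tests/common.py.

    import pytest

    from ..common import assert_check_kafka

    # Module-level markers replace the per-test
    # @pytest.mark.usefixtures('dd_environment') decorators this patch removes.
    pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]


    def test_custom_consumer_group(aggregator, check, kafka_instance, dd_run_check):
        # `kafka_instance` is now function-scoped and returns copy.deepcopy(INSTANCE),
        # so mutating it here cannot leak state into other tests.
        kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': [0]}}

        # `check` is a factory fixture: check(instance, init_config=None) replaces
        # the repeated KafkaCheck('kafka_consumer', {}, [instance]) boilerplate.
        # The init_config here mirrors the one yielded by dd_environment.
        dd_run_check(check(kafka_instance, {'kafka_timeout': 30}))

        # assert_check_kafka now also calls
        # aggregator.assert_metrics_using_metadata(get_metadata_metrics()),
        # so submitted metrics are validated against metadata.csv as well.
        assert_check_kafka(aggregator, kafka_instance['consumer_groups'])

One design consequence worth noting: because both fixtures are function-scoped, each test gets a fresh check and a fresh copy of the instance, which is what allows tests such as test_no_topics and test_oauth_token_client_config to mutate `kafka_instance` directly instead of deep-copying it themselves.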