Refactor the tests (#13997)
FlorentClarret authored Feb 17, 2023
1 parent cbc9499 commit d73b7cf
Showing 4 changed files with 92 additions and 106 deletions.
7 changes: 5 additions & 2 deletions kafka_consumer/tests/common.py
@@ -5,6 +5,7 @@
 import socket
 
 from datadog_checks.dev import get_docker_hostname
+from datadog_checks.dev.utils import get_metadata_metrics
 
 HERE = os.path.dirname(os.path.abspath(__file__))
 HOST = get_docker_hostname()
@@ -26,12 +27,14 @@ def assert_check_kafka(aggregator, consumer_groups):
             for partition in partitions:
                 tags = [f"topic:{topic}", f"partition:{partition}"] + ['optional:tag1']
                 for mname in BROKER_METRICS:
-                    aggregator.assert_metric(mname, tags=tags, at_least=1)
+                    aggregator.assert_metric(mname, tags=tags, count=1)
 
                 for mname in CONSUMER_METRICS:
                     aggregator.assert_metric(
                         mname,
                         tags=tags + [f"consumer_group:{name}"],
-                        at_least=1,
+                        count=1,
                     )
 
     aggregator.assert_all_metrics_covered()
+    aggregator.assert_metrics_using_metadata(get_metadata_metrics())
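
Note on the common.py change: tightening `at_least=1` to `count=1` makes the helper assert that each metric is submitted exactly once per tag set, and the new `assert_metrics_using_metadata(get_metadata_metrics())` call validates every submitted metric against the integration's metadata.csv. A minimal sketch of how a test consumes the helper (fixture names taken from the conftest.py below):

    # Sketch only: mirrors how the integration tests below call the helper.
    def test_check_kafka(aggregator, check, kafka_instance, dd_run_check):
        dd_run_check(check(kafka_instance))  # run the check exactly once
        # Each expected metric must appear exactly once (count=1), and every
        # submitted metric must be declared in metadata.csv.
        assert_check_kafka(aggregator, kafka_instance['consumer_groups'])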
95 changes: 47 additions & 48 deletions kafka_consumer/tests/conftest.py
@@ -1,6 +1,7 @@
 # (C) Datadog, Inc. 2018-present
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
+import copy
 import os
 import time
 
@@ -10,36 +11,34 @@
 from packaging.version import parse as parse_version
 
 from datadog_checks.dev import WaitFor, docker_run
+from datadog_checks.kafka_consumer import KafkaCheck
 
 from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, TOPICS
 from .runners import KConsumer, Producer
 
+# Dummy TLS certs
+CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate')
+cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
+private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')
 
-def find_topics():
-    consumer = KafkaConsumer(bootstrap_servers=KAFKA_CONNECT_STR, request_timeout_ms=1000)
-    topics = consumer.topics()
-
-    # We expect to find 2 topics: `marvel` and `dc`
-    return len(topics) == 2
-
-
-def initialize_topics():
-    consumer = KConsumer(TOPICS)
-
-    with Producer():
-        with consumer:
-            time.sleep(5)
-
+E2E_METADATA = {
+    'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
+    'start_commands': [
+        'apt-get update',
+        'apt-get install -y build-essential',
+    ],
+}
 
-@pytest.fixture(scope='session')
-def mock_local_kafka_hosts_dns():
-    mapping = {'kafka1': ('127.0.0.1', 9092), 'kafka2': ('127.0.0.1', 9093)}
-    with mock_local(mapping):
-        yield
+INSTANCE = {
+    'kafka_connect_str': KAFKA_CONNECT_STR,
+    'tags': ['optional:tag1'],
+    'consumer_groups': {'my_consumer': {'marvel': [0]}},
+    'broker_requests_batch_size': 1,
+}
 
 
 @pytest.fixture(scope='session')
-def dd_environment(mock_local_kafka_hosts_dns, e2e_instance):
+def dd_environment(mock_local_kafka_hosts_dns):
     """
     Start a kafka cluster and wait for it to be up and running.
     """
@@ -55,37 +54,22 @@ def dd_environment(mock_local_kafka_hosts_dns, e2e_instance):
         },
     ):
         yield {
-            'instances': [e2e_instance],
+            'instances': [INSTANCE],
             'init_config': {'kafka_timeout': 30},
         }, E2E_METADATA
 
 
-E2E_METADATA = {
-    'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
-    'start_commands': [
-        'apt-get update',
-        'apt-get install -y build-essential',
-    ],
-}
+@pytest.fixture
+def check():
+    return lambda instance, init_config=None: KafkaCheck('kafka_consumer', init_config or {}, [instance])
 
 
-@pytest.fixture(scope='session')
+@pytest.fixture
 def kafka_instance():
-    return {
-        'kafka_connect_str': KAFKA_CONNECT_STR,
-        'tags': ['optional:tag1'],
-        'consumer_groups': {'my_consumer': {'marvel': [0]}},
-        'broker_requests_batch_size': 1,
-    }
-
-
-# Dummy TLS certs
-CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate')
-cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
-private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')
+    return copy.deepcopy(INSTANCE)
 
 
-@pytest.fixture(scope='session')
+@pytest.fixture
 def kafka_instance_tls():
     return {
         'kafka_connect_str': KAFKA_CONNECT_STR,
@@ -100,12 +84,27 @@ def kafka_instance_tls():
     }
 
 
-@pytest.fixture(scope='session')
-def e2e_instance(kafka_instance):
-    return kafka_instance
-
-
 def _get_bootstrap_server_flag():
     if KAFKA_VERSION != 'latest' and parse_version(KAFKA_VERSION) < parse_version('3.0'):
         return '--zookeeper zookeeper:2181'
     return '--bootstrap-server kafka1:19092'
 
 
+def find_topics():
+    consumer = KafkaConsumer(bootstrap_servers=KAFKA_CONNECT_STR, request_timeout_ms=1000)
+    return consumer.topics() == set(TOPICS)
+
+
+def initialize_topics():
+    consumer = KConsumer(TOPICS)
+
+    with Producer():
+        with consumer:
+            time.sleep(5)
+
+
+@pytest.fixture(scope='session')
+def mock_local_kafka_hosts_dns():
+    mapping = {'kafka1': ('127.0.0.1', 9092), 'kafka2': ('127.0.0.1', 9093)}
+    with mock_local(mapping):
+        yield
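
Two points worth noting in the refactored conftest.py. The `check` fixture is a factory: tests call `check(instance)` or `check(instance, init_config)` to build a fresh `KafkaCheck` instead of constructing one inline. And `kafka_instance` now returns `copy.deepcopy(INSTANCE)` with default (function) scope, so tests that mutate their instance (as `test_no_topics` and `test_no_partitions` do below) can no longer leak state into other tests. A minimal sketch of the isolation this buys:

    # Sketch only: why the fixture deep-copies the shared module-level dict.
    import copy

    INSTANCE = {'consumer_groups': {'my_consumer': {'marvel': [0]}}}

    def kafka_instance():
        return copy.deepcopy(INSTANCE)

    inst = kafka_instance()
    inst['consumer_groups'] = {'my_consumer': {}}  # mutation, as in test_no_topics
    # The shared template is untouched, so the next test still gets the default.
    assert INSTANCE['consumer_groups'] == {'my_consumer': {'marvel': [0]}}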
51 changes: 20 additions & 31 deletions kafka_consumer/tests/python_client/test_integration.py
@@ -3,74 +3,63 @@
 # Licensed under a 3-clause BSD style license (see LICENSE)
 import pytest
 
-from datadog_checks.kafka_consumer import KafkaCheck
+from datadog_checks.dev.utils import get_metadata_metrics
 
 from ..common import KAFKA_CONNECT_STR, assert_check_kafka
 
-pytestmark = [pytest.mark.integration]
+pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]
 
 
-@pytest.mark.usefixtures('dd_environment')
-def test_check_kafka(aggregator, kafka_instance, dd_run_check):
+def test_check_kafka(aggregator, check, kafka_instance, dd_run_check):
     """
     Testing Kafka_consumer check.
     """
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
-    dd_run_check(kafka_consumer_check)
+    dd_run_check(check(kafka_instance))
     assert_check_kafka(aggregator, kafka_instance['consumer_groups'])
 
 
-@pytest.mark.usefixtures('dd_environment')
-def test_can_send_event(aggregator, kafka_instance, dd_run_check):
+def test_can_send_event(aggregator, check, kafka_instance, dd_run_check):
     """
     Testing Kafka_consumer check.
     """
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
+    kafka_consumer_check = check(kafka_instance)
     kafka_consumer_check.send_event("test", "test", [], "test", "test")
     aggregator.assert_event("test", exact_match=False, count=1)
 
 
-@pytest.mark.usefixtures('dd_environment')
-def test_check_kafka_metrics_limit(aggregator, kafka_instance, dd_run_check):
+def test_check_kafka_metrics_limit(aggregator, check, kafka_instance, dd_run_check):
     """
     Testing Kafka_consumer check.
     """
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {'max_partition_contexts': 1}, [kafka_instance])
-    dd_run_check(kafka_consumer_check)
+    dd_run_check(check(kafka_instance, {'max_partition_contexts': 1}))
 
     assert len(aggregator._metrics) == 1
 
 
-@pytest.mark.usefixtures('dd_environment')
-def test_consumer_config_error(caplog, dd_run_check):
+def test_consumer_config_error(caplog, check, dd_run_check):
     instance = {'kafka_connect_str': KAFKA_CONNECT_STR, 'tags': ['optional:tag1']}
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance])
+    kafka_consumer_check = check(instance)
 
     dd_run_check(kafka_consumer_check, extract_message=True)
     assert 'monitor_unlisted_consumer_groups is False' in caplog.text
 
 
-@pytest.mark.usefixtures('dd_environment')
-def test_no_topics(aggregator, kafka_instance, dd_run_check):
+def test_no_topics(aggregator, check, kafka_instance, dd_run_check):
     kafka_instance['consumer_groups'] = {'my_consumer': {}}
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
-    dd_run_check(kafka_consumer_check)
+    dd_run_check(check(kafka_instance))
 
     assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})
 
 
-@pytest.mark.usefixtures('dd_environment')
-def test_no_partitions(aggregator, kafka_instance, dd_run_check):
+def test_no_partitions(aggregator, check, kafka_instance, dd_run_check):
     kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
-    dd_run_check(kafka_consumer_check)
+    dd_run_check(check(kafka_instance))
 
     assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})
 
 
-@pytest.mark.usefixtures('dd_environment')
-def test_version_metadata(datadog_agent, kafka_instance, dd_run_check):
-    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
+def test_version_metadata(datadog_agent, check, kafka_instance, dd_run_check):
+    kafka_consumer_check = check(kafka_instance)
     kafka_consumer_check.check_id = 'test:123'
 
     kafka_client = kafka_consumer_check.client.create_kafka_admin_client()
@@ -91,19 +80,19 @@ def test_version_metadata(datadog_agent, kafka_instance, dd_run_check):
         pytest.param(False, 2, ['topic:marvel'], id="Disabled"),
     ],
 )
-@pytest.mark.usefixtures('dd_environment')
-def test_monitor_broker_highwatermarks(dd_run_check, aggregator, is_enabled, metric_count, topic_tags):
+def test_monitor_broker_highwatermarks(dd_run_check, check, aggregator, is_enabled, metric_count, topic_tags):
     instance = {
         'kafka_connect_str': KAFKA_CONNECT_STR,
         'consumer_groups': {'my_consumer': {'marvel': None}},
         'monitor_all_broker_highwatermarks': is_enabled,
     }
-    check = KafkaCheck('kafka_consumer', {}, [instance])
-    dd_run_check(check)
+    dd_run_check(check(instance))
 
+    # After refactor and library migration, write unit tests to assert expected metric values
     aggregator.assert_metric('kafka.broker_offset', count=metric_count)
 
     for tag in topic_tags:
         aggregator.assert_metric_has_tag('kafka.broker_offset', tag, count=2)
 
     aggregator.assert_metric_has_tag_prefix('kafka.broker_offset', 'partition', count=metric_count)
+    aggregator.assert_metrics_using_metadata(get_metadata_metrics())
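
The per-test `@pytest.mark.usefixtures('dd_environment')` decorators could be dropped because the module-level `pytestmark` list applies its marks to every test in the file. A minimal sketch of the pytest behavior being relied on:

    # Sketch only: pytestmark applies these marks to every test in the module.
    import pytest

    pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]

    def test_example():
        # Runs with the dd_environment fixture active, no per-test decorator needed.
        assert True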