Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the tests #13997

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions kafka_consumer/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import socket

from datadog_checks.dev import get_docker_hostname
from datadog_checks.dev.utils import get_metadata_metrics

HERE = os.path.dirname(os.path.abspath(__file__))
HOST = get_docker_hostname()
Expand All @@ -26,12 +27,14 @@ def assert_check_kafka(aggregator, consumer_groups):
for partition in partitions:
tags = [f"topic:{topic}", f"partition:{partition}"] + ['optional:tag1']
for mname in BROKER_METRICS:
aggregator.assert_metric(mname, tags=tags, at_least=1)
aggregator.assert_metric(mname, tags=tags, count=1)

for mname in CONSUMER_METRICS:
aggregator.assert_metric(
mname,
tags=tags + [f"consumer_group:{name}"],
at_least=1,
count=1,
)

aggregator.assert_all_metrics_covered()
aggregator.assert_metrics_using_metadata(get_metadata_metrics())
95 changes: 47 additions & 48 deletions kafka_consumer/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# (C) Datadog, Inc. 2018-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import copy
import os
import time

Expand All @@ -10,36 +11,34 @@
from packaging.version import parse as parse_version

from datadog_checks.dev import WaitFor, docker_run
from datadog_checks.kafka_consumer import KafkaCheck

from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, TOPICS
from .runners import KConsumer, Producer

# Dummy TLS certs
CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate')
cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')

def find_topics():
consumer = KafkaConsumer(bootstrap_servers=KAFKA_CONNECT_STR, request_timeout_ms=1000)
topics = consumer.topics()

# We expect to find 2 topics: `marvel` and `dc`
return len(topics) == 2


def initialize_topics():
consumer = KConsumer(TOPICS)

with Producer():
with consumer:
time.sleep(5)

E2E_METADATA = {
'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
'start_commands': [
'apt-get update',
'apt-get install -y build-essential',
],
}

@pytest.fixture(scope='session')
def mock_local_kafka_hosts_dns():
mapping = {'kafka1': ('127.0.0.1', 9092), 'kafka2': ('127.0.0.1', 9093)}
with mock_local(mapping):
yield
INSTANCE = {
'kafka_connect_str': KAFKA_CONNECT_STR,
'tags': ['optional:tag1'],
'consumer_groups': {'my_consumer': {'marvel': [0]}},
'broker_requests_batch_size': 1,
}


@pytest.fixture(scope='session')
def dd_environment(mock_local_kafka_hosts_dns, e2e_instance):
def dd_environment(mock_local_kafka_hosts_dns):
"""
Start a kafka cluster and wait for it to be up and running.
"""
Expand All @@ -55,37 +54,22 @@ def dd_environment(mock_local_kafka_hosts_dns, e2e_instance):
},
):
yield {
'instances': [e2e_instance],
'instances': [INSTANCE],
'init_config': {'kafka_timeout': 30},
}, E2E_METADATA


E2E_METADATA = {
'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
'start_commands': [
'apt-get update',
'apt-get install -y build-essential',
],
}
@pytest.fixture
def check():
return lambda instance, init_config=None: KafkaCheck('kafka_consumer', init_config or {}, [instance])


@pytest.fixture(scope='session')
@pytest.fixture
def kafka_instance():
return {
'kafka_connect_str': KAFKA_CONNECT_STR,
'tags': ['optional:tag1'],
'consumer_groups': {'my_consumer': {'marvel': [0]}},
'broker_requests_batch_size': 1,
}


# Dummy TLS certs
CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate')
cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')
return copy.deepcopy(INSTANCE)


@pytest.fixture(scope='session')
@pytest.fixture
def kafka_instance_tls():
return {
'kafka_connect_str': KAFKA_CONNECT_STR,
Expand All @@ -100,12 +84,27 @@ def kafka_instance_tls():
}


@pytest.fixture(scope='session')
def e2e_instance(kafka_instance):
return kafka_instance


def _get_bootstrap_server_flag():
if KAFKA_VERSION != 'latest' and parse_version(KAFKA_VERSION) < parse_version('3.0'):
return '--zookeeper zookeeper:2181'
return '--bootstrap-server kafka1:19092'


def find_topics():
consumer = KafkaConsumer(bootstrap_servers=KAFKA_CONNECT_STR, request_timeout_ms=1000)
return consumer.topics() == set(TOPICS)


def initialize_topics():
consumer = KConsumer(TOPICS)

with Producer():
with consumer:
time.sleep(5)


@pytest.fixture(scope='session')
def mock_local_kafka_hosts_dns():
mapping = {'kafka1': ('127.0.0.1', 9092), 'kafka2': ('127.0.0.1', 9093)}
with mock_local(mapping):
yield
51 changes: 20 additions & 31 deletions kafka_consumer/tests/python_client/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,74 +3,63 @@
# Licensed under Simplified BSD License (see LICENSE)
import pytest

from datadog_checks.kafka_consumer import KafkaCheck
from datadog_checks.dev.utils import get_metadata_metrics

from ..common import KAFKA_CONNECT_STR, assert_check_kafka

pytestmark = [pytest.mark.integration]
pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]


@pytest.mark.usefixtures('dd_environment')
def test_check_kafka(aggregator, kafka_instance, dd_run_check):
def test_check_kafka(aggregator, check, kafka_instance, dd_run_check):
"""
Testing Kafka_consumer check.
"""
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)
dd_run_check(check(kafka_instance))
assert_check_kafka(aggregator, kafka_instance['consumer_groups'])


@pytest.mark.usefixtures('dd_environment')
def test_can_send_event(aggregator, kafka_instance, dd_run_check):
def test_can_send_event(aggregator, check, kafka_instance, dd_run_check):
"""
Testing Kafka_consumer check.
"""
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.send_event("test", "test", [], "test", "test")
aggregator.assert_event("test", exact_match=False, count=1)


@pytest.mark.usefixtures('dd_environment')
def test_check_kafka_metrics_limit(aggregator, kafka_instance, dd_run_check):
def test_check_kafka_metrics_limit(aggregator, check, kafka_instance, dd_run_check):
"""
Testing Kafka_consumer check.
"""
kafka_consumer_check = KafkaCheck('kafka_consumer', {'max_partition_contexts': 1}, [kafka_instance])
dd_run_check(kafka_consumer_check)
dd_run_check(check(kafka_instance, {'max_partition_contexts': 1}))

assert len(aggregator._metrics) == 1


@pytest.mark.usefixtures('dd_environment')
def test_consumer_config_error(caplog, dd_run_check):
def test_consumer_config_error(caplog, check, dd_run_check):
instance = {'kafka_connect_str': KAFKA_CONNECT_STR, 'tags': ['optional:tag1']}
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance])
kafka_consumer_check = check(instance)

dd_run_check(kafka_consumer_check, extract_message=True)
assert 'monitor_unlisted_consumer_groups is False' in caplog.text


@pytest.mark.usefixtures('dd_environment')
def test_no_topics(aggregator, kafka_instance, dd_run_check):
def test_no_topics(aggregator, check, kafka_instance, dd_run_check):
kafka_instance['consumer_groups'] = {'my_consumer': {}}
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)
dd_run_check(check(kafka_instance))

assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})


@pytest.mark.usefixtures('dd_environment')
def test_no_partitions(aggregator, kafka_instance, dd_run_check):
def test_no_partitions(aggregator, check, kafka_instance, dd_run_check):
kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)
dd_run_check(check(kafka_instance))

assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})


@pytest.mark.usefixtures('dd_environment')
def test_version_metadata(datadog_agent, kafka_instance, dd_run_check):
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
def test_version_metadata(datadog_agent, check, kafka_instance, dd_run_check):
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.check_id = 'test:123'

kafka_client = kafka_consumer_check.client.create_kafka_admin_client()
Expand All @@ -91,19 +80,19 @@ def test_version_metadata(datadog_agent, kafka_instance, dd_run_check):
pytest.param(False, 2, ['topic:marvel'], id="Disabled"),
],
)
@pytest.mark.usefixtures('dd_environment')
def test_monitor_broker_highwatermarks(dd_run_check, aggregator, is_enabled, metric_count, topic_tags):
def test_monitor_broker_highwatermarks(dd_run_check, check, aggregator, is_enabled, metric_count, topic_tags):
instance = {
'kafka_connect_str': KAFKA_CONNECT_STR,
'consumer_groups': {'my_consumer': {'marvel': None}},
'monitor_all_broker_highwatermarks': is_enabled,
}
check = KafkaCheck('kafka_consumer', {}, [instance])
dd_run_check(check)
dd_run_check(check(instance))

# After refactor and library migration, write unit tests to assert expected metric values
aggregator.assert_metric('kafka.broker_offset', count=metric_count)

for tag in topic_tags:
aggregator.assert_metric_has_tag('kafka.broker_offset', tag, count=2)

aggregator.assert_metric_has_tag_prefix('kafka.broker_offset', 'partition', count=metric_count)
aggregator.assert_metrics_using_metadata(get_metadata_metrics())
Loading