Skip to content

Commit

Permalink
Create the topics from the python code instead of the docker image
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorentClarret committed Mar 13, 2023
1 parent c258d73 commit 24c5cf7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 29 deletions.
36 changes: 20 additions & 16 deletions kafka_consumer/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

import pytest
from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic
from datadog_test_libs.utils.mock_dns import mock_local
from packaging.version import parse as parse_version

from datadog_checks.dev import WaitFor, docker_run
from datadog_checks.kafka_consumer import KafkaCheck

from .common import DOCKER_IMAGE_PATH, HERE, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, LEGACY_CLIENT, TOPICS
from .common import DOCKER_IMAGE_PATH, HERE, HOST_IP, KAFKA_CONNECT_STR, LEGACY_CLIENT, TOPICS
from .runners import KConsumer, Producer

# Dummy TLS certs
Expand Down Expand Up @@ -53,13 +53,15 @@ def dd_environment(mock_local_kafka_hosts_dns):
"""
with docker_run(
DOCKER_IMAGE_PATH,
conditions=[WaitFor(find_topics, attempts=60, wait=3), WaitFor(initialize_topics)],
conditions=[
WaitFor(create_topics, attempts=60, wait=3),
WaitFor(initialize_topics),
],
env_vars={
# Advertising the hostname doesn't work on docker:dind so we manually
# resolve the IP address. This seems to also work outside docker:dind
# so we got that goin for us.
'KAFKA_HOST': HOST_IP,
'BOOTSTRAP_SERVER_FLAG': _get_bootstrap_server_flag(),
},
):
yield {
Expand Down Expand Up @@ -94,20 +96,13 @@ def kafka_instance_tls():
}


def _get_bootstrap_server_flag():
if KAFKA_VERSION != 'latest' and parse_version(KAFKA_VERSION) < parse_version('3.0'):
return '--zookeeper zookeeper:2181'
return '--bootstrap-server kafka1:19092'
def create_topics():
client = _create_admin_client()

for topic in TOPICS:
client.create_topics([NewTopic(topic, 2, 1)], operation_timeout=1, request_timeout=1)

def find_topics():
client = AdminClient(
{
"bootstrap.servers": KAFKA_CONNECT_STR,
"socket.timeout.ms": 1000,
}
)
return set(client.list_topics().topics.keys()) == set(TOPICS)
return set(client.list_topics(timeout=1).topics.keys()) == set(TOPICS)


def initialize_topics():
Expand All @@ -123,3 +118,12 @@ def mock_local_kafka_hosts_dns():
mapping = {'kafka1': ('127.0.0.1', 9092), 'kafka2': ('127.0.0.1', 9093)}
with mock_local(mapping):
yield


def _create_admin_client():
return AdminClient(
{
"bootstrap.servers": KAFKA_CONNECT_STR,
"socket.timeout.ms": 1000,
}
)
13 changes: 0 additions & 13 deletions kafka_consumer/tests/docker/noauth/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,3 @@ services:
depends_on:
zookeeper:
condition: service_healthy

kafka-init:
image: docker.io/bitnami/kafka:${KAFKA_VERSION}
container_name: kafka-init
hostname: kafka-init
command: >
sh -c "sleep 10 &&./opt/bitnami/kafka/bin/kafka-topics.sh ${BOOTSTRAP_SERVER_FLAG} --create --topic marvel --if-not-exists --partitions 2 --replication-factor 1 &&
./opt/bitnami/kafka/bin/kafka-topics.sh ${BOOTSTRAP_SERVER_FLAG} --create --topic dc --if-not-exists --partitions 2 --replication-factor 1"
depends_on:
kafka1:
condition: service_started
kafka2:
condition: service_started

0 comments on commit 24c5cf7

Please sign in to comment.