Skip to content

Commit

Permalink
Add a tls e2e env
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorentClarret committed Mar 10, 2023
1 parent 34605fc commit 869861e
Show file tree
Hide file tree
Showing 18 changed files with 497 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,24 @@ class ConfluentKafkaClient(KafkaClient):
@property
def kafka_client(self):
if self._kafka_client is None:
self._kafka_client = AdminClient(
{
"bootstrap.servers": self.config._kafka_connect_str,
"socket.timeout.ms": self.config._request_timeout_ms,
"client.id": "dd-agent",
}
)
config = {
"bootstrap.servers": self.config._kafka_connect_str,
"socket.timeout.ms": self.config._request_timeout_ms,
}

if self.config._use_tls:
config.update(
{
"security.protocol": "ssl",
"ssl.ca.location": self.config._tls_ca_cert,
"ssl.certificate.location": self.config._tls_cert,
"ssl.key.location": self.config._tls_private_key,
"ssl.key.password": self.config._tls_private_key_password,
}
)

self._kafka_client = AdminClient(config)

return self._kafka_client

def create_kafka_admin_client(self):
Expand Down
5 changes: 5 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def __init__(self, init_config, instance) -> None:
self._sasl_kerberos_service_name = instance.get('sasl_kerberos_service_name', 'kafka')
self._sasl_kerberos_domain_name = instance.get('sasl_kerberos_domain_name')
self._sasl_oauth_token_provider = instance.get('sasl_oauth_token_provider')
self._use_tls = instance.get('use_tls', False)
self._tls_ca_cert = instance.get("tls_ca_cert")
self._tls_cert = instance.get("tls_cert")
self._tls_private_key = instance.get("tls_private_key")
self._tls_private_key_password = instance.get("tls_private_key_password")
self.use_legacy_client = is_affirmative(instance.get('use_legacy_client', False))

def validate_config(self):
Expand Down
8 changes: 4 additions & 4 deletions kafka_consumer/hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ impl = ["legacy"]
python = ["3.8"]
version = ["1.1", "2.3", "3.3"]

#[[envs.default.matrix]]
#python = ["3.8"]
#version = ["3.3"]
#auth = ["ssl"]
[[envs.default.matrix]]
python = ["3.8"]
version = ["3.3"]
auth = ["ssl"]

[envs.default.overrides]
matrix.version.e2e-env = { value = true, if = ["3.3"] }
Expand Down
20 changes: 0 additions & 20 deletions kafka_consumer/tests/certificate/cert.cert

This file was deleted.

48 changes: 0 additions & 48 deletions kafka_consumer/tests/certificate/server.pem

This file was deleted.

94 changes: 47 additions & 47 deletions kafka_consumer/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@
import pytest
from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic
from datadog_test_libs.utils.mock_dns import mock_local

from datadog_checks.dev import WaitFor, docker_run
from datadog_checks.kafka_consumer import KafkaCheck

from .common import DOCKER_IMAGE_PATH, HERE, HOST_IP, KAFKA_CONNECT_STR, LEGACY_CLIENT, TOPICS
from .common import AUTHENTICATION, DOCKER_IMAGE_PATH, HERE, HOST_IP, KAFKA_CONNECT_STR, LEGACY_CLIENT, TOPICS
from .runners import KConsumer, Producer

# Dummy TLS certs
CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate')
cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')

CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'docker', 'ssl', 'certificate')
ROOT_CERTIFICATE = os.path.join(CERTIFICATE_DIR, 'caroot.pem')
CERTIFICATE = os.path.join(CERTIFICATE_DIR, 'cert.pem')
PRIVATE_KEY = os.path.join(CERTIFICATE_DIR, 'key.pem')
PRIVATE_KEY_PASSWORD = 'secret'

if LEGACY_CLIENT:
E2E_METADATA = {
Expand All @@ -37,13 +36,28 @@
'start_commands': ['bash /tmp/start_commands.sh'],
}

INSTANCE = {
'kafka_connect_str': KAFKA_CONNECT_STR,
'tags': ['optional:tag1'],
'consumer_groups': {'my_consumer': {'marvel': [0]}},
'broker_requests_batch_size': 1,
'use_legacy_client': LEGACY_CLIENT,
}
if AUTHENTICATION == "ssl":
INSTANCE = {
'kafka_connect_str': "localhost:9092",
'tags': ['optional:tag1'],
'consumer_groups': {'my_consumer': {'marvel': [0]}},
'broker_requests_batch_size': 1,
'use_tls': True,
'tls_validate_hostname': True,
'tls_cert': CERTIFICATE,
'tls_private_key': PRIVATE_KEY,
'tls_private_key_password': PRIVATE_KEY_PASSWORD,
'tls_ca_cert': ROOT_CERTIFICATE,
'use_legacy_client': LEGACY_CLIENT,
}
else:
INSTANCE = {
'kafka_connect_str': KAFKA_CONNECT_STR,
'tags': ['optional:tag1'],
'consumer_groups': {'my_consumer': {'marvel': [0]}},
'broker_requests_batch_size': 1,
'use_legacy_client': LEGACY_CLIENT,
}


@pytest.fixture(scope='session')
Expand Down Expand Up @@ -80,22 +94,6 @@ def kafka_instance():
return copy.deepcopy(INSTANCE)


@pytest.fixture
def kafka_instance_tls():
return {
'kafka_connect_str': KAFKA_CONNECT_STR,
'tags': ['optional:tag1'],
'consumer_groups': {'my_consumer': {'marvel': [0]}},
'broker_requests_batch_size': 1,
'use_tls': True,
'tls_validate_hostname': True,
'tls_cert': cert,
'tls_private_key': private_key,
'tls_ca_cert': CERTIFICATE_DIR,
'use_legacy_client': LEGACY_CLIENT,
}


def create_topics():
client = _create_admin_client()

Expand All @@ -106,24 +104,26 @@ def create_topics():


def initialize_topics():
consumer = KConsumer(TOPICS)

with Producer():
with consumer:
with Producer(INSTANCE):
with KConsumer(INSTANCE, TOPICS):
time.sleep(5)


@pytest.fixture(scope='session')
def mock_local_kafka_hosts_dns():
mapping = {'kafka1': ('127.0.0.1', 9092), 'kafka2': ('127.0.0.1', 9093)}
with mock_local(mapping):
yield


def _create_admin_client():
return AdminClient(
{
"bootstrap.servers": KAFKA_CONNECT_STR,
"socket.timeout.ms": 1000,
}
)
config = {
"bootstrap.servers": INSTANCE['kafka_connect_str'],
"socket.timeout.ms": 1000,
}

if INSTANCE.get('use_tls', False):
config.update(
{
"security.protocol": "ssl",
"ssl.ca.location": INSTANCE.get("tls_ca_cert"),
"ssl.certificate.location": INSTANCE.get("tls_cert"),
"ssl.key.location": INSTANCE.get("tls_private_key"),
"ssl.key.password": INSTANCE.get("tls_private_key_password"),
}
)

return AdminClient(config)
18 changes: 18 additions & 0 deletions kafka_consumer/tests/docker/ssl/certificate/caroot.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC8DCCAdgCCQDCx8Yc08K3STANBgkqhkiG9w0BAQsFADA6MQswCQYDVQQGEwJV
UzEVMBMGA1UECgwMU2VydmljZVVzZXJzMRQwEgYDVQQDDAtrYWZrYS1hZG1pbjAe
Fw0yMzAzMTAxMDA4MzlaFw0zMzAzMDcxMDA4MzlaMDoxCzAJBgNVBAYTAlVTMRUw
EwYDVQQKDAxTZXJ2aWNlVXNlcnMxFDASBgNVBAMMC2thZmthLWFkbWluMIIBIjAN
BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwUp4/b9wEZSWqXvpzFOwF2xRPhqh
mmh2YV2vLDlsTXVHzQwLndJHOeoao4StNKriSa/QFBJ6gPD9VjeBNntkILr1AcPD
s9uQUQjiFI9xd5luR75/lopCgtvogWiEo48Sv4WwtOxSr1mYzPjNQ24ZlGvJTvB1
SxKd+T7n7g5IWyzHdDhS0AfA5aU4Ap/L5u8raJLQjLepKPiKSznvmktfY2zotGkK
cWR7SUpChTp1XxRNme/rBTWxP52VlT2oM+7q0iYBGDDaUVUBwOwRN7bhVnd/IPOA
DeAJjyRw/fAAdz8DGS9/QeQeWDK+0nQbOiL+TfjZ54LC127nBg8GUhQg0QIDAQAB
MA0GCSqGSIb3DQEBCwUAA4IBAQAxG64B67IerDlicCRO8ZkMyxo06BT6I6N5Vapo
ZLtNxkTUJ43t4D9f1lFLxrtmz1jDwAf9uYlmTC7FKfWwb5SBlNyHJWkZjnxk2OCt
+zX/X0hlqt8T3R0kSgMFKwHxZ7+LMmuVCz8rPMW+WqSloBTz4uP9hWplVPZaTUC8
8PNw7RdlVgghmwpR8D1mWYHkCfXKqNjez2+6h4i9NOzbUSldXQDMAgFYXftYmR3z
UAKBti9SBiobgyunEqc66q/fsmfMO4fms+B9OYTxA5K39MXAsFJGLm1eS5vSZKgx
l8BUEcO6A+oxuNzHU+P+Lygno1bwMc8qNF4dfZsw53TnM1av
-----END CERTIFICATE-----
21 changes: 21 additions & 0 deletions kafka_consumer/tests/docker/ssl/certificate/cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDhDCCAmwCCQCsnbf8LOf3/zANBgkqhkiG9w0BAQsFADA6MQswCQYDVQQGEwJV
UzEVMBMGA1UECgwMU2VydmljZVVzZXJzMRQwEgYDVQQDDAtrYWZrYS1hZG1pbjAe
Fw0yMzAzMTAxMDA4NDNaFw0zMzAzMDcxMDA4NDNaME4xEjAQBgNVBAMTCWxvY2Fs
aG9zdDEVMBMGA1UEChMMU2VydmljZVVzZXJzMQkwBwYDVQQHEwAxCTAHBgNVBAgT
ADELMAkGA1UEBhMCVVMwggGiMA0GCSqGSIb3DQEBAQUAA4IBjwAwggGKAoIBgQCN
tuxFs3zgslWJE12kVYe53p+y5rEfVsC+BAiv00D+5uf1umMWp/nXvgeTq4eWrcil
77I7DH37LMjfe8imVGoAMEEqeeRTkVoM/DviU0X1OIs79kQPZjINKR7ykVsQWqv6
mUrq1r0Ru1ft7w9Qf8f+rTumXREBGbDoo7p6hhepdqUeyXz+RhgLlVTzNPuDZrWr
OfYPxZV2hIZ8JC8pnXsIU5PP1Cm7JsUOj0WJNio35BS3YpG5np5AY+r3MDDW6Pd2
vBHk5VbaJ0SW36bmt3PlZKM79O6GVR/NBpRUVojl86M7sEP+gQ4EWY8u4qqn5aHk
xbfk8MiLx6nQKRgKSXFJv/LZP+iesZciQlxxOZyVe/MYc1vJJsJyI1R+f/GnV+Yd
3JlLd2pWCL0XHrydORXhTKVTivMJUZvBdrtu15492Ws6qmtTwOH/lKJfs7tJP1r6
NL4AnT1XENm8n6F8KHRFcOGLF/Xb8hVer8ekU8qI4Y5pp8K03YC7N9ILz0Iyp/sC
AwEAATANBgkqhkiG9w0BAQsFAAOCAQEAvKZEk3iUklOK5T8lpaTbpj82p7bxfmVw
4pL09VwJMw0rEU1D6EXZzorHfVQVb2KW9j7IaI/X/wO0yXn78D8YXLhKdXng0u0N
RqWxcTQG+glZWppVT0lZ8p6smoNiA5/2QOfxoWwq9dnwJbjlHoqX0smnbKW5cbZU
tyfUaep2TIENmseOFZH78Uj2YcFaY/4OXxm0aESX3WhY3AFoRlaVek+NdMech7PM
m72ETIXqwNEW57udqT3wgtkA+T9kopPrVszDvYzsqdTwdIkjY8BUdD8doIvQqlik
uTgy27jNBPRIXCnVjQKV6YUNkSbqcpO51Lc9Ub5ucJd9z4zlIIEvPw==
-----END CERTIFICATE-----
41 changes: 41 additions & 0 deletions kafka_consumer/tests/docker/ssl/certificate/key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIHKjAcBgoqhkiG9w0BDAEEMA4ECGy6GhmMIdarAgIEJASCBwicg2r8bfuKau/S
Q08LLCk/DjOIvGIjCAiJi5sjH4KgRmykID4VEqFu3TRZ7z/8xiY48oSX75f5WAkx
EVHSShKaELYfip4vzVllVruXFJUysFE36+oC1HG+CnJQeSkuFRMWk7hIRflUZxJb
HkBjmwZ36jpVyRZmuDRkKmknsqQM+Zrl5O1Snqp6S+wxDTY4xNj7EEFSMNNIt8yE
mr92wboQy/Pu0vDYFB6/M87ttyaRkCDajphL2hwsRjIg4QVLjlKuTLhS3B66F3sI
7RK+fGhxGhYf7kc8ti9QpXKgcQkIbYZ+tAudYPGurgjlpkvQdFz+XE8MNasognk2
cwiz26VMAwrYSCG/HNmTgg1nMUF94AGiG2WSjyW7qOtC7/gPULvjql+KYxRUzXg8
lcrgQWr3pLi9+RLVXd7kfmCzukmWuVzM5e7ij5jT/FD/0Z5iFg0A/opBGZ2bEEUI
M5GTO1ikno9ZSqvgCBPsJFMWicr5vaJE7glfxZlW2gjCws7wjeM8kOj9VK2xMJ7H
l6qEawS5N4Sic9nOP5fIpRQBcIGqg5c4qX1A7JZZ3ShbT3+Gxxj6aEGli1b/E3RM
rTseU1bIHnJjtvBWERk4DV7jy+AlJ5z1aFz8H51owMDY2RiVJJHJaTzAjN9fpz3Z
bmGD2d4YB27FTyZMq5zHevJhyPCjkLSB+n4ym6xW1m7JYqJezHxQuqglEU5dSVwb
xwgFOc09o+HX8SqYpFjVeeKg3NKo+1jLO7KU6b0Smmfc6auzjNuwWNhuONsHR5/9
HOybCrCySWLcDtFImwHW/SWlN8t8dVb/jpOpt7HvlSTLx5g19crOpjy+9TfGlmde
Es+m0KDEO3TCh99EG/rq/3EWYC6D8dfHh4eaU90+NA+b3r3aBqR0mmUDC4fM0OGe
4QUll/DGQO9ZsPr4+X1Epbz/gEAAenGULll5U92ydjD2J4b9w4m8R5vyHMD0Wq5j
ctXtrcDq46pVv2Yo7r/+L9ecIVykMqo6Jd15lnS7T6buDlFhgp/+QCkeW3Awx3v4
e/5RxVxeYrHbn53w485S/CLidlgfdQZbarPv+BKUSA8CfCdJblVAg7/lYmS2b9HQ
bSTK9418mH+3x6CDW+v7nPkOjZe6irabHVGn6yZ84wzi/6asaH3Zg4RU5xU0zPZQ
nkBUU97yB0jH9QPpoWZKglIWWEyP10eX21f7GbsrsoJf+kbcaRsw48I+Sf5CdZ0U
8sqrg2DSF3pJsy476kXycJ10qs6CsqqKTtY7XtarHD0BIYqxde5y+LugIhvpGtvJ
+HSfcHTysaoYXS5WksNZDRE25QX5yyclzq/8BsSFpbsvi840hq2rzUj3Vq6VujPN
6EqlKVvjFtI0Hc92AOvQm48aN+PNlKehHfriy6lkd/xo0Yy1Ez7dvkKIbkNrToc7
j8I1JNG9FzkcORxWmtlNmrI3r8qhsuoo4k+dWbhneu2vqW/+LEemO5ZKRHuAHT/n
UIFBMtSmxtuYA9RZQSwIW1vhmNyD6+s8sD+ghJ0K6Pfy500oRvc8nACj2HQP5oRt
PyeXf+WyXsVE7fxSjc2OQr90BYNA/3SPoZXG0OSDIgVFNR4XwoQO8PNk9kX2LHtJ
vqU7c/n4RoVGkJZv/AyLnNM8incugqwe5wOk+EZB2bsS38Sh8XoL1LHwlmt/2P3D
r8yb89o8El2fSnhy7sa5LyI2yVzssf4JnlI5v523buMp1CBSKh7CvhT7dOff8FZc
vWLmlQOudyyYJjS7MFvm37Nl075akFs16REpWMC3+lU74kH9OmXNoL2rcRoWBEiF
yItLpRfdnRoilo02qMf7e4YdyKXa6XzuIboy1Yb4r56kTpeZCXqggO2mGiaiDEem
8ckHn8U7bn5LSWbXUVLea+YxtBLa2/MBn4QilcjV9S73Qf6ra5OrVoK3O3JdUO8l
5ynM99NUGSBMUPwnqU4c0lasDYp8XYyNgoerWddQqs/iXXQVoPwnOrN4nf9oCY3N
MjoXYLp2T6L4j+AaQQHGJAYykby760olYV5SSRhRA/0ZZGRwQPCMnHNmnP4hh2s1
N8ee5yLRFNLjD0WYPqELiKUR6KkaX+6fXV6OZ+1URBVZYrhJfEAQm2QVxeiimSxs
rXy95P5hBirLvuTj0KmStsIvjVosNwFt/BN6UKHc86JOe/yhNpHhYwYVUt97KQCs
5xd07/7tviHyMeVaUWXh7UK+X5eZJAJfnIqbRu11hhPpsqK9qK4WZ9NzzOoEK0fr
5C+ajFoUi3sp52YSSQBsG7gqdKkq4qGwqOCrmDU+30w7U0T9qm+7+92U46xdebQF
lLlyaGB90YuqxqQ8NlW4m95+/X+w3TxuoBkpTjcsoGPxo3pZVSLI0W4cy0jKLG+/
P2Aidv1pIIfZnAIB8Ek=
-----END ENCRYPTED PRIVATE KEY-----
Loading

0 comments on commit 869861e

Please sign in to comment.