Revert "Flatten the check structure"
This reverts commit 1492138.
FlorentClarret committed Feb 15, 2023
1 parent 1492138 commit 7418ea7
Showing 2 changed files with 112 additions and 93 deletions.
201 changes: 111 additions & 90 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -55,10 +55,118 @@ def __init__(self, name, init_config, instances):
)
self._consumer_groups = self.instance.get('consumer_groups', {})

self.sub_check = NewKafkaConsumerCheck(self)

def check(self, _):
return self.sub_check.check()

def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
"""Emit an event to the Datadog Event Stream."""
event_dict = {
'timestamp': int(time()),
'msg_title': title,
'event_type': event_type,
'alert_type': severity,
'msg_text': text,
'tags': tags,
'aggregation_key': aggregation_key,
}
self.event(event_dict)

def create_kafka_client(self):
return self._create_kafka_client(clazz=KafkaClient)

def create_kafka_admin_client(self):
return self._create_kafka_client(clazz=KafkaAdminClient)

def validate_consumer_groups(self):
"""Validate any explicitly specified consumer groups.
consumer_groups = {'consumer_group': {'topic': [0, 1]}}
"""
assert isinstance(self._consumer_groups, dict)
for consumer_group, topics in self._consumer_groups.items():
assert isinstance(consumer_group, string_types)
assert isinstance(topics, dict) or topics is None # topics are optional
if topics is not None:
for topic, partitions in topics.items():
assert isinstance(topic, string_types)
assert isinstance(partitions, (list, tuple)) or partitions is None # partitions are optional
if partitions is not None:
for partition in partitions:
assert isinstance(partition, int)

def _create_kafka_client(self, clazz):
kafka_connect_str = self.instance.get('kafka_connect_str')
if not isinstance(kafka_connect_str, (string_types, list)):
raise ConfigurationError('kafka_connect_str should be string or list of strings')
kafka_version = self.instance.get('kafka_client_api_version')
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))

tls_context = self.get_tls_context()
crlfile = self.instance.get('ssl_crlfile', self.instance.get('tls_crlfile'))
if crlfile:
tls_context.load_verify_locations(crlfile)
tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

return clazz(
bootstrap_servers=kafka_connect_str,
client_id='dd-agent',
request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000,
# if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for
# broker version during the bootstrapping process. Note that this returns the first version found, so in
# a mixed-version cluster this will be a non-deterministic result.
api_version=kafka_version,
# While we check for SASL/SSL params, if not present they will default to the kafka-python values for
# plaintext connections
security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'),
sasl_mechanism=self.instance.get('sasl_mechanism'),
sasl_plain_username=self.instance.get('sasl_plain_username'),
sasl_plain_password=self.instance.get('sasl_plain_password'),
sasl_kerberos_service_name=self.instance.get('sasl_kerberos_service_name', 'kafka'),
sasl_kerberos_domain_name=self.instance.get('sasl_kerberos_domain_name'),
sasl_oauth_token_provider=(
OAuthTokenProvider(**self.instance['sasl_oauth_token_provider'])
if 'sasl_oauth_token_provider' in self.instance
else None
),
ssl_context=tls_context,
)


class NewKafkaConsumerCheck(object):
"""
Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets.
For details about the supported options, see the associated `conf.yaml.example`.
"""

def __init__(self, parent_check):
self._parent_check = parent_check
self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE)
self._kafka_client = None

def check(self, _):
def __getattr__(self, item):
try:
return getattr(self._parent_check, item)
except AttributeError:
raise AttributeError("NewKafkaConsumerCheck has no attribute called {}".format(item))

@property
def kafka_client(self):
if self._kafka_client is None:
# if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for
# broker version during the bootstrapping process. Note that this returns the first version found, so in
# a mixed-version cluster this will be a non-deterministic result.
kafka_version = self.instance.get('kafka_client_api_version')
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))

self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version)
return self._kafka_client

def check(self):
"""The main entrypoint of the check."""
self._consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset}
self._highwater_offsets = {} # Expected format: {(topic, partition): offset}
@@ -103,19 +211,6 @@ def check(self, _):

self._collect_broker_metadata()

@property
def kafka_client(self):
if self._kafka_client is None:
# if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for
# broker version during the bootstrapping process. Note that this returns the first version found, so in
# a mixed-version cluster this will be a non-deterministic result.
kafka_version = self.instance.get('kafka_client_api_version')
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))

self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version)
return self._kafka_client

def _create_kafka_admin_client(self, api_version):
"""Return a KafkaAdminClient."""
# TODO accept None (which inherits kafka-python default of localhost:9092)
@@ -168,7 +263,7 @@ def _get_highwater_offsets(self):
for topic, partition in broker_led_partitions:
# No sense fetching highwater offsets for internal topics
if topic not in KAFKA_INTERNAL_TOPICS and (
self._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
self._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
):
partitions_grouped_by_topic[topic].append(partition)

@@ -425,7 +520,7 @@ def _collect_broker_metadata(self):
@staticmethod
def batchify(iterable, batch_size):
iterable = list(iterable)
return (iterable[i: i + batch_size] for i in range(0, len(iterable), batch_size))
return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size))

# FIXME: This is using a workaround to skip socket wakeup, which causes blocking
# (see https://github.com/dpkp/kafka-python/issues/2286).
@@ -491,77 +586,3 @@ def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_
"Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient.".format(version)
)
return self._send_request_to_node(group_coordinator_id, request, wakeup=False)

def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
"""Emit an event to the Datadog Event Stream."""
event_dict = {
'timestamp': int(time()),
'msg_title': title,
'event_type': event_type,
'alert_type': severity,
'msg_text': text,
'tags': tags,
'aggregation_key': aggregation_key,
}
self.event(event_dict)

def create_kafka_client(self):
return self._create_kafka_client(clazz=KafkaClient)

def create_kafka_admin_client(self):
return self._create_kafka_client(clazz=KafkaAdminClient)

def validate_consumer_groups(self):
"""Validate any explicitly specified consumer groups.
consumer_groups = {'consumer_group': {'topic': [0, 1]}}
"""
assert isinstance(self._consumer_groups, dict)
for consumer_group, topics in self._consumer_groups.items():
assert isinstance(consumer_group, string_types)
assert isinstance(topics, dict) or topics is None # topics are optional
if topics is not None:
for topic, partitions in topics.items():
assert isinstance(topic, string_types)
assert isinstance(partitions, (list, tuple)) or partitions is None # partitions are optional
if partitions is not None:
for partition in partitions:
assert isinstance(partition, int)

def _create_kafka_client(self, clazz):
kafka_connect_str = self.instance.get('kafka_connect_str')
if not isinstance(kafka_connect_str, (string_types, list)):
raise ConfigurationError('kafka_connect_str should be string or list of strings')
kafka_version = self.instance.get('kafka_client_api_version')
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))

tls_context = self.get_tls_context()
crlfile = self.instance.get('ssl_crlfile', self.instance.get('tls_crlfile'))
if crlfile:
tls_context.load_verify_locations(crlfile)
tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

return clazz(
bootstrap_servers=kafka_connect_str,
client_id='dd-agent',
request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000,
# if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for
# broker version during the bootstrapping process. Note that this returns the first version found, so in
# a mixed-version cluster this will be a non-deterministic result.
api_version=kafka_version,
# While we check for SASL/SSL params, if not present they will default to the kafka-python values for
# plaintext connections
security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'),
sasl_mechanism=self.instance.get('sasl_mechanism'),
sasl_plain_username=self.instance.get('sasl_plain_username'),
sasl_plain_password=self.instance.get('sasl_plain_password'),
sasl_kerberos_service_name=self.instance.get('sasl_kerberos_service_name', 'kafka'),
sasl_kerberos_domain_name=self.instance.get('sasl_kerberos_domain_name'),
sasl_oauth_token_provider=(
OAuthTokenProvider(**self.instance['sasl_oauth_token_provider'])
if 'sasl_oauth_token_provider' in self.instance
else None
),
ssl_context=tls_context,
)
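
For context on what this revert restores: the agent-facing KafkaCheck keeps the configuration and submission helpers, instantiates NewKafkaConsumerCheck, and delegates check() to it, while NewKafkaConsumerCheck.__getattr__ forwards every other attribute lookup back to the parent check. The following standalone sketch is illustrative only and not part of this commit; the class names ParentCheck/SubCheck and the demo metric are invented to show the delegation pattern in isolation:

class ParentCheck:
    """Stand-in for the agent-facing check: owns config and submission helpers."""

    def __init__(self, instance):
        self.instance = instance
        self.sub_check = SubCheck(self)

    def check(self, _):
        # The public entry point simply delegates to the sub-check.
        return self.sub_check.check()

    def gauge(self, name, value):
        # Stand-in for the real metric submission method.
        print("{}={}".format(name, value))


class SubCheck:
    """Owns the collection logic; borrows config and helpers from the parent."""

    def __init__(self, parent_check):
        self._parent_check = parent_check

    def __getattr__(self, item):
        # Only invoked when normal attribute lookup fails, so attributes
        # defined on SubCheck itself are never forwarded.
        try:
            return getattr(self._parent_check, item)
        except AttributeError:
            raise AttributeError("SubCheck has no attribute called {}".format(item))

    def check(self):
        # `self.instance` and `self.gauge` resolve through __getattr__.
        self.gauge("demo.lag", self.instance.get("value", 0))


ParentCheck({"value": 42}).check(None)
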
4 changes: 1 addition & 3 deletions kafka_consumer/hatch.toml
@@ -10,6 +10,7 @@ matrix.version.env-vars = [
{ key = "KAFKA_VERSION", value = "1.1.1", if = ["1.1"] },
{ key = "KAFKA_VERSION", value = "2.3.1", if = ["2.3"] },
{ key = "KAFKA_VERSION", value = "3.3.2", if = ["3.3"] },
"ZK_VERSION=3.6.4"
]

[envs.default]
@@ -18,9 +19,6 @@ dependencies = [
]
e2e-env = false

[envs.default.env-vars]
ZK_VERSION = "3.6.4"

[envs.latest.env-vars]
KAFKA_VERSION = "latest"
ZK_VERSION = "3.6.4"
