Skip to content

Commit

Permalink
Refactor Kafka Consumer (#13931)
Browse files Browse the repository at this point in the history
* Map out structure

* Combine classes

* Remove deprecated call

* Remove clazz

* Create structure for kafka client classes

* Undo

* Fix style

* Add consumer offset and log collection (#13944)

* Refactor broker offset metric collection (#13934)

* Add broker offset metric collection

* Change import

* Clean up broker offset functions and change names

* Fix style

* Use updated values for check

* Clean up functions

* Refactor client creation (#13946)

* Refactor client creation

* Add back e2e test

* Remove commented out line

* Remove KafkaClient and refactor tests (#13954)

* Revert "Remove KafkaClient and refactor tests (#13954)"

This reverts commit e327d71.

---------

Co-authored-by: Fanny Jiang <[email protected]>
  • Loading branch information
yzhan289 and fanny-jiang committed Feb 15, 2023
1 parent bbc94c8 commit 7347ee1
Show file tree
Hide file tree
Showing 5 changed files with 584 additions and 441 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
class ConfluentKafkaClient:
def __init__(self) -> None:
pass

def get_consumer_offsets(self):
pass

def get_broker_offset(self):
pass

def report_consumer_offset_and_lag(self):
pass

def report_broker_offset(self):
pass

def collect_broker_metadata(self):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
from abc import ABC, abstractmethod


class KafkaClient(ABC):
def __init__(self, check) -> None:
self.check = check
self.log = check.log
self._kafka_client = None
self._highwater_offsets = {}
self._consumer_offsets = {}
self._context_limit = check._context_limit

def should_get_highwater_offsets(self):
return len(self._consumer_offsets) < self._context_limit

@abstractmethod
def get_consumer_offsets(self):
pass

@abstractmethod
def get_highwater_offsets(self):
pass

@abstractmethod
def report_consumer_offsets_and_lag(self):
pass

@abstractmethod
def report_highwater_offsets(self):
pass

@abstractmethod
def collect_broker_metadata(self):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient


def make_client(check) -> KafkaClient:
return KafkaPythonClient(check)
Loading

0 comments on commit 7347ee1

Please sign in to comment.