Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Kafka Consumer #13931

Merged
merged 12 commits into from
Feb 15, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
class ConfluentKafkaClient:
def __init__(self) -> None:
pass

def get_consumer_offsets(self):
pass

def get_broker_offset(self):
pass

def report_consumer_offset_and_lag(self):
pass

def report_broker_offset(self):
pass

def collect_broker_metadata(self):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
from abc import ABC, abstractmethod


class KafkaClient(ABC):
def __init__(self, check) -> None:
self.check = check
self.log = check.log
self._kafka_client = None
self._highwater_offsets = {}
self._consumer_offsets = {}
self._context_limit = check._context_limit

def should_get_highwater_offsets(self):
return len(self._consumer_offsets) < self._context_limit

@abstractmethod
def get_consumer_offsets(self):
pass

@abstractmethod
def get_highwater_offsets(self):
pass

@abstractmethod
def report_consumer_offsets_and_lag(self):
pass

@abstractmethod
def report_highwater_offsets(self):
pass

@abstractmethod
def collect_broker_metadata(self):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient


def make_client(check) -> KafkaClient:
return KafkaPythonClient(check)
Loading