Skip to content

Commit

Permalink
Extract OTelInspector for all Otel code
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Nov 15, 2023
1 parent 8ec6b04 commit 95f9c7a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 20 deletions.
44 changes: 31 additions & 13 deletions duetector/analyzer/jaeger/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,61 @@
from duetector.analyzer.jaeger.proto.query_pb2 import *
from duetector.analyzer.jaeger.proto.query_pb2_grpc import *
from duetector.analyzer.models import AnalyzerBrief, Tracking
from duetector.collectors.otel import OTelCollector
from duetector.collectors.otel import OTelInspector
from duetector.extension.analyzer import hookimpl

ChannelInitializer = Callable[[], grpc.aio.Channel]


class JaegerConnector:
class JaegerConnector(OTelInspector):
def __init__(self, channel_initializer: ChannelInitializer):
self.channel_initializer: ChannelInitializer = channel_initializer

def inspect_all_service(self):
pass
def inspect_all_collector_ids(self) -> List[str]:
with self.channel_initializer() as channel:
stub = QueryServiceStub(channel)
response = stub.GetServices(GetServicesRequest())
return [
self.get_identifier(service)
for service in response.services
if self.get_identifier(service)
]

def get_operation(self, service: str, span_kind: Optional[str] = None) -> List[str]:
with self.channel_initializer() as channel:
stub = QueryServiceStub(channel)
response = stub.GetOperations(
GetOperationsRequest(service=service, span_kind=span_kind)
)
return [operation.name for operation in response.operations]

def inspect_all_operation(self):
pass
def inspect_all_tracers(self) -> List[str]:
return [
self.get_tracer_name(operation)
for operation in self.get_operation(
service for service in self.inspect_all_collector_ids()
)
if self.get_tracer_name(operation)
]


class JaegerAnalyzer(Analyzer):
default_config = {
"disabled": True,
# TODO: Support secure channel
"secure": False,
"root_certificates_path": "",
"private_key_path": "",
"certificate_chain_path": "",
"host": "localhost",
"port": 16685,
}
service_prefix = OTelCollector.service_prefix
service_sep = OTelCollector.service_sep

def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)

@property
@cache
def channel(self) -> ChannelInitializer:
def channel_initializer(self) -> ChannelInitializer:
"""
Example:
async with self.channel as channel:
Expand All @@ -78,7 +96,7 @@ def channel(self) -> ChannelInitializer:
@property
@cache
def connector(self):
return JaegerConnector(self.channel)
return JaegerConnector(self.channel_initializer)

def get_all_tracers(self) -> List[str]:
"""
Expand All @@ -88,7 +106,7 @@ def get_all_tracers(self) -> List[str]:
List[str]: List of tracer's name.
"""

raise NotImplementedError
return self.connector.inspect_all_tracers()

def get_all_collector_ids(self) -> List[str]:
"""
Expand All @@ -97,7 +115,7 @@ def get_all_collector_ids(self) -> List[str]:
Returns:
List[str]: List of collector id.
"""
raise NotImplementedError
return self.connector.inspect_all_collector_ids()

def query(
self,
Expand Down
Empty file added duetector/analyzer/otel.py
Empty file.
3 changes: 0 additions & 3 deletions duetector/collectors/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ def set_span(self, collector, span):
span.set_attribute(k, v)
span.set_attribute("collector_id", collector.id)

def span_name(self, collector):
return f"{self.tracer}@{collector.id}"


if __name__ == "__main__":
Tracking(tracer="test", dt=datetime.now())
30 changes: 27 additions & 3 deletions duetector/collectors/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,30 @@
from duetector.utils import Singleton, get_grpc_cred_from_path


class OTelInspector:
service_prefix = "duetector"
service_sep = "-"

@classmethod
def generate_service_name(cls, identifier: str) -> str:
return cls.service_sep.join([f"{cls.service_prefix}", f"{identifier}"])

@classmethod
def get_identifier(cls, service_name: str) -> Optional[str]:
if not service_name.startswith(cls.service_prefix):
return None

return service_name.replace(cls.service_prefix + cls.service_sep, "")

@classmethod
def generate_span_name(cls, t: Tracking) -> str:
return t.tracer

@classmethod
def get_tracer_name(cls, span_name: str) -> str:
return span_name


class OTelInitiator(metaclass=Singleton):
"""
Host the OpenTelemetry SDK and initialize the provider and exporter.
Expand Down Expand Up @@ -109,7 +133,7 @@ def shutdown(self):
self.provider = None


class OTelCollector(Collector):
class OTelCollector(Collector, OTelInspector):
"""
A collector using OpenTelemetry SDK.
Expand Down Expand Up @@ -152,7 +176,7 @@ def exporter_kwargs(self) -> Dict[str, Any]:

@property
def service_name(self) -> str:
return self.service_sep.join([f"{self.service_prefix}", f"{self.id}"])
return self.generate_service_name(self.id)

@property
def grpc_exporter_kwargs(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -186,7 +210,7 @@ def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):

def _emit(self, t: Tracking):
tracer = trace.get_tracer(self.id)
with tracer.start_as_current_span(t.span_name(self)) as span:
with tracer.start_as_current_span(self.generate_span_name(t)) as span:
t.set_span(self, span)

def summary(self) -> Dict:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_jaeger_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def test_jaeger_analyzer(jaeger_analyzer: JaegerAnalyzer):
from duetector.analyzer.jaeger.proto.query_pb2 import GetServicesRequest
from duetector.analyzer.jaeger.proto.query_pb2_grpc import QueryServiceStub

async with jaeger_analyzer.channel() as channel:
async with jaeger_analyzer.channel_initializer() as channel:
stub = QueryServiceStub(channel)
response = await stub.GetServices(GetServicesRequest())
print(response)
Expand Down

0 comments on commit 95f9c7a

Please sign in to comment.