Skip to content

Commit

Permalink
Add some comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Nov 21, 2023
1 parent 1b6b4f5 commit a99d439
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
41 changes: 40 additions & 1 deletion duetector/analyzer/jaeger/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@
from duetector.analyzer.jaeger.proto.query_pb2_grpc import *
from duetector.analyzer.models import AnalyzerBrief, Brief, Tracking
from duetector.extension.analyzer import hookimpl
from duetector.log import logger
from duetector.otel import OTelInspector

ChannelInitializer = Callable[[], grpc.aio.Channel]


class JaegerConnector(OTelInspector):
"""
Providing query method for jaeger backend
"""

def __init__(self, channel_initializer: ChannelInitializer):
self.channel_initializer: ChannelInitializer = channel_initializer

async def inspect_all_collector_ids(self) -> List[str]:
logger.info("Querying all collector ids...")
async with self.channel_initializer() as channel:
stub = QueryServiceStub(channel)
response = await stub.GetServices(GetServicesRequest())
Expand All @@ -42,6 +48,7 @@ async def inspect_all_collector_ids(self) -> List[str]:
]

async def get_operation(self, service: str, span_kind: Optional[str] = None) -> List[str]:
logger.info(f"Querying operations of {service}...")
async with self.channel_initializer() as channel:
stub = QueryServiceStub(channel)
response = await stub.GetOperations(
Expand All @@ -50,6 +57,7 @@ async def get_operation(self, service: str, span_kind: Optional[str] = None) ->
return [operation.name for operation in response.operations]

async def inspect_all_tracers(self) -> List[str]:
logger.info("Querying all tracers...")
ret = []
for collector_id in await self.inspect_all_collector_ids():
service = self.generate_service_name(collector_id)
Expand Down Expand Up @@ -78,6 +86,10 @@ def get_find_tracers_request(
duration_max: Optional[int] = None,
search_depth: int = 20,
) -> FindTracesRequest:
if not collector_id:
raise AnalysQueryError(f"collector_id is required, current:{collector_id}")
if not tracer_name:
raise AnalysQueryError(f"tracer_name is required, current:{tracer_name}")
if search_depth < 1 or search_depth > 1500:
raise AnalysQueryError("Jaeger search_depth must be between 1 and 1500.")

Expand Down Expand Up @@ -109,6 +121,10 @@ async def query_trace(
duration_max: Optional[int] = None,
search_depth: int = 20,
) -> List[Tracking]:
if not collector_id:
raise AnalysQueryError(f"collector_id is required, current:{collector_id}")
if not tracer_name:
raise AnalysQueryError(f"tracer_name is required, current:{tracer_name}")
request = self.get_find_tracers_request(
collector_id=collector_id,
tracer_name=tracer_name,
Expand Down Expand Up @@ -146,6 +162,10 @@ async def brief(
start_time_max: Optional[datetime] = None,
inspect_type=True,
) -> Optional[Brief]:
if not collector_id:
raise AnalysQueryError(f"collector_id is required, current:{collector_id}")
if not tracer_name:
raise AnalysQueryError(f"tracer_name is required, current:{tracer_name}")
request = self.get_find_tracers_request(
collector_id=collector_id,
tracer_name=tracer_name,
Expand Down Expand Up @@ -276,6 +296,17 @@ async def query(
Returns:
List[duetector.analyzer.models.Tracking]: List of tracking records.
"""
not_support_params = {
"start": start,
"columns": columns,
"distinct": distinct,
"order_by_asc": order_by_asc,
"order_by_desc": order_by_desc,
}
for k, v in not_support_params:
if v:
logger.warning("Not support params: %s=%s", k, v)

if not collector_ids:
collector_ids = await self.get_all_collector_ids()
if not tracers:
Expand All @@ -300,7 +331,7 @@ async def brief(
collector_ids: Optional[List[str]] = None,
start_datetime: Optional[datetime] = None,
end_datetime: Optional[datetime] = None,
with_details: bool = True,
with_details: bool = False,
distinct: bool = False,
inspect_type: bool = True,
) -> AnalyzerBrief:
Expand All @@ -323,6 +354,14 @@ async def brief(
Returns:
AnalyzerBrief: A brief of this analyzer.
"""
not_support_params = {
"with_details": with_details,
"distinct": distinct,
}
for k, v in not_support_params.items():
if v:
logger.warning("Not support params: %s=%s", k, v)

if tracers:
tracers = [t for t in tracers if t in await self.get_all_tracers()]
else:
Expand Down
3 changes: 2 additions & 1 deletion duetector/collectors/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def initialize(
processor_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
if self._initialized:
logger.info("Alre")
logger.info("Already initiated. Skip...")
return

if not resource_kwargs:
Expand Down Expand Up @@ -188,6 +188,7 @@ def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)

if "grpc" in self.exporter:
logger.info("Merge grpc kwargs into exporter_kwargs")
self.exporter_kwargs.update(self.grpc_exporter_kwargs)

self.otel = OTelInitiator()
Expand Down

0 comments on commit a99d439

Please sign in to comment.