Skip to content

Commit

Permalink
Building test for jaeger
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Nov 14, 2023
1 parent debed35 commit c30cefa
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 3 deletions.
143 changes: 142 additions & 1 deletion duetector/analyzer/jaeger/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,158 @@
import asyncio
import functools
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Union

import grpc

try:
from functools import cache
except ImportError:
from functools import lru_cache as cache

from duetector.analyzer.base import Analyzer
from duetector.analyzer.jaeger.proto.query_pb2 import *
from duetector.analyzer.jaeger.proto.query_pb2_grpc import *
from duetector.analyzer.models import AnalyzerBrief, Tracking
from duetector.collectors.otel import OTelCollector
from duetector.extension.analyzer import hookimpl

ChannelInitializer = Callable[[], grpc.aio.Channel]


class JaegerConnector:
def __init__(self, channel_initializer: ChannelInitializer):
self.channel_initializer: ChannelInitializer = channel_initializer

def inspect_all_service(self):
pass

def inspect_all_operation(self):
pass


class JaegerAnalyzer(Analyzer):
default_config = {
"disabled": True,
# TODO: Support secure channel
# "secure_channel": False,
"host": "localhost",
"port": 16685,
}
service_prefix = OTelCollector.service_prefix
service_sep = OTelCollector.service_sep

def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)

@property
@cache
def channel(self) -> ChannelInitializer:
"""
Example:
async with self.channel as channel:
stub = QueryServiceStub(channel)
response = await stub.GetServices(GetServicesRequest())
print(response)
"""
target_func = grpc.aio.insecure_channel
kwargs = {"target": f"{self.config.host}:{self.config.port}"}

return functools.partial(target_func, **kwargs)

@property
@cache
def connector(self):
return JaegerConnector(self.channel)

def get_all_tracers(self) -> List[str]:
"""
Get all tracers from storage.
Returns:
List[str]: List of tracer's name.
"""

raise NotImplementedError

def get_all_collector_ids(self) -> List[str]:
"""
Get all collector id from storage.
Returns:
List[str]: List of collector id.
"""
raise NotImplementedError

def query(
self,
tracers: Optional[List[str]] = None,
collector_ids: Optional[List[str]] = None,
start_datetime: Optional[datetime] = None,
end_datetime: Optional[datetime] = None,
start: int = 0,
limit: int = 0,
columns: Optional[List[str]] = None,
where: Optional[Dict[str, Any]] = None,
distinct: bool = False,
order_by_asc: Optional[List[str]] = None,
order_by_desc: Optional[List[str]] = None,
) -> List[Tracking]:
"""
Query all tracking records from database.
Args:
tracers (Optional[List[str]], optional): Tracer's name. Defaults to None, all tracers will be queried.
collector_ids (Optional[List[str]], optional): Collector id. Defaults to None, all collector id will be queried.
start_datetime (Optional[datetime], optional): Start time. Defaults to None.
end_datetime (Optional[datetime], optional): End time. Defaults to None.
start (int, optional): Start index. Defaults to 0.
limit (int, optional): Limit of records. Defaults to 20. ``0`` means no limit.
columns (Optional[List[str]], optional): Columns to query. Defaults to None, all columns will be queried.
where (Optional[Dict[str, Any]], optional): Where clause. Defaults to None.
distinct (bool, optional): Distinct. Defaults to False.
order_by_asc (Optional[List[str]], optional): Order by asc. Defaults to None.
order_by_desc (Optional[List[str]], optional): Order by desc. Defaults to None.
Returns:
List[duetector.analyzer.models.Tracking]: List of tracking records.
"""
raise NotImplementedError

def brief(
self,
tracers: Optional[List[str]] = None,
collector_ids: Optional[List[str]] = None,
start_datetime: Optional[datetime] = None,
end_datetime: Optional[datetime] = None,
with_details: bool = True,
distinct: bool = False,
inspect_type: bool = False,
) -> AnalyzerBrief:
"""
Get a brief of this analyzer.
Args:
tracers (Optional[List[str]], optional):
Tracers. Defaults to None, all tracers will be queried.
If a specific tracer is not found, it will be ignored.
collector_ids (Optional[List[str]], optional):
Collector ids. Defaults to None, all collector ids will be queried.
If a specific collector id is not found, it will be ignored.
start_datetime (Optional[datetime], optional): Start time. Defaults to None.
end_datetime (Optional[datetime], optional): End time. Defaults to None.
with_details (bool, optional): With details. Defaults to True.
distinct (bool, optional): Distinct. Defaults to False.
inspect_type (bool, optional): Weather fileds's value is type or type name. Defaults to False, type name.
Returns:
AnalyzerBrief: A brief of this analyzer.
"""
raise NotImplementedError

def analyze(self):
# TODO: Not design yet.
pass


@hookimpl
Expand All @@ -22,7 +163,7 @@ def init_analyzer(config):
if __name__ == "__main__":

async def run() -> None:
async with grpc.aio.insecure_channel("localhost:16685") as channel:
async with JaegerAnalyzer().channel() as channel:
stub = QueryServiceStub(channel)
response = await stub.GetServices(GetServicesRequest())
print(response)
Expand Down
5 changes: 4 additions & 1 deletion duetector/collectors/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class OTelCollector(Collector):
"""

service_prefix = "duetector"
service_sep = "-"

default_config = {
**Collector.default_config,
"disabled": True,
Expand All @@ -143,7 +146,7 @@ def exporter_kwargs(self) -> Dict[str, Any]:

@property
def service_name(self) -> str:
return f"duetector-{self.id}"
return self.service_sep.join([f"{self.service_prefix}", f"{self.id}"])

def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)
Expand Down
2 changes: 2 additions & 0 deletions duetector/static/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ include_extension = true

[analyzer.jaegeranalyzer]
disabled = true
host = "localhost"
port = 16685

[analyzer.dbanalyzer]
disabled = false
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ classifiers = [
'Programming Language :: Python :: 3.11',
]
[project.optional-dependencies]
test = ["pytest", "pytest-cov", "httpx"]
test = ["pytest", "pytest-cov", "pytest-asyncio", "httpx", "docker"]
docs = ["Sphinx<=7.2.4", "sphinx-rtd-theme", "sphinx-click", "autodoc_pydantic"]

[project.scripts]
Expand Down
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto
125 changes: 125 additions & 0 deletions tests/test_jaeger_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import socket
import time
from collections import namedtuple

import pytest

import docker
from duetector.analyzer.jaeger.analyzer import JaegerAnalyzer
from duetector.collectors.otel import OTelCollector


def get_port():
# Get an unoccupied port
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("localhost", 0))
return s.getsockname()[1]


from duetector.utils import get_boot_time_duration_ns

timestamp = 13205215231927
datetime = get_boot_time_duration_ns(timestamp)


@pytest.fixture(scope="session")
def data_t():
d = namedtuple("Tracking", ["pid", "uid", "gid", "comm", "fname", "timestamp", "custom"])

yield d(
pid=9999,
uid=9999,
gid=9999,
comm="dummy",
fname="dummy.file",
timestamp=timestamp,
custom="dummy-xargs",
)


@pytest.fixture(scope="session")
def service_id():
yield "unittest-service"


@pytest.fixture(scope="session")
def docker_client():
try:
client = docker.from_env()
client.ping()
return client
except:
pytest.skip("Docker is not available")


@pytest.fixture(scope="session")
def jaeger_container(docker_client: docker.DockerClient, service_id, data_t):
query_port = get_port()
otel_grpc_port = get_port()
ui_port = get_port()
try:
"""
docker run --rm --name jaeger \
-p {random_query_port}:16685 \
-p {random_otel_port}:4317 \
jaegertracing/all-in-one:1.50
"""
container = docker_client.containers.run(
"jaegertracing/all-in-one:1.50",
detach=True,
ports={"16685": query_port, "4317": otel_grpc_port, "16686": ui_port},
)
# Waiting for the container to start by query ui_port
while True:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(("localhost", ui_port))
break
except:
time.sleep(0.1)

# Generate testing data
config = {
"otelcollector": {
"disabled": False,
"statis_id": service_id,
"exporter": "otlp-grpc",
"exporter_kwargs": {
"endpoint": f"localhost:{otel_grpc_port}",
"insecure": True,
},
}
}
collector = OTelCollector(config)
collector.emit("dummy", data_t)
collector.shutdown()

yield query_port
finally:
container.stop()


@pytest.fixture
def jaeger_analyzer(jaeger_container):
config = {
"jaegeranalyzer": {
"disabled": False,
"host": "localhost",
"port": jaeger_container,
}
}
yield JaegerAnalyzer(config)


async def test_jaeger_analyzer(jaeger_analyzer: JaegerAnalyzer):
from duetector.analyzer.jaeger.proto.query_pb2 import GetServicesRequest
from duetector.analyzer.jaeger.proto.query_pb2_grpc import QueryServiceStub

async with jaeger_analyzer.channel() as channel:
stub = QueryServiceStub(channel)
response = await stub.GetServices(GetServicesRequest())
print(response)


if __name__ == "__main__":
pytest.main(["-vv", "-s", __file__])

0 comments on commit c30cefa

Please sign in to comment.