From c30cefac0ed95a54e4f15ad8b4435d7f1250570d Mon Sep 17 00:00:00 2001 From: wunder957 Date: Tue, 14 Nov 2023 18:15:15 +0800 Subject: [PATCH] Building test for jaeger --- duetector/analyzer/jaeger/analyzer.py | 143 +++++++++++++++++++++++++- duetector/collectors/otel.py | 5 +- duetector/static/config.toml | 2 + pyproject.toml | 2 +- pytest.ini | 2 + tests/test_jaeger_analyzer.py | 125 ++++++++++++++++++++++ 6 files changed, 276 insertions(+), 3 deletions(-) create mode 100644 pytest.ini create mode 100644 tests/test_jaeger_analyzer.py diff --git a/duetector/analyzer/jaeger/analyzer.py b/duetector/analyzer/jaeger/analyzer.py index dce39c0..e529c97 100644 --- a/duetector/analyzer/jaeger/analyzer.py +++ b/duetector/analyzer/jaeger/analyzer.py @@ -1,17 +1,158 @@ import asyncio +import functools +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Union import grpc +try: + from functools import cache +except ImportError: + from functools import lru_cache as cache + from duetector.analyzer.base import Analyzer from duetector.analyzer.jaeger.proto.query_pb2 import * from duetector.analyzer.jaeger.proto.query_pb2_grpc import * +from duetector.analyzer.models import AnalyzerBrief, Tracking +from duetector.collectors.otel import OTelCollector from duetector.extension.analyzer import hookimpl +ChannelInitializer = Callable[[], grpc.aio.Channel] + + +class JaegerConnector: + def __init__(self, channel_initializer: ChannelInitializer): + self.channel_initializer: ChannelInitializer = channel_initializer + + def inspect_all_service(self): + pass + + def inspect_all_operation(self): + pass + class JaegerAnalyzer(Analyzer): default_config = { "disabled": True, + # TODO: Support secure channel + # "secure_channel": False, + "host": "localhost", + "port": 16685, } + service_prefix = OTelCollector.service_prefix + service_sep = OTelCollector.service_sep + + def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): + super().__init__(config, *args, **kwargs) + + @property + @cache + def channel(self) -> ChannelInitializer: + """ + Example: + async with self.channel as channel: + stub = QueryServiceStub(channel) + response = await stub.GetServices(GetServicesRequest()) + print(response) + + """ + target_func = grpc.aio.insecure_channel + kwargs = {"target": f"{self.config.host}:{self.config.port}"} + + return functools.partial(target_func, **kwargs) + + @property + @cache + def connector(self): + return JaegerConnector(self.channel) + + def get_all_tracers(self) -> List[str]: + """ + Get all tracers from storage. + + Returns: + List[str]: List of tracer's name. + """ + + raise NotImplementedError + + def get_all_collector_ids(self) -> List[str]: + """ + Get all collector id from storage. + + Returns: + List[str]: List of collector id. + """ + raise NotImplementedError + + def query( + self, + tracers: Optional[List[str]] = None, + collector_ids: Optional[List[str]] = None, + start_datetime: Optional[datetime] = None, + end_datetime: Optional[datetime] = None, + start: int = 0, + limit: int = 0, + columns: Optional[List[str]] = None, + where: Optional[Dict[str, Any]] = None, + distinct: bool = False, + order_by_asc: Optional[List[str]] = None, + order_by_desc: Optional[List[str]] = None, + ) -> List[Tracking]: + """ + Query all tracking records from database. + + Args: + tracers (Optional[List[str]], optional): Tracer's name. Defaults to None, all tracers will be queried. + collector_ids (Optional[List[str]], optional): Collector id. Defaults to None, all collector id will be queried. + start_datetime (Optional[datetime], optional): Start time. Defaults to None. + end_datetime (Optional[datetime], optional): End time. Defaults to None. + start (int, optional): Start index. Defaults to 0. + limit (int, optional): Limit of records. Defaults to 20. ``0`` means no limit. + columns (Optional[List[str]], optional): Columns to query. Defaults to None, all columns will be queried. + where (Optional[Dict[str, Any]], optional): Where clause. Defaults to None. + distinct (bool, optional): Distinct. Defaults to False. + order_by_asc (Optional[List[str]], optional): Order by asc. Defaults to None. + order_by_desc (Optional[List[str]], optional): Order by desc. Defaults to None. + Returns: + List[duetector.analyzer.models.Tracking]: List of tracking records. + """ + raise NotImplementedError + + def brief( + self, + tracers: Optional[List[str]] = None, + collector_ids: Optional[List[str]] = None, + start_datetime: Optional[datetime] = None, + end_datetime: Optional[datetime] = None, + with_details: bool = True, + distinct: bool = False, + inspect_type: bool = False, + ) -> AnalyzerBrief: + """ + Get a brief of this analyzer. + + Args: + tracers (Optional[List[str]], optional): + Tracers. Defaults to None, all tracers will be queried. + If a specific tracer is not found, it will be ignored. + collector_ids (Optional[List[str]], optional): + Collector ids. Defaults to None, all collector ids will be queried. + If a specific collector id is not found, it will be ignored. + start_datetime (Optional[datetime], optional): Start time. Defaults to None. + end_datetime (Optional[datetime], optional): End time. Defaults to None. + with_details (bool, optional): With details. Defaults to True. + distinct (bool, optional): Distinct. Defaults to False. + inspect_type (bool, optional): Weather fileds's value is type or type name. Defaults to False, type name. + + Returns: + AnalyzerBrief: A brief of this analyzer. + """ + raise NotImplementedError + + def analyze(self): + # TODO: Not design yet. + pass @hookimpl @@ -22,7 +163,7 @@ def init_analyzer(config): if __name__ == "__main__": async def run() -> None: - async with grpc.aio.insecure_channel("localhost:16685") as channel: + async with JaegerAnalyzer().channel() as channel: stub = QueryServiceStub(channel) response = await stub.GetServices(GetServicesRequest()) print(response) diff --git a/duetector/collectors/otel.py b/duetector/collectors/otel.py index ae1416d..da954bf 100644 --- a/duetector/collectors/otel.py +++ b/duetector/collectors/otel.py @@ -122,6 +122,9 @@ class OTelCollector(Collector): """ + service_prefix = "duetector" + service_sep = "-" + default_config = { **Collector.default_config, "disabled": True, @@ -143,7 +146,7 @@ def exporter_kwargs(self) -> Dict[str, Any]: @property def service_name(self) -> str: - return f"duetector-{self.id}" + return self.service_sep.join([f"{self.service_prefix}", f"{self.id}"]) def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): super().__init__(config, *args, **kwargs) diff --git a/duetector/static/config.toml b/duetector/static/config.toml index fa4377a..8eb5063 100644 --- a/duetector/static/config.toml +++ b/duetector/static/config.toml @@ -100,6 +100,8 @@ include_extension = true [analyzer.jaegeranalyzer] disabled = true +host = "localhost" +port = 16685 [analyzer.dbanalyzer] disabled = false diff --git a/pyproject.toml b/pyproject.toml index c5d6050..642683f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ classifiers = [ 'Programming Language :: Python :: 3.11', ] [project.optional-dependencies] -test = ["pytest", "pytest-cov", "httpx"] +test = ["pytest", "pytest-cov", "pytest-asyncio", "httpx", "docker"] docs = ["Sphinx<=7.2.4", "sphinx-rtd-theme", "sphinx-click", "autodoc_pydantic"] [project.scripts] diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..2f4c80e --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_mode = auto diff --git a/tests/test_jaeger_analyzer.py b/tests/test_jaeger_analyzer.py new file mode 100644 index 0000000..c464634 --- /dev/null +++ b/tests/test_jaeger_analyzer.py @@ -0,0 +1,125 @@ +import socket +import time +from collections import namedtuple + +import pytest + +import docker +from duetector.analyzer.jaeger.analyzer import JaegerAnalyzer +from duetector.collectors.otel import OTelCollector + + +def get_port(): + # Get an unoccupied port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("localhost", 0)) + return s.getsockname()[1] + + +from duetector.utils import get_boot_time_duration_ns + +timestamp = 13205215231927 +datetime = get_boot_time_duration_ns(timestamp) + + +@pytest.fixture(scope="session") +def data_t(): + d = namedtuple("Tracking", ["pid", "uid", "gid", "comm", "fname", "timestamp", "custom"]) + + yield d( + pid=9999, + uid=9999, + gid=9999, + comm="dummy", + fname="dummy.file", + timestamp=timestamp, + custom="dummy-xargs", + ) + + +@pytest.fixture(scope="session") +def service_id(): + yield "unittest-service" + + +@pytest.fixture(scope="session") +def docker_client(): + try: + client = docker.from_env() + client.ping() + return client + except: + pytest.skip("Docker is not available") + + +@pytest.fixture(scope="session") +def jaeger_container(docker_client: docker.DockerClient, service_id, data_t): + query_port = get_port() + otel_grpc_port = get_port() + ui_port = get_port() + try: + """ + docker run --rm --name jaeger \ + -p {random_query_port}:16685 \ + -p {random_otel_port}:4317 \ + jaegertracing/all-in-one:1.50 + """ + container = docker_client.containers.run( + "jaegertracing/all-in-one:1.50", + detach=True, + ports={"16685": query_port, "4317": otel_grpc_port, "16686": ui_port}, + ) + # Waiting for the container to start by query ui_port + while True: + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect(("localhost", ui_port)) + break + except: + time.sleep(0.1) + + # Generate testing data + config = { + "otelcollector": { + "disabled": False, + "statis_id": service_id, + "exporter": "otlp-grpc", + "exporter_kwargs": { + "endpoint": f"localhost:{otel_grpc_port}", + "insecure": True, + }, + } + } + collector = OTelCollector(config) + collector.emit("dummy", data_t) + collector.shutdown() + + yield query_port + finally: + container.stop() + + +@pytest.fixture +def jaeger_analyzer(jaeger_container): + config = { + "jaegeranalyzer": { + "disabled": False, + "host": "localhost", + "port": jaeger_container, + } + } + yield JaegerAnalyzer(config) + + +async def test_jaeger_analyzer(jaeger_analyzer: JaegerAnalyzer): + from duetector.analyzer.jaeger.proto.query_pb2 import GetServicesRequest + from duetector.analyzer.jaeger.proto.query_pb2_grpc import QueryServiceStub + + async with jaeger_analyzer.channel() as channel: + stub = QueryServiceStub(channel) + response = await stub.GetServices(GetServicesRequest()) + print(response) + + +if __name__ == "__main__": + pytest.main(["-vv", "-s", __file__])