From f438561950124a317b6fb0ddd9eb4d28c6a8108b Mon Sep 17 00:00:00 2001 From: wunder957 Date: Wed, 18 Oct 2023 15:56:05 +0800 Subject: [PATCH] Feature: Support OTel Collector (#82) * Init OTel feature branch * Add OTel Support * Add some docs and rename mode to exporter * Fix testing * More statement when shutdown --- README.md | 13 +- README_zh.md | 10 +- dev-tools/start-docs-host.sh | 2 +- docs/source/collectors/index.rst | 1 + docs/source/collectors/otel.rst | 15 ++ duetector/collectors/models.py | 10 ++ duetector/collectors/otel.py | 167 ++++++++++++++++++ duetector/collectors/register.py | 4 +- duetector/static/config.toml | 10 ++ duetector/utils.py | 6 +- pyproject.toml | 13 +- tests/config.toml | 10 ++ ...test_collector.py => test_db_collector.py} | 0 tests/test_otel_collector.py | 52 ++++++ 14 files changed, 295 insertions(+), 18 deletions(-) create mode 100644 docs/source/collectors/otel.rst create mode 100644 duetector/collectors/otel.py rename tests/{test_collector.py => test_db_collector.py} (100%) create mode 100644 tests/test_otel_collector.py diff --git a/README.md b/README.md index 74fa471..faf3729 100644 --- a/README.md +++ b/README.md @@ -55,21 +55,18 @@ Join our [slack channel](https://join.slack.com/t/hitsz-ids/shared_invite/zt-239 - `Tracer` Support - [X] eBPF-based tracer - [X] Shell command tracer - - [ ] Subprocess tracer + - [X] Subprocess tracer - `Filter` Support - [X] Pattern matching, based on regular expressions -- `Collector` and `Analyzer` Support - - [X] SQL database - - [ ] OpenTelemetry -- Analyzer Support - - [X] SQL database - - [ ] OpenTelemetry +- Data Collection and Analysis + - [X] `Analyzer` Support SQL database + - [X] `Collector` Support SQL database and *OpenTelemetry(Experimental)* - User Interface - [X] CLI Tools - [X] PIP Service - [ ] Control Panel - Enhancements - - [ ] Runc containers identification + - [ ] `RunC` containers identification The eBPF program requires kernel support, see [Kernel Support](./docs/kernel_config.md) diff --git a/README_zh.md b/README_zh.md index 65cf419..76617cd 100644 --- a/README_zh.md +++ b/README_zh.md @@ -59,16 +59,18 @@ duetector🔍是一个基于可扩展的的数据使用探测器,它可以在L - `Tracer`支持 - [X] 基于eBPF的tracer - [X] 基于shell命令的tracer - - [ ] 基于子进程的tracer + - [X] 基于子进程的tracer - `Filter`支持 - [X] 支持正则的模式匹配 -- `Collector`和`Analyzer`支持 - - [X] SQL数据库 - - [ ] Opentelemetry +- 遥测数据传输与分析 + - [X] `Analyzer`支持SQL数据库 + - [X] `Collector`支持SQL数据库和*Opentelemetry(实验性)* - 用户接口 - [X] 命令行工具 - [X] PIP服务 - [ ] 控制平面 +- 增强功能 + - [ ] `RunC`容器云原生支持 eBPF程序需要内核支持,详见[内核支持](./docs/kernel_config.md) diff --git a/dev-tools/start-docs-host.sh b/dev-tools/start-docs-host.sh index c9543b3..3a75321 100755 --- a/dev-tools/start-docs-host.sh +++ b/dev-tools/start-docs-host.sh @@ -6,6 +6,6 @@ set -e docker run --rm \ -it \ --p 8080:80 \ +-p 8910:80 \ -v $(pwd)/../docs/build/html:/usr/share/nginx/html:ro \ nginx diff --git a/docs/source/collectors/index.rst b/docs/source/collectors/index.rst index b7f4ba6..e2c8324 100644 --- a/docs/source/collectors/index.rst +++ b/docs/source/collectors/index.rst @@ -25,6 +25,7 @@ Avaliable Collector :maxdepth: 2 DB Collectors + OTel Collector Data Models diff --git a/docs/source/collectors/otel.rst b/docs/source/collectors/otel.rst new file mode 100644 index 0000000..54c0de4 --- /dev/null +++ b/docs/source/collectors/otel.rst @@ -0,0 +1,15 @@ +OTelCollector +================== + +.. autoclass:: duetector.collectors.otel.OTelInitiator + :members: + :undoc-members: + :private-members: + :show-inheritance: + + +.. autoclass:: duetector.collectors.otel.OTelCollector + :members: + :undoc-members: + :private-members: + :show-inheritance: diff --git a/duetector/collectors/models.py b/duetector/collectors/models.py index dfb074e..cbca6a3 100644 --- a/duetector/collectors/models.py +++ b/duetector/collectors/models.py @@ -102,6 +102,16 @@ def from_namedtuple(tracer, data: NamedTuple) -> Tracking: # type: ignore return Tracking(**args) + def set_span(self, span): + for k in self.model_fields: + if k in ("tracer", "extended"): + continue + v = getattr(self, k) + if v is not None: + span.set_attribute(k, v) + for k, v in self.extended.items(): + span.set_attribute(k, v) + if __name__ == "__main__": Tracking(tracer="test", dt=datetime.now()) diff --git a/duetector/collectors/otel.py b/duetector/collectors/otel.py new file mode 100644 index 0000000..156eef8 --- /dev/null +++ b/duetector/collectors/otel.py @@ -0,0 +1,167 @@ +from typing import Any, Dict, Optional + +from opentelemetry import trace +from opentelemetry.exporter.jaeger.proto.grpc import ( + JaegerExporter as GRPCJaegerExporter, +) +from opentelemetry.exporter.jaeger.thrift import JaegerExporter as ThriftJaegerExporter +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter as GRPCOTLPSpanExporter, +) +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter as HTTPOTLPSpanExporter, +) +from opentelemetry.exporter.zipkin.json import ZipkinExporter as JSONZipkinExporter +from opentelemetry.exporter.zipkin.proto.http import ( + ZipkinExporter as HTTPZipkinExporter, +) +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter + +from duetector.collectors.base import Collector +from duetector.collectors.models import Tracking +from duetector.extension.collector import hookimpl + + +class OTelInitiator: + """ + Host the OpenTelemetry SDK and initialize the provider and exporter. + + Avaliable exporters: + - ``console`` + - ``otlp-grpc`` + - ``otlp-http`` + - ``jaeger-thrift`` + - ``jaeger-grpc`` + - ``zipkin-http`` + - ``zipkin-json`` + - ``prometheus`` + + Example: + + .. code-block:: python + + otel = OTelInitiator() + trace = otel.initialize( + service_name="duetector", + exporter="console", + ) + + from opentelemetry import trace + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("test") as span: + span.set_attribute("test", "test") + + otel.shutdown() + """ + + exporter_cls = { + "console": ConsoleSpanExporter, + "otlp-grpc": GRPCOTLPSpanExporter, + "otlp-http": HTTPOTLPSpanExporter, + "jaeger-thrift": ThriftJaegerExporter, + "jaeger-grpc": GRPCJaegerExporter, + "zipkin-http": HTTPZipkinExporter, + "zipkin-json": JSONZipkinExporter, + # Prometheus only support metrics + # "prometheus": "TODO" + } + + def __init__(self): + self._initialized = False + self.provider = None + + def initialize( + self, + service_name="unknown-service", + resource_kwargs: Optional[Dict[str, Any]] = None, + provider_kwargs: Optional[Dict[str, Any]] = None, + exporter="console", + exporter_kwargs: Optional[Dict[str, Any]] = None, + ) -> None: + if self._initialized: + return + + if not resource_kwargs: + resource_kwargs = {} + resource_kwargs.setdefault(SERVICE_NAME, service_name) + resource = Resource(attributes=resource_kwargs) + + if not provider_kwargs: + provider_kwargs = {} + provider = TracerProvider(resource=resource, **provider_kwargs) + self.provider = provider + + if not exporter_kwargs: + exporter_kwargs = {} + processor = BatchSpanProcessor(self.exporter_cls[exporter](**exporter_kwargs)) + + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + self._initialized = True + + def shutdown(self): + if self._initialized and self.provider: + self.provider.shutdown() + self._initialized = False + self.provider = None + + +class OTelCollector(Collector): + """ + A collector using OpenTelemetry SDK. + + Config: + - ``exporter``: One of ``console``, ``otlp-grpc``, ``otlp-http``, ``jaeger-thrift``, ``jaeger-grpc``, ``zipkin-http``, ``zipkin-json``, see :class:`OTelInitiator` for more details + - ``exporter_kwargs``: A dict of kwargs for exporter + + Note: + Since v1.35, the Jaeger supports OTLP natively. Please use the OTLP exporter instead. Support for this exporter will end July 2023. + + """ + + default_config = { + **Collector.default_config, + "disabled": True, + "exporter": "console", + "exporter_kwargs": {}, + } + + @property + def exporter(self) -> str: + return self.config.exporter + + @property + def endpoint(self) -> Optional[str]: + return self.config.endpoint + + @property + def exporter_kwargs(self) -> Dict[str, Any]: + return self.config.exporter_kwargs + + def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): + super().__init__(config, *args, **kwargs) + self.otel = OTelInitiator() + self.otel.initialize( + service_name="duetector", + exporter=self.exporter, + exporter_kwargs=self.exporter_kwargs, + ) + + def _emit(self, t: Tracking): + tracer = trace.get_tracer(self.id) + with tracer.start_as_current_span(t.tracer) as span: + t.set_span(span) + + def summary(self) -> Dict: + return {} + + def shutdown(self): + super().shutdown() + self.otel.shutdown() + + +@hookimpl +def init_collector(config): + return OTelCollector(config) diff --git a/duetector/collectors/register.py b/duetector/collectors/register.py index ac07b16..ab5c2ec 100644 --- a/duetector/collectors/register.py +++ b/duetector/collectors/register.py @@ -1,4 +1,4 @@ # Expose for plugin system -from . import base, db +from . import base, db, otel -registers = [base, db] +registers = [base, db, otel] diff --git a/duetector/static/config.toml b/duetector/static/config.toml index cf5c530..d053cdf 100644 --- a/duetector/static/config.toml +++ b/duetector/static/config.toml @@ -63,6 +63,16 @@ poll_timeout = 10 disabled = false include_extension = true +[collector.otelcollector] +disabled = true +statis_id = "" +exporter = "console" + +[collector.otelcollector.backend_args] +max_workers = 10 + +[collector.otelcollector.exporter_kwargs] + [collector.dbcollector] disabled = false statis_id = "" diff --git a/duetector/utils.py b/duetector/utils.py index c67bfa2..1074773 100644 --- a/duetector/utils.py +++ b/duetector/utils.py @@ -1,3 +1,4 @@ +import threading from datetime import datetime, timedelta try: @@ -8,10 +9,13 @@ class Singleton(type): _instances = {} + _lock = threading.Lock() def __call__(cls, *args, **kwargs): if cls not in cls._instances: - cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + with cls._lock: + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls] diff --git a/pyproject.toml b/pyproject.toml index 18ed336..7b01bdc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,10 +16,19 @@ dependencies = [ "SQLAlchemy>=2", "click", "psutil", - # Following are for web server + # OTel + "opentelemetry-sdk", + "opentelemetry-api", + "opentelemetry-semantic-conventions", + "opentelemetry-exporter-otlp-proto-grpc", + "opentelemetry-exporter-otlp-proto-http", + "opentelemetry-exporter-jaeger", + "opentelemetry-exporter-zipkin-proto-http", + "opentelemetry-exporter-zipkin-json", + # Web server "fastapi", "uvicorn[standard]", - "anyio" + "anyio", ] dynamic = ["version"] classifiers = [ diff --git a/tests/config.toml b/tests/config.toml index 036696f..bf5b8d4 100644 --- a/tests/config.toml +++ b/tests/config.toml @@ -55,6 +55,16 @@ poll_timeout = 10 disabled = false include_extension = true +[collector.otelcollector] +disabled = false +statis_id = "" +exporter = "console" + +[collector.otelcollector.backend_args] +max_workers = 10 + +[collector.otelcollector.exporter_kwargs] + [collector.dbcollector] disabled = false id = "unittest" diff --git a/tests/test_collector.py b/tests/test_db_collector.py similarity index 100% rename from tests/test_collector.py rename to tests/test_db_collector.py diff --git a/tests/test_otel_collector.py b/tests/test_otel_collector.py new file mode 100644 index 0000000..6504583 --- /dev/null +++ b/tests/test_otel_collector.py @@ -0,0 +1,52 @@ +import json +from collections import namedtuple +from copy import deepcopy + +import pytest + +from duetector.collectors.otel import OTelCollector +from duetector.utils import get_boot_time_duration_ns + +timestamp = 13205215231927 +datetime = get_boot_time_duration_ns(timestamp) + + +@pytest.fixture +def data_t(): + d = namedtuple("Tracking", ["pid", "uid", "gid", "comm", "fname", "timestamp", "custom"]) + + yield d( + pid=9999, + uid=9999, + gid=9999, + comm="dummy", + fname="dummy.file", + timestamp=timestamp, + custom="dummy-xargs", + ) + + +@pytest.fixture +def config(full_config): + c = deepcopy(full_config) + yield c["collector"] + + +@pytest.fixture +def collector(config): + return OTelCollector(config) + + +def test_dbcollector(collector: OTelCollector, data_t, capsys): + collector.emit("dummy", data_t) + collector.shutdown() + # FIXME: how to test this? Code below doesn't work, cannot capture stdout + # captured = capsys.readouterr() + # tracking = json.loads(captured.out) + # assert tracking["attributes"] + # for k in ["pid", "uid", "gid", "comm", "fname", "timestamp", "custom"]: + # assert k in tracking["attributes"] + + +if __name__ == "__main__": + pytest.main(["-s", "-vv", __file__])