From bdb0d40841b4d861cbbe1adbee09530f87fe312b Mon Sep 17 00:00:00 2001 From: wunder957 Date: Tue, 17 Oct 2023 16:28:25 +0800 Subject: [PATCH 1/5] Init OTel feature branch --- duetector/collectors/otel.py | 27 +++++++++++++++++++++++++++ duetector/collectors/register.py | 4 ++-- duetector/static/config.toml | 7 +++++++ pyproject.toml | 14 ++++++++++++-- 4 files changed, 48 insertions(+), 4 deletions(-) create mode 100644 duetector/collectors/otel.py diff --git a/duetector/collectors/otel.py b/duetector/collectors/otel.py new file mode 100644 index 0000000..db45a64 --- /dev/null +++ b/duetector/collectors/otel.py @@ -0,0 +1,27 @@ +from typing import Any, Dict, Optional + +from duetector.collectors.base import Collector +from duetector.collectors.models import Tracking +from duetector.extension.collector import hookimpl + + +class OTelCollector(Collector): + default_config = { + **Collector.default_config, + } + + def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): + super().__init__(config, *args, **kwargs) + # Init as a submodel + + def _emit(self, t: Tracking): + pass + + def summary(self) -> Dict: + # TODO: implement this, only cache data in debug mode(console exporter) + return {} + + +@hookimpl +def init_collector(config): + return OTelCollector(config) diff --git a/duetector/collectors/register.py b/duetector/collectors/register.py index ac07b16..ab5c2ec 100644 --- a/duetector/collectors/register.py +++ b/duetector/collectors/register.py @@ -1,4 +1,4 @@ # Expose for plugin system -from . import base, db +from . import base, db, otel -registers = [base, db] +registers = [base, db, otel] diff --git a/duetector/static/config.toml b/duetector/static/config.toml index cf5c530..e1b6b89 100644 --- a/duetector/static/config.toml +++ b/duetector/static/config.toml @@ -63,6 +63,13 @@ poll_timeout = 10 disabled = false include_extension = true +[collector.otelcollector] +disabled = false +statis_id = "" + +[collector.otelcollector.backend_args] +max_workers = 10 + [collector.dbcollector] disabled = false statis_id = "" diff --git a/pyproject.toml b/pyproject.toml index 18ed336..80d98f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,10 +16,20 @@ dependencies = [ "SQLAlchemy>=2", "click", "psutil", - # Following are for web server + # OTel + "opentelemetry-sdk", + "opentelemetry-api", + "opentelemetry-semantic-conventions", + "opentelemetry-exporter-otlp-proto-grpc", + "opentelemetry-exporter-otlp-proto-http", + "opentelemetry-exporter-jaeger", + "opentelemetry-exporter-zipkin-proto-http", + "opentelemetry-exporter-zipkin-json", + "opentelemetry-exporter-prometheus", + # Web server "fastapi", "uvicorn[standard]", - "anyio" + "anyio", ] dynamic = ["version"] classifiers = [ From 06a1625003a983f7ed4086d18f04536e41e9fa12 Mon Sep 17 00:00:00 2001 From: wunder957 Date: Wed, 18 Oct 2023 15:11:28 +0800 Subject: [PATCH 2/5] Add OTel Support --- dev-tools/start-docs-host.sh | 2 +- docs/source/collectors/index.rst | 1 + docs/source/collectors/otel.rst | 15 +++ duetector/collectors/models.py | 10 ++ duetector/collectors/otel.py | 101 +++++++++++++++++- duetector/static/config.toml | 5 +- duetector/utils.py | 6 +- pyproject.toml | 1 - tests/config.toml | 10 ++ ...test_collector.py => test_db_collector.py} | 0 tests/test_otel_collector.py | 52 +++++++++ 11 files changed, 196 insertions(+), 7 deletions(-) create mode 100644 docs/source/collectors/otel.rst rename tests/{test_collector.py => test_db_collector.py} (100%) create mode 100644 tests/test_otel_collector.py diff --git a/dev-tools/start-docs-host.sh b/dev-tools/start-docs-host.sh index c9543b3..3a75321 100755 --- a/dev-tools/start-docs-host.sh +++ b/dev-tools/start-docs-host.sh @@ -6,6 +6,6 @@ set -e docker run --rm \ -it \ --p 8080:80 \ +-p 8910:80 \ -v $(pwd)/../docs/build/html:/usr/share/nginx/html:ro \ nginx diff --git a/docs/source/collectors/index.rst b/docs/source/collectors/index.rst index b7f4ba6..e2c8324 100644 --- a/docs/source/collectors/index.rst +++ b/docs/source/collectors/index.rst @@ -25,6 +25,7 @@ Avaliable Collector :maxdepth: 2 DB Collectors + OTel Collector Data Models diff --git a/docs/source/collectors/otel.rst b/docs/source/collectors/otel.rst new file mode 100644 index 0000000..54c0de4 --- /dev/null +++ b/docs/source/collectors/otel.rst @@ -0,0 +1,15 @@ +OTelCollector +================== + +.. autoclass:: duetector.collectors.otel.OTelInitiator + :members: + :undoc-members: + :private-members: + :show-inheritance: + + +.. autoclass:: duetector.collectors.otel.OTelCollector + :members: + :undoc-members: + :private-members: + :show-inheritance: diff --git a/duetector/collectors/models.py b/duetector/collectors/models.py index dfb074e..cbca6a3 100644 --- a/duetector/collectors/models.py +++ b/duetector/collectors/models.py @@ -102,6 +102,16 @@ def from_namedtuple(tracer, data: NamedTuple) -> Tracking: # type: ignore return Tracking(**args) + def set_span(self, span): + for k in self.model_fields: + if k in ("tracer", "extended"): + continue + v = getattr(self, k) + if v is not None: + span.set_attribute(k, v) + for k, v in self.extended.items(): + span.set_attribute(k, v) + if __name__ == "__main__": Tracking(tracer="test", dt=datetime.now()) diff --git a/duetector/collectors/otel.py b/duetector/collectors/otel.py index db45a64..516c5c2 100644 --- a/duetector/collectors/otel.py +++ b/duetector/collectors/otel.py @@ -1,26 +1,121 @@ from typing import Any, Dict, Optional +from opentelemetry import trace +from opentelemetry.exporter.jaeger.proto.grpc import ( + JaegerExporter as GRPCJaegerExporter, +) +from opentelemetry.exporter.jaeger.thrift import JaegerExporter as ThriftJaegerExporter +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter as GRPCOTLPSpanExporter, +) +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter as HTTPOTLPSpanExporter, +) +from opentelemetry.exporter.zipkin.json import ZipkinExporter as JSONZipkinExporter +from opentelemetry.exporter.zipkin.proto.http import ( + ZipkinExporter as HTTPZipkinExporter, +) +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter + from duetector.collectors.base import Collector from duetector.collectors.models import Tracking from duetector.extension.collector import hookimpl +class OTelInitiator: + exporter_cls = { + "console": ConsoleSpanExporter, + "otlp-grpc": GRPCOTLPSpanExporter, + "otlp-http": HTTPOTLPSpanExporter, + "jaeger-thrift": ThriftJaegerExporter, + "jaeger-grpc": GRPCJaegerExporter, + "zipkin-http": HTTPZipkinExporter, + "zipkin-json": JSONZipkinExporter, + # Prometheus only support metrics + # "prometheus": "TODO" + } + + def __init__(self): + self._initialized = False + self.provider = None + + def initialize( + self, + service_name="unknown-service", + resource_kwargs: Optional[Dict[str, Any]] = None, + provider_kwargs: Optional[Dict[str, Any]] = None, + exporter="console", + exporter_kwargs: Optional[Dict[str, Any]] = None, + ) -> None: + if self._initialized: + return + + if not resource_kwargs: + resource_kwargs = {} + resource_kwargs.setdefault(SERVICE_NAME, service_name) + resource = Resource(attributes=resource_kwargs) + + if not provider_kwargs: + provider_kwargs = {} + provider = TracerProvider(resource=resource, **provider_kwargs) + self.provider = provider + + if not exporter_kwargs: + exporter_kwargs = {} + processor = BatchSpanProcessor(self.exporter_cls[exporter](**exporter_kwargs)) + + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + self._initialized = True + + def shutdown(self): + self.provider.shutdown() + self.provider = None + + class OTelCollector(Collector): default_config = { **Collector.default_config, + "disabled": True, + "mode": "console", + "exporter_kwargs": {}, } + @property + def mode(self) -> str: + return self.config.mode + + @property + def endpoint(self) -> Optional[str]: + return self.config.endpoint + + @property + def exporter_kwargs(self) -> Dict[str, Any]: + return self.config.exporter_kwargs + def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): super().__init__(config, *args, **kwargs) - # Init as a submodel + self.otel = OTelInitiator() + self.otel.initialize( + service_name="duetector", + exporter=self.mode, + exporter_kwargs=self.exporter_kwargs, + ) def _emit(self, t: Tracking): - pass + tracer = trace.get_tracer(self.id) + with tracer.start_as_current_span(t.tracer) as span: + t.set_span(span) def summary(self) -> Dict: - # TODO: implement this, only cache data in debug mode(console exporter) return {} + def shutdown(self): + super().shutdown() + self.otel.shutdown() + @hookimpl def init_collector(config): diff --git a/duetector/static/config.toml b/duetector/static/config.toml index e1b6b89..b4c6552 100644 --- a/duetector/static/config.toml +++ b/duetector/static/config.toml @@ -64,12 +64,15 @@ disabled = false include_extension = true [collector.otelcollector] -disabled = false +disabled = true statis_id = "" +mode = "console" [collector.otelcollector.backend_args] max_workers = 10 +[collector.otelcollector.exporter_kwargs] + [collector.dbcollector] disabled = false statis_id = "" diff --git a/duetector/utils.py b/duetector/utils.py index c67bfa2..1074773 100644 --- a/duetector/utils.py +++ b/duetector/utils.py @@ -1,3 +1,4 @@ +import threading from datetime import datetime, timedelta try: @@ -8,10 +9,13 @@ class Singleton(type): _instances = {} + _lock = threading.Lock() def __call__(cls, *args, **kwargs): if cls not in cls._instances: - cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + with cls._lock: + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls] diff --git a/pyproject.toml b/pyproject.toml index 80d98f0..7b01bdc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,6 @@ dependencies = [ "opentelemetry-exporter-jaeger", "opentelemetry-exporter-zipkin-proto-http", "opentelemetry-exporter-zipkin-json", - "opentelemetry-exporter-prometheus", # Web server "fastapi", "uvicorn[standard]", diff --git a/tests/config.toml b/tests/config.toml index 036696f..8fef471 100644 --- a/tests/config.toml +++ b/tests/config.toml @@ -55,6 +55,16 @@ poll_timeout = 10 disabled = false include_extension = true +[collector.otelcollector] +disabled = false +statis_id = "" +mode = "console" + +[collector.otelcollector.backend_args] +max_workers = 10 + +[collector.otelcollector.exporter_kwargs] + [collector.dbcollector] disabled = false id = "unittest" diff --git a/tests/test_collector.py b/tests/test_db_collector.py similarity index 100% rename from tests/test_collector.py rename to tests/test_db_collector.py diff --git a/tests/test_otel_collector.py b/tests/test_otel_collector.py new file mode 100644 index 0000000..6504583 --- /dev/null +++ b/tests/test_otel_collector.py @@ -0,0 +1,52 @@ +import json +from collections import namedtuple +from copy import deepcopy + +import pytest + +from duetector.collectors.otel import OTelCollector +from duetector.utils import get_boot_time_duration_ns + +timestamp = 13205215231927 +datetime = get_boot_time_duration_ns(timestamp) + + +@pytest.fixture +def data_t(): + d = namedtuple("Tracking", ["pid", "uid", "gid", "comm", "fname", "timestamp", "custom"]) + + yield d( + pid=9999, + uid=9999, + gid=9999, + comm="dummy", + fname="dummy.file", + timestamp=timestamp, + custom="dummy-xargs", + ) + + +@pytest.fixture +def config(full_config): + c = deepcopy(full_config) + yield c["collector"] + + +@pytest.fixture +def collector(config): + return OTelCollector(config) + + +def test_dbcollector(collector: OTelCollector, data_t, capsys): + collector.emit("dummy", data_t) + collector.shutdown() + # FIXME: how to test this? Code below doesn't work, cannot capture stdout + # captured = capsys.readouterr() + # tracking = json.loads(captured.out) + # assert tracking["attributes"] + # for k in ["pid", "uid", "gid", "comm", "fname", "timestamp", "custom"]: + # assert k in tracking["attributes"] + + +if __name__ == "__main__": + pytest.main(["-s", "-vv", __file__]) From e726f24782e87d8718f5f42b5052947f160f3443 Mon Sep 17 00:00:00 2001 From: wunder957 Date: Wed, 18 Oct 2023 15:22:53 +0800 Subject: [PATCH 3/5] Add some docs and rename mode to exporter --- README.md | 13 ++++----- README_zh.md | 10 ++++--- duetector/collectors/otel.py | 51 +++++++++++++++++++++++++++++++++--- duetector/static/config.toml | 2 +- tests/config.toml | 2 +- 5 files changed, 60 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 74fa471..faf3729 100644 --- a/README.md +++ b/README.md @@ -55,21 +55,18 @@ Join our [slack channel](https://join.slack.com/t/hitsz-ids/shared_invite/zt-239 - `Tracer` Support - [X] eBPF-based tracer - [X] Shell command tracer - - [ ] Subprocess tracer + - [X] Subprocess tracer - `Filter` Support - [X] Pattern matching, based on regular expressions -- `Collector` and `Analyzer` Support - - [X] SQL database - - [ ] OpenTelemetry -- Analyzer Support - - [X] SQL database - - [ ] OpenTelemetry +- Data Collection and Analysis + - [X] `Analyzer` Support SQL database + - [X] `Collector` Support SQL database and *OpenTelemetry(Experimental)* - User Interface - [X] CLI Tools - [X] PIP Service - [ ] Control Panel - Enhancements - - [ ] Runc containers identification + - [ ] `RunC` containers identification The eBPF program requires kernel support, see [Kernel Support](./docs/kernel_config.md) diff --git a/README_zh.md b/README_zh.md index 65cf419..76617cd 100644 --- a/README_zh.md +++ b/README_zh.md @@ -59,16 +59,18 @@ duetector🔍是一个基于可扩展的的数据使用探测器,它可以在L - `Tracer`支持 - [X] 基于eBPF的tracer - [X] 基于shell命令的tracer - - [ ] 基于子进程的tracer + - [X] 基于子进程的tracer - `Filter`支持 - [X] 支持正则的模式匹配 -- `Collector`和`Analyzer`支持 - - [X] SQL数据库 - - [ ] Opentelemetry +- 遥测数据传输与分析 + - [X] `Analyzer`支持SQL数据库 + - [X] `Collector`支持SQL数据库和*Opentelemetry(实验性)* - 用户接口 - [X] 命令行工具 - [X] PIP服务 - [ ] 控制平面 +- 增强功能 + - [ ] `RunC`容器云原生支持 eBPF程序需要内核支持,详见[内核支持](./docs/kernel_config.md) diff --git a/duetector/collectors/otel.py b/duetector/collectors/otel.py index 516c5c2..509a325 100644 --- a/duetector/collectors/otel.py +++ b/duetector/collectors/otel.py @@ -25,6 +25,37 @@ class OTelInitiator: + """ + Host the OpenTelemetry SDK and initialize the provider and exporter. + + Avaliable exporters: + - ``console`` + - ``otlp-grpc`` + - ``otlp-http`` + - ``jaeger-thrift`` + - ``jaeger-grpc`` + - ``zipkin-http`` + - ``zipkin-json`` + - ``prometheus`` + + Example: + + .. code-block:: python + + otel = OTelInitiator() + trace = otel.initialize( + service_name="duetector", + exporter="console", + ) + + from opentelemetry import trace + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("test") as span: + span.set_attribute("test", "test") + + otel.shutdown() + """ + exporter_cls = { "console": ConsoleSpanExporter, "otlp-grpc": GRPCOTLPSpanExporter, @@ -76,16 +107,28 @@ def shutdown(self): class OTelCollector(Collector): + """ + A collector using OpenTelemetry SDK. + + Config: + - ``exporter``: One of ``console``, ``otlp-grpc``, ``otlp-http``, ``jaeger-thrift``, ``jaeger-grpc``, ``zipkin-http``, ``zipkin-json``, see :class:`OTelInitiator` for more details + - ``exporter_kwargs``: A dict of kwargs for exporter + + Note: + Since v1.35, the Jaeger supports OTLP natively. Please use the OTLP exporter instead. Support for this exporter will end July 2023. + + """ + default_config = { **Collector.default_config, "disabled": True, - "mode": "console", + "exporter": "console", "exporter_kwargs": {}, } @property - def mode(self) -> str: - return self.config.mode + def exporter(self) -> str: + return self.config.exporter @property def endpoint(self) -> Optional[str]: @@ -100,7 +143,7 @@ def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): self.otel = OTelInitiator() self.otel.initialize( service_name="duetector", - exporter=self.mode, + exporter=self.exporter, exporter_kwargs=self.exporter_kwargs, ) diff --git a/duetector/static/config.toml b/duetector/static/config.toml index b4c6552..d053cdf 100644 --- a/duetector/static/config.toml +++ b/duetector/static/config.toml @@ -66,7 +66,7 @@ include_extension = true [collector.otelcollector] disabled = true statis_id = "" -mode = "console" +exporter = "console" [collector.otelcollector.backend_args] max_workers = 10 diff --git a/tests/config.toml b/tests/config.toml index 8fef471..bf5b8d4 100644 --- a/tests/config.toml +++ b/tests/config.toml @@ -58,7 +58,7 @@ include_extension = true [collector.otelcollector] disabled = false statis_id = "" -mode = "console" +exporter = "console" [collector.otelcollector.backend_args] max_workers = 10 From f8bc1f13b42e0745a3b70e8d6ebf9f9b5f231a93 Mon Sep 17 00:00:00 2001 From: wunder957 Date: Wed, 18 Oct 2023 15:32:25 +0800 Subject: [PATCH 4/5] Fix testing --- duetector/collectors/otel.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/duetector/collectors/otel.py b/duetector/collectors/otel.py index 509a325..2ed5c07 100644 --- a/duetector/collectors/otel.py +++ b/duetector/collectors/otel.py @@ -102,8 +102,10 @@ def initialize( self._initialized = True def shutdown(self): - self.provider.shutdown() - self.provider = None + if self.provider: + self.provider.shutdown() + self._initialized = False + self.provider = None class OTelCollector(Collector): From 20c997625e45685393ea0be39ea2be40bc71e3c6 Mon Sep 17 00:00:00 2001 From: wunder957 Date: Wed, 18 Oct 2023 15:32:45 +0800 Subject: [PATCH 5/5] More statement when shutdown --- duetector/collectors/otel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/duetector/collectors/otel.py b/duetector/collectors/otel.py index 2ed5c07..156eef8 100644 --- a/duetector/collectors/otel.py +++ b/duetector/collectors/otel.py @@ -102,7 +102,7 @@ def initialize( self._initialized = True def shutdown(self): - if self.provider: + if self._initialized and self.provider: self.provider.shutdown() self._initialized = False self.provider = None