Skip to content

Commit

Permalink
Feature: Support OTel Collector (#82)
Browse files Browse the repository at this point in the history
* Init OTel feature branch

* Add OTel Support

* Add some docs and rename mode to exporter

* Fix testing

* More statement when shutdown
  • Loading branch information
Wh1isper authored Oct 18, 2023
1 parent 357d935 commit f438561
Show file tree
Hide file tree
Showing 14 changed files with 295 additions and 18 deletions.
13 changes: 5 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,18 @@ Join our [slack channel](https://join.slack.com/t/hitsz-ids/shared_invite/zt-239
- `Tracer` Support
- [X] eBPF-based tracer
- [X] Shell command tracer
- [ ] Subprocess tracer
- [X] Subprocess tracer
- `Filter` Support
- [X] Pattern matching, based on regular expressions
- `Collector` and `Analyzer` Support
- [X] SQL database
- [ ] OpenTelemetry
- Analyzer Support
- [X] SQL database
- [ ] OpenTelemetry
- Data Collection and Analysis
- [X] `Analyzer` Support SQL database
- [X] `Collector` Support SQL database and *OpenTelemetry(Experimental)*
- User Interface
- [X] CLI Tools
- [X] PIP Service
- [ ] Control Panel
- Enhancements
- [ ] Runc containers identification
- [ ] `RunC` containers identification

The eBPF program requires kernel support, see [Kernel Support](./docs/kernel_config.md)

Expand Down
10 changes: 6 additions & 4 deletions README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,18 @@ duetector🔍是一个基于可扩展的的数据使用探测器,它可以在L
- `Tracer`支持
- [X] 基于eBPF的tracer
- [X] 基于shell命令的tracer
- [ ] 基于子进程的tracer
- [X] 基于子进程的tracer
- `Filter`支持
- [X] 支持正则的模式匹配
- `Collector``Analyzer`支持
- [X] SQL数据库
- [ ] Opentelemetry
- 遥测数据传输与分析
- [X] `Analyzer`支持SQL数据库
- [X] `Collector`支持SQL数据库和*Opentelemetry(实验性)*
- 用户接口
- [X] 命令行工具
- [X] PIP服务
- [ ] 控制平面
- 增强功能
- [ ] `RunC`容器云原生支持

eBPF程序需要内核支持,详见[内核支持](./docs/kernel_config.md)

Expand Down
2 changes: 1 addition & 1 deletion dev-tools/start-docs-host.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
set -e
docker run --rm \
-it \
-p 8080:80 \
-p 8910:80 \
-v $(pwd)/../docs/build/html:/usr/share/nginx/html:ro \
nginx
1 change: 1 addition & 0 deletions docs/source/collectors/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Avaliable Collector
:maxdepth: 2

DB Collectors <db>
OTel Collector <otel>


Data Models
Expand Down
15 changes: 15 additions & 0 deletions docs/source/collectors/otel.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
OTelCollector
==================

.. autoclass:: duetector.collectors.otel.OTelInitiator
:members:
:undoc-members:
:private-members:
:show-inheritance:


.. autoclass:: duetector.collectors.otel.OTelCollector
:members:
:undoc-members:
:private-members:
:show-inheritance:
10 changes: 10 additions & 0 deletions duetector/collectors/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ def from_namedtuple(tracer, data: NamedTuple) -> Tracking: # type: ignore

return Tracking(**args)

def set_span(self, span):
for k in self.model_fields:
if k in ("tracer", "extended"):
continue
v = getattr(self, k)
if v is not None:
span.set_attribute(k, v)
for k, v in self.extended.items():
span.set_attribute(k, v)


if __name__ == "__main__":
Tracking(tracer="test", dt=datetime.now())
167 changes: 167 additions & 0 deletions duetector/collectors/otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from typing import Any, Dict, Optional

from opentelemetry import trace
from opentelemetry.exporter.jaeger.proto.grpc import (
JaegerExporter as GRPCJaegerExporter,
)
from opentelemetry.exporter.jaeger.thrift import JaegerExporter as ThriftJaegerExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GRPCOTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter as HTTPOTLPSpanExporter,
)
from opentelemetry.exporter.zipkin.json import ZipkinExporter as JSONZipkinExporter
from opentelemetry.exporter.zipkin.proto.http import (
ZipkinExporter as HTTPZipkinExporter,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from duetector.collectors.base import Collector
from duetector.collectors.models import Tracking
from duetector.extension.collector import hookimpl


class OTelInitiator:
"""
Host the OpenTelemetry SDK and initialize the provider and exporter.
Avaliable exporters:
- ``console``
- ``otlp-grpc``
- ``otlp-http``
- ``jaeger-thrift``
- ``jaeger-grpc``
- ``zipkin-http``
- ``zipkin-json``
- ``prometheus``
Example:
.. code-block:: python
otel = OTelInitiator()
trace = otel.initialize(
service_name="duetector",
exporter="console",
)
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("test") as span:
span.set_attribute("test", "test")
otel.shutdown()
"""

exporter_cls = {
"console": ConsoleSpanExporter,
"otlp-grpc": GRPCOTLPSpanExporter,
"otlp-http": HTTPOTLPSpanExporter,
"jaeger-thrift": ThriftJaegerExporter,
"jaeger-grpc": GRPCJaegerExporter,
"zipkin-http": HTTPZipkinExporter,
"zipkin-json": JSONZipkinExporter,
# Prometheus only support metrics
# "prometheus": "TODO"
}

def __init__(self):
self._initialized = False
self.provider = None

def initialize(
self,
service_name="unknown-service",
resource_kwargs: Optional[Dict[str, Any]] = None,
provider_kwargs: Optional[Dict[str, Any]] = None,
exporter="console",
exporter_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
if self._initialized:
return

if not resource_kwargs:
resource_kwargs = {}
resource_kwargs.setdefault(SERVICE_NAME, service_name)
resource = Resource(attributes=resource_kwargs)

if not provider_kwargs:
provider_kwargs = {}
provider = TracerProvider(resource=resource, **provider_kwargs)
self.provider = provider

if not exporter_kwargs:
exporter_kwargs = {}
processor = BatchSpanProcessor(self.exporter_cls[exporter](**exporter_kwargs))

provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
self._initialized = True

def shutdown(self):
if self._initialized and self.provider:
self.provider.shutdown()
self._initialized = False
self.provider = None


class OTelCollector(Collector):
"""
A collector using OpenTelemetry SDK.
Config:
- ``exporter``: One of ``console``, ``otlp-grpc``, ``otlp-http``, ``jaeger-thrift``, ``jaeger-grpc``, ``zipkin-http``, ``zipkin-json``, see :class:`OTelInitiator` for more details
- ``exporter_kwargs``: A dict of kwargs for exporter
Note:
Since v1.35, the Jaeger supports OTLP natively. Please use the OTLP exporter instead. Support for this exporter will end July 2023.
"""

default_config = {
**Collector.default_config,
"disabled": True,
"exporter": "console",
"exporter_kwargs": {},
}

@property
def exporter(self) -> str:
return self.config.exporter

@property
def endpoint(self) -> Optional[str]:
return self.config.endpoint

@property
def exporter_kwargs(self) -> Dict[str, Any]:
return self.config.exporter_kwargs

def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)
self.otel = OTelInitiator()
self.otel.initialize(
service_name="duetector",
exporter=self.exporter,
exporter_kwargs=self.exporter_kwargs,
)

def _emit(self, t: Tracking):
tracer = trace.get_tracer(self.id)
with tracer.start_as_current_span(t.tracer) as span:
t.set_span(span)

def summary(self) -> Dict:
return {}

def shutdown(self):
super().shutdown()
self.otel.shutdown()


@hookimpl
def init_collector(config):
return OTelCollector(config)
4 changes: 2 additions & 2 deletions duetector/collectors/register.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Expose for plugin system
from . import base, db
from . import base, db, otel

registers = [base, db]
registers = [base, db, otel]
10 changes: 10 additions & 0 deletions duetector/static/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ poll_timeout = 10
disabled = false
include_extension = true

[collector.otelcollector]
disabled = true
statis_id = ""
exporter = "console"

[collector.otelcollector.backend_args]
max_workers = 10

[collector.otelcollector.exporter_kwargs]

[collector.dbcollector]
disabled = false
statis_id = ""
Expand Down
6 changes: 5 additions & 1 deletion duetector/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
from datetime import datetime, timedelta

try:
Expand All @@ -8,10 +9,13 @@

class Singleton(type):
_instances = {}
_lock = threading.Lock()

def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
with cls._lock:
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]


Expand Down
13 changes: 11 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,19 @@ dependencies = [
"SQLAlchemy>=2",
"click",
"psutil",
# Following are for web server
# OTel
"opentelemetry-sdk",
"opentelemetry-api",
"opentelemetry-semantic-conventions",
"opentelemetry-exporter-otlp-proto-grpc",
"opentelemetry-exporter-otlp-proto-http",
"opentelemetry-exporter-jaeger",
"opentelemetry-exporter-zipkin-proto-http",
"opentelemetry-exporter-zipkin-json",
# Web server
"fastapi",
"uvicorn[standard]",
"anyio"
"anyio",
]
dynamic = ["version"]
classifiers = [
Expand Down
10 changes: 10 additions & 0 deletions tests/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ poll_timeout = 10
disabled = false
include_extension = true

[collector.otelcollector]
disabled = false
statis_id = ""
exporter = "console"

[collector.otelcollector.backend_args]
max_workers = 10

[collector.otelcollector.exporter_kwargs]

[collector.dbcollector]
disabled = false
id = "unittest"
Expand Down
File renamed without changes.
Loading

0 comments on commit f438561

Please sign in to comment.