Feature: Support OTel Collector #82

Merged 5 commits on Oct 18, 2023
13 changes: 5 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,18 @@ Join our [slack channel](https://join.slack.com/t/hitsz-ids/shared_invite/zt-239
 - `Tracer` Support
   - [X] eBPF-based tracer
   - [X] Shell command tracer
-  - [ ] Subprocess tracer
+  - [X] Subprocess tracer
 - `Filter` Support
   - [X] Pattern matching, based on regular expressions
-- `Collector` and `Analyzer` Support
-  - [X] SQL database
-  - [ ] OpenTelemetry
-- Analyzer Support
-  - [X] SQL database
-  - [ ] OpenTelemetry
+- Data Collection and Analysis
+  - [X] `Analyzer` Support SQL database
+  - [X] `Collector` Support SQL database and *OpenTelemetry(Experimental)*
 - User Interface
   - [X] CLI Tools
   - [X] PIP Service
   - [ ] Control Panel
 - Enhancements
-  - [ ] Runc containers identification
+  - [ ] `RunC` containers identification

The eBPF program requires kernel support, see [Kernel Support](./docs/kernel_config.md)

10 changes: 6 additions & 4 deletions README_zh.md
Expand Up @@ -59,16 +59,18 @@ duetector🔍是一个基于可扩展的的数据使用探测器,它可以在L
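 - `Tracer` support
   - [X] eBPF-based tracer
   - [X] Shell command-based tracer
-  - [ ] Subprocess-based tracer
+  - [X] Subprocess-based tracer
 - `Filter` support
   - [X] Pattern matching with regular expression support
-- `Collector` and `Analyzer` support
-  - [X] SQL database
-  - [ ] Opentelemetry
+- Telemetry data transmission and analysis
+  - [X] `Analyzer` supports SQL database
+  - [X] `Collector` supports SQL database and *Opentelemetry (experimental)*
 - User interface
   - [X] CLI tools
   - [X] PIP service
   - [ ] Control plane
+- Enhancements
+  - [ ] Cloud-native support for `RunC` containers

 The eBPF program requires kernel support; see [Kernel Support](./docs/kernel_config.md)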

2 changes: 1 addition & 1 deletion dev-tools/start-docs-host.sh
Expand Up @@ -6,6 +6,6 @@
 set -e
 docker run --rm \
     -it \
-    -p 8080:80 \
+    -p 8910:80 \
     -v $(pwd)/../docs/build/html:/usr/share/nginx/html:ro \
     nginx
1 change: 1 addition & 0 deletions docs/source/collectors/index.rst
Expand Up @@ -25,6 +25,7 @@ Avaliable Collector
    :maxdepth: 2

    DB Collectors <db>
+   OTel Collector <otel>


Data Models
15 changes: 15 additions & 0 deletions docs/source/collectors/otel.rst
@@ -0,0 +1,15 @@
OTelCollector
==================

.. autoclass:: duetector.collectors.otel.OTelInitiator
   :members:
   :undoc-members:
   :private-members:
   :show-inheritance:


.. autoclass:: duetector.collectors.otel.OTelCollector
   :members:
   :undoc-members:
   :private-members:
   :show-inheritance:
10 changes: 10 additions & 0 deletions duetector/collectors/models.py
Expand Up @@ -102,6 +102,16 @@ def from_namedtuple(tracer, data: NamedTuple) -> Tracking: # type: ignore

         return Tracking(**args)

+    def set_span(self, span):
+        for k in self.model_fields:
+            if k in ("tracer", "extended"):
+                continue
+            v = getattr(self, k)
+            if v is not None:
+                span.set_attribute(k, v)
+        for k, v in self.extended.items():
+            span.set_attribute(k, v)
+

 if __name__ == "__main__":
     Tracking(tracer="test", dt=datetime.now())
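The `set_span` method added in this hunk copies every populated model field, plus each key in `extended`, onto a span. A minimal standalone sketch of that pattern follows. `FakeSpan`, `TrackingSketch`, and `to_attribute_value` are hypothetical stand-ins and are not part of duetector or OpenTelemetry. The sketch also assumes that values such as `datetime` should be converted to strings first, since OpenTelemetry attribute values are limited to primitives (str, bool, int, float) and sequences of them.

```python
from dataclasses import dataclass, field, fields
from datetime import datetime
from typing import Any, Dict, Optional


class FakeSpan:
    """Hypothetical stand-in for an OpenTelemetry span; it only records attributes."""

    def __init__(self) -> None:
        self.attributes: Dict[str, Any] = {}

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value


def to_attribute_value(value: Any) -> Any:
    """Keep primitives as-is; turn anything else (e.g. datetime) into a string."""
    if isinstance(value, (str, bool, int, float)):
        return value
    if isinstance(value, datetime):
        return value.isoformat()
    return str(value)


@dataclass
class TrackingSketch:
    """Hypothetical, reduced version of the Tracking model."""

    tracer: str
    pid: Optional[int] = None
    fname: Optional[str] = None
    dt: Optional[datetime] = None
    extended: Dict[str, Any] = field(default_factory=dict)

    def set_span(self, span: FakeSpan) -> None:
        # Same shape as the PR: skip "tracer" and "extended", drop None values.
        for f in fields(self):
            if f.name in ("tracer", "extended"):
                continue
            value = getattr(self, f.name)
            if value is not None:
                span.set_attribute(f.name, to_attribute_value(value))
        # Extended keys are flattened onto the span next to the regular fields.
        for key, value in self.extended.items():
            span.set_attribute(key, to_attribute_value(value))


span = FakeSpan()
tracking = TrackingSketch(
    tracer="test", pid=42, dt=datetime(2023, 10, 18), extended={"custom": "value"}
)
tracking.set_span(span)
```

Whether the real `Tracking.set_span` needs such a conversion depends on the field types the tracers actually produce; the conversion step here is an assumption, not something this PR does.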
167 changes: 167 additions & 0 deletions duetector/collectors/otel.py
@@ -0,0 +1,167 @@
from typing import Any, Dict, Optional

from opentelemetry import trace
from opentelemetry.exporter.jaeger.proto.grpc import (
    JaegerExporter as GRPCJaegerExporter,
)
from opentelemetry.exporter.jaeger.thrift import JaegerExporter as ThriftJaegerExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter as GRPCOTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HTTPOTLPSpanExporter,
)
from opentelemetry.exporter.zipkin.json import ZipkinExporter as JSONZipkinExporter
from opentelemetry.exporter.zipkin.proto.http import (
    ZipkinExporter as HTTPZipkinExporter,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from duetector.collectors.base import Collector
from duetector.collectors.models import Tracking
from duetector.extension.collector import hookimpl


class OTelInitiator:
    """
    Host the OpenTelemetry SDK and initialize the provider and exporter.

    Available exporters:
        - ``console``
        - ``otlp-grpc``
        - ``otlp-http``
        - ``jaeger-thrift``
        - ``jaeger-grpc``
        - ``zipkin-http``
        - ``zipkin-json``

    ``prometheus`` is not available yet, because Prometheus only supports metrics.

    Example:

    .. code-block:: python

        otel = OTelInitiator()
        otel.initialize(
            service_name="duetector",
            exporter="console",
        )

        from opentelemetry import trace
        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span("test") as span:
            span.set_attribute("test", "test")

        otel.shutdown()
    """

    # Maps the ``exporter`` config value to the span exporter class.
    exporter_cls = {
        "console": ConsoleSpanExporter,
        "otlp-grpc": GRPCOTLPSpanExporter,
        "otlp-http": HTTPOTLPSpanExporter,
        "jaeger-thrift": ThriftJaegerExporter,
        "jaeger-grpc": GRPCJaegerExporter,
        "zipkin-http": HTTPZipkinExporter,
        "zipkin-json": JSONZipkinExporter,
        # Prometheus only supports metrics
        # "prometheus": "TODO"
    }

    def __init__(self):
        self._initialized = False
        self.provider = None

    def initialize(
        self,
        service_name="unknown-service",
        resource_kwargs: Optional[Dict[str, Any]] = None,
        provider_kwargs: Optional[Dict[str, Any]] = None,
        exporter="console",
        exporter_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Repeated calls are no-ops until ``shutdown`` is called.
        if self._initialized:
            return

        if not resource_kwargs:
            resource_kwargs = {}
        resource_kwargs.setdefault(SERVICE_NAME, service_name)
        resource = Resource(attributes=resource_kwargs)

        if not provider_kwargs:
            provider_kwargs = {}
        provider = TracerProvider(resource=resource, **provider_kwargs)
        self.provider = provider

        if not exporter_kwargs:
            exporter_kwargs = {}
        processor = BatchSpanProcessor(self.exporter_cls[exporter](**exporter_kwargs))

        provider.add_span_processor(processor)
        trace.set_tracer_provider(provider)
        self._initialized = True

    def shutdown(self):
        if self._initialized and self.provider:
            self.provider.shutdown()
        self._initialized = False
        self.provider = None


class OTelCollector(Collector):
    """
    A collector using the OpenTelemetry SDK.

    Config:
        - ``exporter``: One of ``console``, ``otlp-grpc``, ``otlp-http``, ``jaeger-thrift``, ``jaeger-grpc``, ``zipkin-http``, ``zipkin-json``, see :class:`OTelInitiator` for more details
        - ``exporter_kwargs``: A dict of kwargs passed to the exporter

    Note:
        Since v1.35, Jaeger supports OTLP natively. Please use the OTLP exporter instead.
        Support for the Jaeger exporters will end in July 2023.

    """

    default_config = {
        **Collector.default_config,
        "disabled": True,
        "exporter": "console",
        "exporter_kwargs": {},
    }

    @property
    def exporter(self) -> str:
        return self.config.exporter

    @property
    def endpoint(self) -> Optional[str]:
        # ``endpoint`` has no entry in ``default_config`` above.
        return self.config.endpoint

    @property
    def exporter_kwargs(self) -> Dict[str, Any]:
        return self.config.exporter_kwargs

    def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):
        super().__init__(config, *args, **kwargs)
        self.otel = OTelInitiator()
        self.otel.initialize(
            service_name="duetector",
            exporter=self.exporter,
            exporter_kwargs=self.exporter_kwargs,
        )

    def _emit(self, t: Tracking):
        # One span per tracking record, named after the tracer that produced it.
        tracer = trace.get_tracer(self.id)
        with tracer.start_as_current_span(t.tracer) as span:
            t.set_span(span)

    def summary(self) -> Dict:
        return {}

    def shutdown(self):
        super().shutdown()
        self.otel.shutdown()


@hookimpl
def init_collector(config):
    return OTelCollector(config)
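`OTelInitiator.initialize` looks up the exporter class by name and returns early when it has already been initialized. The sketch below isolates that pattern without importing OpenTelemetry. `ConsoleExporterStub` and `InitiatorSketch` are hypothetical names, and the explicit `ValueError` for unknown exporter names is an assumption added for illustration; the code in this PR would surface a `KeyError` from the dict lookup instead.

```python
from typing import Any, Dict, Optional, Type


class ConsoleExporterStub:
    """Hypothetical stand-in for an exporter class such as ConsoleSpanExporter."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class InitiatorSketch:
    # Registry mapping a config string to the class that implements it.
    exporter_cls: Dict[str, Type] = {"console": ConsoleExporterStub}

    def __init__(self) -> None:
        self._initialized = False
        self.exporter: Optional[Any] = None

    def initialize(
        self,
        exporter: str = "console",
        exporter_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        if self._initialized:
            return  # Repeated calls are no-ops, as in the PR.
        if exporter not in self.exporter_cls:
            # Assumption: fail with a readable message instead of a bare KeyError.
            raise ValueError(
                f"Unknown exporter {exporter!r}; choose one of {sorted(self.exporter_cls)}"
            )
        self.exporter = self.exporter_cls[exporter](**(exporter_kwargs or {}))
        self._initialized = True

    def shutdown(self) -> None:
        self.exporter = None
        self._initialized = False


initiator = InitiatorSketch()
initiator.initialize(exporter="console", exporter_kwargs={"timeout": 5})
first = initiator.exporter
initiator.initialize(exporter="console")  # Ignored: already initialized.
```

Keeping the registry as a class attribute means a subclass could extend it with additional exporters without touching `initialize`.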
4 changes: 2 additions & 2 deletions duetector/collectors/register.py
@@ -1,4 +1,4 @@
 # Expose for plugin system
-from . import base, db
+from . import base, db, otel

-registers = [base, db]
+registers = [base, db, otel]
10 changes: 10 additions & 0 deletions duetector/static/config.toml
Expand Up @@ -63,6 +63,16 @@ poll_timeout = 10
 disabled = false
 include_extension = true

+[collector.otelcollector]
+disabled = true
+statis_id = ""
+exporter = "console"
+
+[collector.otelcollector.backend_args]
+max_workers = 10
+
+[collector.otelcollector.exporter_kwargs]
+
 [collector.dbcollector]
 disabled = false
 statis_id = ""
6 changes: 5 additions & 1 deletion duetector/utils.py
@@ -1,3 +1,4 @@
+import threading
 from datetime import datetime, timedelta

 try:
Expand All @@ -8,10 +9,13 @@

 class Singleton(type):
     _instances = {}
+    _lock = threading.Lock()

     def __call__(cls, *args, **kwargs):
         if cls not in cls._instances:
-            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
+            with cls._lock:
+                if cls not in cls._instances:
+                    cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
         return cls._instances[cls]
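The change to `Singleton.__call__` is a double-checked locking pattern: an unlocked check for the common case, then a second check under the lock before creating the instance. A small runnable sketch of the same idea is shown here. `Registry` is a hypothetical example class, and `super().__call__` is used in place of `super(Singleton, cls).__call__`, which is equivalent in Python 3.

```python
import threading


class Singleton(type):
    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # Fast path: skip the lock once the instance exists.
        if cls not in cls._instances:
            with cls._lock:
                # Re-check inside the lock: another thread may have won the race.
                if cls not in cls._instances:
                    cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Registry(metaclass=Singleton):
    """Hypothetical example class; counts how often __init__ runs."""

    init_calls = 0

    def __init__(self):
        type(self).init_calls += 1


results = []
threads = [
    threading.Thread(target=lambda: results.append(Registry())) for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

One design consideration: `threading.Lock` is not reentrant and is shared by every class using the metaclass, so a singleton whose `__init__` constructs another singleton for the first time would block on itself. Switching to `threading.RLock` is one way to avoid that, if such nesting can occur.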


13 changes: 11 additions & 2 deletions pyproject.toml
Expand Up @@ -16,10 +16,19 @@ dependencies = [
"SQLAlchemy>=2",
"click",
"psutil",
-    # Following are for web server
+    # OTel
+    "opentelemetry-sdk",
+    "opentelemetry-api",
+    "opentelemetry-semantic-conventions",
+    "opentelemetry-exporter-otlp-proto-grpc",
+    "opentelemetry-exporter-otlp-proto-http",
+    "opentelemetry-exporter-jaeger",
+    "opentelemetry-exporter-zipkin-proto-http",
+    "opentelemetry-exporter-zipkin-json",
+    # Web server
     "fastapi",
     "uvicorn[standard]",
-    "anyio"
+    "anyio",
]
dynamic = ["version"]
classifiers = [
10 changes: 10 additions & 0 deletions tests/config.toml
Expand Up @@ -55,6 +55,16 @@ poll_timeout = 10
 disabled = false
 include_extension = true

+[collector.otelcollector]
+disabled = false
+statis_id = ""
+exporter = "console"
+
+[collector.otelcollector.backend_args]
+max_workers = 10
+
+[collector.otelcollector.exporter_kwargs]
+
 [collector.dbcollector]
 disabled = false
 id = "unittest"
File renamed without changes.