diff --git a/CORE/README.md b/CORE/README.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/design/CO-RE.md b/docs/design/CO-RE.md new file mode 100644 index 0000000..a8bc354 --- /dev/null +++ b/docs/design/CO-RE.md @@ -0,0 +1,166 @@ +# CO-RE and SubprocessMonitor [#44](https://github.com/hitsz-ids/duetector/issues/44) + +## 1. Introduction of CO-RE + +> If you already familiar with [CO-RE](https://facebookmicrosites.github.io/bpf/blog/2020/02/19/bpf-portability-and-co-re.html), you can skip this section. + +### 1.1 Problem + +A BPF program is a piece of user-provided code which is injected straight into a kernel. Inevitably, a dependency on kernel data structures will be introduced. Unfortunately, the data structure of the kernel is not set in stone. + +> In such cases, how do you make sure you are not reading garbage data when some kernel added an extra field before the field you thought is, say, at offset 16 from the start of `struct task_struct`? Suddenly, for that kernel, you'll need to read data from, e.g., offset 24. And the problems don't end there: what if a field got renamed, as was the case with `thread_struct`'s `fs` field (useful for accessing thread-local storage), which got renamed to `fsbase` between 4.6 and 4.7 kernels. + +All this means that you can no longer compile your BPF program locally using kernel headers of your dev server and distribute it in compiled form to other systems, while expecting it to work and produce correct results. **This is because kernel headers for different kernel versions will specify a different memory layout of data your program relies on.** + +### 1.2 Status quo + +One possible solution is to compile the BPF program in real-time on the actual running machine. This is what [BCC](https://github.com/iovisor/bcc/) did and our `BccTracer` is based on it. However, this approach has a few drawbacks: + +- Clang/LLVM combo is a big library, resulting in big fat binaries that need to be distributed with your application.(We do suffer from this problem.) +- Clang/LLVM combo is resource-heavy, so when you are compiling BPF code at start up, you'll use a significant amount of resources, potentially tipping over a carefully balanced production workfload. And vice versa, on a busy host, compiling a small BPF program might take minutes in some cases.(Yes, it's slowing down our startup and causes a large performance impact when popping containers at scale.) +- You are making a big bet that the target system will have kernel headers present, which most of the time is not a problem, but sometimes can cause a lot of headaches. (Yes, see [how-to/run-with-docker](../how-to/run-with-docker.md)) +- BPF program testing and development iteration is quite painful as well. (Yes, we haven't found a good way to test BPF programs:[#47](https://github.com/hitsz-ids/duetector/issues/47).) + +### 1.3 BPF CO-RE helps + +BPF CO-RE, by making the kernel self-contained information, together with a BPF loader, allows BPF programs to be compiled offline, and then loaded into the kernel without the need to recompile them on the target system. This is a huge improvement over the status quo. + +BPF CO-RE requires the following parts: + +- BTF type information, which allows to capture crucial pieces of information about kernel and BPF program types and code, enabling all the other parts of BPF CO-RE puzzle; +- BPF loader ([libbpf](https://github.com/libbpf/libbpf)) ties BTFs from kernel and BPF program together to adjust compiled BPF code to specific kernel on target hosts; +- kernel, while staying completely BPF CO-RE-agnostic, provides advanced BPF features to enable some of the more advanced scenarios. + +There are three language have its own CO-RE implementation: + +- C: [libbpf](https://github.com/libbpf/libbpf) +- Rust: [aya-rs](https://github.com/aya-rs/aya) +- Go: [cilium/ebpf](https://github.com/cilium/ebpf), it transforms C code to Go code and then compile it. + +### 1.4 Requirements of CO-RE + +| Feature | Kernel version | Commit | +| --------------------- | -------------- | -------------------------------------------------------------------------------------------------- | +| BPF Type Format (BTF) | 4.18 | [`69b693f0aefa`](https://github.com/torvalds/linux/commit/69b693f0aefa0ed521e8bd02260523b5ae446ad7) | + +> Note: +> +> - If one `SubprocessTracer` not based on `CO-RE`, it will not require BTF. + +## 2 Introduction of SubprocessMonitor + +`SubprocessMonitor` will provide support for subprocess-based tracer.Its primary goal is to support the CO-RE program. This is because CO-RE programs are often written through the underlying language(C/Rust). It's hard to write a CO-RE program in Python(CPython may support, but it's not a good idea). And It's hard for C/Rust to write a `duetector`-like program. So `SubprocessMonitor` will provide a way to write a CO-RE program in C/Rust and use it in Python. Instead of writing cumbersome programs in userspace, C/Rust users simply communicate with `SubprocessMonitor` through a protocol. + +Furthermore, `SubprocessMonitor` also support non-BPF program, as long as it can be run in a subprocess and obey the protocol. + +## 3. Protocol + +**This is a very early experimental Protocol with a high probability of extreme variability, and at the same time, many of the fields are undefined** + +### 3.1 Protocol Definition + +#### 3.1.1 Message Format + +```python +{ + "proto": "duetector", // or opentelemetry if type is event + "type": "" // init, event, stop, stopped + "version": "0.1.0", // protocol version + "payload":{} +} +``` + +#### 3.1.2 Init Message + +##### From host(reqeust) + +type: `init` + +payload: +- `poll_time`: unit: seconds +- `kill_timeout`: unit, seconds +- `config`: object, config of tracer + +##### From subprocess(response) + +type: `init` + +payload: +- `name`: string, name of tracer +- `version`: string, version of tracer + +#### 3.1.3 Event Message + +##### From host + +type: `event` + +payload: + +##### From subprocess + +type: `event` + +payload: object of tracking data or OpenTelemetry OTLP things + +#### 3.1.3 Stop Message + +##### From host + +type: `stop` + +payload: + +##### From subprocess + +type: `stopped` + +payload: + +### 3.2 Intergation with OpenTelemetry + +After we intro OpenTelemetry in [#25](https://github.com/hitsz-ids/duetector/issues/25), we can support OpenTelemetry protocol to passthrough the `Event` message. + +## 4. Design + +### 4.1 Architecture + +![SubprocessMonitorArch](./image/spm-arch.png) + +### 4.2 State Transfer & Failure Handling + +![SubrpocessMonitorFailureHandling](./image/spm-state-flow.png) + +- When a `SubprocessTracer` `attatch` to the `SubprocessHost`, host will `Popen` a subprocess and send `init` message to subprocess. +- After the subprocess is started, subprocess will send a `init` message to host. +- Subprocess will send `Event` message to host when it has something to report. +- Host will poll the subprocess's stdout and stderr periodically and send `Event` message to subprocess. +- When `detach` is called, host will send SIGTERM and `stop` message to subprocess and wait for subprocess to exit, if subprocess doesn't exit in `kill_timeout` seconds, host will send SIGKILL to subprocess. +- When subprocess stops, it will send a `stopped` message to host. + +#### When Host Process Crashes + +In this case, subprocess becomes an orphan process. It will be adopted by `init` process and continue to run. Subprocess will no longer receive `Event` message from host. No message is received within twice or more `poll time``, subprocess can be assumed that the host process has crashed. Subprocess will send a `stopped` message to host and exit. + +#### When Subprocess Crashes + +In this case, host can check subprocess is alive or not. If subprocess is not alive, host should log an `warming` for every poll. + +Host can be configured to restart subprocess when subprocess crashes for a certain number of times. + +## 5. Migration + +### 5.1 Migration from BccTracer + +1. Reimplement `Tracer` in C/Rust/Go +2. Commit code to [/CORE](../../CORE/) +3. Implement `SubprocessTracer` in `duetector` + +Note: + +- Need to add the corresponding Github Action to compile the code and upload the binary to Github Release. +- Need to change our docker image to include the binary. + +## 6. Others + +- Support `SubprocessTracer` template in `TracerManager` diff --git a/docs/design/image/spm-arch.png b/docs/design/image/spm-arch.png new file mode 100644 index 0000000..bba7080 Binary files /dev/null and b/docs/design/image/spm-arch.png differ diff --git a/docs/design/image/spm-state-flow.png b/docs/design/image/spm-state-flow.png new file mode 100644 index 0000000..5dc51d9 Binary files /dev/null and b/docs/design/image/spm-state-flow.png differ diff --git a/docs/design/src/spm-arch.drawio b/docs/design/src/spm-arch.drawio new file mode 100644 index 0000000..055017b --- /dev/null +++ b/docs/design/src/spm-arch.drawio @@ -0,0 +1,57 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/design/src/spm-state-flow.drawio b/docs/design/src/spm-state-flow.drawio new file mode 100644 index 0000000..0d02299 --- /dev/null +++ b/docs/design/src/spm-state-flow.drawio @@ -0,0 +1,125 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/source/monitors/index.rst b/docs/source/monitors/index.rst index b46da89..501d16c 100644 --- a/docs/source/monitors/index.rst +++ b/docs/source/monitors/index.rst @@ -22,3 +22,4 @@ Avaliable Monitor Bcc Monitor Shell Monitor + Subprocess Monitor diff --git a/docs/source/monitors/subprocess.rst b/docs/source/monitors/subprocess.rst new file mode 100644 index 0000000..a669266 --- /dev/null +++ b/docs/source/monitors/subprocess.rst @@ -0,0 +1,16 @@ +Subprocess Monitor +================== + +``SubprocessMonitor`` use ``SubprocessHost`` as backend for cacheing output of a subprocess. + +.. autoclass:: duetector.monitors.subprocess_monitor.SubprocessHost + :members: + :undoc-members: + :private-members: + :show-inheritance: + +.. autoclass:: duetector.monitors.subprocess_monitor.SubprocessMonitor + :members: + :undoc-members: + :private-members: + :show-inheritance: diff --git a/duetector/cli/main.py b/duetector/cli/main.py index b54f02c..a45af0b 100644 --- a/duetector/cli/main.py +++ b/duetector/cli/main.py @@ -10,7 +10,7 @@ from duetector.config import CONFIG_PATH, ConfigLoader from duetector.log import logger from duetector.managers.analyzer import AnalyzerManager -from duetector.monitors import BccMonitor, ShMonitor +from duetector.monitors import BccMonitor, ShMonitor, SubprocessMonitor from duetector.monitors.base import Monitor from duetector.tools.config_generator import ConfigGenerator @@ -127,6 +127,11 @@ def generate_config(path): default=True, help=f"Set false or False to disable shell monitor, default: ``True``.", ) +@click.option( + "--enable_sp_monitor", + default=True, + help=f"Set false or False to disable subprocess monitor, default: ``True``.", +) @click.option( "--brief", default=True, @@ -139,6 +144,7 @@ def start( config_dump_dir, enable_bcc_monitor, enable_sh_monitor, + enable_sp_monitor, brief, ): """ @@ -158,6 +164,8 @@ def start( monitors.append(BccMonitor(c)) if enable_sh_monitor: monitors.append(ShMonitor(c)) + if enable_sp_monitor: + monitors.append(SubprocessMonitor(c)) for m in monitors: m.start_polling() diff --git a/duetector/managers/tracer.py b/duetector/managers/tracer.py index 6015611..2a41544 100644 --- a/duetector/managers/tracer.py +++ b/duetector/managers/tracer.py @@ -8,7 +8,7 @@ from duetector.extension.tracer import project_name from duetector.log import logger from duetector.managers.base import Manager -from duetector.tracers.base import ShellTracer, Tracer +from duetector.tracers.base import ShellTracer, SubprocessTracer, Tracer PROJECT_NAME = project_name #: Default project name for pluggy hookspec = pluggy.HookspecMarker(PROJECT_NAME) @@ -38,10 +38,18 @@ class TracerTemplate(Configuable): [tracer.template.sh] pstracer = { "comm" = ["ps", "-aux"], config = { "enable_cache" = false } } + [tracer.template.sp] + randomtracer = { "comm" = ["cat", "/dev/random"], config = { "enable_cache" = false } } + + TODO: + + Example of ``tracer.template.sp`` is not working yet. Replace it with some CO-RE example + """ _avaliable_tracer_type = { "sh": ShellTracer, + "sp": SubprocessTracer, } """ Available tracer type. diff --git a/duetector/monitors/__init__.py b/duetector/monitors/__init__.py index 56aa72b..62f2f11 100644 --- a/duetector/monitors/__init__.py +++ b/duetector/monitors/__init__.py @@ -1,4 +1,5 @@ from .bcc_monitor import BccMonitor from .sh_monitor import ShMonitor +from .subprocess_monitor import SubprocessMonitor -__all__ = ["BccMonitor", "ShMonitor"] +__all__ = ["BccMonitor", "ShMonitor", "SubprocessMonitor"] diff --git a/duetector/monitors/base.py b/duetector/monitors/base.py index d0060f5..9498427 100644 --- a/duetector/monitors/base.py +++ b/duetector/monitors/base.py @@ -81,7 +81,7 @@ def poll_all(self): """ Poll all tracers. Depends on ``self.poll``. """ - return [self._backend.submit(self.poll, tracer) for tracer in self.tracers] + return self._backend.map(self.poll, self.tracers) def poll(self, tracer: Tracer): """ diff --git a/duetector/monitors/sh_monitor.py b/duetector/monitors/sh_monitor.py index 3041c96..0dd9d71 100644 --- a/duetector/monitors/sh_monitor.py +++ b/duetector/monitors/sh_monitor.py @@ -71,7 +71,7 @@ def poll_all(self): """ Poll all tracers. """ - return [self.backend.submit(self.poll, tracer) for tracer in self.tracers] + return self.backend.map(self.poll, self.tracers) def set_callback(self, tracer, callback): """ diff --git a/duetector/monitors/subprocess_monitor.py b/duetector/monitors/subprocess_monitor.py new file mode 100644 index 0000000..1773006 --- /dev/null +++ b/duetector/monitors/subprocess_monitor.py @@ -0,0 +1,288 @@ +import subprocess +import threading +from collections import Counter +from io import DEFAULT_BUFFER_SIZE +from select import select +from typing import IO, Any, Callable, Dict, List, NamedTuple, Optional + +import psutil + +from duetector.collectors.base import Collector +from duetector.log import logger +from duetector.managers.collector import CollectorManager +from duetector.managers.filter import FilterManager +from duetector.managers.tracer import TracerManager +from duetector.monitors.base import Monitor +from duetector.proto.subprocess import ( + EventMessage, + InitMessage, + StopMessage, + StoppedMessage, + dispatch_message, +) +from duetector.tracers.base import SubprocessTracer + + +class SubprocessHost: + def __init__( + self, + timeout, + backend, + poll_szie=1024, + bufsize=DEFAULT_BUFFER_SIZE * 4, + kill_timeout=5, + restart_times=0, + ) -> None: + self.tracers: Dict[SubprocessTracer, subprocess.Popen] = {} + self.callbacks: Dict[SubprocessTracer, Callable[[NamedTuple], None]] = {} + self.timeout = timeout + self.backend = backend + self.bufsize = bufsize + self.poll_szie = poll_szie + self.kill_timeout = kill_timeout + + self.restart_times = restart_times + self.restart_counter: Counter = Counter() + self.shutdown_event = threading.Event() + self.shutdown_event.clear() + + def notify_init(self, tracer: SubprocessTracer): + logger.debug(f"Notify init for tracer {tracer.__class__.__name__}") + self._writeline( + InitMessage.from_host(self, tracer).model_dump_json(), + self.tracers[tracer].stdin, + ) + + self._poll(tracer, self.tracers[tracer].stdout.readline()) + + def notify_stop(self, tracer: SubprocessTracer): + logger.debug(f"Notify stop for tracer {tracer.__class__.__name__}") + self._writeline(StopMessage.from_host(self).model_dump_json(), self.tracers[tracer].stdin) + + def notify_poll(self, tracer: SubprocessTracer): + logger.debug(f"Notify poll for tracer {tracer.__class__.__name__}") + self._writeline(EventMessage.from_host(self).model_dump_json(), self.tracers[tracer].stdin) + + def _writeline(self, json_str: str, io: IO): + if not json_str.endswith("\n"): + json_str += "\n" + io.write(json_str) + io.flush() + + def attach(self, tracer: SubprocessTracer): + if self.shutdown_event.is_set(): + raise RuntimeError("Host already shutdown") + + p = subprocess.Popen( + tracer.comm, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, # FIXME: We currently using a text proto, may change to binary proto in the future + bufsize=self.bufsize, + ) + self.tracers[tracer] = p + self.notify_init(tracer) + + def detach(self, tracer: SubprocessTracer): + if tracer in self.tracers: + p = self.tracers.get(tracer) + try: + p = psutil.Process(p.pid) + except psutil.NoSuchProcess: + # Already stopped + logger.warning("Detaching a stopped tracer") + self.poll(tracer) + self.tracers.pop(tracer) + return + + try: + self.notify_stop(tracer) + logger.info(f"Detaching {tracer}") + p.terminate() + logger.info("Wating for subprocess to stop") + p.wait(self.kill_timeout) + except psutil.TimeoutExpired: + logger.warning("Timeout for terminate subprocess, kill it.") + p.kill() + self.poll(tracer) + self.tracers.pop(tracer) + else: + logger.warning("Tracer not attached, ignore") + + def shutdown(self): + logger.info("Shutting down host") + self.shutdown_event.set() + for tracer in list(self.tracers): + self.detach(tracer) + + def is_alive(self, tracer: SubprocessTracer): + p = self.tracers[tracer] + try: + psutil_p = psutil.Process(p.pid) + except psutil.NoSuchProcess: + return False + return psutil_p.is_running() + + def poll(self, tracer: SubprocessTracer): + """ + Poll a tracer. + """ + p = self.tracers[tracer] + if not self.is_alive(tracer): + if self.restart_times == 0: + return + if ( + self.restart_counter[tracer] >= self.restart_times + and not self.shutdown_event.is_set() + ): + logger.warning( + f"Tracer {tracer.__class__.__name__} restart times exceed limit, stop it." + ) + self.detach(tracer) + return + logger.warning(f"Tracer {tracer.__class__.__name__} stopped, restart it.") + self.restart_counter[tracer] += 1 + self.attach(tracer) + p = self.tracers[tracer] + # Poll next time + + else: + logger.debug(f"Polling tracer {tracer.__class__.__name__}") + self.notify_poll(tracer) + ready = select([p.stdout.fileno()], [], [], self.timeout)[0] + poll_count = 0 + while ready and poll_count < self.poll_szie and not self.shutdown_event.is_set(): + poll_count += 1 + output = p.stdout.readline() + if not output: + break + self._poll(tracer, output) + ready = select([p.stdout.fileno()], [], [], self.timeout)[0] + logger.debug(f"Total poll count: {poll_count}") + + def _poll(self, tracer: SubprocessTracer, output): + if not output: + # Empty output + return + msg = dispatch_message(output) + if not msg: + return + if isinstance(msg, EventMessage): + self.callbacks[tracer](msg.serialize_namedtuple()) + if isinstance(msg, StoppedMessage): + self.detach(tracer) + if isinstance(msg, InitMessage): + logger.info(f"Tracer {tracer.__class__.__name__} initialized") + + def poll_all(self): + """ + Poll all tracers. + """ + return self.backend.map(self.poll, self.tracers) + + def set_callback(self, tracer, callback): + """ + Set callback for tracer. + """ + self.callbacks[tracer] = callback + + +class SubprocessMonitor(Monitor): + config_scope = "monitor.subprocess" + + default_config = { + **Monitor.default_config, + "auto_init": True, + "timeout": 0.01, + "kill_timeout": 5, + "pool_size": 1024, + "bufsize": DEFAULT_BUFFER_SIZE * 4, + "restart_times": 0, + } + + @property + def auto_init(self): + """ + Auto init tracers when init monitor. + """ + return self.config.auto_init + + @property + def timeout(self): + """ + Timeout for poll. + """ + return float(self.config.timeout) + + @property + def kill_timeout(self): + """ + Timeout for kill subprocess. + """ + return int(self.config.kill_timeout) + + @property + def bufsize(self): + """ + Buffer size for subprocess. + """ + return int(self.config.bufsize) + + @property + def poll_szie(self): + """ + Poll size for subprocess. + """ + return float(self.config.pool_size) + + def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): + super().__init__(config=config, *args, **kwargs) + if self.disabled: + logger.info("SubprocessMonitor disabled") + self.tracers = [] + self.filters = [] + self.collectors = [] + return + + self.tracers: List[SubprocessTracer] = TracerManager(config).init(tracer_type=SubprocessTracer) # type: ignore + self.filters: List[Callable] = FilterManager(config).init() + self.collectors: List[Collector] = CollectorManager(config).init() + + self.host = SubprocessHost( + timeout=self.timeout, + backend=self._backend, + bufsize=self.bufsize, + poll_szie=self.poll_szie, + kill_timeout=self.kill_timeout, + ) + if self.auto_init: + self.init() + + def init(self): + for tracer in self.tracers: + tracer.attach(self.host) + self._set_callback(self.host, tracer) + logger.info(f"Tracer {tracer.__class__.__name__} attached") + + def _set_callback(self, host, tracer): + def _(data): + for filter in self.filters: + data = filter(data) + if not data: + return + + for collector in self.collectors: + collector.emit(tracer, data) + + tracer.set_callback(host, _) + + def shutdown(self): + self.host.shutdown() + super().shutdown() + + def poll_all(self): + return self.host.poll_all() + + def poll(self, tracer: SubprocessTracer): # type: ignore + return self.host.poll(tracer) diff --git a/duetector/proto/__init__.py b/duetector/proto/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/duetector/proto/subprocess.py b/duetector/proto/subprocess.py new file mode 100644 index 0000000..fda0ff8 --- /dev/null +++ b/duetector/proto/subprocess.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import json +from collections import namedtuple +from typing import Any, Dict + +from pydantic import BaseModel + +from duetector.log import logger + +VERSION = "0.1.0" + + +class Message(BaseModel): + proto: str + version: str = VERSION + type: str + payload: Dict[str, Any] = {} + + @classmethod + def from_host(cls, *args, **kwargs): + raise NotImplementedError + + @classmethod + def from_subprocess(cls, *args, **kwargs): + raise NotImplementedError + + def serialize_namedtuple(self): + return namedtuple("EventPayload", self.payload)(**self.payload) + + +def dispatch_message(data): + if isinstance(data, str): + try: + data = json.loads(data) + except json.JSONDecodeError: + logger.warning(f"Invalid json: {data}") + return None + + if data["type"] == "init": + return InitMessage.from_subprocess(data) + elif data["type"] == "event": + return EventMessage.from_subprocess(data.get("payload", {})) + elif data["type"] == "stopped": + return StoppedMessage.from_subprocess(data) + else: + raise ValueError(f"Unknown message type: {data['type']}") + + +class InitMessage(Message): + proto: str = "duetector" + version: str = "0.1.0" + type: str = "init" + payload: Dict[str, Any] = {} + + @classmethod + def from_host(cls, host, tracer) -> InitMessage: + return InitMessage( + payload={ + "poll_time": host.timeout, + "kill_timeout": host.kill_timeout, + "config": tracer.config._config_dict, + } + ) + + @classmethod + def from_subprocess(cls, data) -> InitMessage: + return InitMessage(payload=data) + + +class EventMessage(Message): + proto: str = "duetector" + version: str = "0.1.0" + type: str = "event" + payload: Dict[str, Any] = {} + + @classmethod + def from_host(cls, *args, **kwargs) -> EventMessage: + return EventMessage() + + @classmethod + def from_subprocess(cls, data) -> EventMessage: + if isinstance(data, str): + data = json.loads(data) + + return EventMessage(payload=data) + + +class StopMessage(Message): + proto: str = "duetector" + version: str = "0.1.0" + type: str = "stop" + payload: Dict[str, Any] = {} + + @classmethod + def from_host(cls, *args, **kwargs) -> StopMessage: + return StopMessage() + + +class StoppedMessage(StopMessage): + type: str = "stopped" + + @classmethod + def from_subprocess(cls, *args, **kwargs) -> StoppedMessage: + return StoppedMessage() diff --git a/duetector/static/config.toml b/duetector/static/config.toml index d1a9dae..cf5c530 100644 --- a/duetector/static/config.toml +++ b/duetector/static/config.toml @@ -39,6 +39,8 @@ disabled = false [tracer.template.sh] +[tracer.template.sp] + [tracer.clonetracer] disabled = false attach_event = "__x64_sys_clone" @@ -119,5 +121,21 @@ max_workers = 10 interval_ms = 500 call_when_shutdown = true +[monitor.subprocess] +disabled = false +auto_init = true +timeout = 0.01 +kill_timeout = 5 +pool_size = 1024 +bufsize = 32768 +restart_times = 0 + +[monitor.subprocess.backend_args] +max_workers = 10 + +[monitor.subprocess.poller] +interval_ms = 500 +call_when_shutdown = true + [server] token = "" diff --git a/duetector/tools/config_generator.py b/duetector/tools/config_generator.py index b2ce894..7d1b86c 100644 --- a/duetector/tools/config_generator.py +++ b/duetector/tools/config_generator.py @@ -4,14 +4,13 @@ import tomli_w -from duetector.analyzer.db import DBAnalyzer from duetector.config import ConfigLoader from duetector.log import logger from duetector.managers.analyzer import AnalyzerManager from duetector.managers.collector import CollectorManager from duetector.managers.filter import FilterManager from duetector.managers.tracer import TracerManager -from duetector.monitors import BccMonitor, ShMonitor +from duetector.monitors import BccMonitor, ShMonitor, SubprocessMonitor from duetector.service.config import ServerConfig @@ -56,7 +55,7 @@ class ConfigGenerator: All managers to inspect. """ - monitors = [BccMonitor, ShMonitor] + monitors = [BccMonitor, ShMonitor, SubprocessMonitor] """ All monitors to inspect. """ diff --git a/duetector/tools/poller.py b/duetector/tools/poller.py index 83eb608..1cd206a 100644 --- a/duetector/tools/poller.py +++ b/duetector/tools/poller.py @@ -62,11 +62,17 @@ def start(self, func, *args, **kwargs): def _poll(): while not self.shutdown_event.is_set(): - func(*args, **kwargs) + try: + func(*args, **kwargs) + except Exception as e: + logger.exception(e) self.shutdown_event.wait(timeout=self.interval_ms / 1000) # call func one last time before exit if self.call_when_shutdown: - func(*args, **kwargs) + try: + func(*args, **kwargs) + except Exception as e: + logger.exception(e) self._thread = threading.Thread(target=_poll) self.shutdown_event.clear() diff --git a/duetector/tracers/__init__.py b/duetector/tracers/__init__.py index 5d8875f..8f39e26 100644 --- a/duetector/tracers/__init__.py +++ b/duetector/tracers/__init__.py @@ -1,3 +1,3 @@ -from .base import BccTracer, ShellTracer, Tracer +from .base import BccTracer, ShellTracer, SubprocessTracer, Tracer -__all__ = ["Tracer", "BccTracer", "ShellTracer"] +__all__ = ["Tracer", "BccTracer", "ShellTracer", "SubprocessTracer"] diff --git a/duetector/tracers/base.py b/duetector/tracers/base.py index 189daec..d8b5519 100644 --- a/duetector/tracers/base.py +++ b/duetector/tracers/base.py @@ -266,10 +266,6 @@ def enable_cache(self): return self.config.enable_cache - @property - def disabled(self): - return self.config.disabled - def set_cache(self, cache): """ Set cache for this tracer. @@ -306,3 +302,49 @@ def set_callback(self, host, callback: Callable[[NamedTuple], None]): Set callback function to host. """ host.set_callback(self, callback) + + +class SubprocessTracer(Tracer): + default_config = { + **Tracer.default_config, + } + """ + Default config for this tracer. + """ + + comm: List[str] + """ + shell command + """ + + preserve_env: bool = False + """ + If preserve env for this command + """ + + def __init__(self, config: Optional[Union[Config, Dict[str, Any]]] = None, *args, **kwargs): + super().__init__(config, *args, **kwargs) + + def attach(self, host): + """ + Attach to host. + """ + host.attach(self) + + def detach(self, host): + """ + Detach from host. + """ + host.detach(self) + + def get_poller(self, host) -> Callable: + """ + Get poller function from host. + """ + return host.get_poller(self) + + def set_callback(self, host, callback: Callable[[NamedTuple], None]): + """ + Set callback function to host. + """ + host.set_callback(self, callback) diff --git a/duetector/tracers/subprocess/__init__.py b/duetector/tracers/subprocess/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/bin/dummy_process.py b/tests/bin/dummy_process.py new file mode 100755 index 0000000..640d1f2 --- /dev/null +++ b/tests/bin/dummy_process.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python + +import json +import os +import signal +import threading +from sys import stderr, stdout +from typing import IO + +from duetector.proto.subprocess import ( + EventMessage, + InitMessage, + StopMessage, + StoppedMessage, +) + + +def _writeline(msg, io: IO = stdout): + if not msg.endswith("\n"): + msg += "\n" + io.write(msg) + io.flush() + + +class Tracer: + def __init__(self, init_config) -> None: + self.name = "dummy" + self.version = "0.1.0" + self.event = threading.Event() + self.thread = None + self.init_config = init_config + self.timewait = 1 + + def _target(self): + msg = EventMessage.from_subprocess( + { + "pid": os.getpid(), + "extra": "noting", + } + ) + while not self.event.is_set(): + _writeline(msg.model_dump_json()) + self.event.wait(self.timewait) + + def start(self): + if self.thread and self.thread.is_alive(): + raise RuntimeError("Treacer already running") + msg = InitMessage.from_subprocess({"name": self.name, "version": self.version}) + _writeline(msg.model_dump_json()) + self.event.clear() + self.thread = threading.Thread(target=self._target) + self.thread.start() + + def stop(self): + self.event.set() + self.thread.join() + self.thread = None + + +def dispatch_message(data): + if isinstance(data, str): + data = json.loads(data) + + if data["type"] == "init": + return InitMessage(payload=data) + elif data["type"] == "event": + return EventMessage(payload=data) + elif data["type"] == "stop": + return StopMessage(payload=data) + else: + raise ValueError(f"Unknown message type: {data['type']}") + + +def main(): + line = input() + msg = dispatch_message(line) + if isinstance(msg, InitMessage): + init_config = msg.payload + t = Tracer(init_config) + t.start() + else: + raise ValueError("Expect init message") + + def _stop(*args, **kwargs): + t.stop() + _writeline(StoppedMessage().model_dump_json()) + exit(0) + + signal.signal(signal.SIGINT, _stop) + signal.signal(signal.SIGTERM, _stop) + while True: + line = input() + msg = dispatch_message(line) + if isinstance(msg, EventMessage): + stderr.write(msg.serialize_namedtuple()) + if isinstance(msg, StopMessage): + # Send stop signal to itself + os.kill(os.getpid(), signal.SIGTERM) + if isinstance(msg, InitMessage): + raise ValueError("Unexpected init message") + + +if __name__ == "__main__": + main() diff --git a/tests/test_sp_monitor.py b/tests/test_sp_monitor.py new file mode 100644 index 0000000..2cb0925 --- /dev/null +++ b/tests/test_sp_monitor.py @@ -0,0 +1,66 @@ +import time +from copy import deepcopy +from pathlib import Path + +import pytest + +from duetector.monitors.subprocess_monitor import SubprocessMonitor +from duetector.tracers.base import SubprocessTracer + +_HERE = Path(__file__).parent + + +class DummySpTracer(SubprocessTracer): + default_config = { + **SubprocessTracer.default_config, + } + + comm = [(_HERE / "bin" / "dummy_process.py").as_posix()] + + +@pytest.fixture +def config(full_config): + c = deepcopy(full_config) + + c.update( + **{ + "monitor": { + "subprocess": { + "auto_init": False, + } + }, + } + ) + yield c + + +@pytest.fixture +def sp_monitor(config) -> SubprocessMonitor: + sp_monitor = SubprocessMonitor(config) + sp_monitor.tracers = [DummySpTracer()] + sp_monitor.init() + yield sp_monitor + sp_monitor.shutdown() + + +def test_sp_monitor(sp_monitor: SubprocessMonitor): + assert sp_monitor + assert sp_monitor.tracers + assert sp_monitor.filters + assert sp_monitor.collectors + assert sp_monitor.host + assert not sp_monitor.auto_init + assert sp_monitor.timeout + popens = list(sp_monitor.host.tracers.values()) + time.sleep(2.5) + sp_monitor.poll_all() + time.sleep(0.5) + sp_monitor.shutdown() + for popen in popens: + assert popen.poll() == 0 + + # assert sp_monitor.summary()["SubprocessMonitor"]["DBCollector"]["dummysptracer"]["count"] + + +if __name__ == "__main__": + pytest.main(["-vv", "-s", __file__])