diff --git a/duetector/injectors/base.py b/duetector/injectors/base.py index 6e72ae3..0f61a12 100644 --- a/duetector/injectors/base.py +++ b/duetector/injectors/base.py @@ -73,6 +73,9 @@ def __call__(self, data: namedtuple) -> namedtuple | None: return data return self.inject(data) + def shutdown(self): + pass + class ProcInjector(Injector): def __init__(self, config: dict[str, Any] = None, *args, **kwargs): @@ -88,3 +91,7 @@ def get_patch_kwargs( **self.cgroup_inspector.inspect(param), **self.namespace_inspector.inspect(param), } + + def shutdown(self): + self.cgroup_inspector.stop() + self.namespace_inspector.stop() diff --git a/duetector/injectors/inspector.py b/duetector/injectors/inspector.py index 29703af..ee0ddca 100644 --- a/duetector/injectors/inspector.py +++ b/duetector/injectors/inspector.py @@ -1,14 +1,80 @@ from __future__ import annotations import itertools +import signal +from threading import Event, Thread from typing import Any +from watchfiles import watch + +from duetector.log import logger from duetector.utils import Singleton class ProcWatcher(metaclass=Singleton): - def __init__(self) -> None: - pass + def __init__( + self, + proc_dir: str = "/proc", + ignore_permission_denied: bool = True, + ) -> None: + self.proc_dir = proc_dir + self.ignore_permission_denied = ignore_permission_denied + + self.thread: Thread | None = None + + self.stop_event = Event() + self.stop_event.clear() + + self.start() + signal.signal(signal.SIGINT, self.stop) + signal.signal(signal.SIGTERM, self.stop) + + def start(self): + if self.thread: + if self.thread.is_alive(): + return + else: + self.stop() + + self.stop_event.clear() + + def _(): + logger.debug("Starting watch proc dir.") + while True: + try: + self._watch() + except Exception as e: + logger.exception(e) + self.stop_event.wait(0.001) + if self.stop_event.is_set(): + return + + self.thread = Thread( + target=_, + ) + self.thread.start() + logger.info("Proc watcher started.") + + def _watch(self): + for changes in watch( + self.proc_dir, + stop_event=self.stop_event, + ignore_permission_denied=self.ignore_permission_denied, + recursive=False, + force_polling=True, + ): + pass + # logger.info(changes) + + def stop(self, sig=None, frame=None): + self.stop_event.set() + if self.thread: + logger.info("Waiting proc watcher to stop.") + self.thread.join(1) + self.thread = None + + def pause(self): + signal.pause() def with_prefix(sep: str, prefix, key: str | list[str]) -> str: @@ -52,6 +118,9 @@ def inspect(self, model: dict[str, Any]) -> dict[str, Any]: def _inspect(self, model: dict[str, Any]) -> dict[str, Any]: raise NotImplementedError + def stop(self): + pass + class NamespaceInspector(Inspector): @property @@ -64,6 +133,9 @@ def __init__(self) -> None: def _inspect(self, model: dict[str, Any]) -> dict[str, Any]: return {} + def stop(self): + self.proc_watcher.stop() + class CgroupInspector(Inspector): @property @@ -75,3 +147,11 @@ def __init__(self) -> None: def _inspect(self, model: dict[str, Any]) -> dict[str, Any]: return {} + + def stop(self): + self.proc_watcher.stop() + + +if __name__ == "__main__": + w = ProcWatcher() + w.pause() diff --git a/duetector/monitors/base.py b/duetector/monitors/base.py index 0c1a575..85bfb1b 100644 --- a/duetector/monitors/base.py +++ b/duetector/monitors/base.py @@ -142,6 +142,8 @@ def shutdown(self): self._backend.shutdown() for c in self.collectors: c.shutdown() + for i in self.injectors: + i.shutdown() def _inject_extra_info(self, data: namedtuple) -> namedtuple: patch_kwargs = {}