Skip to content

Commit

Permalink
Add shutdown and watch demo
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Nov 28, 2023
1 parent 3c8cd53 commit 4dc5309
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 2 deletions.
7 changes: 7 additions & 0 deletions duetector/injectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def __call__(self, data: namedtuple) -> namedtuple | None:
return data
return self.inject(data)

def shutdown(self):
pass


class ProcInjector(Injector):
def __init__(self, config: dict[str, Any] = None, *args, **kwargs):
Expand All @@ -88,3 +91,7 @@ def get_patch_kwargs(
**self.cgroup_inspector.inspect(param),
**self.namespace_inspector.inspect(param),
}

def shutdown(self):
self.cgroup_inspector.stop()
self.namespace_inspector.stop()
84 changes: 82 additions & 2 deletions duetector/injectors/inspector.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,80 @@
from __future__ import annotations

import itertools
import signal
from threading import Event, Thread
from typing import Any

from watchfiles import watch

from duetector.log import logger
from duetector.utils import Singleton


class ProcWatcher(metaclass=Singleton):
def __init__(self) -> None:
pass
def __init__(
self,
proc_dir: str = "/proc",
ignore_permission_denied: bool = True,
) -> None:
self.proc_dir = proc_dir
self.ignore_permission_denied = ignore_permission_denied

self.thread: Thread | None = None

self.stop_event = Event()
self.stop_event.clear()

self.start()
signal.signal(signal.SIGINT, self.stop)
signal.signal(signal.SIGTERM, self.stop)

def start(self):
if self.thread:
if self.thread.is_alive():
return
else:
self.stop()

self.stop_event.clear()

def _():
logger.debug("Starting watch proc dir.")
while True:
try:
self._watch()
except Exception as e:
logger.exception(e)
self.stop_event.wait(0.001)
if self.stop_event.is_set():
return

self.thread = Thread(
target=_,
)
self.thread.start()
logger.info("Proc watcher started.")

def _watch(self):
for changes in watch(
self.proc_dir,
stop_event=self.stop_event,
ignore_permission_denied=self.ignore_permission_denied,
recursive=False,
force_polling=True,
):
pass
# logger.info(changes)

def stop(self, sig=None, frame=None):
self.stop_event.set()
if self.thread:
logger.info("Waiting proc watcher to stop.")
self.thread.join(1)
self.thread = None

def pause(self):
signal.pause()


def with_prefix(sep: str, prefix, key: str | list[str]) -> str:
Expand Down Expand Up @@ -52,6 +118,9 @@ def inspect(self, model: dict[str, Any]) -> dict[str, Any]:
def _inspect(self, model: dict[str, Any]) -> dict[str, Any]:
raise NotImplementedError

def stop(self):
pass


class NamespaceInspector(Inspector):
@property
Expand All @@ -64,6 +133,9 @@ def __init__(self) -> None:
def _inspect(self, model: dict[str, Any]) -> dict[str, Any]:
return {}

def stop(self):
self.proc_watcher.stop()


class CgroupInspector(Inspector):
@property
Expand All @@ -75,3 +147,11 @@ def __init__(self) -> None:

def _inspect(self, model: dict[str, Any]) -> dict[str, Any]:
return {}

def stop(self):
self.proc_watcher.stop()


if __name__ == "__main__":
w = ProcWatcher()
w.pause()
2 changes: 2 additions & 0 deletions duetector/monitors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ def shutdown(self):
self._backend.shutdown()
for c in self.collectors:
c.shutdown()
for i in self.injectors:
i.shutdown()

def _inject_extra_info(self, data: namedtuple) -> namedtuple:
patch_kwargs = {}
Expand Down

0 comments on commit 4dc5309

Please sign in to comment.