Skip to content

Commit

Permalink
Basically subprocess monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Sep 26, 2023
1 parent d58da5d commit 3f8ac99
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 142 deletions.
24 changes: 12 additions & 12 deletions docs/draft/CO-RE.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Furthermore, `SubprocessMonitor` also support non-BPF program, as long as it can
{
"proto": "duetector", // or opentelemetry if type is event
"type": "", // init, event, stop, stopped
"version": "0.1", // protocol version
"version": "0.1.0", // protocol version
"payload":{}
}
```
Expand All @@ -74,46 +74,46 @@ Furthermore, `SubprocessMonitor` also support non-BPF program, as long as it can

##### From host(request)

type: init
type: `init`

payload:
- poll_time: unit: seconds
- kill_timeout: unit, seconds
- config: object, config of tracer
- `poll_time`: unit: seconds
- `kill_timeout`: unit, seconds
- `config`: object, config of tracer

##### From subprocess(response)

type: init
type: `init`

payload:
- name: string, name of tracer
- version: string, version of tracer
- `name`: string, name of tracer
- `version`: string, version of tracer

#### 3.1.3 Event Message

##### From host

type: event
type: `event`

payload:

##### From subprocess

type: event
type: `event`

payload: object of tracking data or OpenTelemetry OTLP things

#### 3.1.4 Stop Message

##### From host

type: stop
type: `stop`

payload:

##### From subprocess

type: stopped
type: `stopped`

payload:

Expand Down
121 changes: 105 additions & 16 deletions duetector/monitors/subprocess_monitor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import subprocess
from io import DEFAULT_BUFFER_SIZE, StringIO
from typing import Any, Callable, Dict, List, NamedTuple, Optional
import threading
from collections import Counter
from io import DEFAULT_BUFFER_SIZE
from select import select
from typing import IO, Any, Callable, Dict, List, NamedTuple, Optional

import psutil

Expand All @@ -10,19 +13,50 @@
from duetector.managers.filter import FilterManager
from duetector.managers.tracer import TracerManager
from duetector.monitors.base import Monitor
from duetector.proto.subprocess import (
EventMessage,
InitMessage,
StopMessage,
dispatch_message,
)
from duetector.tracers.base import SubprocessTracer


class SubprocessHost:
# NOTE(review): this one-line `def` is the pre-commit signature kept by the diff
# view; the multi-line `def` below is its replacement and adds `restart_times`.
def __init__(self, timeout, backend, bufsize=DEFAULT_BUFFER_SIZE * 4, kill_timeout=5) -> None:
def __init__(
self,
timeout,
backend,
bufsize=DEFAULT_BUFFER_SIZE * 4,
kill_timeout=5,
restart_times=0,
) -> None:
# Subprocess handle for every attached tracer.
self.tracers: Dict[SubprocessTracer, subprocess.Popen] = {}
# Per-tracer callback that receives each event payload as a namedtuple.
self.callbacks: Dict[SubprocessTracer, Callable[[NamedTuple], None]] = {}
# Sent to the subprocess as `poll_time` in the init message.
self.timeout = timeout
self.backend = backend
# Passed to `subprocess.Popen` as its buffer size when a tracer is attached.
self.bufsize = bufsize
# Seconds `detach` waits after `terminate()` before killing the subprocess.
self.kill_timeout = kill_timeout

# Restart budget per tracer; with 0, `poll` leaves a stopped tracer alone.
self.restart_times = restart_times
# Number of restarts already performed, keyed by tracer.
self.restart_counter: Counter = Counter()
# Set by `shutdown`; once set, `attach` raises RuntimeError.
self.shutdown_event = threading.Event()
# A new Event starts unset, so this call only makes the initial state explicit.
self.shutdown_event.clear()

def _send(self, tracer: SubprocessTracer, message) -> None:
    """
    Write one protocol message to the tracer's stdin and flush it.

    The message is serialized with ``model_dump_json()`` and terminated with a
    newline so the receiver can read it with a line-based read.
    """
    stdin = self.tracers[tracer].stdin
    stdin.write(message.model_dump_json() + "\n")
    # The pipe is opened with a large buffer (``bufsize``); without an explicit
    # flush the message may never leave this process, and a following blocking
    # read on stdout would wait forever for a reply.
    stdin.flush()

def notify_init(self, tracer: SubprocessTracer):
    """
    Send the init message to a tracer and process its reply.

    Blocks until the subprocess answers with one line on stdout, which is then
    dispatched through ``_poll``.
    """
    self._send(tracer, InitMessage.from_host(self, tracer))
    self._poll(tracer, self.tracers[tracer].stdout.readline())

def notify_stop(self, tracer: SubprocessTracer):
    """Ask the tracer's subprocess to stop by sending it a stop message."""
    self._send(tracer, StopMessage.from_host(self))

def notify_poll(self, tracer: SubprocessTracer):
    """Send a host-side event message to the tracer's subprocess."""
    self._send(tracer, EventMessage.from_host(self))

def attach(self, tracer: SubprocessTracer):
if self.shutdown_event.is_set():
raise RuntimeError("Host already shutdown")

p = subprocess.Popen(
tracer.comm,
stdin=subprocess.PIPE,
Expand All @@ -32,36 +66,82 @@ def attach(self, tracer: SubprocessTracer):
bufsize=self.bufsize,
)
self.tracers[tracer] = p
self.notify_init(tracer)

# Stop a tracer's subprocess and forget it; a tracer that is not attached is ignored.
# NOTE(review): the untyped `def` line and the `pop` right after the `if` are the
# pre-commit lines kept by the diff view; the line following each is its replacement.
def detach(self, tracer):
def detach(self, tracer: SubprocessTracer):
if tracer in self.tracers:
p = self.tracers.pop(tracer)
p = self.tracers.get(tracer)
try:
# Rebind `p` to a psutil handle; terminate/wait/kill below go through psutil.
p = psutil.Process(p.pid)
except psutil.NoSuchProcess:
# Already stopped
# NOTE(review): with restart_times != 0, `poll` on a dead tracer calls
# `attach` or `detach` again before the entry is popped here - confirm intended.
self.poll(tracer)
self.tracers.pop(tracer)
return

try:
# TODO: Write stop message for notify subprocess to stop
# tracer.notify_stop(p.stdin)
self.notify_stop(tracer)
p.terminate()
logger.info("Wating for subprocess to stop")
p.wait(self.kill_timeout)
except psutil.TimeoutExpired:
logger.warning("Timeout for terminate subprocess, kill it.")
p.kill()
# Final poll before the tracer is dropped from the table.
self.poll(tracer)
self.tracers.pop(tracer)

# Shut the host down: block further `attach` calls, then detach every tracer.
# NOTE(review): `for tracer in self.tracers:` is the pre-commit line kept by the
# diff view; the two lines after it are its replacement.
def shutdown(self):
for tracer in self.tracers:
self.shutdown_event.set()
# Iterate over a copy because `detach` pops entries from `self.tracers`.
for tracer in list(self.tracers):
self.detach(tracer)

def poll(self, tracer):
def is_alive(self, tracer: "SubprocessTracer"):
    """
    Return True while the tracer's subprocess has not exited.

    The check uses ``Popen.poll()`` on the handle stored in ``self.tracers``:
    it returns ``None`` for a running child and the exit code otherwise. Unlike
    a psutil ``is_running()`` lookup by pid, this does not report an exited but
    not yet reaped child (a zombie) as alive, and it reaps the child as a side
    effect.

    Raises:
        KeyError: if ``tracer`` is not attached to this host.
    """
    return self.tracers[tracer].poll() is None

def poll(self, tracer: SubprocessTracer):
"""
Poll a tracer.

A live tracer is sent a poll message and every line it returns is handed to
`_poll`. A tracer whose subprocess is no longer alive is left alone when
`restart_times` is 0; otherwise it is restarted, or detached once its restart
count has reached `restart_times`.
"""
p = self.tracers[tracer]
# NOTE(review): the six lines from the TODO down to the `callback(...)` call are
# the pre-commit body kept by the diff view; the `if not self.is_alive` block
# below replaces them.
# TODO: Write poll message for keepalive
tracer.notify_poll(p.stdin)
outputs = p.stdout.readlines()
callback = self.callbacks[tracer]
for output in outputs:
callback(tracer.deserialize(output))
if not self.is_alive(tracer):
# restart_times == 0 disables restarting: nothing is done for a dead tracer.
if self.restart_times == 0:
return
if (
self.restart_counter[tracer] >= self.restart_times
and not self.shutdown_event.is_set()
):
logger.warning(
f"Tracer {tracer.__class__.__name__} restart times exceed limit, stop it."
)
self.detach(tracer)
return
# NOTE(review): this point is also reached while the shutdown event is set,
# where `attach` raises RuntimeError("Host already shutdown") - confirm intended.
logger.warning(f"Tracer {tracer.__class__.__name__} stopped, restart it.")
self.restart_counter[tracer] += 1
self.attach(tracer)
p = self.tracers[tracer]
# Poll next time

else:
self.notify_poll(tracer)
# FIXME: This stuck everything as no EOF
outputs = p.stdout.readlines()
for output in outputs:
self._poll(tracer, output)

def _poll(self, tracer: SubprocessTracer, output):
    """
    Decode one raw output line from a tracer and act on the resulting message.

    Event messages are forwarded to the tracer's callback as a namedtuple,
    stop messages detach the tracer, and init messages are only logged.
    """
    message = dispatch_message(output)
    if isinstance(message, EventMessage):
        self.callbacks[tracer](message.serialize_namedtuple())
    elif isinstance(message, StopMessage):
        self.detach(tracer)
    elif isinstance(message, InitMessage):
        logger.info(f"Tracer {tracer.__class__.__name__} initialized")

def poll_all(self):
"""
Expand All @@ -77,12 +157,15 @@ def set_callback(self, tracer, callback):


class SubprocessMonitor(Monitor):
config_scope = "monitor.subprocess"

default_config = {
**Monitor.default_config,
"auto_init": True,
"timeout": 5,
"kill_timeout": 5,
"bufsize": DEFAULT_BUFFER_SIZE * 4,
"restart_times": 0,
}

@property
Expand Down Expand Up @@ -128,7 +211,7 @@ def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):

self.host = SubprocessHost(
timeout=self.timeout,
backend=self.backend,
backend=self._backend,
bufsize=self.bufsize,
kill_timeout=self.kill_timeout,
)
Expand Down Expand Up @@ -156,3 +239,9 @@ def _(data):
def shutdown(self):
"""Shut down the subprocess host, then run the parent class's shutdown."""
self.host.shutdown()
super().shutdown()

def poll_all(self):
"""Delegate to the host's `poll_all` and return its result."""
return self.host.poll_all()

def poll(self, tracer: SubprocessTracer): # type: ignore
"""Delegate polling of a single tracer to the host and return its result."""
return self.host.poll(tracer)
99 changes: 99 additions & 0 deletions duetector/proto/subprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from __future__ import annotations

import json
from collections import namedtuple
from typing import Any, Dict

from pydantic import BaseModel

VERSION = "0.1.0"


class Message(BaseModel):
    """
    Common envelope for messages exchanged between host and subprocess.

    Concrete message classes pin ``proto`` and ``type`` and override whichever
    of ``from_host`` / ``from_subprocess`` applies to them.
    """

    proto: str
    version: str = VERSION
    type: str
    payload: Dict[str, Any] = {}

    @classmethod
    def from_host(cls, *args, **kwargs):
        """Build the message as sent by the host; not implemented on the base class."""
        raise NotImplementedError

    @classmethod
    def from_subprocess(cls, *args, **kwargs):
        """Build the message as sent by a subprocess; not implemented on the base class."""
        raise NotImplementedError

    def serialize_namedtuple(self):
        """Return the payload as an ``EventPayload`` namedtuple with one field per key."""
        field_names = list(self.payload)
        payload_type = namedtuple("EventPayload", field_names)
        return payload_type(**self.payload)


def dispatch_message(data):
    """
    Turn one message received from a subprocess into its message object.

    Args:
        data: the message envelope, either already decoded into a dict or as
            JSON text (``str``, ``bytes`` or ``bytearray``).

    Returns:
        An ``InitMessage``, ``EventMessage`` or ``StoppedMessage`` according to
        the envelope's ``type`` field.

    Raises:
        ValueError: if ``type`` is not one of ``init``, ``event``, ``stopped``
            (malformed JSON also surfaces as a ``ValueError`` subclass).
        KeyError: if the envelope has no ``type`` field.
    """
    # json.loads accepts bytes as well, so pipes opened in binary mode work too.
    if isinstance(data, (str, bytes, bytearray)):
        data = json.loads(data)

    msg_type = data["type"]
    # The constructors store their argument as the message payload, so hand them
    # the envelope's payload rather than the envelope itself; otherwise the
    # namedtuple built from an event would carry proto/type/version/payload
    # fields instead of the tracked data.
    payload = data.get("payload") or {}

    if msg_type == "init":
        return InitMessage.from_subprocess(payload)
    if msg_type == "event":
        return EventMessage.from_subprocess(payload)
    if msg_type == "stopped":
        return StoppedMessage.from_subprocess(payload)
    raise ValueError(f"Unknown message type: {msg_type}")


class InitMessage(Message):
    """
    Init handshake message.

    The host sends ``poll_time`` and ``kill_timeout`` (both in seconds) plus the
    tracer's config; the subprocess answers with an init message of its own.
    """

    proto: str = "duetector"
    # ``version`` is inherited from Message so it always follows VERSION.
    type: str = "init"
    payload: Dict[str, Any] = {}

    @classmethod
    def from_host(cls, host, tracer) -> InitMessage:
        """Build the init request from the host's timeouts and the tracer's config."""
        # Built through ``cls`` so a subclass gets an instance of its own type.
        return cls(
            payload={
                "poll_time": host.timeout,
                "kill_timeout": host.kill_timeout,
                "config": tracer.config._config_dict,
            }
        )

    @classmethod
    def from_subprocess(cls, data) -> InitMessage:
        """Wrap ``data``, a dict received from the subprocess, as the payload."""
        return cls(payload=data)


class EventMessage(Message):
    """
    Event message.

    Sent by the host with an empty payload, and by the subprocess with the
    tracked data as payload.
    """

    proto: str = "duetector"
    # ``version`` is inherited from Message so it always follows VERSION.
    type: str = "event"
    payload: Dict[str, Any] = {}

    @classmethod
    def from_host(cls, *args, **kwargs) -> EventMessage:
        """Build the host-side event message; all arguments are ignored."""
        # Built through ``cls`` so a subclass gets an instance of its own type.
        return cls()

    @classmethod
    def from_subprocess(cls, data) -> EventMessage:
        """Wrap ``data`` (a dict, or JSON text that decodes to one) as the payload."""
        # json.loads also accepts bytes, so binary-mode pipe output is decoded too.
        if isinstance(data, (str, bytes, bytearray)):
            data = json.loads(data)

        return cls(payload=data)


class StopMessage(Message):
    """Stop request sent by the host; it carries no payload data."""

    proto: str = "duetector"
    # ``version`` is inherited from Message so it always follows VERSION.
    type: str = "stop"
    payload: Dict[str, Any] = {}

    @classmethod
    def from_host(cls, *args, **kwargs) -> StopMessage:
        """Build the stop request; all arguments are ignored."""
        # Deliberately not ``cls()``: subclasses inherit this method, and a
        # host-originated message keeps the "stop" type as before.
        return StopMessage()


class StoppedMessage(StopMessage):
    """Acknowledgement a subprocess sends once it has stopped."""

    type: str = "stopped"

    @classmethod
    def from_subprocess(cls, *args, **kwargs) -> StoppedMessage:
        """Build the acknowledgement; every argument is ignored."""
        stopped = StoppedMessage()
        return stopped
Loading

0 comments on commit 3f8ac99

Please sign in to comment.