Skip to content

Commit

Permalink
Add the new pbench-tools-kill command
Browse files Browse the repository at this point in the history
PBENCH-702

A new comprehensive command to hunt down and kill any and all Tool
Meister sub-system components running locally or remotely, based on the
recorded pbench "result" data directories found in the configured
`${pbench_run}` directory hierarchy.

For example, if there are 5 pbench result directories found in
`${pbench_run}`, and two of those results have left-over processes still
running locally or remotely, this command will find and kill them all
without prejudice.

The new `pbench.agent.tool_group.gen_tool_groups` helper module method
is added to encapsulate searching for tool group data in result
directories.
  • Loading branch information
portante authored Oct 31, 2022
1 parent d8f871d commit 26450d0
Show file tree
Hide file tree
Showing 5 changed files with 837 additions and 11 deletions.
17 changes: 14 additions & 3 deletions lib/pbench/agent/tool_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pathlib import Path
import re
import shutil
from typing import Iterable, Optional


class BadToolGroup(Exception):
Expand All @@ -19,7 +20,7 @@ class ToolGroup:
TOOL_GROUP_PREFIX = "tools-v1"

@staticmethod
def verify_tool_group(name, pbench_run=None):
def verify_tool_group(name: str, pbench_run: Optional[str] = None) -> Path:
"""verify_tool_group - given a tool group name, verify it exists in the
${pbench_run} directory as a properly prefixed tool group directory
name.
Expand Down Expand Up @@ -56,7 +57,7 @@ def verify_tool_group(name, pbench_run=None):
else:
return tg_dir

def __init__(self, name):
def __init__(self, name: str, pbench_run: Optional[str] = None):
"""Construct a ToolGroup object from the on-disk data of the given
tool group.
Expand All @@ -75,7 +76,7 @@ def __init__(self, name):
Raises BadToolGroup via the verify_tool_group() method on error.
"""
self.tg_dir = self.verify_tool_group(name)
self.tg_dir = self.verify_tool_group(name, pbench_run)
self.name = name

# __trigger__
Expand Down Expand Up @@ -173,3 +174,13 @@ def archive(self, target_dir: Path):
0
"""
shutil.copytree(str(self.tg_dir), target_dir / self.tg_dir.name, symlinks=False)


def gen_tool_groups(pbench_run: str) -> Iterable[ToolGroup]:
    """Yield a ToolGroup object for every tool group directory found on disk
    directly under the given pbench run directory.

    A tool group directory is any entry whose name matches
    "<TOOL_GROUP_PREFIX>-<name>"; the "<name>" portion is what gets handed to
    the ToolGroup constructor, together with `pbench_run`.

    Args:
        pbench_run: directory to search for tool group directories

    Yields:
        One ToolGroup per matching directory entry.
    """
    prefix = ToolGroup.TOOL_GROUP_PREFIX
    # Number of leading characters to drop from a directory name to get the
    # bare tool group name: the prefix itself plus the "-" separator.
    skip = len(prefix) + 1
    for match in Path(pbench_run).glob(f"{prefix}-*"):
        group_name = match.name[skip:]
        yield ToolGroup(group_name, pbench_run)
247 changes: 247 additions & 0 deletions lib/pbench/cli/agent/commands/tools/kill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# -*- mode: python -*-

"""Tool Meister "Kill"
Module responsible for hunting down and stopping all Tool Meisters, the Tool
Data Sink, and the Redis server orchestrated by pbench-tool-meister-start.
This is a "big hammer" approach that is offered to users when they find the
distributed system state of the Pbench Agent not working correctly.
The algorithm is fairly straight-forward:
For each pbench result directory in ${pbench_run}
1. If the result was NOT orchestrated by pbench-tool-meister-start, ignore
2. Find the recorded pids for the Redis Server, Tool Data Sink, and local
Tool Meister in their respective pid files, and stop those processes
from running
3. Determine all the remote hosts used for that result
4. For each remote host:
a. `ssh` to that remote host
b. Stop the Tool Meister running on that host
The pbench-tool-meister-start generates a UUID for the entire session and
inserts that value into each command line of spawned remote Tool Meister
processes. Any Tool Meister process with that UUID in its command line string
will be stopped via `kill -KILL`, along with all of its child processes.
"""

from collections import defaultdict
import pathlib
import shlex
from typing import Callable, Dict, Iterable, List, Tuple

import click
import psutil

from pbench.agent.base import BaseCommand
from pbench.agent.tool_group import gen_tool_groups
from pbench.agent.utils import LocalRemoteHost, TemplateSsh
from pbench.cli.agent import CliContext, pass_cli_context
from pbench.cli.agent.options import common_options


def kill_family(proc: psutil.Process):
    """Kill the given process along with every one of its descendants.

    The descendants are collected (recursively) *before* the parent is
    killed, since they could not be looked up through it afterwards.  If the
    parent is already gone when we ask for its children, there is nothing to
    do.  Any individual process that disappears before we get to it is
    silently skipped.

    Args:
        proc: the parent process to kill
    """
    try:
        descendants = list(proc.children(recursive=True))
    except psutil.NoSuchProcess:
        return
    # Parent first, then each descendant, in the order they were reported.
    for victim in (proc, *descendants):
        try:
            victim.kill()
        except psutil.NoSuchProcess:
            continue


class PidSource:
    """For a given PID file name keep track of discovered Process-es and UUIDs
    as Tool Meister directories are `load()`ed.  The `killem()` method is
    invoked by the caller at its discretion.

    The `killem()` method clears out all accumulated data.
    """

    def __init__(self, file_name: str, display_name: str):
        """Create an empty tracker for one kind of PID file.

        Args:
            file_name: name of the PID file to look for in each Tool Meister
                directory (e.g. "redis.pid")
            display_name: human readable name used in progress messages
        """
        self.file_name = file_name
        self.display_name = display_name
        # Live process found for each UUID.
        self.procs_by_uuid: Dict[str, psutil.Process] = {}
        # Directory in which the PID file for each UUID was found; only used
        # for reporting.
        self.uuid_to_tmdir: Dict[str, pathlib.Path] = {}

    def load(self, tm_dir: pathlib.Path, uuid: str) -> bool:
        """Load a PID from the given directory associated with the given UUID.

        Records the loaded PID if it has a live process associated with it and
        returns True, otherwise returns False.

        Args:
            tm_dir: directory expected to contain the PID file
            uuid: UUID to associate with the process, if found

        Returns:
            True if the PID file exists, holds a valid PID, and that PID
            refers to an existing process; False otherwise.
        """
        try:
            pid_text = (tm_dir / self.file_name).read_text()
        except FileNotFoundError:
            return False
        try:
            # psutil.Process() requires an integer PID; the PID file holds
            # text (possibly with surrounding white space or a trailing
            # newline, which int() tolerates).
            pid = int(pid_text)
        except ValueError:
            # Empty or otherwise malformed PID file: there is no process we
            # can identify from it.
            return False
        try:
            self.procs_by_uuid[uuid] = psutil.Process(pid)
        except (psutil.NoSuchProcess, ValueError):
            # Either the process is gone, or the PID is not one psutil
            # accepts (e.g. a negative number).
            return False
        self.uuid_to_tmdir[uuid] = tm_dir
        return True

    def killem(self, echo: Callable[..., None]) -> None:
        """Kill all PIDs found, and their children.

        All accumulated state is cleared before any process is killed.  A
        failure to kill one process is reported and does not prevent the
        remaining processes from being killed.

        Args:
            echo: callable used to report progress; it is invoked with a
                message string, and with an additional `err=True` keyword
                argument when reporting an error (as `click.echo` accepts)
        """
        if not self.procs_by_uuid:
            return
        echo(f"Killing {self.display_name} PIDs ...")
        # Clear out the stored data ahead of the killings.
        procs_by_uuid, self.procs_by_uuid = self.procs_by_uuid, {}
        uuid_to_tmdir, self.uuid_to_tmdir = self.uuid_to_tmdir, {}
        for uuid, proc in procs_by_uuid.items():
            pid = proc.pid
            echo(f"\tKilling {pid} (from {uuid_to_tmdir[uuid]})")
            try:
                kill_family(proc)
            except Exception as exc:
                # Best effort: report the problem and move on to the next
                # process.
                echo(f"\t\terror killing {pid}: {exc}", err=True)


def gen_result_tm_directories(
    pbench_run: pathlib.Path,
) -> Iterable[Tuple[pathlib.Path, str]]:
    """Walk the immediate sub-directories of ${pbench_run} looking for ones
    that carry a recorded Tool Meister UUID.

    For each sub-directory containing a readable "tm/.uuid" file, yields a
    tuple of the "tm" directory Path object and the text of that file.
    Non-directory entries, and directories without a "tm/.uuid" file, are
    skipped.

    Args:
        pbench_run: the directory whose entries are to be examined

    Yields:
        (tm directory Path, UUID text) tuples
    """
    for candidate in pbench_run.iterdir():
        if candidate.is_dir():
            tool_meister_dir = candidate / "tm"
            uuid_file = tool_meister_dir / ".uuid"
            try:
                recorded_uuid = uuid_file.read_text()
            except FileNotFoundError:
                # Without the UUID file, this entry is either not a pbench
                # result directory at all, or its Tool Meister sub-system was
                # not orchestrated by pbench-tool-meister-start.
                pass
            else:
                yield tool_meister_dir, recorded_uuid


def gen_host_names(result_dir: pathlib.Path) -> Iterable[str]:
    """Yield the names of the remote hosts registered in the tool groups
    saved in the given result directory.

    Every host name found in every tool group is considered; those that the
    LocalRemoteHost helper reports as local are skipped.  A host name that
    appears in more than one tool group is yielded once per appearance.

    Args:
        result_dir: the result directory holding the saved tool group data

    Yields:
        Remote host names.
    """
    # Collect all the tool groups up front so that the LocalRemoteHost helper
    # is only constructed when there is at least one tool group to examine.
    groups = list(gen_tool_groups(result_dir))
    if groups:
        local_remote = LocalRemoteHost()
        for group in groups:
            for name in group.hostnames.keys():
                if not local_remote.is_local(name):
                    yield name


class KillTools(BaseCommand):
    """Find and stop all orchestrated Tool Meister instances."""

    @staticmethod
    def _kill_by_uuids(uuids: List[str]) -> None:
        """Kill every local process (and its children) having one of the
        given UUIDs as a sub-string of any of its command line elements.

        Args:
            uuids: the UUIDs to look for; empty strings are ignored
        """
        # An empty string is a sub-string of everything, so it would match
        # every process on the host; never treat it as a UUID.
        wanted = [uuid for uuid in uuids if uuid]
        if not wanted:
            return
        # This command was itself invoked with the UUIDs on its command line,
        # so it will always match; killing ourselves would abort the scan.
        # With no argument psutil.Process() refers to the current process.
        own_pid = psutil.Process().pid
        for proc in psutil.process_iter():
            if proc.pid == own_pid:
                continue
            try:
                cmdline = proc.cmdline()
            except psutil.Error:
                # The process exited, is a zombie, or we are not allowed to
                # inspect it; either way we can't match it, so move on.
                continue
            # Consider each command line element, stopping at the first
            # match so that a process is only killed (and reported) once.
            matched = next(
                (uuid for el in cmdline for uuid in wanted if uuid in el), None
            )
            if matched is None:
                continue
            pid = proc.pid
            click.echo(f"\tKilling {pid} with UUID '{matched}'")
            try:
                kill_family(proc)
            except Exception as exc:
                # Best effort: report the problem and keep scanning.
                click.echo(f"\t\terror killing {pid}: {exc}", err=True)

    def execute(self, uuids: List[str]) -> int:
        """Execute the tools kill operation.

        If any UUIDs are passed as arguments, we only want to look for, and
        locally kill, processes having those UUIDs.

        Without command line arguments, kill all the local PIDs from all the
        discovered result directories.

        All the Redis server PIDs are killed first, then the Tool Data Sinks,
        and finally the local Tool Meisters.  We kill all the Redis Servers
        first in case killing them causes all the other processes to just exit
        on their own.  Then we kill all the Tool Data Sinks, and their
        children.  Then we kill all the (local) Tool Meisters, and their
        children.

        We then remotely kill (via `ssh`) all the Tool Meisters by invoking
        this same command on a remote host with the list of UUIDs found across
        all results involving that host.

        Args:
            uuids: UUIDs given on the command line, possibly empty

        Returns:
            Always 0.
        """
        if uuids:
            # We have a list of UUIDs to kill, implying that we search locally
            # by UUID and only kill those PIDs found with the UUID in their
            # registered command line.
            self._kill_by_uuids(uuids)
            return 0

        # All three dictionaries for PID files that might be found, in the
        # order in which we'll kill their PIDs.
        all_pids = [
            PidSource("redis.pid", "redis server"),
            PidSource("pbench-tool-data-sink.pid", "tool data sink"),
            PidSource("tm.pid", "local tool meister"),
        ]
        local_pids = False
        remote_tms = defaultdict(list)
        for tm_dir, uuid in gen_result_tm_directories(self.pbench_run):
            # If a result directory has any dangling components of the Tool
            # Meister sub-system active, that is PID files for any local
            # component, record those components.  NOTE: we use a list
            # comprehension here to ensure that the .load() method is invoked
            # on each PidSource object (a generator expression would let
            # any() stop at the first True).
            local_pids |= any([pidsrc.load(tm_dir, uuid) for pidsrc in all_pids])
            # Find all the remotes for this result that need to be tracked
            # down.
            for host in gen_host_names(tm_dir.parent):
                remote_tms[host].append(uuid)

        if not local_pids and not remote_tms:
            # No local or remote pids found, nothing to do.
            return 0

        # Kill all the local PIDs (and their children).

        for pidsrc in all_pids:
            pidsrc.killem(click.echo)

        # Kill all the remote Tool Meisters.

        cmd = "pbench-tools-kill {{uuids}}"
        template = TemplateSsh("ssh", shlex.split(self.ssh_opts), cmd)

        # First fire off a number of background ssh processes, one per remote
        # host.
        remotes = []
        for host, host_uuids in remote_tms.items():
            click.echo(f"Killing all Tool Meister processes on remote host {host}...")
            template.start(host, uuids=" ".join(host_uuids))
            remotes.append(host)

        # Wait for them all to complete, oldest to youngest.
        for host in remotes:
            template.wait(host)

        return 0


@click.command(
    help="Ensure all instances of running tools are stopped locally or remotely"
)
@common_options
@click.argument("uuids", nargs=-1)
@pass_cli_context
def main(ctxt: CliContext, uuids: List[str]):
    """Command line entry point: run the KillTools command with any UUIDs
    given as positional arguments, and exit with the status it returns.
    """
    tool_killer = KillTools(ctxt)
    exit_status = tool_killer.execute(uuids)
    click.get_current_context().exit(exit_status)
Loading

0 comments on commit 26450d0

Please sign in to comment.