diff --git a/lib/pbench/agent/tool_meister.py b/lib/pbench/agent/tool_meister.py index 56a1e7452a..232e62a7b2 100644 --- a/lib/pbench/agent/tool_meister.py +++ b/lib/pbench/agent/tool_meister.py @@ -663,9 +663,9 @@ def install(self) -> InstallationResult: if executable is None: return InstallationResult(returncode=1, output="pcp tool (pmcd) not found") # FIXME - The Tool Data Sink and Tool Meister have to agree on the - # exact port number to use. We can't use the default `pmcd` port - # number because it might conflict with an existing `pmcd` - # deployment out of our control. + # exact port number to use. We can't use the default `pmcd` port + # number because it might conflict with an existing `pmcd` + # deployment out of our control. self.args = [ executable, "--foreground", @@ -1798,7 +1798,7 @@ def sysinfo(self, data: Dict[str, str]) -> int: def get_logger( logger_name: str, is_daemon: bool = False, level: str = "info" ) -> logging.Logger: - """Contruct a logger for a Tool Meister instance. + """Construct a logger for a Tool Meister instance. If in the Unit Test environment, just log to console. If in non-unit test environment: diff --git a/lib/pbench/agent/tool_meister_start.py b/lib/pbench/agent/tool_meister_start.py index bed03ac2f5..988613b2e2 100644 --- a/lib/pbench/agent/tool_meister_start.py +++ b/lib/pbench/agent/tool_meister_start.py @@ -133,6 +133,7 @@ import socket import sys import time +from typing import Dict, Union from argparse import ArgumentParser, Namespace from distutils.spawn import find_executable @@ -254,10 +255,19 @@ def _waitpid(pid: int) -> int: Raises an exception if the final exit PID is different from the given PID. """ exit_pid, _exit_status = os.waitpid(pid, 0) - if pid != exit_pid: - raise Exception(f"Logic bomb! exit pid, {exit_pid}, does not match pid, {pid}") - exit_status = os.WEXITSTATUS(_exit_status) - return exit_status + assert pid == exit_pid, f"os.waitpid() returned pid {exit_pid}; expected {pid}" + if os.WIFEXITED(_exit_status): + return os.WEXITSTATUS(_exit_status) + elif os.WIFSIGNALED(_exit_status): + raise StartTmsErr( + f"child process killed by signal {os.WTERMSIG(_exit_status)}", + ReturnCode.TDSWAITFAILURE, + ) + else: + raise StartTmsErr( + f"wait for child process returned unexpectedly, status = {_exit_status}", + ReturnCode.TDSWAITFAILURE, + ) class StartTmsErr(ReturnCode.Err): @@ -271,11 +281,9 @@ class StartTmsErr(ReturnCode.Err): def start_tms_via_ssh( exec_dir: Path, ssh_cmd: str, - ssh_path: Path, tool_group: ToolGroup, ssh_opts: str, redis_server: RedisServerCommon, - redis_client: redis.Redis, logger: logging.Logger, ) -> None: """Orchestrate the creation of local and remote Tool Meister instances using @@ -297,7 +305,7 @@ def start_tms_via_ssh( if debug_level: cmd += f" {debug_level}" template = TemplateSsh(ssh_cmd, shlex.split(ssh_opts), cmd) - tms = dict() + tms: Dict[str, Union[str, int, Dict[str, str]]] = {} tm_count = 0 for host in tool_group.hostnames.keys(): tm_count += 1 @@ -412,11 +420,9 @@ class ToolDataSink(BaseServer): def start( self, exec_dir: Path, - full_hostname: str, tds_param_key: str, redis_server: RedisServerCommon, redis_client: redis.Redis, - logger: logging.Logger, ) -> None: assert ( self.host is not None @@ -449,9 +455,9 @@ def start( # Wait for the child to finish daemonizing itself. retcode = _waitpid(pid) if retcode != 0: - logger.error( - "failed to create pbench data sink, daemonized; return code: %d", - retcode, + raise self.Err( + f"failed to create pbench data sink, daemonized; return code: {retcode}", + ReturnCode.TDSWAITFAILURE, ) except Exception: @@ -490,7 +496,9 @@ def start( if pid_file.exists(): self.pid_file = pid_file else: - logger.error("TDS daemonization didn't create %s", pid_file) + raise self.Err( + f"TDS daemonization didn't create {pid_file}", ReturnCode.TDSWAITFAILURE + ) @staticmethod def wait(chan: RedisChannelSubscriber, logger: logging.Logger) -> int: @@ -581,7 +589,7 @@ def __init__(self, spec: str, def_host_name: str): super().__init__(spec, def_host_name) self.pid_file = None - def start(self, tm_dir: Path, full_hostname: str, logger: logging.Logger) -> None: + def start(self, tm_dir: Path) -> None: """start_redis - configure and start a Redis server. Raises a BaseServer.Err exception if an error is encountered. @@ -705,11 +713,11 @@ def terminate_no_wait( and we only check whether the message was sent. TODO: Ideally, we'd wait some reasonable time for a response from TDS and - then quit; we don't have that mechanism, but this means we may kill a - managed Redis instance before the requests propagate. + then quit; we don't have that mechanism, but this means we may kill a + managed Redis instance before the requests propagate. Args: - group: The tool group we're trying to terminate + tool_group_name: The tool group we're trying to terminate logger: Python Logger redis_client: Redis client key: TDS Redis pubsub key @@ -767,6 +775,8 @@ def start(_prog: str, cli_params: Namespace) -> int: shf = logging.Formatter(f"{prog.name}: %(message)s") sh.setFormatter(shf) logger.addHandler(sh) + tm_dir = None + ssh_cmd = None # + # Step 1. - Load the tool group data for the requested tool group @@ -899,9 +909,8 @@ def start(_prog: str, cli_params: Namespace) -> int: try: if orchestrate: - ssh_cmd = "ssh" - ssh_path = shutil.which(ssh_cmd) - if ssh_path is None: + ssh_cmd = shutil.which("ssh") + if ssh_cmd is None: raise CleanupTime( ReturnCode.MISSINGSSHCMD, "required ssh command not in our PATH" ) @@ -916,7 +925,7 @@ def start(_prog: str, cli_params: Namespace) -> int: origin_ip = set() any_remote = False template = TemplateSsh( - Path(ssh_path), shlex.split(ssh_opts), "echo ${SSH_CONNECTION}" + ssh_cmd, shlex.split(ssh_opts), "echo ${SSH_CONNECTION}" ) recovery.add(template.abort, "stop TM clients") @@ -985,7 +994,7 @@ def start(_prog: str, cli_params: Namespace) -> int: if not redis_server_spec: redis_server_spec = origin - # NOTE: These two assigments create server objects, but neither + # NOTE: These two assignments create server objects, but neither # constructor starts a server, so no cleanup action is needed at this # time. try: @@ -1013,7 +1022,7 @@ def start(_prog: str, cli_params: Namespace) -> int: if orchestrate: logger.debug("2. starting redis server") try: - redis_server.start(tm_dir, full_hostname, logger) + redis_server.start(tm_dir) except redis_server.Err as exc: raise CleanupTime( exc.return_code, f"Failed to start a local Redis server: '{exc}'" @@ -1122,12 +1131,7 @@ def start(_prog: str, cli_params: Namespace) -> int: logger.debug("5. starting tool data sink") try: tool_data_sink.start( - prog.parent, - full_hostname, - tds_param_key, - redis_server, - redis_client, - logger, + prog.parent, tds_param_key, redis_server, redis_client ) except tool_data_sink.Err as exc: raise CleanupTime( @@ -1165,14 +1169,7 @@ def start(_prog: str, cli_params: Namespace) -> int: if orchestrate: try: start_tms_via_ssh( - prog.parent, - ssh_cmd, - Path(ssh_path), - tool_group, - ssh_opts, - redis_server, - redis_client, - logger, + prog.parent, ssh_cmd, tool_group, ssh_opts, redis_server, logger ) except StartTmsErr as exc: raise CleanupTime( diff --git a/lib/pbench/agent/utils.py b/lib/pbench/agent/utils.py index dd7a1324f0..89b27eeb06 100644 --- a/lib/pbench/agent/utils.py +++ b/lib/pbench/agent/utils.py @@ -1,7 +1,6 @@ import ipaddress import logging import os -from pathlib import Path import signal import socket import subprocess @@ -221,16 +220,16 @@ def setup_logging(debug, logfile): level = logging.DEBUG if debug else logging.INFO fmt = "%(message)s" - rootLogger = logging.getLogger() + root_logger = logging.getLogger() # cause all messages to be processed when the logger is the root logger # or delegation to the parent when the logger is a non-root logger # see https://docs.python.org/3/library/logging.html - rootLogger.setLevel(logging.NOTSET) + root_logger.setLevel(logging.NOTSET) streamhandler = logging.StreamHandler() streamhandler.setLevel(level) streamhandler.setFormatter(logging.Formatter(fmt)) - rootLogger.addHandler(streamhandler) + root_logger.addHandler(streamhandler) if logfile: if not os.environ.get("_PBENCH_UNIT_TESTS"): @@ -240,13 +239,13 @@ def setup_logging(debug, logfile): filehandler = logging.FileHandler(logfile) filehandler.setLevel(logging.NOTSET) filehandler.setFormatter(logging.Formatter(fmt)) - rootLogger.addHandler(filehandler) + root_logger.addHandler(filehandler) - return rootLogger + return root_logger def _log_date(): - """_log_data - helper function to mimick previous bash code behaviors + """helper function to mimic previous bash code behaviors Returns an ISO format date string of the current time. If running in a unit test environment, returns a fixed date string. @@ -259,13 +258,13 @@ def _log_date(): def _pbench_log(message): - """_pbench_log - helper function for logging to the ${pbench_log} file.""" + """helper function for logging to the ${pbench_log} file.""" with open(os.environ["pbench_log"], "a+") as fp: print(message, file=fp) def warn_log(msg): - """warn_log - mimick previous bash behavior of writing warning logs to + """mimic previous bash behavior of writing warning logs to both stderr and the ${pbench_log} file. """ message = f"[warn][{_log_date()}] {msg}" @@ -274,7 +273,7 @@ def warn_log(msg): def error_log(msg): - """error_log - mimick previous bash behavior of writing error logs to + """mimic previous bash behavior of writing error logs to both stderr and the ${pbench_log} file. """ message = f"[error][{_log_date()}] {msg}" @@ -283,7 +282,7 @@ def error_log(msg): def info_log(msg): - """info_log - mimick previous bash behavior of writing info logs to + """mimic previous bash behavior of writing info logs to the ${pbench_log} file. """ message = f"[info][{_log_date()}] {msg}" @@ -383,7 +382,7 @@ def collect_local_info(pbench_bin): ) hostdata[arg] = cp.stdout.strip() if cp.stdout is not None else "" - return (version, seqno, sha1, hostdata) + return version, seqno, sha1, hostdata class TemplateSsh: @@ -397,12 +396,12 @@ class Return(NamedTuple): stdout: str stderr: str - def __init__(self, ssh_cmd: Path, ssh_args: List[str], cmd: str): + def __init__(self, ssh_cmd: str, ssh_args: List[str], cmd: str): """ Create an SSH template object Args: - ssh_cmd: Path to the ssh command + ssh_cmd: file path of the ssh command ssh_args: A partial argv representing ssh command options cmd: A templated string representing a remote command to be executed """