Skip to content

Commit

Permalink
Quick v0.71 fixes (#2821)
Browse files Browse the repository at this point in the history
Quick fixes for v0.71; pick lint
  • Loading branch information
webbnh authored May 10, 2022
1 parent 0889f37 commit 32e07f5
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 55 deletions.
8 changes: 4 additions & 4 deletions lib/pbench/agent/tool_meister.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,9 @@ def install(self) -> InstallationResult:
if executable is None:
return InstallationResult(returncode=1, output="pcp tool (pmcd) not found")
# FIXME - The Tool Data Sink and Tool Meister have to agree on the
# exact port number to use. We can't use the default `pmcd` port
# number because it might conflict with an existing `pmcd`
# deployment out of our control.
# exact port number to use. We can't use the default `pmcd` port
# number because it might conflict with an existing `pmcd`
# deployment out of our control.
self.args = [
executable,
"--foreground",
Expand Down Expand Up @@ -1798,7 +1798,7 @@ def sysinfo(self, data: Dict[str, str]) -> int:
def get_logger(
logger_name: str, is_daemon: bool = False, level: str = "info"
) -> logging.Logger:
"""Contruct a logger for a Tool Meister instance.
"""Construct a logger for a Tool Meister instance.
If in the Unit Test environment, just log to console.
If in non-unit test environment:
Expand Down
71 changes: 34 additions & 37 deletions lib/pbench/agent/tool_meister_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import socket
import sys
import time
from typing import Dict, Union

from argparse import ArgumentParser, Namespace
from distutils.spawn import find_executable
Expand Down Expand Up @@ -254,10 +255,19 @@ def _waitpid(pid: int) -> int:
Raises an exception if the final exit PID is different from the given PID.
"""
exit_pid, _exit_status = os.waitpid(pid, 0)
if pid != exit_pid:
raise Exception(f"Logic bomb! exit pid, {exit_pid}, does not match pid, {pid}")
exit_status = os.WEXITSTATUS(_exit_status)
return exit_status
assert pid == exit_pid, f"os.waitpid() returned pid {exit_pid}; expected {pid}"
if os.WIFEXITED(_exit_status):
return os.WEXITSTATUS(_exit_status)
elif os.WIFSIGNALED(_exit_status):
raise StartTmsErr(
f"child process killed by signal {os.WTERMSIG(_exit_status)}",
ReturnCode.TDSWAITFAILURE,
)
else:
raise StartTmsErr(
f"wait for child process returned unexpectedly, status = {_exit_status}",
ReturnCode.TDSWAITFAILURE,
)


class StartTmsErr(ReturnCode.Err):
Expand All @@ -271,11 +281,9 @@ class StartTmsErr(ReturnCode.Err):
def start_tms_via_ssh(
exec_dir: Path,
ssh_cmd: str,
ssh_path: Path,
tool_group: ToolGroup,
ssh_opts: str,
redis_server: RedisServerCommon,
redis_client: redis.Redis,
logger: logging.Logger,
) -> None:
"""Orchestrate the creation of local and remote Tool Meister instances using
Expand All @@ -297,7 +305,7 @@ def start_tms_via_ssh(
if debug_level:
cmd += f" {debug_level}"
template = TemplateSsh(ssh_cmd, shlex.split(ssh_opts), cmd)
tms = dict()
tms: Dict[str, Union[str, int, Dict[str, str]]] = {}
tm_count = 0
for host in tool_group.hostnames.keys():
tm_count += 1
Expand Down Expand Up @@ -412,11 +420,9 @@ class ToolDataSink(BaseServer):
def start(
self,
exec_dir: Path,
full_hostname: str,
tds_param_key: str,
redis_server: RedisServerCommon,
redis_client: redis.Redis,
logger: logging.Logger,
) -> None:
assert (
self.host is not None
Expand Down Expand Up @@ -449,9 +455,9 @@ def start(
# Wait for the child to finish daemonizing itself.
retcode = _waitpid(pid)
if retcode != 0:
logger.error(
"failed to create pbench data sink, daemonized; return code: %d",
retcode,
raise self.Err(
f"failed to create pbench data sink, daemonized; return code: {retcode}",
ReturnCode.TDSWAITFAILURE,
)

except Exception:
Expand Down Expand Up @@ -490,7 +496,9 @@ def start(
if pid_file.exists():
self.pid_file = pid_file
else:
logger.error("TDS daemonization didn't create %s", pid_file)
raise self.Err(
f"TDS daemonization didn't create {pid_file}", ReturnCode.TDSWAITFAILURE
)

@staticmethod
def wait(chan: RedisChannelSubscriber, logger: logging.Logger) -> int:
Expand Down Expand Up @@ -581,7 +589,7 @@ def __init__(self, spec: str, def_host_name: str):
super().__init__(spec, def_host_name)
self.pid_file = None

def start(self, tm_dir: Path, full_hostname: str, logger: logging.Logger) -> None:
def start(self, tm_dir: Path) -> None:
"""start_redis - configure and start a Redis server.
Raises a BaseServer.Err exception if an error is encountered.
Expand Down Expand Up @@ -705,11 +713,11 @@ def terminate_no_wait(
and we only check whether the message was sent.
TODO: Ideally, we'd wait some reasonable time for a response from TDS and
then quit; we don't have that mechanism, but this means we may kill a
managed Redis instance before the requests propagate.
then quit; we don't have that mechanism, but this means we may kill a
managed Redis instance before the requests propagate.
Args:
group: The tool group we're trying to terminate
tool_group_name: The tool group we're trying to terminate
logger: Python Logger
redis_client: Redis client
key: TDS Redis pubsub key
Expand Down Expand Up @@ -767,6 +775,8 @@ def start(_prog: str, cli_params: Namespace) -> int:
shf = logging.Formatter(f"{prog.name}: %(message)s")
sh.setFormatter(shf)
logger.addHandler(sh)
tm_dir = None
ssh_cmd = None

# +
# Step 1. - Load the tool group data for the requested tool group
Expand Down Expand Up @@ -899,9 +909,8 @@ def start(_prog: str, cli_params: Namespace) -> int:

try:
if orchestrate:
ssh_cmd = "ssh"
ssh_path = shutil.which(ssh_cmd)
if ssh_path is None:
ssh_cmd = shutil.which("ssh")
if ssh_cmd is None:
raise CleanupTime(
ReturnCode.MISSINGSSHCMD, "required ssh command not in our PATH"
)
Expand All @@ -916,7 +925,7 @@ def start(_prog: str, cli_params: Namespace) -> int:
origin_ip = set()
any_remote = False
template = TemplateSsh(
Path(ssh_path), shlex.split(ssh_opts), "echo ${SSH_CONNECTION}"
ssh_cmd, shlex.split(ssh_opts), "echo ${SSH_CONNECTION}"
)
recovery.add(template.abort, "stop TM clients")

Expand Down Expand Up @@ -985,7 +994,7 @@ def start(_prog: str, cli_params: Namespace) -> int:
if not redis_server_spec:
redis_server_spec = origin

# NOTE: These two assigments create server objects, but neither
# NOTE: These two assignments create server objects, but neither
# constructor starts a server, so no cleanup action is needed at this
# time.
try:
Expand Down Expand Up @@ -1013,7 +1022,7 @@ def start(_prog: str, cli_params: Namespace) -> int:
if orchestrate:
logger.debug("2. starting redis server")
try:
redis_server.start(tm_dir, full_hostname, logger)
redis_server.start(tm_dir)
except redis_server.Err as exc:
raise CleanupTime(
exc.return_code, f"Failed to start a local Redis server: '{exc}'"
Expand Down Expand Up @@ -1122,12 +1131,7 @@ def start(_prog: str, cli_params: Namespace) -> int:
logger.debug("5. starting tool data sink")
try:
tool_data_sink.start(
prog.parent,
full_hostname,
tds_param_key,
redis_server,
redis_client,
logger,
prog.parent, tds_param_key, redis_server, redis_client
)
except tool_data_sink.Err as exc:
raise CleanupTime(
Expand Down Expand Up @@ -1165,14 +1169,7 @@ def start(_prog: str, cli_params: Namespace) -> int:
if orchestrate:
try:
start_tms_via_ssh(
prog.parent,
ssh_cmd,
Path(ssh_path),
tool_group,
ssh_opts,
redis_server,
redis_client,
logger,
prog.parent, ssh_cmd, tool_group, ssh_opts, redis_server, logger
)
except StartTmsErr as exc:
raise CleanupTime(
Expand Down
27 changes: 13 additions & 14 deletions lib/pbench/agent/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import ipaddress
import logging
import os
from pathlib import Path
import signal
import socket
import subprocess
Expand Down Expand Up @@ -221,16 +220,16 @@ def setup_logging(debug, logfile):
level = logging.DEBUG if debug else logging.INFO
fmt = "%(message)s"

rootLogger = logging.getLogger()
root_logger = logging.getLogger()
# cause all messages to be processed when the logger is the root logger
# or delegation to the parent when the logger is a non-root logger
# see https://docs.python.org/3/library/logging.html
rootLogger.setLevel(logging.NOTSET)
root_logger.setLevel(logging.NOTSET)

streamhandler = logging.StreamHandler()
streamhandler.setLevel(level)
streamhandler.setFormatter(logging.Formatter(fmt))
rootLogger.addHandler(streamhandler)
root_logger.addHandler(streamhandler)

if logfile:
if not os.environ.get("_PBENCH_UNIT_TESTS"):
Expand All @@ -240,13 +239,13 @@ def setup_logging(debug, logfile):
filehandler = logging.FileHandler(logfile)
filehandler.setLevel(logging.NOTSET)
filehandler.setFormatter(logging.Formatter(fmt))
rootLogger.addHandler(filehandler)
root_logger.addHandler(filehandler)

return rootLogger
return root_logger


def _log_date():
"""_log_data - helper function to mimick previous bash code behaviors
"""helper function to mimic previous bash code behaviors
Returns an ISO format date string of the current time. If running in
a unit test environment, returns a fixed date string.
Expand All @@ -259,13 +258,13 @@ def _log_date():


def _pbench_log(message):
"""_pbench_log - helper function for logging to the ${pbench_log} file."""
"""helper function for logging to the ${pbench_log} file."""
with open(os.environ["pbench_log"], "a+") as fp:
print(message, file=fp)


def warn_log(msg):
"""warn_log - mimick previous bash behavior of writing warning logs to
"""mimic previous bash behavior of writing warning logs to
both stderr and the ${pbench_log} file.
"""
message = f"[warn][{_log_date()}] {msg}"
Expand All @@ -274,7 +273,7 @@ def warn_log(msg):


def error_log(msg):
"""error_log - mimick previous bash behavior of writing error logs to
"""mimic previous bash behavior of writing error logs to
both stderr and the ${pbench_log} file.
"""
message = f"[error][{_log_date()}] {msg}"
Expand All @@ -283,7 +282,7 @@ def error_log(msg):


def info_log(msg):
"""info_log - mimick previous bash behavior of writing info logs to
"""mimic previous bash behavior of writing info logs to
the ${pbench_log} file.
"""
message = f"[info][{_log_date()}] {msg}"
Expand Down Expand Up @@ -383,7 +382,7 @@ def collect_local_info(pbench_bin):
)
hostdata[arg] = cp.stdout.strip() if cp.stdout is not None else ""

return (version, seqno, sha1, hostdata)
return version, seqno, sha1, hostdata


class TemplateSsh:
Expand All @@ -397,12 +396,12 @@ class Return(NamedTuple):
stdout: str
stderr: str

def __init__(self, ssh_cmd: Path, ssh_args: List[str], cmd: str):
def __init__(self, ssh_cmd: str, ssh_args: List[str], cmd: str):
"""
Create an SSH template object
Args:
ssh_cmd: Path to the ssh command
ssh_cmd: file path of the ssh command
ssh_args: A partial argv representing ssh command options
cmd: A templated string representing a remote command to be executed
"""
Expand Down

0 comments on commit 32e07f5

Please sign in to comment.