From e5c30f6c05f657ab5db2151567000f4828fc519f Mon Sep 17 00:00:00 2001 From: Peter Portante Date: Thu, 21 Apr 2022 12:18:29 -0400 Subject: [PATCH] Isolate parameter handling to ToolDataSink class --- .../test-start-stop-tool-meister/test-51.txt | 2 +- .../test-start-stop-tool-meister/test-52.txt | 2 +- lib/pbench/agent/tool_data_sink.py | 158 +++++++++--------- 3 files changed, 80 insertions(+), 82 deletions(-) diff --git a/agent/util-scripts/gold/test-start-stop-tool-meister/test-51.txt b/agent/util-scripts/gold/test-start-stop-tool-meister/test-51.txt index 9083ace3c3..a625769b07 100644 --- a/agent/util-scripts/gold/test-start-stop-tool-meister/test-51.txt +++ b/agent/util-scripts/gold/test-start-stop-tool-meister/test-51.txt @@ -125,7 +125,7 @@ install_check_output = perf: perf is installed +++ mock-run/tm/pbench-tool-data-sink.err file contents DEBUG pbench-tool-data-sink daemon -- re-constructing Redis server object DEBUG pbench-tool-data-sink daemon -- reconstructed Redis server object -DEBUG pbench-tool-data-sink driver -- params_key (tds-default): {'benchmark_run_dir': '/var/tmp/pbench-test-utils/pbench/mock-run', 'bind_hostname': 'localhost', 'channel_prefix': 'pbench-agent-cli', 'port': 8080, 'tool_group': 'default', 'tool_metadata': "{'persistent': {'dcgm': {'collector': 'prometheus', 'port': '9400'}, 'node-exporter': {'collector': 'prometheus', 'port': '9100'}, 'pcp': {'collector': 'pcp', 'port': '44321'}}, 'transient': {'blktrace': None, 'bpftrace': None, 'cpuacct': None, 'disk': None, 'dm-cache': None, 'docker': None, 'docker-info': None, 'external-data-source': None, 'haproxy-ocp': None, 'iostat': None, 'jmap': None, 'jstack': None, 'kvm-spinlock': None, 'kvmstat': None, 'kvmtrace': None, 'lockstat': None, 'mpstat': None, 'numastat': None, 'oc': None, 'openvswitch': None, 'pcp-transient': None, 'perf': None, 'pidstat': None, 'pprof': None, 'proc-interrupts': None, 'proc-sched_debug': None, 'proc-vmstat': None, 'prometheus-metrics': None, 'qemu-migrate': None, 'rabbit': None, 'sar': None, 'strace': None, 'sysfs': None, 'systemtap': None, 'tcpdump': None, 'turbostat': None, 'user-tool': None, 'virsh-migrate': None, 'vmstat': None}}", 'tool_trigger': None, 'tools': {'testhost.example.com': {'mpstat': '', 'perf': '--record-opts="-a -freq=100 -g --event=branch-misses --event=cache-misses --event=instructions" --report-opts="-I -g"'}}} +DEBUG pbench-tool-data-sink driver -- params_key (tds-default): {'benchmark_run_dir': '/var/tmp/pbench-test-utils/pbench/mock-run', 'bind_hostname': 'localhost', 'channel_prefix': 'pbench-agent-cli', 'optional_md': {'config': '', 'date': '1900-01-01T00:00:00', 'script': 'fake-bm', 'ssh_opts': '-o BatchMode=yes -o StrictHostKeyChecking=no'}, 'port': 8080, 'tool_group': 'default', 'tool_metadata': {'persistent': {'dcgm': {'collector': 'prometheus', 'port': '9400'}, 'node-exporter': {'collector': 'prometheus', 'port': '9100'}, 'pcp': {'collector': 'pcp', 'port': '44321'}}, 'transient': {'blktrace': None, 'bpftrace': None, 'cpuacct': None, 'disk': None, 'dm-cache': None, 'docker': None, 'docker-info': None, 'external-data-source': None, 'haproxy-ocp': None, 'iostat': None, 'jmap': None, 'jstack': None, 'kvm-spinlock': None, 'kvmstat': None, 'kvmtrace': None, 'lockstat': None, 'mpstat': None, 'numastat': None, 'oc': None, 'openvswitch': None, 'pcp-transient': None, 'perf': None, 'pidstat': None, 'pprof': None, 'proc-interrupts': None, 'proc-sched_debug': None, 'proc-vmstat': None, 'prometheus-metrics': None, 'qemu-migrate': None, 'rabbit': None, 'sar': None, 'strace': None, 'sysfs': None, 'systemtap': None, 'tcpdump': None, 'turbostat': None, 'user-tool': None, 'virsh-migrate': None, 'vmstat': None}}, 'tool_trigger': None, 'tools': {'testhost.example.com': {'mpstat': '', 'perf': '--record-opts="-a -freq=100 -g --event=branch-misses --event=cache-misses --event=instructions" --report-opts="-I -g"'}}} INFO pbench-tool-data-sink web_server_run -- Running Bottle web server ... Bottle v#.##.## server starting up (using DataSinkWsgiServer(handler_class=.DataSinkWsgiRequestHandler'>))... Listening on http://localhost:8080/ diff --git a/agent/util-scripts/gold/test-start-stop-tool-meister/test-52.txt b/agent/util-scripts/gold/test-start-stop-tool-meister/test-52.txt index cac63634b2..c5a45b868e 100644 --- a/agent/util-scripts/gold/test-start-stop-tool-meister/test-52.txt +++ b/agent/util-scripts/gold/test-start-stop-tool-meister/test-52.txt @@ -125,7 +125,7 @@ install_check_output = perf: perf is installed +++ mock-run/tm/pbench-tool-data-sink.err file contents DEBUG pbench-tool-data-sink daemon -- re-constructing Redis server object DEBUG pbench-tool-data-sink daemon -- reconstructed Redis server object -DEBUG pbench-tool-data-sink driver -- params_key (tds-mygroup): {'benchmark_run_dir': '/var/tmp/pbench-test-utils/pbench/mock-run', 'bind_hostname': 'localhost', 'channel_prefix': 'pbench-agent-cli', 'port': 8080, 'tool_group': 'mygroup', 'tool_metadata': "{'persistent': {'dcgm': {'collector': 'prometheus', 'port': '9400'}, 'node-exporter': {'collector': 'prometheus', 'port': '9100'}, 'pcp': {'collector': 'pcp', 'port': '44321'}}, 'transient': {'blktrace': None, 'bpftrace': None, 'cpuacct': None, 'disk': None, 'dm-cache': None, 'docker': None, 'docker-info': None, 'external-data-source': None, 'haproxy-ocp': None, 'iostat': None, 'jmap': None, 'jstack': None, 'kvm-spinlock': None, 'kvmstat': None, 'kvmtrace': None, 'lockstat': None, 'mpstat': None, 'numastat': None, 'oc': None, 'openvswitch': None, 'pcp-transient': None, 'perf': None, 'pidstat': None, 'pprof': None, 'proc-interrupts': None, 'proc-sched_debug': None, 'proc-vmstat': None, 'prometheus-metrics': None, 'qemu-migrate': None, 'rabbit': None, 'sar': None, 'strace': None, 'sysfs': None, 'systemtap': None, 'tcpdump': None, 'turbostat': None, 'user-tool': None, 'virsh-migrate': None, 'vmstat': None}}", 'tool_trigger': None, 'tools': {'testhost.example.com': {'mpstat': '', 'perf': '--record-opts="-a -freq=100 -g --event=branch-misses --event=cache-misses --event=instructions" --report-opts="-I -g"'}}} +DEBUG pbench-tool-data-sink driver -- params_key (tds-mygroup): {'benchmark_run_dir': '/var/tmp/pbench-test-utils/pbench/mock-run', 'bind_hostname': 'localhost', 'channel_prefix': 'pbench-agent-cli', 'optional_md': {'config': '', 'date': '1900-01-01T00:00:00', 'script': 'fake-bm', 'ssh_opts': '-o BatchMode=yes -o StrictHostKeyChecking=no'}, 'port': 8080, 'tool_group': 'mygroup', 'tool_metadata': {'persistent': {'dcgm': {'collector': 'prometheus', 'port': '9400'}, 'node-exporter': {'collector': 'prometheus', 'port': '9100'}, 'pcp': {'collector': 'pcp', 'port': '44321'}}, 'transient': {'blktrace': None, 'bpftrace': None, 'cpuacct': None, 'disk': None, 'dm-cache': None, 'docker': None, 'docker-info': None, 'external-data-source': None, 'haproxy-ocp': None, 'iostat': None, 'jmap': None, 'jstack': None, 'kvm-spinlock': None, 'kvmstat': None, 'kvmtrace': None, 'lockstat': None, 'mpstat': None, 'numastat': None, 'oc': None, 'openvswitch': None, 'pcp-transient': None, 'perf': None, 'pidstat': None, 'pprof': None, 'proc-interrupts': None, 'proc-sched_debug': None, 'proc-vmstat': None, 'prometheus-metrics': None, 'qemu-migrate': None, 'rabbit': None, 'sar': None, 'strace': None, 'sysfs': None, 'systemtap': None, 'tcpdump': None, 'turbostat': None, 'user-tool': None, 'virsh-migrate': None, 'vmstat': None}}, 'tool_trigger': None, 'tools': {'testhost.example.com': {'mpstat': '', 'perf': '--record-opts="-a -freq=100 -g --event=branch-misses --event=cache-misses --event=instructions" --report-opts="-I -g"'}}} INFO pbench-tool-data-sink web_server_run -- Running Bottle web server ... Bottle v#.##.## server starting up (using DataSinkWsgiServer(handler_class=.DataSinkWsgiRequestHandler'>))... Listening on http://localhost:8080/ diff --git a/lib/pbench/agent/tool_data_sink.py b/lib/pbench/agent/tool_data_sink.py index 17c5d744d1..84e2fd4c28 100644 --- a/lib/pbench/agent/tool_data_sink.py +++ b/lib/pbench/agent/tool_data_sink.py @@ -730,13 +730,31 @@ def validate(self, directory): return local_dir +class ExternalEnvironment(NamedTuple): + """Encapsulation of the various external environment parameters needed by + the operation of the Tool Data Sink. + """ + + cp_path: str + hostname: str + pbench_bin: Path + pbench_run: str + PROG: str + tar_path: str + + class ToolDataSinkParams(NamedTuple): - benchmark_run_dir: BenchmarkRunDir + """Encapsulation of the parameter set provided to the Tool Data Sink by + the orchestrator. + """ + + benchmark_run_dir: str bind_hostname: str - port: str channel_prefix: str + optional_md: Dict[str, str] + port: str tool_group: str - tool_metadata: ToolMetadata + tool_metadata: Dict[str, str] tool_trigger: str tools: Dict[str, str] @@ -754,17 +772,16 @@ class ToolDataSink(Bottle): _data_actions = frozenset(("send", "sysinfo")) @staticmethod - def fetch_params( - params: Dict[str, Any], benchmark_run_dir: BenchmarkRunDir - ) -> ToolDataSinkParams: + def fetch_params(params: Dict[str, Any]) -> ToolDataSinkParams: try: return ToolDataSinkParams( - benchmark_run_dir=benchmark_run_dir, + benchmark_run_dir=params["benchmark_run_dir"], bind_hostname=params["bind_hostname"], - port=params["port"], channel_prefix=params["channel_prefix"], + optional_md=params.get("optional_md", dict()), + port=params["port"], tool_group=params["group"], - tool_metadata=ToolMetadata.tool_md_from_dict(params["tool_metadata"]), + tool_metadata=params["tool_metadata"], tool_trigger=params["tool_trigger"], tools=params["tools"], ) @@ -773,15 +790,11 @@ def fetch_params( def __init__( self, - pbench_bin: Path, - hostname: str, - tar_path: str, - cp_path: str, + ext_env: ExternalEnvironment, redis_server: redis.Redis, redis_host: str, redis_port: int, tdsp: ToolDataSinkParams, - optional_md: Dict[str, Any], logger: logging.Logger, ): """Constructor for the Tool Data Sink object - responsible for @@ -790,15 +803,19 @@ def __init__( """ super(ToolDataSink, self).__init__() # Save external state - self.pbench_bin = pbench_bin - self.hostname = hostname - self.tar_path = tar_path - self.cp_path = cp_path + self.pbench_bin = ext_env.pbench_bin + self.hostname = ext_env.hostname + self.tar_path = ext_env.tar_path + self.cp_path = ext_env.cp_path self.redis_server = redis_server self.redis_host = redis_host self.redis_port = redis_port + self.benchmark_run_dir = BenchmarkRunDir( + tdsp.benchmark_run_dir, ext_env.pbench_run + ) + self.tool_metadata = ToolMetadata.tool_md_from_dict(tdsp.tool_metadata) + self.optional_md = tdsp.optional_md self.params = tdsp - self.optional_md = optional_md self.logger = logger # Initialize internal state self.action = None @@ -936,7 +953,7 @@ def tm_log_capture(self): # logs from remote Tool Meisters. logger = logging.getLogger("tm_log_capture_thread") logger.setLevel(logging.WARNING) - tm_log_file = self.params.benchmark_run_dir.local / "tm" / "tm.logs" + tm_log_file = self.benchmark_run_dir.local / "tm" / "tm.logs" with tm_log_file.open("w") as fp: try: with self._lock: @@ -1028,8 +1045,8 @@ def record_tms(self, tms): The second thing we do is record all the data and metadata about the Tool Meisters in the ${benchmark_run_dir}/metadata.log file. """ - persistent_tools_l = self.params.tool_metadata.getPersistentTools() - transient_tools_l = self.params.tool_metadata.getTransientTools() + persistent_tools_l = self.tool_metadata.getPersistentTools() + transient_tools_l = self.tool_metadata.getTransientTools() for host, tm in tms.items(): assert tm["kind"] == "tm", f"what? {tm!r}" assert "tools" in tm, f"what? {tm!r}" @@ -1080,7 +1097,7 @@ def record_tms(self, tms): home = os.environ.get("HOME", "") if home: src = str(Path(home) / ".ssh" / "config") - dst = str(self.params.benchmark_run_dir.local / "ssh.config") + dst = str(self.benchmark_run_dir.local / "ssh.config") try: shutil.copyfile(src, dst) except FileNotFoundError: @@ -1090,7 +1107,7 @@ def record_tms(self, tms): # cp -L /etc/ssh/ssh_config ${dir}/ > /dev/null 2>&1 etc_ssh = Path("/etc") / "ssh" src = str(etc_ssh / "ssh_config") - dst = str(self.params.benchmark_run_dir.local / "ssh_config") + dst = str(self.benchmark_run_dir.local / "ssh_config") try: shutil.copyfile(src, dst) except FileNotFoundError: @@ -1127,14 +1144,14 @@ def record_tms(self, tms): self.cp_path, "-rL", "/etc/ssh/ssh_config.d", - f"{self.params.benchmark_run_dir.local}/", + f"{self.benchmark_run_dir.local}/", ], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) - mdlog_name = self.params.benchmark_run_dir.local / "metadata.log" + mdlog_name = self.benchmark_run_dir.local / "metadata.log" mdlog = MetadataLog() try: with mdlog_name.open("r") as fp: @@ -1145,22 +1162,22 @@ def record_tms(self, tms): section = "pbench" mdlog.add_section(section) - mdlog.set(section, "config", self.optional_md["config"]) - mdlog.set(section, "date", self.optional_md["date"]) - mdlog.set(section, "name", self.params.benchmark_run_dir.local.name) + mdlog.set(section, "config", self.optional_md.get("config", "")) + mdlog.set(section, "date", self.optional_md.get("date", "")) + mdlog.set(section, "name", self.benchmark_run_dir.local.name) version, seqno, sha1, hostdata = collect_local_info(self.pbench_bin) rpm_version = f"v{version}-{seqno}g{sha1}" mdlog.set(section, "rpm-version", rpm_version) rpm_versions = dict() rpm_versions[rpm_version] = 1 - mdlog.set(section, "script", self.optional_md["script"]) + mdlog.set(section, "script", self.optional_md.get("script", "")) section = "controller" mdlog.add_section(section) mdlog.set(section, "hostname", self.hostname) for hd_key, hd_val in sorted(hostdata.items()): mdlog.set(section, f"hostname-{hd_key}", hd_val) - mdlog.set(section, "ssh_opts", self.optional_md["ssh_opts"]) + mdlog.set(section, "ssh_opts", self.optional_md.get("ssh_opts", "")) section = "run" mdlog.add_section(section) @@ -1438,7 +1455,7 @@ def execute_action(self, action, directory_str, args, data): # the caller wants to report that it is stopping all the Tool # Meisters due to an interruption (SIGINT or otherwise). # - mdlog_name = self.params.benchmark_run_dir.local / "metadata.log" + mdlog_name = self.benchmark_run_dir.local / "metadata.log" mdlog = MetadataLog() try: with mdlog_name.open("r") as fp: @@ -1457,7 +1474,7 @@ def execute_action(self, action, directory_str, args, data): if args["interrupt"]: # args["interrupt"] == True ==> run / run_interrupted mdlog.set(section, "run_interrupted", "true") - iterations = self.params.benchmark_run_dir.local / ".iterations" + iterations = self.benchmark_run_dir.local / ".iterations" try: iterations_val = iterations.read_text() except FileNotFoundError: @@ -1483,17 +1500,17 @@ def execute_action(self, action, directory_str, args, data): return try: - local_dir = self.params.benchmark_run_dir.validate(directory_str) - except self.params.benchmark_run_dir.Prefix: + local_dir = self.benchmark_run_dir.validate(directory_str) + except self.benchmark_run_dir.Prefix: self.logger.error( "action '%s' with invalid directory, '%s' (not a sub-directory of '%s')", action, directory_str, - self.params.benchmark_run_dir, + self.benchmark_run_dir, ) self._send_client_status(action, "directory not a sub-dir of run directory") return - except self.params.benchmark_run_dir.Exists: + except self.benchmark_run_dir.Exists: self.logger.error( "action '%s' with invalid directory, '%s' (does not exist)", action, @@ -1520,7 +1537,7 @@ def execute_action(self, action, directory_str, args, data): pcp_tools = [] persist_tools = self._tm_tracking[tm]["persistent_tools"] for tool in persist_tools: - tool_data = self.params.tool_metadata.getProperties(tool) + tool_data = self.tool_metadata.getProperties(tool) if tool_data["collector"] == "prometheus": prom_tools.append(tool) elif tool_data["collector"] == "pcp": @@ -1547,10 +1564,10 @@ def execute_action(self, action, directory_str, args, data): if prom_tool_dict: self._prom_server = PromCollector( self.pbench_bin, - self.params.benchmark_run_dir, + self.benchmark_run_dir, self.params.tool_group, prom_tool_dict, - self.params.tool_metadata, + self.tool_metadata, self.tar_path, logger=self.logger, ) @@ -1562,10 +1579,10 @@ def execute_action(self, action, directory_str, args, data): ) self._pcp_server = PcpCollector( self.pbench_bin, - self.params.benchmark_run_dir, + self.benchmark_run_dir, self.params.tool_group, pcp_tool_dict, - self.params.tool_metadata, + self.tool_metadata, self.tar_path, redis_host=self.redis_host, redis_port=self.redis_port, @@ -1879,34 +1896,25 @@ class Arguments(NamedTuple): def driver( - PROG: str, + ext_env: ExternalEnvironment, redis_server: redis.Redis, parsed: Arguments, - pbench_bin: Path, - hostname: str, - tar_path: str, - cp_path: str, tdsp: ToolDataSinkParams, - optional_md: Dict[str, Any], logger: logging.Logger = None, ): """Create and drive a Tool Data Sink instance""" if logger is None: - logger = get_logger(PROG, level=parsed.level) + logger = get_logger(ext_env.PROG, level=parsed.level) logger.debug("params_key (%s): %s", parsed.key, tdsp) try: with ToolDataSink( - pbench_bin, - hostname, - tar_path, - cp_path, + ext_env, redis_server, parsed.host, parsed.port, tdsp, - optional_md, logger, ) as tds_app: tds_app.execute() @@ -1930,15 +1938,10 @@ def driver( def daemon( - PROG: str, + ext_env: ExternalEnvironment, redis_server: redis.Redis, parsed: Arguments, - pbench_bin: Path, - hostname: str, - tar_path: str, - cp_path: str, tdsp: ToolDataSinkParams, - optional_md: Dict[str, Any], ): """Daemonize a Tool Data Sink instance""" # Disconnect any existing connections to the Redis server. @@ -1949,10 +1952,10 @@ def daemon( sys.stderr.flush() sys.stdout.flush() - pidfile_name = f"{PROG}.pid" + pidfile_name = f"{ext_env.PROG}.pid" pfctx = pidfile.PIDFile(pidfile_name) - with open(f"{PROG}.out", "w") as sofp, open( - f"{PROG}.err", "w" + with open(f"{ext_env.PROG}.out", "w") as sofp, open( + f"{ext_env.PROG}.err", "w" ) as sefp, DaemonContext( stdout=sofp, stderr=sefp, @@ -1960,7 +1963,7 @@ def daemon( umask=0o022, pidfile=pfctx, ): - logger = get_logger(PROG, daemon=True, level=parsed.level) + logger = get_logger(ext_env.PROG, daemon=True, level=parsed.level) # We have to re-open the connection to the redis server now that we # are "daemonized". @@ -1978,15 +1981,10 @@ def daemon( else: logger.debug("reconstructed Redis server object") return driver( - PROG, + ext_env, redis_server, parsed, - pbench_bin, - hostname, - tar_path, - cp_path, tdsp, - optional_md, logger=logger, ) @@ -2062,9 +2060,7 @@ def start(prog: Path, parsed: Arguments): # E.g. params = '{ "channel_prefix": "some-prefix", # "benchmark_run_dir": "/loo/goo" }' params = json.loads(params_str) - - benchmark_run_dir = BenchmarkRunDir(params["benchmark_run_dir"], pbench_run) - tdsp = ToolDataSink.fetch_params(params, benchmark_run_dir) + tdsp = ToolDataSink.fetch_params(params) except Exception as ex: print( f"Unable to fetch and decode parameter key, {parsed.key}: {ex}", @@ -2072,19 +2068,21 @@ def start(prog: Path, parsed: Arguments): ) return 6 - optional_md = params.get("optional_md", dict()) + ext_env = ExternalEnvironment( + cp_path=cp_path, + hostname=hostname, + pbench_bin=pbench_bin, + pbench_run=pbench_run, + PROG=PROG, + tar_path=tar_path, + ) func = daemon if parsed.daemonize else driver ret_val = func( - PROG, + ext_env, redis_server, parsed, - pbench_bin, - hostname, - tar_path, - cp_path, tdsp, - optional_md, ) return ret_val