From 842b24454f8c7191d0c27b5e9c976859a95f77fa Mon Sep 17 00:00:00 2001 From: Octavian Purdila Date: Fri, 13 Oct 2023 09:35:22 +0000 Subject: [PATCH] pw_emu: Add user APIs and the command line interface Add the user APIs, the command line interface, as well as related tests and documentation. Change-Id: I8990acc938dc918331508fc18fb1b2863d9adeed Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/173611 Presubmit-Verified: CQ Bot Account Commit-Queue: Octavian Purdila Reviewed-by: Armando Montanez --- pigweed.json | 4 + pw_cli/py/pw_cli/pw_command_plugins.py | 1 + pw_emu/api.rst | 3 + pw_emu/cli.rst | 8 + pw_emu/design.rst | 4 + pw_emu/py/BUILD.gn | 4 + pw_emu/py/pw_emu/__main__.py | 440 +++++++++++++++++++++++++ pw_emu/py/pw_emu/frontend.py | 328 ++++++++++++++++++ pw_emu/py/tests/cli_test.py | 274 +++++++++++++++ pw_emu/py/tests/common.py | 11 + pw_emu/py/tests/frontend_test.py | 150 +++++++++ 11 files changed, 1227 insertions(+) create mode 100644 pw_emu/py/pw_emu/__main__.py create mode 100644 pw_emu/py/pw_emu/frontend.py create mode 100644 pw_emu/py/tests/cli_test.py create mode 100644 pw_emu/py/tests/frontend_test.py diff --git a/pigweed.json b/pigweed.json index ef29fe0793..8374d91b36 100644 --- a/pigweed.json +++ b/pigweed.json @@ -23,6 +23,10 @@ "module": "pw_system.console", "function": "main" }, + "emu": { + "module": "pw_emu.__main__", + "function": "main" + }, "format": { "module": "pw_presubmit.format_code", "function": "main" diff --git a/pw_cli/py/pw_cli/pw_command_plugins.py b/pw_cli/py/pw_cli/pw_command_plugins.py index 5a27f1f226..cfef10a42e 100644 --- a/pw_cli/py/pw_cli/pw_command_plugins.py +++ b/pw_cli/py/pw_cli/pw_command_plugins.py @@ -31,6 +31,7 @@ def _register_builtin_plugins(registry: plugins.Registry) -> None: # Register these by name to avoid circular dependencies. registry.register_by_name('bloat', 'pw_bloat.__main__', 'main') registry.register_by_name('doctor', 'pw_doctor.doctor', 'main') + registry.register_by_name('emu', 'pw_emu.__main__', 'main') registry.register_by_name('format', 'pw_presubmit.format_code', 'main') registry.register_by_name('keep-sorted', 'pw_presubmit.keep_sorted', 'main') registry.register_by_name('logdemo', 'pw_cli.log', 'main') diff --git a/pw_emu/api.rst b/pw_emu/api.rst index 7271117e73..4ba2a8a705 100644 --- a/pw_emu/api.rst +++ b/pw_emu/api.rst @@ -7,6 +7,9 @@ API reference :name: pw_emu :tagline: Emulators frontend +.. automodule:: pw_emu.frontend + :members: + .. automodule:: pw_emu.core :noindex: :members: diff --git a/pw_emu/cli.rst b/pw_emu/cli.rst index f67140d291..7b2d75b595 100644 --- a/pw_emu/cli.rst +++ b/pw_emu/cli.rst @@ -6,3 +6,11 @@ CLI reference .. pigweed-module-subpage:: :name: pw_emu :tagline: Pigweed emulator frontend + +.. argparse:: + :module: pw_emu.__main__ + :func: get_parser + :prog: pw emu + :nodefaultconst: + :nodescription: + :noepilog: diff --git a/pw_emu/design.rst b/pw_emu/design.rst index 1e0fc41d4b..66c9bace1c 100644 --- a/pw_emu/design.rst +++ b/pw_emu/design.rst @@ -19,6 +19,8 @@ how to access the emulator channels (e.g. socket ports, pty paths) .. mermaid:: graph TD; + TemporaryEmulator & pw_emu_cli[pw emu cli] <--> Emulator + Emulator <--> Launcher & Connector Launcher <--> Handles Connector <--> Handles Launcher <--> Config @@ -28,6 +30,8 @@ how to access the emulator channels (e.g. socket ports, pty paths) The implementation uses the following classes: +* :py:class:`pw_emu.frontend.Emulator`: the user visible API + * :py:class:`pw_emu.core.Launcher`: an abstract class that starts an emulator instance for a given configuration and target diff --git a/pw_emu/py/BUILD.gn b/pw_emu/py/BUILD.gn index d43d50c2bd..c5e426828a 100644 --- a/pw_emu/py/BUILD.gn +++ b/pw_emu/py/BUILD.gn @@ -24,13 +24,17 @@ pw_python_package("py") { ] sources = [ "pw_emu/__init__.py", + "pw_emu/__main__.py", "pw_emu/core.py", + "pw_emu/frontend.py", "pw_emu/pigweed_emulators.py", ] tests = [ "tests/__init__.py", + "tests/cli_test.py", "tests/common.py", "tests/core_test.py", + "tests/frontend_test.py", ] pylintrc = "$dir_pigweed/.pylintrc" mypy_ini = "$dir_pigweed/.mypy.ini" diff --git a/pw_emu/py/pw_emu/__main__.py b/pw_emu/py/pw_emu/__main__.py new file mode 100644 index 0000000000..66ebbf4a01 --- /dev/null +++ b/pw_emu/py/pw_emu/__main__.py @@ -0,0 +1,440 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Command line interface for the Pigweed emulators frontend""" + +import argparse +import json +import os +from pathlib import Path +import signal +import subprocess +import sys +import threading + +from typing import Any + +from pw_emu.core import Error +from pw_emu.frontend import Emulator +from serial import serial_for_url, SerialException +from serial.tools.miniterm import Miniterm, key_description + +_TERM_CMD = ['python', '-m', 'serial', '--raw'] + + +def _cmd_gdb_cmds(emu, args: argparse.Namespace) -> None: + """Run gdb commands in batch mode.""" + + emu.run_gdb_cmds(args.gdb_cmd, executable=args.executable, pause=args.pause) + + +def _cmd_load(emu: Emulator, args: argparse.Namespace) -> None: + """Load an executable image via gdb start executing it if pause is + not set""" + + args.command = ['load'] + _cmd_gdb_cmds(emu, args) + + +def _cmd_start(emu: Emulator, args: argparse.Namespace) -> None: + """Launch the emulator and start executing, unless pause is set.""" + + if args.runner: + emu.set_emu(args.runner) + + emu.start( + target=args.target, + file=args.file, + pause=args.pause, + args=args.args, + debug=args.debug, + foreground=args.foreground, + ) + + +def _get_miniterm(emu: Emulator, chan: str) -> Miniterm: + chan_type = emu.get_channel_type(chan) + if chan_type == 'tcp': + host, port = emu.get_channel_addr(chan) + url = f'socket://[{host}]:{port}' + elif chan_type == 'pty': + url = emu.get_channel_path(chan) + else: + raise Error(f'unknown channel type `{chan_type}`') + ser = serial_for_url(url) + ser.timeout = 1 + miniterm = Miniterm(ser) + miniterm.raw = True + miniterm.set_tx_encoding('UTF-8') + miniterm.set_rx_encoding('UTF-8') + + quit_key = key_description(miniterm.exit_character) + menu_key = key_description(miniterm.menu_character) + help_key = key_description('\x08') + help_desc = f'Help: {menu_key} followed by {help_key} ---' + + print(f'--- Miniterm on {chan} ---') + print(f'--- Quit: {quit_key} | Menu: {menu_key} | {help_desc}') + + # On POSIX systems miniterm uses TIOCSTI to "cancel" the TX thread + # (reading from the console, sending to the serial) which is + # disabled on Linux kernels > 6.2 see + # https://github.com/pyserial/pyserial/issues/243 + # + # On Windows the cancel method does not seem to work either with + # recent win10 versions. + # + # Workaround by terminating the process for exceptions in the read + # and write threads. + threading.excepthook = lambda args: signal.raise_signal(signal.SIGTERM) + + return miniterm + + +def _cmd_run(emu: Emulator, args: argparse.Namespace) -> None: + """Start the emulator and connect the terminal to a channel. Stop + the emulator when exiting the terminal""" + + emu.start( + target=args.target, + file=args.file, + pause=True, + args=args.args, + ) + + ctrl_chans = ['gdb', 'monitor', 'qmp', 'robot'] + if not args.channel: + for chan in emu.get_channels(): + if chan not in ctrl_chans: + args.channel = chan + break + if not args.channel: + raise Error(f'only control channels {ctrl_chans} found') + + try: + miniterm = _get_miniterm(emu, args.channel) + emu.cont() + miniterm.start() + miniterm.join(True) + print('--- exit ---') + miniterm.stop() + miniterm.join() + miniterm.close() + except SerialException as err: + raise Error(f'error connecting to channel `{args.channel}`: {err}') + finally: + emu.stop() + + +def _cmd_restart(emu: Emulator, args: argparse.Namespace) -> None: + """Restart the emulator and start executing, unless pause is set.""" + + if emu.running(): + emu.stop() + _cmd_start(emu, args) + + +def _cmd_stop(emu: Emulator, _args: argparse.Namespace) -> None: + """Stop the emulator""" + + emu.stop() + + +def _cmd_reset(emu: Emulator, _args: argparse.Namespace) -> None: + """Perform a software reset.""" + + emu.reset() + + +def _cmd_gdb(emu: Emulator, args: argparse.Namespace) -> None: + """Start a gdb interactive session""" + + executable = args.executable if args.executable else "" + + signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + cmd = [ + emu.get_gdb_cmd(), + '-ex', + f'target remote {emu.get_gdb_remote()}', + executable, + ] + subprocess.run(cmd) + finally: + signal.signal(signal.SIGINT, signal.SIG_DFL) + + +def _cmd_prop_ls(emu: Emulator, args: argparse.Namespace) -> None: + """List emulator object properties.""" + + props = emu.list_properties(args.path) + print(json.dumps(props, indent=4)) + + +def _cmd_prop_get(emu: Emulator, args: argparse.Namespace) -> None: + """Show the emulator's object properties.""" + + print(emu.get_property(args.path, args.property)) + + +def _cmd_prop_set(emu: Emulator, args: argparse.Namespace) -> None: + """Set emulator's object properties.""" + + emu.set_property(args.path, args.property, args.value) + + +def _cmd_term(emu: Emulator, args: argparse.Namespace) -> None: + """Connect with an interactive terminal to an emulator channel""" + + try: + miniterm = _get_miniterm(emu, args.channel) + miniterm.start() + miniterm.join(True) + print('--- exit ---') + miniterm.stop() + miniterm.join() + miniterm.close() + except SerialException as err: + raise Error(f'error connecting to channel `{args.channel}`: {err}') + + +def get_parser() -> argparse.ArgumentParser: + """Command line parser""" + + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + '-i', + '--instance', + help='instance to use (default: %(default)s)', + type=str, + metavar='STRING', + default='default', + ) + parser.add_argument( + '-C', + '--working-dir', + help='path to working directory (default: %(default)s)', + type=Path, + default=os.getenv('PW_EMU_WDIR'), + ) + parser.add_argument( + '-c', + '--config', + help='path config file (default: %(default)s)', + type=str, + default=None, + ) + + subparsers = parser.add_subparsers(dest='command', required=True) + + def add_cmd(name: str, func: Any) -> argparse.ArgumentParser: + subparser = subparsers.add_parser( + name, description=func.__doc__, help=func.__doc__ + ) + subparser.set_defaults(func=func) + return subparser + + start = add_cmd('start', _cmd_start) + restart = add_cmd('restart', _cmd_restart) + + for subparser in [start, restart]: + subparser.add_argument( + 'target', + type=str, + ) + subparser.add_argument( + '--file', + '-f', + metavar='FILE', + help='file to load before starting', + ) + subparser.add_argument( + '--runner', + '-r', + help='emulator to use, automatically detected if not set', + choices=[None, 'qemu', 'renode'], + default=None, + ) + subparser.add_argument( + '--args', + '-a', + help='options to pass to the emulator', + ) + subparser.add_argument( + '--pause', + '-p', + action='store_true', + help='pause the emulator after starting it', + ) + subparser.add_argument( + '--debug', + '-d', + action='store_true', + help='start the emulator in debug mode', + ) + subparser.add_argument( + '--foreground', + '-F', + action='store_true', + help='start the emulator in foreground mode', + ) + + run = add_cmd('run', _cmd_run) + run.add_argument( + 'target', + type=str, + ) + run.add_argument( + 'file', + metavar='FILE', + help='file to load before starting', + ) + run.add_argument( + '--args', + '-a', + help='options to pass to the emulator', + ) + run.add_argument( + '--channel', + '-n', + help='channel to connect the terminal to', + ) + + stop = add_cmd('stop', _cmd_stop) + + load = add_cmd('load', _cmd_load) + load.add_argument( + 'executable', + metavar='FILE', + help='file to load via gdb', + ) + load.add_argument( + '--pause', + '-p', + help='pause the emulator after loading the file', + action='store_true', + ) + load.add_argument( + '--offset', + '-o', + metavar='ADDRESS', + help='address to load the file at', + ) + + reset = add_cmd('reset', _cmd_reset) + + gdb = add_cmd('gdb', _cmd_gdb) + gdb.add_argument( + '--executable', + '-e', + metavar='FILE', + help='file to use for the debugging session', + ) + + prop_ls = add_cmd('prop-ls', _cmd_prop_ls) + prop_ls.add_argument( + 'path', + help='path of the emulator object', + ) + + prop_get = add_cmd('prop-get', _cmd_prop_get) + prop_get.add_argument( + 'path', + help='path of the emulator object', + ) + prop_get.add_argument( + 'property', + help='name of the object property', + ) + + prop_set = add_cmd('prop-set', _cmd_prop_set) + prop_set.add_argument( + 'path', + help='path of the emulator object', + ) + prop_set.add_argument( + 'property', + help='name of the object property', + ) + prop_set.add_argument( + 'value', + help='value to set for the object property', + ) + + gdb_cmds = add_cmd('gdb-cmds', _cmd_gdb_cmds) + gdb_cmds.add_argument( + '--pause', + '-p', + help='do not resume execution after running the commands', + action='store_true', + ) + gdb_cmds.add_argument( + '--executable', + '-e', + metavar='FILE', + help='executable to use while running the gdb commands', + ) + gdb_cmds.add_argument( + 'gdb_cmd', + nargs='+', + help='gdb command to execute', + ) + + term = add_cmd('term', _cmd_term) + term.add_argument( + 'channel', + help='channel name', + ) + + parser.epilog = f"""commands usage: + {start.format_usage().strip()} + {restart.format_usage().strip()} + {stop.format_usage().strip()} + {run.format_usage().strip()} + {load.format_usage().strip()} + {reset.format_usage().strip()} + {gdb.format_usage().strip()} + {prop_ls.format_usage().strip()} + {prop_get.format_usage().strip()} + {prop_set.format_usage().strip()} + {gdb_cmds.format_usage().strip()} + {term.format_usage().strip()} + """ + + return parser + + +def main() -> int: + """Emulators frontend command line interface.""" + + args = get_parser().parse_args() + if not args.working_dir: + args.working_dir = ( + f'{os.getenv("PW_PROJECT_ROOT")}/.pw_emu/{args.instance}' + ) + + try: + emu = Emulator(args.working_dir, args.config) + args.func(emu, args) + except Error as err: + print(err) + return 1 + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/pw_emu/py/pw_emu/frontend.py b/pw_emu/py/pw_emu/frontend.py new file mode 100644 index 0000000000..8ecf7d3fe4 --- /dev/null +++ b/pw_emu/py/pw_emu/frontend.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""User API""" + +import io +import os +import subprocess +import tempfile + +from pathlib import Path +from typing import Any, Optional, List, Union, Dict + +from pw_emu.core import ( + AlreadyRunning, + Config, + ConfigError, + Connector, + Launcher, + InvalidEmulator, + InvalidChannelType, + NotRunning, +) + + +class Emulator: + """Launches, controls and interacts with an emulator instance.""" + + def __init__(self, wdir: Path, config_path: Optional[Path] = None) -> None: + self._wdir = wdir + self._config_path = config_path + self._connector: Optional[Connector] = None + self._launcher: Optional[Launcher] = None + + def _get_launcher(self, target: str) -> Launcher: + """Returns an emulator for a given target. + + If there are multiple emulators for the same target it will return + an arbitrary emulator launcher. + """ + config = Config(self._config_path) + target_config = config.get( + ['targets', target], + optional=False, + entry_type=dict, + ) + for key in target_config.keys(): + try: + return Launcher.get(key, self._config_path) + except InvalidEmulator: + pass + raise ConfigError( + self._config_path, + f'could not determine emulator for target `{target}`', + ) + + def start( + self, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + foreground: bool = False, + args: Optional[str] = None, + ) -> None: + """Start the emulator for the given target. + + If file is set the emulator will load the file before starting. + + If pause is True the emulator is paused until the debugger is + connected. + + If debug is True the emulator is run in foreground with debug + output enabled. This is useful for seeing errors, traces, etc. + + If foreground is True the emulator is run in foreground otherwise + it is started in daemon mode. This is useful when there is + another process controlling the emulator's life cycle + (e.g. cuttlefish) + + args are passed directly to the emulator + + """ + if self._connector: + raise AlreadyRunning(self._wdir) + + if self._launcher is None: + self._launcher = self._get_launcher(target) + self._connector = self._launcher.start( + wdir=self._wdir, + target=target, + file=file, + pause=pause, + debug=debug, + foreground=foreground, + args=args, + ) + + def _c(self) -> Connector: + if self._connector is None: + self._connector = Connector.get(self._wdir) + if not self.running(): + raise NotRunning(self._wdir) + return self._connector + + def running(self) -> bool: + """Check if the main emulator process is already running.""" + + try: + return self._c().running() + except NotRunning: + return False + + def _path(self, name: Union[Path, str]) -> Union[Path, str]: + """Returns the full path for a given emulator file.""" + + return os.path.join(self._wdir, name) + + def stop(self): + """Stop the emulator.""" + + return self._c().stop() + + def get_gdb_remote(self) -> str: + """Return a string that can be passed to the target remote gdb + command. + + """ + + chan_type = self._c().get_channel_type('gdb') + + if chan_type == 'tcp': + host, port = self._c().get_channel_addr('gdb') + return f'{host}:{port}' + + if chan_type == 'pty': + return self._c().get_channel_path('gdb') + + raise InvalidChannelType(chan_type) + + def get_gdb_cmd(self) -> List[str]: + """Returns the gdb command for current target.""" + return self._c().get_gdb_cmd() + + def run_gdb_cmds( + self, + commands: List[str], + executable: Optional[str] = None, + pause: bool = False, + ) -> subprocess.CompletedProcess: + """Connect to the target and run the given commands silently + in batch mode. + + The executable is optional but it may be required by some gdb + commands. + + If pause is set do not continue execution after running the + given commands. + + """ + + cmd = self._c().get_gdb_cmd().copy() + if not cmd: + raise ConfigError(self._c().get_config_path(), 'gdb not configured') + + cmd.append('-batch-silent') + cmd.append('-ex') + cmd.append(f'target remote {self.get_gdb_remote()}') + for gdb_cmd in commands: + cmd.append('-ex') + cmd.append(gdb_cmd) + if pause: + cmd.append('-ex') + cmd.append('disconnect') + if executable: + cmd.append(executable) + return subprocess.run(cmd, capture_output=True) + + def reset(self) -> None: + """Perform a software reset.""" + self._c().reset() + + def list_properties(self, path: str) -> List[Dict]: + """Returns the property list for an emulator object. + + The object is identified by a full path. The path is target + specific and the format of the path is backend specific. + + qemu path example: /machine/unattached/device[10] + + renode path example: sysbus.uart + + """ + return self._c().list_properties(path) + + def set_property(self, path: str, prop: str, value: Any) -> None: + """Sets the value of an emulator's object property.""" + + self._c().set_property(path, prop, value) + + def get_property(self, path: str, prop: str) -> Any: + """Returns the value of an emulator's object property.""" + + return self._c().get_property(path, prop) + + def get_channel_type(self, name: str) -> str: + """Returns the channel type + + Currently `pty` or `tcp` are the only supported types. + + """ + + return self._c().get_channel_type(name) + + def get_channel_path(self, name: str) -> str: + """Returns the channel path. Raises InvalidChannelType if this + is not a pty channel. + + """ + + return self._c().get_channel_path(name) + + def get_channel_addr(self, name: str) -> tuple: + """Returns a pair of (host, port) for the channel. Raises + InvalidChannelType if this is not a tcp channel. + + """ + + return self._c().get_channel_addr(name) + + def get_channel_stream( + self, + name: str, + timeout: Optional[float] = None, + ) -> io.RawIOBase: + """Returns a file object for a given host exposed device. + + If timeout is None than reads and writes are blocking. If + timeout is zero the stream is operating in non-blocking + mode. Otherwise read and write will timeout after the given + value. + + """ + + return self._c().get_channel_stream(name, timeout) + + def get_channels(self) -> List[str]: + """Returns the list of available channels.""" + + return self._c().get_channels() + + def set_emu(self, emu: str) -> None: + """Set the emulator type for this instance.""" + + self._launcher = Launcher.get(emu, self._config_path) + + def cont(self) -> None: + """Resume the emulator's execution.""" + + self._c().cont() + + +class TemporaryEmulator(Emulator): + """Temporary emulator instances. + + Manages emulator instances that run in temporary working + directories. The emulator instance is stopped and the working + directory is cleared when the with block completes. + + It also supports interoperability with the pw emu cli, i.e. + starting the emulator with the CLI and controlling / interacting + with it from the API. + + Usage example: + + .. code-block:: python + + # programatically start and load an executable then access it + with TemporaryEmulator() as emu: + emu.start(target, file) + with emu.get_channel_stream(chan) as stream: + ... + + .. code-block:: python + + # or start it form the command line then access it + with TemporaryEmulator() as emu: + build.bazel( + ctx, + "run", + exec_path, + "--run_under=pw emu start --file " + ) + with emu.get_channel_stream(chan) as stream: + ... + + """ + + def __init__( + self, + config_path: Optional[Path] = None, + cleanup: bool = True, + ) -> None: + self._temp = tempfile.TemporaryDirectory() + self._cleanup = cleanup + super().__init__(Path(self._temp.name), config_path) + + def __enter__(self): + # Interoperability with pw emu cli. + os.environ["PW_EMU_WDIR"] = self._wdir + return self + + def __exit__(self, exc, value, traceback) -> None: + self.stop() + del os.environ["PW_EMU_WDIR"] + if self._cleanup: + self._temp.cleanup() diff --git a/pw_emu/py/tests/cli_test.py b/pw_emu/py/tests/cli_test.py new file mode 100644 index 0000000000..452f78a129 --- /dev/null +++ b/pw_emu/py/tests/cli_test.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Tests for the command line interface""" + +import os +import signal +import subprocess +import sys +import time +import unittest + +from pathlib import Path +from typing import List + +from mock_emu_frontend import _mock_emu +from tests.common import ConfigHelper + + +# TODO: b/301382004 - The Python Pigweed package install (into python-venv) +# races with running this test and there is no way to add that package as a test +# depedency without creating circular depedencies. This means we can't rely on +# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. +# +# Run the CLI directly instead of going through pw cli. +_cli_path = Path( + os.path.join(os.environ['PW_ROOT'], 'pw_emu', 'py', 'pw_emu', '__main__.py') +).resolve() +# Run the arm_gdb.py wrapper directly. +_arm_none_eabi_gdb_path = Path( + os.path.join( + os.environ['PW_ROOT'], + 'pw_env_setup', + 'py', + 'pw_env_setup', + 'entry_points', + 'arm_gdb.py', + ) +).resolve() + + +class TestCli(ConfigHelper): + """Test non-interactive commands""" + + _config = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'mock-emu': { + 'tcp_channel': True, + 'gdb_channel': True, + }, + 'gdb': _mock_emu + ['--exit', '--'], + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def _build_cmd(self, args: List[str]) -> List[str]: + cmd = [ + 'python', + str(_cli_path), + '--working-dir', + self._wdir.name, + '--config', + self._config_file, + ] + args + return cmd + + def _run(self, args: List[str], **kwargs) -> subprocess.CompletedProcess: + """Run the CLI and wait for completion""" + return subprocess.run(self._build_cmd(args), **kwargs) + + def _popen(self, args: List[str], **kwargs) -> subprocess.Popen: + """Run the CLI in the background""" + return subprocess.Popen(self._build_cmd(args), **kwargs) + + +class TestNonInteractive(TestCli): + """Test non interactive commands.""" + + def setUp(self) -> None: + super().setUp() + self.assertEqual(self._run(['start', 'test-target']).returncode, 0) + + def tearDown(self) -> None: + self.assertEqual(self._run(['stop']).returncode, 0) + super().tearDown() + + def test_already_running(self) -> None: + self.assertNotEqual(self._run(['start', 'test-target']).returncode, 0) + + def test_gdb_cmds(self) -> None: + status = self._run( + ['gdb-cmds', 'show version'], + ) + self.assertEqual(status.returncode, 0) + + def test_prop_ls(self) -> None: + status = self._run(['prop-ls', 'path1'], stdout=subprocess.PIPE) + self.assertEqual(status.returncode, 0) + self.assertTrue('prop1' in status.stdout.decode('ascii')) + status = self._run(['prop-ls', 'invalid path'], stdout=subprocess.PIPE) + self.assertNotEqual(status.returncode, 0) + + def test_prop_get(self) -> None: + status = self._run( + ['prop-get', 'invalid path', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'invalid prop'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + self.assertTrue('val1' in status.stdout.decode('ascii')) + + def test_prop_set(self) -> None: + status = self._run( + ['prop-set', 'invalid path', 'prop1', 'v'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-set', 'path1', 'invalid prop', 'v'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-set', 'path1', 'prop1', 'value'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + self.assertTrue('value' in status.stdout.decode('ascii'), status.stdout) + + def test_reset(self) -> None: + self.assertEqual(self._run(['reset']).returncode, 0) + self.assertTrue(os.path.exists(os.path.join(self._wdir.name, 'reset'))) + + +class TestForeground(TestCli): + """Test starting in foreground""" + + def _test_common(self, cmd) -> None: + # Run the CLI process in a new session so that we can terminate both the + # CLI and the mock emulator it spawns in the foreground. + args = {} + if sys.platform == 'win32': + args['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + else: + args['start_new_session'] = True + proc = self._popen(cmd, stdout=subprocess.PIPE, **args) + assert proc.stdout + output = proc.stdout.readline() + self.assertTrue( + 'starting mock emulator' in output.decode('utf-8'), + output.decode('utf-8'), + ) + if sys.platform == 'win32': + # See https://bugs.python.org/issue26350 + os.kill(proc.pid, signal.CTRL_BREAK_EVENT) + else: + os.kill(-proc.pid, signal.SIGTERM) + proc.wait() + proc.stdout.close() + + def test_foreground(self) -> None: + self._test_common(['start', '--foreground', 'test-target']) + + def test_debug(self) -> None: + self._test_common(['start', '--debug', 'test-target']) + + +class TestInteractive(TestCli): + """Test interactive commands""" + + def setUp(self) -> None: + super().setUp() + self.assertEqual(self._run(['start', 'test-target']).returncode, 0) + + def tearDown(self) -> None: + self.assertEqual(self._run(['stop']).returncode, 0) + super().tearDown() + + @staticmethod + def _read_nonblocking(fd: int, size: int) -> bytes: + try: + return os.read(fd, size) + except BlockingIOError: + return b'' + + def test_term(self) -> None: + """Test the pw emu term command""" + + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + + # pylint: disable=import-outside-toplevel + # Can't import pty on win32. + import pty + + # pylint: disable=no-member + # Avoid pylint false positive on win32. + pid, fd = pty.fork() + if pid == 0: + status = self._run(['term', 'tcp']) + # pylint: disable=protected-access + # Use os._exit instead of os.exit after fork. + os._exit(status.returncode) + else: + expected = '--- Miniterm on tcp ---' + + # Read the expected string with a timeout. + os.set_blocking(fd, False) + deadline = time.monotonic() + 5 + data = self._read_nonblocking(fd, len(expected)) + while len(data) < len(expected): + time.sleep(0.1) + data += self._read_nonblocking(fd, len(expected) - len(data)) + if time.monotonic() > deadline: + break + self.assertTrue( + expected in data.decode('ascii'), + data + self._read_nonblocking(fd, 100), + ) + + # send CTRL + ']' to terminate miniterm + os.write(fd, b'\x1d') + + # wait for the process to exit, with a timeout + deadline = time.monotonic() + 5 + wait_pid, ret = os.waitpid(pid, os.WNOHANG) + while wait_pid == 0: + time.sleep(0.1) + # Discard input to avoid writer hang on MacOS, + # see https://github.com/python/cpython/issues/97001. + try: + self._read_nonblocking(fd, 100) + except OSError: + # Avoid read errors when the child pair of the pty + # closes when the child terminates. + pass + wait_pid, ret = os.waitpid(pid, os.WNOHANG) + if time.monotonic() > deadline: + break + self.assertEqual(wait_pid, pid) + self.assertEqual(ret, 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/pw_emu/py/tests/common.py b/pw_emu/py/tests/common.py index f0dcc3a3db..341a61ccb7 100644 --- a/pw_emu/py/tests/common.py +++ b/pw_emu/py/tests/common.py @@ -19,8 +19,11 @@ import tempfile import unittest +from pathlib import Path from typing import Any, Optional, Dict +from pw_emu.frontend import Emulator + class ConfigHelper(unittest.TestCase): """Helper that setups and tears down the configuration file""" @@ -39,3 +42,11 @@ def setUp(self) -> None: def tearDown(self) -> None: self._wdir.cleanup() os.unlink(self._config_file) + + +class ConfigHelperWithEmulator(ConfigHelper): + """Helper that setups and tears down the configuration file""" + + def setUp(self) -> None: + super().setUp() + self._emu = Emulator(Path(self._wdir.name), Path(self._config_file)) diff --git a/pw_emu/py/tests/frontend_test.py b/pw_emu/py/tests/frontend_test.py new file mode 100644 index 0000000000..36ea3d7b22 --- /dev/null +++ b/pw_emu/py/tests/frontend_test.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Emulator API tests.""" + +import unittest + +from typing import Any, Dict + +from pw_emu.core import ( + ConfigError, + InvalidChannelType, + InvalidProperty, + InvalidPropertyPath, +) +from mock_emu_frontend import _mock_emu +from tests.common import ConfigHelperWithEmulator + + +class TestEmulator(ConfigHelperWithEmulator): + """Test Emulator APIs.""" + + _config = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'mock-emu': { + 'gdb_channel': True, + }, + 'gdb': _mock_emu + ['--exit', '--'], + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._emu.start('test-target') + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_gdb_target_remote(self) -> None: + output = self._emu.run_gdb_cmds([]).stdout + host, port = self._emu.get_channel_addr('gdb') + self.assertTrue(f'{host}:{port}' in output.decode('utf-8')) + + def test_gdb_commands(self) -> None: + output = self._emu.run_gdb_cmds(['test_gdb_cmd']).stdout + self.assertTrue( + output and '-ex test_gdb_cmd' in output.decode('utf-8'), output + ) + + def test_gdb_executable(self) -> None: + output = self._emu.run_gdb_cmds([], 'test_gdb_exec').stdout + self.assertTrue('test_gdb_exec' in output.decode('utf-8')) + + def test_gdb_pause(self) -> None: + output = self._emu.run_gdb_cmds([], pause=True).stdout + self.assertTrue('-ex disconnect' in output.decode('utf-8')) + + # Minimal testing for APIs that are straight wrappers over Connector APIs. + def test_running(self) -> None: + self.assertTrue(self._emu.running()) + + def test_reset(self) -> None: + self._emu.reset() + + def test_cont(self) -> None: + self._emu.cont() + + def test_list_properties(self) -> None: + with self.assertRaises(InvalidPropertyPath): + self._emu.list_properties('invalid path') + self.assertEqual(self._emu.list_properties('path1'), ['prop1']) + + def test_get_property(self) -> None: + with self.assertRaises(InvalidProperty): + self._emu.get_property('path1', 'invalid property') + with self.assertRaises(InvalidPropertyPath): + self._emu.get_property('invalid path', 'prop1') + self.assertEqual(self._emu.get_property('path1', 'prop1'), 'val1') + + def test_set_property(self) -> None: + with self.assertRaises(InvalidPropertyPath): + self._emu.set_property('invalid path', 'prop1', 'test') + with self.assertRaises(InvalidProperty): + self._emu.set_property('path1', 'invalid property', 'test') + self._emu.set_property('path1', 'prop1', 'val2') + self.assertEqual(self._emu.get_property('path1', 'prop1'), 'val2') + + def test_get_channel_type(self) -> None: + self.assertEqual(self._emu.get_channel_type('gdb'), 'tcp') + + def test_get_channel_path(self) -> None: + with self.assertRaises(InvalidChannelType): + self._emu.get_channel_path('gdb') + + def test_get_channel_addr(self) -> None: + self.assertEqual(len(self._emu.get_channel_addr('gdb')), 2) + + def test_channel_stream(self) -> None: + with self._emu.get_channel_stream('gdb') as _: + pass + + +class TestGdbEmptyConfig(ConfigHelperWithEmulator): + """Check that ConfigError is raised when running gdb with an empty + gdb config. + + """ + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._emu.start('test-target') + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_gdb_config_error(self) -> None: + with self.assertRaises(ConfigError): + self._emu.run_gdb_cmds([]) + + +if __name__ == '__main__': + unittest.main()