diff --git a/scripts/pylib/twister/twisterlib/environment.py b/scripts/pylib/twister/twisterlib/environment.py index dcfc98ab4b2041c..2bbdbfc3665dc64 100644 --- a/scripts/pylib/twister/twisterlib/environment.py +++ b/scripts/pylib/twister/twisterlib/environment.py @@ -832,6 +832,18 @@ def parse_arguments(parser, args, options = None, on_init=True): logger.error("west-flash requires device-testing to be enabled") sys.exit(1) + if options.device_serial_pty and options.device_serial_pty == "rtt": + if options.west_flash is None: + logger.error("--device-serial-pty rtt requires --west-flash") + sys.exit(1) + + # add the following options + options.extra_args += ['CONFIG_USE_SEGGER_RTT=y', + 'CONFIG_RTT_CONSOLE=y', 'CONFIG_CONSOLE=y', + # This option is needed to ensure the uart console is not selected + # when CONFIG_RTT_CONSOLE is enabled due to #81798 + 'CONFIG_UART_CONSOLE=n'] + if not options.testsuite_root: # if we specify a test scenario which is part of a suite directly, do # not set testsuite root to default, just point to the test directory diff --git a/scripts/pylib/twister/twisterlib/handlers.py b/scripts/pylib/twister/twisterlib/handlers.py index 1c92e9f5b6dc25d..2fd466d5d06d853 100755 --- a/scripts/pylib/twister/twisterlib/handlers.py +++ b/scripts/pylib/twister/twisterlib/handlers.py @@ -16,6 +16,7 @@ import select import shlex import signal +import stat import subprocess import sys import threading @@ -684,6 +685,23 @@ def _get_serial_device(self, serial_pty, hardware_serial): return serial_device, ser_pty_process + def _create_serial_pty_script(self, runner): + serial_pty = self.build_dir + '/rtt.sh' + + rtt_cmd = f'west -qqqqq rtt -d {self.build_dir} --skip-rebuild --rtt-quiet' + if runner: + rtt_cmd += f' -r {runner}' + + with open(serial_pty, 'w') as f: + f.write(f'#!/bin/sh\n{rtt_cmd}\n') + + st = os.stat(serial_pty) + os.chmod(serial_pty, st.st_mode | stat.S_IEXEC) + + logger.debug(f'RTT command is "{rtt_cmd}"') + + return serial_pty + def handle(self, harness): runner = None hardware = self.get_hardware() @@ -695,9 +713,14 @@ def handle(self, harness): runner = hardware.runner or self.options.west_runner serial_pty = hardware.serial_pty - serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial) + if serial_pty == 'rtt': + serial_pty = self._create_serial_pty_script(runner) + logger.debug(f'Created RTT script {serial_pty}') - logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud") + if not hardware.flash_before: + serial_device, ser_pty_process = self._get_serial_device( + serial_pty, hardware.serial) + logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud") command = self._create_command(runner, hardware) @@ -716,28 +739,27 @@ def handle(self, harness): if hardware.flash_with_test: flash_timeout += self.get_test_timeout() - serial_port = None - if hardware.flash_before is False: - serial_port = serial_device - - try: - ser = self._create_serial_connection( - hardware, - serial_port, - hardware.baud, - flash_timeout, - serial_pty, - ser_pty_process - ) - except serial.SerialException: - return + halt_monitor_evt = None + t = None + if not hardware.flash_before: + try: + ser = self._create_serial_connection( + hardware, + serial_device, + hardware.baud, + flash_timeout, + serial_pty, + ser_pty_process + ) + except serial.SerialException: + return - halt_monitor_evt = threading.Event() + halt_monitor_evt = threading.Event() - t = threading.Thread(target=self.monitor_serial, daemon=True, - args=(ser, halt_monitor_evt, harness)) - start_time = time.time() - t.start() + t = threading.Thread(target=self.monitor_serial, daemon=True, + args=(ser, halt_monitor_evt, harness)) + start_time = time.time() + t.start() d_log = "{}/device.log".format(self.instance.build_dir) logger.debug('Flash command: %s', command) @@ -756,7 +778,8 @@ def handle(self, harness): flash_error = True with open(d_log, "w") as dlog_fp: dlog_fp.write(stderr.decode()) - halt_monitor_evt.set() + if halt_monitor_evt: + halt_monitor_evt.set() except subprocess.TimeoutExpired: logger.warning("Flash operation timed out.") self.terminate(proc) @@ -769,7 +792,8 @@ def handle(self, harness): dlog_fp.write(stderr.decode()) except subprocess.CalledProcessError: - halt_monitor_evt.set() + if halt_monitor_evt: + halt_monitor_evt.set() self.instance.status = TwisterStatus.ERROR self.instance.reason = "Device issue (Flash error)" flash_error = True @@ -780,28 +804,45 @@ def handle(self, harness): timeout = script_param.get("post_flash_timeout", timeout) self.run_custom_script(post_flash_script, timeout) - # Connect to device after flashing it if hardware.flash_before: + serial_device, ser_pty_process = self._get_serial_device( + serial_pty, hardware.serial) + logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud") + try: - logger.debug(f"Attach serial device {serial_device} @ {hardware.baud} baud") - ser.port = serial_device - ser.open() + ser = self._create_serial_connection( + hardware, + serial_device, + hardware.baud, + flash_timeout, + serial_pty, + ser_pty_process + ) except serial.SerialException as e: self._handle_serial_exception(e, hardware, serial_pty, ser_pty_process) return + halt_monitor_evt = threading.Event() + + t = threading.Thread(target=self.monitor_serial, daemon=True, + args=(ser, halt_monitor_evt, harness)) + start_time = time.time() + t.start() + if not flash_error: # Always wait at most the test timeout here after flashing. - t.join(self.get_test_timeout()) + if t: + t.join(self.get_test_timeout()) else: # When the flash error is due exceptions, # twister tell the monitor serial thread # to close the serial. But it is necessary # for this thread being run first and close # have the change to close the serial. - t.join(0.1) + if t: + t.join(0.1) - if t.is_alive(): + if t and t.is_alive(): logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name)) if ser.isOpen():