Skip to content

Commit

Permalink
twister: support segger real-time-transfer (rtt) for serial_pty
Browse files Browse the repository at this point in the history
Support RTT (Segger Real-time Transfer) for reading console
messages with twister when testing with hardware.

Tested with:

twister -p nucleo_l496zg --device-testing --flash-before \
  --west-runner openocd --west-flash --device-serial-pty rtt \
  -T samples/hello_world

cat twister-out/nucleo_l496zg_stm32l496xx/samples/hello_world/\
sample.basic.helloworld/handler.log
*** Booting Zephyr OS build v4.0.0-749-gc9e567da3747 ***
Hello World! nucleo_l496zg/stm32l496xx

Signed-off-by: Chris Friedt <[email protected]>
  • Loading branch information
Chris Friedt committed Nov 26, 2024
1 parent e81d766 commit 04de997
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 31 deletions.
12 changes: 12 additions & 0 deletions scripts/pylib/twister/twisterlib/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,18 @@ def parse_arguments(parser, args, options = None, on_init=True):
logger.error("west-flash requires device-testing to be enabled")
sys.exit(1)

if options.device_serial_pty and options.device_serial_pty == "rtt":
if options.west_flash is None:
logger.error("--device-serial-pty rtt requires --west-flash")
sys.exit(1)

# add the following options
options.extra_args += ['CONFIG_USE_SEGGER_RTT=y',
'CONFIG_RTT_CONSOLE=y', 'CONFIG_CONSOLE=y',
# This option is needed to ensure the uart console is not selected
# when CONFIG_RTT_CONSOLE is enabled due to #81798
'CONFIG_UART_CONSOLE=n']

if not options.testsuite_root:
# if we specify a test scenario which is part of a suite directly, do
# not set testsuite root to default, just point to the test directory
Expand Down
103 changes: 72 additions & 31 deletions scripts/pylib/twister/twisterlib/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import select
import shlex
import signal
import stat
import subprocess
import sys
import threading
Expand Down Expand Up @@ -684,6 +685,23 @@ def _get_serial_device(self, serial_pty, hardware_serial):

return serial_device, ser_pty_process

def _create_serial_pty_script(self, runner):
serial_pty = self.build_dir + '/rtt.sh'

rtt_cmd = f'west -qqqqq rtt -d {self.build_dir} --skip-rebuild --rtt-quiet'
if runner:
rtt_cmd += f' -r {runner}'

with open(serial_pty, 'w') as f:
f.write(f'#!/bin/sh\n{rtt_cmd}\n')

st = os.stat(serial_pty)
os.chmod(serial_pty, st.st_mode | stat.S_IEXEC)

logger.debug(f'RTT command is "{rtt_cmd}"')

return serial_pty

def handle(self, harness):
runner = None
hardware = self.get_hardware()
Expand All @@ -695,9 +713,14 @@ def handle(self, harness):
runner = hardware.runner or self.options.west_runner
serial_pty = hardware.serial_pty

serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial)
if serial_pty == 'rtt':
serial_pty = self._create_serial_pty_script(runner)
logger.debug(f'Created RTT script {serial_pty}')

logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")
if not hardware.flash_before:
serial_device, ser_pty_process = self._get_serial_device(
serial_pty, hardware.serial)
logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")

command = self._create_command(runner, hardware)

Expand All @@ -716,28 +739,27 @@ def handle(self, harness):
if hardware.flash_with_test:
flash_timeout += self.get_test_timeout()

serial_port = None
if hardware.flash_before is False:
serial_port = serial_device

try:
ser = self._create_serial_connection(
hardware,
serial_port,
hardware.baud,
flash_timeout,
serial_pty,
ser_pty_process
)
except serial.SerialException:
return
halt_monitor_evt = None
t = None
if not hardware.flash_before:
try:
ser = self._create_serial_connection(
hardware,
serial_device,
hardware.baud,
flash_timeout,
serial_pty,
ser_pty_process
)
except serial.SerialException:
return

halt_monitor_evt = threading.Event()
halt_monitor_evt = threading.Event()

t = threading.Thread(target=self.monitor_serial, daemon=True,
args=(ser, halt_monitor_evt, harness))
start_time = time.time()
t.start()
t = threading.Thread(target=self.monitor_serial, daemon=True,
args=(ser, halt_monitor_evt, harness))
start_time = time.time()
t.start()

d_log = "{}/device.log".format(self.instance.build_dir)
logger.debug('Flash command: %s', command)
Expand All @@ -756,7 +778,8 @@ def handle(self, harness):
flash_error = True
with open(d_log, "w") as dlog_fp:
dlog_fp.write(stderr.decode())
halt_monitor_evt.set()
if halt_monitor_evt:
halt_monitor_evt.set()
except subprocess.TimeoutExpired:
logger.warning("Flash operation timed out.")
self.terminate(proc)
Expand All @@ -769,7 +792,8 @@ def handle(self, harness):
dlog_fp.write(stderr.decode())

except subprocess.CalledProcessError:
halt_monitor_evt.set()
if halt_monitor_evt:
halt_monitor_evt.set()
self.instance.status = TwisterStatus.ERROR
self.instance.reason = "Device issue (Flash error)"
flash_error = True
Expand All @@ -780,28 +804,45 @@ def handle(self, harness):
timeout = script_param.get("post_flash_timeout", timeout)
self.run_custom_script(post_flash_script, timeout)

# Connect to device after flashing it
if hardware.flash_before:
serial_device, ser_pty_process = self._get_serial_device(
serial_pty, hardware.serial)
logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")

try:
logger.debug(f"Attach serial device {serial_device} @ {hardware.baud} baud")
ser.port = serial_device
ser.open()
ser = self._create_serial_connection(
hardware,
serial_device,
hardware.baud,
flash_timeout,
serial_pty,
ser_pty_process
)
except serial.SerialException as e:
self._handle_serial_exception(e, hardware, serial_pty, ser_pty_process)
return

halt_monitor_evt = threading.Event()

t = threading.Thread(target=self.monitor_serial, daemon=True,
args=(ser, halt_monitor_evt, harness))
start_time = time.time()
t.start()

if not flash_error:
# Always wait at most the test timeout here after flashing.
t.join(self.get_test_timeout())
if t:
t.join(self.get_test_timeout())
else:
# When the flash error is due exceptions,
# twister tell the monitor serial thread
# to close the serial. But it is necessary
# for this thread being run first and close
# have the change to close the serial.
t.join(0.1)
if t:
t.join(0.1)

if t.is_alive():
if t and t.is_alive():
logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))

if ser.isOpen():
Expand Down

0 comments on commit 04de997

Please sign in to comment.