Skip to content

Commit

Permalink
RPC: Console improvements. (#15916)
Browse files Browse the repository at this point in the history
- Add chip device toolbar for viewing device info including;
  VID, PID, SN, Pairing code/discrimator, fabric, node, pair state
- Add raw serial logger for debugging serial communication.
- Decode EFR32 log lines.
- Decode NFR32 log lines.
- Decode NXP log lines.
- Add raw logger for log streams which are not HDLC encoded.
- Add a helper scripts class, which currently just has a script to
  read the descriptor cluster.
  • Loading branch information
rgoliver authored and pull[bot] committed Aug 3, 2023
1 parent 1ee11e1 commit a5a43ac
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 21 deletions.
6 changes: 5 additions & 1 deletion examples/common/pigweed/rpc_console/py/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ pw_python_package("chip_rpc") {
install_requires = [ "ipython" ]
}
}
sources = [ "chip_rpc/console.py" ]
sources = [
"chip_rpc/console.py",
"chip_rpc/plugins/device_toolbar.py",
"chip_rpc/plugins/helper_scripts.py",
]
python_deps = [
"$dir_pw_console/py",
"$dir_pw_hdlc/py",
Expand Down
136 changes: 116 additions & 20 deletions examples/common/pigweed/rpc_console/py/chip_rpc/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,35 @@
rpcs - used to invoke RPCs
device - the serial device used for communication
client - the pw_rpc.Client
scripts - helper scripts for working with chip devices
protos - protocol buffer messages indexed by proto package
An example RPC command:
rpcs.chip.rpc.DeviceCommon.GetDeviceInfo()
"""

import json
import argparse
from typing import Callable
from collections import namedtuple
from inspect import cleandoc
import logging
import functools
import re
import socket
from concurrent.futures import ThreadPoolExecutor
import sys
import threading
from typing import Any, BinaryIO
import socket
from inspect import cleandoc
import serial # type: ignore
import re

from chip_rpc.plugins.device_toolbar import DeviceToolbar
from chip_rpc.plugins.helper_scripts import HelperScripts
import pw_cli.log
from pw_console import PwConsoleEmbed
from pw_console.__main__ import create_temp_log_file
from pw_console.pyserial_wrapper import SerialWithLogging
from pw_hdlc.rpc import HdlcRpcClient, default_channels
from pw_rpc import callback_client
from pw_rpc.console_tools.console import ClientInfo, flattened_rpc_completions


# Protos
from attributes_service import attributes_service_pb2
from button_service import button_service_pb2
Expand Down Expand Up @@ -103,6 +107,11 @@ def _parse_args():
default=sys.stdout.buffer,
help=('The file to which to write device output (HDLC channel 1); '
'provide - or omit for stdout.'))
parser.add_argument(
'-r',
'--raw_serial',
action="store_true",
help=('Use raw serial instead of HDLC/RPC'))
group.add_argument('-s',
'--socket-addr',
type=str,
Expand All @@ -111,12 +120,53 @@ def _parse_args():
return parser.parse_args()


def _start_ipython_terminal(client: HdlcRpcClient) -> None:
def _start_ipython_raw_terminal() -> None:
"""Starts an interactive IPython terminal with preset variables. This raw
terminal does not use HDLC and provides no RPC functionality, this is
just a serial log viewer."""
local_variables = dict(
LOG=_DEVICE_LOG,
)

welcome_message = cleandoc("""
Welcome to the CHIP Console!
This has been started in raw serial mode,
and all RPC functionality is disabled.
Press F1 for help.
""")

interactive_console = PwConsoleEmbed(
global_vars=local_variables,
local_vars=None,
loggers={
'Device Logs': [_DEVICE_LOG],
'Host Logs': [logging.getLogger()],
'Serial Debug': [logging.getLogger('pw_console.serial_debug_logger')],
},
repl_startup_message=welcome_message,
help_text=__doc__,
app_title="CHIP Console",
)

interactive_console.hide_windows('Host Logs')
interactive_console.hide_windows('Serial Debug')

# Setup Python logger propagation
interactive_console.setup_python_logging()
# Don't send device logs to the root logger.
_DEVICE_LOG.propagate = False
interactive_console.embed()


def _start_ipython_hdlc_terminal(client: HdlcRpcClient) -> None:
"""Starts an interactive IPython terminal with preset variables."""
local_variables = dict(
client=client,
channel_client=client.client.channel(1),
rpcs=client.client.channel(1).rpcs,
scripts=HelperScripts(client.client.channel(1).rpcs),
protos=client.protos.packages,
# Include the active pane logger for creating logs in the repl.
LOG=_DEVICE_LOG,
Expand All @@ -132,7 +182,7 @@ def _start_ipython_terminal(client: HdlcRpcClient) -> None:
Press F1 for help.
Example commands:
rpcs.chip.rpc.DeviceCommon.GetDeviceInfo()
rpcs.chip.rpc.Device.GetDeviceInfo()
LOG.warning('Message appears console log window.')
""")
Expand All @@ -143,13 +193,19 @@ def _start_ipython_terminal(client: HdlcRpcClient) -> None:
loggers={
'Device Logs': [_DEVICE_LOG],
'Host Logs': [logging.getLogger()],
'Serial Debug': [logging.getLogger('pw_console.serial_debug_logger')],
},
repl_startup_message=welcome_message,
help_text=__doc__,
app_title="CHIP Console",
)
interactive_console.hide_windows('Host Logs')

interactive_console.add_sentence_completer(completions)
interactive_console.add_bottom_toolbar(
DeviceToolbar(client.client.channel(1).rpcs))

interactive_console.hide_windows('Host Logs')
interactive_console.hide_windows('Serial Debug')

# Setup Python logger propagation
interactive_console.setup_python_logging()
Expand Down Expand Up @@ -183,12 +239,32 @@ def write_to_output(data: bytes,
unused_output: BinaryIO = sys.stdout.buffer,):
log_line = data
RegexStruct = namedtuple('RegexStruct', 'platform type regex match_num')
LEVEL_MAPPING = {"I": logging.INFO, "W": logging.WARNING,
"E": logging.ERROR, "F": logging.FATAL, "V": logging.DEBUG, "D": logging.DEBUG}
LEVEL_MAPPING = {"I": logging.INFO, "W": logging.WARNING, "P": logging.INFO,
"E": logging.ERROR, "F": logging.FATAL, "V": logging.DEBUG, "D": logging.DEBUG,
"<inf>": logging.INFO, "<dbg>": logging.DEBUG, "<err>": logging.ERROR,
"<info >": logging.INFO, "<warn >": logging.WARNING,
"<error >": logging.ERROR, "<detail>": logging.DEBUG}

ESP_CHIP_REGEX = r"(?P<level>[IWEFV]) \((?P<time>\d+)\) (?P<mod>chip\[[a-zA-Z]+\]):\s(?P<msg>.*)"
ESP_APP_REGEX = r"(?P<level>[IWEFVD]) \((?P<time>\d+)\) (?P<mod>[a-z\-_A-Z]+):\s(?P<msg>.*)"

EFR_CHIP_REGEX = r"(?P<level><detail>|<info >|<error >|<warn >)\s(?P<mod>\[[a-zA-Z\-]+\])\s(?P<msg>.*)"
EFR_APP_REGEX = r"<efr32 >\s(?P<msg>.*)"

NRF_CHIP_REGEX = r"\[(?P<time>\d+)\] (?P<level><inf>|<dbg>|<err>) chip.*: \[(?P<mod>[a-z\-A-Z]+)\](?P<msg>.*)"
NRF_APP_REGEX = r"\[(?P<time>\d+)\] (?P<level><inf>|<dbg>|<err>) (?P<msg>.*)"

NXP_CHIP_REGEX = r"\[(?P<time>\d+)\]\[(?P<level>[EPDF])\]\[(?P<mod>[a-z\-A-Z]+)\](?P<msg>.*)"
NXP_APP_REGEX = r"\[(?P<time>\d+)\]\[(?P<mod>[a-z\-A-Z]+)\](?P<msg>.*)"

LogRegexes = [RegexStruct("ESP", "CHIP", re.compile(ESP_CHIP_REGEX), 4),
RegexStruct("ESP", "APP", re.compile(ESP_APP_REGEX), 4)
RegexStruct("ESP", "APP", re.compile(ESP_APP_REGEX), 4),
RegexStruct("EFR", "CHIP", re.compile(EFR_CHIP_REGEX), 3),
RegexStruct("EFR", "APP", re.compile(EFR_APP_REGEX), 1),
RegexStruct("NRF", "CHIP", re.compile(NRF_CHIP_REGEX), 4),
RegexStruct("NRF", "APP", re.compile(NRF_APP_REGEX), 3),
RegexStruct("NXP", "CHIP", re.compile(NXP_CHIP_REGEX), 4),
RegexStruct("NXP", "APP", re.compile(NXP_APP_REGEX), 3)
]
for line in log_line.decode(errors="surrogateescape").splitlines():
fields = {'level': logging.INFO, "time": "",
Expand All @@ -205,8 +281,20 @@ def write_to_output(data: bytes,
"time": fields["time"], "type": fields["type"], "mod": fields["mod"]}})


def _read_raw_serial(read: Callable[[], bytes], output):
"""Continuously read and pass to output."""
with ThreadPoolExecutor() as executor:
while True:
try:
data = read()
except Exception as exc: # pylint: disable=broad-except
continue
if data:
output(data)


def console(device: str, baudrate: int,
socket_addr: str, output: Any) -> int:
socket_addr: str, output: Any, raw_serial: bool) -> int:
"""Starts an interactive RPC console for HDLC."""
# argparse.FileType doesn't correctly handle '-' for binary files.
if output is sys.stdout:
Expand All @@ -215,8 +303,10 @@ def console(device: str, baudrate: int,
logfile = create_temp_log_file()
pw_cli.log.install(logging.INFO, True, False, logfile)

serial_impl = SerialWithLogging

if socket_addr is None:
serial_device = serial.Serial(device, baudrate, timeout=1)
serial_device = serial_impl(device, baudrate, timeout=1)
def read(): return serial_device.read(8192)
write = serial_device.write
else:
Expand All @@ -233,11 +323,17 @@ def read(): return serial_device.read(8192)
default_stream_timeout_s=None,
)

_start_ipython_terminal(
HdlcRpcClient(read, PROTOS, default_channels(write),
lambda data: write_to_output(data, output),
client_impl=callback_client_impl)
)
if raw_serial:
threading.Thread(target=_read_raw_serial,
daemon=True,
args=(read, write_to_output)).start()
_start_ipython_raw_terminal()
else:
_start_ipython_hdlc_terminal(
HdlcRpcClient(read, PROTOS, default_channels(write),
lambda data: write_to_output(data, output),
client_impl=callback_client_impl)
)
return 0


Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" Toolbar plugin for viewing device status in console.
Uses the Device RPC service to read device info and state and display it
in the toolbar whenever the 'Refresh' button is pressed.
"""

from prompt_toolkit.layout import WindowAlign

from pw_console.plugin_mixin import PluginMixin
from pw_console.widgets import ToolbarButton, WindowPaneToolbar


class DeviceToolbar(WindowPaneToolbar, PluginMixin):
"""Toolbar for displaying Matter device info."""
TOOLBAR_HEIGHT = 1

def _format_field(self, name, value):
return [('class:theme-bg-active class:theme-fg-active', '{}:'.format(name)),
('class:theme-bg-active class:theme-fg-cyan', '{}'.format(value)),
('', ' ')]

def _update_toolbar_text(self):
""" Read the device info using RPCs, and populate the toolbar values.
"""
tokens = []
self.plugin_logger.debug('DeviceToolbar _update_toolbar_text')

status, device_info = self.rpcs.chip.rpc.Device.GetDeviceInfo()
if not status.ok():
self.formatted_text = [self._format_field(
"ERROR GetDeviceInfo", status.name)]
return

tokens.extend(self._format_field("VID", device_info.vendor_id))
tokens.extend(self._format_field("PID", device_info.product_id))
tokens.extend(self._format_field("SN", device_info.serial_number))

status, pairing = self.rpcs.chip.rpc.Device.GetPairingState()
if not status.ok():
self.formatted_text = [
self._format_field("ERROR GetPairingState", status.name)]
return

if pairing:
self.formatted_state = ('class:theme-fg-blue', "PAIRING")
tokens.extend(self._format_field(
"Pairing Code", device_info.pairing_info.code))
tokens.extend(self._format_field(
"Pairing Discriminator", device_info.pairing_info.discriminator))
else:
status, device_state = self.rpcs.chip.rpc.Device.GetDeviceState()
if not status.ok():
self.formatted_text = [
self._format_field("ERROR GetDeviceState", status.name)]
return

if len(device_state.fabric_info) == 0 or device_state.fabric_info[0].node_id == 0:
self.formatted_state = ('class:theme-fg-blue', "DISCONNECTED")
else:
self.formatted_state = ('class:theme-fg-blue', "CONNECTED")
# Only show the first fabric info if multiple.
tokens.extend(self._format_field(
"Fabric", device_info.fabric_state[0].fabric_id))
tokens.extend(self._format_field(
"Node", device_info.fabric_state[0].node_id))
self.formatted_text = tokens

def get_left_text_tokens(self):
"""Formatted text to display on the far left side."""
return self.formatted_text

def get_right_text_tokens(self):
"""Formatted text to display on the far right side."""
return [self.formatted_state]

def __init__(self, rpcs, *args, **kwargs):
super().__init__(*args,
center_section_align=WindowAlign.RIGHT,
**kwargs)

self.rpcs = rpcs

self.show_toolbar = True
self.formatted_text = []
self.formatted_state = ('class:theme-fg-blue',
'MATTER Toolbar')

# Buttons for display in the center
self.add_button(
ToolbarButton(description='Refresh',
mouse_handler=self._update_toolbar_text))

# Set plugin options
self.plugin_init(
plugin_logger_name='chip_rpc_device_toolbar',
)
Loading

0 comments on commit a5a43ac

Please sign in to comment.