From b532bdac304158b4890df8e2d0013d6dc03e8417 Mon Sep 17 00:00:00 2001 From: "regicidal.plutophage" <36969337+regicidalplutophage@users.noreply.github.com> Date: Wed, 18 Dec 2024 20:30:18 +0300 Subject: [PATCH] hid watchdog --- kmk/hid.py | 806 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 494 insertions(+), 312 deletions(-) diff --git a/kmk/hid.py b/kmk/hid.py index 7d65409a2..2817ffc28 100644 --- a/kmk/hid.py +++ b/kmk/hid.py @@ -1,371 +1,553 @@ -import supervisor -import usb_hid -from micropython import const - -from storage import getmount - -from kmk.keys import ConsumerKey, KeyboardKey, ModifierKey, MouseKey -from kmk.utils import Debug, clamp - try: - from adafruit_ble import BLERadio - from adafruit_ble.advertising.standard import ProvideServicesAdvertisement - from adafruit_ble.services.standard.hid import HIDService + from typing import Callable, Optional except ImportError: - # BLE not supported on this platform pass +from collections import namedtuple +from keypad import Event as KeyEvent + +from kmk.hid import BLEHID, USBHID, AbstractHID, HIDModes +from kmk.keys import KC, Key +from kmk.modules import Module +from kmk.scanners.keypad import MatrixScanner +from kmk.scheduler import Task, cancel_task, create_task, get_due_task +from kmk.utils import Debug + +debug = Debug('kmk.keyboard') + +KeyBufferFrame = namedtuple( + 'KeyBufferFrame', ('key', 'is_pressed', 'int_coord', 'index') +) + + +def debug_error(module, message: str, error: Exception): + if debug.enabled: + debug( + message, ': ', error.__class__.__name__, ': ', error, name=module.__module__ + ) + + +class Sandbox: + matrix_update = None + secondary_matrix_update = None + active_layers = None + + +class KMKKeyboard: + def __init__( + self, + keymap=[], + coord_mapping=None, + matrix=None, + modules=[], + extensions=[], + ): + self.keymap = keymap + self.coord_mapping = coord_mapping + self.matrix = matrix + self.modules = modules + self.extensions = extensions + + ##### + # User-configurable + keymap = [] + coord_mapping = None + + row_pins = None + col_pins = None + diode_orientation = None + matrix = None + + modules = [] + extensions = [] + sandbox = Sandbox() + + ##### + # Internal State + keys_pressed = set() + axes = set() + _coordkeys_pressed = {} + hid_type = HIDModes.USB + secondary_hid_type = None + _hid_helper = None + _hid_send_enabled = False + hid_pending = False + matrix_update = None + secondary_matrix_update = None + matrix_update_queue = [] + _trigger_powersave_enable = False + _trigger_powersave_disable = False + _go_args = None + _resume_buffer = [] + _resume_buffer_x = [] + + # this should almost always be PREpended to, replaces + # former use of reversed_active_layers which had pointless + # overhead (the underlying list was never used anyway) + active_layers = [0] + + def __repr__(self) -> str: + return self.__class__.__name__ + + def _send_hid(self) -> None: + if not self._hid_send_enabled: + return -debug = Debug(__name__) - - -class HIDModes: - NOOP = 0 # currently unused; for testing? - USB = 1 - BLE = 2 - - ALL_MODES = (NOOP, USB, BLE) - - -class HIDReportTypes: - KEYBOARD = 1 - MOUSE = 2 - CONSUMER = 3 - SYSCONTROL = 4 - - -class HIDUsage: - KEYBOARD = 0x06 - MOUSE = 0x02 - CONSUMER = 0x01 - SYSCONTROL = 0x80 - - -class HIDUsagePage: - CONSUMER = 0x0C - KEYBOARD = MOUSE = SYSCONTROL = 0x01 - - -HID_REPORT_SIZES = { - HIDReportTypes.KEYBOARD: 8, - HIDReportTypes.MOUSE: 4, - HIDReportTypes.CONSUMER: 2, - HIDReportTypes.SYSCONTROL: 8, # TODO find the correct value for this -} - - -class AbstractHID: - REPORT_BYTES = 8 - - def __init__(self, **kwargs): - - self._evt = bytearray(self.REPORT_BYTES) - self._evt[0] = HIDReportTypes.KEYBOARD - self._nkro = False + if debug.enabled: + if self.keys_pressed: + debug('keys_pressed=', self.keys_pressed) + if self.axes: + debug('axes=', self.axes) - # bodgy NKRO autodetect + self._hid_helper.create_report(self.keys_pressed, self.axes) try: - self.hid_send(self._evt) - if debug.enabled: - debug('use 6KRO') - except ValueError: - self.REPORT_BYTES = 17 - self._evt = bytearray(self.REPORT_BYTES) - self._evt[0] = HIDReportTypes.KEYBOARD - self._nkro = True - if debug.enabled: - debug('use NKRO') - - self._prev_evt = bytearray(self.REPORT_BYTES) + self._hid_helper.send() + except Exception as err: + debug_error(self._hid_helper, 'send', err) - # Landmine alert for HIDReportTypes.KEYBOARD: byte index 1 of this view - # is "reserved" and evidently (mostly?) unused. However, other modes (or - # at least consumer, so far) will use this byte, which is the main reason - # this view exists. For KEYBOARD, use report_mods and report_non_mods - self.report_keys = memoryview(self._evt)[1:] + self.hid_pending = False - self.report_mods = memoryview(self._evt)[1:2] - self.report_non_mods = memoryview(self._evt)[3:] + for axis in self.axes: + axis.move(self, 0) - self._cc_report = bytearray(HID_REPORT_SIZES[HIDReportTypes.CONSUMER] + 1) - self._cc_report[0] = HIDReportTypes.CONSUMER - self._cc_pending = False + def _handle_matrix_report(self, kevent: KeyEvent) -> None: + if kevent is not None: + self._on_matrix_changed(kevent) - self._pd_report = bytearray(HID_REPORT_SIZES[HIDReportTypes.MOUSE] + 1) - self._pd_report[0] = HIDReportTypes.MOUSE - self._pd_pending = False - - # bodgy pointing device panning autodetect + def _find_key_in_map(self, int_coord: int) -> Key: try: - self.hid_send(self._pd_report) - if debug.enabled: - debug('use no pan') + idx = self.coord_mapping.index(int_coord) except ValueError: - self._pd_report = bytearray(6) - self._pd_report[0] = HIDReportTypes.MOUSE if debug.enabled: - debug('use pan') - except KeyError: - if debug.enabled: - debug('mouse disabled') - - def __repr__(self): - return f'{self.__class__.__name__}(REPORT_BYTES={self.REPORT_BYTES})' - - def create_report(self, keys_pressed, axes): - self.clear_all() - - for key in keys_pressed: - if isinstance(key, KeyboardKey): - self.add_key(key) - elif isinstance(key, ModifierKey): - self.add_modifier(key) - elif isinstance(key, ConsumerKey): - self.add_cc(key) - elif isinstance(key, MouseKey): - self.add_pd(key) - - for axis in axes: - self.move_axis(axis) - - def hid_send(self, evt): - # Don't raise a NotImplementedError so this can serve as our "dummy" HID - # when MCU/board doesn't define one to use (which should almost always be - # the CircuitPython-targeting one, except when unit testing or doing - # something truly bizarre. This will likely change eventually when Bluetooth - # is added) - pass - - def send(self): - if self._evt != self._prev_evt: - self._prev_evt[:] = self._evt - self.hid_send(self._evt) - - if self._cc_pending: - self.hid_send(self._cc_report) - self._cc_pending = False - - if self._pd_pending: - self.hid_send(self._pd_report) - self._pd_pending = False + debug('no such int_coord: ', int_coord) + return None + + key = None + for layer in self.active_layers: + try: + key = self.keymap[layer][idx] + except IndexError: + if debug.enabled: + debug('keymap IndexError: idx=', idx, ' layer=', layer) + + if key and key != KC.TRNS: + break + + return key + + def _on_matrix_changed(self, kevent: KeyEvent) -> None: + int_coord = kevent.key_number + is_pressed = kevent.pressed + + key = None + if not is_pressed: + try: + key = self._coordkeys_pressed[int_coord] + except KeyError: + if debug.enabled: + debug('release w/o press: ', int_coord) + + if key is None: + key = self._find_key_in_map(int_coord) + + if key is None: + return - return self + if debug.enabled: + debug(kevent, ': ', key) - def clear_all(self): - for idx, _ in enumerate(self.report_keys): - self.report_keys[idx] = 0x00 + self.pre_process_key(key, is_pressed, int_coord) - self.remove_cc() - self.remove_pd() - self.clear_axis() + def _process_resume_buffer(self): + ''' + Resume the processing of buffered, delayed, deferred, etc. key events + emitted by modules. - return self + We use a copy of the `_resume_buffer` as a working buffer. The working + buffer holds all key events in the correct order for processing. If + during processing new events are pushed to the `_resume_buffer`, they + are prepended to the working buffer (which may not be emptied), in + order to preserve key event order. + We also double-buffer `_resume_buffer` with `_resume_buffer_x`, only + copying the reference to hopefully safe some time on allocations. + ''' - def clear_non_modifiers(self): - for idx, _ in enumerate(self.report_non_mods): - self.report_non_mods[idx] = 0x00 + buffer, self._resume_buffer = self._resume_buffer, self._resume_buffer_x - return self + while buffer: + ksf = buffer.pop(0) + key = ksf.key - def add_modifier(self, modifier): - if isinstance(modifier, ModifierKey): - self.report_mods[0] |= modifier.code - else: - self.report_mods[0] |= modifier + # Handle any unaccounted-for layer shifts by looking up the key resolution again. + if ksf.int_coord is not None: + key = self._find_key_in_map(ksf.int_coord) - return self + # Resume the processing of the key event and update the HID report + # when applicable. + self.pre_process_key(key, ksf.is_pressed, ksf.int_coord, ksf.index) - def remove_modifier(self, modifier): - if isinstance(modifier, ModifierKey): - self.report_mods[0] ^= modifier.code - else: - self.report_mods[0] ^= modifier + if self.hid_pending: + self._send_hid() + self.hid_pending = False - return self + # Any newly buffered key events must be prepended to the working + # buffer. + if self._resume_buffer: + self._resume_buffer.extend(buffer) + buffer.clear() + buffer, self._resume_buffer = self._resume_buffer, buffer - def add_key(self, key): - if not self._nkro: - # Try to find the first empty slot in the key report, and fill it - idx = self._evt.find(b'\x00', 3) + self._resume_buffer_x = buffer - if idx < len(self._evt): - self._evt[idx] = key.code + @property + def debug_enabled(self) -> bool: + return debug.enabled + + @debug_enabled.setter + def debug_enabled(self, enabled: bool): + debug.enabled = enabled + + def pre_process_key( + self, + key: Key, + is_pressed: bool, + int_coord: Optional[int] = None, + index: int = 0, + ) -> None: + for module in self.modules[index:]: + try: + key = module.process_key(self, key, is_pressed, int_coord) + if key is None: + break + except Exception as err: + debug_error(module, 'process_key', err) + + if int_coord is not None: + if is_pressed: + self._coordkeys_pressed[int_coord] = key else: - # TODO what do we do here?...... - pass - else: - self.report_keys[(key.code >> 3) + 1] |= 1 << (key.code & 0x07) - - def remove_key(self, key): - if not self._nkro: - code = key.code.to_bytes(1, 'little') - idx = self._evt.find(code, 3) - self._evt[idx] = 0x00 - else: - self.report_keys[(key.code >> 3) + 1] &= ~(1 << (key.code & 0x07)) - - def add_cc(self, cc): - # Add (or write over) consumer control report. There can only be one CC - # active at any time. - memoryview(self._cc_report)[1:3] = cc.code.to_bytes(2, 'little') - self._cc_pending = True - - def remove_cc(self): - # Remove consumer control report. - report = memoryview(self._cc_report)[1:3] - if report != b'\x00\x00': - report[:] = b'\x00\x00' - self._cc_pending = True - - def add_pd(self, key): - self._pd_report[1] |= key.code - self._pd_pending = True - - def remove_pd(self): - if self._pd_report[1]: - self._pd_pending = True - self._pd_report[1] = 0x00 - - def move_axis(self, axis): - delta = clamp(axis.delta, -127, 127) - axis.delta -= delta - try: - self._pd_report[axis.code + 2] = 0xFF & delta - self._pd_pending = True - except IndexError: + try: + del self._coordkeys_pressed[int_coord] + except KeyError: + if debug.enabled: + debug('release w/o press:', int_coord) if debug.enabled: - debug('Axis(', axis.code, ') not supported') + debug('coordkeys_pressed=', self._coordkeys_pressed) - def clear_axis(self): - for idx in range(2, len(self._pd_report)): - self._pd_report[idx] = 0x00 + if key: + self.process_key(key, is_pressed, int_coord) - def has_key(self, key): - if isinstance(key, ModifierKey): - return bool(self.report_mods[0] & key.code) + def process_key( + self, key: Key, is_pressed: bool, int_coord: Optional[int] = None + ) -> None: + if is_pressed: + key.on_press(self, int_coord) else: - if not self._nkro: - code = key.code.to_bytes(1, 'little') - return self.report_non_mods.find(code) > 0 - else: - part = self.report_keys[(key.code >> 3) + 1] - return bool(part & (1 << (key.code & 0x07))) - return False - - -class USBHID(AbstractHID): - REPORT_BYTES = 9 - - def __init__(self, **kwargs): - - self.devices = {} - - for device in usb_hid.devices: - us = device.usage - up = device.usage_page + key.on_release(self, int_coord) + + def resume_process_key( + self, + module: Module, + key: Key, + is_pressed: bool, + int_coord: Optional[int] = None, + reprocess: Optional[bool] = False, + ) -> None: + index = self.modules.index(module) + (0 if reprocess else 1) + ksf = KeyBufferFrame( + key=key, is_pressed=is_pressed, int_coord=int_coord, index=index + ) + self._resume_buffer.append(ksf) + + def remove_key(self, keycode: Key) -> None: + self.process_key(keycode, False) + + def add_key(self, keycode: Key) -> None: + self.process_key(keycode, True) + + def tap_key(self, keycode: Key) -> None: + self.add_key(keycode) + # On the next cycle, we'll remove the key. + self.set_timeout(0, lambda: self.remove_key(keycode)) + + def set_timeout(self, after_ticks: int, callback: Callable[[None], None]) -> [Task]: + return create_task(callback, after_ms=after_ticks) + + def cancel_timeout(self, timeout_key: int) -> None: + cancel_task(timeout_key) + + def _process_timeouts(self) -> None: + for task in get_due_task(): + task() + + def _init_coord_mapping(self) -> None: + ''' + Attempt to sanely guess a coord_mapping if one is not provided. No-op + if `kmk.extensions.split.Split` is used, it provides equivalent + functionality in `on_bootup` + + To save RAM on boards that don't use Split, we don't import Split + and do an isinstance check, but instead do string detection + ''' + if any(x.__class__.__module__ == 'kmk.modules.split' for x in self.modules): + return - if up == HIDUsagePage.CONSUMER and us == HIDUsage.CONSUMER: - self.devices[HIDReportTypes.CONSUMER] = device - elif up == HIDUsagePage.KEYBOARD and us == HIDUsage.KEYBOARD: - self.devices[HIDReportTypes.KEYBOARD] = device - elif up == HIDUsagePage.MOUSE and us == HIDUsage.MOUSE: - self.devices[HIDReportTypes.MOUSE] = device - elif up == HIDUsagePage.SYSCONTROL and us == HIDUsage.SYSCONTROL: - self.devices[HIDReportTypes.SYSCONTROL] = device + if not self.coord_mapping: + cm = [] + for m in self.matrix: + cm.extend(m.coord_mapping) + self.coord_mapping = tuple(cm) + + def _init_hid(self) -> None: + if self.hid_type == HIDModes.NOOP: + self._hid_helper = AbstractHID + elif self.hid_type == HIDModes.USB: + self._hid_helper = USBHID + elif self.hid_type == HIDModes.BLE: + self._hid_helper = BLEHID + else: + self._hid_helper = AbstractHID + self._hid_helper = self._hid_helper(**self._go_args) + self._hid_send_enabled = True - super().__init__(**kwargs) + if debug.enabled: + debug('hid=', self._hid_helper) - def hid_send(self, evt): - if not supervisor.runtime.usb_connected: - return + def _deinit_hid(self) -> None: + self._hid_helper.clear_all() + self._hid_helper.send() - # int, can be looked up in HIDReportTypes - reporting_device_const = evt[0] + def _init_matrix(self) -> None: + if self.matrix is None: + self.matrix = MatrixScanner( + column_pins=self.col_pins, + row_pins=self.row_pins, + columns_to_anodes=self.diode_orientation, + ) - return self.devices[reporting_device_const].send_report(evt[1:]) + try: + self.matrix = tuple(iter(self.matrix)) + offset = 0 + for matrix in self.matrix: + matrix.offset = offset + offset += matrix.key_count + except TypeError: + self.matrix = (self.matrix,) + + if debug.enabled: + debug('matrix=', [_.__class__.__name__ for _ in self.matrix]) + + def during_bootup(self) -> None: + # Modules and extensions that fail `during_bootup` get removed from + # their respective lists. This serves as a self-check mechanism; any + # modules or extensions that initialize peripherals or data structures + # should do that in `during_bootup`. + for idx, module in enumerate(self.modules): + try: + module.during_bootup(self) + except Exception as err: + debug_error(module, 'during_bootup', err) + self.modules[idx] = None + + self.modules[:] = [_ for _ in self.modules if _] + + if debug.enabled: + debug('modules=', [_.__class__.__name__ for _ in self.modules]) + + for idx, ext in enumerate(self.extensions): + try: + ext.during_bootup(self) + except Exception as err: + debug_error(ext, 'during_bootup', err) + self.extensions[idx] = None + + self.extensions[:] = [_ for _ in self.extensions if _] + + if debug.enabled: + debug('extensions=', [_.__class__.__name__ for _ in self.extensions]) + + def before_matrix_scan(self) -> None: + for module in self.modules: + try: + module.before_matrix_scan(self) + except Exception as err: + debug_error(module, 'before_matrix_scan', err) + + for ext in self.extensions: + try: + ext.before_matrix_scan(self.sandbox) + except Exception as err: + debug_error(ext, 'before_matrix_scan', err) + + def after_matrix_scan(self) -> None: + for module in self.modules: + try: + module.after_matrix_scan(self) + except Exception as err: + debug_error(module, 'after_matrix_scan', err) + + for ext in self.extensions: + try: + ext.after_matrix_scan(self.sandbox) + except Exception as err: + debug_error(ext, 'after_matrix_scan', err) + + def before_hid_send(self) -> None: + for module in self.modules: + try: + module.before_hid_send(self) + except Exception as err: + debug_error(module, 'before_hid_send', err) + + for ext in self.extensions: + try: + ext.before_hid_send(self.sandbox) + except Exception as err: + debug_error(ext, 'before_hid_send', err) + + def after_hid_send(self) -> None: + for module in self.modules: + try: + module.after_hid_send(self) + except Exception as err: + debug_error(module, 'after_hid_send', err) + + for ext in self.extensions: + try: + ext.after_hid_send(self.sandbox) + except Exception as err: + debug_error(ext, 'after_hid_send', err) + + def powersave_enable(self) -> None: + for module in self.modules: + try: + module.on_powersave_enable(self) + except Exception as err: + debug_error(module, 'powersave_enable', err) + + for ext in self.extensions: + try: + ext.on_powersave_enable(self.sandbox) + except Exception as err: + debug_error(ext, 'powersave_enable', err) + + def powersave_disable(self) -> None: + for module in self.modules: + try: + module.on_powersave_disable(self) + except Exception as err: + debug_error(module, 'powersave_disable', err) + + for ext in self.extensions: + try: + ext.on_powersave_disable(self.sandbox) + except Exception as err: + debug_error(ext, 'powersave_disable', err) + + def deinit(self) -> None: + for module in self.modules: + try: + module.deinit(self) + except Exception as err: + debug_error(module, 'deinit', err) + + for ext in self.extensions: + try: + ext.deinit(self.sandbox) + except Exception as err: + debug_error(ext, 'deinit', err) + + def go(self, hid_type=HIDModes.USB, secondary_hid_type=None, **kwargs) -> None: + try: + self._init( + hid_type=hid_type, + secondary_hid_type=secondary_hid_type, + **kwargs, + ) + while True: + self._main_loop() + except Exception as err: + import traceback + traceback.print_exception(err) + finally: + debug('cleaning up...') + self._deinit_hid() + self.deinit() + debug('...done') -class BLEHID(AbstractHID): - BLE_APPEARANCE_HID_KEYBOARD = const(961) - # Hardcoded in CPy - MAX_CONNECTIONS = const(2) + if not debug.enabled: + import supervisor - def __init__(self, ble_name=str(getmount('/').label), **kwargs): + supervisor.reload() - self.ble_name = ble_name - self.ble = BLERadio() - self.ble.name = self.ble_name - self.hid = HIDService() - self.hid.protocol_mode = 0 # Boot protocol - super().__init__(**kwargs) + def _init( + self, + hid_type: HIDModes = HIDModes.USB, + secondary_hid_type: Optional[HIDModes] = None, + **kwargs, + ) -> None: + self._go_args = kwargs + self.hid_type = hid_type + self.secondary_hid_type = secondary_hid_type - # Security-wise this is not right. While you're away someone turns - # on your keyboard and they can pair with it nice and clean and then - # listen to keystrokes. - # On the other hand we don't have LESC so it's like shouting your - # keystrokes in the air - if not self.ble.connected or not self.hid.devices: - self.start_advertising() + if debug.enabled: + debug('Initialising ', self) - @property - def devices(self): - '''Search through the provided list of devices to find the ones with the - send_report attribute.''' - if not self.ble.connected: - return {} + self._init_hid() + self._init_matrix() + self._init_coord_mapping() + self.during_bootup() - result = {} + if debug.enabled: + import gc - for device in self.hid.devices: - if not hasattr(device, 'send_report'): - continue - us = device.usage - up = device.usage_page + gc.collect() + debug('mem_info used:', gc.mem_alloc(), ' free:', gc.mem_free()) - if up == HIDUsagePage.CONSUMER and us == HIDUsage.CONSUMER: - result[HIDReportTypes.CONSUMER] = device - continue + def _main_loop(self) -> None: + self.sandbox.active_layers = self.active_layers.copy() - if up == HIDUsagePage.KEYBOARD and us == HIDUsage.KEYBOARD: - result[HIDReportTypes.KEYBOARD] = device - continue + self.before_matrix_scan() - if up == HIDUsagePage.MOUSE and us == HIDUsage.MOUSE: - result[HIDReportTypes.MOUSE] = device - continue + self._process_resume_buffer() - if up == HIDUsagePage.SYSCONTROL and us == HIDUsage.SYSCONTROL: - result[HIDReportTypes.SYSCONTROL] = device - continue + for matrix in self.matrix: + update = matrix.scan_for_changes() + if update: + self.matrix_update = update + break + self.sandbox.matrix_update = self.matrix_update + self.sandbox.secondary_matrix_update = self.secondary_matrix_update - return result + self.after_matrix_scan() - def hid_send(self, evt): - if not self.ble.connected: - return + if self.secondary_matrix_update: + self.matrix_update_queue.append(self.secondary_matrix_update) + self.secondary_matrix_update = None - # int, can be looked up in HIDReportTypes - reporting_device_const = evt[0] + if self.matrix_update: + self.matrix_update_queue.append(self.matrix_update) + self.matrix_update = None - device = self.devices[reporting_device_const] + # only handle one key per cycle. + if self.matrix_update_queue: + self._handle_matrix_report(self.matrix_update_queue.pop(0)) - report_size = len(device._characteristic.value) - while len(evt) < report_size + 1: - evt.append(0) + self.before_hid_send() - return device.send_report(evt[1 : report_size + 1]) # noqa: E203 + if self.hid_pending: + self._send_hid() - def clear_bonds(self): - import _bleio + self._process_timeouts() - _bleio.adapter.erase_bonding() + if self.hid_pending: + self._send_hid() - def start_advertising(self): - if not self.ble.advertising: - advertisement = ProvideServicesAdvertisement(self.hid) - advertisement.appearance = self.BLE_APPEARANCE_HID_KEYBOARD + self.after_hid_send() - self.ble.start_advertising(advertisement) + if self._trigger_powersave_enable: + self.powersave_enable() - def stop_advertising(self): - self.ble.stop_advertising() + if self._trigger_powersave_disable: + self.powersave_disable()