diff --git a/.github/workflows/validate_and_release.yml b/.github/workflows/validate_and_release.yml index 7a53dbd..075a0c7 100644 --- a/.github/workflows/validate_and_release.yml +++ b/.github/workflows/validate_and_release.yml @@ -68,7 +68,7 @@ jobs: fetch-depth: 0 - uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.11 - name: Semantic Release uses: python-semantic-release/python-semantic-release@v9.8.6 with: diff --git a/Makefile b/Makefile index ba68b10..d163db8 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ POETRY := $(shell command -v poetry 2> /dev/null) # he result so that we can use this as a target. SYSTEM_MIDI_REMOTE_SCRIPTS_DIR := $(shell ls -d /Applications/Ableton\ Live\ 12\ *.app/Contents/App-Resources/MIDI\ Remote\ Scripts 2> /dev/null | head -n 1 | sed 's/ /\\ /g') -TEST_PROJECT_SET_NAMES := backlight default overrides standalone wide_clip_launch +TEST_PROJECT_SET_NAMES := alt_initial_mode backlight default overrides standalone wide_clip_launch TEST_PROJECT_DIR := tests/modeStep_tests_project TEST_PROJECT_SETS := $(addprefix $(TEST_PROJECT_DIR)/, $(addsuffix .als, $(TEST_PROJECT_SET_NAMES))) @@ -30,7 +30,7 @@ check: .make.install __ext__/AbletonLive12_MIDIRemoteScripts/README.md .PHONY: test test: .make.install $(TEST_PROJECT_SETS) - $(POETRY) run pytest + $(POETRY) run pytest tests/ .PHONY: img img: .make.install @@ -52,7 +52,7 @@ $(TEST_PROJECT_DIR)/%.als: .make.install $(TEST_PROJECT_DIR)/create_set.py $(POETRY) run python $(TEST_PROJECT_DIR)/create_set.py $* touch $@ -.make.install: pyproject.toml poetry.lock +.make.install: poetry.lock @if [ -z $(POETRY) ]; then echo "Poetry could not be found. See https://python-poetry.org/docs/"; exit 2; fi $(POETRY) install touch $@ diff --git a/control_surface/__init__.py b/control_surface/__init__.py index 60597d3..cfe3a75 100644 --- a/control_surface/__init__.py +++ b/control_surface/__init__.py @@ -40,6 +40,7 @@ create_mappings, ) from .mixer import MixerComponent +from .ping import PingComponent from .recording import RecordingComponent from .scene import SceneComponent from .session import SessionComponent @@ -59,8 +60,13 @@ from .undo_redo import UndoRedoComponent from .view_control import ViewControlComponent +if typing.TYPE_CHECKING: + from typing_extensions import TypeAlias + logger = logging.getLogger(__name__) +T = typing.TypeVar("T") + def get_capabilities(): return { @@ -186,9 +192,9 @@ class Specification(ControlSurfaceSpecification): # Force the controller into standalone mode when exiting (this will be redundant if # a standalone mode is already active.) The disconnect program change message will # be appended below, if configured. - goodbye_messages: typing.Collection[ - typing.Tuple[int, ...] - ] = SYSEX_STANDALONE_MODE_ON_REQUESTS + goodbye_messages: typing.Collection[typing.Tuple[int, ...]] = ( + SYSEX_STANDALONE_MODE_ON_REQUESTS + ) send_goodbye_messages_last = True component_map = { @@ -196,6 +202,7 @@ class Specification(ControlSurfaceSpecification): "Device": create_device_component, "Hardware": HardwareComponent, "Mixer": MixerComponent, + "Ping": PingComponent, # The recording component has some special init in the default component map, # but we're overriding it. "Recording": RecordingComponent, @@ -215,6 +222,10 @@ class Specification(ControlSurfaceSpecification): create_mappings_function = create_mappings +Predicate: TypeAlias = typing.Callable[[T], bool] +MidiBytes: TypeAlias = typing.Tuple[int, ...] + + class modeStep(ControlSurface): def __init__(self, specification=Specification, *a, c_instance=None, **k): # A new control surface gets constructed when the song is changed, so we can @@ -248,7 +259,11 @@ def __init__(self, specification=Specification, *a, c_instance=None, **k): ] # Internal tracker during connect/reconnect events. - self._mode_after_identified = self._configuration.initial_mode + self.__mode_after_identified = self._configuration.initial_mode + + self.__suppressing_send_midi_predicate: typing.Optional[ + Predicate[MidiBytes] + ] = None # For hacking around the weird LED behavior when updating the backlight. self.__is_suppressing_hardware: bool = False @@ -284,18 +299,8 @@ def _create_elements(self, specification: ControlSurfaceSpecification): # type: def setup(self): super().setup() - hardware = self.component_map["Hardware"] - # Activate the background program before doing anything. The program change will - # get sent when the controller is placed into `_stanadlone_init_mode`. No-op if - # no background program has been set. - hardware.standalone_program = self._configuration.background_program - - # Turn on hosted mode by default, so it doesn't need to be specified explicitly - # in normal (non-standalone) mode layers. - hardware.standalone = False - - # Activate `_disabled` mode, which will enable the hardware controller in its - # `on_leave` callback. + # Activate `_disabled` mode, which will enable the hardware component when it + # exits. self.main_modes.selected_mode = DISABLED_MODE_NAME # Listen for backlight color values, to hack around the weird LED behavior when @@ -321,6 +326,35 @@ def _add_mode(self, mode_name, mode_spec, modes_component): if mode_name == DISABLED_MODE_NAME: modes_component.selected_mode = mode_name + # Prevent outgoing MIDI messages from being sent. + @contextmanager + def suppressing_send_midi( + self, + # If given, only suppress messages for which this returns True (i.e. only + # messages for which this returns False will be sent). + predicate: typing.Optional[Predicate[MidiBytes]] = None, + ): + last_predicate = self.__suppressing_send_midi_predicate + try: + self.__suppressing_send_midi_predicate = ( + (lambda _: True) if predicate is None else predicate + ) + + yield + finally: + self.__suppressing_send_midi_predicate = last_predicate + + def _do_send_midi(self, midi_event_bytes: MidiBytes): + if ( + self.__suppressing_send_midi_predicate is None + or not self.__suppressing_send_midi_predicate(midi_event_bytes) + ): + # logger.info(f"send MIDI: {midi_event_bytes}") + return super()._do_send_midi(midi_event_bytes) + + # logger.info(f"suppressed MIDI message: {midi_event_bytes}") + return False + def _create_identification(self, specification): identification = super()._create_identification(specification) assert self.__on_is_identified_changed_local @@ -329,81 +363,69 @@ def _create_identification(self, specification): def on_identified(self, response_bytes): super().on_identified(response_bytes) - logger.info("identified SoftStep 2 device") - # Cancel any pending timeout checks. - if not self._identity_response_timeout_task.is_killed: - self._identity_response_timeout_task.kill() + # We'll reach this point after the SoftStep successfully responds to an identity + # request, which gets sent any time `is_identified` is set to False, i.e. during + # startup and when port settings change. + # + # For port changes, we don't know for sure whether the SoftStep was disconnected + # (as opposed to a different device), so it's safest to always go through the + # full startup process. False positives will briefly interrupt the device if + # it's already connected, but some built-in control surfaces also have this + # issue. + logger.info("identified SoftStep 2 device") - # We'll reach this point on startup, as well as when MIDI ports change (due to - # hardware disconnects/reconnects or changes in the Live settings). Don't do - # anything unless we're currently in disabled mode, i.e. unless we're - # transitioning from a disconnected controller - there's no need to switch in - # and out of standalone mode otherwise. - if ( - self.main_modes.selected_mode is None - or self.main_modes.selected_mode == DISABLED_MODE_NAME - ): - # Force the controller into standalone mode, and send the standalone - # background program, if any. - self.main_modes.selected_mode = STANDALONE_INIT_MODE_NAME - - # After a short delay, load the main desired mode. This - # ensures that all MIDI messages for initialization in - # standalone mode get sent before the main mode begins to - # load, which avoids weird issues with MIDI batching etc. - if not self._on_identified_task.is_killed: - self._on_identified_task.kill() - self._on_identified_task.restart() - - # Invoked after a delay if an identity request is sent but no - # response is received. - def _on_identity_response_timeout(self): - # Store the mode that we should enable when/if the controller - # is (re-)connected. The checks ensure that the first time - # this is called (or if it's somehow called multiple times - # before callbacks have finished running), this case won't be - # reached and the mode variable will keep its current value. + # This should have already been run by the `is_identified` listener when the + # property was set to `False`, e.g. at the beginning of the port change + # event. But it's harmless to run it again, and this serves as a failsafe to + # enter disabled mode, forcing all relevant sysex/CC messages to be re-sent as + # we enter other modes. + self.__store_state_and_disable() + + # Next force the controller into standalone mode, and send the standalone + # background program (if any). + self.main_modes.selected_mode = STANDALONE_INIT_MODE_NAME + + # After a short delay, load the main desired mode. This ensures that all MIDI + # messages for initialization in standalone mode get sent before the main mode + # begins to load, which avoids weird issues with MIDI batching etc. + if not self._on_identified_task.is_killed: + self._on_identified_task.kill() + self._on_identified_task.restart() + + # Store any state needed to restore the controller to its current state (if it's + # active), and place the controller into disabled mode if it isn't already. + def __store_state_and_disable(self): + # If a mode is currently active (other than passthrough modes during startup), + # store it so it can be enabled when/if the controller is (re-)activated. if ( self.main_modes.selected_mode and self.main_modes.selected_mode != DISABLED_MODE_NAME and self.main_modes.selected_mode != STANDALONE_INIT_MODE_NAME ): - self._mode_after_identified = self.main_modes.selected_mode - - # Enter disabled mode, which relinquishes control of everything. This ensures - # that sysex state values will be invalidated (by disconnecting their control - # elements), and nothing will be bound when the controller is next identified, - # so we won't send a bunch of LED messages before placing it into hosted - # mode. (This could still happen if the controller were connected and - # disconnected quickly, but in any case it would just potentially mess with - # standalone-mode LEDs.) - self.main_modes.selected_mode = DISABLED_MODE_NAME + self.__mode_after_identified = self.main_modes.selected_mode + + if self.main_modes.selected_mode != DISABLED_MODE_NAME: + self.main_modes.selected_mode = DISABLED_MODE_NAME @listens("is_identified") def __on_is_identified_changed_local(self, is_identified: bool): - # This will trigger on startup, and whenever a new identity - # request is sent to an already-identified controller - # (e.g. when devices are connected/disconnected). If we don't - # get a timely response, we can assume the controller was - # physically disconnected. + logger.info(f"Is identified: {is_identified}") + # The positive case gets handled in `on_identified`. if not is_identified: - self._identity_response_timeout_task.restart() - - @lazy_attribute - def _identity_response_timeout_task(self): - assert self.specification - # The `identity_request_delay` is the delay before a second identity request is - # sent. Let this elapse twice before considering the SoftStep disconnected. - timeout = self.specification.identity_request_delay * 2 - identity_response_timeout_task = self._tasks.add( - task.sequence( - task.wait(timeout), - task.run(self._on_identity_response_timeout), - ) - ) - identity_response_timeout_task.kill() - return identity_response_timeout_task + # We'll reach this point on startup, and whenever the port settings change + # (e.g. when devices are connected or disconnected). + # + # We can't get any details about changes in port settings, so even if the + # SoftStep was previously identified, we don't know at this point whether + # it's connected. Disable the control surface immediately, to avoid a user + # mode potentially being active on reconnect before device setup has + # completed. + # + # Live will send one or more identity requests in the background. The + # control surface will be re-enabled when and if the SoftStep responds + # correctly. + self.__store_state_and_disable() @lazy_attribute def _on_identified_task(self): @@ -415,8 +437,8 @@ def _on_identified_task(self): def _after_identified(self): mode = ( - self._mode_after_identified - if self._mode_after_identified is not None + self.__mode_after_identified + if self.__mode_after_identified is not None else self._configuration.initial_mode ) self.main_modes.selected_mode = mode diff --git a/control_surface/clip_slot.py b/control_surface/clip_slot.py index 5440c92..b30496b 100644 --- a/control_surface/clip_slot.py +++ b/control_surface/clip_slot.py @@ -20,9 +20,9 @@ def __init__(self, *a, configuration: Optional[Configuration] = None, **k): assert configuration - self._launch_pressed_delayed_action: Optional[ - ClipSlotAction - ] = configuration.clip_long_press_action + self._launch_pressed_delayed_action: Optional[ClipSlotAction] = ( + configuration.clip_long_press_action + ) assert self.__on_launch_button_pressed_delayed self.__on_launch_button_pressed_delayed.subject = self.launch_button diff --git a/control_surface/configuration.py b/control_surface/configuration.py index 680a4a4..33bbfac 100644 --- a/control_surface/configuration.py +++ b/control_surface/configuration.py @@ -76,7 +76,7 @@ class Configuration(NamedTuple): wide_clip_launch: bool = False # Quantization settings. - quantize_to: Quantization = "sixtenth" + quantize_to: Quantization = "sixtenth" # [sic] quantize_amount: float = 1.0 # 1.0 is full quantization. # Whether to scroll the session ring along with scenes/tracks. diff --git a/control_surface/display.py b/control_surface/display.py index 8d91b91..00c9008 100644 --- a/control_surface/display.py +++ b/control_surface/display.py @@ -221,9 +221,9 @@ def notification_signal( ) -> Optional[ScrollingNotificationData]: # Original signal is the result of applying the notification's factory # method. - orig_notification_data: Optional[ - Union[str, NotificationData] - ] = orig_notification_signal(state, event) + orig_notification_data: Optional[Union[str, NotificationData]] = ( + orig_notification_signal(state, event) + ) if orig_notification_data is not None: # logger.info(f"got notification: {orig_notification_data}") @@ -340,7 +340,10 @@ def main_view(state) -> Content: # the main mode text after the edit window has been closed. timestamp = component_state.edit_window_updated_at - 0.01 - elif main_mode_category is MainModeCategory.standalone: + elif main_mode_category in ( + MainModeCategory.standalone, + MainModeCategory.hidden, + ): # Make sure the text stays as `None` to avoid any rendering. pass diff --git a/control_surface/elements/elements.py b/control_surface/elements/elements.py index 803e984..a3253ed 100644 --- a/control_surface/elements/elements.py +++ b/control_surface/elements/elements.py @@ -602,14 +602,14 @@ def _create_display(self): self.display = DisplayElement() def _create_sysex(self): - # Sysex toggles for device functions. + # Sysex toggles for device functions. Avoid setting default values for these, so + # that messages don't get triggered until we really want them to be. self.add_element( "backlight_sysex", SysexToggleElement, on_messages=[sysex.SYSEX_BACKLIGHT_ON_REQUEST], off_messages=[sysex.SYSEX_BACKLIGHT_OFF_REQUEST], optimized=True, - default_value=False, ) self.add_element( @@ -618,7 +618,6 @@ def _create_sysex(self): on_messages=sysex.SYSEX_STANDALONE_MODE_ON_REQUESTS, off_messages=sysex.SYSEX_STANDALONE_MODE_OFF_REQUESTS, optimized=True, - default_value=True, ) # Sysex input used by the test mode to check whether the control surface is diff --git a/control_surface/elements/slider.py b/control_surface/elements/slider.py index 3e09855..a66838e 100644 --- a/control_surface/elements/slider.py +++ b/control_surface/elements/slider.py @@ -114,7 +114,6 @@ def _update_popup(self): class LatchableSliderElement(ProcessedSliderElement): - """Generic slider control for immitating SoftStep's "live" sources, e.g. pressure or XY position. Latching can optionally be enabled. """ diff --git a/control_surface/hardware.py b/control_surface/hardware.py index 91435d5..7ac5603 100644 --- a/control_surface/hardware.py +++ b/control_surface/hardware.py @@ -1,10 +1,9 @@ import logging -from typing import Any, Callable, Optional, Union +from typing import Callable, Optional, Union from ableton.v2.control_surface.control.sysex import ColorSysexControl from ableton.v3.base import depends from ableton.v3.control_surface.component import Component -from ableton.v3.control_surface.controls import ButtonControl logger = logging.getLogger(__name__) @@ -32,8 +31,6 @@ class HardwareComponent(Component): backlight_sysex: ColorSysexControl.State = NullableColorSysexControl(color=None) # type: ignore standalone_sysex: ColorSysexControl.State = ColorSysexControl(color=False) # type: ignore - ping_button: Any = ButtonControl() - @depends(send_midi=None) def __init__( self, @@ -49,10 +46,10 @@ def __init__( # The program change to send when switching into standalone mode. self._standalone_program: Optional[int] = None - # Initial values for device properties - these will get set externally when - # actually setting up this component. - self._backlight: Optional[bool] = None # None for unmanaged. - self._standalone: bool = False + # Initial values for device properties. None indicates an unmanaged/unknown + # property. + self._backlight: Optional[bool] = None + self._standalone: Optional[bool] = None @property def backlight(self) -> Optional[bool]: @@ -64,11 +61,11 @@ def backlight(self, backlight: bool): self._update_backlight() @property - def standalone(self): + def standalone(self) -> Optional[bool]: return self._standalone @standalone.setter - def standalone(self, standalone: bool): + def standalone(self, standalone: Optional[bool]): self._standalone = standalone self._update_standalone() @@ -86,22 +83,17 @@ def standalone_program(self, standalone_program: Union[int, None]): self._standalone_program = standalone_program self._update_standalone_program() - @ping_button.pressed - def ping_input(self, _): - logger.info("ponging ping") - self.ping_button.control_element.send_value(True) - def update(self): super().update() self._update_standalone() self._update_backlight() def _update_backlight(self): - if self.is_enabled(): + if self.is_enabled() and self.backlight is not None: self.backlight_sysex.color = self.backlight def _update_standalone(self): - if self.is_enabled(): + if self.is_enabled() and self.standalone is not None: self.standalone_sysex.color = self._standalone self._update_standalone_program() diff --git a/control_surface/mappings.py b/control_surface/mappings.py index 721ca13..314ba96 100644 --- a/control_surface/mappings.py +++ b/control_surface/mappings.py @@ -272,12 +272,18 @@ def create(self) -> Mappings: mappings: Mappings = {} mappings["Hardware"] = dict( - # Turned off by default, since we'll be in `_disabled` mode at startup. This - # shouldn't be toggled except when entering/exiting `_disabled` mode. + # Initially disabled. Enabling/disabling this component should only happen + # during transitions from/to disabled mode; all other modes can assume that + # it's enabled. enable=False, # Permanent hardware mappings. backlight_sysex="backlight_sysex", standalone_sysex="standalone_sysex", + ) + + mappings["Ping"] = dict( + # Always enabled. + enable=True, # Ping input used by tests. ping_button="ping_sysex", ) @@ -285,7 +291,8 @@ def create(self) -> Mappings: def set_backlight(backlight: bool): self._get_component("Hardware").backlight = backlight - # Toggle modes. Actions get mapped to the cycle buttons. + # Toggle modes, controlled by individual mode components. Actions get mapped to + # the cycle buttons. for name, initial_state, set_state in ( ( "Auto_Arm_Modes", @@ -307,11 +314,36 @@ def set_backlight(backlight: bool): mappings["Main_Modes"] = { "modes_component_type": MainModesComponent, # Base mode where no values should ever be sent. Active while the controller - # is disconnected, which gives us better control over the order of - # operations when it connects. + # is disconnected (or while its connection state is unknown). Transitions + # into and out of this mode are managed by the main control surface + # instance. DISABLED_MODE_NAME: { "modes": [ + # Make sure the background has no bindings, so no more LED updates get sent. LayerMode(self._get_component("Background"), Layer()), + # Drop all pending messages. + CallFunctionMode(on_enter_fn=self.__drop_accumulated_ccs), + # Explicitly mark the standalone status as unmanaged, and don't + # reset it on mode exit. Subsequent modes will update it as + # necessary. + PersistentSetAttributeMode( + self._get_component("Hardware"), "standalone", None + ), + # Explicitly mark the backlight as unmanaged, and reset it to the + # previous value on mode exit, i.e. re-send the current backlight + # state (if any). + # + # Note this assumes that we won't be interacting with the backlight + # toggle component while disabled mode is active. + SetAttributeMode( + self._get_component("Hardware"), "backlight", None + ), + # Disable the hardware component on entry, and re-enable it on + # entry. This isn't totally necessary as the controlled parameters + # are already marked as unmanaged, but it serves as a failsafe + # against sysex messages being sent, and it's a clean way to enable + # the hardware component after device identification, but leave it + # disabled during init. InvertedMode(EnablingMode(self._get_component("Hardware"))), ] }, @@ -321,16 +353,12 @@ def set_backlight(backlight: bool): STANDALONE_INIT_MODE_NAME: { "modes": [ # Unlike other standalone modes, the init mode doesn't get exited - # via the `standalone_exit_button`, which would normally invoke the - # transition to hosted mode. Instead, we need to enter hosted mode - # explicitly when leaving the mode. - InvertedMode( - PersistentSetAttributeMode( - self._get_component("Hardware"), "standalone", False - ) - ), + # via the `standalone_exit_button`, which would normally insert a + # delay between sending the standalone background program and + # swtiching to a hosted mode. Instead, we set the background program + # on mode entry, and rely on the fact that the control surface + # inserts a delay explicitly while it passes through this mode. self._enter_standalone_mode(self._configuration.background_program), - # TODO: Make sure LEDs are cleared. ] }, } @@ -354,17 +382,46 @@ def _get_component(self, name: str) -> Component: assert component return component + def __set_standalone_if_modified(self, standalone: Optional[bool]): + hardware = self._get_component("Hardware") + + # The hardware component's `standalone` setter will potentially send sysex + # values even if the new standalone state matches the old one. Check the value + # first to avoid sending unnecessary sysex updates (which cause momentary + # unresponsiveness on the controller). + if hardware.standalone != standalone: + hardware.standalone = standalone + + # Force MIDI updates next time elements are updated, even if the control surface + # thinks the update is unnecessary. This gets called within + # `__drop_accumulated_ccs`, and should also be called when returning to hosted mode + # from standalone mode. + def __clear_send_caches(self): + elements = self._control_surface.elements + assert elements + + elements.display.clear_send_cache() + for light in elements.lights_raw: + light.clear_send_cache() + + # Clear any pending CC messages, without actually sending them. + def __drop_accumulated_ccs(self): + with self._control_surface.suppressing_send_midi( + # CC status byte is 0xBx. + lambda msg: 0xB0 <= msg[0] < 0xC0 + ): + self._control_surface._ownership_handler.commit_ownership_changes() + self._control_surface._flush_midi_messages() + + # The control surface might now have elements in its MIDI cache which weren't + # actually sent. + self.__clear_send_caches() + # Return a mode which enters standalone mode and activates the given program (if # any), and returns to the background program (if any) on exit. def _enter_standalone_mode(self, standalone_program: Optional[int]) -> Mode: hardware = self._get_component("Hardware") - def clear_light_caches(): - elements = self._control_surface.elements - assert elements - for light in elements.lights_raw: - light.clear_send_cache() - def set_standalone_program(standalone_program: Optional[int]): if ( standalone_program is not None @@ -377,19 +434,28 @@ def set_standalone_program(standalone_program: Optional[int]): hardware.standalone_program = standalone_program return CompoundMode( - # We don't have control of the LEDs, so make sure everything gets rendered - # as we re-enter hosted mode. This also ensures that LED states will all be - # rendered on disconnect/reconnect events, since we pass through - # _standalone_init mode in that case. - CallFunctionMode(on_exit_fn=clear_light_caches), + # Drop accumulated MIDI data before beginning the transition to standalone + # mode. This avoids potential batching issues with message ordering while MIDI + # messages are being accumulated. CC messages are also pointless at this + # stage, since the hardware will take control of the interface in standalone + # mode. + CallFunctionMode(on_enter_fn=self.__drop_accumulated_ccs), + # Flush accumulated MIDI data before beginning the transition to standalone + # mode. + # + # In principle this is unnecessary as `__drop_accumulated_ccs` also calls + # this method on entry, but this serves as a failsafe in case anything weird + # happens with component states while the controller is in standalone mode. + CallFunctionMode(on_exit_fn=self.__clear_send_caches), # Set the program attribute before actually switching into standalone mode, # so that we don't send an extra message for whatever program is currently - # active. + # active while we set the hardware's standalone state. If the device is + # already in standalone mode, this will just send the message immediately. CallFunctionMode( on_enter_fn=partial(set_standalone_program, standalone_program) ), - # Send the standalone message on enter, but not the hosted mode message on - # exit. + # Set the standalone state on enter (if necessary), but don't revert + # it to the previous value on exit. # # Regardless of whether `_flush_midi_messages()` is called, # `_c_instance.send_midi` seems to batch messages such that sysex messages @@ -404,7 +470,9 @@ def set_standalone_program(standalone_program: Optional[int]): # _standalone_init_mode (where the background PC gets sent at mode entry, # and the transition to hosted mode is handled explicitly in the mode # definition). - PersistentSetAttributeMode(hardware, "standalone", True), + CallFunctionMode( + on_enter_fn=partial(self.__set_standalone_if_modified, True) + ), # The SoftStep seems to keep track of the current LED states for each # standalone preset in the setlist. Whenever a preset is loaded, the Init # source will fire (potentially setting some LED states explicitly), and any @@ -508,6 +576,9 @@ def _mode_select_button(self): def _action_button(self): return get_element("buttons", 1, 4) + # Create one of the main user-facing modes (including user standalone modes). Apply + # any overrides from the configuration, and for hosted modes, set up hardware + # elements. def _create_main_mode(self, mode: MainMode) -> RootModeSpecification: main_mode_specification = self._main_mode_factory(mode)() @@ -516,6 +587,8 @@ def _create_main_mode(self, mode: MainMode) -> RootModeSpecification: behaviour = None mode_category = get_main_mode_category(mode) + # Check whether this mode is assigned to short-press anywhere on the mode select + # screen. def is_leading_mode( key_mapping: Optional[ModeSelectKeySpecification], ): @@ -564,12 +637,17 @@ def is_leading_mode( # Control layers which are present in all non-standalone modes. main_modes_mode: SimpleModeSpecification + hardware_mode: SimpleModeSpecification expression_mode: SimpleModeSpecification if mode_category is MainModeCategory.standalone: # Standalone modes shouldn't bind anything (except the exit button, which # gets set up elsewhere). main_modes_mode = CallFunctionMode() expression_mode = CallFunctionMode() + + # Hardware changes for standalone modes get handled by the modes themselves + # via `_enter_standalone_mode`. + hardware_mode = CallFunctionMode() else: # Bind the mode select button. main_modes_mode = { @@ -582,6 +660,10 @@ def is_leading_mode( component="Device", expression_pedal="expression_slider" ) + hardware_mode = CallFunctionMode( + on_enter_fn=partial(self.__set_standalone_if_modified, False) + ) + # Navigation controls. navigation_mode = self._navigation_mode(*(self._mode_navigation_targets(mode))) @@ -605,6 +687,8 @@ def is_leading_mode( return { "modes": [ + # Enable hosted mode if necessary. + hardware_mode, # Special key safety strategy if any. key_safety_mode, # Make sure any unbound LEDs are turned off. @@ -767,32 +851,15 @@ def _edit_track_controls_mode(self, name: MainMode) -> _MainModeSpecification: def _standalone_mode(self, name: MainMode) -> _MainModeSpecification: index = int(get_index_str(name)) - # Force all current MIDI/control state to be written to the device. This - # works-ish - sometimes CCs still get sent after the switch to standalone mode - # for some reason. But they seem to always get flushed before the main program - # change is sent. - def flush(): - self._control_surface._ownership_handler.commit_ownership_changes() - self._control_surface._flush_midi_messages() - - # Make sure the display gets re-rendered in hosted mode even if the text - # hasn't changed. - elements = self._control_surface.elements - assert elements - elements.display.clear_send_cache() - return [ + # Make sure the background has no bindings, so no more LED updates get sent. + LayerMode(self._get_component("Background"), Layer()), # We use the same CC as nav left (80) for the exit button, so a) we don't # reduce the number of useful CCs for MIDI mapping in standalone mode, and # b) the mode select button could be triggered by button mashing if the # controller ever got stuck in hosted mode when it was supposed to be in # standalone mode. dict(component="Main_Modes", standalone_exit_button="nav_left_button"), - # Make sure all LEDs are cleared. This affects the standalone background - # program state. - CallFunctionMode(on_enter_fn=flush), - # Make sure the background has no bindings, so no more LED updates get sent. - LayerMode(self._get_component("Background"), Layer()), # Enter standalone mode and select the program. self._enter_standalone_mode(index - 1), ] diff --git a/control_surface/mode.py b/control_surface/mode.py index ef36a2c..24154b7 100644 --- a/control_surface/mode.py +++ b/control_surface/mode.py @@ -354,9 +354,8 @@ def _finish_standalone_transition_task(self): return finish_standalone_transition_task def _finish_standalone_transition(self): - # After the delay, the background program change message should have been sent, - # so we can now send the actual sysexes to switch to hosted mode. - self._hardware.standalone = False + # After the delay, the background program change message should have been + # sent. Now we can safely switch to a hosted mode. if self.__standalone_transition_is_mode_select: self.push_mode(MODE_SELECT_MODE_NAME) else: diff --git a/control_surface/ping.py b/control_surface/ping.py new file mode 100644 index 0000000..c34c566 --- /dev/null +++ b/control_surface/ping.py @@ -0,0 +1,18 @@ +import logging +from typing import Any + +from ableton.v3.control_surface.component import Component +from ableton.v3.control_surface.controls import ButtonControl + +logger = logging.getLogger(__name__) + + +# Component to respond to pings from the test runner. +class PingComponent(Component): + ping_button: Any = ButtonControl() + + @ping_button.pressed + def ping_input(self, _): + if self.is_enabled(): + logger.info("ponging ping") + self.ping_button.control_element.send_value(True) diff --git a/poetry.lock b/poetry.lock index 6328f22..acb2bcd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -21,6 +21,17 @@ files = [ {file = "future-0.18.3.tar.gz", hash = "sha256:34a17436ed1e96697a86f9de3d15a3b0be01d8bc8de9c1dffd59fb8234ed5307"}, ] +[[package]] +name = "gherkin-official" +version = "29.0.0" +description = "Gherkin parser (official, by Cucumber team)" +optional = false +python-versions = "*" +files = [ + {file = "gherkin_official-29.0.0-py3-none-any.whl", hash = "sha256:26967b0d537a302119066742669e0e8b663e632769330be675457ae993e1d1bc"}, + {file = "gherkin_official-29.0.0.tar.gz", hash = "sha256:dbea32561158f02280d7579d179b019160d072ce083197625e2f80a6776bb9eb"}, +] + [[package]] name = "iniconfig" version = "2.0.0" @@ -32,6 +43,17 @@ files = [ {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, ] +[[package]] +name = "janus" +version = "2.0.0" +description = "Mixed sync-async queue to interoperate between asyncio tasks and classic threads" +optional = false +python-versions = ">=3.9" +files = [ + {file = "janus-2.0.0-py3-none-any.whl", hash = "sha256:7e6449d34eab04cd016befbd7d8c0d8acaaaab67cb59e076a69149f9031745f9"}, + {file = "janus-2.0.0.tar.gz", hash = "sha256:0970f38e0e725400496c834a368a67ee551dc3b5ad0a257e132f5b46f2e77770"}, +] + [[package]] name = "mako" version = "1.3.8" @@ -287,58 +309,60 @@ nodejs = ["nodejs-wheel-binaries"] [[package]] name = "pytest" -version = "7.4.4" +version = "8.3.4" description = "pytest: simple powerful testing with Python" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "pytest-7.4.4-py3-none-any.whl", hash = "sha256:b090cdf5ed60bf4c45261be03239c2c1c22df034fbffe691abe93cd80cea01d8"}, - {file = "pytest-7.4.4.tar.gz", hash = "sha256:2cf0005922c6ace4a3e2ec8b4080eb0d9753fdc93107415332f50ce9e7994280"}, + {file = "pytest-8.3.4-py3-none-any.whl", hash = "sha256:50e16d954148559c9a74109af1eaf0c945ba2d8f30f0a3d3335edde19788b6f6"}, + {file = "pytest-8.3.4.tar.gz", hash = "sha256:965370d062bce11e73868e0335abac31b4d3de0e82f4007408d242b4f8610761"}, ] [package.dependencies] colorama = {version = "*", markers = "sys_platform == \"win32\""} iniconfig = "*" packaging = "*" -pluggy = ">=0.12,<2.0" +pluggy = ">=1.5,<2" [package.extras] -testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] [[package]] name = "pytest-asyncio" -version = "0.21.2" +version = "0.25.0" description = "Pytest support for asyncio" optional = false -python-versions = ">=3.7" +python-versions = ">=3.9" files = [ - {file = "pytest_asyncio-0.21.2-py3-none-any.whl", hash = "sha256:ab664c88bb7998f711d8039cacd4884da6430886ae8bbd4eded552ed2004f16b"}, - {file = "pytest_asyncio-0.21.2.tar.gz", hash = "sha256:d67738fc232b94b326b9d060750beb16e0074210b98dd8b58a5239fa2a154f45"}, + {file = "pytest_asyncio-0.25.0-py3-none-any.whl", hash = "sha256:db5432d18eac6b7e28b46dcd9b69921b55c3b1086e85febfe04e70b18d9e81b3"}, + {file = "pytest_asyncio-0.25.0.tar.gz", hash = "sha256:8c0610303c9e0442a5db8604505fc0f545456ba1528824842b37b4a626cbf609"}, ] [package.dependencies] -pytest = ">=7.0.0" +pytest = ">=8.2,<9" [package.extras] -docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"] -testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"] +docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1)"] +testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] [[package]] name = "pytest-bdd" -version = "6.1.1" +version = "8.1.0" description = "BDD for pytest" optional = false -python-versions = ">=3.7,<4.0" +python-versions = ">=3.9" files = [ - {file = "pytest_bdd-6.1.1-py3-none-any.whl", hash = "sha256:57eba5878d77036f356a85fb1d108cb061d8af4fb4d032b1a424fa9abe9e498b"}, - {file = "pytest_bdd-6.1.1.tar.gz", hash = "sha256:138af3592bcce5d4684b0d690777cf199b39ce45d423ca28086047ffe6111010"}, + {file = "pytest_bdd-8.1.0-py3-none-any.whl", hash = "sha256:2124051e71a05ad7db15296e39013593f72ebf96796e1b023a40e5453c47e5fb"}, + {file = "pytest_bdd-8.1.0.tar.gz", hash = "sha256:ef0896c5cd58816dc49810e8ff1d632f4a12019fb3e49959b2d349ffc1c9bfb5"}, ] [package.dependencies] +gherkin-official = ">=29.0.0,<30.0.0" Mako = "*" +packaging = "*" parse = "*" parse-type = "*" -pytest = ">=6.2.0" +pytest = ">=7.0.0" typing-extensions = "*" [[package]] @@ -396,28 +420,29 @@ jupyter = ["ipywidgets (>=7.5.1,<9)"] [[package]] name = "ruff" -version = "0.1.15" +version = "0.8.3" description = "An extremely fast Python linter and code formatter, written in Rust." optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.1.15-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:5fe8d54df166ecc24106db7dd6a68d44852d14eb0729ea4672bb4d96c320b7df"}, - {file = "ruff-0.1.15-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6f0bfbb53c4b4de117ac4d6ddfd33aa5fc31beeaa21d23c45c6dd249faf9126f"}, - {file = "ruff-0.1.15-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e0d432aec35bfc0d800d4f70eba26e23a352386be3a6cf157083d18f6f5881c8"}, - {file = "ruff-0.1.15-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9405fa9ac0e97f35aaddf185a1be194a589424b8713e3b97b762336ec79ff807"}, - {file = "ruff-0.1.15-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c66ec24fe36841636e814b8f90f572a8c0cb0e54d8b5c2d0e300d28a0d7bffec"}, - {file = "ruff-0.1.15-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:6f8ad828f01e8dd32cc58bc28375150171d198491fc901f6f98d2a39ba8e3ff5"}, - {file = "ruff-0.1.15-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:86811954eec63e9ea162af0ffa9f8d09088bab51b7438e8b6488b9401863c25e"}, - {file = "ruff-0.1.15-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:fd4025ac5e87d9b80e1f300207eb2fd099ff8200fa2320d7dc066a3f4622dc6b"}, - {file = "ruff-0.1.15-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b17b93c02cdb6aeb696effecea1095ac93f3884a49a554a9afa76bb125c114c1"}, - {file = "ruff-0.1.15-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ddb87643be40f034e97e97f5bc2ef7ce39de20e34608f3f829db727a93fb82c5"}, - {file = "ruff-0.1.15-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:abf4822129ed3a5ce54383d5f0e964e7fef74a41e48eb1dfad404151efc130a2"}, - {file = "ruff-0.1.15-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6c629cf64bacfd136c07c78ac10a54578ec9d1bd2a9d395efbee0935868bf852"}, - {file = "ruff-0.1.15-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1bab866aafb53da39c2cadfb8e1c4550ac5340bb40300083eb8967ba25481447"}, - {file = "ruff-0.1.15-py3-none-win32.whl", hash = "sha256:2417e1cb6e2068389b07e6fa74c306b2810fe3ee3476d5b8a96616633f40d14f"}, - {file = "ruff-0.1.15-py3-none-win_amd64.whl", hash = "sha256:3837ac73d869efc4182d9036b1405ef4c73d9b1f88da2413875e34e0d6919587"}, - {file = "ruff-0.1.15-py3-none-win_arm64.whl", hash = "sha256:9a933dfb1c14ec7a33cceb1e49ec4a16b51ce3c20fd42663198746efc0427360"}, - {file = "ruff-0.1.15.tar.gz", hash = "sha256:f6dfa8c1b21c913c326919056c390966648b680966febcb796cc9d1aaab8564e"}, + {file = "ruff-0.8.3-py3-none-linux_armv6l.whl", hash = "sha256:8d5d273ffffff0acd3db5bf626d4b131aa5a5ada1276126231c4174543ce20d6"}, + {file = "ruff-0.8.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:e4d66a21de39f15c9757d00c50c8cdd20ac84f55684ca56def7891a025d7e939"}, + {file = "ruff-0.8.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:c356e770811858bd20832af696ff6c7e884701115094f427b64b25093d6d932d"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9c0a60a825e3e177116c84009d5ebaa90cf40dfab56e1358d1df4e29a9a14b13"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:75fb782f4db39501210ac093c79c3de581d306624575eddd7e4e13747e61ba18"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7f26bc76a133ecb09a38b7868737eded6941b70a6d34ef53a4027e83913b6502"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:01b14b2f72a37390c1b13477c1c02d53184f728be2f3ffc3ace5b44e9e87b90d"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:53babd6e63e31f4e96ec95ea0d962298f9f0d9cc5990a1bbb023a6baf2503a82"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1ae441ce4cf925b7f363d33cd6570c51435972d697e3e58928973994e56e1452"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7c65bc0cadce32255e93c57d57ecc2cca23149edd52714c0c5d6fa11ec328cd"}, + {file = "ruff-0.8.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5be450bb18f23f0edc5a4e5585c17a56ba88920d598f04a06bd9fd76d324cb20"}, + {file = "ruff-0.8.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:8faeae3827eaa77f5721f09b9472a18c749139c891dbc17f45e72d8f2ca1f8fc"}, + {file = "ruff-0.8.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:db503486e1cf074b9808403991663e4277f5c664d3fe237ee0d994d1305bb060"}, + {file = "ruff-0.8.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6567be9fb62fbd7a099209257fef4ad2c3153b60579818b31a23c886ed4147ea"}, + {file = "ruff-0.8.3-py3-none-win32.whl", hash = "sha256:19048f2f878f3ee4583fc6cb23fb636e48c2635e30fb2022b3a1cd293402f964"}, + {file = "ruff-0.8.3-py3-none-win_amd64.whl", hash = "sha256:f7df94f57d7418fa7c3ffb650757e0c2b96cf2501a0b192c18e4fb5571dfada9"}, + {file = "ruff-0.8.3-py3-none-win_arm64.whl", hash = "sha256:fe2756edf68ea79707c8d68b78ca9a58ed9af22e430430491ee03e718b5e4936"}, + {file = "ruff-0.8.3.tar.gz", hash = "sha256:5e7558304353b84279042fc584a4f4cb8a07ae79b2bf3da1a7551d960b5626d3"}, ] [[package]] @@ -442,6 +467,24 @@ files = [ {file = "svgwrite-1.4.3.zip", hash = "sha256:a8fbdfd4443302a6619a7f76bc937fc683daf2628d9b737c891ec08b8ce524c3"}, ] +[[package]] +name = "typeguard" +version = "4.4.1" +description = "Run-time type checker for Python" +optional = false +python-versions = ">=3.9" +files = [ + {file = "typeguard-4.4.1-py3-none-any.whl", hash = "sha256:9324ec07a27ec67fc54a9c063020ca4c0ae6abad5e9f0f9804ca59aee68c6e21"}, + {file = "typeguard-4.4.1.tar.gz", hash = "sha256:0d22a89d00b453b47c49875f42b6601b961757541a2e1e0ef517b6e24213c21b"}, +] + +[package.dependencies] +typing-extensions = ">=4.10.0" + +[package.extras] +doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme (>=1.3.0)"] +test = ["coverage[toml] (>=7)", "mypy (>=1.2.0)", "pytest (>=7)"] + [[package]] name = "typing-extensions" version = "4.12.2" @@ -456,4 +499,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "7adfcc218c1ff429a3ed52cf53e57790a9f321d011ef2284a733b469a8b814a7" +content-hash = "4cec59486e684ed997c92d73f0ea6cb373d5161f99c29ad9f2bbec404202ce97" diff --git a/pyproject.toml b/pyproject.toml index 2f0b26b..c89f9fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,9 @@ future = "0.18.3" # Dependencies for testing and build tasks. These won't be available at runtime in Live. [tool.poetry.group.dev.dependencies] +# Thread-safe asyncio-aware queue, used to process MIDI messages in tests. +janus = "^2.0.0" + # MIDI input/output for tests. mido = "^1.3.2" python-rtmidi = "^1.5.2" @@ -26,26 +29,29 @@ python-rtmidi = "^1.5.2" pyright = "^1.1.348" # Test framework. -pytest = "^7.1.0" -pytest-asyncio = "^0.21.1" -pytest-bdd = "^6.1.1" +pytest = "^8.3.4" +pytest-asyncio = "^0.25.0" +pytest-bdd = "^8.1.0" # Pretty printer for tests. rich = "^13.7.0" # Formatter and linter -ruff = "^0.1.13" +ruff = "^0.8.0" # Used to generate images for the README. svgwrite = "^1.4.2" +# Runtime typechecking, used for validating test method arguments. +typeguard = "^4.4.1" + # Backport post-python3.7 typing extensions. This can't be imported at runtime, but can # be used in .pyi files or with the TYPE_CHECKING variable. typing-extensions = "^4.7.0" [tool.pyright] -# Live's built-in python version as of v12.0.2 is 3.7.3. -pythonVersion = "3.7" +# Live's built-in python version as of v12.1 is 3.11.6. +pythonVersion = "3.11" exclude = ["**/__pycache__", ".git", ".venv", "__ext__/"] extraPaths = ["./__ext__/AbletonLive12_MIDIRemoteScripts"] @@ -57,12 +63,8 @@ reportMissingModuleSource = false venvPath = "." venv = ".venv" -[tool.pytest.ini_options] -# Allow output. -addopts = "--capture=no" - [tool.ruff] -target-version = "py37" +target-version = "py311" # Exclude generated libs. extend-exclude = ["__ext__"] diff --git a/tests/backlight.feature b/tests/backlight.feature index 23838f9..11a3ada 100644 --- a/tests/backlight.feature +++ b/tests/backlight.feature @@ -1,6 +1,7 @@ Feature: Backlight management Scenario Outline: Toggling the backlight when it is unconfigured - Given the set is open + Given the SS2 is connected + And the set is open And the SS2 is initialized Then the backlight should be diff --git a/tests/basics.feature b/tests/basics.feature index e91bd82..e69b095 100644 --- a/tests/basics.feature +++ b/tests/basics.feature @@ -1,6 +1,7 @@ Feature: Basic usage Background: - Given the default set is open + Given the SS2 is connected + And the default set is open And the SS2 is initialized Scenario: Putting the device into hosted mode diff --git a/tests/clip_launch.feature b/tests/clip_launch.feature index 10262c7..205d72e 100644 --- a/tests/clip_launch.feature +++ b/tests/clip_launch.feature @@ -1,6 +1,7 @@ Feature: Clip launch grid Scenario: Using a 1x8 clip launch grid - Given the wide_clip_launch set is open + Given the SS2 is connected + And the wide_clip_launch set is open And the SS2 is initialized # Make sure track 1 is selected, to avoid any inconsistencies if @@ -23,7 +24,7 @@ Feature: Clip launch grid # name. And I press nav right And I press nav left - Then the display should be "1-MI" + Then the display should be scrolling "1-MIDI" When I press key 0 Then the mode select screen should be active diff --git a/tests/configuration.feature b/tests/configuration.feature new file mode 100644 index 0000000..2f323e2 --- /dev/null +++ b/tests/configuration.feature @@ -0,0 +1,12 @@ +Feature: Configuration settings (besides overrides) + Scenario: Alternate initial mode and initial previous mode + Given the SS2 is connected + And the alt_initial_mode set is open + And the SS2 is initialized + + Then the display should be "Util" + And light 8 should be off + And light 0 should be solid green + + When I long-press key 0 + Then the display should be "XY" diff --git a/tests/conftest.py b/tests/conftest.py index efad187..3c82be8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,38 +1,46 @@ from __future__ import annotations import asyncio +import functools import importlib.machinery import importlib.util import os -import queue import time import webbrowser +from contextlib import ExitStack, asynccontextmanager from enum import Enum from functools import partial +from threading import Lock from typing import ( TYPE_CHECKING, Any, + AsyncGenerator, Awaitable, Callable, Collection, + Concatenate, + Coroutine, Dict, Generator, List, Optional, + ParamSpec, + Sequence, Tuple, TypeVar, Union, ) +import janus import mido -from pytest import FixtureRequest, fixture, mark +from pytest import fixture from pytest_bdd import given, parsers, then, when from pytest_bdd.parser import Feature, Step -from pytest_bdd.utils import get_args from rich.console import Console from rich.table import Table from rich.text import Text -from typing_extensions import Never, TypeAlias +from typeguard import typechecked +from typing_extensions import Never if TYPE_CHECKING: # The type checker sees packages in the project root. @@ -40,8 +48,8 @@ import control_surface.sysex as sysex else: # Outside the type checker, we don't have direct import access to the main control - # surface, but the sysex constants would be too annoying to duplicate. Load it manually - # from the path, see + # surface, but the hardware and sysex constants would be too annoying to + # duplicate. Load it manually from the path, see # https://csatlas.com/python-import-file-module/#import_a_file_in_a_different_directory. def _load_module_from_path(name: str, path: str): path = os.path.join(path, f"{name}.py") @@ -72,12 +80,11 @@ def _load_module_from_path(name: str, path: str): is_debug = "DEBUG" in os.environ T = TypeVar("T") +P = ParamSpec("P") -# Time between update messages before the state can be considered stable, i.e. fully -# updated by Live. This should be shorter than the display scroll duration (0.2s). -STABILITY_DURATION = 0.15 -# Time between iterations of polling loops. -POLL_INTERVAL = 0.03 +# Standard identity request, see +# http://midi.teragonaudio.com/tech/midispec/identity.htm. +IDENTITY_REQUEST_SYSEX = (0xF0, 0x7E, 0x7F, 0x06, 0x01, 0xF7) # Copypasta but we want to isolate the test files from the main module. RED_LED_BASE_CC = 20 @@ -91,6 +98,31 @@ def _load_module_from_path(name: str, path: str): DISPLAY_BASE_CC = 50 DISPLAY_WIDTH = 4 +# Exit button in standalone modes. +STANDALONE_EXIT_CC = 80 + +MIDI_CHANNEL = 0 + +# The number of seconds to wait after the controller responds to a ping before +# considering it responsive. Particularly when booting Live to open a set, there's +# usually a period where the controller reacts slowly after responding to a ping. +RESPONSIVENESS_DELAY = 1.5 + +# Time after user actions to wait for potential responses from Live, e.g. when +# considering whether the device state is stable. +MIDI_RESPONSE_DELAY = 0.3 + +# Time between incoming messages before the device state can be considered stable, +# i.e. fully updated by Live. This should be shorter than the framerate of scrolling +# text, i.e. 0.2s. +STABILITY_DELAY = 0.15 + +# Seconds to wait to trigger a long press. +LONG_PRESS_DELAY = 0.6 + +# Time between iterations of polling loops. +POLL_INTERVAL = 0.03 + # The number of separate messages, which we'll track individually, which need to be sent # to switch modes. NUM_STANDALONE_TOGGLE_MESSAGES = len(sysex.SYSEX_STANDALONE_MODE_ON_REQUESTS) @@ -98,6 +130,7 @@ def _load_module_from_path(name: str, path: str): # Error handling. +@typechecked def pytest_bdd_step_error(step: Step, feature: Feature, step_func_args: Dict[str, Any]): console = Console() console.print( @@ -108,6 +141,7 @@ def pytest_bdd_step_error(step: Step, feature: Feature, step_func_args: Dict[str device_state.print() +@typechecked def pytest_bdd_after_step(step: Step, feature: Feature, step_func_args: Dict[str, Any]): if is_debug: console = Console() @@ -119,60 +153,87 @@ def pytest_bdd_after_step(step: Step, feature: Feature, step_func_args: Dict[str device_state.print() -# Emulation of the SoftStep LED/Display state based on incoming MIDI messages. +# Read-only view of the SoftStep LED/Display state based on incoming MIDI messages. class DeviceState: class UpdateCategory(Enum): lights = "lights" display = "display" + backlight = "backlight" + mode = "mode" + program = "program" def __init__(self) -> None: - # Indexed by (physical key number - 1), i.e. from the bottom left. - num_keys = hardware.NUM_ROWS * hardware.NUM_COLS - self._red_values: List[int] = [0] * num_keys - self._green_values: List[int] = [0] * num_keys + # Directly-set set LED values, indexed by ((physical key number - 1) % 10), + # i.e. from the bottom left. + self._red_values: List[Optional[int]] + self._green_values: List[Optional[int]] # Pending values for setting colors via the older API. - self._deprecated_led_values: List[Optional[int]] = [ - None - ] * NUM_DEPRECATED_LED_FIELDS + self._deprecated_led_values: List[Optional[int]] - # Display initially filled with spaces. - self._display_values: List[int] = [32] * DISPLAY_WIDTH + # Display characters. + self._display_values: List[Optional[int]] # States of individual toggles for standalone mode. Each one can be true # (standalone mode), false (hosted mode), or None if no corresponding message # has been received. These messages are expected to be received in succession, # so we should never stay in a state with some toggles flipped and some not. - self._standalone_toggles: List[Optional[bool]] = [ - None - ] * NUM_STANDALONE_TOGGLE_MESSAGES + self._standalone_toggles: List[Optional[bool]] # Backlight on/off, or unset (None). - self._backlight: Optional[bool] = None + self._backlight: Optional[bool] - self._identity_request_event = asyncio.Event() - self._ping_event = asyncio.Event() + # Most recent standalone mode program. + self._standalone_program: Optional[int] - # Last update times by category, so we can detect whether the device is being - # actively updated. - self._update_times: Dict[DeviceState.UpdateCategory, float] = {} - for category in DeviceState.UpdateCategory: - self._update_times[category] = 0.0 + # Initialize all state values. + self.reset() + + # Tracker for validation error suppression prior to device init. + self.__allow_ccs_until_managed: bool = False - # Additional message handlers. - self._message_listeners: List[Optional[Callable[[mido.Message], Any]]] = [] + # Reset all properties to unknown/unmanaged. + def reset(self) -> None: + self._reset_leds_and_display() + + self._standalone_toggles = [None] * NUM_STANDALONE_TOGGLE_MESSAGES + + self._backlight = None + + # Most recent standalone mode program. + self._standalone_program = None + + # Reset just the LEDs and display to unknown/unmanaged. This is used when switching + # to standalone mode, where these values can change based on user actions + # (i.e. independently of CC messages being sent to the device). + def _reset_leds_and_display(self) -> None: + num_keys = hardware.NUM_ROWS * hardware.NUM_COLS + + self._red_values = [None] * num_keys + self._green_values = [None] * num_keys + self._deprecated_led_values = [None] * NUM_DEPRECATED_LED_FIELDS + + self._display_values = [None] * DISPLAY_WIDTH @property - def red_values(self): + def red_values(self) -> Sequence[Optional[int]]: return self._red_values @property - def green_values(self): + def green_values(self) -> Sequence[Optional[int]]: return self._green_values @property - def display_text(self) -> str: - return "".join(chr(value) for value in self._display_values) + def display_text(self) -> Optional[str]: + assert len(self._display_values) == DISPLAY_WIDTH + result = "" + for value in self._display_values: + # If any character values are unknown, treat the whole display content as + # unknown. + if value is None: + return None + result += chr(value) + return result # Individual trackers for the various messages that need to be sent to enter/exit # standalone mode. @@ -180,30 +241,80 @@ def display_text(self) -> str: def standalone_toggles(self) -> Collection[Optional[bool]]: return self._standalone_toggles + # The most recent program sent while the controller was in standalone mode. + @property + def standalone_program(self) -> Optional[int]: + return self._standalone_program + @property def backlight(self) -> Optional[bool]: return self._backlight - # Returns a remove function. - def add_message_listener( - self, - listener: Callable[[mido.Message], Any], - ) -> Callable[[], None]: - index = len(self._message_listeners) - self._message_listeners.append(listener) - - def remove(): - self._message_listeners[index] = None - - return remove - - def receive_message(self, msg: mido.Message): - if msg.is_cc() and msg.dict()["channel"] == 0: - _, cc, value = msg.bytes() + def receive_message(self, message: mido.Message) -> "DeviceState.UpdateCategory": + """Validate and process an incoming MIDI message. + + This method imposes strict validation rules, and throws errors in some cases + that wouldn't cause any hardware issues (e.g. stray program change messages), + but which indicate something unexpected happening with the control surface. + + """ + msg_type, msg_channel = [ + message.dict().get(field, None) for field in ("type", "channel") + ] + + # Check whether the message type is generally allowed, based on the current + # state of the device. + if len(set(self.standalone_toggles)) == 1: + # All toggles are the same, we're not in the middle of a transition. + standalone_status = list(self.standalone_toggles)[0] + + if message.is_cc(): + assert standalone_status is False or ( + # Check whether we're suppressing these errors during init. + # + # Note that it should only be possible for this to be `True` when + # the standalone status is `None`. + self.__allow_ccs_until_managed + ), f"CC messages are only expected in hosted mode: {message}" + elif msg_type == "program_change": + assert ( + standalone_status is True + ), f"Program Change messages are only expected in standalone mode: {message}" + elif msg_type == "sysex": + # These are allowed. + pass + else: + raise RuntimeError(f"Unexpected message type: {message}") - # Commit the LED color from the deprecated API. This isn't an exact replica - # of the real hardware behavior, which has more edge cases and bugs, but - # we're only using this feature in a controlled way to render solid yellow. + else: + # If the standalone toggles aren't all the same, i.e. if we're in the + # process of transitioning between standalone and hosted mode, only allow + # additional standalone toggle messages. + assert ( + msg_type == "sysex" + ), f"Non-sysex messages not allowed while switching between standalone and hosted mode: {message}" + assert any( + matches_sysex(message, t) + for t in sysex.SYSEX_STANDALONE_MODE_ON_REQUESTS + + sysex.SYSEX_STANDALONE_MODE_OFF_REQUESTS + ), f"Invalid sysex message while switching between standalone and hosted mode: {message}" + + # Now handle the message in the interface. + if message.is_cc(): + assert ( + msg_channel == MIDI_CHANNEL + ), f"Got CC on unexpected channel: {message}" + + _, cc, value = message.bytes() + + # A few of these get sent immediately after activating LEDs via the + # deprecated color API, which (evidently) causes the hardware to flush + # updates and avoids issues when configuring additional LEDs via this API. + # + # When we receive one of these messages, commit the LED color from the + # deprecated API. This isn't an exact replica of the real hardware behavior, + # which has more edge cases and bugs, but we're only using this feature in a + # controlled way to render solid yellow. if cc == CLEAR_CC: if all([value is not None for value in self._deprecated_led_values]): location, color, state = self._deprecated_led_values @@ -214,90 +325,109 @@ def receive_message(self, msg: mido.Message): assert state is not None values[location] = state self._deprecated_led_values = [None] * len(self._deprecated_led_values) - - # Detect values that update internal arrays. - for base_cc, values, category in ( - ( - DEPRECATED_LED_BASE_CC, - self._deprecated_led_values, - DeviceState.UpdateCategory.lights, - ), - (RED_LED_BASE_CC, self._red_values, DeviceState.UpdateCategory.lights), - ( - GREEN_LED_BASE_CC, - self._green_values, - DeviceState.UpdateCategory.lights, - ), - ( - DISPLAY_BASE_CC, - self._display_values, - DeviceState.UpdateCategory.display, - ), - ): - if base_cc <= cc < base_cc + len(values): - values[cc - base_cc] = value - self._update_times[category] = time.time() - - # Identity request, see http://midi.teragonaudio.com/tech/midispec/identity.htm. - elif matches_sysex(msg, (0xF0, 0x7E, 0x7F, 0x06, 0x01, 0xF7)): - # Greeting request flag gets set forever once the message has been - # received. - if not self._identity_request_event.is_set(): - self._identity_request_event.set() - elif matches_sysex(msg, sysex.SYSEX_PING_RESPONSE): - # Ping response flag gets cleared immediately after notifying any - # current listeners. - self._ping_event.set() - self._ping_event.clear() - elif matches_sysex(msg, sysex.SYSEX_BACKLIGHT_OFF_REQUEST): + return DeviceState.UpdateCategory.lights + else: + # Detect values that update internal arrays. + for base_cc, values, category in ( + ( + DEPRECATED_LED_BASE_CC, + self._deprecated_led_values, + DeviceState.UpdateCategory.lights, + ), + ( + RED_LED_BASE_CC, + self._red_values, + DeviceState.UpdateCategory.lights, + ), + ( + GREEN_LED_BASE_CC, + self._green_values, + DeviceState.UpdateCategory.lights, + ), + ( + DISPLAY_BASE_CC, + self._display_values, + DeviceState.UpdateCategory.display, + ), + ): + if base_cc <= cc < base_cc + len(values): + values[cc - base_cc] = value + + return category + + # Handle program changes, verify that we're in standalone mode. + elif msg_type == "program_change": + assert ( + msg_channel == MIDI_CHANNEL + ), f"Got Program Change on unexpected channel: {message}" + + # We already verified that we're in standalone mode above, but sanity + # check to be sure. + assert all(t is True for t in self.standalone_toggles) + + program = message.dict()["program"] + assert isinstance(program, int) + self._standalone_program = program + return DeviceState.UpdateCategory.program + + # Handle backlight toggle messages. + elif matches_sysex(message, sysex.SYSEX_BACKLIGHT_OFF_REQUEST): self._backlight = False - elif matches_sysex(msg, sysex.SYSEX_BACKLIGHT_ON_REQUEST): + return DeviceState.UpdateCategory.backlight + elif matches_sysex(message, sysex.SYSEX_BACKLIGHT_ON_REQUEST): self._backlight = True - else: + return DeviceState.UpdateCategory.backlight + + # The only other allowed messages are the sysexes to toggle between + # standalone/hosted mode. + elif msg_type == "sysex": # Set standalone toggle flags if appropriate. for requests, standalone in ( (sysex.SYSEX_STANDALONE_MODE_ON_REQUESTS, True), (sysex.SYSEX_STANDALONE_MODE_OFF_REQUESTS, False), ): for idx, request in enumerate(requests): - if matches_sysex(msg, request): + if matches_sysex(message, request): self._standalone_toggles[idx] = standalone - for message_listener in self._message_listeners: - if message_listener: - message_listener(msg) - - async def wait_for_identity_request(self): - await self._identity_request_event.wait() - - async def wait_for_ping_response(self): - await self._ping_event.wait() - - # Wait until no updates to the given state type(s) have been received for the given - # duration. Timing is imprecise (but should never give false positives), as this - # uses polling rather than proper async notifications. - async def wait_until_stable( - self, - categories: Optional[Collection[DeviceState.UpdateCategory]] = None, - duration: float = STABILITY_DURATION, - ): - if categories is None: - categories = list(DeviceState.UpdateCategory) - - while True: - current_time = time.time() - needs_stability_since = current_time - duration - if all( - [ - self._update_times[category] <= needs_stability_since - for category in categories - ] - ): - break - await asyncio.sleep(POLL_INTERVAL) - + # Clear the CC validation error suppression if the device mode + # is now fully managed. + if all(t is not None for t in self._standalone_toggles): + self.__allow_ccs_until_managed = False + + # If we just switched to standalone mode, the LEDs and display + # are no longer under our control. + if all(t is True for t in self._standalone_toggles): + self._reset_leds_and_display() + + return DeviceState.UpdateCategory.mode + + # If we haven't returned by this point, the message is unrecognized. + raise ValueError(f"Unrecognized message: {message}") + + # Don't error on incoming CCs as long as the standalone/hosted mode status is + # unmanaged. Restore the default validation behavior once the status has been set + # explicitly. If the standalone/hosted mode status is already set explicitly, this + # is a no-op. + # + # This can be used to avoid validation errors when the device is quickly + # disconnected and reconnected. In such cases, Live seems to send some interface + # updates prior to `port_settings_changed` being fired, i.e. before it realizes that + # the device needs to be re-identified. + # + # Stray CCs at this stage should be harmless (as the device will be fully + # reinitialized once `port_settings_changed` eventually fires). At worst they could + # interfere with transient LED states of the initial standalone mode when the device + # boots up. + def allow_ccs_until_managed(self): + # Only set the variable if the state is at least partially unmanaged. + if any(t is None for t in self._standalone_toggles): + self.__allow_ccs_until_managed = True + + # Generate a console-printable representation of the LED states and display text. def _create_table(self) -> Table: led_state_representations = { + None: ("??", ""), # unknown/unmanaged 0: (" ", ""), # off 1: ("ON", "reverse"), # on 2: ("BL", ""), # normal blink @@ -305,9 +435,10 @@ def _create_table(self) -> Table: # flash omitted as it's not used. } - def led(red: int, green: int) -> Union[str, Text]: + def led(red: Optional[int], green: Optional[int]) -> Union[str, Text]: style = "" - state = 0 + state: Optional[int] = 0 + if red == 0 and green == 0: pass elif red == 0: @@ -319,7 +450,7 @@ def led(red: int, green: int) -> Union[str, Text]: elif red == green: style = "yellow" state = red # either way. - else: + else: # red and green differ but are both nonzero. raise RuntimeError("mixed LED states not supported") text, state_style = led_state_representations[state] @@ -338,7 +469,16 @@ def led(red: int, green: int) -> Union[str, Text]: ) for index in range(num_key_cols) ], - Text(self.display_text) if base_offset > 0 else "", + ( + Text( + "?" * DISPLAY_WIDTH + if self.display_text is None + else self.display_text + ) + # Render the display text on the top row. + if base_offset > 0 + else "" + ), ) return table @@ -349,175 +489,394 @@ def print(self, console: Optional[Console] = None): console.print(self._create_table()) -# Convert an async step to sync. Adapted from -# https://github.com/pytest-dev/pytest-bdd/issues/223#issuecomment-332084037. -# -# Hack also adapted from pytest_bdd's _get_scenario_decorator to automatically get the -# loop fixture while still injecting other fixtures. -def sync(*args: Callable[..., Awaitable[T]]) -> Callable[..., T]: - [fn] = args - func_args = get_args(fn) +# Decorator for async Device instance methods, which causes any errors during parallel +# message handling to be thrown immediately. +def guard_message_exceptions( + fn: Callable[Concatenate["Device", P], Coroutine[Any, Any, T]], +) -> Callable[Concatenate["Device", P], Awaitable[T]]: + @functools.wraps(fn) + async def wrapper(*a, **k): + device = a[0] + assert isinstance(device, Device) - # Tell pytest about the original fixtures. - @mark.usefixtures(*func_args) - def synced_fn(request: FixtureRequest, loop: asyncio.AbstractEventLoop): - fixture_values = [request.getfixturevalue(arg) for arg in func_args] - return loop.run_until_complete(fn(*fixture_values)) + async def raise_exception(): + await device._exception_event.wait() - return synced_fn + exc = device._exception + assert exc is not None + raise exc + async with asyncio.TaskGroup() as task_group: + # If this throws an exception, it will be raised and the whole group will be + # destroyed. + exception_task = task_group.create_task(raise_exception()) -def matches_sysex( - message: mido.Message, sysex_bytes: Union[List[int], Tuple[int, ...]] -): - message_attrs = message.dict() - if message_attrs["type"] != "sysex": - return False - data = message_attrs["data"] - # Strip the F0/F7 at the start/end of the byte list. - return all(x == y for x, y in zip(data, sysex_bytes[1:-1])) + # If this finishes before an exception is raised, we can cancel the + # exception task to tear down the group. + result = await task_group.create_task(fn(*a, **k)) + exception_task.cancel() + return result -def _cleanup_runnable(runnable: asyncio.Future): - try: - exception = runnable.exception() - if exception is not None: - raise exception - except asyncio.InvalidStateError: - # Task is unfinished, this is allowed. - pass - runnable.cancel() + return wrapper -@fixture -def loop(): - loop = asyncio.get_event_loop() - loop.set_debug(True) - return loop +# Device emulation with disconnect/reconnect functionality. +# +# This should be used within an async `with` statement to ensure that the message +# handler is properly started and cleaned up, and all interaction should occur within +# the same async loop. +class Device: + def __init__( + self, + # If provided, forward all incoming MIDI messages to these ports. Used for + # visual feedback on the connected hardware, if any. + relay_ports: Collection[mido.ports.BaseOutput] = [], + ): + self._ioport: Optional[mido.ports.BaseIOPort] = None + self._device_state: DeviceState = DeviceState() + self._relay_ports = relay_ports -# MIDI I/O controller. -@fixture -def ioport(): - port_name = "modeStep test" - with mido.open_ioport(port_name, virtual=True) as ioport: # type: ignore - yield ioport + # Message queues into which incoming messages should be placed. This includes + # the main queue used by `_process_messages` to handle internal updates and + # functionality, plus any active `incoming_messages` queues. + # + # `None` will be pushed when the device is torn down, i.e. no more messages can + # be received. + self.__queues: Dict[int, janus.Queue[mido.Message]] = {} + self.__queues_lock = Lock() + # Background task for processing incoming messages. + self.__process_messages_task: Optional[asyncio.Task] = None -# Cheap thrills - relay the test port to the physical device for visual feedback during -# tests, if available. -@fixture -def relay_port() -> Generator[Optional[mido.ports.BaseOutput], Never, None]: - port_name = "SoftStep Control Surface" - try: - with mido.open_output(port_name) as relay_port: # type: ignore - yield relay_port - except Exception: - yield None # No problem if we can't get it. + # Trackers for exceptions thrown while processing incoming messages. + self._exception_event = asyncio.Event() + self._exception: Optional[Exception] = None + + # Trackers for messages that aren't part of the main device state. + self.__identity_request_event = asyncio.Event() + self.__ping_event = asyncio.Event() + # Last update times by category, so we can detect whether the device is being + # actively updated. + self.__update_times: Dict[DeviceState.UpdateCategory, float] = {} + for category in DeviceState.UpdateCategory: + self.__update_times[category] = 0.0 -MessageQueue: TypeAlias = "asyncio.Queue[mido.Message]" + @asynccontextmanager + async def incoming_messages( + self, + ) -> AsyncGenerator[janus.AsyncQueue[mido.Message], Never]: + queue: janus.Queue[mido.Message] = janus.Queue() + queue_id: int + with self.__queues_lock: + # Get a unique ID for this queue. + queue_id = max([0, *self.__queues.keys()]) + 1 + + # Store the queue so that it will be populated by the incoming message + # handler. + self.__queues[queue_id] = queue + + try: + yield queue.async_q + finally: + # Remove the queue from future message handling. + with self.__queues_lock: + del self.__queues[queue_id] + + # Clean up background tasks. + await queue.aclose() + @property + def device_state(self) -> DeviceState: + return self._device_state -# Async queue for MIDI messages received from Live. -@fixture -def message_queue( - ioport: mido.ports.BaseInput, - loop: asyncio.AbstractEventLoop, - relay_port: Optional[mido.ports.BaseOutput], -) -> Generator[MessageQueue, Never, None]: - # The async queue isn't thread safe, so we need a wrapper. - threaded_queue: queue.Queue[mido.Message] = queue.Queue() - - def receive_all_messages(): - # Simply iterating over the ioport doesn't exit cleanly when the port is closed. - while not ioport.closed: - # Drain all unprocessed messages. + @property + def is_connected(self) -> bool: + return self._ioport is not None + + def connect(self): + if self._ioport is not None: + raise RuntimeError("Emulated device is already connected") + + port_name = "modeStep test" + self._ioport = mido.open_ioport( # type: ignore + port_name, virtual=True, callback=self.__on_message + ) + + def disconnect(self): + if self._ioport is None: + raise RuntimeError("Emulated device is not connected") + + self._ioport.close() + self._ioport = None + + self.reset() + + # Reset all state to unmanaged, and forget whether an identity request has been + # received. This allows waiting for the device to be re-initialized (for example + # when opening a new set), even if it hasn't been disconnected. + def reset(self): + self.device_state.reset() + self.__identity_request_event.clear() + + def send(self, message: mido.Message): + if self._ioport is None: + raise RuntimeError("Emulated device is not connected") + self._ioport.send(message) + + # Root-level MIDI message handler, invoked by mido in a separate thread. + def __on_message(self, message: mido.Message): + with self.__queues_lock: + for queue in self.__queues.values(): + # Thread-safe synchronous queue view. The main thread will receive these + # via the async view. + queue.sync_q.put(message) + + # Process all incoming messages (including across disconnects/reconnects), and send + # responses (e.g. to identity requests) as necessary. This is intended to run as a + # background task as long as the device is active. + async def _process_messages(self): + async with self.incoming_messages() as queue: while True: - message = ioport.poll() + message = await queue.get() + + # Exit once the device is cleaned up. if message is None: break - else: - threaded_queue.put(message) - time.sleep(POLL_INTERVAL) + try: + self._process_message(message) + except Exception as exc: + # Store the exception and notify anyone listening. This causes + # methods decorated with `guard_message_actions` to exit + # immediately. + self._exception = exc + self._exception_event.set() + + # Propagate up the chain. + raise exc + + # Process a single message internally: + # + # - update the device state + # - respond to identity requests + # - notify about ping responses + def _process_message(self, message: mido.Message): + # Forward to any relays for hardware visual feedback. + for relay_port in self._relay_ports: + relay_port.send(message) + + # Identity request sent by Live during startup (potentially more than once) + # and when MIDI ports change. + if matches_sysex(message, IDENTITY_REQUEST_SYSEX): + # Send a response immediately. + + # Greeting request flag gets set forever (until a disconnect) once the + # message has been received. + if not self.__identity_request_event.is_set(): + self.__identity_request_event.set() + + self.send( + mido.Message( + "sysex", + data=( + (0x7E, 0x7F, 0x06, 0x02) + + sysex.MANUFACTURER_ID_BYTES + + sysex.DEVICE_FAMILY_BYTES + ), + ) + ) - # Receive messages on the thread-safe queue in the background. - receive_all_messages_executor = loop.run_in_executor(None, receive_all_messages) + # Response to a ping request. This validates that we're actually connected with + # modeStep, and not a different control surface. + elif matches_sysex(message, sysex.SYSEX_PING_RESPONSE): + # Ping response flag gets cleared immediately after notifying any + # current listeners. + self.__ping_event.set() + self.__ping_event.clear() + + # Any other messages are expected to be device state updates. This will throw an + # error if the message is unrecognized or unexpected in the current state. + else: + update_category: DeviceState.UpdateCategory = ( + self._device_state.receive_message(message) + ) + self.__update_times[update_category] = time.time() - # Main async message queue. - message_queue: MessageQueue = asyncio.Queue() + @guard_message_exceptions + async def wait_for_identity_request(self): + await self.__identity_request_event.wait() - # Repeatedly poll the threaded queue and add items to the async queue. - async def poll_messages(): - while not ioport.closed: - while not threaded_queue.empty(): - message = threaded_queue.get_nowait() - assert message is not None - await message_queue.put(message) - if relay_port is not None: - relay_port.send(message) + @guard_message_exceptions + async def wait_for_ping_response(self): + await self.__ping_event.wait() - # Yield the async loop execution. + # Wait until no updates to the given state type(s) have been received for the given + # duration. Timing is imprecise (but should never give false positives), as this + # uses polling rather than proper async notifications. + @guard_message_exceptions + async def wait_until_stable( + self, + categories: Optional[Collection[DeviceState.UpdateCategory]] = None, + duration: float = STABILITY_DELAY, + ): + if categories is None: + categories = list(DeviceState.UpdateCategory) + + while True: + current_time = time.time() + needs_stability_since = current_time - duration + if all( + [ + self.__update_times[category] <= needs_stability_since + for category in categories + ] + ): + break await asyncio.sleep(POLL_INTERVAL) - poll_messages_task = loop.create_task(poll_messages()) - yield message_queue + @guard_message_exceptions + async def wait_for_initialization( + self, require_display: bool, timeout: float = 30.0 + ): + async with asyncio.timeout(timeout): + # Wait until the device is explicitly placed into standalone or hosted mode. + while any(t is None for t in self.device_state.standalone_toggles): + await asyncio.sleep(POLL_INTERVAL) + + if require_display: + # Wait until something gets rendered to the display. + while not (self.device_state.display_text or "").strip(): + await asyncio.sleep(POLL_INTERVAL) + + # Wait until the control surface starts responding to inputs (this can take a second + # or so if Live was just started). + received_pong = False + + async def send_pings(): + while not received_pong: + self.send( + mido.Message("sysex", data=sysex.SYSEX_PING_REQUEST[1:-1]) + ) + await asyncio.sleep(0.3) + + send_pings_task = asyncio.create_task(send_pings()) + await self.wait_for_ping_response() + received_pong = True - for runnable in (poll_messages_task, receive_all_messages_executor): - _cleanup_runnable(runnable) + # Throw any exceptions. + await send_pings_task + # Another brief delay to make sure the control surface is responsive (there are intermittent + # issues if we don't add this). + await asyncio.sleep(RESPONSIVENESS_DELAY) -# After setting up the MIDI message queue, create a device state emulator and start -# parsing incoming MIDI messages. + # Use this object within an asyn `with` context to run the message processor in the + # background. + async def __aenter__(self) -> Device: + if self.__process_messages_task is not None: + raise RuntimeError("Message processing task is already running") + self.__process_messages_task = asyncio.create_task(self._process_messages()) + return self + + async def __aexit__(self, *args): + if self.is_connected: + self.disconnect() + + if self.__process_messages_task is None: + raise RuntimeError("Message processing task is not running") + self.__process_messages_task.cancel() + + # Ensure the task has actually been cancelled to avoid errors on exit, see + # https://stackoverflow.com/questions/77974525/what-is-the-right-way-to-await-cancelling-an-asyncio-task. + try: + await self.__process_messages_task + except asyncio.CancelledError: + # We expect the task to be cancelled. Any other errors should be bubbled up. + pass + self.__process_messages_task = None + + # If we reached this point, we expect that the processing task finished + # successfully, i.e. no exceptions were thrown. + assert not self._exception_event.is_set() + + +# Convert an async step to sync. Adapted from +# https://github.com/pytest-dev/pytest-bdd/issues/223#issuecomment-1646969954. +# +# This only needs to be used with test steps (given, when, etc.). Async fixtures are +# handled automatically by pytest-asyncio. +def sync(fn: Callable[P, Coroutine[Any, Any, T]]) -> Callable[P, T]: + @functools.wraps(fn) + def wrapper(*args, **kwargs) -> T: + return asyncio.get_event_loop().run_until_complete(fn(*args, **kwargs)) + + return wrapper + + +def matches_sysex( + message: mido.Message, sysex_bytes: Union[List[int], Tuple[int, ...]] +): + message_attrs = message.dict() + if message_attrs["type"] != "sysex": + return False + data = message_attrs["data"] + # Strip the F0/F7 at the start/end of the byte list. + return all(x == y for x, y in zip(data, sysex_bytes[1:-1], strict=True)) + + +# Cheap thrills - relay the test port to the physical device for visual feedback during +# tests, if available. @fixture -def device_state( - message_queue: MessageQueue, - loop: asyncio.AbstractEventLoop, -) -> Generator[DeviceState, Never, None]: - device_state = DeviceState() +@typechecked +def relay_port() -> Generator[Optional[mido.ports.BaseOutput], None, None]: + port_name = "SoftStep Control Surface" + with ExitStack() as stack: + relay_port: Optional[mido.ports.BaseOutput] = None + try: + relay_port = stack.enter_context(mido.open_output(port_name)) # type: ignore + except Exception: + pass # No problem if we can't get it. + yield relay_port - async def receive_messages(): - while True: - message = await message_queue.get() - device_state.receive_message(message) - receive_messages_task = loop.create_task(receive_messages()) - yield device_state +@fixture +@typechecked +async def device( + relay_port: mido.ports.BaseOutput, +) -> AsyncGenerator[Device, None]: + async with Device(relay_ports=[relay_port]) as device: + yield device + - _cleanup_runnable(receive_messages_task) +# After setting up the MIDI message queue, create a device state emulator and start +# parsing incoming MIDI messages. +@fixture +@typechecked +def device_state( + device: Device, +) -> DeviceState: + return device.device_state # Wait to receive an identity request from Live, then send a response back. @fixture -@sync +@typechecked async def device_identified( - ioport: mido.ports.BaseOutput, - device_state: DeviceState, -): - await device_state.wait_for_identity_request() - ioport.send( - mido.Message( - "sysex", - # Identity response. - data=( - (0x7E, 0x7F, 0x06, 0x02) - + sysex.MANUFACTURER_ID_BYTES - + sysex.DEVICE_FAMILY_BYTES - ), - ) - ) + device: Device, +) -> bool: + await device.wait_for_identity_request() return True # Open a Live set by name. This will reload the control surface, resetting the mode and -# session ring position. Do this in a Background to get a clean control surface at the -# beginning of each scenario. +# session ring position. # # See the Live project in this directory (and the python helper there) for the available # sets. -@given(parsers.parse("the {set_name} set is open")) -def given_set_is_open(set_name): +def _open_live_set(set_name: str): dir = os.path.dirname(os.path.realpath(__file__)) set_name = os.path.normpath(set_name) assert "/" not in set_name # sanity check @@ -529,42 +888,6 @@ def given_set_is_open(set_name): webbrowser.open(f"file://{set_file}", autoraise=False) -# Wait for Live to send initial CC updates after the device has been identified. -@given("the SS2 is initialized") -@sync -async def given_control_surface_is_initialized( - device_identified: bool, - device_state: DeviceState, - loop: asyncio.AbstractEventLoop, - ioport: mido.ports.BaseOutput, -): - assert device_identified - - # Wait until something gets rendered to the display. - while not device_state.display_text.strip(): - await asyncio.sleep(POLL_INTERVAL) - - # Wait until the control surface starts reponding to inputs (this can take a second - # or so if Live was just started). - received_pong = False - - async def send_pings(): - while not received_pong: - ioport.send(mido.Message("sysex", data=sysex.SYSEX_PING_REQUEST[1:-1])) - await asyncio.sleep(0.3) - - send_pings_task = loop.create_task(send_pings()) - await device_state.wait_for_ping_response() - received_pong = True - - # Throw any exceptions. - await send_pings_task - - # Another brief delay to make sure the control surface is responsive (there are intermittent - # issues if we don't add this delay). - await asyncio.sleep(0.2) - - def _get_index_for_key(key_number: int): return (key_number - 1) % (hardware.NUM_ROWS * hardware.NUM_COLS) @@ -584,123 +907,216 @@ def _cc_message(control: int, value: int): return mido.Message("control_change", control=control, value=value) -async def action_hold(cc: int, port: mido.ports.BaseOutput): - port.send(_cc_message(cc, 1)) +async def action_hold(cc: int, device: Device): + device.send(_cc_message(cc, 1)) -async def action_release(cc: int, port: mido.ports.BaseOutput): - port.send(_cc_message(cc, 0)) +async def action_release(cc: int, device: Device): + device.send(_cc_message(cc, 0)) -async def action_press(cc: int, port: mido.ports.BaseOutput, duration): - await action_hold(cc, port) +async def action_press(cc: int, device: Device, duration: float): + await action_hold(cc, device) await asyncio.sleep(duration) - await action_release(cc, port) + await action_release(cc, device) # Generic action runner for statements like "When I press key 0", which all have some # common setup. -def cc_action( +async def cc_action( cc: int, action: str, - port: mido.ports.BaseOutput, - loop: asyncio.AbstractEventLoop, + device: Device, ): - handlers: Dict[str, Callable[[int, mido.ports.BaseOutput], Any]] = { + handlers: Dict[str, Callable[[int, Device], Awaitable[Any]]] = { "press": partial(action_press, duration=0.05), - "long-press": partial(action_press, duration=0.6), + "long-press": partial(action_press, duration=LONG_PRESS_DELAY), "hold": action_hold, "release": action_release, } if action not in handlers: raise ValueError(f"Unrecognized action: {action}") - loop.run_until_complete(handlers[action](cc, port)) + + await handlers[action](cc, device) # Wait for the device LED state to stabilize after pressing a key. -def stabilize_after_cc_action( - loop: asyncio.AbstractEventLoop, - device_state: DeviceState, - duration: float = STABILITY_DURATION, +async def stabilize_after_cc_action( + device: Device, + duration: float = STABILITY_DELAY, + initial_duration: float = MIDI_RESPONSE_DELAY, ): - # Add a pause before.. - loop.run_until_complete(asyncio.sleep(duration)) - # ...wait for stability... - loop.run_until_complete(device_state.wait_until_stable(duration=duration)) - # ...and a pause after for good measure. - loop.run_until_complete(asyncio.sleep(duration)) + # Add a pause beforehand, to allow CCs to start coming in. + await asyncio.sleep(initial_duration) + + # Wait until the device state has stabilized. + await device.wait_until_stable(duration=duration) + + # Add a pause after for good measure. + await asyncio.sleep(duration) + + +@given(parsers.parse("the SS2 is connected")) +@when(parsers.parse("I connect the SS2")) +@typechecked +def given_device_is_connected(device: Device): + device.connect() + + +@given(parsers.parse("the {set_name} set is open")) +@when(parsers.parse("I open the {set_name} set")) +@typechecked +def given_set_is_open(set_name: str): + _open_live_set(set_name) + + +# Wait for Live to send initial CC updates after the device has been identified. +@given("the SS2 is initialized") +@when("I wait for the SS2 to be initialized") +@sync +@typechecked +async def given_device_is_initialized( + device_identified: bool, + device: Device, +): + assert device_identified + await device.wait_for_initialization(require_display=True) + + +@given("the SS2 is initialized in standalone mode") +@when("I wait for the SS2 to be initialized in standalone mode") +@sync +@typechecked +async def given_device_is_initialized_in_standalone_mode( + device_identified: bool, + device: Device, +): + assert device_identified + await device.wait_for_initialization(require_display=False) + + +# No-op step. This allows for optional steps when using multiple `Examples`, e.g. by +# templatizing the step text. +@when(parsers.parse("I do nothing")) +@typechecked +def when_noop(): + pass + + +@when(parsers.parse("I disconnect the SS2")) +@typechecked +def when_disconnect(device: Device): + device.disconnect() + + +@when(parsers.parse("I forget the SS2's state")) +@typechecked +def when_forget_state(device: Device): + device.reset() + + +@when(parsers.parse("I allow stray interface updates until initialization")) +@typechecked +def when_allow_ccs_until_managed(device_state: DeviceState): + device_state.allow_ccs_until_managed() @when(parsers.parse("I {action} key {key_number:d}")) -def when_key_action( +@sync +@typechecked +async def when_key_action( key_number: int, action: str, - ioport: mido.ports.BaseOutput, - loop: asyncio.AbstractEventLoop, - device_state: DeviceState, + device: Device, ): cc = get_cc_for_key(key_number) - cc_action(cc, action, ioport, loop) - stabilize_after_cc_action(loop, device_state) + await cc_action(cc, action, device) + await stabilize_after_cc_action(device) # Take an action and don't wait for the device state to stabilize. Useful for short # invocations of the "hold" action in particular, to avoid potentially triggering a # long-press. @when(parsers.parse("I {action} key {key_number:d} without waiting")) -def when_key_action_without_waiting( +@sync +@typechecked +async def when_key_action_without_waiting( key_number: int, action: str, - ioport: mido.ports.BaseOutput, - loop: asyncio.AbstractEventLoop, + device: Device, ): cc = get_cc_for_key(key_number) - cc_action(cc, action, ioport, loop) + await cc_action(cc, action, device) + + +@when(parsers.parse("I {action} the standalone exit button")) +@sync +@typechecked +async def when_standalone_exit_action( + action: str, + device: Device, +): + await cc_action(STANDALONE_EXIT_CC, action, device) + await stabilize_after_cc_action(device) + + +@when(parsers.parse("I {action} the standalone exit button without waiting")) +@sync +@typechecked +async def when_standalone_exit_action_without_waiting( + action: str, + device: Device, +): + await cc_action(STANDALONE_EXIT_CC, action, device) @when(parsers.parse("I {action} key {key_number:d} {direction:w}")) -def when_directional_action( +@sync +@typechecked +async def when_directional_action( key_number: int, action: str, direction: str, - ioport: mido.ports.BaseOutput, - loop: asyncio.AbstractEventLoop, - device_state: DeviceState, + device: Device, ): cc = get_cc_for_key(key_number, direction=getattr(hardware.KeyDirection, direction)) - cc_action(cc, action, ioport, loop) - stabilize_after_cc_action(loop, device_state) + await cc_action(cc, action, device) + await stabilize_after_cc_action(device) @when(parsers.parse("I {action} nav {direction:w}")) -def when_nav_action( +@sync +@typechecked +async def when_nav_action( action: str, direction: str, - ioport: mido.ports.BaseOutput, - loop: asyncio.AbstractEventLoop, - device_state: DeviceState, + device: Device, ): cc = hardware.get_cc_for_nav(getattr(hardware.KeyDirection, direction)) - cc_action(cc, action, ioport, loop) - stabilize_after_cc_action(loop, device_state) + await cc_action(cc, action, device) + await stabilize_after_cc_action(device) @when(parsers.parse("I wait for the popup to clear")) @sync +@typechecked async def when_wait_for_popup(): await asyncio.sleep(0.8) @when(parsers.parse("I wait to trigger a long-press")) @sync +@typechecked async def when_wait_for_long_press(): - await asyncio.sleep(0.5) + await asyncio.sleep(LONG_PRESS_DELAY) # Parameters in the parser breaks `@sync` currently. @when(parsers.parse("I wait for {delay:f}s")) -def when_wait(delay, loop): - loop.run_until_complete(asyncio.sleep(delay)) +@sync +@typechecked +async def when_wait(delay: float): + await asyncio.sleep(delay) def get_color(key_number: int, device_state: DeviceState): @@ -744,46 +1160,72 @@ def assert_matches_color(key_number: int, color: str, device_state: DeviceState) @then(parsers.parse("light {key_number:d} should be {color}")) +@typechecked def should_be_color(key_number: int, color: str, device_state: DeviceState): assert_matches_color(key_number, color, device_state) @then(parsers.parse("lights {start:d}-{end:d} should be {color}")) +@typechecked def should_be_colors(start: int, end: int, color: str, device_state: DeviceState): for i in range(start, end + 1): assert_matches_color(i, color, device_state) @then(parsers.parse('the display should be "{text}"')) -def should_be_text(text: str, device_state: DeviceState): - assert device_state.display_text.rstrip() == text +@typechecked +def should_be_text( + text: str, + device_state: DeviceState, +): + assert device_state.display_text is not None + assert device_state.display_text.strip() == text + + +@then(parsers.parse('the display should be scrolling "{text}"')) +@typechecked +def should_be_scrolling_text(text: str, device_state: DeviceState): + assert device_state.display_text is not None + assert device_state.display_text in text @then(parsers.parse("the mode select screen should be active")) +@typechecked def should_be_mode_select(device_state: DeviceState): assert device_state.display_text in (" __ ", "__ ", "_ _", " __") @then("the backlight should be on") +@typechecked def should_be_backlight_on(device_state: DeviceState): assert device_state.backlight is True @then("the backlight should be off") +@typechecked def should_be_backlight_off(device_state: DeviceState): assert device_state.backlight is False @then("the backlight should be unmanaged") +@typechecked def should_be_backlight_unmanaged(device_state: DeviceState): assert device_state.backlight is None @then("the SS2 should be in standalone mode") +@typechecked def should_be_standalone_mode(device_state: DeviceState): assert all([t is True for t in device_state.standalone_toggles]) @then("the SS2 should be in hosted mode") +@typechecked def should_be_hosted_mode(device_state: DeviceState): assert all([t is False for t in device_state.standalone_toggles]) + + +@then(parsers.parse("standalone program {standalone_program:d} should be active")) +@typechecked +def should_be_standalone_program(standalone_program: int, device_state: DeviceState): + assert device_state.standalone_program == standalone_program diff --git a/tests/connection.feature b/tests/connection.feature new file mode 100644 index 0000000..febc201 --- /dev/null +++ b/tests/connection.feature @@ -0,0 +1,136 @@ +Feature: Device connection/disconnection events + Scenario Outline: Reconnecting the SS2 + Given the SS2 is connected + And the default set is open + And the SS2 is initialized + + Then the display should be "Trns" + + # Disconnect/reconnect and make sure the initial mode loads again within a short delay. + When I disconnect the SS2 + And I wait for s + And + And I connect the SS2 + And I wait for the SS2 to be initialized + + Then the display should be "Trns" + And light 5 should be solid red + And light 0 should be solid green + + # Switch to another mode and make sure the UI gets updated on reconnect. + When I press key 0 + Then the mode select screen should be active + + When I press key 7 + Then the display should be "Prss" + + When I disconnect the SS2 + And I wait for s + And + And I connect the SS2 + And I wait for the SS2 to be initialized + + Then the display should be "Prss" + And light 0 should be solid green + + # Switch to the mode select screen and make sure it gets reloaded on reconnect. + When I press key 0 + Then the mode select screen should be active + + When I disconnect the SS2 + And I wait for s + And + And I connect the SS2 + And I wait for the SS2 to be initialized + + Then the mode select screen should be active + And light 0 should be solid red + + # Make sure the mode select screen exits back to the correct place. + When I press key 0 + Then the display should be "Prss" + And light 0 should be solid green + + # Make sure the quick-switch history was preserved. + When I long-press key 0 + Then the display should be "Trns" + And light 0 should be solid green + And light 5 should be solid red + + # With a short delay, Live might send stray CCs after reconnect but before the device + # is re-initialized. With a longer delay, we expect that no stray CCs will be sent. + Examples: + | delay | before_connect_action | + | 0.1 | I allow stray interface updates until initialization | + | 4.0 | I do nothing | + + Scenario: Connecting the SS2 after a set is loaded + Given the default set is open + + When I wait for 5.0s + And I connect the SS2 + And I wait for the SS2 to be initialized + + Then the display should be "Trns" + And light 5 should be solid red + And light 0 should be solid green + + Scenario: Opening other sets + Given the SS2 is connected + And the default set is open + And the SS2 is initialized + + Then the display should be "Trns" + + # Switch the mode so we can make sure it gets reset when the set is reopened. + When I press key 0 + And I press key 8 + Then the display should be "Incr" + + # "Forget" the device state so that we can wait for it to be re-initialized. + When I forget the SS2's state + And I allow stray interface updates until initialization + And I open the default set + And I wait for the SS2 to be initialized + + Then the display should be "Trns" + + When I long-press key 0 + Then the mode select screen should be active + + # Open a different set with a different configuration, to make sure the + # configuration can change between sets without restarting Live. + When I forget the SS2's state + And I allow stray interface updates until initialization + And I open the alt_initial_mode set + And I wait for the SS2 to be initialized + + Then the display should be "Util" + + Scenario: Reconnecting the SS2 when a standalone mode is active + Given the SS2 is connected + And the standalone set is open + And the SS2 is initialized + + Then the SS2 should be in hosted mode + And the display should be "Trns" + + When I press key 0 + Then the mode select screen should be active + + When I press key 3 + Then the SS2 should be in standalone mode + And standalone program 0 should be active + + When I disconnect the SS2 + And I wait for 4.0s + And I connect the SS2 + And I wait for the SS2 to be initialized in standalone mode + + Then the SS2 should be in standalone mode + And standalone program 0 should be active + + When I long-press the standalone exit button + Then the SS2 should be in hosted mode + And the display should be "Trns" + And light 0 should be solid green \ No newline at end of file diff --git a/tests/modeStep_tests_project/create_set.py b/tests/modeStep_tests_project/create_set.py index 2920c42..297a3e1 100644 --- a/tests/modeStep_tests_project/create_set.py +++ b/tests/modeStep_tests_project/create_set.py @@ -25,6 +25,10 @@ def _asdict(self): configurations: Dict[str, Configuration] = { "default": Configuration(), + "alt_initial_mode": Configuration( + initial_mode="utility", + initial_last_mode="device_parameters_xy", + ), "backlight": Configuration(backlight=True), "overrides": Configuration( override_elements={ diff --git a/tests/overrides.feature b/tests/overrides.feature index 4d5f90d..f516d43 100644 --- a/tests/overrides.feature +++ b/tests/overrides.feature @@ -1,6 +1,7 @@ Feature: Overrides from config Background: - Given the overrides set is open + Given the SS2 is connected + And the overrides set is open And the SS2 is initialized Scenario: Element overrides diff --git a/tests/pytest.ini b/tests/pytest.ini new file mode 100644 index 0000000..61ea174 --- /dev/null +++ b/tests/pytest.ini @@ -0,0 +1,22 @@ +# This file needs to live in the tests directory (i.e. we can't use the +# pyproject.toml-style config) in order to set the pytest root to something other than +# the actual modeStep module directory. Otherwise, pytest attempts to import the +# modeStep module at the beginning of each test, which doesn't work because Live +# dependencies aren't available outside the Live runtime. +# +# Notes: the bad import started happening sometime after pytest@6. It happens during +# collector setup within `SetupState#setup` (called within `pytest_runtest_setup`), +# where apparently a `Package` collector gets created for the module +# directory. Customizing `pytest_collect_directory` doesn't help, since the import +# happens at runner time rather than at initialization/collection time. + +[pytest] +# Allow output. +addopts = "--capture=no" + +# pytest-bdd doesn't natively play nicely with asyncio (see +# https://github.com/pytest-dev/pytest-bdd/issues/223), but setting +# this to `auto` allows async fixtures (including those with `yield` +# statements) to be used transparently. +asyncio_mode=auto + diff --git a/tests/standalone_modes.feature b/tests/standalone_modes.feature index d4695b0..416c0c4 100644 --- a/tests/standalone_modes.feature +++ b/tests/standalone_modes.feature @@ -1,6 +1,12 @@ Feature: Standalone modes Background: - Given the standalone set is open + Given the SS2 is connected + And the standalone set is open + + Scenario: Setting the standalone background mode + Given the SS2 is initialized + Then the SS2 should be in hosted mode + And the standalone background program should be active Scenario: Switching into and out of standalone modes Given the SS2 is initialized @@ -19,7 +25,7 @@ Feature: Standalone modes And the SS2 should be in standalone mode # Go back to mode select. - When I hold the standalone exit button + When I hold the standalone exit button without waiting Then releasing the standalone exit button should enter hosted mode # Sanity check. And the SS2 should be in hosted mode @@ -35,7 +41,7 @@ Feature: Standalone modes And releasing key 0 should enter standalone program 1 # Quick-switch back to the main mode. - When I hold the standalone exit button + When I hold the standalone exit button without waiting And I wait to trigger a long-press Then releasing the standalone exit button should enter hosted mode And the display should be "Prss" @@ -50,7 +56,7 @@ Feature: Standalone modes And releasing key 5 should enter standalone program 2 # Go back and select the primary standalone mode. - When I hold the standalone exit button + When I hold the standalone exit button without waiting Then releasing the standalone exit button should enter hosted mode And the mode select screen should be active @@ -58,16 +64,16 @@ Feature: Standalone modes Then releasing key 5 should enter standalone program 1 # Switch directly between the standalone modes. - When I hold the standalone exit button + When I hold the standalone exit button without waiting And I wait to trigger a long-press Then releasing the standalone exit button should switch directly to standalone program 2 - When I hold the standalone exit button + When I hold the standalone exit button without waiting And I wait to trigger a long-press Then releasing the standalone exit button should switch directly to standalone program 1 # Go back to a hosted mode and quick-switch again. - When I hold the standalone exit button + When I hold the standalone exit button without waiting Then releasing the standalone exit button should enter hosted mode And the mode select screen should be active @@ -79,7 +85,7 @@ Feature: Standalone modes Then light 0 should be fast-blinking green And releasing key 0 should enter standalone program 1 - When I hold the standalone exit button + When I hold the standalone exit button without waiting And I wait to trigger a long-press Then releasing the standalone exit button should enter hosted mode And the display should be "Incr" @@ -90,7 +96,7 @@ Feature: Standalone modes And I wait to trigger a long-press Then releasing key 0 should enter standalone program 1 - When I hold the standalone exit button + When I hold the standalone exit button without waiting Then releasing the standalone exit button should enter hosted mode And the mode select screen should be active @@ -98,11 +104,11 @@ Feature: Standalone modes And I wait to trigger a long-press Then releasing key 5 should enter standalone program 2 - When I hold the standalone exit button + When I hold the standalone exit button without waiting And I wait to trigger a long-press Then releasing the standalone exit button should switch directly to standalone program 1 - When I hold the standalone exit button + When I hold the standalone exit button without waiting Then releasing the standalone exit button should enter hosted mode And the mode select screen should be active @@ -131,7 +137,7 @@ Feature: Standalone modes Then light 4 should be fast-blinking green And releasing key 4 should enter standalone program 3 - When I hold the standalone exit button + When I hold the standalone exit button without waiting Then releasing the standalone exit button should enter hosted mode And the mode select screen should be active @@ -144,6 +150,6 @@ Feature: Standalone modes When I hold key 3 without waiting Then releasing key 3 should enter standalone program 0 - When I hold the standalone exit button + When I hold the standalone exit button without waiting Then releasing the standalone exit button should enter hosted mode And the mode select screen should be active diff --git a/tests/test_configuration.py b/tests/test_configuration.py new file mode 100644 index 0000000..a5fd99d --- /dev/null +++ b/tests/test_configuration.py @@ -0,0 +1,3 @@ +from pytest_bdd import scenarios + +scenarios("configuration.feature") diff --git a/tests/test_connection.py b/tests/test_connection.py new file mode 100644 index 0000000..8faca0f --- /dev/null +++ b/tests/test_connection.py @@ -0,0 +1,3 @@ +from pytest_bdd import scenarios + +scenarios("connection.feature") diff --git a/tests/test_standalone_modes.py b/tests/test_standalone_modes.py index d1026e6..417d890 100644 --- a/tests/test_standalone_modes.py +++ b/tests/test_standalone_modes.py @@ -1,25 +1,27 @@ import asyncio -from contextlib import contextmanager -from typing import Collection, Generator, Tuple +from contextlib import asynccontextmanager +from typing import AsyncGenerator, Collection, Tuple, Union +import janus import mido from conftest import ( + STANDALONE_EXIT_CC, + Device, DeviceState, cc_action, get_cc_for_key, matches_sysex, stabilize_after_cc_action, + sync, sysex, ) -from pytest_bdd import parsers, scenarios, then, when -from typing_extensions import Never +from pytest_bdd import parsers, scenarios, then +from typeguard import typechecked +from typing_extensions import Never, TypeAlias # Standalone background program configured in the standalone Live set. BACKGROUND_PROGRAM = 10 -# Exit button in standalone modes. -STANDALONE_EXIT_CC = 80 - scenarios("standalone_modes.feature") @@ -34,25 +36,24 @@ def _dequeue_sysex( return queue -@contextmanager -def _message_queue( - device_state: DeviceState, -) -> Generator["asyncio.Queue[mido.Message]", Never, None]: - queue: asyncio.Queue[mido.Message] = asyncio.Queue() +MessageOrException: TypeAlias = Union[Tuple[mido.Message, None], Tuple[None, Exception]] - def on_message(message: mido.Message): - queue.put_nowait(message) - remove_listener = device_state.add_message_listener(on_message) - yield queue - remove_listener() +@asynccontextmanager +async def _message_queue( + device: Device, +) -> AsyncGenerator[ + janus.AsyncQueue[mido.Message], + Never, +]: + async with device.incoming_messages() as queue: + yield queue -@when("I hold the standalone exit button") -def when_hold_standalone_exit( - ioport: mido.ports.BaseOutput, loop: asyncio.AbstractEventLoop -): - cc_action(STANDALONE_EXIT_CC, "hold", port=ioport, loop=loop) +async def _get_next_message( + message_queue: janus.AsyncQueue[mido.Message], timeout: float = 5.0 +) -> mido.Message: + return await asyncio.wait_for(message_queue.get(), timeout=timeout) @then( @@ -60,12 +61,10 @@ def when_hold_standalone_exit( "releasing key {key_number:d} should enter standalone program {program:d}" ) ) -def should_enter_standalone_program( - key_number: int, - program: int, - loop: asyncio.AbstractEventLoop, - ioport: mido.ports.BaseOutput, - device_state: DeviceState, +@sync +@typechecked +async def should_enter_standalone_program( + key_number: int, program: int, device: Device, device_state: DeviceState ): # Make sure the device is fully out of standalone mode. assert all([t is False for t in device_state.standalone_toggles]) @@ -75,10 +74,11 @@ def should_enter_standalone_program( remaining_standalone_requests = sysex.SYSEX_STANDALONE_MODE_ON_REQUESTS received_main_program = False - with _message_queue(device_state) as queue: - cc_action(get_cc_for_key(key_number), "release", port=ioport, loop=loop) + async with _message_queue(device) as queue: + await cc_action(get_cc_for_key(key_number), "release", device) + num_initial_ccs: int = 0 while not received_main_program: - message = loop.run_until_complete(queue.get()) + message = await _get_next_message(queue) message_attrs = message.dict() if message_attrs["type"] == "sysex": @@ -98,23 +98,43 @@ def should_enter_standalone_program( else: raise RuntimeError(f"received unexpected program change: {message}") elif message.is_cc(): - # CCs can be received as long as the main standalone program hasn't been - # sent. - pass + # Allow up to a small number of initial CCs due to the interface being + # updated, since these can potentially get sent between the release CC + # message and the actual standalone mode activation. The worst case here + # appears to be 6 CCs in total (4 display CCs and 2 LED CCs). + assert ( + len(remaining_standalone_requests) + == len(sysex.SYSEX_STANDALONE_MODE_ON_REQUESTS) + ), f"Received CC message after beginning standalone transition: {message}" + + num_initial_ccs += 1 + assert ( + num_initial_ccs <= 6 + ), f"Got too many CCs before beginning standalone mode transition: {message}" + else: - raise RuntimeError(f"received unrecognized message: {message}") + raise RuntimeError(f"Received unrecognized message: {message}") # Sanity check, this should never fail given the business logic above. assert ( - queue.empty() - and all(device_state.standalone_toggles) + all(device_state.standalone_toggles) and len(remaining_standalone_requests) == 0 and received_main_program ) - # Wait a little while to make sure no CCs get sent. - loop.run_until_complete(asyncio.sleep(0.5)) - assert queue.empty() + # Wait a little while to make sure no additional CCs or other messages get sent. + await asyncio.sleep(0.5) + assert ( + queue.empty() + ), f"Received additional messages after standalone transition:\n{queue})" + + +@then("the standalone background program should be active") +@typechecked +def should_have_standalone_background_program_active(device_state: DeviceState): + assert ( + device_state.standalone_program == BACKGROUND_PROGRAM + ), f"Expected background program ({BACKGROUND_PROGRAM}) to be active, but got {device_state.standalone_program}" @then( @@ -122,41 +142,45 @@ def should_enter_standalone_program( "releasing the standalone exit button should switch directly to standalone program {program:d}" ) ) -def should_switch_directly_to_standalone_program( +@sync +@typechecked +async def should_switch_directly_to_standalone_program( program: int, + device: Device, device_state: DeviceState, - loop: asyncio.AbstractEventLoop, - ioport: mido.ports.BaseOutput, ): # Make sure we're currently in standalone mode. assert all(device_state.standalone_toggles) - with _message_queue(device_state) as queue: - cc_action(STANDALONE_EXIT_CC, "release", port=ioport, loop=loop) + async with _message_queue(device) as queue: + await cc_action(STANDALONE_EXIT_CC, "release", device) # Make sure we get one message for the program change. - message = loop.run_until_complete(queue.get()) + message = await _get_next_message(queue) message_attrs = message.dict() assert message_attrs["type"] == "program_change" assert message_attrs["program"] == program # Wait a little while, and make sure we haven't gotten any other messages. - loop.run_until_complete(asyncio.sleep(0.5)) - assert queue.empty() + await asyncio.sleep(0.5) + assert ( + queue.empty() + ), f"Received additional messages after direct standalone transition:\n{queue})" @then("releasing the standalone exit button should enter hosted mode") -def should_enter_hosted_mode( - loop: asyncio.AbstractEventLoop, - ioport: mido.ports.BaseOutput, +@sync +@typechecked +async def should_enter_hosted_mode( + device: Device, device_state: DeviceState, ): # Make sure we're fully in standalone mode to begin. assert all(device_state.standalone_toggles) - with _message_queue(device_state) as queue: - cc_action(STANDALONE_EXIT_CC, "release", port=ioport, loop=loop) - stabilize_after_cc_action(loop=loop, device_state=device_state) + async with _message_queue(device) as queue: + await cc_action(STANDALONE_EXIT_CC, "release", device) + await stabilize_after_cc_action(device) # First message needs to be the background program. - message = loop.run_until_complete(queue.get()) + message = await _get_next_message(queue) message_attrs = message.dict() assert ( message_attrs["type"] == "program_change" @@ -166,16 +190,26 @@ def should_enter_hosted_mode( # Now make sure we get the right sysex messages. remaining_hosted_requests = sysex.SYSEX_STANDALONE_MODE_OFF_REQUESTS while any(remaining_hosted_requests): - message = loop.run_until_complete(queue.get()) + message = await _get_next_message(queue) message_attrs = message.dict() - if message_attrs["type"] == "sysex": - remaining_hosted_requests = _dequeue_sysex( - message, remaining_hosted_requests - ) + assert ( + message_attrs["type"] == "sysex" + ), f"Got non-sysex message before switching out of standalone mode: {message}" + + remaining_hosted_requests = _dequeue_sysex( + message, remaining_hosted_requests + ) # Sanity checks. assert ( all([d is False for d in device_state.standalone_toggles]) and len(remaining_hosted_requests) == 0 ) + + # All additional messages to this point should be CCs. + while not queue.empty(): + message = queue.get_nowait() + assert ( + message.is_cc() + ), f"Got non-CC message after switching to hosted mode: {message}" diff --git a/tests/track_controls_modes.feature b/tests/track_controls_modes.feature index 15b94e1..5feacd0 100644 --- a/tests/track_controls_modes.feature +++ b/tests/track_controls_modes.feature @@ -1,6 +1,7 @@ Feature: Track controls modes Background: - Given the default set is open + Given the SS2 is connected + And the default set is open And the SS2 is initialized Scenario: Volume buttons @@ -326,14 +327,14 @@ Feature: Track controls modes # Go to the bottom controls and actually perform the delete. When I press key 1 - And I hold key 0 Then the display should be "Vol" - When I wait to trigger a long-press + When I hold key 0 + And I wait to trigger a long-press Then light 0 should be fast-blinking red # Wait for the delete. - When I wait for 1.1s + When I wait for 0.8s Then light 0 should be solid red And the display should be "DeL5" And light 5 should be off