diff --git a/.gitignore b/.gitignore index 69281e79..db7f7eef 100644 --- a/.gitignore +++ b/.gitignore @@ -132,3 +132,8 @@ dmypy.json # Pyre type checker .pyre/ +# Ubuntu +.Trash-*/ + +# binarys or dumps +*.bin diff --git a/README.md b/README.md index 6c8256dc..cac5c9ad 100644 --- a/README.md +++ b/README.md @@ -1,60 +1,120 @@ # joycontrol + +Branch: master->amiibo_edits + Emulate Nintendo Switch Controllers over Bluetooth. -Tested on Ubuntu 19.10, and with Raspberry Pi 3B+ and 4B Raspbian GNU/Linux 10 (buster) +Tested on Raspberry 4B Raspbian, should work on 3B+ too and anything that can do the setup. ## Features Emulation of JOYCON_R, JOYCON_L and PRO_CONTROLLER. Able to send: - button commands - stick state -- ~~nfc data~~ (removed, see [#80](https://github.com/mart1nro/joycontrol/issues/80)) +- nfc for amiibo read & owner registration ## Installation -- Install dependencies - -Ubuntu: Install the `dbus-python` and `libhidapi-hidraw0` packages +- Install dependencies + Raspbian: ```bash -sudo apt install python3-dbus libhidapi-hidraw0 +sudo apt install python3-dbus libhidapi-hidraw0 libbluetooth-dev bluez ``` - -Arch Linux Derivatives: Install the `hidapi` and `bluez-utils-compat`(AUR) packages - - -- Clone the repository and install the joycontrol package to get missing dependencies (Note: Controller script needs super user rights, so python packages must be installed as root). In the joycontrol folder run: + Python: (a setup.py is present but not yet up to date) + Note that pip here _has_ to be run as root, as otherwise the packages are not available to the root user. ```bash -sudo pip3 install . +sudo pip3 install aioconsole hid crc8 ``` -- Consider to disable the bluez "input" plugin, see [#8](https://github.com/mart1nro/joycontrol/issues/8) + If you are unsure if the packages are properly installed, try running `sudo python3` and import each using `import package_name`. + +- setup bluetooth + - [I shouldn't have to say this, but] make sure you have a working Bluetooth adapter\ + If you are running inside a VM, the PC might but not the VM. Check for a controller using `bluetoothctl show` or `bluetoothctl list`. Also a good indicator it the actual os reporting to not have bluetooth anymore. + - disable SDP [only necessary when pairing]\ + change the `ExecStart` parameter in `/lib/systemd/system/bluetooth.service` to `ExecStart=/usr/lib/bluetooth/bluetoothd -C -P sap,input,avrcp`.\ + This is to remove the additional reported features as the switch only looks for a controller.\ + This also breaks all other Bluetooth gadgets, as this also disabled the needed drivers. + - disable input plugin [experimental alternative to above when not pairing]\ + When not pairing, you can get away with only disabling the `input` plugin, only breaking bluetooth-input devices on your PC. Do so by changing `ExecStart` to `ExecStart=/usr/lib/bluetooth/bluetoothd -C -P input` instead. + - Restart bluetooth-deamon to apply the changes: + ```bash + sudo systemctl daemon-reload + sudo systemctl restart bluetooth.service + ``` + - see [Issue #4](https://github.com/Poohl/joycontrol/issues/4) if despite that the switch doesn't connect or disconnects randomly. ## Command line interface example -- Run the script +There is a simple CLI (`sudo python3 run_controller_cli.py`) provided with this app. Startup-options are: +``` +usage: run_controller_cli.py [-h] [-l LOG] [-d DEVICE_ID] + [--spi_flash SPI_FLASH] [-r RECONNECT_BT_ADDR] + [--nfc NFC] + controller + +positional arguments: + controller JOYCON_R, JOYCON_L or PRO_CONTROLLER + +optional arguments: + -h, --help show this help message and exit + -l LOG, --log LOG BT-communication logfile output + -d DEVICE_ID, --device_id DEVICE_ID + not fully working yet, the BT-adapter to use + --spi_flash SPI_FLASH + controller SPI-memory dump to use + -r RECONNECT_BT_ADDR, --reconnect_bt_addr RECONNECT_BT_ADDR + The Switch console Bluetooth address (or "auto" for + automatic detection), for reconnecting as an already + paired controller. + --nfc NFC amiibo dump placed on the controller. Equivalent to + the nfc command. + +``` + +To use the script: +- start it (this is a minimal example) ```bash sudo python3 run_controller_cli.py PRO_CONTROLLER ``` -This will create a PRO_CONTROLLER instance waiting for the Switch to connect. +- The cli does sanity checks on startup, you might get promps telling you they failed. Check the command-line options and your setup in this case. (Note: not the logging messages). You can however still try to proceed, sometimes it works despite the warnings. + +- Afterwards a PRO_CONTROLLER instance waiting for the Switch to connect is created. -- Open the "Change Grip/Order" menu of the Switch +- If you didn't pass the `-r` option, Open the "Change Grip/Order" menu of the Switch and wait for it to pair. -The Switch only pairs with new controllers in the "Change Grip/Order" menu. +- If you already connected the emulated controller once, you can use the reconnect option of the script (`-r `). Don't open the "Change Grip/Order" menu in this case, just make sure the switch is turned on. You can find out a paired mac address using the `bluetoothctl paired-devices` system command or pass `-r auto` as address for automatic detection. -Note: If you already connected an emulated controller once, you can use the reconnect option of the script (-r "\"). -This does not require the "Change Grip/Order" menu to be opened. You can find out a paired mac address using the "bluetoothctl" system command. +- After connecting, a command line interface is opened. + Note: Press \ if you don't see a prompt. -- After connecting, a command line interface is opened. Note: Press \ if you don't see a prompt. + Call "help" to see a list of available commands. -Call "help" to see a list of available commands. +## API -- If you call "test_buttons", the emulated controller automatically navigates to the "Test Controller Buttons" menu. +See the `run_controller_cli.py` for an example how to use the API. A minimal example: +```python +from joycontrol.protocol import controller_protocol_factory +from joycontrol.server import create_hid_server +from joycontrol.controller import Controller + +# the type of controller to create +controller = Controller.PRO_CONTROLLER # or JOYCON_L or JOYCON_R +# a callback to create the corresponding protocol once a connection is established +factory = controller_protocol_factory(controller) +# start the emulated controller +transport, protocol = await create_hid_server(factory) +# get a reference to the state beeing emulated. +controller_state = protocol.get_controller_state() +# wait for input to be accepted +await controller_state.connect() +# some sample input +controller_state.button_state.set_button('a', True) +# wait for it to be sent at least once +await controller_state.send() +``` ## Issues -- Some bluetooth adapters seem to cause disconnects for reasons unknown, try to use an usb adapter instead -- Incompatibility with Bluetooth "input" plugin requires a bluetooth restart, see [#8](https://github.com/mart1nro/joycontrol/issues/8) -- It seems like the Switch is slower processing incoming messages while in the "Change Grip/Order" menu. - This causes flooding of packets and makes pairing somewhat inconsistent. - Not sure yet what exactly a real controller does to prevent that. - A workaround is to use the reconnect option after a controller was paired once, so that - opening of the "Change Grip/Order" menu is not required. +- Some bluetooth adapters seem to cause disconnects for reasons unknown, try to use an usb adapter or a raspi instead. +- Incompatibility with Bluetooth "input" plugin requires it to be disabled (along with the others), see [Issue #8](https://github.com/mart1nro/joycontrol/issues/8) +- The reconnect doesn't ever connect, `bluetoothctl` shows the connection constantly turning on and off. This means the switch tries initial pairing, you have to unpair the switch and try without the `-r` option again. - ... ## Thanks @@ -66,3 +126,5 @@ Call "help" to see a list of available commands. [Nintendo_Switch_Reverse_Engineering](https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering) [console_pairing_session](https://github.com/timmeh87/switchnotes/blob/master/console_pairing_session) + +[Hardware Issues thread](https://github.com/Poohl/joycontrol/issues/4) diff --git a/joycontrol/controller_state.py b/joycontrol/controller_state.py index 000d5f86..e2750a45 100644 --- a/joycontrol/controller_state.py +++ b/joycontrol/controller_state.py @@ -69,7 +69,7 @@ async def connect(self): """ Waits until the switch is paired with the controller and accepts button commands """ - await self._protocol.sig_set_player_lights.wait() + await self._protocol.sig_input_ready.wait() class ButtonState: @@ -160,11 +160,13 @@ def getter(): self.zl, self.zl_is_set = button_method_factory('_byte_3', 7) def set_button(self, button, pushed=True): + button = button.lower() if button not in self._available_buttons: raise ValueError(f'Given button "{button}" is not available to {self.controller.device_name()}.') getattr(self, button)(pushed=pushed) def get_button(self, button): + button = button.lower() if button not in self._available_buttons: raise ValueError(f'Given button "{button}" is not available to {self.controller.device_name()}.') return getattr(self, f'{button}_is_set')() @@ -186,6 +188,9 @@ def __iter__(self): def clear(self): self._byte_1 = self._byte_2 = self._byte_3 = 0 + def __bytes__(self): + return bytes([self._byte_1, self._byte_2, self._byte_3]) + async def button_press(controller_state, *buttons): """ diff --git a/joycontrol/debug.py b/joycontrol/debug.py new file mode 100644 index 00000000..81ce6787 --- /dev/null +++ b/joycontrol/debug.py @@ -0,0 +1,13 @@ + +delay_override = False +delay = 1/15 + +async def debug(*args): + global delay_override + global delay + if len(args) > 0: + delay_override = True + delay = 1/float(args[0]) + +def get_delay(old): + return delay if delay_override else old diff --git a/joycontrol/device.py b/joycontrol/device.py index f92c0d6e..a8cd85ff 100644 --- a/joycontrol/device.py +++ b/joycontrol/device.py @@ -13,30 +13,73 @@ class HidDevice: def __init__(self, device_id=None): + self._device_id = device_id bus = dbus.SystemBus() - # Get Bluetooth adapter from dbus interface - manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager') - for path, ifaces in manager.GetManagedObjects().items(): + for path, ifaces in bus.get_object('org.bluez', '/').GetManagedObjects(dbus_interface='org.freedesktop.DBus.ObjectManager').items(): adapter_info = ifaces.get('org.bluez.Adapter1') - if adapter_info is None: - continue - elif device_id is None or device_id == adapter_info['Address'] or path.endswith(str(device_id)): - obj = bus.get_object('org.bluez', path) - self.adapter = dbus.Interface(obj, 'org.bluez.Adapter1') - self.address = adapter_info['Address'] - self._adapter_name = path.split('/')[-1] - - self.properties = dbus.Interface(self.adapter, 'org.freedesktop.DBus.Properties') + if adapter_info and (device_id is None or device_id == adapter_info['Address'] or path.endswith(str(device_id))): + self.dev = bus.get_object('org.bluez', path) break else: raise ValueError(f'Adapter {device_id} not found.') + self.adapter = dbus.Interface(self.dev, 'org.bluez.Adapter1') + # The sad news is someone decided that this convoluted mess passing + # strings back and forth to get properties would be simpler than literal + # adapter.some_property = 4 or adapter.some_property_set(4) + self.properties = dbus.Interface(self.dev, 'org.freedesktop.DBus.Properties') + self._adapter_name = self.dev.object_path.split("/")[-1] + def get_address(self) -> str: """ :returns adapter Bluetooth address """ - return self.address + return str(self.properties.Get(self.adapter.dbus_interface, "Address")) + + async def set_address(self, bt_addr, interactive=True): + if not interactive: + return False + # TODO: automated detection + print(f"Attempting to change the bluetooth MAC to {bt_addr}") + print("please choose your method:") + print("\t1: bdaddr - ericson, csr, TI, broadcom, zeevo, st") + print("\t2: hcitool - intel chipsets") + print("\t3: hcitool - cypress (raspberri pi 3B+ & 4B)") + print("\tx: abort, dont't change") + hci_version = " ".join(reversed(list(map(lambda h: '0x' + h, bt_addr.split(":"))))) + c = input() + if c == '1': + await utils.run_system_command(f'bdaddr -i {self._adapter_name} {bt_addr}') + elif c == '2': + await utils.run_system_command(f'hcitool cmd 0x3f 0x0031 {hci_version}') + elif c == '3': + await utils.run_system_command(f'hcitool cmd 0x3f 0x001 {hci_version}') + else: + return False + await utils.run_system_command("hciconfig hci0 reset") + await utils.run_system_command("systemctl restart bluetooth.service") + + # now we have to reget all dbus-shenanigans because we just restarted it's service. + self.__init__(self._device_id) + + if self.get_address() != bt_addr: + logger.info("Failed to set btaddr") + return False + else: + logger.info(f"Changed bt_addr to {bt_addr}") + return True + + def get_paired_switches(self): + switches = [] + for path, ifaces in dbus.SystemBus().get_object('org.bluez', '/').GetManagedObjects('org.freedesktop.DBus.ObjectManager', dbus_interface='org.freedesktop.DBus.ObjectManager').items(): + d = ifaces.get("org.bluez.Device1") + if d and d['Name'] == "Nintendo Switch": + switches += [path] + return switches + + def unpair_path(self, path): + self.adapter.RemoveDevice(path) def powered(self, boolean=True): self.properties.Set(self.adapter.dbus_interface, 'Powered', boolean) @@ -60,6 +103,8 @@ async def set_class(self, cls='0x002508'): """ logger.info(f'setting device class to {cls}...') await utils.run_system_command(f'hciconfig {self._adapter_name} class {cls}') + if self.properties.Get(self.adapter.dbus_interface, "Class") != int(cls, base=0): + logger.error(f"Could not set class to the required {cls}. Connecting probably won't work.") async def set_name(self, name: str): """ @@ -69,6 +114,9 @@ async def set_name(self, name: str): logger.info(f'setting device name to {name}...') self.properties.Set(self.adapter.dbus_interface, 'Alias', name) + def get_UUIDs(self): + return self.properties.Get(self.adapter.dbus_interface, "UUIDs") + @staticmethod def register_sdp_record(record_path): _uuid = str(uuid.uuid4()) @@ -85,4 +133,6 @@ def register_sdp_record(record_path): manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.ProfileManager1") manager.RegisterProfile(HID_PATH, _uuid, opts) - return _uuid + @staticmethod + def get_address_of_paired_path(path): + return str(dbus.SystemBus().get_object('org.bluez', path).Get('org.bluez.Device1', "Address", dbus_interface='org.freedesktop.DBus.Properties')) diff --git a/joycontrol/mcu.py b/joycontrol/mcu.py new file mode 100644 index 00000000..621183dc --- /dev/null +++ b/joycontrol/mcu.py @@ -0,0 +1,426 @@ +import enum +import logging +import crc8 +import traceback +import asyncio + +from joycontrol.controller_state import ControllerState +from joycontrol.nfc_tag import NFCTag + +logger = logging.getLogger(__name__) + + +############################################################### +## This simulates the MCU in the right joycon/Pro-Controller ## +############################################################### +# WARNING: THIS IS ONE GIANT RACE CONDITION, DON'T DO THINGS FAST +# No I won't fix this, I have had enough of this asyncio b***s*** +# DIY or STFU +# This is sufficient to read or write one amiibo when simulating +# a Pro-Controller +# multiple can mess up the internal state +# anything but amiboo is not supported +# TODO: +# - figure out the NFC-content transfer, currently everything is hardcoded to work wich amiibo +# see https://github.com/CTCaer/jc_toolkit/blob/5.2.0/jctool/jctool.cpp l2456ff for sugesstions +# - IR-camera +# - writing to Amiibo the proper way +# - verify right joycon +# - Figure out the UID index in write commands, currently the check is just uncommented... + +# These Values are used in set_power, set_config and get_status packets +# But not all of them can appear in every one +# see https://github.com/CTCaer/jc_toolkit/blob/5.2.0/jctool/jctool.cpp l 2359 for set_config +class MCUPowerState(enum.Enum): + SUSPENDED = 0x00 # set_power + READY = 0x01 # set_power, set_config, get_status + READY_UPDATE = 0x02 + CONFIGURED_NFC = 0x04 # set_config, get_status + # CONFIGURED_IR = 0x05 # TODO: support this + # CONFIGUERED_UPDATE = 0x06 + +SET_POWER_VALUES = ( + MCUPowerState.SUSPENDED.value, + MCUPowerState.READY.value, + # MCUPowerState.READY_UPDATE.value, +) + +SET_CONFIG_VALUES = ( + MCUPowerState.READY.value, + MCUPowerState.CONFIGURED_NFC.value, + # MCUPowerState.CONFIGURED_IR.value, +) + +GET_STATUS_VALUES = ( + MCUPowerState.READY.value, + # MCUPowerState.READY_UPDATE.value, + MCUPowerState.CONFIGURED_NFC.value, + # MCUPowerState.CONFIGURED_IR.value +) + + +def MCU_crc(data): + if not isinstance(data, bytes): + data = bytes(data) + my_hash = crc8.crc8() + my_hash.update(data) + # At this point I'm not even mad this works... + return my_hash.digest()[0] + + +class NFC_state(enum.Enum): + NONE = 0x00 + POLL = 0x01 + PENDING_READ = 0x02 + WRITING = 0x03 + AWAITING_WRITE = 0x04 + PROCESSING_WRITE = 0x05 + POLL_AGAIN = 0x09 + + +def pack_message(*args, background=0, checksum=MCU_crc, length=313): + """ + convinience function that packes + * hex strings + * byte lists + * integer lists + * Enums + * integers + into a 313 bytes long MCU response + """ + data = bytearray([background] * length) + cur = 0 + for arg in args: + if isinstance(arg, str): + arg = bytes.fromhex(arg) + elif isinstance(arg, int): + arg = bytes([arg]) + elif isinstance(arg, enum.Enum): + arg = bytes([arg.value]) + else: + arg = bytes(arg) + arg_len = len(arg) + if arg_len + cur > length: + logger.warning("MCU: too long message packed") + data[cur:cur + arg_len] = arg + cur += arg_len + if checksum: + data[-1] = checksum(data[0:-1]) + return data + + +class MicroControllerUnit: + def __init__(self, controller: ControllerState): + + self.power_state = MCUPowerState.SUSPENDED + + # NOT USED + # Just a store for the remaining configuration data + self.configuration = None + + self.nfc_state = NFC_state.NONE + # Counter for testing state transitions + self.nfc_counter = 0 + + self._last_poll_uid = None + + # after writing we need to "remove" the amiibo + # sending the detection of a bunch of zeros works + # (pretty sure it just panics the switch) + self.remove_amiibo = NFCTag(data=bytes(540)) + # weather or not the next n POLLs without a tag should assume the remove_amiibo + self._pending_active_remove = 0 + + # remove the tag from the controller after a successful read + # NOT IMPLEMENTED self.remove_nfc_after_read = False + # remove the tag from the controller after a successful write, the switch demands this + self.remove_nfc_after_write = True + + # controllerstate to look for nfc-data + self._controller = controller + + # long messages are split, this keeps track of in and outgoing transfers + self.seq_no = 0 + self.ack_seq_no = 0 + # self.expect_data = False + self.received_data = [] + + # We are getting 0x11 commands to do something, but cannot answer directly + # responses have to be passed in regular input reports + # If there was no command, this is the default report + self.no_response = pack_message(0xff) + self.response_queue = [] + # to prevent the queue from becoming too laggy, limit it's size + # the length after which we start dropping packets + self.max_response_queue_len = 4 + + def _flush_response_queue(self): + self.response_queue = [] + + def _queue_response(self, resp): + if len(self.response_queue) < self.max_response_queue_len: + self.response_queue.append(resp) + else: + logger.warning("Full queue, dropped outgoing MCU packet") + + # used to queue messages that cannot be dropped, as we don't have any kind of resend-mechanism + def _force_queue_response(self, resp): + self.response_queue.append(resp) + if len(self.response_queue) > self.max_response_queue_len: + logger.warning("Forced response queue") + + def set_remove_nfc_after_read(self, value): + pass + # self.remove_nfc_after_read = value + + def _get_status_data(self, args=None): + """ + create a status packet to be used when responding to 1101 commands (outside NFC-mode) + """ + if self.power_state == MCUPowerState.SUSPENDED: + logger.warning("MCU: status request when disabled") + return self.no_response + elif self.power_state.value in GET_STATUS_VALUES: + return pack_message("0100000008001b", self.power_state) + + def _get_nfc_status_data(self, args): + """ + Generate a NFC-Status report to be sent back to switch + This is 40% of all logic in this file, as all we do in NFC-mode is send these responses + but some (the read/write-tag ones) are hardcoded somewhere else + @param args: not used + @return: the status-message + """ + self.nfc_counter -= 1 + nfc_tag = self._controller.get_nfc() + + # after a write to get out of the screen report the empty amiibo + if self.nfc_state in (NFC_state.POLL, NFC_state.POLL_AGAIN) and (self.remove_nfc_after_write or not nfc_tag) and self._pending_active_remove > 0: + nfc_tag = self.remove_amiibo + self._pending_active_remove -= 1 + + if self.nfc_state == NFC_state.PROCESSING_WRITE and self.nfc_counter <= 0: + self.nfc_state = NFC_state.NONE + elif self.nfc_state == NFC_state.POLL: + if nfc_tag and nfc_tag.getUID() == self._last_poll_uid: + self.nfc_state = NFC_state.POLL_AGAIN + else: + self._last_poll_uid = nfc_tag.getUID() if nfc_tag else None + elif self.nfc_state == NFC_state.POLL_AGAIN: + if not nfc_tag or nfc_tag.getUID() != self._last_poll_uid: + self.nfc_state = NFC_state.POLL + self._last_poll_uid = nfc_tag.getUID() if nfc_tag else None + + if nfc_tag and self.nfc_state != NFC_state.NONE: + # states POLL, POLL_AGAIN, AWAITING_WRITE, WRITING, PROCESSING_WRITE can include the UID + out = pack_message("2a0005", self.seq_no, self.ack_seq_no, "0931", self.nfc_state, + "0000000101020007", nfc_tag.getUID()) + else: # seqno and ackseqno should be 0 if we're not doing anything fancy + out = pack_message("2a000500000931", self.nfc_state) + + return out + + async def process_nfc_write(self, command): + """ + After all data regarding the write to the tag has been received, this function actually applies the changed + @param command: the entire write request + @return: None + """ + logger.info("MCU: processing nfc write") + nfc_tag: NFCTag = self._controller.get_nfc() + if not nfc_tag: + logger.error("nfc_tag is none, couldn't write") + return + if command[1] != 0x07: # panic wrong UUID length + logger.error(f"UID length is {command[1]} (not 7), aborting") + return + if bytes(command[2:9]) != nfc_tag.getUID(): + logger.error(f"self.nfc_tag.uid and target uid aren't equal, are {bytes(nfc_tag.getUID()).hex()} and {bytes(command[2:9]).hex()}") + # return # wrong UUID, won't write to wrong UUID + if nfc_tag.is_mutable(): + nfc_tag.create_backup() + else: + nfc_tag.set_mutable(True) + + # write write-lock + nfc_tag.data[16:20] = command[13:17] + + i = 22 + while i + 1 < len(command): + addr = command[i] * 4 + leng = command[i + 1] + data = command[i + 2:i + 2 + leng] + if addr == 0 or leng == 0: + break + nfc_tag.write(addr, data) + i += 2 + leng + + # remove write lock + nfc_tag.data[16:20] = command[17:21] + nfc_tag.save() + return + + def handle_nfc_subcommand(self, com, data): + """ + This generates responses for NFC commands and thus implements the entire + NFC-behaviour + @param com: the NFC-command (not the 0x02, the byte after that) + @param data: the remaining data + """ + if com == 0x04: # status / response request + # the switch spams this up to 8 times a frame, there is no way to respond to all + self._queue_response(self._get_nfc_status_data(data)) + elif com == 0x01: # start polling, should we queue a nfc_status? + logger.debug("MCU-NFC: start polling") + self.nfc_state = NFC_state.POLL + elif com == 0x06: # read/write + logger.debug(f"MCU-NFC Read/write {data[6:13]}") + nfc_tag = self._controller.get_nfc() + if nfc_tag: + # python usually doesn't care about data-types but a list is not an array.... how I hate this crap + if bytes(data[6:13]) == bytes(7): # This is the UID, 0 means read anything + logger.info("MCU-NFC: reading Tag...") + self._flush_response_queue() + # Data is sent in 2 packages plus a trailer + # the first one contains a lot of fixed header and the UUID + # the length and packetnumber is in there somewhere, see + # https://github.com/CTCaer/jc_toolkit/blob/5.2.0/jctool/jctool.cpp line 2523ff + self._force_queue_response(pack_message( + "3a0007010001310200000001020007", + nfc_tag.getUID(), + "000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000", + nfc_tag.data[0:245] + )) + # the second one is mostely data, followed by presumably anything (zeroes work) + self._force_queue_response(pack_message( + "3a000702000927", + nfc_tag.data[245:540] + )) + # the trailer includes the UUID again + # this is not actually the trailer, joycons send both packets again but with the first bytes in the + # first one 0x2a then this again. But it seems to work without? + self._force_queue_response(pack_message( + "2a000500000931040000000101020007", + nfc_tag.getUID() + )) + # elif bytes(data[6:13]) == nfc_tag.getUID(): # we should check the UID + else: # the UID is nonzero, so I a write to that tag follows + logger.info(f"MCU-NFC: setup writing tag {data[6:13]}") + self._force_queue_response(pack_message( + "3a0007010008400200000001020007", nfc_tag.getUID(), # standard header for write, some bytes differ from read + "00000000fdb0c0a434c9bf31690030aaef56444b0f602627366d5a281adc697fde0d6cbc010303000000000000f110ffee" + # any guesses are welcome. The end seems like something generic, a magic number? + )) + self.received_data = [] + self.nfc_state = NFC_state.AWAITING_WRITE + else: + logger.error("had no NFC tag when read/write was initiated") + # elif com == 0x00: # cancel eveyhting -> exit mode? + elif com == 0x02: # stop polling, respond? + logger.debug("MCU-NFC: stop polling...") + self.nfc_state = NFC_state.NONE + self._last_poll_uid = None + elif com == 0x08: # write NTAG + if data[0] == 0 and data[2] == 0x08: # never seen, single packet as entire payload + asyncio.ensure_future(self.process_nfc_write(data[4: 4 + data[3]])) + logger.warning("MCU-NFC write valid but WTF") + return + if data[0] <= self.ack_seq_no: # we already saw this one + logger.info(f"MCU-NFC write packet repeat {data[0]}") + pass + elif data[0] == 1 + self.ack_seq_no: # next packet in sequence + self.received_data += data[4: 4 + data[3]] + self.ack_seq_no += 1 + logger.debug(f"MCU-NFC write packet {self.ack_seq_no}") + else: # panic we missed/skipped something + logger.warning(f"MCU-NFC write unexpected packet, expected {self.ack_seq_no} got {data[0]}, aborting.") + self.ack_seq_no = 0 + self.nfc_state = NFC_state.WRITING + self._force_queue_response(self._get_nfc_status_data(data)) + if data[2] == 0x08: # end of sequence + self.ack_seq_no = 0 + self.nfc_state = NFC_state.PROCESSING_WRITE + self.nfc_counter = 4 + self._pending_active_remove = 4 # Dunno, anything > 2 works most likely + asyncio.ensure_future(self.process_nfc_write(self.received_data)) + else: + logger.error("unhandled NFC subcommand", com, data) + + # I don't actually know if we are supposed to change the MCU-data based on + # regular subcommands, but the switch is spamming the status-requests anyway, + # so sending responses seems to not hurt + + # protocoll-callback + def entered_31_input_mode(self): + self._flush_response_queue() + self.power_state = MCUPowerState.READY + self._queue_response(self._get_status_data()) + + # protocoll-callback + def set_power_state_cmd(self, power_state): + logger.debug(f"MCU: Set power state cmd {power_state}") + self._flush_response_queue() + if power_state in SET_POWER_VALUES: + self.power_state = MCUPowerState(power_state) + else: + logger.error(f"not implemented power state {power_state}") + self.power_state = MCUPowerState.READY + self._queue_response(self._get_status_data()) + + # protocoll-callback + def set_config_cmd(self, config): + if self.power_state == MCUPowerState.SUSPENDED: + if config[2] == 0: + # the switch does this during initial setup, presumably to disable + # any MCU weirdness + pass + else: + logger.warning("Set MCU Config not in READY mode") + elif self.power_state == MCUPowerState.READY: + if config[2] in SET_CONFIG_VALUES: + self.power_state = MCUPowerState(config[2]) + self.configuration = config + logger.debug(f"MCU Set configuration {self.power_state} {self.configuration}") + else: + self.power_state = MCUPowerState.READY + logger.error(f"Not implemented configuration written {config[2]} {config}") + if self.power_state == MCUPowerState.CONFIGURED_NFC: + # reset all nfc-related parts + self.nfc_state = NFC_state.NONE + self.nfc_counter = 0 + self.seq_no = 0 + self.ack_seq_no = 0 + self._queue_response(self._get_status_data()) + + # protocoll-callback + def received_11(self, subcommand, subcommanddata): + """ + This function handles all 0x11 output-reports. + @param subcommand: the subcommand as integer + @param subcommanddata: the remaining data + @return: None + """ + if subcommand == 0x01: + # status request + self._queue_response(self._get_status_data(subcommanddata)) + elif subcommand == 0x02: + # NFC command + if self.power_state != MCUPowerState.CONFIGURED_NFC: + logger.warning(f"NFC command ({subcommand} {subcommanddata}) outside NFC mode, ignoring") + else: + self.handle_nfc_subcommand(subcommanddata[0], subcommanddata[1:]) + else: + logger.error(f"unknown 0x11 subcommand {subcommand} {subcommanddata}") + + # protocoll hook + def get_data(self): + """ + The function returning what is to write into mcu data of tha outgoing 0x31 packet + + usually it is some queued response + @return: the data + """ + if len(self.response_queue) > 0: + return self.response_queue.pop(0) + else: + return self.no_response diff --git a/joycontrol/my_semaphore.py b/joycontrol/my_semaphore.py new file mode 100644 index 00000000..08ee1b11 --- /dev/null +++ b/joycontrol/my_semaphore.py @@ -0,0 +1,87 @@ +import asyncio + +class _Request: + def __init__(self, value, loop): + self.value = value + self.future = loop.create_future() + + +class MySemaphore(asyncio.Semaphore): + """ + An implementation of the asyncio-Semaphore with a few more features. + Most this code is copied from the original CPython implementation. + """ + def __init__(self, value): + super().__init__(value) + self._value = value + self._waiters = [] # Normal people would use an actual Queue. The standard queues are shit + self._aquired = 0 + + def _check_next(self): + while self._waiters and (self._waiters[0].future.done() or self._value >= self._waiters[0].value): + r = self._waiters.pop(0) + if not r.future.done(): + r.future.set_result(None) + return + + async def acquire(self, count=1): + if count < 0: + raise ValueError("Semaphore acquire with count < 0") + while self._value < count: + r = _Request(count, self._loop) + self._waiters.append(r) + try: + await r.future + except: + r.future.cancel() + # original has an if here, we wont take the call anymore, call the next one + self._check_next() + raise + self._aquired += count + self._value -= count + self._check_next() + return True + + def reduce(self, value): + self._value -= value + + def increase(self, value): + self._value += value + self._check_next() + + def get_value(self): + return self._value + + def get_aquired(self): + return self._aquired + + def release(self, count=1): + if count < 0: + raise ValueError("Semaphore release with 0 < count") + self._value += count + self._aquired -= count + self._check_next() + +class MyBoundedSemaphore(MySemaphore): + """ + Äquivalent to asyncio.BoundedSemaphore, + also with more features + """ + def __init__(self, limit=1, value=None): + super().__init__(value if not value is None else limit) + self._limit = limit + + def get_limit(self): + return self._limit + + def set_limit(self, value): + self._limit = value + self._value = min(self._value, self._limit) + + def release(self, count=1, best_effort=False): + if self._value + count > self._limit: + if best_effort: + count = self._limit - self._value + else: + raise ValueError('BoundedSemaphore released too many times') + super().release(count) diff --git a/joycontrol/nfc_tag.py b/joycontrol/nfc_tag.py new file mode 100644 index 00000000..fca001f6 --- /dev/null +++ b/joycontrol/nfc_tag.py @@ -0,0 +1,104 @@ +import enum +from os import path as ph + +import logging + + +logger = logging.getLogger(__name__) + +unnamed_saves = 0 + +def get_savepath(hint='/tmp/amiibo'): + global unnamed_saves + unnamed_saves += 1 + if hint.endswith('.bin'): + hint = hint[:-4] + while True: + path = hint + '_' + str(unnamed_saves) + '.bin' + if not ph.exists(path): + break + unnamed_saves += 1 + return path + + +unnamed_backups = 0 + +def get_backuppath(hint='/tmp/amiibo.bin'): + global unnamed_backups + unnamed_backups += 1 + while True: + path = hint + '.bak' + str(unnamed_backups) + if not ph.exists(path): + break + unnamed_backups += 1 + return path + + +class NFCTagType(enum.Enum): + AMIIBO = enum.auto + + +class NFCTag: + """ + Class that represents a (Amiibo) NFC-Tag usually backed by a file. If needed files are generated. + """ + def __init__(self, data, tag_type: NFCTagType = NFCTagType.AMIIBO, mutable=False, source=None): + self.data: bytearray = bytearray(data) + self.tag_type: NFCTagType = tag_type + self.mutable: bool = mutable + self.source: str = source + if self.tag_type == NFCTagType.AMIIBO: + if len(self.data) == 540: + pass + elif len(self.data) == 572: + logger.info("Long amiibo loaded, manufacturer signature is ignored") + else: + logger.warning("Illegal Amiibo tag size") + + @classmethod + def load_amiibo(cls, source): + # if someone want to make this async have fun + with open(source, "rb") as reader: + return NFCTag(data=bytearray(reader.read()), tag_type=NFCTagType.AMIIBO, source=source) + + def create_backup(self): + """ + copy the file backing this Tag + """ + path = get_backuppath(self.source) + logger.info(f"creating amiibo backup at {path}") + with open(path, "wb") as writer: + writer.write(self.data) + + def set_mutable(self, mutable=True): + """ + By default tags are marked immutable to prevent corruption. To make them mutable create a backup first. + @param mutable: + @return: + """ + if mutable > self.mutable: + self.create_backup() + self.mutable = mutable + + def save(self): + if not self.source: + self.source = get_savepath() + with open(self.source, "wb") as writer: + writer.write(self.data) + logger.info(f"Saved altered amiibo as {self.source}") + + def getUID(self): + return self.data[0:3] + self.data[4:8] + + def is_mutable(self): + return self.mutable + + def write(self, idx, data): + if idx > len(self.data) or idx+len(data) > len(self.data): + logger.error(f"I Fucking hate pyhton {idx}, {bytes(data).hex()} {len(data)}") + if not self.mutable: + logger.warning("Ignored amiibo write to non-mutable amiibo") + else: + + self.data[idx:idx + len(data)] = data + diff --git a/joycontrol/protocol.py b/joycontrol/protocol.py index f83a9d9a..19758c03 100644 --- a/joycontrol/protocol.py +++ b/joycontrol/protocol.py @@ -2,71 +2,112 @@ import logging import time from asyncio import BaseTransport, BaseProtocol -from contextlib import suppress from typing import Optional, Union, Tuple, Text +import math -from joycontrol import utils +import enum +import joycontrol.debug as debug +import socket +import joycontrol.utils as utils +import struct from joycontrol.controller import Controller from joycontrol.controller_state import ControllerState from joycontrol.memory import FlashMemory from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.transport import NotConnectedError +from joycontrol.mcu import MicroControllerUnit logger = logging.getLogger(__name__) -def controller_protocol_factory(controller: Controller, spi_flash=None): +def controller_protocol_factory(controller: Controller, spi_flash=None, reconnect = False): if isinstance(spi_flash, bytes): spi_flash = FlashMemory(spi_flash_memory_data=spi_flash) def create_controller_protocol(): - return ControllerProtocol(controller, spi_flash=spi_flash) + return ControllerProtocol(controller, spi_flash=spi_flash, reconnect = reconnect) return create_controller_protocol +class SwitchState(enum.Enum): + STANDARD = enum.auto, + GRIP_MENU = enum.auto, + AWAITING_MAX_SLOTS = enum.auto + +close_pairing_menu_map = { + Controller.JOYCON_R: ['x', 'a', 'home'], + Controller.JOYCON_L: ['down', 'left'], + Controller.PRO_CONTROLLER: ['a', 'b', 'home'] +} + +close_pairing_masks = { + Controller.JOYCON_R: int.from_bytes(bytes([0x2 | 0x8, 0x10, 0]), "big"), + Controller.JOYCON_L: int.from_bytes(bytes([0, 0, 0x1 | 0x8]), "big"), + Controller.PRO_CONTROLLER: int.from_bytes(bytes([0x4 | 0x8, 0x10, 0]), "big") +} class ControllerProtocol(BaseProtocol): - def __init__(self, controller: Controller, spi_flash: FlashMemory = None): + def __init__(self, controller: Controller, spi_flash: FlashMemory = None, reconnect = False): self.controller = controller self.spi_flash = spi_flash - self.transport = None - # Increases for each input report send, should overflow at 0x100 - self._input_report_timer = 0x00 - - self._data_received = asyncio.Event() + # time when the timer started. + self._input_report_timer_start = None self._controller_state = ControllerState(self, controller, spi_flash=spi_flash) self._controller_state_sender = None + self._writer_thread = None - # None = Just answer to sub commands - self._input_report_mode = None + self._mcu = MicroControllerUnit(self._controller_state) - # This event gets triggered once the Switch assigns a player number to the controller and accepts user inputs - self.sig_set_player_lights = asyncio.Event() + self._is_pairing = not reconnect - async def send_controller_state(self): - """ - Waits for the controller state to be send. + # input mode + self.delay_map = { + None: math.inf, # subcommands only + 0x3F: 1.0, + 0x21: math.inf, # shouldn't happen + 0x30: 1/60, # this needs revising, but 120 seems too fast + # 0x30: 1/120 if self.controller == Controller.PRO_CONTROLLER else 1/60, + 0x31: 1/60 + } + self._input_report_wakeup = asyncio.Event() + self._set_mode(None) - Raises NotConnected exception if the transport is not connected or the connection was lost. - """ - # TODO: Call write directly if in continuously sending input report mode + # "Pausing"-mechanism. + self._not_paused = asyncio.Event() + self._not_paused.set() - if self.transport is None: - raise NotConnectedError('Transport not registered.') + self.sig_input_ready = asyncio.Event() + self.sig_data_received = asyncio.Event() - self._controller_state.sig_is_send.clear() +# INTERNAL - # wrap into a future to be able to set an exception in case of a disconnect - self._controller_state_sender = asyncio.ensure_future(self._controller_state.sig_is_send.wait()) - await self._controller_state_sender - self._controller_state_sender = None + def _set_mode(self, mode, delay=None): + + if mode == 0x21: + logger.error("Shouldn't go into subcommand mode") + + self._input_report_mode = mode + if delay: + self.send_delay = self.delay_map[mode] + elif self._is_pairing: + self.send_delay = 1/15 + elif mode in self.delay_map: + self.send_delay = self.delay_map[mode] + else: + logger.warning(f"Unknown delay for mode {mode}, assuming 1/15") + self.send_delay = 1/15 + + if mode in [0x30, 0x31, 0x32, 0x33]: + # sig input ready, writer + pass + + self._input_report_wakeup.set() - async def write(self, input_report: InputReport): + async def _write(self, input_report): """ - Sets timer byte and current button state in the input report and sends it. Fires sig_is_send event in the controller state afterwards. Raises NotConnected exception if the transport is not connected or the connection was lost. @@ -74,39 +115,147 @@ async def write(self, input_report: InputReport): if self.transport is None: raise NotConnectedError('Transport not registered.') - # set button and stick data of input report - input_report.set_button_status(self._controller_state.button_state) - if self._controller_state.l_stick_state is None: - l_stick = [0x00, 0x00, 0x00] - else: - l_stick = self._controller_state.l_stick_state - if self._controller_state.r_stick_state is None: - r_stick = [0x00, 0x00, 0x00] - else: - r_stick = self._controller_state.r_stick_state - input_report.set_stick_status(l_stick, r_stick) + if self._is_pairing and (int.from_bytes(input_report.data[4:7], "big") & close_pairing_masks[self.controller]): + # this is a bit too early, but so far no + logger.info('left change Grip/Order menu') + self._is_pairing = False + self._set_mode(self._input_report_mode) - # set timer byte of input report - input_report.set_timer(self._input_report_timer) - self._input_report_timer = (self._input_report_timer + 1) % 0x100 + if not self._not_paused.is_set(): + logger.warning("Write while paused") await self.transport.write(input_report) self._controller_state.sig_is_send.set() - def get_controller_state(self) -> ControllerState: - return self._controller_state + def _generate_input_report(self, mode=None): + input_report = InputReport() + if not mode: + mode = self._input_report_mode - async def wait_for_output_report(self): + if not mode: + raise ValueError("cannot generate Report without Mode") + + input_report.set_input_report_id(mode) + if mode == 0x3F: + input_report.data[1:3] = [0x28, 0xca, 0x08] + input_report.data[4:11] = [0x40,0x8A, 0x4F, 0x8A, 0xD0, 0x7E, 0xDF, 0x7F] + else: + if self._input_report_timer_start: + input_report.set_timer(round((time.time() - self._input_report_timer_start) / 0.005) % 0x100) + else: + input_report.set_timer(0) + input_report.set_misc() + input_report.set_button_status(self._controller_state.button_state) + input_report.set_stick_status(self._controller_state.l_stick_state, self._controller_state.r_stick_state) + input_report.set_vibrator_input() + if mode == 0x21: + pass # subcommand is set outside + elif mode in [0x30, 0x31, 0x32, 0x33]: + input_report.set_6axis_data() + + if mode == 0x31: + input_report.set_ir_nfc_data(self._mcu.get_data()) + return input_report + + async def _writer(self): """ - Waits until an output report from the Switch is received. + This continuously sends input reports to the switch. + This relies on the asyncio scheduler to sneak the additional + subcommand-replies in """ - self._data_received.clear() - await self._data_received.wait() + logger.info("writer started") + while self.transport: + await self._not_paused.wait() + last_send_time = time.time() + input_report = self._generate_input_report() + try: + await self._write(input_report) + except: + break + # calculate delay + self.send_delay = debug.get_delay(self.send_delay) #debug hook + active_time = time.time() - last_send_time + sleep_time = self.send_delay - active_time + if sleep_time < 0: + logger.warning(f'Code is running {abs(sleep_time)} s too slow!') + sleep_time = 0 + + try: + await asyncio.wait_for(self._input_report_wakeup.wait(), timeout=sleep_time) + self._input_report_wakeup.clear() + except asyncio.TimeoutError as err: + pass + + logger.warning("Writer exited...") + return None + + async def _reply_to_sub_command(self, report): + # classify sub command + try: + sub_command = report.get_sub_command() + except NotImplementedError as err: + logger.warning(err) + return False + + if sub_command is None: + raise ValueError('Received output report does not contain a sub command') + + logging.info(f'received Sub command {sub_command}') + + sub_command_data = report.get_sub_command_data() + assert sub_command_data is not None + + response_report = self._generate_input_report(mode=0x21) + + try: + # answer to sub command + if sub_command == SubCommand.REQUEST_DEVICE_INFO: + await self._command_request_device_info(response_report, sub_command_data) + + elif sub_command == SubCommand.SET_SHIPMENT_STATE: + await self._command_set_shipment_state(response_report, sub_command_data) + + elif sub_command == SubCommand.SPI_FLASH_READ: + await self._command_spi_flash_read(response_report, sub_command_data) + + elif sub_command == SubCommand.SET_INPUT_REPORT_MODE: + await self._command_set_input_report_mode(response_report, sub_command_data) + + elif sub_command == SubCommand.TRIGGER_BUTTONS_ELAPSED_TIME: + await self._command_trigger_buttons_elapsed_time(response_report, sub_command_data) + + elif sub_command == SubCommand.ENABLE_6AXIS_SENSOR: + await self._command_enable_6axis_sensor(response_report, sub_command_data) + + elif sub_command == SubCommand.ENABLE_VIBRATION: + await self._command_enable_vibration(response_report, sub_command_data) + + elif sub_command == SubCommand.SET_NFC_IR_MCU_CONFIG: + await self._command_set_nfc_ir_mcu_config(response_report, sub_command_data) + + elif sub_command == SubCommand.SET_NFC_IR_MCU_STATE: + await self._command_set_nfc_ir_mcu_state(response_report, sub_command_data) + + elif sub_command == SubCommand.SET_PLAYER_LIGHTS: + await self._command_set_player_lights(response_report, sub_command_data) + else: + logger.warning(f'Sub command 0x{sub_command.value:02x} not implemented - ignoring') + return False + + await self._write(response_report) + + except NotImplementedError as err: + logger.error(f'Failed to answer {sub_command} - {err}') + return False + return True + +# transport hooks def connection_made(self, transport: BaseTransport) -> None: logger.debug('Connection established.') self.transport = transport + self._input_report_timer_start = time.time() def connection_lost(self, exc: Optional[Exception] = None) -> None: if self.transport is not None: @@ -121,95 +270,8 @@ def error_received(self, exc: Exception) -> None: # TODO? raise NotImplementedError() - async def input_report_mode_full(self): - """ - Continuously sends: - 0x30 input reports containing the controller state OR - 0x31 input reports containing the controller state and nfc data - """ - if self.transport.is_reading(): - raise ValueError('Transport must be paused in full input report mode') - - # send state at 66Hz - send_delay = 0.015 - await asyncio.sleep(send_delay) - last_send_time = time.time() - - input_report = InputReport() - input_report.set_vibrator_input() - input_report.set_misc() - if self._input_report_mode is None: - raise ValueError('Input report mode is not set.') - input_report.set_input_report_id(self._input_report_mode) - - reader = asyncio.ensure_future(self.transport.read()) - - try: - while True: - reply_send = False - if reader.done(): - data = await reader - - reader = asyncio.ensure_future(self.transport.read()) - - try: - report = OutputReport(list(data)) - output_report_id = report.get_output_report_id() - - if output_report_id == OutputReportID.RUMBLE_ONLY: - # TODO - pass - elif output_report_id == OutputReportID.SUB_COMMAND: - reply_send = await self._reply_to_sub_command(report) - elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU: - # TODO NFC - raise NotImplementedError('NFC communictation is not implemented.') - else: - logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE') - except ValueError as v_err: - logger.warning(f'Report parsing error "{v_err}" - IGNORE') - except NotImplementedError as err: - logger.warning(err) - - if reply_send: - # Hack: Adding a delay here to avoid flooding during pairing - await asyncio.sleep(0.3) - else: - # write 0x30 input report. - # TODO: set some sensor data - input_report.set_6axis_data() - - # TODO NFC - set nfc data - if input_report.get_input_report_id() == 0x31: - pass - - await self.write(input_report) - - # calculate delay - current_time = time.time() - time_delta = time.time() - last_send_time - sleep_time = send_delay - time_delta - last_send_time = current_time - - if sleep_time < 0: - # logger.warning(f'Code is running {abs(sleep_time)} s too slow!') - sleep_time = 0 - - await asyncio.sleep(sleep_time) - - except NotConnectedError as err: - # Stop 0x30 input report mode if disconnected. - logger.error(err) - finally: - # cleanup - self._input_report_mode = None - # cancel the reader - with suppress(asyncio.CancelledError, NotConnectedError): - if reader.cancel(): - await reader - async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) -> None: - self._data_received.set() + self.sig_data_received.set() try: report = OutputReport(list(data)) @@ -225,70 +287,60 @@ async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) if output_report_id == OutputReportID.SUB_COMMAND: await self._reply_to_sub_command(report) - # elif output_report_id == OutputReportID.RUMBLE_ONLY: - # pass + elif output_report_id == OutputReportID.RUMBLE_ONLY: + # TODO Rumble + pass + elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU: + self._mcu.received_11(report.data[11], report.get_sub_command_data()) + pass else: logger.warning(f'Output report {output_report_id} not implemented - ignoring') - async def _reply_to_sub_command(self, report): - # classify sub command - try: - sub_command = report.get_sub_command() - except NotImplementedError as err: - logger.warning(err) - return False - - if sub_command is None: - raise ValueError('Received output report does not contain a sub command') - logging.info(f'received output report - Sub command {sub_command}') +# event lisnter hooks - sub_command_data = report.get_sub_command_data() - assert sub_command_data is not None + async def send_controller_state(self): + """ + Waits for the controller state to be send. - try: - # answer to sub command - if sub_command == SubCommand.REQUEST_DEVICE_INFO: - await self._command_request_device_info(sub_command_data) + Raises NotConnected exception if the transport is not connected or the connection was lost. + """ + # TODO: Call write directly if in continuously sending input report mode - elif sub_command == SubCommand.SET_SHIPMENT_STATE: - await self._command_set_shipment_state(sub_command_data) + if self.transport is None: + raise NotConnectedError('Transport not registered.') - elif sub_command == SubCommand.SPI_FLASH_READ: - await self._command_spi_flash_read(sub_command_data) + if not self._not_paused.is_set(): + await self._write(self._generate_input_report()) + else: + self._controller_state.sig_is_send.clear() - elif sub_command == SubCommand.SET_INPUT_REPORT_MODE: - await self._command_set_input_report_mode(sub_command_data) + # wrap into a future to be able to set an exception in case of a disconnect + self._controller_state_sender = asyncio.ensure_future(self._controller_state.sig_is_send.wait()) + await self._controller_state_sender + self._controller_state_sender = None - elif sub_command == SubCommand.TRIGGER_BUTTONS_ELAPSED_TIME: - await self._command_trigger_buttons_elapsed_time(sub_command_data) + async def wait_for_output_report(self): + """ + Waits until an output report from the Switch is received. + """ + self.sig_data_received.clear() + await self.sig_data_received.wait() - elif sub_command == SubCommand.ENABLE_6AXIS_SENSOR: - await self._command_enable_6axis_sensor(sub_command_data) + def pause(self): + logger.info("paused") + self._not_paused.clear() - elif sub_command == SubCommand.ENABLE_VIBRATION: - await self._command_enable_vibration(sub_command_data) + def unpause(self): + logger.info("unpaused") + self._not_paused.set() - elif sub_command == SubCommand.SET_NFC_IR_MCU_CONFIG: - await self._command_set_nfc_ir_mcu_config(sub_command_data) - - elif sub_command == SubCommand.SET_NFC_IR_MCU_STATE: - await self._command_set_nfc_ir_mcu_state(sub_command_data) + def get_controller_state(self) -> ControllerState: + return self._controller_state - elif sub_command == SubCommand.SET_PLAYER_LIGHTS: - await self._command_set_player_lights(sub_command_data) - else: - logger.warning(f'Sub command 0x{sub_command.value:02x} not implemented - ignoring') - return False - except NotImplementedError as err: - logger.error(f'Failed to answer {sub_command} - {err}') - return False - return True +# subcommands - async def _command_request_device_info(self, sub_command_data): - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() + async def _command_request_device_info(self, input_report, sub_command_data): address = self.transport.get_extra_info('sockname') assert address is not None @@ -297,27 +349,18 @@ async def _command_request_device_info(self, sub_command_data): input_report.set_ack(0x82) input_report.sub_0x02_device_info(bd_address, controller=self.controller) - await self.write(input_report) - - async def _command_set_shipment_state(self, sub_command_data): - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() + return input_report + async def _command_set_shipment_state(self, input_report, sub_command_data): input_report.set_ack(0x80) input_report.reply_to_subcommand_id(0x08) + return input_report - await self.write(input_report) - - async def _command_spi_flash_read(self, sub_command_data): + async def _command_spi_flash_read(self, input_report, sub_command_data): """ Replies with 0x21 input report containing requested data from the flash memory. :param sub_command_data: input report sub command data bytes """ - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() - input_report.set_ack(0x90) # parse offset @@ -336,52 +379,22 @@ async def _command_spi_flash_read(self, sub_command_data): spi_flash_data = size * [0x00] input_report.sub_0x10_spi_flash_read(offset, size, spi_flash_data) - await self.write(input_report) + return input_report - async def _command_set_input_report_mode(self, sub_command_data): + async def _command_set_input_report_mode(self, input_report, sub_command_data): if self._input_report_mode == sub_command_data[0]: logger.warning(f'Already in input report mode {sub_command_data[0]} - ignoring request') - # Start input report reader - if sub_command_data[0] in (0x30, 0x31): - new_reader = asyncio.ensure_future(self.input_report_mode_full()) - else: - logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request') - return - - # Replace the currently running reader with the input report mode sender, - # which will also handle incoming requests in the future - - self.transport.pause_reading() - - # We need to replace the reader in the future because this function was probably called by it - async def set_reader(): - await self.transport.set_reader(new_reader) - - logger.info(f'Setting input report mode to {hex(sub_command_data[0])}...') - self._input_report_mode = sub_command_data[0] - - self.transport.resume_reading() - - asyncio.ensure_future(set_reader()).add_done_callback( - utils.create_error_check_callback() - ) + self._set_mode(sub_command_data[0]) # Send acknowledgement - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() input_report.set_ack(0x80) input_report.reply_to_subcommand_id(0x03) - await self.write(input_report) - - async def _command_trigger_buttons_elapsed_time(self, sub_command_data): - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() + return input_report + async def _command_trigger_buttons_elapsed_time(self, input_report, sub_command_data): input_report.set_ack(0x83) input_report.reply_to_subcommand_id(SubCommand.TRIGGER_BUTTONS_ELAPSED_TIME) # Hack: We assume this command is only used during pairing - Set values so the Switch assigns a player number @@ -393,33 +406,22 @@ async def _command_trigger_buttons_elapsed_time(self, sub_command_data): else: raise NotImplementedError(self.controller) - await self.write(input_report) - - async def _command_enable_6axis_sensor(self, sub_command_data): - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() + return input_report + async def _command_enable_6axis_sensor(self, input_report, sub_command_data): input_report.set_ack(0x80) input_report.reply_to_subcommand_id(0x40) - await self.write(input_report) - - async def _command_enable_vibration(self, sub_command_data): - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() + return input_report + async def _command_enable_vibration(self, input_report, sub_command_data): input_report.set_ack(0x80) input_report.reply_to_subcommand_id(SubCommand.ENABLE_VIBRATION.value) - await self.write(input_report) + return input_report - async def _command_set_nfc_ir_mcu_config(self, sub_command_data): - # TODO NFC - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() + async def _command_set_nfc_ir_mcu_config(self, input_report, sub_command_data): + self._mcu.set_config_cmd(sub_command_data) input_report.set_ack(0xA0) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value) @@ -428,13 +430,10 @@ async def _command_set_nfc_ir_mcu_config(self, sub_command_data): for i in range(len(data)): input_report.data[16 + i] = data[i] - await self.write(input_report) + return input_report - async def _command_set_nfc_ir_mcu_state(self, sub_command_data): - # TODO NFC - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() + async def _command_set_nfc_ir_mcu_state(self, input_report, sub_command_data): + self._mcu.set_power_state_cmd(sub_command_data[0]) if sub_command_data[0] == 0x01: # 0x01 = Resume @@ -447,17 +446,12 @@ async def _command_set_nfc_ir_mcu_state(self, sub_command_data): else: raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} ' f'not implemented.') + return input_report - await self.write(input_report) - - async def _command_set_player_lights(self, sub_command_data): - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() - + async def _command_set_player_lights(self, input_report, sub_command_data): input_report.set_ack(0x80) input_report.reply_to_subcommand_id(SubCommand.SET_PLAYER_LIGHTS.value) - await self.write(input_report) - - self.sig_set_player_lights.set() + self._writer_thread = utils.start_asyncio_thread(self._writer()) + self.sig_input_ready.set() + return input_report diff --git a/joycontrol/report.py b/joycontrol/report.py index 32182459..fad0fdaa 100644 --- a/joycontrol/report.py +++ b/joycontrol/report.py @@ -10,7 +10,7 @@ class InputReport: """ def __init__(self, data=None): if not data: - self.data = [0x00] * 364 + self.data = [0x00] * 363 # all input reports are prepended with 0xA1 self.data[0] = 0xA1 else: @@ -66,8 +66,8 @@ def set_stick_status(self, left_stick, right_stick): """ Sets the joystick status bytes """ - self.set_left_analog_stick(bytes(left_stick)) - self.set_right_analog_stick(bytes(right_stick)) + self.set_left_analog_stick(bytes(left_stick) if left_stick else bytes(3)) + self.set_right_analog_stick(bytes(right_stick) if right_stick else bytes(3)) def set_left_analog_stick(self, left_stick_bytes): """ @@ -113,12 +113,11 @@ def set_6axis_data(self): self.data[i] = 0x00 def set_ir_nfc_data(self, data): - if 50 + len(data) > len(self.data): - raise ValueError('Too much data.') - - # write to data - for i in range(len(data)): - self.data[50 + i] = data[i] + if len(data) > 313: + raise ValueError(f'Too much data {len(data)} > 313.') + elif len(data) != 313: + print("warning : too short mcu data") + self.data[50:50+len(data)] = data def reply_to_subcommand_id(self, _id): if isinstance(_id, SubCommand): diff --git a/joycontrol/server.py b/joycontrol/server.py index eb3e19a4..8ba5e323 100644 --- a/joycontrol/server.py +++ b/joycontrol/server.py @@ -20,9 +20,8 @@ async def _send_empty_input_reports(transport): await transport.write(report) await asyncio.sleep(1) - async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id=None, reconnect_bt_addr=None, - capture_file=None): + capture_file=None, interactive=False): """ :param protocol_factory: Factory function returning a ControllerProtocol instance :param ctl_psm: hid control channel port @@ -37,23 +36,46 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id= Otherwise, the function assumes an initial pairing with the console was already done and reconnects to the provided Bluetooth address. :param capture_file: opened file to log incoming and outgoing messages + :param interactive: whether or not questions to the user via input and print are allowed :returns transport for input reports and protocol which handles incoming output reports """ protocol = protocol_factory() + hid = HidDevice(device_id=device_id) + + bt_addr = hid.get_address() + #if bt_addr[:8] != "94:58:CB": + # await hid.set_address("94:58:CB" + bt_addr[8:], interactive=interactive) + # bt_addr = hid.get_address() + if reconnect_bt_addr is None: + if interactive: + if len(hid.get_UUIDs()) > 3: + print("too many SPD-records active, Switch might refuse connection.") + print("try modifieing /lib/systemd/system/bluetooth.service and see") + print("https://github.com/Poohl/joycontrol/issues/4 if it doesn't work") + for sw in hid.get_paired_switches(): + print(f"Warning: a switch ({sw}) was found paired, do you want to unpair it?") + i = input("y/n [y]: ") + if i == '' or i == 'y' or i == 'Y': + hid.unpair_path(sw) + else: + if len(hid.get_UUIDs()) > 3: + logger.warning("detected too many SDP-records. Switch might refuse connection.") + b = hid.get_paired_switches() + if b: + logger.warning(f"Attempting initial pairing, but switches are paired: {b}") + ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) ctl_sock.setblocking(False) itr_sock.setblocking(False) ctl_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) itr_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - hid = HidDevice(device_id=device_id) - ctl_sock.bind((hid.address, ctl_psm)) - itr_sock.bind((hid.address, itr_psm)) + try: + ctl_sock.bind((bt_addr, ctl_psm)) + itr_sock.bind((bt_addr, itr_psm)) except OSError as err: logger.warning(err) # If the ports are already taken, this probably means that the bluez "input" plugin is enabled. @@ -65,18 +87,17 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id= logger.info('Restarting bluetooth service...') await utils.run_system_command('systemctl restart bluetooth.service') await asyncio.sleep(1) - hid = HidDevice(device_id=device_id) - ctl_sock.bind((socket.BDADDR_ANY, ctl_psm)) - itr_sock.bind((socket.BDADDR_ANY, itr_psm)) + ctl_sock.bind((bt_addr, ctl_psm)) + itr_sock.bind((bt_addr, itr_psm)) ctl_sock.listen(1) itr_sock.listen(1) hid.powered(True) hid.pairable(True) - + # setting bluetooth adapter name to the device we wish to emulate await hid.set_name(protocol.controller.device_name()) @@ -87,12 +108,12 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id= # Already registered (If multiple controllers are being emulated and this method is called consecutive times) logger.debug(dbus_err) - # set the device class to "Gamepad/joystick" - await hid.set_class() - # start advertising hid.discoverable() + # set the device class to "Gamepad/joystick" + await hid.set_class() + logger.info('Waiting for Switch to connect... Please open the "Change Grip/Order" menu.') loop = asyncio.get_event_loop() @@ -107,6 +128,32 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id= hid.pairable(False) else: + if reconnect_bt_addr.lower() == 'auto': + paths = hid.get_paired_switches() + path = "" + if not paths: + logger.fatal("couldn't find paired switch to reconnect to, terminating...") + exit(1) + elif len(paths) > 1: + if interactive: + print("found the following paired switches, please choose one:") + for i, p in paths.items(): + print(f" {i}: {p}") + choice = input(f"number 1 - {len(paths)} [1]:") + if not choice: + path = paths[0] + else: + path = paths[int(choice)-1] + else: + path = paths[0] + logger.warning(f"Automatic reconnect address chose {path} out of {paths}") + else: + path = paths[0] + logger.info(f"auto detected paired switch {path}") + reconnect_bt_addr = hid.get_address_of_paired_path(path) + else: + # Todo: figure out if we're actually paired + pass # Reconnection to reconnect_bt_addr client_ctl = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) client_itr = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) @@ -115,6 +162,11 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id= client_ctl.setblocking(False) client_itr.setblocking(False) + # I have spent 8 hours, one stackoverflow question and read pythons socket sourcecode + # to find tis fucking option somewhere in a GNUC API description. (here: https://www.gnu.org/software/libc/manual/html_node/Socket_002dLevel-Options.html) + # FUCK LINUX OPEN SOURCE. I'd rather have a DOCUMENTATION than the source of this garbage. + client_ctl.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0) + client_itr.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0) # create transport for the established connection and activate the HID protocol transport = L2CAP_Transport(asyncio.get_event_loop(), protocol, client_itr, client_ctl, 50, capture_file=capture_file) protocol.connection_made(transport) diff --git a/joycontrol/transport.py b/joycontrol/transport.py index f2ff62df..8f224563 100644 --- a/joycontrol/transport.py +++ b/joycontrol/transport.py @@ -2,9 +2,13 @@ import logging import struct import time +import socket +import math from typing import Any +from contextlib import suppress from joycontrol import utils +from joycontrol.my_semaphore import MyBoundedSemaphore logger = logging.getLogger(__name__) @@ -12,9 +16,8 @@ class NotConnectedError(ConnectionResetError): pass - class L2CAP_Transport(asyncio.Transport): - def __init__(self, loop, protocol, itr_sock, ctr_sock, read_buffer_size, capture_file=None) -> None: + def __init__(self, loop, protocol, itr_sock, ctr_sock, read_buffer_size, capture_file=None, flow_control = 4) -> None: super(L2CAP_Transport, self).__init__() self._loop = loop @@ -23,7 +26,7 @@ def __init__(self, loop, protocol, itr_sock, ctr_sock, read_buffer_size, capture self._itr_sock = itr_sock self._ctr_sock = ctr_sock - self._read_buffer_size = read_buffer_size + self._capture_file = capture_file self._extra_info = { 'peername': self._itr_sock.getpeername(), @@ -32,61 +35,47 @@ def __init__(self, loop, protocol, itr_sock, ctr_sock, read_buffer_size, capture } self._is_closing = False - self._is_reading = asyncio.Event() - self._capture_file = capture_file + # writing control + self._write_lock = asyncio.Event() + self._write_lock.set() + self._write_lock_thread = utils.start_asyncio_thread(self._write_lock_monitor(), ignore=asyncio.CancelledError) + self._write_window = MyBoundedSemaphore(flow_control) + self._write_window_thread = utils.start_asyncio_thread(self._write_window_monitor(), ignore=asyncio.CancelledError) - # start underlying reader - self._read_thread = None + # reading control + self._read_buffer_size = read_buffer_size + self._is_reading = asyncio.Event() self._is_reading.set() - self.start_reader() + self._read_thread = utils.start_asyncio_thread(self._reader(), ignore=asyncio.CancelledError) + + async def _write_window_monitor(self): + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as hci: + hci.bind((0,)) + hci.setblocking(False) + # 0x04 = HCI_EVT; 0x13 = Number of completed packets + hci.setsockopt(socket.SOL_HCI, socket.HCI_FILTER, struct.pack("IIIh2x", 1 << 0x04, (1 << 0x13), 0, 0)) + + while True: + data = await self._loop.sock_recv(hci, 10) + self._write_window.release(data[6] + data[7] * 0x100, best_effort=True) + + async def _write_lock_monitor(self): + with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as hci: + hci.bind((0,)) + hci.setblocking(False) + # 0x04 = HCI_EVT; 0x1b = Max Slots Change + hci.setsockopt(socket.SOL_HCI, socket.HCI_FILTER, struct.pack("IIIh2x", 1 << 0x04, (1 << 0x1b), 0, 0)) + while True: + data = await self._loop.sock_recv(hci, 10) + if data[5] < 5: + self.pause_writing() + await asyncio.sleep(1) + self.resume_writing() async def _reader(self): while True: - try: - data = await self.read() - except NotConnectedError: - self._read_thread = None - break - - await self._protocol.report_received(data, self._itr_sock.getpeername()) - - def start_reader(self): - """ - Starts the transport reader which calls the protocols report_received function for every incoming message - """ - if self._read_thread is not None: - raise ValueError('Reader is already running.') - - self._read_thread = asyncio.ensure_future(self._reader()) - - # Create callback in case the reader is failing - callback = utils.create_error_check_callback(ignore=asyncio.CancelledError) - self._read_thread.add_done_callback(callback) - - async def set_reader(self, reader: asyncio.Future): - """ - Cancel the currently running reader and register the new one. - A reader is a coroutine that calls this transports 'read' function. - The 'read' function calls can be paused by calling pause_reading of this transport. - :param reader: future reader - """ - if self._read_thread is not None: - # cancel currently running reader - if self._read_thread.cancel(): - try: - await self._read_thread - except asyncio.CancelledError: - pass - - # Create callback for debugging in case the reader is failing - err_callback = utils.create_error_check_callback(ignore=asyncio.CancelledError) - reader.add_done_callback(err_callback) - - self._read_thread = reader - - def get_reader(self): - return self._read_thread + await self._protocol.report_received(await self.read(), self._itr_sock.getpeername()) async def read(self): """ @@ -104,7 +93,6 @@ async def read(self): # disconnect happened logger.error('No data received.') self._protocol.connection_lost() - raise NotConnectedError('No data received.') if self._capture_file is not None: # write data to log file @@ -114,6 +102,25 @@ async def read(self): return data +# Base transport API + + def write_eof(): + raise NotImplementedError("cannot write EOF") + + def get_extra_info(self, name: Any, default=None) -> Any: + return self._extra_info.get(name, default) + + def is_closing(self) -> bool: + return self._is_closing + + def set_protocol(self, protocol: asyncio.BaseProtocol) -> None: + self._protocol = protocol + + def get_protocol(self) -> asyncio.BaseProtocol: + return self._protocol + +# Read-Transport API + def is_reading(self) -> bool: """ :returns True if the reader is running @@ -135,6 +142,29 @@ def resume_reading(self) -> None: def set_read_buffer_size(self, size): self._read_buffer_size = size +# Write-Transport API: +# This is not compliant to the official trasnport API, as the core methods +# are asnyc. This is because the official API has no control over time and +# imho is quite lacking... + + def abort(): + raise NotImplementedError() + + def can_write_eof(): + return False + + def get_write_buffer_size(): + return self._write_window.get_aquired() + + def get_write_buffer_limits(): + return (0, self._write_window.get_limit()) + + def set_write_buffer_limits(high=None, low=None): + if low: + raise NotImplementedError("Cannot set a lower bound for in flight data...") + + self._write_window.set_limit(high) + async def write(self, data: Any) -> None: if isinstance(data, bytes): _bytes = data @@ -150,24 +180,30 @@ async def write(self, data: Any) -> None: # logger.debug(f'sending "{_bytes}"') try: + await self._write_window.acquire() + await self._write_lock.wait() await self._loop.sock_sendall(self._itr_sock, _bytes) except OSError as err: logger.error(err) self._protocol.connection_lost() - raise NotConnectedError(err) except ConnectionResetError as err: logger.error(err) self._protocol.connection_lost() - raise err - def abort(self) -> None: - raise NotImplementedError + async def writelines(*data): + for d in data: + await self.write(data) - def get_extra_info(self, name: Any, default=None) -> Any: - return self._extra_info.get(name, default) + def pause_writing(self): + logger.info("pause transport write") + self._write_lock.clear() - def is_closing(self) -> bool: - return self._is_closing + def resume_writing(self): + logger.info("resume transport write") + self._write_lock.set() + + def is_writing(self): + return not self._write_lock.is_set() async def close(self): """ @@ -176,19 +212,23 @@ async def close(self): if not self._is_closing: # was not already closed self._is_closing = True - if self._read_thread.cancel(): - # wait for reader to cancel - try: - await self._read_thread - except asyncio.CancelledError: - pass + + self.pause_reading() + self.pause_writing() + + self._read_thread.cancel() + self._write_lock_thread.cancel() + self._write_window_thread.cancel() + + with suppress(asyncio.CancelledError): + await self._read_thread + with suppress(asyncio.CancelledError): + await self._write_lock_thread + with suppress(asyncio.CancelledError): + await self._write_window_thread # interrupt connection should be closed first self._itr_sock.close() self._ctr_sock.close() - def set_protocol(self, protocol: asyncio.BaseProtocol) -> None: - self._protocol = protocol - - def get_protocol(self) -> asyncio.BaseProtocol: - return self._protocol + self._protocol.connection_lost(None) diff --git a/joycontrol/utils.py b/joycontrol/utils.py index 4eadc318..803e59ff 100644 --- a/joycontrol/utils.py +++ b/joycontrol/utils.py @@ -80,6 +80,21 @@ async def run_system_command(cmd): return proc.returncode, stdout, stderr +def start_asyncio_thread(func, ignore=None): + """ + Yes, these are not actual threads. But for all asyncio intents and purposes + they behave like they are. + """ + out = asyncio.ensure_future(func) + out.add_done_callback( + create_error_check_callback(ignore=ignore) + ) + return out + +async def aio_chain(*args): + for a in args: + await a + """ async def get_bt_mac_address(dev=0): ret, stdout, stderr = await run_system_command(f'hciconfig hci{dev}') diff --git a/mcu.md b/mcu.md new file mode 100644 index 00000000..9deafdab --- /dev/null +++ b/mcu.md @@ -0,0 +1,109 @@ + +# MCU +MarvelCinematicUniverse? + +This is a chip on the switch's controllers handling all kinds of high data throughput stuff. + +There seem to be 5 modes (OFF, starting, ON, NFC, IR, UPDATE) and a 'busy' flag. + +Requests are being sent by the switch using 11 output reports, responses are piggibacked to regular input using the MCU-datafield in 31 input reports (and everything is sent as a response to a request). All responses feature a checksum in the last byte that is computed as follows: + +checksum: `[crc8(mcu_data[0:-1])]` + +Request notation: The values given will always be the byte after `a2` and the one ten bytes after that (maybe followed by subsequent bytes) + +All numbers are in continuous hexadecimal + +# Generic MCU requests + +* If there is no response to be sent, the MCU-datafield is + + `no_request`: `ff00....00[checksum]` + +* At first 31 mode is enabled. (`01 0331`) + + `response`: `0100...00[checksum]` + +* then the MCU is enabled (`01 2201`) + + the MCU goes through starting -> firmware-update -> on, not sure if we have to respond to this, the next command is always a status request so sending this dosn't hurt. + + The firware-update phase can (and should) be skipped. + + response: `[status | no_request]` + +* A status request (`11 01`), response: + + `status`: `0100[busy]0008001b[power_state]` + + where busy is `ff` after initial enable for a while, then goes to `00` +and power_state is `01` for off, `00` for starting, `01` for on, `04` for NFC mode, `06` for firmware-update (this is not sure) + +## NFC & Amiibo + +Here I describe what I found the nfc-reading process of amiibos looks like: + +### generic stuff: + +* the Tag data `nfc_data` is 540 bytes in length + +* the UID is 7 of those bytes a follows: + + `tag_uid`: `[nfc_data(0;3(][nfc_data(4;8(]` + +### NFC Requests + +* command: configure NFC (`01 2121`) + + response: nothing (but the command-ack is weird) + +* get status / start discovery: `11 0204` + + This is spammed like hell and seems to be some other kind of nfc-status-request + + `nfc_status`: `2a000500000931[nfc_state]00..00[checksum]` + + where nfc_state is + - `00` for nothing/polling startup or something like this + - `01` for polling without success + - `01` for polling with success, followed by `poll_success`: `0000000101020007[tag_uid]` + - `02` for pending read, followed by `[poll_success]` + - `09` for polling with success, same tag as last time, followed by `[poll_success]` + + Note: the joycon thrice reported 09 followed by just 00. in response to 0204 commands after stop-polling + +* poll (`11 0201`) + + look for a new nfc tag now + + response: noting, maybe `[nfc_status]` + +* read (`11 0206`) + + respond with the 3 read packets read1 read2 read3 followed by no_request + + Note: it seems every packet is requested individually using a 0204, as on of the bytes in the request increments from 0 to 3 shortly before/after the data is sent. + + the Packets: + + read1: `3a0007010001310200000001020007[TAG_UID]000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000[nfc_data(0;245(][checksum]` + + read2: `3a000702000927[nfc_data(245;540(][checksum]` + + read3: `2a000500000931040000000101020007[TAG_UID]00...00[checksum]` + +* stop polling (`11 0202`) + + after a poll, presumably stop looking for a tag discovered during poll command + + no response + +* cancel (`11 0200`) + + No idea + +# Resources + +* [jctool.cpp](https://github.com/CTCaer/jc_toolkit/blob/5.2.0/jctool/jctool.cpp) c.a. line 2523 + +* [bluetooth_hid_subcommands.md](https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering/blob/master/bluetooth_hid_subcommands_notes.md) diff --git a/run_controller_cli.py b/run_controller_cli.py index c84597b6..b70deaf8 100755 --- a/run_controller_cli.py +++ b/run_controller_cli.py @@ -7,6 +7,7 @@ from aioconsole import ainput +import joycontrol.debug as debug from joycontrol import logging_default as log, utils from joycontrol.command_line_interface import ControllerCLI from joycontrol.controller import Controller @@ -14,6 +15,7 @@ from joycontrol.memory import FlashMemory from joycontrol.protocol import controller_protocol_factory from joycontrol.server import create_hid_server +from joycontrol.nfc_tag import NFCTag logger = logging.getLogger(__name__) @@ -163,7 +165,6 @@ async def mash_button(controller_state, button, interval): # await future to trigger exceptions in case something went wrong await user_input - def _register_commands_with_controller_state(controller_state, cli): """ Commands registered here can use the given controller state. @@ -195,6 +196,18 @@ async def mash(*args): cli.add_command(mash.__name__, mash) + async def click(*args): + + if not args: + raise ValueError('"click" command requires a button!') + + await controller_state.connect() + ensure_valid_button(controller_state, *args) + + await button_push(controller_state, *args) + + cli.add_command(click.__name__, click) + # Hold a button command async def hold(*args): """ @@ -248,24 +261,40 @@ async def nfc(*args): nfc Set controller state NFC content to file nfc remove Remove NFC content from controller state """ - logger.error('NFC Support was removed from joycontrol - see https://github.com/mart1nro/joycontrol/issues/80') + #logger.error('NFC Support was removed from joycontrol - see https://github.com/mart1nro/joycontrol/issues/80') if controller_state.get_controller() == Controller.JOYCON_L: raise ValueError('NFC content cannot be set for JOYCON_L') elif not args: - raise ValueError('"nfc" command requires file path to an nfc dump as argument!') + raise ValueError('"nfc" command requires file path to an nfc dump or "remove" as argument!') elif args[0] == 'remove': controller_state.set_nfc(None) print('Removed nfc content.') else: - _loop = asyncio.get_event_loop() - with open(args[0], 'rb') as nfc_file: - content = await _loop.run_in_executor(None, nfc_file.read) - controller_state.set_nfc(content) + controller_state.set_nfc(NFCTag.load_amiibo(args[0])) + print("added nfc content") cli.add_command(nfc.__name__, nfc) + async def pause(*args): + """ + Pause regular input + """ + controller_state._protocol.pause() + + cli.add_command(pause.__name__, pause) + + async def unpause(*args): + """ + unpause regular input + """ + controller_state._protocol.unpause() + + cli.add_command(unpause.__name__, unpause) async def _main(args): + # Get controller name to emulate from arguments + controller = Controller.from_arg(args.controller) + # parse the spi flash if args.spi_flash: with open(args.spi_flash, 'rb') as spi_flash_file: @@ -274,17 +303,16 @@ async def _main(args): # Create memory containing default controller stick calibration spi_flash = FlashMemory() - # Get controller name to emulate from arguments - controller = Controller.from_arg(args.controller) with utils.get_output(path=args.log, default=None) as capture_file: # prepare the the emulated controller - factory = controller_protocol_factory(controller, spi_flash=spi_flash) + factory = controller_protocol_factory(controller, spi_flash=spi_flash, reconnect = args.reconnect_bt_addr) ctl_psm, itr_psm = 17, 19 transport, protocol = await create_hid_server(factory, reconnect_bt_addr=args.reconnect_bt_addr, ctl_psm=ctl_psm, itr_psm=itr_psm, capture_file=capture_file, - device_id=args.device_id) + device_id=args.device_id, + interactive=True) controller_state = protocol.get_controller_state() @@ -292,6 +320,7 @@ async def _main(args): cli = ControllerCLI(controller_state) _register_commands_with_controller_state(controller_state, cli) cli.add_command('amiibo', ControllerCLI.deprecated('Command was removed - use "nfc" instead!')) + cli.add_command(debug.debug.__name__, debug.debug) # set default nfc content supplied by argument if args.nfc is not None: @@ -316,12 +345,12 @@ async def _main(args): parser = argparse.ArgumentParser() parser.add_argument('controller', help='JOYCON_R, JOYCON_L or PRO_CONTROLLER') - parser.add_argument('-l', '--log') - parser.add_argument('-d', '--device_id') - parser.add_argument('--spi_flash') + parser.add_argument('-l', '--log', help="BT-communication logfile output") + parser.add_argument('-d', '--device_id', help='not fully working yet, the BT-adapter to use') + parser.add_argument('--spi_flash', help="controller SPI-memory dump to use") parser.add_argument('-r', '--reconnect_bt_addr', type=str, default=None, - help='The Switch console Bluetooth address, for reconnecting as an already paired controller') - parser.add_argument('--nfc', type=str, default=None) + help='The Switch console Bluetooth address (or "auto" for automatic detection), for reconnecting as an already paired controller.') + parser.add_argument('--nfc', type=str, default=None, help="amiibo dump placed on the controller. Äquivalent to the nfc command.") args = parser.parse_args() loop = asyncio.get_event_loop() diff --git a/scripts/.gitignore b/scripts/.gitignore new file mode 100644 index 00000000..a8a0dcec --- /dev/null +++ b/scripts/.gitignore @@ -0,0 +1 @@ +*.bin diff --git a/scripts/change_btaddr.sh b/scripts/change_btaddr.sh new file mode 100755 index 00000000..38c8e958 --- /dev/null +++ b/scripts/change_btaddr.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# changes the vendor part (first 3 bytes) of the Mac address on a raspi 4B (tested) +# and 3B+ (untestd) to 94:58:CB for Nintendo Co. Ltd. + +# For some reason after a reboot you have to run +# sudo hcitool cmd 0x3f 0x001 0x66 0x55 0x44 0x33 0x22 0x11 +# where 11:22:33:44:55:66 is your mac address. +# (yes the ordering is on purpose, pass in reverse to hcitool) + +if [ -z "$1" ] +then + bdaddr_dev=$(bluetoothctl show | grep -Eo '(:[0-9a-fA-F]{2}){3}\s') + target_addr="94:58:CB${bdaddr_dev}" + echo "detected dev id: ${bdaddr_dev}" +else + target_addr=$1 +fi + +echo "changing address to ${target_addr}" +bdaddr -i hci0 "${target_addr}" +hciconfig hci0 reset +systemctl restart bluetooth.service + +echo "success" diff --git a/scripts/hcimon.py b/scripts/hcimon.py new file mode 100755 index 00000000..14d0b492 --- /dev/null +++ b/scripts/hcimon.py @@ -0,0 +1,16 @@ +import socket +from struct import pack + +hci = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) +hci.bind((0,)) +# 0x10 = 1 << 4 = HCI_EVT + +# first arg (0x10): 1 << MESSAGE_TYPE For multiple "or" them together +# 2nd and 3rd arg: subtypes you want. same format as first arg. For event called "EVENT CODE" +# 2nd arg for the first 32 Codes, 3rd arg for 33-64. +# 4th arg: dunno, 0 works +hci.setsockopt(socket.SOL_HCI, socket.HCI_FILTER, pack("IIIh2x", 0x10, 1 << 0x1b, 0, 0)) +#hci.setsockopt(socket.SOL_HCI, socket.HCI_FILTER, pack("IIIh2x", 0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF, 0)) + +while True: + print(hci.recv(300).hex()) diff --git a/scripts/joycon_ip_proxy.py b/scripts/joycon_ip_proxy.py new file mode 100755 index 00000000..09fe0bb9 --- /dev/null +++ b/scripts/joycon_ip_proxy.py @@ -0,0 +1,245 @@ +import argparse +import asyncio +import logging +import os +import socket +import re + +import hid + +from joycontrol import logging_default as log, utils +from joycontrol.device import HidDevice +from joycontrol.server import PROFILE_PATH +from joycontrol.utils import AsyncHID + +logger = logging.getLogger(__name__) + +async def recv_to_queue(queue, src, side): + while True: + data = await src() + await queue.put(data) + #try: + # queue.put_nowait(data) + #except asyncio.QueueFull: + # print(side, "overrun") + # #queue.get_nowait() + # #queue.put_nowait(data) + if queue.qsize() > 1: + print(side, queue.qsize()) + +async def send_from_queue(queue, dst, printd=False): + # @yamakaky I too would love to use python 3.8 but raspis ship with 3.7 :( + while True: + data = await queue.get() + await dst(data) + if printd: + print("send") + +def read_from_sock(sock): + async def internal(): + return await asyncio.get_event_loop().sock_recv(sock, 500) + return internal + +def write_to_sock(sock): + async def internal(data): + return await asyncio.get_event_loop().sock_sendall(sock, data) + return internal + +async def connect_bt(bt_addr): + loop = asyncio.get_event_loop() + ctl = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) + itr = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) + + # See https://bugs.python.org/issue27929?@ok_message=issue%2027929%20versions%20edited%20ok&@template=item + # bug here: https://github.com/python/cpython/blob/5e29021a5eb10baa9147fd977cab82fa3f652bf0/Lib/asyncio/selector_events.py#L495 + # should be + # if hasattr(socket, 'AF_INET') or hasattr(socket, 'AF_INET6') sock.family in (socket.AF_INET, socket.AF_INET6): + # or something similar + # ctl.setblocking(0) + # itr.setblocking(0) + # await loop.sock_connect(ctl, (bt_addr, 17)) + # await loop.sock_connect(itr, (bt_addr, 19)) + ctl.connect((bt_addr, 17)) + itr.connect((bt_addr, 19)) + ctl.setblocking(0) + itr.setblocking(0) + return ctl, itr + +async def accept_bt(): + loop = asyncio.get_event_loop() + + ctl_srv = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) + itr_srv = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) + + print('Waitng for the Switch... Please open the "Change Grip/Order" menu.') + + ctl_srv.setblocking(False) + itr_srv.setblocking(False) + + ctl_srv.bind((socket.BDADDR_ANY, 17)) + itr_srv.bind((socket.BDADDR_ANY, 19)) + + ctl_srv.listen(1) + itr_srv.listen(1) + + emulated_hid = HidDevice() + # setting bluetooth adapter name and class to the device we wish to emulate + await emulated_hid.set_name('Joy-Con (R)') + logger.info('Advertising the Bluetooth SDP record...') + emulated_hid.register_sdp_record(PROFILE_PATH) + #emulated_hid.powered(True) + emulated_hid.discoverable(True) + #emulated_hid.pairable(True) + await emulated_hid.set_class() + + ctl, ctl_address = await loop.sock_accept(ctl_srv) + print(f'Accepted connection at psm 17 from {ctl_address}') + itr, itr_address = await loop.sock_accept(itr_srv) + print(f'Accepted connection at psm 19 from {itr_address}') + assert ctl_address[0] == itr_address[0] + + # stop advertising + emulated_hid.discoverable(False) + ctl_srv.close() + itr_srv.close() + + return ctl, itr + +def bt_to_callbacks(ctl, itr): + def internal(): + itr.close() + ctl.close() + return read_from_sock(itr), write_to_sock(itr), internal + +class NoDatagramProtocol(asyncio.DatagramProtocol): + """ + This is a gutted version of the protcol-transport paradigm in asyncio that + has an api more similar to sockets. More perciseley the protocoll part. + Neccessary because asyncio has no datagram socket support. + + @param peer: the address of the peer to "connect" to. + """ + def __init__(self, peer): + self.peer = peer + self.connected = asyncio.Event() + self.transport = None + self.readQueue = asyncio.Queue(10) + + async def read(self): + return await self.readQueue.get() + + async def write(self, data): + await self.connected.wait() + self.transport.sendto(data, self.peer) + + def datagram_received(self, data, addr): + if self.peer == addr: + try: + self.readQueue.put_nowait(data) + except: + print("dropped packet") + else: + print("warning: unknown source, dropped") + + def connection_made(self, transport): + self.transport = transport + self.connected.set() + + def error_received(self, exc): + print('Error received:', exc) + + def connection_lost(self, exc): + self.connected.clear() + self.transport = None + + +async def connectEth(eth, server=False): + ip, port = eth.split(':') + port = int(port) + + t, p = await asyncio.get_event_loop().create_datagram_endpoint(lambda: NoDatagramProtocol((ip, port)), local_addr=('0.0.0.0', port), remote_addr=(ip, port)) + + # replaces the syn-ack handshake with just sending a single packet to test the + # connection beforehand + if server: + await p.read() + else: + await p.write(bytes(10)) + + return p.read, p.write, t.close + +async def _main(sw_addr, jc_addr, buffer=10): + # loop = asyncio.get_event_loop() + + jc_eth = not re.match("([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}", jc_addr) + sw_eth = not re.match("([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}", sw_addr) + + sw_any = sw_addr == "00:00:00:00:00:00" + + print("jc_eth", jc_eth, "sw_eth", sw_eth) + + jc_queue = asyncio.Queue(buffer) + sw_queue = asyncio.Queue(buffer) + + send_to_jc = None + recv_from_jc = None + cleanup_jc = None + + send_to_switch = None + recv_from_switch = None + cleanup_switch = None + try: + # DONT do if-else here, because order should be easily adjustable + if not jc_eth: + print("waiting for joycon") + recv_from_jc, send_to_jc, cleanup_jc = bt_to_callbacks(*await connect_bt(jc_addr)) + + if jc_eth: + print("opening joycon eth") + recv_from_jc, send_to_jc, cleanup_jc = await connectEth(jc_addr, True) + + if sw_eth: + print("opening switch eth") + recv_from_switch, send_to_switch, cleanup_switch = await connectEth(sw_addr, False) + + if not sw_eth: + if not sw_any: + print("waiting for switch") + recv_from_switch, send_to_switch, cleanup_switch = bt_to_callbacks(*await connect_bt(sw_addr)) + else: + recv_from_switch, send_to_switch, cleanup_switch = bt_to_callbacks(*await accept_bt()) + + print("stared forwarding") + await asyncio.gather( + asyncio.ensure_future(recv_to_queue(jc_queue, recv_from_jc, ">")), + asyncio.ensure_future(send_from_queue(jc_queue, send_to_switch)), + asyncio.ensure_future(recv_to_queue(sw_queue, recv_from_switch, "<")), + asyncio.ensure_future(send_from_queue(sw_queue, send_to_jc)), + ) + finally: + if cleanup_switch: + cleanup_switch() + if cleanup_jc: + cleanup_jc() + + + +if __name__ == '__main__': + # check if root + if not os.geteuid() == 0: + raise PermissionError('Script must be run as root!') + + parser = argparse.ArgumentParser(description="Acts as proxy for Switch-joycon communtcation between the two given addresses.\n Start the instance forwarding to the Switch directly first") + parser.add_argument('-S', '--switch', type=str, default=None, + help='talk to switch at the given address. Either a BT-MAC or a tcp ip:port combo. 00:00:00:00:00:00 for pair mode.') + parser.add_argument('-J', '--joycon', type=str, default=None, + help='talk to switch at the given address. Either a BT-MAC or a tcp ip:port combo.') + parser.add_argument('-B', '--buffer', type=int, default=10, + help='the buffersize to use for in and output.') + + args = parser.parse_args() + if not args.switch or not args.joycon: + print("missing args") + exit(1) + + asyncio.run(_main(args.switch, args.joycon, buffer=args.buffer)) diff --git a/scripts/logparser.txt b/scripts/logparser.txt new file mode 100644 index 00000000..ec5d2a5e --- /dev/null +++ b/scripts/logparser.txt @@ -0,0 +1,10 @@ + +Settings to be used with this fork of wireshark: +https://gitlab.com/paulniklasweiss/wireshark/-/tree/regex_text_import +to import the message log format into wireshark + + +regex: $(?[<>]\s*(?