diff --git a/pros/cli/misc_commands.py b/pros/cli/misc_commands.py index 4d27591d..56c63217 100644 --- a/pros/cli/misc_commands.py +++ b/pros/cli/misc_commands.py @@ -2,7 +2,6 @@ from pros.cli.common import * from pros.ga.analytics import analytics - @pros_root def misc_commands_cli(): pass diff --git a/pros/cli/v5_utils.py b/pros/cli/v5_utils.py index 67c5c17c..6e9565ef 100644 --- a/pros/cli/v5_utils.py +++ b/pros/cli/v5_utils.py @@ -290,35 +290,3 @@ def capture(file_name: str, port: str, force: bool = False): w.write(file_, i_data) print(f'Saved screen capture to {file_name}') - -@v5.command(aliases=['sv', 'set'], short_help='Set a kernel variable on a connected V5 device') -@click.argument('variable', type=click.Choice(['teamnumber', 'robotname']), required=True) -@click.argument('value', required=True, type=click.STRING, nargs=1) -@default_options -def set_variable(variable, value): - import pros.serial.devices.vex as vex - from pros.serial.ports import DirectPort - - # Get the connected v5 device - port = resolve_v5_port(None, 'system')[0] - if port == None: - return - device = vex.V5Device(DirectPort(port)) - device.kv_write(variable, value) - print(f'{variable} set to {value[:253]}') - -@v5.command(aliases=['rv', 'get'], short_help='Read a kernel variable from a connected V5 device') -@click.argument('variable', type=click.Choice(['teamnumber', 'robotname']), required=True) -@default_options -def read_variable(variable): - import pros.serial.devices.vex as vex - from pros.serial.ports import DirectPort - - # Get the connected v5 device - port = resolve_v5_port(None, 'system')[0] - if port == None: - return - device = vex.V5Device(DirectPort(port)) - value = device.kv_read(variable).decode() - print(f'Value of \'{variable}\' : {value}') - diff --git a/pros/serial/devices/vex/v5_device.py b/pros/serial/devices/vex/v5_device.py index e3c474dd..c65861e5 100644 --- a/pros/serial/devices/vex/v5_device.py +++ b/pros/serial/devices/vex/v5_device.py @@ -892,36 +892,6 @@ def sc_init(self) -> None: self._txrx_ext_struct(0x28, [], '') logger(__name__).debug('Completed ext 0x28 command') - @retries - def kv_read(self, kv: str) -> bytearray: - logger(__name__).debug('Sending ext 0x2e command') - encoded_kv = f'{kv}\0'.encode(encoding='ascii') - tx_payload = struct.pack(f'<{len(encoded_kv)}s', encoded_kv) - # Because the length of the kernel variables is not known, use None to indicate we are recieving an unknown length. - ret = self._txrx_ext_struct(0x2e, tx_payload, None, check_length=False, check_ack=True)[0] - logger(__name__).debug('Completed ext 0x2e command') - return ret - - @retries - def kv_write(self, kv: str, payload: Union[Iterable, bytes, bytearray, str]): - logger(__name__).debug('Sending ext 0x2f command') - encoded_kv = f'{kv}\0'.encode(encoding='ascii') - kv_to_max_bytes = { - 'teamnumber': 7, - 'robotname': 16 - } - if len(kv) > kv_to_max_bytes.get(kv, 254): - logger(__name__).info(f'{kv} is longer than the maximum supported length {kv_to_max_bytes[kv]}, truncating.') - # Trim down size of payload to fit within the 255 byte limit and add null terminator. - payload = payload[:kv_to_max_bytes.get(kv, 254)] + "\0" - if isinstance(payload, str): - payload = payload.encode(encoding='ascii') - tx_fmt =f'<{len(encoded_kv)}s{len(payload)}s' - tx_payload = struct.pack(tx_fmt, encoded_kv, payload) - ret = self._txrx_ext_packet(0x2f, tx_payload, None, check_length=False, check_ack=True) - logger(__name__).debug('Completed ext 0x2f command') - return ret - def _txrx_ext_struct(self, command: int, tx_data: Union[Iterable, bytes, bytearray], unpack_fmt: str, check_length: bool = True, check_ack: bool = True, timeout: Optional[float] = None) -> Tuple: @@ -936,14 +906,10 @@ def _txrx_ext_struct(self, command: int, tx_data: Union[Iterable, bytes, bytearr :param check_ack: If true, then checks the first byte of the extended payload as an AK byte :return: A tuple unpacked according to the unpack_fmt """ - # Calculate the size of unpack_fmt if it is not None - calculated_size = struct.calcsize(unpack_fmt) if unpack_fmt else None - rx = self._txrx_ext_packet(command, tx_data, calculated_size, + rx = self._txrx_ext_packet(command, tx_data, struct.calcsize(unpack_fmt), check_length=check_length, check_ack=check_ack, timeout=timeout) logger(__name__).debug('Unpacking with format: {}'.format(unpack_fmt)) - # Use the provided unpack_fmt if it was not None, otherwise use the recieved size - recieve_format = unpack_fmt if unpack_fmt else '<{}s'.format(len(rx)) - return struct.unpack(recieve_format, rx) + return struct.unpack(unpack_fmt, rx) @classmethod def _rx_ext_packet(cls, msg: Message, command: int, rx_length: int, check_ack: bool = True, @@ -1006,8 +972,7 @@ def _txrx_ext_packet(self, command: int, tx_data: Union[Iterable, bytes, bytearr """ tx_payload = self._form_extended_payload(command, tx_data) rx = self._txrx_packet(0x56, tx_data=tx_payload, timeout=timeout) - # Use rx_length if it was provided, otherwise use the length of the received data - rx_length = rx_length if rx_length else len(rx) + return self._rx_ext_packet(rx, command, rx_length, check_ack=check_ack, check_length=check_length) @classmethod