Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Set team number and robot name #210

Merged
merged 8 commits into from
May 18, 2022
1 change: 1 addition & 0 deletions pros/cli/misc_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pros.cli.common import *
from pros.ga.analytics import analytics


@pros_root
def misc_commands_cli():
pass
Expand Down
32 changes: 32 additions & 0 deletions pros/cli/v5_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,35 @@ def capture(file_name: str, port: str, force: bool = False):
w.write(file_, i_data)

print(f'Saved screen capture to {file_name}')

@v5.command(aliases=['sv', 'set'], short_help='Set a kernel variable on a connected V5 device')
@click.argument('variable', type=click.Choice(['teamnumber', 'robotname']), required=True)
@click.argument('value', required=True, type=click.STRING, nargs=1)
@default_options
def set_variable(variable, value):
import pros.serial.devices.vex as vex
from pros.serial.ports import DirectPort

# Get the connected v5 device
port = resolve_v5_port(None, 'system')[0]
if port == None:
return
device = vex.V5Device(DirectPort(port))
device.kv_write(variable, value)
print(f'{variable} set to {value[:253]}')

@v5.command(aliases=['rv', 'get'], short_help='Read a kernel variable from a connected V5 device')
@click.argument('variable', type=click.Choice(['teamnumber', 'robotname']), required=True)
@default_options
def read_variable(variable):
import pros.serial.devices.vex as vex
from pros.serial.ports import DirectPort

# Get the connected v5 device
port = resolve_v5_port(None, 'system')[0]
if port == None:
return
device = vex.V5Device(DirectPort(port))
value = device.kv_read(variable).decode()
print(f'Value of \'{variable}\' : {value}')

41 changes: 38 additions & 3 deletions pros/serial/devices/vex/v5_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,36 @@ def sc_init(self) -> None:
self._txrx_ext_struct(0x28, [], '')
logger(__name__).debug('Completed ext 0x28 command')

@retries
def kv_read(self, kv: str) -> bytearray:
logger(__name__).debug('Sending ext 0x2e command')
encoded_kv = f'{kv}\0'.encode(encoding='ascii')
tx_payload = struct.pack(f'<{len(encoded_kv)}s', encoded_kv)
# Because the length of the kernel variables is not known, use None to indicate we are recieving an unknown length.
ret = self._txrx_ext_struct(0x2e, tx_payload, None, check_length=False, check_ack=True)[0]
logger(__name__).debug('Completed ext 0x2e command')
return ret

@retries
def kv_write(self, kv: str, payload: Union[Iterable, bytes, bytearray, str]):
logger(__name__).debug('Sending ext 0x2f command')
encoded_kv = f'{kv}\0'.encode(encoding='ascii')
kv_to_max_bytes = {
'teamnumber': 7,
'robotname': 16
}
if len(kv) > kv_to_max_bytes.get(kv, 254):
logger(__name__).info(f'{kv} is longer than the maximum supported length {kv_to_max_bytes[kv]}, truncating.')
# Trim down size of payload to fit within the 255 byte limit and add null terminator.
payload = payload[:kv_to_max_bytes.get(kv, 254)] + "\0"
if isinstance(payload, str):
payload = payload.encode(encoding='ascii')
tx_fmt =f'<{len(encoded_kv)}s{len(payload)}s'
tx_payload = struct.pack(tx_fmt, encoded_kv, payload)
ret = self._txrx_ext_packet(0x2f, tx_payload, None, check_length=False, check_ack=True)
logger(__name__).debug('Completed ext 0x2f command')
return ret

def _txrx_ext_struct(self, command: int, tx_data: Union[Iterable, bytes, bytearray],
unpack_fmt: str, check_length: bool = True, check_ack: bool = True,
timeout: Optional[float] = None) -> Tuple:
Expand All @@ -906,10 +936,14 @@ def _txrx_ext_struct(self, command: int, tx_data: Union[Iterable, bytes, bytearr
:param check_ack: If true, then checks the first byte of the extended payload as an AK byte
:return: A tuple unpacked according to the unpack_fmt
"""
rx = self._txrx_ext_packet(command, tx_data, struct.calcsize(unpack_fmt),
# Calculate the size of unpack_fmt if it is not None
calculated_size = struct.calcsize(unpack_fmt) if unpack_fmt else None
rx = self._txrx_ext_packet(command, tx_data, calculated_size,
check_length=check_length, check_ack=check_ack, timeout=timeout)
logger(__name__).debug('Unpacking with format: {}'.format(unpack_fmt))
return struct.unpack(unpack_fmt, rx)
# Use the provided unpack_fmt if it was not None, otherwise use the recieved size
recieve_format = unpack_fmt if unpack_fmt else '<{}s'.format(len(rx))
return struct.unpack(recieve_format, rx)

@classmethod
def _rx_ext_packet(cls, msg: Message, command: int, rx_length: int, check_ack: bool = True,
Expand Down Expand Up @@ -972,7 +1006,8 @@ def _txrx_ext_packet(self, command: int, tx_data: Union[Iterable, bytes, bytearr
"""
tx_payload = self._form_extended_payload(command, tx_data)
rx = self._txrx_packet(0x56, tx_data=tx_payload, timeout=timeout)

# Use rx_length if it was provided, otherwise use the length of the received data
rx_length = rx_length if rx_length else len(rx)
return self._rx_ext_packet(rx, command, rx_length, check_ack=check_ack, check_length=check_length)

@classmethod
Expand Down