Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ 📶 Add wireless V5 uploading #47

Merged
merged 7 commits into from
Feb 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 123 additions & 23 deletions pros/serial/devices/vex/v5_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import io
import re
import struct
import time
import typing
from collections import defaultdict
from configparser import ConfigParser
from datetime import datetime, timedelta
from enum import IntEnum
from enum import IntEnum, IntFlag
from io import BytesIO, StringIO
from pathlib import Path
from typing import *
Expand Down Expand Up @@ -43,10 +44,9 @@ def filter_v5_ports(p, locations, names):
# Doesn't work on macOS or Jonathan's Dell, so we have a fallback (below)
user_ports = [p for p in ports if filter_v5_ports(p, ['2'], ['User'])]
system_ports = [p for p in ports if filter_v5_ports(p, ['0'], ['System', 'Communications'])]
joystick_ports = [] # joystick comms are very slow/unusable
# joystick_ports = [p for p in ports if filter_v5_ports(p, ['1'], ['Controller'])]
joystick_ports = [p for p in ports if filter_v5_ports(p, ['1'], ['Controller'])]

# TODO: test this code path (hard)
# Testing this code path is hard!
if len(user_ports) != len(system_ports):
if len(user_ports) > len(system_ports):
user_ports = [p for p in user_ports if p not in system_ports]
Expand All @@ -57,7 +57,7 @@ def filter_v5_ports(p, locations, names):
if p_type.lower() == 'user':
return user_ports
elif p_type.lower() == 'system':
return system_ports
return system_ports + joystick_ports
else:
raise ValueError(f'Invalid port type specified: {p_type}')

Expand All @@ -72,14 +72,27 @@ def natural_key(chunk: str):
if p_type.lower() == 'user':
return [ports[1]]
elif p_type.lower() == 'system':
return [ports[0]]
return [ports[0], *joystick_ports]
else:
raise ValueError(f'Invalid port type specified: {p_type}')
if len(joystick_ports) > 0:
if len(joystick_ports) > 0 and p_type.lower() == 'system':
return joystick_ports
return []


def with_download_channel(f):
"""
Function decorator for use inside V5Device class. Needs to be outside the class because @staticmethod prevents
us from making a function decorator
"""

def wrapped(device, *args, **kwargs):
with V5Device.DownloadChannel(device):
f(device, *args, **kwargs)

return wrapped


def compress_file(file: BinaryIO, file_len: int, label='Compressing binary') -> Tuple[BinaryIO, int]:
buf = io.BytesIO()
with ui.progressbar(length=file_len, label=label) as progress:
Expand All @@ -98,6 +111,7 @@ def compress_file(file: BinaryIO, file_len: int, label='Compressing binary') ->

class V5Device(VEXDevice, SystemDevice):
vid_map = {'user': 1, 'system': 15, 'rms': 16, 'pros': 24, 'mw': 32} # type: Dict[str, int]
channel_map = {'pit': 0, 'download': 1} # type: Dict[str, int]

class FTCompleteOptions(IntEnum):
DONT_RUN = 0
Expand All @@ -107,10 +121,82 @@ class FTCompleteOptions(IntEnum):
VEX_CRC16 = CRC(16, 0x1021) # CRC-16-CCIT
VEX_CRC32 = CRC(32, 0x04C11DB7) # CRC-32 (the one used everywhere but has no name)

class SystemVersion(object):
class Product(IntEnum):
CONTROLLER = 0x11
BRAIN = 0x10

class BrainFlags(IntFlag):
pass

class ControllerFlags(IntFlag):
CONNECTED = 0x02

flag_map = {Product.BRAIN: BrainFlags, Product.CONTROLLER: ControllerFlags}

def __init__(self, data: tuple):
from semantic_version import Version
self.system_version = Version('{}.{}.{}-{}.{}'.format(*data[0:5]))
self.product = V5Device.SystemVersion.Product(data[5])
self.product_flags = self.flag_map[self.product](data[6])

def __str__(self):
return f'System Version: {self.system_version}\n' \
f' Product: {self.product.name}\n' \
f' Product Flags: {self.product_flags.value:x}'

class SystemStatus(object):
def __init__(self, data: tuple):
from semantic_version import Version
self.system_version = Version('{}.{}.{}-{}'.format(*data[0:4]))
self.cpu0_version = Version('{}.{}.{}-{}'.format(*data[4:8]))
self.cpu1_version = Version('{}.{}.{}-{}'.format(*data[8:12]))
self.touch_version = data[12]
self.system_id = data[13]

def __getitem__(self, item):
return self.__dict__[item]

def __init__(self, port: BasePort):
self._status = None
super().__init__(port)

class DownloadChannel(object):
edjubuh marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, device: 'V5Device', timeout: float = 5.):
self.device = device
self.timeout = timeout
self.did_switch = False

def __enter__(self):
version = self.device.query_system_version()
if version.product == V5Device.SystemVersion.Product.CONTROLLER:
self.device.default_timeout = 2.
if V5Device.SystemVersion.ControllerFlags.CONNECTED not in version.product_flags:
raise VEXCommError('V5 Controller doesn\'t appear to be connected to a V5 Brain', version)
ui.echo('Transferring V5 to download channel')
self.device.ft_transfer_channel('download')
logger(__name__).debug('Sleeping for a while to let V5 start channel transfer')
time.sleep(.25) # wait at least 250ms before starting to poll controller if it's connected yet
version = self.device.query_system_version()
start_time = time.time()
# ask controller every 25 ms if it's connected until it is
while V5Device.SystemVersion.ControllerFlags.CONNECTED not in version.product_flags and \
time.time() - start_time < self.timeout:
version = self.device.query_system_version()
time.sleep(0.25)
if V5Device.SystemVersion.ControllerFlags.CONNECTED not in version.product_flags:
raise VEXCommError('Could not transfer V5 Controller to download channel', version)
logger(__name__).info('V5 should been transferred to higher bandwidth download channel')
return self
else:
return self

def __exit__(self, *exc):
version = self.device.query_system_version()
if version.product == V5Device.SystemVersion.Product.CONTROLLER:
self.device.ft_transfer_channel('pit')
ui.echo('V5 has been transferred back to pit channel')

@property
def status(self):
if not self._status:
Expand All @@ -121,6 +207,12 @@ def status(self):
def can_compress(self):
return self.status['system_version'] in Spec('>=1.0.5')

@property
def is_wireless(self):
version = self.query_system_version()
return version.product == V5Device.SystemVersion.Product.CONTROLLER and \
V5Device.SystemVersion.ControllerFlags.CONNECTED in version.product_flags

def generate_cold_hash(self, project: Project, extra: dict):
keys = {k: t.version for k, t in project.templates.items()}
keys.update(extra)
Expand Down Expand Up @@ -188,6 +280,7 @@ def generate_ini_file(self, remote_name: str = None, slot: int = 0, ini: ConfigP
logger(__name__).info(f'Created ini: {ini_str.getvalue()}')
return ini_str.getvalue()

@with_download_channel
def write_program(self, file: typing.BinaryIO, remote_name: str = None, ini: ConfigParser = None, slot: int = 0,
file_len: int = -1, run_after: FTCompleteOptions = FTCompleteOptions.DONT_RUN,
target: str = 'flash', quirk: int = 0, linked_file: Optional[typing.BinaryIO] = None,
Expand Down Expand Up @@ -435,6 +528,9 @@ def write_file(self, file: typing.BinaryIO, remote_file: str, file_len: int = -1
if compress and self.can_compress:
file, file_len = compress_file(file, file_len)

if self.is_wireless and file_len > 0x25000:
confirm(f'You\'re about to upload {file_len} bytes wirelessly. This could take some time, and you should '
f'consider uploading directly with a wire.', abort=True, default=False)
crc32 = self.VEX_CRC32.compute(file.read(file_len))
file.seek(0, 0)
addr = kwargs.get('addr', 0x03800000)
Expand Down Expand Up @@ -463,6 +559,7 @@ def write_file(self, file: typing.BinaryIO, remote_file: str, file_len: int = -1
file.close()
self.ft_complete(options=run_after)

@with_download_channel
def capture_screen(self) -> Tuple[List[List[int]], int, int]:
self.sc_init()
width, height = 512, 272
Expand Down Expand Up @@ -502,11 +599,21 @@ def read_ini(self, remote_name: str) -> Optional[ConfigParser]:
return None

@retries
def query_system_version(self) -> bytearray:
logger(__name__).debug('Sending simple 0xA4 command')
ret = self._txrx_simple_packet(0xA4, 0x08)
logger(__name__).debug('Completed simple 0xA4 command')
return ret
def query_system_version(self) -> SystemVersion:
logger(__name__).debug('Sending simple 0xA408 command')
ret = self._txrx_simple_struct(0xA4, '>8B')
logger(__name__).debug('Completed simple 0xA408 command')
return V5Device.SystemVersion(ret)

@retries
def ft_transfer_channel(self, channel: int_str):
logger(__name__).debug(f'Transferring to {channel} channel')
logger(__name__).debug('Sending ext 0x10 command')
if isinstance(channel, str):
channel = self.channel_map[channel]
assert isinstance(channel, int) and 0 <= channel <= 1
self._txrx_ext_packet(0x10, struct.pack('<2B', 1, channel), rx_length=0)
logger(__name__).debug('Completed ext 0x10 command')

@retries
def ft_initialize(self, file_name: str, **kwargs) -> Dict[str, Any]:
Expand Down Expand Up @@ -703,18 +810,11 @@ def get_device_status(self):
raise NotImplementedError()

@retries
def get_system_status(self) -> Dict[str, Any]:
def get_system_status(self) -> SystemStatus:
logger(__name__).debug('Sending ext 0x22 command')
rx = self._txrx_ext_struct(0x22, [], "<x12B3xBI12x")
logger(__name__).debug('Completed ext 0x22 command')
from semantic_version import Version
return {
'system_version': Version('{}.{}.{}-{}'.format(*rx[0:4])),
'cpu0_version': Version('{}.{}.{}-{}'.format(*rx[4:8])),
'cpu1_version': Version('{}.{}.{}-{}'.format(*rx[8:12])),
'touch_version': rx[12],
'system_id': rx[13]
}
return V5Device.SystemStatus(rx)

@retries
def sc_init(self) -> None:
Expand All @@ -728,7 +828,7 @@ def sc_init(self) -> None:

def _txrx_ext_struct(self, command: int, tx_data: Union[Iterable, bytes, bytearray],
unpack_fmt: str, check_length: bool = True, check_ack: bool = True,
timeout: float = 0.1) -> Tuple:
timeout: Optional[float] = None) -> Tuple:
"""
Transmits and receives an extended command to the V5, automatically unpacking the values according to unpack_fmt
which gets passed into struct.unpack. The size of the payload is determined from the fmt string
Expand Down Expand Up @@ -792,7 +892,7 @@ def _rx_ext_packet(cls, msg: Message, command: int, rx_length: int, check_ack: b

def _txrx_ext_packet(self, command: int, tx_data: Union[Iterable, bytes, bytearray],
rx_length: int, check_length: bool = True,
check_ack: bool = True, timeout: float = 0.1) -> Message:
check_ack: bool = True, timeout: Optional[float] = None) -> Message:
"""
Transmits and receives an extended command to the V5.
:param command: Extended command code
Expand Down
18 changes: 12 additions & 6 deletions pros/serial/devices/vex/vex_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pros.common import *
from pros.serial import bytes_to_str

from pros.serial.ports import BasePort
from . import comm_error
from .message import Message
from ..generic_device import GenericDevice
Expand All @@ -18,6 +18,10 @@ class VEXDevice(GenericDevice):
ACK_BYTE = 0x76
NACK_BYTE = 0xFF

def __init__(self, port: BasePort, timeout=0.1):
super().__init__(port)
self.default_timeout = timeout

@retries
def query_system(self) -> bytearray:
"""
Expand All @@ -27,11 +31,11 @@ def query_system(self) -> bytearray:
logger(__name__).debug('Sending simple 0x21 command')
return self._txrx_simple_packet(0x21, 0x0A)

def _txrx_simple_struct(self, command: int, unpack_fmt: str, timeout: float = 0.1) -> Tuple:
def _txrx_simple_struct(self, command: int, unpack_fmt: str, timeout: Optional[float] = None) -> Tuple:
rx = self._txrx_simple_packet(command, struct.calcsize(unpack_fmt), timeout=timeout)
return struct.unpack(unpack_fmt, rx)

def _txrx_simple_packet(self, command: int, rx_len: int, timeout: float = 0.1) -> bytearray:
def _txrx_simple_packet(self, command: int, rx_len: int, timeout: Optional[float] = None) -> bytearray:
"""
Transmits a simple command to the VEX device, performs the standard quality of message checks, then
returns the payload.
Expand All @@ -47,12 +51,14 @@ def _txrx_simple_packet(self, command: int, rx_len: int, timeout: float = 0.1) -
raise comm_error.VEXCommError("Received data doesn't match expected length", msg)
return msg['payload']

def _rx_packet(self, timeout: float = 0.01) -> Dict[str, Union[Union[int, bytes, bytearray], Any]]:
def _rx_packet(self, timeout: Optional[float] = None) -> Dict[str, Union[Union[int, bytes, bytearray], Any]]:
# Optimized to read as quickly as possible w/o delay
start_time = time.time()
response_header = bytes([0xAA, 0x55])
response_header_stack = list(response_header)
rx = bytearray()
if timeout is None:
timeout = self.default_timeout
while (len(rx) > 0 or time.time() - start_time < timeout) and len(response_header_stack) > 0:
b = self.port.read(1)
if len(b) == 0:
Expand All @@ -66,7 +72,7 @@ def _rx_packet(self, timeout: float = 0.01) -> Dict[str, Union[Union[int, bytes,
response_header_stack = bytearray(response_header)
rx = bytearray()
if not rx == bytearray(response_header):
edjubuh marked this conversation as resolved.
Show resolved Hide resolved
raise IOError(f"Couldn't find the response header in the device response. "
raise IOError(f"Couldn't find the response header in the device response after {timeout} s. "
f"Got {rx.hex()} but was expecting {response_header.hex()}")
rx.extend(self.port.read(1))
command = rx[-1]
Expand Down Expand Up @@ -95,7 +101,7 @@ def _tx_packet(self, command: int, tx_data: Union[Iterable, bytes, bytearray, No
return tx

def _txrx_packet(self, command: int, tx_data: Union[Iterable, bytes, bytearray, None] = None,
timeout: float = 0.1) -> Message:
timeout: Optional[float] = None) -> Message:
"""
Goes through a send/receive cycle with a VEX device.
Transmits the command with the optional additional payload, then reads and parses the outer layer
Expand Down
1 change: 1 addition & 0 deletions pros/serial/ports/direct_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

def create_serial_port(port_name: str, timeout: Optional[float] = 1.0) -> serial.Serial:
try:
logger(__name__).debug(f'Opening serial port {port_name}')
port = serial.Serial(port_name, baudrate=115200, bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE)
port.timeout = timeout
Expand Down