diff --git a/pros/cli/upload.py b/pros/cli/upload.py index 50d1699b..7b1a2590 100644 --- a/pros/cli/upload.py +++ b/pros/cli/upload.py @@ -31,6 +31,8 @@ def upload_cli(): cls=PROSOption, group='V5 Options', help='Open "run program" screen after uploading, instead of executing' ' program. This option may help with controller connectivity ' 'reliability and prevent robots from running off tables.') +@click.option('--compress-bin/--no-compress-bin', 'compress_bin', cls=PROSOption, group='V5 Options', default=True, + help='Compress the program binary before uploading.') @default_options def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwargs): """ @@ -94,7 +96,8 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg pass # print what was decided - ui.echo('Uploading {} to {} device on {}'.format(path, kwargs['target'], port), nl=False) + compressed_label = ' (compressed) ' if kwargs['compress_bin'] else ' ' + ui.echo(f"Uploading {path}{compressed_label}to {kwargs['target']} device on {port}", nl=False) if kwargs['target'] == 'v5': ui.echo(f' as {args[0]} to slot {kwargs["slot"] + 1}', nl=False) ui.echo('') diff --git a/pros/common/ui/__init__.py b/pros/common/ui/__init__.py index a3a6c11e..3469c3a4 100644 --- a/pros/common/ui/__init__.py +++ b/pros/common/ui/__init__.py @@ -69,7 +69,7 @@ def progressbar(iterable: Iterable = None, length: int = None, label: str = None fill_char: str = '#', empty_char: str = '-', bar_template: str = '%(label)s [%(bar)s] %(info)s', info_sep: str = ' ', width: int = 36): if ismachineoutput(): - return _MachineOutputProgessBar(**locals()) + return _MachineOutputProgressBar(**locals()) else: return click.progressbar(**locals()) @@ -118,18 +118,18 @@ def finalize(method: str, data: Union[str, Dict, object, List[Union[str, Dict, o echo(human_readable) -class _MachineOutputProgessBar(_click_ProgressBar): +class _MachineOutputProgressBar(_click_ProgressBar): def __init__(self, *args, **kwargs): global _current_notify_value kwargs['file'] = open(os.devnull, 'w') self.notify_value = kwargs.pop('notify_value', _current_notify_value) - super(_MachineOutputProgessBar, self).__init__(*args, **kwargs) + super(_MachineOutputProgressBar, self).__init__(*args, **kwargs) def __del__(self): self.file.close() def render_progress(self): - super(_MachineOutputProgessBar, self).render_progress() + super(_MachineOutputProgressBar, self).render_progress() obj = {'text': self.label, 'pct': self.pct} if self.show_eta and self.eta_known and not self.finished: obj['eta'] = self.eta diff --git a/pros/conductor/project/__init__.py b/pros/conductor/project/__init__.py index 5f2ba996..edbdc00b 100644 --- a/pros/conductor/project/__init__.py +++ b/pros/conductor/project/__init__.py @@ -96,6 +96,7 @@ def apply_template(self, template: LocalTemplate, force_system: bool = False, fo remove_empty_directories: bool = False): """ Applies a template to a project + :param remove_empty_directories: :param template: :param force_system: :param force_user: @@ -234,12 +235,15 @@ def make_scan_build(self, build_args: Tuple[str], cdb_file: Optional[Union[str, build_args = [*build_args, f'BINDIR={td_path}'] def libscanbuild_capture(args: argparse.Namespace) -> Tuple[int, Iterable[Compilation]]: + """ + Implementation of compilation database generation. + + :param args: the parsed and validated command line arguments + :return: the exit status of build process. + """ from libscanbuild.intercept import setup_environment, run_build, exec_trace_files, parse_exec_trace, \ compilations from libear import temporary_directory - """ Implementation of compilation database generation. - :param args: the parsed and validated command line arguments - :return: the exit status of build process. """ with temporary_directory(prefix='intercept-') as tmp_dir: # run the build command diff --git a/pros/serial/devices/vex/v5_device.py b/pros/serial/devices/vex/v5_device.py index a6729517..6c0965a5 100644 --- a/pros/serial/devices/vex/v5_device.py +++ b/pros/serial/devices/vex/v5_device.py @@ -1,5 +1,5 @@ -import time - +import gzip +import io import re import struct import typing @@ -9,6 +9,8 @@ from io import BytesIO, StringIO from typing import * +from semantic_version import Spec + from pros.common import * from pros.serial import bytes_to_str, decode_bytes_to_str from pros.serial.ports import BasePort, list_all_comports @@ -116,7 +118,7 @@ def generate_ini_file(self, remote_name: str = None, slot: int = 0, ini: ConfigP def write_program(self, file: typing.BinaryIO, remote_name: str = None, ini: ConfigParser = None, slot: int = 0, file_len: int = -1, run_after: FTCompleteOptions = FTCompleteOptions.DONT_RUN, - target: str = 'flash', quirk: int = 0, **kwargs): + target: str = 'flash', quirk: int = 0, compress_bin: bool = True, **kwargs): remote_base = f'slot_{slot + 1}' if target == 'ddr': self.write_file(file, f'{remote_base}.bin', file_len=file_len, type='bin', @@ -135,7 +137,8 @@ def write_program(self, file: typing.BinaryIO, remote_name: str = None, ini: Con if (quirk & 0xff) == 1: # WRITE BIN FILE - self.write_file(file, f'{remote_base}.bin', file_len=file_len, type='bin', run_after=run_after, **kwargs) + self.write_file(file, f'{remote_base}.bin', file_len=file_len, type='bin', run_after=run_after, + compress=compress_bin, **kwargs) with BytesIO(ini_file.encode(encoding='ascii')) as ini_bin: # WRITE INI FILE self.write_file(ini_bin, f'{remote_base}.ini', type='ini', **kwargs) @@ -146,7 +149,8 @@ def write_program(self, file: typing.BinaryIO, remote_name: str = None, ini: Con # WRITE INI FILE self.write_file(ini_bin, f'{remote_base}.ini', type='ini', **kwargs) # WRITE BIN FILE - self.write_file(file, f'{remote_base}.bin', file_len=file_len, type='bin', run_after=run_after, **kwargs) + self.write_file(file, f'{remote_base}.bin', file_len=file_len, type='bin', run_after=run_after, + compress=compress_bin, **kwargs) else: raise ValueError(f'Unknown quirk option: {quirk}') @@ -171,10 +175,25 @@ def read_file(self, file: typing.IO[bytes], remote_file: str, vid: int_str = 'us self.ft_complete() def write_file(self, file: typing.BinaryIO, remote_file: str, file_len: int = -1, - run_after: FTCompleteOptions = FTCompleteOptions.DONT_RUN, **kwargs): + run_after: FTCompleteOptions = FTCompleteOptions.DONT_RUN, compress: bool = False, **kwargs): if file_len < 0: file_len = file.seek(0, 2) file.seek(0, 0) + if compress and self.status['system_version'] in Spec('>=1.0.5'): + buf = io.BytesIO() + with ui.progressbar(length=file_len, label='Compressing binary') as progress: + with gzip.GzipFile(fileobj=buf, mode='wb') as f: + while True: + data = file.read(16 * 1024) + if not data: + break + f.write(data) + progress.update(len(data)) + file = buf + # recompute file length + file_len = file.seek(0, 2) + file.seek(0, 0) + crc32 = self.VEX_CRC32.compute(file.read(file_len)) file.seek(-file_len, 2) addr = kwargs.get('addr', 0x03800000) @@ -198,9 +217,12 @@ def write_file(self, file: typing.BinaryIO, remote_file: str, file_len: int = -1 progress.update(packet_size) logger(__name__).debug('Completed {} of {} bytes'.format(i + packet_size, file_len)) logger(__name__).debug('Data transfer complete, sending ft complete') + if compress and self.status['system_version'] in Spec('>=1.0.5'): + logger(__name__).info('Closing gzip file') + file.close() self.ft_complete(options=run_after) - def capture_screen(self) -> Tuple[List[int], int, int]: + def capture_screen(self) -> Tuple[List[List[int]], int, int]: self.sc_init() width, height = 512, 272 file_size = width * height * 4 # ARGB diff --git a/pros/serial/interactive/UploadProjectModal.py b/pros/serial/interactive/UploadProjectModal.py index d346308a..75f12fde 100644 --- a/pros/serial/interactive/UploadProjectModal.py +++ b/pros/serial/interactive/UploadProjectModal.py @@ -74,6 +74,9 @@ def project_changed(self, new_project: ExistingProjectParameter): ), 'description': parameters.Parameter( self.project.upload_options.get('description', 'Created with PROS') + ), + 'compress_bin': parameters.BooleanParameter( + self.project.upload_options.get('compress_bin', True) ) } else: @@ -93,6 +96,7 @@ def confirm(self, *args, **kwargs): kwargs['name'] = self.advanced_options['name'].value kwargs['slot'] = self.advanced_options['slot'].value kwargs['description'] = self.advanced_options['description'].value + kwargs['compress_bin'] = self.advanced_options['compress_bin'].value self.exit() get_current_context().invoke(upload, **kwargs) @@ -118,5 +122,6 @@ def build(self) -> Generator[components.Component, None, None]: components.InputBox('Program Name', self.advanced_options['name']), components.InputBox('Slot', self.advanced_options['slot']), components.InputBox('Description', self.advanced_options['description']), + components.Checkbox('Compress Binary', self.advanced_options['compress_bin']), title='Advanced V5 Options', collapsed=self.advanced_options_collapsed)