diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index f8b502f..e280c88 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -35,15 +35,17 @@ import tempfile from typing import List, Optional, Dict, Tuple, Any +from qubes.utils import sanitize_stderr_for_log + try: from qubes.device_protocol import DeviceInfo from qubes.device_protocol import DeviceInterface from qubes.device_protocol import Port from qubes.ext import utils - from qubes.devices import UnrecognizedDevice def get_assigned_devices(devices): yield from devices.get_assigned_devices() + except ImportError: # This extension supports both the legacy and new device API. # In the case of the legacy backend, functionality is limited. @@ -51,6 +53,7 @@ def get_assigned_devices(devices): from qubesusbproxy import utils class DescriptionOverrider: + # pylint: disable=too-few-public-methods @property def description(self): return self.name @@ -58,7 +61,7 @@ def description(self): class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo): def __init__(self, port): # not supported options in legacy code - self.safe_chars = self.safe_chars.replace(' ', '') + self.safe_chars = self.safe_chars.replace(" ", "") super().__init__(port.backend_domain, port.port_id) # needed but not in legacy DeviceInfo @@ -81,9 +84,7 @@ class Port: devclass: Any class DeviceInterface: - pass - - class UnrecognizedDevice(ValueError): + # pylint: disable=too-few-public-methods pass def get_assigned_devices(devices): @@ -96,24 +97,26 @@ def get_assigned_devices(devices): usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)*$") # should match valid VM name -usb_connected_to_re = re.compile(br"^[a-zA-Z][a-zA-Z0-9_.-]*$") -usb_device_hw_ident_re = re.compile(r'^[0-9a-f]{4}:[0-9a-f]{4} ') +usb_connected_to_re = re.compile(rb"^[a-zA-Z][a-zA-Z0-9_.-]*$") +usb_device_hw_ident_re = re.compile(r"^[0-9a-f]{4}:[0-9a-f]{4} ") -HWDATA_PATH = '/usr/share/hwdata' +HWDATA_PATH = "/usr/share/hwdata" class USBDevice(DeviceInfo): # pylint: disable=too-few-public-methods def __init__(self, backend_domain, port_id): # the superclass can restrict the allowed characters - self.safe_chars = (string.ascii_letters + string.digits - + string.punctuation + ' ') + self.safe_chars = ( + string.ascii_letters + string.digits + string.punctuation + " " + ) port = Port( - backend_domain=backend_domain, port_id=port_id, devclass="usb") - super(USBDevice, self).__init__(port) + backend_domain=backend_domain, port_id=port_id, devclass="usb" + ) + super().__init__(port) - self._qdb_ident = port_id.replace('.', '_') - self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident + self._qdb_ident = port_id.replace(".", "_") + self._qdb_path = "/qubes-usb-devices/" + self._qdb_ident self._vendor_id = None self._product_id = None @@ -219,9 +222,8 @@ def _load_interfaces_from_qubesdb(self) -> List[DeviceInterface]: if not self.backend_domain.is_running(): # don't cache this value return result - untrusted_interfaces: bytes = ( - self.backend_domain.untrusted_qdb.read( - self._qdb_path + '/interfaces') + untrusted_interfaces: bytes = self.backend_domain.untrusted_qdb.read( + self._qdb_path + "/interfaces" ) if not untrusted_interfaces: return result @@ -229,35 +231,40 @@ def _load_interfaces_from_qubesdb(self) -> List[DeviceInterface]: DeviceInterface( self._sanitize(ifc, safe_chars=string.hexdigits), devclass="usb" ) - for ifc in untrusted_interfaces.split(b':') + for ifc in untrusted_interfaces.split(b":") if ifc ] return result def _load_desc_from_qubesdb(self) -> Dict[str, str]: unknown = "unknown" - result = {"vendor": unknown, - "vendor ID": "0000", - "product": unknown, - "product ID": "0000", - "manufacturer": unknown, - "name": unknown, - "serial": unknown} + result = { + "vendor": unknown, + "vendor ID": "0000", + "product": unknown, + "product ID": "0000", + "manufacturer": unknown, + "name": unknown, + "serial": unknown, + } if not self.backend_domain.is_running(): # don't cache this value return result - untrusted_device_desc: bytes = ( - self.backend_domain.untrusted_qdb.read( - self._qdb_path + '/desc') + untrusted_device_desc: bytes = self.backend_domain.untrusted_qdb.read( + self._qdb_path + "/desc" ) if not untrusted_device_desc: return result try: - (untrusted_vend_prod_id, untrusted_manufacturer, - untrusted_name, untrusted_serial - ) = untrusted_device_desc.split(b' ') + ( + untrusted_vend_prod_id, + untrusted_manufacturer, + untrusted_name, + untrusted_serial, + ) = untrusted_device_desc.split(b" ") untrusted_vendor_id, untrusted_product_id = ( - untrusted_vend_prod_id.split(b':')) + untrusted_vend_prod_id.split(b":") + ) except ValueError: # desc doesn't contain correctly formatted data, # but it is not empty. We cannot parse it, @@ -265,27 +272,31 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]: # some information to the user. untrusted_vendor_id, untrusted_product_id = (b"0000", b"0000") (untrusted_manufacturer, untrusted_serial) = ( - unknown.encode() for _ in range(2)) - untrusted_name = untrusted_device_desc.replace(b' ', b'_') + unknown.encode() for _ in range(2) + ) + untrusted_name = untrusted_device_desc.replace(b" ", b"_") # Data successfully loaded, cache these values self._vendor_id = result["vendor ID"] = self._sanitize( - untrusted_vendor_id) + untrusted_vendor_id + ) self._product_id = result["product ID"] = self._sanitize( - untrusted_product_id) + untrusted_product_id + ) vendor, product = self._get_vendor_and_product_names( - self._vendor_id, self._product_id) + self._vendor_id, self._product_id + ) self._vendor = result["vendor"] = self._sanitize(vendor.encode()) self._product = result["product"] = self._sanitize(product.encode()) - self._manufacturer = result["manufacturer"] = ( - self._sanitize(untrusted_manufacturer)) + self._manufacturer = result["manufacturer"] = self._sanitize( + untrusted_manufacturer + ) self._name = result["name"] = self._sanitize(untrusted_name) self._name = result["serial"] = self._sanitize(untrusted_serial) return result def _sanitize( - self, untrusted_device_desc: bytes, - safe_chars: Optional[str] = None + self, untrusted_device_desc: bytes, safe_chars: Optional[str] = None ) -> str: # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera' if safe_chars is None: @@ -296,29 +307,29 @@ def _sanitize( i = 0 while i < len(untrusted_device_desc): c = chr(untrusted_device_desc[i]) - if c == '\\': + if c == "\\": i += 1 if i >= len(untrusted_device_desc): break c = chr(untrusted_device_desc[i]) - if c == 'x': + if c == "x": i += 2 if i >= len(untrusted_device_desc): break - hex_code = untrusted_device_desc[i - 1: i + 1] + hex_code = untrusted_device_desc[i - 1 : i + 1] try: for j in range(2): - if hex_code[j] not in b'0123456789abcdefABCDEF': + if hex_code[j] not in b"0123456789abcdefABCDEF": raise ValueError() hex_value = int(hex_code, 16) c = chr(hex_value) except ValueError: - c = '_' + c = "_" if c in safe_chars_set: result += c else: - result += '_' + result += "_" i += 1 return result @@ -327,24 +338,28 @@ def attachment(self): if not self.backend_domain.is_running(): return None untrusted_connected_to = self.backend_domain.untrusted_qdb.read( - self._qdb_path + '/connected-to' + self._qdb_path + "/connected-to" ) if not untrusted_connected_to: return None if not usb_connected_to_re.match(untrusted_connected_to): self.backend_domain.log.warning( - 'Device {} has invalid chars in connected-to ' - 'property'.format(self.port_id)) + f"Device {self.port_id} has invalid chars in connected-to " + "property" + ) return None untrusted_connected_to = untrusted_connected_to.decode( - 'ascii', errors='strict') + "ascii", errors="strict" + ) try: connected_to = self.backend_domain.app.domains[ - untrusted_connected_to] + untrusted_connected_to + ] except KeyError: self.backend_domain.log.warning( - f'Device {self.port_id} has invalid VM name in connected-to ' - f'property: {untrusted_connected_to}') + f"Device {self.port_id} has invalid VM name in connected-to " + f"property: {untrusted_connected_to}" + ) return None return connected_to @@ -361,24 +376,24 @@ def device_id(self) -> str: product_id = self._load_desc_from_qubesdb()["product ID"] else: product_id = self._product_id - interfaces = ''.join(repr(ifc) for ifc in self.interfaces) + interfaces = "".join(repr(ifc) for ifc in self.interfaces) serial = self.serial if self.serial != "unknown" else "" - return \ - f'{vendor_id}:{product_id}:{serial}:{interfaces}' + return f"{vendor_id}:{product_id}:{serial}:{interfaces}" @staticmethod def _get_vendor_and_product_names( - vendor_id: str, product_id: str + vendor_id: str, product_id: str ) -> Tuple[str, str]: """ Return tuple of vendor's and product's names for the ids. If the id is not known, return ("unknown", "unknown"). """ - return (USBDevice._load_usb_known_devices() - .get(vendor_id, dict()) - .get(product_id, ("unknown", "unknown")) - ) + return ( + USBDevice._load_usb_known_devices() + .get(vendor_id, {}) + .get(product_id, ("unknown", "unknown")) + ) @staticmethod def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: @@ -396,29 +411,30 @@ def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: # subclass subclass_name <-- single tab # prog-if prog-if_name <-- two tabs result = {} - with open(HWDATA_PATH + '/usb.ids', - encoding='utf-8', errors='ignore') as usb_ids: + with open( + HWDATA_PATH + "/usb.ids", encoding="utf-8", errors="ignore" + ) as usb_ids: for line in usb_ids.readlines(): line = line.rstrip() - if line.startswith('#'): + if line.startswith("#"): # skip comments continue - elif not line: + if not line: # skip empty lines continue - elif line.startswith('\t\t'): + if line.startswith("\t\t"): # skip interfaces continue - elif line.startswith('C '): + if line.startswith("C "): # description of classes starts here, we can finish break - elif line.startswith('\t'): + if line.startswith("\t"): # save vendor, device pair - device_id, _, device_name = line[1:].split(' ', 2) + device_id, _, device_name = line[1:].split(" ", 2) result[vendor_id][device_id] = vendor_name, device_name else: # new vendor - vendor_id, _, vendor_name = line[:].split(' ', 2) + vendor_id, _, vendor_name = line[:].split(" ", 2) result[vendor_id] = {} return result @@ -443,9 +459,9 @@ def modify_qrexec_policy(service, line, add): :param add: True if line should be added, otherwise False :return: None """ - path = '/etc/qubes-rpc/policy/{}'.format(service) + path = f"/etc/qubes-rpc/policy/{service}" while True: - with open(path, 'a+') as policy: + with open(path, "a+") as policy: # take the lock here, it's released by closing the file fcntl.lockf(policy.fileno(), fcntl.LOCK_EX) # While we were waiting for lock, someone could have unlink()ed @@ -468,12 +484,14 @@ def modify_qrexec_policy(service, line, add): if policy_rules: with tempfile.NamedTemporaryFile( - prefix=path, delete=False) as policy_new: - policy_new.write(''.join(policy_rules).encode()) + prefix=path, delete=False + ) as policy_new: + policy_new.write("".join(policy_rules).encode()) policy_new.flush() try: - os.chown(policy_new.name, -1, - grp.getgrnam('qubes').gr_gid) + os.chown( + policy_new.name, -1, grp.getgrnam("qubes").gr_gid + ) os.chmod(policy_new.name, 0o660) except KeyError: # group 'qubes' not found # don't change mode if no 'qubes' group in the system @@ -487,19 +505,20 @@ def modify_qrexec_policy(service, line, add): class USBDeviceExtension(qubes.ext.Extension): def __init__(self): - super(USBDeviceExtension, self).__init__() + super().__init__() # include dom0 devices in listing only when usb-proxy is really # installed there self.usb_proxy_installed_in_dom0 = os.path.exists( - '/etc/qubes-rpc/qubes.USB') + "/etc/qubes-rpc/qubes.USB" + ) self.devices_cache = collections.defaultdict(dict) - @qubes.ext.handler('domain-init', 'domain-load') + @qubes.ext.handler("domain-init", "domain-load") def on_domain_init_load(self, vm, event): """Initialize watching for changes""" - # pylint: disable=unused-argument,no-self-use - vm.watch_qdb_path('/qubes-usb-devices') - if event == 'domain-load': + # pylint: disable=unused-argument + vm.watch_qdb_path("/qubes-usb-devices") + if event == "domain-load": # avoid building a cache on domain-init, as it isn't fully set yet, # and definitely isn't running yet current_devices = { @@ -515,55 +534,64 @@ async def attach_and_notify(self, vm, assignment): device = assignment.device if assignment.mode.value == "ask-to-attach": allowed = await utils.confirm_device_attachment( - device, {vm: assignment}) + device, {vm: assignment} + ) allowed = allowed.strip() if vm.name != allowed: return await self.on_device_attach_usb( - vm, 'device-pre-attach:usb', device, assignment.options) + vm, "device-pre-attach:usb", device, assignment.options + ) await vm.fire_event_async( - 'device-attach:usb', device=device, options=assignment.options) + "device-attach:usb", device=device, options=assignment.options + ) - @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') + @qubes.ext.handler("domain-qdb-change:/qubes-usb-devices") def on_qdb_change(self, vm, event, path): """A change in QubesDB means a change in a device list.""" - # pylint: disable=unused-argument,no-self-use - current_devices = dict((dev.port_id, dev.attachment) - for dev in self.on_device_list_usb(vm, None)) + # pylint: disable=unused-argument + current_devices = dict( + (dev.port_id, dev.attachment) + for dev in self.on_device_list_usb(vm, None) + ) utils.device_list_change(self, current_devices, vm, path, USBDevice) - @qubes.ext.handler('device-list:usb') + @qubes.ext.handler("device-list:usb") def on_device_list_usb(self, vm, event): - # pylint: disable=unused-argument,no-self-use + # pylint: disable=unused-argument - if not vm.is_running() or not hasattr(vm, 'untrusted_qdb'): + if not vm.is_running() or not hasattr(vm, "untrusted_qdb"): return - if isinstance(vm, qubes.vm.adminvm.AdminVM) and not \ - self.usb_proxy_installed_in_dom0: + if ( + isinstance(vm, qubes.vm.adminvm.AdminVM) + and not self.usb_proxy_installed_in_dom0 + ): return - untrusted_dev_list = vm.untrusted_qdb.list('/qubes-usb-devices/') + untrusted_dev_list = vm.untrusted_qdb.list("/qubes-usb-devices/") if not untrusted_dev_list: return # just get a list of devices, not its every property - untrusted_dev_list = \ - set(path.split('/')[2] for path in untrusted_dev_list) + untrusted_dev_list = set( + path.split("/")[2] for path in untrusted_dev_list + ) for untrusted_qdb_ident in untrusted_dev_list: if not usb_device_re.match(untrusted_qdb_ident): - vm.log.warning('Invalid USB device name detected') + vm.log.warning("Invalid USB device name detected") continue - port_id = untrusted_qdb_ident.replace('_', '.') + port_id = untrusted_qdb_ident.replace("_", ".") yield USBDevice(vm, port_id) - @qubes.ext.handler('device-get:usb') + @qubes.ext.handler("device-get:usb") def on_device_get_usb(self, vm, event, port_id): - # pylint: disable=unused-argument,no-self-use + # pylint: disable=unused-argument if not vm.is_running(): return if vm.untrusted_qdb.list( - '/qubes-usb-devices/' + port_id.replace('.', '_')): + "/qubes-usb-devices/" + port_id.replace(".", "_") + ): yield USBDevice(vm, port_id) @staticmethod @@ -572,14 +600,14 @@ def get_all_devices(app): if not vm.is_running(): continue - for dev in vm.devices['usb']: + for dev in vm.devices["usb"]: # there may be more than one USB-passthrough implementation if isinstance(dev, USBDevice): yield dev - @qubes.ext.handler('device-list-attached:usb') + @qubes.ext.handler("device-list-attached:usb") def on_device_list_attached(self, vm, event, **kwargs): - # pylint: disable=unused-argument,no-self-use + # pylint: disable=unused-argument if not vm.is_running(): return @@ -587,13 +615,14 @@ def on_device_list_attached(self, vm, event, **kwargs): if dev.attachment == vm: yield (dev, {}) - @qubes.ext.handler('device-pre-attach:usb') + @qubes.ext.handler("device-pre-attach:usb") async def on_device_attach_usb(self, vm, event, device, options): # pylint: disable=unused-argument if options: raise qubes.exc.QubesException( - 'USB device attach does not support user options') + "USB device attach does not support user options" + ) if not vm.is_running() or vm.qid == 0: # print(f"Qube is not running, skipping attachment of {device}", @@ -608,65 +637,65 @@ async def on_device_attach_usb(self, vm, event, device, options): if device.attachment: raise qubes.devices.DeviceAlreadyAttached( - 'Device {!s} already attached to {!s}'.format( - device, device.attachment) + f"Device {device} already attached to {device.attachment}" ) stubdom_qrexec = ( - vm.virt_mode == 'hvm' - and vm.features.check_with_template('stubdom-qrexec', False)) + vm.virt_mode == "hvm" + and vm.features.check_with_template("stubdom-qrexec", False) + ) - name = vm.name + '-dm' if stubdom_qrexec else vm.name + name = vm.name + "-dm" if stubdom_qrexec else vm.name extra_kwargs = {} if stubdom_qrexec: - extra_kwargs['stubdom'] = True + extra_kwargs["stubdom"] = True # update the cache before the call, to avoid sending duplicated events # (one on qubesdb watch and the other by the caller of this method) self.devices_cache[device.backend_domain.name][device.port_id] = vm # set qrexec policy to allow this device - policy_line = '{} {} allow,user=root\n'.format(name, - device.backend_domain.name) - modify_qrexec_policy('qubes.USB+{}'.format(device.port_id), - policy_line, True) + policy_line = f"{name} {device.backend_domain.name} allow,user=root\n" + modify_qrexec_policy(f"qubes.USB+{device.port_id}", policy_line, True) try: # and actual attach try: - await vm.run_service_for_stdio('qubes.USBAttach', - user='root', - input='{} {}\n'.format(device.backend_domain.name, - device.port_id).encode(), **extra_kwargs) + await vm.run_service_for_stdio( + "qubes.USBAttach", + user="root", + input=f"{device.backend_domain.name} " + f"{device.port_id}\n".encode(), + **extra_kwargs, + ) except subprocess.CalledProcessError as e: + # pylint: disable=raise-missing-from if e.returncode == 127: raise USBProxyNotInstalled( - "qubes-usb-proxy not installed in the VM") - else: - # TODO: sanitize and include stdout - sanitized_stderr = e.stderr.replace(b"\n", b", ") - sanitized_stderr = ''.join( - [chr(c) for c in sanitized_stderr if 0x20 <= c < 0x80]) - if sanitized_stderr.endswith(", "): - sanitized_stderr = sanitized_stderr[:-2] + "." - raise QubesUSBException( - 'Device attach failed: {}'.format(sanitized_stderr)) + "qubes-usb-proxy not installed in the VM" + ) + raise QubesUSBException( + f"Device attach failed: {sanitize_stderr_for_log(e.output)}" + f" {sanitize_stderr_for_log(e.stderr)}" + ) finally: - modify_qrexec_policy('qubes.USB+{}'.format(device.port_id), - policy_line, False) + modify_qrexec_policy( + f"qubes.USB+{device.port_id}", policy_line, False + ) - @qubes.ext.handler('device-pre-detach:usb') + @qubes.ext.handler("device-pre-detach:usb") async def on_device_detach_usb(self, vm, event, port): - # pylint: disable=unused-argument,no-self-use + # pylint: disable=unused-argument if not vm.is_running() or vm.qid == 0: return - for attached, options in self.on_device_list_attached(vm, event): + for attached, _options in self.on_device_list_attached(vm, event): if attached.port == port: break else: raise QubesUSBException( - f"Device {port} not connected to VM {vm.name}") + f"Device {port} not connected to VM {vm.name}" + ) # update the cache before the call, to avoid sending duplicated events # (one on qubesdb watch and the other by the caller of this method) @@ -675,26 +704,31 @@ async def on_device_detach_usb(self, vm, event, port): try: await backend.run_service_for_stdio( - 'qubes.USBDetach', - user='root', - input='{}\n'.format(attached.port_id).encode()) + "qubes.USBDetach", + user="root", + input=f"{attached.port_id}\n".encode(), + ) except subprocess.CalledProcessError as e: - # TODO: sanitize and include stdout - raise QubesUSBException('Device detach failed') + # pylint: disable=raise-missing-from + raise QubesUSBException( + f"Device detach failed: {sanitize_stderr_for_log(e.output)}" + f" {sanitize_stderr_for_log(e.stderr)}" + ) - @qubes.ext.handler('device-pre-assign:usb') + @qubes.ext.handler("device-pre-assign:usb") async def on_device_assign_usb(self, vm, event, device, options): # pylint: disable=unused-argument if options: raise qubes.exc.QubesException( - 'USB device assignment does not support user options') + "USB device assignment does not support user options" + ) - @qubes.ext.handler('domain-start') + @qubes.ext.handler("domain-start") async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument to_attach = {} - assignments = get_assigned_devices(vm.devices['usb']) + assignments = get_assigned_devices(vm.devices["usb"]) # the most specific assignments first for assignment in reversed(sorted(assignments)): for device in assignment.devices: @@ -705,7 +739,9 @@ async def on_domain_start(self, vm, _event, **_kwargs): if not assignment.matches(device): print( "Unrecognized identity, skipping attachment of device " - f"from the port {assignment}", file=sys.stderr) + f"from the port {assignment}", + file=sys.stderr, + ) continue # chose first assignment (the most specific) and ignore rest if device not in to_attach: @@ -714,16 +750,17 @@ async def on_domain_start(self, vm, _event, **_kwargs): in_progress = set() for assignment in to_attach.values(): in_progress.add( - asyncio.ensure_future(self.attach_and_notify(vm, assignment))) + asyncio.ensure_future(self.attach_and_notify(vm, assignment)) + ) if in_progress: await asyncio.wait(in_progress) - @qubes.ext.handler('domain-shutdown') + @qubes.ext.handler("domain-shutdown") async def on_domain_shutdown(self, vm, _event, **_kwargs): # pylint: disable=unused-argument - vm.fire_event('device-list-change:usb') + vm.fire_event("device-list-change:usb") - @qubes.ext.handler('qubes-close', system=True) + @qubes.ext.handler("qubes-close", system=True) def on_qubes_close(self, app, event): # pylint: disable=unused-argument self.devices_cache.clear() diff --git a/qubesusbproxy/tests.py b/qubesusbproxy/tests.py index 9459d4b..754ff31 100644 --- a/qubesusbproxy/tests.py +++ b/qubesusbproxy/tests.py @@ -21,6 +21,7 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # +import time import unittest from unittest import mock from unittest.mock import Mock, AsyncMock @@ -28,7 +29,6 @@ import jinja2 import qubes.tests.extra -import time core2 = False core3 = False @@ -41,9 +41,10 @@ from qubes.device_protocol import DeviceAssignment, VirtualDevice, Port def make_assignment(backend, ident, auto_attach=False): - return DeviceAssignment(VirtualDevice(Port( - backend, ident, 'usb')), - mode='auto-attach' if auto_attach else 'manual') + return DeviceAssignment( + VirtualDevice(Port(backend, ident, "usb")), + mode="auto-attach" if auto_attach else "manual", + ) def assign(test, collection, assignment): test.loop.run_until_complete(collection.assign(assignment)) @@ -57,6 +58,7 @@ def unassign(test, collection, assignment): from qubes.devices import DeviceAssignment def make_assignment(backend, ident, required=False): + # pylint: disable=unexpected-keyword-arg return DeviceAssignment(backend, ident, persistent=required) def assign(test, collection, assignment): @@ -64,7 +66,7 @@ def assign(test, collection, assignment): def unassign(test, collection, assignment): test.loop.run_until_complete(collection.detach(assignment)) - + LEGACY = True core3 = True @@ -80,54 +82,61 @@ def unassign(test, collection, assignment): is_r40 = False try: - with open('/etc/qubes-release') as f: - if 'R4.0' in f.read(): + with open("/etc/qubes-release") as f: + if "R4.0" in f.read(): is_r40 = True except FileNotFoundError: pass -GADGET_PREREQ = '&&'.join([ - "modprobe dummy_hcd", - "modprobe usb_f_mass_storage", - "mount|grep -q configfs", - "test -d /sys/class/udc/dummy_udc.0", -]) - -GADGET_PREPARE = ';'.join([ - "set -e -x", - "cd /sys/kernel/config/usb_gadget", - "mkdir test_g1; cd test_g1", - "echo 0x1234 > idProduct", - "echo 0x1234 > idVendor", - "mkdir strings/0x409", - "echo 0123456789 > strings/0x409/serialnumber", - "echo Qubes > strings/0x409/manufacturer", - "echo Test device > strings/0x409/product", - "mkdir configs/c.1", - "mkdir functions/mass_storage.ms1", - "truncate -s 512M /var/tmp/test-file", - "echo /var/tmp/test-file > functions/mass_storage.ms1/lun.0/file", - "ln -s functions/mass_storage.ms1 configs/c.1", - "echo dummy_udc.0 > UDC", - "sleep 2; udevadm settle", -]) +GADGET_PREREQ = "&&".join( + [ + "modprobe dummy_hcd", + "modprobe usb_f_mass_storage", + "mount|grep -q configfs", + "test -d /sys/class/udc/dummy_udc.0", + ] +) + +GADGET_PREPARE = ";".join( + [ + "set -e -x", + "cd /sys/kernel/config/usb_gadget", + "mkdir test_g1; cd test_g1", + "echo 0x1234 > idProduct", + "echo 0x1234 > idVendor", + "mkdir strings/0x409", + "echo 0123456789 > strings/0x409/serialnumber", + "echo Qubes > strings/0x409/manufacturer", + "echo Test device > strings/0x409/product", + "mkdir configs/c.1", + "mkdir functions/mass_storage.ms1", + "truncate -s 512M /var/tmp/test-file", + "echo /var/tmp/test-file > functions/mass_storage.ms1/lun.0/file", + "ln -s functions/mass_storage.ms1 configs/c.1", + "echo dummy_udc.0 > UDC", + "sleep 2; udevadm settle", + ] +) def create_usb_gadget(vm): vm.start() - p = vm.run(GADGET_PREREQ, user="root", - passio_popen=True, passio_stderr=True) + p = vm.run( + GADGET_PREREQ, user="root", passio_popen=True, passio_stderr=True + ) (_, _stderr) = p.communicate() if p.returncode != 0: raise unittest.SkipTest("missing USB Gadget subsystem") - p = vm.run(GADGET_PREPARE, user="root", - passio_popen=True, passio_stderr=True) + p = vm.run( + GADGET_PREPARE, user="root", passio_popen=True, passio_stderr=True + ) (_, stderr) = p.communicate() if p.returncode != 0: raise RuntimeError("Failed to setup USB gadget: " + stderr.decode()) p = vm.run( - 'ls /sys/bus/platform/devices/dummy_hcd.0/usb*|grep -x .-.', - passio_popen=True) + "ls /sys/bus/platform/devices/dummy_hcd.0/usb*|grep -x .-.", + passio_popen=True, + ) (stdout, _) = p.communicate() stdout = stdout.strip() if not stdout: @@ -138,26 +147,30 @@ def create_usb_gadget(vm): def remove_usb_gadget(vm): assert vm.is_running() - retcode = vm.run("echo > /sys/kernel/config/usb_gadget/test_g1/UDC", - user="root", wait=True) + retcode = vm.run( + "echo > /sys/kernel/config/usb_gadget/test_g1/UDC", + user="root", + wait=True, + ) if retcode != 0: raise RuntimeError("Failed to disable USB gadget") def recreate_usb_gadget(vm): - '''Re-create the gadget previously created with *create_usb_gadget*, + """Re-create the gadget previously created with *create_usb_gadget*, then removed with *remove_usb_gadget*. - ''' - - reconnect = ";".join([ - "cd /sys/kernel/config/usb_gadget", - "mkdir test_g1; cd test_g1", - "echo dummy_udc.0 > UDC", - "sleep 2; udevadm settle", - ]) - - p = vm.run(reconnect, user="root", - passio_popen=True, passio_stderr=True) + """ + + reconnect = ";".join( + [ + "cd /sys/kernel/config/usb_gadget", + "mkdir test_g1; cd test_g1", + "echo dummy_udc.0 > UDC", + "sleep 2; udevadm settle", + ] + ) + + p = vm.run(reconnect, user="root", passio_popen=True, passio_stderr=True) (_, stderr) = p.communicate() if p.returncode != 0: raise RuntimeError("Failed to re-create USB gadget: " + stderr.decode()) @@ -165,84 +178,114 @@ def recreate_usb_gadget(vm): class TC_00_USBProxy(qubes.tests.extra.ExtraTestCase): def setUp(self): - if 'whonix-gw' in self.template: - self.skipTest('whonix-gw does not have qubes-usb-proxy') - super(TC_00_USBProxy, self).setUp() + if "whonix-gw" in self.template: + self.skipTest("whonix-gw does not have qubes-usb-proxy") + super().setUp() vms = self.create_vms(["backend", "frontend"]) (self.backend, self.frontend) = vms - self.qrexec_policy('qubes.USB', self.frontend.name, self.backend.name) + self.qrexec_policy("qubes.USB", self.frontend.name, self.backend.name) self.dummy_usb_dev = create_usb_gadget(self.backend).decode() def test_000_attach_detach(self): self.frontend.start() # TODO: check qubesdb entries - self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format( - self.backend.name, - self.dummy_usb_dev)), 0, - "qubes.USBAttach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBAttach", + user="root", + input=f"{self.backend.name} {self.dummy_usb_dev}\n", + ), + 0, + "qubes.USBAttach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) # TODO: check qubesdb entries - self.assertEqual(self.frontend.run_service('qubes.USBDetach', - user='root', - input="{} {}\n".format( - self.backend.name, - self.dummy_usb_dev)), 0, - "qubes.USBDetach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device disconnection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBDetach", + user="root", + input=f"{self.backend.name} {self.dummy_usb_dev}\n", + ), + 0, + "qubes.USBDetach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 1, + "Device disconnection failed", + ) def test_010_attach_detach_vid_pid(self): self.frontend.start() # TODO: check qubesdb entries - self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format( - self.backend.name, - "0x1234.0x1234")), 0, - "qubes.USBAttach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBAttach", + user="root", + input=f"{self.backend.name} 0x1234.0x1234\n", + ), + 0, + "qubes.USBAttach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) # TODO: check qubesdb entries - self.assertEqual(self.frontend.run_service('qubes.USBDetach', - user='root', - input="{} {}\n".format( - self.backend.name, - "0x1234.0x1234")), 0, - "qubes.USBDetach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device disconnection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBDetach", + user="root", + input=f"{self.backend.name} 0x1234.0x1234\n", + ), + 0, + "qubes.USBDetach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 1, + "Device disconnection failed", + ) def test_020_detach_on_remove(self): self.frontend.start() - self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format( - self.backend.name, - self.dummy_usb_dev)), 0, - "qubes.USBAttach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBAttach", + user="root", + input=f"{self.backend.name} {self.dummy_usb_dev}\n", + ), + 0, + "qubes.USBAttach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) remove_usb_gadget(self.backend) # FIXME: usb-export script may update qubesdb/disconnect with 1sec delay time.sleep(2) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device not cleaned up") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 1, + "Device not cleaned up", + ) # TODO: check for kernel errors? class TC_10_USBProxy_core2(qubes.tests.extra.ExtraTestCase): def setUp(self): - super(TC_10_USBProxy_core2, self).setUp() - vms = self.create_vms(["backend", "frontend"]) - (self.backend, self.frontend) = vms - self.qrexec_policy('qubes.USB', self.frontend.name, self.backend.name) + super().setUp() + self.backend, self.frontend = self.create_vms(["backend", "frontend"]) + self.qrexec_policy("qubes.USB", self.frontend.name, self.backend.name) self.dummy_usb_dev = create_usb_gadget(self.backend) - self.usbdev_name = '{}:{}'.format(self.backend.name, self.dummy_usb_dev) + self.usbdev_name = f"{self.backend.name}:{self.dummy_usb_dev}" def test_000_list_all(self): usb_list = qubes.qubesutils.usb_list(self.qc) @@ -256,47 +299,55 @@ def test_020_attach(self): self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: - qubes.qubesutils.usb_attach(self.qc, - self.frontend, - usb_list[self.usbdev_name]) + qubes.qubesutils.usb_attach( + self.qc, self.frontend, usb_list[self.usbdev_name] + ) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertEqual(usb_list[self.usbdev_name]['connected-to'], - self.frontend) + self.assertEqual( + usb_list[self.usbdev_name]["connected-to"], self.frontend + ) def test_030_detach(self): self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + qubes.qubesutils.usb_attach( + self.qc, self.frontend, usb_list[self.usbdev_name] + ) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) - qubes.qubesutils.usb_detach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + qubes.qubesutils.usb_detach( + self.qc, self.frontend, usb_list[self.usbdev_name] + ) # FIXME: usb-export script may update qubesdb with 1sec delay time.sleep(2) usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertIsNone(usb_list[self.usbdev_name]['connected-to']) + self.assertIsNone(usb_list[self.usbdev_name]["connected-to"]) - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + self.assertNotEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device disconnection failed", + ) def test_040_detach_all(self): self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + qubes.qubesutils.usb_attach( + self.qc, self.frontend, usb_list[self.usbdev_name] + ) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -305,37 +356,44 @@ def test_040_detach_all(self): time.sleep(2) usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertIsNone(usb_list[self.usbdev_name]['connected-to']) + self.assertIsNone(usb_list[self.usbdev_name]["connected-to"]) - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + self.assertNotEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device disconnection failed", + ) def test_050_list_attached(self): - """ Attached device should not be listed as further attachable """ + """Attached device should not be listed as further attachable""" self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - usb_list_front_pre = qubes.qubesutils.usb_list(self.qc, - vm=self.frontend) + usb_list_front_pre = qubes.qubesutils.usb_list( + self.qc, vm=self.frontend + ) try: - qubes.qubesutils.usb_attach(self.qc, - self.frontend, - usb_list[self.usbdev_name]) + qubes.qubesutils.usb_attach( + self.qc, self.frontend, usb_list[self.usbdev_name] + ) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertEqual(usb_list[self.usbdev_name]['connected-to'], - self.frontend) + self.assertEqual( + usb_list[self.usbdev_name]["connected-to"], self.frontend + ) - usb_list_front_post = qubes.qubesutils.usb_list(self.qc, - vm=self.frontend) + usb_list_front_post = qubes.qubesutils.usb_list( + self.qc, vm=self.frontend + ) self.assertEqual(usb_list_front_pre, usb_list_front_post) @@ -343,8 +401,9 @@ def test_060_auto_detach_on_remove(self): self.frontend.start() usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + qubes.qubesutils.usb_attach( + self.qc, self.frontend, usb_list[self.usbdev_name] + ) except qubes.qubesutils.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -354,180 +413,201 @@ def test_060_auto_detach_on_remove(self): usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) self.assertNotIn(self.usbdev_name, usb_list) - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + self.assertNotEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device disconnection failed", + ) def test_070_attach_not_installed_front(self): self.frontend.start() # simulate package not installed - retcode = self.frontend.run("rm -f /etc/qubes-rpc/qubes.USBAttach", - user="root", wait=True) + retcode = self.frontend.run( + "rm -f /etc/qubes-rpc/qubes.USBAttach", user="root", wait=True + ) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) with self.assertRaises(qubes.qubesutils.USBProxyNotInstalled): - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + qubes.qubesutils.usb_attach( + self.qc, self.frontend, usb_list[self.usbdev_name] + ) @unittest.expectedFailure def test_075_attach_not_installed_back(self): self.frontend.start() # simulate package not installed - retcode = self.backend.run("rm -f /etc/qubes-rpc/qubes.USB", - user="root", wait=True) + retcode = self.backend.run( + "rm -f /etc/qubes-rpc/qubes.USB", user="root", wait=True + ) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) try: - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) + qubes.qubesutils.usb_attach( + self.qc, self.frontend, usb_list[self.usbdev_name] + ) except qubes.qubesutils.USBProxyNotInstalled: pass except Exception as e: self.fail( - 'Wrong exception raised (expected USBProxyNotInstalled): ' - '{!r}'.format(e)) + "Wrong exception raised (expected USBProxyNotInstalled): " + + f"{e!r}" + ) else: - self.fail('USBProxyNotInstalled not raised') + self.fail("USBProxyNotInstalled not raised") class TC_20_USBProxy_core3(qubes.tests.extra.ExtraTestCase): # noinspection PyAttributeOutsideInit def setUp(self): - super(TC_20_USBProxy_core3, self).setUp() - vms = self.create_vms(["backend", "frontend"]) - (self.backend, self.frontend) = vms - self.qrexec_policy('qubes.USB', self.frontend.name, self.backend.name) + super().setUp() + self.backend, self.frontend = self.create_vms(["backend", "frontend"]) + self.qrexec_policy("qubes.USB", self.frontend.name, self.backend.name) self.usbdev_ident = create_usb_gadget(self.backend).decode() - self.usbdev_name = '{}:{}:{}'.format( - self.backend.name, self.usbdev_ident, - "1234:1234:0123456789:u080650") + self.usbdev_name = ( + f"{self.backend.name}:{self.usbdev_ident}" + ":1234:1234:0123456789:u080650" + ) def tearDown(self): # remove vms in this specific order, otherwise there may remain stray # dependency between them (so, objects leaks) self.remove_vms((self.frontend, self.backend)) - super(TC_20_USBProxy_core3, self).tearDown() + super().tearDown() def test_000_list(self): - usb_list = self.backend.devices['usb'] + usb_list = self.backend.devices["usb"] self.assertIn(self.usbdev_name, [str(dev) for dev in usb_list]) def test_010_assign(self): - usb_dev = self.backend.devices['usb'][self.usbdev_ident] + usb_dev = self.backend.devices["usb"][self.usbdev_ident] ass = make_assignment(self.backend, self.usbdev_ident, auto_attach=True) - assign(self, self.frontend.devices['usb'], ass) + assign(self, self.frontend.devices["usb"], ass) self.assertIsNone(usb_dev.attachment) try: self.frontend.start() except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) self.assertEqual(usb_dev.attachment, self.frontend) - @unittest.mock.patch('qubes.ext.utils.confirm_device_attachment') + @unittest.mock.patch("qubes.ext.utils.confirm_device_attachment") @unittest.skipIf(LEGACY, "new feature") def test_011_assign_ask(self, confirm): confirm.return_value = self.frontend.name - usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = DeviceAssignment(VirtualDevice(Port( - self.backend, self.usbdev_ident, 'usb')), - mode='ask-to-attach') - assign(self, self.frontend.devices['usb'], ass) + usb_dev = self.backend.devices["usb"][self.usbdev_ident] + ass = DeviceAssignment( + VirtualDevice(Port(self.backend, self.usbdev_ident, "usb")), + mode="ask-to-attach", + ) + assign(self, self.frontend.devices["usb"], ass) self.assertIsNone(usb_dev.attachment) try: self.frontend.start() except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) self.assertEqual(usb_dev.attachment, self.frontend) def test_020_attach(self): self.frontend.start() - usb_dev = self.backend.devices['usb'][self.usbdev_ident] + usb_dev = self.backend.devices["usb"][self.usbdev_ident] ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) self.assertEqual(usb_dev.attachment, self.frontend) def test_030_detach(self): self.frontend.start() - usb_dev = self.backend.devices['usb'][self.usbdev_ident] + usb_dev = self.backend.devices["usb"][self.usbdev_ident] ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.loop.run_until_complete( - self.frontend.devices['usb'].detach(ass)) + self.loop.run_until_complete(self.frontend.devices["usb"].detach(ass)) # FIXME: usb-export script may update qubesdb with 1sec delay self.loop.run_until_complete(asyncio.sleep(2)) self.assertIsNone(usb_dev.attachment) - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + self.assertNotEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device disconnection failed", + ) def test_040_unassign(self): - usb_dev = self.backend.devices['usb'][self.usbdev_ident] + usb_dev = self.backend.devices["usb"][self.usbdev_ident] ass = make_assignment(self.backend, self.usbdev_ident, auto_attach=True) - assign(self, self.frontend.devices['usb'], ass) + assign(self, self.frontend.devices["usb"], ass) self.assertIsNone(usb_dev.attachment) - unassign(self, self.frontend.devices['usb'], ass) + unassign(self, self.frontend.devices["usb"], ass) self.assertIsNone(usb_dev.attachment) def test_050_list_attached(self): - """ Attached device should not be listed as further attachable """ + """Attached device should not be listed as further attachable""" self.frontend.start() - usb_list = self.backend.devices['usb'] + usb_list = self.backend.devices["usb"] - usb_list_front_pre = list(self.frontend.devices['usb']) + usb_list_front_pre = list(self.frontend.devices["usb"]) ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) self.assertEqual(usb_list[self.usbdev_ident].attachment, self.frontend) - usb_list_front_post = list(self.frontend.devices['usb']) + usb_list_front_post = list(self.frontend.devices["usb"]) self.assertEqual(usb_list_front_pre, usb_list_front_post) def test_060_auto_detach_on_remove(self): self.frontend.start() - usb_list = self.backend.devices['usb'] + usb_list = self.backend.devices["usb"] ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -536,16 +616,18 @@ def test_060_auto_detach_on_remove(self): self.loop.run_until_complete(asyncio.sleep(2)) self.assertNotIn(self.usbdev_name, [str(dev) for dev in usb_list]) - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + self.assertNotEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device disconnection failed", + ) def test_061_auto_attach_on_reconnect(self): self.frontend.start() - usb_list = self.backend.devices['usb'] + usb_list = self.backend.devices["usb"] ass = make_assignment(self.backend, self.usbdev_ident, auto_attach=True) try: - assign(self, self.frontend.devices['usb'], ass) + assign(self, self.frontend.devices["usb"], ass) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -555,25 +637,27 @@ def test_061_auto_attach_on_reconnect(self): while self.usbdev_name in (str(dev) for dev in usb_list): self.loop.run_until_complete(asyncio.sleep(1)) timeout -= 1 - self.assertGreater(timeout, 0, 'timeout on device remove') + self.assertGreater(timeout, 0, "timeout on device remove") recreate_usb_gadget(self.backend) timeout = 5 while self.usbdev_name not in (str(dev) for dev in usb_list): self.loop.run_until_complete(asyncio.sleep(1)) timeout -= 1 - self.assertGreater(timeout, 0, 'timeout on device create') + self.assertGreater(timeout, 0, "timeout on device create") self.loop.run_until_complete(asyncio.sleep(5)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device reconnection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device reconnection failed", + ) def test_062_ask_to_attach_on_start(self): self.frontend.start() - usb_list = self.backend.devices['usb'] + usb_list = self.backend.devices["usb"] ass = make_assignment(self.backend, self.usbdev_ident, auto_attach=True) try: - assign(self, self.frontend.devices['usb'], ass) + assign(self, self.frontend.devices["usb"], ass) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -583,79 +667,89 @@ def test_062_ask_to_attach_on_start(self): while self.usbdev_name in (str(dev) for dev in usb_list): self.loop.run_until_complete(asyncio.sleep(1)) timeout -= 1 - self.assertGreater(timeout, 0, 'timeout on device remove') + self.assertGreater(timeout, 0, "timeout on device remove") recreate_usb_gadget(self.backend) timeout = 5 while self.usbdev_name not in (str(dev) for dev in usb_list): self.loop.run_until_complete(asyncio.sleep(1)) timeout -= 1 - self.assertGreater(timeout, 0, 'timeout on device create') + self.assertGreater(timeout, 0, "timeout on device create") self.loop.run_until_complete(asyncio.sleep(5)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device reconnection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device reconnection failed", + ) def test_070_attach_not_installed_front(self): self.frontend.start() # simulate package not installed - retcode = self.frontend.run("rm -f /etc/qubes-rpc/qubes.USBAttach", - user="root", wait=True) + retcode = self.frontend.run( + "rm -f /etc/qubes-rpc/qubes.USBAttach", user="root", wait=True + ) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") ass = make_assignment(self.backend, self.usbdev_ident) with self.assertRaises(qubesusbproxy.core3ext.USBProxyNotInstalled): self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) @unittest.expectedFailure def test_075_attach_not_installed_back(self): self.frontend.start() # simulate package not installed - retcode = self.backend.run("rm -f /etc/qubes-rpc/qubes.USB", - user="root", wait=True) + retcode = self.backend.run( + "rm -f /etc/qubes-rpc/qubes.USB", user="root", wait=True + ) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") ass = make_assignment(self.backend, self.usbdev_ident) try: with self.assertRaises(qubesusbproxy.core3ext.USBProxyNotInstalled): self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.QubesUSBException as e: - self.fail('Generic exception raise instead of specific ' - 'USBProxyNotInstalled: ' + str(e)) + self.fail( + "Generic exception raise instead of specific " + "USBProxyNotInstalled: " + str(e) + ) def test_080_attach_existing_policy(self): self.frontend.start() # this override policy file, but during normal execution it shouldn't # exist, so should be ok, especially on a testing system with open( - '/etc/qubes-rpc/policy/qubes.USB+{}'.format(self.usbdev_ident), - 'w+') as policy_file: - policy_file.write('# empty policy\n') + f"/etc/qubes-rpc/policy/qubes.USB+{self.usbdev_ident}", "w+" + ) as policy_file: + policy_file.write("# empty policy\n") ass = make_assignment(self.backend, self.usbdev_ident) - self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.loop.run_until_complete(self.frontend.devices["usb"].attach(ass)) @unittest.skipIf(is_r40, "Not supported on R4.0") def test_090_attach_stubdom(self): - self.frontend.virt_mode = 'hvm' - self.frontend.features['stubdom-qrexec'] = True + self.frontend.virt_mode = "hvm" + self.frontend.features["stubdom-qrexec"] = True self.frontend.start() ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) time.sleep(5) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) -class TestQubesDB(object): +class TestQubesDB: def __init__(self, data): self._data = data @@ -666,29 +760,29 @@ def list(self, prefix): return [key for key in self._data if key.startswith(prefix)] -class TestApp(object): +class TestApp: class Domains(dict): - def __init__(self): - super().__init__() - def __iter__(self): return iter(self.values()) def __init__(self): #: jinja2 environment for libvirt XML templates self.env = jinja2.Environment( - loader=jinja2.FileSystemLoader([ - 'templates', - '/etc/qubes/templates', - '/usr/share/qubes/templates', - ]), + loader=jinja2.FileSystemLoader( + [ + "templates", + "/etc/qubes/templates", + "/usr/share/qubes/templates", + ] + ), undefined=jinja2.StrictUndefined, - autoescape=True) + autoescape=True, + ) self.domains = TestApp.Domains() self.vmm = mock.Mock() -class TestDeviceCollection(object): +class TestDeviceCollection: def __init__(self, backend_vm, devclass): self._exposed = [] self._assigned = [] @@ -707,26 +801,23 @@ def __getitem__(self, port_id): for dev in self._exposed: if dev.port_id == port_id: return dev + raise KeyError() class TestVM(qubes.tests.TestEmitter): - def __init__( - self, qdb, running=True, name='test-vm', *args, **kwargs): - super(TestVM, self).__init__(*args, **kwargs) + def __init__(self, qdb, running=True, name="test-vm", **kwargs): + super().__init__(**kwargs) self.name = name self.untrusted_qdb = TestQubesDB(qdb) self.libvirt_domain = mock.Mock() self.features = mock.Mock() - self.features.check_with_template.side_effect = ( - lambda name, default: - '4.2' if name == 'qubes-agent-version' - else None) + self.features.check_with_template.side_effect = lambda name, default: ( + "4.2" if name == "qubes-agent-version" else None + ) self.is_running = lambda: running self.log = mock.Mock() self.app = TestApp() - self.devices = { - 'testclass': TestDeviceCollection(self, 'testclass') - } + self.devices = {"testclass": TestDeviceCollection(self, "testclass")} def __hash__(self): return hash(self.name) @@ -734,6 +825,7 @@ def __hash__(self): def __eq__(self, other): if isinstance(other, TestVM): return self.name == other.name + return False def __str__(self): return self.name @@ -741,15 +833,15 @@ def __str__(self): def get_qdb(attachment=None): result = { - '/qubes-usb-devices/1-1/desc': b'1a0a:badd USB-IF Test\x20Device', - '/qubes-usb-devices/1-1/interfaces': b':ffff00:020600:0a0000:', - '/qubes-usb-devices/1-1/usb-ver': b'2', - '/qubes-usb-devices/1-2/desc': b'1a0a:badd USB-IF Test\x20Device\x202', - '/qubes-usb-devices/1-2/interfaces': b':0acafe:', - '/qubes-usb-devices/1-2/usb-ver': b'3', + "/qubes-usb-devices/1-1/desc": b"1a0a:badd USB-IF Test\x20Device", + "/qubes-usb-devices/1-1/interfaces": b":ffff00:020600:0a0000:", + "/qubes-usb-devices/1-1/usb-ver": b"2", + "/qubes-usb-devices/1-2/desc": b"1a0a:badd USB-IF Test\x20Device\x202", + "/qubes-usb-devices/1-2/interfaces": b":0acafe:", + "/qubes-usb-devices/1-2/usb-ver": b"3", } if attachment: - result['/qubes-usb-devices/1-1/connected-to'] = attachment.encode() + result["/qubes-usb-devices/1-1/connected-to"] = attachment.encode() return result @@ -761,274 +853,332 @@ def setUp(self): @staticmethod def added_assign_setup(attachment=None): - back_vm = TestVM(qdb=get_qdb(attachment), name='sys-usb') - front = TestVM({}, name='front-vm') - dom0 = TestVM({}, name='dom0') - back_vm.app.domains['sys-usb'] = back_vm - back_vm.app.domains['front-vm'] = front + back_vm = TestVM(qdb=get_qdb(attachment), name="sys-usb") + front = TestVM({}, name="front-vm") + dom0 = TestVM({}, name="dom0") + back_vm.app.domains["sys-usb"] = back_vm + back_vm.app.domains["front-vm"] = front back_vm.app.domains[0] = dom0 front.app = back_vm.app dom0.app = back_vm.app - back_vm.app.vmm.configure_mock(**{'offline_mode': False}) + back_vm.app.vmm.configure_mock(**{"offline_mode": False}) fire_event_async = mock.Mock() front.fire_event_async = fire_event_async - back_vm.devices['usb'] = TestDeviceCollection( - backend_vm=back_vm, devclass='usb') - front.devices['usb'] = TestDeviceCollection( - backend_vm=front, devclass='usb') - dom0.devices['usb'] = TestDeviceCollection( - backend_vm=dom0, devclass='usb') + back_vm.devices["usb"] = TestDeviceCollection( + backend_vm=back_vm, devclass="usb" + ) + front.devices["usb"] = TestDeviceCollection( + backend_vm=front, devclass="usb" + ) + dom0.devices["usb"] = TestDeviceCollection( + backend_vm=dom0, devclass="usb" + ) return back_vm, front def test_010_on_qdb_change_multiple_assignments_including_full(self): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - full_assig = DeviceAssignment(VirtualDevice( - exp_dev.port, exp_dev.device_id), mode='auto-attach', - options={'pid': 'did'}) - port_assign = DeviceAssignment(VirtualDevice( - exp_dev.port, '*'), mode='auto-attach', - options={'pid': 'any'}) - dev_assign = DeviceAssignment(VirtualDevice(Port( - exp_dev.backend_domain, '*', 'usb'), - exp_dev.device_id), mode='auto-attach', - options={'any': 'did'}) - - front.devices['usb']._assigned.append(dev_assign) - front.devices['usb']._assigned.append(port_assign) - front.devices['usb']._assigned.append(full_assig) - back.devices['usb']._exposed.append( - qubesusbproxy.core3ext.USBDevice(back, '1-1')) + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + full_assig = DeviceAssignment( + VirtualDevice(exp_dev.port, exp_dev.device_id), + mode="auto-attach", + options={"pid": "did"}, + ) + port_assign = DeviceAssignment( + VirtualDevice(exp_dev.port, "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + front.devices["usb"]._assigned.append(full_assig) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(back, "1-1") + ) self.ext.attach_and_notify = Mock() loop = asyncio.get_event_loop() - with mock.patch('asyncio.wait'): - with mock.patch('asyncio.ensure_future'): + with mock.patch("asyncio.wait"): + with mock.patch("asyncio.ensure_future"): loop.run_until_complete(self.ext.on_domain_start(front, None)) - self.assertEqual(self.ext.attach_and_notify.call_args[0][1].options, - {'pid': 'did'}) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"pid": "did"} + ) def test_011_on_qdb_change_multiple_assignments_port_vs_dev(self): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - port_assign = DeviceAssignment(VirtualDevice( - exp_dev.port, '*'), mode='auto-attach', - options={'pid': 'any'}) - dev_assign = DeviceAssignment(VirtualDevice(Port( - exp_dev.backend_domain, '*', 'usb'), - exp_dev.device_id), mode='auto-attach', - options={'any': 'did'}) - - front.devices['usb']._assigned.append(dev_assign) - front.devices['usb']._assigned.append(port_assign) - back.devices['usb']._exposed.append( - qubesusbproxy.core3ext.USBDevice(back, '1-1')) + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + port_assign = DeviceAssignment( + VirtualDevice(exp_dev.port, "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(back, "1-1") + ) self.ext.attach_and_notify = Mock() loop = asyncio.get_event_loop() - with mock.patch('asyncio.wait'): - with mock.patch('asyncio.ensure_future'): + with mock.patch("asyncio.wait"): + with mock.patch("asyncio.ensure_future"): loop.run_until_complete(self.ext.on_domain_start(front, None)) - self.assertEqual(self.ext.attach_and_notify.call_args[0][1].options, - {'pid': 'any'}) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"pid": "any"} + ) def test_012_on_qdb_change_multiple_assignments_dev(self): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - port_assign = DeviceAssignment(VirtualDevice(Port( - exp_dev.backend_domain, '1-2', 'usb'), - '*'), mode='auto-attach', - options={'pid': 'any'}) - dev_assign = DeviceAssignment(VirtualDevice(Port( - exp_dev.backend_domain, '*', 'usb'), - exp_dev.device_id), mode='auto-attach', options={'any': 'did'}) - - front.devices['usb']._assigned.append(dev_assign) - front.devices['usb']._assigned.append(port_assign) - back.devices['usb']._exposed.append( - qubesusbproxy.core3ext.USBDevice(back, '1-1')) - back.devices['usb']._exposed.append( - qubesusbproxy.core3ext.USBDevice(back, '1-2')) + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + port_assign = DeviceAssignment( + VirtualDevice(Port(exp_dev.backend_domain, "1-2", "usb"), "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(back, "1-1") + ) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(back, "1-2") + ) self.ext.attach_and_notify = Mock() loop = asyncio.get_event_loop() - with mock.patch('asyncio.wait'): - with mock.patch('asyncio.ensure_future'): + with mock.patch("asyncio.wait"): + with mock.patch("asyncio.ensure_future"): loop.run_until_complete(self.ext.on_domain_start(front, None)) - self.assertEqual(self.ext.attach_and_notify.call_args[0][1].options, - {'any': 'did'}) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"any": "did"} + ) def test_013_on_qdb_change_two_fronts(self): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - assign = DeviceAssignment(exp_dev, mode='auto-attach') + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + assmnt = DeviceAssignment(exp_dev, mode="auto-attach") - front.devices['usb']._assigned.append(assign) - back.devices['usb']._assigned.append(assign) - back.devices['usb']._exposed.append(exp_dev) + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) - resolver_path = 'qubes.ext.utils.resolve_conflicts_and_attach' + resolver_path = "qubes.ext.utils.resolve_conflicts_and_attach" with mock.patch(resolver_path, new_callable=Mock) as resolver: - with mock.patch('asyncio.ensure_future'): + with mock.patch("asyncio.ensure_future"): self.ext.on_qdb_change(back, None, None) resolver.assert_called_once_with( - self.ext, {'1-1': {front: assign, back: assign}}) + self.ext, {"1-1": {front: assmnt, back: assmnt}} + ) - @unittest.mock.patch('asyncio.create_subprocess_shell') + @unittest.mock.patch("asyncio.create_subprocess_shell") def test_014_failed_confirmation(self, shell): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - assign = DeviceAssignment(exp_dev, mode='auto-attach') + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + assmnt = DeviceAssignment(exp_dev, mode="auto-attach") - front.devices['usb']._assigned.append(assign) - back.devices['usb']._assigned.append(assign) - back.devices['usb']._exposed.append(exp_dev) + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) proc = AsyncMock() shell.return_value = proc proc.communicate = AsyncMock() - proc.communicate.return_value = (b'nonsense', b'') + proc.communicate.return_value = (b"nonsense", b"") loop = asyncio.get_event_loop() self.ext.attach_and_notify = AsyncMock() - loop.run_until_complete(qubes.ext.utils.resolve_conflicts_and_attach( - self.ext, {'1-1': {front: assign, back: assign}})) + loop.run_until_complete( + qubes.ext.utils.resolve_conflicts_and_attach( + self.ext, {"1-1": {front: assmnt, back: assmnt}} + ) + ) self.ext.attach_and_notify.assert_not_called() - @unittest.mock.patch('asyncio.create_subprocess_shell') + @unittest.mock.patch("asyncio.create_subprocess_shell") def test_015_successful_confirmation(self, shell): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - assign = DeviceAssignment(exp_dev, mode='auto-attach') + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + assmnt = DeviceAssignment(exp_dev, mode="auto-attach") - front.devices['usb']._assigned.append(assign) - back.devices['usb']._assigned.append(assign) - back.devices['usb']._exposed.append(exp_dev) + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) proc = AsyncMock() shell.return_value = proc proc.communicate = AsyncMock() - proc.communicate.return_value = (b'front-vm', b'') + proc.communicate.return_value = (b"front-vm", b"") loop = asyncio.get_event_loop() self.ext.attach_and_notify = AsyncMock() - loop.run_until_complete(qubes.ext.utils.resolve_conflicts_and_attach( - self.ext, {'1-1': {front: assign, back: assign}})) - self.ext.attach_and_notify.assert_called_once_with(front, assign) + loop.run_until_complete( + qubes.ext.utils.resolve_conflicts_and_attach( + self.ext, {"1-1": {front: assmnt, back: assmnt}} + ) + ) + self.ext.attach_and_notify.assert_called_once_with(front, assmnt) def test_016_on_qdb_change_ask(self): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - assign = DeviceAssignment(exp_dev, mode='ask-to-attach') + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + assmnt = DeviceAssignment(exp_dev, mode="ask-to-attach") - front.devices['usb']._assigned.append(assign) - back.devices['usb']._exposed.append(exp_dev) + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) - resolver_path = 'qubes.ext.utils.resolve_conflicts_and_attach' + resolver_path = "qubes.ext.utils.resolve_conflicts_and_attach" with mock.patch(resolver_path, new_callable=Mock) as resolver: - with mock.patch('asyncio.ensure_future'): + with mock.patch("asyncio.ensure_future"): self.ext.on_qdb_change(back, None, None) - resolver.assert_called_once_with( - self.ext, {'1-1': {front: assign}}) + resolver.assert_called_once_with(self.ext, {"1-1": {front: assmnt}}) def test_020_on_startup_multiple_assignments_including_full(self): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - full_assig = DeviceAssignment(VirtualDevice( - exp_dev.port, exp_dev.device_id), mode='auto-attach', - options={'pid': 'did'}) - port_assign = DeviceAssignment(VirtualDevice( - exp_dev.port, '*'), mode='auto-attach', - options={'pid': 'any'}) - dev_assign = DeviceAssignment(VirtualDevice(Port( - exp_dev.backend_domain, '*', 'usb'), - exp_dev.device_id), mode='auto-attach', - options={'any': 'did'}) - - front.devices['usb']._assigned.append(dev_assign) - front.devices['usb']._assigned.append(port_assign) - front.devices['usb']._assigned.append(full_assig) - back.devices['usb']._exposed.append( - qubesusbproxy.core3ext.USBDevice(back, '1-1')) + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + full_assig = DeviceAssignment( + VirtualDevice(exp_dev.port, exp_dev.device_id), + mode="auto-attach", + options={"pid": "did"}, + ) + port_assign = DeviceAssignment( + VirtualDevice(exp_dev.port, "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + front.devices["usb"]._assigned.append(full_assig) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(back, "1-1") + ) self.ext.attach_and_notify = AsyncMock() loop = asyncio.get_event_loop() loop.run_until_complete(self.ext.on_domain_start(front, None)) - self.assertEqual(self.ext.attach_and_notify.call_args[0][1].options, - {'pid': 'did'}) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"pid": "did"} + ) def test_021_on_startup_multiple_assignments_port_vs_dev(self): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - port_assign = DeviceAssignment(VirtualDevice( - exp_dev.port, '*'), mode='auto-attach', - options={'pid': 'any'}) - dev_assign = DeviceAssignment(VirtualDevice(Port( - exp_dev.backend_domain, '*', 'usb'), - exp_dev.device_id), mode='auto-attach', - options={'any': 'did'}) - - front.devices['usb']._assigned.append(dev_assign) - front.devices['usb']._assigned.append(port_assign) - back.devices['usb']._exposed.append( - qubesusbproxy.core3ext.USBDevice(back, '1-1')) + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + port_assign = DeviceAssignment( + VirtualDevice(exp_dev.port, "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(back, "1-1") + ) loop = asyncio.get_event_loop() self.ext.attach_and_notify = AsyncMock() loop.run_until_complete(self.ext.on_domain_start(front, None)) - self.assertEqual(self.ext.attach_and_notify.call_args[0][1].options, - {'pid': 'any'}) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"pid": "any"} + ) def test_022_on_startup_multiple_assignments_dev(self): back, front = self.added_assign_setup() - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - port_assign = DeviceAssignment(VirtualDevice(Port( - exp_dev.backend_domain, '1-2', 'usb'), - '*'), mode='auto-attach', - options={'pid': 'any'}) - dev_assign = DeviceAssignment(VirtualDevice(Port( - exp_dev.backend_domain, '*', 'usb'), - exp_dev.device_id), mode='auto-attach', options={'any': 'did'}) - - front.devices['usb']._assigned.append(dev_assign) - front.devices['usb']._assigned.append(port_assign) - back.devices['usb']._exposed.append( - qubesusbproxy.core3ext.USBDevice(back, '1-1')) - back.devices['usb']._exposed.append( - qubesusbproxy.core3ext.USBDevice(back, '1-2')) + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + port_assign = DeviceAssignment( + VirtualDevice(Port(exp_dev.backend_domain, "1-2", "usb"), "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(back, "1-1") + ) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(back, "1-2") + ) self.ext.attach_and_notify = AsyncMock() loop = asyncio.get_event_loop() loop.run_until_complete(self.ext.on_domain_start(front, None)) - self.assertEqual(self.ext.attach_and_notify.call_args[0][1].options, - {'any': 'did'}) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"any": "did"} + ) def test_023_on_startup_already_attached(self): - back, front = self.added_assign_setup(attachment='sys-usb') + back, front = self.added_assign_setup(attachment="sys-usb") - exp_dev = qubesusbproxy.core3ext.USBDevice(back, '1-1') - assign = DeviceAssignment(VirtualDevice( - exp_dev.port, exp_dev.device_id), mode='auto-attach') + exp_dev = qubesusbproxy.core3ext.USBDevice(back, "1-1") + assmnt = DeviceAssignment( + VirtualDevice(exp_dev.port, exp_dev.device_id), mode="auto-attach" + ) - front.devices['usb']._assigned.append(assign) - back.devices['usb']._exposed.append(exp_dev) + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) self.ext.attach_and_notify = Mock() loop = asyncio.get_event_loop() - with mock.patch('asyncio.ensure_future'): + with mock.patch("asyncio.ensure_future"): loop.run_until_complete(self.ext.on_domain_start(front, None)) self.ext.attach_and_notify.assert_not_called() @@ -1041,6 +1191,7 @@ def list_tests(): tests += [TC_20_USBProxy_core3] return tests + def list_unit_tests(): tests = [] if core3: diff --git a/qubesusbproxy/utils.py b/qubesusbproxy/utils.py index 641e065..b2de316 100644 --- a/qubesusbproxy/utils.py +++ b/qubesusbproxy/utils.py @@ -2,7 +2,7 @@ # # The Qubes OS Project, https://www.qubes-os.org # -# Copyright (C) 2023 Piotr Bartman-Szwarc +# Copyright (C) 2024 Piotr Bartman-Szwarc # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -30,33 +30,43 @@ def device_list_change( - ext: qubes.ext.Extension, current_devices, - vm, path, device_class: Type[qubes.device_protocol.DeviceInfo] + ext: qubes.ext.Extension, + current_devices, + vm, + path, + device_class: Type[qubes.device_protocol.DeviceInfo], ): - devclass = device_class.__name__[:-len('Device')].lower() + devclass = device_class.__name__[: -len("Device")].lower() if path is not None: - vm.fire_event(f'device-list-change:{devclass}') + vm.fire_event(f"device-list-change:{devclass}") - added, attached, detached, removed = ( - compare_device_cache(vm, ext.devices_cache, current_devices)) + added, attached, detached, removed = compare_device_cache( + vm, ext.devices_cache, current_devices + ) # send events about devices detached/attached outside by themselves for port_id, front_vm in detached.items(): dev = device_class(vm, port_id) - asyncio.ensure_future(front_vm.fire_event_async( - f'device-detach:{devclass}', port=dev.port)) + asyncio.ensure_future( + front_vm.fire_event_async( + f"device-detach:{devclass}", port=dev.port + ) + ) for port_id in removed: device = device_class(vm, port_id) - vm.fire_event(f'device-removed:{devclass}', port=device.port) + vm.fire_event(f"device-removed:{devclass}", port=device.port) for port_id in added: device = device_class(vm, port_id) - vm.fire_event(f'device-added:{devclass}', device=device) + vm.fire_event(f"device-added:{devclass}", device=device) for dev_ident, front_vm in attached.items(): dev = device_class(vm, dev_ident) # options are unknown, device already attached - asyncio.ensure_future(front_vm.fire_event_async( - f'device-attach:{devclass}', device=dev, options={})) + asyncio.ensure_future( + front_vm.fire_event_async( + f"device-attach:{devclass}", device=dev, options={} + ) + ) ext.devices_cache[vm.name] = current_devices @@ -64,29 +74,35 @@ def device_list_change( for front_vm in vm.app.domains: if not front_vm.is_running(): continue - for assignment in reversed(sorted( - front_vm.devices[devclass].get_assigned_devices())): + for assignment in reversed( + sorted(front_vm.devices[devclass].get_assigned_devices()) + ): for device in assignment.devices: - if (assignment.matches(device) - and device.port_id in added - and device.port_id not in attached + if ( + assignment.matches(device) + and device.port_id in added + and device.port_id not in attached ): frontends = to_attach.get(device.port_id, {}) # make it unique ass = assignment.clone( - device=VirtualDevice(device.port, device.device_id)) + device=VirtualDevice(device.port, device.device_id) + ) curr = frontends.get(front_vm, None) if curr is None or curr < ass: # chose the most specific assignment frontends[front_vm] = ass to_attach[device.port_id] = frontends - for port_id, frontends in to_attach.items(): + asyncio.ensure_future(resolve_conflicts_and_attach(ext, to_attach)) + + +async def resolve_conflicts_and_attach(ext, to_attach): + for _, frontends in to_attach.items(): if len(frontends) > 1: # unique device = tuple(frontends.values())[0].device - target_name = asyncio.ensure_future( - confirm_device_attachment(device, frontends)).result() + target_name = await confirm_device_attachment(device, frontends) for front in frontends: if front.name == target_name: target = front @@ -101,7 +117,7 @@ def device_list_change( target = tuple(frontends.keys())[0] assignment = frontends[target] - asyncio.ensure_future(ext.attach_and_notify(target, assignment)) + await ext.attach_and_notify(target, assignment) def compare_device_cache(vm, devices_cache, current_devices): @@ -146,13 +162,19 @@ async def confirm_device_attachment(device, frontends) -> str: # pylint: disable=consider-using-with # vm names are safe to just join by spaces proc = await asyncio.create_subprocess_shell( - " ".join(["qubes-device-attach-confirm", device.backend_domain.name, - device.port_id, "'" + device.description + "'", - *front_names]), - stdout=asyncio.subprocess.PIPE + " ".join( + [ + "qubes-device-attach-confirm", + device.backend_domain.name, + device.port_id, + "'" + device.description + "'", + *front_names, + ] + ), + stdout=asyncio.subprocess.PIPE, ) (target_name, _) = await proc.communicate() - target_name = target_name.decode() + target_name = target_name.decode(encoding="ascii") if target_name in front_names: return target_name return ""