From 8772108b263a3379a3c53cf8b6cb14576889bb7d Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 12 Aug 2024 18:16:14 +0200 Subject: [PATCH] q-dev: refactor device_protocol.py --- qubesadmin/backup/core2.py | 4 +- qubesadmin/backup/core3.py | 4 +- qubesadmin/backup/restore.py | 14 +- qubesadmin/device_protocol.py | 771 +++++++++++------- qubesadmin/devices.py | 47 +- qubesadmin/events/__init__.py | 4 +- qubesadmin/tests/app.py | 42 +- .../tests/backup/backupcompatibility.py | 8 +- qubesadmin/tests/devices.py | 184 +++-- qubesadmin/tests/tools/qvm_device.py | 86 +- qubesadmin/tests/tools/qvm_start.py | 54 +- qubesadmin/tools/qvm_device.py | 10 +- qubesadmin/tools/qvm_start.py | 22 +- 13 files changed, 711 insertions(+), 539 deletions(-) diff --git a/qubesadmin/backup/core2.py b/qubesadmin/backup/core2.py index eed3d1ca9..d46f4495d 100644 --- a/qubesadmin/backup/core2.py +++ b/qubesadmin/backup/core2.py @@ -336,10 +336,10 @@ def import_core2_vm(self, element): if pcidevs: pcidevs = ast.literal_eval(pcidevs) for pcidev in pcidevs: - ident = pcidev.replace(':', '_') + port_id = pcidev.replace(':', '_') options = {'no-strict-reset': True} if not pci_strictreset else {} options['required'] = True - vm.devices['pci'][('dom0', ident)] = options + vm.devices['pci'][('dom0', port_id)] = options def load(self): with open(self.store, encoding='utf-8') as fh: diff --git a/qubesadmin/backup/core3.py b/qubesadmin/backup/core3.py index a9dac95a6..80efb3357 100644 --- a/qubesadmin/backup/core3.py +++ b/qubesadmin/backup/core3.py @@ -122,14 +122,14 @@ def import_core3_vm(self, element): assert bus_name is not None for node in bus_node.findall('./device'): backend_domain = node.get('backend-domain') - ident = node.get('id') + port_id = node.get('id') options = {} for opt_node in node.findall('./option'): opt_name = opt_node.get('name') options[opt_name] = opt_node.text options['required'] = device_protocol.qbool( node.get('required', 'yes')) - vm.devices[bus_name][(backend_domain, ident)] = options + vm.devices[bus_name][(backend_domain, port_id)] = options # extract base properties if vm.klass == 'AdminVM': diff --git a/qubesadmin/backup/restore.py b/qubesadmin/backup/restore.py index 3f272bfd3..cce035307 100644 --- a/qubesadmin/backup/restore.py +++ b/qubesadmin/backup/restore.py @@ -50,7 +50,7 @@ from qubesadmin.backup import BackupVM from qubesadmin.backup.core2 import Core2Qubes from qubesadmin.backup.core3 import Core3Qubes -from qubesadmin.device_protocol import DeviceAssignment, Port +from qubesadmin.device_protocol import DeviceAssignment, Port, Device from qubesadmin.exc import QubesException from qubesadmin.utils import size_to_human @@ -2086,19 +2086,19 @@ def _restore_vms_metadata(self, restore_info): tag, vm.name, err) for bus in vm.devices: - for backend_domain, ident in vm.devices[bus]: - options = vm.devices[bus][(backend_domain, ident)] + for backend_domain, port_id in vm.devices[bus]: + options = vm.devices[bus][(backend_domain, port_id)] if 'required' in options: required = options['required'] del options['required'] else: required = False assignment = DeviceAssignment( - Port( + Device(Port( backend_domain=self.app.domains[backend_domain], - ident=ident, + port_id=port_id, devclass=bus, - ), + )), options=options, mode='required' if required else 'auto-attach', ) @@ -2107,7 +2107,7 @@ def _restore_vms_metadata(self, restore_info): new_vm.devices[bus].assign(assignment) except Exception as err: # pylint: disable=broad-except self.log.error('Error assigning device %s:%s to %s: %s', - bus, ident, vm.name, err) + bus, port_id, vm.name, err) # Set VM dependencies - only non-default setting for vm in vms.values(): diff --git a/qubesadmin/device_protocol.py b/qubesadmin/device_protocol.py index 045bc3d59..d57b28670 100644 --- a/qubesadmin/device_protocol.py +++ b/qubesadmin/device_protocol.py @@ -32,7 +32,7 @@ import string import sys from enum import Enum -from typing import Optional, Dict, Any, List, Union, Tuple +from typing import Optional, Dict, Any, List, Union, Tuple, Callable import qubesadmin.exc @@ -73,76 +73,12 @@ def qbool(value): return bool(value) -class Port: - """ - Class of a *bus* device port with *ident* exposed by a *backend domain*. - - Attributes: - backend_domain (QubesVM): The domain which exposes devices, - e.g.`sys-usb`. - ident (str): A unique identifier for the port within the backend domain. - devclass (str): The class of the port (e.g., 'usb', 'pci'). - """ +class DeviceSerializer: ALLOWED_CHARS_KEY = set( - string.digits + string.ascii_letters - + r"!#$%&()*+,-./:;<>?@[\]^_{|}~") + string.digits + string.ascii_letters + + r"!#$%&()*+,-./:;<>?@[\]^_{|}~") ALLOWED_CHARS_PARAM = ALLOWED_CHARS_KEY.union(set(string.punctuation + ' ')) - def __init__(self, backend_domain, ident, devclass): - self.__backend_domain = backend_domain - self.__ident = ident - self.__devclass = devclass - - def __hash__(self): - return hash((self.backend_domain.name, self.ident, self.devclass)) - - def __eq__(self, other): - if isinstance(other, Port): - return ( - self.backend_domain == other.backend_domain and - self.ident == other.ident and - self.devclass == other.devclass - ) - raise TypeError(f"Comparing instances of 'Port' and '{type(other)}' " - "is not supported") - - def __lt__(self, other): - if isinstance(other, Port): - return (self.backend_domain.name, self.devclass, self.ident) < \ - (other.backend_domain.name, other.devclass, other.ident) - raise TypeError(f"Comparing instances of 'Port' and '{type(other)}' " - "is not supported") - - def __repr__(self): - return f"[{self.backend_domain.name}]:{self.devclass}:{self.ident}" - - def __str__(self): - return f"{self.backend_domain.name}:{self.ident}" - - @property - def ident(self) -> str: - """ - Immutable port identifier. - - Unique for given domain and devclass. - """ - return self.__ident - - @property - def backend_domain(self) -> QubesVM: - """ Which domain exposed this port. (immutable)""" - return self.__backend_domain - - @property - def devclass(self) -> str: - """ Immutable port class such like: 'usb', 'pci' etc. - - For unknown classes "peripheral" is returned. - """ - if self.__devclass: - return self.__devclass - return "peripheral" - @classmethod def unpack_properties( cls, untrusted_serialization: bytes @@ -171,24 +107,24 @@ def unpack_properties( values = [] ut_key, _, ut_rest = ut_decoded.partition("='") - key = sanitize_str( - ut_key, cls.ALLOWED_CHARS_KEY, + key = cls.sanitize_str( + ut_key.strip(), cls.ALLOWED_CHARS_KEY, error_message='Invalid chars in property name: ') keys.append(key) while "='" in ut_rest: ut_value_key, _, ut_rest = ut_rest.partition("='") ut_value, _, ut_key = ut_value_key.rpartition("' ") - value = sanitize_str( - deserialize_str(ut_value), cls.ALLOWED_CHARS_PARAM, + value = cls.sanitize_str( + cls.deserialize_str(ut_value), cls.ALLOWED_CHARS_PARAM, error_message='Invalid chars in property value: ') values.append(value) - key = sanitize_str( - ut_key, cls.ALLOWED_CHARS_KEY, + key = cls.sanitize_str( + ut_key.strip(), cls.ALLOWED_CHARS_KEY, error_message='Invalid chars in property name: ') keys.append(key) ut_value = ut_rest[:-1] # ending ' - value = sanitize_str( - deserialize_str(ut_value), cls.ALLOWED_CHARS_PARAM, + value = cls.sanitize_str( + cls.deserialize_str(ut_value), cls.ALLOWED_CHARS_PARAM, error_message='Invalid chars in property value: ') values.append(value) @@ -202,21 +138,23 @@ def unpack_properties( return properties, options @classmethod - def pack_property(cls, key: str, value: str): + def pack_property(cls, key: str, value: Optional[str]): """ Add property `key=value` to serialization. """ - key = sanitize_str( + if value is None: + return b'' + key = cls.sanitize_str( key, cls.ALLOWED_CHARS_KEY, error_message='Invalid chars in property name: ') - value = sanitize_str( - serialize_str(value), cls.ALLOWED_CHARS_PARAM, + value = cls.sanitize_str( + cls.serialize_str(value), cls.ALLOWED_CHARS_PARAM, error_message='Invalid chars in property value: ') return key.encode('ascii') + b'=' + value.encode('ascii') @staticmethod - def check_device_properties( - expected_port: 'Port', properties: Dict[str, Any]): + def parse_basic_device_properties( + expected_device: 'Device', properties: Dict[str, Any]): """ Validates properties against an expected port configuration. @@ -226,25 +164,345 @@ def check_device_properties( UnexpectedDeviceProperty: If any property does not match the expected values. """ - expected = expected_port + expected = expected_device.port exp_vm_name = expected.backend_domain.name if properties.get('backend_domain', exp_vm_name) != exp_vm_name: raise UnexpectedDeviceProperty( f"Got device exposed by {properties['backend_domain']}" f"when expected devices from {exp_vm_name}.") - properties['backend_domain'] = expected.backend_domain + properties.pop('backend_domain', None) - if properties.get('ident', expected.ident) != expected.ident: + if properties.get('port_id', expected.port_id) != expected.port_id: raise UnexpectedDeviceProperty( - f"Got device from port: {properties['ident']} " - f"when expected port: {expected.ident}.") - properties['ident'] = expected.ident - + f"Got device from port: {properties['port_id']} " + f"when expected port: {expected.port_id}.") + properties.pop('port_id', None) + + if expected.devclass == '*': + expected = Port( + expected.backend_domain, + expected.port_id, + properties.get('devclass', None)) if properties.get('devclass', expected.devclass) != expected.devclass: raise UnexpectedDeviceProperty( f"Got {properties['devclass']} device " f"when expected {expected.devclass}.") - properties['devclass'] = expected.devclass + properties.pop('devclass', None) + + expected_devid = expected_device.device_id + # device id is optional + if expected_devid != '*': + if properties.get('device_id', expected_devid) != expected_devid: + raise UnexpectedDeviceProperty( + f"Unrecognized device identity '{properties['device_id']}' " + f"expected '{expected_device.device_id}'" + ) + + properties['port'] = expected + + @staticmethod + def serialize_str(value: str): + """ + Serialize python string to ensure consistency. + """ + return "'" + str(value).replace("'", r"\'") + "'" + + @staticmethod + def deserialize_str(value: str): + """ + Deserialize python string to ensure consistency. + """ + return value.replace(r"\'", "'") + + @staticmethod + def sanitize_str( + untrusted_value: str, + allowed_chars: set, + replace_char: str = None, + error_message: str = "" + ) -> str: + """ + Sanitize given untrusted string. + + If `replace_char` is not None, ignore `error_message` and replace invalid + characters with the string. + """ + if replace_char is None: + not_allowed_chars = set(untrusted_value) - allowed_chars + if not_allowed_chars: + raise ProtocolError(error_message + repr(not_allowed_chars)) + return untrusted_value + result = "" + for char in untrusted_value: + if char in allowed_chars: + result += char + else: + result += replace_char + return result + + +class Port: + """ + Class of a *bus* device port with *port id* exposed by a *backend domain*. + + Attributes: + backend_domain (QubesVM): The domain which exposes devices, + e.g.`sys-usb`. + port_id (str): A unique identifier for the port within the backend domain. + devclass (str): The class of the port (e.g., 'usb', 'pci'). + """ + def __init__(self, backend_domain, port_id, devclass): + self.__backend_domain = backend_domain + self.__port_id = port_id + self.__devclass = devclass + + def __hash__(self): + return hash((self.backend_domain.name, self.port_id, self.devclass)) + + def __eq__(self, other): + if isinstance(other, Port): + return ( + self.backend_domain == other.backend_domain and + self.port_id == other.port_id and + self.devclass == other.devclass + ) + return False + + def __lt__(self, other): + if isinstance(other, Port): + return (self.backend_domain.name, self.devclass, self.port_id) < \ + (other.backend_domain.name, other.devclass, other.port_id) + raise TypeError(f"Comparing instances of 'Port' and '{type(other)}' " + "is not supported") + + def __repr__(self): + return f"{self.backend_domain.name}+{self.port_id}" + + def __str__(self): + return f"{self.backend_domain.name}:{self.port_id}" + + @classmethod + def from_qarg( + cls, representation: str, devclass, domains, blind=False + ) -> 'Port': + if blind: + get_domain = domains.get_blind + else: + get_domain = domains.get + return cls._parse(representation, devclass, get_domain, '+') + + @classmethod + def from_str(cls, representation: str, devclass, domains) -> 'Port': + get_domain = domains.get + return cls._parse(representation, devclass, get_domain, ':') + + @classmethod + def _parse( + cls, + representation: str, + devclass: str, + get_domain: Callable, + sep: str + ) -> 'Port': + backend_name, port_id = representation.split(sep, 1) + backend = get_domain(backend_name) + return cls(backend_domain=backend, port_id=port_id, devclass=devclass) + + @property + def port_id(self) -> str: + """ + Immutable port identifier. + + Unique for given domain and devclass. + """ + return self.__port_id + + @property + def backend_domain(self) -> QubesVM: + """ Which domain exposed this port. (immutable)""" + return self.__backend_domain + + @property + def devclass(self) -> str: + """ Immutable port class such like: 'usb', 'pci' etc. + + For unknown classes "peripheral" is returned. + """ + if self.__devclass: + return self.__devclass + return "peripheral" + + +class Device: + def __init__( + self, + port: Optional[Port] = None, + device_id: Optional[str] = None, + ): + self.port: Optional[Port] = port + self._device_id = device_id if device_id else '*' + + def clone(self, **kwargs): + """ + Clone object and substitute attributes with explicitly given. + """ + attr = { + "port": self.port, + "device_id": self.device_id, + } + attr.update(kwargs) + return self.__class__(**attr) + + @property + def port(self): + return self._port + + @port.setter + def port(self, value): + self._port = value if value is not None else '*' + + @property + def device_id(self): + return self._device_id + + @device_id.setter + def device_id(self, value): + self._device_id = value if value else '*' + + @property + def backend_domain(self): + if self.port != '*': + return self.port.backend_domain + return None + + @property + def port_id(self): + if self.port != '*': + return self.port.port_id + return None + + @property + def devclass(self): + if self.port != '*': + return self.port.devclass + return None + + def __hash__(self): + return hash((self.port, self.device_id)) + + def __eq__(self, other): + if isinstance(other, (Device, DeviceAssignment)): + result = ( + self.port == other.port + and self.device_id == other.device_id + ) + return result + if isinstance(other, Port): + return ( + self.port == other + and self.device_id == '*' + ) + return super().__eq__(other) + + def __lt__(self, other): + """ + Desired order (important for auto-attachment): + + 1. : + 2. :* + 3. *: + 4. *:* + """ + if isinstance(other, (Device, DeviceAssignment)): + if self.port == '*' and other.port != '*': + return True + if self.port != '*' and other.port == '*': + return False + reprs = {self: [self.port], other: [other.port]} + for obj in reprs: + if obj.device_id != '*': + reprs[obj].append(obj.device_id) + return reprs[self] < reprs[other] + elif isinstance(other, Port): + _other = Device(other, '*') + return self < _other + else: + raise TypeError( + f"Comparing instances of {type(self)} and '{type(other)}' " + "is not supported") + + def __repr__(self): + return f"{self.port!r}:{self.device_id}" + + def __str__(self): + return f"{self.port}:{self.device_id}" + + @classmethod + def from_qarg( + cls, + representation: str, + devclass, + domains, + blind=False, + backend=None, + ) -> 'Device': + if backend is None: + if blind: + get_domain = domains.get_blind + else: + get_domain = domains.get + else: + get_domain = None + return cls._parse(representation, devclass, get_domain, backend, '+') + + @classmethod + def from_str( + cls, representation: str, devclass: Optional[str], domains, + backend=None + ) -> 'Device': + if backend is None: + get_domain = domains.get + else: + get_domain = None + return cls._parse(representation, devclass, get_domain, backend, ':') + + @classmethod + def _parse( + cls, + representation: str, + devclass: Optional[str], + get_domain: Callable, + backend, + sep: str + ) -> 'Device': + if backend is None: + backend_name, identity = representation.split(sep, 1) + backend = get_domain(backend_name) + else: + identity = representation + port_id, _, devid = identity.partition(':') + if devid in ('', '*'): + devid = '*' + return cls( + Port(backend_domain=backend, port_id=port_id, devclass=devclass), + device_id=devid + ) + + def serialize(self) -> bytes: + """ + Serialize an object to be transmitted via Qubes API. + """ + properties = b' '.join( + DeviceSerializer.pack_property(key, value) + for key, value in ( + ('device_id', self.device_id), + ('port_id', self.port_id), + ('devclass', self.devclass))) + + properties += b' ' + DeviceSerializer.pack_property( + 'backend_domain', self.backend_domain.name) + + return properties class DeviceCategory(Enum): @@ -427,15 +685,12 @@ def _load_classes(bus: str): return result -class DeviceInfo(Port): +class DeviceInfo(Device): """ Holds all information about a device """ def __init__( self, - port: Optional[Port] = None, - backend_domain: Optional = None, - ident: Optional = None, - devclass: Optional = None, + port: Port, vendor: Optional[str] = None, product: Optional[str] = None, manufacturer: Optional[str] = None, @@ -444,12 +699,10 @@ def __init__( interfaces: Optional[List[DeviceInterface]] = None, parent: Optional[Port] = None, attachment: Optional[QubesVM] = None, - self_identity: Optional[str] = None, + device_id: Optional[str] = None, **kwargs ): - if port is None: - port = Port(backend_domain, ident, devclass) - super().__init__(port.backend_domain, port.ident, port.devclass) + super().__init__(port, device_id) self._vendor = vendor self._product = product @@ -459,43 +712,9 @@ def __init__( self._interfaces = interfaces self._parent = parent self._attachment = attachment - self._self_identity = self_identity self.data = kwargs - def __hash__(self): - return hash(self.port)# self.self_identity)) - - def __eq__(self, other): - if isinstance(other, DeviceInfo): - return ( - self.port == other.port - # and self.self_identity == other.self_identity - ) - else: - return super().__lt__(other) - - def __lt__(self, other): - if isinstance(other, DeviceInfo): - # return (self.port, self.self_identity) < \ - # (other.port, other.self_identity) - return self.port < other.port - else: - return super().__lt__(other) - - def __repr__(self): - return f"{self.port!r}"#:{self.self_identity}" - - def __str__(self): - return f"{self.port}"#:{self.self_identity}" - - @property - def port(self) -> Port: - """ - Device port visible in Qubes. - """ - return Port(self.backend_domain, self.ident, self.devclass) - @property def vendor(self) -> str: """ @@ -606,7 +825,7 @@ def interfaces(self) -> List[DeviceInterface]: return self._interfaces @property - def parent_device(self) -> Optional[Port]: + def parent_device(self) -> Optional[Device]: """ The parent device, if any. @@ -616,7 +835,7 @@ def parent_device(self) -> Optional[Port]: return self._parent @property - def subdevices(self) -> List['DeviceInfo']: + def subdevices(self) -> List[Device]: """ The list of children devices if any. @@ -624,7 +843,7 @@ def subdevices(self) -> List['DeviceInfo']: the subdevices id should be here. """ return [dev for dev in self.backend_domain.devices[self.devclass] - if dev.parent_device.ident == self.ident] + if dev.parent_device.port.port_id == self.port_id] @property def attachment(self) -> Optional[QubesVM]: @@ -637,35 +856,32 @@ def serialize(self) -> bytes: """ Serialize an object to be transmitted via Qubes API. """ - # 'backend_domain', 'attachment', 'interfaces', 'data', 'parent_device' + properties = Device.serialize(self) + # 'attachment', 'interfaces', 'data', 'parent_device' # are not string, so they need special treatment - default_attrs = { - 'ident', 'devclass', 'vendor', 'product', 'manufacturer', 'name', - 'serial', 'self_identity'} - properties = b' '.join( - self.pack_property(key, value) - for key, value in ((key, getattr(self, key)) - for key in default_attrs)) - - properties += b' ' + self.pack_property( - 'backend_domain', self.backend_domain.name) + default = DeviceInfo(self.port) + default_attrs = {'vendor', 'product', 'manufacturer', 'name', 'serial'} + properties += b' ' + b' '.join( + DeviceSerializer.pack_property(key, value) for key, value in ( + (key, getattr(self, key)) for key in default_attrs + if getattr(self, key) != getattr(default, key))) if self.attachment: - properties = self.pack_property( + properties = DeviceSerializer.pack_property( 'attachment', self.attachment.name) - properties += b' ' + self.pack_property( + properties += b' ' + DeviceSerializer.pack_property( 'interfaces', ''.join(repr(ifc) for ifc in self.interfaces)) if self.parent_device is not None: - properties += b' ' + self.pack_property( - 'parent_ident', self.parent_device.ident) - properties += b' ' + self.pack_property( + properties += b' ' + DeviceSerializer.pack_property( + 'parent_port_id', self.parent_device.port_id) + properties += b' ' + DeviceSerializer.pack_property( 'parent_devclass', self.parent_device.devclass) for key, value in self.data.items(): - properties += b' ' + self.pack_property("_" + key, value) + properties += b' ' + DeviceSerializer.pack_property("_" + key, value) return properties @@ -679,23 +895,16 @@ def deserialize( """ Recovers a serialized object, see: :py:meth:`serialize`. """ - identity, _, rest = serialization.partition(b' ') - identity = identity.decode('ascii', errors='ignore') - ident, devid = identity.split(':', 1) - if devid == 'None': # TODO - devid = None - device = UnknownDevice( - backend_domain=expected_backend_domain, - ident=ident, - devclass=expected_devclass, - self_identity=devid - ) + head, _, rest = serialization.partition(b' ') + device = Device.from_str( + head.decode('ascii', errors='ignore'), expected_devclass, + domains=None, backend=expected_backend_domain) try: device = cls._deserialize(rest, device) # pylint: disable=broad-exception-caught except Exception as exc: - print(exc, file=sys.stderr) + device = UnknownDevice.from_device(device) return device @@ -703,15 +912,17 @@ def deserialize( def _deserialize( cls, untrusted_serialization: bytes, - expected_device: 'DeviceInfo' + expected_device: Device ) -> 'DeviceInfo': """ Actually deserializes the object. """ - properties, options = cls.unpack_properties(untrusted_serialization) + properties, options = DeviceSerializer.unpack_properties( + untrusted_serialization) properties.update(options) - cls.check_device_properties(expected_device, properties) + DeviceSerializer.parse_basic_device_properties( + expected_device, properties) if 'attachment' not in properties or not properties['attachment']: properties['attachment'] = None @@ -720,18 +931,6 @@ def _deserialize( properties['attachment'] = app.domains.get_blind( properties['attachment']) - if properties['devclass'] != expected_device.devclass: - raise UnexpectedDeviceProperty( - f"Got {properties['devclass']} device " - f"when expected {expected_device.devclass}.") - - if (expected_device.self_identity is not None and - properties['self_identity'] != expected_device.self_identity): - raise UnexpectedDeviceProperty( - f"Unrecognized device identity '{properties['self_identity']}' " - f"expected '{expected_device.self_identity}'" - ) - if 'interfaces' in properties: interfaces = properties['interfaces'] interfaces = [ @@ -742,24 +941,16 @@ def _deserialize( if 'parent_ident' in properties: properties['parent'] = Port( backend_domain=expected_device.backend_domain, - ident=properties['parent_ident'], + port_id=properties['parent_ident'], devclass=properties['parent_devclass'], ) del properties['parent_ident'] del properties['parent_devclass'] - port = Port( - properties['backend_domain'], - properties['ident'], - properties['devclass']) - del properties['backend_domain'] - del properties['ident'] - del properties['devclass'] - - return cls(port, **properties) + return cls(**properties) @property - def self_identity(self) -> str: + def device_id(self) -> str: """ Get additional identification of device presented by device itself. @@ -774,58 +965,22 @@ def self_identity(self) -> str: to be plugged to the same port). For a common user it is all the data she uses to recognize the device. """ - if not self._self_identity: + if not self._device_id: return "0000:0000::?******" - return self._self_identity - - -def serialize_str(value: str): - """ - Serialize python string to ensure consistency. - """ - return "'" + str(value).replace("'", r"\'") + "'" - - -def deserialize_str(value: str): - """ - Deserialize python string to ensure consistency. - """ - return value.replace(r"\'", "'") - + return self._device_id -def sanitize_str( - untrusted_value: str, - allowed_chars: set, - replace_char: str = None, - error_message: str = "" -) -> str: - """ - Sanitize given untrusted string. - - If `replace_char` is not None, ignore `error_message` and replace invalid - characters with the string. - """ - if replace_char is None: - not_allowed_chars = set(untrusted_value) - allowed_chars - if not_allowed_chars: - raise ProtocolError(error_message + repr(not_allowed_chars)) - return untrusted_value - result = "" - for char in untrusted_value: - if char in allowed_chars: - result += char - else: - result += replace_char - return result + @device_id.setter + def device_id(self, value): + # Do not auto-override value like in super class + self._device_id = value class UnknownDevice(DeviceInfo): # pylint: disable=too-few-public-methods """Unknown device - for example, exposed by domain not running currently""" - - def __init__(self, backend_domain, ident, *, devclass, **kwargs): - port = Port(backend_domain, ident, devclass) - super().__init__(port, **kwargs) + @staticmethod + def from_device(device) -> 'UnknownDevice': + return UnknownDevice(device.port, device_id=device.device_id) class AssignmentMode(Enum): @@ -835,7 +990,7 @@ class AssignmentMode(Enum): REQUIRED = "required" -class DeviceAssignment(Port): +class DeviceAssignment: """ Maps a device to a frontend_domain. There are 3 flags `attached`, `automatically_attached` and `required`. @@ -857,56 +1012,95 @@ class DeviceAssignment(Port): def __init__( self, - port: Optional[Port] = None, - backend_domain: Optional = None, - ident: Optional = None, - devclass: Optional = None, - device_identity=None, + device: Device, frontend_domain=None, options=None, mode: Union[str, AssignmentMode] = "manual", ): - if port is None: - port = Port(backend_domain, ident, devclass) - super().__init__(port.backend_domain, port.ident, port.devclass) + if isinstance(device, DeviceInfo): + device = Device(device.port, device.device_id) + self._device_ident = device self.__options = options or {} if isinstance(mode, AssignmentMode): self.mode = mode else: self.mode = AssignmentMode(mode) self.frontend_domain = frontend_domain - self.device_identity = device_identity def clone(self, **kwargs): """ Clone object and substitute attributes with explicitly given. """ - port = kwargs.get( - "port", Port(self.backend_domain, self.ident, self.devclass)) + kwargs["device"] = kwargs.get( + "device", Device( + Port(self.backend_domain, self.port_id, self.devclass), + self.device_id + )) attr = { "options": self.options, "mode": self.mode, - "device_identity": self.device_identity, "frontend_domain": self.frontend_domain, } attr.update(kwargs) - return self.__class__(port, **attr) + return self.__class__(**attr) + + def __repr__(self): + return f"{self._device_ident!r}" + + def __str__(self): + return f"{self._device_ident}" + + def __hash__(self): + return hash(self._device_ident) + + def __eq__(self, other): + if isinstance(other, (Device, DeviceAssignment)): + result = ( + self.port == other.port + and self.device_id == other.device_id + ) + return result + return False + + def __lt__(self, other): + if isinstance(other, DeviceAssignment): + return self._device_ident < other._device_ident + if isinstance(other, Device): + return self._device_ident < other + raise TypeError( + f"Comparing instances of {type(self)} and '{type(other)}' " + "is not supported") + + @property + def backend_domain(self): + return self._device_ident.port.backend_domain + + @property + def port_id(self): + return self._device_ident.port.port_id + + @property + def devclass(self): + return self._device_ident.port.devclass + + @property + def device_id(self): + return self._device_ident.device_id @property def device(self) -> DeviceInfo: """Get DeviceInfo object corresponding to this DeviceAssignment""" - dev = self.backend_domain.devices[self.devclass][self.ident] + dev = self.backend_domain.devices[self.devclass][self.port_id] # TODO: device identity could not match - # if (self.device_identity is not None - # and self.device_identity != dev.self_identity): - # raise ProtocolError( - # "Device identity does not match, expected " - # f"'{self.device_identity}' got '{dev.self_identity}'") - # TODO - # return UnknownDevice( - # self.backend_domain, self.ident, devclass=self.devclass) return dev + @property + def port(self) -> Port: + """ + Device port visible in Qubes. + """ + return Port(self.backend_domain, self.port_id, self.devclass) + @property def frontend_domain(self) -> Optional[QubesVM]: """ Which domain the device is attached/assigned to. """ @@ -938,10 +1132,6 @@ def required(self) -> bool: """ return self.mode == AssignmentMode.REQUIRED - @required.setter - def required(self, required: bool): - self.mode = AssignmentMode.REQUIRED - @property def attach_automatically(self) -> bool: """ @@ -954,10 +1144,6 @@ def attach_automatically(self) -> bool: AssignmentMode.REQUIRED ) - @attach_automatically.setter - def attach_automatically(self, attach_automatically: bool): - self.mode = AssignmentMode.AUTO - @property def options(self) -> Dict[str, Any]: """ Device options (same as in the legacy API). """ @@ -972,23 +1158,15 @@ def serialize(self) -> bytes: """ Serialize an object to be transmitted via Qubes API. """ - properties = b' '.join( - self.pack_property(key, value) - for key, value in ( - ('mode', self.mode.value), - ('device_identity', self.device_identity), - ('ident', self.ident), - ('devclass', self.devclass))) - - properties += b' ' + self.pack_property( - 'backend_domain', self.backend_domain.name) - + properties = self._device_ident.serialize() + properties += b' ' + DeviceSerializer.pack_property( + 'mode', self.mode.value) if self.frontend_domain is not None: - properties += b' ' + self.pack_property( + properties += b' ' + DeviceSerializer.pack_property( 'frontend_domain', self.frontend_domain.name) for key, value in self.options.items(): - properties += b' ' + self.pack_property("_" + key, value) + properties += b' ' + DeviceSerializer.pack_property("_" + key, value) return properties @@ -996,42 +1174,35 @@ def serialize(self) -> bytes: def deserialize( cls, serialization: bytes, - expected_port: Port, - expected_identity: Optional[str], + expected_device: Device, ) -> 'DeviceAssignment': """ Recovers a serialized object, see: :py:meth:`serialize`. """ try: - result = cls._deserialize( - serialization, expected_port, expected_identity) + result = cls._deserialize(serialization, expected_device) except Exception as exc: - raise ProtocolError() from exc + raise ProtocolError(str(exc)) from exc return result @classmethod def _deserialize( cls, untrusted_serialization: bytes, - expected_port: Port, - expected_identity: Optional[str], + expected_device: Device, ) -> 'DeviceAssignment': """ Actually deserializes the object. """ - properties, options = cls.unpack_properties(untrusted_serialization) + properties, options = DeviceSerializer.unpack_properties( + untrusted_serialization) properties['options'] = options - import sys; print(f'{expected_identity=}', f'{expected_port=}', file=sys.stderr) # TODO debug - cls.check_device_properties(expected_port, properties) - del properties['backend_domain'] - del properties['ident'] - del properties['devclass'] + DeviceSerializer.parse_basic_device_properties( + expected_device, properties) + # we do not need port, we need device + del properties['port'] + properties.pop('device_id', None) + properties['device'] = expected_device - assignment = cls(expected_port, **properties) - if (expected_identity - and assignment.device.self_identity != expected_identity): - raise UnexpectedDeviceProperty( - f"Got device with identity {assignment.device.self_identity}" - f"when expected devices with identity {expected_identity}.") - return assignment + return cls(**properties) diff --git a/qubesadmin/devices.py b/qubesadmin/devices.py index 5063a0b81..55a8d1842 100644 --- a/qubesadmin/devices.py +++ b/qubesadmin/devices.py @@ -28,7 +28,7 @@ Devices can be of different classes (like 'pci', 'usb', etc.). Each device class is implemented by an extension. -Devices are identified by pair of (backend domain, `ident`), where `ident` is +Devices are identified by pair of (backend domain, `port_id`), where `port_id` is :py:class:`str`. """ import itertools @@ -36,7 +36,7 @@ class is implemented by an extension. import qubesadmin.exc from qubesadmin.device_protocol import (Port, DeviceInfo, UnknownDevice, - DeviceAssignment) + DeviceAssignment, Device) class DeviceCollection: @@ -115,8 +115,7 @@ def _add(self, assignment: DeviceAssignment, action: str) -> None: self._vm.qubesd_call( None, f'admin.vm.device.{self._class}.{action.capitalize()}', - f'{assignment.backend_domain!s}+{assignment.ident!s}', - assignment.serialize() + repr(assignment), assignment.serialize() ) def _remove(self, assignment: DeviceAssignment, action: str) -> None: @@ -135,7 +134,7 @@ def _remove(self, assignment: DeviceAssignment, action: str) -> None: self._vm.qubesd_call( None, f'admin.vm.device.{self._class}.{action.capitalize()}', - f'{assignment.backend_domain!s}+{assignment.ident!s}' + repr(assignment) ) def get_dedicated_devices(self) -> Iterable[DeviceAssignment]: @@ -153,19 +152,12 @@ def get_attached_devices(self) -> Iterable[DeviceAssignment]: assignments_str = self._vm.qubesd_call( None, 'admin.vm.device.{}.Attached'.format(self._class)).decode() for assignment_str in assignments_str.splitlines(): - device, _, untrusted_rest = assignment_str.partition(' ') - backend_domain_name, identity = device.split('+', 1) - ident, devid = identity.split(':', 1) - if devid == 'None': # TODO - devid = None - backend_domain = self._vm.app.domains.get_blind(backend_domain_name) - import sys; print(f"{identity=}, {ident=}, {devid=}", file=sys.stderr) # TODO debug + head, _, untrusted_rest = assignment_str.partition(' ') + device = Device.from_qarg( + head, self._class, self._vm.app.domains, blind=True) yield DeviceAssignment.deserialize( - untrusted_rest.encode('ascii'), - expected_port=Port(backend_domain, ident, self._class), - expected_identity=devid, - ) + untrusted_rest.encode('ascii'), expected_device=device) def get_assigned_devices( self, required_only: bool = False @@ -178,18 +170,12 @@ def get_assigned_devices( assignments_str = self._vm.qubesd_call( None, 'admin.vm.device.{}.Assigned'.format(self._class)).decode() for assignment_str in assignments_str.splitlines(): - device, _, untrusted_rest = assignment_str.partition(' ') - backend_domain_name, identity = device.split('+', 1) - ident, devid = identity.split(':', 1) - if devid == 'None': # TODO - devid = None - backend_domain = self._vm.app.domains.get_blind(backend_domain_name) + head, _, untrusted_rest = assignment_str.partition(' ') + device = Device.from_qarg( + head, self._class, self._vm.app.domains, blind=True) assignment = DeviceAssignment.deserialize( - untrusted_rest.encode('ascii'), - expected_port=Port(backend_domain, ident, self._class), - expected_identity=devid, - ) + untrusted_rest.encode('ascii'), expected_device=device) if not required_only or assignment.required: yield assignment @@ -219,8 +205,7 @@ def update_assignment(self, device: Port, required: Optional[bool]): self._vm.qubesd_call( None, 'admin.vm.device.{}.Set.assignment'.format(self._class), - '{!s}+{!s}'.format(device.backend_domain, device.ident), - repr(required).encode('utf-8') + repr(device), repr(required).encode('utf-8') ) __iter__ = get_exposed_devices @@ -232,7 +217,7 @@ def clear_cache(self): self._dev_cache.clear() def __getitem__(self, item): - """Get device object with given ident. + """Get device object with given port_id. :returns: py:class:`DeviceInfo` @@ -246,12 +231,12 @@ def __getitem__(self, item): return self._dev_cache[item] # then look for available devices for dev in self.get_exposed_devices(): - if dev.ident == item: + if dev.port_id == item: self._dev_cache[item] = dev return dev # if still nothing, return UnknownDevice instance for the reason # explained in docstring, but don't cache it - return UnknownDevice(self._vm, item, devclass=self._class) + return UnknownDevice(Port(self._vm, item, devclass=self._class)) class DeviceManager(dict): diff --git a/qubesadmin/events/__init__.py b/qubesadmin/events/__init__.py index 3631b2513..0ce1c0952 100644 --- a/qubesadmin/events/__init__.py +++ b/qubesadmin/events/__init__.py @@ -232,9 +232,9 @@ def handle(self, subject, event, **kwargs): if event.startswith('device-') and 'device' in kwargs: try: devclass = event.split(':', 1)[1] - backend_domain, ident = kwargs['device'].split(':', 1) + backend_domain, port_id = kwargs['device'].split(':', 1) kwargs['device'] = self.app.domains.get_blind(backend_domain)\ - .devices[devclass][ident] + .devices[devclass][port_id] except (KeyError, ValueError): pass # invalidate cache if needed; call it before other handlers diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index 7aa10e4be..b5afedcbc 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -735,23 +735,22 @@ def test_043_clone_devices(self): self.app.expected_calls[ ('test-vm', 'admin.vm.device.pci.Assigned', None, None)] = \ - (b"0\0test-vm2+dev1 ident='dev1' devclass='pci' " - b"backend_domain='test-vm2' attach_automatically='yes' " + (b"0\0test-vm2+dev1 port_id='dev1' devclass='pci' " + b"backend_domain='test-vm2' mode='auto-attach' " b"_ro='yes'\n" - b"test-vm3+dev2 ident='dev2' devclass='pci' " - b"backend_domain='test-vm3' attach_automatically='yes' " - b"required='yes'\n") + b"test-vm3+dev2 port_id='dev2' devclass='pci' " + b"backend_domain='test-vm3' mode='required'\n") self.app.expected_calls[ - ('new-name', 'admin.vm.device.pci.Assign', 'test-vm2+dev1', - b"required='no' attach_automatically='yes' ident='dev1' " - b"devclass='pci' backend_domain='test-vm2' " + ('new-name', 'admin.vm.device.pci.Assign', 'test-vm2+dev1:*', + b"device_id='*' port_id='dev1' " + b"devclass='pci' backend_domain='test-vm2' mode='auto-attach' " b"frontend_domain='new-name' _ro='yes'")] = b'0\0' self.app.expected_calls[ - ('new-name', 'admin.vm.device.pci.Assign', 'test-vm3+dev2', - b"required='yes' attach_automatically='yes' ident='dev2' " - b"devclass='pci' backend_domain='test-vm3' " + ('new-name', 'admin.vm.device.pci.Assign', 'test-vm3+dev2:*', + b"device_id='*' port_id='dev2' " + b"devclass='pci' backend_domain='test-vm3' mode='required' " b"frontend_domain='new-name'")] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name') @@ -783,23 +782,22 @@ def test_044_clone_devices_fail(self): self.app.expected_calls[ ('test-vm', 'admin.vm.device.pci.Assigned', None, None)] = \ - (b"0\0test-vm2+dev1 ident='dev1' devclass='pci' " - b"backend_domain='test-vm2' attach_automatically='yes' " + (b"0\0test-vm2+dev1 port_id='dev1' devclass='pci' " + b"backend_domain='test-vm2' mode='auto-attach' " b"_ro='yes'\n" - b"test-vm3+dev2 ident='dev2' devclass='pci' " - b"backend_domain='test-vm3' attach_automatically='yes' " - b"required='yes'\n") + b"test-vm3+dev2 port_id='dev2' devclass='pci' " + b"backend_domain='test-vm3' mode='required'\n") self.app.expected_calls[ - ('new-name', 'admin.vm.device.pci.Assign', 'test-vm2+dev1', - b"required='no' attach_automatically='yes' ident='dev1' " - b"devclass='pci' backend_domain='test-vm2' " + ('new-name', 'admin.vm.device.pci.Assign', 'test-vm2+dev1:*', + b"device_id='*' port_id='dev1' " + b"devclass='pci' backend_domain='test-vm2' mode='auto-attach' " b"frontend_domain='new-name' _ro='yes'")] = b'0\0' self.app.expected_calls[ - ('new-name', 'admin.vm.device.pci.Assign', 'test-vm3+dev2', - b"required='yes' attach_automatically='yes' ident='dev2' " - b"devclass='pci' backend_domain='test-vm3' " + ('new-name', 'admin.vm.device.pci.Assign', 'test-vm3+dev2:*', + b"device_id='*' port_id='dev2' " + b"devclass='pci' backend_domain='test-vm3' mode='required' " b"frontend_domain='new-name'")] = \ b'2\0QubesException\0\0something happened\0' diff --git a/qubesadmin/tests/backup/backupcompatibility.py b/qubesadmin/tests/backup/backupcompatibility.py index c629247c9..453c309f4 100644 --- a/qubesadmin/tests/backup/backupcompatibility.py +++ b/qubesadmin/tests/backup/backupcompatibility.py @@ -1433,14 +1433,14 @@ def setup_expected_calls(self, parsed_qubes_xml, templates_map=None): str(value).encode() if value is not None else b'')] = b'0\0' for bus, devices in vm['devices'].items(): - for (backend_domain, ident), options in devices.items(): + for (backend_domain, port_id), options in devices.items(): encoded_options = \ - (f"required='yes' attach_automatically='yes' " - f"ident='{ident}' devclass='{bus}' backend_domain='{backend_domain}'" + (f"mode='required' port_id='{port_id}' devclass='{bus}'" + f" backend_domain='{backend_domain}'" f" frontend_domain='{name}'".encode()) self.app.expected_calls[ (name, 'admin.vm.device.{}.Assign'.format(bus), - '{}+{}'.format(backend_domain, ident), + '{}+{}'.format(backend_domain, port_id), encoded_options)] = b'0\0' for feature, value in vm['features'].items(): diff --git a/qubesadmin/tests/devices.py b/qubesadmin/tests/devices.py index 49fae93f4..2965866b9 100644 --- a/qubesadmin/tests/devices.py +++ b/qubesadmin/tests/devices.py @@ -21,10 +21,13 @@ import qubesadmin.tests import qubesadmin.device_protocol +from qubesadmin.device_protocol import (DeviceAssignment, Port, Device, + DeviceInfo, UnknownDevice) + serialized_test_device = ( - b"0\0dev1 ident='dev1' devclass='test' vendor='itl' product='test-device' " - b"manufacturer='itl' backend_domain='test-vm' interfaces='?******' ") + b"0\0dev1 port_id='dev1' devclass='test' vendor='itl' product='test-device'" + b" manufacturer='itl' backend_domain='test-vm' interfaces='?******' ") class TC_00_DeviceCollection(qubesadmin.tests.QubesTestCase): @@ -43,9 +46,9 @@ def test_000_available(self): devices = list(self.vm.devices['test'].get_exposed_devices()) self.assertEqual(len(devices), 1) dev = devices[0] - self.assertIsInstance(dev, qubesadmin.device_protocol.DeviceInfo) + self.assertIsInstance(dev, DeviceInfo) self.assertEqual(dev.backend_domain, self.vm) - self.assertEqual(dev.ident, 'dev1') + self.assertEqual(dev.port_id, 'dev1') self.assertEqual( dev.description, '?******: unknown vendor unknown test device') self.assertEqual(dev.data, {}) @@ -59,9 +62,9 @@ def test_001_available_desc(self): devices = list(self.vm.devices['test'].get_exposed_devices()) self.assertEqual(len(devices), 1) dev = devices[0] - self.assertIsInstance(dev, qubesadmin.device_protocol.DeviceInfo) + self.assertIsInstance(dev, DeviceInfo) self.assertEqual(dev.backend_domain, self.vm) - self.assertEqual(dev.ident, 'dev1') + self.assertEqual(dev.port_id, 'dev1') self.assertEqual(dev.description, '?******: itl test-device') self.assertEqual(dev.data, {}) self.assertEqual(str(dev.port), 'test-vm:dev1') @@ -73,9 +76,9 @@ def test_002_available_options(self): devices = list(self.vm.devices['test'].get_exposed_devices()) self.assertEqual(len(devices), 1) dev = devices[0] - self.assertIsInstance(dev, qubesadmin.device_protocol.DeviceInfo) + self.assertIsInstance(dev, DeviceInfo) self.assertEqual(dev.backend_domain, self.vm) - self.assertEqual(dev.ident, 'dev1') + self.assertEqual(dev.port_id, 'dev1') self.assertEqual(dev.description, '?******: itl test-device') self.assertEqual(dev.data, {'ro': 'True', 'other': '123'}) self.assertEqual(str(dev.port), 'test-vm:dev1') @@ -86,9 +89,9 @@ def test_010_getitem(self): ('test-vm', 'admin.vm.device.test.Available', None, None)] = \ serialized_test_device + b"\n" dev = self.vm.devices['test']['dev1'] - self.assertIsInstance(dev, qubesadmin.device_protocol.DeviceInfo) + self.assertIsInstance(dev, DeviceInfo) self.assertEqual(dev.backend_domain, self.vm) - self.assertEqual(dev.ident, 'dev1') + self.assertEqual(dev.port_id, 'dev1') self.assertEqual(dev.description, '?******: itl test-device') self.assertEqual(dev.data, {}) self.assertEqual(str(dev.port), 'test-vm:dev1') @@ -99,9 +102,9 @@ def test_011_getitem_missing(self): ('test-vm', 'admin.vm.device.test.Available', None, None)] = \ serialized_test_device + b"\n" dev = self.vm.devices['test']['dev2'] - self.assertIsInstance(dev, qubesadmin.device_protocol.UnknownDevice) + self.assertIsInstance(dev, UnknownDevice) self.assertEqual(dev.backend_domain, self.vm) - self.assertEqual(dev.ident, 'dev2') + self.assertEqual(dev.port_id, 'dev2') self.assertEqual(dev.description, '?******: unknown vendor unknown test device') self.assertEqual(dev.data, {}) @@ -110,27 +113,27 @@ def test_011_getitem_missing(self): def test_020_attach(self): self.app.expected_calls[ - ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1', - b"required='no' attach_automatically='no' ident='dev1' " - b"devclass='test' backend_domain='test-vm2' " + ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1:*', + b"device_id='*' port_id='dev1' devclass='test' " + b"backend_domain='test-vm2' mode='manual' " b"frontend_domain='test-vm'")] = \ b'0\0' - assign = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( - self.app.domains['test-vm2'], 'dev1', devclass='test',)) + assign = DeviceAssignment( + Device(Port( + self.app.domains['test-vm2'], 'dev1', devclass='test',))) self.vm.devices['test'].attach(assign) self.assertAllCalled() def test_021_attach_options(self): self.app.expected_calls[ - ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1', - b"required='no' attach_automatically='no' ident='dev1' " - b"devclass='test' backend_domain='test-vm2' " + ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1:*', + b"device_id='*' port_id='dev1' devclass='test' " + b"backend_domain='test-vm2' mode='manual' " b"frontend_domain='test-vm' _ro='True' " b"_something='value'")] = b'0\0' - assign = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( - self.app.domains['test-vm2'], 'dev1', devclass='test')) + assign = DeviceAssignment( + Device(Port( + self.app.domains['test-vm2'], 'dev1', devclass='test'))) assign.options['ro'] = True assign.options['something'] = 'value' self.vm.devices['test'].attach(assign) @@ -138,26 +141,26 @@ def test_021_attach_options(self): def test_022_attach_required(self): self.app.expected_calls[ - ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1', - b"required='yes' attach_automatically='yes' ident='dev1' " - b"devclass='test' backend_domain='test-vm2' " + ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1:*', + b"device_id='*' port_id='dev1' devclass='test' " + b"backend_domain='test-vm2' mode='required' " b"frontend_domain='test-vm'")] = b'0\0' - assign = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( - self.app.domains['test-vm2'], 'dev1', devclass='test'), + assign = DeviceAssignment( + Device(Port( + self.app.domains['test-vm2'], 'dev1', devclass='test')), mode='required') self.vm.devices['test'].attach(assign) self.assertAllCalled() def test_023_attach_required_options(self): self.app.expected_calls[ - ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1', - b"required='yes' attach_automatically='yes' ident='dev1' " - b"devclass='test' backend_domain='test-vm2' " + ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1:*', + b"device_id='*' port_id='dev1' devclass='test' " + b"backend_domain='test-vm2' mode='required' " b"frontend_domain='test-vm' _ro='True'")] = b'0\0' - assign = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( - self.app.domains['test-vm2'], 'dev1', devclass='test'), + assign = DeviceAssignment( + Device(Port( + self.app.domains['test-vm2'], 'dev1', devclass='test')), mode='required') assign.options['ro'] = True self.vm.devices['test'].attach(assign) @@ -165,38 +168,38 @@ def test_023_attach_required_options(self): def test_030_detach(self): self.app.expected_calls[ - ('test-vm', 'admin.vm.device.test.Detach', 'test-vm2+dev1', + ('test-vm', 'admin.vm.device.test.Detach', 'test-vm2+dev1:*', None)] = b'0\0' - assign = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( - self.app.domains['test-vm2'], 'dev1', devclass='test')) + assign = DeviceAssignment( + Device(Port( + self.app.domains['test-vm2'], 'dev1', devclass='test'))) self.vm.devices['test'].detach(assign) self.assertAllCalled() def test_040_dedicated(self): self.app.expected_calls[ ('test-vm', 'admin.vm.device.test.Attached', None, None)] = \ - (b"0\0test-vm2+dev1 backend_domain='test-vm2' ident='dev1' " - b"attach_automatically='no' required='no' devclass='test' " + (b"0\0test-vm2+dev1 backend_domain='test-vm2' port_id='dev1' " + b"mode='manual' devclass='test' " b"frontend_domain='test-vm'\n") self.app.expected_calls[ ('test-vm', 'admin.vm.device.test.Assigned', None, None)] = \ (b"0\0test-vm3+dev2 backend_domain='test-vm3' devclass='test' " - b"ident='dev2' attach_automatically='yes' required='yes' " + b"port_id='dev2' mode='required' " b"frontend_domain='test-vm'\n") self.app.expected_calls[ ('test-vm2', 'admin.vm.device.test.Available', None, None)] = \ - b'0\0dev1 description=desc\n' + b"0\0dev1 description='desc'\n" self.app.expected_calls[ ('test-vm3', 'admin.vm.device.test.Available', None, None)] = \ - b'0\0dev2 description=desc\n' + b"0\0dev2 description='desc'\n" dedicated = sorted(list( self.vm.devices['test'].get_dedicated_devices())) self.assertEqual(len(dedicated), 2) - self.assertIsInstance(dedicated[0], qubesadmin.device_protocol.DeviceAssignment) + self.assertIsInstance(dedicated[0], DeviceAssignment) self.assertEqual(dedicated[0].backend_domain, self.app.domains['test-vm2']) - self.assertEqual(dedicated[0].ident, 'dev1') + self.assertEqual(dedicated[0].port_id, 'dev1') self.assertEqual(dedicated[0].frontend_domain, self.app.domains['test-vm']) self.assertEqual(dedicated[0].options, {}) @@ -204,10 +207,10 @@ def test_040_dedicated(self): self.assertEqual(dedicated[0].device, self.app.domains['test-vm2'].devices['test']['dev1']) - self.assertIsInstance(dedicated[1], qubesadmin.device_protocol.DeviceAssignment) + self.assertIsInstance(dedicated[1], DeviceAssignment) self.assertEqual(dedicated[1].backend_domain, self.app.domains['test-vm3']) - self.assertEqual(dedicated[1].ident, 'dev2') + self.assertEqual(dedicated[1].port_id, 'dev2') self.assertEqual(dedicated[1].frontend_domain, self.app.domains['test-vm']) self.assertEqual(dedicated[1].options, {}) @@ -220,31 +223,31 @@ def test_040_dedicated(self): def test_041_assignments_options(self): self.app.expected_calls[ ('test-vm', 'admin.vm.device.test.Attached', None, None)] = \ - (b"0\0test-vm2+dev1 backend_domain='test-vm2' ident='dev1' " - b"attach_automatically='no' required='no' devclass='test' " + (b"0\0test-vm2+dev1 backend_domain='test-vm2' port_id='dev1' " + b"mode='manual' devclass='test' " b"frontend_domain='test-vm' _ro='True'\n") self.app.expected_calls[ ('test-vm', 'admin.vm.device.test.Assigned', None, None)] = \ (b"0\0test-vm3+dev2 backend_domain='test-vm3' devclass='test' " - b"ident='dev2' attach_automatically='yes' required='yes' " + b"port_id='dev2' mode='required' " b"frontend_domain='test-vm' _ro='False'\n") assigns = sorted(list( self.vm.devices['test'].get_dedicated_devices())) self.assertEqual(len(assigns), 2) - self.assertIsInstance(assigns[0], qubesadmin.device_protocol.DeviceAssignment) + self.assertIsInstance(assigns[0], DeviceAssignment) self.assertEqual(assigns[0].backend_domain, self.app.domains['test-vm2']) - self.assertEqual(assigns[0].ident, 'dev1') + self.assertEqual(assigns[0].port_id, 'dev1') self.assertEqual(assigns[0].frontend_domain, self.app.domains['test-vm']) self.assertEqual(assigns[0].options, {'ro': 'True'}) self.assertEqual(assigns[0].required, False) self.assertEqual(assigns[0].devclass, 'test') - self.assertIsInstance(assigns[1], qubesadmin.device_protocol.DeviceAssignment) + self.assertIsInstance(assigns[1], DeviceAssignment) self.assertEqual(assigns[1].backend_domain, self.app.domains['test-vm3']) - self.assertEqual(assigns[1].ident, 'dev2') + self.assertEqual(assigns[1].port_id, 'dev2') self.assertEqual(assigns[1].frontend_domain, self.app.domains['test-vm']) self.assertEqual(assigns[1].options, {'ro': 'False'}) @@ -256,62 +259,75 @@ def test_041_assignments_options(self): def test_050_required(self): self.app.expected_calls[ ('test-vm', 'admin.vm.device.test.Assigned', None, None)] = \ - (b"0\0test-vm2+dev1 backend_domain='test-vm2' ident='dev1' " - b"attach_automatically='no' required='no'\n" + (b"0\0test-vm2+dev1 backend_domain='test-vm2' port_id='dev1' " + b"mode='manual'\n" b"test-vm3+dev2 backend_domain='test-vm3' " - b"ident='dev2' attach_automatically='yes' required='yes'\n") + b"port_id='dev2' mode='required'\n") devs = list(self.vm.devices['test'].get_assigned_devices( required_only=True)) self.assertEqual(len(devs), 1) - self.assertIsInstance(devs[0], qubesadmin.device_protocol.DeviceAssignment) + self.assertIsInstance(devs[0], DeviceAssignment) self.assertEqual(devs[0].backend_domain, self.app.domains['test-vm3']) - self.assertEqual(devs[0].ident, 'dev2') + self.assertEqual(devs[0].port_id, 'dev2') self.assertAllCalled() def test_060_attached(self): self.app.expected_calls[ ('test-vm', 'admin.vm.device.test.Attached', None, None)] = \ - (b"0\0test-vm2+dev1 backend_domain='test-vm2' ident='dev1' " - b"attach_automatically='no' required='no'\n" - b"test-vm3+dev2 backend_domain='test-vm3' ident='dev2' " - b"attach_automatically='yes' required='no'\n") + (b"0\0test-vm2+dev1 backend_domain='test-vm2' port_id='dev1' " + b"mode='manual'\n" + b"test-vm3+dev2 backend_domain='test-vm3' port_id='dev2' " + b"mode='required'\n") devs = list(self.vm.devices['test'].get_attached_devices()) self.assertEqual(len(devs), 2) - self.assertIsInstance(devs[0], qubesadmin.device_protocol.DeviceAssignment) + self.assertIsInstance(devs[0], DeviceAssignment) self.assertEqual(devs[0].backend_domain, self.app.domains['test-vm2']) - self.assertEqual(devs[0].ident, 'dev1') - self.assertIsInstance(devs[1], qubesadmin.device_protocol.DeviceAssignment) + self.assertEqual(devs[0].port_id, 'dev1') + self.assertIsInstance(devs[1], DeviceAssignment) self.assertEqual(devs[1].backend_domain, self.app.domains['test-vm3']) - self.assertEqual(devs[1].ident, 'dev2') + self.assertEqual(devs[1].port_id, 'dev2') self.assertAllCalled() def test_070_update_assignment(self): self.app.expected_calls[ - ('test-vm', 'admin.vm.device.test.Set.assignment', 'test-vm2+dev1', - b'True')] = b'0\0' - dev = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( - self.app.domains['test-vm2'], devclass='test', ident='dev1')) + ('test-vm', 'admin.vm.device.test.Set.assignment', + 'test-vm2+dev1:*', b'True')] = b'0\0' + dev = DeviceAssignment( + Device( + Port( + self.app.domains['test-vm2'], + devclass='test', + port_id='dev1'), + )) self.vm.devices['test'].update_assignment(dev, True) self.assertAllCalled() def test_071_update_assignment_false(self): self.app.expected_calls[ - ('test-vm', 'admin.vm.device.test.Set.assignment', 'test-vm2+dev1', - b'False')] = b'0\0' - dev = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( - self.app.domains['test-vm2'], devclass='test', ident='dev1')) + ('test-vm', 'admin.vm.device.test.Set.assignment', + 'test-vm2+dev1:*', b'False')] = b'0\0' + dev = DeviceAssignment( + Device( + Port( + self.app.domains['test-vm2'], + devclass='test', + port_id='dev1'), + )) self.vm.devices['test'].update_assignment(dev, False) self.assertAllCalled() def test_072_update_assignment_none(self): self.app.expected_calls[ - ('test-vm', 'admin.vm.device.test.Set.assignment', 'test-vm2+dev1', - b'None')] = b'0\0' - dev = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( - self.app.domains['test-vm2'], devclass='test', ident='dev1')) + ('test-vm', 'admin.vm.device.test.Set.assignment', + 'test-vm2+dev1:*', b'None')] = b'0\0' + dev = DeviceAssignment( + Device( + Port( + self.app.domains['test-vm2'], + devclass='test', + port_id='dev1', + ) + )) self.vm.devices['test'].update_assignment(dev, None) self.assertAllCalled() diff --git a/qubesadmin/tests/tools/qvm_device.py b/qubesadmin/tests/tools/qvm_device.py index d96925900..8aee0a6b0 100644 --- a/qubesadmin/tests/tools/qvm_device.py +++ b/qubesadmin/tests/tools/qvm_device.py @@ -45,7 +45,7 @@ def setUp(self): b'test-vm3 class=AppVM state=Running\n') self.expected_device_call( 'test-vm1', 'Available', - b"0\0dev1 ident='dev1' devclass='testclass' vendor='itl'" + b"0\0dev1 port_id='dev1' devclass='testclass' vendor='itl'" b" product='test-device' backend_domain='test-vm1'" ) self.vm1 = self.app.domains['test-vm1'] @@ -58,7 +58,7 @@ def test_000_list_all(self): """ self.expected_device_call( 'test-vm2', 'Available', - b"0\0dev2 ident='dev2' devclass='testclass' vendor='? `'" + b"0\0dev2 port_id='dev2' devclass='testclass' vendor='? `'" b" product='test-device' backend_domain='test-vm2'" ) self.expected_device_call('test-vm3', 'Available') @@ -86,10 +86,10 @@ def test_001_list_assigned_required(self): # This shouldn't be listed self.expected_device_call( 'test-vm2', 'Available', - b"0\0dev2 ident='dev2' devclass='testclass' backend_domain='test-vm2'\n") + b"0\0dev2 port_id='dev2' devclass='testclass' backend_domain='test-vm2'\n") self.expected_device_call( 'test-vm3', 'Available', - b"0\0dev3 ident='dev3' devclass='testclass' backend_domain='test-vm3' vendor='evil inc.' product='test-device-3'\n" + b"0\0dev3 port_id='dev3' devclass='testclass' backend_domain='test-vm3' vendor='evil inc.' product='test-device-3'\n" ) self.expected_device_call('test-vm1', 'Attached') self.expected_device_call('test-vm2', 'Attached') @@ -97,18 +97,16 @@ def test_001_list_assigned_required(self): self.expected_device_call('test-vm1', 'Assigned') self.expected_device_call( 'test-vm2', 'Assigned', - b"0\0test-vm1+dev1 ident='dev1' devclass='testclass' " - b"backend_domain='test-vm1' attach_automatically='yes' " - b"required='yes' _option='other option' _extra_opt='yes'\n" - b"test-vm3+dev3 ident='dev3' devclass='testclass' " - b"backend_domain='test-vm3' attach_automatically='yes' " - b"required='yes'\n" + b"0\0test-vm1+dev1 port_id='dev1' devclass='testclass' " + b"backend_domain='test-vm1' " + b"mode='required' _option='other option' _extra_opt='yes'\n" + b"test-vm3+dev3 port_id='dev3' devclass='testclass' " + b"backend_domain='test-vm3' mode='required'\n" ) self.expected_device_call( 'test-vm3', 'Assigned', - b"0\0test-vm1+dev1 ident='dev1' devclass='testclass' " - b"backend_domain='test-vm1' attach_automatically='yes' " - b"required='yes' _option='test option'\n" + b"0\0test-vm1+dev1 port_id='dev1' devclass='testclass' " + b"backend_domain='test-vm1' mode='required' _option='test option'\n" ) with qubesadmin.tests.tools.StdoutBuffer() as buf: @@ -129,15 +127,14 @@ def test_002_list_attach(self): # This shouldn't be listed self.expected_device_call( 'test-vm2', 'Available', - b"0\0dev2 ident='dev1' devclass='testclass' backend_domain='test-vm2'\n") + b"0\0dev2 port_id='dev1' devclass='testclass' backend_domain='test-vm2'\n") self.expected_device_call('test-vm3', 'Available') self.expected_device_call('test-vm1', 'Attached') self.expected_device_call('test-vm2', 'Attached') self.expected_device_call( 'test-vm3', 'Attached', - b"0\0test-vm1+dev1 ident='dev1' devclass='testclass' " - b"backend_domain='test-vm1' attach_automatically='yes' " - b"required='yes'\n" + b"0\0test-vm1+dev1 port_id='dev1' devclass='testclass' " + b"backend_domain='test-vm1' mode='required'\n" ) self.expected_device_call('test-vm1', 'Assigned') self.expected_device_call('test-vm2', 'Assigned') @@ -169,9 +166,10 @@ def test_003_list_device_classes(self): def test_010_attach(self): """ Test attach action """ self.app.expected_calls[( - 'test-vm2', 'admin.vm.device.testclass.Attach', 'test-vm1+dev1', - b"required='no' attach_automatically='no' ident='dev1' " - b"devclass='testclass' backend_domain='test-vm1' " + 'test-vm2', 'admin.vm.device.testclass.Attach', + 'test-vm1+dev1:*', + b"device_id='*' port_id='dev1' " + b"devclass='testclass' backend_domain='test-vm1' mode='manual' " b"frontend_domain='test-vm2'")] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'attach', 'test-vm2', 'test-vm1:dev1'], app=self.app) @@ -180,9 +178,10 @@ def test_010_attach(self): def test_011_attach_options(self): """ Test `read-only` attach option """ self.app.expected_calls[( - 'test-vm2', 'admin.vm.device.testclass.Attach', 'test-vm1+dev1', - b"required='no' attach_automatically='no' ident='dev1' " - b"devclass='testclass' backend_domain='test-vm1' " + 'test-vm2', 'admin.vm.device.testclass.Attach', + 'test-vm1+dev1:*', + b"device_id='*' port_id='dev1' " + b"devclass='testclass' backend_domain='test-vm1' mode='manual' " b"frontend_domain='test-vm2' _read-only='yes'")] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'attach', '-o', 'ro=True', 'test-vm2', 'test-vm1:dev1'], @@ -226,7 +225,7 @@ def test_020_detach(self): """ Test detach action """ self.app.expected_calls[ ('test-vm2', 'admin.vm.device.testclass.Detach', - 'test-vm1+dev1', None)] = b'0\0' + 'test-vm1+dev1:*', None)] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'detach', 'test-vm2', 'test-vm1:dev1'], app=self.app) self.assertAllCalled() @@ -235,7 +234,7 @@ def test_021_detach_unknown(self): """ Test detach action """ self.app.expected_calls[ ('test-vm2', 'admin.vm.device.testclass.Detach', - 'test-vm1+dev7', None)] = b'0\0' + 'test-vm1+dev7:*', None)] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'detach', 'test-vm2', 'test-vm1:dev7'], app=self.app) self.assertAllCalled() @@ -247,10 +246,10 @@ def test_022_detach_all(self): b'0\0test-vm1+dev1\ntest-vm1+dev2\n' self.app.expected_calls[ ('test-vm2', 'admin.vm.device.testclass.Detach', - 'test-vm1+dev1', None)] = b'0\0' + 'test-vm1+dev1:*', None)] = b'0\0' self.app.expected_calls[ ('test-vm2', 'admin.vm.device.testclass.Detach', - 'test-vm1+dev2', None)] = b'0\0' + 'test-vm1+dev2:*', None)] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'detach', 'test-vm2'], app=self.app) self.assertAllCalled() @@ -259,10 +258,11 @@ def test_030_assign(self): """ Test assign action """ self.app.domains['test-vm2'].is_running = lambda: True self.app.expected_calls[( - 'test-vm2', 'admin.vm.device.testclass.Assign', 'test-vm1+dev1', - b"required='no' attach_automatically='yes' ident='dev1' " + 'test-vm2', 'admin.vm.device.testclass.Assign', + 'test-vm1+dev1:0000:0000::?******', + b"device_id='0000:0000::?******' port_id='dev1' " b"devclass='testclass' backend_domain='test-vm1' " - b"frontend_domain='test-vm2' device_identity='0000:0000::?******'" + b"mode='auto-attach' frontend_domain='test-vm2'" )] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'assign', 'test-vm2', 'test-vm1:dev1'], app=self.app) @@ -272,10 +272,11 @@ def test_031_assign_required(self): """ Test assign as required """ self.app.domains['test-vm2'].is_running = lambda: True self.app.expected_calls[( - 'test-vm2', 'admin.vm.device.testclass.Assign', 'test-vm1+dev1', - b"required='yes' attach_automatically='yes' ident='dev1' " - b"devclass='testclass' backend_domain='test-vm1' " - b"frontend_domain='test-vm2' device_identity='0000:0000::?******'" + 'test-vm2', 'admin.vm.device.testclass.Assign', + 'test-vm1+dev1:0000:0000::?******', + b"device_id='0000:0000::?******' port_id='dev1' " + b"devclass='testclass' backend_domain='test-vm1' mode='required' " + b"frontend_domain='test-vm2'" )] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'assign', '--required', 'test-vm2', 'test-vm1:dev1'], app=self.app) @@ -285,11 +286,12 @@ def test_032_assign_options(self): """ Test `read-only` assign option """ self.app.domains['test-vm2'].is_running = lambda: True self.app.expected_calls[( - 'test-vm2', 'admin.vm.device.testclass.Assign', 'test-vm1+dev1', - b"required='no' attach_automatically='yes' ident='dev1' " + 'test-vm2', 'admin.vm.device.testclass.Assign', + 'test-vm1+dev1:0000:0000::?******', + b"device_id='0000:0000::?******' port_id='dev1' " b"devclass='testclass' backend_domain='test-vm1' " - b"frontend_domain='test-vm2' _read-only='yes' " - b"device_identity='0000:0000::?******'")] = b'0\0' + b"mode='auto-attach' frontend_domain='test-vm2' _read-only='yes'" + )] = b'0\0' with qubesadmin.tests.tools.StdoutBuffer() as buf: qubesadmin.tools.qvm_device.main( ['testclass', 'assign', '--ro', 'test-vm2', 'test-vm1:dev1'], @@ -333,7 +335,7 @@ def test_040_unassign(self): """ Test unassign action """ self.app.expected_calls[ ('test-vm2', 'admin.vm.device.testclass.Unassign', - 'test-vm1+dev1', None)] = b'0\0' + 'test-vm1+dev1:*', None)] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'unassign', 'test-vm2', 'test-vm1:dev1'], app=self.app) self.assertAllCalled() @@ -342,7 +344,7 @@ def test_041_unassign_unknown(self): """ Test unassign action """ self.app.expected_calls[ ('test-vm2', 'admin.vm.device.testclass.Unassign', - 'test-vm1+dev7', None)] = b'0\0' + 'test-vm1+dev7:*', None)] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'unassign', 'test-vm2', 'test-vm1:dev7'], app=self.app) self.assertAllCalled() @@ -355,10 +357,10 @@ def test_042_unassign_all(self): b"test-vm1+dev2 devclass='testclass'\n") self.app.expected_calls[ ('test-vm2', 'admin.vm.device.testclass.Unassign', - 'test-vm1+dev1', None)] = b'0\0' + 'test-vm1+dev1:*', None)] = b'0\0' self.app.expected_calls[ ('test-vm2', 'admin.vm.device.testclass.Unassign', - 'test-vm1+dev2', None)] = b'0\0' + 'test-vm1+dev2:*', None)] = b'0\0' qubesadmin.tools.qvm_device.main( ['testclass', 'unassign', 'test-vm2'], app=self.app) self.assertAllCalled() diff --git a/qubesadmin/tests/tools/qvm_start.py b/qubesadmin/tests/tools/qvm_start.py index 338cdb422..6d03d5400 100644 --- a/qubesadmin/tests/tools/qvm_start.py +++ b/qubesadmin/tests/tools/qvm_start.py @@ -81,13 +81,13 @@ def test_010_drive_cdrom(self): self.app.expected_calls[ ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Assign', 'dom0+sr0', - b"required='yes' attach_automatically='yes' ident='sr0' " - b"devclass='block' backend_domain='dom0' " + ('some-vm', 'admin.vm.device.block.Assign', 'dom0+sr0:*', + b"device_id='*' port_id='sr0' " + b"devclass='block' backend_domain='dom0' mode='required' " b"frontend_domain='some-vm' _devtype='cdrom' " b"_read-only='True'")] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Unassign', 'dom0+sr0', None)] = \ + ('some-vm', 'admin.vm.device.block.Unassign', 'dom0+sr0:*', None)] = \ b'0\x00' qubesadmin.tools.qvm_start.main(['--cdrom=dom0:sr0', 'some-vm'], app=self.app) @@ -104,13 +104,13 @@ def test_011_drive_disk(self): self.app.expected_calls[ ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Assign', 'dom0+sdb1', - b"required='yes' attach_automatically='yes' ident='sdb1' " - b"devclass='block' backend_domain='dom0' " + ('some-vm', 'admin.vm.device.block.Assign', 'dom0+sdb1:*', + b"device_id='*' port_id='sdb1' " + b"devclass='block' backend_domain='dom0' mode='required' " b"frontend_domain='some-vm' _devtype='disk' " b"_read-only='False'")] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Unassign', 'dom0+sdb1', + ('some-vm', 'admin.vm.device.block.Unassign', 'dom0+sdb1:*', None)] = b'0\x00' qubesadmin.tools.qvm_start.main(['--hd=dom0:sdb1', 'some-vm'], app=self.app) @@ -127,13 +127,13 @@ def test_012_drive_disk(self): self.app.expected_calls[ ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Assign', 'dom0+sdb1', - b"required='yes' attach_automatically='yes' ident='sdb1' " - b"devclass='block' backend_domain='dom0' " + ('some-vm', 'admin.vm.device.block.Assign', 'dom0+sdb1:*', + b"device_id='*' port_id='sdb1' " + b"devclass='block' backend_domain='dom0' mode='required' " b"frontend_domain='some-vm' _devtype='disk' " b"_read-only='False'")] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Unassign', 'dom0+sdb1', + ('some-vm', 'admin.vm.device.block.Unassign', 'dom0+sdb1:*', None)] = b'0\x00' qubesadmin.tools.qvm_start.main(['--drive=hd:dom0:sdb1', 'some-vm'], app=self.app) @@ -154,13 +154,13 @@ def test_013_drive_loop_local(self, mock_subprocess): self.app.expected_calls[ ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Assign', 'dom0+loop12', - b"required='yes' attach_automatically='yes' ident='loop12' " - b"devclass='block' backend_domain='dom0' " + ('some-vm', 'admin.vm.device.block.Assign', 'dom0+loop12:*', + b"device_id='*' port_id='loop12' " + b"devclass='block' backend_domain='dom0' mode='required' " b"frontend_domain='some-vm' _devtype='cdrom' " b"_read-only='True'")] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Unassign', 'dom0+loop12', None + ('some-vm', 'admin.vm.device.block.Unassign', 'dom0+loop12:*', None )] = b'0\x00loop12\n' mock_subprocess.return_value = b"/dev/loop12" qubesadmin.tools.qvm_start.main([ @@ -186,13 +186,13 @@ def test_014_drive_loop_remote(self): self.app.expected_calls[ ('some-vm', 'admin.vm.Start', None, None)] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Assign', 'other-vm+loop7', - b"required='yes' attach_automatically='yes' ident='loop7' " - b"devclass='block' backend_domain='other-vm' " + ('some-vm', 'admin.vm.device.block.Assign', 'other-vm+loop7:*', + b"device_id='*' port_id='loop7' " + b"devclass='block' backend_domain='other-vm' mode='required' " b"frontend_domain='some-vm' _devtype='cdrom' " b"_read-only='True'")] = b'0\x00' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Unassign', 'other-vm+loop7', + ('some-vm', 'admin.vm.device.block.Unassign', 'other-vm+loop7:*', None)] = b'0\x00' self.app.expected_calls[ ('other-vm', 'admin.vm.feature.CheckWithTemplate', 'vmexec', @@ -220,9 +220,9 @@ def test_015_drive_failed_start(self): ('some-vm', 'admin.vm.CurrentState', None, None)] = \ b'0\x00power_state=Halted' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Assign', 'other-vm+loop7', - b"required='yes' attach_automatically='yes' ident='loop7' " - b"devclass='block' backend_domain='other-vm' " + ('some-vm', 'admin.vm.device.block.Assign', 'other-vm+loop7:*', + b"device_id='*' port_id='loop7' " + b"devclass='block' backend_domain='other-vm' mode='required' " b"frontend_domain='some-vm' _devtype='cdrom' " b"_read-only='True'")] = b'0\x00' self.app.expected_calls[ @@ -230,7 +230,7 @@ def test_015_drive_failed_start(self): b'2\x00QubesException\x00\x00An error occurred\x00' self.app.expected_calls[ ('some-vm', 'admin.vm.device.block.Detach', - 'other-vm+loop7', None)] = b'0\x00' + 'other-vm+loop7:*', None)] = b'0\x00' qubesadmin.tools.qvm_start.main([ '--cdrom=other-vm:loop7', 'some-vm'], @@ -247,9 +247,9 @@ def test_016_drive_failed_attach(self): ('some-vm', 'admin.vm.CurrentState', None, None)] = \ b'0\x00power_state=Halted' self.app.expected_calls[ - ('some-vm', 'admin.vm.device.block.Assign', 'other-vm+loop7', - b"required='yes' attach_automatically='yes' ident='loop7' " - b"devclass='block' backend_domain='other-vm' " + ('some-vm', 'admin.vm.device.block.Assign', 'other-vm+loop7:*', + b"device_id='*' port_id='loop7' " + b"devclass='block' backend_domain='other-vm' mode='required' " b"frontend_domain='some-vm' _devtype='cdrom' " b"_read-only='True'")] = \ b'2\x00QubesException\x00\x00An error occurred\x00' diff --git a/qubesadmin/tools/qvm_device.py b/qubesadmin/tools/qvm_device.py index 4e0c54799..31536ace8 100644 --- a/qubesadmin/tools/qvm_device.py +++ b/qubesadmin/tools/qvm_device.py @@ -67,7 +67,7 @@ class Line(object): # pylint: disable=too-few-public-methods def __init__(self, device: DeviceInfo, attached_to=None): - self.ident = "{!s}:{!s}".format(device.backend_domain, device.ident) + self.ident = "{!s}:{!s}".format(device.backend_domain, device.port_id) # TODO! self.description = device.description self.attached_to = attached_to if attached_to else "" self.frontends = [] @@ -207,7 +207,8 @@ def assign_device(args): """ vm = args.domains[0] device = args.device - identity = device.self_identity if not args.port else None + if not args.port: + device.device_id = None options = dict(opt.split('=', 1) for opt in args.option or []) if args.ro: options['read-only'] = 'yes' @@ -219,7 +220,6 @@ def assign_device(args): mode = 'ask-to-attach' assignment = DeviceAssignment( device, - device_identity=identity, mode=mode, options=options ) @@ -227,7 +227,7 @@ def assign_device(args): if vm.is_running() and not assignment.attached and not args.quiet: print("Assigned. To attach you can now restart domain or run: \n" f"\tqvm-{assignment.devclass} attach {vm} " - f"{assignment.backend_domain}:{assignment.ident}") + f"{assignment.backend_domain}:{assignment.port_id}") def unassign_device(args): @@ -253,7 +253,7 @@ def _unassign_and_show_message(assignment, vm, args): if assignment.attached and not args.quiet: print("Unassigned. To detach you can now restart domain or run: \n" f"\tqvm-{assignment.devclass} detach {vm} " - f"{assignment.backend_domain}:{assignment.ident}") + f"{assignment.backend_domain}:{assignment.port_id}") def info_device(args): diff --git a/qubesadmin/tools/qvm_start.py b/qubesadmin/tools/qvm_start.py index 631e82e3d..485a8b32f 100644 --- a/qubesadmin/tools/qvm_start.py +++ b/qubesadmin/tools/qvm_start.py @@ -101,7 +101,7 @@ def get_drive_assignment(app, drive_str): drive_str = drive_str[len('hd:'):] try: - backend_domain_name, ident = drive_str.split(':', 1) + backend_domain_name, port_id = drive_str.split(':', 1) except ValueError: raise ValueError("Incorrect image name: image must be in the format " "of VMNAME:full_path, for example " @@ -111,7 +111,7 @@ def get_drive_assignment(app, drive_str): except KeyError: raise qubesadmin.exc.QubesVMNotFoundError( 'No such VM: %s', backend_domain_name) - if ident.startswith('/'): + if port_id.startswith('/'): # it is a path - if we're running in dom0, try to call losetup to # export the device, otherwise reject if app.qubesd_connection_type == 'qrexec': @@ -121,11 +121,11 @@ def get_drive_assignment(app, drive_str): try: if backend_domain.klass == 'AdminVM': loop_name = subprocess.check_output( - ['sudo', 'losetup', '-f', '--show', ident]) + ['sudo', 'losetup', '-f', '--show', port_id]) loop_name = loop_name.strip() else: untrusted_loop_name, _ = backend_domain.run_with_args( - 'losetup', '-f', '--show', ident, + 'losetup', '-f', '--show', port_id, user='root') untrusted_loop_name = untrusted_loop_name.strip() allowed_chars = string.ascii_lowercase + string.digits + '/' @@ -138,18 +138,18 @@ def get_drive_assignment(app, drive_str): del untrusted_loop_name except subprocess.CalledProcessError: raise qubesadmin.exc.QubesException( - 'Failed to setup loop device for %s', ident) + 'Failed to setup loop device for %s', port_id) assert loop_name.startswith(b'/dev/loop') - ident = loop_name.decode().split('/')[2] + port_id = loop_name.decode().split('/')[2] # wait for device to appear # FIXME: convert this to waiting for event timeout = 10 - while isinstance(backend_domain.devices['block'][ident], + while isinstance(backend_domain.devices['block'][port_id], qubesadmin.device_protocol.UnknownDevice): if timeout == 0: raise qubesadmin.exc.QubesException( 'Timeout waiting for {}:{} device to appear'.format( - backend_domain.name, ident)) + backend_domain.name, port_id)) timeout -= 1 time.sleep(1) @@ -158,11 +158,11 @@ def get_drive_assignment(app, drive_str): 'read-only': devtype == 'cdrom' } assignment = qubesadmin.device_protocol.DeviceAssignment( - qubesadmin.device_protocol.Port( + qubesadmin.device_protocol.Device(qubesadmin.device_protocol.Port( backend_domain=backend_domain, - ident=ident, + port_id=port_id, devclass='block', - ), + )), options=options, mode="required")