-
-
Notifications
You must be signed in to change notification settings - Fork 109
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7b755c7
commit 458c1c1
Showing
11 changed files
with
160 additions
and
171 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,10 +5,10 @@ | |
# Copyright (C) 2010-2016 Joanna Rutkowska <[email protected]> | ||
# Copyright (C) 2015-2016 Wojtek Porczyk <[email protected]> | ||
# Copyright (C) 2016 Bahtiar `kalkin-` Gadimov <[email protected]> | ||
# Copyright (C) 2017 Marek Marczykowski-Górecki | ||
# Copyright (C) 2017 Marek Marczykowski-Górecki | ||
# <[email protected]> | ||
# Copyright (C) 2024 Piotr Bartman-Szwarc | ||
# <[email protected]> | ||
# <[email protected]> | ||
# | ||
# This library is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU Lesser General Public | ||
|
@@ -53,101 +53,82 @@ def qbool(value): | |
return qubes.property.bool(None, None, value) | ||
|
||
|
||
class Device: | ||
class Port: | ||
""" | ||
Basic class of a *bus* device with *ident* exposed by a *backend domain*. | ||
Class of a *bus* device port with *ident* exposed by a *backend domain*. | ||
Attributes: | ||
backend_domain (QubesVM): The domain which exposes devices, | ||
e.g.`sys-usb`. | ||
ident (str): A unique identifier for the device within | ||
the backend domain. | ||
devclass (str, optional): The class of the device (e.g., 'usb', 'pci'). | ||
ident (str): A unique identifier for the port within the backend domain. | ||
devclass (str): The class of the port (e.g., 'usb', 'pci'). | ||
""" | ||
ALLOWED_CHARS_KEY = set( | ||
string.digits + string.ascii_letters | ||
+ r"!#$%&()*+,-./:;<>?@[\]^_{|}~") | ||
ALLOWED_CHARS_PARAM = ALLOWED_CHARS_KEY.union(set(string.punctuation + ' ')) | ||
|
||
def __init__(self, backend_domain, ident, devclass=None): | ||
def __init__(self, backend_domain, ident, devclass): | ||
self.__backend_domain = backend_domain | ||
self.__ident = ident | ||
self.__bus = devclass | ||
self.__devclass = devclass | ||
|
||
def __hash__(self): | ||
return hash((str(self.backend_domain), self.ident)) | ||
return hash((self.backend_domain.name, self.ident, self.devclass)) | ||
|
||
def __eq__(self, other): | ||
if isinstance(other, Device): | ||
if isinstance(other, Port): | ||
return ( | ||
self.backend_domain == other.backend_domain and | ||
self.ident == other.ident | ||
self.ident == other.ident and | ||
self.devclass == other.devclass | ||
) | ||
raise TypeError(f"Comparing instances of 'Device' and '{type(other)}' " | ||
raise TypeError(f"Comparing instances of 'Port' and '{type(other)}' " | ||
"is not supported") | ||
|
||
def __lt__(self, other): | ||
if isinstance(other, Device): | ||
return (self.backend_domain.name, self.ident) < \ | ||
(other.backend_domain.name, other.ident) | ||
raise TypeError(f"Comparing instances of 'Device' and '{type(other)}' " | ||
if isinstance(other, Port): | ||
return (self.backend_domain.name, self.devclass, self.ident) < \ | ||
(other.backend_domain.name, other.devclass, other.ident) | ||
raise TypeError(f"Comparing instances of 'Port' and '{type(other)}' " | ||
"is not supported") | ||
|
||
def __repr__(self): | ||
return "[%s]:%s" % (self.backend_domain, self.ident) | ||
return f"[{self.backend_domain.name}]:{self.devclass}:{self.ident}" | ||
|
||
def __str__(self): | ||
return '{!s}:{!s}'.format(self.backend_domain, self.ident) | ||
return f"{self.backend_domain.name}:{self.ident}" | ||
|
||
@property | ||
def ident(self) -> str: | ||
""" | ||
Immutable device identifier. | ||
Immutable port identifier. | ||
Unique for given domain and device type. | ||
Unique for given domain and devclass. | ||
""" | ||
return self.__ident | ||
|
||
@property | ||
def backend_domain(self) -> QubesVM: | ||
""" Which domain provides this device. (immutable)""" | ||
""" Which domain exposed this port. (immutable)""" | ||
return self.__backend_domain | ||
|
||
@property | ||
def devclass(self) -> str: | ||
""" Immutable* Device class such like: 'usb', 'pci' etc. | ||
""" Immutable port class such like: 'usb', 'pci' etc. | ||
For unknown devices "peripheral" is returned. | ||
*see `@devclass.setter` | ||
For unknown classes "peripheral" is returned. | ||
""" | ||
if self.__bus: | ||
return self.__bus | ||
if self.__devclass: | ||
return self.__devclass | ||
return "peripheral" | ||
|
||
@property | ||
def devclass_is_set(self) -> bool: | ||
""" | ||
Returns true if devclass is already initialised. | ||
""" | ||
return bool(self.__bus) | ||
|
||
@devclass.setter | ||
def devclass(self, devclass: str): | ||
""" Once a value is set, it should not be overridden. | ||
However, if it has not been set, i.e., the value is `None`, | ||
we can override it.""" | ||
if self.__bus is not None: | ||
raise TypeError("Attribute devclass is immutable") | ||
self.__bus = devclass | ||
|
||
@classmethod | ||
def unpack_properties( | ||
cls, untrusted_serialization: bytes | ||
) -> Tuple[Dict, Dict]: | ||
""" | ||
Unpacks basic device properties from a serialized encoded string. | ||
Unpacks basic port properties from a serialized encoded string. | ||
Returns: | ||
tuple: A tuple containing two dictionaries, properties and options, | ||
|
@@ -215,17 +196,17 @@ def pack_property(cls, key: str, value: str): | |
|
||
@staticmethod | ||
def check_device_properties( | ||
expected_device: 'Device', properties: Dict[str, Any]): | ||
expected_port: 'Port', properties: Dict[str, Any]): | ||
""" | ||
Validates properties against an expected device configuration. | ||
Validates properties against an expected port configuration. | ||
Modifies `properties`. | ||
Raises: | ||
UnexpectedDeviceProperty: If any property does not match | ||
the expected values. | ||
""" | ||
expected = expected_device | ||
expected = expected_port | ||
exp_vm_name = expected.backend_domain.name | ||
if properties.get('backend_domain', exp_vm_name) != exp_vm_name: | ||
raise UnexpectedDeviceProperty( | ||
|
@@ -239,13 +220,11 @@ def check_device_properties( | |
f"when expected id: {expected.ident}.") | ||
properties['ident'] = expected.ident | ||
|
||
if expected.devclass_is_set: | ||
if (properties.get('devclass', expected.devclass) | ||
!= expected.devclass): | ||
raise UnexpectedDeviceProperty( | ||
f"Got {properties['devclass']} device " | ||
f"when expected {expected.devclass}.") | ||
properties['devclass'] = expected.devclass | ||
if properties.get('devclass', expected.devclass) != expected.devclass: | ||
raise UnexpectedDeviceProperty( | ||
f"Got {properties['devclass']} device " | ||
f"when expected {expected.devclass}.") | ||
properties['devclass'] = expected.devclass | ||
|
||
|
||
class DeviceCategory(Enum): | ||
|
@@ -428,27 +407,24 @@ def _load_classes(bus: str): | |
return result | ||
|
||
|
||
class DeviceInfo(Device): | ||
class DeviceInfo(Port): | ||
""" Holds all information about a device """ | ||
|
||
def __init__( | ||
self, | ||
backend_domain: QubesVM, | ||
ident: str, | ||
*, | ||
devclass: Optional[str] = None, | ||
port: Port, | ||
vendor: Optional[str] = None, | ||
product: Optional[str] = None, | ||
manufacturer: Optional[str] = None, | ||
name: Optional[str] = None, | ||
serial: Optional[str] = None, | ||
interfaces: Optional[List[DeviceInterface]] = None, | ||
parent: Optional[Device] = None, | ||
parent: Optional[Port] = None, | ||
attachment: Optional[QubesVM] = None, | ||
self_identity: Optional[str] = None, | ||
**kwargs | ||
): | ||
super().__init__(backend_domain, ident, devclass) | ||
super().__init__(port.backend_domain, port.ident, port.devclass) | ||
|
||
self._vendor = vendor | ||
self._product = product | ||
|
@@ -462,6 +438,13 @@ def __init__( | |
|
||
self.data = kwargs | ||
|
||
@property | ||
def port(self) -> Port: | ||
""" | ||
Device port visible in Qubes. | ||
""" | ||
return Port(self.backend_domain, self.ident, self.devclass) | ||
|
||
@property | ||
def vendor(self) -> str: | ||
""" | ||
|
@@ -570,7 +553,7 @@ def interfaces(self) -> List[DeviceInterface]: | |
return self._interfaces | ||
|
||
@property | ||
def parent_device(self) -> Optional[Device]: | ||
def parent_device(self) -> Optional[Port]: | ||
""" | ||
The parent device, if any. | ||
|
@@ -663,28 +646,27 @@ def deserialize( | |
def _deserialize( | ||
cls, | ||
untrusted_serialization: bytes, | ||
expected_device: Device | ||
expected_port: Port | ||
) -> 'DeviceInfo': | ||
""" | ||
Actually deserializes the object. | ||
""" | ||
properties, options = cls.unpack_properties(untrusted_serialization) | ||
properties.update(options) | ||
|
||
cls.check_device_properties(expected_device, properties) | ||
cls.check_device_properties(expected_port, properties) | ||
|
||
if 'attachment' not in properties or not properties['attachment']: | ||
properties['attachment'] = None | ||
else: | ||
app = expected_device.backend_domain.app | ||
app = expected_port.backend_domain.app | ||
properties['attachment'] = app.domains.get_blind( | ||
properties['attachment']) | ||
|
||
if (expected_device.devclass_is_set | ||
and properties['devclass'] != expected_device.devclass): | ||
if properties['devclass'] != expected_port.devclass: | ||
raise UnexpectedDeviceProperty( | ||
f"Got {properties['devclass']} device " | ||
f"when expected {expected_device.devclass}.") | ||
f"when expected {expected_port.devclass}.") | ||
|
||
if 'interfaces' in properties: | ||
interfaces = properties['interfaces'] | ||
|
@@ -694,15 +676,23 @@ def _deserialize( | |
properties['interfaces'] = interfaces | ||
|
||
if 'parent_ident' in properties: | ||
properties['parent'] = Device( | ||
backend_domain=expected_device.backend_domain, | ||
properties['parent'] = Port( | ||
backend_domain=expected_port.backend_domain, | ||
ident=properties['parent_ident'], | ||
devclass=properties['parent_devclass'], | ||
) | ||
del properties['parent_ident'] | ||
del properties['parent_devclass'] | ||
|
||
return cls(**properties) | ||
port = Port( | ||
properties['backend_domain'], | ||
properties['ident'], | ||
properties['devclass']) | ||
del properties['backend_domain'] | ||
del properties['ident'] | ||
del properties['devclass'] | ||
|
||
return cls(port, **properties) | ||
|
||
@property | ||
def self_identity(self) -> str: | ||
|
@@ -770,10 +760,11 @@ class UnknownDevice(DeviceInfo): | |
"""Unknown device - for example, exposed by domain not running currently""" | ||
|
||
def __init__(self, backend_domain, ident, *, devclass, **kwargs): | ||
super().__init__(backend_domain, ident, devclass=devclass, **kwargs) | ||
port = Port(backend_domain, ident, devclass) | ||
super().__init__(port, **kwargs) | ||
|
||
|
||
class DeviceAssignment(Device): | ||
class DeviceAssignment(Port): | ||
""" Maps a device to a frontend_domain. | ||
There are 3 flags `attached`, `automatically_attached` and `required`. | ||
|
@@ -821,7 +812,7 @@ def clone(self, **kwargs): | |
return self.__class__(**attr) | ||
|
||
@classmethod | ||
def from_device(cls, device: Device, **kwargs) -> 'DeviceAssignment': | ||
def from_device(cls, device: Port, **kwargs) -> 'DeviceAssignment': | ||
""" | ||
Get assignment of the device. | ||
""" | ||
|
@@ -923,13 +914,13 @@ def serialize(self) -> bytes: | |
def deserialize( | ||
cls, | ||
serialization: bytes, | ||
expected_device: Device, | ||
expected_port: Port, | ||
) -> 'DeviceAssignment': | ||
""" | ||
Recovers a serialized object, see: :py:meth:`serialize`. | ||
""" | ||
try: | ||
result = cls._deserialize(serialization, expected_device) | ||
result = cls._deserialize(serialization, expected_port) | ||
except Exception as exc: | ||
raise ProtocolError() from exc | ||
return result | ||
|
@@ -938,15 +929,15 @@ def deserialize( | |
def _deserialize( | ||
cls, | ||
untrusted_serialization: bytes, | ||
expected_device: Device, | ||
expected_port: Port, | ||
) -> 'DeviceAssignment': | ||
""" | ||
Actually deserializes the object. | ||
""" | ||
properties, options = cls.unpack_properties(untrusted_serialization) | ||
properties['options'] = options | ||
|
||
cls.check_device_properties(expected_device, properties) | ||
cls.check_device_properties(expected_port, properties) | ||
|
||
properties['attach_automatically'] = qbool( | ||
properties.get('attach_automatically', 'no')) | ||
|
Oops, something went wrong.