diff --git a/qrexec/policy/parser.py b/qrexec/policy/parser.py index 8013edb2..3f3f7df0 100644 --- a/qrexec/policy/parser.py +++ b/qrexec/policy/parser.py @@ -30,6 +30,7 @@ import itertools import logging import pathlib +import re import string from typing import ( @@ -235,11 +236,11 @@ def __new__(cls, token: str, *, filepath: Optional[pathlib.Path]=None, orig_token = token # first, adjust some aliases - if token == "dom0": + if token in ("dom0", "uuid:00000000-0000-0000-0000-000000000000"): # TODO: log a warning in Qubes 4.1 token = "@adminvm" - # if user specified just qube name, use it directly + # if user specified just qube name or UUID, use it directly if not (token.startswith("@") or token == "*"): return super().__new__(cls, token) @@ -300,13 +301,28 @@ def __init__(self, token: str, *, filepath: Optional[pathlib.Path]=None, # This replaces is_match() and is_match_single(). def match( self, - other: Optional[str], + other: str, *, system_info: FullSystemInfo, source: Optional["VMToken"]=None ) -> bool: """Check if this token matches opposite token""" - # pylint: disable=unused-argument + # pylint: disable=unused-argument,too-many-return-statements + if self == "@adminvm": + return other == "@adminvm" + info = system_info["domains"] + if self.startswith("uuid:"): + if other.startswith("uuid:"): + return self == other + try: + return self[5:] == info[str(other)]["uuid"] + except KeyError: + return False + if other.startswith("uuid:"): + try: + return other[5:] == info[str(self)]["uuid"] + except KeyError: + return False return self == other def is_special_value(self) -> bool: @@ -339,9 +355,12 @@ def expand(self, *, system_info: FullSystemInfo) -> Iterable[VMToken]: This is used as part of :py:meth:`Policy.collect_targets_for_ask()`. """ - if self in system_info["domains"]: - yield IntendedTarget(self) - + info = system_info["domains"] + if self in info: + if self.startswith("uuid:"): + yield IntendedTarget(type(self)(info[self])) + else: + yield IntendedTarget(self) class Target(_BaseTarget): # pylint: disable=missing-docstring @@ -362,10 +381,12 @@ def __new__( return super().__new__(cls, value, filepath=filepath, lineno=lineno) # type: ignore +_uuid_regex = re.compile(r"\A[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\Z") + # this method (with overloads in subclasses) was verify_target_value class IntendedTarget(VMToken): # pylint: disable=missing-docstring - def verify(self, *, system_info: FullSystemInfo) -> VMToken: + def verify(self, *, system_info: FullSystemInfo) -> Optional[VMToken]: """Check if given value names valid target This function check if given value is not only syntactically correct, @@ -410,7 +431,7 @@ class WildcardVM(Source, Target): def match( self, - other: Optional[str], + other: str, *, system_info: FullSystemInfo, source: Optional[VMToken]=None @@ -443,7 +464,7 @@ class AnyVM(Source, Target): def match( self, - other: Optional[str], + other: str, *, system_info: FullSystemInfo, source: Optional[VMToken]=None @@ -476,7 +497,7 @@ class TypeVM(Source, Target): def match( self, - other: Optional[str], + other: str, *, system_info: FullSystemInfo, source: Optional[VMToken]=None @@ -499,7 +520,7 @@ class TagVM(Source, Target): def match( self, - other: Optional[str], + other: str, *, system_info: FullSystemInfo, source: Optional[VMToken]=None @@ -522,7 +543,7 @@ class DispVM(Target, Redirect, IntendedTarget): def match( self, - other: Optional[str], + other: str, *, system_info: FullSystemInfo, source: Optional[VMToken]=None @@ -556,7 +577,7 @@ class DispVMTemplate(Source, Target, Redirect, IntendedTarget): def match( self, - other: Optional[str], + other: str, *, system_info: FullSystemInfo, source: Optional[VMToken]=None @@ -590,14 +611,13 @@ class DispVMTag(Source, Target): def match( self, - other: Optional[str], + other: str, *, system_info: FullSystemInfo, source: Optional[VMToken]=None ) -> bool: - if isinstance(other, DispVM): - assert source is not None - other = other.get_dispvm_template(source, system_info=system_info) + if isinstance(other, DispVM) and source is not None: + return self == other.get_dispvm_template(source, system_info=system_info) if not isinstance(other, DispVMTemplate): # 1) original other may have been neither @dispvm: nor @dispvm diff --git a/qrexec/tests/policy_parser.py b/qrexec/tests/policy_parser.py index e0e9a112..29370a3e 100644 --- a/qrexec/tests/policy_parser.py +++ b/qrexec/tests/policy_parser.py @@ -38,6 +38,7 @@ "default_dispvm": "default-dvm", "template_for_dispvms": False, "power_state": "Running", + "uuid": "00000000-0000-0000-0000-000000000000", }, "test-vm1": { "tags": ["tag1", "tag2"], @@ -45,6 +46,7 @@ "default_dispvm": "default-dvm", "template_for_dispvms": False, "power_state": "Running", + "uuid": "c9024a97-9b15-46cc-8341-38d75d5d421b", }, "test-vm2": { "tags": ["tag2"], @@ -52,6 +54,7 @@ "default_dispvm": "default-dvm", "template_for_dispvms": False, "power_state": "Running", + "uuid": "b3eb69d0-f9d9-4c3c-ad5c-454500303ea4", }, "test-vm3": { "tags": ["tag3"], @@ -59,6 +62,7 @@ "default_dispvm": "default-dvm", "template_for_dispvms": True, "power_state": "Halted", + "uuid": "fa6d56e8-a89d-4106-aa62-22e172a43c8b", }, "default-dvm": { "tags": [], @@ -66,6 +70,7 @@ "default_dispvm": "default-dvm", "template_for_dispvms": True, "power_state": "Halted", + "uuid": "f3e538bd-4427-4697-bed7-45ef3270df21", }, "test-invalid-dvm": { "tags": ["tag1", "tag2"], @@ -73,6 +78,7 @@ "default_dispvm": "test-vm1", "template_for_dispvms": False, "power_state": "Halted", + "uuid": "c4fa3586-a6b6-4dc4-bdda-c9e7375a12b5", }, "test-no-dvm": { "tags": ["tag1", "tag2"], @@ -80,6 +86,7 @@ "default_dispvm": None, "template_for_dispvms": False, "power_state": "Halted", + "uuid": "53a450b9-a454-4416-8adb-46812257ad29", }, "test-template": { "tags": ["tag1", "tag2"], @@ -87,6 +94,7 @@ "default_dispvm": "default-dvm", "template_for_dispvms": False, "power_state": "Halted", + "uuid": "a9fe2b04-9fd5-4e95-be20-162433d64de0", }, "test-standalone": { "tags": ["tag1", "tag2"], @@ -94,6 +102,7 @@ "default_dispvm": "default-dvm", "template_for_dispvms": False, "power_state": "Halted", + "uuid": "6d7a02b5-532b-467f-b9fb-6596bae03c33", }, }, } @@ -110,6 +119,11 @@ async def __call__(self, *args, **kwargs): class TC_00_VMToken(unittest.TestCase): + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + for i, j in SYSTEM_INFO["domains"].items(): + j["name"] = i + def test_010_Source(self): # with self.assertRaises(exc.PolicySyntaxError): # parser.Source(None) @@ -120,6 +134,11 @@ def test_010_Source(self): parser.Source("*") with self.assertRaises(exc.PolicySyntaxError): parser.Source("@default") + parser.Source("uuid:d8a249f1-b02b-4944-a9e5-437def2fbe2c") + with self.assertRaises(exc.PolicySyntaxError): + parser.Source("@uuid:") + with self.assertRaises(exc.PolicySyntaxError): + parser.Source("@uuid:d8a249f1-b02b-4944-a9e5-437def2fbe2c") parser.Source("@type:AppVM") parser.Source("@tag:tag1") with self.assertRaises(exc.PolicySyntaxError): @@ -150,6 +169,7 @@ def test_020_Target(self): parser.Target("@dispvm") parser.Target("@dispvm:default-dvm") parser.Target("@dispvm:@tag:tag3") + parser.Target("uuid:d8a249f1-b02b-4944-a9e5-437def2fbe2c") with self.assertRaises(exc.PolicySyntaxError): parser.Target("@invalid") @@ -163,19 +183,22 @@ def test_020_Target(self): parser.Target("@type:") def test_021_Target_expand(self): - self.assertCountEqual( - parser.Target("test-vm1").expand(system_info=SYSTEM_INFO), + self.assertEqual( + list(parser.Target("test-vm1").expand(system_info=SYSTEM_INFO)), ["test-vm1"], ) - self.assertCountEqual( - parser.Target("@adminvm").expand(system_info=SYSTEM_INFO), + self.assertEqual( + list(parser.Target("@adminvm").expand(system_info=SYSTEM_INFO)), ["@adminvm"], ) - self.assertCountEqual( - parser.Target("dom0").expand(system_info=SYSTEM_INFO), ["@adminvm"] + self.assertEqual( + list(parser.Target("dom0").expand(system_info=SYSTEM_INFO)), ["@adminvm"] ) - self.assertCountEqual( - parser.Target("@anyvm").expand(system_info=SYSTEM_INFO), + self.assertEqual( + list(parser.Target("uuid:00000000-0000-0000-0000-000000000000").expand(system_info=SYSTEM_INFO)), ["@adminvm"] + ) + self.assertEqual( + list(parser.Target("@anyvm").expand(system_info=SYSTEM_INFO)), [ "test-vm1", "test-vm2", @@ -286,6 +309,7 @@ def test_030_Redirect(self): parser.Redirect("test-vm1") parser.Redirect("@adminvm") parser.Redirect("dom0") + parser.Redirect("uuid:00000000-0000-0000-0000-000000000000") with self.assertRaises(exc.PolicySyntaxError): parser.Redirect("@anyvm") with self.assertRaises(exc.PolicySyntaxError): @@ -313,6 +337,7 @@ def test_030_Redirect(self): parser.Redirect("@type:") def test_040_IntendedTarget(self): + parser.IntendedTarget("uuid:00000000-0000-0000-0000-000000000000") parser.IntendedTarget("test-vm1") parser.IntendedTarget("@adminvm") parser.IntendedTarget("dom0") @@ -344,6 +369,10 @@ def test_040_IntendedTarget(self): def test_100_match_single(self): # pytest: disable=no-self-use cases = [ + ("uuid:00000000-0000-0000-0000-000000000000", "@adminvm", True), + ("uuid:00000000-0000-0000-0000-000000000000", "dom0", True), + ("uuid:00000000-0000-0000-0000-000000000000", "@dispvm:default-dvm", False), + ("uuid:00000000-0000-0000-0000-000000000000", "test-vm1", False), ("@anyvm", "test-vm1", True), ("@anyvm", "@default", True), ("@default", "@default", True), diff --git a/qrexec/utils.py b/qrexec/utils.py index 348b36d0..603c7228 100644 --- a/qrexec/utils.py +++ b/qrexec/utils.py @@ -115,6 +115,7 @@ class SystemInfoEntry(TypedDict): power_state: str icon: str guivm: Optional[str] + uuid: Optional[str] SystemInfo: 'TypeAlias' = Dict[str, SystemInfoEntry] @@ -136,7 +137,15 @@ def get_system_info() -> FullSystemInfo: """ system_info = qubesd_call("dom0", "internal.GetSystemInfo") - return cast(SystemInfo, json.loads(system_info.decode("utf-8"))) + system_info_decoded = cast(FullSystemInfo, json.loads(system_info.decode("utf-8"))) + inner = system_info_decoded["domains"] + for i, j in list(inner.items()): + j["name"] = i + try: + inner["uuid:" + j["uuid"]] = j + except KeyError: + pass + return system_info_decoded def prepare_subprocess_kwds(input: object) -> Dict[str, object]: